mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-10 04:13:45 +03:00
Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
88f8670ede | ||
|
|
9eb5de334f | ||
|
|
6954e126fc | ||
|
|
bce35b8dd9 | ||
|
|
16dd145586 | ||
|
|
cd2c9e39da | ||
|
|
305e7bc981 | ||
|
|
9721d06c6a | ||
|
|
4862e93024 | ||
|
|
db4560ca31 | ||
|
|
1575a560f0 | ||
|
|
e1d76ec1f3 | ||
|
|
aeaa5de5fe |
@@ -186,6 +186,9 @@ Follow the following steps during the upgrade:
|
||||
2) Wait until the process stops. This can take a few seconds.
|
||||
3) Start the upgraded VictoriaMetrics.
|
||||
|
||||
Prometheus doesn't drop data during VictoriaMetrics restart.
|
||||
See [this article](https://grafana.com/blog/2019/03/25/whats-new-in-prometheus-2.8-wal-based-remote-write/) for details.
|
||||
|
||||
|
||||
### How to apply new config to VictoriaMetrics?
|
||||
|
||||
@@ -195,6 +198,9 @@ VictoriaMetrics must be restarted for applying new config:
|
||||
2) Wait until the process stops. This can take a few seconds.
|
||||
3) Start VictoriaMetrics with the new config.
|
||||
|
||||
Prometheus doesn't drop data during VictoriaMetrics restart.
|
||||
See [this article](https://grafana.com/blog/2019/03/25/whats-new-in-prometheus-2.8-wal-based-remote-write/) for details.
|
||||
|
||||
|
||||
### How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)?
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
@@ -43,6 +44,8 @@ func main() {
|
||||
vmstorage.Stop()
|
||||
vmselect.Stop()
|
||||
|
||||
fs.MustStopDirRemover()
|
||||
|
||||
logger.Infof("the VictoriaMetrics has been stopped in %s", time.Since(startTime))
|
||||
}
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
@@ -92,7 +93,7 @@ func setUp() {
|
||||
|
||||
func processFlags() {
|
||||
flag.Parse()
|
||||
for _, fs := range []struct {
|
||||
for _, fv := range []struct {
|
||||
flag string
|
||||
value string
|
||||
}{
|
||||
@@ -103,8 +104,8 @@ func processFlags() {
|
||||
{flag: "loggerLevel", value: testLogLevel},
|
||||
} {
|
||||
// panics if flag doesn't exist
|
||||
if err := flag.Lookup(fs.flag).Value.Set(fs.value); err != nil {
|
||||
log.Fatalf("unable to set %q with value %q, err: %v", fs.flag, fs.value, err)
|
||||
if err := flag.Lookup(fv.flag).Value.Set(fv.value); err != nil {
|
||||
log.Fatalf("unable to set %q with value %q, err: %v", fv.flag, fv.value, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -121,13 +122,14 @@ func waitFor(timeout time.Duration, f func() bool) error {
|
||||
}
|
||||
|
||||
func tearDown() {
|
||||
vminsert.Stop()
|
||||
vmstorage.Stop()
|
||||
vmselect.Stop()
|
||||
if err := httpserver.Stop(*httpListenAddr); err != nil {
|
||||
log.Fatalf("cannot stop the webservice: %s", err)
|
||||
}
|
||||
os.RemoveAll(storagePath)
|
||||
vminsert.Stop()
|
||||
vmstorage.Stop()
|
||||
vmselect.Stop()
|
||||
fs.MustRemoveAll(storagePath)
|
||||
fs.MustStopDirRemover()
|
||||
}
|
||||
|
||||
func TestWriteRead(t *testing.T) {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package netstorage
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
@@ -10,6 +9,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
@@ -30,13 +30,23 @@ func InitTmpBlocksDir(tmpDirPath string) {
|
||||
|
||||
var tmpBlocksDir string
|
||||
|
||||
const maxInmemoryTmpBlocksFile = 512 * 1024
|
||||
func maxInmemoryTmpBlocksFile() int {
|
||||
mem := memory.Allowed()
|
||||
maxLen := mem / 1024
|
||||
if maxLen < 64*1024 {
|
||||
return 64 * 1024
|
||||
}
|
||||
return maxLen
|
||||
}
|
||||
|
||||
var _ = metrics.NewGauge(`vm_tmp_blocks_max_inmemory_file_size_bytes`, func() float64 {
|
||||
return float64(maxInmemoryTmpBlocksFile())
|
||||
})
|
||||
|
||||
type tmpBlocksFile struct {
|
||||
buf []byte
|
||||
|
||||
f *os.File
|
||||
bw *bufio.Writer
|
||||
f *os.File
|
||||
|
||||
offset uint64
|
||||
}
|
||||
@@ -44,7 +54,9 @@ type tmpBlocksFile struct {
|
||||
func getTmpBlocksFile() *tmpBlocksFile {
|
||||
v := tmpBlocksFilePool.Get()
|
||||
if v == nil {
|
||||
return &tmpBlocksFile{}
|
||||
return &tmpBlocksFile{
|
||||
buf: make([]byte, 0, maxInmemoryTmpBlocksFile()),
|
||||
}
|
||||
}
|
||||
return v.(*tmpBlocksFile)
|
||||
}
|
||||
@@ -53,7 +65,6 @@ func putTmpBlocksFile(tbf *tmpBlocksFile) {
|
||||
tbf.MustClose()
|
||||
tbf.buf = tbf.buf[:0]
|
||||
tbf.f = nil
|
||||
tbf.bw = nil
|
||||
tbf.offset = 0
|
||||
tmpBlocksFilePool.Put(tbf)
|
||||
}
|
||||
@@ -69,22 +80,6 @@ func (addr tmpBlockAddr) String() string {
|
||||
return fmt.Sprintf("offset %d, size %d", addr.offset, addr.size)
|
||||
}
|
||||
|
||||
func getBufioWriter(f *os.File) *bufio.Writer {
|
||||
v := bufioWriterPool.Get()
|
||||
if v == nil {
|
||||
return bufio.NewWriterSize(f, maxInmemoryTmpBlocksFile*2)
|
||||
}
|
||||
bw := v.(*bufio.Writer)
|
||||
bw.Reset(f)
|
||||
return bw
|
||||
}
|
||||
|
||||
func putBufioWriter(bw *bufio.Writer) {
|
||||
bufioWriterPool.Put(bw)
|
||||
}
|
||||
|
||||
var bufioWriterPool sync.Pool
|
||||
|
||||
var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`)
|
||||
|
||||
// WriteBlock writes b to tbf.
|
||||
@@ -92,28 +87,31 @@ var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_tota
|
||||
// It returns errors since the operation may fail on space shortage
|
||||
// and this must be handled.
|
||||
func (tbf *tmpBlocksFile) WriteBlock(b *storage.Block) (tmpBlockAddr, error) {
|
||||
bb := tmpBufPool.Get()
|
||||
defer tmpBufPool.Put(bb)
|
||||
bb.B = storage.MarshalBlock(bb.B[:0], b)
|
||||
|
||||
var addr tmpBlockAddr
|
||||
addr.offset = tbf.offset
|
||||
|
||||
tbfBufLen := len(tbf.buf)
|
||||
tbf.buf = storage.MarshalBlock(tbf.buf, b)
|
||||
addr.size = len(tbf.buf) - tbfBufLen
|
||||
addr.size = len(bb.B)
|
||||
tbf.offset += uint64(addr.size)
|
||||
if tbf.offset <= maxInmemoryTmpBlocksFile {
|
||||
if len(tbf.buf)+len(bb.B) <= cap(tbf.buf) {
|
||||
// Fast path - the data fits tbf.buf
|
||||
tbf.buf = append(tbf.buf, bb.B...)
|
||||
return addr, nil
|
||||
}
|
||||
|
||||
// Slow path: flush the data from tbf.buf to file.
|
||||
if tbf.f == nil {
|
||||
f, err := ioutil.TempFile(tmpBlocksDir, "")
|
||||
if err != nil {
|
||||
return addr, err
|
||||
}
|
||||
tbf.f = f
|
||||
tbf.bw = getBufioWriter(f)
|
||||
tmpBlocksFilesCreated.Inc()
|
||||
}
|
||||
_, err := tbf.bw.Write(tbf.buf)
|
||||
tbf.buf = tbf.buf[:0]
|
||||
_, err := tbf.f.Write(tbf.buf)
|
||||
tbf.buf = append(tbf.buf[:0], bb.B...)
|
||||
if err != nil {
|
||||
return addr, fmt.Errorf("cannot write block to %q: %s", tbf.f.Name(), err)
|
||||
}
|
||||
@@ -124,15 +122,15 @@ func (tbf *tmpBlocksFile) Finalize() error {
|
||||
if tbf.f == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := tbf.bw.Flush()
|
||||
putBufioWriter(tbf.bw)
|
||||
tbf.bw = nil
|
||||
if _, err := tbf.f.Write(tbf.buf); err != nil {
|
||||
return fmt.Errorf("cannot flush the remaining %d bytes to tmpBlocksFile: %s", len(tbf.buf), err)
|
||||
}
|
||||
tbf.buf = tbf.buf[:0]
|
||||
if _, err := tbf.f.Seek(0, 0); err != nil {
|
||||
logger.Panicf("FATAL: cannot seek to the start of file: %s", err)
|
||||
}
|
||||
mustFadviseRandomRead(tbf.f)
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tbf *tmpBlocksFile) MustReadBlockAt(dst *storage.Block, addr tmpBlockAddr) {
|
||||
@@ -167,10 +165,6 @@ func (tbf *tmpBlocksFile) MustClose() {
|
||||
if tbf.f == nil {
|
||||
return
|
||||
}
|
||||
if tbf.bw != nil {
|
||||
putBufioWriter(tbf.bw)
|
||||
tbf.bw = nil
|
||||
}
|
||||
fname := tbf.f.Name()
|
||||
|
||||
// Remove the file at first, then close it.
|
||||
|
||||
@@ -30,7 +30,7 @@ func TestTmpBlocksFileSerial(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTmpBlocksFileConcurrent(t *testing.T) {
|
||||
concurrency := 4
|
||||
concurrency := 3
|
||||
ch := make(chan error, concurrency)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func() {
|
||||
@@ -69,7 +69,7 @@ func testTmpBlocksFile() error {
|
||||
_, _, _ = b.MarshalData(0, 0)
|
||||
return &b
|
||||
}
|
||||
for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile / 2, 2 * maxInmemoryTmpBlocksFile} {
|
||||
for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile() / 2, 2 * maxInmemoryTmpBlocksFile()} {
|
||||
err := func() error {
|
||||
tbf := getTmpBlocksFile()
|
||||
defer putTmpBlocksFile(tbf)
|
||||
@@ -94,7 +94,7 @@ func testTmpBlocksFile() error {
|
||||
}
|
||||
|
||||
// Read blocks in parallel and verify them
|
||||
concurrency := 3
|
||||
concurrency := 2
|
||||
workCh := make(chan int)
|
||||
doneCh := make(chan error)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
|
||||
@@ -557,7 +557,9 @@ func QueryRangeHandler(w http.ResponseWriter, r *http.Request) error {
|
||||
if err := promql.ValidateMaxPointsPerTimeseries(start, end, step); err != nil {
|
||||
return err
|
||||
}
|
||||
start, end = promql.AdjustStartEnd(start, end, step)
|
||||
if mayCache {
|
||||
start, end = promql.AdjustStartEnd(start, end, step)
|
||||
}
|
||||
|
||||
ec := promql.EvalConfig{
|
||||
Start: start,
|
||||
|
||||
@@ -353,6 +353,25 @@ func aggrFuncCountValues(afa *aggrFuncArg) ([]*timeseries, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Remove dstLabel from grouping like Prometheus does.
|
||||
modifier := &afa.ae.Modifier
|
||||
switch strings.ToLower(modifier.Op) {
|
||||
case "without":
|
||||
modifier.Args = append(modifier.Args, dstLabel)
|
||||
case "by":
|
||||
dstArgs := modifier.Args[:0]
|
||||
for _, arg := range modifier.Args {
|
||||
if arg == dstLabel {
|
||||
continue
|
||||
}
|
||||
dstArgs = append(dstArgs, arg)
|
||||
}
|
||||
modifier.Args = dstArgs
|
||||
default:
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
afe := func(tss []*timeseries) []*timeseries {
|
||||
m := make(map[float64]bool)
|
||||
for _, ts := range tss {
|
||||
|
||||
@@ -322,6 +322,7 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser
|
||||
}
|
||||
src := tssRight[0]
|
||||
for _, ts := range tssLeft {
|
||||
resetMetricGroupIfRequired(be, ts)
|
||||
ts.MetricName.AddMissingTags(joinTags, &src.MetricName)
|
||||
rvsLeft = append(rvsLeft, ts)
|
||||
rvsRight = append(rvsRight, src)
|
||||
@@ -332,6 +333,7 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser
|
||||
}
|
||||
src := tssLeft[0]
|
||||
for _, ts := range tssRight {
|
||||
resetMetricGroupIfRequired(be, ts)
|
||||
ts.MetricName.AddMissingTags(joinTags, &src.MetricName)
|
||||
rvsLeft = append(rvsLeft, src)
|
||||
rvsRight = append(rvsRight, ts)
|
||||
|
||||
@@ -1859,9 +1859,9 @@ func TestExecSuccess(t *testing.T) {
|
||||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`vector * on(foo) group_left() duplicate_timeseries`, func(t *testing.T) {
|
||||
t.Run(`vector * on(foo) group_left() duplicate_nonoverlapping_timeseries`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `label_set(time()/10, "foo", "bar") + on(foo) group_left() (
|
||||
q := `label_set(time()/10, "foo", "bar", "xx", "yy", "__name__", "qwert") + on(foo) group_left() (
|
||||
label_set(time() < 1400, "foo", "bar", "op", "le"),
|
||||
label_set(time() >= 1400, "foo", "bar", "op", "ge"),
|
||||
)`
|
||||
@@ -1870,13 +1870,85 @@ func TestExecSuccess(t *testing.T) {
|
||||
Values: []float64{1100, 1320, 1540, 1760, 1980, 2200},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r1.MetricName.Tags = []storage.Tag{{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
}}
|
||||
r1.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
{
|
||||
Key: []byte("xx"),
|
||||
Value: []byte("yy"),
|
||||
},
|
||||
}
|
||||
resultExpected := []netstorage.Result{r1}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`vector * on(foo) group_left(__name__)`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `label_set(time()/10, "foo", "bar", "xx", "yy", "__name__", "qwert") + on(foo) group_left(__name__)
|
||||
label_set(time(), "foo", "bar", "__name__", "aaa")`
|
||||
r1 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1100, 1320, 1540, 1760, 1980, 2200},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r1.MetricName.MetricGroup = []byte("aaa")
|
||||
r1.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
{
|
||||
Key: []byte("xx"),
|
||||
Value: []byte("yy"),
|
||||
},
|
||||
}
|
||||
resultExpected := []netstorage.Result{r1}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`vector * on(foo) group_right()`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `sort(label_set(time()/10, "foo", "bar", "xx", "yy", "__name__", "qwert") + on(foo) group_right(xx) (
|
||||
label_set(time(), "foo", "bar", "__name__", "aaa"),
|
||||
label_set(time()+3, "foo", "bar", "__name__", "yyy","ppp", "123"),
|
||||
))`
|
||||
r1 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1100, 1320, 1540, 1760, 1980, 2200},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r1.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
{
|
||||
Key: []byte("xx"),
|
||||
Value: []byte("yy"),
|
||||
},
|
||||
}
|
||||
r2 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1103, 1323, 1543, 1763, 1983, 2203},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r2.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
{
|
||||
Key: []byte("ppp"),
|
||||
Value: []byte("123"),
|
||||
},
|
||||
{
|
||||
Key: []byte("xx"),
|
||||
Value: []byte("yy"),
|
||||
},
|
||||
}
|
||||
resultExpected := []netstorage.Result{r1, r2}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`vector * on() group_left scalar`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `sort_desc((label_set(time(), "foo", "bar") or label_set(10, "foo", "qwert")) * on() group_left 2)`
|
||||
@@ -3818,6 +3890,107 @@ func TestExecSuccess(t *testing.T) {
|
||||
resultExpected := []netstorage.Result{r1, r2, r3, r4, r5, r6}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`count_values by (xxx)`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `count_values("xxx", label_set(10, "foo", "bar", "xxx", "aaa") or label_set(floor(time()/600), "foo", "bar", "baz", "xx")) by (xxx)`
|
||||
r1 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1, nan, nan, nan, nan, nan},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r1.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("xxx"),
|
||||
Value: []byte("1"),
|
||||
},
|
||||
}
|
||||
r2 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{nan, 1, 1, 1, nan, nan},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r2.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("xxx"),
|
||||
Value: []byte("2"),
|
||||
},
|
||||
}
|
||||
r3 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{nan, nan, nan, nan, 1, 1},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r3.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("xxx"),
|
||||
Value: []byte("3"),
|
||||
},
|
||||
}
|
||||
r4 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1, 1, 1, 1, 1, 1},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r4.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("xxx"),
|
||||
Value: []byte("10"),
|
||||
},
|
||||
}
|
||||
resultExpected := []netstorage.Result{r1, r2, r3, r4}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`count_values without (baz)`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `count_values("xxx", label_set(floor(time()/600), "foo", "bar")) without (baz)`
|
||||
r1 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1, nan, nan, nan, nan, nan},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r1.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
{
|
||||
Key: []byte("xxx"),
|
||||
Value: []byte("1"),
|
||||
},
|
||||
}
|
||||
r2 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{nan, 1, 1, 1, nan, nan},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r2.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
{
|
||||
Key: []byte("xxx"),
|
||||
Value: []byte("2"),
|
||||
},
|
||||
}
|
||||
r3 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{nan, nan, nan, nan, 1, 1},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r3.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
{
|
||||
Key: []byte("xxx"),
|
||||
Value: []byte("3"),
|
||||
},
|
||||
}
|
||||
resultExpected := []netstorage.Result{r1, r2, r3}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
}
|
||||
|
||||
func TestExecError(t *testing.T) {
|
||||
|
||||
@@ -211,7 +211,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i
|
||||
|
||||
rfa.prevValue = nan
|
||||
rfa.prevTimestamp = tStart - maxPrevInterval
|
||||
if i > 0 && timestamps[i-1] > rfa.prevTimestamp {
|
||||
if i < len(timestamps) && i > 0 && timestamps[i-1] > rfa.prevTimestamp {
|
||||
rfa.prevValue = values[i-1]
|
||||
rfa.prevTimestamp = timestamps[i-1]
|
||||
}
|
||||
|
||||
@@ -359,7 +359,7 @@ func TestRollupNoWindowNoPoints(t *testing.T) {
|
||||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{2, 0, 0, 0, 0, 0, 0, 0}
|
||||
valuesExpected := []float64{2, 0, 0, 0, nan, nan, nan, nan}
|
||||
timestampsExpected := []int64{120, 124, 128, 132, 136, 140, 144, 148}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
@@ -390,7 +390,7 @@ func TestRollupWindowNoPoints(t *testing.T) {
|
||||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{34, 34, 34, nan}
|
||||
valuesExpected := []float64{nan, nan, nan, nan}
|
||||
timestampsExpected := []int64{161, 171, 181, 191}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
@@ -421,7 +421,7 @@ func TestRollupNoWindowPartialPoints(t *testing.T) {
|
||||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{12, 44, 34, 34}
|
||||
valuesExpected := []float64{12, 44, 34, nan}
|
||||
timestampsExpected := []int64{100, 120, 140, 160}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
@@ -466,7 +466,7 @@ func TestRollupWindowPartialPoints(t *testing.T) {
|
||||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{44, 34, 34, 34}
|
||||
valuesExpected := []float64{44, 34, 34, nan}
|
||||
timestampsExpected := []int64{100, 120, 140, 160}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
@@ -480,7 +480,7 @@ func TestRollupWindowPartialPoints(t *testing.T) {
|
||||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{nan, 54, 44, 34}
|
||||
valuesExpected := []float64{nan, 54, 44, nan}
|
||||
timestampsExpected := []int64{0, 50, 100, 150}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
DOCKER_NAMESPACE := victoriametrics
|
||||
BUILDER_IMAGE := local/builder:go1.12.9
|
||||
BUILDER_IMAGE := local/builder:go1.13.0
|
||||
CERTS_IMAGE := local/certs:1.0.2
|
||||
|
||||
package-certs:
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
FROM golang:1.12.9
|
||||
FROM golang:1.13.0
|
||||
STOPSIGNAL SIGINT
|
||||
|
||||
111
lib/fs/dir_remover.go
Normal file
111
lib/fs/dir_remover.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
func mustRemoveAll(path string) bool {
|
||||
err := os.RemoveAll(path)
|
||||
if err == nil {
|
||||
// Make sure the parent directory doesn't contain references
|
||||
// to the current directory.
|
||||
mustSyncParentDirIfExists(path)
|
||||
return true
|
||||
}
|
||||
if !isTemporaryNFSError(err) {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
|
||||
}
|
||||
// NFS prevents from removing directories with open files.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
// Schedule for later directory removal.
|
||||
nfsDirRemoveFailedAttempts.Inc()
|
||||
select {
|
||||
case removeDirCh <- path:
|
||||
default:
|
||||
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
var nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
|
||||
|
||||
var removeDirCh = make(chan string, 1024)
|
||||
|
||||
func dirRemover() {
|
||||
const minSleepTime = 100 * time.Millisecond
|
||||
const maxSleepTime = time.Second
|
||||
sleepTime := minSleepTime
|
||||
for {
|
||||
var path string
|
||||
select {
|
||||
case path = <-removeDirCh:
|
||||
default:
|
||||
if atomic.LoadUint64(&stopDirRemover) != 0 {
|
||||
return
|
||||
}
|
||||
time.Sleep(minSleepTime)
|
||||
continue
|
||||
}
|
||||
if mustRemoveAll(path) {
|
||||
sleepTime = minSleepTime
|
||||
continue
|
||||
}
|
||||
|
||||
// Couldn't remove the directory at the path because of NFS lock.
|
||||
// Sleep for a while and try again.
|
||||
// Do not limit the amount of time required for deleting the directory,
|
||||
// since this may break on laggy NFS.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162 .
|
||||
time.Sleep(sleepTime)
|
||||
if sleepTime < maxSleepTime {
|
||||
sleepTime *= 2
|
||||
} else {
|
||||
logger.Errorf("failed to remove directory %q due to NFS lock; retrying later", path)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isTemporaryNFSError(err error) bool {
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details.
|
||||
errStr := err.Error()
|
||||
return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy")
|
||||
}
|
||||
|
||||
var dirRemoverWG sync.WaitGroup
|
||||
var stopDirRemover uint64
|
||||
|
||||
func init() {
|
||||
dirRemoverWG.Add(1)
|
||||
go func() {
|
||||
defer dirRemoverWG.Done()
|
||||
dirRemover()
|
||||
}()
|
||||
}
|
||||
|
||||
// MustStopDirRemover must be called in the end of graceful shutdown
|
||||
// in order to wait for removing the remaining directories from removeDirCh.
|
||||
//
|
||||
// It is expected that nobody calls MustRemoveAll when MustStopDirRemover
|
||||
// is called.
|
||||
func MustStopDirRemover() {
|
||||
atomic.StoreUint64(&stopDirRemover, 1)
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
dirRemoverWG.Wait()
|
||||
close(doneCh)
|
||||
}()
|
||||
const maxWaitTime = 5 * time.Second
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
case <-time.After(maxWaitTime):
|
||||
logger.Panicf("FATAL: cannot stop dirRemover in %s", maxWaitTime)
|
||||
}
|
||||
}
|
||||
59
lib/fs/fs.go
59
lib/fs/fs.go
@@ -6,9 +6,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
@@ -248,62 +246,7 @@ func mustSyncParentDirIfExists(path string) {
|
||||
//
|
||||
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
func MustRemoveAll(path string) {
|
||||
err := os.RemoveAll(path)
|
||||
if err == nil {
|
||||
// Make sure the parent directory doesn't contain references
|
||||
// to the current directory.
|
||||
mustSyncParentDirIfExists(path)
|
||||
return
|
||||
}
|
||||
if !isTemporaryNFSError(err) {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
|
||||
}
|
||||
// NFS prevents from removing directories with open files.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
// Schedule for later directory removal.
|
||||
select {
|
||||
case removeDirCh <- path:
|
||||
default:
|
||||
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
|
||||
}
|
||||
}
|
||||
|
||||
var removeDirCh = make(chan string, 1024)
|
||||
|
||||
func dirRemover() {
|
||||
for path := range removeDirCh {
|
||||
attempts := 0
|
||||
for {
|
||||
err := os.RemoveAll(path)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if !isTemporaryNFSError(err) {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
|
||||
}
|
||||
// NFS prevents from removing directories with open files.
|
||||
// Sleep for a while and try again in the hope open files will be closed.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
|
||||
attempts++
|
||||
if attempts > 10 {
|
||||
logger.Panicf("FATAL: cannot remove %q in %d attempts: %s", path, attempts, err)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
// Make sure the parent directory doesn't contain references
|
||||
// to the current directory.
|
||||
mustSyncParentDirIfExists(path)
|
||||
}
|
||||
}
|
||||
|
||||
func isTemporaryNFSError(err error) bool {
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details.
|
||||
errStr := err.Error()
|
||||
return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy")
|
||||
}
|
||||
|
||||
func init() {
|
||||
go dirRemover()
|
||||
_ = mustRemoveAll(path)
|
||||
}
|
||||
|
||||
// HardLinkFiles makes hard links for all the files from srcDir in dstDir.
|
||||
|
||||
@@ -59,6 +59,8 @@ const rawItemsFlushInterval = time.Second
|
||||
type Table struct {
|
||||
path string
|
||||
|
||||
flushCallback func()
|
||||
|
||||
partsLock sync.Mutex
|
||||
parts []*partWrapper
|
||||
|
||||
@@ -121,8 +123,11 @@ func (pw *partWrapper) decRef() {
|
||||
|
||||
// OpenTable opens a table on the given path.
|
||||
//
|
||||
// Optional flushCallback is called every time new data batch is flushed
|
||||
// to the underlying storage and becomes visible to search.
|
||||
//
|
||||
// The table is created if it doesn't exist yet.
|
||||
func OpenTable(path string) (*Table, error) {
|
||||
func OpenTable(path string, flushCallback func()) (*Table, error) {
|
||||
path = filepath.Clean(path)
|
||||
logger.Infof("opening table %q...", path)
|
||||
startTime := time.Now()
|
||||
@@ -145,11 +150,12 @@ func OpenTable(path string) (*Table, error) {
|
||||
}
|
||||
|
||||
tb := &Table{
|
||||
path: path,
|
||||
parts: pws,
|
||||
mergeIdx: uint64(time.Now().UnixNano()),
|
||||
flockF: flockF,
|
||||
stopCh: make(chan struct{}),
|
||||
path: path,
|
||||
flushCallback: flushCallback,
|
||||
parts: pws,
|
||||
mergeIdx: uint64(time.Now().UnixNano()),
|
||||
flockF: flockF,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
tb.startPartMergers()
|
||||
tb.startRawItemsFlusher()
|
||||
@@ -444,6 +450,9 @@ func (tb *Table) mergeRawItemsBlocks(blocksToMerge []*inmemoryBlock) {
|
||||
if err := tb.mergeParts(pws, nil, true); err != nil {
|
||||
logger.Panicf("FATAL: cannot merge raw parts: %s", err)
|
||||
}
|
||||
if tb.flushCallback != nil {
|
||||
tb.flushCallback()
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"math/rand"
|
||||
"os"
|
||||
"sort"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@@ -39,7 +40,7 @@ func TestTableSearchSerial(t *testing.T) {
|
||||
|
||||
func() {
|
||||
// Re-open the table and verify the search works.
|
||||
tb, err := OpenTable(path)
|
||||
tb, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table: %s", err)
|
||||
}
|
||||
@@ -74,7 +75,7 @@ func TestTableSearchConcurrent(t *testing.T) {
|
||||
|
||||
// Re-open the table and verify the search works.
|
||||
func() {
|
||||
tb, err := OpenTable(path)
|
||||
tb, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table: %s", err)
|
||||
}
|
||||
@@ -146,7 +147,11 @@ func testTableSearchSerial(tb *Table, items []string) error {
|
||||
}
|
||||
|
||||
func newTestTable(path string, itemsCount int) (*Table, []string, error) {
|
||||
tb, err := OpenTable(path)
|
||||
var flushes uint64
|
||||
flushCallback := func() {
|
||||
atomic.AddUint64(&flushes, 1)
|
||||
}
|
||||
tb, err := OpenTable(path, flushCallback)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot open table: %s", err)
|
||||
}
|
||||
@@ -159,6 +164,9 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) {
|
||||
items[i] = item
|
||||
}
|
||||
tb.DebugFlush()
|
||||
if itemsCount > 0 && atomic.LoadUint64(&flushes) == 0 {
|
||||
return nil, nil, fmt.Errorf("unexpeted zero flushes for itemsCount=%d", itemsCount)
|
||||
}
|
||||
|
||||
sort.Strings(items)
|
||||
return tb, items, nil
|
||||
|
||||
@@ -32,7 +32,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
|
||||
|
||||
// Force finishing pending merges
|
||||
tb.MustClose()
|
||||
tb, err = OpenTable(path)
|
||||
tb, err = OpenTable(path, nil)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error when re-opening table %q: %s", path, err)
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
@@ -20,7 +21,7 @@ func TestTableOpenClose(t *testing.T) {
|
||||
}()
|
||||
|
||||
// Create a new table
|
||||
tb, err := OpenTable(path)
|
||||
tb, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create new table: %s", err)
|
||||
}
|
||||
@@ -30,7 +31,7 @@ func TestTableOpenClose(t *testing.T) {
|
||||
|
||||
// Re-open created table multiple times.
|
||||
for i := 0; i < 10; i++ {
|
||||
tb, err := OpenTable(path)
|
||||
tb, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open created table: %s", err)
|
||||
}
|
||||
@@ -44,14 +45,14 @@ func TestTableOpenMultipleTimes(t *testing.T) {
|
||||
_ = os.RemoveAll(path)
|
||||
}()
|
||||
|
||||
tb1, err := OpenTable(path)
|
||||
tb1, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open table: %s", err)
|
||||
}
|
||||
defer tb1.MustClose()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
tb2, err := OpenTable(path)
|
||||
tb2, err := OpenTable(path, nil)
|
||||
if err == nil {
|
||||
tb2.MustClose()
|
||||
t.Fatalf("expecting non-nil error when opening already opened table")
|
||||
@@ -68,7 +69,11 @@ func TestTableAddItemSerial(t *testing.T) {
|
||||
_ = os.RemoveAll(path)
|
||||
}()
|
||||
|
||||
tb, err := OpenTable(path)
|
||||
var flushes uint64
|
||||
flushCallback := func() {
|
||||
atomic.AddUint64(&flushes, 1)
|
||||
}
|
||||
tb, err := OpenTable(path, flushCallback)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
@@ -78,6 +83,9 @@ func TestTableAddItemSerial(t *testing.T) {
|
||||
|
||||
// Verify items count after pending items flush.
|
||||
tb.DebugFlush()
|
||||
if atomic.LoadUint64(&flushes) == 0 {
|
||||
t.Fatalf("unexpected zero flushes")
|
||||
}
|
||||
|
||||
var m TableMetrics
|
||||
tb.UpdateMetrics(&m)
|
||||
@@ -91,7 +99,7 @@ func TestTableAddItemSerial(t *testing.T) {
|
||||
testReopenTable(t, path, itemsCount)
|
||||
|
||||
// Add more items in order to verify merge between inmemory parts and file-based parts.
|
||||
tb, err = OpenTable(path)
|
||||
tb, err = OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
@@ -124,7 +132,7 @@ func TestTableCreateSnapshotAt(t *testing.T) {
|
||||
_ = os.RemoveAll(path)
|
||||
}()
|
||||
|
||||
tb, err := OpenTable(path)
|
||||
tb, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
@@ -155,13 +163,13 @@ func TestTableCreateSnapshotAt(t *testing.T) {
|
||||
}()
|
||||
|
||||
// Verify snapshots contain all the data.
|
||||
tb1, err := OpenTable(snapshot1)
|
||||
tb1, err := OpenTable(snapshot1, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
defer tb1.MustClose()
|
||||
|
||||
tb2, err := OpenTable(snapshot2)
|
||||
tb2, err := OpenTable(snapshot2, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
@@ -205,7 +213,11 @@ func TestTableAddItemsConcurrent(t *testing.T) {
|
||||
_ = os.RemoveAll(path)
|
||||
}()
|
||||
|
||||
tb, err := OpenTable(path)
|
||||
var flushes uint64
|
||||
flushCallback := func() {
|
||||
atomic.AddUint64(&flushes, 1)
|
||||
}
|
||||
tb, err := OpenTable(path, flushCallback)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
@@ -215,6 +227,10 @@ func TestTableAddItemsConcurrent(t *testing.T) {
|
||||
|
||||
// Verify items count after pending items flush.
|
||||
tb.DebugFlush()
|
||||
if atomic.LoadUint64(&flushes) == 0 {
|
||||
t.Fatalf("unexpected zero flushes")
|
||||
}
|
||||
|
||||
var m TableMetrics
|
||||
tb.UpdateMetrics(&m)
|
||||
if m.ItemsCount != itemsCount {
|
||||
@@ -227,7 +243,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
|
||||
testReopenTable(t, path, itemsCount)
|
||||
|
||||
// Add more items in order to verify merge between inmemory parts and file-based parts.
|
||||
tb, err = OpenTable(path)
|
||||
tb, err = OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open %q: %s", path, err)
|
||||
}
|
||||
@@ -269,7 +285,7 @@ func testReopenTable(t *testing.T, path string, itemsCount int) {
|
||||
t.Helper()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
tb, err := OpenTable(path)
|
||||
tb, err := OpenTable(path, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot re-open %q: %s", path, err)
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error {
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
@@ -118,7 +118,7 @@ func skipRemote(dAtA []byte) (n int, err error) {
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
@@ -176,7 +176,7 @@ func skipRemote(dAtA []byte) (n int, err error) {
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
innerWire |= (uint64(b) & 0x7F) << shift
|
||||
innerWire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ func (m *Sample) Unmarshal(dAtA []byte) error {
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
@@ -82,7 +82,7 @@ func (m *Sample) Unmarshal(dAtA []byte) error {
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
m.Timestamp |= (int64(b) & 0x7F) << shift
|
||||
m.Timestamp |= int64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
@@ -128,7 +128,7 @@ func (m *TimeSeries) Unmarshal(dAtA []byte, dstLabels []Label, dstSamples []Samp
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
@@ -255,7 +255,7 @@ func (m *Label) Unmarshal(dAtA []byte) error {
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
@@ -283,7 +283,7 @@ func (m *Label) Unmarshal(dAtA []byte) error {
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
stringLen |= (uint64(b) & 0x7F) << shift
|
||||
stringLen |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
@@ -312,7 +312,7 @@ func (m *Label) Unmarshal(dAtA []byte) error {
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
stringLen |= (uint64(b) & 0x7F) << shift
|
||||
stringLen |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
@@ -363,7 +363,7 @@ func skipTypes(dAtA []byte) (n int, err error) {
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
@@ -421,7 +421,7 @@ func skipTypes(dAtA []byte) (n int, err error) {
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
innerWire |= (uint64(b) & 0x7F) << shift
|
||||
innerWire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca
|
||||
logger.Panicf("BUG: prevHourMetricIDs must be non-nil")
|
||||
}
|
||||
|
||||
tb, err := mergeset.OpenTable(path)
|
||||
tb, err := mergeset.OpenTable(path, invalidateTagCache)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open indexDB %q: %s", path, err)
|
||||
}
|
||||
@@ -405,7 +405,7 @@ func unmarshalTSIDs(dst []TSID, src []byte) ([]TSID, error) {
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
func (db *indexDB) invalidateTagCache() {
|
||||
func invalidateTagCache() {
|
||||
// This function must be fast, since it is called each
|
||||
// time new timeseries is added.
|
||||
atomic.AddUint64(&tagFiltersKeyGen, 1)
|
||||
@@ -513,8 +513,8 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
|
||||
return fmt.Errorf("cannot create indexes: %s", err)
|
||||
}
|
||||
|
||||
// Invalidate tag cache, since it doesn't contain tags for the created mn -> TSID mapping.
|
||||
db.invalidateTagCache()
|
||||
// There is no need in invalidating tag cache, since it is invalidated
|
||||
// on db.tb flush via invalidateTagCache flushCallback passed to OpenTable.
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -890,7 +890,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
||||
db.updateDeletedMetricIDs(metricIDs)
|
||||
|
||||
// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
|
||||
db.invalidateTagCache()
|
||||
invalidateTagCache()
|
||||
|
||||
// Do not reset uselessTagFiltersCache, since the found metricIDs
|
||||
// on cache miss are filtered out later with deletedMetricIDs.
|
||||
|
||||
@@ -367,17 +367,8 @@ func (mn *MetricName) Unmarshal(src []byte) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Verify no identical tag keys.
|
||||
if len(mn.Tags) > 0 {
|
||||
prevKey := mn.Tags[0].Key
|
||||
for i := range mn.Tags[1:] {
|
||||
t := &mn.Tags[1+i]
|
||||
if bytes.Equal(t.Key, prevKey) {
|
||||
return fmt.Errorf("found duplicate key %q", prevKey)
|
||||
}
|
||||
prevKey = t.Key
|
||||
}
|
||||
}
|
||||
// There is no need in verifying for identical tag keys,
|
||||
// since they must be handled in MetricName.Marshal inside marshalTags.
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -584,8 +575,15 @@ func (ts *canonicalTagsSort) Swap(i, j int) {
|
||||
}
|
||||
|
||||
func marshalTags(dst []byte, tags []Tag) []byte {
|
||||
var prevKey []byte
|
||||
for i := range tags {
|
||||
dst = tags[i].Marshal(dst)
|
||||
t := &tags[i]
|
||||
if string(prevKey) == string(t.Key) {
|
||||
// Skip duplicate keys, since they aren't allowed in Prometheus data model.
|
||||
continue
|
||||
}
|
||||
prevKey = t.Key
|
||||
dst = t.Marshal(dst)
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
@@ -34,6 +34,32 @@ func testMetricNameSortTags(t *testing.T, tags, expectedTags []string) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricNameMarshalDuplicateKeys(t *testing.T) {
|
||||
var mn MetricName
|
||||
mn.MetricGroup = []byte("xxx")
|
||||
mn.AddTag("foo", "bar")
|
||||
mn.AddTag("duplicate", "tag")
|
||||
mn.AddTag("duplicate", "tag")
|
||||
mn.AddTag("tt", "xx")
|
||||
mn.AddTag("duplicate", "tag2")
|
||||
|
||||
var mnExpected MetricName
|
||||
mnExpected.MetricGroup = []byte("xxx")
|
||||
mnExpected.AddTag("duplicate", "tag")
|
||||
mnExpected.AddTag("foo", "bar")
|
||||
mnExpected.AddTag("tt", "xx")
|
||||
|
||||
mn.sortTags()
|
||||
data := mn.Marshal(nil)
|
||||
var mn1 MetricName
|
||||
if err := mn1.Unmarshal(data); err != nil {
|
||||
t.Fatalf("cannot unmarshal mn %s: %s", &mn, err)
|
||||
}
|
||||
if !reflect.DeepEqual(&mnExpected, &mn1) {
|
||||
t.Fatalf("unexpected mn unmarshaled;\ngot\n%+v\nwant\n%+v", &mn1, &mnExpected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricNameMarshalUnmarshal(t *testing.T) {
|
||||
for i := 0; i < 10; i++ {
|
||||
for tagsCount := 0; tagsCount < 10; tagsCount++ {
|
||||
|
||||
Reference in New Issue
Block a user