Compare commits

...

13 Commits

Author SHA1 Message Date
Aliaksandr Valialkin
88f8670ede lib/fs: add MustStopDirRemover for waiting until pending directories are removed on graceful shutdown
This patch is mainly required for laggy NFS. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162
2019-09-05 11:13:17 +03:00
Aliaksandr Valialkin
9eb5de334f lib/storage: typo fix 2019-09-04 19:58:01 +03:00
Aliaksandr Valialkin
6954e126fc app/vmselect/promql: ignore grouping by destination label in count_values, since such a grouping is performed automatically 2019-09-04 19:58:01 +03:00
Aliaksandr Valialkin
bce35b8dd9 README.md: mention that Prometheus doesn't drop data when VictoriaMetrics restarts 2019-09-04 18:40:39 +03:00
Aliaksandr Valialkin
16dd145586 lib/storage: remove duplicate tag keys on MetricName.Marshal call
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/172
2019-09-04 18:13:45 +03:00
Aliaksandr Valialkin
cd2c9e39da deployment/docker: switch Go builder from Go 1.12.9 to Go 1.13.0 2019-09-04 17:17:23 +03:00
Aliaksandr Valialkin
305e7bc981 app/vmselect/promql: do not return artificial points beyond the last point in time series 2019-09-04 16:35:34 +03:00
Aliaksandr Valialkin
9721d06c6a app/vmselect/prometheus: do not adjust start and end args in /api/v1/query_range if nocache=1 arg is set
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/171
2019-09-04 13:10:09 +03:00
Aliaksandr Valialkin
4862e93024 lib/fs: try harder with directory removal on NFS in the event of temporary lock
Do not give up after 11 attempts of directory removal on laggy NFS.

Add `vm_nfs_dir_remove_failed_attempts_total` metric for counting the number of failed attempts
on directory removal.

Log failed attempts on directory removal after long sleep times.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162
2019-09-04 12:24:50 +03:00
Aliaksandr Valialkin
db4560ca31 app/vmselect/promql: reset timeseries name on group_left and group_right as Prometheus does 2019-09-03 20:42:54 +03:00
Aliaksandr Valialkin
1575a560f0 app/vmselect/netstorage: adaptively adjust the maximum inmemory file size for storing temporary blocks
The maximum inmemory file size now depends on `-memory.allowedPercent`.
This should improve performance and reduce the number of filesystem calls
on machines with big amounts of RAM when performing heavy queries
over big number of samples and time series.
2019-09-03 13:32:09 +03:00
Aliaksandr Valialkin
e1d76ec1f3 lib/storage: invalidate tagFilters -> TSIDS cache when newly added index data becomes visible to search
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/163
2019-08-29 15:08:35 +03:00
Aliaksandr Valialkin
aeaa5de5fe lib/prombp: apply ba06b47c16
The following commands used:

gofmt -r '(uint64(x)&0x7F)<<shift -> uint64(x&0x7F)<<shift' -w ./lib/prompb/
gofmt -r '(int64(x)&0x7F)<<shift -> int64(x&0x7F)<<shift' -w ./lib/prompb/
2019-08-29 13:35:27 +03:00
24 changed files with 484 additions and 172 deletions

View File

@@ -186,6 +186,9 @@ Follow the following steps during the upgrade:
2) Wait until the process stops. This can take a few seconds.
3) Start the upgraded VictoriaMetrics.
Prometheus doesn't drop data during VictoriaMetrics restart.
See [this article](https://grafana.com/blog/2019/03/25/whats-new-in-prometheus-2.8-wal-based-remote-write/) for details.
### How to apply new config to VictoriaMetrics?
@@ -195,6 +198,9 @@ VictoriaMetrics must be restarted for applying new config:
2) Wait until the process stops. This can take a few seconds.
3) Start VictoriaMetrics with the new config.
Prometheus doesn't drop data during VictoriaMetrics restart.
See [this article](https://grafana.com/blog/2019/03/25/whats-new-in-prometheus-2.8-wal-based-remote-write/) for details.
### How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)?

View File

@@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
@@ -43,6 +44,8 @@ func main() {
vmstorage.Stop()
vmselect.Stop()
fs.MustStopDirRemover()
logger.Infof("the VictoriaMetrics has been stopped in %s", time.Since(startTime))
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -92,7 +93,7 @@ func setUp() {
func processFlags() {
flag.Parse()
for _, fs := range []struct {
for _, fv := range []struct {
flag string
value string
}{
@@ -103,8 +104,8 @@ func processFlags() {
{flag: "loggerLevel", value: testLogLevel},
} {
// panics if flag doesn't exist
if err := flag.Lookup(fs.flag).Value.Set(fs.value); err != nil {
log.Fatalf("unable to set %q with value %q, err: %v", fs.flag, fs.value, err)
if err := flag.Lookup(fv.flag).Value.Set(fv.value); err != nil {
log.Fatalf("unable to set %q with value %q, err: %v", fv.flag, fv.value, err)
}
}
}
@@ -121,13 +122,14 @@ func waitFor(timeout time.Duration, f func() bool) error {
}
func tearDown() {
vminsert.Stop()
vmstorage.Stop()
vmselect.Stop()
if err := httpserver.Stop(*httpListenAddr); err != nil {
log.Fatalf("cannot stop the webservice: %s", err)
}
os.RemoveAll(storagePath)
vminsert.Stop()
vmstorage.Stop()
vmselect.Stop()
fs.MustRemoveAll(storagePath)
fs.MustStopDirRemover()
}
func TestWriteRead(t *testing.T) {

View File

@@ -1,7 +1,6 @@
package netstorage
import (
"bufio"
"fmt"
"io/ioutil"
"os"
@@ -10,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
@@ -30,13 +30,23 @@ func InitTmpBlocksDir(tmpDirPath string) {
var tmpBlocksDir string
const maxInmemoryTmpBlocksFile = 512 * 1024
func maxInmemoryTmpBlocksFile() int {
mem := memory.Allowed()
maxLen := mem / 1024
if maxLen < 64*1024 {
return 64 * 1024
}
return maxLen
}
var _ = metrics.NewGauge(`vm_tmp_blocks_max_inmemory_file_size_bytes`, func() float64 {
return float64(maxInmemoryTmpBlocksFile())
})
type tmpBlocksFile struct {
buf []byte
f *os.File
bw *bufio.Writer
f *os.File
offset uint64
}
@@ -44,7 +54,9 @@ type tmpBlocksFile struct {
func getTmpBlocksFile() *tmpBlocksFile {
v := tmpBlocksFilePool.Get()
if v == nil {
return &tmpBlocksFile{}
return &tmpBlocksFile{
buf: make([]byte, 0, maxInmemoryTmpBlocksFile()),
}
}
return v.(*tmpBlocksFile)
}
@@ -53,7 +65,6 @@ func putTmpBlocksFile(tbf *tmpBlocksFile) {
tbf.MustClose()
tbf.buf = tbf.buf[:0]
tbf.f = nil
tbf.bw = nil
tbf.offset = 0
tmpBlocksFilePool.Put(tbf)
}
@@ -69,22 +80,6 @@ func (addr tmpBlockAddr) String() string {
return fmt.Sprintf("offset %d, size %d", addr.offset, addr.size)
}
func getBufioWriter(f *os.File) *bufio.Writer {
v := bufioWriterPool.Get()
if v == nil {
return bufio.NewWriterSize(f, maxInmemoryTmpBlocksFile*2)
}
bw := v.(*bufio.Writer)
bw.Reset(f)
return bw
}
func putBufioWriter(bw *bufio.Writer) {
bufioWriterPool.Put(bw)
}
var bufioWriterPool sync.Pool
var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`)
// WriteBlock writes b to tbf.
@@ -92,28 +87,31 @@ var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_tota
// It returns errors since the operation may fail on space shortage
// and this must be handled.
func (tbf *tmpBlocksFile) WriteBlock(b *storage.Block) (tmpBlockAddr, error) {
bb := tmpBufPool.Get()
defer tmpBufPool.Put(bb)
bb.B = storage.MarshalBlock(bb.B[:0], b)
var addr tmpBlockAddr
addr.offset = tbf.offset
tbfBufLen := len(tbf.buf)
tbf.buf = storage.MarshalBlock(tbf.buf, b)
addr.size = len(tbf.buf) - tbfBufLen
addr.size = len(bb.B)
tbf.offset += uint64(addr.size)
if tbf.offset <= maxInmemoryTmpBlocksFile {
if len(tbf.buf)+len(bb.B) <= cap(tbf.buf) {
// Fast path - the data fits tbf.buf
tbf.buf = append(tbf.buf, bb.B...)
return addr, nil
}
// Slow path: flush the data from tbf.buf to file.
if tbf.f == nil {
f, err := ioutil.TempFile(tmpBlocksDir, "")
if err != nil {
return addr, err
}
tbf.f = f
tbf.bw = getBufioWriter(f)
tmpBlocksFilesCreated.Inc()
}
_, err := tbf.bw.Write(tbf.buf)
tbf.buf = tbf.buf[:0]
_, err := tbf.f.Write(tbf.buf)
tbf.buf = append(tbf.buf[:0], bb.B...)
if err != nil {
return addr, fmt.Errorf("cannot write block to %q: %s", tbf.f.Name(), err)
}
@@ -124,15 +122,15 @@ func (tbf *tmpBlocksFile) Finalize() error {
if tbf.f == nil {
return nil
}
err := tbf.bw.Flush()
putBufioWriter(tbf.bw)
tbf.bw = nil
if _, err := tbf.f.Write(tbf.buf); err != nil {
return fmt.Errorf("cannot flush the remaining %d bytes to tmpBlocksFile: %s", len(tbf.buf), err)
}
tbf.buf = tbf.buf[:0]
if _, err := tbf.f.Seek(0, 0); err != nil {
logger.Panicf("FATAL: cannot seek to the start of file: %s", err)
}
mustFadviseRandomRead(tbf.f)
return err
return nil
}
func (tbf *tmpBlocksFile) MustReadBlockAt(dst *storage.Block, addr tmpBlockAddr) {
@@ -167,10 +165,6 @@ func (tbf *tmpBlocksFile) MustClose() {
if tbf.f == nil {
return
}
if tbf.bw != nil {
putBufioWriter(tbf.bw)
tbf.bw = nil
}
fname := tbf.f.Name()
// Remove the file at first, then close it.

View File

@@ -30,7 +30,7 @@ func TestTmpBlocksFileSerial(t *testing.T) {
}
func TestTmpBlocksFileConcurrent(t *testing.T) {
concurrency := 4
concurrency := 3
ch := make(chan error, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
@@ -69,7 +69,7 @@ func testTmpBlocksFile() error {
_, _, _ = b.MarshalData(0, 0)
return &b
}
for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile / 2, 2 * maxInmemoryTmpBlocksFile} {
for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile() / 2, 2 * maxInmemoryTmpBlocksFile()} {
err := func() error {
tbf := getTmpBlocksFile()
defer putTmpBlocksFile(tbf)
@@ -94,7 +94,7 @@ func testTmpBlocksFile() error {
}
// Read blocks in parallel and verify them
concurrency := 3
concurrency := 2
workCh := make(chan int)
doneCh := make(chan error)
for i := 0; i < concurrency; i++ {

View File

@@ -557,7 +557,9 @@ func QueryRangeHandler(w http.ResponseWriter, r *http.Request) error {
if err := promql.ValidateMaxPointsPerTimeseries(start, end, step); err != nil {
return err
}
start, end = promql.AdjustStartEnd(start, end, step)
if mayCache {
start, end = promql.AdjustStartEnd(start, end, step)
}
ec := promql.EvalConfig{
Start: start,

View File

@@ -353,6 +353,25 @@ func aggrFuncCountValues(afa *aggrFuncArg) ([]*timeseries, error) {
if err != nil {
return nil, err
}
// Remove dstLabel from grouping like Prometheus does.
modifier := &afa.ae.Modifier
switch strings.ToLower(modifier.Op) {
case "without":
modifier.Args = append(modifier.Args, dstLabel)
case "by":
dstArgs := modifier.Args[:0]
for _, arg := range modifier.Args {
if arg == dstLabel {
continue
}
dstArgs = append(dstArgs, arg)
}
modifier.Args = dstArgs
default:
// Do nothing
}
afe := func(tss []*timeseries) []*timeseries {
m := make(map[float64]bool)
for _, ts := range tss {

View File

@@ -322,6 +322,7 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser
}
src := tssRight[0]
for _, ts := range tssLeft {
resetMetricGroupIfRequired(be, ts)
ts.MetricName.AddMissingTags(joinTags, &src.MetricName)
rvsLeft = append(rvsLeft, ts)
rvsRight = append(rvsRight, src)
@@ -332,6 +333,7 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser
}
src := tssLeft[0]
for _, ts := range tssRight {
resetMetricGroupIfRequired(be, ts)
ts.MetricName.AddMissingTags(joinTags, &src.MetricName)
rvsLeft = append(rvsLeft, src)
rvsRight = append(rvsRight, ts)

View File

@@ -1859,9 +1859,9 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`vector * on(foo) group_left() duplicate_timeseries`, func(t *testing.T) {
t.Run(`vector * on(foo) group_left() duplicate_nonoverlapping_timeseries`, func(t *testing.T) {
t.Parallel()
q := `label_set(time()/10, "foo", "bar") + on(foo) group_left() (
q := `label_set(time()/10, "foo", "bar", "xx", "yy", "__name__", "qwert") + on(foo) group_left() (
label_set(time() < 1400, "foo", "bar", "op", "le"),
label_set(time() >= 1400, "foo", "bar", "op", "ge"),
)`
@@ -1870,13 +1870,85 @@ func TestExecSuccess(t *testing.T) {
Values: []float64{1100, 1320, 1540, 1760, 1980, 2200},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("bar"),
}}
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("xx"),
Value: []byte("yy"),
},
}
resultExpected := []netstorage.Result{r1}
f(q, resultExpected)
})
t.Run(`vector * on(foo) group_left(__name__)`, func(t *testing.T) {
t.Parallel()
q := `label_set(time()/10, "foo", "bar", "xx", "yy", "__name__", "qwert") + on(foo) group_left(__name__)
label_set(time(), "foo", "bar", "__name__", "aaa")`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1100, 1320, 1540, 1760, 1980, 2200},
Timestamps: timestampsExpected,
}
r1.MetricName.MetricGroup = []byte("aaa")
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("xx"),
Value: []byte("yy"),
},
}
resultExpected := []netstorage.Result{r1}
f(q, resultExpected)
})
t.Run(`vector * on(foo) group_right()`, func(t *testing.T) {
t.Parallel()
q := `sort(label_set(time()/10, "foo", "bar", "xx", "yy", "__name__", "qwert") + on(foo) group_right(xx) (
label_set(time(), "foo", "bar", "__name__", "aaa"),
label_set(time()+3, "foo", "bar", "__name__", "yyy","ppp", "123"),
))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1100, 1320, 1540, 1760, 1980, 2200},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("xx"),
Value: []byte("yy"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1103, 1323, 1543, 1763, 1983, 2203},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("ppp"),
Value: []byte("123"),
},
{
Key: []byte("xx"),
Value: []byte("yy"),
},
}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`vector * on() group_left scalar`, func(t *testing.T) {
t.Parallel()
q := `sort_desc((label_set(time(), "foo", "bar") or label_set(10, "foo", "qwert")) * on() group_left 2)`
@@ -3818,6 +3890,107 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2, r3, r4, r5, r6}
f(q, resultExpected)
})
t.Run(`count_values by (xxx)`, func(t *testing.T) {
t.Parallel()
q := `count_values("xxx", label_set(10, "foo", "bar", "xxx", "aaa") or label_set(floor(time()/600), "foo", "bar", "baz", "xx")) by (xxx)`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, nan, nan, nan, nan, nan},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("xxx"),
Value: []byte("1"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, 1, 1, 1, nan, nan},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("xxx"),
Value: []byte("2"),
},
}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, nan, nan, 1, 1},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{
{
Key: []byte("xxx"),
Value: []byte("3"),
},
}
r4 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
r4.MetricName.Tags = []storage.Tag{
{
Key: []byte("xxx"),
Value: []byte("10"),
},
}
resultExpected := []netstorage.Result{r1, r2, r3, r4}
f(q, resultExpected)
})
t.Run(`count_values without (baz)`, func(t *testing.T) {
t.Parallel()
q := `count_values("xxx", label_set(floor(time()/600), "foo", "bar")) without (baz)`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, nan, nan, nan, nan, nan},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("xxx"),
Value: []byte("1"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, 1, 1, 1, nan, nan},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("xxx"),
Value: []byte("2"),
},
}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, nan, nan, 1, 1},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("xxx"),
Value: []byte("3"),
},
}
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
}
func TestExecError(t *testing.T) {

View File

@@ -211,7 +211,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i
rfa.prevValue = nan
rfa.prevTimestamp = tStart - maxPrevInterval
if i > 0 && timestamps[i-1] > rfa.prevTimestamp {
if i < len(timestamps) && i > 0 && timestamps[i-1] > rfa.prevTimestamp {
rfa.prevValue = values[i-1]
rfa.prevTimestamp = timestamps[i-1]
}

View File

@@ -359,7 +359,7 @@ func TestRollupNoWindowNoPoints(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{2, 0, 0, 0, 0, 0, 0, 0}
valuesExpected := []float64{2, 0, 0, 0, nan, nan, nan, nan}
timestampsExpected := []int64{120, 124, 128, 132, 136, 140, 144, 148}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@@ -390,7 +390,7 @@ func TestRollupWindowNoPoints(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{34, 34, 34, nan}
valuesExpected := []float64{nan, nan, nan, nan}
timestampsExpected := []int64{161, 171, 181, 191}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@@ -421,7 +421,7 @@ func TestRollupNoWindowPartialPoints(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{12, 44, 34, 34}
valuesExpected := []float64{12, 44, 34, nan}
timestampsExpected := []int64{100, 120, 140, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@@ -466,7 +466,7 @@ func TestRollupWindowPartialPoints(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{44, 34, 34, 34}
valuesExpected := []float64{44, 34, 34, nan}
timestampsExpected := []int64{100, 120, 140, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
@@ -480,7 +480,7 @@ func TestRollupWindowPartialPoints(t *testing.T) {
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 54, 44, 34}
valuesExpected := []float64{nan, 54, 44, nan}
timestampsExpected := []int64{0, 50, 100, 150}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})

View File

@@ -1,5 +1,5 @@
DOCKER_NAMESPACE := victoriametrics
BUILDER_IMAGE := local/builder:go1.12.9
BUILDER_IMAGE := local/builder:go1.13.0
CERTS_IMAGE := local/certs:1.0.2
package-certs:

View File

@@ -1,2 +1,2 @@
FROM golang:1.12.9
FROM golang:1.13.0
STOPSIGNAL SIGINT

111
lib/fs/dir_remover.go Normal file
View File

@@ -0,0 +1,111 @@
package fs
import (
"os"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
func mustRemoveAll(path string) bool {
err := os.RemoveAll(path)
if err == nil {
// Make sure the parent directory doesn't contain references
// to the current directory.
mustSyncParentDirIfExists(path)
return true
}
if !isTemporaryNFSError(err) {
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
}
// NFS prevents from removing directories with open files.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
// Schedule for later directory removal.
nfsDirRemoveFailedAttempts.Inc()
select {
case removeDirCh <- path:
default:
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
}
return false
}
var nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
var removeDirCh = make(chan string, 1024)
func dirRemover() {
const minSleepTime = 100 * time.Millisecond
const maxSleepTime = time.Second
sleepTime := minSleepTime
for {
var path string
select {
case path = <-removeDirCh:
default:
if atomic.LoadUint64(&stopDirRemover) != 0 {
return
}
time.Sleep(minSleepTime)
continue
}
if mustRemoveAll(path) {
sleepTime = minSleepTime
continue
}
// Couldn't remove the directory at the path because of NFS lock.
// Sleep for a while and try again.
// Do not limit the amount of time required for deleting the directory,
// since this may break on laggy NFS.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/162 .
time.Sleep(sleepTime)
if sleepTime < maxSleepTime {
sleepTime *= 2
} else {
logger.Errorf("failed to remove directory %q due to NFS lock; retrying later", path)
}
}
}
func isTemporaryNFSError(err error) bool {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details.
errStr := err.Error()
return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy")
}
var dirRemoverWG sync.WaitGroup
var stopDirRemover uint64
func init() {
dirRemoverWG.Add(1)
go func() {
defer dirRemoverWG.Done()
dirRemover()
}()
}
// MustStopDirRemover must be called in the end of graceful shutdown
// in order to wait for removing the remaining directories from removeDirCh.
//
// It is expected that nobody calls MustRemoveAll when MustStopDirRemover
// is called.
func MustStopDirRemover() {
atomic.StoreUint64(&stopDirRemover, 1)
doneCh := make(chan struct{})
go func() {
dirRemoverWG.Wait()
close(doneCh)
}()
const maxWaitTime = 5 * time.Second
select {
case <-doneCh:
return
case <-time.After(maxWaitTime):
logger.Panicf("FATAL: cannot stop dirRemover in %s", maxWaitTime)
}
}

View File

@@ -6,9 +6,7 @@ import (
"os"
"path/filepath"
"regexp"
"strings"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -248,62 +246,7 @@ func mustSyncParentDirIfExists(path string) {
//
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
func MustRemoveAll(path string) {
err := os.RemoveAll(path)
if err == nil {
// Make sure the parent directory doesn't contain references
// to the current directory.
mustSyncParentDirIfExists(path)
return
}
if !isTemporaryNFSError(err) {
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
}
// NFS prevents from removing directories with open files.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
// Schedule for later directory removal.
select {
case removeDirCh <- path:
default:
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
}
}
var removeDirCh = make(chan string, 1024)
func dirRemover() {
for path := range removeDirCh {
attempts := 0
for {
err := os.RemoveAll(path)
if err == nil {
break
}
if !isTemporaryNFSError(err) {
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
}
// NFS prevents from removing directories with open files.
// Sleep for a while and try again in the hope open files will be closed.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
attempts++
if attempts > 10 {
logger.Panicf("FATAL: cannot remove %q in %d attempts: %s", path, attempts, err)
}
time.Sleep(100 * time.Millisecond)
}
// Make sure the parent directory doesn't contain references
// to the current directory.
mustSyncParentDirIfExists(path)
}
}
func isTemporaryNFSError(err error) bool {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details.
errStr := err.Error()
return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy")
}
func init() {
go dirRemover()
_ = mustRemoveAll(path)
}
// HardLinkFiles makes hard links for all the files from srcDir in dstDir.

View File

@@ -59,6 +59,8 @@ const rawItemsFlushInterval = time.Second
type Table struct {
path string
flushCallback func()
partsLock sync.Mutex
parts []*partWrapper
@@ -121,8 +123,11 @@ func (pw *partWrapper) decRef() {
// OpenTable opens a table on the given path.
//
// Optional flushCallback is called every time new data batch is flushed
// to the underlying storage and becomes visible to search.
//
// The table is created if it doesn't exist yet.
func OpenTable(path string) (*Table, error) {
func OpenTable(path string, flushCallback func()) (*Table, error) {
path = filepath.Clean(path)
logger.Infof("opening table %q...", path)
startTime := time.Now()
@@ -145,11 +150,12 @@ func OpenTable(path string) (*Table, error) {
}
tb := &Table{
path: path,
parts: pws,
mergeIdx: uint64(time.Now().UnixNano()),
flockF: flockF,
stopCh: make(chan struct{}),
path: path,
flushCallback: flushCallback,
parts: pws,
mergeIdx: uint64(time.Now().UnixNano()),
flockF: flockF,
stopCh: make(chan struct{}),
}
tb.startPartMergers()
tb.startRawItemsFlusher()
@@ -444,6 +450,9 @@ func (tb *Table) mergeRawItemsBlocks(blocksToMerge []*inmemoryBlock) {
if err := tb.mergeParts(pws, nil, true); err != nil {
logger.Panicf("FATAL: cannot merge raw parts: %s", err)
}
if tb.flushCallback != nil {
tb.flushCallback()
}
}
for {

View File

@@ -5,6 +5,7 @@ import (
"math/rand"
"os"
"sort"
"sync/atomic"
"testing"
"time"
)
@@ -39,7 +40,7 @@ func TestTableSearchSerial(t *testing.T) {
func() {
// Re-open the table and verify the search works.
tb, err := OpenTable(path)
tb, err := OpenTable(path, nil)
if err != nil {
t.Fatalf("cannot open table: %s", err)
}
@@ -74,7 +75,7 @@ func TestTableSearchConcurrent(t *testing.T) {
// Re-open the table and verify the search works.
func() {
tb, err := OpenTable(path)
tb, err := OpenTable(path, nil)
if err != nil {
t.Fatalf("cannot open table: %s", err)
}
@@ -146,7 +147,11 @@ func testTableSearchSerial(tb *Table, items []string) error {
}
func newTestTable(path string, itemsCount int) (*Table, []string, error) {
tb, err := OpenTable(path)
var flushes uint64
flushCallback := func() {
atomic.AddUint64(&flushes, 1)
}
tb, err := OpenTable(path, flushCallback)
if err != nil {
return nil, nil, fmt.Errorf("cannot open table: %s", err)
}
@@ -159,6 +164,9 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) {
items[i] = item
}
tb.DebugFlush()
if itemsCount > 0 && atomic.LoadUint64(&flushes) == 0 {
return nil, nil, fmt.Errorf("unexpeted zero flushes for itemsCount=%d", itemsCount)
}
sort.Strings(items)
return tb, items, nil

View File

@@ -32,7 +32,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
// Force finishing pending merges
tb.MustClose()
tb, err = OpenTable(path)
tb, err = OpenTable(path, nil)
if err != nil {
b.Fatalf("unexpected error when re-opening table %q: %s", path, err)
}

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -20,7 +21,7 @@ func TestTableOpenClose(t *testing.T) {
}()
// Create a new table
tb, err := OpenTable(path)
tb, err := OpenTable(path, nil)
if err != nil {
t.Fatalf("cannot create new table: %s", err)
}
@@ -30,7 +31,7 @@ func TestTableOpenClose(t *testing.T) {
// Re-open created table multiple times.
for i := 0; i < 10; i++ {
tb, err := OpenTable(path)
tb, err := OpenTable(path, nil)
if err != nil {
t.Fatalf("cannot open created table: %s", err)
}
@@ -44,14 +45,14 @@ func TestTableOpenMultipleTimes(t *testing.T) {
_ = os.RemoveAll(path)
}()
tb1, err := OpenTable(path)
tb1, err := OpenTable(path, nil)
if err != nil {
t.Fatalf("cannot open table: %s", err)
}
defer tb1.MustClose()
for i := 0; i < 10; i++ {
tb2, err := OpenTable(path)
tb2, err := OpenTable(path, nil)
if err == nil {
tb2.MustClose()
t.Fatalf("expecting non-nil error when opening already opened table")
@@ -68,7 +69,11 @@ func TestTableAddItemSerial(t *testing.T) {
_ = os.RemoveAll(path)
}()
tb, err := OpenTable(path)
var flushes uint64
flushCallback := func() {
atomic.AddUint64(&flushes, 1)
}
tb, err := OpenTable(path, flushCallback)
if err != nil {
t.Fatalf("cannot open %q: %s", path, err)
}
@@ -78,6 +83,9 @@ func TestTableAddItemSerial(t *testing.T) {
// Verify items count after pending items flush.
tb.DebugFlush()
if atomic.LoadUint64(&flushes) == 0 {
t.Fatalf("unexpected zero flushes")
}
var m TableMetrics
tb.UpdateMetrics(&m)
@@ -91,7 +99,7 @@ func TestTableAddItemSerial(t *testing.T) {
testReopenTable(t, path, itemsCount)
// Add more items in order to verify merge between inmemory parts and file-based parts.
tb, err = OpenTable(path)
tb, err = OpenTable(path, nil)
if err != nil {
t.Fatalf("cannot open %q: %s", path, err)
}
@@ -124,7 +132,7 @@ func TestTableCreateSnapshotAt(t *testing.T) {
_ = os.RemoveAll(path)
}()
tb, err := OpenTable(path)
tb, err := OpenTable(path, nil)
if err != nil {
t.Fatalf("cannot open %q: %s", path, err)
}
@@ -155,13 +163,13 @@ func TestTableCreateSnapshotAt(t *testing.T) {
}()
// Verify snapshots contain all the data.
tb1, err := OpenTable(snapshot1)
tb1, err := OpenTable(snapshot1, nil)
if err != nil {
t.Fatalf("cannot open %q: %s", path, err)
}
defer tb1.MustClose()
tb2, err := OpenTable(snapshot2)
tb2, err := OpenTable(snapshot2, nil)
if err != nil {
t.Fatalf("cannot open %q: %s", path, err)
}
@@ -205,7 +213,11 @@ func TestTableAddItemsConcurrent(t *testing.T) {
_ = os.RemoveAll(path)
}()
tb, err := OpenTable(path)
var flushes uint64
flushCallback := func() {
atomic.AddUint64(&flushes, 1)
}
tb, err := OpenTable(path, flushCallback)
if err != nil {
t.Fatalf("cannot open %q: %s", path, err)
}
@@ -215,6 +227,10 @@ func TestTableAddItemsConcurrent(t *testing.T) {
// Verify items count after pending items flush.
tb.DebugFlush()
if atomic.LoadUint64(&flushes) == 0 {
t.Fatalf("unexpected zero flushes")
}
var m TableMetrics
tb.UpdateMetrics(&m)
if m.ItemsCount != itemsCount {
@@ -227,7 +243,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
testReopenTable(t, path, itemsCount)
// Add more items in order to verify merge between inmemory parts and file-based parts.
tb, err = OpenTable(path)
tb, err = OpenTable(path, nil)
if err != nil {
t.Fatalf("cannot open %q: %s", path, err)
}
@@ -269,7 +285,7 @@ func testReopenTable(t *testing.T, path string, itemsCount int) {
t.Helper()
for i := 0; i < 10; i++ {
tb, err := OpenTable(path)
tb, err := OpenTable(path, nil)
if err != nil {
t.Fatalf("cannot re-open %q: %s", path, err)
}

View File

@@ -31,7 +31,7 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
@@ -118,7 +118,7 @@ func skipRemote(dAtA []byte) (n int, err error) {
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
@@ -176,7 +176,7 @@ func skipRemote(dAtA []byte) (n int, err error) {
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
innerWire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}

View File

@@ -43,7 +43,7 @@ func (m *Sample) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
@@ -82,7 +82,7 @@ func (m *Sample) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
m.Timestamp |= (int64(b) & 0x7F) << shift
m.Timestamp |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
@@ -128,7 +128,7 @@ func (m *TimeSeries) Unmarshal(dAtA []byte, dstLabels []Label, dstSamples []Samp
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
@@ -255,7 +255,7 @@ func (m *Label) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
@@ -283,7 +283,7 @@ func (m *Label) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
@@ -312,7 +312,7 @@ func (m *Label) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
@@ -363,7 +363,7 @@ func skipTypes(dAtA []byte) (n int, err error) {
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
@@ -421,7 +421,7 @@ func skipTypes(dAtA []byte) (n int, err error) {
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
innerWire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}

View File

@@ -116,7 +116,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca
logger.Panicf("BUG: prevHourMetricIDs must be non-nil")
}
tb, err := mergeset.OpenTable(path)
tb, err := mergeset.OpenTable(path, invalidateTagCache)
if err != nil {
return nil, fmt.Errorf("cannot open indexDB %q: %s", path, err)
}
@@ -405,7 +405,7 @@ func unmarshalTSIDs(dst []TSID, src []byte) ([]TSID, error) {
return dst, nil
}
func (db *indexDB) invalidateTagCache() {
func invalidateTagCache() {
// This function must be fast, since it is called each
// time new timeseries is added.
atomic.AddUint64(&tagFiltersKeyGen, 1)
@@ -513,8 +513,8 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
return fmt.Errorf("cannot create indexes: %s", err)
}
// Invalidate tag cache, since it doesn't contain tags for the created mn -> TSID mapping.
db.invalidateTagCache()
// There is no need in invalidating tag cache, since it is invalidated
// on db.tb flush via invalidateTagCache flushCallback passed to OpenTable.
return nil
}
@@ -890,7 +890,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
db.updateDeletedMetricIDs(metricIDs)
// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
db.invalidateTagCache()
invalidateTagCache()
// Do not reset uselessTagFiltersCache, since the found metricIDs
// on cache miss are filtered out later with deletedMetricIDs.

View File

@@ -367,17 +367,8 @@ func (mn *MetricName) Unmarshal(src []byte) error {
}
}
// Verify no identical tag keys.
if len(mn.Tags) > 0 {
prevKey := mn.Tags[0].Key
for i := range mn.Tags[1:] {
t := &mn.Tags[1+i]
if bytes.Equal(t.Key, prevKey) {
return fmt.Errorf("found duplicate key %q", prevKey)
}
prevKey = t.Key
}
}
// There is no need in verifying for identical tag keys,
// since they must be handled in MetricName.Marshal inside marshalTags.
return nil
}
@@ -584,8 +575,15 @@ func (ts *canonicalTagsSort) Swap(i, j int) {
}
func marshalTags(dst []byte, tags []Tag) []byte {
var prevKey []byte
for i := range tags {
dst = tags[i].Marshal(dst)
t := &tags[i]
if string(prevKey) == string(t.Key) {
// Skip duplicate keys, since they aren't allowed in Prometheus data model.
continue
}
prevKey = t.Key
dst = t.Marshal(dst)
}
return dst
}

View File

@@ -34,6 +34,32 @@ func testMetricNameSortTags(t *testing.T, tags, expectedTags []string) {
}
}
func TestMetricNameMarshalDuplicateKeys(t *testing.T) {
var mn MetricName
mn.MetricGroup = []byte("xxx")
mn.AddTag("foo", "bar")
mn.AddTag("duplicate", "tag")
mn.AddTag("duplicate", "tag")
mn.AddTag("tt", "xx")
mn.AddTag("duplicate", "tag2")
var mnExpected MetricName
mnExpected.MetricGroup = []byte("xxx")
mnExpected.AddTag("duplicate", "tag")
mnExpected.AddTag("foo", "bar")
mnExpected.AddTag("tt", "xx")
mn.sortTags()
data := mn.Marshal(nil)
var mn1 MetricName
if err := mn1.Unmarshal(data); err != nil {
t.Fatalf("cannot unmarshal mn %s: %s", &mn, err)
}
if !reflect.DeepEqual(&mnExpected, &mn1) {
t.Fatalf("unexpected mn unmarshaled;\ngot\n%+v\nwant\n%+v", &mn1, &mnExpected)
}
}
func TestMetricNameMarshalUnmarshal(t *testing.T) {
for i := 0; i < 10; i++ {
for tagsCount := 0; tagsCount < 10; tagsCount++ {