diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 4a2e42c42e..017bbaf466 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -1223,11 +1223,7 @@ func getCommonParamsInternal(r *http.Request, startTime time.Time, requireNonEmp if err != nil { return nil, err } - // Limit the `end` arg to the current time +2 days in the same way - // as it is limited during data ingestion. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/blob/ea06d2fd3ccbbb6aa4480ab3b04f7b671408be2a/lib/storage/table.go#L378 - // This should fix possible timestamp overflow - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2669 - maxTS := startTime.UnixNano()/1e6 + 2*24*3600*1000 + maxTS := int64(math.MaxInt64 / 1_000_000) if end > maxTS { end = maxTS } diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 1b46b625f9..1c8a7668ec 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -33,6 +33,8 @@ import ( var ( retentionPeriod = flagutil.NewRetentionDuration("retentionPeriod", "1M", "Data with timestamps outside the retentionPeriod is automatically deleted. The minimum retentionPeriod is 24h or 1d. "+ "See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention. See also -retentionFilter") + futureRetention = flagutil.NewRetentionDuration("futureRetention", "2d", "Data with timestamps bigger than now+futureRetention is automatically deleted. "+ + "The minimum futureRetention is 2 days. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention") snapshotAuthKey = flagutil.NewPassword("snapshotAuthKey", "authKey, which must be passed in query string to /snapshot* pages. It overrides -httpAuth.*") forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*") forceFlushAuthKey = flagutil.NewPassword("forceFlushAuthKey", "authKey, which must be passed in query string to /internal/force_flush pages. It overrides -httpAuth.*") @@ -135,7 +137,12 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) { mergeset.SetDataBlocksSparseCacheSize(cacheSizeIndexDBDataBlocksSparse.IntN()) if retentionPeriod.Duration() < 24*time.Hour { - logger.Fatalf("-retentionPeriod cannot be smaller than a day; got %s", retentionPeriod) + logger.Fatalf("-retentionPeriod cannot be smaller than a day; got %s. "+ + "See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention", retentionPeriod) + } + if futureRetention.Duration() < 2*24*time.Hour { + logger.Fatalf("-futureRetention cannot be smaller than 2 days; got %s. 
"+ + "See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention", futureRetention) } if *idbPrefillStart > 23*time.Hour { logger.Panicf("-storage.idbPrefillStart cannot exceed 23 hours; got %s", idbPrefillStart) @@ -145,6 +152,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) { WG = syncwg.WaitGroup{} opts := storage.OpenOptions{ Retention: retentionPeriod.Duration(), + FutureRetention: futureRetention.Duration(), MaxHourlySeries: getMaxHourlySeries(), MaxDailySeries: getMaxDailySeries(), DisablePerDayIndex: *disablePerDayIndex, diff --git a/apptest/tests/backup_restore_test.go b/apptest/tests/backup_restore_test.go index c2f3fbfe05..f704be06a1 100644 --- a/apptest/tests/backup_restore_test.go +++ b/apptest/tests/backup_restore_test.go @@ -28,6 +28,7 @@ func TestSingleBackupRestore(t *testing.T) { return tc.MustStartVmsingle("vmsingle", []string{ "-storageDataPath=" + storageDataPath, "-retentionPeriod=100y", + "-futureRetention=2y", }) }, stopSUT: func() { @@ -60,11 +61,13 @@ func TestClusterBackupRestore(t *testing.T) { Vmstorage1Flags: []string{ "-storageDataPath=" + storage1DataPath, "-retentionPeriod=100y", + "-futureRetention=2y", }, Vmstorage2Instance: "vmstorage2", Vmstorage2Flags: []string{ "-storageDataPath=" + storage2DataPath, "-retentionPeriod=100y", + "-futureRetention=2y", }, VminsertInstance: "vminsert", VminsertFlags: []string{}, @@ -97,10 +100,16 @@ func TestClusterBackupRestore(t *testing.T) { func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) { t := tc.T() - genData := func(count int, prefix string, start, step int64) (recs []string, wantSeries []map[string]string, wantQueryResults []*apptest.QueryResult) { - recs = make([]string, count) - wantSeries = make([]map[string]string, count) - wantQueryResults = make([]*apptest.QueryResult, count) + type data struct { + samples []string + wantSeries []map[string]string + wantQueryResults []*apptest.QueryResult + } + + genData := func(count int, prefix string, start, step int64) data { + recs := make([]string, count) + wantSeries := make([]map[string]string, count) + wantQueryResults := make([]*apptest.QueryResult, count) for i := range count { name := fmt.Sprintf("%s_%03d", prefix, i) value := float64(i) @@ -113,7 +122,15 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) { Samples: []*apptest.Sample{{Timestamp: timestamp, Value: value}}, } } - return recs, wantSeries, wantQueryResults + return data{recs, wantSeries, wantQueryResults} + } + + concatData := func(d1, d2 data) data { + var d data + d.samples = slices.Concat(d1.samples, d2.samples) + d.wantSeries = slices.Concat(d1.wantSeries, d2.wantSeries) + d.wantQueryResults = slices.Concat(d1.wantQueryResults, d2.wantQueryResults) + return d } backupBaseDir, err := filepath.Abs(filepath.Join(tc.Dir(), "backups")) @@ -190,10 +207,20 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) { // Use the same number of metrics and time range for all the data ingestions // below. const numMetrics = 1000 - // With 1000 metrics (one per minute), the time range spans 2 months. 
start := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli() end := time.Date(2025, 3, 1, 0, 0, 0, 0, time.UTC).UnixMilli() step := (end - start) / numMetrics + batch1 := genData(numMetrics, "batch1", start, step) + batch2 := genData(numMetrics, "batch2", start, step) + batches12 := concatData(batch1, batch2) + + now := time.Now().UTC() + startFuture := time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli() + endFuture := time.Date(now.Year()+1, 3, 1, 0, 0, 0, 0, time.UTC).UnixMilli() + stepFuture := (endFuture - startFuture) / numMetrics + batch1Future := genData(numMetrics, "batch1", startFuture, stepFuture) + batch2Future := genData(numMetrics, "batch2", startFuture, stepFuture) + batches12Future := concatData(batch1Future, batch2Future) // Verify backup/restore: // @@ -207,24 +234,25 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) { // - Start vmsingle // - Ensure that the queries return batch1 data only. - batch1Data, wantBatch1Series, wantBatch1QueryResults := genData(numMetrics, "batch1", start, step) - batch2Data, wantBatch2Series, wantBatch2QueryResults := genData(numMetrics, "batch2", start, step) - wantBatch12Series := slices.Concat(wantBatch1Series, wantBatch2Series) - wantBatch12QueryResults := slices.Concat(wantBatch1QueryResults, wantBatch2QueryResults) - sut := opts.startSUT() - sut.PrometheusAPIV1ImportPrometheus(t, batch1Data, apptest.QueryOpts{}) + sut.PrometheusAPIV1ImportPrometheus(t, batch1.samples, apptest.QueryOpts{}) + sut.PrometheusAPIV1ImportPrometheus(t, batch1Future.samples, apptest.QueryOpts{}) sut.ForceFlush(t) - assertSeries(sut, `{__name__=~"batch1.*"}`, start, end, wantBatch1Series) - assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, step, wantBatch1QueryResults) + assertSeries(sut, `{__name__=~"batch1.*"}`, start, end, batch1.wantSeries) + assertSeries(sut, `{__name__=~"batch1.*"}`, startFuture, endFuture, batch1Future.wantSeries) + assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, step, batch1.wantQueryResults) + assertQueryResults(sut, `{__name__=~"batch1.*"}`, startFuture, endFuture, stepFuture, batch1Future.wantQueryResults) createBackup(sut, "batch1") - sut.PrometheusAPIV1ImportPrometheus(t, batch2Data, apptest.QueryOpts{}) + sut.PrometheusAPIV1ImportPrometheus(t, batch2.samples, apptest.QueryOpts{}) + sut.PrometheusAPIV1ImportPrometheus(t, batch2Future.samples, apptest.QueryOpts{}) sut.ForceFlush(t) - assertSeries(sut, `{__name__=~"batch(1|2).*"}`, start, end, wantBatch12Series) - assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, start, end, step, wantBatch12QueryResults) + assertSeries(sut, `{__name__=~"batch(1|2).*"}`, start, end, batches12.wantSeries) + assertSeries(sut, `{__name__=~"batch(1|2).*"}`, startFuture, endFuture, batches12Future.wantSeries) + assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, start, end, step, batches12.wantQueryResults) + assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, startFuture, endFuture, stepFuture, batches12Future.wantQueryResults) createBackup(sut, "batch12") opts.stopSUT() @@ -233,6 +261,8 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) { sut = opts.startSUT() - assertSeries(sut, `{__name__=~"batch1.*"}`, start, end, wantBatch1Series) - assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, step, wantBatch1QueryResults) + assertSeries(sut, `{__name__=~"batch(1|2).*"}`, start, end, batch1.wantSeries) + assertSeries(sut, `{__name__=~"batch(1|2).*"}`, startFuture, endFuture, batch1Future.wantSeries) + 
assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, start, end, step, batch1.wantQueryResults) + assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, startFuture, endFuture, stepFuture, batch1Future.wantQueryResults) } diff --git a/apptest/tests/future_timestamps_test.go b/apptest/tests/future_timestamps_test.go new file mode 100644 index 0000000000..e7a07a0377 --- /dev/null +++ b/apptest/tests/future_timestamps_test.go @@ -0,0 +1,211 @@ +package tests + +import ( + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/apptest" +) + +func TestSingleFutureTimestamps(t *testing.T) { + tc := apptest.NewTestCase(t) + defer tc.Stop() + + opts := testFutureTimestampsOpts{ + start: func() apptest.PrometheusWriteQuerier { + return tc.MustStartVmsingle("vmsingle", []string{ + "-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"), + "-retentionPeriod=100y", + "-futureRetention=100y", + }) + }, + stop: func() { + tc.StopApp("vmsingle") + }, + } + + testFutureTimestamps(tc, opts) +} + +func TestClusterFutureTimestamps(t *testing.T) { + tc := apptest.NewTestCase(t) + defer tc.Stop() + + opts := testFutureTimestampsOpts{ + start: func() apptest.PrometheusWriteQuerier { + return tc.MustStartCluster(&apptest.ClusterOptions{ + Vmstorage1Instance: "vmstorage1", + Vmstorage1Flags: []string{ + "-storageDataPath=" + filepath.Join(tc.Dir(), "vmstorage1"), + "-retentionPeriod=100y", + "-futureRetention=100y", + }, + Vmstorage2Instance: "vmstorage2", + Vmstorage2Flags: []string{ + "-storageDataPath=" + filepath.Join(tc.Dir(), "vmstorage2"), + "-retentionPeriod=100y", + "-futureRetention=100y", + }, + VminsertInstance: "vminsert", + VminsertFlags: []string{}, + VmselectInstance: "vmselect", + VmselectFlags: []string{}, + }) + }, + stop: func() { + tc.StopApp("vminsert") + tc.StopApp("vmselect") + tc.StopApp("vmstorage1") + tc.StopApp("vmstorage2") + }, + } + + testFutureTimestamps(tc, opts) +} + +type testFutureTimestampsOpts struct { + start func() apptest.PrometheusWriteQuerier + stop func() +} + +func testFutureTimestamps(tc *apptest.TestCase, opts testFutureTimestampsOpts) { + t := tc.T() + + // assertSeries retrieves the set of all metric names from the storage and + // compares it with the expected set. + assertSeries := func(app apptest.PrometheusQuerier, prefix string, start, end int64, want []map[string]string) { + t.Helper() + + query := fmt.Sprintf(`{__name__=~"metric_%s.*"}`, prefix) + tc.Assert(&apptest.AssertOptions{ + Msg: "unexpected /api/v1/series response", + Got: func() any { + return app.PrometheusAPIV1Series(t, query, apptest.QueryOpts{ + Start: fmt.Sprintf("%d", start), + End: fmt.Sprintf("%d", end), + }).Sort() + }, + Want: &apptest.PrometheusAPIV1SeriesResponse{ + Status: "success", + Data: want, + }, + FailNow: true, + }) + } + + // assertQueryResults retrieves all data from the storage and compares it with the + // expected result.
+ assertQueryResults := func(app apptest.PrometheusQuerier, prefix string, start, end, step int64, want []*apptest.QueryResult) { + t.Helper() + + query := fmt.Sprintf(`{__name__=~"metric_%s.*"}`, prefix) + tc.Assert(&apptest.AssertOptions{ + Msg: "unexpected /api/v1/query_range response", + Got: func() any { + return app.PrometheusAPIV1QueryRange(t, query, apptest.QueryOpts{ + Start: fmt.Sprintf("%d", start), + End: fmt.Sprintf("%d", end), + Step: fmt.Sprintf("%dms", step), + MaxLookback: fmt.Sprintf("%dms", step-1), + NoCache: "1", + }) + }, + Want: &apptest.PrometheusAPIV1QueryResponse{ + Status: "success", + Data: &apptest.QueryData{ + ResultType: "matrix", + Result: want, + }, + }, + FailNow: true, + }) + } + + f := func(prefix string, startTime, endTime time.Time, wantEmpty bool) { + const numMetrics = 1000 + start := startTime.UnixMilli() + end := endTime.UnixMilli() + step := (end - start) / numMetrics + data := genFutureTimestampsData(prefix, numMetrics, start, step) + if wantEmpty { + data.wantSeries = []map[string]string{} + data.wantQueryResults = []*apptest.QueryResult{} + } + + // Ingest data and check query results. + sut := opts.start() + sut.PrometheusAPIV1ImportPrometheus(t, data.samples, apptest.QueryOpts{}) + sut.ForceFlush(t) + assertSeries(sut, prefix, start, end, data.wantSeries) + assertQueryResults(sut, prefix, start, end, step, data.wantQueryResults) + + // Ensure the queries work after restrart. + opts.stop() + sut = opts.start() + assertSeries(sut, prefix, start, end, data.wantSeries) + assertQueryResults(sut, prefix, start, end, step, data.wantQueryResults) + + opts.stop() + } + + now := time.Now().UTC() + retentionLimit := 100 * 365 * 24 * time.Hour + var start, end time.Time + + start = time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, time.UTC) + end = time.Date(now.Year(), now.Month(), now.Day()+2, 0, 0, 0, 0, time.UTC) + f("future_1d", start, end, false) + + start = time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.UTC) + end = time.Date(now.Year(), now.Month()+2, 1, 0, 0, 0, 0, time.UTC) + f("future_1m", start, end, false) + + start = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC) + end = time.Date(now.Year()+2, 1, 1, 0, 0, 0, 0, time.UTC) + f("future_1y", start, end, false) + + start = now.Add(retentionLimit - 24*time.Hour) + end = now.Add(retentionLimit) + f("future_1d_before_limit", start, end, false) + + start = now.Add(retentionLimit + time.Minute) + end = now.Add(retentionLimit + 24*time.Hour) + f("future_1d_beyond_limit", start, end, true) + +} + +type futureTimestampsData struct { + samples []string + wantSeries []map[string]string + wantQueryResults []*apptest.QueryResult +} + +func genFutureTimestampsData(prefix string, numMetrics, start, step int64) futureTimestampsData { + samples := make([]string, numMetrics) + wantSeries := make([]map[string]string, numMetrics) + wantQueryResults := make([]*apptest.QueryResult, numMetrics) + for i := range numMetrics { + metricName := fmt.Sprintf("metric_%s_%04d", prefix, i) + labelName := fmt.Sprintf("label_%s_%04d", prefix, i) + labelValue := fmt.Sprintf("value_%s_%04d", prefix, i) + value := i + timestamp := start + i*step + samples[i] = fmt.Sprintf(`%s{%s="value", label="%s"} %d %d`, metricName, labelName, labelValue, value, timestamp) + wantSeries[i] = map[string]string{ + "__name__": metricName, + labelName: "value", + "label": labelValue, + } + wantQueryResults[i] = &apptest.QueryResult{ + Metric: map[string]string{ + "__name__": metricName, + labelName: "value", + "label": 
labelValue, + }, + Samples: []*apptest.Sample{{Timestamp: timestamp, Value: float64(value)}}, + } + } + return futureTimestampsData{samples, wantSeries, wantQueryResults} +} diff --git a/docs/victoriametrics/README.md b/docs/victoriametrics/README.md index accc45d281..775981365f 100644 --- a/docs/victoriametrics/README.md +++ b/docs/victoriametrics/README.md @@ -1511,6 +1511,16 @@ value than before, then data outside the configured period will be eventually de VictoriaMetrics does not support indefinite retention, but you can specify an arbitrarily high duration, e.g. `-retentionPeriod=100y`. +By default, VictoriaMetrics doesn't accept samples with timestamps bigger than `now+2d`, i.e. 2 days in the future. +If you need to accept samples with bigger timestamps, specify the desired "future retention" via the `-futureRetention` command-line flag. +This flag accepts values starting from `2d`. + +For example, the following command starts VictoriaMetrics, which accepts samples with timestamps up to a year in the future: + +```sh +/path/to/victoria-metrics -futureRetention=1y +``` + ### Multiple retentions Distinct retentions for distinct time series can be configured via [retention filters](#retention-filters) diff --git a/docs/victoriametrics/changelog/CHANGELOG.md b/docs/victoriametrics/changelog/CHANGELOG.md index 920a39f0dd..ed2b74671f 100644 --- a/docs/victoriametrics/changelog/CHANGELOG.md +++ b/docs/victoriametrics/changelog/CHANGELOG.md @@ -31,6 +31,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel * FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): now `Run query` link on the Alerting Rules page correctly propagates the alert’s interval and evaluation time. See [#10366](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10366). * FEATURE: [alerts](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules): add new `MetricNameStatsCacheUtilizationIsTooHigh` alerting rule to track overutilization of [Metric names usage stats tracker](https://docs.victoriametrics.com/victoriametrics/#track-ingested-metrics-usage) (used in [Cardinality Explorer](https://docs.victoriametrics.com/victoriametrics/#cardinality-explorer)). See [#10840](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10840). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): add `vm_streamaggr_counter_resets_total` metric for `total*`, `increase*` and `rate*` outputs that is useful for aggregation behaviour tracking. These metrics help to identify issues described in [Troubleshooting: counter resets](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#counter-resets). +* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/), as well as `vmstorage` and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add support for ingestion and retrieval of samples with timestamps in the future. The new `-futureRetention` flag controls how far in the future the timestamps are allowed to be. See [#827](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/827) and [#10718](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10718).
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix increased memory usage after upgrade to [v1.140.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.140.0) by properly accounting for internal buffer count when calculating per-storage buffer size. See [#10725](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10725#issuecomment-4282256709). * BUGFIX: all VictoriaMetrics components: properly parse IPv6 source address when accepting connections with proxy protocol v2 enabled. See [#10839](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10839). Thanks to @andriibeee for the contribution. diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go index 1c9f974723..c287db9be0 100644 --- a/lib/storage/search_test.go +++ b/lib/storage/search_test.go @@ -3,6 +3,7 @@ package storage import ( "bytes" "fmt" + "math" "math/rand" "reflect" "regexp" @@ -164,7 +165,9 @@ func TestSearch_VariousTimeRanges(t *testing.T) { mrs[i].Value = float64(i) } - s := MustOpenStorage(t.Name(), OpenOptions{}) + s := MustOpenStorage(t.Name(), OpenOptions{ + FutureRetention: time.Duration(math.MaxInt64), + }) defer s.MustClose() s.AddRows(mrs, defaultPrecisionBits) s.DebugFlush() diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 3b9ecda7ae..277e18c355 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -33,6 +33,7 @@ import ( ) const ( + retention2Days = 2 * 24 * time.Hour retention31Days = 31 * 24 * time.Hour retentionMax = 100 * 12 * retention31Days idbPrefilStart = time.Hour @@ -60,9 +61,10 @@ type Storage struct { // indexdb rotation. legacyNextRotationTimestamp atomic.Int64 - path string - cachePath string - retentionMsecs int64 + path string + cachePath string + retentionMsecs int64 + futureRetentionMsecs int64 // lock file for exclusive access to the storage on the given path. flockF *os.File @@ -160,6 +162,7 @@ type Storage struct { // OpenOptions optional args for MustOpenStorage type OpenOptions struct { Retention time.Duration + FutureRetention time.Duration MaxHourlySeries int MaxDailySeries int DisablePerDayIndex bool @@ -181,6 +184,7 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage { if retention <= 0 || retention > retentionMax { retention = retentionMax } + futureRetention := max(opts.FutureRetention, retention2Days) idbPrefillStart := opts.IDBPrefillStart if idbPrefillStart <= 0 { idbPrefillStart = time.Hour @@ -189,6 +193,7 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage { path: path, cachePath: filepath.Join(path, cacheDirname), retentionMsecs: retention.Milliseconds(), + futureRetentionMsecs: futureRetention.Milliseconds(), stopCh: make(chan struct{}), idbPrefillStartSeconds: idbPrefillStart.Milliseconds() / 1000, } diff --git a/lib/storage/storage_synctest_test.go b/lib/storage/storage_synctest_test.go index 2e6ad35657..9a31b0d329 100644 --- a/lib/storage/storage_synctest_test.go +++ b/lib/storage/storage_synctest_test.go @@ -5,11 +5,13 @@ package storage import ( "fmt" "math/rand" + "path/filepath" "testing" "testing/synctest" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/google/go-cmp/cmp" ) @@ -825,66 +827,320 @@ func TestStorageNextDayMetricIDs_update(t *testing.T) { }) } +// TestStorageLastPartitionMetrics checks that "last partition" metrics +// correspond to the current partition and not some future partition. 
func TestStorageLastPartitionMetrics(t *testing.T) { defer testRemoveAll(t) + + addRows := func(t *testing.T, s *Storage, prefix string, tr TimeRange) { + t.Helper() + const numSeries = 1000 + rng := rand.New(rand.NewSource(1)) + mrs := testGenerateMetricRowsWithPrefix(rng, numSeries, prefix, tr) + want := s.newTimeseriesCreated.Load() + numSeries + s.AddRows(mrs, defaultPrecisionBits) + s.DebugFlush() + if got := s.newTimeseriesCreated.Load(); got != want { + t.Errorf("unexpected number of new timeseries: got %d, want %d", got, want) + } + // wait for merged parts to be attached to the table + time.Sleep(time.Minute) + } + assertLastPartitionEmpty := func(t *testing.T, s *Storage) { + t.Helper() + var m Metrics + s.UpdateMetrics(&m) + lpm := m.TableMetrics.LastPartition + if lpm.SmallPartsCount != 0 { + t.Fatalf("unexpected last partition SmallPartsCount: got %d, want 0", lpm.SmallPartsCount) + } + if lpm.IndexDBMetrics.FileBlocksCount != 0 { + t.Fatalf("unexpected last partition IndexDBMetrics.FileBlocksCount: got %d, want 0", lpm.IndexDBMetrics.FileBlocksCount) + } + } + assertLastPartitionNonEmpty := func(t *testing.T, s *Storage) { + t.Helper() + var m Metrics + s.UpdateMetrics(&m) + lpm := m.TableMetrics.LastPartition + if lpm.SmallPartsCount == 0 { + t.Fatalf("unexpected last partition SmallPartsCount: got 0, want > 0") + } + if lpm.IndexDBMetrics.FileBlocksCount == 0 { + t.Fatalf("unexpected last partition IndexDBMetrics.FileBlocksCount: got 0, want > 0") + } + } + synctest.Test(t, func(t *testing.T) { // Advance current time to 2h before the next month, 2000-01-31T22:00:00Z. time.Sleep(31*24*time.Hour - 2*time.Hour) ct := time.Now().UTC() + // Open the storage, make sure current partition is empty. + s := MustOpenStorage(t.Name(), OpenOptions{ + FutureRetention: 2 * 365 * 24 * time.Hour, + }) + defer s.MustClose() + assertLastPartitionEmpty(t, s) + + // Insert rows with future timestamps. Current partition must be empty. + addRows(t, s, "future", TimeRange{ + MinTimestamp: ct.Add(365 * 24 * time.Hour).UnixMilli(), + MaxTimestamp: ct.Add(366 * 24 * time.Hour).UnixMilli(), + }) + assertLastPartitionEmpty(t, s) + + // Insert rows with timestamps within current partition. + // Current partition must not be empty. + addRows(t, s, "current", TimeRange{ + MinTimestamp: ct.UnixMilli(), + MaxTimestamp: ct.Add(time.Hour).UnixMilli(), + }) + assertLastPartitionNonEmpty(t, s) + + // Advance current time to the next month, 2000-02-01T00:30:00Z. + // The last partition is now 2000-02 and it must be empty. + time.Sleep(2*time.Hour + time.Minute*30) + assertLastPartitionEmpty(t, s) + }) +} + +func TestStorage_futureAndHistoricalRetention(t *testing.T) { + defer testRemoveAll(t) + + assertData := func(t *testing.T, s *Storage, tr TimeRange, want []MetricRow) { + t.Helper() + tfs := NewTagFilters() + if err := tfs.Add(nil, []byte(".*"), false, true); err != nil { + t.Fatalf("TagFilters.Add() failed unexpectedly: %v", err) + } + if err := testAssertSearchResult(s, tr, tfs, want); err != nil { + t.Fatalf("[now: %v tr: %v] search failed unexpectedly: %v", time.Now().UTC(), &tr, err) + } + } + + synctest.Test(t, func(t *testing.T) { + // synctests start at 2000-01-01T00:00:00Z + + var s *Storage + retention := 180 * 24 * time.Hour + futureRetention := 180 * 24 * time.Hour + + s = MustOpenStorage(t.Name(), OpenOptions{ + Retention: retention, + FutureRetention: futureRetention, + }) + + // Ingest samples for the previous and the following year. 10 samples per day.
+ const numSeries = 10 + start := time.Date(1999, 1, 1, 0, 0, 0, 0, time.UTC) + end := time.Date(2001, 1, 1, 0, 0, 0, 0, time.UTC) + rng := rand.New(rand.NewSource(1)) + wantData := make(map[TimeRange][]MetricRow) + for day := start; day.Before(end); { + prefix := fmt.Sprintf("metric_%d_%d_%d", day.Year(), day.Month(), day.Day()) + tr := TimeRange{ + MinTimestamp: day.UnixMilli(), + MaxTimestamp: day.UnixMilli() + msecPerDay - 1, + } + mrs := testGenerateMetricRowsWithPrefix(rng, numSeries, prefix, tr) + wantData[tr] = mrs + s.AddRows(mrs, defaultPrecisionBits) + + day = time.Date(day.Year(), day.Month(), day.Day()+1, 0, 0, 0, 0, time.UTC) + } + s.DebugFlush() + + // Advance time one partition at a time. Before each time advancement, + // check the query results for each day between the original start and end + // time. + // + // This is to test how historical and future retentions affect the + // stored data over time. + now := time.Now().UTC() + dataEnd := now.Add(futureRetention - 24*time.Hour) + for now.Before(end) { + for day := start; day.Before(end); { + tr := TimeRange{ + MinTimestamp: day.UnixMilli(), + MaxTimestamp: day.UnixMilli() + msecPerDay - 1, + } + dataStart := now.Add(-retention) + if day.Before(dataStart) || day.After(dataEnd) { + assertData(t, s, tr, nil) + } else { + assertData(t, s, tr, wantData[tr]) + } + day = time.Date(day.Year(), day.Month(), day.Day()+1, 0, 0, 0, 0, time.UTC) + } + + s.MustClose() + nextMonth := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.UTC) + time.Sleep(nextMonth.Sub(now)) + now = nextMonth + s = MustOpenStorage(t.Name(), OpenOptions{ + Retention: retention, + FutureRetention: futureRetention, + }) + } + + s.MustClose() + }) +} + +func TestStorage_defaultFutureRetention(t *testing.T) { + defer testRemoveAll(t) + + assertData := func(t *testing.T, s *Storage, tr TimeRange, want []MetricRow) { + t.Helper() + tfs := NewTagFilters() + if err := tfs.Add(nil, []byte(".*"), false, true); err != nil { + t.Fatalf("TagFilters.Add() failed unexpectedly: %v", err) + } + if err := testAssertSearchResult(s, tr, tfs, want); err != nil { + t.Fatalf("[now: %v tr: %v] search failed unexpectedly: %v", time.Now().UTC(), &tr, err) + } + } + + synctest.Test(t, func(t *testing.T) { + // synctests start at 2000-01-01T00:00:00Z + s := MustOpenStorage(t.Name(), OpenOptions{}) defer s.MustClose() - assertLastPartitionEmpty := func() { - t.Helper() - var m Metrics - s.UpdateMetrics(&m) - lpm := m.TableMetrics.LastPartition - if lpm.SmallPartsCount != 0 { - t.Fatalf("unexpected last partition SmallPartsCount: got %d, want 0", lpm.SmallPartsCount) - } - if lpm.IndexDBMetrics.FileBlocksCount != 0 { - t.Fatalf("unexpected last partition IndexDBMetrics.FileBlocksCount: got %d, want 0", lpm.IndexDBMetrics.FileBlocksCount) - } - } - assertLastPartitionNonEmpty := func() { - t.Helper() - var m Metrics - s.UpdateMetrics(&m) - lpm := m.TableMetrics.LastPartition - if lpm.SmallPartsCount == 0 { - t.Fatalf("unexpected last partition SmallPartsCount: got 0, want > 0") - } - if lpm.IndexDBMetrics.FileBlocksCount == 0 { - t.Fatalf("unexpected last partition IndexDBMetrics.FileBlocksCount: got 0, want > 0") - } - } - - // make sure last partition is empty before ingestion - assertLastPartitionEmpty() - - const numSeries = 1000 - + // Ingest samples for this and several days in the future. 10 samples + // per hour. 
+ const numSeries = 10 + start := time.Now().UTC() + end := start.Add(10 * 24 * time.Hour) rng := rand.New(rand.NewSource(1)) - tr := TimeRange{ - MinTimestamp: ct.Add(-time.Hour).UnixMilli(), - MaxTimestamp: ct.UnixMilli(), - } - mrs := testGenerateMetricRowsWithPrefix(rng, numSeries, "metric.", tr) - s.AddRows(mrs, 1) - s.DebugFlush() - if got, want := s.newTimeseriesCreated.Load(), uint64(numSeries); got != want { - t.Errorf("unexpected number of new timeseries: got %d, want %d", got, want) - } - // wait for merged parts to be attached to the table - time.Sleep(time.Minute) + wantData := make(map[TimeRange][]MetricRow) + for ts := start; ts.Before(end); { + prefix := fmt.Sprintf("metric_%04d_%02d_%02d_%02d", ts.Year(), ts.Month(), ts.Day(), ts.Hour()) + tr := TimeRange{ + MinTimestamp: ts.UnixMilli(), + MaxTimestamp: ts.UnixMilli() + msecPerHour - 1, + } + mrs := testGenerateMetricRowsWithPrefix(rng, numSeries, prefix, tr) + wantData[tr] = mrs + s.AddRows(mrs, defaultPrecisionBits) + + ts = ts.Add(time.Hour) + } + s.DebugFlush() + + dataStart := start + dataEnd := dataStart.Add(2*24*time.Hour - time.Hour) + for ts := start; ts.Before(end); ts = ts.Add(time.Hour) { + tr := TimeRange{ + MinTimestamp: ts.UnixMilli(), + MaxTimestamp: ts.UnixMilli() + msecPerHour - 1, + } + if ts.Before(dataStart) || ts.After(dataEnd) { + assertData(t, s, tr, nil) + } else { + assertData(t, s, tr, wantData[tr]) + } + } - // last created partition is empty, but we're still at current month - assertLastPartitionNonEmpty() - // Advance current time to the the next month, 2000-02-01T00:30:00Z. - // last partition must be 2000-02 now - time.Sleep(2*time.Hour + time.Minute*30) - // current month partition has no data ingested - assertLastPartitionEmpty() + }) +} + +func TestStorage_partitionsOutsideRetentionAreRemoved(t *testing.T) { + defer testRemoveAll(t) + + assertPathExists := func(t *testing.T, path string, want bool) { + t.Helper() + if got := fs.IsPathExist(path); got != want { + t.Fatalf("unexpected path existence test result for %s: got %t, want %t", path, got, want) + } + } + + assertPtExists := func(t *testing.T, pt string, want bool) { + t.Helper() + assertPathExists(t, filepath.Join(t.Name(), "data", "small", pt), want) + assertPathExists(t, filepath.Join(t.Name(), "data", "big", pt), want) + assertPathExists(t, filepath.Join(t.Name(), "data", "indexdb", pt), want) + } + + synctest.Test(t, func(t *testing.T) { + // synctests start at 2000-01-01T00:00:00Z + + retention := 80 * 24 * time.Hour + futureRetention := 180 * 24 * time.Hour + s := MustOpenStorage(t.Name(), OpenOptions{ + Retention: retention, + FutureRetention: futureRetention, + }) + + // Ingest samples with future timestamps that span the entire retention. + // This should create the corresponding partitions. 
+ rng := rand.New(rand.NewSource(1)) + mrs := testGenerateMetricRowsWithPrefix(rng, 1000, "metric", TimeRange{ + MinTimestamp: time.Now().Add(-retention).UnixMilli(), + MaxTimestamp: time.Now().Add(futureRetention - time.Second).UnixMilli(), + }) + s.AddRows(mrs, defaultPrecisionBits) + s.DebugFlush() + + assertPtExists(t, "1999_09", false) + assertPtExists(t, "1999_10", true) + assertPtExists(t, "1999_11", true) + assertPtExists(t, "1999_12", true) + assertPtExists(t, "2000_01", true) + assertPtExists(t, "2000_02", true) + assertPtExists(t, "2000_03", true) + assertPtExists(t, "2000_04", true) + assertPtExists(t, "2000_05", true) + assertPtExists(t, "2000_06", true) + assertPtExists(t, "2000_07", false) + + // Reopen storage with smaller future retention. Future partitions + // outside the new future retention must be removed. + s.MustClose() + s = MustOpenStorage(t.Name(), OpenOptions{ + Retention: retention, + FutureRetention: 45 * 24 * time.Hour, + }) + + // Wait for background task to remove future partitions. + time.Sleep(2 * time.Minute) + + assertPtExists(t, "1999_09", false) + assertPtExists(t, "1999_10", true) + assertPtExists(t, "1999_11", true) + assertPtExists(t, "1999_12", true) + assertPtExists(t, "2000_01", true) + assertPtExists(t, "2000_02", true) + assertPtExists(t, "2000_03", false) + assertPtExists(t, "2000_04", false) + assertPtExists(t, "2000_05", false) + assertPtExists(t, "2000_06", false) + assertPtExists(t, "2000_07", false) + + // Reopen storage with smaller retention. Historical partitions + // outside the new retention must be removed. + s.MustClose() + s = MustOpenStorage(t.Name(), OpenOptions{ + Retention: 45 * 24 * time.Hour, + FutureRetention: 45 * 24 * time.Hour, + }) + + // Wait for background task to remove partitions outside the new retention.
+ time.Sleep(2 * time.Minute) + + assertPtExists(t, "1999_09", false) + assertPtExists(t, "1999_10", false) + assertPtExists(t, "1999_11", true) + assertPtExists(t, "1999_12", true) + assertPtExists(t, "2000_01", true) + assertPtExists(t, "2000_02", true) + assertPtExists(t, "2000_03", false) + assertPtExists(t, "2000_04", false) + assertPtExists(t, "2000_05", false) + assertPtExists(t, "2000_06", false) + assertPtExists(t, "2000_07", false) + + s.MustClose() }) } diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index a94a56290c..4d902508fc 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -1877,7 +1877,7 @@ func TestStorageRowsNotAdded(t *testing.T) { }) retention = 48 * time.Hour - minTimestamp = time.Now().Add(7 * 24 * time.Hour).UnixMilli() + minTimestamp = maxUnixMilli + 1 maxTimestamp = minTimestamp + 1000 f(&options{ name: "TooBigTimestamps", @@ -2024,7 +2024,9 @@ func TestStorageSearchMetricNames_VariousTimeRanges(t *testing.T) { } slices.Sort(want) - s := MustOpenStorage(t.Name(), OpenOptions{}) + s := MustOpenStorage(t.Name(), OpenOptions{ + FutureRetention: time.Duration(math.MaxInt64), + }) defer s.MustClose() s.AddRows(mrs, defaultPrecisionBits) s.DebugFlush() @@ -2282,7 +2284,9 @@ func TestStorageSearchLabelNames_VariousTimeRanges(t *testing.T) { want = append(want, "__name__") slices.Sort(want) - s := MustOpenStorage(t.Name(), OpenOptions{}) + s := MustOpenStorage(t.Name(), OpenOptions{ + FutureRetention: time.Duration(math.MaxInt64), + }) defer s.MustClose() s.AddRows(mrs, defaultPrecisionBits) s.DebugFlush() @@ -2331,7 +2335,9 @@ func TestStorageSearchLabelValues_VariousTimeRanges(t *testing.T) { } slices.Sort(want) - s := MustOpenStorage(t.Name(), OpenOptions{}) + s := MustOpenStorage(t.Name(), OpenOptions{ + FutureRetention: time.Duration(math.MaxInt64), + }) defer s.MustClose() s.AddRows(mrs, defaultPrecisionBits) s.DebugFlush() @@ -2371,7 +2377,9 @@ func TestStorageSearchTagValueSuffixes_VariousTimeRanges(t *testing.T) { } slices.Sort(want) - s := MustOpenStorage(t.Name(), OpenOptions{}) + s := MustOpenStorage(t.Name(), OpenOptions{ + FutureRetention: time.Duration(math.MaxInt64), + }) defer s.MustClose() s.AddRows(mrs, defaultPrecisionBits) s.DebugFlush() @@ -2410,7 +2418,9 @@ func TestStorageSearchGraphitePaths_VariousTimeRanges(t *testing.T) { } slices.Sort(want) - s := MustOpenStorage(t.Name(), OpenOptions{}) + s := MustOpenStorage(t.Name(), OpenOptions{ + FutureRetention: time.Duration(math.MaxInt64), + }) defer s.MustClose() s.AddRows(mrs, defaultPrecisionBits) s.DebugFlush() @@ -2458,6 +2468,42 @@ func testStorageOpOnVariousTimeRanges(t *testing.T, op func(t *testing.T, tr Tim MaxTimestamp: time.Date(2000, 12, 31, 23, 59, 59, 999_999_999, time.UTC).UnixMilli(), }) }) + + t.Run("future-1h", func(t *testing.T) { + now := time.Now().UTC() + op(t, TimeRange{ + MinTimestamp: time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+1, 0, 0, 0, time.UTC).UnixMilli(), + MaxTimestamp: time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+2, 0, 0, 0, time.UTC).UnixMilli() - 1, + }) + }) + t.Run("future-1d", func(t *testing.T) { + now := time.Now().UTC() + op(t, TimeRange{ + MinTimestamp: time.Date(now.Year(), now.Month(), now.Day()+1, 1, 0, 0, 0, time.UTC).UnixMilli(), + MaxTimestamp: time.Date(now.Year(), now.Month(), now.Day()+2, 1, 0, 0, 0, time.UTC).UnixMilli() - 1, + }) + }) + t.Run("future-1m", func(t *testing.T) { + now := time.Now().UTC() + op(t, TimeRange{ + MinTimestamp: time.Date(now.Year(), now.Month()+1, 1, 
0, 0, 0, 0, time.UTC).UnixMilli(), + MaxTimestamp: time.Date(now.Year(), now.Month()+2, 1, 0, 0, 0, 0, time.UTC).UnixMilli() - 1, + }) + }) + t.Run("future-1y", func(t *testing.T) { + now := time.Now().UTC() + op(t, TimeRange{ + MinTimestamp: time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli(), + MaxTimestamp: time.Date(now.Year()+2, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli() - 1, + }) + }) + t.Run("future-limit", func(t *testing.T) { + maxTime := time.UnixMilli(maxUnixMilli).UTC() + op(t, TimeRange{ + MinTimestamp: maxTime.Add(-24 * time.Hour).UnixMilli(), + MaxTimestamp: maxTime.UnixMilli(), + }) + }) } func TestStorageSearchLabelValues_EmptyValuesAreNotReturned(t *testing.T) { @@ -4068,3 +4114,219 @@ func TestStorageMetrics_IndexDBBlockCaches(t *testing.T) { } assertMetrics(s) } + +// TestStorage_futureTimestamps checks that samples with future timestamps can +// be ingested, searched, and deleted. +func TestStorage_futureTimestamps(t *testing.T) { + defer testRemoveAll(t) + + type want struct { + mrs []MetricRow + metricNames []string + labelNames []string + labelValues []string + } + + const numMetrics = 1000 + genData := func(prefix string, tr TimeRange) want { + mrs := make([]MetricRow, numMetrics) + metricNames := make([]string, numMetrics) + labelNames := make([]string, numMetrics) + labelValues := make([]string, numMetrics) + step := (tr.MaxTimestamp - tr.MinTimestamp) / numMetrics + for i := range numMetrics { + metricName := fmt.Sprintf("metric_%s_%04d", prefix, i) + labelName := fmt.Sprintf("label_%s_%04d", prefix, i) + labelValue := fmt.Sprintf("value_%s_%04d", prefix, i) + mn := MetricName{ + MetricGroup: []byte(metricName), + Tags: []Tag{ + {Key: []byte(labelName), Value: []byte("value")}, + {Key: []byte("label"), Value: []byte(labelValue)}, + }, + } + mrs[i].MetricNameRaw = mn.marshalRaw(nil) + mrs[i].Timestamp = tr.MinTimestamp + int64(i)*step + mrs[i].Value = float64(i) + metricNames[i] = metricName + labelNames[i] = labelName + labelValues[i] = labelValue + } + labelNames = append(labelNames, "__name__", "label") + return want{mrs, metricNames, labelNames, labelValues} + } + + assertMetricNames := func(t *testing.T, s *Storage, tr TimeRange, want []string) { + t.Helper() + tfs := NewTagFilters() + if err := tfs.Add([]byte("__name__"), []byte(".*"), false, true); err != nil { + t.Fatalf("unexpected error in TagFilters.Add: %v", err) + } + got, err := s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, 1e9, noDeadline) + if err != nil { + t.Fatalf("SearchMetricNames() failed unexpectedly: %v", err) + } + for i, name := range got { + var mn MetricName + if err := mn.UnmarshalString(name); err != nil { + t.Fatalf("Could not unmarshal metric name %q: %v", name, err) + } + got[i] = string(mn.MetricGroup) + } + slices.Sort(got) + slices.Sort(want) + if diff := cmp.Diff(want, got); diff != "" { + t.Fatalf("unexpected metric names (-want, +got):\n%s", diff) + } + } + assertLabelNames := func(t *testing.T, s *Storage, tr TimeRange, want []string) { + t.Helper() + got, err := s.SearchLabelNames(nil, nil, tr, 1e9, 1e9, noDeadline) + if err != nil { + t.Fatalf("SearchLabelNames() failed unexpectedly: %s", err) + } + slices.Sort(got) + slices.Sort(want) + if diff := cmp.Diff(want, got); diff != "" { + t.Fatalf("unexpected label names (-want, +got):\n%s", diff) + } + } + assertLabelValues := func(t *testing.T, s *Storage, tr TimeRange, want []string) { + t.Helper() + got, err := s.SearchLabelValues(nil, "label", nil, tr, 1e9, 1e9, noDeadline) + if err != nil { + 
t.Fatalf("SearchLabelValues() failed unexpectedly: %s", err) + } + slices.Sort(got) + slices.Sort(want) + if diff := cmp.Diff(want, got); diff != "" { + t.Fatalf("unexpected label values (-want, +got):\n%s", diff) + } + } + assertData := func(t *testing.T, s *Storage, tr TimeRange, want []MetricRow) { + t.Helper() + tfs := NewTagFilters() + if err := tfs.Add(nil, []byte(".*"), false, true); err != nil { + t.Fatalf("TagFilters.Add() failed unexpectedly: %v", err) + } + if err := testAssertSearchResult(s, tr, tfs, want); err != nil { + t.Fatalf("search failed unexpectedly: %v", err) + } + } + deleteAllSeries := func(t *testing.T, s *Storage) { + tfs := NewTagFilters() + if err := tfs.Add([]byte("__name__"), []byte(".*"), false, true); err != nil { + t.Fatalf("unexpected error in TagFilters.Add: %v", err) + } + if _, err := s.DeleteSeries(nil, []*TagFilters{tfs}, 1e9); err != nil { + t.Fatalf("DeleteSeries() failed unexpectedly: %s", err) + } + } + + concatUniq := func(s1, s2 []string) []string { + var s []string + seen := make(map[string]bool) + for _, v := range slices.Concat(s1, s2) { + if !seen[v] { + s = append(s, v) + seen[v] = true + } + } + return s + } + + futureRetention := time.Duration(math.MaxInt64) + + f := func(t *testing.T, tr TimeRange) { + want := genData("batch1", tr) + + s := MustOpenStorage(t.Name(), OpenOptions{ + FutureRetention: futureRetention, + }) + s.AddRows(want.mrs, defaultPrecisionBits) + s.DebugFlush() + assertMetricNames(t, s, tr, want.metricNames) + assertLabelNames(t, s, tr, want.labelNames) + assertLabelValues(t, s, tr, want.labelValues) + assertData(t, s, tr, want.mrs) + + // Reopen storage. + s.MustClose() + s = MustOpenStorage(t.Name(), OpenOptions{ + FutureRetention: futureRetention, + }) + assertMetricNames(t, s, tr, want.metricNames) + assertLabelNames(t, s, tr, want.labelNames) + assertLabelValues(t, s, tr, want.labelValues) + assertData(t, s, tr, want.mrs) + + // Insert more data and force background merge tasks. + want2 := genData("batch2", tr) + s.AddRows(want2.mrs, defaultPrecisionBits) + s.DebugFlush() + if err := s.ForceMergePartitions(""); err != nil { + t.Fatalf("ForceMergePartitions() failed unexpectedly: %s", err) + } + assertMetricNames(t, s, tr, slices.Concat(want.metricNames, want2.metricNames)) + assertLabelNames(t, s, tr, concatUniq(want.labelNames, want2.labelNames)) + assertLabelValues(t, s, tr, slices.Concat(want.labelValues, want2.labelValues)) + assertData(t, s, tr, slices.Concat(want.mrs, want2.mrs)) + + // Delete all series. 
+ deleteAllSeries(t, s) + assertMetricNames(t, s, tr, []string{}) + assertLabelNames(t, s, tr, []string{}) + assertLabelValues(t, s, tr, []string{}) + assertData(t, s, tr, nil) + + s.MustClose() + } + + t.Run("future-1h", func(t *testing.T) { + now := time.Now().UTC() + f(t, TimeRange{ + MinTimestamp: time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+1, 0, 0, 0, time.UTC).UnixMilli(), + MaxTimestamp: time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+2, 0, 0, 0, time.UTC).UnixMilli() - 1, + }) + }) + + t.Run("future-1d", func(t *testing.T) { + now := time.Now().UTC() + f(t, TimeRange{ + MinTimestamp: time.Date(now.Year(), now.Month(), now.Day()+1, 1, 0, 0, 0, time.UTC).UnixMilli(), + MaxTimestamp: time.Date(now.Year(), now.Month(), now.Day()+2, 1, 0, 0, 0, time.UTC).UnixMilli() - 1, + }) + }) + + t.Run("future-1m", func(t *testing.T) { + now := time.Now().UTC() + f(t, TimeRange{ + MinTimestamp: time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.UTC).UnixMilli(), + MaxTimestamp: time.Date(now.Year(), now.Month()+2, 1, 0, 0, 0, 0, time.UTC).UnixMilli() - 1, + }) + }) + + t.Run("future-1y", func(t *testing.T) { + now := time.Now().UTC() + f(t, TimeRange{ + MinTimestamp: time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli(), + MaxTimestamp: time.Date(now.Year()+2, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli() - 1, + }) + }) + + t.Run("future-10y", func(t *testing.T) { + now := time.Now().UTC() + f(t, TimeRange{ + MinTimestamp: time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli(), + MaxTimestamp: time.Date(now.Year()+11, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli() - 1, + }) + }) + + t.Run("future-limit", func(t *testing.T) { + maxTime := time.UnixMilli(maxUnixMilli).UTC() + f(t, TimeRange{ + MinTimestamp: maxTime.Add(-24 * time.Hour).UnixMilli(), + MaxTimestamp: maxTime.UnixMilli(), + }) + }) +} diff --git a/lib/storage/table.go b/lib/storage/table.go index 1019c8f620..83fe7d3e04 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -410,13 +410,13 @@ func (tb *table) MustGetIndexDBIDByHour(hour uint64) uint64 { func (tb *table) getMinMaxTimestamps() (int64, int64) { now := int64(fasttime.UnixTimestamp() * 1000) minTimestamp := now - tb.s.retentionMsecs - maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :) if minTimestamp < 0 { // Negative timestamps aren't supported by the storage. 
minTimestamp = 0 } - if maxTimestamp < 0 { - maxTimestamp = (1 << 63) - 1 + maxTimestamp := int64(maxUnixMilli) + if maxUnixMilli-now > tb.s.futureRetentionMsecs { + maxTimestamp = now + tb.s.futureRetentionMsecs } return minTimestamp, maxTimestamp } @@ -436,12 +436,14 @@ func (tb *table) retentionWatcher() { case <-ticker.C: } - minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.s.retentionMsecs + nowMsecs := int64(fasttime.UnixTimestamp() * 1000) + minTimestamp := nowMsecs - tb.s.retentionMsecs + maxTimestamp := nowMsecs + tb.s.futureRetentionMsecs var ptwsDrop []*partitionWrapper tb.ptwsLock.Lock() dst := tb.ptws[:0] for _, ptw := range tb.ptws { - if ptw.pt.tr.MaxTimestamp < minTimestamp { + if ptw.pt.tr.MaxTimestamp < minTimestamp || ptw.pt.tr.MinTimestamp > maxTimestamp { ptwsDrop = append(ptwsDrop, ptw) } else { dst = append(dst, ptw) diff --git a/lib/storage/time.go b/lib/storage/time.go index eee9c2c157..a036ccd9db 100644 --- a/lib/storage/time.go +++ b/lib/storage/time.go @@ -120,4 +120,16 @@ func (tr *TimeRange) contains(timestamp int64) bool { const ( msecPerDay = 24 * 3600 * 1000 msecPerHour = 3600 * 1000 + + // maxUnixMilli is the max millisecond that is allowed to be used as the + // sample timestamp. + // + // Go's Duration is an int64 and is in nanoseconds. In order for time.Time + // math operations and conversion to millis/micros/nanos to work correctly, + // the max datetime must be limited to math.MaxInt64 nanoseconds, Which is + // time.UnixMicro(math.MaxInt64/1000) == 2262-04-11 23:47:16.854775 UTC. + // + // Round it to the last millisecond of the last complete partition: + // 2262-03-31 23:59:59.999 UTC. + maxUnixMilli = 9222422399999 ) diff --git a/lib/storage/time_test.go b/lib/storage/time_test.go index 4975c7893f..7e880e1cfc 100644 --- a/lib/storage/time_test.go +++ b/lib/storage/time_test.go @@ -229,3 +229,10 @@ func TestIsFirstHourOfDay(t *testing.T) { lastHourOfDay := time.Date(2000, 1, 1, 23, 59, 59, 999_999_999, time.UTC) f(lastHourOfDay, false) } + +func TestMaxUnixMilli(t *testing.T) { + lastFuturePtMaxTime := time.Date(2262, 3, 31, 23, 59, 59, 999_000_000, time.UTC) + if got, want := lastFuturePtMaxTime.UnixMilli(), int64(maxUnixMilli); got != want { + t.Fatalf("unexpected maxUnixMilli: got %d, want %d", got, want) + } +} diff --git a/lib/timeutil/time.go b/lib/timeutil/time.go index 85807ae8c9..7dc205ebc0 100644 --- a/lib/timeutil/time.go +++ b/lib/timeutil/time.go @@ -268,16 +268,25 @@ func multiplyByDecimalExp(n int64, decimalExp int64) (int64, bool) { var decimalMultipliers = [...]int64{0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12, 1e13, 1e14, 1e15, 1e16, 1e17, 1e18} +const ( + maxValidSecond = math.MaxInt64 / 1_000_000_000 + maxValidMilli = math.MaxInt64 / 1_000_000 + maxValidMicro = math.MaxInt64 / 1_000 + minValidSecond = math.MinInt64 / 1_000_000_000 + minValidMilli = math.MinInt64 / 1_000_000 + minValidMicro = math.MinInt64 / 1_000 +) + func getUnixTimestampNanoseconds(n int64) int64 { - if n < (1<<31) && n >= (-1<<31) { + if n <= maxValidSecond && n >= minValidSecond { // The timestamp is in seconds. return n * 1e9 } - if n < 1e3*(1<<31) && n >= 1e3*(-1<<31) { + if n <= maxValidMilli && n >= minValidMilli { // The timestamp is in milliseconds. return n * 1e6 } - if n < 1e6*(1<<31) && n >= 1e6*(-1<<31) { + if n <= maxValidMicro && n >= minValidMicro { // The timestamp is in microseconds. return n * 1e3 }
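For reference, here is a minimal standalone sketch (not part of the patch) of the timestamp-bounds arithmetic that `-futureRetention` introduces: it mirrors the updated `table.getMinMaxTimestamps()` together with the new `maxUnixMilli` constant from `lib/storage/time.go`. The `allowedTimestampRange` helper and the `main` example are hypothetical names used only for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// maxUnixMilli mirrors the constant added in lib/storage/time.go:
// the last millisecond of the last complete monthly partition that still fits
// into an int64 number of nanoseconds, i.e. 2262-03-31 23:59:59.999 UTC.
const maxUnixMilli = 9222422399999

// allowedTimestampRange reproduces the arithmetic of table.getMinMaxTimestamps()
// from this patch: samples older than now-retention or newer than
// now+futureRetention are rejected, and the upper bound is additionally
// clamped to maxUnixMilli.
func allowedTimestampRange(now time.Time, retention, futureRetention time.Duration) (minTs, maxTs int64) {
	nowMsecs := now.UnixMilli()
	minTs = nowMsecs - retention.Milliseconds()
	if minTs < 0 {
		// Negative timestamps aren't supported by the storage.
		minTs = 0
	}
	maxTs = int64(maxUnixMilli)
	if maxUnixMilli-nowMsecs > futureRetention.Milliseconds() {
		maxTs = nowMsecs + futureRetention.Milliseconds()
	}
	return minTs, maxTs
}

func main() {
	// Example: 1 month of historical retention and the default 2 days of future retention.
	minTs, maxTs := allowedTimestampRange(time.Now().UTC(), 31*24*time.Hour, 2*24*time.Hour)
	fmt.Println("accepted sample timestamps:",
		time.UnixMilli(minTs).UTC(), "..", time.UnixMilli(maxTs).UTC())
}
```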