lib/storage: support samples with future timestamps (#10718)

Add the support of storage and retrieval of samples with future
timestamps as requested in https://github.com/VictoriaMetrics/VictoriaMetrics/issues/827

What to expect:

- By default, the max future timestamp is still limited to `now+2d`. To
change it, set the `-futureRetention` flag in `vmstorage`. The max flag
value is currently limited to `100y`. It can be extended if we see a
demand for this, but it can't be more than `~ 290y` due to how the time
duration is implemented in Go. The flag value can't be less than `2d`.
- downsampling and retention filters (available in enterprise edition)
are currently not supported for future timestamps
- If `vmstorage` restarts with a smaller value of `-futureRetention`
flag, any future partitions that are outside the new future retention
will be automatically deleted.
- Data ingestion, data retrieval, backup/restore, timeseries (soft)
deletion, and other operations work with future timestamps the same way
as with the historical timestamps.
- In the cluster version, the affected binaries are `vmstorage` and
`vmselect`. This means that `vmselect` version must match `vmstorage`
version if you want to query future timestamps. `vminsert` was not
affected, so its version can be a lower one.
- If you downgrade the `vmstorage`, the data with future timestamps will
remain on disk and memory (per-partition caches) but won't be available
for querying.

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
Signed-off-by: Artem Fetishev <149964189+rtm0@users.noreply.github.com>
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
This commit is contained in:
Artem Fetishev
2026-04-23 18:12:33 +02:00
committed by Artem Fetishev
parent dcb314ab38
commit e26de23739
14 changed files with 930 additions and 92 deletions

View File

@@ -1422,11 +1422,7 @@ func getCommonParamsInternal(r *http.Request, startTime time.Time, requireNonEmp
if err != nil {
return nil, err
}
// Limit the `end` arg to the current time +2 days in the same way
// as it is limited during data ingestion.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/blob/ea06d2fd3ccbbb6aa4480ab3b04f7b671408be2a/lib/storage/table.go#L378
// This should fix possible timestamp overflow - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2669
maxTS := startTime.UnixNano()/1e6 + 2*24*3600*1000
maxTS := int64(math.MaxInt64 / 1_000_000)
if end > maxTS {
end = maxTS
}

View File

@@ -35,6 +35,8 @@ import (
var (
retentionPeriod = flagutil.NewRetentionDuration("retentionPeriod", "1M", "Data with timestamps outside the retentionPeriod is automatically deleted. The minimum retentionPeriod is 24h or 1d. "+
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention. See also -retentionFilter")
futureRetention = flagutil.NewRetentionDuration("futureRetention", "2d", "Data with timestamps bigger than now+futureRetention is automatically deleted. "+
"The minimum futureRetention is 2 days. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention")
httpListenAddrs = flagutil.NewArrayString("httpListenAddr", "Address to listen for incoming http requests. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flagutil.NewArrayBool("httpListenAddr.useProxyProtocol", "Whether to use proxy protocol for connections accepted at the given -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt . "+
@@ -148,7 +150,12 @@ func main() {
mergeset.SetDataBlocksSparseCacheSize(cacheSizeIndexDBDataBlocksSparse.IntN())
if retentionPeriod.Duration() < 24*time.Hour {
logger.Fatalf("-retentionPeriod cannot be smaller than a day; got %s", retentionPeriod)
logger.Fatalf("-retentionPeriod cannot be smaller than a day; got %s. "+
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention", retentionPeriod)
}
if futureRetention.Duration() < 2*24*time.Hour {
logger.Fatalf("-futureRetention cannot be smaller than 2 days; got %s. "+
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention", futureRetention)
}
if *idbPrefillStart > 23*time.Hour {
logger.Panicf("-storage.idbPrefillStart cannot exceed 23 hours; got %s", idbPrefillStart)
@@ -157,6 +164,7 @@ func main() {
startTime := time.Now()
opts := storage.OpenOptions{
Retention: retentionPeriod.Duration(),
FutureRetention: futureRetention.Duration(),
MaxHourlySeries: getMaxHourlySeries(),
MaxDailySeries: getMaxDailySeries(),
DisablePerDayIndex: *disablePerDayIndex,

View File

@@ -28,6 +28,7 @@ func TestSingleBackupRestore(t *testing.T) {
return tc.MustStartVmsingle("vmsingle", []string{
"-storageDataPath=" + storageDataPath,
"-retentionPeriod=100y",
"-futureRetention=2y",
})
},
stopSUT: func() {
@@ -60,11 +61,13 @@ func TestClusterBackupRestore(t *testing.T) {
Vmstorage1Flags: []string{
"-storageDataPath=" + storage1DataPath,
"-retentionPeriod=100y",
"-futureRetention=2y",
},
Vmstorage2Instance: "vmstorage2",
Vmstorage2Flags: []string{
"-storageDataPath=" + storage2DataPath,
"-retentionPeriod=100y",
"-futureRetention=2y",
},
VminsertInstance: "vminsert",
VminsertFlags: []string{},
@@ -97,10 +100,16 @@ func TestClusterBackupRestore(t *testing.T) {
func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
t := tc.T()
genData := func(count int, prefix string, start, step int64) (recs []string, wantSeries []map[string]string, wantQueryResults []*apptest.QueryResult) {
recs = make([]string, count)
wantSeries = make([]map[string]string, count)
wantQueryResults = make([]*apptest.QueryResult, count)
type data struct {
samples []string
wantSeries []map[string]string
wantQueryResults []*apptest.QueryResult
}
genData := func(count int, prefix string, start, step int64) data {
recs := make([]string, count)
wantSeries := make([]map[string]string, count)
wantQueryResults := make([]*apptest.QueryResult, count)
for i := range count {
name := fmt.Sprintf("%s_%03d", prefix, i)
value := float64(i)
@@ -113,7 +122,15 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
Samples: []*apptest.Sample{{Timestamp: timestamp, Value: value}},
}
}
return recs, wantSeries, wantQueryResults
return data{recs, wantSeries, wantQueryResults}
}
concatData := func(d1, d2 data) data {
var d data
d.samples = slices.Concat(d1.samples, d2.samples)
d.wantSeries = slices.Concat(d1.wantSeries, d2.wantSeries)
d.wantQueryResults = slices.Concat(d1.wantQueryResults, d2.wantQueryResults)
return d
}
backupBaseDir, err := filepath.Abs(filepath.Join(tc.Dir(), "backups"))
@@ -190,10 +207,20 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
// Use the same number of metrics and time range for all the data ingestions
// below.
const numMetrics = 1000
// With 1000 metrics (one per minute), the time range spans 2 months.
start := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
end := time.Date(2025, 3, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
step := (end - start) / numMetrics
batch1 := genData(numMetrics, "batch1", start, step)
batch2 := genData(numMetrics, "batch2", start, step)
batches12 := concatData(batch1, batch2)
now := time.Now().UTC()
startFuture := time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
endFuture := time.Date(now.Year()+1, 3, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
stepFuture := (endFuture - startFuture) / numMetrics
batch1Future := genData(numMetrics, "batch1", startFuture, stepFuture)
batch2Future := genData(numMetrics, "batch2", startFuture, stepFuture)
batches12Future := concatData(batch1Future, batch2Future)
// Verify backup/restore:
//
@@ -207,24 +234,25 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
// - Start vmsingle
// - Ensure that the queries return batch1 data only.
batch1Data, wantBatch1Series, wantBatch1QueryResults := genData(numMetrics, "batch1", start, step)
batch2Data, wantBatch2Series, wantBatch2QueryResults := genData(numMetrics, "batch2", start, step)
wantBatch12Series := slices.Concat(wantBatch1Series, wantBatch2Series)
wantBatch12QueryResults := slices.Concat(wantBatch1QueryResults, wantBatch2QueryResults)
sut := opts.startSUT()
sut.PrometheusAPIV1ImportPrometheus(t, batch1Data, apptest.QueryOpts{})
sut.PrometheusAPIV1ImportPrometheus(t, batch1.samples, apptest.QueryOpts{})
sut.PrometheusAPIV1ImportPrometheus(t, batch1Future.samples, apptest.QueryOpts{})
sut.ForceFlush(t)
assertSeries(sut, `{__name__=~"batch1.*"}`, start, end, wantBatch1Series)
assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, step, wantBatch1QueryResults)
assertSeries(sut, `{__name__=~"batch1.*"}`, start, end, batch1.wantSeries)
assertSeries(sut, `{__name__=~"batch1.*"}`, startFuture, endFuture, batch1Future.wantSeries)
assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, step, batch1.wantQueryResults)
assertQueryResults(sut, `{__name__=~"batch1.*"}`, startFuture, endFuture, stepFuture, batch1Future.wantQueryResults)
createBackup(sut, "batch1")
sut.PrometheusAPIV1ImportPrometheus(t, batch2Data, apptest.QueryOpts{})
sut.PrometheusAPIV1ImportPrometheus(t, batch2.samples, apptest.QueryOpts{})
sut.PrometheusAPIV1ImportPrometheus(t, batch2Future.samples, apptest.QueryOpts{})
sut.ForceFlush(t)
assertSeries(sut, `{__name__=~"batch(1|2).*"}`, start, end, wantBatch12Series)
assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, start, end, step, wantBatch12QueryResults)
assertSeries(sut, `{__name__=~"batch(1|2).*"}`, start, end, batches12.wantSeries)
assertSeries(sut, `{__name__=~"batch(1|2).*"}`, startFuture, endFuture, batches12Future.wantSeries)
assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, start, end, step, batches12.wantQueryResults)
assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, startFuture, endFuture, stepFuture, batches12Future.wantQueryResults)
createBackup(sut, "batch12")
opts.stopSUT()
@@ -233,6 +261,8 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
sut = opts.startSUT()
assertSeries(sut, `{__name__=~"batch1.*"}`, start, end, wantBatch1Series)
assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, step, wantBatch1QueryResults)
assertSeries(sut, `{__name__=~"batch(1|2).*"}`, start, end, batch1.wantSeries)
assertSeries(sut, `{__name__=~"batch(1|2).*"}`, startFuture, endFuture, batch1Future.wantSeries)
assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, start, end, step, batch1.wantQueryResults)
assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, startFuture, endFuture, stepFuture, batch1Future.wantQueryResults)
}

View File

@@ -0,0 +1,211 @@
package tests
import (
"fmt"
"path/filepath"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
)
// TestSingleFutureTimestamps verifies that vmsingle accepts and serves
// samples with timestamps in the future when the -futureRetention flag
// is set.
func TestSingleFutureTimestamps(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
opts := testFutureTimestampsOpts{
// Start a fresh vmsingle with large historical and future retention
// windows so that the test can ingest data far in the future.
start: func() apptest.PrometheusWriteQuerier {
return tc.MustStartVmsingle("vmsingle", []string{
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
"-retentionPeriod=100y",
"-futureRetention=100y",
})
},
stop: func() {
tc.StopApp("vmsingle")
},
}
testFutureTimestamps(tc, opts)
}
// TestClusterFutureTimestamps verifies that a VictoriaMetrics cluster
// (vminsert + two vmstorage nodes + vmselect) accepts and serves samples
// with timestamps in the future when the -futureRetention flag is set on
// the vmstorage nodes.
func TestClusterFutureTimestamps(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
opts := testFutureTimestampsOpts{
// Start a fresh cluster with large historical and future retention
// windows so that the test can ingest data far in the future.
start: func() apptest.PrometheusWriteQuerier {
return tc.MustStartCluster(&apptest.ClusterOptions{
Vmstorage1Instance: "vmstorage1",
Vmstorage1Flags: []string{
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmstorage1"),
"-retentionPeriod=100y",
"-futureRetention=100y",
},
Vmstorage2Instance: "vmstorage2",
Vmstorage2Flags: []string{
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmstorage2"),
"-retentionPeriod=100y",
"-futureRetention=100y",
},
VminsertInstance: "vminsert",
VminsertFlags: []string{},
VmselectInstance: "vmselect",
VmselectFlags: []string{},
})
},
// Stop every cluster component started above.
stop: func() {
tc.StopApp("vminsert")
tc.StopApp("vmselect")
tc.StopApp("vmstorage1")
tc.StopApp("vmstorage2")
},
}
testFutureTimestamps(tc, opts)
}
// testFutureTimestampsOpts abstracts how the system under test is started
// and stopped, so that testFutureTimestamps can run against both the
// vmsingle and the cluster setup.
type testFutureTimestampsOpts struct {
// start launches the system under test and returns a handle for
// writing and querying data.
start func() apptest.PrometheusWriteQuerier
// stop shuts the system under test down.
stop func()
}
// testFutureTimestamps ingests samples whose timestamps lie at various
// distances in the future and checks that they are retrievable via both
// /api/v1/series and /api/v1/query_range, including after a restart of the
// system under test. Samples beyond the configured future retention are
// expected to be absent from query results.
func testFutureTimestamps(tc *apptest.TestCase, opts testFutureTimestampsOpts) {
t := tc.T()
// assertSeries retrieves the set of all metric names from the storage and
// compares it with the expected set.
assertSeries := func(app apptest.PrometheusQuerier, prefix string, start, end int64, want []map[string]string) {
t.Helper()
query := fmt.Sprintf(`{__name__=~"metric_%s.*"}`, prefix)
tc.Assert(&apptest.AssertOptions{
Msg: "unexpected /api/v1/series response",
Got: func() any {
return app.PrometheusAPIV1Series(t, query, apptest.QueryOpts{
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
}).Sort()
},
Want: &apptest.PrometheusAPIV1SeriesResponse{
Status: "success",
Data: want,
},
FailNow: true,
})
}
// assertQueryResults retrieves all data from the storage and compares it
// with the expected result.
assertQueryResults := func(app apptest.PrometheusQuerier, prefix string, start, end, step int64, want []*apptest.QueryResult) {
t.Helper()
query := fmt.Sprintf(`{__name__=~"metric_%s.*"}`, prefix)
tc.Assert(&apptest.AssertOptions{
Msg: "unexpected /api/v1/query_range response",
Got: func() any {
return app.PrometheusAPIV1QueryRange(t, query, apptest.QueryOpts{
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
Step: fmt.Sprintf("%dms", step),
// Use a lookback smaller than the step so each sample
// contributes to at most one step point.
MaxLookback: fmt.Sprintf("%dms", step-1),
NoCache: "1",
})
},
Want: &apptest.PrometheusAPIV1QueryResponse{
Status: "success",
Data: &apptest.QueryData{
ResultType: "matrix",
Result: want,
},
},
FailNow: true,
})
}
// f ingests numMetrics samples spread evenly across [startTime, endTime)
// and verifies the query results. When wantEmpty is true, the queries are
// expected to return no data (the samples are outside the future
// retention).
f := func(prefix string, startTime, endTime time.Time, wantEmpty bool) {
const numMetrics = 1000
start := startTime.UnixMilli()
end := endTime.UnixMilli()
step := (end - start) / numMetrics
data := genFutureTimestampsData(prefix, numMetrics, start, step)
if wantEmpty {
data.wantSeries = []map[string]string{}
data.wantQueryResults = []*apptest.QueryResult{}
}
// Ingest data and check query results.
sut := opts.start()
sut.PrometheusAPIV1ImportPrometheus(t, data.samples, apptest.QueryOpts{})
sut.ForceFlush(t)
assertSeries(sut, prefix, start, end, data.wantSeries)
assertQueryResults(sut, prefix, start, end, step, data.wantQueryResults)
// Ensure the queries work after restart.
opts.stop()
sut = opts.start()
assertSeries(sut, prefix, start, end, data.wantSeries)
assertQueryResults(sut, prefix, start, end, step, data.wantQueryResults)
opts.stop()
}
now := time.Now().UTC()
// The SUT is started with -futureRetention=100y (see the callers above).
retentionLimit := 100 * 365 * 24 * time.Hour
var start, end time.Time
// One day in the future.
start = time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, time.UTC)
end = time.Date(now.Year(), now.Month(), now.Day()+2, 0, 0, 0, 0, time.UTC)
f("future_1d", start, end, false)
// One month in the future.
start = time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.UTC)
end = time.Date(now.Year(), now.Month()+2, 1, 0, 0, 0, 0, time.UTC)
f("future_1m", start, end, false)
// One year in the future.
start = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC)
end = time.Date(now.Year()+2, 1, 1, 0, 0, 0, 0, time.UTC)
f("future_1y", start, end, false)
// A day-long range ending exactly at the future retention limit:
// the data must be stored.
start = now.Add(retentionLimit - 24*time.Hour)
end = now.Add(retentionLimit)
f("future_1d_before_limit", start, end, false)
// A range starting just beyond the future retention limit:
// the data must not be returned.
start = now.Add(retentionLimit + time.Minute)
end = now.Add(retentionLimit + 24*time.Hour)
f("future_1d_beyond_limit", start, end, true)
}
// futureTimestampsData holds generated test samples in Prometheus text
// exposition format together with the /api/v1/series and
// /api/v1/query_range responses expected for them.
type futureTimestampsData struct {
samples []string
wantSeries []map[string]string
wantQueryResults []*apptest.QueryResult
}
// genFutureTimestampsData generates numMetrics samples where sample i has
// value i and timestamp start+i*step (milliseconds), along with the series
// and query-range results expected for them.
func genFutureTimestampsData(prefix string, numMetrics, start, step int64) futureTimestampsData {
samples := make([]string, numMetrics)
wantSeries := make([]map[string]string, numMetrics)
wantQueryResults := make([]*apptest.QueryResult, numMetrics)
for i := range numMetrics {
// Each sample gets its own metric name, label name, and label value so
// that the expected responses are unambiguous per sample.
metricName := fmt.Sprintf("metric_%s_%04d", prefix, i)
labelName := fmt.Sprintf("label_%s_%04d", prefix, i)
labelValue := fmt.Sprintf("value_%s_%04d", prefix, i)
value := i
timestamp := start + i*step
samples[i] = fmt.Sprintf(`%s{%s="value", label="%s"} %d %d`, metricName, labelName, labelValue, value, timestamp)
wantSeries[i] = map[string]string{
"__name__": metricName,
labelName: "value",
"label": labelValue,
}
wantQueryResults[i] = &apptest.QueryResult{
Metric: map[string]string{
"__name__": metricName,
labelName: "value",
"label": labelValue,
},
Samples: []*apptest.Sample{{Timestamp: timestamp, Value: float64(value)}},
}
}
return futureTimestampsData{samples, wantSeries, wantQueryResults}
}

View File

@@ -1511,6 +1511,16 @@ value than before, then data outside the configured period will be eventually de
VictoriaMetrics does not support indefinite retention, but you can specify an arbitrarily high duration, e.g. `-retentionPeriod=100y`.
By default, VictoriaMetrics doesn't accept samples with timestamps bigger than `now+2d`, e.g. 2 days in the future.
If you need to accept samples with bigger timestamps, specify the desired "future retention" via the `-futureRetention` command-line flag.
This flag accepts values starting from `2d`.
For example, the following command starts VictoriaMetrics, which accepts samples with timestamps up to a year in the future:
```sh
/path/to/victoria-metrics -futureRetention=1y
```
### Multiple retentions
Distinct retentions for distinct time series can be configured via [retention filters](#retention-filters)

View File

@@ -31,6 +31,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): now `Run query` link on the Alerting Rules page correctly propagates the alerts interval and evaluation time. See [#10366](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10366).
* FEATURE: [alerts](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules): add new `MetricNameStatsCacheUtilizationIsTooHigh` alerting rule to track overutilization of [Metric names usage stats tracker](https://docs.victoriametrics.com/victoriametrics/#track-ingested-metrics-usage) (used in [Cardinality Explorer](https://docs.victoriametrics.com/victoriametrics/#cardinality-explorer)). See [#10840](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10840).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): add `vm_streamaggr_counter_resets_total` metric for `total*`, `increase*` and `rate*` outputs that is useful for aggregation behaviour tracking. These metrics help to identify issues described in [Troubleshooting: counter resets](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#counter-resets).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add the support of ingestion and retrieval of samples with timestamps in the future. The new `-futureRetention` flag controls how far in the future the timestamps are allowed to be. See [827](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/827) and [10718](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10718).
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix increased memory usage after upgrade to [v1.140.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.140.0) by properly accounting for internal buffer count when calculating per-storage buffer size. See [#10725](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10725#issuecomment-4282256709).
* BUGFIX: all VictoriaMetrics components: properly parse IPv6 source address when accepting connections with proxy protocol v2 enabled. See [#10839](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10839). Thanks to @andriibeee for the contribution.

View File

@@ -3,6 +3,7 @@ package storage
import (
"bytes"
"fmt"
"math"
"math/rand"
"reflect"
"regexp"
@@ -178,7 +179,9 @@ func TestSearch_VariousTimeRanges(t *testing.T) {
mrs[i].Value = float64(i)
}
s := MustOpenStorage(t.Name(), OpenOptions{})
s := MustOpenStorage(t.Name(), OpenOptions{
FutureRetention: time.Duration(math.MaxInt64),
})
defer s.MustClose()
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()

View File

@@ -35,6 +35,7 @@ import (
)
const (
retention2Days = 2 * 24 * time.Hour
retention31Days = 31 * 24 * time.Hour
retentionMax = 100 * 12 * retention31Days
idbPrefilStart = time.Hour
@@ -66,9 +67,10 @@ type Storage struct {
// indexdb rotation.
legacyNextRotationTimestamp atomic.Int64
path string
cachePath string
retentionMsecs int64
path string
cachePath string
retentionMsecs int64
futureRetentionMsecs int64
// lock file for exclusive access to the storage on the given path.
flockF *os.File
@@ -177,6 +179,7 @@ type accountProjectKey struct {
// OpenOptions optional args for MustOpenStorage
type OpenOptions struct {
Retention time.Duration
FutureRetention time.Duration
MaxHourlySeries int
MaxDailySeries int
DisablePerDayIndex bool
@@ -198,6 +201,7 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage {
if retention <= 0 || retention > retentionMax {
retention = retentionMax
}
futureRetention := max(opts.FutureRetention, retention2Days)
idbPrefillStart := opts.IDBPrefillStart
if idbPrefillStart <= 0 {
idbPrefillStart = time.Hour
@@ -206,6 +210,7 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage {
path: path,
cachePath: filepath.Join(path, cacheDirname),
retentionMsecs: retention.Milliseconds(),
futureRetentionMsecs: futureRetention.Milliseconds(),
stopCh: make(chan struct{}),
idbPrefillStartSeconds: idbPrefillStart.Milliseconds() / 1000,
}

View File

@@ -5,11 +5,13 @@ package storage
import (
"fmt"
"math/rand"
"path/filepath"
"testing"
"testing/synctest"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/google/go-cmp/cmp"
)
@@ -857,66 +859,339 @@ func TestStorageNextDayMetricIDs_update(t *testing.T) {
})
}
// TestStorageLastPartitionMetrics checks that "last partition" metrics
// correspond to the current partition and not some future partition.
func TestStorageLastPartitionMetrics(t *testing.T) {
defer testRemoveAll(t)
const (
accountID = 12
projectID = 34
)
addRows := func(t *testing.T, s *Storage, prefix string, tr TimeRange) {
t.Helper()
const numSeries = 1000
rng := rand.New(rand.NewSource(1))
mrs := testGenerateMetricRowsWithPrefixForTenantID(rng, accountID, projectID, numSeries, prefix, tr)
want := s.newTimeseriesCreated.Load() + numSeries
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
if got := s.newTimeseriesCreated.Load(); got != want {
t.Errorf("unexpected number of new timeseries: got %d, want %d", got, want)
}
// wait for merged parts to be attached to the table
time.Sleep(time.Minute)
}
assertLastPartitionEmpty := func(t *testing.T, s *Storage) {
t.Helper()
var m Metrics
s.UpdateMetrics(&m)
lpm := m.TableMetrics.LastPartition
if lpm.SmallPartsCount != 0 {
t.Fatalf("unexpected last partition SmallPartsCount: got %d, want 0", lpm.SmallPartsCount)
}
if lpm.IndexDBMetrics.FileBlocksCount != 0 {
t.Fatalf("unexpected last partition IndexDBMetrics.FileBlocksCount: got %d, want 0", lpm.IndexDBMetrics.FileBlocksCount)
}
}
assertLastPartitionNonEmpty := func(t *testing.T, s *Storage) {
t.Helper()
var m Metrics
s.UpdateMetrics(&m)
lpm := m.TableMetrics.LastPartition
if lpm.SmallPartsCount == 0 {
t.Fatalf("unexpected last partition SmallPartsCount: got 0, want > 0")
}
if lpm.IndexDBMetrics.FileBlocksCount == 0 {
t.Fatalf("unexpected last partition IndexDBMetrics.FileBlocksCount: got 0, want > 0")
}
}
synctest.Test(t, func(t *testing.T) {
// Advance current time to 2h before the next month, 2000-01-31T22:00:00Z.
time.Sleep(31*24*time.Hour - 2*time.Hour)
ct := time.Now().UTC()
// Open the storage, make sure current partition is empty.
s := MustOpenStorage(t.Name(), OpenOptions{
FutureRetention: 2 * 365 * 24 * time.Hour,
})
defer s.MustClose()
assertLastPartitionEmpty(t, s)
// Insert rows with future timestamps. Current partition must be empty.
addRows(t, s, "future", TimeRange{
MinTimestamp: ct.Add(365 * 24 * time.Hour).UnixMilli(),
MaxTimestamp: ct.Add(366 * 24 * time.Hour).UnixMilli(),
})
assertLastPartitionEmpty(t, s)
// Insert rows with timestamps within current partition.
// Current partition must be not empty.
addRows(t, s, "current", TimeRange{
MinTimestamp: ct.UnixMilli(),
MaxTimestamp: ct.Add(time.Hour).UnixMilli(),
})
assertLastPartitionNonEmpty(t, s)
// Advance current time to the next month, 2000-02-01T00:30:00Z.
// last partition is now 2000-02 and it must be empty.
time.Sleep(2*time.Hour + time.Minute*30)
assertLastPartitionEmpty(t, s)
})
}
func TestStorage_futureAndHistoricalRetention(t *testing.T) {
defer testRemoveAll(t)
const (
accountID = 12
projectID = 34
)
assertData := func(t *testing.T, s *Storage, tr TimeRange, want []MetricRow) {
t.Helper()
tfs := NewTagFilters(accountID, projectID)
if err := tfs.Add(nil, []byte(".*"), false, true); err != nil {
t.Fatalf("TagFilters.Add() failed unexpectedly: %v", err)
}
if err := testAssertSearchResult(s, tr, tfs, want); err != nil {
t.Fatalf("[now: %v tr: %v] search failed unexpectedly: %v", time.Now().UTC(), &tr, err)
}
}
synctest.Test(t, func(t *testing.T) {
// synctests start at 2000-01-01T00:00:00Z
var s *Storage
retention := 180 * 24 * time.Hour
futureRetention := 180 * 24 * time.Hour
s = MustOpenStorage(t.Name(), OpenOptions{
Retention: retention,
FutureRetention: futureRetention,
})
// Ingest samples for previous and future year. 10 samples per day.
const numSeries = 10
start := time.Date(1999, 1, 1, 0, 0, 0, 0, time.UTC)
end := time.Date(2001, 1, 1, 0, 0, 0, 0, time.UTC)
rng := rand.New(rand.NewSource(1))
wantData := make(map[TimeRange][]MetricRow)
for day := start; day.Before(end); {
prefix := fmt.Sprintf("metric_%d_%d_%d", day.Year(), day.Month(), day.Day())
tr := TimeRange{
MinTimestamp: day.UnixMilli(),
MaxTimestamp: day.UnixMilli() + msecPerDay - 1,
}
mrs := testGenerateMetricRowsWithPrefixForTenantID(rng, accountID, projectID, numSeries, prefix, tr)
wantData[tr] = mrs
s.AddRows(mrs, defaultPrecisionBits)
day = time.Date(day.Year(), day.Month(), day.Day()+1, 0, 0, 0, 0, time.UTC)
}
s.DebugFlush()
// Advance time one partition at a time. Before each time advancement,
// check the query results for each day between the original start and end
// time.
//
// This is to test how historical and future retentions affect the
// stored data over time.
now := time.Now().UTC()
dataEnd := now.Add(futureRetention - 24*time.Hour)
for now.Before(end) {
for day := start; day.Before(end); {
tr := TimeRange{
MinTimestamp: day.UnixMilli(),
MaxTimestamp: day.UnixMilli() + msecPerDay - 1,
}
dataStart := now.Add(-retention)
if day.Before(dataStart) || day.After(dataEnd) {
assertData(t, s, tr, nil)
} else {
assertData(t, s, tr, wantData[tr])
}
day = time.Date(day.Year(), day.Month(), day.Day()+1, 0, 0, 0, 0, time.UTC)
}
s.MustClose()
nextMonth := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.UTC)
time.Sleep(nextMonth.Sub(now))
now = nextMonth
s = MustOpenStorage(t.Name(), OpenOptions{
Retention: retention,
FutureRetention: futureRetention,
})
}
s.MustClose()
})
}
func TestStorage_defaultFutureRetention(t *testing.T) {
defer testRemoveAll(t)
const (
accountID = 12
projectID = 34
)
assertData := func(t *testing.T, s *Storage, tr TimeRange, want []MetricRow) {
t.Helper()
tfs := NewTagFilters(accountID, projectID)
if err := tfs.Add(nil, []byte(".*"), false, true); err != nil {
t.Fatalf("TagFilters.Add() failed unexpectedly: %v", err)
}
if err := testAssertSearchResult(s, tr, tfs, want); err != nil {
t.Fatalf("[now: %v tr: %v] search failed unexpectedly: %v", time.Now().UTC(), &tr, err)
}
}
synctest.Test(t, func(t *testing.T) {
// synctests start at 2000-01-01T00:00:00Z
s := MustOpenStorage(t.Name(), OpenOptions{})
defer s.MustClose()
assertLastPartitionEmpty := func() {
t.Helper()
var m Metrics
s.UpdateMetrics(&m)
lpm := m.TableMetrics.LastPartition
if lpm.SmallPartsCount != 0 {
t.Fatalf("unexpected last partition SmallPartsCount: got %d, want 0", lpm.SmallPartsCount)
}
if lpm.IndexDBMetrics.FileBlocksCount != 0 {
t.Fatalf("unexpected last partition IndexDBMetrics.FileBlocksCount: got %d, want 0", lpm.IndexDBMetrics.FileBlocksCount)
}
}
assertLastPartitionNonEmpty := func() {
t.Helper()
var m Metrics
s.UpdateMetrics(&m)
lpm := m.TableMetrics.LastPartition
if lpm.SmallPartsCount == 0 {
t.Fatalf("unexpected last partition SmallPartsCount: got 0, want > 0")
}
if lpm.IndexDBMetrics.FileBlocksCount == 0 {
t.Fatalf("unexpected last partition IndexDBMetrics.FileBlocksCount: got 0, want > 0")
}
}
// make sure last partition is empty before ingestion
assertLastPartitionEmpty()
const numSeries = 1000
// Ingest samples for this and several days in the future. 10 samples
// per hour.
const numSeries = 10
start := time.Now().UTC()
end := start.Add(10 * 24 * time.Hour)
rng := rand.New(rand.NewSource(1))
tr := TimeRange{
MinTimestamp: ct.Add(-time.Hour).UnixMilli(),
MaxTimestamp: ct.UnixMilli(),
}
mrs := testGenerateMetricRowsWithPrefixForTenantID(rng, 0, 0, numSeries, "metric.", tr)
s.AddRows(mrs, 1)
s.DebugFlush()
if got, want := s.newTimeseriesCreated.Load(), uint64(numSeries); got != want {
t.Errorf("unexpected number of new timeseries: got %d, want %d", got, want)
}
// wait for merged parts to be attached to the table
time.Sleep(time.Minute)
wantData := make(map[TimeRange][]MetricRow)
for ts := start; ts.Before(end); {
prefix := fmt.Sprintf("metric_%04d_%02d_%02d_%02d", ts.Year(), ts.Month(), ts.Day(), ts.Hour())
tr := TimeRange{
MinTimestamp: ts.UnixMilli(),
MaxTimestamp: ts.UnixMilli() + msecPerHour - 1,
}
mrs := testGenerateMetricRowsWithPrefixForTenantID(rng, accountID, projectID, numSeries, prefix, tr)
wantData[tr] = mrs
s.AddRows(mrs, defaultPrecisionBits)
// last created partition is empty, but we're still at current month
assertLastPartitionNonEmpty()
// Advance current time to the next month, 2000-02-01T00:30:00Z.
// last partition must be 2000-02 now
time.Sleep(2*time.Hour + time.Minute*30)
// current month partition has no data ingested
assertLastPartitionEmpty()
ts = ts.Add(time.Hour)
}
s.DebugFlush()
dataStart := start
dataEnd := dataStart.Add(2*24*time.Hour - time.Hour)
for ts := start; ts.Before(end); ts = ts.Add(time.Hour) {
tr := TimeRange{
MinTimestamp: ts.UnixMilli(),
MaxTimestamp: ts.UnixMilli() + msecPerHour - 1,
}
if ts.Before(dataStart) || ts.After(dataEnd) {
assertData(t, s, tr, nil)
} else {
assertData(t, s, tr, wantData[tr])
}
}
})
}
// TestStorage_partitionsOutsideRetentionAreRemoved checks that partitions
// that fall outside the configured retention (historical data) or future
// retention (future data) are removed by the background retention task after
// the storage is reopened with smaller retention values.
//
// The test runs inside synctest so that time is fake and deterministic.
func TestStorage_partitionsOutsideRetentionAreRemoved(t *testing.T) {
	defer testRemoveAll(t)
	const (
		accountID = 12
		projectID = 34
	)
	// assertPathExists fails the test unless the existence of path matches want.
	assertPathExists := func(t *testing.T, path string, want bool) {
		t.Helper()
		if got := fs.IsPathExist(path); got != want {
			t.Fatalf("unexpected path existence test result for %s: got %t, want %t", path, got, want)
		}
	}
	// assertPtExists checks the presence of the partition directory pt in all
	// three on-disk locations: small parts, big parts, and the per-partition
	// indexdb.
	assertPtExists := func(t *testing.T, pt string, want bool) {
		t.Helper()
		assertPathExists(t, filepath.Join(t.Name(), "data", "small", pt), want)
		assertPathExists(t, filepath.Join(t.Name(), "data", "big", pt), want)
		assertPathExists(t, filepath.Join(t.Name(), "data", "indexdb", pt), want)
	}
	synctest.Test(t, func(t *testing.T) {
		// synctests start at 2000-01-01T00:00:00Z
		retention := 80 * 24 * time.Hour
		futureRetention := 180 * 24 * time.Hour
		s := MustOpenStorage(t.Name(), OpenOptions{
			Retention:       retention,
			FutureRetention: futureRetention,
		})
		// Ingest samples whose timestamps span the entire retention and
		// future retention. This should create the corresponding partitions.
		rng := rand.New(rand.NewSource(1))
		mrs := testGenerateMetricRowsWithPrefixForTenantID(rng, accountID, projectID, 1000, "metric", TimeRange{
			MinTimestamp: time.Now().Add(-retention).UnixMilli(),
			MaxTimestamp: time.Now().Add(futureRetention - time.Second).UnixMilli(),
		})
		s.AddRows(mrs, defaultPrecisionBits)
		s.DebugFlush()
		// now-80d is ~1999-10-13 and now+180d is ~2000-06-29, so partitions
		// 1999_10 through 2000_06 must exist and the ones just outside that
		// range must not.
		assertPtExists(t, "1999_09", false)
		assertPtExists(t, "1999_10", true)
		assertPtExists(t, "1999_11", true)
		assertPtExists(t, "1999_12", true)
		assertPtExists(t, "2000_01", true)
		assertPtExists(t, "2000_02", true)
		assertPtExists(t, "2000_03", true)
		assertPtExists(t, "2000_04", true)
		assertPtExists(t, "2000_05", true)
		assertPtExists(t, "2000_06", true)
		assertPtExists(t, "2000_07", false)
		// Reopen storage with smaller future retention. Future partitions
		// outside the new future retention must be removed.
		s.MustClose()
		s = MustOpenStorage(t.Name(), OpenOptions{
			Retention:       retention,
			FutureRetention: 45 * 24 * time.Hour,
		})
		// Wait for background task to remove future partitions.
		time.Sleep(2 * time.Minute)
		// now+45d is ~2000-02-15, so 2000_02 is the last partition kept.
		assertPtExists(t, "1999_09", false)
		assertPtExists(t, "1999_10", true)
		assertPtExists(t, "1999_11", true)
		assertPtExists(t, "1999_12", true)
		assertPtExists(t, "2000_01", true)
		assertPtExists(t, "2000_02", true)
		assertPtExists(t, "2000_03", false)
		assertPtExists(t, "2000_04", false)
		assertPtExists(t, "2000_05", false)
		assertPtExists(t, "2000_06", false)
		assertPtExists(t, "2000_07", false)
		// Reopen storage with smaller retention. Historical partitions
		// outside the new retention must be removed.
		s.MustClose()
		s = MustOpenStorage(t.Name(), OpenOptions{
			Retention:       45 * 24 * time.Hour,
			FutureRetention: 45 * 24 * time.Hour,
		})
		// Wait for background task to remove historical partitions.
		time.Sleep(2 * time.Minute)
		// now-45d is ~1999-11-17, so 1999_11 is the first partition kept.
		assertPtExists(t, "1999_09", false)
		assertPtExists(t, "1999_10", false)
		assertPtExists(t, "1999_11", true)
		assertPtExists(t, "1999_12", true)
		assertPtExists(t, "2000_01", true)
		assertPtExists(t, "2000_02", true)
		assertPtExists(t, "2000_03", false)
		assertPtExists(t, "2000_04", false)
		assertPtExists(t, "2000_05", false)
		assertPtExists(t, "2000_06", false)
		assertPtExists(t, "2000_07", false)
		s.MustClose()
	})
}

View File

@@ -2157,7 +2157,7 @@ func TestStorageRowsNotAdded(t *testing.T) {
})
retention = 48 * time.Hour
minTimestamp = time.Now().Add(7 * 24 * time.Hour).UnixMilli()
minTimestamp = maxUnixMilli + 1
maxTimestamp = minTimestamp + 1000
f(&options{
name: "TooBigTimestamps",
@@ -2315,7 +2315,9 @@ func TestStorageSearchMetricNames_VariousTimeRanges(t *testing.T) {
}
slices.Sort(want)
s := MustOpenStorage(t.Name(), OpenOptions{})
s := MustOpenStorage(t.Name(), OpenOptions{
FutureRetention: time.Duration(math.MaxInt64),
})
defer s.MustClose()
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
@@ -2581,7 +2583,9 @@ func TestStorageSearchLabelNames_VariousTimeRanges(t *testing.T) {
want = append(want, "__name__")
slices.Sort(want)
s := MustOpenStorage(t.Name(), OpenOptions{})
s := MustOpenStorage(t.Name(), OpenOptions{
FutureRetention: time.Duration(math.MaxInt64),
})
defer s.MustClose()
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
@@ -2636,7 +2640,9 @@ func TestStorageSearchLabelValues_VariousTimeRanges(t *testing.T) {
}
slices.Sort(want)
s := MustOpenStorage(t.Name(), OpenOptions{})
s := MustOpenStorage(t.Name(), OpenOptions{
FutureRetention: time.Duration(math.MaxInt64),
})
defer s.MustClose()
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
@@ -2684,7 +2690,9 @@ func TestStorageSearchTagValueSuffixes_VariousTimeRanges(t *testing.T) {
}
slices.Sort(want)
s := MustOpenStorage(t.Name(), OpenOptions{})
s := MustOpenStorage(t.Name(), OpenOptions{
FutureRetention: time.Duration(math.MaxInt64),
})
defer s.MustClose()
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
@@ -2731,7 +2739,9 @@ func TestStorageSearchGraphitePaths_VariousTimeRanges(t *testing.T) {
}
slices.Sort(want)
s := MustOpenStorage(t.Name(), OpenOptions{})
s := MustOpenStorage(t.Name(), OpenOptions{
FutureRetention: time.Duration(math.MaxInt64),
})
defer s.MustClose()
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
@@ -2779,6 +2789,42 @@ func testStorageOpOnVariousTimeRanges(t *testing.T, op func(t *testing.T, tr Tim
MaxTimestamp: time.Date(2000, 12, 31, 23, 59, 59, 999_999_999, time.UTC).UnixMilli(),
})
})
t.Run("future-1h", func(t *testing.T) {
now := time.Now().UTC()
op(t, TimeRange{
MinTimestamp: time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+1, 0, 0, 0, time.UTC).UnixMilli(),
MaxTimestamp: time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+2, 0, 0, 0, time.UTC).UnixMilli() - 1,
})
})
t.Run("future-1d", func(t *testing.T) {
now := time.Now().UTC()
op(t, TimeRange{
MinTimestamp: time.Date(now.Year(), now.Month(), now.Day()+1, 1, 0, 0, 0, time.UTC).UnixMilli(),
MaxTimestamp: time.Date(now.Year(), now.Month(), now.Day()+2, 1, 0, 0, 0, time.UTC).UnixMilli() - 1,
})
})
t.Run("future-1m", func(t *testing.T) {
now := time.Now().UTC()
op(t, TimeRange{
MinTimestamp: time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.UTC).UnixMilli(),
MaxTimestamp: time.Date(now.Year(), now.Month()+2, 1, 0, 0, 0, 0, time.UTC).UnixMilli() - 1,
})
})
t.Run("future-1y", func(t *testing.T) {
now := time.Now().UTC()
op(t, TimeRange{
MinTimestamp: time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli(),
MaxTimestamp: time.Date(now.Year()+2, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli() - 1,
})
})
t.Run("future-limit", func(t *testing.T) {
maxTime := time.UnixMilli(maxUnixMilli).UTC()
op(t, TimeRange{
MinTimestamp: maxTime.Add(-24 * time.Hour).UnixMilli(),
MaxTimestamp: maxTime.UnixMilli(),
})
})
}
func TestStorageSearchLabelValues_EmptyValuesAreNotReturned(t *testing.T) {
@@ -4462,3 +4508,226 @@ func TestStorageMetrics_IndexDBBlockCaches(t *testing.T) {
}
assertMetrics(s)
}
// TestStorage_futureTimestamps checks that samples with future timestamps can
// be ingested, searched, and deleted.
//
// Each subtest covers a different distance into the future, up to the max
// supported timestamp (maxUnixMilli).
func TestStorage_futureTimestamps(t *testing.T) {
	defer testRemoveAll(t)
	const (
		accountID = 12
		projectID = 34
	)
	// want holds the generated rows together with the metric names, label
	// names, and label values that the search operations are expected to
	// return for those rows.
	type want struct {
		mrs         []MetricRow
		metricNames []string
		labelNames  []string
		labelValues []string
	}
	const numMetrics = 1000
	// genData generates numMetrics unique series whose sample timestamps are
	// spread evenly across tr. prefix is embedded into every generated name so
	// that different batches do not collide.
	genData := func(prefix string, tr TimeRange) want {
		mrs := make([]MetricRow, numMetrics)
		metricNames := make([]string, numMetrics)
		labelNames := make([]string, numMetrics)
		labelValues := make([]string, numMetrics)
		step := (tr.MaxTimestamp - tr.MinTimestamp) / numMetrics
		for i := range numMetrics {
			metricName := fmt.Sprintf("metric_%s_%04d", prefix, i)
			labelName := fmt.Sprintf("label_%s_%04d", prefix, i)
			labelValue := fmt.Sprintf("value_%s_%04d", prefix, i)
			mn := MetricName{
				MetricGroup: []byte(metricName),
				AccountID:   accountID,
				ProjectID:   projectID,
				Tags: []Tag{
					{Key: []byte(labelName), Value: []byte("value")},
					{Key: []byte("label"), Value: []byte(labelValue)},
				},
			}
			mrs[i].MetricNameRaw = mn.marshalRaw(nil)
			mrs[i].Timestamp = tr.MinTimestamp + int64(i)*step
			mrs[i].Value = float64(i)
			metricNames[i] = metricName
			labelNames[i] = labelName
			labelValues[i] = labelValue
		}
		// Every series also carries the implicit __name__ label and the
		// shared "label" label.
		labelNames = append(labelNames, "__name__", "label")
		return want{mrs, metricNames, labelNames, labelValues}
	}
	// assertMetricNames searches all metric names within tr and compares
	// their metric groups with want.
	assertMetricNames := func(t *testing.T, s *Storage, tr TimeRange, want []string) {
		t.Helper()
		tfs := NewTagFilters(accountID, projectID)
		if err := tfs.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
			t.Fatalf("unexpected error in TagFilters.Add: %v", err)
		}
		got, err := s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, 1e9, noDeadline)
		if err != nil {
			t.Fatalf("SearchMetricNames() failed unexpectedly: %v", err)
		}
		for i, name := range got {
			var mn MetricName
			if err := mn.UnmarshalString(name); err != nil {
				t.Fatalf("Could not unmarshal metric name %q: %v", name, err)
			}
			got[i] = string(mn.MetricGroup)
		}
		slices.Sort(got)
		slices.Sort(want)
		if diff := cmp.Diff(want, got); diff != "" {
			t.Fatalf("unexpected metric names (-want, +got):\n%s", diff)
		}
	}
	// assertLabelNames searches all label names within tr and compares them
	// with want.
	assertLabelNames := func(t *testing.T, s *Storage, tr TimeRange, want []string) {
		t.Helper()
		got, err := s.SearchLabelNames(nil, accountID, projectID, nil, tr, 1e9, 1e9, noDeadline)
		if err != nil {
			t.Fatalf("SearchLabelNames() failed unexpectedly: %s", err)
		}
		slices.Sort(got)
		slices.Sort(want)
		if diff := cmp.Diff(want, got); diff != "" {
			t.Fatalf("unexpected label names (-want, +got):\n%s", diff)
		}
	}
	// assertLabelValues searches the values of the shared "label" label
	// within tr and compares them with want.
	assertLabelValues := func(t *testing.T, s *Storage, tr TimeRange, want []string) {
		t.Helper()
		got, err := s.SearchLabelValues(nil, accountID, projectID, "label", nil, tr, 1e9, 1e9, noDeadline)
		if err != nil {
			t.Fatalf("SearchLabelValues() failed unexpectedly: %s", err)
		}
		slices.Sort(got)
		slices.Sort(want)
		if diff := cmp.Diff(want, got); diff != "" {
			t.Fatalf("unexpected label values (-want, +got):\n%s", diff)
		}
	}
	// assertData searches all samples within tr and compares them with want.
	assertData := func(t *testing.T, s *Storage, tr TimeRange, want []MetricRow) {
		t.Helper()
		tfs := NewTagFilters(accountID, projectID)
		if err := tfs.Add(nil, []byte(".*"), false, true); err != nil {
			t.Fatalf("TagFilters.Add() failed unexpectedly: %v", err)
		}
		if err := testAssertSearchResult(s, tr, tfs, want); err != nil {
			t.Fatalf("search failed unexpectedly: %v", err)
		}
	}
	// deleteAllSeries soft-deletes every series of the test tenant.
	deleteAllSeries := func(t *testing.T, s *Storage) {
		tfs := NewTagFilters(accountID, projectID)
		if err := tfs.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
			t.Fatalf("unexpected error in TagFilters.Add: %v", err)
		}
		if _, err := s.DeleteSeries(nil, []*TagFilters{tfs}, 1e9); err != nil {
			t.Fatalf("DeleteSeries() failed unexpectedly: %s", err)
		}
	}
	// concatUniq concatenates s1 and s2 while dropping duplicates and
	// preserving the first-seen order.
	concatUniq := func(s1, s2 []string) []string {
		var s []string
		seen := make(map[string]bool)
		for _, v := range slices.Concat(s1, s2) {
			if !seen[v] {
				s = append(s, v)
				seen[v] = true
			}
		}
		return s
	}
	// Use the max possible future retention so that even the farthest future
	// timestamps are accepted.
	futureRetention := time.Duration(math.MaxInt64)
	// f runs the full scenario for tr: ingest, search, reopen, search again,
	// ingest a second batch, force merge, search, delete, search.
	f := func(t *testing.T, tr TimeRange) {
		want := genData("batch1", tr)
		s := MustOpenStorage(t.Name(), OpenOptions{
			FutureRetention: futureRetention,
		})
		s.AddRows(want.mrs, defaultPrecisionBits)
		s.DebugFlush()
		assertMetricNames(t, s, tr, want.metricNames)
		assertLabelNames(t, s, tr, want.labelNames)
		assertLabelValues(t, s, tr, want.labelValues)
		assertData(t, s, tr, want.mrs)
		// Reopen storage. The data must survive the restart.
		s.MustClose()
		s = MustOpenStorage(t.Name(), OpenOptions{
			FutureRetention: futureRetention,
		})
		assertMetricNames(t, s, tr, want.metricNames)
		assertLabelNames(t, s, tr, want.labelNames)
		assertLabelValues(t, s, tr, want.labelValues)
		assertData(t, s, tr, want.mrs)
		// Insert more data and force background merge tasks.
		want2 := genData("batch2", tr)
		s.AddRows(want2.mrs, defaultPrecisionBits)
		s.DebugFlush()
		if err := s.ForceMergePartitions(""); err != nil {
			t.Fatalf("ForceMergePartitions() failed unexpectedly: %s", err)
		}
		assertMetricNames(t, s, tr, slices.Concat(want.metricNames, want2.metricNames))
		assertLabelNames(t, s, tr, concatUniq(want.labelNames, want2.labelNames))
		assertLabelValues(t, s, tr, slices.Concat(want.labelValues, want2.labelValues))
		assertData(t, s, tr, slices.Concat(want.mrs, want2.mrs))
		// Delete all series. Searches must return no results afterwards.
		deleteAllSeries(t, s)
		assertMetricNames(t, s, tr, []string{})
		assertLabelNames(t, s, tr, []string{})
		assertLabelValues(t, s, tr, []string{})
		assertData(t, s, tr, nil)
		s.MustClose()
	}
	t.Run("future-1h", func(t *testing.T) {
		now := time.Now().UTC()
		f(t, TimeRange{
			MinTimestamp: time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+1, 0, 0, 0, time.UTC).UnixMilli(),
			MaxTimestamp: time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+2, 0, 0, 0, time.UTC).UnixMilli() - 1,
		})
	})
	t.Run("future-1d", func(t *testing.T) {
		now := time.Now().UTC()
		f(t, TimeRange{
			MinTimestamp: time.Date(now.Year(), now.Month(), now.Day()+1, 1, 0, 0, 0, time.UTC).UnixMilli(),
			MaxTimestamp: time.Date(now.Year(), now.Month(), now.Day()+2, 1, 0, 0, 0, time.UTC).UnixMilli() - 1,
		})
	})
	t.Run("future-1m", func(t *testing.T) {
		now := time.Now().UTC()
		f(t, TimeRange{
			MinTimestamp: time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.UTC).UnixMilli(),
			MaxTimestamp: time.Date(now.Year(), now.Month()+2, 1, 0, 0, 0, 0, time.UTC).UnixMilli() - 1,
		})
	})
	t.Run("future-1y", func(t *testing.T) {
		now := time.Now().UTC()
		f(t, TimeRange{
			MinTimestamp: time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli(),
			MaxTimestamp: time.Date(now.Year()+2, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli() - 1,
		})
	})
	t.Run("future-10y", func(t *testing.T) {
		now := time.Now().UTC()
		f(t, TimeRange{
			MinTimestamp: time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli(),
			MaxTimestamp: time.Date(now.Year()+11, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli() - 1,
		})
	})
	t.Run("future-limit", func(t *testing.T) {
		// The last 24 hours before the max supported timestamp.
		maxTime := time.UnixMilli(maxUnixMilli).UTC()
		f(t, TimeRange{
			MinTimestamp: maxTime.Add(-24 * time.Hour).UnixMilli(),
			MaxTimestamp: maxTime.UnixMilli(),
		})
	})
}

View File

@@ -410,13 +410,13 @@ func (tb *table) MustGetIndexDBIDByHour(hour uint64) uint64 {
func (tb *table) getMinMaxTimestamps() (int64, int64) {
now := int64(fasttime.UnixTimestamp() * 1000)
minTimestamp := now - tb.s.retentionMsecs
maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :)
if minTimestamp < 0 {
// Negative timestamps aren't supported by the storage.
minTimestamp = 0
}
if maxTimestamp < 0 {
maxTimestamp = (1 << 63) - 1
maxTimestamp := int64(maxUnixMilli)
if maxUnixMilli-now > tb.s.futureRetentionMsecs {
maxTimestamp = now + tb.s.futureRetentionMsecs
}
return minTimestamp, maxTimestamp
}
@@ -436,12 +436,14 @@ func (tb *table) retentionWatcher() {
case <-ticker.C:
}
minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.s.retentionMsecs
nowMsecs := int64(fasttime.UnixTimestamp() * 1000)
minTimestamp := nowMsecs - tb.s.retentionMsecs
maxTimestamp := nowMsecs + tb.s.futureRetentionMsecs
var ptwsDrop []*partitionWrapper
tb.ptwsLock.Lock()
dst := tb.ptws[:0]
for _, ptw := range tb.ptws {
if ptw.pt.tr.MaxTimestamp < minTimestamp {
if ptw.pt.tr.MaxTimestamp < minTimestamp || ptw.pt.tr.MinTimestamp > maxTimestamp {
ptwsDrop = append(ptwsDrop, ptw)
} else {
dst = append(dst, ptw)

View File

@@ -120,4 +120,16 @@ func (tr *TimeRange) contains(timestamp int64) bool {
const (
	// msecPerDay is the number of milliseconds in a day.
	msecPerDay = 24 * 3600 * 1000

	// msecPerHour is the number of milliseconds in an hour.
	msecPerHour = 3600 * 1000

	// maxUnixMilli is the max millisecond that is allowed to be used as the
	// sample timestamp.
	//
	// Go's Duration is an int64 and is in nanoseconds. In order for time.Time
	// math operations and conversion to millis/micros/nanos to work correctly,
	// the max datetime must be limited to math.MaxInt64 nanoseconds, which is
	// time.UnixMicro(math.MaxInt64/1000) == 2262-04-11 23:47:16.854775 UTC.
	//
	// Round it to the last millisecond of the last complete partition:
	// 2262-03-31 23:59:59.999 UTC.
	maxUnixMilli = 9222422399999
)

View File

@@ -229,3 +229,10 @@ func TestIsFirstHourOfDay(t *testing.T) {
lastHourOfDay := time.Date(2000, 1, 1, 23, 59, 59, 999_999_999, time.UTC)
f(lastHourOfDay, false)
}
// TestMaxUnixMilli verifies that the maxUnixMilli constant equals the last
// millisecond of the last complete future partition, 2262-03-31T23:59:59.999Z.
func TestMaxUnixMilli(t *testing.T) {
	got := time.Date(2262, time.March, 31, 23, 59, 59, 999_000_000, time.UTC).UnixMilli()
	want := int64(maxUnixMilli)
	if got != want {
		t.Fatalf("unexpected maxUnixMilli: got %d, want %d", got, want)
	}
}

View File

@@ -268,16 +268,25 @@ func multiplyByDecimalExp(n int64, decimalExp int64) (int64, bool) {
var decimalMultipliers = [...]int64{0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12, 1e13, 1e14, 1e15, 1e16, 1e17, 1e18}
const (
	// Bounds used by getUnixTimestampNanoseconds for detecting the unit of an
	// integer timestamp: a value in [minValidSecond, maxValidSecond] still
	// fits into int64 nanoseconds when interpreted as seconds, and likewise
	// for the milli- and microsecond bounds.
	maxValidSecond = math.MaxInt64 / 1_000_000_000
	maxValidMilli  = math.MaxInt64 / 1_000_000
	maxValidMicro  = math.MaxInt64 / 1_000
	minValidSecond = math.MinInt64 / 1_000_000_000
	minValidMilli  = math.MinInt64 / 1_000_000
	minValidMicro  = math.MinInt64 / 1_000
)
func getUnixTimestampNanoseconds(n int64) int64 {
if n < (1<<31) && n >= (-1<<31) {
if n <= maxValidSecond && n >= minValidSecond {
// The timestamp is in seconds.
return n * 1e9
}
if n < 1e3*(1<<31) && n >= 1e3*(-1<<31) {
if n <= maxValidMilli && n >= minValidMilli {
// The timestamp is in milliseconds.
return n * 1e6
}
if n < 1e6*(1<<31) && n >= 1e6*(-1<<31) {
if n <= maxValidMicro && n >= minValidMicro {
// The timestamp is in microseconds.
return n * 1e3
}