lib/storage: remove extDB from indexDB, search indexDBs independently (#9431)

Removing extDB from indexDB makes prev, curr, and next indexDBs independent.
I.e. the search is performed independently in prev and curr, the results are
then merged.
    
Additionally, since no search is now performed in extDB:
- all indexDB search methods now return the original maps used for populating
  the result, without intermediate conversion to slices.
 - `NoExtDB` suffix has been removed from method names
  
This has been extracted from #8134.
    
Signed-off-by: Andrei Baidarov <baidarov@nebius.com>
Co-authored-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
Andrei Baidarov
2025-08-13 07:36:09 +02:00
committed by GitHub
parent fe0afc3fea
commit 16d75ab0bd
10 changed files with 965 additions and 871 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -27,8 +27,8 @@ func TestIndexDB_MetricIDsNotMappedToTSIDsAreDeleted(t *testing.T) {
synctest.Run(func() {
s := MustOpenStorage(t.Name(), OpenOptions{})
defer s.MustClose()
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
type want struct {
missingMetricIDs []uint64
@@ -37,7 +37,7 @@ func TestIndexDB_MetricIDsNotMappedToTSIDsAreDeleted(t *testing.T) {
}
assertGetTSIDsFromMetricIDs := func(metricIDs []uint64, want want) {
t.Helper()
tsids, err := idb.getTSIDsFromMetricIDs(nil, metricIDs, noDeadline)
tsids, err := idbCurr.getTSIDsFromMetricIDs(nil, metricIDs, noDeadline)
if err != nil {
t.Fatalf("getTSIDsFromMetricIDs() failed unexpectedly: %v", err)
}
@@ -48,13 +48,13 @@ func TestIndexDB_MetricIDsNotMappedToTSIDsAreDeleted(t *testing.T) {
if diff := cmp.Diff(want.missingMetricIDs, missingMetricIDs); diff != "" {
t.Fatalf("unexpected tsids (-want, +got):\n%s", diff)
}
if got, want := idb.extDB.missingTSIDsForMetricID.Load(), want.missingTSIDsForMetricID; got != want {
if got, want := idbCurr.missingTSIDsForMetricID.Load(), want.missingTSIDsForMetricID; got != want {
t.Fatalf("unexpected missingTSIDsForMetricID metric value: got %d, want %d", got, want)
}
wantDeletedMetricIDs := &uint64set.Set{}
wantDeletedMetricIDs.AddMulti(want.deletedMetricIDs)
if !s.getDeletedMetricIDs().Equal(wantDeletedMetricIDs) {
t.Fatalf("deleted metricIDs set is different from %v", want.deletedMetricIDs)
t.Fatalf("deleted metricIDs set is different from %v: %v", want.deletedMetricIDs, s.getDeletedMetricIDs().AppendTo(nil))
}
}

View File

@@ -6,6 +6,7 @@ import (
"math/rand"
"reflect"
"regexp"
"slices"
"sort"
"sync/atomic"
"testing"
@@ -73,12 +74,12 @@ func TestTagFiltersToMetricIDsCache(t *testing.T) {
s := MustOpenStorage(path, OpenOptions{})
defer s.MustClose()
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
idbPrev, idbCurr, idbNext := s.getIndexDBs()
defer s.putIndexDBs(idbPrev, idbCurr, idbNext)
key := []byte("key")
idb.putMetricIDsToTagFiltersCache(nil, want, key)
got, ok := idb.getMetricIDsFromTagFiltersCache(nil, key)
idbCurr.putMetricIDsToTagFiltersCache(nil, want, key)
got, ok := idbCurr.getMetricIDsFromTagFiltersCache(nil, key)
if !ok {
t.Fatalf("expected metricIDs to be found in cache but they weren't: %v", want)
}
@@ -98,13 +99,13 @@ func TestTagFiltersToMetricIDsCache_EmptyMetricIDList(t *testing.T) {
defer fs.MustRemoveDir(path)
s := MustOpenStorage(path, OpenOptions{})
defer s.MustClose()
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
key := []byte("key")
emptyMetricIDs := []uint64(nil)
idb.putMetricIDsToTagFiltersCache(nil, emptyMetricIDs, key)
got, ok := idb.getMetricIDsFromTagFiltersCache(nil, key)
idbCurr.putMetricIDsToTagFiltersCache(nil, emptyMetricIDs, key)
got, ok := idbCurr.getMetricIDsFromTagFiltersCache(nil, key)
if !ok {
t.Fatalf("expected empty metricID list to be found in cache but it wasn't")
}
@@ -114,64 +115,6 @@ func TestTagFiltersToMetricIDsCache_EmptyMetricIDList(t *testing.T) {
}
func TestMergeSortedMetricIDs(t *testing.T) {
f := func(a, b []uint64) {
t.Helper()
m := make(map[uint64]bool)
var resultExpected []uint64
for _, v := range a {
if !m[v] {
m[v] = true
resultExpected = append(resultExpected, v)
}
}
for _, v := range b {
if !m[v] {
m[v] = true
resultExpected = append(resultExpected, v)
}
}
sort.Slice(resultExpected, func(i, j int) bool {
return resultExpected[i] < resultExpected[j]
})
result := mergeSortedMetricIDs(a, b)
if !reflect.DeepEqual(result, resultExpected) {
t.Fatalf("unexpected result for mergeSortedMetricIDs(%d, %d); got\n%d\nwant\n%d", a, b, result, resultExpected)
}
result = mergeSortedMetricIDs(b, a)
if !reflect.DeepEqual(result, resultExpected) {
t.Fatalf("unexpected result for mergeSortedMetricIDs(%d, %d); got\n%d\nwant\n%d", b, a, result, resultExpected)
}
}
f(nil, nil)
f([]uint64{1}, nil)
f(nil, []uint64{23})
f([]uint64{1234}, []uint64{0})
f([]uint64{1}, []uint64{1})
f([]uint64{1}, []uint64{1, 2, 3})
f([]uint64{1, 2, 3}, []uint64{1, 2, 3})
f([]uint64{1, 2, 3}, []uint64{2, 3})
f([]uint64{0, 1, 7, 8, 9, 13, 20}, []uint64{1, 2, 7, 13, 15})
f([]uint64{0, 1, 2, 3, 4}, []uint64{5, 6, 7, 8})
f([]uint64{0, 1, 2, 3, 4}, []uint64{4, 5, 6, 7, 8})
f([]uint64{0, 1, 2, 3, 4}, []uint64{3, 4, 5, 6, 7, 8})
f([]uint64{2, 3, 4}, []uint64{1, 5, 6, 7})
f([]uint64{2, 3, 4}, []uint64{1, 2, 5, 6, 7})
f([]uint64{2, 3, 4}, []uint64{1, 2, 4, 5, 6, 7})
f([]uint64{2, 3, 4}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{2, 3, 4, 6}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{2, 3, 4, 6, 7}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{2, 3, 4, 6, 7, 8}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{2, 3, 4, 6, 7, 8, 9}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{1, 2, 3, 4, 6, 7, 8, 9}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{1, 2, 3, 4, 6, 7, 8, 9}, []uint64{2, 3, 4, 5, 6, 7})
f([]uint64{}, []uint64{1, 2, 3})
f([]uint64{0}, []uint64{1, 2, 3})
f([]uint64{1}, []uint64{1, 2, 3})
f([]uint64{1, 2}, []uint64{3, 4})
}
func TestReverseBytes(t *testing.T) {
f := func(s, resultExpected string) {
t.Helper()
@@ -570,7 +513,7 @@ func TestIndexDBOpenClose(t *testing.T) {
tableName := nextIndexDBTableName()
for i := 0; i < 5; i++ {
var isReadOnly atomic.Bool
db := mustOpenIndexDB(tableName, &s, &isReadOnly)
db := mustOpenIndexDB(tableName, &s, &isReadOnly, false)
db.MustClose()
}
fs.MustRemoveDir(tableName)
@@ -584,26 +527,26 @@ func TestIndexDB(t *testing.T) {
const path = "TestIndexDB-serial"
s := MustOpenStorage(path, OpenOptions{})
db, putIndexDB := s.getCurrIndexDB()
mns, tsids, err := testIndexDBGetOrCreateTSIDByName(db, metricGroups, timestamp)
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
mns, tsids, err := testIndexDBGetOrCreateTSIDByName(idbCurr, metricGroups, timestamp)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if err := testIndexDBCheckTSIDByName(db, mns, tsids, timestamp, false); err != nil {
if err := testIndexDBCheckTSIDByName(idbCurr, mns, tsids, timestamp, false); err != nil {
t.Fatalf("unexpected error: %s", err)
}
// Re-open the storage and verify it works as expected.
putIndexDB()
s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
s.MustClose()
s = MustOpenStorage(path, OpenOptions{})
db, putIndexDB = s.getCurrIndexDB()
if err := testIndexDBCheckTSIDByName(db, mns, tsids, timestamp, false); err != nil {
idbPrev, idbCurr = s.getPrevAndCurrIndexDBs()
if err := testIndexDBCheckTSIDByName(idbCurr, mns, tsids, timestamp, false); err != nil {
t.Fatalf("unexpected error: %s", err)
}
putIndexDB()
s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
s.MustClose()
fs.MustRemoveDir(path)
})
@@ -611,17 +554,17 @@ func TestIndexDB(t *testing.T) {
t.Run("concurrent", func(t *testing.T) {
const path = "TestIndexDB-concurrent"
s := MustOpenStorage(path, OpenOptions{})
db, putIndexDB := s.getCurrIndexDB()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
go func() {
mns, tsid, err := testIndexDBGetOrCreateTSIDByName(db, metricGroups, timestamp)
mns, tsid, err := testIndexDBGetOrCreateTSIDByName(idbCurr, metricGroups, timestamp)
if err != nil {
ch <- err
return
}
if err := testIndexDBCheckTSIDByName(db, mns, tsid, timestamp, true); err != nil {
if err := testIndexDBCheckTSIDByName(idbCurr, mns, tsid, timestamp, true); err != nil {
ch <- err
return
}
@@ -640,7 +583,7 @@ func TestIndexDB(t *testing.T) {
}
}
putIndexDB()
s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
s.MustClose()
fs.MustRemoveDir(path)
})
@@ -692,15 +635,6 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int, timestamp i
}
func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, timestamp int64, isConcurrent bool) error {
hasValue := func(lvs []string, v []byte) bool {
for _, lv := range lvs {
if string(v) == lv {
return true
}
}
return false
}
timeseriesCounters := make(map[uint64]bool)
var genTSID generationTSID
var tsidLocal TSID
@@ -756,7 +690,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, tim
if err != nil {
return fmt.Errorf("error in SearchLabelValues(labelName=%q): %w", "__name__", err)
}
if !hasValue(lvs, mn.MetricGroup) {
if _, ok := lvs[string(mn.MetricGroup)]; !ok {
return fmt.Errorf("SearchLabelValues(labelName=%q): couldn't find %q; found %q", "__name__", mn.MetricGroup, lvs)
}
for i := range mn.Tags {
@@ -765,7 +699,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, tim
if err != nil {
return fmt.Errorf("error in SearchLabelValues(labelName=%q): %w", tag.Key, err)
}
if !hasValue(lvs, tag.Value) {
if _, ok := lvs[string(tag.Value)]; !ok {
return fmt.Errorf("SearchLabelValues(labelName=%q): couldn't find %q; found %q", tag.Key, tag.Value, lvs)
}
allLabelNames[string(tag.Key)] = true
@@ -777,11 +711,11 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, tim
if err != nil {
return fmt.Errorf("error in SearchLabelNames(empty filter, global time range): %w", err)
}
if !hasValue(lns, []byte("__name__")) {
if _, ok := lns["__name__"]; !ok {
return fmt.Errorf("cannot find __name__ in %q", lns)
}
for labelName := range allLabelNames {
if !hasValue(lns, []byte(labelName)) {
if _, ok := lns[labelName]; !ok {
return fmt.Errorf("cannot find %q in %q", labelName, lns)
}
}
@@ -1541,8 +1475,8 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
}
s := MustOpenStorage(path, opts)
db, putIndexDB := s.getCurrIndexDB()
if db.generation == 0 {
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
if idbCurr.generation == 0 {
t.Fatalf("expected indexDB generation to be not 0")
}
@@ -1563,7 +1497,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
}
// check new series were registered in indexDB
added := db.s.newTimeseriesCreated.Load()
added := idbCurr.s.newTimeseriesCreated.Load()
if added != metricRowsN {
t.Fatalf("expected indexDB to contain %d rows; got %d", metricRowsN, added)
}
@@ -1579,13 +1513,13 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
var genTSID generationTSID
for _, mr := range mrs {
s.getTSIDFromCache(&genTSID, mr.MetricNameRaw)
if genTSID.generation != db.generation {
if genTSID.generation != idbCurr.generation {
t.Fatalf("expected all entries in tsidCache to have the same indexDB generation: %d;"+
"got %d", db.generation, genTSID.generation)
"got %d", idbCurr.generation, genTSID.generation)
}
}
prevGeneration := db.generation
putIndexDB()
prevGeneration := idbCurr.generation
s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
// force index rotation
s.mustRotateIndexDB(time.Now())
@@ -1597,12 +1531,12 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
t.Fatalf("expected tsidCache after rotation to contain %d rows; got %d", metricRowsN, cs2.EntriesCount)
}
dbNew, putIndexDB := s.getCurrIndexDB()
if dbNew.generation == 0 {
idbPrev, idbCurr = s.getPrevAndCurrIndexDBs()
if idbCurr.generation == 0 {
t.Fatalf("expected new indexDB generation to be not 0")
}
if dbNew.generation == prevGeneration {
t.Fatalf("expected new indexDB generation %d to be different from prev indexDB", dbNew.generation)
if idbCurr.generation == prevGeneration {
t.Fatalf("expected new indexDB generation %d to be different from prev indexDB", idbCurr.generation)
}
// Re-insert rows again and verify that all the entries belong to new generation
@@ -1611,12 +1545,12 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
for _, mr := range mrs {
s.getTSIDFromCache(&genTSID, mr.MetricNameRaw)
if genTSID.generation != dbNew.generation {
t.Fatalf("unexpected generation for data after rotation; got %d; want %d", genTSID.generation, dbNew.generation)
if genTSID.generation != idbCurr.generation {
t.Fatalf("unexpected generation for data after rotation; got %d; want %d", genTSID.generation, idbCurr.generation)
}
}
putIndexDB()
s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
s.MustClose()
fs.MustRemoveDir(path)
}
@@ -1624,9 +1558,9 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
func TestSearchTSIDWithTimeRange(t *testing.T) {
const path = "TestSearchTSIDWithTimeRange"
s := MustOpenStorage(path, OpenOptions{})
db, putIndexDB := s.getCurrIndexDB()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
is := db.getIndexSearch(noDeadline)
is := idbCurr.getIndexSearch(noDeadline)
// Create a bunch of per-day time series
const days = 5
@@ -1675,7 +1609,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
var genTSID generationTSID
if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
generateTSID(&genTSID.TSID, &mn)
createAllIndexesForMetricName(db, &mn, &genTSID.TSID, date)
createAllIndexesForMetricName(idbCurr, &mn, &genTSID.TSID, date)
}
metricIDs.Add(genTSID.TSID.MetricID)
}
@@ -1683,12 +1617,12 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
allMetricIDs.Union(&metricIDs)
perDayMetricIDs[date] = &metricIDs
}
db.putIndexSearch(is)
idbCurr.putIndexSearch(is)
// Flush index to disk, so it becomes visible for search
s.DebugFlush()
is2 := db.getIndexSearch(noDeadline)
is2 := idbCurr.getIndexSearch(noDeadline)
// Check that all the metrics are found for all the days.
for date := baseDate - days + 1; date <= baseDate; date++ {
@@ -1709,10 +1643,10 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
if !allMetricIDs.Equal(metricIDs) {
t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil))
}
db.putIndexSearch(is2)
idbCurr.putIndexSearch(is2)
// add a metric that will be deleted shortly
is3 := db.getIndexSearch(noDeadline)
is3 := idbCurr.getIndexSearch(noDeadline)
day := days
date := baseDate - uint64(day)
mn := newMN("deletedMetric", day, 999)
@@ -1725,13 +1659,13 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
var genTSID generationTSID
if !is3.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
generateTSID(&genTSID.TSID, &mn)
createAllIndexesForMetricName(db, &mn, &genTSID.TSID, date)
createAllIndexesForMetricName(idbCurr, &mn, &genTSID.TSID, date)
}
// delete the added metric. It is expected it won't be returned during searches
deletedSet := &uint64set.Set{}
deletedSet.Add(genTSID.TSID.MetricID)
s.setDeletedMetricIDs(deletedSet)
db.putIndexSearch(is3)
idbCurr.putIndexSearch(is3)
s.DebugFlush()
// Check SearchLabelNames with the specified time range.
@@ -1739,23 +1673,23 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
MinTimestamp: int64(timestamp) - msecPerDay,
MaxTimestamp: int64(timestamp),
}
lns, err := db.SearchLabelNames(nil, nil, tr, 10000, 1e9, noDeadline)
lns, err := idbCurr.SearchLabelNames(nil, nil, tr, 10000, 1e9, noDeadline)
if err != nil {
t.Fatalf("unexpected error in SearchLabelNames(timeRange=%s): %s", &tr, err)
}
sort.Strings(lns)
if !reflect.DeepEqual(lns, labelNames) {
t.Fatalf("unexpected labelNames; got\n%s\nwant\n%s", lns, labelNames)
got := sortedSlice(lns)
if !reflect.DeepEqual(got, labelNames) {
t.Fatalf("unexpected labelNames; got\n%s\nwant\n%s", got, labelNames)
}
// Check SearchLabelValues with the specified time range.
lvs, err := db.SearchLabelValues(nil, "", nil, tr, 10000, 1e9, noDeadline)
lvs, err := idbCurr.SearchLabelValues(nil, "", nil, tr, 10000, 1e9, noDeadline)
if err != nil {
t.Fatalf("unexpected error in SearchLabelValues(timeRange=%s): %s", &tr, err)
}
sort.Strings(lvs)
if !reflect.DeepEqual(lvs, labelValues) {
t.Fatalf("unexpected labelValues; got\n%s\nwant\n%s", lvs, labelValues)
got = sortedSlice(lvs)
if !reflect.DeepEqual(got, labelValues) {
t.Fatalf("unexpected labelValues; got\n%s\nwant\n%s", got, labelValues)
}
// Create a filter that will match series that occur across multiple days
@@ -1777,7 +1711,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
MinTimestamp: int64(timestamp - 2*msecPerHour - 1),
MaxTimestamp: int64(timestamp),
}
matchedTSIDs, err := searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
matchedTSIDs, err := searchTSIDsInTest(idbCurr, []*TagFilters{tfs}, tr)
if err != nil {
t.Fatalf("error searching tsids: %v", err)
}
@@ -1786,63 +1720,63 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
}
// Check SearchLabelNames with the specified filter.
lns, err = db.SearchLabelNames(nil, []*TagFilters{tfs}, TimeRange{}, 10000, 1e9, noDeadline)
lns, err = idbCurr.SearchLabelNames(nil, []*TagFilters{tfs}, TimeRange{}, 10000, 1e9, noDeadline)
if err != nil {
t.Fatalf("unexpected error in SearchLabelNames(filters=%s): %s", tfs, err)
}
sort.Strings(lns)
if !reflect.DeepEqual(lns, labelNames) {
t.Fatalf("unexpected labelNames; got\n%s\nwant\n%s", lns, labelNames)
got = sortedSlice(lns)
if !reflect.DeepEqual(got, labelNames) {
t.Fatalf("unexpected labelNames; got\n%s\nwant\n%s", got, labelNames)
}
// Check SearchLabelNames with the specified filter and time range.
lns, err = db.SearchLabelNames(nil, []*TagFilters{tfs}, tr, 10000, 1e9, noDeadline)
lns, err = idbCurr.SearchLabelNames(nil, []*TagFilters{tfs}, tr, 10000, 1e9, noDeadline)
if err != nil {
t.Fatalf("unexpected error in SearchLabelNames(filters=%s, timeRange=%s): %s", tfs, &tr, err)
}
sort.Strings(lns)
if !reflect.DeepEqual(lns, labelNames) {
t.Fatalf("unexpected labelNames; got\n%s\nwant\n%s", lns, labelNames)
got = sortedSlice(lns)
if !reflect.DeepEqual(got, labelNames) {
t.Fatalf("unexpected labelNames; got\n%s\nwant\n%s", got, labelNames)
}
// Check SearchLabelNames with filters on metric name and time range.
lns, err = db.SearchLabelNames(nil, []*TagFilters{tfsMetricName}, tr, 10000, 1e9, noDeadline)
lns, err = idbCurr.SearchLabelNames(nil, []*TagFilters{tfsMetricName}, tr, 10000, 1e9, noDeadline)
if err != nil {
t.Fatalf("unexpected error in SearchLabelNames(filters=%s, timeRange=%s): %s", tfs, &tr, err)
}
sort.Strings(lns)
if !reflect.DeepEqual(lns, labelNames) {
t.Fatalf("unexpected labelNames; got\n%s\nwant\n%s", lns, labelNames)
got = sortedSlice(lns)
if !reflect.DeepEqual(got, labelNames) {
t.Fatalf("unexpected labelNames; got\n%s\nwant\n%s", got, labelNames)
}
// Check SearchLabelValues with the specified filter.
lvs, err = db.SearchLabelValues(nil, "", []*TagFilters{tfs}, TimeRange{}, 10000, 1e9, noDeadline)
lvs, err = idbCurr.SearchLabelValues(nil, "", []*TagFilters{tfs}, TimeRange{}, 10000, 1e9, noDeadline)
if err != nil {
t.Fatalf("unexpected error in SearchLabelValues(filters=%s): %s", tfs, err)
}
sort.Strings(lvs)
if !reflect.DeepEqual(lvs, labelValues) {
t.Fatalf("unexpected labelValues; got\n%s\nwant\n%s", lvs, labelValues)
got = sortedSlice(lvs)
if !reflect.DeepEqual(got, labelValues) {
t.Fatalf("unexpected labelValues; got\n%s\nwant\n%s", got, labelValues)
}
// Check SearchLabelValues with the specified filter and time range.
lvs, err = db.SearchLabelValues(nil, "", []*TagFilters{tfs}, tr, 10000, 1e9, noDeadline)
lvs, err = idbCurr.SearchLabelValues(nil, "", []*TagFilters{tfs}, tr, 10000, 1e9, noDeadline)
if err != nil {
t.Fatalf("unexpected error in SearchLabelValues(filters=%s, timeRange=%s): %s", tfs, &tr, err)
}
sort.Strings(lvs)
if !reflect.DeepEqual(lvs, labelValues) {
t.Fatalf("unexpected labelValues; got\n%s\nwant\n%s", lvs, labelValues)
got = sortedSlice(lvs)
if !reflect.DeepEqual(got, labelValues) {
t.Fatalf("unexpected labelValues; got\n%s\nwant\n%s", got, labelValues)
}
// Check SearchLabelValues with filters on metric name and time range.
lvs, err = db.SearchLabelValues(nil, "", []*TagFilters{tfsMetricName}, tr, 10000, 1e9, noDeadline)
lvs, err = idbCurr.SearchLabelValues(nil, "", []*TagFilters{tfsMetricName}, tr, 10000, 1e9, noDeadline)
if err != nil {
t.Fatalf("unexpected error in SearchLabelValues(filters=%s, timeRange=%s): %s", tfs, &tr, err)
}
sort.Strings(lvs)
if !reflect.DeepEqual(lvs, labelValues) {
t.Fatalf("unexpected labelValues; got\n%s\nwant\n%s", lvs, labelValues)
got = sortedSlice(lvs)
if !reflect.DeepEqual(got, labelValues) {
t.Fatalf("unexpected labelValues; got\n%s\nwant\n%s", got, labelValues)
}
// Perform a search across all the days, should match all metrics
@@ -1851,7 +1785,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
MaxTimestamp: int64(timestamp),
}
matchedTSIDs, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
matchedTSIDs, err = searchTSIDsInTest(idbCurr, []*TagFilters{tfs}, tr)
if err != nil {
t.Fatalf("error searching tsids: %v", err)
}
@@ -1860,7 +1794,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
}
// Check GetTSDBStatus with nil filters.
status, err := db.GetTSDBStatus(nil, nil, baseDate, "day", 5, 1e6, noDeadline)
status, err := idbCurr.GetTSDBStatus(nil, nil, baseDate, "day", 5, 1e6, noDeadline)
if err != nil {
t.Fatalf("error in GetTSDBStatus with nil filters: %s", err)
}
@@ -1974,7 +1908,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
if err := tfs.Add([]byte("day"), []byte("0"), false, false); err != nil {
t.Fatalf("cannot add filter: %s", err)
}
status, err = db.GetTSDBStatus(nil, []*TagFilters{tfs}, baseDate, "", 5, 1e6, noDeadline)
status, err = idbCurr.GetTSDBStatus(nil, []*TagFilters{tfs}, baseDate, "", 5, 1e6, noDeadline)
if err != nil {
t.Fatalf("error in GetTSDBStatus: %s", err)
}
@@ -2000,7 +1934,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
}
// Check GetTSDBStatus, which matches all the series on a global time range
status, err = db.GetTSDBStatus(nil, nil, 0, "day", 5, 1e6, noDeadline)
status, err = idbCurr.GetTSDBStatus(nil, nil, 0, "day", 5, 1e6, noDeadline)
if err != nil {
t.Fatalf("error in GetTSDBStatus: %s", err)
}
@@ -2055,7 +1989,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
if err := tfs.Add([]byte("UniqueId"), []byte("0|1|3"), false, true); err != nil {
t.Fatalf("cannot add filter: %s", err)
}
status, err = db.GetTSDBStatus(nil, []*TagFilters{tfs}, baseDate, "", 5, 1e6, noDeadline)
status, err = idbCurr.GetTSDBStatus(nil, []*TagFilters{tfs}, baseDate, "", 5, 1e6, noDeadline)
if err != nil {
t.Fatalf("error in GetTSDBStatus: %s", err)
}
@@ -2081,7 +2015,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
}
// Check GetTSDBStatus with non-nil filter on global time range, which matches only 15 series
status, err = db.GetTSDBStatus(nil, []*TagFilters{tfs}, 0, "", 5, 1e6, noDeadline)
status, err = idbCurr.GetTSDBStatus(nil, []*TagFilters{tfs}, 0, "", 5, 1e6, noDeadline)
if err != nil {
t.Fatalf("error in GetTSDBStatus: %s", err)
}
@@ -2106,7 +2040,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
t.Fatalf("unexpected TotalLabelValuePairs; got %d; want %d", status.TotalLabelValuePairs, expectedLabelValuePairs)
}
putIndexDB()
s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
s.MustClose()
fs.MustRemoveDir(path)
}
@@ -2144,9 +2078,9 @@ func TestSearchContainsTimeRange(t *testing.T) {
path := t.Name()
fs.MustRemoveDir(path)
s := MustOpenStorage(path, OpenOptions{})
db, putIndexDB := s.getCurrIndexDB()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
is := db.getIndexSearch(noDeadline)
is := idbCurr.getIndexSearch(noDeadline)
// Create a bunch of per-day time series
const (
@@ -2199,19 +2133,19 @@ func TestSearchContainsTimeRange(t *testing.T) {
var genTSID generationTSID
if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
generateTSID(&genTSID.TSID, &mn)
createAllIndexesForMetricName(db, &mn, &genTSID.TSID, date)
createAllIndexesForMetricName(idbCurr, &mn, &genTSID.TSID, date)
}
metricIDs.Add(genTSID.TSID.MetricID)
}
perDayMetricIDs[date] = &metricIDs
}
db.putIndexSearch(is)
idbCurr.putIndexSearch(is)
// Flush index to disk, so it becomes visible for search
s.DebugFlush()
is2 := db.getIndexSearch(noDeadline)
is2 := idbCurr.getIndexSearch(noDeadline)
// Check that all the metrics are found for all the days.
for date := rotationDate - days + 1; date <= rotationDate; date++ {
@@ -2225,23 +2159,23 @@ func TestSearchContainsTimeRange(t *testing.T) {
}
}
db.putIndexSearch(is2)
putIndexDB()
idbCurr.putIndexSearch(is2)
s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
// rotate indexdb
s.mustRotateIndexDB(rotationDay)
db, putIndexDB = s.getCurrIndexDB()
idbPrev, idbNext := s.getPrevAndCurrIndexDBs()
// perform search for 0:0 tenant
// results of previous search requests shouldn't affect it
isExt := db.extDB.getIndexSearch(noDeadline)
isPrev := idbPrev.getIndexSearch(noDeadline)
// search for range that covers prev indexDB for dates before ingestion
tr := TimeRange{
MinTimestamp: int64(rotationMillis - msecPerDay*(days)),
MaxTimestamp: int64(rotationMillis),
}
if !isExt.containsTimeRange(tr) {
if !isPrev.containsTimeRange(tr) {
t.Fatalf("expected to have given time range at prev IndexDB")
}
@@ -2250,21 +2184,30 @@ func TestSearchContainsTimeRange(t *testing.T) {
MinTimestamp: int64(rotationMillis + msecPerDay*(days+4)),
MaxTimestamp: int64(rotationMillis + msecPerDay*(days+2)),
}
if isExt.containsTimeRange(tr) {
if isPrev.containsTimeRange(tr) {
t.Fatalf("not expected to have given time range at prev IndexDB")
}
key := isExt.marshalCommonPrefix(nil, nsPrefixDateToMetricID)
key := isPrev.marshalCommonPrefix(nil, nsPrefixDateToMetricID)
db.extDB.minMissingTimestampByKeyLock.Lock()
minMissingTimetamp := db.extDB.minMissingTimestampByKey[string(key)]
db.extDB.minMissingTimestampByKeyLock.Unlock()
idbPrev.minMissingTimestampByKeyLock.Lock()
minMissingTimetamp := idbPrev.minMissingTimestampByKey[string(key)]
idbPrev.minMissingTimestampByKeyLock.Unlock()
if minMissingTimetamp != tr.MinTimestamp {
t.Fatalf("unexpected minMissingTimestamp for 0:0 tenant got %d, want %d", minMissingTimetamp, tr.MinTimestamp)
}
db.extDB.putIndexSearch(isExt)
putIndexDB()
idbPrev.putIndexSearch(isPrev)
s.putPrevAndCurrIndexDBs(idbPrev, idbNext)
s.MustClose()
fs.MustRemoveDir(path)
}
func sortedSlice(m map[string]struct{}) []string {
s := make([]string, 0, len(m))
for k := range m {
s = append(s, k)
}
slices.Sort(s)
return s
}

View File

@@ -43,7 +43,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
const path = "BenchmarkIndexDBAddTSIDs"
timestamp := time.Date(2025, 3, 17, 0, 0, 0, 0, time.UTC).UnixMilli()
s := MustOpenStorage(path, OpenOptions{})
db, putIndexDB := s.getCurrIndexDB()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
const recordsPerLoop = 1e3
@@ -66,13 +66,13 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
startOffset := 0
for pb.Next() {
benchmarkIndexDBAddTSIDs(db, &genTSID, &mn, timestamp, startOffset, recordsPerLoop)
benchmarkIndexDBAddTSIDs(idbCurr, &genTSID, &mn, timestamp, startOffset, recordsPerLoop)
startOffset += recordsPerLoop
}
})
b.StopTimer()
putIndexDB()
s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
s.MustClose()
fs.MustRemoveDir(path)
}
@@ -97,7 +97,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
const path = "BenchmarkHeadPostingForMatchers"
timestamp := int64(0)
s := MustOpenStorage(path, OpenOptions{})
db, putIndexDB := s.getCurrIndexDB()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
// Fill the db with data as in https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L66
var mn MetricName
@@ -110,7 +110,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
}
mn.sortTags()
generateTSID(&genTSID.TSID, &mn)
createAllIndexesForMetricName(db, &mn, &genTSID.TSID, date)
createAllIndexesForMetricName(idbCurr, &mn, &genTSID.TSID, date)
}
for n := 0; n < 10; n++ {
ns := strconv.Itoa(n)
@@ -126,7 +126,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
}
// Make sure all the items can be searched.
db.s.DebugFlush()
s.DebugFlush()
b.ResetTimer()
benchSearch := func(b *testing.B, tfs *TagFilters, expectedMetricIDs int) {
@@ -135,14 +135,14 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
// index instead of per-day index.
tr := globalIndexTimeRange
for i := 0; i < b.N; i++ {
is := db.getIndexSearch(noDeadline)
is := idbCurr.getIndexSearch(noDeadline)
metricIDs, err := is.searchMetricIDs(nil, tfss, tr, 2e9)
db.putIndexSearch(is)
idbCurr.putIndexSearch(is)
if err != nil {
b.Fatalf("unexpected error in searchMetricIDs: %s", err)
}
if len(metricIDs) != expectedMetricIDs {
b.Fatalf("unexpected metricIDs found; got %d; want %d", len(metricIDs), expectedMetricIDs)
if metricIDs.Len() != expectedMetricIDs {
b.Fatalf("unexpected metricIDs found; got %d; want %d", metricIDs.Len(), expectedMetricIDs)
}
}
}
@@ -254,7 +254,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
benchSearch(b, tfs, 88889)
})
putIndexDB()
s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
s.MustClose()
fs.MustRemoveDir(path)
}
@@ -263,7 +263,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
const path = "BenchmarkIndexDBGetTSIDs"
timestamp := time.Date(2025, 3, 17, 0, 0, 0, 0, time.UTC).UnixMilli()
s := MustOpenStorage(path, OpenOptions{})
db, putIndexDB := s.getCurrIndexDB()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
const recordsPerLoop = 1000
const recordsCount = 1e5
@@ -283,9 +283,9 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
for i := 0; i < recordsCount; i++ {
generateTSID(&genTSID.TSID, &mn)
createAllIndexesForMetricName(db, &mn, &genTSID.TSID, date)
createAllIndexesForMetricName(idbCurr, &mn, &genTSID.TSID, date)
}
db.s.DebugFlush()
idbCurr.s.DebugFlush()
b.SetBytes(recordsPerLoop)
b.ReportAllocs()
@@ -297,19 +297,19 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
mnLocal.CopyFrom(&mn)
mnLocal.sortTags()
for pb.Next() {
is := db.getIndexSearch(noDeadline)
is := idbCurr.getIndexSearch(noDeadline)
for i := 0; i < recordsPerLoop; i++ {
metricNameLocal = mnLocal.Marshal(metricNameLocal[:0])
if !is.getTSIDByMetricName(&genTSIDLocal, metricNameLocal, date) {
panic(fmt.Errorf("cannot obtain tsid for row %d", i))
}
}
db.putIndexSearch(is)
idbCurr.putIndexSearch(is)
}
})
b.StopTimer()
putIndexDB()
s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
s.MustClose()
fs.MustRemoveDir(path)
}

View File

@@ -94,12 +94,13 @@ type Search struct {
// MetricBlockRef is updated with each Search.NextMetricBlock call.
MetricBlockRef MetricBlockRef
// idb is used for MetricName lookup for the found data blocks.
idb *indexDB
// storage is used for finding data blocks and MetricName lookup for those
// data blocks.
storage *Storage
// putIndexDB decrements the idb ref counter. Must be called in
// Search.MustClose().
putIndexDB func()
// idbCurr and idbPrev are used for MetricName lookup for the found data blocks.
idbCurr *indexDB
idbPrev *indexDB
// retentionDeadline is used for filtering out blocks outside the configured retention.
retentionDeadline int64
@@ -131,8 +132,9 @@ func (s *Search) reset() {
s.MetricBlockRef.MetricName = s.MetricBlockRef.MetricName[:0]
s.MetricBlockRef.BlockRef = nil
s.idb = nil
s.putIndexDB = nil
s.storage = nil
s.idbPrev = nil
s.idbCurr = nil
s.retentionDeadline = 0
s.ts.reset()
s.tr = TimeRange{}
@@ -154,7 +156,6 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte
qt = qt.NewChild("init series search: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done()
indexTR := storage.adjustTimeRange(tr)
dataTR := tr
if s.needClosing {
@@ -163,18 +164,16 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte
retentionDeadline := int64(fasttime.UnixTimestamp()*1e3) - storage.retentionMsecs
s.reset()
s.idb, s.putIndexDB = storage.getCurrIndexDB()
s.storage = storage
s.idbPrev, s.idbCurr = storage.getPrevAndCurrIndexDBs()
s.retentionDeadline = retentionDeadline
s.tr = tr
s.tfss = tfss
s.deadline = deadline
s.needClosing = true
var tsids []TSID
metricIDs, err := s.idb.searchMetricIDs(qt, tfss, indexTR, maxMetrics, deadline)
if err == nil {
tsids, err = s.idb.getTSIDsFromMetricIDs(qt, metricIDs, deadline)
}
tsids, err := s.searchTSIDs(qt, tfss, tr, maxMetrics, deadline)
// It is ok to call Init on non-nil err.
// Init must be called before returning because it will fail
// on Search.MustClose otherwise.
@@ -187,13 +186,55 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte
return len(tsids)
}
// searchTSIDs searches the TSIDs that correspond to filters within the given
// time range.
//
// The method will fail if the number of found TSIDs exceeds maxMetrics or the
// search has not completed within the specified deadline.
func (s *Search) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
qt = qt.NewChild("search TSIDs: filters=%s, timeRange=%s, maxMetrics=%d", tfss, &tr, maxMetrics)
defer qt.Done()
// search resolves the metricIDs matching tfss in a single indexDB and then
// converts them to TSIDs. It is invoked per indexDB by searchAndMerge.
search := func(qt *querytracer.Tracer, idb *indexDB, tr TimeRange) ([]TSID, error) {
var tsids []TSID
metricIDs, err := idb.searchMetricIDs(qt, tfss, tr, maxMetrics, deadline)
if err == nil {
tsids, err = idb.getTSIDsFromMetricIDs(qt, metricIDs, deadline)
}
return tsids, err
}
// merge combines the per-indexDB TSID slices. Empty slices are dropped
// first, so the zero- and single-result cases avoid calling
// mergeSortedTSIDs at all.
merge := func(data [][]TSID) []TSID {
tsidss := make([][]TSID, 0, len(data))
for _, d := range data {
if len(d) > 0 {
tsidss = append(tsidss, d)
}
}
if len(tsidss) == 0 {
return nil
}
if len(tsidss) == 1 {
return tsidss[0]
}
return mergeSortedTSIDs(tsidss)
}
tsids, err := searchAndMerge(qt, s.storage, tr, search, merge)
if err != nil {
return nil, err
}
return tsids, nil
}
// MustClose closes the Search.
func (s *Search) MustClose() {
if !s.needClosing {
logger.Panicf("BUG: missing Init call before MustClose")
}
s.ts.MustClose()
s.putIndexDB()
s.storage.putPrevAndCurrIndexDBs(s.idbPrev, s.idbCurr)
s.reset()
}
@@ -225,14 +266,14 @@ func (s *Search) NextMetricBlock() bool {
continue
}
var ok bool
s.MetricBlockRef.MetricName, ok = s.idb.searchMetricName(s.MetricBlockRef.MetricName[:0], tsid.MetricID, false)
s.MetricBlockRef.MetricName, ok = s.searchMetricName(s.MetricBlockRef.MetricName[:0], tsid.MetricID)
if !ok {
// Skip missing metricName for tsid.MetricID.
// It should be automatically fixed. See indexDB.searchMetricNameWithCache for details.
continue
}
// for performance reasons parse metricGroup conditionally
if s.idb.s.metricsTracker != nil {
if s.storage.metricsTracker != nil {
var err error
// MetricName must be sorted and marshalled with MetricName.Marshal()
// it guarantees that first tag is metricGroup
@@ -241,7 +282,7 @@ func (s *Search) NextMetricBlock() bool {
s.err = fmt.Errorf("cannot unmarshal metricGroup from MetricBlockRef.MetricName: %w", err)
return false
}
s.idb.s.metricsTracker.RegisterQueryRequest(0, 0, s.metricGroupBuf)
s.storage.metricsTracker.RegisterQueryRequest(0, 0, s.metricGroupBuf)
}
s.prevMetricID = tsid.MetricID
}
@@ -257,6 +298,28 @@ func (s *Search) NextMetricBlock() bool {
return false
}
// searchMetricName searches the metric name for the given metricID and
// appends it to metricName.
//
// The lookup order is: storage-level metric name cache, then the current
// indexDB, then the previous indexDB. A name found in an indexDB is stored
// back into the cache.
//
// The second return value indicates whether the metric name was found; on
// failure the original metricName is returned unchanged.
func (s *Search) searchMetricName(metricName []byte, metricID uint64) ([]byte, bool) {
mn := s.storage.getMetricNameFromCache(metricName, metricID)
if len(mn) > len(metricName) {
// A result longer than the passed-in buffer indicates a cache hit.
return mn, true
}
mn, found := s.idbCurr.searchMetricName(metricName, metricID, true)
if found {
s.storage.putMetricNameToCache(metricID, mn)
return mn, true
}
// Fallback to previous indexDB.
mn, found = s.idbPrev.searchMetricName(metricName, metricID, true)
if found {
s.storage.putMetricNameToCache(metricID, mn)
return mn, true
}
return metricName, false
}
// SearchQuery is used for sending search queries from vmselect to vmstorage.
type SearchQuery struct {
// The time range for searching time series

View File

@@ -70,6 +70,10 @@ type Storage struct {
// lock file for exclusive access to the storage on the given path.
flockF *os.File
// idbPrev contains the previously used indexdb.
// idbCurr becomes idbPrev after the indexDB rotation.
idbPrev atomic.Pointer[indexDB]
// idbCurr contains the currently used indexdb.
idbCurr atomic.Pointer[indexDB]
@@ -285,9 +289,7 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage {
fs.MustMkdirIfNotExist(idbSnapshotsPath)
idbNext, idbCurr, idbPrev := s.mustOpenIndexDBTables(idbPath)
idbCurr.SetExtDB(idbPrev)
idbNext.SetExtDB(idbCurr)
s.idbPrev.Store(idbPrev)
s.idbCurr.Store(idbCurr)
s.idbNext.Store(idbNext)
@@ -400,13 +402,11 @@ func (s *Storage) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
func (s *Storage) DebugFlush() {
s.tb.DebugFlush()
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
idb.tb.DebugFlush()
idb.doExtDB(func(extDB *indexDB) {
extDB.tb.DebugFlush()
})
idbCurr.tb.DebugFlush()
idbPrev.tb.DebugFlush()
hour := fasttime.UnixHour()
s.updateCurrHourMetricIDs(hour)
@@ -445,16 +445,14 @@ func (s *Storage) MustCreateSnapshot() string {
dstMetadataDir := filepath.Join(dstDir, metadataDirname)
fs.MustCopyDirectory(srcMetadataDir, dstMetadataDir)
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
idbSnapshot := filepath.Join(srcDir, indexdbDirname, snapshotsDirname, snapshotName)
currSnapshot := filepath.Join(idbSnapshot, idb.name)
idb.tb.MustCreateSnapshotAt(currSnapshot)
idb.doExtDB(func(extDB *indexDB) {
prevSnapshot := filepath.Join(idbSnapshot, extDB.name)
extDB.tb.MustCreateSnapshotAt(prevSnapshot)
})
currSnapshot := filepath.Join(idbSnapshot, idbCurr.name)
idbCurr.tb.MustCreateSnapshotAt(currSnapshot)
prevSnapshot := filepath.Join(idbSnapshot, idbPrev.name)
idbPrev.tb.MustCreateSnapshotAt(prevSnapshot)
dstIdbDir := filepath.Join(dstDir, indexdbDirname)
fs.MustSymlinkRelative(idbSnapshot, dstIdbDir)
@@ -542,33 +540,44 @@ func (s *Storage) MustDeleteStaleSnapshots(maxAge time.Duration) {
}
}
// getCurrAndNextIndexDBs increments refcount for the current and next indexDBs
// and returns them along with a cleanup function that decrements their refcounts.
// Returned indexDBs shouldn't be used after cleanup function was called.
func (s *Storage) getCurrAndNextIndexDBs() (*indexDB, *indexDB, func()) {
// getPrevAndCurrIndexDBs increments the refcount for prev and curr indexDBs and
// returns them.
func (s *Storage) getPrevAndCurrIndexDBs() (prev, curr *indexDB) {
s.idbLock.Lock()
defer s.idbLock.Unlock()
idbCurr := s.idbCurr.Load()
idbCurr.incRef()
idbNext := s.idbNext.Load()
idbNext.incRef()
return idbCurr, idbNext, func() {
idbCurr.decRef()
idbNext.decRef()
}
curr = s.idbCurr.Load()
prev = s.idbPrev.Load()
curr.incRef()
prev.incRef()
return prev, curr
}
// getCurrIndexDBs increments refcount for the current indexDB and returns it along with
// a cleanup function that decrements its refcount.
// Returned indexDB shouldn't be used after cleanup function was called.
func (s *Storage) getCurrIndexDB() (*indexDB, func()) {
// getIndexDBs increments the refcount for all indexDBs (prev, curr,
// and next) and returns them.
func (s *Storage) getIndexDBs() (prev, curr, next *indexDB) {
s.idbLock.Lock()
defer s.idbLock.Unlock()
idbCurr := s.idbCurr.Load()
idbCurr.incRef()
return idbCurr, func() {
idbCurr.decRef()
}
prev = s.idbPrev.Load()
curr = s.idbCurr.Load()
next = s.idbNext.Load()
next.incRef()
curr.incRef()
prev.incRef()
return prev, curr, next
}
// putPrevAndCurrIndexDBs decrements the refcount of prev and curr indexDBs.
//
// It must be called exactly once for each pair obtained via
// getPrevAndCurrIndexDBs; the indexDBs must not be used afterwards.
func (s *Storage) putPrevAndCurrIndexDBs(prev, curr *indexDB) {
prev.decRef()
curr.decRef()
}
// putIndexDBs decrements the refcount of all indexDBs (prev, curr,
// and next).
//
// It must be called exactly once for each triple obtained via getIndexDBs;
// the indexDBs must not be used afterwards.
func (s *Storage) putIndexDBs(prev, curr, next *indexDB) {
prev.decRef()
curr.decRef()
next.decRef()
}
// Metrics contains essential metrics for the Storage.
@@ -754,9 +763,10 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
}
m.NextRetentionSeconds = uint64(d)
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
idb.UpdateMetrics(&m.IndexDBMetrics)
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
idbCurr.UpdateMetrics(&m.IndexDBMetrics)
idbPrev.UpdateMetrics(&m.IndexDBMetrics)
s.tb.UpdateMetrics(&m.TableMetrics)
}
@@ -822,13 +832,11 @@ func (s *Storage) startFreeDiskSpaceWatcher() {
func (s *Storage) notifyReadWriteMode() {
s.tb.NotifyReadWriteMode()
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
idb.tb.NotifyReadWriteMode()
idb.doExtDB(func(extDB *indexDB) {
extDB.tb.NotifyReadWriteMode()
})
idbCurr.tb.NotifyReadWriteMode()
idbPrev.tb.NotifyReadWriteMode()
}
func (s *Storage) startRetentionWatcher() {
@@ -905,7 +913,7 @@ func (s *Storage) mustRotateIndexDB(currentTime time.Time) {
// Create new indexdb table, which will be used as idbNext
newTableName := nextIndexDBTableName()
idbNewPath := filepath.Join(s.path, indexdbDirname, newTableName)
idbNew := mustOpenIndexDB(idbNewPath, s, &s.isReadOnly)
idbNew := mustOpenIndexDB(idbNewPath, s, &s.isReadOnly, false)
// Update nextRotationTimestamp
nextRotationTimestamp := currentTime.Unix() + s.retentionMsecs/1000
@@ -915,18 +923,18 @@ func (s *Storage) mustRotateIndexDB(currentTime time.Time) {
// Set idbNext to idbNew
idbNext := s.idbNext.Load()
idbNew.SetExtDB(idbNext)
s.idbNext.Store(idbNew)
// Set idbCurr to idbNext
idbCurr := s.idbCurr.Load()
s.idbCurr.Store(idbNext)
idbPrev := s.idbPrev.Load()
s.idbPrev.Store(idbCurr)
idbCurr.noRegisterNewSeries.Store(true)
// Schedule data removal for idbPrev
idbCurr.doExtDB(func(extDB *indexDB) {
extDB.scheduleToDrop()
})
idbCurr.SetExtDB(nil)
idbPrev.scheduleToDrop()
idbPrev.decRef()
s.idbLock.Unlock()
@@ -988,8 +996,9 @@ func (s *Storage) MustClose() {
s.tb.MustClose()
// Closing idbNext will also close idbCurr and idbPrev.
s.idbNext.Load().MustClose()
s.idbCurr.Load().MustClose()
s.idbPrev.Load().MustClose()
// Save caches.
s.mustSaveCache(s.tsidCache, "metricName_tsid")
@@ -1244,6 +1253,111 @@ func nextRetentionDeadlineSeconds(atSecs, retentionSecs, offsetSecs int64) int64
return deadline
}
// getMetricNameFromCache appends the cached metric name for the given
// metricID to dst and returns the result. The metricID bytes are used as a
// fixed-size cache key.
func (s *Storage) getMetricNameFromCache(dst []byte, metricID uint64) []byte {
// There is no need in checking for deleted metricIDs here, since they
// must be checked by the caller.
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
return s.metricNameCache.Get(dst, key[:])
}
// putMetricNameToCache stores the metricID -> metricName mapping in the
// metric name cache. The metricID bytes are used as a fixed-size cache key,
// mirroring getMetricNameFromCache.
func (s *Storage) putMetricNameToCache(metricID uint64, metricName []byte) {
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
s.metricNameCache.Set(key[:], metricName)
}
// searchAndMerge concurrently performs a search operation on the prev and
// curr indexDBs and combines the per-indexDB results with the merge function.
// If any individual search fails, the first encountered error is returned and
// merge is not called.
//
// The function creates a child query tracer for each search function call and
// closes it once the search() returns. Thus, implementations of search func
// must not close the query tracer that they receive.
func searchAndMerge[T any](qt *querytracer.Tracer, s *Storage, tr TimeRange, search func(qt *querytracer.Tracer, idb *indexDB, tr TimeRange) (T, error), merge func([]T) T) (T, error) {
qt = qt.NewChild("search indexDBs: timeRange=%v", &tr)
defer qt.Done()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
var idbs = []*indexDB{idbPrev, idbCurr}
qtSearch := qt.NewChild("search %d indexDBs in parallel", len(idbs))
var wg sync.WaitGroup
// Each goroutine writes only to its own slot of data/errs, so no extra
// synchronization beyond the WaitGroup is needed.
data := make([]T, len(idbs))
errs := make([]error, len(idbs))
for i, idb := range idbs {
searchTR := s.adjustTimeRange(tr)
qtChild := qtSearch.NewChild("search indexDB %s: timeRange=%v", idb.name, &searchTR)
wg.Add(1)
go func(qt *querytracer.Tracer, i int, idb *indexDB, tr TimeRange) {
defer wg.Done()
defer qt.Done()
data[i], errs[i] = search(qt, idb, tr)
}(qtChild, i, idb, searchTR)
}
wg.Wait()
qtSearch.Done()
// Return the first error, if any; partial results are discarded.
for _, err := range errs {
if err != nil {
var zeroValue T
return zeroValue, err
}
}
qtMerge := qt.NewChild("merge search results")
result := merge(data)
qtMerge.Done()
return result, nil
}
// searchAndMergeUniq is a specific searchAndMerge operation that is common for
// most index searches. It expects each individual search to return a set of
// strings. The results of all individual searches are then unioned and the
// resulting set is converted into a slice. If the result contains more than
// maxResults elements, it is truncated to maxResults.
//
// The final result is not sorted since it must be done by vmselect.
func searchAndMergeUniq(qt *querytracer.Tracer, s *Storage, tr TimeRange, search func(qt *querytracer.Tracer, idb *indexDB, tr TimeRange) (map[string]struct{}, error), maxResults int) ([]string, error) {
merge := func(data []map[string]struct{}) map[string]struct{} {
if len(data) == 0 {
return nil
}
// Estimate the union size for map preallocation, capped at maxResults.
totalLen := 0
for _, d := range data {
totalLen += len(d)
}
if totalLen > maxResults {
totalLen = maxResults
}
all := make(map[string]struct{}, totalLen)
for _, d := range data {
for v := range d {
if len(all) >= maxResults {
// Stop collecting once the limit is reached; which elements
// survive truncation is not deterministic.
return all
}
all[v] = struct{}{}
}
}
return all
}
m, err := searchAndMerge(qt, s, tr, search, merge)
if err != nil {
return nil, err
}
// Convert the set to a slice; map iteration order makes it unsorted.
res := make([]string, 0, len(m))
for k := range m {
res = append(res, k)
}
return res, nil
}
// SearchMetricNames returns marshaled metric names matching the given tfss on
// the given tr.
//
@@ -1258,13 +1372,33 @@ func nextRetentionDeadlineSeconds(atSecs, retentionSecs, offsetSecs int64) int64
// time range is ignored and the metrics are searched within the entire
// retention period, i.e. the global index are used for searching.
func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]string, error) {
tr = s.adjustTimeRange(tr)
qt = qt.NewChild("search for matching metric names: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done()
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
return idb.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
qt = qt.NewChild("search metric names: filters=%s, timeRange=%s, maxMetrics: %d", tfss, &tr, maxMetrics)
search := func(qt *querytracer.Tracer, idb *indexDB, tr TimeRange) ([]string, error) {
return idb.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
}
merge := func(data [][]string) []string {
var n int
for _, d := range data {
n += len(d)
}
seen := make(map[string]struct{}, n)
all := make([]string, 0, n)
for _, d := range data {
for _, v := range d {
if _, ok := seen[v]; !ok {
all = append(all, v)
seen[v] = struct{}{}
}
}
}
return all
}
res, err := searchAndMerge(qt, s, tr, search, merge)
if err != nil {
return nil, err
}
qt.Donef("found %d metric names", len(res))
return res, nil
}
// ErrDeadlineExceeded is returned when the request times out.
@@ -1276,19 +1410,47 @@ var ErrDeadlineExceeded = fmt.Errorf("deadline exceeded")
// an error will be returned. Otherwise, the function returns the number of
// metrics deleted.
func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters, maxMetrics int) (int, error) {
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
qt = qt.NewChild("delete series: filters=%s, maxMetrics=%d", tfss, maxMetrics)
defer qt.Done()
deletedCount, err := idb.DeleteTSIDs(qt, tfss, maxMetrics)
if err != nil {
return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
if len(tfss) == 0 {
return 0, nil
}
// Do not reset MetricName->TSID cache, since it is already reset inside DeleteTSIDs.
// Not deleting in parallel because the deletion operation is rare.
deletedMetricIDs := &uint64set.Set{}
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
var (
dmisPrev *uint64set.Set
dmisCurr *uint64set.Set
err error
)
qt.Printf("start deleting from previous indexDB")
dmisPrev, err = idbPrev.DeleteSeries(qt, tfss, maxMetrics)
if err != nil {
return 0, err
}
qt.Printf("deleted %d metricIDs from previous indexDB", dmisPrev.Len())
deletedMetricIDs.UnionMayOwn(dmisPrev)
qt.Printf("start deleting from current indexDB")
dmisCurr, err = idbCurr.DeleteSeries(qt, tfss, maxMetrics)
if err != nil {
return 0, err
}
qt.Printf("deleted %d metricIDs from current indexDB", dmisCurr.Len())
deletedMetricIDs.UnionMayOwn(dmisCurr)
// Do not reset MetricID->MetricName cache, since it must be used only
// after filtering out deleted metricIDs.
return deletedCount, nil
n := deletedMetricIDs.Len()
qt.Donef("deleted %d unique metricIDs", n)
return n, nil
}
// SearchLabelNames searches for label names matching the given tfss on tr.
@@ -1301,10 +1463,18 @@ func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters, maxMe
// time range is ignored and the label names are searched within the entire
// retention period, i.e. the global index are used for searching.
func (s *Storage) SearchLabelNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxLabelNames, maxMetrics int, deadline uint64) ([]string, error) {
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
tr = s.adjustTimeRange(tr)
return idb.SearchLabelNames(qt, tfss, tr, maxLabelNames, maxMetrics, deadline)
qt = qt.NewChild("search for label names: filters=%s, timeRange=%s, maxLabelNames=%d, maxMetrics=%d", tfss, &tr, maxLabelNames, maxMetrics)
defer qt.Done()
search := func(qt *querytracer.Tracer, idb *indexDB, tr TimeRange) (map[string]struct{}, error) {
return idb.SearchLabelNames(qt, tfss, tr, maxLabelNames, maxMetrics, deadline)
}
res, err := searchAndMergeUniq(qt, s, tr, search, maxLabelNames)
if err != nil {
return nil, err
}
qt.Printf("found %d label names", len(res))
return res, nil
}
// SearchLabelValues searches for label values for the given labelName, filters
@@ -1318,11 +1488,18 @@ func (s *Storage) SearchLabelNames(qt *querytracer.Tracer, tfss []*TagFilters, t
// time range is ignored and the label values are searched within the entire
// retention period, i.e. the global index are used for searching.
func (s *Storage) SearchLabelValues(qt *querytracer.Tracer, labelName string, tfss []*TagFilters, tr TimeRange, maxLabelValues, maxMetrics int, deadline uint64) ([]string, error) {
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
tr = s.adjustTimeRange(tr)
qt = qt.NewChild("search for label values: labelName=%q, filters=%s, timeRange=%s, maxLabelNames=%d, maxMetrics=%d", labelName, tfss, &tr, maxLabelValues, maxMetrics)
defer qt.Done()
return idb.SearchLabelValues(qt, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline)
search := func(qt *querytracer.Tracer, idb *indexDB, tr TimeRange) (map[string]struct{}, error) {
return idb.SearchLabelValues(qt, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline)
}
res, err := searchAndMergeUniq(qt, s, tr, search, maxLabelValues)
if err != nil {
return nil, err
}
qt.Printf("found %d label values", len(res))
return res, err
}
// SearchTagValueSuffixes returns all the tag value suffixes for the given
@@ -1342,13 +1519,16 @@ func (s *Storage) SearchLabelValues(qt *querytracer.Tracer, labelName string, tf
// If -disablePerDayIndex is set or the time range is more than 40 days, the
// time range is ignored and the tag value suffixes are searched within the
// entire retention period, i.e. the global index are used for searching.
func (s *Storage) SearchTagValueSuffixes(qt *querytracer.Tracer, tr TimeRange, tagKey, tagValuePrefix string,
delimiter byte, maxTagValueSuffixes int, deadline uint64,
) ([]string, error) {
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
tr = s.adjustTimeRange(tr)
return idb.SearchTagValueSuffixes(qt, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline)
func (s *Storage) SearchTagValueSuffixes(qt *querytracer.Tracer, tr TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
search := func(qt *querytracer.Tracer, idb *indexDB, tr TimeRange) (map[string]struct{}, error) {
return idb.SearchTagValueSuffixes(qt, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline)
}
res, err := searchAndMergeUniq(qt, s, tr, search, maxTagValueSuffixes)
if err != nil {
return nil, err
}
qt.Printf("found %d tag value suffixes", len(res))
return res, err
}
// SearchGraphitePaths returns all the matching paths for the given graphite
@@ -1362,11 +1542,17 @@ func (s *Storage) SearchTagValueSuffixes(qt *querytracer.Tracer, tr TimeRange, t
// time range is ignored and the graphite paths are searched within the entire
// retention period, i.e. global index are used for searching.
func (s *Storage) SearchGraphitePaths(qt *querytracer.Tracer, tr TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) {
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
tr = s.adjustTimeRange(tr)
query = replaceAlternateRegexpsWithGraphiteWildcards(query)
return idb.SearchGraphitePaths(qt, tr, nil, query, maxPaths, deadline)
search := func(qt *querytracer.Tracer, idb *indexDB, tr TimeRange) (map[string]struct{}, error) {
return idb.SearchGraphitePaths(qt, tr, nil, query, maxPaths, deadline)
}
res, err := searchAndMergeUniq(qt, s, tr, search, maxPaths)
if err != nil {
return nil, err
}
qt.Printf("found %d graphite paths", len(res))
return res, err
}
// replaceAlternateRegexpsWithGraphiteWildcards replaces (foo|..|bar) with {foo,...,bar} in b and returns the new value.
@@ -1414,11 +1600,23 @@ func replaceAlternateRegexpsWithGraphiteWildcards(b []byte) []byte {
// GetSeriesCount returns the approximate number of unique time series.
//
// It includes the deleted series too and may count the same series
// up to two times - in db and extDB.
// up to two times - in curr and prev indexDBs.
func (s *Storage) GetSeriesCount(deadline uint64) (uint64, error) {
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
return idb.GetSeriesCount(deadline)
tr := TimeRange{
MinTimestamp: 0,
MaxTimestamp: time.Now().UnixMilli(),
}
search := func(_ *querytracer.Tracer, idb *indexDB, _ TimeRange) (uint64, error) {
return idb.GetSeriesCount(deadline)
}
merge := func(data []uint64) uint64 {
var total uint64
for _, cnt := range data {
total += cnt
}
return total
}
return searchAndMerge(nil, s, tr, search, merge)
}
// GetTSDBStatus returns TSDB status data for /api/v1/status/tsdb
@@ -1429,15 +1627,31 @@ func (s *Storage) GetSeriesCount(deadline uint64) (uint64, error) {
// Otherwise, the date is ignored and the status is calculated for the entire
// retention period, i.e. the global index are used for calculation.
func (s *Storage) GetTSDBStatus(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, focusLabel string, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) {
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
qt = qt.NewChild("getting TSDB status")
defer qt.Done()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
if s.disablePerDayIndex {
date = globalIndexDate
}
res, err := idb.GetTSDBStatus(qt, tfss, date, focusLabel, topN, maxMetrics, deadline)
qtChild := qt.NewChild("getting TSDB status in indexDB %q", idbCurr.name)
res, err := idbCurr.GetTSDBStatus(qtChild, tfss, date, focusLabel, topN, maxMetrics, deadline)
qtChild.Done()
if err != nil {
return nil, err
}
if !res.hasEntries() {
qtChild = qt.NewChild("getting TSDB status in indexDB %q", idbPrev.name)
res, err = idbPrev.GetTSDBStatus(qtChild, tfss, date, focusLabel, topN, maxMetrics, deadline)
qtChild.Done()
if err != nil {
return nil, err
}
}
if s.metricsTracker != nil && len(res.SeriesCountByMetricName) > 0 {
// for performance reason always check if metricsTracker is configured
names := make([]string, len(res.SeriesCountByMetricName))
@@ -1622,11 +1836,13 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
var newSeriesCount uint64
var seriesRepopulated uint64
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
generation := idb.generation
is := idb.getIndexSearch(noDeadline)
defer idb.putIndexSearch(is)
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
generation := idbCurr.generation
isCurr := idbCurr.getIndexSearch(noDeadline)
defer idbCurr.putIndexSearch(isCurr)
isPrev := idbPrev.getIndexSearch(noDeadline)
defer idbPrev.putIndexSearch(isPrev)
var firstWarn error
for i := range mrs {
mr := &mrs[i]
@@ -1645,19 +1861,19 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
s.invalidRawMetricNames.Add(1)
continue
}
mn.sortTags()
createAllIndexesForMetricName(idb, mn, &genTSID.TSID, date)
createAllIndexesForMetricName(idbCurr, mn, &genTSID.TSID, date)
genTSID.generation = generation
s.storeTSIDToCaches(mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++
} else if !s.dateMetricIDCache.Has(generation, date, genTSID.TSID.MetricID) {
if !is.hasDateMetricIDNoExtDB(date, genTSID.TSID.MetricID) {
if !isCurr.hasDateMetricID(date, genTSID.TSID.MetricID) {
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
@@ -1665,7 +1881,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
continue
}
mn.sortTags()
idb.createPerDayIndexes(date, &genTSID.TSID, mn)
idbCurr.createPerDayIndexes(date, &genTSID.TSID, mn)
}
s.dateMetricIDCache.Set(generation, date, genTSID.TSID.MetricID)
}
@@ -1688,12 +1904,12 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
mn.sortTags()
metricNameBuf = mn.Marshal(metricNameBuf[:0])
if is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
if isCurr.getTSIDByMetricName(&genTSID, metricNameBuf, date) || isPrev.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
// Slower path - the TSID has been found in indexdb.
if genTSID.generation < generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb.
createAllIndexesForMetricName(idb, mn, &genTSID.TSID, date)
createAllIndexesForMetricName(idbCurr, mn, &genTSID.TSID, date)
genTSID.generation = generation
seriesRepopulated++
}
@@ -1701,12 +1917,12 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
continue
}
// Slowest path - there is no TSID in indexdb for the given mr.MetricNameRaw. Create it.
// Slowest path - there is no TSID in indexdb for the given mr.MetricNameRaw. Create it.
generateTSID(&genTSID.TSID, mn)
// Schedule creating TSID indexes instead of creating them synchronously.
// This should keep stable the ingestion rate when new time series are ingested.
createAllIndexesForMetricName(idb, mn, &genTSID.TSID, date)
createAllIndexesForMetricName(idbCurr, mn, &genTSID.TSID, date)
genTSID.generation = generation
s.storeTSIDToCaches(mr.MetricNameRaw, &genTSID, date)
newSeriesCount++
@@ -1725,11 +1941,13 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) int {
logNewSeries := s.logNewSeries.Load() || s.logNewSeriesUntil.Load() >= fasttime.UnixTimestamp()
idb, idbNext, putIndexDBs := s.getCurrAndNextIndexDBs()
defer putIndexDBs()
generation := idb.generation
is := idb.getIndexSearch(noDeadline)
defer idb.putIndexSearch(is)
idbPrev, idbCurr, idbNext := s.getIndexDBs()
defer s.putIndexDBs(idbPrev, idbCurr, idbNext)
generation := idbCurr.generation
isCurr := idbCurr.getIndexSearch(noDeadline)
defer idbCurr.putIndexSearch(isCurr)
isPrev := idbPrev.getIndexSearch(noDeadline)
defer idbPrev.putIndexSearch(isPrev)
hmPrev := s.prevHourMetricIDs.Load()
hmCurr := s.currHourMetricIDs.Load()
@@ -1836,7 +2054,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
}
mn.sortTags()
createAllIndexesForMetricName(idb, mn, &genTSID.TSID, date)
createAllIndexesForMetricName(idbCurr, mn, &genTSID.TSID, date)
genTSID.generation = generation
s.storeTSIDToCaches(mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++
@@ -1867,12 +2085,12 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
s.metricsTracker.RegisterIngestRequest(0, 0, mn.MetricGroup)
// Search for TSID for the given mr.MetricNameRaw in the indexdb.
if is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
if isCurr.getTSIDByMetricName(&genTSID, metricNameBuf, date) || isPrev.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
// Slower path - the TSID has been found in indexdb.
if genTSID.generation < generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb.
createAllIndexesForMetricName(idb, mn, &genTSID.TSID, date)
createAllIndexesForMetricName(idbCurr, mn, &genTSID.TSID, date)
genTSID.generation = generation
seriesRepopulated++
}
@@ -1890,7 +2108,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
// Slowest path - the TSID for the given mr.MetricNameRaw isn't found in indexdb. Create it.
generateTSID(&genTSID.TSID, mn)
createAllIndexesForMetricName(idb, mn, &genTSID.TSID, date)
createAllIndexesForMetricName(idbCurr, mn, &genTSID.TSID, date)
genTSID.generation = generation
s.storeTSIDToCaches(mr.MetricNameRaw, &genTSID, date)
newSeriesCount++
@@ -1925,7 +2143,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
}
}
if err := s.updatePerDateData(idb, rows, dstMrs, hmPrev, hmCurr); err != nil {
if err := s.updatePerDateData(idbCurr, rows, dstMrs, hmPrev, hmCurr); err != nil {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot not update per-day index: %w", err)
}
@@ -2043,7 +2261,7 @@ func (s *Storage) prefillNextIndexDB(idbNext *indexDB, rows []rawRow, mrs []*Met
}
// Check whether the given (date, metricID) is already present in idbNext.
if isNext.hasDateMetricIDNoExtDB(date, metricID) {
if isNext.hasDateMetricID(date, metricID) {
// Indexes are already pre-filled at idbNext.
//
// Register the (generation, date, metricID) entry in the cache,
@@ -2190,7 +2408,7 @@ func (s *Storage) updatePerDateData(idb *indexDB, rows []rawRow, mrs []*MetricRo
for _, dmid := range pendingDateMetricIDs {
date := dmid.date
metricID := dmid.tsid.MetricID
if !is.hasDateMetricIDNoExtDB(date, metricID) {
if !is.hasDateMetricID(date, metricID) {
// The (date, metricID) entry is missing in the indexDB. Add it there together with per-day index.
// It is OK if the (date, metricID) entry is added multiple times to indexdb
// by concurrent goroutines.
@@ -2627,9 +2845,9 @@ func (s *Storage) mustOpenIndexDBTables(path string) (next, curr, prev *indexDB)
currPath := filepath.Join(path, tableNames[1])
prevPath := filepath.Join(path, tableNames[0])
next = mustOpenIndexDB(nextPath, s, &s.isReadOnly)
curr = mustOpenIndexDB(currPath, s, &s.isReadOnly)
prev = mustOpenIndexDB(prevPath, s, &s.isReadOnly)
next = mustOpenIndexDB(nextPath, s, &s.isReadOnly, false)
curr = mustOpenIndexDB(currPath, s, &s.isReadOnly, false)
prev = mustOpenIndexDB(prevPath, s, &s.isReadOnly, true)
return next, curr, prev
}

View File

@@ -27,11 +27,11 @@ func TestStorageSearchMetricNames_CorruptedIndex(t *testing.T) {
}
const numMetrics = 10
date := uint64(tr.MinTimestamp) / msecPerDay
idb, putCurrIndexDB := s.getCurrIndexDB()
defer putCurrIndexDB()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
var wantMetricIDs []uint64
// Symulate corrupted index by inserting `(date, tag) -> metricID`
// Simulate corrupted index by inserting `(date, tag) -> metricID`
// entries only.
for i := range numMetrics {
metricName := []byte(fmt.Sprintf("metric_%d", i))
@@ -51,11 +51,11 @@ func TestStorageSearchMetricNames_CorruptedIndex(t *testing.T) {
ii.Next()
kbPool.Put(kb)
idb.tb.AddItems(ii.Items)
idbCurr.tb.AddItems(ii.Items)
putIndexItems(ii)
}
idb.tb.DebugFlush()
idbCurr.tb.DebugFlush()
tfsAll := NewTagFilters()
if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
@@ -64,7 +64,7 @@ func TestStorageSearchMetricNames_CorruptedIndex(t *testing.T) {
tfssAll := []*TagFilters{tfsAll}
searchMetricIDs := func() []uint64 {
metricIDs, err := idb.searchMetricIDs(nil, tfssAll, tr, 1e9, noDeadline)
metricIDs, err := idbCurr.searchMetricIDs(nil, tfssAll, tr, 1e9, noDeadline)
if err != nil {
panic(fmt.Sprintf("searchMetricIDs() failed unexpectedly: %v", err))
}
@@ -104,7 +104,7 @@ func TestStorageSearchMetricNames_CorruptedIndex(t *testing.T) {
t.Fatalf("unexpected metric names (-want, +got):\n%s", diff)
}
// As a result they cannot be searched anymore.
if diff := cmp.Diff([]uint64{}, searchMetricIDs()); diff != "" {
if diff := cmp.Diff([]uint64(nil), searchMetricIDs()); diff != "" {
t.Fatalf("unexpected metricIDs (-want, +got):\n%s", diff)
}
})

View File

@@ -16,6 +16,8 @@ import (
"testing/quick"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
@@ -1130,11 +1132,11 @@ func TestStorageDeleteSeries_CachesAreUpdatedOrReset(t *testing.T) {
assertTagFiltersCached := func(tfss []*TagFilters, tr TimeRange, want bool) {
t.Helper()
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
tfssKey := marshalTagFiltersKey(nil, tfss, tr, true)
_, got := idb.getMetricIDsFromTagFiltersCache(nil, tfssKey)
_, got := idbCurr.getMetricIDsFromTagFiltersCache(nil, tfssKey)
if got != want {
t.Errorf("unexpected tag filters in cache %v %v: got %t, want %t", tfss, &tr, got, want)
}
@@ -1765,15 +1767,14 @@ func TestStorageRotateIndexDB(t *testing.T) {
wg.Wait()
s.DebugFlush()
idbCurr, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
idbPrev := idbCurr.extDB
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
isCurr := idbCurr.getIndexSearch(noDeadline)
defer idbCurr.putIndexSearch(isCurr)
isPrev := idbPrev.getIndexSearch(noDeadline)
defer idbPrev.putIndexSearch(isPrev)
return testCountAllMetricNamesNoExtDB(isPrev, tr), testCountAllMetricNamesNoExtDB(isCurr, tr)
return testCountAllMetricNamesInIndex(isPrev, tr), testCountAllMetricNamesInIndex(isCurr, tr)
}
var oldCurr int
@@ -1793,7 +1794,7 @@ func TestStorageRotateIndexDB(t *testing.T) {
}
}
func testCountAllMetricNamesNoExtDB(is *indexSearch, tr TimeRange) int {
func testCountAllMetricNamesInIndex(is *indexSearch, tr TimeRange) int {
tfss := NewTagFilters()
if err := tfss.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err))
@@ -1804,10 +1805,13 @@ func testCountAllMetricNamesNoExtDB(is *indexSearch, tr TimeRange) int {
}
metricNames := map[string]bool{}
var metricName []byte
for _, metricID := range metricIDs {
metricName, _ = is.searchMetricName(metricName[:0], metricID)
metricNames[string(metricName)] = true
}
metricIDs.ForEach(func(part []uint64) bool {
for _, metricID := range part {
metricName, _ = is.searchMetricName(metricName[:0], metricID)
metricNames[string(metricName)] = true
}
return true
})
return len(metricNames)
}
@@ -3434,7 +3438,7 @@ func TestStorageSearchMetricNamesWithoutPerDayIndex(t *testing.T) {
)
rng := rand.New(rand.NewSource(1))
opts := testStorageSearchWithoutPerDayIndexOptions{
wantEmpty: []string(nil),
wantEmpty: []string{},
wantPerTimeRange: make(map[TimeRange]any),
wantAll: []string{},
}
@@ -3479,6 +3483,7 @@ func TestStorageSearchMetricNamesWithoutPerDayIndex(t *testing.T) {
}
got[i] = string(mn.MetricGroup)
}
slices.Sort(got)
if !reflect.DeepEqual(got, want) {
t.Errorf("[%v] unexpected metric names: got %v, want %v", &tr, got, want)
}
@@ -3652,7 +3657,7 @@ func TestStorageSearchGraphitePathsWithoutPerDayIndex(t *testing.T) {
)
rng := rand.New(rand.NewSource(1))
opts := testStorageSearchWithoutPerDayIndexOptions{
wantEmpty: []string(nil),
wantEmpty: []string{},
wantPerTimeRange: make(map[TimeRange]any),
wantAll: []string{},
}
@@ -3889,6 +3894,30 @@ func TestStorageAddRows_currHourMetricIDs(t *testing.T) {
})
}
// testSearchMetricIDs searches all storage indexDBs for metricIDs matching
// the given tfss and tr, merging the per-indexDB results.
//
// The returned metricIDs are sorted and deduplicated. The function panics in
// case of error. It is not a part of Storage because it is currently used in
// unit tests only.
func testSearchMetricIDs(s *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) []uint64 {
	// Per-indexDB search callback for searchAndMerge.
	searchOne := func(qt *querytracer.Tracer, idb *indexDB, tr TimeRange) ([]uint64, error) {
		return idb.searchMetricIDs(qt, tfss, tr, maxMetrics, deadline)
	}
	// Merge callback: union all per-indexDB results into one sorted slice.
	mergeAll := func(perDB [][]uint64) []uint64 {
		var union uint64set.Set
		for _, ids := range perDB {
			union.AddMulti(ids)
		}
		return union.AppendTo(nil)
	}
	metricIDs, err := searchAndMerge(nil, s, tr, searchOne, mergeAll)
	if err != nil {
		panic(fmt.Sprintf("searching metricIDs failed unexpectedly: %s", err))
	}
	return metricIDs
}
// testCountAllMetricIDs is a test helper function that counts the IDs of
// all time series within the given time range.
func testCountAllMetricIDs(s *Storage, tr TimeRange) int {
@@ -3896,15 +3925,7 @@ func testCountAllMetricIDs(s *Storage, tr TimeRange) int {
if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err))
}
if s.disablePerDayIndex {
tr = globalIndexTimeRange
}
idb, putIndexDB := s.getCurrIndexDB()
defer putIndexDB()
ids, err := idb.searchMetricIDs(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline)
if err != nil {
panic(fmt.Sprintf("seachMetricIDs() failed unexpectedly: %s", err))
}
ids := testSearchMetricIDs(s, []*TagFilters{tfsAll}, tr, 1e9, noDeadline)
return len(ids)
}
@@ -4449,7 +4470,7 @@ func TestMustOpenIndexDBTables_noTables(t *testing.T) {
defer s.MustClose()
next := s.idbNext.Load()
curr := s.idbCurr.Load()
prev := curr.extDB
prev := s.idbPrev.Load()
assertIndexDBIsNotNil(t, prev)
assertIndexDBIsNotNil(t, curr)
assertIndexDBIsNotNil(t, next)
@@ -4470,7 +4491,7 @@ func TestMustOpenIndexDBTables_prevOnly(t *testing.T) {
defer s.MustClose()
next := s.idbNext.Load()
curr := s.idbCurr.Load()
prev := curr.extDB
prev := s.idbPrev.Load()
assertIndexDBName(t, prev, prevName)
assertIndexDBIsNotNil(t, curr)
assertIndexDBIsNotNil(t, next)
@@ -4496,7 +4517,7 @@ func TestMustOpenIndexDBTables_currAndPrev(t *testing.T) {
defer s.MustClose()
next := s.idbNext.Load()
curr := s.idbCurr.Load()
prev := curr.extDB
prev := s.idbPrev.Load()
assertIndexDBName(t, prev, prevName)
assertIndexDBName(t, curr, currName)
assertIndexDBIsNotNil(t, next)
@@ -4525,7 +4546,7 @@ func TestMustOpenIndexDBTables_nextAndCurrAndPrev(t *testing.T) {
defer s.MustClose()
next := s.idbNext.Load()
curr := s.idbCurr.Load()
prev := curr.extDB
prev := s.idbPrev.Load()
assertIndexDBName(t, prev, prevName)
assertIndexDBName(t, curr, currName)
assertIndexDBName(t, next, nextName)
@@ -4563,7 +4584,7 @@ func TestMustOpenIndexDBTables_ObsoleteDirsAreRemoved(t *testing.T) {
defer s.MustClose()
next := s.idbNext.Load()
curr := s.idbCurr.Load()
prev := curr.extDB
prev := s.idbPrev.Load()
assertIndexDBName(t, prev, prevName)
assertIndexDBName(t, curr, currName)
assertIndexDBName(t, next, nextName)
@@ -4594,7 +4615,7 @@ func TestMustRotateIndexDBs_dirNames(t *testing.T) {
defer s.MustClose()
next := s.idbNext.Load()
curr := s.idbCurr.Load()
prev := curr.extDB
prev := s.idbPrev.Load()
assertIndexDBName(t, prev, prevName)
assertIndexDBName(t, curr, currName)
assertIndexDBName(t, next, nextName)
@@ -4602,7 +4623,7 @@ func TestMustRotateIndexDBs_dirNames(t *testing.T) {
s.mustRotateIndexDB(time.Now())
next = s.idbNext.Load()
curr = s.idbCurr.Load()
prev = curr.extDB
prev = s.idbPrev.Load()
newNextName := next.name
newNextPath := filepath.Join(idbPath, newNextName)
assertPathsDoNotExist(t, prevPath)

View File

@@ -1,6 +1,7 @@
package storage
import (
"container/heap"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@@ -99,3 +100,63 @@ func (t *TSID) Less(b *TSID) bool {
}
return t.MetricID < b.MetricID
}
// mergeSortedTSIDs performs an N-way merge of the given sorted TSID slices
// into a single sorted slice. Duplicates are removed.
func mergeSortedTSIDs(tsidss [][]TSID) []TSID {
	// Seed the heap with the non-empty inputs; track the total length so the
	// result can be allocated once with sufficient capacity.
	var work tsidHeap
	total := 0
	for _, tsids := range tsidss {
		if len(tsids) == 0 {
			continue
		}
		work = append(work, tsids)
		total += len(tsids)
	}
	heap.Init(&work)
	merged := make([]TSID, 0, total)
	for work.Len() > 0 {
		head := work[0]
		next := head[0]
		// Skip the TSID if it equals the most recently emitted one: the heap
		// yields TSIDs in sorted order, so duplicates are always adjacent.
		if len(merged) == 0 || merged[len(merged)-1] != next {
			merged = append(merged, next)
		}
		if len(head) > 1 {
			// Advance the top slice in place and restore the heap invariant.
			work[0] = head[1:]
			heap.Fix(&work, 0)
		} else {
			heap.Pop(&work)
		}
	}
	return merged
}
// tsidHeap is a min-heap of sorted TSID slices, ordered by the first TSID of
// each slice. It implements heap.Interface so it can be used for the N-way
// merge of N sorted TSID slices. See mergeSortedTSIDs().
//
// Slice elements initially must not be empty.
type tsidHeap [][]TSID

func (th tsidHeap) Len() int {
	return len(th)
}

func (th tsidHeap) Less(i, j int) bool {
	return th[i][0].Less(&th[j][0])
}

func (th tsidHeap) Swap(i, j int) {
	th[i], th[j] = th[j], th[i]
}

// Push must never be called: the heap is fully populated before heap.Init.
func (th *tsidHeap) Push(_ any) {
	panic(fmt.Errorf("BUG: Push shouldn't be called"))
}

// Pop removes and returns the last element. It is called by heap.Pop.
func (th *tsidHeap) Pop() any {
	old := *th
	last := old[len(old)-1]
	*th = old[:len(old)-1]
	return last
}

View File

@@ -7,6 +7,8 @@ import (
"sync"
"testing"
"testing/quick"
"github.com/google/go-cmp/cmp"
)
func TestMarshaledTSIDSize(t *testing.T) {
@@ -150,3 +152,91 @@ func testTSIDMarshalUnmarshal(t *testing.T, tsid *TSID) {
t.Fatalf("unexpected tsid unmarshaled from suffixed dst; got\n%+v; want\n%+v", &tsid2, tsid)
}
}
// TestMergeSortedTSIDs verifies that mergeSortedTSIDs merges sorted TSID
// slices into one sorted, duplicate-free slice across edge cases (nil and
// empty inputs) and regular cases (unique values, duplicates, and inputs of
// varying length).
func TestMergeSortedTSIDs(t *testing.T) {
	id := func(metricID uint64) TSID {
		return TSID{MetricID: metricID}
	}

	tests := []struct {
		name   string
		tsidss [][]TSID
		want   []TSID
	}{
		// Note: want is a non-nil empty slice in the edge cases below because
		// mergeSortedTSIDs always allocates its result and cmp.Diff
		// distinguishes nil from empty slices.
		{
			name:   "nil slice",
			tsidss: nil,
			want:   []TSID{},
		},
		{
			name:   "slice of nils",
			tsidss: [][]TSID{nil, nil, nil},
			want:   []TSID{},
		},
		{
			name:   "empty slice",
			tsidss: [][]TSID{},
			want:   []TSID{},
		},
		{
			name:   "slice of empty slices",
			tsidss: [][]TSID{{}, {}, {}},
			want:   []TSID{},
		},
		{
			name: "all unique",
			tsidss: [][]TSID{
				{id(3), id(7), id(11), id(15)},
				{id(1), id(5), id(9), id(13)},
				{id(4), id(8), id(12), id(16)},
				{id(2), id(6), id(10), id(14)},
			},
			want: []TSID{
				id(1), id(2), id(3), id(4),
				id(5), id(6), id(7), id(8),
				id(9), id(10), id(11), id(12),
				id(13), id(14), id(15), id(16),
			},
		},
		{
			name: "with duplicates",
			tsidss: [][]TSID{
				{id(3), id(5), id(7), id(11), id(15)},
				{id(1), id(5), id(8), id(9), id(13)},
				{id(4), id(6), id(8), id(12), id(16)},
				{id(2), id(6), id(7), id(10), id(14)},
			},
			want: []TSID{
				id(1), id(2), id(3), id(4),
				id(5), id(6), id(7), id(8),
				id(9), id(10), id(11), id(12),
				id(13), id(14), id(15), id(16),
			},
		},
		{
			name: "variable length",
			tsidss: [][]TSID{
				{id(3), id(7)},
				{id(1), id(5), id(9), id(13)},
				{},
				{id(4), id(8), id(16)},
				{id(2)},
			},
			want: []TSID{
				id(1), id(2), id(3), id(4), id(5), id(7), id(8), id(9), id(13), id(16),
			},
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			if diff := cmp.Diff(tc.want, mergeSortedTSIDs(tc.tsidss)); diff != "" {
				t.Fatalf("unexpected result (-want, +got):\n%s", diff)
			}
		})
	}
}