lib/storage: use lrucache for tfss cache (#10072)

The purpose of this PR is the same as #10000, except `lrucache` is used
for implementing tfss cache.

---------

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
Artem Fetishev
2025-11-27 14:18:03 +01:00
committed by GitHub
parent 19c0477976
commit 4cd727a511
8 changed files with 181 additions and 248 deletions

View File

@@ -692,6 +692,8 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="storage/regexps"}`, storage.RegexpCacheMisses())
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="storage/regexpPrefixes"}`, storage.RegexpPrefixesCacheMisses())
metrics.WriteCounterUint64(w, `vm_cache_resets_total{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheResets)
metrics.WriteCounterUint64(w, `vm_deleted_metrics_total{type="indexdb"}`, m.DeletedMetricsCount)
metrics.WriteCounterUint64(w, `vm_cache_collisions_total{type="storage/tsid"}`, m.TSIDCacheCollisions)

View File

@@ -17,6 +17,8 @@ import (
//
// Call NewCache() for creating new Cache.
type Cache struct {
resets atomic.Uint64
shards []*cache
cleanerMustStopCh chan struct{}
@@ -60,6 +62,14 @@ func (c *Cache) MustStop() {
<-c.cleanerStoppedCh
}
// Reset resets every shard of c and increments the counter reported by
// Resets().
//
// Resetting a shard drops its entries and zeroes its request, miss and size
// counters.
func (c *Cache) Reset() {
	c.resets.Add(1)
	for i := range c.shards {
		c.shards[i].Reset()
	}
}
// GetEntry returns an Entry for the given key k from c.
func (c *Cache) GetEntry(k string) Entry {
idx := uint64(0)
@@ -109,7 +119,8 @@ func (c *Cache) SizeMaxBytes() uint64 {
return n
}
// Requests returns the number of requests served by c.
// Requests returns the number of requests served by c since cache creation or
// last reset.
func (c *Cache) Requests() uint64 {
n := uint64(0)
for _, shard := range c.shards {
@@ -118,7 +129,8 @@ func (c *Cache) Requests() uint64 {
return n
}
// Misses returns the number of cache misses for c.
// Misses returns the number of cache misses for c since cache creation or last
// reset.
func (c *Cache) Misses() uint64 {
n := uint64(0)
for _, shard := range c.shards {
@@ -127,6 +139,11 @@ func (c *Cache) Misses() uint64 {
return n
}
// Resets returns the current value of the reset counter, which is incremented
// by every Reset() call on c.
func (c *Cache) Resets() uint64 {
	resetsCount := c.resets.Load()
	return resetsCount
}
func (c *Cache) cleaner() {
d := timeutil.AddJitterToDuration(time.Second * 53)
ticker := time.NewTicker(d)
@@ -200,6 +217,17 @@ func newCache(getMaxSizeBytes func() uint64) *cache {
return &c
}
// Reset empties the shard and zeroes its requests, misses and sizeBytes
// counters.
//
// The whole operation is performed while holding c.mu.
func (c *cache) Reset() {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Start over with a fresh entries map and without c.lah.
	c.m = map[string]*cacheEntry{}
	c.lah = nil

	// Stats are reported since the cache creation or the last reset.
	c.requests.Store(0)
	c.misses.Store(0)
	c.sizeBytes.Store(0)
}
// updateSizeBytes adds n to the shard's sizeBytes counter.
func (c *cache) updateSizeBytes(n uint64) {
c.sizeBytes.Add(n)
}

View File

@@ -26,6 +26,9 @@ func TestCache(t *testing.T) {
if n := c.SizeMaxBytes(); n != sizeMaxBytes {
t.Fatalf("unexpected SizeMaxBytes(); got %d; want %d", n, sizeMaxBytes)
}
if n := c.Resets(); n != 0 {
t.Fatalf("unexpected Resets(); got %d; want %d", n, 0)
}
k := "foobar"
var e testEntry
entrySize := e.SizeBytes()
@@ -43,6 +46,9 @@ func TestCache(t *testing.T) {
if n := c.Misses(); n != 0 {
t.Fatalf("unexpected number of misses; got %d; want %d", n, 0)
}
if n := c.Resets(); n != 0 {
t.Fatalf("unexpected Resets(); got %d; want %d", n, 0)
}
// Obtain this entry from the cache
if e1 := c.GetEntry(k); e1 != &e {
t.Fatalf("unexpected entry obtained; got %v; want %v", e1, &e)
@@ -53,6 +59,9 @@ func TestCache(t *testing.T) {
if n := c.Misses(); n != 0 {
t.Fatalf("unexpected number of misses; got %d; want %d", n, 0)
}
if n := c.Resets(); n != 0 {
t.Fatalf("unexpected Resets(); got %d; want %d", n, 0)
}
// Obtain non-existing entry from the cache
if e1 := c.GetEntry("non-existing-key"); e1 != nil {
t.Fatalf("unexpected non-nil block obtained for non-existing key: %v", e1)
@@ -63,6 +72,9 @@ func TestCache(t *testing.T) {
if n := c.Misses(); n != 1 {
t.Fatalf("unexpected number of misses; got %d; want %d", n, 1)
}
if n := c.Resets(); n != 0 {
t.Fatalf("unexpected Resets(); got %d; want %d", n, 0)
}
// Store the entry again.
c.PutEntry(k, &e)
if n := c.SizeBytes(); n != entrySize {
@@ -77,12 +89,33 @@ func TestCache(t *testing.T) {
if n := c.Misses(); n != 1 {
t.Fatalf("unexpected number of misses; got %d; want %d", n, 1)
}
if n := c.Resets(); n != 0 {
t.Fatalf("unexpected Resets(); got %d; want %d", n, 0)
}
// Manually clean the cache. The entry shouldn't be deleted because it was recently accessed.
c.cleanByTimeout()
if n := c.SizeBytes(); n != entrySize {
t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, entrySize)
}
// Reset cache.
c.Reset()
if n := c.SizeMaxBytes(); n != sizeMaxBytes {
t.Fatalf("unexpected SizeMaxBytes(); got %d; want %d", n, sizeMaxBytes)
}
if n := c.SizeBytes(); n != 0 {
t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, 0)
}
if n := c.Requests(); n != 0 {
t.Fatalf("unexpected number of requests; got %d; want %d", n, 0)
}
if n := c.Misses(); n != 0 {
t.Fatalf("unexpected number of misses; got %d; want %d", n, 0)
}
if n := c.Resets(); n != 1 {
t.Fatalf("unexpected Resets(); got %d; want %d", n, 1)
}
}
func TestCacheConcurrentAccess(_ *testing.T) {

View File

@@ -16,7 +16,6 @@ import (
"time"
"unsafe"
"github.com/VictoriaMetrics/fastcache"
"github.com/VictoriaMetrics/metricsql"
"github.com/cespare/xxhash/v2"
@@ -26,6 +25,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/lrucache"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
@@ -127,7 +127,7 @@ type indexDB struct {
noRegisterNewSeries atomic.Bool
// Cache for fast TagFilters -> MetricIDs lookup.
tagFiltersToMetricIDsCache *workingsetcache.Cache
tagFiltersToMetricIDsCache *lrucache.Cache
// The parent storage.
s *Storage
@@ -144,16 +144,16 @@ type indexDB struct {
indexSearchPool sync.Pool
}
var maxTagFiltersCacheSize int
var maxTagFiltersCacheSize uint64
// SetTagFiltersCacheSize overrides the default size of tagFiltersToMetricIDsCache.
//
// A non-positive size selects the default size (see getTagFiltersCacheSize()).
// Negative values are clamped to zero before the conversion to uint64, since
// otherwise they would wrap around to a huge cache size instead of selecting
// the default.
func SetTagFiltersCacheSize(size int) {
	maxTagFiltersCacheSize = uint64(max(size, 0))
}
func getTagFiltersCacheSize() int {
func getTagFiltersCacheSize() uint64 {
if maxTagFiltersCacheSize <= 0 {
return int(float64(memory.Allowed()) / 32)
return uint64(float64(memory.Allowed()) / 32)
}
return maxTagFiltersCacheSize
}
@@ -173,21 +173,17 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *atomic.Bool, noRegiste
logger.Panicf("FATAL: cannot parse indexdb path %q: %s", path, err)
}
tb := mergeset.MustOpenTable(path, dataFlushInterval, invalidateTagFiltersCache, mergeTagToMetricIDsRows, isReadOnly)
// Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile because of tagFiltersKeyGen.
mem := memory.Allowed()
tagFiltersCacheSize := getTagFiltersCacheSize()
tfssCache := lrucache.NewCache(getTagFiltersCacheSize)
tb := mergeset.MustOpenTable(path, dataFlushInterval, tfssCache.Reset, mergeTagToMetricIDsRows, isReadOnly)
db := &indexDB{
generation: gen,
tb: tb,
name: name,
minMissingTimestampByKey: make(map[string]int64),
tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize),
tagFiltersToMetricIDsCache: tfssCache,
s: s,
loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),
loopsPerDateTagFilterCache: workingsetcache.New(memory.Allowed() / 128),
dateMetricIDCache: newDateMetricIDCache(),
}
db.noRegisterNewSeries.Store(noRegisterNewSeries)
@@ -204,6 +200,7 @@ type IndexDBMetrics struct {
TagFiltersToMetricIDsCacheSizeMaxBytes uint64
TagFiltersToMetricIDsCacheRequests uint64
TagFiltersToMetricIDsCacheMisses uint64
TagFiltersToMetricIDsCacheResets uint64
DateMetricIDCacheSize uint64
DateMetricIDCacheSizeBytes uint64
@@ -246,15 +243,12 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.CompositeFilterSuccessConversions = compositeFilterSuccessConversions.Load()
m.CompositeFilterMissingConversions = compositeFilterMissingConversions.Load()
var cs fastcache.Stats
cs.Reset()
db.tagFiltersToMetricIDsCache.UpdateStats(&cs)
m.TagFiltersToMetricIDsCacheSize += cs.EntriesCount
m.TagFiltersToMetricIDsCacheSizeBytes += cs.BytesSize
m.TagFiltersToMetricIDsCacheSizeMaxBytes += cs.MaxBytesSize
m.TagFiltersToMetricIDsCacheRequests += cs.GetCalls
m.TagFiltersToMetricIDsCacheMisses += cs.Misses
m.TagFiltersToMetricIDsCacheSize += uint64(db.tagFiltersToMetricIDsCache.Len())
m.TagFiltersToMetricIDsCacheSizeBytes += uint64(db.tagFiltersToMetricIDsCache.SizeBytes())
m.TagFiltersToMetricIDsCacheSizeMaxBytes += uint64(db.tagFiltersToMetricIDsCache.SizeMaxBytes())
m.TagFiltersToMetricIDsCacheRequests += db.tagFiltersToMetricIDsCache.Requests()
m.TagFiltersToMetricIDsCacheMisses += db.tagFiltersToMetricIDsCache.Misses()
m.TagFiltersToMetricIDsCacheResets += db.tagFiltersToMetricIDsCache.Resets()
m.DateMetricIDCacheSize += uint64(db.dateMetricIDCache.EntriesCount())
m.DateMetricIDCacheSizeBytes += uint64(db.dateMetricIDCache.SizeBytes())
@@ -296,7 +290,7 @@ func (db *indexDB) decRef() {
db.tb = nil
// Free space occupied by caches owned by db.
db.tagFiltersToMetricIDsCache.Stop()
db.tagFiltersToMetricIDsCache.MustStop()
db.loopsPerDateTagFilterCache.Stop()
db.tagFiltersToMetricIDsCache = nil
@@ -312,33 +306,36 @@ func (db *indexDB) decRef() {
logger.Infof("indexDB %q has been dropped", tbPath)
}
var tagBufPool bytesutil.ByteBufferPool
func (db *indexDB) getMetricIDsFromTagFiltersCache(qt *querytracer.Tracer, key []byte) ([]uint64, bool) {
qt = qt.NewChild("search for metricIDs in tag filters cache")
defer qt.Done()
buf := tagBufPool.Get()
defer tagBufPool.Put(buf)
buf.B = db.tagFiltersToMetricIDsCache.GetBig(buf.B[:0], key)
if len(buf.B) == 0 {
// getMetricIDsFromTagFiltersCache retrieves the set of metricIDs that
// correspond to the given (tfss, tr) key.
//
// The caller must convert the (tfss, tr) to a byte slice and use it as the key
// when calling this method (see marshalTagFiltersKey()).
//
// The caller must not modify the set of metricIDs returned by this method.
func (db *indexDB) getMetricIDsFromTagFiltersCache(qt *querytracer.Tracer, key []byte) (*uint64set.Set, bool) {
qt.Printf("search for metricIDs in tag filters cache")
v := db.tagFiltersToMetricIDsCache.GetEntry(bytesutil.ToUnsafeString(key))
if v == nil {
qt.Printf("cache miss")
return nil, false
}
qt.Printf("found metricIDs with size: %d bytes", len(buf.B))
metricIDs := mustUnmarshalMetricIDs(nil, buf.B)
qt.Printf("unmarshaled %d metricIDs", len(metricIDs))
metricIDs := v.(*uint64set.Set)
qt.Printf("found %d metricIDs in cache", metricIDs.Len())
return metricIDs, true
}
func (db *indexDB) putMetricIDsToTagFiltersCache(qt *querytracer.Tracer, metricIDs []uint64, key []byte) {
qt = qt.NewChild("put %d metricIDs in cache", len(metricIDs))
defer qt.Done()
buf := tagBufPool.Get()
buf.B = marshalMetricIDs(buf.B, metricIDs)
qt.Printf("marshaled %d metricIDs into %d bytes", len(metricIDs), len(buf.B))
db.tagFiltersToMetricIDsCache.SetBig(key, buf.B)
qt.Printf("stored %d metricIDs into cache", len(metricIDs))
tagBufPool.Put(buf)
// putMetricIDsToTagFiltersCache adds the set of metricIDs to the tag filters
// cache under the given (tfss, tr) key.
//
// The key must be the byte representation of (tfss, tr), see
// marshalTagFiltersKey(). The key is converted to a string before it is passed
// to the cache.
//
// The metricIDs pointer is handed to the cache as is, so the caller must not
// modify the set after calling this method.
func (db *indexDB) putMetricIDsToTagFiltersCache(qt *querytracer.Tracer, metricIDs *uint64set.Set, key []byte) {
	n := metricIDs.Len()
	qt.Printf("put %d metricIDs in cache", n)

	cacheKey := string(key)
	db.tagFiltersToMetricIDsCache.PutEntry(cacheKey, metricIDs)

	qt.Printf("stored %d metricIDs into cache", n)
}
func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error {
@@ -363,16 +360,9 @@ func (db *indexDB) putToMetricIDCache(metricID uint64, tsid *TSID) {
db.s.metricIDCache.Set(key[:], buf[:])
}
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
// There is no need in versioning the tagFilters key, since the tagFiltersToMetricIDsCache
// isn't persisted to disk (it is very volatile because of tagFiltersKeyGen).
prefix := ^uint64(0)
if versioned {
prefix = tagFiltersKeyGen.Load()
}
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange) []byte {
// Round start and end times to per-day granularity according to per-day inverted index.
startDate, endDate := tr.DateRange()
dst = encoding.MarshalUint64(dst, prefix)
dst = encoding.MarshalUint64(dst, startDate)
dst = encoding.MarshalUint64(dst, endDate)
for _, tfs := range tfss {
@@ -384,64 +374,6 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
return dst
}
// invalidateTagFiltersCache increments tagFiltersKeyGen.
//
// NOTE(review): marshalTagFiltersKey() uses tagFiltersKeyGen as the prefix of
// versioned keys, so presumably the increment makes the keys of previously
// cached entries unreachable - confirm.
func invalidateTagFiltersCache() {
// This function must be fast, since it is called each time new timeseries is added.
tagFiltersKeyGen.Add(1)
}
var tagFiltersKeyGen atomicutil.Uint64
// marshalMetricIDs appends the marshaled metricIDs to dst and returns the
// result.
//
// Non-empty metricIDs are compressed with zstd at level 1, so they occupy less
// space in the cache. Empty metricIDs are marshaled as a single zero byte.
func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte {
	if n := len(metricIDs); n > 0 {
		// View the metricIDs memory as a []byte (8 bytes per metricID)
		// and compress it.
		raw := unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(metricIDs))), 8*n)
		return encoding.CompressZSTDLevel(dst, raw, 1)
	}

	// Add one zero byte to indicate an empty metricID list and skip
	// compression to save CPU cycles.
	//
	// An empty slice passed to zstd won't be compressed and therefore nothing
	// will be added to dst, and if dst is empty the record won't be added to
	// the cache. As a result, the search for a given filter will be performed
	// again and again. This may lead to cases like this:
	// https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7009
	return append(dst, 0)
}
// mustUnmarshalMetricIDs unmarshals metricIDs from src, which must have been
// produced by marshalMetricIDs(), and returns dst extended with them.
//
// It calls logger.Panicf if src cannot be decompressed or if the length of the
// decompressed data isn't a multiple of 8 bytes.
func mustUnmarshalMetricIDs(dst []uint64, src []byte) []uint64 {
if len(src) == 1 && src[0] == 0 {
// One zero byte indicates an empty metricID list.
// See marshalMetricIDs().
return dst
}
// Decompress src into dstBuf.
//
// dstBuf is a []byte cast of dst.
// It is created over the full capacity of dst and then truncated to
// the bytes occupied by the existing len(dst) items.
dstBuf := unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(dst))), 8*cap(dst))
dstBuf = dstBuf[:8*len(dst)]
// Remember the initial length, so the number of decompressed bytes can be validated below.
dstBufLen := len(dstBuf)
var err error
dstBuf, err = encoding.DecompressZSTD(dstBuf, src)
if err != nil {
logger.Panicf("FATAL: cannot decompress metricIDs: %s", err)
}
// Every metricID occupies 8 bytes, so the decompressed part must consist of whole 8-byte items.
if (len(dstBuf)-dstBufLen)%8 != 0 {
logger.Panicf("FATAL: cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(dstBuf)-dstBufLen)
}
// Convert dstBuf back to dst
dst = unsafe.Slice((*uint64)(unsafe.Pointer(unsafe.SliceData(dstBuf))), cap(dstBuf)/8)
dst = dst[:len(dstBuf)/8]
return dst
}
type indexSearch struct {
db *indexDB
ts mergeset.TableSearch
@@ -1617,7 +1549,7 @@ func (db *indexDB) saveDeletedMetricIDs(metricIDs *uint64set.Set) {
db.s.updateDeletedMetricIDs(metricIDs)
// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
invalidateTagFiltersCache()
db.tagFiltersToMetricIDsCache.Reset()
// Reset MetricName -> TSID cache, since it may contain deleted TSIDs.
db.s.resetAndSaveTSIDCache()
@@ -1676,9 +1608,7 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
}
// searchMetricIDs returns metricIDs for the given tfss and tr.
//
// The returned metricIDs are sorted.
func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]uint64, error) {
func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) (*uint64set.Set, error) {
qt = qt.NewChild("search for matching metricIDs: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done()
@@ -1689,11 +1619,11 @@ func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, t
tfKeyBuf := tagFiltersKeyBufPool.Get()
defer tagFiltersKeyBufPool.Put(tfKeyBuf)
tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr, true)
tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr)
metricIDs, ok := db.getMetricIDsFromTagFiltersCache(qt, tfKeyBuf.B)
if ok {
// Fast path - metricIDs found in the cache
if len(metricIDs) > maxMetrics {
if metricIDs.Len() > maxMetrics {
return nil, errTooManyTimeseries(maxMetrics)
}
return metricIDs, nil
@@ -1701,12 +1631,11 @@ func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, t
// Slow path - search for metricIDs in the db
is := db.getIndexSearch(deadline)
metricIDsSet, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics)
metricIDs, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics)
db.putIndexSearch(is)
if err != nil {
return nil, fmt.Errorf("error when searching for metricIDs: %w", err)
}
metricIDs = metricIDsSet.AppendTo(nil)
// Store metricIDs in the cache.
db.putMetricIDsToTagFiltersCache(qt, metricIDs, tfKeyBuf.B)
@@ -1729,56 +1658,61 @@ func (db *indexDB) SearchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti
if err != nil {
return nil, err
}
if len(metricIDs) == 0 {
if metricIDs.Len() == 0 {
return nil, nil
}
tsids := make([]TSID, len(metricIDs))
tsids := make([]TSID, metricIDs.Len())
metricIDsToDelete := &uint64set.Set{}
i := 0
err = func() error {
is := db.getIndexSearch(deadline)
defer db.putIndexSearch(is)
for loopsPaceLimiter, metricID := range metricIDs {
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
paceLimiter := 0
is := db.getIndexSearch(deadline)
defer db.putIndexSearch(is)
metricIDs.ForEach(func(metricIDs []uint64) bool {
for _, metricID := range metricIDs {
if paceLimiter&paceLimiterSlowIterationsMask == 0 {
if err = checkSearchDeadlineAndPace(is.deadline); err != nil {
return false
}
}
paceLimiter++
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
// than scanning the mergeset if it contains a lot of metricIDs.
tsid := &tsids[i]
err := is.db.getFromMetricIDCache(tsid, metricID)
err = db.getFromMetricIDCache(tsid, metricID)
if err == nil {
// Fast path - the tsid for metricID is found in cache.
i++
continue
}
if err != io.EOF {
return err
return false
}
err = nil
if !is.getTSIDByMetricID(tsid, metricID) {
// Cannot find TSID for the given metricID.
// This may be the case on incomplete indexDB
// due to snapshot or due to un-flushed entries.
// Mark the metricID as deleted, so it is created again when new sample
// for the given time series is ingested next time.
if is.db.s.wasMetricIDMissingBefore(metricID) {
is.db.missingTSIDsForMetricID.Add(1)
if db.s.wasMetricIDMissingBefore(metricID) {
db.missingTSIDsForMetricID.Add(1)
metricIDsToDelete.Add(metricID)
}
continue
}
is.db.putToMetricIDCache(metricID, tsid)
db.putToMetricIDCache(metricID, tsid)
i++
}
return nil
}()
return true
})
if err != nil {
return nil, fmt.Errorf("error when searching for TSIDs by metricIDs: %w", err)
}
tsids = tsids[:i]
qt.Printf("load %d TSIDs for %d metricIDs", len(tsids), len(metricIDs))
qt.Printf("found %d TSIDs for %d metricIDs", len(tsids), metricIDs.Len())
// Sort the found tsids, since they must be passed to TSID search
// in the sorted order.
@@ -1807,37 +1741,45 @@ func (db *indexDB) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters,
if err != nil {
return nil, err
}
if len(metricIDs) == 0 {
if metricIDs.Len() == 0 {
return nil, nil
}
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
metricNames := make([]string, 0, len(metricIDs))
metricNames := make([]string, 0, metricIDs.Len())
metricIDsToDelete := &uint64set.Set{}
var metricName []byte
var ok bool
for i, metricID := range metricIDs {
if i&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(deadline); err != nil {
return nil, err
paceLimiter := 0
is := db.getIndexSearch(deadline)
defer db.putIndexSearch(is)
metricIDs.ForEach(func(metricIDs []uint64) bool {
for _, metricID := range metricIDs {
if paceLimiter&paceLimiterSlowIterationsMask == 0 {
if err = checkSearchDeadlineAndPace(deadline); err != nil {
return false
}
}
}
paceLimiter++
metricName, ok = is.searchMetricNameWithCache(metricName[:0], metricID)
if !ok {
// Cannot find metricName for the given metricID.
// This may be the case on incomplete indexDB
// due to snapshot or due to un-flushed entries.
// Mark the metricID as deleted, so it is created again when new sample
// for the given time series is ingested next time.
if db.s.wasMetricIDMissingBefore(metricID) {
db.missingMetricNamesForMetricID.Add(1)
metricIDsToDelete.Add(metricID)
metricName, ok = is.searchMetricNameWithCache(metricName[:0], metricID)
if !ok {
// Cannot find metricName for the given metricID.
// This may be the case on incomplete indexDB
// due to snapshot or due to un-flushed entries.
// Mark the metricID as deleted, so it is created again when new sample
// for the given time series is ingested next time.
if db.s.wasMetricIDMissingBefore(metricID) {
db.missingMetricNamesForMetricID.Add(1)
metricIDsToDelete.Add(metricID)
}
continue
}
continue
metricNames = append(metricNames, string(metricName))
}
metricNames = append(metricNames, string(metricName))
return true
})
if err != nil {
return nil, err
}
if metricIDsToDelete.Len() > 0 {

View File

@@ -23,49 +23,6 @@ import (
"github.com/VictoriaMetrics/fastcache"
)
// TestMarshalUnmarshalMetricIDs verifies that metricIDs survive a
// marshalMetricIDs() -> mustUnmarshalMetricIDs() round trip, both with nil
// destinations and with destinations that already contain data.
func TestMarshalUnmarshalMetricIDs(t *testing.T) {
	check := func(metricIDs []uint64) {
		t.Helper()

		// Round trip via nil destinations.
		marshaled := marshalMetricIDs(nil, metricIDs)
		got := mustUnmarshalMetricIDs(nil, marshaled)
		if !reflect.DeepEqual(got, metricIDs) {
			t.Fatalf("unexpected metricIDs after unmarshaling;\ngot\n%d\nwant\n%d", got, metricIDs)
		}

		// Marshal into a destination with a prefix; the prefix must be preserved.
		dataPrefix := []byte("prefix")
		marshaled = marshalMetricIDs(dataPrefix, metricIDs)
		if len(marshaled) < len(dataPrefix) {
			t.Fatalf("too short len(data)=%d; must be at least len(dataPrefix)=%d", len(marshaled), len(dataPrefix))
		}
		if gotPrefix := marshaled[:len(dataPrefix)]; string(gotPrefix) != string(dataPrefix) {
			t.Fatalf("unexpected prefix; got %q; want %q", gotPrefix, dataPrefix)
		}
		payload := marshaled[len(dataPrefix):]

		// Unmarshal into a destination with a prefix; the prefix must be preserved.
		resultPrefix := []uint64{889432422, 89243, 9823}
		got = mustUnmarshalMetricIDs(resultPrefix, payload)
		if len(got) < len(resultPrefix) {
			t.Fatalf("too short result returned; len(result)=%d; must be at least len(resultPrefix)=%d", len(got), len(resultPrefix))
		}
		if gotPrefix := got[:len(resultPrefix)]; !reflect.DeepEqual(gotPrefix, resultPrefix) {
			t.Fatalf("unexpected result prefix; got %d; want %d", gotPrefix, resultPrefix)
		}
		got = got[len(resultPrefix):]
		if len(metricIDs) == 0 && len(got) == 0 {
			// Both lists are empty; skip DeepEqual, which distinguishes
			// nil from empty non-nil slices.
			return
		}
		if !reflect.DeepEqual(got, metricIDs) {
			t.Fatalf("unexpected metricIDs after unmarshaling from prefix;\ngot\n%d\nwant\n%d", got, metricIDs)
		}
	}

	check(nil)
	check([]uint64{0})
	check([]uint64{1})
	check([]uint64{1234, 678932943, 843289893843})
	check([]uint64{1, 2, 3, 4, 5, 6, 8989898, 823849234, 1<<64 - 1, 1<<32 - 1, 0})
}
func TestTagFiltersToMetricIDsCache(t *testing.T) {
f := func(want []uint64) {
t.Helper()
@@ -80,11 +37,15 @@ func TestTagFiltersToMetricIDsCache(t *testing.T) {
defer s.putIndexDBs(idbPrev, idbCurr, idbNext)
key := []byte("key")
idbCurr.putMetricIDsToTagFiltersCache(nil, want, key)
got, ok := idbCurr.getMetricIDsFromTagFiltersCache(nil, key)
wantSet := &uint64set.Set{}
wantSet.AddMulti(want)
idbCurr.putMetricIDsToTagFiltersCache(nil, wantSet, key)
gotSet, ok := idbCurr.getMetricIDsFromTagFiltersCache(nil, key)
if !ok {
t.Fatalf("expected metricIDs to be found in cache but they weren't: %v", want)
}
got := gotSet.AppendTo(nil)
slices.Sort(want)
if !reflect.DeepEqual(got, want) {
t.Fatalf("unexpected metricIDs in cache: got %v, want %v", got, want)
}
@@ -105,14 +66,13 @@ func TestTagFiltersToMetricIDsCache_EmptyMetricIDList(t *testing.T) {
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
key := []byte("key")
emptyMetricIDs := []uint64(nil)
idbCurr.putMetricIDsToTagFiltersCache(nil, emptyMetricIDs, key)
idbCurr.putMetricIDsToTagFiltersCache(nil, nil, key)
got, ok := idbCurr.getMetricIDsFromTagFiltersCache(nil, key)
if !ok {
t.Fatalf("expected empty metricID list to be found in cache but it wasn't")
}
if len(got) > 0 {
t.Fatalf("unexpected found metricID list to be empty but got %v", got)
if got.Len() > 0 {
t.Fatalf("unexpected found metricID list to be empty but got %v", got.AppendTo(nil))
}
}

View File

@@ -2,7 +2,6 @@ package storage
import (
"fmt"
"math/rand"
"regexp"
"strconv"
"testing"
@@ -313,33 +312,3 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
s.MustClose()
fs.MustRemoveDir(path)
}
func BenchmarkMarshalUnmarshalMetricIDs(b *testing.B) {
rng := rand.New(rand.NewSource(1))
f := func(b *testing.B, numMetricIDs int) {
metricIDs := make([]uint64, numMetricIDs)
// metric IDs need to be sorted.
ts := uint64(time.Now().UnixNano())
for i := range numMetricIDs {
metricIDs[i] = ts + uint64(rng.Intn(100))
}
var marshalledLen int
b.ResetTimer()
for range b.N {
marshalled := marshalMetricIDs(nil, metricIDs)
marshalledLen = len(marshalled)
_ = mustUnmarshalMetricIDs(nil, marshalled)
}
b.StopTimer()
compressionRate := float64(numMetricIDs*8) / float64(marshalledLen)
b.ReportMetric(compressionRate, "compression-rate")
}
for _, n := range []int{0, 1, 10, 100, 1e3, 1e4, 1e5, 1e6, 1e7} {
b.Run(fmt.Sprintf("numMetricIDs-%d", n), func(b *testing.B) {
f(b, n)
})
}
}

View File

@@ -59,7 +59,7 @@ func TestStorageSearchTSIDs_CorruptedIndex(t *testing.T) {
if err != nil {
panic(fmt.Sprintf("searchMetricIDs() failed unexpectedly: %v", err))
}
return metricIDs
return metricIDs.AppendTo(nil)
}
searchTSIDs := func() []TSID {
tsids, err := s.SearchTSIDs(nil, tfssAll, tr, 1e9, noDeadline)
@@ -160,7 +160,7 @@ func TestStorageSearchMetricNames_CorruptedIndex(t *testing.T) {
if err != nil {
panic(fmt.Sprintf("searchMetricIDs() failed unexpectedly: %v", err))
}
return metricIDs
return metricIDs.AppendTo(nil)
}
searchMetricNames := func() []string {
metricNames, err := s.SearchMetricNames(nil, tfssAll, tr, 1e9, noDeadline)

View File

@@ -1017,7 +1017,7 @@ func TestStorageDeleteSeries_CachesAreUpdatedOrReset(t *testing.T) {
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
tfssKey := marshalTagFiltersKey(nil, tfss, tr, true)
tfssKey := marshalTagFiltersKey(nil, tfss, tr)
_, got := idbCurr.getMetricIDsFromTagFiltersCache(nil, tfssKey)
if got != want {
t.Errorf("unexpected tag filters in cache %v %v: got %t, want %t", tfss, &tr, got, want)
@@ -3831,22 +3831,21 @@ func TestStorageAddRows_currHourMetricIDs(t *testing.T) {
// The function is not a part of Storage because it is currently used in unit
// tests only.
func testSearchMetricIDs(s *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) []uint64 {
search := func(qt *querytracer.Tracer, idb *indexDB, tr TimeRange) ([]uint64, error) {
search := func(qt *querytracer.Tracer, idb *indexDB, tr TimeRange) (*uint64set.Set, error) {
return idb.searchMetricIDs(qt, tfss, tr, maxMetrics, deadline)
}
merge := func(data [][]uint64) []uint64 {
s := &uint64set.Set{}
merge := func(data []*uint64set.Set) *uint64set.Set {
all := &uint64set.Set{}
for _, d := range data {
s.AddMulti(d)
all.Union(d)
}
all := s.AppendTo(nil)
return all
}
metricIDs, err := searchAndMerge(nil, s, tr, search, merge)
if err != nil {
panic(fmt.Sprintf("searching metricIDs failed unexpectedly: %s", err))
}
return metricIDs
return metricIDs.AppendTo(nil)
}
// testCountAllMetricIDs is a test helper function that counts the IDs of