mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-18 09:16:35 +03:00
lib/storage: use lrucache for tfss cache (#10072)
The purpose of this PR is the same as #10000, except `lrucache` is used for implementing tfss cache. --------- Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
@@ -692,6 +692,8 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
|
||||
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="storage/regexps"}`, storage.RegexpCacheMisses())
|
||||
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="storage/regexpPrefixes"}`, storage.RegexpPrefixesCacheMisses())
|
||||
|
||||
metrics.WriteCounterUint64(w, `vm_cache_resets_total{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheResets)
|
||||
|
||||
metrics.WriteCounterUint64(w, `vm_deleted_metrics_total{type="indexdb"}`, m.DeletedMetricsCount)
|
||||
|
||||
metrics.WriteCounterUint64(w, `vm_cache_collisions_total{type="storage/tsid"}`, m.TSIDCacheCollisions)
|
||||
|
||||
@@ -17,6 +17,8 @@ import (
|
||||
//
|
||||
// Call NewCache() for creating new Cache.
|
||||
type Cache struct {
|
||||
resets atomic.Uint64
|
||||
|
||||
shards []*cache
|
||||
|
||||
cleanerMustStopCh chan struct{}
|
||||
@@ -60,6 +62,14 @@ func (c *Cache) MustStop() {
|
||||
<-c.cleanerStoppedCh
|
||||
}
|
||||
|
||||
// Reset resets the cache.
|
||||
func (c *Cache) Reset() {
|
||||
c.resets.Add(1)
|
||||
for _, shard := range c.shards {
|
||||
shard.Reset()
|
||||
}
|
||||
}
|
||||
|
||||
// GetEntry returns an Entry for the given key k from c.
|
||||
func (c *Cache) GetEntry(k string) Entry {
|
||||
idx := uint64(0)
|
||||
@@ -109,7 +119,8 @@ func (c *Cache) SizeMaxBytes() uint64 {
|
||||
return n
|
||||
}
|
||||
|
||||
// Requests returns the number of requests served by c.
|
||||
// Requests returns the number of requests served by c since cache creation or
|
||||
// last reset.
|
||||
func (c *Cache) Requests() uint64 {
|
||||
n := uint64(0)
|
||||
for _, shard := range c.shards {
|
||||
@@ -118,7 +129,8 @@ func (c *Cache) Requests() uint64 {
|
||||
return n
|
||||
}
|
||||
|
||||
// Misses returns the number of cache misses for c.
|
||||
// Misses returns the number of cache misses for c since cache creation or last
|
||||
// reset.
|
||||
func (c *Cache) Misses() uint64 {
|
||||
n := uint64(0)
|
||||
for _, shard := range c.shards {
|
||||
@@ -127,6 +139,11 @@ func (c *Cache) Misses() uint64 {
|
||||
return n
|
||||
}
|
||||
|
||||
// Resets returns the number of cache resets since its creation.
|
||||
func (c *Cache) Resets() uint64 {
|
||||
return c.resets.Load()
|
||||
}
|
||||
|
||||
func (c *Cache) cleaner() {
|
||||
d := timeutil.AddJitterToDuration(time.Second * 53)
|
||||
ticker := time.NewTicker(d)
|
||||
@@ -200,6 +217,17 @@ func newCache(getMaxSizeBytes func() uint64) *cache {
|
||||
return &c
|
||||
}
|
||||
|
||||
func (c *cache) Reset() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.m = make(map[string]*cacheEntry)
|
||||
c.lah = nil
|
||||
c.requests.Store(0)
|
||||
c.misses.Store(0)
|
||||
c.sizeBytes.Store(0)
|
||||
}
|
||||
|
||||
func (c *cache) updateSizeBytes(n uint64) {
|
||||
c.sizeBytes.Add(n)
|
||||
}
|
||||
|
||||
@@ -26,6 +26,9 @@ func TestCache(t *testing.T) {
|
||||
if n := c.SizeMaxBytes(); n != sizeMaxBytes {
|
||||
t.Fatalf("unexpected SizeMaxBytes(); got %d; want %d", n, sizeMaxBytes)
|
||||
}
|
||||
if n := c.Resets(); n != 0 {
|
||||
t.Fatalf("unexpected Resets(); got %d; want %d", n, 0)
|
||||
}
|
||||
k := "foobar"
|
||||
var e testEntry
|
||||
entrySize := e.SizeBytes()
|
||||
@@ -43,6 +46,9 @@ func TestCache(t *testing.T) {
|
||||
if n := c.Misses(); n != 0 {
|
||||
t.Fatalf("unexpected number of misses; got %d; want %d", n, 0)
|
||||
}
|
||||
if n := c.Resets(); n != 0 {
|
||||
t.Fatalf("unexpected Resets(); got %d; want %d", n, 0)
|
||||
}
|
||||
// Obtain this entry from the cache
|
||||
if e1 := c.GetEntry(k); e1 != &e {
|
||||
t.Fatalf("unexpected entry obtained; got %v; want %v", e1, &e)
|
||||
@@ -53,6 +59,9 @@ func TestCache(t *testing.T) {
|
||||
if n := c.Misses(); n != 0 {
|
||||
t.Fatalf("unexpected number of misses; got %d; want %d", n, 0)
|
||||
}
|
||||
if n := c.Resets(); n != 0 {
|
||||
t.Fatalf("unexpected Resets(); got %d; want %d", n, 0)
|
||||
}
|
||||
// Obtain non-existing entry from the cache
|
||||
if e1 := c.GetEntry("non-existing-key"); e1 != nil {
|
||||
t.Fatalf("unexpected non-nil block obtained for non-existing key: %v", e1)
|
||||
@@ -63,6 +72,9 @@ func TestCache(t *testing.T) {
|
||||
if n := c.Misses(); n != 1 {
|
||||
t.Fatalf("unexpected number of misses; got %d; want %d", n, 1)
|
||||
}
|
||||
if n := c.Resets(); n != 0 {
|
||||
t.Fatalf("unexpected Resets(); got %d; want %d", n, 0)
|
||||
}
|
||||
// Store the entry again.
|
||||
c.PutEntry(k, &e)
|
||||
if n := c.SizeBytes(); n != entrySize {
|
||||
@@ -77,12 +89,33 @@ func TestCache(t *testing.T) {
|
||||
if n := c.Misses(); n != 1 {
|
||||
t.Fatalf("unexpected number of misses; got %d; want %d", n, 1)
|
||||
}
|
||||
if n := c.Resets(); n != 0 {
|
||||
t.Fatalf("unexpected Resets(); got %d; want %d", n, 0)
|
||||
}
|
||||
|
||||
// Manually clean the cache. The entry shouldn't be deleted because it was recently accessed.
|
||||
c.cleanByTimeout()
|
||||
if n := c.SizeBytes(); n != entrySize {
|
||||
t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, entrySize)
|
||||
}
|
||||
|
||||
// Reset cache.
|
||||
c.Reset()
|
||||
if n := c.SizeMaxBytes(); n != sizeMaxBytes {
|
||||
t.Fatalf("unexpected SizeMaxBytes(); got %d; want %d", n, sizeMaxBytes)
|
||||
}
|
||||
if n := c.SizeBytes(); n != 0 {
|
||||
t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, 0)
|
||||
}
|
||||
if n := c.Requests(); n != 0 {
|
||||
t.Fatalf("unexpected number of requests; got %d; want %d", n, 0)
|
||||
}
|
||||
if n := c.Misses(); n != 0 {
|
||||
t.Fatalf("unexpected number of misses; got %d; want %d", n, 0)
|
||||
}
|
||||
if n := c.Resets(); n != 1 {
|
||||
t.Fatalf("unexpected Resets(); got %d; want %d", n, 1)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCacheConcurrentAccess(_ *testing.T) {
|
||||
|
||||
@@ -16,7 +16,6 @@ import (
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/fastcache"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
@@ -26,6 +25,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/lrucache"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
@@ -127,7 +127,7 @@ type indexDB struct {
|
||||
noRegisterNewSeries atomic.Bool
|
||||
|
||||
// Cache for fast TagFilters -> MetricIDs lookup.
|
||||
tagFiltersToMetricIDsCache *workingsetcache.Cache
|
||||
tagFiltersToMetricIDsCache *lrucache.Cache
|
||||
|
||||
// The parent storage.
|
||||
s *Storage
|
||||
@@ -144,16 +144,16 @@ type indexDB struct {
|
||||
indexSearchPool sync.Pool
|
||||
}
|
||||
|
||||
var maxTagFiltersCacheSize int
|
||||
var maxTagFiltersCacheSize uint64
|
||||
|
||||
// SetTagFiltersCacheSize overrides the default size of tagFiltersToMetricIDsCache
|
||||
func SetTagFiltersCacheSize(size int) {
|
||||
maxTagFiltersCacheSize = size
|
||||
maxTagFiltersCacheSize = uint64(size)
|
||||
}
|
||||
|
||||
func getTagFiltersCacheSize() int {
|
||||
func getTagFiltersCacheSize() uint64 {
|
||||
if maxTagFiltersCacheSize <= 0 {
|
||||
return int(float64(memory.Allowed()) / 32)
|
||||
return uint64(float64(memory.Allowed()) / 32)
|
||||
}
|
||||
return maxTagFiltersCacheSize
|
||||
}
|
||||
@@ -173,21 +173,17 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *atomic.Bool, noRegiste
|
||||
logger.Panicf("FATAL: cannot parse indexdb path %q: %s", path, err)
|
||||
}
|
||||
|
||||
tb := mergeset.MustOpenTable(path, dataFlushInterval, invalidateTagFiltersCache, mergeTagToMetricIDsRows, isReadOnly)
|
||||
|
||||
// Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile because of tagFiltersKeyGen.
|
||||
mem := memory.Allowed()
|
||||
tagFiltersCacheSize := getTagFiltersCacheSize()
|
||||
|
||||
tfssCache := lrucache.NewCache(getTagFiltersCacheSize)
|
||||
tb := mergeset.MustOpenTable(path, dataFlushInterval, tfssCache.Reset, mergeTagToMetricIDsRows, isReadOnly)
|
||||
db := &indexDB{
|
||||
generation: gen,
|
||||
tb: tb,
|
||||
name: name,
|
||||
|
||||
minMissingTimestampByKey: make(map[string]int64),
|
||||
tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize),
|
||||
tagFiltersToMetricIDsCache: tfssCache,
|
||||
s: s,
|
||||
loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),
|
||||
loopsPerDateTagFilterCache: workingsetcache.New(memory.Allowed() / 128),
|
||||
dateMetricIDCache: newDateMetricIDCache(),
|
||||
}
|
||||
db.noRegisterNewSeries.Store(noRegisterNewSeries)
|
||||
@@ -204,6 +200,7 @@ type IndexDBMetrics struct {
|
||||
TagFiltersToMetricIDsCacheSizeMaxBytes uint64
|
||||
TagFiltersToMetricIDsCacheRequests uint64
|
||||
TagFiltersToMetricIDsCacheMisses uint64
|
||||
TagFiltersToMetricIDsCacheResets uint64
|
||||
|
||||
DateMetricIDCacheSize uint64
|
||||
DateMetricIDCacheSizeBytes uint64
|
||||
@@ -246,15 +243,12 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
|
||||
m.CompositeFilterSuccessConversions = compositeFilterSuccessConversions.Load()
|
||||
m.CompositeFilterMissingConversions = compositeFilterMissingConversions.Load()
|
||||
|
||||
var cs fastcache.Stats
|
||||
|
||||
cs.Reset()
|
||||
db.tagFiltersToMetricIDsCache.UpdateStats(&cs)
|
||||
m.TagFiltersToMetricIDsCacheSize += cs.EntriesCount
|
||||
m.TagFiltersToMetricIDsCacheSizeBytes += cs.BytesSize
|
||||
m.TagFiltersToMetricIDsCacheSizeMaxBytes += cs.MaxBytesSize
|
||||
m.TagFiltersToMetricIDsCacheRequests += cs.GetCalls
|
||||
m.TagFiltersToMetricIDsCacheMisses += cs.Misses
|
||||
m.TagFiltersToMetricIDsCacheSize += uint64(db.tagFiltersToMetricIDsCache.Len())
|
||||
m.TagFiltersToMetricIDsCacheSizeBytes += uint64(db.tagFiltersToMetricIDsCache.SizeBytes())
|
||||
m.TagFiltersToMetricIDsCacheSizeMaxBytes += uint64(db.tagFiltersToMetricIDsCache.SizeMaxBytes())
|
||||
m.TagFiltersToMetricIDsCacheRequests += db.tagFiltersToMetricIDsCache.Requests()
|
||||
m.TagFiltersToMetricIDsCacheMisses += db.tagFiltersToMetricIDsCache.Misses()
|
||||
m.TagFiltersToMetricIDsCacheResets += db.tagFiltersToMetricIDsCache.Resets()
|
||||
|
||||
m.DateMetricIDCacheSize += uint64(db.dateMetricIDCache.EntriesCount())
|
||||
m.DateMetricIDCacheSizeBytes += uint64(db.dateMetricIDCache.SizeBytes())
|
||||
@@ -296,7 +290,7 @@ func (db *indexDB) decRef() {
|
||||
db.tb = nil
|
||||
|
||||
// Free space occupied by caches owned by db.
|
||||
db.tagFiltersToMetricIDsCache.Stop()
|
||||
db.tagFiltersToMetricIDsCache.MustStop()
|
||||
db.loopsPerDateTagFilterCache.Stop()
|
||||
|
||||
db.tagFiltersToMetricIDsCache = nil
|
||||
@@ -312,33 +306,36 @@ func (db *indexDB) decRef() {
|
||||
logger.Infof("indexDB %q has been dropped", tbPath)
|
||||
}
|
||||
|
||||
var tagBufPool bytesutil.ByteBufferPool
|
||||
|
||||
func (db *indexDB) getMetricIDsFromTagFiltersCache(qt *querytracer.Tracer, key []byte) ([]uint64, bool) {
|
||||
qt = qt.NewChild("search for metricIDs in tag filters cache")
|
||||
defer qt.Done()
|
||||
buf := tagBufPool.Get()
|
||||
defer tagBufPool.Put(buf)
|
||||
buf.B = db.tagFiltersToMetricIDsCache.GetBig(buf.B[:0], key)
|
||||
if len(buf.B) == 0 {
|
||||
// getMetricIDsFromTagFiltersCache retrieves the set of metricIDs that
|
||||
// correspond to the given (tfss, tr) key.
|
||||
//
|
||||
// The caller must convert the (tfss, tr) to a byte slice and use it as the key
|
||||
// when calling this method (see marshalTagFiltersKey()).
|
||||
//
|
||||
// The caller must not modify the set of metricIDs returned by this method.
|
||||
func (db *indexDB) getMetricIDsFromTagFiltersCache(qt *querytracer.Tracer, key []byte) (*uint64set.Set, bool) {
|
||||
qt.Printf("search for metricIDs in tag filters cache")
|
||||
v := db.tagFiltersToMetricIDsCache.GetEntry(bytesutil.ToUnsafeString(key))
|
||||
if v == nil {
|
||||
qt.Printf("cache miss")
|
||||
return nil, false
|
||||
}
|
||||
qt.Printf("found metricIDs with size: %d bytes", len(buf.B))
|
||||
metricIDs := mustUnmarshalMetricIDs(nil, buf.B)
|
||||
qt.Printf("unmarshaled %d metricIDs", len(metricIDs))
|
||||
metricIDs := v.(*uint64set.Set)
|
||||
qt.Printf("found %d metricIDs in cache", metricIDs.Len())
|
||||
return metricIDs, true
|
||||
}
|
||||
|
||||
func (db *indexDB) putMetricIDsToTagFiltersCache(qt *querytracer.Tracer, metricIDs []uint64, key []byte) {
|
||||
qt = qt.NewChild("put %d metricIDs in cache", len(metricIDs))
|
||||
defer qt.Done()
|
||||
buf := tagBufPool.Get()
|
||||
buf.B = marshalMetricIDs(buf.B, metricIDs)
|
||||
qt.Printf("marshaled %d metricIDs into %d bytes", len(metricIDs), len(buf.B))
|
||||
db.tagFiltersToMetricIDsCache.SetBig(key, buf.B)
|
||||
qt.Printf("stored %d metricIDs into cache", len(metricIDs))
|
||||
tagBufPool.Put(buf)
|
||||
// putMetricIDsToTagFiltersCache stores the set of metricIDs that
|
||||
// correspond to the given (tfss, tr) key into the cache.
|
||||
//
|
||||
// The caller must convert the (tfss, tr) to a byte slice and use it as the key
|
||||
// when calling this method (see marshalTagFiltersKey()).
|
||||
//
|
||||
// The caller must not modify the set of metricIDs after calling this method.
|
||||
func (db *indexDB) putMetricIDsToTagFiltersCache(qt *querytracer.Tracer, metricIDs *uint64set.Set, key []byte) {
|
||||
qt.Printf("put %d metricIDs in cache", metricIDs.Len())
|
||||
db.tagFiltersToMetricIDsCache.PutEntry(string(key), metricIDs)
|
||||
qt.Printf("stored %d metricIDs into cache", metricIDs.Len())
|
||||
}
|
||||
|
||||
func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error {
|
||||
@@ -363,16 +360,9 @@ func (db *indexDB) putToMetricIDCache(metricID uint64, tsid *TSID) {
|
||||
db.s.metricIDCache.Set(key[:], buf[:])
|
||||
}
|
||||
|
||||
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
|
||||
// There is no need in versioning the tagFilters key, since the tagFiltersToMetricIDsCache
|
||||
// isn't persisted to disk (it is very volatile because of tagFiltersKeyGen).
|
||||
prefix := ^uint64(0)
|
||||
if versioned {
|
||||
prefix = tagFiltersKeyGen.Load()
|
||||
}
|
||||
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange) []byte {
|
||||
// Round start and end times to per-day granularity according to per-day inverted index.
|
||||
startDate, endDate := tr.DateRange()
|
||||
dst = encoding.MarshalUint64(dst, prefix)
|
||||
dst = encoding.MarshalUint64(dst, startDate)
|
||||
dst = encoding.MarshalUint64(dst, endDate)
|
||||
for _, tfs := range tfss {
|
||||
@@ -384,64 +374,6 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
|
||||
return dst
|
||||
}
|
||||
|
||||
func invalidateTagFiltersCache() {
|
||||
// This function must be fast, since it is called each time new timeseries is added.
|
||||
tagFiltersKeyGen.Add(1)
|
||||
}
|
||||
|
||||
var tagFiltersKeyGen atomicutil.Uint64
|
||||
|
||||
func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte {
|
||||
if len(metricIDs) == 0 {
|
||||
// Add one zero byte to indicate an empty metricID list and skip
|
||||
// compression to save CPU cycles.
|
||||
//
|
||||
// An empty slice passed to ztsd won't be compressed and therefore
|
||||
// nothing will be added to dst and if dst is empty the record won't be
|
||||
// added to the cache. As the result, the search for a given filter will
|
||||
// be performed again and again. This may lead to cases like this:
|
||||
// https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7009
|
||||
return append(dst, 0)
|
||||
}
|
||||
|
||||
// Compress metricIDs, so they occupy less space in the cache.
|
||||
//
|
||||
// The srcBuf is a []byte cast of metricIDs.
|
||||
srcBuf := unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(metricIDs))), 8*len(metricIDs))
|
||||
|
||||
dst = encoding.CompressZSTDLevel(dst, srcBuf, 1)
|
||||
return dst
|
||||
}
|
||||
|
||||
func mustUnmarshalMetricIDs(dst []uint64, src []byte) []uint64 {
|
||||
if len(src) == 1 && src[0] == 0 {
|
||||
// One zero byte indicates an empty metricID list.
|
||||
// See marshalMetricIDs().
|
||||
return dst
|
||||
}
|
||||
|
||||
// Decompress src into dstBuf.
|
||||
//
|
||||
// dstBuf is a []byte cast of dst.
|
||||
dstBuf := unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(dst))), 8*cap(dst))
|
||||
dstBuf = dstBuf[:8*len(dst)]
|
||||
dstBufLen := len(dstBuf)
|
||||
var err error
|
||||
dstBuf, err = encoding.DecompressZSTD(dstBuf, src)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot decompress metricIDs: %s", err)
|
||||
}
|
||||
if (len(dstBuf)-dstBufLen)%8 != 0 {
|
||||
logger.Panicf("FATAL: cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(dstBuf)-dstBufLen)
|
||||
}
|
||||
|
||||
// Convert dstBuf back to dst
|
||||
dst = unsafe.Slice((*uint64)(unsafe.Pointer(unsafe.SliceData(dstBuf))), cap(dstBuf)/8)
|
||||
dst = dst[:len(dstBuf)/8]
|
||||
|
||||
return dst
|
||||
}
|
||||
|
||||
type indexSearch struct {
|
||||
db *indexDB
|
||||
ts mergeset.TableSearch
|
||||
@@ -1617,7 +1549,7 @@ func (db *indexDB) saveDeletedMetricIDs(metricIDs *uint64set.Set) {
|
||||
db.s.updateDeletedMetricIDs(metricIDs)
|
||||
|
||||
// Reset TagFilters -> MetricIDs cache, since it may contain deleted metricIDs.
|
||||
invalidateTagFiltersCache()
|
||||
db.tagFiltersToMetricIDsCache.Reset()
|
||||
|
||||
// Reset MetricName -> TSID cache, since it may contain deleted TSIDs.
|
||||
db.s.resetAndSaveTSIDCache()
|
||||
@@ -1676,9 +1608,7 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
|
||||
}
|
||||
|
||||
// searchMetricIDs returns metricIDs for the given tfss and tr.
|
||||
//
|
||||
// The returned metricIDs are sorted.
|
||||
func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]uint64, error) {
|
||||
func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) (*uint64set.Set, error) {
|
||||
qt = qt.NewChild("search for matching metricIDs: filters=%s, timeRange=%s", tfss, &tr)
|
||||
defer qt.Done()
|
||||
|
||||
@@ -1689,11 +1619,11 @@ func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, t
|
||||
tfKeyBuf := tagFiltersKeyBufPool.Get()
|
||||
defer tagFiltersKeyBufPool.Put(tfKeyBuf)
|
||||
|
||||
tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr, true)
|
||||
tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr)
|
||||
metricIDs, ok := db.getMetricIDsFromTagFiltersCache(qt, tfKeyBuf.B)
|
||||
if ok {
|
||||
// Fast path - metricIDs found in the cache
|
||||
if len(metricIDs) > maxMetrics {
|
||||
if metricIDs.Len() > maxMetrics {
|
||||
return nil, errTooManyTimeseries(maxMetrics)
|
||||
}
|
||||
return metricIDs, nil
|
||||
@@ -1701,12 +1631,11 @@ func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, t
|
||||
|
||||
// Slow path - search for metricIDs in the db
|
||||
is := db.getIndexSearch(deadline)
|
||||
metricIDsSet, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics)
|
||||
metricIDs, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics)
|
||||
db.putIndexSearch(is)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error when searching for metricIDs: %w", err)
|
||||
}
|
||||
metricIDs = metricIDsSet.AppendTo(nil)
|
||||
|
||||
// Store metricIDs in the cache.
|
||||
db.putMetricIDsToTagFiltersCache(qt, metricIDs, tfKeyBuf.B)
|
||||
@@ -1729,56 +1658,61 @@ func (db *indexDB) SearchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(metricIDs) == 0 {
|
||||
if metricIDs.Len() == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
tsids := make([]TSID, len(metricIDs))
|
||||
tsids := make([]TSID, metricIDs.Len())
|
||||
metricIDsToDelete := &uint64set.Set{}
|
||||
i := 0
|
||||
err = func() error {
|
||||
is := db.getIndexSearch(deadline)
|
||||
defer db.putIndexSearch(is)
|
||||
for loopsPaceLimiter, metricID := range metricIDs {
|
||||
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return err
|
||||
paceLimiter := 0
|
||||
is := db.getIndexSearch(deadline)
|
||||
defer db.putIndexSearch(is)
|
||||
metricIDs.ForEach(func(metricIDs []uint64) bool {
|
||||
for _, metricID := range metricIDs {
|
||||
if paceLimiter&paceLimiterSlowIterationsMask == 0 {
|
||||
if err = checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return false
|
||||
}
|
||||
}
|
||||
paceLimiter++
|
||||
|
||||
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
|
||||
// than scanning the mergeset if it contains a lot of metricIDs.
|
||||
tsid := &tsids[i]
|
||||
err := is.db.getFromMetricIDCache(tsid, metricID)
|
||||
err = db.getFromMetricIDCache(tsid, metricID)
|
||||
if err == nil {
|
||||
// Fast path - the tsid for metricID is found in cache.
|
||||
i++
|
||||
continue
|
||||
}
|
||||
if err != io.EOF {
|
||||
return err
|
||||
return false
|
||||
}
|
||||
err = nil
|
||||
if !is.getTSIDByMetricID(tsid, metricID) {
|
||||
// Cannot find TSID for the given metricID.
|
||||
// This may be the case on incomplete indexDB
|
||||
// due to snapshot or due to un-flushed entries.
|
||||
// Mark the metricID as deleted, so it is created again when new sample
|
||||
// for the given time series is ingested next time.
|
||||
if is.db.s.wasMetricIDMissingBefore(metricID) {
|
||||
is.db.missingTSIDsForMetricID.Add(1)
|
||||
if db.s.wasMetricIDMissingBefore(metricID) {
|
||||
db.missingTSIDsForMetricID.Add(1)
|
||||
metricIDsToDelete.Add(metricID)
|
||||
}
|
||||
continue
|
||||
}
|
||||
is.db.putToMetricIDCache(metricID, tsid)
|
||||
db.putToMetricIDCache(metricID, tsid)
|
||||
i++
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error when searching for TSIDs by metricIDs: %w", err)
|
||||
}
|
||||
|
||||
tsids = tsids[:i]
|
||||
qt.Printf("load %d TSIDs for %d metricIDs", len(tsids), len(metricIDs))
|
||||
qt.Printf("found %d TSIDs for %d metricIDs", len(tsids), metricIDs.Len())
|
||||
|
||||
// Sort the found tsids, since they must be passed to TSID search
|
||||
// in the sorted order.
|
||||
@@ -1807,37 +1741,45 @@ func (db *indexDB) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(metricIDs) == 0 {
|
||||
if metricIDs.Len() == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
is := db.getIndexSearch(noDeadline)
|
||||
defer db.putIndexSearch(is)
|
||||
metricNames := make([]string, 0, len(metricIDs))
|
||||
metricNames := make([]string, 0, metricIDs.Len())
|
||||
metricIDsToDelete := &uint64set.Set{}
|
||||
var metricName []byte
|
||||
var ok bool
|
||||
for i, metricID := range metricIDs {
|
||||
if i&paceLimiterSlowIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(deadline); err != nil {
|
||||
return nil, err
|
||||
paceLimiter := 0
|
||||
is := db.getIndexSearch(deadline)
|
||||
defer db.putIndexSearch(is)
|
||||
metricIDs.ForEach(func(metricIDs []uint64) bool {
|
||||
for _, metricID := range metricIDs {
|
||||
if paceLimiter&paceLimiterSlowIterationsMask == 0 {
|
||||
if err = checkSearchDeadlineAndPace(deadline); err != nil {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
paceLimiter++
|
||||
|
||||
metricName, ok = is.searchMetricNameWithCache(metricName[:0], metricID)
|
||||
if !ok {
|
||||
// Cannot find metricName for the given metricID.
|
||||
// This may be the case on incomplete indexDB
|
||||
// due to snapshot or due to un-flushed entries.
|
||||
// Mark the metricID as deleted, so it is created again when new sample
|
||||
// for the given time series is ingested next time.
|
||||
if db.s.wasMetricIDMissingBefore(metricID) {
|
||||
db.missingMetricNamesForMetricID.Add(1)
|
||||
metricIDsToDelete.Add(metricID)
|
||||
metricName, ok = is.searchMetricNameWithCache(metricName[:0], metricID)
|
||||
if !ok {
|
||||
// Cannot find metricName for the given metricID.
|
||||
// This may be the case on incomplete indexDB
|
||||
// due to snapshot or due to un-flushed entries.
|
||||
// Mark the metricID as deleted, so it is created again when new sample
|
||||
// for the given time series is ingested next time.
|
||||
if db.s.wasMetricIDMissingBefore(metricID) {
|
||||
db.missingMetricNamesForMetricID.Add(1)
|
||||
metricIDsToDelete.Add(metricID)
|
||||
}
|
||||
continue
|
||||
}
|
||||
continue
|
||||
metricNames = append(metricNames, string(metricName))
|
||||
}
|
||||
metricNames = append(metricNames, string(metricName))
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if metricIDsToDelete.Len() > 0 {
|
||||
|
||||
@@ -23,49 +23,6 @@ import (
|
||||
"github.com/VictoriaMetrics/fastcache"
|
||||
)
|
||||
|
||||
func TestMarshalUnmarshalMetricIDs(t *testing.T) {
|
||||
f := func(metricIDs []uint64) {
|
||||
t.Helper()
|
||||
|
||||
// Try marshaling and unmarshaling to an empty dst
|
||||
data := marshalMetricIDs(nil, metricIDs)
|
||||
result := mustUnmarshalMetricIDs(nil, data)
|
||||
if !reflect.DeepEqual(result, metricIDs) {
|
||||
t.Fatalf("unexpected metricIDs after unmarshaling;\ngot\n%d\nwant\n%d", result, metricIDs)
|
||||
}
|
||||
|
||||
// Try marshaling and unmarshaling to non-empty dst
|
||||
dataPrefix := []byte("prefix")
|
||||
data = marshalMetricIDs(dataPrefix, metricIDs)
|
||||
if len(data) < len(dataPrefix) {
|
||||
t.Fatalf("too short len(data)=%d; must be at least len(dataPrefix)=%d", len(data), len(dataPrefix))
|
||||
}
|
||||
if string(data[:len(dataPrefix)]) != string(dataPrefix) {
|
||||
t.Fatalf("unexpected prefix; got %q; want %q", data[:len(dataPrefix)], dataPrefix)
|
||||
}
|
||||
data = data[len(dataPrefix):]
|
||||
|
||||
resultPrefix := []uint64{889432422, 89243, 9823}
|
||||
result = mustUnmarshalMetricIDs(resultPrefix, data)
|
||||
if len(result) < len(resultPrefix) {
|
||||
t.Fatalf("too short result returned; len(result)=%d; must be at least len(resultPrefix)=%d", len(result), len(resultPrefix))
|
||||
}
|
||||
if !reflect.DeepEqual(result[:len(resultPrefix)], resultPrefix) {
|
||||
t.Fatalf("unexpected result prefix; got %d; want %d", result[:len(resultPrefix)], resultPrefix)
|
||||
}
|
||||
result = result[len(resultPrefix):]
|
||||
if (len(metricIDs) > 0 || len(result) > 0) && !reflect.DeepEqual(result, metricIDs) {
|
||||
t.Fatalf("unexpected metricIDs after unmarshaling from prefix;\ngot\n%d\nwant\n%d", result, metricIDs)
|
||||
}
|
||||
}
|
||||
|
||||
f(nil)
|
||||
f([]uint64{0})
|
||||
f([]uint64{1})
|
||||
f([]uint64{1234, 678932943, 843289893843})
|
||||
f([]uint64{1, 2, 3, 4, 5, 6, 8989898, 823849234, 1<<64 - 1, 1<<32 - 1, 0})
|
||||
}
|
||||
|
||||
func TestTagFiltersToMetricIDsCache(t *testing.T) {
|
||||
f := func(want []uint64) {
|
||||
t.Helper()
|
||||
@@ -80,11 +37,15 @@ func TestTagFiltersToMetricIDsCache(t *testing.T) {
|
||||
defer s.putIndexDBs(idbPrev, idbCurr, idbNext)
|
||||
|
||||
key := []byte("key")
|
||||
idbCurr.putMetricIDsToTagFiltersCache(nil, want, key)
|
||||
got, ok := idbCurr.getMetricIDsFromTagFiltersCache(nil, key)
|
||||
wantSet := &uint64set.Set{}
|
||||
wantSet.AddMulti(want)
|
||||
idbCurr.putMetricIDsToTagFiltersCache(nil, wantSet, key)
|
||||
gotSet, ok := idbCurr.getMetricIDsFromTagFiltersCache(nil, key)
|
||||
if !ok {
|
||||
t.Fatalf("expected metricIDs to be found in cache but they weren't: %v", want)
|
||||
}
|
||||
got := gotSet.AppendTo(nil)
|
||||
slices.Sort(want)
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("unexpected metricIDs in cache: got %v, want %v", got, want)
|
||||
}
|
||||
@@ -105,14 +66,13 @@ func TestTagFiltersToMetricIDsCache_EmptyMetricIDList(t *testing.T) {
|
||||
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
|
||||
|
||||
key := []byte("key")
|
||||
emptyMetricIDs := []uint64(nil)
|
||||
idbCurr.putMetricIDsToTagFiltersCache(nil, emptyMetricIDs, key)
|
||||
idbCurr.putMetricIDsToTagFiltersCache(nil, nil, key)
|
||||
got, ok := idbCurr.getMetricIDsFromTagFiltersCache(nil, key)
|
||||
if !ok {
|
||||
t.Fatalf("expected empty metricID list to be found in cache but it wasn't")
|
||||
}
|
||||
if len(got) > 0 {
|
||||
t.Fatalf("unexpected found metricID list to be empty but got %v", got)
|
||||
if got.Len() > 0 {
|
||||
t.Fatalf("unexpected found metricID list to be empty but got %v", got.AppendTo(nil))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"testing"
|
||||
@@ -313,33 +312,3 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
||||
s.MustClose()
|
||||
fs.MustRemoveDir(path)
|
||||
}
|
||||
|
||||
func BenchmarkMarshalUnmarshalMetricIDs(b *testing.B) {
|
||||
rng := rand.New(rand.NewSource(1))
|
||||
|
||||
f := func(b *testing.B, numMetricIDs int) {
|
||||
metricIDs := make([]uint64, numMetricIDs)
|
||||
// metric IDs need to be sorted.
|
||||
ts := uint64(time.Now().UnixNano())
|
||||
for i := range numMetricIDs {
|
||||
metricIDs[i] = ts + uint64(rng.Intn(100))
|
||||
}
|
||||
|
||||
var marshalledLen int
|
||||
b.ResetTimer()
|
||||
for range b.N {
|
||||
marshalled := marshalMetricIDs(nil, metricIDs)
|
||||
marshalledLen = len(marshalled)
|
||||
_ = mustUnmarshalMetricIDs(nil, marshalled)
|
||||
}
|
||||
b.StopTimer()
|
||||
compressionRate := float64(numMetricIDs*8) / float64(marshalledLen)
|
||||
b.ReportMetric(compressionRate, "compression-rate")
|
||||
}
|
||||
|
||||
for _, n := range []int{0, 1, 10, 100, 1e3, 1e4, 1e5, 1e6, 1e7} {
|
||||
b.Run(fmt.Sprintf("numMetricIDs-%d", n), func(b *testing.B) {
|
||||
f(b, n)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ func TestStorageSearchTSIDs_CorruptedIndex(t *testing.T) {
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("searchMetricIDs() failed unexpectedly: %v", err))
|
||||
}
|
||||
return metricIDs
|
||||
return metricIDs.AppendTo(nil)
|
||||
}
|
||||
searchTSIDs := func() []TSID {
|
||||
tsids, err := s.SearchTSIDs(nil, tfssAll, tr, 1e9, noDeadline)
|
||||
@@ -160,7 +160,7 @@ func TestStorageSearchMetricNames_CorruptedIndex(t *testing.T) {
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("searchMetricIDs() failed unexpectedly: %v", err))
|
||||
}
|
||||
return metricIDs
|
||||
return metricIDs.AppendTo(nil)
|
||||
}
|
||||
searchMetricNames := func() []string {
|
||||
metricNames, err := s.SearchMetricNames(nil, tfssAll, tr, 1e9, noDeadline)
|
||||
|
||||
@@ -1017,7 +1017,7 @@ func TestStorageDeleteSeries_CachesAreUpdatedOrReset(t *testing.T) {
|
||||
idbPrev, idbCurr := s.getPrevAndCurrIndexDBs()
|
||||
defer s.putPrevAndCurrIndexDBs(idbPrev, idbCurr)
|
||||
|
||||
tfssKey := marshalTagFiltersKey(nil, tfss, tr, true)
|
||||
tfssKey := marshalTagFiltersKey(nil, tfss, tr)
|
||||
_, got := idbCurr.getMetricIDsFromTagFiltersCache(nil, tfssKey)
|
||||
if got != want {
|
||||
t.Errorf("unexpected tag filters in cache %v %v: got %t, want %t", tfss, &tr, got, want)
|
||||
@@ -3831,22 +3831,21 @@ func TestStorageAddRows_currHourMetricIDs(t *testing.T) {
|
||||
// The function is not a part of Storage because it is currently used in unit
|
||||
// tests only.
|
||||
func testSearchMetricIDs(s *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) []uint64 {
|
||||
search := func(qt *querytracer.Tracer, idb *indexDB, tr TimeRange) ([]uint64, error) {
|
||||
search := func(qt *querytracer.Tracer, idb *indexDB, tr TimeRange) (*uint64set.Set, error) {
|
||||
return idb.searchMetricIDs(qt, tfss, tr, maxMetrics, deadline)
|
||||
}
|
||||
merge := func(data [][]uint64) []uint64 {
|
||||
s := &uint64set.Set{}
|
||||
merge := func(data []*uint64set.Set) *uint64set.Set {
|
||||
all := &uint64set.Set{}
|
||||
for _, d := range data {
|
||||
s.AddMulti(d)
|
||||
all.Union(d)
|
||||
}
|
||||
all := s.AppendTo(nil)
|
||||
return all
|
||||
}
|
||||
metricIDs, err := searchAndMerge(nil, s, tr, search, merge)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("searching metricIDs failed unexpectedly: %s", err))
|
||||
}
|
||||
return metricIDs
|
||||
return metricIDs.AppendTo(nil)
|
||||
}
|
||||
|
||||
// testCountAllMetricIDs is a test helper function that counts the IDs of
|
||||
|
||||
Reference in New Issue
Block a user