mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
lib/storage: replace tffs loops cache impl with lrucache
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
@@ -12,8 +12,8 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/fastcache"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
@@ -29,7 +29,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -126,7 +125,7 @@ type indexDB struct {
|
||||
|
||||
// Cache for (date, tagFilter) -> loopsCount, which is used for reducing
|
||||
// the amount of work when matching a set of filters.
|
||||
loopsPerDateTagFilterCache *workingsetcache.Cache
|
||||
loopsPerDateTagFilterCache *lrucache.Cache
|
||||
|
||||
// A cache that stores metricIDs that have been added to the index.
|
||||
// The cache is not populated on startup nor does it store a complete set of
|
||||
@@ -164,6 +163,10 @@ func getTagFiltersCacheSize() uint64 {
|
||||
return maxTagFiltersCacheSize
|
||||
}
|
||||
|
||||
func getTagFiltersLoopsCacheSize() uint64 {
|
||||
return uint64(float64(memory.Allowed()) / 128)
|
||||
}
|
||||
|
||||
var maxMetricIDsForDirectLabelsLookup int = 100e3
|
||||
|
||||
func mustOpenIndexDB(id uint64, tr TimeRange, name, path string, s *Storage, isReadOnly *atomic.Bool, noRegisterNewSeries bool) *indexDB {
|
||||
@@ -181,7 +184,7 @@ func mustOpenIndexDB(id uint64, tr TimeRange, name, path string, s *Storage, isR
|
||||
tb: tb,
|
||||
s: s,
|
||||
tagFiltersToMetricIDsCache: tfssCache,
|
||||
loopsPerDateTagFilterCache: workingsetcache.New(memory.Allowed() / 128),
|
||||
loopsPerDateTagFilterCache: lrucache.NewCache(getTagFiltersLoopsCacheSize),
|
||||
metricIDCache: newMetricIDCache(),
|
||||
dateMetricIDCache: newDateMetricIDCache(),
|
||||
}
|
||||
@@ -268,15 +271,13 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
|
||||
|
||||
// Report only once and for either the first met indexDB instance or whose
|
||||
// loopsPerDateTagFilterCache is utilized the most.
|
||||
var cs fastcache.Stats
|
||||
db.loopsPerDateTagFilterCache.UpdateStats(&cs)
|
||||
if cs.BytesSize > m.LoopsPerDateTagFilterCacheSizeBytes {
|
||||
m.LoopsPerDateTagFilterCacheSize = cs.EntriesCount
|
||||
m.LoopsPerDateTagFilterCacheSizeBytes = cs.BytesSize
|
||||
m.LoopsPerDateTagFilterCacheSizeMaxBytes = cs.MaxBytesSize
|
||||
if db.loopsPerDateTagFilterCache.SizeBytes() > m.LoopsPerDateTagFilterCacheSizeBytes {
|
||||
m.LoopsPerDateTagFilterCacheSize = uint64(db.loopsPerDateTagFilterCache.Len())
|
||||
m.LoopsPerDateTagFilterCacheSizeBytes = db.loopsPerDateTagFilterCache.SizeBytes()
|
||||
m.LoopsPerDateTagFilterCacheSizeMaxBytes = db.loopsPerDateTagFilterCache.SizeMaxBytes()
|
||||
}
|
||||
m.LoopsPerDateTagFilterCacheRequests += cs.GetCalls
|
||||
m.LoopsPerDateTagFilterCacheMisses += cs.Misses
|
||||
m.LoopsPerDateTagFilterCacheRequests += db.loopsPerDateTagFilterCache.Requests()
|
||||
m.LoopsPerDateTagFilterCacheMisses += db.loopsPerDateTagFilterCache.Misses()
|
||||
|
||||
// Report only once and for either the first met indexDB instance or whose
|
||||
// metricIDCache is utilized the most.
|
||||
@@ -316,7 +317,7 @@ func (db *indexDB) MustClose() {
|
||||
|
||||
// Free space occupied by caches owned by db.
|
||||
db.tagFiltersToMetricIDsCache.MustStop()
|
||||
db.loopsPerDateTagFilterCache.Stop()
|
||||
db.loopsPerDateTagFilterCache.MustStop()
|
||||
db.metricIDCache.MustStop()
|
||||
db.dateMetricIDCache.MustStop()
|
||||
|
||||
@@ -3131,25 +3132,32 @@ func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *t
|
||||
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], is.db.name, date, tf, is.accountID, is.projectID)
|
||||
kb := kbPool.Get()
|
||||
defer kbPool.Put(kb)
|
||||
kb.B = is.db.loopsPerDateTagFilterCache.Get(kb.B[:0], is.kb.B)
|
||||
if len(kb.B) != 3*8 {
|
||||
e := is.db.loopsPerDateTagFilterCache.GetEntry(bytesutil.ToUnsafeString(is.kb.B))
|
||||
if e == nil {
|
||||
return 0, 0, 0
|
||||
}
|
||||
loopsCount := encoding.UnmarshalInt64(kb.B)
|
||||
filterLoopsCount := encoding.UnmarshalInt64(kb.B[8:])
|
||||
timestamp := encoding.UnmarshalUint64(kb.B[16:])
|
||||
return loopsCount, filterLoopsCount, timestamp
|
||||
v := e.(*tagFiltersLoops)
|
||||
return v.loopsCount, v.filterLoopsCount, v.timestamp
|
||||
}
|
||||
|
||||
type tagFiltersLoops struct {
|
||||
loopsCount int64
|
||||
filterLoopsCount int64
|
||||
timestamp uint64
|
||||
}
|
||||
|
||||
func (v *tagFiltersLoops) SizeBytes() uint64 {
|
||||
return uint64(unsafe.Sizeof(*v))
|
||||
}
|
||||
|
||||
func (is *indexSearch) storeLoopsCountForDateFilter(date uint64, tf *tagFilter, loopsCount, filterLoopsCount int64) {
|
||||
currentTimestamp := fasttime.UnixTimestamp()
|
||||
v := tagFiltersLoops{
|
||||
loopsCount: loopsCount,
|
||||
filterLoopsCount: filterLoopsCount,
|
||||
timestamp: fasttime.UnixTimestamp(),
|
||||
}
|
||||
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], is.db.name, date, tf, is.accountID, is.projectID)
|
||||
kb := kbPool.Get()
|
||||
kb.B = encoding.MarshalInt64(kb.B[:0], loopsCount)
|
||||
kb.B = encoding.MarshalInt64(kb.B, filterLoopsCount)
|
||||
kb.B = encoding.MarshalUint64(kb.B, currentTimestamp)
|
||||
is.db.loopsPerDateTagFilterCache.Set(is.kb.B, kb.B)
|
||||
kbPool.Put(kb)
|
||||
is.db.loopsPerDateTagFilterCache.PutEntry(string(is.kb.B), &v)
|
||||
}
|
||||
|
||||
func appendDateTagFilterCacheKey(dst []byte, indexDBName string, date uint64, tf *tagFilter, accountID, projectID uint32) []byte {
|
||||
|
||||
Reference in New Issue
Block a user