lib/storage: rotate dateMetricIDCache instead of resetting (#10169)

Currently, `dateMetricIDCache` is reset when it is full and it is never
reset is not full but the data it stores is no longer needed. This leads
to the following problems:
- During regular data ingestion the cache sizeBytes may exceed max
allowed size and the cache gets reset which may potentially slow down
data ingestion (see #10064)
- The cache is per-indexDB. This means that in partition index (#8134)
there will be as many instances of this cache as the number of
partitions. If someone performs a backfill across all partitions, this
will fill all caches and they will never get reset even if no more
historical data is ingested.

So the solution is to periodically rotate the cache. After first
rotation the data is not deleted but moved to `prev` storage. After
second rotation `prev` gets deleted. This gives the cache an opportunity
to restore the `prev` data if it is still in use. Based on #10167.

This PR also removes the introduced recently introduced
`-storage.cacheSizeIndexDBDateMetricID` flag (see #10135). This should
be safe since it is new and its use case is very niche, i.e. no one
would really use it.

---------

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
Artem Fetishev
2025-12-17 15:43:05 +01:00
committed by GitHub
parent 20ad9cd395
commit d9c07dbc0b
5 changed files with 153 additions and 145 deletions

View File

@@ -75,8 +75,6 @@ var (
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#cache-tuning")
cacheSizeIndexDBTagFilters = flagutil.NewBytes("storage.cacheSizeIndexDBTagFilters", 0, "Overrides max size for indexdb/tagFiltersToMetricIDs cache. "+
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#cache-tuning")
cacheSizeIndexDBDateMetricID = flagutil.NewBytes("storage.cacheSizeIndexDBDateMetricID", 0, "Overrides max size for indexdb/date_metricID cache. "+
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#cache-tuning")
disablePerDayIndex = flag.Bool("disablePerDayIndex", false, "Disable per-day index and use global index for all searches. "+
"This may improve performance and decrease disk space usage for the use cases with fixed set of timeseries scattered across a "+
@@ -127,7 +125,6 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
storage.SetMetricNamesStatsCacheSize(cacheSizeMetricNamesStats.IntN())
storage.SetMetricNameCacheSize(cacheSizeStorageMetricName.IntN())
storage.SetMetadataStorageSize(metadataStorageSize.IntN())
storage.SetDateMetricIDCacheSize(cacheSizeIndexDBDateMetricID.IntN())
mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.IntN())
mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.IntN())
mergeset.SetDataBlocksSparseCacheSize(cacheSizeIndexDBDataBlocksSparse.IntN())
@@ -670,7 +667,6 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/dataBlocks"}`, idbm.DataBlocksCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/dataBlocksSparse"}`, idbm.DataBlocksSparseCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/date_metricID"}`, idbm.DateMetricIDCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheSizeMaxBytes)
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="storage/indexBlocks"}`, tm.IndexBlocksCacheRequests)
@@ -695,7 +691,6 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_resets_total{type="indexdb/date_metricID"}`, idbm.DateMetricIDCacheResetsCount)
metrics.WriteCounterUint64(w, `vm_cache_resets_total{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheResets)
metrics.WriteCounterUint64(w, `vm_cache_collisions_total{type="storage/tsid"}`, m.TSIDCacheCollisions)
@@ -705,6 +700,7 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteCounterUint64(w, `vm_cache_syncs_total{type="indexdb/date_metricID"}`, idbm.DateMetricIDCacheSyncsCount)
metrics.WriteCounterUint64(w, `vm_cache_rotations_total{type="indexdb/metricID"}`, idbm.MetricIDCacheRotationsCount)
metrics.WriteCounterUint64(w, `vm_cache_rotations_total{type="indexdb/date_metricID"}`, idbm.DateMetricIDCacheRotationsCount)
metrics.WriteCounterUint64(w, `vm_deleted_metrics_total{type="indexdb"}`, m.DeletedMetricsCount)

View File

@@ -29,6 +29,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): expose `vmauth_user_request_backend_requests_total` and `vmauth_unauthorized_user_request_backend_requests_total` [metrics](https://docs.victoriametrics.com/victoriametrics/vmauth/#monitoring), which track the number of requests sent to backends. These counts may exceed `vmauth_user_requests_total` and `vmauth_unauthorized_user_requests_total` when requests are retried across multiple backends.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): fix `vmauth_user_request_backend_errors_total` and `vmauth_unauthorized_user_request_backend_errors_total` to only reflect backend request errors. Previously, these counters could be overcounted with user request error.
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): rotate `dateMetricIDCache` instead of resetting it. This should make the eviction less agressive. Since the cache does not have fixed max size anymore the `-storage.cacheSizeIndexDBDateMetricID` flag has been removed. See [#10064](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10053) and PR [#10169](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10169).
## [v1.132.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.132.0)

View File

@@ -4,97 +4,104 @@ import (
"sort"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
var maxDateMetricIDCacheSize uint64
// SetDateMetricIDCacheSize overrides the default size of dateMetricIDCache
func SetDateMetricIDCacheSize(size int) {
maxDateMetricIDCacheSize = uint64(size)
}
func getDateMetricIDCacheSize() uint64 {
if maxDateMetricIDCacheSize <= 0 {
return uint64(float64(memory.Allowed()) / 256)
}
return maxDateMetricIDCacheSize
}
// dateMetricIDCache is fast cache for holding (date, metricID) entries.
// dateMetricIDCache stores (date, metricIDs) entries that have been added to
// the index. It is used during data ingestion to decide whether a new entry
// needs to be added to the per-day index.
//
// It should be faster than map[date]*uint64set.Set on multicore systems.
type dateMetricIDCache struct {
// Contains immutable map
byDate atomic.Pointer[byDateMetricIDMap]
// Contains immutable (date, metricIDs) entries.
curr atomic.Pointer[byDateMetricIDMap]
// Contains mutable map protected by mu
byDateMutable *byDateMetricIDMap
// Contains the number of slow accesses to byDateMutable.
// Is used for deciding when to merge byDateMutable to byDate.
// Contains immutable (date, metricIDs) entries that used to be current
// before cache rotation. It is used to implement periodic cache clean-up.
// Protected by mu.
prev *byDateMetricIDMap
// Contains mutable (date metricIDs) entries that either have been added to
// the cache recently or migrated from prev. Protected by mu.
next *byDateMetricIDMap
// Contains the number of slow accesses to next. Is used for deciding when
// to merge next to curr. Protected by mu.
slowHits int
// Protected by mu.
// Contains the number times the next was merged into curr. Protected by mu.
syncsCount uint64
// Protected by mu.
resetsCount uint64
// Contains the number times the cache has been rotated. Protected by mu.
rotationsCount uint64
mu sync.Mutex
stopCh chan struct{}
rotationStoppedCh chan struct{}
}
func newDateMetricIDCache() *dateMetricIDCache {
var dmc dateMetricIDCache
dmc.resetLocked()
dmc := dateMetricIDCache{
prev: newByDateMetricIDMap(),
next: newByDateMetricIDMap(),
stopCh: make(chan struct{}),
rotationStoppedCh: make(chan struct{}),
}
dmc.curr.Store(newByDateMetricIDMap())
go dmc.startRotation()
return &dmc
}
func (dmc *dateMetricIDCache) resetLocked() {
// Do not reset syncsCount and resetsCount
dmc.byDate.Store(newByDateMetricIDMap())
dmc.byDateMutable = newByDateMetricIDMap()
dmc.slowHits = 0
dmc.resetsCount++
func (dmc *dateMetricIDCache) MustStop() {
close(dmc.stopCh)
<-dmc.rotationStoppedCh
}
type dateMetricIDCacheStats struct {
Size uint64
SizeBytes uint64
SizeMaxBytes uint64
ResetsCount uint64
SyncsCount uint64
Size uint64
SizeBytes uint64
SyncsCount uint64
RotationsCount uint64
}
func (dmc *dateMetricIDCache) Stats() dateMetricIDCacheStats {
s := dateMetricIDCacheStats{
SizeMaxBytes: getDateMetricIDCacheSize(),
}
dmc.mu.Lock()
defer dmc.mu.Unlock()
for _, metricIDs := range dmc.byDate.Load().m {
s.Size += uint64(metricIDs.Len())
s.SizeBytes += metricIDs.SizeBytes()
var s dateMetricIDCacheStats
for _, metricIDs := range dmc.curr.Load().m {
if metricIDs.Len() > 0 {
// empty uint64set.Set still occupies a few bytes. Ignore them.
s.Size += uint64(metricIDs.Len())
s.SizeBytes += metricIDs.SizeBytes()
}
}
for _, metricIDs := range dmc.byDateMutable.m {
s.Size += uint64(metricIDs.Len())
s.SizeBytes += metricIDs.SizeBytes()
for _, metricIDs := range dmc.prev.m {
if metricIDs.Len() > 0 {
s.Size += uint64(metricIDs.Len())
s.SizeBytes += metricIDs.SizeBytes()
}
}
for _, metricIDs := range dmc.next.m {
if metricIDs.Len() > 0 {
s.Size += uint64(metricIDs.Len())
s.SizeBytes += metricIDs.SizeBytes()
}
}
s.ResetsCount = dmc.resetsCount
s.SyncsCount = dmc.syncsCount
s.RotationsCount = dmc.rotationsCount
return s
}
func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
if byDate := dmc.byDate.Load(); byDate.get(date).Has(metricID) {
curr := dmc.curr.Load()
vCurr := curr.get(date)
if vCurr.Has(metricID) {
// Fast path. The majority of calls must go here.
return true
}
@@ -109,19 +116,29 @@ func (dmc *dateMetricIDCache) hasSlow(date, metricID uint64) bool {
// First, check immutable map again because the entry may have been moved to
// the immutable map by the time the caller acquires the lock.
byDate := dmc.byDate.Load()
v := byDate.get(date)
if v.Has(metricID) {
curr := dmc.curr.Load()
vCurr := curr.get(date)
if vCurr.Has(metricID) {
return true
}
// Then check mutable map.
vMutable := dmc.byDateMutable.get(date)
ok := vMutable.Has(metricID)
// Then check next and prev.
vNext := dmc.next.getOrCreate(date)
ok := vNext.Has(metricID)
if !ok {
vPrev := dmc.prev.get(date)
ok = vPrev.Has(metricID)
if ok {
// The metricID is in prev but is still in use. Migrate it to next.
vNext.Add(metricID)
}
}
if ok {
dmc.slowHits++
if dmc.slowHits > (v.Len()+vMutable.Len())/2 {
// It is cheaper to merge byDateMutable into byDate than to pay inter-cpu sync costs when accessing vMutable.
if dmc.slowHits > (vCurr.Len()+vNext.Len())/2 {
// It is cheaper to merge next into curr than to pay inter-cpu sync
// costs when accessing next.
dmc.syncLocked()
dmc.slowHits = 0
}
@@ -131,48 +148,50 @@ func (dmc *dateMetricIDCache) hasSlow(date, metricID uint64) bool {
func (dmc *dateMetricIDCache) Set(date, metricID uint64) {
dmc.mu.Lock()
v := dmc.byDateMutable.getOrCreate(date)
v := dmc.next.getOrCreate(date)
v.Add(metricID)
dmc.mu.Unlock()
}
func (dmc *dateMetricIDCache) syncLocked() {
if len(dmc.byDateMutable.m) == 0 {
if len(dmc.next.m) == 0 {
// Nothing to sync.
return
}
// Merge data from byDate into byDateMutable and then atomically replace byDate with the merged data.
byDate := dmc.byDate.Load()
byDateMutable := dmc.byDateMutable
byDateMutable.hotEntry.Store(nil)
// Merge data from curr into next and then atomically replace curr with the
// merged data.
curr := dmc.curr.Load()
next := dmc.next
next.hotEntry.Store(nil)
keepDatesMap := make(map[uint64]struct{}, len(byDateMutable.m))
for date, metricIDsMutable := range byDateMutable.m {
keepDatesMap := make(map[uint64]struct{}, len(next.m))
for date, vNext := range next.m {
keepDatesMap[date] = struct{}{}
metricIDs := byDate.get(date)
if metricIDs == nil {
vCurr := curr.get(date)
if vCurr == nil {
// Nothing to merge
continue
}
metricIDs = metricIDs.Clone()
metricIDs.Union(metricIDsMutable)
byDateMutable.m[date] = metricIDs
vCurr = vCurr.Clone()
vCurr.Union(vNext)
next.m[date] = vCurr
}
// Copy entries from byDate, which are missing in byDateMutable
allDatesMap := make(map[uint64]struct{}, len(byDate.m))
for date, metricIDs := range byDate.m {
// Copy entries from curr, which are missing in next
allDatesMap := make(map[uint64]struct{}, len(curr.m))
for date, vCurr := range curr.m {
allDatesMap[date] = struct{}{}
v := byDateMutable.get(date)
if v != nil {
vNext := next.get(date)
if vNext != nil {
continue
}
byDateMutable.m[date] = metricIDs
next.m[date] = vCurr
}
if len(byDateMutable.m) > 2 {
// Keep only entries for the last two dates from allDatesMap plus all the entries for byDateMutable.
if len(next.m) > 2 {
// Keep only entries for the last two dates from allDatesMap plus all
// the entries for next.
dates := make([]uint64, 0, len(allDatesMap))
for date := range allDatesMap {
dates = append(dates, date)
@@ -186,29 +205,46 @@ func (dmc *dateMetricIDCache) syncLocked() {
for _, date := range dates {
keepDatesMap[date] = struct{}{}
}
for date := range byDateMutable.m {
for date := range next.m {
if _, ok := keepDatesMap[date]; !ok {
delete(byDateMutable.m, date)
delete(next.m, date)
}
}
}
var sizeBytes uint64
for _, v := range dmc.byDateMutable.m {
sizeBytes += v.SizeBytes()
}
// Atomically replace byDate with byDateMutable
dmc.byDate.Store(dmc.byDateMutable)
dmc.byDateMutable = newByDateMetricIDMap()
// Atomically replace curr with next.
dmc.curr.Store(dmc.next)
dmc.next = newByDateMetricIDMap()
dmc.syncsCount++
}
if sizeBytes > getDateMetricIDCacheSize() {
dmc.resetLocked()
func (dmc *dateMetricIDCache) startRotation() {
d := timeutil.AddJitterToDuration(10 * time.Minute)
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-dmc.stopCh:
close(dmc.rotationStoppedCh)
return
case <-ticker.C:
dmc.rotate()
}
}
}
// rotate atomically rotates next, curr, and prev cache parts.
func (dmc *dateMetricIDCache) rotate() {
dmc.mu.Lock()
defer dmc.mu.Unlock()
curr := dmc.curr.Load()
dmc.prev = curr
dmc.curr.Store(dmc.next)
dmc.next = newByDateMetricIDMap()
dmc.rotationsCount++
}
// dateMetricIDs holds the date and corresponding metricIDs together and is used
// for implementing hot entry fast path in byDateMetricIDMap.
type dateMetricIDs struct {

View File

@@ -6,12 +6,12 @@ import (
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
func TestDateMetricIDCacheSerial(t *testing.T) {
c := newDateMetricIDCache()
defer c.MustStop()
if err := testDateMetricIDCache(c, false); err != nil {
t.Fatalf("unexpected error: %s", err)
}
@@ -19,6 +19,7 @@ func TestDateMetricIDCacheSerial(t *testing.T) {
func TestDateMetricIDCacheConcurrent(t *testing.T) {
c := newDateMetricIDCache()
defer c.MustStop()
ch := make(chan error, 5)
for i := 0; i < 5; i++ {
go func() {
@@ -63,9 +64,9 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
c.mu.Unlock()
}
if i%34323 == 0 {
c.mu.Lock()
c.resetLocked()
c.mu.Unlock()
// Two rotations are needed to clear the cache.
c.rotate()
c.rotate()
m = make(map[dmk]bool)
}
}
@@ -87,13 +88,15 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
}
}
// Verify c.Reset
// Verify that cache becomes empty after two rotations.
if n := c.Stats().Size; !concurrent && n < 123 {
return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n)
}
c.mu.Lock()
c.resetLocked()
c.mu.Unlock()
c.rotate()
if n := c.Stats().Size; !concurrent && n < 123 {
return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n)
}
c.rotate()
if n := c.Stats().Size; !concurrent && n > 0 {
return fmt.Errorf("c.EntriesCount must return 0 after reset; returned %d", n)
}
@@ -108,6 +111,7 @@ func TestDateMetricIDCacheIsConsistent(_ *testing.T) {
numMetrics = 100000
)
dmc := newDateMetricIDCache()
defer dmc.MustStop()
var wg sync.WaitGroup
for i := range concurrency {
wg.Add(1)
@@ -124,39 +128,9 @@ func TestDateMetricIDCacheIsConsistent(_ *testing.T) {
wg.Wait()
}
func TestDateMetricIDCache_SizeMaxBytes(t *testing.T) {
defer SetDateMetricIDCacheSize(0)
assertSizeMaxBytes := func(dmc *dateMetricIDCache, want uint64) {
t.Helper()
if got := dmc.Stats().SizeMaxBytes; got != want {
t.Fatalf("unexpected sizeMaxBytes: got %d, want %d", got, want)
}
}
defaultSizeMaxBytes := uint64(float64(memory.Allowed()) / 256)
var dmc *dateMetricIDCache
// Default.
dmc = newDateMetricIDCache()
assertSizeMaxBytes(dmc, defaultSizeMaxBytes)
// Overriden.
SetDateMetricIDCacheSize(1024)
dmc = newDateMetricIDCache()
assertSizeMaxBytes(dmc, 1024)
// Overriden at runtime.
SetDateMetricIDCacheSize(2048)
assertSizeMaxBytes(dmc, 2048)
// Reset to default at runtime.
SetDateMetricIDCacheSize(0)
assertSizeMaxBytes(dmc, defaultSizeMaxBytes)
}
func TestDateMetricIDCache_Size(t *testing.T) {
dmc := newDateMetricIDCache()
defer dmc.MustStop()
for i := range 100_000 {
date := 12345 + uint64(i%30)
metricID := uint64(i)
@@ -182,6 +156,7 @@ func TestDateMetricIDCache_Size(t *testing.T) {
func TestDateMetricIDCache_SizeBytes(t *testing.T) {
dmc := newDateMetricIDCache()
defer dmc.MustStop()
metricIDs := &uint64set.Set{}
for i := range 100_000 {
date := uint64(123)

View File

@@ -221,11 +221,10 @@ type IndexDBMetrics struct {
MetricIDCacheSyncsCount uint64
MetricIDCacheRotationsCount uint64
DateMetricIDCacheSize uint64
DateMetricIDCacheSizeBytes uint64
DateMetricIDCacheSizeMaxBytes uint64
DateMetricIDCacheSyncsCount uint64
DateMetricIDCacheResetsCount uint64
DateMetricIDCacheSize uint64
DateMetricIDCacheSizeBytes uint64
DateMetricIDCacheSyncsCount uint64
DateMetricIDCacheRotationsCount uint64
IndexDBRefCount uint64
@@ -290,9 +289,8 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
if dmcs.SizeBytes > m.DateMetricIDCacheSizeBytes {
m.DateMetricIDCacheSize = dmcs.Size
m.DateMetricIDCacheSizeBytes = dmcs.SizeBytes
m.DateMetricIDCacheSizeMaxBytes = dmcs.SizeMaxBytes
m.DateMetricIDCacheSyncsCount = dmcs.SyncsCount
m.DateMetricIDCacheResetsCount = dmcs.ResetsCount
m.DateMetricIDCacheRotationsCount = dmcs.RotationsCount
}
m.IndexDBRefCount += uint64(db.refCount.Load())
@@ -334,10 +332,12 @@ func (db *indexDB) decRef() {
db.tagFiltersToMetricIDsCache.MustStop()
db.loopsPerDateTagFilterCache.MustStop()
db.metricIDCache.MustStop()
db.dateMetricIDCache.MustStop()
db.tagFiltersToMetricIDsCache = nil
db.loopsPerDateTagFilterCache = nil
db.metricIDCache = nil
db.dateMetricIDCache = nil
if !db.mustDrop.Load() {
return