lib/storage: shard metricIdCache

The current implementation has a bottleneck – a single mutex to access
`prev`/`next` metric sets. Each rotation results in storage utilization
spikes since lock-free `curr` is almost empty, and cache needs to
promote metrics from `prev` to `next`.

This is an attempt to reduce contention by splitting the cache into separate
shards.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10367
This commit is contained in:
Andrei Baidarov
2026-01-30 11:19:40 +01:00
committed by GitHub
parent 9b40fd00e0
commit 6bc809813b
4 changed files with 179 additions and 84 deletions

View File

@@ -47,6 +47,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): fix [alert restore](https://docs.victoriametrics.com/victoriametrics/vmalert/#alerts-state-on-restarts) when a group contains many rules and is slow to complete evaluation. Previously, the restore process might not retrieve the correct previous alert state. See [#10335](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10335).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): do not skip sending alert notifications to `-notifier.url` if remote write requests to `-remoteWrite.url` fail. See [#10376](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10376).
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/): fix `changes()` function when gaps between samples exceed the lookbehind window. Previously, it could yield a non-zero value even when the sample value remained unchanged. See [#10280](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10280).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): previously ingestion could hit lock contention that triggered frequent context switches and storage connection saturation spikes; now the contention is removed to keep ingestion steady. See [#10367](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10367).
## [v1.134.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.134.0)
@@ -79,6 +80,10 @@ Released at 2026-01-16
Released at 2026-01-02
**Update Note 1:** [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): Upgrading to per-partition index requires registering all active time series. Expect slow down of data ingestion and queries during upgrade roll-out. This is a one-time operation. Additionally, for users with retention periods shorter than 1 month the disk usage may increase.
**Update Note 2:** [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): Running this version in deployments with big datasets may cause high CPU utilization. See [10297](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10297).
**Update Note 3:** [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): lock contention in the ingestion path may cause frequent context switches and storage connection saturation spikes. See [10367](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10367). Addressed in `v1.135.0`.
**Update Note 4:** [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): TSDB status may be empty if the partition index does not have records for the requested date. See [10315](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10315). Addressed in `v1.135.0`.
**Update Note 5:** [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): `indexdb/tagFiltersToMetricIDs`, `indexdb/metricID` and `indexdb/date_metricID` cache metrics are not reported properly. See [10275](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10275). Addressed in `v1.135.0`.
* SECURITY: upgrade base docker image (Alpine) from 3.22.2 to 3.23.2. See [Alpine 3.23.2 release notes](https://www.alpinelinux.org/posts/Alpine-3.23.2-released.html).

View File

@@ -4,7 +4,10 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
@@ -13,9 +16,119 @@ import (
// during data ingestion to decide whether a new entry needs to be added to the
// global index.
//
// The cache avoids synchronization on the read path if possible to reduce
// contention. Based on dateMetricIDCache ideas.
// The cache consists of multiple shards and avoids synchronization on the read
// path if possible to reduce contention.
type metricIDCache struct {
// shards holds the independent cache shards. Has and Set select a shard by
// hashing the metricID modulo the number of shards.
shards []metricIDCacheShard
// The shards are rotated in groups, one group at a time.
// rotationGroupSize tells the number of shards in one group,
// rotationGroupCount tells how many groups to rotate, and
// rotationGroupPeriod tells how often a group is rotated.
rotationGroupSize int
rotationGroupCount int
rotationGroupPeriod time.Duration
// stopCh is closed by MustStop to ask the rotation goroutine to exit.
stopCh chan struct{}
// rotationStoppedCh is closed by the rotation goroutine right before it
// returns; MustStop waits on it.
rotationStoppedCh chan struct{}
}
// newMetricIDCache returns a sharded metricIDCache with every shard
// initialized to empty sets and with the background rotation goroutine
// already running. The caller must call MustStop to stop that goroutine.
func newMetricIDCache() *metricIDCache {
	// Shards based on the number of CPUs are taken from
	// lib/blockcache/blockcache.go.
	groupSize := cgroup.AvailableCPUs()
	groupCount := cgroup.AvailableCPUs()
	if groupCount > 16 {
		// Cap the number of groups so that a full rotation takes at most
		// 16 rotation periods.
		groupCount = 16
	}

	cache := &metricIDCache{
		shards:              make([]metricIDCacheShard, groupSize*groupCount),
		rotationGroupSize:   groupSize,
		rotationGroupCount:  groupCount,
		rotationGroupPeriod: timeutil.AddJitterToDuration(1 * time.Minute),
		stopCh:              make(chan struct{}),
		rotationStoppedCh:   make(chan struct{}),
	}

	// Take the address of every element: shards are initialized in place.
	for i := range cache.shards {
		shard := &cache.shards[i]
		shard.prev = &uint64set.Set{}
		shard.next = &uint64set.Set{}
		shard.curr.Store(&uint64set.Set{})
	}

	go cache.startRotation()
	return cache
}
// MustStop stops the background rotation goroutine and waits until it exits.
//
// It must be called at most once: stopCh is closed here, and closing an
// already closed channel panics.
func (c *metricIDCache) MustStop() {
// Closing stopCh makes startRotation leave its loop.
close(c.stopCh)
// startRotation closes rotationStoppedCh right before returning, so this
// receive blocks until the goroutine is done.
<-c.rotationStoppedCh
}
// numShards returns the total number of shards in the cache as uint64.
func (c *metricIDCache) numShards() uint64 {
	n := len(c.shards)
	return uint64(n)
}
// fullRotationPeriod returns the time it takes to rotate every rotation group
// once, i.e. rotationGroupCount rotation periods.
func (c *metricIDCache) fullRotationPeriod() time.Duration {
	groups := time.Duration(c.rotationGroupCount)
	return c.rotationGroupPeriod * groups
}
// Stats returns cache statistics obtained by summing the statistics of all
// the shards field by field.
func (c *metricIDCache) Stats() metricIDCacheStats {
	var total metricIDCacheStats
	for i := range c.shards {
		// Call Stats on the slice element itself instead of on a copy of
		// the shard.
		shardStats := c.shards[i].Stats()
		total.Size += shardStats.Size
		total.SizeBytes += shardStats.SizeBytes
		total.SyncsCount += shardStats.SyncsCount
		total.RotationsCount += shardStats.RotationsCount
	}
	return total
}
// Has returns the result of looking up metricID in the single shard selected
// by the metricID hash.
func (c *metricIDCache) Has(metricID uint64) bool {
	idx := fastHashUint64(metricID) % c.numShards()
	shard := &c.shards[idx]
	return shard.Has(metricID)
}
// Set adds metricID to the single shard selected by the metricID hash.
func (c *metricIDCache) Set(metricID uint64) {
	idx := fastHashUint64(metricID) % c.numShards()
	shard := &c.shards[idx]
	shard.Set(metricID)
}
// rotate rotates every shard that belongs to the given rotation group.
//
// Shards are grouped by index: group g owns the shards with indexes in
// [g*rotationGroupSize, (g+1)*rotationGroupSize), clipped to len(c.shards).
// The index range is computed directly, so only the shards of the requested
// group are visited. A group that owns no shards (negative, or past the end of
// c.shards) is a no-op, and so is a cache with a non-positive
// rotationGroupSize.
func (c *metricIDCache) rotate(rotationGroup int) {
	size := c.rotationGroupSize
	if rotationGroup < 0 || size <= 0 {
		return
	}
	// Compare against the number of groups instead of multiplying first, so
	// the bounds check cannot be defeated by integer overflow.
	numGroups := (len(c.shards) + size - 1) / size
	if rotationGroup >= numGroups {
		return
	}

	start := rotationGroup * size
	end := start + size
	if end > len(c.shards) {
		// The last group may be shorter than rotationGroupSize.
		end = len(c.shards)
	}
	// Index into the slice so that every shard is rotated in place.
	for i := start; i < end; i++ {
		c.shards[i].rotate()
	}
}
// startRotation runs the rotation loop until stopCh is closed.
//
// On every tick of rotationGroupPeriod it advances to the next rotation group,
// wrapping around after rotationGroupCount groups, and rotates the shards of
// that group. rotationStoppedCh is closed right before the function returns.
func (c *metricIDCache) startRotation() {
	tick := time.NewTicker(c.rotationGroupPeriod)
	defer tick.Stop()

	group := 0
	for {
		select {
		case <-tick.C:
			// Rotate a single group of rotationGroupSize shards per tick
			// to avoid slow access for all shards at once.
			group = (group + 1) % c.rotationGroupCount
			c.rotate(group)
		case <-c.stopCh:
			close(c.rotationStoppedCh)
			return
		}
	}
}
// metricIDCacheStats holds metricIDCache statistics. metricIDCache.Stats
// fills it by summing the per-shard values field by field.
type metricIDCacheStats struct {
// Size is presumably the number of cached metricIDs; it is computed by the
// shard Stats method - TODO confirm against that method.
Size uint64
// SizeBytes is presumably the memory occupied by the cached sets, in bytes -
// TODO confirm against the shard Stats method.
SizeBytes uint64
// SyncsCount is presumably the number of times shards merged curr into next
// (see syncLocked) - TODO confirm.
SyncsCount uint64
// RotationsCount is presumably the number of shard rotations performed so
// far, counted per shard - TODO confirm.
RotationsCount uint64
}
type metricIDCacheShardNopad struct {
// Contains immutable set of metricIDs.
curr atomic.Pointer[uint64set.Set]
@@ -39,36 +152,16 @@ type metricIDCache struct {
rotationsCount uint64
mu sync.Mutex
stopCh chan struct{}
rotationStoppedCh chan struct{}
}
func newMetricIDCache() *metricIDCache {
c := metricIDCache{
prev: &uint64set.Set{},
next: &uint64set.Set{},
stopCh: make(chan struct{}),
rotationStoppedCh: make(chan struct{}),
}
c.curr.Store(&uint64set.Set{})
go c.startRotation()
return &c
// metricIDCacheShard is metricIDCacheShardNopad followed by padding that
// rounds the struct size up to a multiple of atomicutil.CacheLineSize.
type metricIDCacheShard struct {
metricIDCacheShardNopad
// The padding prevents false sharing
// between neighboring shards in the shards slice. Its length is
// CacheLineSize minus the unpadded size modulo CacheLineSize, so a whole
// extra cache line is added when the unpadded size is already a multiple
// of CacheLineSize.
_ [atomicutil.CacheLineSize - unsafe.Sizeof(metricIDCacheShardNopad{})%atomicutil.CacheLineSize]byte
}
func (c *metricIDCache) MustStop() {
close(c.stopCh)
<-c.rotationStoppedCh
}
type metricIDCacheStats struct {
Size uint64
SizeBytes uint64
SyncsCount uint64
RotationsCount uint64
}
func (c *metricIDCache) Stats() metricIDCacheStats {
func (c *metricIDCacheShard) Stats() metricIDCacheStats {
c.mu.Lock()
defer c.mu.Unlock()
@@ -91,7 +184,7 @@ func (c *metricIDCache) Stats() metricIDCacheStats {
return s
}
func (c *metricIDCache) Has(metricID uint64) bool {
func (c *metricIDCacheShard) Has(metricID uint64) bool {
if c.curr.Load().Has(metricID) {
// Fast path. The majority of calls must go here.
return true
@@ -101,7 +194,7 @@ func (c *metricIDCache) Has(metricID uint64) bool {
return c.hasSlow(metricID)
}
func (c *metricIDCache) hasSlow(metricID uint64) bool {
func (c *metricIDCacheShard) hasSlow(metricID uint64) bool {
c.mu.Lock()
defer c.mu.Unlock()
@@ -132,7 +225,7 @@ func (c *metricIDCache) hasSlow(metricID uint64) bool {
return ok
}
func (c *metricIDCache) Set(metricID uint64) {
func (c *metricIDCacheShard) Set(metricID uint64) {
c.mu.Lock()
c.next.Add(metricID)
c.mu.Unlock()
@@ -140,7 +233,7 @@ func (c *metricIDCache) Set(metricID uint64) {
// syncLocked merges data from curr into next and atomically replaces curr with
// next.
func (c *metricIDCache) syncLocked() {
func (c *metricIDCacheShard) syncLocked() {
curr := c.curr.Load()
c.next.Union(curr)
c.curr.Store(c.next)
@@ -148,23 +241,8 @@ func (c *metricIDCache) syncLocked() {
c.syncsCount++
}
func (c *metricIDCache) startRotation() {
d := timeutil.AddJitterToDuration(10 * time.Minute)
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-c.stopCh:
close(c.rotationStoppedCh)
return
case <-ticker.C:
c.rotate()
}
}
}
// rotate atomically rotates next, curr, and prev cache parts.
func (c *metricIDCache) rotate() {
func (c *metricIDCacheShard) rotate() {
c.mu.Lock()
defer c.mu.Unlock()
curr := c.curr.Load()

View File

@@ -5,8 +5,8 @@ import (
"testing/synctest"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)
func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
@@ -16,9 +16,7 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
c := newMetricIDCache()
defer c.MustStop()
c.Set(123)
time.Sleep(15 * time.Minute)
time.Sleep(15 * time.Minute)
time.Sleep(15 * time.Minute)
time.Sleep(3 * c.fullRotationPeriod())
if c.Has(123) {
t.Fatalf("entry is still in cache")
}
@@ -30,12 +28,11 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
c := newMetricIDCache()
defer c.MustStop()
c.Set(123)
time.Sleep(5 * time.Minute)
time.Sleep(c.rotationGroupPeriod - time.Second)
if !c.Has(123) {
t.Fatalf("entry not in cache")
}
time.Sleep(15 * time.Minute)
time.Sleep(15 * time.Minute)
time.Sleep(2 * c.fullRotationPeriod())
if c.Has(123) {
t.Fatalf("entry is still in cache")
}
@@ -48,7 +45,7 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
defer c.MustStop()
c.Set(123)
for range 10_000 {
time.Sleep(5 * time.Minute)
time.Sleep(c.rotationGroupPeriod - time.Second)
if !c.Has(123) {
t.Fatalf("entry not in cache")
}
@@ -58,7 +55,8 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
func TestMetricIDCache_Stats(t *testing.T) {
assertStats := func(t *testing.T, c *metricIDCache, want metricIDCacheStats) {
if diff := cmp.Diff(want, c.Stats()); diff != "" {
t.Helper()
if diff := cmp.Diff(want, c.Stats(), cmpopts.IgnoreFields(metricIDCacheStats{}, "SizeBytes")); diff != "" {
t.Fatalf("unexpected stats (-want, +got):\n%s", diff)
}
}
@@ -72,14 +70,11 @@ func TestMetricIDCache_Stats(t *testing.T) {
// Add metricIDs and check stats.
// At this point, all metricIDs are in next.
metricIDs := uint64set.Set{}
for metricID := range uint64(100_000) {
c.Set(metricID)
metricIDs.Add(metricID)
}
assertStats(t, c, metricIDCacheStats{
Size: 100_000,
SizeBytes: metricIDs.SizeBytes(),
Size: 100_000,
})
// Get all metricIDs and check stats.
@@ -91,26 +86,24 @@ func TestMetricIDCache_Stats(t *testing.T) {
}
assertStats(t, c, metricIDCacheStats{
Size: 100_000,
SizeBytes: metricIDs.SizeBytes(),
SyncsCount: 1,
SyncsCount: c.numShards(),
})
// Wait until next rotation.
// Wait until all groups are rotated.
// curr metricIDs will be moved to prev.
time.Sleep(15 * time.Minute)
time.Sleep(c.fullRotationPeriod() + time.Second)
assertStats(t, c, metricIDCacheStats{
Size: 100_000,
SizeBytes: metricIDs.SizeBytes(),
SyncsCount: 1,
RotationsCount: 1,
SyncsCount: c.numShards(),
RotationsCount: c.numShards(),
})
// Wait until another rotation.
// Wait until all groups are rotated.
// The cache now should be empty.
time.Sleep(15 * time.Minute)
time.Sleep(c.fullRotationPeriod())
assertStats(t, c, metricIDCacheStats{
SyncsCount: 1,
RotationsCount: 2,
SyncsCount: c.numShards(),
RotationsCount: 2 * c.numShards(),
})
})
}

View File

@@ -7,11 +7,29 @@ import (
"time"
)
// benchCacheState selects how the cache is prepared before the timed part of
// BenchmarkMetricIDCache_Has starts.
type benchCacheState int

const (
	// benchCacheStateCold runs the benchmark without warm-up and rotation.
	benchCacheStateCold benchCacheState = iota
	// benchCacheStateWarm enables the warm-up step before the benchmark.
	benchCacheStateWarm
	// benchCacheStateRotated enables the warm-up step and then rotates one
	// rotation group before the benchmark.
	benchCacheStateRotated
)

// benchCacheStates lists all the states the benchmark iterates over.
var benchCacheStates = [...]benchCacheState{
	benchCacheStateCold,
	benchCacheStateWarm,
	benchCacheStateRotated,
}

// String returns the name of the state used in sub-benchmark names.
// It panics if s is not one of the declared states.
func (s benchCacheState) String() string {
	names := [...]string{
		benchCacheStateCold:    " cold",
		benchCacheStateWarm:    " warm",
		benchCacheStateRotated: "rotated",
	}
	return names[s]
}
func BenchmarkMetricIDCache_Has(b *testing.B) {
f := func(b *testing.B, numMetricIDs, distance int64, hitsOnly, warmUp bool) {
f := func(b *testing.B, numMetricIDs, distance int64, hitsOnly bool, state benchCacheState) {
b.Helper()
c := newMetricIDCache()
defer c.MustStop()
warmUp := state == benchCacheStateWarm || state == benchCacheStateRotated
rotate := state == benchCacheStateRotated
metricIDMin := time.Now().UnixNano()
metricIDMax := metricIDMin + numMetricIDs*distance
for metricID := metricIDMin; metricID <= metricIDMax; metricID += distance {
@@ -20,7 +38,11 @@ func BenchmarkMetricIDCache_Has(b *testing.B) {
b.Fatalf("metricID not in cache: %d", metricID)
}
}
if rotate {
c.rotate(rand.Intn(c.rotationGroupCount))
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
if hitsOnly {
metricID := metricIDMin + rand.Int63n(numMetricIDs)*distance
@@ -45,27 +67,24 @@ func BenchmarkMetricIDCache_Has(b *testing.B) {
}
})
b.ReportAllocs()
b.ReportMetric(float64(c.Stats().SizeBytes), "sizeBytes")
}
subB := func(numMetricIDs, distance int64, hitsOnly, warmUp bool) {
hitsOrMisses := "hitsss"
subB := func(numMetricIDs, distance int64, hitsOnly bool, state benchCacheState) {
hitsOrMisses := " hits-only"
if !hitsOnly {
hitsOrMisses = "misses"
hitsOrMisses = "misses-only"
}
coldOrWarm := "cold"
if warmUp {
coldOrWarm = "warm"
}
name := fmt.Sprintf("%s/%s/n%d/d%d", hitsOrMisses, coldOrWarm, numMetricIDs, distance)
name := fmt.Sprintf("%s/%s/n%d/d%d", hitsOrMisses, state, numMetricIDs, distance)
b.Run(name, func(b *testing.B) {
f(b, numMetricIDs, distance, hitsOnly, warmUp)
f(b, numMetricIDs, distance, hitsOnly, state)
})
}
for _, hitsOnly := range []bool{true, false} {
for _, warmUp := range []bool{false, true} {
for _, state := range benchCacheStates {
for _, numMetricIDs := range []int64{100_000, 1_000_000, 10_000_000} {
for _, distance := range []int64{1, 10, 100} {
subB(numMetricIDs, distance, hitsOnly, warmUp)
subB(numMetricIDs, distance, hitsOnly, state)
}
}
}