lib/storage: optimize metricIDCache sharding (#10468)

Exploit uint64set data structure peculiarities (adjacent elements are
stored in
64KiB buckets) to optimize metricIDCache memory footprint.

As a result, the cache utilizes 87% less memory and is up to 90%
faster. See
[benchstat.txt](https://github.com/user-attachments/files/25294076/benchstat.txt).

Follow-up for #10388 and #10346.

Thanks to @valyala for the optimization idea.

---------

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
Artem Fetishev
2026-02-13 17:29:48 +01:00
committed by GitHub
parent 14bc51554b
commit e5c8581bad
3 changed files with 35 additions and 52 deletions

View File

@@ -7,11 +7,12 @@ import (
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
const metricIDCacheShardCount = 16
// metricIDCache stores metricIDs that have been added to the index. It is used
// during data ingestion to decide whether a new entry needs to be added to the
// global index.
@@ -19,39 +20,23 @@ import (
// The cache consists of multiple shards and avoids synchronization on the read
// path if possible to reduce contention.
type metricIDCache struct {
shards []metricIDCacheShard
shards [metricIDCacheShardCount]metricIDCacheShard
// The shards are rotated in groups, one group at a time.
// rotationGroupSize tells the number of shards in one group,
// rotationGroupCount tells how many groups to rotate, and
// rotationGroupPeriod tells how often a group is rotated.
rotationGroupSize int
rotationGroupCount int
rotationGroupPeriod time.Duration
// The shards are rotated, one shard at a time. rotationPeriod defines the
// time interval between two successive rotations.
rotationPeriod time.Duration
stopCh chan struct{}
rotationStoppedCh chan struct{}
}
func newMetricIDCache() *metricIDCache {
// Shards based on the number of CPUs are taken from
// lib/blockcache/blockcache.go.
rotationGroupSize := 1
rotationGroupCount := cgroup.AvailableCPUs()
if rotationGroupCount > 16 {
rotationGroupCount = 16
}
numShards := rotationGroupSize * rotationGroupCount
c := metricIDCache{
shards: make([]metricIDCacheShard, numShards),
rotationGroupSize: rotationGroupSize,
rotationGroupCount: rotationGroupCount,
rotationGroupPeriod: timeutil.AddJitterToDuration(1 * time.Minute),
stopCh: make(chan struct{}),
rotationStoppedCh: make(chan struct{}),
rotationPeriod: timeutil.AddJitterToDuration(1 * time.Minute),
stopCh: make(chan struct{}),
rotationStoppedCh: make(chan struct{}),
}
for i := range numShards {
for i := range metricIDCacheShardCount {
c.shards[i].prev = &uint64set.Set{}
c.shards[i].next = &uint64set.Set{}
c.shards[i].curr.Store(&uint64set.Set{})
@@ -67,7 +52,7 @@ func (c *metricIDCache) MustStop() {
func (c *metricIDCache) Stats() metricIDCacheStats {
var stats metricIDCacheStats
for i := range len(c.shards) {
for i := range metricIDCacheShardCount {
s := c.shards[i].Stats()
stats.Size += s.Size
stats.SizeBytes += s.SizeBytes
@@ -78,37 +63,30 @@ func (c *metricIDCache) Stats() metricIDCacheStats {
}
func (c *metricIDCache) Has(metricID uint64) bool {
shardIdx := fastHashUint64(metricID) % uint64(len(c.shards))
shardIdx := (metricID / 65536) % metricIDCacheShardCount
return c.shards[shardIdx].Has(metricID)
}
func (c *metricIDCache) Set(metricID uint64) {
shardIdx := fastHashUint64(metricID) % uint64(len(c.shards))
shardIdx := (metricID / 65536) % metricIDCacheShardCount
c.shards[shardIdx].Set(metricID)
}
func (c *metricIDCache) rotate(rotationGroup int) {
for i := range len(c.shards) {
if i/c.rotationGroupSize == rotationGroup {
c.shards[i].rotate()
}
}
}
func (c *metricIDCache) startRotation() {
ticker := time.NewTicker(c.rotationGroupPeriod)
ticker := time.NewTicker(c.rotationPeriod)
defer ticker.Stop()
rotationGroup := 0
var shardIdx int
for {
select {
case <-c.stopCh:
close(c.rotationStoppedCh)
return
case <-ticker.C:
// each tick rotate only subset of size metricIDCacheRotationGroupSize
// to avoid slow access for all shards at once
rotationGroup = (rotationGroup + 1) % c.rotationGroupCount
c.rotate(rotationGroup)
// Each tick rotate only one shard at a time to avoid slow access
// for all shards at once.
shardIdx %= metricIDCacheShardCount
c.shards[shardIdx].rotate()
shardIdx++
}
}
}

View File

@@ -16,7 +16,7 @@ func (c *metricIDCache) numShards() uint64 {
}
func (c *metricIDCache) fullRotationPeriod() time.Duration {
return time.Duration(c.rotationGroupCount) * c.rotationGroupPeriod
return metricIDCacheShardCount * c.rotationPeriod
}
func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
@@ -26,6 +26,9 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
c := newMetricIDCache()
defer c.MustStop()
c.Set(123)
// It takes 3 full rotation cycles for an entry to be evicted from the
// cache:
// [in next] -rotation1-> [in curr] -rotation2-> [in prev] -rotation3-> evicted.
time.Sleep(3 * c.fullRotationPeriod())
if c.Has(123) {
t.Fatalf("entry is still in cache")
@@ -38,11 +41,11 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
c := newMetricIDCache()
defer c.MustStop()
c.Set(123)
time.Sleep(c.rotationGroupPeriod - time.Second)
time.Sleep(c.fullRotationPeriod())
if !c.Has(123) {
t.Fatalf("entry not in cache")
}
time.Sleep(2 * c.fullRotationPeriod())
time.Sleep(3 * c.fullRotationPeriod())
if c.Has(123) {
t.Fatalf("entry is still in cache")
}
@@ -55,7 +58,7 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
defer c.MustStop()
c.Set(123)
for range 10_000 {
time.Sleep(c.rotationGroupPeriod - time.Second)
time.Sleep(c.fullRotationPeriod())
if !c.Has(123) {
t.Fatalf("entry not in cache")
}
@@ -71,6 +74,8 @@ func TestMetricIDCache_Stats(t *testing.T) {
}
}
const numMetricIDs = 16 * 65536
synctest.Test(t, func(t *testing.T) {
c := newMetricIDCache()
defer c.MustStop()
@@ -80,22 +85,22 @@ func TestMetricIDCache_Stats(t *testing.T) {
// Add metricIDs and check stats.
// At this point, all metricIDs are in next.
for metricID := range uint64(100_000) {
for metricID := range uint64(numMetricIDs) {
c.Set(metricID)
}
assertStats(t, c, metricIDCacheStats{
Size: 100_000,
Size: numMetricIDs,
})
// Get all metricIDs and check stats.
// All metricIDs will be sync'ed from next to curr.
for metricID := range uint64(100_000) {
for metricID := range uint64(numMetricIDs) {
if !c.Has(metricID) {
t.Fatalf("metricID not in cache: %d", metricID)
}
}
assertStats(t, c, metricIDCacheStats{
Size: 100_000,
Size: numMetricIDs,
SyncsCount: c.numShards(),
})
@@ -103,7 +108,7 @@ func TestMetricIDCache_Stats(t *testing.T) {
// curr metricIDs will be moved to prev.
time.Sleep(c.fullRotationPeriod() + time.Second)
assertStats(t, c, metricIDCacheStats{
Size: 100_000,
Size: numMetricIDs,
SyncsCount: c.numShards(),
RotationsCount: c.numShards(),
})

View File

@@ -39,7 +39,7 @@ func BenchmarkMetricIDCache_Has(b *testing.B) {
}
}
if rotate {
c.rotate(rand.Intn(c.rotationGroupCount))
c.shards[rand.Intn(metricIDCacheShardCount)].rotate()
}
b.ResetTimer()