mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
lib/storage: optimize metricIDCache sharding (#10468)
Exploit uint64set data structure peculiarities (adjacent elements are stored in 64KiB buckets) to optimize the metricIDCache memory footprint. As a result, the cache uses 87% less memory and is up to 90% faster. See [benchstat.txt](https://github.com/user-attachments/files/25294076/benchstat.txt). Follow-up for #10388 and #10346. Thanks to @valyala for the optimization idea. --------- Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
@@ -7,11 +7,12 @@ import (
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
)
|
||||
|
||||
const metricIDCacheShardCount = 16
|
||||
|
||||
// metricIDCache stores metricIDs that have been added to the index. It is used
|
||||
// during data ingestion to decide whether a new entry needs to be added to the
|
||||
// global index.
|
||||
@@ -19,39 +20,23 @@ import (
|
||||
// The cache consists of multiple shards and avoids synchronization on the read
|
||||
// path if possible to reduce contention.
|
||||
type metricIDCache struct {
|
||||
shards []metricIDCacheShard
|
||||
shards [metricIDCacheShardCount]metricIDCacheShard
|
||||
|
||||
// The shards are rotated in groups, one group at a time.
|
||||
// rotationGroupSize tells the number of shards in one group,
|
||||
// rotationGroupCount tells how many groups to rotate, and
|
||||
// rotationGroupPeriod tells how often a group is rotated.
|
||||
rotationGroupSize int
|
||||
rotationGroupCount int
|
||||
rotationGroupPeriod time.Duration
|
||||
// The shards are rotated, one shard at a time. rotationPeriod defines the
|
||||
// time interval between two successive rotations.
|
||||
rotationPeriod time.Duration
|
||||
|
||||
stopCh chan struct{}
|
||||
rotationStoppedCh chan struct{}
|
||||
}
|
||||
|
||||
func newMetricIDCache() *metricIDCache {
|
||||
// Shards based on the number of CPUs are taken from
|
||||
// lib/blockcache/blockcache.go.
|
||||
rotationGroupSize := 1
|
||||
rotationGroupCount := cgroup.AvailableCPUs()
|
||||
if rotationGroupCount > 16 {
|
||||
rotationGroupCount = 16
|
||||
}
|
||||
numShards := rotationGroupSize * rotationGroupCount
|
||||
|
||||
c := metricIDCache{
|
||||
shards: make([]metricIDCacheShard, numShards),
|
||||
rotationGroupSize: rotationGroupSize,
|
||||
rotationGroupCount: rotationGroupCount,
|
||||
rotationGroupPeriod: timeutil.AddJitterToDuration(1 * time.Minute),
|
||||
stopCh: make(chan struct{}),
|
||||
rotationStoppedCh: make(chan struct{}),
|
||||
rotationPeriod: timeutil.AddJitterToDuration(1 * time.Minute),
|
||||
stopCh: make(chan struct{}),
|
||||
rotationStoppedCh: make(chan struct{}),
|
||||
}
|
||||
for i := range numShards {
|
||||
for i := range metricIDCacheShardCount {
|
||||
c.shards[i].prev = &uint64set.Set{}
|
||||
c.shards[i].next = &uint64set.Set{}
|
||||
c.shards[i].curr.Store(&uint64set.Set{})
|
||||
@@ -67,7 +52,7 @@ func (c *metricIDCache) MustStop() {
|
||||
|
||||
func (c *metricIDCache) Stats() metricIDCacheStats {
|
||||
var stats metricIDCacheStats
|
||||
for i := range len(c.shards) {
|
||||
for i := range metricIDCacheShardCount {
|
||||
s := c.shards[i].Stats()
|
||||
stats.Size += s.Size
|
||||
stats.SizeBytes += s.SizeBytes
|
||||
@@ -78,37 +63,30 @@ func (c *metricIDCache) Stats() metricIDCacheStats {
|
||||
}
|
||||
|
||||
func (c *metricIDCache) Has(metricID uint64) bool {
|
||||
shardIdx := fastHashUint64(metricID) % uint64(len(c.shards))
|
||||
shardIdx := (metricID / 65536) % metricIDCacheShardCount
|
||||
return c.shards[shardIdx].Has(metricID)
|
||||
}
|
||||
|
||||
func (c *metricIDCache) Set(metricID uint64) {
|
||||
shardIdx := fastHashUint64(metricID) % uint64(len(c.shards))
|
||||
shardIdx := (metricID / 65536) % metricIDCacheShardCount
|
||||
c.shards[shardIdx].Set(metricID)
|
||||
}
|
||||
|
||||
func (c *metricIDCache) rotate(rotationGroup int) {
|
||||
for i := range len(c.shards) {
|
||||
if i/c.rotationGroupSize == rotationGroup {
|
||||
c.shards[i].rotate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *metricIDCache) startRotation() {
|
||||
ticker := time.NewTicker(c.rotationGroupPeriod)
|
||||
ticker := time.NewTicker(c.rotationPeriod)
|
||||
defer ticker.Stop()
|
||||
rotationGroup := 0
|
||||
var shardIdx int
|
||||
for {
|
||||
select {
|
||||
case <-c.stopCh:
|
||||
close(c.rotationStoppedCh)
|
||||
return
|
||||
case <-ticker.C:
|
||||
// each tick rotate only subset of size metricIDCacheRotationGroupSize
|
||||
// to avoid slow access for all shards at once
|
||||
rotationGroup = (rotationGroup + 1) % c.rotationGroupCount
|
||||
c.rotate(rotationGroup)
|
||||
// On each tick, rotate only one shard at a time to avoid slow access
|
||||
// for all shards at once.
|
||||
shardIdx %= metricIDCacheShardCount
|
||||
c.shards[shardIdx].rotate()
|
||||
shardIdx++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ func (c *metricIDCache) numShards() uint64 {
|
||||
}
|
||||
|
||||
func (c *metricIDCache) fullRotationPeriod() time.Duration {
|
||||
return time.Duration(c.rotationGroupCount) * c.rotationGroupPeriod
|
||||
return metricIDCacheShardCount * c.rotationPeriod
|
||||
}
|
||||
|
||||
func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
|
||||
@@ -26,6 +26,9 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
|
||||
c := newMetricIDCache()
|
||||
defer c.MustStop()
|
||||
c.Set(123)
|
||||
// It takes 3 full rotation cycles for an entry to be evicted from the
|
||||
// cache:
|
||||
// [in next] -rotation1-> [in curr] -rotation2-> [in prev] -rotation3-> evicted.
|
||||
time.Sleep(3 * c.fullRotationPeriod())
|
||||
if c.Has(123) {
|
||||
t.Fatalf("entry is still in cache")
|
||||
@@ -38,11 +41,11 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
|
||||
c := newMetricIDCache()
|
||||
defer c.MustStop()
|
||||
c.Set(123)
|
||||
time.Sleep(c.rotationGroupPeriod - time.Second)
|
||||
time.Sleep(c.fullRotationPeriod())
|
||||
if !c.Has(123) {
|
||||
t.Fatalf("entry not in cache")
|
||||
}
|
||||
time.Sleep(2 * c.fullRotationPeriod())
|
||||
time.Sleep(3 * c.fullRotationPeriod())
|
||||
if c.Has(123) {
|
||||
t.Fatalf("entry is still in cache")
|
||||
}
|
||||
@@ -55,7 +58,7 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
|
||||
defer c.MustStop()
|
||||
c.Set(123)
|
||||
for range 10_000 {
|
||||
time.Sleep(c.rotationGroupPeriod - time.Second)
|
||||
time.Sleep(c.fullRotationPeriod())
|
||||
if !c.Has(123) {
|
||||
t.Fatalf("entry not in cache")
|
||||
}
|
||||
@@ -71,6 +74,8 @@ func TestMetricIDCache_Stats(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
const numMetricIDs = 16 * 65536
|
||||
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
c := newMetricIDCache()
|
||||
defer c.MustStop()
|
||||
@@ -80,22 +85,22 @@ func TestMetricIDCache_Stats(t *testing.T) {
|
||||
|
||||
// Add metricIDs and check stats.
|
||||
// At this point, all metricIDs are in next.
|
||||
for metricID := range uint64(100_000) {
|
||||
for metricID := range uint64(numMetricIDs) {
|
||||
c.Set(metricID)
|
||||
}
|
||||
assertStats(t, c, metricIDCacheStats{
|
||||
Size: 100_000,
|
||||
Size: numMetricIDs,
|
||||
})
|
||||
|
||||
// Get all metricIDs and check stats.
|
||||
// All metricIDs will be sync'ed from next to curr.
|
||||
for metricID := range uint64(100_000) {
|
||||
for metricID := range uint64(numMetricIDs) {
|
||||
if !c.Has(metricID) {
|
||||
t.Fatalf("metricID not in cache: %d", metricID)
|
||||
}
|
||||
}
|
||||
assertStats(t, c, metricIDCacheStats{
|
||||
Size: 100_000,
|
||||
Size: numMetricIDs,
|
||||
SyncsCount: c.numShards(),
|
||||
})
|
||||
|
||||
@@ -103,7 +108,7 @@ func TestMetricIDCache_Stats(t *testing.T) {
|
||||
// curr metricIDs will be moved to prev.
|
||||
time.Sleep(c.fullRotationPeriod() + time.Second)
|
||||
assertStats(t, c, metricIDCacheStats{
|
||||
Size: 100_000,
|
||||
Size: numMetricIDs,
|
||||
SyncsCount: c.numShards(),
|
||||
RotationsCount: c.numShards(),
|
||||
})
|
||||
|
||||
@@ -39,7 +39,7 @@ func BenchmarkMetricIDCache_Has(b *testing.B) {
|
||||
}
|
||||
}
|
||||
if rotate {
|
||||
c.rotate(rand.Intn(c.rotationGroupCount))
|
||||
c.shards[rand.Intn(metricIDCacheShardCount)].rotate()
|
||||
}
|
||||
b.ResetTimer()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user