mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-25 20:47:45 +03:00
lib/storage: shard metricIdCache
The current implementation has a bottleneck: a single mutex guards access to the `prev`/`next` metric sets. Each rotation results in storage utilization spikes, since the lock-free `curr` set is almost empty right after the rotation and the cache needs to promote metrics from `prev` to `next`. This is an attempt to reduce contention by splitting the cache into separate shards. Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10367
This commit is contained in:
@@ -47,6 +47,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): fix [alert restore](https://docs.victoriametrics.com/victoriametrics/vmalert/#alerts-state-on-restarts) when a group contains many rules and is slow to complete evaluation. Previously, the restore process might not retrieve the correct previous alert state. See [#10335](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10335).
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): do not skip sending alert notifications to `-notifier.url` if remote write requests to `-remoteWrite.url` fail. See [#10376](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10376).
|
||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/): fix `changes()` function when gaps between samples exceed the lookbehind window. Previously, it could yield a non-zero value even when the sample value remained unchanged. See [#10280](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10280).
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): previously ingestion could hit lock contention that triggered frequent context switches and storage connection saturation spikes; now the contention is removed to keep ingestion steady. See [#10367](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10367).
|
||||
|
||||
## [v1.134.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.134.0)
|
||||
|
||||
@@ -79,6 +80,10 @@ Released at 2026-01-16
|
||||
Released at 2026-01-02
|
||||
|
||||
**Update Note 1:** [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): Upgrading to the per-partition index requires registering all active time series. Expect a slowdown of data ingestion and queries during the upgrade roll-out. This is a one-time operation. Additionally, for users with retention periods shorter than 1 month, the disk usage may increase.
|
||||
**Update Note 2:** [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): Running this version in deployments with big datasets may cause high CPU utilization. See [10297](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10297).
|
||||
**Update Note 3:** [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): lock contention in the ingestion path may cause frequent context switches and storage connection saturation spikes. See [10367](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10367). Addressed in `v1.135.0`.
|
||||
**Update Note 4:** [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): TSDB status may be empty if the partition index does not have records for the requested date. See [10315](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10315). Addressed in `v1.135.0`.
|
||||
**Update Note 5:** [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): `indexdb/tagFiltersToMetricIDs`, `indexdb/metricID` and `indexdb/date_metricID` cache metrics are not reported properly. See [10275](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10275). Addressed in `v1.135.0`.
|
||||
|
||||
* SECURITY: upgrade base docker image (Alpine) from 3.22.2 to 3.23.2. See [Alpine 3.23.2 release notes](https://www.alpinelinux.org/posts/Alpine-3.23.2-released.html).
|
||||
|
||||
|
||||
@@ -4,7 +4,10 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
)
|
||||
@@ -13,9 +16,119 @@ import (
|
||||
// during data ingestion to decide whether a new entry needs to be added to the
|
||||
// global index.
|
||||
//
|
||||
// The cache avoids synchronization on the read path if possible to reduce
|
||||
// contention. Based on dateMetricIDCache ideas.
|
||||
// The cache consists of multiple shards and avoids synchronization on the read
|
||||
// path if possible to reduce contention.
|
||||
type metricIDCache struct {
|
||||
shards []metricIDCacheShard
|
||||
|
||||
// The shards are rotated in groups, one group at a time.
|
||||
// rotationGroupSize tells the number of shards in one group,
|
||||
// rotationGroupCount tells how many groups to rotate, and
|
||||
// rotationGroupPeriod tells how often a group is rotated.
|
||||
rotationGroupSize int
|
||||
rotationGroupCount int
|
||||
rotationGroupPeriod time.Duration
|
||||
|
||||
stopCh chan struct{}
|
||||
rotationStoppedCh chan struct{}
|
||||
}
|
||||
|
||||
func newMetricIDCache() *metricIDCache {
|
||||
// Shards based on the number of CPUs are taken from
|
||||
// lib/blockcache/blockcache.go.
|
||||
rotationGroupSize := cgroup.AvailableCPUs()
|
||||
rotationGroupCount := cgroup.AvailableCPUs()
|
||||
if rotationGroupCount > 16 {
|
||||
rotationGroupCount = 16
|
||||
}
|
||||
numShards := rotationGroupSize * rotationGroupCount
|
||||
|
||||
c := metricIDCache{
|
||||
shards: make([]metricIDCacheShard, numShards),
|
||||
rotationGroupSize: rotationGroupSize,
|
||||
rotationGroupCount: rotationGroupCount,
|
||||
rotationGroupPeriod: timeutil.AddJitterToDuration(1 * time.Minute),
|
||||
stopCh: make(chan struct{}),
|
||||
rotationStoppedCh: make(chan struct{}),
|
||||
}
|
||||
for i := range numShards {
|
||||
c.shards[i].prev = &uint64set.Set{}
|
||||
c.shards[i].next = &uint64set.Set{}
|
||||
c.shards[i].curr.Store(&uint64set.Set{})
|
||||
}
|
||||
go c.startRotation()
|
||||
return &c
|
||||
}
|
||||
|
||||
func (c *metricIDCache) MustStop() {
|
||||
close(c.stopCh)
|
||||
<-c.rotationStoppedCh
|
||||
}
|
||||
|
||||
func (c *metricIDCache) numShards() uint64 {
|
||||
return uint64(len(c.shards))
|
||||
}
|
||||
|
||||
func (c *metricIDCache) fullRotationPeriod() time.Duration {
|
||||
return time.Duration(c.rotationGroupCount) * c.rotationGroupPeriod
|
||||
}
|
||||
|
||||
func (c *metricIDCache) Stats() metricIDCacheStats {
|
||||
var stats metricIDCacheStats
|
||||
for i := range len(c.shards) {
|
||||
s := c.shards[i].Stats()
|
||||
stats.Size += s.Size
|
||||
stats.SizeBytes += s.SizeBytes
|
||||
stats.SyncsCount += s.SyncsCount
|
||||
stats.RotationsCount += s.RotationsCount
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
func (c *metricIDCache) Has(metricID uint64) bool {
|
||||
shardIdx := fastHashUint64(metricID) % uint64(len(c.shards))
|
||||
return c.shards[shardIdx].Has(metricID)
|
||||
}
|
||||
|
||||
func (c *metricIDCache) Set(metricID uint64) {
|
||||
shardIdx := fastHashUint64(metricID) % uint64(len(c.shards))
|
||||
c.shards[shardIdx].Set(metricID)
|
||||
}
|
||||
|
||||
func (c *metricIDCache) rotate(rotationGroup int) {
|
||||
for i := range len(c.shards) {
|
||||
if i/c.rotationGroupSize == rotationGroup {
|
||||
c.shards[i].rotate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *metricIDCache) startRotation() {
|
||||
ticker := time.NewTicker(c.rotationGroupPeriod)
|
||||
defer ticker.Stop()
|
||||
rotationGroup := 0
|
||||
for {
|
||||
select {
|
||||
case <-c.stopCh:
|
||||
close(c.rotationStoppedCh)
|
||||
return
|
||||
case <-ticker.C:
|
||||
// each tick rotate only subset of size metricIDCacheRotationGroupSize
|
||||
// to avoid slow access for all shards at once
|
||||
rotationGroup = (rotationGroup + 1) % c.rotationGroupCount
|
||||
c.rotate(rotationGroup)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// metricIDCacheStats holds metricIDCache counters.
type metricIDCacheStats struct {
	// Size is the number of cached entries; SizeBytes is presumably the
	// memory they occupy, in bytes.
	Size, SizeBytes uint64

	// SyncsCount and RotationsCount tell how many sync and rotation
	// operations have been performed.
	SyncsCount, RotationsCount uint64
}
|
||||
|
||||
type metricIDCacheShardNopad struct {
|
||||
// Contains immutable set of metricIDs.
|
||||
curr atomic.Pointer[uint64set.Set]
|
||||
|
||||
@@ -39,36 +152,16 @@ type metricIDCache struct {
|
||||
rotationsCount uint64
|
||||
|
||||
mu sync.Mutex
|
||||
|
||||
stopCh chan struct{}
|
||||
rotationStoppedCh chan struct{}
|
||||
}
|
||||
|
||||
func newMetricIDCache() *metricIDCache {
|
||||
c := metricIDCache{
|
||||
prev: &uint64set.Set{},
|
||||
next: &uint64set.Set{},
|
||||
stopCh: make(chan struct{}),
|
||||
rotationStoppedCh: make(chan struct{}),
|
||||
}
|
||||
c.curr.Store(&uint64set.Set{})
|
||||
go c.startRotation()
|
||||
return &c
|
||||
type metricIDCacheShard struct {
|
||||
metricIDCacheShardNopad
|
||||
|
||||
// The padding prevents false sharing
|
||||
_ [atomicutil.CacheLineSize - unsafe.Sizeof(metricIDCacheShardNopad{})%atomicutil.CacheLineSize]byte
|
||||
}
|
||||
|
||||
func (c *metricIDCache) MustStop() {
|
||||
close(c.stopCh)
|
||||
<-c.rotationStoppedCh
|
||||
}
|
||||
|
||||
type metricIDCacheStats struct {
|
||||
Size uint64
|
||||
SizeBytes uint64
|
||||
SyncsCount uint64
|
||||
RotationsCount uint64
|
||||
}
|
||||
|
||||
func (c *metricIDCache) Stats() metricIDCacheStats {
|
||||
func (c *metricIDCacheShard) Stats() metricIDCacheStats {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
@@ -91,7 +184,7 @@ func (c *metricIDCache) Stats() metricIDCacheStats {
|
||||
return s
|
||||
}
|
||||
|
||||
func (c *metricIDCache) Has(metricID uint64) bool {
|
||||
func (c *metricIDCacheShard) Has(metricID uint64) bool {
|
||||
if c.curr.Load().Has(metricID) {
|
||||
// Fast path. The majority of calls must go here.
|
||||
return true
|
||||
@@ -101,7 +194,7 @@ func (c *metricIDCache) Has(metricID uint64) bool {
|
||||
return c.hasSlow(metricID)
|
||||
}
|
||||
|
||||
func (c *metricIDCache) hasSlow(metricID uint64) bool {
|
||||
func (c *metricIDCacheShard) hasSlow(metricID uint64) bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
@@ -132,7 +225,7 @@ func (c *metricIDCache) hasSlow(metricID uint64) bool {
|
||||
return ok
|
||||
}
|
||||
|
||||
func (c *metricIDCache) Set(metricID uint64) {
|
||||
func (c *metricIDCacheShard) Set(metricID uint64) {
|
||||
c.mu.Lock()
|
||||
c.next.Add(metricID)
|
||||
c.mu.Unlock()
|
||||
@@ -140,7 +233,7 @@ func (c *metricIDCache) Set(metricID uint64) {
|
||||
|
||||
// syncLocked merges data from curr into next and atomically replaces curr with
|
||||
// next.
|
||||
func (c *metricIDCache) syncLocked() {
|
||||
func (c *metricIDCacheShard) syncLocked() {
|
||||
curr := c.curr.Load()
|
||||
c.next.Union(curr)
|
||||
c.curr.Store(c.next)
|
||||
@@ -148,23 +241,8 @@ func (c *metricIDCache) syncLocked() {
|
||||
c.syncsCount++
|
||||
}
|
||||
|
||||
func (c *metricIDCache) startRotation() {
|
||||
d := timeutil.AddJitterToDuration(10 * time.Minute)
|
||||
ticker := time.NewTicker(d)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-c.stopCh:
|
||||
close(c.rotationStoppedCh)
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.rotate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// rotate atomically rotates next, curr, and prev cache parts.
|
||||
func (c *metricIDCache) rotate() {
|
||||
func (c *metricIDCacheShard) rotate() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
curr := c.curr.Load()
|
||||
|
||||
@@ -5,8 +5,8 @@ import (
|
||||
"testing/synctest"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
)
|
||||
|
||||
func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
|
||||
@@ -16,9 +16,7 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
|
||||
c := newMetricIDCache()
|
||||
defer c.MustStop()
|
||||
c.Set(123)
|
||||
time.Sleep(15 * time.Minute)
|
||||
time.Sleep(15 * time.Minute)
|
||||
time.Sleep(15 * time.Minute)
|
||||
time.Sleep(3 * c.fullRotationPeriod())
|
||||
if c.Has(123) {
|
||||
t.Fatalf("entry is still in cache")
|
||||
}
|
||||
@@ -30,12 +28,11 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
|
||||
c := newMetricIDCache()
|
||||
defer c.MustStop()
|
||||
c.Set(123)
|
||||
time.Sleep(5 * time.Minute)
|
||||
time.Sleep(c.rotationGroupPeriod - time.Second)
|
||||
if !c.Has(123) {
|
||||
t.Fatalf("entry not in cache")
|
||||
}
|
||||
time.Sleep(15 * time.Minute)
|
||||
time.Sleep(15 * time.Minute)
|
||||
time.Sleep(2 * c.fullRotationPeriod())
|
||||
if c.Has(123) {
|
||||
t.Fatalf("entry is still in cache")
|
||||
}
|
||||
@@ -48,7 +45,7 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
|
||||
defer c.MustStop()
|
||||
c.Set(123)
|
||||
for range 10_000 {
|
||||
time.Sleep(5 * time.Minute)
|
||||
time.Sleep(c.rotationGroupPeriod - time.Second)
|
||||
if !c.Has(123) {
|
||||
t.Fatalf("entry not in cache")
|
||||
}
|
||||
@@ -58,7 +55,8 @@ func TestMetricIDCache_ClearedWhenUnused(t *testing.T) {
|
||||
|
||||
func TestMetricIDCache_Stats(t *testing.T) {
|
||||
assertStats := func(t *testing.T, c *metricIDCache, want metricIDCacheStats) {
|
||||
if diff := cmp.Diff(want, c.Stats()); diff != "" {
|
||||
t.Helper()
|
||||
if diff := cmp.Diff(want, c.Stats(), cmpopts.IgnoreFields(metricIDCacheStats{}, "SizeBytes")); diff != "" {
|
||||
t.Fatalf("unexpected stats (-want, +got):\n%s", diff)
|
||||
}
|
||||
}
|
||||
@@ -72,14 +70,11 @@ func TestMetricIDCache_Stats(t *testing.T) {
|
||||
|
||||
// Add metricIDs and check stats.
|
||||
// At this point, all metricIDs are in next.
|
||||
metricIDs := uint64set.Set{}
|
||||
for metricID := range uint64(100_000) {
|
||||
c.Set(metricID)
|
||||
metricIDs.Add(metricID)
|
||||
}
|
||||
assertStats(t, c, metricIDCacheStats{
|
||||
Size: 100_000,
|
||||
SizeBytes: metricIDs.SizeBytes(),
|
||||
Size: 100_000,
|
||||
})
|
||||
|
||||
// Get all metricIDs and check stats.
|
||||
@@ -91,26 +86,24 @@ func TestMetricIDCache_Stats(t *testing.T) {
|
||||
}
|
||||
assertStats(t, c, metricIDCacheStats{
|
||||
Size: 100_000,
|
||||
SizeBytes: metricIDs.SizeBytes(),
|
||||
SyncsCount: 1,
|
||||
SyncsCount: c.numShards(),
|
||||
})
|
||||
|
||||
// Wait until next rotation.
|
||||
// Wait until all groups are rotated.
|
||||
// curr metricIDs will be moved to prev.
|
||||
time.Sleep(15 * time.Minute)
|
||||
time.Sleep(c.fullRotationPeriod() + time.Second)
|
||||
assertStats(t, c, metricIDCacheStats{
|
||||
Size: 100_000,
|
||||
SizeBytes: metricIDs.SizeBytes(),
|
||||
SyncsCount: 1,
|
||||
RotationsCount: 1,
|
||||
SyncsCount: c.numShards(),
|
||||
RotationsCount: c.numShards(),
|
||||
})
|
||||
|
||||
// Wait until another rotation.
|
||||
// Wait until all groups are rotated.
|
||||
// The cache now should be empty.
|
||||
time.Sleep(15 * time.Minute)
|
||||
time.Sleep(c.fullRotationPeriod())
|
||||
assertStats(t, c, metricIDCacheStats{
|
||||
SyncsCount: 1,
|
||||
RotationsCount: 2,
|
||||
SyncsCount: c.numShards(),
|
||||
RotationsCount: 2 * c.numShards(),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -7,11 +7,29 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// benchCacheState identifies the state the cache is brought into before the
// measured part of a benchmark starts.
type benchCacheState int

const (
	// benchCacheStateCold means neither warm-up nor rotation is performed.
	benchCacheStateCold benchCacheState = iota
	// benchCacheStateWarm means the warm-up step is performed.
	benchCacheStateWarm
	// benchCacheStateRotated means the warm-up step is performed and then one
	// rotation group is rotated.
	benchCacheStateRotated
)

// benchCacheStates lists all the states the benchmarks iterate over.
var benchCacheStates = [...]benchCacheState{benchCacheStateCold, benchCacheStateWarm, benchCacheStateRotated}

// String returns the label used for s in sub-benchmark names.
//
// Values outside the declared states get a numeric fallback label instead of
// causing an index-out-of-range panic.
func (s benchCacheState) String() string {
	switch s {
	case benchCacheStateCold:
		return " cold"
	case benchCacheStateWarm:
		return " warm"
	case benchCacheStateRotated:
		return "rotated"
	default:
		// Convert to int explicitly so that formatting does not call String again.
		return fmt.Sprintf("benchCacheState(%d)", int(s))
	}
}
|
||||
|
||||
func BenchmarkMetricIDCache_Has(b *testing.B) {
|
||||
f := func(b *testing.B, numMetricIDs, distance int64, hitsOnly, warmUp bool) {
|
||||
f := func(b *testing.B, numMetricIDs, distance int64, hitsOnly bool, state benchCacheState) {
|
||||
b.Helper()
|
||||
c := newMetricIDCache()
|
||||
defer c.MustStop()
|
||||
|
||||
warmUp := state == benchCacheStateWarm || state == benchCacheStateRotated
|
||||
rotate := state == benchCacheStateRotated
|
||||
|
||||
metricIDMin := time.Now().UnixNano()
|
||||
metricIDMax := metricIDMin + numMetricIDs*distance
|
||||
for metricID := metricIDMin; metricID <= metricIDMax; metricID += distance {
|
||||
@@ -20,7 +38,11 @@ func BenchmarkMetricIDCache_Has(b *testing.B) {
|
||||
b.Fatalf("metricID not in cache: %d", metricID)
|
||||
}
|
||||
}
|
||||
if rotate {
|
||||
c.rotate(rand.Intn(c.rotationGroupCount))
|
||||
}
|
||||
b.ResetTimer()
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
if hitsOnly {
|
||||
metricID := metricIDMin + rand.Int63n(numMetricIDs)*distance
|
||||
@@ -45,27 +67,24 @@ func BenchmarkMetricIDCache_Has(b *testing.B) {
|
||||
}
|
||||
})
|
||||
b.ReportAllocs()
|
||||
b.ReportMetric(float64(c.Stats().SizeBytes), "sizeBytes")
|
||||
}
|
||||
|
||||
subB := func(numMetricIDs, distance int64, hitsOnly, warmUp bool) {
|
||||
hitsOrMisses := "hitsss"
|
||||
subB := func(numMetricIDs, distance int64, hitsOnly bool, state benchCacheState) {
|
||||
hitsOrMisses := " hits-only"
|
||||
if !hitsOnly {
|
||||
hitsOrMisses = "misses"
|
||||
hitsOrMisses = "misses-only"
|
||||
}
|
||||
coldOrWarm := "cold"
|
||||
if warmUp {
|
||||
coldOrWarm = "warm"
|
||||
}
|
||||
name := fmt.Sprintf("%s/%s/n%d/d%d", hitsOrMisses, coldOrWarm, numMetricIDs, distance)
|
||||
name := fmt.Sprintf("%s/%s/n%d/d%d", hitsOrMisses, state, numMetricIDs, distance)
|
||||
b.Run(name, func(b *testing.B) {
|
||||
f(b, numMetricIDs, distance, hitsOnly, warmUp)
|
||||
f(b, numMetricIDs, distance, hitsOnly, state)
|
||||
})
|
||||
}
|
||||
for _, hitsOnly := range []bool{true, false} {
|
||||
for _, warmUp := range []bool{false, true} {
|
||||
for _, state := range benchCacheStates {
|
||||
for _, numMetricIDs := range []int64{100_000, 1_000_000, 10_000_000} {
|
||||
for _, distance := range []int64{1, 10, 100} {
|
||||
subB(numMetricIDs, distance, hitsOnly, warmUp)
|
||||
subB(numMetricIDs, distance, hitsOnly, state)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user