Compare commits

...

2 Commits

Author SHA1 Message Date
f41gh7
ef5887a419 fix heap.Remove race condition
Signed-off-by: f41gh7 <nik@victoriametrics.com>
2025-03-04 00:15:28 +01:00
f41gh7
7e07766f66 lib/blockcache: reduce lock contention for Get requests
BlockCache uses LRU cache algorithm and it obtains mutex lock for each
Get request. It's already uses sharding to reduce possible lock
contention. However, if concurrent requests perform search requests for
the same metricIDs or metricIDs located at the same index blocks.
It's possible the case, when such sharding is inefficient.

 Proposed change replaces mutex with read-write mutex. And at fast path
it obtains only read lock and upgrades it to write lock if needed.

According to the CPU profiles the most expensive operation is c.m[k.Part] map lookups.
And with this change it will be performed under read lock.

goos: darwin
goarch: arm64
pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache
cpu: Apple M1 Pro
BenchmarkCacheGet-10                4306            232297 ns/op          43.05 MB/s      351 B/op           2 allocs/op

goos: darwin
goarch: arm64
pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache
cpu: Apple M1 Pro
BenchmarkCacheGet-10                6922            175681 ns/op          56.92 MB/s         216 B/op          1 allocs/op

Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8204
2025-02-28 18:37:35 +01:00
2 changed files with 28 additions and 7 deletions

View File

@@ -30,6 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: provide alternative registry for all VictoriaMetrics components at [Quay.io](https://quay.io/organization/victoriametrics).
* FEATURE: [vmalert-tool](https://docs.victoriametrics.com/vmalert-tool/): add command-line flag `-httpListenPort` to specify the port used during testing. If not provided, a random unoccupied port will be assigned. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8393).
* FEATURE: [vmalert-tool](https://docs.victoriametrics.com/vmalert-tool/): make the temporary storage path for unittest unique, allowing user to run multiple vmalert-tool processes on a single host simultaneously. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8393).
* FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and [vmstorage](https://docs.victoriametrics.com/victoriametrics/): improve performance for metric search requests on high load. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8204) for details.
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and [vmstorage](https://docs.victoriametrics.com/victoriametrics/): fix the incorrect caching of extMetricsIDs when a query timeout error occurs. This can lead to incorrect query results. Thanks to @changshun-shi for [the bug report issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8345).
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl/): respect time filter when exploring time series for [influxdb mode](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8259) for details.

View File

@@ -182,7 +182,7 @@ type cache struct {
getMaxSizeBytes func() int
// mu protects all the fields below.
mu sync.Mutex
mu sync.RWMutex
// m contains cached blocks keyed by Key.Part and then by Key.Offset
m map[any]map[uint64]*cacheEntry
@@ -280,8 +280,8 @@ func (c *cache) cleanByTimeout() {
func (c *cache) GetBlock(k Key) Block {
c.requests.Add(1)
var e *cacheEntry
c.mu.Lock()
defer c.mu.Unlock()
c.mu.RLock()
pes := c.m[k.Part]
if pes != nil {
@@ -289,16 +289,33 @@ func (c *cache) GetBlock(k Key) Block {
if e != nil {
// Fast path - the block already exists in the cache, so return it to the caller.
currentTime := fasttime.UnixTimestamp()
if e.lastAccessTime != currentTime {
e.lastAccessTime = currentTime
heap.Fix(&c.lah, e.heapIdx)
v := e.b
lat := atomic.LoadUint64(&e.lastAccessTime)
c.mu.RUnlock()
// update records once in 5 seconds
// it should spread write locks and reduce lock contention
if lat+5 < currentTime {
c.mu.Lock()
// concurrent thread may already fix heap
lat = atomic.LoadUint64(&e.lastAccessTime)
if lat+5 < currentTime && e.heapIdx > 0 {
atomic.StoreUint64(&e.lastAccessTime, currentTime)
heap.Fix(&c.lah, e.heapIdx)
}
c.mu.Unlock()
}
return e.b
return v
}
}
c.mu.RUnlock()
// Slow path - the entry is missing in the cache.
c.mu.Lock()
c.perKeyMisses[k]++
c.misses.Add(1)
c.mu.Unlock()
return nil
}
@@ -412,5 +429,8 @@ func (lah *lastAccessHeap) Pop() any {
h[len(h)-1] = nil
*lah = h[:len(h)-1]
// mark element as removed from heap
// it allows to properly apply heap.Fix at GetBlock
e.heapIdx = -1
return e
}