mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
lib/mergeset: reduce memory allocations on blockcache misses
This commit adds tmp inmemory and data blocks buffers for index search requests. It allows to reduce memory allocations on block cache misses. Since block cache puts block into cache only on after configured number of cache misses. Related PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9324
This commit is contained in:
@@ -53,6 +53,7 @@ Released at 2025-07-04
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): remove duplicate kubernetes targets from [service-discovery-debug](https://docs.victoriametrics.com/victoriametrics/relabeling/#relabel-debugging) page. See [8626](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8626) issue for details.
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add command-line flag `-search.logSlowQueryStatsHeaders` for [query execution stats](https://docs.victoriametrics.com/victoriametrics/query-stats/). The new flag allows specifying the list of headers to log together with slow queries if user's request contains them. This flag is available only in VictoriaMetrics [enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/). See [Query Stats](https://docs.victoriametrics.com/victoriametrics/query-stats/) for details.
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and [vmselect](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add ability to proxy `/api/v1/notifiers` to vmalert when `-vmalert.proxyURL` is set. See [9267](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9267) PR for details.
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): reduce CPU usage caused by garbage collection during indexDB lookups on cache misses. See [#9324](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9324) for details.
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `vm_cache_eviction_bytes_total` counter metrics to reflect cache evictions due to expiration, misses and cache size. See [9293](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9293) PR for details. Thanks to the @BenNF
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): enhance `MustReadAt` panic message to include filename for easier debugging of out-of-range reads. See [#9106](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9106).
|
||||
|
||||
|
||||
@@ -82,15 +82,17 @@ func (c *Cache) GetBlock(k Key) Block {
|
||||
return shard.GetBlock(k)
|
||||
}
|
||||
|
||||
// PutBlock puts the given block b under the given key k into c.
|
||||
func (c *Cache) PutBlock(k Key, b Block) {
|
||||
// TryPutBlock puts the given block b under the given key k into c.
|
||||
//
|
||||
// returns true if block was added to the cache
|
||||
func (c *Cache) TryPutBlock(k Key, b Block) bool {
|
||||
idx := uint64(0)
|
||||
if len(c.shards) > 1 {
|
||||
h := k.hashUint64()
|
||||
idx = h % uint64(len(c.shards))
|
||||
}
|
||||
shard := c.shards[idx]
|
||||
shard.PutBlock(k, b)
|
||||
return shard.TryPutBlock(k, b)
|
||||
}
|
||||
|
||||
// Len returns the number of blocks in the cache c.
|
||||
@@ -302,7 +304,7 @@ func (c *cache) GetBlock(k Key) Block {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) PutBlock(k Key, b Block) {
|
||||
func (c *cache) TryPutBlock(k Key, b Block) bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
misses := c.perKeyMisses[k]
|
||||
@@ -313,9 +315,12 @@ func (c *cache) PutBlock(k Key, b Block) {
|
||||
// Do not cache the entry if there were up to *missesBeforeCaching unsuccessful attempts to access it.
|
||||
// This may be one-time-wonders entry, which won't be accessed more, so do not cache it
|
||||
// in order to save memory for frequently accessed items.
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
// reset key misses counter
|
||||
// it should help to reduce memory on fast cache eviction
|
||||
c.perKeyMisses[k] = 0
|
||||
// Store b in the cache.
|
||||
pes := c.m[k.Part]
|
||||
if pes == nil {
|
||||
@@ -323,7 +328,7 @@ func (c *cache) PutBlock(k Key, b Block) {
|
||||
c.m[k.Part] = pes
|
||||
} else if pes[k.Offset] != nil {
|
||||
// The block has been already registered by concurrent goroutine.
|
||||
return
|
||||
return true
|
||||
}
|
||||
e := &cacheEntry{
|
||||
lastAccessTime: fasttime.UnixTimestamp(),
|
||||
@@ -337,6 +342,7 @@ func (c *cache) PutBlock(k Key, b Block) {
|
||||
for c.SizeBytes() > maxSizeBytes && len(c.lah) > 0 {
|
||||
c.removeLeastRecentlyAccessedItem()
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *cache) removeLeastRecentlyAccessedItem() {
|
||||
|
||||
@@ -35,7 +35,7 @@ func TestCache(t *testing.T) {
|
||||
var b testBlock
|
||||
blockSize := b.SizeBytes()
|
||||
// Put a single entry into cache
|
||||
c.PutBlock(k, &b)
|
||||
c.TryPutBlock(k, &b)
|
||||
if n := c.Len(); n != 1 {
|
||||
t.Fatalf("unexpected number of items in the cache; got %d; want %d", n, 1)
|
||||
}
|
||||
@@ -85,7 +85,7 @@ func TestCache(t *testing.T) {
|
||||
}
|
||||
for i := 0; i < *missesBeforeCaching; i++ {
|
||||
// Store the missed entry to the cache. It shouldn't be stored because of the previous cache miss
|
||||
c.PutBlock(k, &b)
|
||||
c.TryPutBlock(k, &b)
|
||||
if n := c.SizeBytes(); n != 0 {
|
||||
t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, 0)
|
||||
}
|
||||
@@ -101,7 +101,7 @@ func TestCache(t *testing.T) {
|
||||
}
|
||||
}
|
||||
// Store the entry again. Now it must be stored because of the second cache miss.
|
||||
c.PutBlock(k, &b)
|
||||
c.TryPutBlock(k, &b)
|
||||
if n := c.SizeBytes(); n != blockSize {
|
||||
t.Fatalf("unexpected SizeBytes(); got %d; want %d", n, blockSize)
|
||||
}
|
||||
@@ -151,7 +151,7 @@ func testCacheSetGet(c *Cache, worker int) {
|
||||
Offset: uint64(worker*1000 + i),
|
||||
Part: part,
|
||||
}
|
||||
c.PutBlock(k, &b)
|
||||
c.TryPutBlock(k, &b)
|
||||
if b1 := c.GetBlock(k); b1 != &b {
|
||||
panic(fmt.Errorf("unexpected block obtained; got %v; want %v", b1, &b))
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ func BenchmarkCacheGet(b *testing.B) {
|
||||
blocks := make([]*testBlock, blocksCount)
|
||||
for i := 0; i < blocksCount; i++ {
|
||||
blocks[i] = &testBlock{}
|
||||
c.PutBlock(Key{Offset: uint64(i)}, blocks[i])
|
||||
c.TryPutBlock(Key{Offset: uint64(i)}, blocks[i])
|
||||
}
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(blocks)))
|
||||
|
||||
@@ -38,6 +38,17 @@ type partSearch struct {
|
||||
ibItemIdx int
|
||||
|
||||
sparse bool
|
||||
|
||||
// tmpIB contains temporary inmemoryBlock reused during partSearch requests
|
||||
// It reduces memory allocations on cache misses
|
||||
//
|
||||
// tmpIB is valid until call to reset.
|
||||
tmpIB *inmemoryBlock
|
||||
// tmpIdB contains temporary indexBlock reused during partSearch requests
|
||||
// It reduces memory allocations on cache misses
|
||||
//
|
||||
// tmpIdB is valid until call to reset.
|
||||
tmpIdB *indexBlock
|
||||
}
|
||||
|
||||
func (ps *partSearch) reset() {
|
||||
@@ -52,6 +63,10 @@ func (ps *partSearch) reset() {
|
||||
|
||||
ps.sb.Reset()
|
||||
|
||||
ps.tmpIB.Reset()
|
||||
ps.tmpIdB.buf = ps.tmpIdB.buf[:0]
|
||||
ps.tmpIdB.bhs = ps.tmpIdB.bhs[:0]
|
||||
|
||||
ps.ib = nil
|
||||
ps.ibItemIdx = 0
|
||||
ps.sparse = false
|
||||
@@ -61,6 +76,12 @@ func (ps *partSearch) reset() {
|
||||
//
|
||||
// Use Seek for search in p.
|
||||
func (ps *partSearch) Init(p *part, sparse bool) {
|
||||
if ps.tmpIB == nil {
|
||||
ps.tmpIB = &inmemoryBlock{}
|
||||
}
|
||||
if ps.tmpIdB == nil {
|
||||
ps.tmpIdB = &indexBlock{}
|
||||
}
|
||||
ps.reset()
|
||||
|
||||
ps.p = p
|
||||
@@ -276,7 +297,11 @@ func (ps *partSearch) nextBHS() error {
|
||||
return fmt.Errorf("cannot read index block: %w", err)
|
||||
}
|
||||
b = idxb
|
||||
idxbCache.PutBlock(idxbKey, b)
|
||||
if idxbCache.TryPutBlock(idxbKey, b) {
|
||||
// cannot re-used tmpIdB anymore
|
||||
// it's now owned by idxbCache
|
||||
ps.tmpIdB = &indexBlock{}
|
||||
}
|
||||
}
|
||||
idxb := b.(*indexBlock)
|
||||
ps.bhs = idxb.bhs
|
||||
@@ -292,9 +317,8 @@ func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot decompress index block: %w", err)
|
||||
}
|
||||
idxb := &indexBlock{
|
||||
buf: append([]byte{}, ps.indexBuf...),
|
||||
}
|
||||
idxb := ps.tmpIdB
|
||||
idxb.buf = append(idxb.buf[:0], ps.indexBuf...)
|
||||
idxb.bhs, err = unmarshalBlockHeadersNoCopy(idxb.bhs[:0], idxb.buf, int(mr.blockHeadersCount))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot unmarshal block headers from index block (offset=%d, size=%d): %w", mr.indexBlockOffset, mr.indexBlockSize, err)
|
||||
@@ -318,7 +342,11 @@ func (ps *partSearch) getInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error)
|
||||
return nil, err
|
||||
}
|
||||
b = ib
|
||||
cache.PutBlock(ibKey, b)
|
||||
if cache.TryPutBlock(ibKey, b) {
|
||||
// cannot re-used tmpIB anymore
|
||||
// it's now owned by cache
|
||||
ps.tmpIB = &inmemoryBlock{}
|
||||
}
|
||||
}
|
||||
ib := b.(*inmemoryBlock)
|
||||
return ib, nil
|
||||
@@ -333,7 +361,8 @@ func (ps *partSearch) readInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error)
|
||||
ps.sb.lensData = bytesutil.ResizeNoCopyMayOverallocate(ps.sb.lensData, int(bh.lensBlockSize))
|
||||
ps.p.lensFile.MustReadAt(ps.sb.lensData, int64(bh.lensBlockOffset))
|
||||
|
||||
ib := &inmemoryBlock{}
|
||||
ps.tmpIB.Reset()
|
||||
ib := ps.tmpIB
|
||||
if err := ib.UnmarshalData(&ps.sb, bh.firstItem, bh.commonPrefix, bh.itemsCount, bh.marshalType); err != nil {
|
||||
return nil, fmt.Errorf("cannot unmarshal storage block with %d items: %w", bh.itemsCount, err)
|
||||
}
|
||||
|
||||
@@ -189,7 +189,7 @@ func (ps *partSearch) nextBHS() bool {
|
||||
return false
|
||||
}
|
||||
b = ib
|
||||
ibCache.PutBlock(indexBlockKey, b)
|
||||
ibCache.TryPutBlock(indexBlockKey, b)
|
||||
}
|
||||
ib := b.(*indexBlock)
|
||||
ps.bhs = ib.bhs
|
||||
|
||||
Reference in New Issue
Block a user