lib/{storage,mergeset}: reduce the multi-CPU contention on global stats vars, which are updated during background merge

Background merge updates the global stats on the number of merged / deleted items. This may result in a slowdown
when multiple goroutines update these global stats at a frequent rate, since every goroutine must fetch the actual value
for the updated stats from slow memory on every update. It is much faster to count the needed stats locally in every goroutine
and then periodically update the global stats (once per ~second).

Thanks to @tIGO for the initial implementation of this idea at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8683/files#diff-95e28ae911944708f94f3bb31fa9ba8bc185dedc23ae6fb02a272c34b8f83244

This should help improve the scalability of background merges on multi-CPU systems.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8682
This commit is contained in:
Aliaksandr Valialkin
2025-06-05 12:24:00 +02:00
parent 1f5d02e059
commit 23fd269ccf
5 changed files with 68 additions and 24 deletions

View File

@@ -6,6 +6,7 @@ import (
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -101,10 +102,30 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock Prepa
var errForciblyStopped = fmt.Errorf("forcibly stopped")
func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *atomic.Uint64) error {
// Use local variables for tracking the number of merged items
// and periodically propagate the collected stats to the caller, so it could be reflected in the exposed metrics.
//
// This minimizes expensive updates of itemsMerged var from concurrently running goroutines,
// and improves concurrent merge scalability on multi-CPU systems - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8682 .
var updateStatsDeadline uint64
var localItemsMerged uint64
updateStats := func() {
itemsMerged.Add(localItemsMerged)
localItemsMerged = 0
}
defer updateStats()
again:
ct := fasttime.UnixTimestamp()
if ct > updateStatsDeadline {
updateStats()
// Update the external stats once per second
updateStatsDeadline = ct + 1
}
if len(bsm.bsrHeap) == 0 {
// Write the last (maybe incomplete) inmemoryBlock to bsw.
bsm.flushIB(bsw, ph, itemsMerged)
bsm.flushIB(bsw, ph, &localItemsMerged)
return nil
}
@@ -139,7 +160,7 @@ again:
}
if !bsm.ib.Add(item) {
// The bsm.ib is full. Flush it to bsw and continue.
bsm.flushIB(bsw, ph, itemsMerged)
bsm.flushIB(bsw, ph, &localItemsMerged)
continue
}
bsr.currItemIdx++
@@ -163,14 +184,14 @@ again:
goto again
}
func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, itemsMerged *atomic.Uint64) {
func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, itemsMerged *uint64) {
items := bsm.ib.items
data := bsm.ib.data
if len(items) == 0 {
// Nothing to flush.
return
}
itemsMerged.Add(uint64(len(items)))
*itemsMerged += uint64(len(items))
if bsm.prepareBlock != nil {
bsm.firstItem = append(bsm.firstItem[:0], items[0].String(data)...)
bsm.lastItem = append(bsm.lastItem[:0], items[len(items)-1].String(data)...)

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"path/filepath"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@@ -132,8 +131,8 @@ func (bsw *blockStreamWriter) MustClose() {
}
// WriteExternalBlock writes b to bsw and updates ph and rowsMerged.
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *atomic.Uint64) {
rowsMerged.Add(uint64(b.rowsCount()))
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) {
*rowsMerged += uint64(b.rowsCount())
b.deduplicateSamplesDuringMerge()
headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)

View File

@@ -2,7 +2,6 @@ package storage
import (
"fmt"
"sync/atomic"
"testing"
)
@@ -23,11 +22,10 @@ func BenchmarkBlockStreamWriterRowsBestCase(b *testing.B) {
}
func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeRows bool) {
var rowsMerged atomic.Uint64
b.ReportAllocs()
b.SetBytes(int64(rowsCount))
b.RunParallel(func(pb *testing.PB) {
var rowsMerged uint64
var bsw blockStreamWriter
var mp inmemoryPart
var ph partHeader
@@ -38,6 +36,7 @@ func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeR
ebsCopy = append(ebsCopy, ebCopy)
}
loopCount := 0
for pb.Next() {
if writeRows {
for i := range ebsCopy {

View File

@@ -6,6 +6,7 @@ import (
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
@@ -45,22 +46,47 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
defer putBlock(pendingBlock)
tmpBlock := getBlock()
defer putBlock(tmpBlock)
// Use local variables for tracking the number of merged and deleted rows
// and periodically propagate the collected stats to the caller, so it could be reflected in the exposed metrics.
//
// This minimizes expensive updates of rowsMerged and rowsDeleted vars from concurrently running goroutines,
// and improves concurrent merge scalability on multi-CPU systems - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8682 .
var updateStatsDeadline uint64
var localRowsMerged, localRowsDeleted uint64
updateStats := func() {
rowsDeleted.Add(localRowsDeleted)
localRowsDeleted = 0
rowsMerged.Add(localRowsMerged)
localRowsMerged = 0
}
defer updateStats()
for bsm.NextBlock() {
ct := fasttime.UnixTimestamp()
if ct > updateStatsDeadline {
updateStats()
// Update the external stats once per second
updateStatsDeadline = ct + 1
}
select {
case <-stopCh:
return errForciblyStopped
default:
}
b := bsm.Block
if dmis.Has(b.bh.TSID.MetricID) {
// Skip blocks for deleted metrics.
rowsDeleted.Add(uint64(b.bh.RowsCount))
localRowsDeleted += uint64(b.bh.RowsCount)
continue
}
retentionDeadline := bsm.getRetentionDeadline(&b.bh)
if b.bh.MaxTimestamp < retentionDeadline {
// Skip blocks out of the given retention.
rowsDeleted.Add(uint64(b.bh.RowsCount))
localRowsDeleted += uint64(b.bh.RowsCount)
continue
}
if pendingBlockIsEmpty {
@@ -77,14 +103,14 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
if b.bh.TSID.Less(&pendingBlock.bh.TSID) {
logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &b.bh.TSID, &pendingBlock.bh.TSID)
}
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
bsw.WriteExternalBlock(pendingBlock, ph, &localRowsMerged)
pendingBlock.CopyFrom(b)
continue
}
if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= b.bh.MinTimestamp {
// Fast path - pendingBlock is too big and it doesn't overlap with b.
// Write the pendingBlock and then deal with b.
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
bsw.WriteExternalBlock(pendingBlock, ph, &localRowsMerged)
pendingBlock.CopyFrom(b)
continue
}
@@ -98,7 +124,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
tmpBlock.bh.TSID = b.bh.TSID
tmpBlock.bh.Scale = b.bh.Scale
tmpBlock.bh.PrecisionBits = min(pendingBlock.bh.PrecisionBits, b.bh.PrecisionBits)
mergeBlocks(tmpBlock, pendingBlock, b, retentionDeadline, rowsDeleted)
mergeBlocks(tmpBlock, pendingBlock, b, retentionDeadline, &localRowsDeleted)
if len(tmpBlock.timestamps) <= maxRowsPerBlock {
// More entries may be added to tmpBlock. Swap it with pendingBlock,
// so more entries may be added to pendingBlock on the next iteration.
@@ -120,19 +146,19 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
tmpBlock.timestamps = tmpBlock.timestamps[:maxRowsPerBlock]
tmpBlock.values = tmpBlock.values[:maxRowsPerBlock]
tmpBlock.fixupTimestamps()
bsw.WriteExternalBlock(tmpBlock, ph, rowsMerged)
bsw.WriteExternalBlock(tmpBlock, ph, &localRowsMerged)
}
if err := bsm.Error(); err != nil {
return fmt.Errorf("cannot read block to be merged: %w", err)
}
if !pendingBlockIsEmpty {
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
bsw.WriteExternalBlock(pendingBlock, ph, &localRowsMerged)
}
return nil
}
// mergeBlocks merges ib1 and ib2 to ob.
func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *atomic.Uint64) {
func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *uint64) {
ib1.assertMergeable(ib2)
ib1.assertUnmarshaled()
ib2.assertUnmarshaled()
@@ -177,7 +203,7 @@ func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *atom
}
}
func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted *atomic.Uint64) {
func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted *uint64) {
if b.bh.MinTimestamp >= retentionDeadline {
// Fast path - the block contains only samples with timestamps bigger than retentionDeadline.
return
@@ -189,7 +215,7 @@ func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted
nextIdx++
}
if n := nextIdx - nextIdxOrig; n > 0 {
rowsDeleted.Add(uint64(n))
*rowsDeleted += uint64(n)
b.nextIdx = nextIdx
}
}

View File

@@ -3,7 +3,6 @@ package storage
import (
"sort"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -103,7 +102,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR
// Group rows into blocks.
var scale int16
var rowsMerged atomic.Uint64
var rowsMerged uint64
r := &rows[0]
tsid := &r.TSID
precisionBits := r.PrecisionBits
@@ -130,8 +129,8 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR
rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
if n := rowsMerged.Load(); n != uint64(len(rows)) {
logger.Panicf("BUG: unexpected rowsMerged; got %d; want %d", n, len(rows))
if rowsMerged != uint64(len(rows)) {
logger.Panicf("BUG: unexpected rowsMerged; got %d; want %d", rowsMerged, len(rows))
}
rrm.bsw.MustClose()
}