VictoriaMetrics/lib/storage/block_stream_writer_timing_test.go
Aliaksandr Valialkin 23fd269ccf lib/{storage,mergeset}: reduce the multi-CPU contention on global stats vars, which are updated during background merge
Background merge updates global stats on the number of merged / deleted items. This may cause a slowdown
when multiple goroutines update these global stats at a high rate, since every goroutine must fetch the current value
of the updated stat from slow memory on every update. It is much faster to count the needed stats locally in every goroutine
and then periodically update the global stats (roughly once per second).

Thanks to @tIGO for the initial implementation of this idea at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8683/files#diff-95e28ae911944708f94f3bb31fa9ba8bc185dedc23ae6fb02a272c34b8f83244

This should improve the scalability of background merges on multi-CPU systems.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8682
2025-06-05 12:24:03 +02:00
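
As an illustration of the approach described in the commit message, below is a minimal, hypothetical sketch of per-goroutine counting with a periodic flush to a shared counter. The names (globalItemsMerged, mergeWorker) and the workload are made up for this example and are not the actual VictoriaMetrics code; the real implementation lives in lib/storage and lib/mergeset.

package main

import (
	"sync"
	"sync/atomic"
	"time"
)

// globalItemsMerged stands in for a global stats counter that would otherwise
// be contended by every merge goroutine on every update.
var globalItemsMerged atomic.Uint64

// mergeWorker counts merged items in a goroutine-local variable and flushes
// the local count to the global counter roughly once per second, so the shared
// cache line is touched rarely instead of on every merged item.
func mergeWorker(items []int, wg *sync.WaitGroup) {
	defer wg.Done()

	var localItemsMerged uint64
	lastFlush := time.Now()
	for range items {
		// ... merge the item ...
		localItemsMerged++

		if time.Since(lastFlush) > time.Second {
			globalItemsMerged.Add(localItemsMerged)
			localItemsMerged = 0
			lastFlush = time.Now()
		}
	}
	// Flush whatever remains before the goroutine exits.
	globalItemsMerged.Add(localItemsMerged)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go mergeWorker(make([]int, 1000), &wg)
	}
	wg.Wait()
}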


package storage

import (
	"fmt"
	"testing"
)

func BenchmarkBlockStreamWriterBlocksWorstCase(b *testing.B) {
	benchmarkBlockStreamWriter(b, benchBlocksWorstCase, len(benchRawRowsWorstCase), false)
}

func BenchmarkBlockStreamWriterBlocksBestCase(b *testing.B) {
	benchmarkBlockStreamWriter(b, benchBlocksBestCase, len(benchRawRowsBestCase), false)
}

func BenchmarkBlockStreamWriterRowsWorstCase(b *testing.B) {
	benchmarkBlockStreamWriter(b, benchBlocksWorstCase, len(benchRawRowsWorstCase), true)
}

func BenchmarkBlockStreamWriterRowsBestCase(b *testing.B) {
	benchmarkBlockStreamWriter(b, benchBlocksBestCase, len(benchRawRowsBestCase), true)
}

// benchmarkBlockStreamWriter writes ebs to an inmemoryPart from parallel goroutines.
// If writeRows is set, block data is unmarshaled before every write iteration,
// so the benchmark also measures row-level unmarshaling.
func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeRows bool) {
	b.ReportAllocs()
	b.SetBytes(int64(rowsCount))
	b.RunParallel(func(pb *testing.PB) {
		var rowsMerged uint64
		var bsw blockStreamWriter
		var mp inmemoryPart
		var ph partHeader

		// Work on per-goroutine copies of the blocks, so goroutines do not share state.
		var ebsCopy []Block
		for i := range ebs {
			var ebCopy Block
			ebCopy.CopyFrom(&ebs[i])
			ebsCopy = append(ebsCopy, ebCopy)
		}

		loopCount := 0
		for pb.Next() {
			if writeRows {
				for i := range ebsCopy {
					eb := &ebsCopy[i]
					if err := eb.UnmarshalData(); err != nil {
						panic(fmt.Errorf("cannot unmarshal block %d on loop %d: %w", i, loopCount, err))
					}
				}
			}
			bsw.MustInitFromInmemoryPart(&mp, -5)
			for i := range ebsCopy {
				bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged)
			}
			bsw.MustClose()
			mp.Reset()
			loopCount++
		}
	})
}

var benchBlocksWorstCase = newBenchBlocks(benchRawRowsWorstCase)
var benchBlocksBestCase = newBenchBlocks(benchRawRowsBestCase)

// newBenchBlocks converts rows into a slice of blocks by writing them
// to an inmemoryPart and reading the resulting blocks back.
func newBenchBlocks(rows []rawRow) []Block {
	var ebs []Block

	mp := newTestInmemoryPart(rows)
	var bsr blockStreamReader
	bsr.MustInitFromInmemoryPart(mp)
	for bsr.NextBlock() {
		var eb Block
		eb.CopyFrom(&bsr.Block)
		ebs = append(ebs, eb)
	}
	if err := bsr.Error(); err != nil {
		panic(fmt.Errorf("unexpected error when reading inmemoryPart: %w", err))
	}
	return ebs
}