mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
Background merge updates the global stats on the number of merged / deleted items. This may result in slowdown when multiple goroutines update these global stats at frequent rate, since every goroutine must fetch the actual value for the updated stats from slow memory on every update. It is much faster to count the needed stats locally per every goroutine and then periodically updating the global stats (once per ~second). Thanks to @tIGO for the intial implementation of this idea at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8683/files#diff-95e28ae911944708f94f3bb31fa9ba8bc185dedc23ae6fb02a272c34b8f83244 This should help improving scalability of background merges on multi-CPU systems. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8682
80 lines
2.0 KiB
Go
80 lines
2.0 KiB
Go
package storage
|
|
|
|
import (
|
|
"fmt"
|
|
"testing"
|
|
)
|
|
|
|
func BenchmarkBlockStreamWriterBlocksWorstCase(b *testing.B) {
|
|
benchmarkBlockStreamWriter(b, benchBlocksWorstCase, len(benchRawRowsWorstCase), false)
|
|
}
|
|
|
|
func BenchmarkBlockStreamWriterBlocksBestCase(b *testing.B) {
|
|
benchmarkBlockStreamWriter(b, benchBlocksBestCase, len(benchRawRowsBestCase), false)
|
|
}
|
|
|
|
func BenchmarkBlockStreamWriterRowsWorstCase(b *testing.B) {
|
|
benchmarkBlockStreamWriter(b, benchBlocksWorstCase, len(benchRawRowsWorstCase), true)
|
|
}
|
|
|
|
func BenchmarkBlockStreamWriterRowsBestCase(b *testing.B) {
|
|
benchmarkBlockStreamWriter(b, benchBlocksBestCase, len(benchRawRowsBestCase), true)
|
|
}
|
|
|
|
func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeRows bool) {
|
|
b.ReportAllocs()
|
|
b.SetBytes(int64(rowsCount))
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
var rowsMerged uint64
|
|
var bsw blockStreamWriter
|
|
var mp inmemoryPart
|
|
var ph partHeader
|
|
var ebsCopy []Block
|
|
for i := range ebs {
|
|
var ebCopy Block
|
|
ebCopy.CopyFrom(&ebs[i])
|
|
ebsCopy = append(ebsCopy, ebCopy)
|
|
}
|
|
loopCount := 0
|
|
|
|
for pb.Next() {
|
|
if writeRows {
|
|
for i := range ebsCopy {
|
|
eb := &ebsCopy[i]
|
|
if err := eb.UnmarshalData(); err != nil {
|
|
panic(fmt.Errorf("cannot unmarshal block %d on loop %d: %w", i, loopCount, err))
|
|
}
|
|
}
|
|
}
|
|
|
|
bsw.MustInitFromInmemoryPart(&mp, -5)
|
|
for i := range ebsCopy {
|
|
bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged)
|
|
}
|
|
bsw.MustClose()
|
|
mp.Reset()
|
|
loopCount++
|
|
}
|
|
})
|
|
}
|
|
|
|
var benchBlocksWorstCase = newBenchBlocks(benchRawRowsWorstCase)
|
|
var benchBlocksBestCase = newBenchBlocks(benchRawRowsBestCase)
|
|
|
|
func newBenchBlocks(rows []rawRow) []Block {
|
|
var ebs []Block
|
|
|
|
mp := newTestInmemoryPart(rows)
|
|
var bsr blockStreamReader
|
|
bsr.MustInitFromInmemoryPart(mp)
|
|
for bsr.NextBlock() {
|
|
var eb Block
|
|
eb.CopyFrom(&bsr.Block)
|
|
ebs = append(ebs, eb)
|
|
}
|
|
if err := bsr.Error(); err != nil {
|
|
panic(fmt.Errorf("unexpected error when reading inmemoryPart: %w", err))
|
|
}
|
|
return ebs
|
|
}
|