Compare commits

...

1 Commits

Author SHA1 Message Date
Andrii Chubatiuk
e8a1b3f9bd lib/promutil: added labelcompressor cleanup 2026-05-17 20:30:34 +03:00
7 changed files with 676 additions and 238 deletions

View File

@@ -3,32 +3,173 @@ package promutil
import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
// LabelsCompressor compresses []prompb.Label into short binary strings
const lcShardCount = 128
// lcShard is one shard of the label-to-index map.
// Padded to one cache line to prevent false sharing between adjacent shards.
type lcShard struct {
mu sync.RWMutex
m map[string]uint32
_ [32]byte
}
// LabelsCompressor compresses []prompb.Label into short binary strings.
// Zero value is ready to use. Each Register call must be paired with Unregister.
type LabelsCompressor struct {
labelToIdx sync.Map
idxToLabel labelsMap
// labelToIdxShards stores the label-to-index mapping across lcShardCount shards
// to reduce RWMutex contention in the concurrent compressFast hot path.
labelToIdxShards [lcShardCount]lcShard
nextIdx atomic.Uint64
// generation is incremented after each rotate() call that removes labels.
// Callers can use it to detect when cached compressed keys become stale.
generation atomic.Uint64
totalSizeBytes atomic.Uint64
idxToLabel atomic.Pointer[[]prompb.Label]
// usedBitset tracks which indices were used in the current rotation period.
// Bits are set atomically from compressFast without mu; grown under mu.
usedBitset atomic.Pointer[[]uint64]
totalSizeBytes uint64
mu sync.Mutex
// freeIdxs holds index slots available for reuse.
freeIdxs []uint32
// pendingIdxs holds indices evicted from labelToIdx last rotate, not yet zeroed in idxToLabel.
pendingIdxs map[uint32]struct{}
// prevBitset is the usedBitset snapshot from the previous non-zero rotation.
// Requires absence from both prevBitset and usedBitset to evict a label,
// guarding against partial snapshots when rotate fires mid-compress loop.
// Only accessed in rotate, which runs in a single goroutine.
prevBitset []uint64
// registry holds staleness intervals of active callers; rotation period = max(registry).
registry []time.Duration
// tickerCh signals runRotate to re-evaluate the rotation period.
tickerCh chan struct{}
}
// idleRotationPeriod is the rotation period used when no callers are registered.
// It must be long enough that a sleeping goroutine does not consume measurable resources.
const idleRotationPeriod = time.Hour
// Register records stalenessInterval for an active caller and wakes the
// rotation goroutine so it can re-evaluate the rotation period.
//
// The first call creates the wake-up channel and starts the rotation goroutine.
// Every Register call must be matched by an Unregister call with the same interval.
func (lc *LabelsCompressor) Register(stalenessInterval time.Duration) {
	lc.mu.Lock()
	wakeCh := lc.tickerCh
	if wakeCh == nil {
		// runRotate locks lc.mu before reading the registry, so it cannot
		// proceed until the interval appended below is visible.
		wakeCh = make(chan struct{}, 1)
		lc.tickerCh = wakeCh
		go lc.runRotate()
	}
	lc.registry = append(lc.registry, stalenessInterval)
	lc.mu.Unlock()

	// Non-blocking send: a notification already sitting in the 1-slot buffer
	// is enough to make the rotation goroutine re-read the registry.
	select {
	case wakeCh <- struct{}{}:
	default:
	}
}
// Unregister drops the first occurrence of stalenessInterval from the registry
// and notifies the rotation goroutine if it has already been started.
//
// The rotation goroutine is not stopped by this call.
func (lc *LabelsCompressor) Unregister(stalenessInterval time.Duration) {
	lc.mu.Lock()
	for pos := range lc.registry {
		if lc.registry[pos] != stalenessInterval {
			continue
		}
		// Remove exactly one matching entry; duplicates belong to other callers.
		lc.registry = append(lc.registry[:pos], lc.registry[pos+1:]...)
		break
	}
	wakeCh := lc.tickerCh
	lc.mu.Unlock()

	if wakeCh == nil {
		// Register has never been called, so there is nobody to notify.
		return
	}
	// Non-blocking send: a pending notification already covers this change.
	select {
	case wakeCh <- struct{}{}:
	default:
	}
}
// maxRegisteredStaleness returns the largest interval stored in lc.registry.
// It returns idleRotationPeriod when the registry holds no positive interval.
//
// It must be called with lc.mu held.
func (lc *LabelsCompressor) maxRegisteredStaleness() time.Duration {
	var longest time.Duration
	for i := range lc.registry {
		if candidate := lc.registry[i]; candidate > longest {
			longest = candidate
		}
	}
	if longest != 0 {
		return longest
	}
	return idleRotationPeriod
}
// runRotate is the body of the rotation goroutine started by Register.
//
// It calls lc.rotate every time the timer fires and re-arms the timer with the
// current maxRegisteredStaleness value. A message on lc.tickerCh makes it
// re-read the registry and restart the timer only if the period has changed.
// The loop has no exit condition, so the goroutine runs until the process ends.
func (lc *LabelsCompressor) runRotate() {
lc.mu.Lock()
period := lc.maxRegisteredStaleness()
lc.mu.Unlock()
timer := time.NewTimer(period)
defer timer.Stop()
for {
select {
case <-timer.C:
lc.rotate()
// The registry may have changed while rotate was running; pick up the latest period.
lc.mu.Lock()
period = lc.maxRegisteredStaleness()
lc.mu.Unlock()
// The timer has fired and its channel has just been drained, so Reset is safe here.
timer.Reset(period)
case <-lc.tickerCh:
lc.mu.Lock()
newPeriod := lc.maxRegisteredStaleness()
lc.mu.Unlock()
// Keep the running timer when the period is unchanged, so repeated
// Register/Unregister calls do not keep postponing the next rotation.
if newPeriod != period {
period = newPeriod
// Stop the timer and drain a value that may already be queued,
// so the Reset below does not observe a stale expiration.
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(period)
}
}
}
}
// SizeBytes returns the size of lc data in bytes
func (lc *LabelsCompressor) SizeBytes() uint64 {
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
lc.mu.Lock()
n := uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes
lc.mu.Unlock()
return n
}
// ItemsCount returns the number of items in lc
func (lc *LabelsCompressor) ItemsCount() uint64 {
return lc.nextIdx.Load()
lc.mu.Lock()
p := lc.idxToLabel.Load()
var n uint64
if p != nil && len(*p) > len(lc.freeIdxs)+len(lc.pendingIdxs) {
n = uint64(len(*p) - len(lc.freeIdxs) - len(lc.pendingIdxs))
}
lc.mu.Unlock()
return n
}
// Compress compresses labels, appends the compressed labels to dst and returns the result.
@@ -42,46 +183,139 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
a := encoding.GetUint64s(len(labels) + 1)
a.A[0] = uint64(len(labels))
lc.compress(a.A[1:], labels)
if lc.compressFast(a.A[1:], labels) {
dst = encoding.MarshalVarUint64s(dst, a.A)
encoding.PutUint64s(a)
return dst
}
lc.mu.Lock()
lc.compressSlow(a.A[1:], labels)
lc.mu.Unlock()
dst = encoding.MarshalVarUint64s(dst, a.A)
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
if len(labels) == 0 {
// compressFast tries to translate every label into its already assigned index
// without taking lc.mu.
//
// On success it stores the indices into dst (dst[i] corresponds to labels[i]),
// marks every index as used in usedBitset and returns true.
// It returns false as soon as a label is missing from its shard; dst may then be
// partially filled and the caller is expected to fall back to compressSlow.
//
// dst must have room for at least len(labels) items.
func (lc *LabelsCompressor) compressFast(dst []uint64, labels []prompb.Label) bool {
// Load the bitset pointer once; it is compared again after the loop in order
// to detect a swap performed by a concurrent goroutine.
p := lc.usedBitset.Load()
var bits []uint64
if p != nil {
bits = *p
}
var keyBuf [256]byte
for i, label := range labels {
// Build composite key name+'\x00'+value into the stack buffer.
totalLen := len(label.Name) + 1 + len(label.Value)
var key string
if totalLen <= len(keyBuf) {
n := copy(keyBuf[:], label.Name)
keyBuf[n] = '\x00'
copy(keyBuf[n+1:], label.Value)
// key aliases keyBuf. This is fine because it is only hashed and used
// for the map lookup below; it is never stored.
key = bytesutil.ToUnsafeString(keyBuf[:totalLen])
} else {
// The key does not fit into keyBuf; build it via makeLabelKey instead.
key = makeLabelKey(label)
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
shard := &lc.labelToIdxShards[h%lcShardCount]
shard.mu.RLock()
idx, ok := shard.m[key]
shard.mu.RUnlock()
if !ok {
return false
}
dst[i] = uint64(idx)
// Each uint64 word of the bitset tracks 64 indices.
word := idx >> 6
if int(word) < len(bits) {
mask := uint64(1) << (idx & 63)
// Check the bit first and write only when it is still unset.
if atomic.LoadUint64(&bits[word])&mask == 0 {
atomic.OrUint64(&bits[word], mask)
}
}
}
// usedBitset was swapped mid-loop; re-mark all indices in the new bitset.
if lc.usedBitset.Load() != p {
for _, idx64 := range dst {
lc.markUsed(uint32(idx64))
}
}
return true
}
// compressSlow translates every label into an index, registering labels that
// are not known yet, and stores the indices into dst (dst[i] corresponds to labels[i]).
//
// Compress calls it with lc.mu held; newIndex, which is called from here,
// requires that lock.
// dst must have room for at least len(labels) items.
func (lc *LabelsCompressor) compressSlow(dst []uint64, labels []prompb.Label) {
for i, label := range labels {
// The key may be stored in the shard map below, so it must own its memory.
key := makeLabelKey(label)
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
shard := &lc.labelToIdxShards[h%lcShardCount]
shard.mu.RLock()
idx, ok := shard.m[key]
shard.mu.RUnlock()
if !ok {
// Unknown label: store a private copy of it and assign an index.
// newIndex also marks the new index as used.
cloned := cloneLabel(label)
idx = lc.newIndex(cloned)
shard.mu.Lock()
// The shard map is created lazily on the first insert.
if shard.m == nil {
shard.m = make(map[string]uint32)
}
shard.m[key] = idx
shard.mu.Unlock()
} else {
lc.markUsed(idx)
}
dst[i] = uint64(idx)
}
}
// markUsed sets the bit for idx in usedBitset. Safe to call without mu.
func (lc *LabelsCompressor) markUsed(idx uint32) {
p := lc.usedBitset.Load()
if p == nil {
return
}
_ = dst[len(labels)-1]
for i, label := range labels {
v, ok := lc.labelToIdx.Load(label)
if !ok {
idx := lc.nextIdx.Add(1)
v = idx
labelCopy := cloneLabel(label)
// Must store idxToLabel entry before labelToIdx,
// so it can be found by possible concurrent goroutines.
//
// We might store duplicated entries for single label with different indexes,
// and it's fine, see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118.
lc.idxToLabel.Store(idx, labelCopy)
vNew, loaded := lc.labelToIdx.LoadOrStore(labelCopy, v)
if loaded {
// This label has been stored by a concurrent goroutine with different index,
// use it for key consistency in aggrState.
v = vNew
}
// Update lc.totalSizeBytes
labelSizeBytes := uint64(len(label.Name) + len(label.Value))
entrySizeBytes := labelSizeBytes + uint64(2*(unsafe.Sizeof(label)+unsafe.Sizeof(&label))+unsafe.Sizeof(v))
lc.totalSizeBytes.Add(entrySizeBytes)
bits := *p
word := idx >> 6
if int(word) < len(bits) {
mask := uint64(1) << (idx & 63)
if atomic.LoadUint64(&bits[word])&mask == 0 {
atomic.OrUint64(&bits[word], mask)
}
dst[i] = v.(uint64)
}
}
// growBitset extends usedBitset so that it contains a bit for idx.
// It is a no-op when the current bitset is already large enough.
//
// It must be called with lc.mu held.
func (lc *LabelsCompressor) growBitset(idx uint32) {
	// Each uint64 word tracks 64 indices.
	needed := int(idx>>6) + 1
	var bits []uint64
	if p := lc.usedBitset.Load(); p != nil {
		bits = *p
	}
	if needed <= len(bits) {
		return
	}
	newBits := make([]uint64, needed)
	// The words of the old bitset are updated via atomic.OrUint64 by goroutines
	// that do not hold lc.mu, so they must be read atomically as well.
	// A plain copy() would be a data race with those writers.
	for i := range bits {
		newBits[i] = atomic.LoadUint64(&bits[i])
	}
	lc.usedBitset.Store(&newBits)
}
// makeLabelKey returns the lookup key for label: its name and its value
// joined by a single NUL byte.
//
// The returned string owns its memory and does not alias label.
func makeLabelKey(label prompb.Label) string {
	return label.Name + "\x00" + label.Value
}
// Generation returns the current value of the generation counter of lc.
//
// The counter changes whenever labels are evicted, so compressed keys cached
// under an older value must be treated as stale and rebuilt.
func (lc *LabelsCompressor) Generation() uint64 {
return lc.generation.Load()
}
func cloneLabel(label prompb.Label) prompb.Label {
// pre-allocate memory for label name and value
n := len(label.Name) + len(label.Value)
@@ -98,6 +332,35 @@ func cloneLabel(label prompb.Label) prompb.Label {
}
}
// newIndex assigns an index to cloned and publishes an idxToLabel slice which
// contains it. A slot from lc.freeIdxs is reused when available; otherwise
// the label is appended at the end.
//
// The new index is marked as used and lc.totalSizeBytes is increased.
// It must be called with lc.mu held.
func (lc *LabelsCompressor) newIndex(cloned prompb.Label) uint32 {
	var current []prompb.Label
	if p := lc.idxToLabel.Load(); p != nil {
		current = *p
	}

	var (
		idx  uint32
		next []prompb.Label
	)
	if n := len(lc.freeIdxs); n > 0 {
		// Reuse a free slot. The published slice may be in use by concurrent
		// readers, so the slot is filled in a private copy.
		idx = lc.freeIdxs[n-1]
		lc.freeIdxs = lc.freeIdxs[:n-1]
		next = make([]prompb.Label, len(current))
		copy(next, current)
		next[idx] = cloned
	} else {
		// No free slots: the new label goes past the end of the current slice.
		idx = uint32(len(current))
		next = append(current, cloned)
	}
	lc.idxToLabel.Store(&next)

	// The bitset must cover idx before the bit can be set.
	lc.growBitset(idx)
	lc.markUsed(idx)

	lc.totalSizeBytes += uint64(len(cloned.Name)+len(cloned.Value)) + uint64(unsafe.Sizeof(cloned)) + 8
	return idx
}
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
//
// It is safe calling Decompress from concurrent goroutines.
@@ -124,111 +387,139 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
if len(tail) > 0 {
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
}
dst = lc.decompress(dst, a.A)
p := lc.idxToLabel.Load()
if p == nil {
encoding.PutUint64s(a)
logger.Panicf("BUG: idxToLabel is nil in Decompress")
return dst
}
labels := *p
for _, idx := range a.A {
if int(idx) >= len(labels) {
encoding.PutUint64s(a)
logger.Panicf("BUG: missing label for idx=%d; idxToLabel len=%d", idx, len(labels))
}
dst = append(dst, labels[idx])
}
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
for _, idx := range src {
label, ok := lc.idxToLabel.Load(idx)
if !ok {
logger.Panicf("BUG: missing label for idx=%d", idx)
}
dst = append(dst, label)
// rotate evicts unused labels and recycles their slots.
// Phase 1 removes the label from labelToIdx (preventing new keys from referencing it).
// Phase 2, one rotation later, zeroes idxToLabel and returns the slot to freeIdxs,
// giving in-flight keys a full rotation period to drain via Decompress.
// Must not be called concurrently with itself.
func (lc *LabelsCompressor) rotate() {
// Snapshot and reset usedBitset under mu to prevent races with growBitset/newIndex.
// Also snapshot idxToLabel for the phase-1 scan.
lc.mu.Lock()
var currentBits []uint64
if p := lc.usedBitset.Load(); p != nil {
currentBits = make([]uint64, len(*p))
copy(currentBits, *p)
newBits := make([]uint64, len(*p))
lc.usedBitset.Store(&newBits)
}
return dst
}
// labelsMap maps uint64 key to prompb.Label
//
// uint64 keys must be packed close to 0. Otherwise the labelsMap structure will consume too much memory.
type labelsMap struct {
readOnly atomic.Pointer[[]*prompb.Label]
mutableLock sync.Mutex
mutable map[uint64]*prompb.Label
misses uint64
}
// Store stores label under the given idx.
//
// It is safe calling Store from concurrent goroutines.
func (lm *labelsMap) Store(idx uint64, label prompb.Label) {
lm.mutableLock.Lock()
if lm.mutable == nil {
lm.mutable = make(map[uint64]*prompb.Label)
}
lm.mutable[idx] = &label
lm.mutableLock.Unlock()
}
// Load returns the label for the given idx.
//
// Load returns false if lm doesn't contain label for the given idx.
//
// It is safe calling Load from concurrent goroutines.
//
// The performance of Load() scales linearly with CPU cores.
func (lm *labelsMap) Load(idx uint64) (prompb.Label, bool) {
if pReadOnly := lm.readOnly.Load(); pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
// Fast path - the label for the given idx has been found in lm.readOnly.
return *pLabel, true
// Skip if nothing was used; all-zero snapshots from rapid rotation would
// advance pendingIdxs and reclaim slots before in-flight keys drain.
anyUsed := false
for _, w := range currentBits {
if w != 0 {
anyUsed = true
break
}
}
// Slow path - search in lm.mutable.
return lm.loadSlow(idx)
}
func (lm *labelsMap) loadSlow(idx uint64) (prompb.Label, bool) {
lm.mutableLock.Lock()
// Try loading label from readOnly, since it could be updated while acquiring mutableLock.
pReadOnly := lm.readOnly.Load()
if pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
lm.mutableLock.Unlock()
return *pLabel, true
}
}
// The label for the idx wasn't found in readOnly. Search it in mutable.
lm.misses++
pLabel := lm.mutable[idx]
if pReadOnly == nil || lm.misses > uint64(len(*pReadOnly)) {
lm.moveMutableToReadOnlyLocked(pReadOnly)
lm.misses = 0
}
lm.mutableLock.Unlock()
if pLabel == nil {
return prompb.Label{}, false
}
return *pLabel, true
}
func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
if len(lm.mutable) == 0 {
// Nothing to move
if !anyUsed {
lc.mu.Unlock()
return
}
var labels []*prompb.Label
if pReadOnly != nil {
labels = append(labels, *pReadOnly...)
// Advance prevBitset; labels missing from a partial snapshot are still covered by it.
prevSnap := lc.prevBitset
lc.prevBitset = currentBits
pendingIdxs := lc.pendingIdxs
idxSnap := lc.idxToLabel.Load()
lc.mu.Unlock()
// A label is used if present in either the current or previous period snapshot.
isUsed := func(idx uint32) bool {
word := idx >> 6
inCurrent := int(word) < len(currentBits) && currentBits[word]>>(idx&63)&1 == 1
inPrev := int(word) < len(prevSnap) && prevSnap[word]>>(idx&63)&1 == 1
return inCurrent || inPrev
}
for idx, pLabel := range lm.mutable {
if idx < uint64(len(labels)) {
labels[idx] = pLabel
} else {
for idx > uint64(len(labels)) {
labels = append(labels, nil)
// Phase 2: reclaim slots evicted last rotation if still unused.
var toReclaim []uint32
var nextRelease map[uint32]struct{}
for idx := range pendingIdxs {
if isUsed(idx) {
if nextRelease == nil {
nextRelease = make(map[uint32]struct{})
}
labels = append(labels, pLabel)
nextRelease[idx] = struct{}{}
} else {
toReclaim = append(toReclaim, idx)
}
}
clear(lm.mutable)
lm.readOnly.Store(&labels)
// Phase 1: scan idxToLabel rather than sync.Map.Range to avoid
// cache-line contention with concurrent compressFast lookups.
phase1Evicted := false
if idxSnap != nil {
for i, label := range *idxSnap {
if label == (prompb.Label{}) {
continue // freed slot
}
idx := uint32(i)
if _, inPending := pendingIdxs[idx]; inPending {
continue // already queued for release
}
if isUsed(idx) {
continue
}
key := makeLabelKey(label)
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
shard := &lc.labelToIdxShards[h%lcShardCount]
shard.mu.Lock()
delete(shard.m, key)
shard.mu.Unlock()
phase1Evicted = true
if nextRelease == nil {
nextRelease = make(map[uint32]struct{})
}
nextRelease[idx] = struct{}{}
}
}
if phase1Evicted {
lc.generation.Add(1)
}
lc.mu.Lock()
if len(toReclaim) > 0 {
p := lc.idxToLabel.Load()
var newIdxToLabel []prompb.Label
if p != nil {
newIdxToLabel = *p
}
next := make([]prompb.Label, len(newIdxToLabel))
copy(next, newIdxToLabel)
for _, idx := range toReclaim {
label := next[idx]
entrySize := uint64(len(label.Name)+len(label.Value)) + uint64(unsafe.Sizeof(label)) + 8
if lc.totalSizeBytes >= entrySize {
lc.totalSizeBytes -= entrySize
}
next[idx] = prompb.Label{}
lc.freeIdxs = append(lc.freeIdxs, idx)
}
lc.idxToLabel.Store(&next)
}
lc.pendingIdxs = nextRelease
lc.mu.Unlock()
}

View File

@@ -8,47 +8,96 @@ import (
)
func BenchmarkLabelsCompressorCompress(b *testing.B) {
var lc LabelsCompressor
series := newTestSeries(100, 10)
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var dst []byte
for pb.Next() {
dst = dst[:0]
for _, labels := range series {
dst = lc.Compress(dst, labels)
}
Sink.Add(uint64(len(dst)))
run := func(b *testing.B, withRotate bool) {
var lc LabelsCompressor
for _, labels := range series {
lc.Compress(nil, labels)
}
})
var rotations atomic.Int64
if withRotate {
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
default:
lc.rotate()
rotations.Add(1)
}
}
}()
defer close(done)
}
rotations.Store(0)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var dst []byte
for pb.Next() {
dst = dst[:0]
for _, labels := range series {
dst = lc.Compress(dst, labels)
}
Sink.Add(uint64(len(dst)))
}
})
if withRotate {
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
}
}
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
}
func BenchmarkLabelsCompressorDecompress(b *testing.B) {
var lc LabelsCompressor
series := newTestSeries(100, 10)
datas := make([][]byte, len(series))
var dst []byte
for i, labels := range series {
dstLen := len(dst)
dst = lc.Compress(dst, labels)
datas[i] = dst[dstLen:]
}
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var labels []prompb.Label
for pb.Next() {
for _, data := range datas {
labels = lc.Decompress(labels[:0], data)
}
Sink.Add(uint64(len(labels)))
run := func(b *testing.B, withRotate bool) {
var lc LabelsCompressor
datas := make([][]byte, len(series))
var buf []byte
for i, labels := range series {
bufLen := len(buf)
buf = lc.Compress(buf, labels)
datas[i] = buf[bufLen:]
}
})
var rotations atomic.Int64
if withRotate {
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
default:
lc.rotate()
rotations.Add(1)
}
}
}()
defer close(done)
}
rotations.Store(0)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var labels []prompb.Label
for pb.Next() {
for _, data := range datas {
labels = lc.Decompress(labels[:0], data)
}
Sink.Add(uint64(len(labels)))
}
})
if withRotate {
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
}
}
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
}
var Sink atomic.Uint64

View File

@@ -6,6 +6,7 @@ import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
)
func BenchmarkDedupAggr(b *testing.B) {
@@ -63,14 +64,15 @@ func newBenchSamples(count int) []pushSample {
}
labelsLen := len(labels)
samples := make([]pushSample, count)
var lc promutil.LabelsCompressor
var keyBuf []byte
for i := range samples {
sample := &samples[i]
labels = append(labels[:labelsLen], prompb.Label{
labels := append(labels[:labelsLen:labelsLen], prompb.Label{
Name: "app",
Value: fmt.Sprintf("instance-%d", i),
})
keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:])
keyBuf = lc.Compress(keyBuf[:0], labels)
sample.key = string(keyBuf)
sample.value = float64(i)
}

View File

@@ -77,6 +77,8 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
metrics.RegisterSet(ms)
lc.Register(2 * interval)
d.wg.Go(func() {
d.runFlusher(pushFunc)
})
@@ -86,6 +88,8 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
// MustStop stops d.
func (d *Deduplicator) MustStop() {
lc.Unregister(2 * d.interval)
metrics.UnregisterSet(d.ms, true)
d.ms = nil
@@ -203,7 +207,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
dstSamples := ctx.samples
for _, ps := range samples {
labelsLen := len(labels)
labels = decompressLabels(labels, ps.key)
labels = lc.Decompress(labels, bytesutil.ToUnsafeBytes(ps.key))
dstSamplesLen := len(dstSamples)
dstSamples = append(dstSamples, prompb.Sample{

View File

@@ -17,29 +17,27 @@ type aggrOutputs struct {
outputSamples *metrics.Counter
}
func (ao *aggrOutputs) getInputOutputKey(key string) (string, string) {
func (ao *aggrOutputs) getInputOutputKey(key string) (outputKey, inputKey string) {
src := bytesutil.ToUnsafeBytes(key)
outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
}
src = src[nSize:]
outputKey := src[:outputKeyLen]
if !ao.useInputKey {
return key, bytesutil.ToUnsafeString(outputKey)
outputKey = bytesutil.ToUnsafeString(src[:outputKeyLen])
if !ao.useInputKey || int(outputKeyLen) == len(src) {
return outputKey, outputKey
}
inputKey := src[outputKeyLen:]
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
inputKey = bytesutil.ToUnsafeString(src[outputKeyLen:])
return outputKey, inputKey
}
func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, isGreen bool) {
var inputKey, outputKey string
var sample *pushSample
var outputs []aggrValue
var nv *aggrValues
for i := range samples {
sample = &samples[i]
inputKey, outputKey = ao.getInputOutputKey(sample.key)
sample := &samples[i]
outputKey, inputKey := ao.getInputOutputKey(sample.key)
again:
v, ok := ao.m.Load(outputKey)
@@ -81,7 +79,7 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
}
av.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to flush
// The entry has been deleted by the concurrent call to flush.
// Try obtaining and updating the entry again.
goto again
}

View File

@@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"math"
"slices"
"sort"
"strconv"
"strings"
@@ -24,6 +23,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
"gopkg.in/yaml.v2"
)
@@ -257,6 +257,10 @@ type Config struct {
type Aggregators struct {
as []*aggregator
// workCh limits the number of concurrent aggregator.Push calls.
// Pre-allocated once to avoid per-Push channel allocation.
workCh chan struct{}
// configData contains marshaled configs.
// It is used in Equal() for comparing Aggregators.
configData []byte
@@ -287,6 +291,7 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
}
ms := metrics.NewSet()
as := make([]*aggregator, len(cfgs))
for i, cfg := range cfgs {
a, err := newAggregator(cfg, filePath, pushFunc, ms, opts, alias, i+1)
@@ -307,6 +312,7 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
metrics.RegisterSet(ms)
return &Aggregators{
as: as,
workCh: make(chan struct{}, cgroup.AvailableCPUs()),
configData: configData,
filePath: filePath,
ms: ms,
@@ -364,19 +370,21 @@ func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []uint32) []uint32
return matchIdxs
}
// use all available CPU cores to copy time-series into aggregators
// See this issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
var wg sync.WaitGroup
concurrencyChan := make(chan struct{}, cgroup.AvailableCPUs())
for _, aggr := range a.as {
concurrencyChan <- struct{}{}
wg.Go(func() {
aggr.Push(tss, matchIdxs)
<-concurrencyChan
})
if len(a.as) == 1 {
a.as[0].Push(tss, matchIdxs)
return matchIdxs
}
// Use all available CPU cores to push time-series into aggregators in parallel.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
var wg sync.WaitGroup
for _, aggr := range a.as {
a.workCh <- struct{}{}
wg.Go(func() {
aggr.Push(tss, matchIdxs)
<-a.workCh
})
}
wg.Wait()
return matchIdxs
@@ -413,8 +421,8 @@ type aggregator struct {
ignoreOldSamples bool
enableWindows bool
by []string
without []string
bySet map[string]struct{}
withoutSet map[string]struct{}
aggregateOnlyByTime bool
// interval is the interval between flushes
@@ -446,6 +454,8 @@ type aggregator struct {
// for `interval: 1m`, `by: [job]`
suffix string
seriesKeyCache seriesKeyCache
wg sync.WaitGroup
stopCh chan struct{}
@@ -645,8 +655,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
enableWindows: enableWindows,
stalenessInterval: stalenessInterval,
by: by,
without: without,
bySet: makeStringSet(by),
withoutSet: makeStringSet(without),
aggregateOnlyByTime: aggregateOnlyByTime,
interval: interval,
@@ -709,6 +719,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
a.cs.Store(cs)
lc.Register(a.stalenessInterval)
a.wg.Go(func() {
a.runFlusher(pushFunc, alignFlushToInterval, skipFlushOnShutdown, ignoreFirstIntervals)
})
@@ -915,6 +927,7 @@ func (a *aggregator) dedupFlush(dedupTime time.Time, cs *currentState) {
//
// If pushFunc is nil, then the aggregator state is just reset.
func (a *aggregator) flush(pushFunc PushFunc, flushTime time.Time, cs *currentState, isLast bool) {
a.seriesKeyCache.reset()
startTime := time.Now()
ao := a.aggrOutputs
@@ -943,6 +956,7 @@ var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
//
// The aggregator stops pushing the aggregated metrics after this call.
func (a *aggregator) MustStop() {
lc.Unregister(a.stalenessInterval)
close(a.stopCh)
a.wg.Wait()
}
@@ -952,7 +966,6 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
ctx := getPushCtx()
defer putPushCtx(ctx)
buf := ctx.buf
labels := &ctx.labels
inputLabels := &ctx.inputLabels
outputLabels := &ctx.outputLabels
@@ -986,19 +999,22 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
}
labels.Sort()
inputLabels.Reset()
outputLabels.Reset()
if !a.aggregateOnlyByTime {
inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.by, a.without)
} else {
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
h := computeLabelsFingerprint(labels.Labels)
gen := lc.Generation()
key, ok := a.seriesKeyCache.get(h, gen)
if !ok {
inputLabels.Reset()
outputLabels.Reset()
if !a.aggregateOnlyByTime {
inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.bySet, a.withoutSet)
} else {
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
}
bufLen := len(ctx.buf)
ctx.compressLabels(inputLabels.Labels, outputLabels.Labels)
key = string(ctx.buf[bufLen:])
a.seriesKeyCache.set(h, gen, key)
}
bufLen := len(buf)
buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels)
// key remains valid only by the end of this function and can't be reused after
// do not intern key because number of unique keys could be too high
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, s := range ts.Samples {
if math.IsNaN(s.Value) {
// Skip NaN values
@@ -1042,8 +1058,6 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
}
a.samplesLag.Update(float64(maxLagMsec) / 1_000)
ctx.buf = buf
pushSamples := a.aggrOutputs.pushSamples
if a.da != nil {
pushSamples = a.da.pushSamples
@@ -1060,18 +1074,13 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
}
}
func compressLabels(dst []byte, inputLabels, outputLabels []prompb.Label) []byte {
bb := bbPool.Get()
bb.B = lc.Compress(bb.B, outputLabels)
dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
dst = append(dst, bb.B...)
bbPool.Put(bb)
dst = lc.Compress(dst, inputLabels)
return dst
}
func decompressLabels(dst []prompb.Label, key string) []prompb.Label {
return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
// compressLabels appends the series key for the given labels to ctx.buf.
//
// The appended data is: varuint length of the compressed outputLabels,
// the compressed outputLabels and then the compressed inputLabels.
// The last part is omitted when inputLabels is empty.
// ctx.keybuf is overwritten with the compressed outputLabels.
func (ctx *pushCtx) compressLabels(inputLabels, outputLabels []prompb.Label) {
	outputKey := lc.Compress(ctx.keybuf[:0], outputLabels)
	ctx.keybuf = outputKey

	dst := encoding.MarshalVarUint64(ctx.buf, uint64(len(outputKey)))
	dst = append(dst, outputKey...)
	if len(inputLabels) != 0 {
		dst = lc.Compress(dst, inputLabels)
	}
	ctx.buf = dst
}
type pushCtx struct {
@@ -1081,6 +1090,7 @@ type pushCtx struct {
inputLabels promutil.Labels
outputLabels promutil.Labels
buf []byte
keybuf []byte
}
func (ctx *pushCtx) reset() {
@@ -1115,10 +1125,10 @@ func putPushCtx(ctx *pushCtx) {
var pushCtxPool sync.Pool
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, without []string) ([]prompb.Label, []prompb.Label) {
if len(without) > 0 {
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, bySet, withoutSet map[string]struct{}) ([]prompb.Label, []prompb.Label) {
if len(withoutSet) > 0 {
for _, label := range labels {
if slices.Contains(without, label.Name) {
if _, ok := withoutSet[label.Name]; ok {
dstInput = append(dstInput, label)
} else {
dstOutput = append(dstOutput, label)
@@ -1126,7 +1136,7 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, withou
}
} else {
for _, label := range labels {
if !slices.Contains(by, label.Name) {
if _, ok := bySet[label.Name]; !ok {
dstInput = append(dstInput, label)
} else {
dstOutput = append(dstOutput, label)
@@ -1136,6 +1146,17 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, withou
return dstInput, dstOutput
}
// makeStringSet returns a set containing all the keys.
//
// It returns nil for empty keys, so callers can use len() on the result
// in order to check whether any keys were configured.
func makeStringSet(keys []string) map[string]struct{} {
	n := len(keys)
	if n == 0 {
		return nil
	}
	set := make(map[string]struct{}, n)
	for i := 0; i < n; i++ {
		set[keys[i]] = struct{}{}
	}
	return set
}
func getFlushCtx(a *aggregator, ao *aggrOutputs, pushFunc PushFunc, flushTimestamp int64, isLast bool) *flushCtx {
v := flushCtxPool.Get()
if v == nil {
@@ -1235,7 +1256,7 @@ func (ctx *flushCtx) flushSeries() {
func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels = decompressLabels(ctx.labels, key)
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
if !ctx.a.keepMetricNames {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
}
@@ -1257,7 +1278,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, value float64, extraName, extraValue string) {
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels = decompressLabels(ctx.labels, key)
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
if !ctx.a.keepMetricNames {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
}
@@ -1345,3 +1366,74 @@ func sortAndRemoveDuplicates(a []string) []string {
}
var bbPool bytesutil.ByteBufferPool
// seriesKeyCache caches series keys by a 64-bit fingerprint of the labels.
//
// It consists of 64 independently locked shards; get and set pick the shard
// by the top 6 bits of the fingerprint (h>>58).
//
// NOTE(review): entries are identified by the fingerprint only, so two distinct
// label sets with the same fingerprint would share a cached key — confirm that
// this is acceptable.
type seriesKeyCache struct {
shards [64]seriesKeyCacheShard
}
// seriesKeyCacheShard is a single shard of seriesKeyCache.
type seriesKeyCacheShard struct {
// mu protects m.
mu sync.RWMutex
// m maps a fingerprint to its cached entry. It is created lazily by set and dropped by reset.
m map[uint64]seriesCacheEntry
_ [32]byte // cache-line padding
}
// seriesCacheEntry is a cached key together with the generation it was stored under.
// get reports a miss if the stored generation differs from the requested one.
type seriesCacheEntry struct {
generation uint64
key string
}
// get returns the key cached for the fingerprint h.
//
// It returns false if there is no entry for h or if the entry
// was stored under a generation other than the given one.
func (c *seriesKeyCache) get(h, generation uint64) (string, bool) {
	// The top 6 bits of h select one of the 64 shards.
	s := &c.shards[h>>58]
	s.mu.RLock()
	entry, found := s.m[h]
	s.mu.RUnlock()
	if found && entry.generation == generation {
		return entry.key, true
	}
	return "", false
}
// set stores key together with generation under the fingerprint h,
// replacing the previous entry for h if any.
func (c *seriesKeyCache) set(h, generation uint64, key string) {
	entry := seriesCacheEntry{
		generation: generation,
		key:        key,
	}
	// The top 6 bits of h select one of the 64 shards.
	s := &c.shards[h>>58]
	s.mu.Lock()
	if s.m == nil {
		// The map is created on the first insert into the shard.
		s.m = map[uint64]seriesCacheEntry{}
	}
	s.m[h] = entry
	s.mu.Unlock()
}
// reset drops all the cached entries by releasing the map of every shard.
//
// Shards are reset one by one under their own locks.
func (c *seriesKeyCache) reset() {
	for i := 0; i < len(c.shards); i++ {
		s := &c.shards[i]
		s.mu.Lock()
		s.m = nil
		s.mu.Unlock()
	}
}
// computeLabelsFingerprint returns the xxhash of labels serialized as
// name, NUL, value, NUL for every label in the given order.
//
// The result depends on the order of labels.
func computeLabelsFingerprint(labels []prompb.Label) uint64 {
	// Calculate the serialized size upfront, so at most one buffer is allocated.
	size := 0
	for i := range labels {
		size += len(labels[i].Name) + len(labels[i].Value) + 2
	}

	var scratch [512]byte
	b := scratch[:0]
	if size > len(scratch) {
		// The serialized labels do not fit into the fixed-size buffer.
		b = make([]byte, 0, size)
	}
	for i := range labels {
		b = append(b, labels[i].Name...)
		b = append(b, '\x00')
		b = append(b, labels[i].Value...)
		b = append(b, '\x00')
	}
	return xxhash.Sum64(b)
}

View File

@@ -45,13 +45,16 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
a := newBenchAggregators([]string{output}, pushFunc)
defer a.MustStop()
// Warm up the LabelsCompressor so benchmark measures steady-state performance.
a.Push(benchSeries, nil)
const loops = 100
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
b.RunParallel(func(pb *testing.PB) {
var matchIdxs []uint32
matchIdxs := make([]uint32, len(benchSeries))
for pb.Next() {
for range loops {
matchIdxs = a.Push(benchSeries, matchIdxs)
@@ -82,8 +85,8 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
func newBenchSeries(seriesCount int) []prompb.TimeSeries {
a := make([]string, 0, seriesCount)
for j := range seriesCount {
s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo_%d",instance="bar",pod="pod-123232312",namespace="kube-foo-bar",node="node-123-3434-443",`+
`some_other_label="foo-bar-baz",environment="prod",label1="value1",label2="value2",label3="value3"} %d`, j, j%100, j*1000)
s := fmt.Sprintf(`http_requests_total{environment="prod",instance="bar",job="foo_%d",label1="value1",label2="value2",label3="value3",`+
`namespace="kube-foo-bar",node="node-123-3434-443",path="/foo/%d",pod="pod-123232312",some_other_label="foo-bar-baz"} %d`, j%100, j, j*1000)
a = append(a, s)
}
metrics := strings.Join(a, "\n")
@@ -101,13 +104,16 @@ func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
a := newPerOutputBenchAggregators(benchOutputs, pushFunc)
defer a.MustStop()
// Warm up the LabelsCompressor so benchmark measures steady-state performance.
a.Push(benchSeries, nil)
const loops = 100
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
b.RunParallel(func(pb *testing.PB) {
var matchIdxs []uint32
matchIdxs := make([]uint32, len(benchSeries))
for pb.Next() {
for range loops {
matchIdxs = a.Push(benchSeries, matchIdxs)
@@ -117,21 +123,17 @@ func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
}
func newPerOutputBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
outputsQuoted := make([]string, len(outputs))
var config string
for i := range outputs {
outputsQuoted[i] = stringsutil.JSONString(outputs[i])
cfg := fmt.Sprintf(`
var b strings.Builder
for _, output := range outputs {
fmt.Fprintf(&b, `
- match: http_requests_total
interval: 24h
by: [job]
outputs: [%s]
`, stringsutil.JSONString(outputs[i]))
config += cfg
`, stringsutil.JSONString(output))
}
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
a, err := LoadFromData([]byte(b.String()), pushFunc, nil, "some_alias")
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
}