replace explicit compressor cleanup calls with a background rotation task

Andrii Chubatiuk
2026-04-23 21:23:54 +03:00
parent f59e3d7742
commit e6e053c8f3
6 changed files with 312 additions and 149 deletions

View File

@@ -3,6 +3,7 @@ package promutil
import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -11,16 +12,125 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
// LabelsCompressor compresses []prompb.Label into short binary strings
// LabelsCompressor compresses []prompb.Label into short binary strings.
// Zero value is ready to use. Each Register call must be paired with Unregister.
type LabelsCompressor struct {
labelToIdx sync.Map
idxToLabel atomic.Pointer[[]prompb.Label]
// usedBitset tracks which indices were used in the current rotation period.
// Bits are set atomically from compressFast without mu; grown under mu.
usedBitset atomic.Pointer[[]uint64]
totalSizeBytes uint64
mu sync.Mutex
freeList []uint32
pendingDelete map[uint32]struct{}
pendingFree map[uint32]struct{}
// freeIdxs holds index slots available for reuse.
freeIdxs []uint32
// pendingIdxs holds indices evicted from labelToIdx last rotate, not yet zeroed in idxToLabel.
pendingIdxs map[uint32]struct{}
// prevBitset is the usedBitset snapshot from the previous non-zero rotation.
// A label is evicted only when absent from both prevBitset and the current
// usedBitset, which guards against partial snapshots when rotate fires mid-compress loop.
// Only accessed in rotate, which runs in a single goroutine.
prevBitset []uint64
// registry holds staleness intervals of active callers; rotation period = max(registry).
registry []time.Duration
// tickerCh signals runRotate to re-evaluate the rotation period.
tickerCh chan struct{}
}
// idleRotationPeriod is the rotation period used when no callers are registered.
// It must be long enough that the idle goroutine wakes rarely and consumes no measurable resources.
const idleRotationPeriod = time.Hour
// Register adds stalenessInterval to the registry and starts the rotation goroutine on first call.
// Rotation period equals max(registry). Must be paired with Unregister.
func (lc *LabelsCompressor) Register(stalenessInterval time.Duration) {
lc.mu.Lock()
if lc.tickerCh == nil {
lc.tickerCh = make(chan struct{}, 1)
go lc.runRotate()
}
lc.registry = append(lc.registry, stalenessInterval)
tickerCh := lc.tickerCh
lc.mu.Unlock()
select {
case tickerCh <- struct{}{}:
default:
}
}
// Unregister removes stalenessInterval from the registry.
// The rotation goroutine keeps running at idleRotationPeriod when the registry is empty.
func (lc *LabelsCompressor) Unregister(stalenessInterval time.Duration) {
lc.mu.Lock()
for i, d := range lc.registry {
if d == stalenessInterval {
lc.registry = append(lc.registry[:i], lc.registry[i+1:]...)
break
}
}
tickerCh := lc.tickerCh
lc.mu.Unlock()
if tickerCh != nil {
select {
case tickerCh <- struct{}{}:
default:
}
}
}
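
A minimal usage sketch of the Register/Unregister pairing described above. The import path is assumed from the `package promutil` header in this diff, and the interval value is illustrative:

package main

import (
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
)

func main() {
	var lc promutil.LabelsCompressor // zero value is ready to use
	staleness := 2 * time.Minute     // illustrative interval

	// Register starts the rotation goroutine on first call; the rotation
	// period becomes the maximum across all registered callers.
	lc.Register(staleness)
	// Unregister must be called with the same value on shutdown.
	defer lc.Unregister(staleness)

	// ... Compress/Decompress traffic goes here ...
}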
// maxRegisteredStaleness returns the largest registered staleness interval,
// or idleRotationPeriod when the registry is empty.
// Must be called with lc.mu held.
func (lc *LabelsCompressor) maxRegisteredStaleness() time.Duration {
	var maxStaleness time.Duration
	for _, d := range lc.registry {
		if d > maxStaleness {
			maxStaleness = d
		}
	}
	if maxStaleness == 0 {
		return idleRotationPeriod
	}
	return maxStaleness
}
func (lc *LabelsCompressor) runRotate() {
lc.mu.Lock()
period := lc.maxRegisteredStaleness()
lc.mu.Unlock()
timer := time.NewTimer(period)
defer timer.Stop()
for {
select {
case <-timer.C:
lc.rotate()
lc.mu.Lock()
period = lc.maxRegisteredStaleness()
lc.mu.Unlock()
timer.Reset(period)
case <-lc.tickerCh:
lc.mu.Lock()
newPeriod := lc.maxRegisteredStaleness()
lc.mu.Unlock()
if newPeriod != period {
period = newPeriod
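				// Stop the timer and drain any pending fire before Reset, per the
				// time.Timer.Reset contract; otherwise a stale fire could trigger
				// an immediate extra rotation.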
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(period)
}
}
}
}
// SizeBytes returns the size of lc data in bytes
@@ -33,12 +143,12 @@ func (lc *LabelsCompressor) SizeBytes() uint64 {
// ItemsCount returns the number of items in lc
func (lc *LabelsCompressor) ItemsCount() uint64 {
p := lc.idxToLabel.Load()
if p == nil {
return 0
}
lc.mu.Lock()
n := uint64(len(*p) - len(lc.freeList))
p := lc.idxToLabel.Load()
var n uint64
if p != nil && len(*p) > len(lc.freeIdxs)+len(lc.pendingIdxs) {
n = uint64(len(*p) - len(lc.freeIdxs) - len(lc.pendingIdxs))
}
lc.mu.Unlock()
return n
}
@@ -71,12 +181,31 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
}
func (lc *LabelsCompressor) compressFast(dst []uint64, labels []prompb.Label) bool {
p := lc.usedBitset.Load()
var bits []uint64
if p != nil {
bits = *p
}
for i, label := range labels {
v, ok := lc.labelToIdx.Load(label)
if !ok {
return false
}
dst[i] = uint64(v.(uint32))
idx := v.(uint32)
dst[i] = uint64(idx)
word := idx >> 6
if int(word) < len(bits) {
mask := uint64(1) << (idx & 63)
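			// Test before Or: skip the atomic write when the bit is already set,
			// avoiding needless dirtying of a shared cache line on the hot path.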
if atomic.LoadUint64(&bits[word])&mask == 0 {
atomic.OrUint64(&bits[word], mask)
}
}
}
// usedBitset was swapped mid-loop; re-mark all indices in the new bitset.
if lc.usedBitset.Load() != p {
for _, idx64 := range dst {
lc.markUsed(uint32(idx64))
}
}
return true
}
@@ -89,11 +218,45 @@ func (lc *LabelsCompressor) compressSlow(dst []uint64, labels []prompb.Label) {
idx := lc.newIndex(cloned)
lc.labelToIdx.Store(cloned, idx)
v = idx
} else {
lc.markUsed(v.(uint32))
}
dst[i] = uint64(v.(uint32))
}
}
// markUsed sets the bit for idx in usedBitset. Safe to call without mu.
func (lc *LabelsCompressor) markUsed(idx uint32) {
p := lc.usedBitset.Load()
if p == nil {
return
}
bits := *p
word := idx >> 6
if int(word) < len(bits) {
mask := uint64(1) << (idx & 63)
if atomic.LoadUint64(&bits[word])&mask == 0 {
atomic.OrUint64(&bits[word], mask)
}
}
}
// growBitset extends usedBitset to cover idx. Must be called with lc.mu held.
func (lc *LabelsCompressor) growBitset(idx uint32) {
needed := int(idx>>6) + 1
p := lc.usedBitset.Load()
var bits []uint64
if p != nil {
bits = *p
}
if needed <= len(bits) {
return
}
newBits := make([]uint64, needed)
copy(newBits, bits)
lc.usedBitset.Store(&newBits)
}
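
The markUsed/growBitset pair implements a lock-free atomic bitset: bits are set without the mutex, while growth publishes a fresh slice under it, so a marker racing with growth may land in the stale slice; that is why compressFast re-marks when it detects a swapped pointer. A minimal self-contained sketch of the idiom, with illustrative names (requires Go 1.23+ for atomic.OrUint64):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type bitset struct {
	mu   sync.Mutex
	bits atomic.Pointer[[]uint64]
}

// set marks bit idx without taking mu. A call racing with grow may write
// into the old slice; callers must detect the swap and re-mark, as
// compressFast does above.
func (b *bitset) set(idx uint32) {
	p := b.bits.Load()
	if p == nil {
		return
	}
	bits := *p
	if word := idx >> 6; int(word) < len(bits) {
		atomic.OrUint64(&bits[word], uint64(1)<<(idx&63))
	}
}

// grow publishes a larger copy under mu so concurrent set calls never
// observe a partially initialized slice.
func (b *bitset) grow(idx uint32) {
	b.mu.Lock()
	defer b.mu.Unlock()
	var old []uint64
	if p := b.bits.Load(); p != nil {
		old = *p
	}
	if needed := int(idx>>6) + 1; needed > len(old) {
		next := make([]uint64, needed)
		copy(next, old)
		b.bits.Store(&next)
	}
}

func main() {
	var b bitset
	b.grow(130)
	b.set(130)
	fmt.Println((*b.bits.Load())[2]&(1<<2) != 0) // true
}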
func cloneLabel(label prompb.Label) prompb.Label {
// pre-allocate memory for label name and value
n := len(label.Name) + len(label.Value)
@@ -120,9 +283,9 @@ func (lc *LabelsCompressor) newIndex(cloned prompb.Label) uint32 {
newIdxToLabel = *p
}
if len(lc.freeList) > 0 {
idx = lc.freeList[len(lc.freeList)-1]
lc.freeList = lc.freeList[:len(lc.freeList)-1]
if len(lc.freeIdxs) > 0 {
idx = lc.freeIdxs[len(lc.freeIdxs)-1]
lc.freeIdxs = lc.freeIdxs[:len(lc.freeIdxs)-1]
next := make([]prompb.Label, len(newIdxToLabel))
copy(next, newIdxToLabel)
next[idx] = cloned
@@ -133,6 +296,8 @@ func (lc *LabelsCompressor) newIndex(cloned prompb.Label) uint32 {
lc.idxToLabel.Store(&next)
}
lc.growBitset(idx)
lc.markUsed(idx)
lc.totalSizeBytes += uint64(len(cloned.Name)+len(cloned.Value)) + uint64(unsafe.Sizeof(cloned)) + 8
return idx
}
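
newIndex never mutates the published idxToLabel slice in place; every slot write copies the slice and swaps the pointer, so Decompress can read it lock-free. A minimal sketch of that copy-on-write pattern, under assumed illustrative names:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type cowSlice struct {
	mu sync.Mutex
	p  atomic.Pointer[[]string]
}

// set copies the current slice under mu, mutates the copy, and publishes it,
// so lock-free readers never observe a half-written element.
func (c *cowSlice) set(i int, v string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	var cur []string
	if p := c.p.Load(); p != nil {
		cur = *p
	}
	n := len(cur)
	if i >= n {
		n = i + 1
	}
	next := make([]string, n)
	copy(next, cur)
	next[i] = v
	c.p.Store(&next)
}

// get reads without locking; the loaded snapshot is immutable.
func (c *cowSlice) get(i int) (string, bool) {
	p := c.p.Load()
	if p == nil || i >= len(*p) {
		return "", false
	}
	return (*p)[i], true
}

func main() {
	var c cowSlice
	c.set(2, "x")
	fmt.Println(c.get(2)) // x true
}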
@@ -168,6 +333,7 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
if p == nil {
encoding.PutUint64s(a)
logger.Panicf("BUG: idxToLabel is nil in Decompress")
return dst
}
labels := *p
for _, idx := range a.A {
@@ -182,85 +348,90 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
return dst
}
// Cleanup removes entries not referenced by any key in liveKeys.
// Pass nil or empty liveKeys to reclaim all entries.
func (lc *LabelsCompressor) Cleanup(liveKeys []string) {
if len(liveKeys) == 0 {
lc.labelToIdx.Range(func(k, _ any) bool {
lc.labelToIdx.Delete(k)
return true
})
lc.mu.Lock()
empty := make([]prompb.Label, 0)
lc.idxToLabel.Store(&empty)
lc.freeList = lc.freeList[:0]
lc.pendingDelete = nil
lc.pendingFree = nil
lc.totalSizeBytes = 0
// rotate evicts unused labels and recycles their slots.
// Phase 1 removes the label from labelToIdx (preventing new keys from referencing it).
// Phase 2, one rotation later, zeroes idxToLabel and returns the slot to freeIdxs,
// giving in-flight keys a full rotation period to drain via Decompress.
// Must not be called concurrently with itself.
func (lc *LabelsCompressor) rotate() {
// Snapshot and reset usedBitset under mu to prevent races with growBitset/newIndex.
// Also snapshot idxToLabel for the phase-1 scan.
lc.mu.Lock()
var currentBits []uint64
if p := lc.usedBitset.Load(); p != nil {
currentBits = make([]uint64, len(*p))
copy(currentBits, *p)
newBits := make([]uint64, len(*p))
lc.usedBitset.Store(&newBits)
}
// Skip if nothing was used; all-zero snapshots from rapid rotation would
// advance pendingIdxs and reclaim slots before in-flight keys drain.
anyUsed := false
for _, w := range currentBits {
if w != 0 {
anyUsed = true
break
}
}
if !anyUsed {
lc.mu.Unlock()
return
}
usedIdxs := make(map[uint32]struct{}, len(liveKeys)*4)
for _, key := range liveKeys {
src := bytesutil.ToUnsafeBytes(key)
labelsLen, n := encoding.UnmarshalVarUint64(src)
if n <= 0 {
logger.Panicf("BUG: cannot unmarshal labels length in Cleanup")
}
src = src[n:]
for i := uint64(0); i < labelsLen; i++ {
idx, n := encoding.UnmarshalVarUint64(src)
if n <= 0 {
logger.Panicf("BUG: cannot unmarshal label index in Cleanup")
}
usedIdxs[uint32(idx)] = struct{}{}
src = src[n:]
}
}
lc.mu.Lock()
pendingDelete := lc.pendingDelete
pendingFree := lc.pendingFree
// Advance prevBitset; labels missing from a partial snapshot are still covered by it.
prevSnap := lc.prevBitset
lc.prevBitset = currentBits
pendingIdxs := lc.pendingIdxs
idxSnap := lc.idxToLabel.Load()
lc.mu.Unlock()
// Phase 2→3: pendingFree entries are already deleted from labelToIdx;
// safe to free the slot once no live key references the index.
var freeIdxs []uint32
var newPendingFree map[uint32]struct{}
for idx := range pendingFree {
if _, used := usedIdxs[idx]; used {
if newPendingFree == nil {
newPendingFree = make(map[uint32]struct{})
// A label is used if present in either the current or previous period snapshot.
isUsed := func(idx uint32) bool {
word := idx >> 6
inCurrent := int(word) < len(currentBits) && currentBits[word]>>(idx&63)&1 == 1
inPrev := int(word) < len(prevSnap) && prevSnap[word]>>(idx&63)&1 == 1
return inCurrent || inPrev
}
// Phase 2: reclaim slots evicted last rotation if still unused.
var toReclaim []uint32
var nextRelease map[uint32]struct{}
for idx := range pendingIdxs {
if isUsed(idx) {
if nextRelease == nil {
nextRelease = make(map[uint32]struct{})
}
newPendingFree[idx] = struct{}{}
nextRelease[idx] = struct{}{}
} else {
freeIdxs = append(freeIdxs, idx)
toReclaim = append(toReclaim, idx)
}
}
// Phase 1→2 and new phase 1: iterate labelToIdx.
var toPromote []uint32
var newPendingDelete map[uint32]struct{}
lc.labelToIdx.Range(func(k, v any) bool {
idx := v.(uint32)
if _, used := usedIdxs[idx]; used {
return true
}
if _, pending := pendingDelete[idx]; pending {
lc.labelToIdx.Delete(k)
toPromote = append(toPromote, idx)
} else {
if newPendingDelete == nil {
newPendingDelete = make(map[uint32]struct{})
// Phase 1: scan idxToLabel rather than sync.Map.Range to avoid
// cache-line contention with concurrent compressFast lookups.
if idxSnap != nil {
for i, label := range *idxSnap {
if label == (prompb.Label{}) {
continue // freed slot
}
newPendingDelete[idx] = struct{}{}
idx := uint32(i)
if _, inPending := pendingIdxs[idx]; inPending {
continue // already queued for release
}
if isUsed(idx) {
continue
}
lc.labelToIdx.Delete(label)
if nextRelease == nil {
nextRelease = make(map[uint32]struct{})
}
nextRelease[idx] = struct{}{}
}
return true
})
}
lc.mu.Lock()
if len(freeIdxs) > 0 {
if len(toReclaim) > 0 {
p := lc.idxToLabel.Load()
var newIdxToLabel []prompb.Label
if p != nil {
@@ -268,24 +439,17 @@ func (lc *LabelsCompressor) Cleanup(liveKeys []string) {
}
next := make([]prompb.Label, len(newIdxToLabel))
copy(next, newIdxToLabel)
for _, idx := range freeIdxs {
for _, idx := range toReclaim {
label := next[idx]
entrySize := uint64(len(label.Name)+len(label.Value)) + uint64(unsafe.Sizeof(label)) + 8
if lc.totalSizeBytes >= entrySize {
lc.totalSizeBytes -= entrySize
}
next[idx] = prompb.Label{}
lc.freeList = append(lc.freeList, idx)
lc.freeIdxs = append(lc.freeIdxs, idx)
}
lc.idxToLabel.Store(&next)
}
for _, idx := range toPromote {
if newPendingFree == nil {
newPendingFree = make(map[uint32]struct{})
}
newPendingFree[idx] = struct{}{}
}
lc.pendingDelete = newPendingDelete
lc.pendingFree = newPendingFree
lc.pendingIdxs = nextRelease
lc.mu.Unlock()
}
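
Concretely, with rotation period T: a label last marked used just before a rotation at time R stays resident through R (current snapshot) and R+T (previous snapshot), is deleted from labelToIdx and queued in pendingIdxs at R+2T, and has its idxToLabel slot zeroed and returned to freeIdxs at R+3T. A key compressed just before eviction therefore keeps decompressing for at least one full period, and because all-zero snapshots are skipped, idle stretches never advance this pipeline.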

View File

@@ -9,17 +9,13 @@ import (
func BenchmarkLabelsCompressorCompress(b *testing.B) {
series := newTestSeries(100, 10)
run := func(b *testing.B, withCleanup bool) {
run := func(b *testing.B, withRotate bool) {
var lc LabelsCompressor
var buf []byte
liveKeys := make([]string, len(series))
for i, labels := range series {
bufLen := len(buf)
buf = lc.Compress(buf, labels)
liveKeys[i] = string(buf[bufLen:])
for _, labels := range series {
lc.Compress(nil, labels)
}
var cleanups atomic.Int64
if withCleanup {
var rotations atomic.Int64
if withRotate {
done := make(chan struct{})
go func() {
for {
@@ -27,13 +23,14 @@ func BenchmarkLabelsCompressorCompress(b *testing.B) {
case <-done:
return
default:
lc.Cleanup(liveKeys)
cleanups.Add(1)
lc.rotate()
rotations.Add(1)
}
}
}()
defer close(done)
}
rotations.Store(0)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(series)))
@@ -47,29 +44,27 @@ func BenchmarkLabelsCompressorCompress(b *testing.B) {
Sink.Add(uint64(len(dst)))
}
})
if withCleanup {
b.ReportMetric(float64(cleanups.Load())/float64(b.N), "cleanups/op")
if withRotate {
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
}
}
b.Run("no_cleanup", func(b *testing.B) { run(b, false) })
b.Run("with_cleanup", func(b *testing.B) { run(b, true) })
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
}
func BenchmarkLabelsCompressorDecompress(b *testing.B) {
series := newTestSeries(100, 10)
run := func(b *testing.B, withCleanup bool) {
run := func(b *testing.B, withRotate bool) {
var lc LabelsCompressor
datas := make([][]byte, len(series))
liveKeys := make([]string, len(series))
var buf []byte
for i, labels := range series {
bufLen := len(buf)
buf = lc.Compress(buf, labels)
datas[i] = buf[bufLen:]
liveKeys[i] = string(buf[bufLen:])
}
var cleanups atomic.Int64
if withCleanup {
var rotations atomic.Int64
if withRotate {
done := make(chan struct{})
go func() {
for {
@@ -77,13 +72,14 @@ func BenchmarkLabelsCompressorDecompress(b *testing.B) {
case <-done:
return
default:
lc.Cleanup(liveKeys)
cleanups.Add(1)
lc.rotate()
rotations.Add(1)
}
}
}()
defer close(done)
}
rotations.Store(0)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(series)))
@@ -96,12 +92,12 @@ func BenchmarkLabelsCompressorDecompress(b *testing.B) {
Sink.Add(uint64(len(labels)))
}
})
if withCleanup {
b.ReportMetric(float64(cleanups.Load())/float64(b.N), "cleanups/op")
if withRotate {
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
}
}
b.Run("no_cleanup", func(b *testing.B) { run(b, false) })
b.Run("with_cleanup", func(b *testing.B) { run(b, true) })
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
}
var Sink atomic.Uint64

View File

@@ -18,7 +18,6 @@ import (
// Deduplicator deduplicates samples per each time series.
type Deduplicator struct {
da *dedupAggr
lc promutil.LabelsCompressor
cs atomic.Pointer[currentState]
enableWindows bool
@@ -81,6 +80,8 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
metrics.RegisterSet(ms)
lc.Register(2 * interval)
d.wg.Go(func() {
d.runFlusher(pushFunc)
})
@@ -90,6 +91,8 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
// MustStop stops d.
func (d *Deduplicator) MustStop() {
lc.Unregister(2 * d.interval)
metrics.UnregisterSet(d.ms, true)
d.ms = nil
@@ -120,7 +123,7 @@ func (d *Deduplicator) Push(tss []prompb.TimeSeries) {
labels.Sort()
bufLen := len(buf)
buf = d.lc.Compress(buf, labels.Labels)
buf = lc.Compress(buf, labels.Labels)
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, s := range ts.Samples {
if d.enableWindows && minDeadline > s.Timestamp {
@@ -207,7 +210,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
dstSamples := ctx.samples
for _, ps := range samples {
labelsLen := len(labels)
labels = d.lc.Decompress(labels, bytesutil.ToUnsafeBytes(ps.key))
labels = lc.Decompress(labels, bytesutil.ToUnsafeBytes(ps.key))
dstSamplesLen := len(dstSamples)
dstSamples = append(dstSamples, prompb.Sample{
@@ -228,8 +231,6 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
putDeduplicatorFlushCtx(ctx)
}, cs.maxDeadline, cs.isGreen)
d.lc.Cleanup(nil)
duration := time.Since(startTime)
d.da.flushDuration.Update(duration.Seconds())
if duration > d.interval {

View File

@@ -86,10 +86,9 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
}
}
func (ao *aggrOutputs) flushState(ctx *flushCtx) []string {
func (ao *aggrOutputs) flushState(ctx *flushCtx) {
m := &ao.m
var outputs []aggrValue
var liveKeys []string
m.Range(func(k, v any) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
av := v.(*aggrValues)
@@ -105,7 +104,6 @@ func (ao *aggrOutputs) flushState(ctx *flushCtx) []string {
return true
}
outputKey := k.(string)
liveKeys = append(liveKeys, outputKey)
if ctx.isGreen {
outputs = av.green
} else {
@@ -120,7 +118,6 @@ func (ao *aggrOutputs) flushState(ctx *flushCtx) []string {
}
return true
})
return liveKeys
}
type aggrValues struct {

View File

@@ -53,6 +53,19 @@ var supportedOutputs = []string{
"unique_samples",
}
var (
// lc contains information about all compressed labels for streaming aggregation
lc promutil.LabelsCompressor
_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
return float64(lc.SizeBytes())
})
_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
return float64(lc.ItemsCount())
})
)
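
These package-level gauges replace the per-aggregator vm_streamaggr_labels_compressor_* gauges removed further down in this file, since every aggregator now shares the single process-wide compressor.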
// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
//
// opts can contain additional options. If opts is nil, then default options are used.
@@ -249,9 +262,9 @@ type Config struct {
type Aggregators struct {
as []*aggregator
// semCh limits the number of concurrent aggregator.Push calls.
// workCh limits the number of concurrent aggregator.Push calls.
// Pre-allocated once to avoid per-Push channel allocation.
semCh chan struct{}
workCh chan struct{}
// configData contains marshaled configs.
// It is used in Equal() for comparing Aggregators.
@@ -283,6 +296,7 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
}
ms := metrics.NewSet()
as := make([]*aggregator, len(cfgs))
for i, cfg := range cfgs {
a, err := newAggregator(cfg, filePath, pushFunc, ms, opts, alias, i+1)
@@ -303,7 +317,7 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
metrics.RegisterSet(ms)
return &Aggregators{
as: as,
semCh: make(chan struct{}, cgroup.AvailableCPUs()),
workCh: make(chan struct{}, cgroup.AvailableCPUs()),
configData: configData,
filePath: filePath,
ms: ms,
@@ -367,12 +381,12 @@ func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []uint32) []uint32
state := getAggrPushState()
state.wg.Add(len(a.as))
for _, aggr := range a.as {
a.semCh <- struct{}{}
a.workCh <- struct{}{}
w := getAggrPushWork()
w.aggr = aggr
w.tss = tss
w.matchIdxs = matchIdxs
w.semCh = a.semCh
w.workCh = a.workCh
w.state = state
go aggrPushWorker(w)
}
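
workCh is a standard buffered-channel semaphore: acquire by send, release by receive, with the buffer capacity bounding concurrency. A minimal self-contained sketch of the idiom, with runtime.NumCPU standing in for cgroup.AvailableCPUs:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	// Capacity bounds the number of goroutines running at once.
	sem := make(chan struct{}, runtime.NumCPU())
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		sem <- struct{}{} // acquire: blocks while the buffer is full
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // release
			fmt.Println("work item", i)
		}(i)
	}
	wg.Wait()
}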
@@ -432,8 +446,6 @@ type aggregator struct {
// minDeadline is used for ignoring old samples when ignoreOldSamples or enableWindows is set
minDeadline atomic.Int64
lc promutil.LabelsCompressor
// aggrOutputs contains aggregate states for the given outputs
aggrOutputs *aggrOutputs
@@ -683,13 +695,6 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
})
}
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_labels_compressor_size_bytes{%s}`, metricLabels), func() float64 {
return float64(a.lc.SizeBytes())
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_labels_compressor_items_count{%s}`, metricLabels), func() float64 {
return float64(a.lc.ItemsCount())
})
alignFlushToInterval := !opts.NoAlignFlushToInterval
if v := cfg.NoAlignFlushToInterval; v != nil {
alignFlushToInterval = !*v
@@ -720,6 +725,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
a.cs.Store(cs)
lc.Register(a.stalenessInterval)
a.wg.Go(func() {
a.runFlusher(pushFunc, alignFlushToInterval, skipFlushOnShutdown, ignoreFirstIntervals)
})
@@ -940,8 +947,7 @@ func (a *aggregator) flush(pushFunc PushFunc, flushTime time.Time, cs *currentSt
a.minDeadline.Store(cs.maxDeadline)
ctx.isGreen = cs.isGreen
}
liveKeys := ao.flushState(ctx)
a.lc.Cleanup(liveKeys)
ao.flushState(ctx)
ctx.flushSeries()
putFlushCtx(ctx)
@@ -961,6 +967,7 @@ var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
//
// The aggregator stops pushing the aggregated metrics after this call.
func (a *aggregator) MustStop() {
lc.Unregister(a.stalenessInterval)
close(a.stopCh)
a.wg.Wait()
}
@@ -1012,7 +1019,7 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
}
bufLen := len(ctx.buf)
ctx.compressLabels(&a.lc, inputLabels.Labels, outputLabels.Labels)
ctx.compressLabels(inputLabels.Labels, outputLabels.Labels)
// key remains valid only until the end of this function and can't be reused after
// do not intern key because the number of unique keys could be too high
key := bytesutil.ToUnsafeString(ctx.buf[bufLen:])
@@ -1069,7 +1076,7 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
}
}
func (ctx *pushCtx) compressLabels(lc *promutil.LabelsCompressor, inputLabels, outputLabels []prompb.Label) {
func (ctx *pushCtx) compressLabels(inputLabels, outputLabels []prompb.Label) {
ctx.keybuf = lc.Compress(ctx.keybuf[:0], outputLabels)
ctx.buf = encoding.MarshalVarUint64(ctx.buf, uint64(len(ctx.keybuf)))
ctx.buf = append(ctx.buf, ctx.keybuf...)
@@ -1146,13 +1153,13 @@ type aggrPushWork struct {
aggr *aggregator
tss []prompb.TimeSeries
matchIdxs []uint32
semCh chan struct{}
workCh chan struct{}
state *aggrPushState
}
func aggrPushWorker(w *aggrPushWork) {
w.aggr.Push(w.tss, w.matchIdxs)
<-w.semCh
<-w.workCh
w.state.wg.Done()
putAggrPushWork(w)
}
@@ -1169,7 +1176,7 @@ func putAggrPushWork(w *aggrPushWork) {
w.aggr = nil
w.tss = nil
w.matchIdxs = nil
w.semCh = nil
w.workCh = nil
w.state = nil
aggrPushWorkPool.Put(w)
}
@@ -1296,7 +1303,7 @@ func (ctx *flushCtx) flushSeries() {
func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels = ctx.a.lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
if !ctx.a.keepMetricNames {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
}
@@ -1318,7 +1325,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, value float64, extraName, extraValue string) {
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels = ctx.a.lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
if !ctx.a.keepMetricNames {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
}

View File

@@ -46,8 +46,7 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
defer a.MustStop()
// Warm up the LabelsCompressor so the benchmark measures steady-state performance.
var matchIdxs []uint32
matchIdxs = a.Push(benchSeries, matchIdxs)
a.Push(benchSeries, nil)
const loops = 100
@@ -106,8 +105,7 @@ func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
defer a.MustStop()
// Warm up the LabelsCompressor so the benchmark measures steady-state performance.
var matchIdxs []uint32
matchIdxs = a.Push(benchSeries, matchIdxs)
a.Push(benchSeries, nil)
const loops = 100