Compare commits

...

1 Commits

Author SHA1 Message Date
Andrii Chubatiuk
c59c07cac7 lib/promutil/labelscompressor: add rotation of labelscompressor 2026-04-17 19:09:48 +03:00
3 changed files with 180 additions and 21 deletions

View File

@@ -3,6 +3,7 @@ package promutil
import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -11,8 +12,10 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
// LabelsCompressor compresses []prompb.Label into short binary strings
type LabelsCompressor struct {
const minRotationInterval = time.Hour
// labelsCompressor compresses []prompb.Label into short binary strings.
type labelsCompressor struct {
labelToIdx sync.Map
idxToLabel labelsMap
@@ -21,20 +24,18 @@ type LabelsCompressor struct {
totalSizeBytes atomic.Uint64
}
// SizeBytes returns the size of lc data in bytes
func (lc *LabelsCompressor) SizeBytes() uint64 {
func (lc *labelsCompressor) sizeBytes() uint64 {
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
}
// ItemsCount returns the number of items in lc
func (lc *LabelsCompressor) ItemsCount() uint64 {
func (lc *labelsCompressor) itemsCount() uint64 {
return lc.nextIdx.Load()
}
// Compress compresses labels, appends the compressed labels to dst and returns the result.
// compress compresses labels, appends the compressed labels to dst and returns the result.
//
// It is safe calling Compress from concurrent goroutines.
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
// It is safe calling compress from concurrent goroutines.
func (lc *labelsCompressor) compress(dst []byte, labels []prompb.Label) []byte {
if len(labels) == 0 {
// Fast path
return append(dst, 0)
@@ -42,13 +43,13 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
a := encoding.GetUint64s(len(labels) + 1)
a.A[0] = uint64(len(labels))
lc.compress(a.A[1:], labels)
lc.compressInto(a.A[1:], labels)
dst = encoding.MarshalVarUint64s(dst, a.A)
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
func (lc *labelsCompressor) compressInto(dst []uint64, labels []prompb.Label) {
if len(labels) == 0 {
return
}
@@ -98,10 +99,10 @@ func cloneLabel(label prompb.Label) prompb.Label {
}
}
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
// decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
//
// It is safe calling Decompress from concurrent goroutines.
func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.Label {
// It is safe calling decompress from concurrent goroutines.
func (lc *labelsCompressor) decompress(dst []prompb.Label, src []byte) []prompb.Label {
labelsLen, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal labels length from uvarint")
@@ -124,12 +125,12 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
if len(tail) > 0 {
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
}
dst = lc.decompress(dst, a.A)
dst = lc.decompressInternal(dst, a.A)
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
func (lc *labelsCompressor) decompressInternal(dst []prompb.Label, src []uint64) []prompb.Label {
for _, idx := range src {
label, ok := lc.idxToLabel.Load(idx)
if !ok {
@@ -232,3 +233,143 @@ func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
clear(lm.mutable)
lm.readOnly.Store(&labels)
}
// labelsCompressorState holds the current and previous labelsCompressor instances and generation byte that changes between rotations
// and is used to pick a right compressor during decompression
type labelsCompressorState struct {
gen byte
current *labelsCompressor
previous *labelsCompressor
}
// LabelsCompressor is a rotating compressor that maintains two labelsCompressor
// instances to bound memory growth from stale label sets.
//
// Consumers must call Register on creation and Unregister on shutdown for a proper rotation period calculation.
type LabelsCompressor struct {
state atomic.Pointer[labelsCompressorState]
rotationInterval atomic.Int64
startOnce sync.Once
registryMu sync.Mutex
registry []time.Duration
}
// getState returns current labelsCompressorState, which is initialized if needed.
func (lc *LabelsCompressor) getState() *labelsCompressorState {
if s := lc.state.Load(); s != nil {
return s
}
s := &labelsCompressorState{gen: 0, current: &labelsCompressor{}}
// use CompareAndSwap to avoid overwriting pointer which could be stored by another thread
lc.state.CompareAndSwap(nil, s)
return lc.state.Load()
}
// rotate resets current compressor and moves its state to previous.
func (lc *LabelsCompressor) rotate() {
old := lc.getState()
lc.state.Store(&labelsCompressorState{
gen: old.gen ^ 1,
current: &labelsCompressor{},
previous: old.current,
})
}
// Register records maxStaleness for a new consumer, recomputes the rotation
// interval, starts the background rotation goroutine on the first call, and
// returns an id that must be passed to Unregister when the consumer stops.
func (lc *LabelsCompressor) Register(maxStaleness time.Duration) {
lc.registryMu.Lock()
lc.registry = append(lc.registry, maxStaleness)
max := lc.maxStaleness()
lc.registryMu.Unlock()
lc.rotationInterval.Store(int64(max * 2))
lc.startOnce.Do(func() {
lc.getState()
go func() {
for {
time.Sleep(time.Duration(lc.rotationInterval.Load()))
lc.rotate()
}
}()
})
}
// Unregister removes the given consumer ID from the registry and recomputes
// the rotation interval from the remaining registered consumers.
func (lc *LabelsCompressor) Unregister(maxStaleness time.Duration) {
lc.registryMu.Lock()
for i, s := range lc.registry {
if s == maxStaleness {
lc.registry = append(lc.registry[:i], lc.registry[i+1:]...)
break
}
}
max := lc.maxStaleness()
lc.registryMu.Unlock()
lc.rotationInterval.Store(int64(max * 2))
}
// maxStaleness returns the maximum staleness across all registered consumers.
// Must be called with registryMu held.
func (lc *LabelsCompressor) maxStaleness() time.Duration {
maxStaleness := time.Duration(0)
for _, d := range lc.registry {
if d > maxStaleness {
maxStaleness = d
}
}
return max(maxStaleness, minRotationInterval)
}
// Compress appends the generation byte followed by the compressed labels
// to dst and returns the result.
//
// It is safe calling Compress from concurrent goroutines.
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
s := lc.getState()
dst = append(dst, s.gen)
return s.current.compress(dst, labels)
}
// Decompress reads the generation byte from key and decompresses the
// remaining bytes using the corresponding labelsCompressor instance.
func (lc *LabelsCompressor) Decompress(dst []prompb.Label, key []byte) []prompb.Label {
if len(key) == 0 {
logger.Panicf("BUG: unexpected empty key in Decompress")
}
gen := key[0]
s := lc.getState()
var c *labelsCompressor
if s.gen == gen {
c = s.current
} else if s.previous != nil {
c = s.previous
} else {
logger.Panicf("BUG: compressor for generation %d is not available; current generation is %d", gen, s.gen)
}
return c.decompress(dst, key[1:])
}
// SizeBytes returns the total memory used by the active compressor instances
func (lc *LabelsCompressor) SizeBytes() uint64 {
s := lc.getState()
n := s.current.sizeBytes()
if s.previous != nil {
n += s.previous.sizeBytes()
}
return n
}
// ItemsCount returns the total number of label entries stored across the active
func (lc *LabelsCompressor) ItemsCount() uint64 {
s := lc.getState()
n := s.current.itemsCount()
if s.previous != nil {
n += s.previous.itemsCount()
}
return n
}

View File

@@ -45,6 +45,7 @@ type Deduplicator struct {
//
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Duration, dropLabels []string, alias string) *Deduplicator {
lc.Register(2 * interval)
d := &Deduplicator{
da: newDedupAggr(),
dropLabels: dropLabels,
@@ -92,6 +93,8 @@ func (d *Deduplicator) MustStop() {
metrics.UnregisterSet(d.ms, true)
d.ms = nil
lc.Unregister(2 * d.interval)
close(d.stopCh)
d.wg.Wait()
}

View File

@@ -53,10 +53,10 @@ var supportedOutputs = []string{
"unique_samples",
}
var (
// lc contains information about all compressed labels for streaming aggregation
lc promutil.LabelsCompressor
// lc is the global rotating labels compressor shared across all aggregators.
var lc promutil.LabelsCompressor
var (
_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
return float64(lc.SizeBytes())
})
@@ -310,12 +310,22 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
}
metrics.RegisterSet(ms)
return &Aggregators{
a := &Aggregators{
as: as,
configData: configData,
filePath: filePath,
ms: ms,
}, nil
}
lc.Register(a.maxStaleness())
return a, nil
}
func (a *Aggregators) maxStaleness() time.Duration {
maxStaleness := time.Duration(0)
for _, aggr := range a.as {
maxStaleness = max(aggr.stalenessInterval, maxStaleness)
}
return maxStaleness
}
// IsEnabled returns true if Aggregators has at least one configured aggregator
@@ -335,6 +345,8 @@ func (a *Aggregators) MustStop() {
return
}
lc.Unregister(a.maxStaleness())
metrics.UnregisterSet(a.ms, true)
a.ms = nil
@@ -1078,6 +1090,9 @@ func compressLabels(dst []byte, inputLabels, outputLabels []prompb.Label) []byte
}
func decompressLabels(dst []prompb.Label, key string) []prompb.Label {
if len(key) == 0 {
logger.Panicf("BUG: unexpected empty key in decompressLabels")
}
return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
}