mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-21 18:56:31 +03:00
Compare commits
1 Commits
persitedqu
...
labelscomp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e8a1b3f9bd |
@@ -3,32 +3,173 @@ package promutil
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
// LabelsCompressor compresses []prompb.Label into short binary strings
|
||||
const lcShardCount = 128
|
||||
|
||||
// lcShard is one shard of the label-to-index map.
|
||||
// Padded to one cache line to prevent false sharing between adjacent shards.
|
||||
type lcShard struct {
|
||||
mu sync.RWMutex
|
||||
m map[string]uint32
|
||||
_ [32]byte
|
||||
}
|
||||
|
||||
// LabelsCompressor compresses []prompb.Label into short binary strings.
|
||||
// Zero value is ready to use. Each Register call must be paired with Unregister.
|
||||
type LabelsCompressor struct {
|
||||
labelToIdx sync.Map
|
||||
idxToLabel labelsMap
|
||||
// labelToIdxShards stores the label-to-index mapping across lcShardCount shards
|
||||
// to reduce RWMutex contention in the concurrent compressFast hot path.
|
||||
labelToIdxShards [lcShardCount]lcShard
|
||||
|
||||
nextIdx atomic.Uint64
|
||||
// generation is incremented after each rotate() call that removes labels.
|
||||
// Callers can use it to detect when cached compressed keys become stale.
|
||||
generation atomic.Uint64
|
||||
|
||||
totalSizeBytes atomic.Uint64
|
||||
idxToLabel atomic.Pointer[[]prompb.Label]
|
||||
|
||||
// usedBitset tracks which indices were used in the current rotation period.
|
||||
// Bits are set atomically from compressFast without mu; grown under mu.
|
||||
usedBitset atomic.Pointer[[]uint64]
|
||||
|
||||
totalSizeBytes uint64
|
||||
mu sync.Mutex
|
||||
|
||||
// freeIdxs holds index slots available for reuse.
|
||||
freeIdxs []uint32
|
||||
|
||||
// pendingIdxs holds indices evicted from labelToIdx last rotate, not yet zeroed in idxToLabel.
|
||||
pendingIdxs map[uint32]struct{}
|
||||
|
||||
// prevBitset is the usedBitset snapshot from the previous non-zero rotation.
|
||||
// Requires absence from both prevBitset and usedBitset to evict a label,
|
||||
// guarding against partial snapshots when rotate fires mid-compress loop.
|
||||
// Only accessed in rotate, which runs in a single goroutine.
|
||||
prevBitset []uint64
|
||||
|
||||
// registry holds staleness intervals of active callers; rotation period = max(registry).
|
||||
registry []time.Duration
|
||||
|
||||
// tickerCh signals runRotate to re-evaluate the rotation period.
|
||||
tickerCh chan struct{}
|
||||
}
|
||||
|
||||
// idleRotationPeriod is the rotation period used when no callers are registered.
|
||||
// It must be long enough that a sleeping goroutine does not consume measurable resources.
|
||||
const idleRotationPeriod = time.Hour
|
||||
|
||||
// Register adds stalenessInterval to the registry and starts the rotation goroutine on first call.
|
||||
// Rotation period equals max(registry). Must be paired with Unregister.
|
||||
func (lc *LabelsCompressor) Register(stalenessInterval time.Duration) {
|
||||
lc.mu.Lock()
|
||||
if lc.tickerCh == nil {
|
||||
lc.tickerCh = make(chan struct{}, 1)
|
||||
go lc.runRotate()
|
||||
}
|
||||
lc.registry = append(lc.registry, stalenessInterval)
|
||||
tickerCh := lc.tickerCh
|
||||
lc.mu.Unlock()
|
||||
select {
|
||||
case tickerCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Unregister removes stalenessInterval from the registry.
|
||||
// The rotation goroutine keeps running at idleRotationPeriod when the registry is empty.
|
||||
func (lc *LabelsCompressor) Unregister(stalenessInterval time.Duration) {
|
||||
lc.mu.Lock()
|
||||
for i, d := range lc.registry {
|
||||
if d == stalenessInterval {
|
||||
lc.registry = append(lc.registry[:i], lc.registry[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
tickerCh := lc.tickerCh
|
||||
lc.mu.Unlock()
|
||||
if tickerCh != nil {
|
||||
select {
|
||||
case tickerCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) maxRegisteredStaleness() time.Duration {
|
||||
// must be called with lc.mu held
|
||||
var max time.Duration
|
||||
for _, d := range lc.registry {
|
||||
if d > max {
|
||||
max = d
|
||||
}
|
||||
}
|
||||
if max == 0 {
|
||||
return idleRotationPeriod
|
||||
}
|
||||
return max
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) runRotate() {
|
||||
lc.mu.Lock()
|
||||
period := lc.maxRegisteredStaleness()
|
||||
lc.mu.Unlock()
|
||||
|
||||
timer := time.NewTimer(period)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
lc.rotate()
|
||||
lc.mu.Lock()
|
||||
period = lc.maxRegisteredStaleness()
|
||||
lc.mu.Unlock()
|
||||
timer.Reset(period)
|
||||
case <-lc.tickerCh:
|
||||
lc.mu.Lock()
|
||||
newPeriod := lc.maxRegisteredStaleness()
|
||||
lc.mu.Unlock()
|
||||
if newPeriod != period {
|
||||
period = newPeriod
|
||||
if !timer.Stop() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
timer.Reset(period)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SizeBytes returns the size of lc data in bytes
|
||||
func (lc *LabelsCompressor) SizeBytes() uint64 {
|
||||
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
|
||||
lc.mu.Lock()
|
||||
n := uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes
|
||||
lc.mu.Unlock()
|
||||
return n
|
||||
}
|
||||
|
||||
// ItemsCount returns the number of items in lc
|
||||
func (lc *LabelsCompressor) ItemsCount() uint64 {
|
||||
return lc.nextIdx.Load()
|
||||
lc.mu.Lock()
|
||||
p := lc.idxToLabel.Load()
|
||||
var n uint64
|
||||
if p != nil && len(*p) > len(lc.freeIdxs)+len(lc.pendingIdxs) {
|
||||
n = uint64(len(*p) - len(lc.freeIdxs) - len(lc.pendingIdxs))
|
||||
}
|
||||
lc.mu.Unlock()
|
||||
return n
|
||||
}
|
||||
|
||||
// Compress compresses labels, appends the compressed labels to dst and returns the result.
|
||||
@@ -42,46 +183,139 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
|
||||
|
||||
a := encoding.GetUint64s(len(labels) + 1)
|
||||
a.A[0] = uint64(len(labels))
|
||||
lc.compress(a.A[1:], labels)
|
||||
|
||||
if lc.compressFast(a.A[1:], labels) {
|
||||
dst = encoding.MarshalVarUint64s(dst, a.A)
|
||||
encoding.PutUint64s(a)
|
||||
return dst
|
||||
}
|
||||
|
||||
lc.mu.Lock()
|
||||
lc.compressSlow(a.A[1:], labels)
|
||||
lc.mu.Unlock()
|
||||
|
||||
dst = encoding.MarshalVarUint64s(dst, a.A)
|
||||
encoding.PutUint64s(a)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
|
||||
if len(labels) == 0 {
|
||||
func (lc *LabelsCompressor) compressFast(dst []uint64, labels []prompb.Label) bool {
|
||||
p := lc.usedBitset.Load()
|
||||
var bits []uint64
|
||||
if p != nil {
|
||||
bits = *p
|
||||
}
|
||||
var keyBuf [256]byte
|
||||
for i, label := range labels {
|
||||
// Build composite key name+'\x00'+value into the stack buffer.
|
||||
totalLen := len(label.Name) + 1 + len(label.Value)
|
||||
var key string
|
||||
if totalLen <= len(keyBuf) {
|
||||
n := copy(keyBuf[:], label.Name)
|
||||
keyBuf[n] = '\x00'
|
||||
copy(keyBuf[n+1:], label.Value)
|
||||
key = bytesutil.ToUnsafeString(keyBuf[:totalLen])
|
||||
} else {
|
||||
key = makeLabelKey(label)
|
||||
}
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
||||
shard := &lc.labelToIdxShards[h%lcShardCount]
|
||||
shard.mu.RLock()
|
||||
idx, ok := shard.m[key]
|
||||
shard.mu.RUnlock()
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
dst[i] = uint64(idx)
|
||||
word := idx >> 6
|
||||
if int(word) < len(bits) {
|
||||
mask := uint64(1) << (idx & 63)
|
||||
if atomic.LoadUint64(&bits[word])&mask == 0 {
|
||||
atomic.OrUint64(&bits[word], mask)
|
||||
}
|
||||
}
|
||||
}
|
||||
// usedBitset was swapped mid-loop; re-mark all indices in the new bitset.
|
||||
if lc.usedBitset.Load() != p {
|
||||
for _, idx64 := range dst {
|
||||
lc.markUsed(uint32(idx64))
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) compressSlow(dst []uint64, labels []prompb.Label) {
|
||||
for i, label := range labels {
|
||||
key := makeLabelKey(label)
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
||||
shard := &lc.labelToIdxShards[h%lcShardCount]
|
||||
|
||||
shard.mu.RLock()
|
||||
idx, ok := shard.m[key]
|
||||
shard.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
cloned := cloneLabel(label)
|
||||
idx = lc.newIndex(cloned)
|
||||
shard.mu.Lock()
|
||||
if shard.m == nil {
|
||||
shard.m = make(map[string]uint32)
|
||||
}
|
||||
shard.m[key] = idx
|
||||
shard.mu.Unlock()
|
||||
} else {
|
||||
lc.markUsed(idx)
|
||||
}
|
||||
dst[i] = uint64(idx)
|
||||
}
|
||||
}
|
||||
|
||||
// markUsed sets the bit for idx in usedBitset. Safe to call without mu.
|
||||
func (lc *LabelsCompressor) markUsed(idx uint32) {
|
||||
p := lc.usedBitset.Load()
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
_ = dst[len(labels)-1]
|
||||
for i, label := range labels {
|
||||
v, ok := lc.labelToIdx.Load(label)
|
||||
if !ok {
|
||||
idx := lc.nextIdx.Add(1)
|
||||
v = idx
|
||||
labelCopy := cloneLabel(label)
|
||||
|
||||
// Must store idxToLabel entry before labelToIdx,
|
||||
// so it can be found by possible concurrent goroutines.
|
||||
//
|
||||
// We might store duplicated entries for single label with different indexes,
|
||||
// and it's fine, see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118.
|
||||
lc.idxToLabel.Store(idx, labelCopy)
|
||||
vNew, loaded := lc.labelToIdx.LoadOrStore(labelCopy, v)
|
||||
if loaded {
|
||||
// This label has been stored by a concurrent goroutine with different index,
|
||||
// use it for key consistency in aggrState.
|
||||
v = vNew
|
||||
}
|
||||
|
||||
// Update lc.totalSizeBytes
|
||||
labelSizeBytes := uint64(len(label.Name) + len(label.Value))
|
||||
entrySizeBytes := labelSizeBytes + uint64(2*(unsafe.Sizeof(label)+unsafe.Sizeof(&label))+unsafe.Sizeof(v))
|
||||
lc.totalSizeBytes.Add(entrySizeBytes)
|
||||
bits := *p
|
||||
word := idx >> 6
|
||||
if int(word) < len(bits) {
|
||||
mask := uint64(1) << (idx & 63)
|
||||
if atomic.LoadUint64(&bits[word])&mask == 0 {
|
||||
atomic.OrUint64(&bits[word], mask)
|
||||
}
|
||||
dst[i] = v.(uint64)
|
||||
}
|
||||
}
|
||||
|
||||
// growBitset extends usedBitset to cover idx. Must be called with lc.mu held.
|
||||
func (lc *LabelsCompressor) growBitset(idx uint32) {
|
||||
needed := int(idx>>6) + 1
|
||||
p := lc.usedBitset.Load()
|
||||
var bits []uint64
|
||||
if p != nil {
|
||||
bits = *p
|
||||
}
|
||||
if needed <= len(bits) {
|
||||
return
|
||||
}
|
||||
newBits := make([]uint64, needed)
|
||||
copy(newBits, bits)
|
||||
lc.usedBitset.Store(&newBits)
|
||||
}
|
||||
|
||||
func makeLabelKey(label prompb.Label) string {
|
||||
buf := make([]byte, 0, len(label.Name)+1+len(label.Value))
|
||||
buf = append(buf, label.Name...)
|
||||
buf = append(buf, '\x00')
|
||||
buf = append(buf, label.Value...)
|
||||
return string(buf)
|
||||
}
|
||||
|
||||
// Generation returns the current generation count.
|
||||
// It increments whenever labels are evicted, invalidating any cached compressed keys.
|
||||
func (lc *LabelsCompressor) Generation() uint64 {
|
||||
return lc.generation.Load()
|
||||
}
|
||||
|
||||
func cloneLabel(label prompb.Label) prompb.Label {
|
||||
// pre-allocate memory for label name and value
|
||||
n := len(label.Name) + len(label.Value)
|
||||
@@ -98,6 +332,35 @@ func cloneLabel(label prompb.Label) prompb.Label {
|
||||
}
|
||||
}
|
||||
|
||||
// newIndex assigns a new index for cloned and updates idxToLabel atomically.
|
||||
// Must be called with lc.mu held.
|
||||
func (lc *LabelsCompressor) newIndex(cloned prompb.Label) uint32 {
|
||||
var idx uint32
|
||||
p := lc.idxToLabel.Load()
|
||||
var newIdxToLabel []prompb.Label
|
||||
if p != nil {
|
||||
newIdxToLabel = *p
|
||||
}
|
||||
|
||||
if len(lc.freeIdxs) > 0 {
|
||||
idx = lc.freeIdxs[len(lc.freeIdxs)-1]
|
||||
lc.freeIdxs = lc.freeIdxs[:len(lc.freeIdxs)-1]
|
||||
next := make([]prompb.Label, len(newIdxToLabel))
|
||||
copy(next, newIdxToLabel)
|
||||
next[idx] = cloned
|
||||
lc.idxToLabel.Store(&next)
|
||||
} else {
|
||||
idx = uint32(len(newIdxToLabel))
|
||||
next := append(newIdxToLabel, cloned)
|
||||
lc.idxToLabel.Store(&next)
|
||||
}
|
||||
|
||||
lc.growBitset(idx)
|
||||
lc.markUsed(idx)
|
||||
lc.totalSizeBytes += uint64(len(cloned.Name)+len(cloned.Value)) + uint64(unsafe.Sizeof(cloned)) + 8
|
||||
return idx
|
||||
}
|
||||
|
||||
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
|
||||
//
|
||||
// It is safe calling Decompress from concurrent goroutines.
|
||||
@@ -124,111 +387,139 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
|
||||
if len(tail) > 0 {
|
||||
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
|
||||
}
|
||||
dst = lc.decompress(dst, a.A)
|
||||
|
||||
p := lc.idxToLabel.Load()
|
||||
if p == nil {
|
||||
encoding.PutUint64s(a)
|
||||
logger.Panicf("BUG: idxToLabel is nil in Decompress")
|
||||
return dst
|
||||
}
|
||||
labels := *p
|
||||
for _, idx := range a.A {
|
||||
if int(idx) >= len(labels) {
|
||||
encoding.PutUint64s(a)
|
||||
logger.Panicf("BUG: missing label for idx=%d; idxToLabel len=%d", idx, len(labels))
|
||||
}
|
||||
dst = append(dst, labels[idx])
|
||||
}
|
||||
|
||||
encoding.PutUint64s(a)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
|
||||
for _, idx := range src {
|
||||
label, ok := lc.idxToLabel.Load(idx)
|
||||
if !ok {
|
||||
logger.Panicf("BUG: missing label for idx=%d", idx)
|
||||
}
|
||||
dst = append(dst, label)
|
||||
// rotate evicts unused labels and recycles their slots.
|
||||
// Phase 1 removes the label from labelToIdx (preventing new keys from referencing it).
|
||||
// Phase 2, one rotation later, zeroes idxToLabel and returns the slot to freeIdxs,
|
||||
// giving in-flight keys a full rotation period to drain via Decompress.
|
||||
// Must not be called concurrently with itself.
|
||||
func (lc *LabelsCompressor) rotate() {
|
||||
// Snapshot and reset usedBitset under mu to prevent races with growBitset/newIndex.
|
||||
// Also snapshot idxToLabel for the phase-1 scan.
|
||||
lc.mu.Lock()
|
||||
var currentBits []uint64
|
||||
if p := lc.usedBitset.Load(); p != nil {
|
||||
currentBits = make([]uint64, len(*p))
|
||||
copy(currentBits, *p)
|
||||
newBits := make([]uint64, len(*p))
|
||||
lc.usedBitset.Store(&newBits)
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
// labelsMap maps uint64 key to prompb.Label
|
||||
//
|
||||
// uint64 keys must be packed close to 0. Otherwise the labelsMap structure will consume too much memory.
|
||||
type labelsMap struct {
|
||||
readOnly atomic.Pointer[[]*prompb.Label]
|
||||
|
||||
mutableLock sync.Mutex
|
||||
mutable map[uint64]*prompb.Label
|
||||
misses uint64
|
||||
}
|
||||
|
||||
// Store stores label under the given idx.
|
||||
//
|
||||
// It is safe calling Store from concurrent goroutines.
|
||||
func (lm *labelsMap) Store(idx uint64, label prompb.Label) {
|
||||
lm.mutableLock.Lock()
|
||||
if lm.mutable == nil {
|
||||
lm.mutable = make(map[uint64]*prompb.Label)
|
||||
}
|
||||
lm.mutable[idx] = &label
|
||||
lm.mutableLock.Unlock()
|
||||
}
|
||||
|
||||
// Load returns the label for the given idx.
|
||||
//
|
||||
// Load returns false if lm doesn't contain label for the given idx.
|
||||
//
|
||||
// It is safe calling Load from concurrent goroutines.
|
||||
//
|
||||
// The performance of Load() scales linearly with CPU cores.
|
||||
func (lm *labelsMap) Load(idx uint64) (prompb.Label, bool) {
|
||||
if pReadOnly := lm.readOnly.Load(); pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
|
||||
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
|
||||
// Fast path - the label for the given idx has been found in lm.readOnly.
|
||||
return *pLabel, true
|
||||
// Skip if nothing was used; all-zero snapshots from rapid rotation would
|
||||
// advance pendingIdxs and reclaim slots before in-flight keys drain.
|
||||
anyUsed := false
|
||||
for _, w := range currentBits {
|
||||
if w != 0 {
|
||||
anyUsed = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Slow path - search in lm.mutable.
|
||||
return lm.loadSlow(idx)
|
||||
}
|
||||
|
||||
func (lm *labelsMap) loadSlow(idx uint64) (prompb.Label, bool) {
|
||||
lm.mutableLock.Lock()
|
||||
|
||||
// Try loading label from readOnly, since it could be updated while acquiring mutableLock.
|
||||
pReadOnly := lm.readOnly.Load()
|
||||
if pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
|
||||
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
|
||||
lm.mutableLock.Unlock()
|
||||
return *pLabel, true
|
||||
}
|
||||
}
|
||||
|
||||
// The label for the idx wasn't found in readOnly. Search it in mutable.
|
||||
lm.misses++
|
||||
pLabel := lm.mutable[idx]
|
||||
if pReadOnly == nil || lm.misses > uint64(len(*pReadOnly)) {
|
||||
lm.moveMutableToReadOnlyLocked(pReadOnly)
|
||||
lm.misses = 0
|
||||
}
|
||||
lm.mutableLock.Unlock()
|
||||
|
||||
if pLabel == nil {
|
||||
return prompb.Label{}, false
|
||||
}
|
||||
return *pLabel, true
|
||||
}
|
||||
|
||||
func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
|
||||
if len(lm.mutable) == 0 {
|
||||
// Nothing to move
|
||||
if !anyUsed {
|
||||
lc.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
var labels []*prompb.Label
|
||||
if pReadOnly != nil {
|
||||
labels = append(labels, *pReadOnly...)
|
||||
// Advance prevBitset; labels missing from a partial snapshot are still covered by it.
|
||||
prevSnap := lc.prevBitset
|
||||
lc.prevBitset = currentBits
|
||||
pendingIdxs := lc.pendingIdxs
|
||||
idxSnap := lc.idxToLabel.Load()
|
||||
lc.mu.Unlock()
|
||||
|
||||
// A label is used if present in either the current or previous period snapshot.
|
||||
isUsed := func(idx uint32) bool {
|
||||
word := idx >> 6
|
||||
inCurrent := int(word) < len(currentBits) && currentBits[word]>>(idx&63)&1 == 1
|
||||
inPrev := int(word) < len(prevSnap) && prevSnap[word]>>(idx&63)&1 == 1
|
||||
return inCurrent || inPrev
|
||||
}
|
||||
for idx, pLabel := range lm.mutable {
|
||||
if idx < uint64(len(labels)) {
|
||||
labels[idx] = pLabel
|
||||
} else {
|
||||
for idx > uint64(len(labels)) {
|
||||
labels = append(labels, nil)
|
||||
|
||||
// Phase 2: reclaim slots evicted last rotation if still unused.
|
||||
var toReclaim []uint32
|
||||
var nextRelease map[uint32]struct{}
|
||||
for idx := range pendingIdxs {
|
||||
if isUsed(idx) {
|
||||
if nextRelease == nil {
|
||||
nextRelease = make(map[uint32]struct{})
|
||||
}
|
||||
labels = append(labels, pLabel)
|
||||
nextRelease[idx] = struct{}{}
|
||||
} else {
|
||||
toReclaim = append(toReclaim, idx)
|
||||
}
|
||||
}
|
||||
clear(lm.mutable)
|
||||
lm.readOnly.Store(&labels)
|
||||
|
||||
// Phase 1: scan idxToLabel rather than sync.Map.Range to avoid
|
||||
// cache-line contention with concurrent compressFast lookups.
|
||||
phase1Evicted := false
|
||||
if idxSnap != nil {
|
||||
for i, label := range *idxSnap {
|
||||
if label == (prompb.Label{}) {
|
||||
continue // freed slot
|
||||
}
|
||||
idx := uint32(i)
|
||||
if _, inPending := pendingIdxs[idx]; inPending {
|
||||
continue // already queued for release
|
||||
}
|
||||
if isUsed(idx) {
|
||||
continue
|
||||
}
|
||||
key := makeLabelKey(label)
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
||||
shard := &lc.labelToIdxShards[h%lcShardCount]
|
||||
shard.mu.Lock()
|
||||
delete(shard.m, key)
|
||||
shard.mu.Unlock()
|
||||
phase1Evicted = true
|
||||
if nextRelease == nil {
|
||||
nextRelease = make(map[uint32]struct{})
|
||||
}
|
||||
nextRelease[idx] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
if phase1Evicted {
|
||||
lc.generation.Add(1)
|
||||
}
|
||||
|
||||
lc.mu.Lock()
|
||||
if len(toReclaim) > 0 {
|
||||
p := lc.idxToLabel.Load()
|
||||
var newIdxToLabel []prompb.Label
|
||||
if p != nil {
|
||||
newIdxToLabel = *p
|
||||
}
|
||||
next := make([]prompb.Label, len(newIdxToLabel))
|
||||
copy(next, newIdxToLabel)
|
||||
for _, idx := range toReclaim {
|
||||
label := next[idx]
|
||||
entrySize := uint64(len(label.Name)+len(label.Value)) + uint64(unsafe.Sizeof(label)) + 8
|
||||
if lc.totalSizeBytes >= entrySize {
|
||||
lc.totalSizeBytes -= entrySize
|
||||
}
|
||||
next[idx] = prompb.Label{}
|
||||
lc.freeIdxs = append(lc.freeIdxs, idx)
|
||||
}
|
||||
lc.idxToLabel.Store(&next)
|
||||
}
|
||||
lc.pendingIdxs = nextRelease
|
||||
lc.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -8,47 +8,96 @@ import (
|
||||
)
|
||||
|
||||
func BenchmarkLabelsCompressorCompress(b *testing.B) {
|
||||
var lc LabelsCompressor
|
||||
series := newTestSeries(100, 10)
|
||||
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(series)))
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var dst []byte
|
||||
for pb.Next() {
|
||||
dst = dst[:0]
|
||||
for _, labels := range series {
|
||||
dst = lc.Compress(dst, labels)
|
||||
}
|
||||
Sink.Add(uint64(len(dst)))
|
||||
run := func(b *testing.B, withRotate bool) {
|
||||
var lc LabelsCompressor
|
||||
for _, labels := range series {
|
||||
lc.Compress(nil, labels)
|
||||
}
|
||||
})
|
||||
var rotations atomic.Int64
|
||||
if withRotate {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
lc.rotate()
|
||||
rotations.Add(1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer close(done)
|
||||
}
|
||||
rotations.Store(0)
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(series)))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var dst []byte
|
||||
for pb.Next() {
|
||||
dst = dst[:0]
|
||||
for _, labels := range series {
|
||||
dst = lc.Compress(dst, labels)
|
||||
}
|
||||
Sink.Add(uint64(len(dst)))
|
||||
}
|
||||
})
|
||||
if withRotate {
|
||||
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
|
||||
}
|
||||
}
|
||||
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
|
||||
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
|
||||
}
|
||||
|
||||
func BenchmarkLabelsCompressorDecompress(b *testing.B) {
|
||||
var lc LabelsCompressor
|
||||
series := newTestSeries(100, 10)
|
||||
datas := make([][]byte, len(series))
|
||||
var dst []byte
|
||||
for i, labels := range series {
|
||||
dstLen := len(dst)
|
||||
dst = lc.Compress(dst, labels)
|
||||
datas[i] = dst[dstLen:]
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(series)))
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var labels []prompb.Label
|
||||
for pb.Next() {
|
||||
for _, data := range datas {
|
||||
labels = lc.Decompress(labels[:0], data)
|
||||
}
|
||||
Sink.Add(uint64(len(labels)))
|
||||
run := func(b *testing.B, withRotate bool) {
|
||||
var lc LabelsCompressor
|
||||
datas := make([][]byte, len(series))
|
||||
var buf []byte
|
||||
for i, labels := range series {
|
||||
bufLen := len(buf)
|
||||
buf = lc.Compress(buf, labels)
|
||||
datas[i] = buf[bufLen:]
|
||||
}
|
||||
})
|
||||
var rotations atomic.Int64
|
||||
if withRotate {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
lc.rotate()
|
||||
rotations.Add(1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer close(done)
|
||||
}
|
||||
rotations.Store(0)
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(series)))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var labels []prompb.Label
|
||||
for pb.Next() {
|
||||
for _, data := range datas {
|
||||
labels = lc.Decompress(labels[:0], data)
|
||||
}
|
||||
Sink.Add(uint64(len(labels)))
|
||||
}
|
||||
})
|
||||
if withRotate {
|
||||
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
|
||||
}
|
||||
}
|
||||
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
|
||||
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
|
||||
}
|
||||
|
||||
var Sink atomic.Uint64
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
|
||||
)
|
||||
|
||||
func BenchmarkDedupAggr(b *testing.B) {
|
||||
@@ -63,14 +64,15 @@ func newBenchSamples(count int) []pushSample {
|
||||
}
|
||||
labelsLen := len(labels)
|
||||
samples := make([]pushSample, count)
|
||||
var lc promutil.LabelsCompressor
|
||||
var keyBuf []byte
|
||||
for i := range samples {
|
||||
sample := &samples[i]
|
||||
labels = append(labels[:labelsLen], prompb.Label{
|
||||
labels := append(labels[:labelsLen:labelsLen], prompb.Label{
|
||||
Name: "app",
|
||||
Value: fmt.Sprintf("instance-%d", i),
|
||||
})
|
||||
keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:])
|
||||
keyBuf = lc.Compress(keyBuf[:0], labels)
|
||||
sample.key = string(keyBuf)
|
||||
sample.value = float64(i)
|
||||
}
|
||||
|
||||
@@ -77,6 +77,8 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
|
||||
|
||||
metrics.RegisterSet(ms)
|
||||
|
||||
lc.Register(2 * interval)
|
||||
|
||||
d.wg.Go(func() {
|
||||
d.runFlusher(pushFunc)
|
||||
})
|
||||
@@ -86,6 +88,8 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
|
||||
|
||||
// MustStop stops d.
|
||||
func (d *Deduplicator) MustStop() {
|
||||
lc.Unregister(2 * d.interval)
|
||||
|
||||
metrics.UnregisterSet(d.ms, true)
|
||||
d.ms = nil
|
||||
|
||||
@@ -203,7 +207,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
|
||||
dstSamples := ctx.samples
|
||||
for _, ps := range samples {
|
||||
labelsLen := len(labels)
|
||||
labels = decompressLabels(labels, ps.key)
|
||||
labels = lc.Decompress(labels, bytesutil.ToUnsafeBytes(ps.key))
|
||||
|
||||
dstSamplesLen := len(dstSamples)
|
||||
dstSamples = append(dstSamples, prompb.Sample{
|
||||
|
||||
@@ -17,29 +17,27 @@ type aggrOutputs struct {
|
||||
outputSamples *metrics.Counter
|
||||
}
|
||||
|
||||
func (ao *aggrOutputs) getInputOutputKey(key string) (string, string) {
|
||||
func (ao *aggrOutputs) getInputOutputKey(key string) (outputKey, inputKey string) {
|
||||
src := bytesutil.ToUnsafeBytes(key)
|
||||
outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
|
||||
if nSize <= 0 {
|
||||
logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
|
||||
}
|
||||
src = src[nSize:]
|
||||
outputKey := src[:outputKeyLen]
|
||||
if !ao.useInputKey {
|
||||
return key, bytesutil.ToUnsafeString(outputKey)
|
||||
outputKey = bytesutil.ToUnsafeString(src[:outputKeyLen])
|
||||
if !ao.useInputKey || int(outputKeyLen) == len(src) {
|
||||
return outputKey, outputKey
|
||||
}
|
||||
inputKey := src[outputKeyLen:]
|
||||
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
|
||||
inputKey = bytesutil.ToUnsafeString(src[outputKeyLen:])
|
||||
return outputKey, inputKey
|
||||
}
|
||||
|
||||
func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, isGreen bool) {
|
||||
var inputKey, outputKey string
|
||||
var sample *pushSample
|
||||
var outputs []aggrValue
|
||||
var nv *aggrValues
|
||||
for i := range samples {
|
||||
sample = &samples[i]
|
||||
inputKey, outputKey = ao.getInputOutputKey(sample.key)
|
||||
sample := &samples[i]
|
||||
outputKey, inputKey := ao.getInputOutputKey(sample.key)
|
||||
|
||||
again:
|
||||
v, ok := ao.m.Load(outputKey)
|
||||
@@ -81,7 +79,7 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
|
||||
}
|
||||
av.mu.Unlock()
|
||||
if deleted {
|
||||
// The entry has been deleted by the concurrent call to flush
|
||||
// The entry has been deleted by the concurrent call to flush.
|
||||
// Try obtaining and updating the entry again.
|
||||
goto again
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"slices"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -24,6 +23,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
@@ -257,6 +257,10 @@ type Config struct {
|
||||
type Aggregators struct {
|
||||
as []*aggregator
|
||||
|
||||
// workCh limits the number of concurrent aggregator.Push calls.
|
||||
// Pre-allocated once to avoid per-Push channel allocation.
|
||||
workCh chan struct{}
|
||||
|
||||
// configData contains marshaled configs.
|
||||
// It is used in Equal() for comparing Aggregators.
|
||||
configData []byte
|
||||
@@ -287,6 +291,7 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
|
||||
}
|
||||
|
||||
ms := metrics.NewSet()
|
||||
|
||||
as := make([]*aggregator, len(cfgs))
|
||||
for i, cfg := range cfgs {
|
||||
a, err := newAggregator(cfg, filePath, pushFunc, ms, opts, alias, i+1)
|
||||
@@ -307,6 +312,7 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
|
||||
metrics.RegisterSet(ms)
|
||||
return &Aggregators{
|
||||
as: as,
|
||||
workCh: make(chan struct{}, cgroup.AvailableCPUs()),
|
||||
configData: configData,
|
||||
filePath: filePath,
|
||||
ms: ms,
|
||||
@@ -364,19 +370,21 @@ func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []uint32) []uint32
|
||||
return matchIdxs
|
||||
}
|
||||
|
||||
// use all available CPU cores to copy time-series into aggregators
|
||||
// See this issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
|
||||
var wg sync.WaitGroup
|
||||
concurrencyChan := make(chan struct{}, cgroup.AvailableCPUs())
|
||||
|
||||
for _, aggr := range a.as {
|
||||
concurrencyChan <- struct{}{}
|
||||
wg.Go(func() {
|
||||
aggr.Push(tss, matchIdxs)
|
||||
<-concurrencyChan
|
||||
})
|
||||
if len(a.as) == 1 {
|
||||
a.as[0].Push(tss, matchIdxs)
|
||||
return matchIdxs
|
||||
}
|
||||
|
||||
// Use all available CPU cores to push time-series into aggregators in parallel.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
|
||||
var wg sync.WaitGroup
|
||||
for _, aggr := range a.as {
|
||||
a.workCh <- struct{}{}
|
||||
wg.Go(func() {
|
||||
aggr.Push(tss, matchIdxs)
|
||||
<-a.workCh
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
return matchIdxs
|
||||
@@ -413,8 +421,8 @@ type aggregator struct {
|
||||
ignoreOldSamples bool
|
||||
enableWindows bool
|
||||
|
||||
by []string
|
||||
without []string
|
||||
bySet map[string]struct{}
|
||||
withoutSet map[string]struct{}
|
||||
aggregateOnlyByTime bool
|
||||
|
||||
// interval is the interval between flushes
|
||||
@@ -446,6 +454,8 @@ type aggregator struct {
|
||||
// for `interval: 1m`, `by: [job]`
|
||||
suffix string
|
||||
|
||||
seriesKeyCache seriesKeyCache
|
||||
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
|
||||
@@ -645,8 +655,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
enableWindows: enableWindows,
|
||||
stalenessInterval: stalenessInterval,
|
||||
|
||||
by: by,
|
||||
without: without,
|
||||
bySet: makeStringSet(by),
|
||||
withoutSet: makeStringSet(without),
|
||||
aggregateOnlyByTime: aggregateOnlyByTime,
|
||||
|
||||
interval: interval,
|
||||
@@ -709,6 +719,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
}
|
||||
a.cs.Store(cs)
|
||||
|
||||
lc.Register(a.stalenessInterval)
|
||||
|
||||
a.wg.Go(func() {
|
||||
a.runFlusher(pushFunc, alignFlushToInterval, skipFlushOnShutdown, ignoreFirstIntervals)
|
||||
})
|
||||
@@ -915,6 +927,7 @@ func (a *aggregator) dedupFlush(dedupTime time.Time, cs *currentState) {
|
||||
//
|
||||
// If pushFunc is nil, then the aggregator state is just reset.
|
||||
func (a *aggregator) flush(pushFunc PushFunc, flushTime time.Time, cs *currentState, isLast bool) {
|
||||
a.seriesKeyCache.reset()
|
||||
startTime := time.Now()
|
||||
ao := a.aggrOutputs
|
||||
|
||||
@@ -943,6 +956,7 @@ var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
|
||||
//
|
||||
// The aggregator stops pushing the aggregated metrics after this call.
|
||||
func (a *aggregator) MustStop() {
|
||||
lc.Unregister(a.stalenessInterval)
|
||||
close(a.stopCh)
|
||||
a.wg.Wait()
|
||||
}
|
||||
@@ -952,7 +966,6 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
ctx := getPushCtx()
|
||||
defer putPushCtx(ctx)
|
||||
|
||||
buf := ctx.buf
|
||||
labels := &ctx.labels
|
||||
inputLabels := &ctx.inputLabels
|
||||
outputLabels := &ctx.outputLabels
|
||||
@@ -986,19 +999,22 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
}
|
||||
labels.Sort()
|
||||
|
||||
inputLabels.Reset()
|
||||
outputLabels.Reset()
|
||||
if !a.aggregateOnlyByTime {
|
||||
inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.by, a.without)
|
||||
} else {
|
||||
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
|
||||
h := computeLabelsFingerprint(labels.Labels)
|
||||
gen := lc.Generation()
|
||||
key, ok := a.seriesKeyCache.get(h, gen)
|
||||
if !ok {
|
||||
inputLabels.Reset()
|
||||
outputLabels.Reset()
|
||||
if !a.aggregateOnlyByTime {
|
||||
inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.bySet, a.withoutSet)
|
||||
} else {
|
||||
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
|
||||
}
|
||||
bufLen := len(ctx.buf)
|
||||
ctx.compressLabels(inputLabels.Labels, outputLabels.Labels)
|
||||
key = string(ctx.buf[bufLen:])
|
||||
a.seriesKeyCache.set(h, gen, key)
|
||||
}
|
||||
|
||||
bufLen := len(buf)
|
||||
buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels)
|
||||
// key remains valid only until the end of this function and must not be reused afterwards
|
||||
// do not intern key because number of unique keys could be too high
|
||||
key := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
for _, s := range ts.Samples {
|
||||
if math.IsNaN(s.Value) {
|
||||
// Skip NaN values
|
||||
@@ -1042,8 +1058,6 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
}
|
||||
a.samplesLag.Update(float64(maxLagMsec) / 1_000)
|
||||
|
||||
ctx.buf = buf
|
||||
|
||||
pushSamples := a.aggrOutputs.pushSamples
|
||||
if a.da != nil {
|
||||
pushSamples = a.da.pushSamples
|
||||
@@ -1060,18 +1074,13 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
}
|
||||
}
|
||||
|
||||
func compressLabels(dst []byte, inputLabels, outputLabels []prompb.Label) []byte {
|
||||
bb := bbPool.Get()
|
||||
bb.B = lc.Compress(bb.B, outputLabels)
|
||||
dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
|
||||
dst = append(dst, bb.B...)
|
||||
bbPool.Put(bb)
|
||||
dst = lc.Compress(dst, inputLabels)
|
||||
return dst
|
||||
}
|
||||
|
||||
func decompressLabels(dst []prompb.Label, key string) []prompb.Label {
|
||||
return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
|
||||
func (ctx *pushCtx) compressLabels(inputLabels, outputLabels []prompb.Label) {
|
||||
ctx.keybuf = lc.Compress(ctx.keybuf[:0], outputLabels)
|
||||
ctx.buf = encoding.MarshalVarUint64(ctx.buf, uint64(len(ctx.keybuf)))
|
||||
ctx.buf = append(ctx.buf, ctx.keybuf...)
|
||||
if len(inputLabels) > 0 {
|
||||
ctx.buf = lc.Compress(ctx.buf, inputLabels)
|
||||
}
|
||||
}
|
||||
|
||||
type pushCtx struct {
|
||||
@@ -1081,6 +1090,7 @@ type pushCtx struct {
|
||||
inputLabels promutil.Labels
|
||||
outputLabels promutil.Labels
|
||||
buf []byte
|
||||
keybuf []byte
|
||||
}
|
||||
|
||||
func (ctx *pushCtx) reset() {
|
||||
@@ -1115,10 +1125,10 @@ func putPushCtx(ctx *pushCtx) {
|
||||
|
||||
var pushCtxPool sync.Pool
|
||||
|
||||
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, without []string) ([]prompb.Label, []prompb.Label) {
|
||||
if len(without) > 0 {
|
||||
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, bySet, withoutSet map[string]struct{}) ([]prompb.Label, []prompb.Label) {
|
||||
if len(withoutSet) > 0 {
|
||||
for _, label := range labels {
|
||||
if slices.Contains(without, label.Name) {
|
||||
if _, ok := withoutSet[label.Name]; ok {
|
||||
dstInput = append(dstInput, label)
|
||||
} else {
|
||||
dstOutput = append(dstOutput, label)
|
||||
@@ -1126,7 +1136,7 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, withou
|
||||
}
|
||||
} else {
|
||||
for _, label := range labels {
|
||||
if !slices.Contains(by, label.Name) {
|
||||
if _, ok := bySet[label.Name]; !ok {
|
||||
dstInput = append(dstInput, label)
|
||||
} else {
|
||||
dstOutput = append(dstOutput, label)
|
||||
@@ -1136,6 +1146,17 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, withou
|
||||
return dstInput, dstOutput
|
||||
}
|
||||
|
||||
// makeStringSet converts keys into a set for constant-time membership checks.
//
// It returns nil for empty or nil keys, so callers can use len(set) == 0
// to detect that no keys were configured.
func makeStringSet(keys []string) map[string]struct{} {
	n := len(keys)
	if n == 0 {
		return nil
	}
	set := make(map[string]struct{}, n)
	for i := 0; i < n; i++ {
		set[keys[i]] = struct{}{}
	}
	return set
}
|
||||
|
||||
func getFlushCtx(a *aggregator, ao *aggrOutputs, pushFunc PushFunc, flushTimestamp int64, isLast bool) *flushCtx {
|
||||
v := flushCtxPool.Get()
|
||||
if v == nil {
|
||||
@@ -1235,7 +1256,7 @@ func (ctx *flushCtx) flushSeries() {
|
||||
func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
|
||||
labelsLen := len(ctx.labels)
|
||||
samplesLen := len(ctx.samples)
|
||||
ctx.labels = decompressLabels(ctx.labels, key)
|
||||
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
|
||||
if !ctx.a.keepMetricNames {
|
||||
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
|
||||
}
|
||||
@@ -1257,7 +1278,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
|
||||
func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, value float64, extraName, extraValue string) {
|
||||
labelsLen := len(ctx.labels)
|
||||
samplesLen := len(ctx.samples)
|
||||
ctx.labels = decompressLabels(ctx.labels, key)
|
||||
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
|
||||
if !ctx.a.keepMetricNames {
|
||||
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
|
||||
}
|
||||
@@ -1345,3 +1366,74 @@ func sortAndRemoveDuplicates(a []string) []string {
|
||||
}
|
||||
|
||||
var bbPool bytesutil.ByteBufferPool
|
||||
|
||||
// seriesKeyCache caches series keys by a 64-bit fingerprint.
//
// Every entry remembers the generation it was stored with; lookups with
// a different generation are reported as misses. The zero value is ready to use.
type seriesKeyCache struct {
	shards [64]seriesKeyCacheShard
}

// seriesKeyCacheShard is a single lock-protected part of seriesKeyCache.
type seriesKeyCacheShard struct {
	mu sync.RWMutex
	// m is created lazily on the first set call and dropped on reset.
	m map[uint64]seriesCacheEntry
	_ [32]byte // cache-line padding
}

// seriesCacheEntry is the value stored in seriesKeyCache for a fingerprint.
type seriesCacheEntry struct {
	generation uint64
	key        string
}

// shardFor returns the shard responsible for the fingerprint h.
//
// The 6 highest bits of h select one of the 64 shards.
func (c *seriesKeyCache) shardFor(h uint64) *seriesKeyCacheShard {
	return &c.shards[h>>58]
}

// get returns the key cached for the fingerprint h.
//
// The second result is false if there is no entry for h or if the entry
// was stored with a generation other than the given one.
func (c *seriesKeyCache) get(h, generation uint64) (string, bool) {
	s := c.shardFor(h)

	s.mu.RLock()
	entry, found := s.m[h]
	s.mu.RUnlock()

	if found && entry.generation == generation {
		return entry.key, true
	}
	return "", false
}

// set stores key for the fingerprint h together with the given generation,
// replacing any previous entry for h.
func (c *seriesKeyCache) set(h, generation uint64, key string) {
	s := c.shardFor(h)

	s.mu.Lock()
	if s.m == nil {
		s.m = map[uint64]seriesCacheEntry{}
	}
	s.m[h] = seriesCacheEntry{
		generation: generation,
		key:        key,
	}
	s.mu.Unlock()
}

// reset drops all the cached entries from every shard.
func (c *seriesKeyCache) reset() {
	for i := 0; i < len(c.shards); i++ {
		s := &c.shards[i]
		s.mu.Lock()
		s.m = nil
		s.mu.Unlock()
	}
}
|
||||
|
||||
func computeLabelsFingerprint(labels []prompb.Label) uint64 {
|
||||
var buf [512]byte
|
||||
n := 0
|
||||
for i, l := range labels {
|
||||
need := len(l.Name) + 1 + len(l.Value) + 1
|
||||
if n+need > len(buf) {
|
||||
b := make([]byte, n, n+need*2)
|
||||
copy(b, buf[:n])
|
||||
for _, l2 := range labels[i:] {
|
||||
b = append(b, l2.Name...)
|
||||
b = append(b, '\x00')
|
||||
b = append(b, l2.Value...)
|
||||
b = append(b, '\x00')
|
||||
}
|
||||
return xxhash.Sum64(b)
|
||||
}
|
||||
n += copy(buf[n:], l.Name)
|
||||
buf[n] = '\x00'
|
||||
n++
|
||||
n += copy(buf[n:], l.Value)
|
||||
buf[n] = '\x00'
|
||||
n++
|
||||
}
|
||||
return xxhash.Sum64(buf[:n])
|
||||
}
|
||||
|
||||
@@ -45,13 +45,16 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
|
||||
a := newBenchAggregators([]string{output}, pushFunc)
|
||||
defer a.MustStop()
|
||||
|
||||
// Warm up the LabelsCompressor so benchmark measures steady-state performance.
|
||||
a.Push(benchSeries, nil)
|
||||
|
||||
const loops = 100
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(benchSeries) * loops))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var matchIdxs []uint32
|
||||
matchIdxs := make([]uint32, len(benchSeries))
|
||||
for pb.Next() {
|
||||
for range loops {
|
||||
matchIdxs = a.Push(benchSeries, matchIdxs)
|
||||
@@ -82,8 +85,8 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
|
||||
func newBenchSeries(seriesCount int) []prompb.TimeSeries {
|
||||
a := make([]string, 0, seriesCount)
|
||||
for j := range seriesCount {
|
||||
s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo_%d",instance="bar",pod="pod-123232312",namespace="kube-foo-bar",node="node-123-3434-443",`+
|
||||
`some_other_label="foo-bar-baz",environment="prod",label1="value1",label2="value2",label3="value3"} %d`, j, j%100, j*1000)
|
||||
s := fmt.Sprintf(`http_requests_total{environment="prod",instance="bar",job="foo_%d",label1="value1",label2="value2",label3="value3",`+
|
||||
`namespace="kube-foo-bar",node="node-123-3434-443",path="/foo/%d",pod="pod-123232312",some_other_label="foo-bar-baz"} %d`, j%100, j, j*1000)
|
||||
a = append(a, s)
|
||||
}
|
||||
metrics := strings.Join(a, "\n")
|
||||
@@ -101,13 +104,16 @@ func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
|
||||
a := newPerOutputBenchAggregators(benchOutputs, pushFunc)
|
||||
defer a.MustStop()
|
||||
|
||||
// Warm up the LabelsCompressor so benchmark measures steady-state performance.
|
||||
a.Push(benchSeries, nil)
|
||||
|
||||
const loops = 100
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(benchSeries) * loops))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var matchIdxs []uint32
|
||||
matchIdxs := make([]uint32, len(benchSeries))
|
||||
for pb.Next() {
|
||||
for range loops {
|
||||
matchIdxs = a.Push(benchSeries, matchIdxs)
|
||||
@@ -117,21 +123,17 @@ func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
|
||||
}
|
||||
|
||||
func newPerOutputBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
|
||||
outputsQuoted := make([]string, len(outputs))
|
||||
var config string
|
||||
for i := range outputs {
|
||||
outputsQuoted[i] = stringsutil.JSONString(outputs[i])
|
||||
cfg := fmt.Sprintf(`
|
||||
var b strings.Builder
|
||||
for _, output := range outputs {
|
||||
fmt.Fprintf(&b, `
|
||||
- match: http_requests_total
|
||||
interval: 24h
|
||||
by: [job]
|
||||
outputs: [%s]
|
||||
`, stringsutil.JSONString(outputs[i]))
|
||||
config += cfg
|
||||
|
||||
`, stringsutil.JSONString(output))
|
||||
}
|
||||
|
||||
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
|
||||
a, err := LoadFromData([]byte(b.String()), pushFunc, nil, "some_alias")
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user