|
|
|
|
@@ -3,6 +3,7 @@ package promutil
|
|
|
|
|
import (
|
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
|
"unsafe"
|
|
|
|
|
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
|
|
|
@@ -11,8 +12,10 @@ import (
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// LabelsCompressor compresses []prompb.Label into short binary strings
|
|
|
|
|
type LabelsCompressor struct {
|
|
|
|
|
const minRotationInterval = time.Hour
|
|
|
|
|
|
|
|
|
|
// labelsCompressor compresses []prompb.Label into short binary strings.
|
|
|
|
|
type labelsCompressor struct {
|
|
|
|
|
labelToIdx sync.Map
|
|
|
|
|
idxToLabel labelsMap
|
|
|
|
|
|
|
|
|
|
@@ -21,20 +24,18 @@ type LabelsCompressor struct {
|
|
|
|
|
totalSizeBytes atomic.Uint64
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SizeBytes returns the size of lc data in bytes
|
|
|
|
|
func (lc *LabelsCompressor) SizeBytes() uint64 {
|
|
|
|
|
func (lc *labelsCompressor) sizeBytes() uint64 {
|
|
|
|
|
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ItemsCount returns the number of items in lc
|
|
|
|
|
func (lc *LabelsCompressor) ItemsCount() uint64 {
|
|
|
|
|
func (lc *labelsCompressor) itemsCount() uint64 {
|
|
|
|
|
return lc.nextIdx.Load()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Compress compresses labels, appends the compressed labels to dst and returns the result.
|
|
|
|
|
// compress compresses labels, appends the compressed labels to dst and returns the result.
|
|
|
|
|
//
|
|
|
|
|
// It is safe calling Compress from concurrent goroutines.
|
|
|
|
|
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
|
|
|
|
|
// It is safe calling compress from concurrent goroutines.
|
|
|
|
|
func (lc *labelsCompressor) compress(dst []byte, labels []prompb.Label) []byte {
|
|
|
|
|
if len(labels) == 0 {
|
|
|
|
|
// Fast path
|
|
|
|
|
return append(dst, 0)
|
|
|
|
|
@@ -42,13 +43,13 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
|
|
|
|
|
|
|
|
|
|
a := encoding.GetUint64s(len(labels) + 1)
|
|
|
|
|
a.A[0] = uint64(len(labels))
|
|
|
|
|
lc.compress(a.A[1:], labels)
|
|
|
|
|
lc.compressInto(a.A[1:], labels)
|
|
|
|
|
dst = encoding.MarshalVarUint64s(dst, a.A)
|
|
|
|
|
encoding.PutUint64s(a)
|
|
|
|
|
return dst
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
|
|
|
|
|
func (lc *labelsCompressor) compressInto(dst []uint64, labels []prompb.Label) {
|
|
|
|
|
if len(labels) == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
@@ -98,10 +99,10 @@ func cloneLabel(label prompb.Label) prompb.Label {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
|
|
|
|
|
// decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
|
|
|
|
|
//
|
|
|
|
|
// It is safe calling Decompress from concurrent goroutines.
|
|
|
|
|
func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.Label {
|
|
|
|
|
// It is safe calling decompress from concurrent goroutines.
|
|
|
|
|
func (lc *labelsCompressor) decompress(dst []prompb.Label, src []byte) []prompb.Label {
|
|
|
|
|
labelsLen, nSize := encoding.UnmarshalVarUint64(src)
|
|
|
|
|
if nSize <= 0 {
|
|
|
|
|
logger.Panicf("BUG: cannot unmarshal labels length from uvarint")
|
|
|
|
|
@@ -124,12 +125,12 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
|
|
|
|
|
if len(tail) > 0 {
|
|
|
|
|
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
|
|
|
|
|
}
|
|
|
|
|
dst = lc.decompress(dst, a.A)
|
|
|
|
|
dst = lc.decompressInternal(dst, a.A)
|
|
|
|
|
encoding.PutUint64s(a)
|
|
|
|
|
return dst
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
|
|
|
|
|
func (lc *labelsCompressor) decompressInternal(dst []prompb.Label, src []uint64) []prompb.Label {
|
|
|
|
|
for _, idx := range src {
|
|
|
|
|
label, ok := lc.idxToLabel.Load(idx)
|
|
|
|
|
if !ok {
|
|
|
|
|
@@ -232,3 +233,143 @@ func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
|
|
|
|
|
clear(lm.mutable)
|
|
|
|
|
lm.readOnly.Store(&labels)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// labelsCompressorState holds the current and previous labelsCompressor instances and generation byte that changes between rotations
|
|
|
|
|
// and is used to pick a right compressor during decompression
|
|
|
|
|
type labelsCompressorState struct {
|
|
|
|
|
gen byte
|
|
|
|
|
current *labelsCompressor
|
|
|
|
|
previous *labelsCompressor
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// LabelsCompressor is a rotating compressor that maintains two labelsCompressor
|
|
|
|
|
// instances to bound memory growth from stale label sets.
|
|
|
|
|
//
|
|
|
|
|
// Consumers must call Register on creation and Unregister on shutdown for a proper rotation period calculation.
|
|
|
|
|
type LabelsCompressor struct {
|
|
|
|
|
state atomic.Pointer[labelsCompressorState]
|
|
|
|
|
|
|
|
|
|
rotationInterval atomic.Int64
|
|
|
|
|
startOnce sync.Once
|
|
|
|
|
|
|
|
|
|
registryMu sync.Mutex
|
|
|
|
|
registry []time.Duration
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// getState returns current labelsCompressorState, which is initialized if needed.
|
|
|
|
|
func (lc *LabelsCompressor) getState() *labelsCompressorState {
|
|
|
|
|
if s := lc.state.Load(); s != nil {
|
|
|
|
|
return s
|
|
|
|
|
}
|
|
|
|
|
s := &labelsCompressorState{gen: 0, current: &labelsCompressor{}}
|
|
|
|
|
// use CompareAndSwap to avoid overwriting pointer which could be stored by another thread
|
|
|
|
|
lc.state.CompareAndSwap(nil, s)
|
|
|
|
|
return lc.state.Load()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// rotate resets current compressor and moves its state to previous.
|
|
|
|
|
func (lc *LabelsCompressor) rotate() {
|
|
|
|
|
old := lc.getState()
|
|
|
|
|
lc.state.Store(&labelsCompressorState{
|
|
|
|
|
gen: old.gen ^ 1,
|
|
|
|
|
current: &labelsCompressor{},
|
|
|
|
|
previous: old.current,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Register records maxStaleness for a new consumer, recomputes the rotation
|
|
|
|
|
// interval, starts the background rotation goroutine on the first call, and
|
|
|
|
|
// returns an id that must be passed to Unregister when the consumer stops.
|
|
|
|
|
func (lc *LabelsCompressor) Register(maxStaleness time.Duration) {
|
|
|
|
|
lc.registryMu.Lock()
|
|
|
|
|
lc.registry = append(lc.registry, maxStaleness)
|
|
|
|
|
max := lc.maxStaleness()
|
|
|
|
|
lc.registryMu.Unlock()
|
|
|
|
|
|
|
|
|
|
lc.rotationInterval.Store(int64(max * 2))
|
|
|
|
|
lc.startOnce.Do(func() {
|
|
|
|
|
lc.getState()
|
|
|
|
|
go func() {
|
|
|
|
|
for {
|
|
|
|
|
time.Sleep(time.Duration(lc.rotationInterval.Load()))
|
|
|
|
|
lc.rotate()
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Unregister removes the given consumer ID from the registry and recomputes
|
|
|
|
|
// the rotation interval from the remaining registered consumers.
|
|
|
|
|
func (lc *LabelsCompressor) Unregister(maxStaleness time.Duration) {
|
|
|
|
|
lc.registryMu.Lock()
|
|
|
|
|
for i, s := range lc.registry {
|
|
|
|
|
if s == maxStaleness {
|
|
|
|
|
lc.registry = append(lc.registry[:i], lc.registry[i+1:]...)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
max := lc.maxStaleness()
|
|
|
|
|
lc.registryMu.Unlock()
|
|
|
|
|
lc.rotationInterval.Store(int64(max * 2))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// maxStaleness returns the maximum staleness across all registered consumers.
|
|
|
|
|
// Must be called with registryMu held.
|
|
|
|
|
func (lc *LabelsCompressor) maxStaleness() time.Duration {
|
|
|
|
|
maxStaleness := time.Duration(0)
|
|
|
|
|
for _, d := range lc.registry {
|
|
|
|
|
if d > maxStaleness {
|
|
|
|
|
maxStaleness = d
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return max(maxStaleness, minRotationInterval)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Compress appends the generation byte followed by the compressed labels
|
|
|
|
|
// to dst and returns the result.
|
|
|
|
|
//
|
|
|
|
|
// It is safe calling Compress from concurrent goroutines.
|
|
|
|
|
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
|
|
|
|
|
s := lc.getState()
|
|
|
|
|
dst = append(dst, s.gen)
|
|
|
|
|
return s.current.compress(dst, labels)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Decompress reads the generation byte from key and decompresses the
|
|
|
|
|
// remaining bytes using the corresponding labelsCompressor instance.
|
|
|
|
|
func (lc *LabelsCompressor) Decompress(dst []prompb.Label, key []byte) []prompb.Label {
|
|
|
|
|
if len(key) == 0 {
|
|
|
|
|
logger.Panicf("BUG: unexpected empty key in Decompress")
|
|
|
|
|
}
|
|
|
|
|
gen := key[0]
|
|
|
|
|
s := lc.getState()
|
|
|
|
|
var c *labelsCompressor
|
|
|
|
|
if s.gen == gen {
|
|
|
|
|
c = s.current
|
|
|
|
|
} else if s.previous != nil {
|
|
|
|
|
c = s.previous
|
|
|
|
|
} else {
|
|
|
|
|
logger.Panicf("BUG: compressor for generation %d is not available; current generation is %d", gen, s.gen)
|
|
|
|
|
}
|
|
|
|
|
return c.decompress(dst, key[1:])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SizeBytes returns the total memory used by the active compressor instances
|
|
|
|
|
func (lc *LabelsCompressor) SizeBytes() uint64 {
|
|
|
|
|
s := lc.getState()
|
|
|
|
|
n := s.current.sizeBytes()
|
|
|
|
|
if s.previous != nil {
|
|
|
|
|
n += s.previous.sizeBytes()
|
|
|
|
|
}
|
|
|
|
|
return n
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ItemsCount returns the total number of label entries stored across the active
|
|
|
|
|
func (lc *LabelsCompressor) ItemsCount() uint64 {
|
|
|
|
|
s := lc.getState()
|
|
|
|
|
n := s.current.itemsCount()
|
|
|
|
|
if s.previous != nil {
|
|
|
|
|
n += s.previous.itemsCount()
|
|
|
|
|
}
|
|
|
|
|
return n
|
|
|
|
|
}
|
|
|
|
|
|