Files
VictoriaMetrics/lib/promutil/labelscompressor.go
Max Kotliar b98e592752 lib/prompb: Merge prompbmarshal logic into prompb
The prompb and prompbmarshal share exactly the same models and provide
marshal and unmarshale capabilities for them. This creates duplication
(changes in one model has to be made in another, case with metadata) and
confusion where for example you compare same looking models but golang
says they are not the same (because of the type).

This commit merge prompbmarshal logic into prompb so the rest of the
code is aligned on prompb models.

Moves samplesPool and labelsPool to WriteRequestUnmarshaller.
Make WriteRequest struct clean from unmarshal logic.

The benchmark shows no significant changes:

$benchstat prompbmarshal.bench prompb2.bench
goos: darwin
goarch: arm64
pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb
cpu: Apple M1 Pro
                                 │ prompbmarshal.bench │           prompb2.bench            │
                                 │       sec/op        │   sec/op     vs base               │
WriteRequestUnmarshalProtobuf-10           189.2µ ± 5%   190.8µ ± 8%       ~ (p=0.579 n=10)
WriteRequestMarshalProtobuf-10             145.3µ ± 7%   143.6µ ± 2%       ~ (p=0.143 n=10)
geomean                                    165.8µ        165.5µ       -0.14%

                                 │ prompbmarshal.bench │            prompb2.bench            │
                                 │         B/s         │     B/s       vs base               │
WriteRequestUnmarshalProtobuf-10          50.42Mi ± 5%   49.99Mi ± 8%       ~ (p=0.593 n=10)
WriteRequestMarshalProtobuf-10            65.64Mi ± 7%   66.39Mi ± 2%       ~ (p=0.143 n=10)
geomean                                   57.53Mi        57.61Mi       +0.14%

                                 │ prompbmarshal.bench │            prompb2.bench             │
                                 │        B/op         │     B/op       vs base               │
WriteRequestUnmarshalProtobuf-10         27.70Ki ±  4%   26.90Ki ±  7%       ~ (p=0.190 n=10)
WriteRequestMarshalProtobuf-10           3.267Ki ± 12%   3.273Ki ± 12%       ~ (p=0.971 n=10)
geomean                                  9.514Ki         9.383Ki        -1.38%

                                 │ prompbmarshal.bench │            prompb2.bench            │
                                 │      allocs/op      │ allocs/op   vs base                 │
WriteRequestUnmarshalProtobuf-10          0.000 ± 0%     0.000 ± 0%       ~ (p=1.000 n=10) ¹
WriteRequestMarshalProtobuf-10            0.000 ± 0%     0.000 ± 0%       ~ (p=1.000 n=10) ¹
geomean                                              ²               +0.00%                ²
¹ all samples are equal
² summaries must be >0 to compute geomean
2025-07-31 01:04:11 +03:00

235 lines
6.4 KiB
Go

package promutil
import (
"sync"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
// LabelsCompressor compresses []prompb.Label into short binary strings
type LabelsCompressor struct {
labelToIdx sync.Map
idxToLabel labelsMap
nextIdx atomic.Uint64
totalSizeBytes atomic.Uint64
}
// SizeBytes returns the size of lc data in bytes
func (lc *LabelsCompressor) SizeBytes() uint64 {
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
}
// ItemsCount returns the number of items in lc
func (lc *LabelsCompressor) ItemsCount() uint64 {
return lc.nextIdx.Load()
}
// Compress compresses labels, appends the compressed labels to dst and returns the result.
//
// It is safe calling Compress from concurrent goroutines.
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
if len(labels) == 0 {
// Fast path
return append(dst, 0)
}
a := encoding.GetUint64s(len(labels) + 1)
a.A[0] = uint64(len(labels))
lc.compress(a.A[1:], labels)
dst = encoding.MarshalVarUint64s(dst, a.A)
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
if len(labels) == 0 {
return
}
_ = dst[len(labels)-1]
for i, label := range labels {
v, ok := lc.labelToIdx.Load(label)
if !ok {
idx := lc.nextIdx.Add(1)
v = idx
labelCopy := cloneLabel(label)
// Must store idxToLabel entry before labelToIdx,
// so it can be found by possible concurrent goroutines.
//
// We might store duplicated entries for single label with different indexes,
// and it's fine, see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118.
lc.idxToLabel.Store(idx, labelCopy)
vNew, loaded := lc.labelToIdx.LoadOrStore(labelCopy, v)
if loaded {
// This label has been stored by a concurrent goroutine with different index,
// use it for key consistency in aggrState.
v = vNew
}
// Update lc.totalSizeBytes
labelSizeBytes := uint64(len(label.Name) + len(label.Value))
entrySizeBytes := labelSizeBytes + uint64(2*(unsafe.Sizeof(label)+unsafe.Sizeof(&label))+unsafe.Sizeof(v))
lc.totalSizeBytes.Add(entrySizeBytes)
}
dst[i] = v.(uint64)
}
}
func cloneLabel(label prompb.Label) prompb.Label {
// pre-allocate memory for label name and value
n := len(label.Name) + len(label.Value)
buf := make([]byte, 0, n)
buf = append(buf, label.Name...)
labelName := bytesutil.ToUnsafeString(buf)
buf = append(buf, label.Value...)
labelValue := bytesutil.ToUnsafeString(buf[len(labelName):])
return prompb.Label{
Name: labelName,
Value: labelValue,
}
}
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
//
// It is safe calling Decompress from concurrent goroutines.
func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.Label {
labelsLen, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal labels length from uvarint")
}
tail := src[nSize:]
if labelsLen == 0 {
// fast path - nothing to decode
if len(tail) > 0 {
logger.Panicf("BUG: unexpected non-empty tail left; len(tail)=%d; tail=%X", len(tail), tail)
}
return dst
}
a := encoding.GetUint64s(int(labelsLen))
var err error
tail, err = encoding.UnmarshalVarUint64s(a.A, tail)
if err != nil {
logger.Panicf("BUG: cannot unmarshal label indexes: %s", err)
}
if len(tail) > 0 {
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
}
dst = lc.decompress(dst, a.A)
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
for _, idx := range src {
label, ok := lc.idxToLabel.Load(idx)
if !ok {
logger.Panicf("BUG: missing label for idx=%d", idx)
}
dst = append(dst, label)
}
return dst
}
// labelsMap maps uint64 key to prompb.Label
//
// uint64 keys must be packed close to 0. Otherwise the labelsMap structure will consume too much memory.
type labelsMap struct {
readOnly atomic.Pointer[[]*prompb.Label]
mutableLock sync.Mutex
mutable map[uint64]*prompb.Label
misses uint64
}
// Store stores label under the given idx.
//
// It is safe calling Store from concurrent goroutines.
func (lm *labelsMap) Store(idx uint64, label prompb.Label) {
lm.mutableLock.Lock()
if lm.mutable == nil {
lm.mutable = make(map[uint64]*prompb.Label)
}
lm.mutable[idx] = &label
lm.mutableLock.Unlock()
}
// Load returns the label for the given idx.
//
// Load returns false if lm doesn't contain label for the given idx.
//
// It is safe calling Load from concurrent goroutines.
//
// The performance of Load() scales linearly with CPU cores.
func (lm *labelsMap) Load(idx uint64) (prompb.Label, bool) {
if pReadOnly := lm.readOnly.Load(); pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
// Fast path - the label for the given idx has been found in lm.readOnly.
return *pLabel, true
}
}
// Slow path - search in lm.mutable.
return lm.loadSlow(idx)
}
func (lm *labelsMap) loadSlow(idx uint64) (prompb.Label, bool) {
lm.mutableLock.Lock()
// Try loading label from readOnly, since it could be updated while acquiring mutableLock.
pReadOnly := lm.readOnly.Load()
if pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
lm.mutableLock.Unlock()
return *pLabel, true
}
}
// The label for the idx wasn't found in readOnly. Search it in mutable.
lm.misses++
pLabel := lm.mutable[idx]
if pReadOnly == nil || lm.misses > uint64(len(*pReadOnly)) {
lm.moveMutableToReadOnlyLocked(pReadOnly)
lm.misses = 0
}
lm.mutableLock.Unlock()
if pLabel == nil {
return prompb.Label{}, false
}
return *pLabel, true
}
func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
if len(lm.mutable) == 0 {
// Nothing to move
return
}
var labels []*prompb.Label
if pReadOnly != nil {
labels = append(labels, *pReadOnly...)
}
for idx, pLabel := range lm.mutable {
if idx < uint64(len(labels)) {
labels[idx] = pLabel
} else {
for idx > uint64(len(labels)) {
labels = append(labels, nil)
}
labels = append(labels, pLabel)
}
}
clear(lm.mutable)
lm.readOnly.Store(&labels)
}