Compare commits

...

1 Commits

Author SHA1 Message Date
Max Kotliar
e9261be945 lib/promutil: Weak pointer based labels compressor 2025-08-20 20:02:11 +03:00
3 changed files with 159 additions and 0 deletions

View File

@@ -0,0 +1,57 @@
package promutil
import (
"reflect"
"runtime"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
func TestLabelsCompressorV2(t *testing.T) {
lc := NewLabelsCompressorV2()
labels1 := []prompb.Label{
{Name: "label1", Value: "value1"},
{Name: "label2", Value: "value2"},
{Name: "label3", Value: "value3"},
}
labels2 := []prompb.Label{
{Name: "label3", Value: "value3"},
{Name: "label4", Value: "value4"},
{Name: "label5", Value: "value5"},
}
compressed1 := lc.Compress(labels1)
compressed2 := lc.Compress(labels2)
runtime.GC()
cleaned := lc.Cleanup()
if cleaned != 0 {
t.Fatalf("lc.Cleanup() should've cleaned zero unused labels, got %d", cleaned)
}
decompressed1 := compressed1.Decompress()
if !reflect.DeepEqual(labels1, decompressed1) {
t.Fatalf("decompressed labels1 do not match original: got %+v, want %+v", decompressed1, labels1)
}
compressed1 = Key{}
runtime.GC()
cleaned = lc.Cleanup()
if cleaned != 2 {
t.Fatalf("lc.Cleanup() should've cleaned two unused labels, got %d", cleaned)
}
decompressed2 := compressed2.Decompress()
if !reflect.DeepEqual(labels2, decompressed2) {
t.Fatalf("decompressed labels2 do not match original: got %+v, want %+v", decompressed2, labels2)
}
compressed2 = Key{}
runtime.GC()
cleaned = lc.Cleanup()
if cleaned != 3 {
t.Fatalf("lc.Cleanup() should've cleaned two unused labels, got %d", cleaned)
}
}

View File

@@ -0,0 +1,102 @@
package promutil
import (
"log"
"sync"
"time"
"weak"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
type Key struct {
labelRefs []labelRef
}
func (k Key) Decompress() []prompb.Label {
res := make([]prompb.Label, 0, len(k.labelRefs))
for i := range k.labelRefs {
res = append(res, cloneLabel(*k.labelRefs[i].label))
}
return res
}
type labelRef struct {
label *prompb.Label
}
type LabelsCompressorV2 struct {
mux sync.Mutex
labels map[prompb.Label]weak.Pointer[prompb.Label]
}
func NewLabelsCompressorV2() *LabelsCompressorV2 {
lc := &LabelsCompressorV2{
labels: make(map[prompb.Label]weak.Pointer[prompb.Label]),
}
go lc.cleanup()
return lc
}
func (lc *LabelsCompressorV2) Compress(labels []prompb.Label) Key {
lc.mux.Lock()
defer lc.mux.Unlock()
labelRefs := make([]labelRef, 0, len(labels))
for i := range labels {
wl := lc.labels[labels[i]]
l := wl.Value()
if l == nil {
labelKey := cloneLabel(labels[i])
labelVal := cloneLabel(labels[i])
wl = weak.Make(&labelVal)
lc.labels[labelKey] = wl
l = wl.Value()
}
labelRefs = append(labelRefs, labelRef{
label: l,
})
}
return Key{
labelRefs: labelRefs,
}
}
func (lc *LabelsCompressorV2) cleanup() {
t := time.NewTicker(5 * time.Minute)
defer t.Stop()
for {
select {
case <-t.C:
lc.Cleanup()
}
}
}
func (lc *LabelsCompressorV2) Cleanup() int {
lc.mux.Lock()
defer lc.mux.Unlock()
count := 0
for l, wl := range lc.labels {
if wl.Value() != nil {
continue
}
log.Println(l)
count++
delete(lc.labels, l)
}
return count
}

0
streamaggr.yaml Normal file
View File