mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Compare commits
1 Commits
v1.140.0
...
weakpointe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9261be945 |
57
lib/promutil/labelscompressorv2_test.go
Normal file
57
lib/promutil/labelscompressorv2_test.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package promutil
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
func TestLabelsCompressorV2(t *testing.T) {
|
||||
lc := NewLabelsCompressorV2()
|
||||
|
||||
labels1 := []prompb.Label{
|
||||
{Name: "label1", Value: "value1"},
|
||||
{Name: "label2", Value: "value2"},
|
||||
{Name: "label3", Value: "value3"},
|
||||
}
|
||||
labels2 := []prompb.Label{
|
||||
{Name: "label3", Value: "value3"},
|
||||
{Name: "label4", Value: "value4"},
|
||||
{Name: "label5", Value: "value5"},
|
||||
}
|
||||
|
||||
compressed1 := lc.Compress(labels1)
|
||||
compressed2 := lc.Compress(labels2)
|
||||
|
||||
runtime.GC()
|
||||
cleaned := lc.Cleanup()
|
||||
if cleaned != 0 {
|
||||
t.Fatalf("lc.Cleanup() should've cleaned zero unused labels, got %d", cleaned)
|
||||
}
|
||||
|
||||
decompressed1 := compressed1.Decompress()
|
||||
if !reflect.DeepEqual(labels1, decompressed1) {
|
||||
t.Fatalf("decompressed labels1 do not match original: got %+v, want %+v", decompressed1, labels1)
|
||||
}
|
||||
|
||||
compressed1 = Key{}
|
||||
runtime.GC()
|
||||
cleaned = lc.Cleanup()
|
||||
if cleaned != 2 {
|
||||
t.Fatalf("lc.Cleanup() should've cleaned two unused labels, got %d", cleaned)
|
||||
}
|
||||
|
||||
decompressed2 := compressed2.Decompress()
|
||||
if !reflect.DeepEqual(labels2, decompressed2) {
|
||||
t.Fatalf("decompressed labels2 do not match original: got %+v, want %+v", decompressed2, labels2)
|
||||
}
|
||||
|
||||
compressed2 = Key{}
|
||||
runtime.GC()
|
||||
cleaned = lc.Cleanup()
|
||||
if cleaned != 3 {
|
||||
t.Fatalf("lc.Cleanup() should've cleaned two unused labels, got %d", cleaned)
|
||||
}
|
||||
}
|
||||
102
lib/promutil/labelscomressorv2.go
Normal file
102
lib/promutil/labelscomressorv2.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package promutil
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
"weak"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
type Key struct {
|
||||
labelRefs []labelRef
|
||||
}
|
||||
|
||||
func (k Key) Decompress() []prompb.Label {
|
||||
res := make([]prompb.Label, 0, len(k.labelRefs))
|
||||
for i := range k.labelRefs {
|
||||
res = append(res, cloneLabel(*k.labelRefs[i].label))
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
type labelRef struct {
|
||||
label *prompb.Label
|
||||
}
|
||||
|
||||
type LabelsCompressorV2 struct {
|
||||
mux sync.Mutex
|
||||
labels map[prompb.Label]weak.Pointer[prompb.Label]
|
||||
}
|
||||
|
||||
func NewLabelsCompressorV2() *LabelsCompressorV2 {
|
||||
lc := &LabelsCompressorV2{
|
||||
labels: make(map[prompb.Label]weak.Pointer[prompb.Label]),
|
||||
}
|
||||
|
||||
go lc.cleanup()
|
||||
|
||||
return lc
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressorV2) Compress(labels []prompb.Label) Key {
|
||||
lc.mux.Lock()
|
||||
defer lc.mux.Unlock()
|
||||
|
||||
labelRefs := make([]labelRef, 0, len(labels))
|
||||
for i := range labels {
|
||||
wl := lc.labels[labels[i]]
|
||||
l := wl.Value()
|
||||
if l == nil {
|
||||
labelKey := cloneLabel(labels[i])
|
||||
labelVal := cloneLabel(labels[i])
|
||||
|
||||
wl = weak.Make(&labelVal)
|
||||
lc.labels[labelKey] = wl
|
||||
|
||||
l = wl.Value()
|
||||
}
|
||||
|
||||
labelRefs = append(labelRefs, labelRef{
|
||||
label: l,
|
||||
})
|
||||
}
|
||||
|
||||
return Key{
|
||||
labelRefs: labelRefs,
|
||||
}
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressorV2) cleanup() {
|
||||
t := time.NewTicker(5 * time.Minute)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
lc.Cleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressorV2) Cleanup() int {
|
||||
lc.mux.Lock()
|
||||
defer lc.mux.Unlock()
|
||||
|
||||
count := 0
|
||||
|
||||
for l, wl := range lc.labels {
|
||||
if wl.Value() != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Println(l)
|
||||
|
||||
count++
|
||||
delete(lc.labels, l)
|
||||
}
|
||||
|
||||
return count
|
||||
}
|
||||
0
streamaggr.yaml
Normal file
0
streamaggr.yaml
Normal file
Reference in New Issue
Block a user