mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-01 16:12:18 +03:00
Compare commits
1 Commits
articles-v
...
weakpointe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9261be945 |
57
lib/promutil/labelscompressorv2_test.go
Normal file
57
lib/promutil/labelscompressorv2_test.go
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
package promutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"runtime"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLabelsCompressorV2(t *testing.T) {
|
||||||
|
lc := NewLabelsCompressorV2()
|
||||||
|
|
||||||
|
labels1 := []prompb.Label{
|
||||||
|
{Name: "label1", Value: "value1"},
|
||||||
|
{Name: "label2", Value: "value2"},
|
||||||
|
{Name: "label3", Value: "value3"},
|
||||||
|
}
|
||||||
|
labels2 := []prompb.Label{
|
||||||
|
{Name: "label3", Value: "value3"},
|
||||||
|
{Name: "label4", Value: "value4"},
|
||||||
|
{Name: "label5", Value: "value5"},
|
||||||
|
}
|
||||||
|
|
||||||
|
compressed1 := lc.Compress(labels1)
|
||||||
|
compressed2 := lc.Compress(labels2)
|
||||||
|
|
||||||
|
runtime.GC()
|
||||||
|
cleaned := lc.Cleanup()
|
||||||
|
if cleaned != 0 {
|
||||||
|
t.Fatalf("lc.Cleanup() should've cleaned zero unused labels, got %d", cleaned)
|
||||||
|
}
|
||||||
|
|
||||||
|
decompressed1 := compressed1.Decompress()
|
||||||
|
if !reflect.DeepEqual(labels1, decompressed1) {
|
||||||
|
t.Fatalf("decompressed labels1 do not match original: got %+v, want %+v", decompressed1, labels1)
|
||||||
|
}
|
||||||
|
|
||||||
|
compressed1 = Key{}
|
||||||
|
runtime.GC()
|
||||||
|
cleaned = lc.Cleanup()
|
||||||
|
if cleaned != 2 {
|
||||||
|
t.Fatalf("lc.Cleanup() should've cleaned two unused labels, got %d", cleaned)
|
||||||
|
}
|
||||||
|
|
||||||
|
decompressed2 := compressed2.Decompress()
|
||||||
|
if !reflect.DeepEqual(labels2, decompressed2) {
|
||||||
|
t.Fatalf("decompressed labels2 do not match original: got %+v, want %+v", decompressed2, labels2)
|
||||||
|
}
|
||||||
|
|
||||||
|
compressed2 = Key{}
|
||||||
|
runtime.GC()
|
||||||
|
cleaned = lc.Cleanup()
|
||||||
|
if cleaned != 3 {
|
||||||
|
t.Fatalf("lc.Cleanup() should've cleaned two unused labels, got %d", cleaned)
|
||||||
|
}
|
||||||
|
}
|
||||||
102
lib/promutil/labelscomressorv2.go
Normal file
102
lib/promutil/labelscomressorv2.go
Normal file
@@ -0,0 +1,102 @@
|
|||||||
|
package promutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
"weak"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Key struct {
|
||||||
|
labelRefs []labelRef
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k Key) Decompress() []prompb.Label {
|
||||||
|
res := make([]prompb.Label, 0, len(k.labelRefs))
|
||||||
|
for i := range k.labelRefs {
|
||||||
|
res = append(res, cloneLabel(*k.labelRefs[i].label))
|
||||||
|
}
|
||||||
|
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
type labelRef struct {
|
||||||
|
label *prompb.Label
|
||||||
|
}
|
||||||
|
|
||||||
|
type LabelsCompressorV2 struct {
|
||||||
|
mux sync.Mutex
|
||||||
|
labels map[prompb.Label]weak.Pointer[prompb.Label]
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLabelsCompressorV2() *LabelsCompressorV2 {
|
||||||
|
lc := &LabelsCompressorV2{
|
||||||
|
labels: make(map[prompb.Label]weak.Pointer[prompb.Label]),
|
||||||
|
}
|
||||||
|
|
||||||
|
go lc.cleanup()
|
||||||
|
|
||||||
|
return lc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lc *LabelsCompressorV2) Compress(labels []prompb.Label) Key {
|
||||||
|
lc.mux.Lock()
|
||||||
|
defer lc.mux.Unlock()
|
||||||
|
|
||||||
|
labelRefs := make([]labelRef, 0, len(labels))
|
||||||
|
for i := range labels {
|
||||||
|
wl := lc.labels[labels[i]]
|
||||||
|
l := wl.Value()
|
||||||
|
if l == nil {
|
||||||
|
labelKey := cloneLabel(labels[i])
|
||||||
|
labelVal := cloneLabel(labels[i])
|
||||||
|
|
||||||
|
wl = weak.Make(&labelVal)
|
||||||
|
lc.labels[labelKey] = wl
|
||||||
|
|
||||||
|
l = wl.Value()
|
||||||
|
}
|
||||||
|
|
||||||
|
labelRefs = append(labelRefs, labelRef{
|
||||||
|
label: l,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return Key{
|
||||||
|
labelRefs: labelRefs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lc *LabelsCompressorV2) cleanup() {
|
||||||
|
t := time.NewTicker(5 * time.Minute)
|
||||||
|
defer t.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-t.C:
|
||||||
|
lc.Cleanup()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lc *LabelsCompressorV2) Cleanup() int {
|
||||||
|
lc.mux.Lock()
|
||||||
|
defer lc.mux.Unlock()
|
||||||
|
|
||||||
|
count := 0
|
||||||
|
|
||||||
|
for l, wl := range lc.labels {
|
||||||
|
if wl.Value() != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println(l)
|
||||||
|
|
||||||
|
count++
|
||||||
|
delete(lc.labels, l)
|
||||||
|
}
|
||||||
|
|
||||||
|
return count
|
||||||
|
}
|
||||||
0
streamaggr.yaml
Normal file
0
streamaggr.yaml
Normal file
Reference in New Issue
Block a user