mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-29 13:35:53 +03:00
Compare commits
8 Commits
labelcompr
...
issue-1059
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
27f3c6ba45 | ||
|
|
c558291847 | ||
|
|
9bd219fdc7 | ||
|
|
ec9d37ce36 | ||
|
|
607630b9f5 | ||
|
|
f4df18d2db | ||
|
|
29bc38871d | ||
|
|
3f35399c24 |
84
app/vmagent/remotewrite/obfuscation.go
Normal file
84
app/vmagent/remotewrite/obfuscation.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package remotewrite
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
)
|
||||
|
||||
type obfuscationCtx struct {
|
||||
labels []prompb.Label
|
||||
}
|
||||
|
||||
func (ctx *obfuscationCtx) Reset() {
|
||||
promrelabel.CleanLabels(ctx.labels)
|
||||
ctx.labels = ctx.labels[:0]
|
||||
}
|
||||
|
||||
var obfuscationCtxPool = &sync.Pool{
|
||||
New: func() any {
|
||||
return &obfuscationCtx{}
|
||||
},
|
||||
}
|
||||
|
||||
func (rwctx *remoteWriteCtx) initObfuscationConfig() {
|
||||
if len(*obfuscationLabels) == 0 {
|
||||
return
|
||||
}
|
||||
idx := rwctx.idx
|
||||
rwctx.obfuscationLabels = make(map[string]struct{})
|
||||
rwObfuscationLabels := obfuscationLabels.GetOptionalArg(idx)
|
||||
rwObfuscationLabelsList := strings.Split(rwObfuscationLabels, "^^")
|
||||
|
||||
for _, label := range rwObfuscationLabelsList {
|
||||
rwctx.obfuscationLabels[label] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (rwctx *remoteWriteCtx) applyObfuscation(tss []prompb.TimeSeries, ctx *obfuscationCtx) []prompb.TimeSeries {
|
||||
if len(rwctx.obfuscationLabels) == 0 || len(tss) == 0 {
|
||||
return tss
|
||||
}
|
||||
cacheObfuscatedResult := make(map[string]string)
|
||||
poolLabels := ctx.labels[:0]
|
||||
for i := range tss {
|
||||
ts := &tss[i]
|
||||
labels := ts.Labels
|
||||
j := 0
|
||||
needToObfuscate := false
|
||||
for ; j < len(labels); j++ {
|
||||
label := &labels[j]
|
||||
if _, ok := rwctx.obfuscationLabels[label.Name]; !ok {
|
||||
continue
|
||||
}
|
||||
needToObfuscate = true
|
||||
break
|
||||
}
|
||||
if !needToObfuscate {
|
||||
continue
|
||||
}
|
||||
// Copy the label array to apply obfuscation
|
||||
poolLabelsLen := len(poolLabels)
|
||||
labels = append(poolLabels, labels...)
|
||||
ts.Labels = labels[poolLabelsLen:]
|
||||
for ; j < len(labels); j++ {
|
||||
label := &labels[j]
|
||||
if _, ok := rwctx.obfuscationLabels[label.Name]; !ok {
|
||||
continue
|
||||
}
|
||||
if obfuscatedValue, ok := cacheObfuscatedResult[label.Value]; ok {
|
||||
// fast path: the obfuscated result was calculated before
|
||||
label.Value = obfuscatedValue
|
||||
} else {
|
||||
obfuscatedResult := sha256.Sum256([]byte(label.Value))
|
||||
cacheObfuscatedResult[label.Value] = hex.EncodeToString(obfuscatedResult[:])
|
||||
label.Value = cacheObfuscatedResult[label.Value]
|
||||
}
|
||||
}
|
||||
}
|
||||
return tss
|
||||
}
|
||||
@@ -102,6 +102,9 @@ var (
|
||||
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
|
||||
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
|
||||
"By default, metadata sending is controlled by the global -enableMetadata flag")
|
||||
|
||||
// obfuscationLabels lists, per -remoteWrite.url, the label names whose values are obfuscated before sending.
// NOTE: the separator is quoted with single quotes on purpose: flag.UnquoteUsage treats the first
// back-quoted word of a usage string as the name of the flag's value.
obfuscationLabels = flagutil.NewArrayString("remoteWrite.obfuscationLabels", "List of label names whose values must be obfuscated before sending to the corresponding -remoteWrite.url. "+
	"Multiple label names should be separated by '^^', e.g. \"job^^instance,ip\". By default, label obfuscation is disabled")
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -833,6 +836,8 @@ type remoteWriteCtx struct {
|
||||
pss []*pendingSeries
|
||||
pssNextIdx atomic.Uint64
|
||||
|
||||
obfuscationLabels map[string]struct{}
|
||||
|
||||
rowsPushedAfterRelabel *metrics.Counter
|
||||
rowsDroppedByRelabel *metrics.Counter
|
||||
|
||||
@@ -937,6 +942,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
rowsDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_samples_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
}
|
||||
rwctx.initStreamAggrConfig()
|
||||
rwctx.initObfuscationConfig()
|
||||
|
||||
return rwctx
|
||||
}
|
||||
@@ -1102,6 +1108,7 @@ func (rwctx *remoteWriteCtx) tryPushMetadataInternal(mms []prompb.MetricMetadata
|
||||
func (rwctx *remoteWriteCtx) tryPushTimeSeriesInternal(tss []prompb.TimeSeries) bool {
|
||||
var rctx *relabelCtx
|
||||
var v *[]prompb.TimeSeries
|
||||
var octx *obfuscationCtx
|
||||
defer func() {
|
||||
if rctx == nil {
|
||||
return
|
||||
@@ -1120,6 +1127,24 @@ func (rwctx *remoteWriteCtx) tryPushTimeSeriesInternal(tss []prompb.TimeSeries)
|
||||
rctx.appendExtraLabels(tss, labelsGlobal)
|
||||
}
|
||||
|
||||
if len(rwctx.obfuscationLabels) != 0 {
|
||||
if rctx == nil {
|
||||
shadowTss := tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*shadowTss, tss...)
|
||||
defer func() {
|
||||
*shadowTss = prompb.ResetTimeSeries(tss)
|
||||
tssPool.Put(shadowTss)
|
||||
}()
|
||||
}
|
||||
octx = obfuscationCtxPool.Get().(*obfuscationCtx)
|
||||
defer func() {
|
||||
octx.Reset()
|
||||
obfuscationCtxPool.Put(octx)
|
||||
}()
|
||||
|
||||
tss = rwctx.applyObfuscation(tss, octx)
|
||||
}
|
||||
|
||||
pss := rwctx.pss
|
||||
idx := rwctx.pssNextIdx.Add(1) % uint64(len(pss))
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package remotewrite
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
@@ -374,3 +376,124 @@ func TestCalculateHealthyRwctxIdx(t *testing.T) {
|
||||
f(1, []int{0}, nil)
|
||||
f(1, []int{}, []int{0})
|
||||
}
|
||||
|
||||
func TestRemoteWriteObfuscation(t *testing.T) {
|
||||
f := func(obfuscationLabelList string, inputTss []prompb.TimeSeries, expectedTss []prompb.TimeSeries) {
|
||||
t.Helper()
|
||||
rwctx := &remoteWriteCtx{
|
||||
idx: 0,
|
||||
}
|
||||
defer metrics.UnregisterAllMetrics()
|
||||
originValue := *obfuscationLabels
|
||||
defer func() {
|
||||
*obfuscationLabels = originValue
|
||||
}()
|
||||
*obfuscationLabels = []string{obfuscationLabelList}
|
||||
rwctx.initObfuscationConfig()
|
||||
octx := obfuscationCtx{}
|
||||
outputTss := rwctx.applyObfuscation(inputTss, &octx)
|
||||
|
||||
if !reflect.DeepEqual(expectedTss, outputTss) {
|
||||
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", outputTss, expectedTss)
|
||||
}
|
||||
}
|
||||
|
||||
sha256Result := func(str string) string {
|
||||
sha256Result := sha256.Sum256([]byte(str))
|
||||
return hex.EncodeToString(sha256Result[:])
|
||||
}
|
||||
|
||||
// 1. obfuscation is not set.
|
||||
f("",
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "ip", Value: "123"},
|
||||
{Name: "instance", Value: "1234"},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{Value: 1, Timestamp: 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "ip", Value: "123"},
|
||||
{Name: "instance", Value: "1234"},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{Value: 1, Timestamp: 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
// 2. obfuscate the value of "ip" label
|
||||
f("ip",
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "ip", Value: "123"},
|
||||
{Name: "instance", Value: "1234"},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{Value: 1, Timestamp: 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "ip", Value: sha256Result("123")},
|
||||
{Name: "instance", Value: "1234"},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{Value: 1, Timestamp: 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
// 3. obfuscate the values of "ip" and "instance"
|
||||
f("ip^^instance",
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "ip", Value: "123"},
|
||||
{Name: "instance", Value: "1234"},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{Value: 1, Timestamp: 0},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "job", Value: "123"},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{Value: 1, Timestamp: 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "ip", Value: sha256Result("123")},
|
||||
{Name: "instance", Value: sha256Result("1234")},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{Value: 1, Timestamp: 0},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "job", Value: "123"},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{Value: 1, Timestamp: 0},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
@@ -25,9 +25,9 @@ The sandbox cluster installation runs under the constant load generated by
|
||||
See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
|
||||
|
||||
## tip
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): introduce obfuscation functionality for remote write. By setting `-remoteWrite.obfuscationLabels`, the values of the specific labels will be anonymized before they're sent to corresponding `-remoteWrite.url`. See [#10599](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10599).
|
||||
|
||||
## [v1.140.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.140.0)
|
||||
|
||||
Released at 2026-04-10
|
||||
|
||||
**Update Note 1:** [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): [CSV export](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-export-csv-data) (`/api/v1/export/csv`) now adds a header row as the first line of the response, so existing CSV-processing scripts may need to skip this header. See [#10666](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10666).
|
||||
|
||||
@@ -289,14 +289,16 @@ flowchart TB
|
||||
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
H2 --> H3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
H3 --> H4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
H4 --> H5[[push to <b>-remoteWrite.url</b>]]
|
||||
H4 --> H5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#obfuscating-label-values">obfuscate labels</a><br><b>-remoteWrite.obfuscationLabels</b>]
|
||||
H5 --> H6[[push to <b>-remoteWrite.url</b>]]
|
||||
|
||||
%% Right branch
|
||||
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
R2 --> R3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
R3 --> R4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
R4 --> R5[[push to <b>-remoteWrite.url</b>]]
|
||||
R4 --> R5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#obfuscating-label-values">obfuscate labels</a><br><b>-remoteWrite.obfuscationLabels</b>]
|
||||
R5 --> R6[[push to <b>-remoteWrite.url</b>]]
|
||||
```
|
||||
|
||||
Scraping has additional settings that can be applied before samples are pushed to the processing pipeline above:
|
||||
@@ -529,6 +531,24 @@ Extra labels can be added to metrics collected by `vmagent` via the following me
|
||||
/path/to/vmagent -remoteWrite.url=http://127.0.0.1:8428/api/v1/write?extra_label="env=prod"
|
||||
```
|
||||
|
||||
## Obfuscating label values
|
||||
|
||||
`vmagent` can anonymize the values of specific labels before sending metrics to `-remoteWrite.url`. This is enabled by setting `-remoteWrite.obfuscationLabels`: the value of every listed label is replaced with its hex-encoded SHA-256 hash.
|
||||
Some `-remoteWrite.url` endpoints may point to external services, such as a monitoring vendor outside the department or company. For security and compliance reasons,
|
||||
obfuscating the specific label values (e.g. ip, host, datacenter, etc.) before sending them to these external services will be useful.
|
||||
|
||||
Use `-remoteWrite.obfuscationLabels` to specify the labels that need to be obfuscated before sending to `-remoteWrite.url`. Multiple labels should be separated by `^^`:
|
||||
|
||||
```sh
|
||||
./vmagent \
|
||||
-remoteWrite.url=http://<external-service1> \
|
||||
-remoteWrite.obfuscationLabels='instance^^datacenter' \
|
||||
-remoteWrite.url=http://<external-service2> \
|
||||
-remoteWrite.obfuscationLabels='instance' \
|
||||
-remoteWrite.url=http://<internal-service> \
|
||||
-remoteWrite.obfuscationLabels=''
|
||||
```
|
||||
|
||||
## Automatically generated metrics
|
||||
|
||||
`vmagent` automatically generates the following metrics per each scrape of every [Prometheus-compatible target](#how-to-collect-metrics-in-prometheus-format)
|
||||
|
||||
@@ -3,7 +3,6 @@ package promutil
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
@@ -12,10 +11,8 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
const minRotationInterval = time.Hour
|
||||
|
||||
// labelsCompressor compresses []prompb.Label into short binary strings.
|
||||
type labelsCompressor struct {
|
||||
// LabelsCompressor compresses []prompb.Label into short binary strings
|
||||
type LabelsCompressor struct {
|
||||
labelToIdx sync.Map
|
||||
idxToLabel labelsMap
|
||||
|
||||
@@ -24,18 +21,20 @@ type labelsCompressor struct {
|
||||
totalSizeBytes atomic.Uint64
|
||||
}
|
||||
|
||||
func (lc *labelsCompressor) sizeBytes() uint64 {
|
||||
// SizeBytes returns the size of lc data in bytes
|
||||
func (lc *LabelsCompressor) SizeBytes() uint64 {
|
||||
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
|
||||
}
|
||||
|
||||
func (lc *labelsCompressor) itemsCount() uint64 {
|
||||
// ItemsCount returns the number of items in lc
|
||||
func (lc *LabelsCompressor) ItemsCount() uint64 {
|
||||
return lc.nextIdx.Load()
|
||||
}
|
||||
|
||||
// compress compresses labels, appends the compressed labels to dst and returns the result.
|
||||
// Compress compresses labels, appends the compressed labels to dst and returns the result.
|
||||
//
|
||||
// It is safe calling compress from concurrent goroutines.
|
||||
func (lc *labelsCompressor) compress(dst []byte, labels []prompb.Label) []byte {
|
||||
// It is safe calling Compress from concurrent goroutines.
|
||||
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
|
||||
if len(labels) == 0 {
|
||||
// Fast path
|
||||
return append(dst, 0)
|
||||
@@ -43,13 +42,13 @@ func (lc *labelsCompressor) compress(dst []byte, labels []prompb.Label) []byte {
|
||||
|
||||
a := encoding.GetUint64s(len(labels) + 1)
|
||||
a.A[0] = uint64(len(labels))
|
||||
lc.compressInto(a.A[1:], labels)
|
||||
lc.compress(a.A[1:], labels)
|
||||
dst = encoding.MarshalVarUint64s(dst, a.A)
|
||||
encoding.PutUint64s(a)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (lc *labelsCompressor) compressInto(dst []uint64, labels []prompb.Label) {
|
||||
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
|
||||
if len(labels) == 0 {
|
||||
return
|
||||
}
|
||||
@@ -99,10 +98,10 @@ func cloneLabel(label prompb.Label) prompb.Label {
|
||||
}
|
||||
}
|
||||
|
||||
// decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
|
||||
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
|
||||
//
|
||||
// It is safe calling decompress from concurrent goroutines.
|
||||
func (lc *labelsCompressor) decompress(dst []prompb.Label, src []byte) []prompb.Label {
|
||||
// It is safe calling Decompress from concurrent goroutines.
|
||||
func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.Label {
|
||||
labelsLen, nSize := encoding.UnmarshalVarUint64(src)
|
||||
if nSize <= 0 {
|
||||
logger.Panicf("BUG: cannot unmarshal labels length from uvarint")
|
||||
@@ -125,12 +124,12 @@ func (lc *labelsCompressor) decompress(dst []prompb.Label, src []byte) []prompb.
|
||||
if len(tail) > 0 {
|
||||
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
|
||||
}
|
||||
dst = lc.decompressInternal(dst, a.A)
|
||||
dst = lc.decompress(dst, a.A)
|
||||
encoding.PutUint64s(a)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (lc *labelsCompressor) decompressInternal(dst []prompb.Label, src []uint64) []prompb.Label {
|
||||
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
|
||||
for _, idx := range src {
|
||||
label, ok := lc.idxToLabel.Load(idx)
|
||||
if !ok {
|
||||
@@ -233,143 +232,3 @@ func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
|
||||
clear(lm.mutable)
|
||||
lm.readOnly.Store(&labels)
|
||||
}
|
||||
|
||||
// labelsCompressorState holds the current and previous labelsCompressor instances and generation byte that changes between rotations
|
||||
// and is used to pick a right compressor during decompression
|
||||
type labelsCompressorState struct {
|
||||
gen byte
|
||||
current *labelsCompressor
|
||||
previous *labelsCompressor
|
||||
}
|
||||
|
||||
// LabelsCompressor is a rotating compressor that maintains two labelsCompressor
|
||||
// instances to bound memory growth from stale label sets.
|
||||
//
|
||||
// Consumers must call Register on creation and Unregister on shutdown for a proper rotation period calculation.
|
||||
type LabelsCompressor struct {
|
||||
state atomic.Pointer[labelsCompressorState]
|
||||
|
||||
rotationInterval atomic.Int64
|
||||
startOnce sync.Once
|
||||
|
||||
registryMu sync.Mutex
|
||||
registry []time.Duration
|
||||
}
|
||||
|
||||
// getState returns current labelsCompressorState, which is initialized if needed.
|
||||
func (lc *LabelsCompressor) getState() *labelsCompressorState {
|
||||
if s := lc.state.Load(); s != nil {
|
||||
return s
|
||||
}
|
||||
s := &labelsCompressorState{gen: 0, current: &labelsCompressor{}}
|
||||
// use CompareAndSwap to avoid overwriting pointer which could be stored by another thread
|
||||
lc.state.CompareAndSwap(nil, s)
|
||||
return lc.state.Load()
|
||||
}
|
||||
|
||||
// rotate resets current compressor and moves its state to previous.
|
||||
func (lc *LabelsCompressor) rotate() {
|
||||
old := lc.getState()
|
||||
lc.state.Store(&labelsCompressorState{
|
||||
gen: old.gen ^ 1,
|
||||
current: &labelsCompressor{},
|
||||
previous: old.current,
|
||||
})
|
||||
}
|
||||
|
||||
// Register records maxStaleness for a new consumer, recomputes the rotation
|
||||
// interval, starts the background rotation goroutine on the first call, and
|
||||
// returns an id that must be passed to Unregister when the consumer stops.
|
||||
func (lc *LabelsCompressor) Register(maxStaleness time.Duration) {
|
||||
lc.registryMu.Lock()
|
||||
lc.registry = append(lc.registry, maxStaleness)
|
||||
max := lc.maxStaleness()
|
||||
lc.registryMu.Unlock()
|
||||
|
||||
lc.rotationInterval.Store(int64(max * 2))
|
||||
lc.startOnce.Do(func() {
|
||||
lc.getState()
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(time.Duration(lc.rotationInterval.Load()))
|
||||
lc.rotate()
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
// Unregister removes the given consumer ID from the registry and recomputes
|
||||
// the rotation interval from the remaining registered consumers.
|
||||
func (lc *LabelsCompressor) Unregister(maxStaleness time.Duration) {
|
||||
lc.registryMu.Lock()
|
||||
for i, s := range lc.registry {
|
||||
if s == maxStaleness {
|
||||
lc.registry = append(lc.registry[:i], lc.registry[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
max := lc.maxStaleness()
|
||||
lc.registryMu.Unlock()
|
||||
lc.rotationInterval.Store(int64(max * 2))
|
||||
}
|
||||
|
||||
// maxStaleness returns the maximum staleness across all registered consumers.
|
||||
// Must be called with registryMu held.
|
||||
func (lc *LabelsCompressor) maxStaleness() time.Duration {
|
||||
maxStaleness := time.Duration(0)
|
||||
for _, d := range lc.registry {
|
||||
if d > maxStaleness {
|
||||
maxStaleness = d
|
||||
}
|
||||
}
|
||||
return max(maxStaleness, minRotationInterval)
|
||||
}
|
||||
|
||||
// Compress appends the generation byte followed by the compressed labels
|
||||
// to dst and returns the result.
|
||||
//
|
||||
// It is safe calling Compress from concurrent goroutines.
|
||||
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
|
||||
s := lc.getState()
|
||||
dst = append(dst, s.gen)
|
||||
return s.current.compress(dst, labels)
|
||||
}
|
||||
|
||||
// Decompress reads the generation byte from key and decompresses the
|
||||
// remaining bytes using the corresponding labelsCompressor instance.
|
||||
func (lc *LabelsCompressor) Decompress(dst []prompb.Label, key []byte) []prompb.Label {
|
||||
if len(key) == 0 {
|
||||
logger.Panicf("BUG: unexpected empty key in Decompress")
|
||||
}
|
||||
gen := key[0]
|
||||
s := lc.getState()
|
||||
var c *labelsCompressor
|
||||
if s.gen == gen {
|
||||
c = s.current
|
||||
} else if s.previous != nil {
|
||||
c = s.previous
|
||||
} else {
|
||||
logger.Panicf("BUG: compressor for generation %d is not available; current generation is %d", gen, s.gen)
|
||||
}
|
||||
return c.decompress(dst, key[1:])
|
||||
}
|
||||
|
||||
// SizeBytes returns the total memory used by the active compressor instances
|
||||
func (lc *LabelsCompressor) SizeBytes() uint64 {
|
||||
s := lc.getState()
|
||||
n := s.current.sizeBytes()
|
||||
if s.previous != nil {
|
||||
n += s.previous.sizeBytes()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// ItemsCount returns the total number of label entries stored across the active
|
||||
func (lc *LabelsCompressor) ItemsCount() uint64 {
|
||||
s := lc.getState()
|
||||
n := s.current.itemsCount()
|
||||
if s.previous != nil {
|
||||
n += s.previous.itemsCount()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
@@ -45,7 +45,6 @@ type Deduplicator struct {
|
||||
//
|
||||
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
|
||||
func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Duration, dropLabels []string, alias string) *Deduplicator {
|
||||
lc.Register(2 * interval)
|
||||
d := &Deduplicator{
|
||||
da: newDedupAggr(),
|
||||
dropLabels: dropLabels,
|
||||
@@ -93,8 +92,6 @@ func (d *Deduplicator) MustStop() {
|
||||
metrics.UnregisterSet(d.ms, true)
|
||||
d.ms = nil
|
||||
|
||||
lc.Unregister(2 * d.interval)
|
||||
|
||||
close(d.stopCh)
|
||||
d.wg.Wait()
|
||||
}
|
||||
|
||||
@@ -53,10 +53,10 @@ var supportedOutputs = []string{
|
||||
"unique_samples",
|
||||
}
|
||||
|
||||
// lc is the global rotating labels compressor shared across all aggregators.
|
||||
var lc promutil.LabelsCompressor
|
||||
|
||||
var (
|
||||
// lc contains information about all compressed labels for streaming aggregation
|
||||
lc promutil.LabelsCompressor
|
||||
|
||||
_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
|
||||
return float64(lc.SizeBytes())
|
||||
})
|
||||
@@ -310,22 +310,12 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
|
||||
}
|
||||
|
||||
metrics.RegisterSet(ms)
|
||||
a := &Aggregators{
|
||||
return &Aggregators{
|
||||
as: as,
|
||||
configData: configData,
|
||||
filePath: filePath,
|
||||
ms: ms,
|
||||
}
|
||||
lc.Register(a.maxStaleness())
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (a *Aggregators) maxStaleness() time.Duration {
|
||||
maxStaleness := time.Duration(0)
|
||||
for _, aggr := range a.as {
|
||||
maxStaleness = max(aggr.stalenessInterval, maxStaleness)
|
||||
}
|
||||
return maxStaleness
|
||||
}, nil
|
||||
}
|
||||
|
||||
// IsEnabled returns true if Aggregators has at least one configured aggregator
|
||||
@@ -345,8 +335,6 @@ func (a *Aggregators) MustStop() {
|
||||
return
|
||||
}
|
||||
|
||||
lc.Unregister(a.maxStaleness())
|
||||
|
||||
metrics.UnregisterSet(a.ms, true)
|
||||
a.ms = nil
|
||||
|
||||
@@ -1090,9 +1078,6 @@ func compressLabels(dst []byte, inputLabels, outputLabels []prompb.Label) []byte
|
||||
}
|
||||
|
||||
func decompressLabels(dst []prompb.Label, key string) []prompb.Label {
|
||||
if len(key) == 0 {
|
||||
logger.Panicf("BUG: unexpected empty key in decompressLabels")
|
||||
}
|
||||
return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user