mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-06 10:32:14 +03:00
Compare commits
3 Commits
v1.136.4
...
labelcompr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c59c07cac7 | ||
|
|
8a20ccf21d | ||
|
|
1a01dbbec7 |
@@ -88,6 +88,7 @@ type QueryOpts struct {
|
||||
MaxLookback string
|
||||
LatencyOffset string
|
||||
Format string
|
||||
NoCache string
|
||||
}
|
||||
|
||||
func (qos *QueryOpts) asURLValues() url.Values {
|
||||
@@ -112,6 +113,7 @@ func (qos *QueryOpts) asURLValues() url.Values {
|
||||
addNonEmpty("max_lookback", qos.MaxLookback)
|
||||
addNonEmpty("latency_offset", qos.LatencyOffset)
|
||||
addNonEmpty("format", qos.Format)
|
||||
addNonEmpty("nocache", qos.NoCache)
|
||||
|
||||
return uv
|
||||
}
|
||||
|
||||
@@ -28,7 +28,6 @@ func TestSingleBackupRestore(t *testing.T) {
|
||||
return tc.MustStartVmsingle("vmsingle", []string{
|
||||
"-storageDataPath=" + storageDataPath,
|
||||
"-retentionPeriod=100y",
|
||||
"-search.maxStalenessInterval=1m",
|
||||
})
|
||||
},
|
||||
stopSUT: func() {
|
||||
@@ -70,9 +69,7 @@ func TestClusterBackupRestore(t *testing.T) {
|
||||
VminsertInstance: "vminsert",
|
||||
VminsertFlags: []string{},
|
||||
VmselectInstance: "vmselect",
|
||||
VmselectFlags: []string{
|
||||
"-search.maxStalenessInterval=1m",
|
||||
},
|
||||
VmselectFlags: []string{},
|
||||
})
|
||||
},
|
||||
stopSUT: func() {
|
||||
@@ -100,15 +97,14 @@ func TestClusterBackupRestore(t *testing.T) {
|
||||
func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
|
||||
t := tc.T()
|
||||
|
||||
const msecPerMinute = 60 * 1000
|
||||
genData := func(count int, prefix string, start int64) (recs []string, wantSeries []map[string]string, wantQueryResults []*apptest.QueryResult) {
|
||||
genData := func(count int, prefix string, start, step int64) (recs []string, wantSeries []map[string]string, wantQueryResults []*apptest.QueryResult) {
|
||||
recs = make([]string, count)
|
||||
wantSeries = make([]map[string]string, count)
|
||||
wantQueryResults = make([]*apptest.QueryResult, count)
|
||||
for i := range count {
|
||||
name := fmt.Sprintf("%s_%03d", prefix, i)
|
||||
value := float64(i)
|
||||
timestamp := start + int64(i)*msecPerMinute
|
||||
timestamp := start + int64(i)*step
|
||||
|
||||
recs[i] = fmt.Sprintf("%s %f %d", name, value, timestamp)
|
||||
wantSeries[i] = map[string]string{"__name__": name}
|
||||
@@ -148,15 +144,17 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
|
||||
|
||||
// assertSeries retrieves all data from the storage and compares it with the
|
||||
// expected result.
|
||||
assertQueryResults := func(app apptest.PrometheusQuerier, query string, start, end int64, want []*apptest.QueryResult) {
|
||||
assertQueryResults := func(app apptest.PrometheusQuerier, query string, start, end, step int64, want []*apptest.QueryResult) {
|
||||
t.Helper()
|
||||
tc.Assert(&apptest.AssertOptions{
|
||||
Msg: "unexpected /api/v1/query_range response",
|
||||
Got: func() any {
|
||||
return app.PrometheusAPIV1QueryRange(t, query, apptest.QueryOpts{
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
Step: "60s",
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
Step: fmt.Sprintf("%dms", step),
|
||||
MaxLookback: fmt.Sprintf("%dms", step-1),
|
||||
NoCache: "1",
|
||||
})
|
||||
},
|
||||
Want: &apptest.PrometheusAPIV1QueryResponse{
|
||||
@@ -167,7 +165,6 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
|
||||
},
|
||||
},
|
||||
FailNow: true,
|
||||
Retries: 300,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -194,8 +191,9 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
|
||||
// below.
|
||||
const numMetrics = 1000
|
||||
// With 1000 metrics (one per minute), the time range spans 2 months.
|
||||
end := time.Date(2025, 3, 1, 10, 0, 0, 0, time.UTC).UnixMilli()
|
||||
start := end - numMetrics*msecPerMinute
|
||||
start := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
end := time.Date(2025, 3, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
step := (end - start) / numMetrics
|
||||
|
||||
// Verify backup/restore:
|
||||
//
|
||||
@@ -209,8 +207,8 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
|
||||
// - Start vmsingle
|
||||
// - Ensure that the queries return batch1 data only.
|
||||
|
||||
batch1Data, wantBatch1Series, wantBatch1QueryResults := genData(numMetrics, "batch1", start)
|
||||
batch2Data, wantBatch2Series, wantBatch2QueryResults := genData(numMetrics, "batch2", start)
|
||||
batch1Data, wantBatch1Series, wantBatch1QueryResults := genData(numMetrics, "batch1", start, step)
|
||||
batch2Data, wantBatch2Series, wantBatch2QueryResults := genData(numMetrics, "batch2", start, step)
|
||||
wantBatch12Series := slices.Concat(wantBatch1Series, wantBatch2Series)
|
||||
wantBatch12QueryResults := slices.Concat(wantBatch1QueryResults, wantBatch2QueryResults)
|
||||
|
||||
@@ -219,13 +217,14 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
|
||||
sut.PrometheusAPIV1ImportPrometheus(t, batch1Data, apptest.QueryOpts{})
|
||||
sut.ForceFlush(t)
|
||||
assertSeries(sut, `{__name__=~"batch1.*"}`, start, end, wantBatch1Series)
|
||||
assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, wantBatch1QueryResults)
|
||||
assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, step, wantBatch1QueryResults)
|
||||
|
||||
createBackup(sut, "batch1")
|
||||
|
||||
sut.PrometheusAPIV1ImportPrometheus(t, batch2Data, apptest.QueryOpts{})
|
||||
sut.ForceFlush(t)
|
||||
assertSeries(sut, `{__name__=~"batch(1|2).*"}`, start, end, wantBatch12Series)
|
||||
assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, start, end, wantBatch12QueryResults)
|
||||
assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, start, end, step, wantBatch12QueryResults)
|
||||
createBackup(sut, "batch12")
|
||||
|
||||
opts.stopSUT()
|
||||
@@ -235,5 +234,5 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
|
||||
sut = opts.startSUT()
|
||||
|
||||
assertSeries(sut, `{__name__=~"batch1.*"}`, start, end, wantBatch1Series)
|
||||
assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, wantBatch1QueryResults)
|
||||
assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, step, wantBatch1QueryResults)
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ func StartVminsert(instance string, flags []string, cli *Client, output io.Write
|
||||
extractREs = append(extractREs, regexp.MustCompile(logRecord))
|
||||
}
|
||||
|
||||
app, stderrExtracts, err := startApp(instance, "../../bin/vminsert", flags, &appOptions{
|
||||
app, stderrExtracts, err := startApp(instance, "../../bin/vminsert-race", flags, &appOptions{
|
||||
defaultFlags: map[string]string{
|
||||
"-httpListenAddr": "127.0.0.1:0",
|
||||
"-clusternativeListenAddr": "127.0.0.1:0",
|
||||
@@ -237,8 +237,22 @@ func (app *Vminsert) PrometheusAPIV1ImportPrometheus(t *testing.T, records []str
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
var recordsCount int
|
||||
var metadataRecords int
|
||||
uniqueMetadataMetricNames := make(map[string]struct{})
|
||||
for _, record := range records {
|
||||
if strings.HasPrefix(record, "#") {
|
||||
// metric metadata has the following format:
|
||||
//# HELP importprometheus_series
|
||||
//# TYPE importprometheus_series
|
||||
// it results into single metadata record
|
||||
if strings.HasPrefix(record, "# ") {
|
||||
metadataItems := strings.Split(record, " ")
|
||||
if len(metadataItems) < 2 {
|
||||
t.Fatalf("BUG: unexpected metadata format=%q", record)
|
||||
}
|
||||
metricName := metadataItems[2]
|
||||
if _, ok := uniqueMetadataMetricNames[metricName]; ok {
|
||||
continue
|
||||
}
|
||||
uniqueMetadataMetricNames[metricName] = struct{}{}
|
||||
metadataRecords++
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ type Vmselect struct {
|
||||
// sets the default flags and populates the app instance state with runtime
|
||||
// values extracted from the application log (such as httpListenAddr)
|
||||
func StartVmselect(instance string, flags []string, cli *Client, output io.Writer) (*Vmselect, error) {
|
||||
app, stderrExtracts, err := startApp(instance, "../../bin/vmselect", flags, &appOptions{
|
||||
app, stderrExtracts, err := startApp(instance, "../../bin/vmselect-race", flags, &appOptions{
|
||||
defaultFlags: map[string]string{
|
||||
"-httpListenAddr": "127.0.0.1:0",
|
||||
"-clusternativeListenAddr": "127.0.0.1:0",
|
||||
|
||||
@@ -80,7 +80,7 @@ Released at 2026-03-27
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): stop logging `error`-level messages when scraping targets that expose OpenMetrics `info`, `gaugehistogram`, `stateset`, or `unknown` metric types. These are valid [OpenMetrics](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md) types and should be parsed without error. See [#10685](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10685). Thanks to @tsarna for the contribution.
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): prevent panic during directory deletion on `NFS`-based mounts. The bug was introduced in [83da33d8](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/83da33d8cfe8352fd0022d05a8b6346ebb48420d) and included in [v1.123.0](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/changelog/CHANGELOG_2025.md#v11230). See [#9842](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9842).
|
||||
|
||||
## [v1.139.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.139.0)
|
||||
## [v1.138.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.138.0)
|
||||
|
||||
Released at 2026-03-13
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package promutil
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
@@ -11,8 +12,10 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
// LabelsCompressor compresses []prompb.Label into short binary strings
|
||||
type LabelsCompressor struct {
|
||||
const minRotationInterval = time.Hour
|
||||
|
||||
// labelsCompressor compresses []prompb.Label into short binary strings.
|
||||
type labelsCompressor struct {
|
||||
labelToIdx sync.Map
|
||||
idxToLabel labelsMap
|
||||
|
||||
@@ -21,20 +24,18 @@ type LabelsCompressor struct {
|
||||
totalSizeBytes atomic.Uint64
|
||||
}
|
||||
|
||||
// SizeBytes returns the size of lc data in bytes
|
||||
func (lc *LabelsCompressor) SizeBytes() uint64 {
|
||||
func (lc *labelsCompressor) sizeBytes() uint64 {
|
||||
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
|
||||
}
|
||||
|
||||
// ItemsCount returns the number of items in lc
|
||||
func (lc *LabelsCompressor) ItemsCount() uint64 {
|
||||
func (lc *labelsCompressor) itemsCount() uint64 {
|
||||
return lc.nextIdx.Load()
|
||||
}
|
||||
|
||||
// Compress compresses labels, appends the compressed labels to dst and returns the result.
|
||||
// compress compresses labels, appends the compressed labels to dst and returns the result.
|
||||
//
|
||||
// It is safe calling Compress from concurrent goroutines.
|
||||
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
|
||||
// It is safe calling compress from concurrent goroutines.
|
||||
func (lc *labelsCompressor) compress(dst []byte, labels []prompb.Label) []byte {
|
||||
if len(labels) == 0 {
|
||||
// Fast path
|
||||
return append(dst, 0)
|
||||
@@ -42,13 +43,13 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
|
||||
|
||||
a := encoding.GetUint64s(len(labels) + 1)
|
||||
a.A[0] = uint64(len(labels))
|
||||
lc.compress(a.A[1:], labels)
|
||||
lc.compressInto(a.A[1:], labels)
|
||||
dst = encoding.MarshalVarUint64s(dst, a.A)
|
||||
encoding.PutUint64s(a)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
|
||||
func (lc *labelsCompressor) compressInto(dst []uint64, labels []prompb.Label) {
|
||||
if len(labels) == 0 {
|
||||
return
|
||||
}
|
||||
@@ -98,10 +99,10 @@ func cloneLabel(label prompb.Label) prompb.Label {
|
||||
}
|
||||
}
|
||||
|
||||
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
|
||||
// decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
|
||||
//
|
||||
// It is safe calling Decompress from concurrent goroutines.
|
||||
func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.Label {
|
||||
// It is safe calling decompress from concurrent goroutines.
|
||||
func (lc *labelsCompressor) decompress(dst []prompb.Label, src []byte) []prompb.Label {
|
||||
labelsLen, nSize := encoding.UnmarshalVarUint64(src)
|
||||
if nSize <= 0 {
|
||||
logger.Panicf("BUG: cannot unmarshal labels length from uvarint")
|
||||
@@ -124,12 +125,12 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
|
||||
if len(tail) > 0 {
|
||||
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
|
||||
}
|
||||
dst = lc.decompress(dst, a.A)
|
||||
dst = lc.decompressInternal(dst, a.A)
|
||||
encoding.PutUint64s(a)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
|
||||
func (lc *labelsCompressor) decompressInternal(dst []prompb.Label, src []uint64) []prompb.Label {
|
||||
for _, idx := range src {
|
||||
label, ok := lc.idxToLabel.Load(idx)
|
||||
if !ok {
|
||||
@@ -232,3 +233,143 @@ func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
|
||||
clear(lm.mutable)
|
||||
lm.readOnly.Store(&labels)
|
||||
}
|
||||
|
||||
// labelsCompressorState holds the current and previous labelsCompressor instances and generation byte that changes between rotations
|
||||
// and is used to pick a right compressor during decompression
|
||||
type labelsCompressorState struct {
|
||||
gen byte
|
||||
current *labelsCompressor
|
||||
previous *labelsCompressor
|
||||
}
|
||||
|
||||
// LabelsCompressor is a rotating compressor that maintains two labelsCompressor
|
||||
// instances to bound memory growth from stale label sets.
|
||||
//
|
||||
// Consumers must call Register on creation and Unregister on shutdown for a proper rotation period calculation.
|
||||
type LabelsCompressor struct {
|
||||
state atomic.Pointer[labelsCompressorState]
|
||||
|
||||
rotationInterval atomic.Int64
|
||||
startOnce sync.Once
|
||||
|
||||
registryMu sync.Mutex
|
||||
registry []time.Duration
|
||||
}
|
||||
|
||||
// getState returns current labelsCompressorState, which is initialized if needed.
|
||||
func (lc *LabelsCompressor) getState() *labelsCompressorState {
|
||||
if s := lc.state.Load(); s != nil {
|
||||
return s
|
||||
}
|
||||
s := &labelsCompressorState{gen: 0, current: &labelsCompressor{}}
|
||||
// use CompareAndSwap to avoid overwriting pointer which could be stored by another thread
|
||||
lc.state.CompareAndSwap(nil, s)
|
||||
return lc.state.Load()
|
||||
}
|
||||
|
||||
// rotate resets current compressor and moves its state to previous.
|
||||
func (lc *LabelsCompressor) rotate() {
|
||||
old := lc.getState()
|
||||
lc.state.Store(&labelsCompressorState{
|
||||
gen: old.gen ^ 1,
|
||||
current: &labelsCompressor{},
|
||||
previous: old.current,
|
||||
})
|
||||
}
|
||||
|
||||
// Register records maxStaleness for a new consumer, recomputes the rotation
|
||||
// interval, starts the background rotation goroutine on the first call, and
|
||||
// returns an id that must be passed to Unregister when the consumer stops.
|
||||
func (lc *LabelsCompressor) Register(maxStaleness time.Duration) {
|
||||
lc.registryMu.Lock()
|
||||
lc.registry = append(lc.registry, maxStaleness)
|
||||
max := lc.maxStaleness()
|
||||
lc.registryMu.Unlock()
|
||||
|
||||
lc.rotationInterval.Store(int64(max * 2))
|
||||
lc.startOnce.Do(func() {
|
||||
lc.getState()
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(time.Duration(lc.rotationInterval.Load()))
|
||||
lc.rotate()
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
// Unregister removes the given consumer ID from the registry and recomputes
|
||||
// the rotation interval from the remaining registered consumers.
|
||||
func (lc *LabelsCompressor) Unregister(maxStaleness time.Duration) {
|
||||
lc.registryMu.Lock()
|
||||
for i, s := range lc.registry {
|
||||
if s == maxStaleness {
|
||||
lc.registry = append(lc.registry[:i], lc.registry[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
max := lc.maxStaleness()
|
||||
lc.registryMu.Unlock()
|
||||
lc.rotationInterval.Store(int64(max * 2))
|
||||
}
|
||||
|
||||
// maxStaleness returns the maximum staleness across all registered consumers.
|
||||
// Must be called with registryMu held.
|
||||
func (lc *LabelsCompressor) maxStaleness() time.Duration {
|
||||
maxStaleness := time.Duration(0)
|
||||
for _, d := range lc.registry {
|
||||
if d > maxStaleness {
|
||||
maxStaleness = d
|
||||
}
|
||||
}
|
||||
return max(maxStaleness, minRotationInterval)
|
||||
}
|
||||
|
||||
// Compress appends the generation byte followed by the compressed labels
|
||||
// to dst and returns the result.
|
||||
//
|
||||
// It is safe calling Compress from concurrent goroutines.
|
||||
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
|
||||
s := lc.getState()
|
||||
dst = append(dst, s.gen)
|
||||
return s.current.compress(dst, labels)
|
||||
}
|
||||
|
||||
// Decompress reads the generation byte from key and decompresses the
|
||||
// remaining bytes using the corresponding labelsCompressor instance.
|
||||
func (lc *LabelsCompressor) Decompress(dst []prompb.Label, key []byte) []prompb.Label {
|
||||
if len(key) == 0 {
|
||||
logger.Panicf("BUG: unexpected empty key in Decompress")
|
||||
}
|
||||
gen := key[0]
|
||||
s := lc.getState()
|
||||
var c *labelsCompressor
|
||||
if s.gen == gen {
|
||||
c = s.current
|
||||
} else if s.previous != nil {
|
||||
c = s.previous
|
||||
} else {
|
||||
logger.Panicf("BUG: compressor for generation %d is not available; current generation is %d", gen, s.gen)
|
||||
}
|
||||
return c.decompress(dst, key[1:])
|
||||
}
|
||||
|
||||
// SizeBytes returns the total memory used by the active compressor instances
|
||||
func (lc *LabelsCompressor) SizeBytes() uint64 {
|
||||
s := lc.getState()
|
||||
n := s.current.sizeBytes()
|
||||
if s.previous != nil {
|
||||
n += s.previous.sizeBytes()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// ItemsCount returns the total number of label entries stored across the active
|
||||
func (lc *LabelsCompressor) ItemsCount() uint64 {
|
||||
s := lc.getState()
|
||||
n := s.current.itemsCount()
|
||||
if s.previous != nil {
|
||||
n += s.previous.itemsCount()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
@@ -45,6 +45,7 @@ type Deduplicator struct {
|
||||
//
|
||||
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
|
||||
func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Duration, dropLabels []string, alias string) *Deduplicator {
|
||||
lc.Register(2 * interval)
|
||||
d := &Deduplicator{
|
||||
da: newDedupAggr(),
|
||||
dropLabels: dropLabels,
|
||||
@@ -92,6 +93,8 @@ func (d *Deduplicator) MustStop() {
|
||||
metrics.UnregisterSet(d.ms, true)
|
||||
d.ms = nil
|
||||
|
||||
lc.Unregister(2 * d.interval)
|
||||
|
||||
close(d.stopCh)
|
||||
d.wg.Wait()
|
||||
}
|
||||
|
||||
@@ -53,10 +53,10 @@ var supportedOutputs = []string{
|
||||
"unique_samples",
|
||||
}
|
||||
|
||||
var (
|
||||
// lc contains information about all compressed labels for streaming aggregation
|
||||
lc promutil.LabelsCompressor
|
||||
// lc is the global rotating labels compressor shared across all aggregators.
|
||||
var lc promutil.LabelsCompressor
|
||||
|
||||
var (
|
||||
_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
|
||||
return float64(lc.SizeBytes())
|
||||
})
|
||||
@@ -310,12 +310,22 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
|
||||
}
|
||||
|
||||
metrics.RegisterSet(ms)
|
||||
return &Aggregators{
|
||||
a := &Aggregators{
|
||||
as: as,
|
||||
configData: configData,
|
||||
filePath: filePath,
|
||||
ms: ms,
|
||||
}, nil
|
||||
}
|
||||
lc.Register(a.maxStaleness())
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (a *Aggregators) maxStaleness() time.Duration {
|
||||
maxStaleness := time.Duration(0)
|
||||
for _, aggr := range a.as {
|
||||
maxStaleness = max(aggr.stalenessInterval, maxStaleness)
|
||||
}
|
||||
return maxStaleness
|
||||
}
|
||||
|
||||
// IsEnabled returns true if Aggregators has at least one configured aggregator
|
||||
@@ -335,6 +345,8 @@ func (a *Aggregators) MustStop() {
|
||||
return
|
||||
}
|
||||
|
||||
lc.Unregister(a.maxStaleness())
|
||||
|
||||
metrics.UnregisterSet(a.ms, true)
|
||||
a.ms = nil
|
||||
|
||||
@@ -1078,6 +1090,9 @@ func compressLabels(dst []byte, inputLabels, outputLabels []prompb.Label) []byte
|
||||
}
|
||||
|
||||
func decompressLabels(dst []prompb.Label, key string) []prompb.Label {
|
||||
if len(key) == 0 {
|
||||
logger.Panicf("BUG: unexpected empty key in decompressLabels")
|
||||
}
|
||||
return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user