Compare commits

..

3 Commits

Author SHA1 Message Date
Andrii Chubatiuk
c59c07cac7 lib/promutil/labelscompressor: add rotation of labelscompressor 2026-04-17 19:09:48 +03:00
Artem Fetishev
8a20ccf21d apptest: sync code between branches and fix backup/restore range queries (#10799)
Fix app tests:

1. Sync code between vmsingle and vmcluster: it must be the same because
apptest does not differentiate between branches, it just runs pre-built
binaries
2. Simplify range queries in backup/restore test so that it does not
depend on the interval between samples to work correctly.

---------

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-04-14 07:18:09 +02:00
Max Kotliar
1a01dbbec7 docs/changelog: fix unwanted release tag change
The tag v1.138.0 was unintentinally changed to v1.139.0 due to bug in
release script.

Reverting the change. The bug will be addressed separate.
2026-04-13 14:52:21 +03:00
8 changed files with 218 additions and 44 deletions

View File

@@ -88,6 +88,7 @@ type QueryOpts struct {
MaxLookback string
LatencyOffset string
Format string
NoCache string
}
func (qos *QueryOpts) asURLValues() url.Values {
@@ -112,6 +113,7 @@ func (qos *QueryOpts) asURLValues() url.Values {
addNonEmpty("max_lookback", qos.MaxLookback)
addNonEmpty("latency_offset", qos.LatencyOffset)
addNonEmpty("format", qos.Format)
addNonEmpty("nocache", qos.NoCache)
return uv
}

View File

@@ -28,7 +28,6 @@ func TestSingleBackupRestore(t *testing.T) {
return tc.MustStartVmsingle("vmsingle", []string{
"-storageDataPath=" + storageDataPath,
"-retentionPeriod=100y",
"-search.maxStalenessInterval=1m",
})
},
stopSUT: func() {
@@ -70,9 +69,7 @@ func TestClusterBackupRestore(t *testing.T) {
VminsertInstance: "vminsert",
VminsertFlags: []string{},
VmselectInstance: "vmselect",
VmselectFlags: []string{
"-search.maxStalenessInterval=1m",
},
VmselectFlags: []string{},
})
},
stopSUT: func() {
@@ -100,15 +97,14 @@ func TestClusterBackupRestore(t *testing.T) {
func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
t := tc.T()
const msecPerMinute = 60 * 1000
genData := func(count int, prefix string, start int64) (recs []string, wantSeries []map[string]string, wantQueryResults []*apptest.QueryResult) {
genData := func(count int, prefix string, start, step int64) (recs []string, wantSeries []map[string]string, wantQueryResults []*apptest.QueryResult) {
recs = make([]string, count)
wantSeries = make([]map[string]string, count)
wantQueryResults = make([]*apptest.QueryResult, count)
for i := range count {
name := fmt.Sprintf("%s_%03d", prefix, i)
value := float64(i)
timestamp := start + int64(i)*msecPerMinute
timestamp := start + int64(i)*step
recs[i] = fmt.Sprintf("%s %f %d", name, value, timestamp)
wantSeries[i] = map[string]string{"__name__": name}
@@ -148,15 +144,17 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
// assertSeries retrieves all data from the storage and compares it with the
// expected result.
assertQueryResults := func(app apptest.PrometheusQuerier, query string, start, end int64, want []*apptest.QueryResult) {
assertQueryResults := func(app apptest.PrometheusQuerier, query string, start, end, step int64, want []*apptest.QueryResult) {
t.Helper()
tc.Assert(&apptest.AssertOptions{
Msg: "unexpected /api/v1/query_range response",
Got: func() any {
return app.PrometheusAPIV1QueryRange(t, query, apptest.QueryOpts{
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
Step: "60s",
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
Step: fmt.Sprintf("%dms", step),
MaxLookback: fmt.Sprintf("%dms", step-1),
NoCache: "1",
})
},
Want: &apptest.PrometheusAPIV1QueryResponse{
@@ -167,7 +165,6 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
},
},
FailNow: true,
Retries: 300,
})
}
@@ -194,8 +191,9 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
// below.
const numMetrics = 1000
// With 1000 metrics (one per minute), the time range spans 2 months.
end := time.Date(2025, 3, 1, 10, 0, 0, 0, time.UTC).UnixMilli()
start := end - numMetrics*msecPerMinute
start := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
end := time.Date(2025, 3, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
step := (end - start) / numMetrics
// Verify backup/restore:
//
@@ -209,8 +207,8 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
// - Start vmsingle
// - Ensure that the queries return batch1 data only.
batch1Data, wantBatch1Series, wantBatch1QueryResults := genData(numMetrics, "batch1", start)
batch2Data, wantBatch2Series, wantBatch2QueryResults := genData(numMetrics, "batch2", start)
batch1Data, wantBatch1Series, wantBatch1QueryResults := genData(numMetrics, "batch1", start, step)
batch2Data, wantBatch2Series, wantBatch2QueryResults := genData(numMetrics, "batch2", start, step)
wantBatch12Series := slices.Concat(wantBatch1Series, wantBatch2Series)
wantBatch12QueryResults := slices.Concat(wantBatch1QueryResults, wantBatch2QueryResults)
@@ -219,13 +217,14 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
sut.PrometheusAPIV1ImportPrometheus(t, batch1Data, apptest.QueryOpts{})
sut.ForceFlush(t)
assertSeries(sut, `{__name__=~"batch1.*"}`, start, end, wantBatch1Series)
assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, wantBatch1QueryResults)
assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, step, wantBatch1QueryResults)
createBackup(sut, "batch1")
sut.PrometheusAPIV1ImportPrometheus(t, batch2Data, apptest.QueryOpts{})
sut.ForceFlush(t)
assertSeries(sut, `{__name__=~"batch(1|2).*"}`, start, end, wantBatch12Series)
assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, start, end, wantBatch12QueryResults)
assertQueryResults(sut, `{__name__=~"batch(1|2).*"}`, start, end, step, wantBatch12QueryResults)
createBackup(sut, "batch12")
opts.stopSUT()
@@ -235,5 +234,5 @@ func testBackupRestore(tc *apptest.TestCase, opts testBackupRestoreOpts) {
sut = opts.startSUT()
assertSeries(sut, `{__name__=~"batch1.*"}`, start, end, wantBatch1Series)
assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, wantBatch1QueryResults)
assertQueryResults(sut, `{__name__=~"batch1.*"}`, start, end, step, wantBatch1QueryResults)
}

View File

@@ -57,7 +57,7 @@ func StartVminsert(instance string, flags []string, cli *Client, output io.Write
extractREs = append(extractREs, regexp.MustCompile(logRecord))
}
app, stderrExtracts, err := startApp(instance, "../../bin/vminsert", flags, &appOptions{
app, stderrExtracts, err := startApp(instance, "../../bin/vminsert-race", flags, &appOptions{
defaultFlags: map[string]string{
"-httpListenAddr": "127.0.0.1:0",
"-clusternativeListenAddr": "127.0.0.1:0",
@@ -237,8 +237,22 @@ func (app *Vminsert) PrometheusAPIV1ImportPrometheus(t *testing.T, records []str
data := []byte(strings.Join(records, "\n"))
var recordsCount int
var metadataRecords int
uniqueMetadataMetricNames := make(map[string]struct{})
for _, record := range records {
if strings.HasPrefix(record, "#") {
// metric metadata has the following format:
//# HELP importprometheus_series
//# TYPE importprometheus_series
// it results into single metadata record
if strings.HasPrefix(record, "# ") {
metadataItems := strings.Split(record, " ")
if len(metadataItems) < 2 {
t.Fatalf("BUG: unexpected metadata format=%q", record)
}
metricName := metadataItems[2]
if _, ok := uniqueMetadataMetricNames[metricName]; ok {
continue
}
uniqueMetadataMetricNames[metricName] = struct{}{}
metadataRecords++
continue
}

View File

@@ -25,7 +25,7 @@ type Vmselect struct {
// sets the default flags and populates the app instance state with runtime
// values extracted from the application log (such as httpListenAddr)
func StartVmselect(instance string, flags []string, cli *Client, output io.Writer) (*Vmselect, error) {
app, stderrExtracts, err := startApp(instance, "../../bin/vmselect", flags, &appOptions{
app, stderrExtracts, err := startApp(instance, "../../bin/vmselect-race", flags, &appOptions{
defaultFlags: map[string]string{
"-httpListenAddr": "127.0.0.1:0",
"-clusternativeListenAddr": "127.0.0.1:0",

View File

@@ -80,7 +80,7 @@ Released at 2026-03-27
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): stop logging `error`-level messages when scraping targets that expose OpenMetrics `info`, `gaugehistogram`, `stateset`, or `unknown` metric types. These are valid [OpenMetrics](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md) types and should be parsed without error. See [#10685](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10685). Thanks to @tsarna for the contribution.
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): prevent panic during directory deletion on `NFS`-based mounts. The bug was introduced in [83da33d8](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/83da33d8cfe8352fd0022d05a8b6346ebb48420d) and included in [v1.123.0](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/changelog/CHANGELOG_2025.md#v11230). See [#9842](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9842).
## [v1.139.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.139.0)
## [v1.138.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.138.0)
Released at 2026-03-13

View File

@@ -3,6 +3,7 @@ package promutil
import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -11,8 +12,10 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
// LabelsCompressor compresses []prompb.Label into short binary strings
type LabelsCompressor struct {
const minRotationInterval = time.Hour
// labelsCompressor compresses []prompb.Label into short binary strings.
type labelsCompressor struct {
labelToIdx sync.Map
idxToLabel labelsMap
@@ -21,20 +24,18 @@ type LabelsCompressor struct {
totalSizeBytes atomic.Uint64
}
// SizeBytes returns the size of lc data in bytes
func (lc *LabelsCompressor) SizeBytes() uint64 {
func (lc *labelsCompressor) sizeBytes() uint64 {
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
}
// ItemsCount returns the number of items in lc
func (lc *LabelsCompressor) ItemsCount() uint64 {
func (lc *labelsCompressor) itemsCount() uint64 {
return lc.nextIdx.Load()
}
// Compress compresses labels, appends the compressed labels to dst and returns the result.
// compress compresses labels, appends the compressed labels to dst and returns the result.
//
// It is safe calling Compress from concurrent goroutines.
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
// It is safe calling compress from concurrent goroutines.
func (lc *labelsCompressor) compress(dst []byte, labels []prompb.Label) []byte {
if len(labels) == 0 {
// Fast path
return append(dst, 0)
@@ -42,13 +43,13 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
a := encoding.GetUint64s(len(labels) + 1)
a.A[0] = uint64(len(labels))
lc.compress(a.A[1:], labels)
lc.compressInto(a.A[1:], labels)
dst = encoding.MarshalVarUint64s(dst, a.A)
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
func (lc *labelsCompressor) compressInto(dst []uint64, labels []prompb.Label) {
if len(labels) == 0 {
return
}
@@ -98,10 +99,10 @@ func cloneLabel(label prompb.Label) prompb.Label {
}
}
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
// decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
//
// It is safe calling Decompress from concurrent goroutines.
func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.Label {
// It is safe calling decompress from concurrent goroutines.
func (lc *labelsCompressor) decompress(dst []prompb.Label, src []byte) []prompb.Label {
labelsLen, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal labels length from uvarint")
@@ -124,12 +125,12 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
if len(tail) > 0 {
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
}
dst = lc.decompress(dst, a.A)
dst = lc.decompressInternal(dst, a.A)
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
func (lc *labelsCompressor) decompressInternal(dst []prompb.Label, src []uint64) []prompb.Label {
for _, idx := range src {
label, ok := lc.idxToLabel.Load(idx)
if !ok {
@@ -232,3 +233,143 @@ func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
clear(lm.mutable)
lm.readOnly.Store(&labels)
}
// labelsCompressorState holds the current and previous labelsCompressor instances and generation byte that changes between rotations
// and is used to pick a right compressor during decompression
type labelsCompressorState struct {
gen byte
current *labelsCompressor
previous *labelsCompressor
}
// LabelsCompressor is a rotating compressor that maintains two labelsCompressor
// instances to bound memory growth from stale label sets.
//
// Consumers must call Register on creation and Unregister on shutdown for a proper rotation period calculation.
type LabelsCompressor struct {
state atomic.Pointer[labelsCompressorState]
rotationInterval atomic.Int64
startOnce sync.Once
registryMu sync.Mutex
registry []time.Duration
}
// getState returns current labelsCompressorState, which is initialized if needed.
func (lc *LabelsCompressor) getState() *labelsCompressorState {
if s := lc.state.Load(); s != nil {
return s
}
s := &labelsCompressorState{gen: 0, current: &labelsCompressor{}}
// use CompareAndSwap to avoid overwriting pointer which could be stored by another thread
lc.state.CompareAndSwap(nil, s)
return lc.state.Load()
}
// rotate resets current compressor and moves its state to previous.
func (lc *LabelsCompressor) rotate() {
old := lc.getState()
lc.state.Store(&labelsCompressorState{
gen: old.gen ^ 1,
current: &labelsCompressor{},
previous: old.current,
})
}
// Register records maxStaleness for a new consumer, recomputes the rotation
// interval, starts the background rotation goroutine on the first call, and
// returns an id that must be passed to Unregister when the consumer stops.
func (lc *LabelsCompressor) Register(maxStaleness time.Duration) {
lc.registryMu.Lock()
lc.registry = append(lc.registry, maxStaleness)
max := lc.maxStaleness()
lc.registryMu.Unlock()
lc.rotationInterval.Store(int64(max * 2))
lc.startOnce.Do(func() {
lc.getState()
go func() {
for {
time.Sleep(time.Duration(lc.rotationInterval.Load()))
lc.rotate()
}
}()
})
}
// Unregister removes the given consumer ID from the registry and recomputes
// the rotation interval from the remaining registered consumers.
func (lc *LabelsCompressor) Unregister(maxStaleness time.Duration) {
lc.registryMu.Lock()
for i, s := range lc.registry {
if s == maxStaleness {
lc.registry = append(lc.registry[:i], lc.registry[i+1:]...)
break
}
}
max := lc.maxStaleness()
lc.registryMu.Unlock()
lc.rotationInterval.Store(int64(max * 2))
}
// maxStaleness returns the maximum staleness across all registered consumers.
// Must be called with registryMu held.
func (lc *LabelsCompressor) maxStaleness() time.Duration {
maxStaleness := time.Duration(0)
for _, d := range lc.registry {
if d > maxStaleness {
maxStaleness = d
}
}
return max(maxStaleness, minRotationInterval)
}
// Compress appends the generation byte followed by the compressed labels
// to dst and returns the result.
//
// It is safe calling Compress from concurrent goroutines.
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
s := lc.getState()
dst = append(dst, s.gen)
return s.current.compress(dst, labels)
}
// Decompress reads the generation byte from key and decompresses the
// remaining bytes using the corresponding labelsCompressor instance.
func (lc *LabelsCompressor) Decompress(dst []prompb.Label, key []byte) []prompb.Label {
if len(key) == 0 {
logger.Panicf("BUG: unexpected empty key in Decompress")
}
gen := key[0]
s := lc.getState()
var c *labelsCompressor
if s.gen == gen {
c = s.current
} else if s.previous != nil {
c = s.previous
} else {
logger.Panicf("BUG: compressor for generation %d is not available; current generation is %d", gen, s.gen)
}
return c.decompress(dst, key[1:])
}
// SizeBytes returns the total memory used by the active compressor instances
func (lc *LabelsCompressor) SizeBytes() uint64 {
s := lc.getState()
n := s.current.sizeBytes()
if s.previous != nil {
n += s.previous.sizeBytes()
}
return n
}
// ItemsCount returns the total number of label entries stored across the active
func (lc *LabelsCompressor) ItemsCount() uint64 {
s := lc.getState()
n := s.current.itemsCount()
if s.previous != nil {
n += s.previous.itemsCount()
}
return n
}

View File

@@ -45,6 +45,7 @@ type Deduplicator struct {
//
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Duration, dropLabels []string, alias string) *Deduplicator {
lc.Register(2 * interval)
d := &Deduplicator{
da: newDedupAggr(),
dropLabels: dropLabels,
@@ -92,6 +93,8 @@ func (d *Deduplicator) MustStop() {
metrics.UnregisterSet(d.ms, true)
d.ms = nil
lc.Unregister(2 * d.interval)
close(d.stopCh)
d.wg.Wait()
}

View File

@@ -53,10 +53,10 @@ var supportedOutputs = []string{
"unique_samples",
}
var (
// lc contains information about all compressed labels for streaming aggregation
lc promutil.LabelsCompressor
// lc is the global rotating labels compressor shared across all aggregators.
var lc promutil.LabelsCompressor
var (
_ = metrics.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
return float64(lc.SizeBytes())
})
@@ -310,12 +310,22 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
}
metrics.RegisterSet(ms)
return &Aggregators{
a := &Aggregators{
as: as,
configData: configData,
filePath: filePath,
ms: ms,
}, nil
}
lc.Register(a.maxStaleness())
return a, nil
}
func (a *Aggregators) maxStaleness() time.Duration {
maxStaleness := time.Duration(0)
for _, aggr := range a.as {
maxStaleness = max(aggr.stalenessInterval, maxStaleness)
}
return maxStaleness
}
// IsEnabled returns true if Aggregators has at least one configured aggregator
@@ -335,6 +345,8 @@ func (a *Aggregators) MustStop() {
return
}
lc.Unregister(a.maxStaleness())
metrics.UnregisterSet(a.ms, true)
a.ms = nil
@@ -1078,6 +1090,9 @@ func compressLabels(dst []byte, inputLabels, outputLabels []prompb.Label) []byte
}
func decompressLabels(dst []prompb.Label, key string) []prompb.Label {
if len(key) == 0 {
logger.Panicf("BUG: unexpected empty key in decompressLabels")
}
return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
}