Compare commits

..

7 Commits

Author SHA1 Message Date
Max Kotliar
f2bf5d82ce docs/changelog: add links to the issues/PRs 2026-05-18 19:31:59 +03:00
Max Kotliar
bd98a1d2fa docs/changelog: fix link to the issue
follow-up on
e613c3fd6b
2026-05-18 19:22:53 +03:00
Max Kotliar
4a1ceccee4 app/vmagent: fix sharding correctness when disableOnDiskQueue is set (#10947)
When `-remoteWrite.shardByURL` is enabled and one of the remote write
targets that has `-remoteWrite.disableOnDiskQueue` set becomes blocked, samples
could be rerouted to other shards (see the `getEligibleRemoteWriteCtxs` implementation), breaking the sharding guarantee. Fix this by always using `rwctxsGlobal` in sharding mode.

Add a startup check that requires `-remoteWrite.disableOnDiskQueue` to be
configured uniformly across all targets when `-remoteWrite.shardByURL` is enabled.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507
PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10947
2026-05-18 14:46:24 +03:00
Max Kotliar
48a3eb0215 dashboards: zoom in the "CPU spent on GC" panel. (#10955)
The "CPU spent on GC" panel often shows sub-1% GC CPU usage, which is
barely visible with the current fixed Y-axis settings. Removing the
fixed scale allows Grafana to auto-adjust the Y axis and makes small
fluctuations easier to observe.

PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10955

Before:
<img width="1499" height="477" alt="Screenshot 2026-05-14 at 18 35 14"
src="https://github.com/user-attachments/assets/8a78bea4-3dcd-4f26-a40e-304acbf68eb1"
/>

After:
<img width="1508" height="501" alt="Screenshot 2026-05-14 at 18 34 51"
src="https://github.com/user-attachments/assets/aefd2fbc-6b1c-42a9-b45d-e44bcd30be48"
/>
2026-05-18 14:39:13 +03:00
Immanuel Tikhonov
200c03416f chore: use %s instead of %w for error formatting in logger calls
Previously, errors in app/vmalert-tool and lib packages used the %w verb
in logger.Errorf calls, which is intended for wrapping errors via fmt.Errorf.
Using %w with the logger package does not wrap the error — instead, it prints
a malformed %!w(...) placeholder rather than the actual error message.

This commit replaces all affected occurrences of %w with %s to correctly
format and display errors.

Related PR: https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10962
2026-05-18 13:13:51 +02:00
Phuong Le
33d8e02ea8 vmagent/remotewrite: hide -remoteWrite.headers as a secret flag
This registers `remoteWrite.headers` in `InitSecretFlags()` so it is
masked by the existing secret-flag handling.

Without this, values passed via `-remoteWrite.headers` are exposed in
startup flag logs, /metrics, and /flags, because these paths only redact
flags recognized by `flagutil.IsSecretFlag()`.

The change keeps the existing `-remoteWrite.showURL` behavior for
`remoteWrite.url`, while always treating `-remoteWrite.headers` as
secret.
2026-05-18 12:39:52 +02:00
f41gh7
e613c3fd6b app/vmstorage: properly init vminsert server mtls connection
Release v1.130 introduced a regression in the enterprise version of vmstorage.
The server configuration for the vminsert listener was initialized without
the mtls configuration args, which made it impossible to establish an mtls
connection from vminsert to vmstorage.

 This commit fixes the regression and adds integration tests to verify
 it.

Related to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10958
2026-05-18 12:17:25 +02:00
25 changed files with 293 additions and 696 deletions

View File

@@ -9,6 +9,7 @@ import (
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
)
func TestParseRetryAfterHeader(t *testing.T) {
@@ -36,6 +37,34 @@ func TestParseRetryAfterHeader(t *testing.T) {
f(time.Now().Add(10*time.Second).Format("Mon, 02 Jan 2006 15:04:05 FAKETZ"), 0)
}
// TestInitSecretFlags checks which remote write flags InitSecretFlags registers
// as secret for both values of showRemoteWriteURL.
//
// NOTE(review): the lookups below use lowercase names ("remotewrite.url"), while
// InitSecretFlags registers camelCase names ("remoteWrite.url") - presumably
// flagutil matches flag names case-insensitively; confirm.
func TestInitSecretFlags(t *testing.T) {
// Remember the current showRemoteWriteURL value, so the test can restore it on exit.
showRemoteWriteURLOrig := *showRemoteWriteURL
defer func() {
// Restore the flag value and drop the secret flags registered by this test.
*showRemoteWriteURL = showRemoteWriteURLOrig
flagutil.UnregisterAllSecretFlags()
}()
// Case 1: showRemoteWriteURL is false - both remoteWrite.url and remoteWrite.headers must be secret.
// The registry is cleared first, so only the InitSecretFlags call below populates it.
flagutil.UnregisterAllSecretFlags()
*showRemoteWriteURL = false
InitSecretFlags()
if !flagutil.IsSecretFlag("remotewrite.url") {
t.Fatalf("expecting remoteWrite.url to be secret")
}
if !flagutil.IsSecretFlag("remotewrite.headers") {
t.Fatalf("expecting remoteWrite.headers to be secret")
}
// Case 2: showRemoteWriteURL is true - remoteWrite.url must not be secret,
// while remoteWrite.headers must still be secret.
flagutil.UnregisterAllSecretFlags()
*showRemoteWriteURL = true
InitSecretFlags()
if flagutil.IsSecretFlag("remotewrite.url") {
t.Fatalf("remoteWrite.url must remain visible when -remoteWrite.showURL is set")
}
if !flagutil.IsSecretFlag("remotewrite.headers") {
t.Fatalf("expecting remoteWrite.headers to remain secret")
}
}
func TestRepackBlockFromZstdToSnappy(t *testing.T) {
expectedPlainBlock := []byte(`foobar`)

View File

@@ -151,6 +151,8 @@ func InitSecretFlags() {
// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
flagutil.RegisterSecretFlag("remoteWrite.url")
}
// remoteWrite.headers can contain auth headers such as Authorization and API keys.
flagutil.RegisterSecretFlag("remoteWrite.headers")
}
var (
@@ -167,6 +169,18 @@ func Init() {
if len(*remoteWriteURLs) == 0 {
logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set")
}
if *shardByURL && len(*disableOnDiskQueue) > 1 {
disableOnDiskQueues := *disableOnDiskQueue
firstValue := disableOnDiskQueues[0]
for _, v := range disableOnDiskQueues[1:] {
if firstValue != v {
logger.Fatalf("all -remoteWrite.url targets must have the same -remoteWrite.disableOnDiskQueue setting when -remoteWrite.shardByURL is enabled; " +
"either enable or disable -remoteWrite.disableOnDiskQueue for all targets")
}
}
}
if limit := getMaxHourlySeries(); limit > 0 {
hourlySeriesLimiter = bloomfilter.NewLimiter(limit, time.Hour)
_ = metrics.NewGauge(`vmagent_hourly_series_limit_max_series`, func() float64 {
@@ -499,7 +513,9 @@ func tryPush(at *auth.Token, wr *prompb.WriteRequest, forceDropSamplesOnFailure
//
// calculateHealthyRwctxIdx will rely on the order of rwctx to be in ascending order.
func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) ([]*remoteWriteCtx, bool) {
if !disableOnDiskQueueAny {
// When -remoteWrite.shardByURL=true always use all configured remote writes to preserve stable metrics distribution across shards.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507
if !disableOnDiskQueueAny || *shardByURL {
return rwctxsGlobal, true
}
@@ -514,12 +530,6 @@ func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailu
return nil, false
}
rowsCount := getRowsCount(tss)
if *shardByURL {
// Todo: When shardByURL is enabled, the following metrics won't be 100% accurate. Because vmagent don't know
// which rwctx should data be pushed to yet. Let's consider the hashing algorithm fair and will distribute
// data to all rwctxs evenly.
rowsCount = rowsCount / len(rwctxsGlobal)
}
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
}
}

View File

@@ -61,7 +61,7 @@ func UnitTest(files []string, disableGroupLabel bool, externalLabels []string, e
}
eu, err := url.Parse(externalURL)
if err != nil {
logger.Fatalf("failed to parse external URL: %w", err)
logger.Fatalf("failed to parse external URL: %s", err)
}
if err := templates.Load([]string{}, *eu); err != nil {
logger.Fatalf("failed to load template: %v", err)

View File

@@ -105,7 +105,7 @@ func (cw *configWatcher) add(typeK TargetType, interval time.Duration, targetsFn
}
targetMetadata, errors := getTargetMetadata(targetsFn, cw.cfg)
for _, err := range errors {
logger.Errorf("failed to init notifier for %q: %w", typeK, err)
logger.Errorf("failed to init notifier for %q: %s", typeK, err)
}
cw.updateTargets(typeK, targetMetadata, cw.cfg, cw.genFn)
}
@@ -274,7 +274,7 @@ func (cw *configWatcher) updateTargets(key TargetType, targetMts map[string]targ
for addr, metadata := range targetMts {
am, err := NewAlertManager(addr, genFn, cfg.HTTPClientConfig, metadata.alertRelabelConfigs, cfg.Timeout.Duration())
if err != nil {
logger.Errorf("failed to init %s notifier with addr %q: %w", key, addr, err)
logger.Errorf("failed to init %s notifier with addr %q: %s", key, addr, err)
continue
}
updatedTargets = append(updatedTargets, Target{

View File

@@ -3109,7 +3109,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -3406,7 +3406,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -3110,7 +3110,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -3407,7 +3407,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2946,7 +2946,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2324,7 +2324,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2945,7 +2945,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2323,7 +2323,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -31,10 +31,14 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-opentelemetry.labelNameUnderscoreSanitization` command-line flag to control whether to enable prepending of `key` to labels starting with `_` when `-opentelemetry.usePrometheusNaming` is enabled. See [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) docs and [#9663](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9663). Thanks to @andriibeee for the contribution.
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): improve the [Top Queries](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#top-queries) table UI. Duration columns now display human-readable values (e.g. `1.23s`) instead of raw seconds, memory column shows human-readable sizes (e.g. `1.23 MB`), instant queries are labeled as `instant` instead of empty string, and column headers now show tooltips with descriptions. See [#10790](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10790).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. Thanks to @alexei38 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. See [#10918](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918). Thanks to @alexei38 for the contribution.
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): extend delay on aggregation windows flush by the biggest lag among pushed samples. Before, the delay was calculated as 95th percentile across samples, which could underrepresent outliers and reject them from aggregation as "too old". See [#10402](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10402).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix a bug in [cardinality limiters](https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter) where series with different labels, like `{a="bc"}` and `{ab="c"}`, could be incorrectly treated as identical and dropped. See [#10937](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10937).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): hide values passed to `-remoteWrite.headers` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive HTTP headers such as `Authorization` and API keys. See [#10803](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10803).
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly establish [mtls](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#mtls-protection) connection between vmstorage and vminsert. Regression was introduced in v1.130.0 release for the enterprise version of vmstorage. See [#10972](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10972).
* BUGFIX: [vmrestore](https://docs.victoriametrics.com/victoriametrics/vmrestore/): fix a bug where specifying `-storageDataPath` with a trailing slash could cause `vmrestore` to panic. See [#10823](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10823). Thanks to @utafrali for the contribution.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): prevent unintentional rerouting of samples to other sharding targets when one of the `-remoteWrite.url` targets with `-remoteWrite.disableOnDiskQueue` becomes blocked. Previously this could break the sharding guarantee by sending samples to wrong targets instead of dropping or retrying them. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): return error on startup if `-remoteWrite.disableOnDiskQueue` is not configured uniformly across all `-remoteWrite.url` targets when `-remoteWrite.shardByURL` is enabled. Either all targets must have it enabled or all must have it disabled. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
## [v1.143.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.143.0)

View File

@@ -1538,11 +1538,11 @@ func (tb *Table) MustCreateSnapshotAt(dstDir string) {
srcDir := tb.path
srcDir, err = filepath.Abs(srcDir)
if err != nil {
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", srcDir, err)
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", srcDir, err)
}
dstDir, err = filepath.Abs(dstDir)
if err != nil {
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", dstDir, err)
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", dstDir, err)
}
prefix := srcDir + string(filepath.Separator)
if strings.HasPrefix(dstDir, prefix) {

View File

@@ -3,173 +3,32 @@ package promutil
import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
const lcShardCount = 128
// lcShard is one shard of the label-to-index map.
// Padded to one cache line to prevent false sharing between adjacent shards.
type lcShard struct {
mu sync.RWMutex
m map[string]uint32
_ [32]byte
}
// LabelsCompressor compresses []prompb.Label into short binary strings.
// Zero value is ready to use. Each Register call must be paired with Unregister.
// LabelsCompressor compresses []prompb.Label into short binary strings
type LabelsCompressor struct {
// labelToIdxShards stores the label-to-index mapping across lcShardCount shards
// to reduce RWMutex contention in the concurrent compressFast hot path.
labelToIdxShards [lcShardCount]lcShard
labelToIdx sync.Map
idxToLabel labelsMap
// generation is incremented after each rotate() call that removes labels.
// Callers can use it to detect when cached compressed keys become stale.
generation atomic.Uint64
nextIdx atomic.Uint64
idxToLabel atomic.Pointer[[]prompb.Label]
// usedBitset tracks which indices were used in the current rotation period.
// Bits are set atomically from compressFast without mu; grown under mu.
usedBitset atomic.Pointer[[]uint64]
totalSizeBytes uint64
mu sync.Mutex
// freeIdxs holds index slots available for reuse.
freeIdxs []uint32
// pendingIdxs holds indices evicted from labelToIdx last rotate, not yet zeroed in idxToLabel.
pendingIdxs map[uint32]struct{}
// prevBitset is the usedBitset snapshot from the previous non-zero rotation.
// Requires absence from both prevBitset and usedBitset to evict a label,
// guarding against partial snapshots when rotate fires mid-compress loop.
// Only accessed in rotate, which runs in a single goroutine.
prevBitset []uint64
// registry holds staleness intervals of active callers; rotation period = max(registry).
registry []time.Duration
// tickerCh signals runRotate to re-evaluate the rotation period.
tickerCh chan struct{}
}
// idleRotationPeriod is the rotation period used when no callers are registered.
// It must be long enough that a sleeping goroutine does not consume measurable resources.
const idleRotationPeriod = time.Hour
// Register adds stalenessInterval to the registry and starts the rotation goroutine on first call.
// Rotation period equals max(registry). Must be paired with Unregister.
func (lc *LabelsCompressor) Register(stalenessInterval time.Duration) {
lc.mu.Lock()
if lc.tickerCh == nil {
lc.tickerCh = make(chan struct{}, 1)
go lc.runRotate()
}
lc.registry = append(lc.registry, stalenessInterval)
tickerCh := lc.tickerCh
lc.mu.Unlock()
select {
case tickerCh <- struct{}{}:
default:
}
}
// Unregister removes stalenessInterval from the registry.
// The rotation goroutine keeps running at idleRotationPeriod when the registry is empty.
func (lc *LabelsCompressor) Unregister(stalenessInterval time.Duration) {
lc.mu.Lock()
for i, d := range lc.registry {
if d == stalenessInterval {
lc.registry = append(lc.registry[:i], lc.registry[i+1:]...)
break
}
}
tickerCh := lc.tickerCh
lc.mu.Unlock()
if tickerCh != nil {
select {
case tickerCh <- struct{}{}:
default:
}
}
}
func (lc *LabelsCompressor) maxRegisteredStaleness() time.Duration {
// must be called with lc.mu held
var max time.Duration
for _, d := range lc.registry {
if d > max {
max = d
}
}
if max == 0 {
return idleRotationPeriod
}
return max
}
func (lc *LabelsCompressor) runRotate() {
lc.mu.Lock()
period := lc.maxRegisteredStaleness()
lc.mu.Unlock()
timer := time.NewTimer(period)
defer timer.Stop()
for {
select {
case <-timer.C:
lc.rotate()
lc.mu.Lock()
period = lc.maxRegisteredStaleness()
lc.mu.Unlock()
timer.Reset(period)
case <-lc.tickerCh:
lc.mu.Lock()
newPeriod := lc.maxRegisteredStaleness()
lc.mu.Unlock()
if newPeriod != period {
period = newPeriod
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(period)
}
}
}
totalSizeBytes atomic.Uint64
}
// SizeBytes returns the size of lc data in bytes
func (lc *LabelsCompressor) SizeBytes() uint64 {
lc.mu.Lock()
n := uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes
lc.mu.Unlock()
return n
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
}
// ItemsCount returns the number of items in lc
func (lc *LabelsCompressor) ItemsCount() uint64 {
lc.mu.Lock()
p := lc.idxToLabel.Load()
var n uint64
if p != nil && len(*p) > len(lc.freeIdxs)+len(lc.pendingIdxs) {
n = uint64(len(*p) - len(lc.freeIdxs) - len(lc.pendingIdxs))
}
lc.mu.Unlock()
return n
return lc.nextIdx.Load()
}
// Compress compresses labels, appends the compressed labels to dst and returns the result.
@@ -183,139 +42,46 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
a := encoding.GetUint64s(len(labels) + 1)
a.A[0] = uint64(len(labels))
if lc.compressFast(a.A[1:], labels) {
dst = encoding.MarshalVarUint64s(dst, a.A)
encoding.PutUint64s(a)
return dst
}
lc.mu.Lock()
lc.compressSlow(a.A[1:], labels)
lc.mu.Unlock()
lc.compress(a.A[1:], labels)
dst = encoding.MarshalVarUint64s(dst, a.A)
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) compressFast(dst []uint64, labels []prompb.Label) bool {
p := lc.usedBitset.Load()
var bits []uint64
if p != nil {
bits = *p
}
var keyBuf [256]byte
for i, label := range labels {
// Build composite key name+'\x00'+value into the stack buffer.
totalLen := len(label.Name) + 1 + len(label.Value)
var key string
if totalLen <= len(keyBuf) {
n := copy(keyBuf[:], label.Name)
keyBuf[n] = '\x00'
copy(keyBuf[n+1:], label.Value)
key = bytesutil.ToUnsafeString(keyBuf[:totalLen])
} else {
key = makeLabelKey(label)
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
shard := &lc.labelToIdxShards[h%lcShardCount]
shard.mu.RLock()
idx, ok := shard.m[key]
shard.mu.RUnlock()
if !ok {
return false
}
dst[i] = uint64(idx)
word := idx >> 6
if int(word) < len(bits) {
mask := uint64(1) << (idx & 63)
if atomic.LoadUint64(&bits[word])&mask == 0 {
atomic.OrUint64(&bits[word], mask)
}
}
}
// usedBitset was swapped mid-loop; re-mark all indices in the new bitset.
if lc.usedBitset.Load() != p {
for _, idx64 := range dst {
lc.markUsed(uint32(idx64))
}
}
return true
}
func (lc *LabelsCompressor) compressSlow(dst []uint64, labels []prompb.Label) {
for i, label := range labels {
key := makeLabelKey(label)
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
shard := &lc.labelToIdxShards[h%lcShardCount]
shard.mu.RLock()
idx, ok := shard.m[key]
shard.mu.RUnlock()
if !ok {
cloned := cloneLabel(label)
idx = lc.newIndex(cloned)
shard.mu.Lock()
if shard.m == nil {
shard.m = make(map[string]uint32)
}
shard.m[key] = idx
shard.mu.Unlock()
} else {
lc.markUsed(idx)
}
dst[i] = uint64(idx)
}
}
// markUsed sets the bit for idx in usedBitset. Safe to call without mu.
func (lc *LabelsCompressor) markUsed(idx uint32) {
p := lc.usedBitset.Load()
if p == nil {
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
if len(labels) == 0 {
return
}
bits := *p
word := idx >> 6
if int(word) < len(bits) {
mask := uint64(1) << (idx & 63)
if atomic.LoadUint64(&bits[word])&mask == 0 {
atomic.OrUint64(&bits[word], mask)
_ = dst[len(labels)-1]
for i, label := range labels {
v, ok := lc.labelToIdx.Load(label)
if !ok {
idx := lc.nextIdx.Add(1)
v = idx
labelCopy := cloneLabel(label)
// Must store idxToLabel entry before labelToIdx,
// so it can be found by possible concurrent goroutines.
//
// We might store duplicated entries for single label with different indexes,
// and it's fine, see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118.
lc.idxToLabel.Store(idx, labelCopy)
vNew, loaded := lc.labelToIdx.LoadOrStore(labelCopy, v)
if loaded {
// This label has been stored by a concurrent goroutine with different index,
// use it for key consistency in aggrState.
v = vNew
}
// Update lc.totalSizeBytes
labelSizeBytes := uint64(len(label.Name) + len(label.Value))
entrySizeBytes := labelSizeBytes + uint64(2*(unsafe.Sizeof(label)+unsafe.Sizeof(&label))+unsafe.Sizeof(v))
lc.totalSizeBytes.Add(entrySizeBytes)
}
dst[i] = v.(uint64)
}
}
// growBitset extends usedBitset to cover idx. Must be called with lc.mu held.
func (lc *LabelsCompressor) growBitset(idx uint32) {
needed := int(idx>>6) + 1
p := lc.usedBitset.Load()
var bits []uint64
if p != nil {
bits = *p
}
if needed <= len(bits) {
return
}
newBits := make([]uint64, needed)
copy(newBits, bits)
lc.usedBitset.Store(&newBits)
}
func makeLabelKey(label prompb.Label) string {
buf := make([]byte, 0, len(label.Name)+1+len(label.Value))
buf = append(buf, label.Name...)
buf = append(buf, '\x00')
buf = append(buf, label.Value...)
return string(buf)
}
// Generation returns the current generation count.
// It increments whenever labels are evicted, invalidating any cached compressed keys.
func (lc *LabelsCompressor) Generation() uint64 {
return lc.generation.Load()
}
func cloneLabel(label prompb.Label) prompb.Label {
// pre-allocate memory for label name and value
n := len(label.Name) + len(label.Value)
@@ -332,35 +98,6 @@ func cloneLabel(label prompb.Label) prompb.Label {
}
}
// newIndex assigns a new index for cloned and updates idxToLabel atomically.
// Must be called with lc.mu held.
func (lc *LabelsCompressor) newIndex(cloned prompb.Label) uint32 {
var idx uint32
p := lc.idxToLabel.Load()
var newIdxToLabel []prompb.Label
if p != nil {
newIdxToLabel = *p
}
if len(lc.freeIdxs) > 0 {
idx = lc.freeIdxs[len(lc.freeIdxs)-1]
lc.freeIdxs = lc.freeIdxs[:len(lc.freeIdxs)-1]
next := make([]prompb.Label, len(newIdxToLabel))
copy(next, newIdxToLabel)
next[idx] = cloned
lc.idxToLabel.Store(&next)
} else {
idx = uint32(len(newIdxToLabel))
next := append(newIdxToLabel, cloned)
lc.idxToLabel.Store(&next)
}
lc.growBitset(idx)
lc.markUsed(idx)
lc.totalSizeBytes += uint64(len(cloned.Name)+len(cloned.Value)) + uint64(unsafe.Sizeof(cloned)) + 8
return idx
}
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
//
// It is safe calling Decompress from concurrent goroutines.
@@ -387,139 +124,111 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
if len(tail) > 0 {
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
}
p := lc.idxToLabel.Load()
if p == nil {
encoding.PutUint64s(a)
logger.Panicf("BUG: idxToLabel is nil in Decompress")
return dst
}
labels := *p
for _, idx := range a.A {
if int(idx) >= len(labels) {
encoding.PutUint64s(a)
logger.Panicf("BUG: missing label for idx=%d; idxToLabel len=%d", idx, len(labels))
}
dst = append(dst, labels[idx])
}
dst = lc.decompress(dst, a.A)
encoding.PutUint64s(a)
return dst
}
// rotate evicts unused labels and recycles their slots.
// Phase 1 removes the label from labelToIdx (preventing new keys from referencing it).
// Phase 2, one rotation later, zeroes idxToLabel and returns the slot to freeIdxs,
// giving in-flight keys a full rotation period to drain via Decompress.
// Must not be called concurrently with itself.
func (lc *LabelsCompressor) rotate() {
// Snapshot and reset usedBitset under mu to prevent races with growBitset/newIndex.
// Also snapshot idxToLabel for the phase-1 scan.
lc.mu.Lock()
var currentBits []uint64
if p := lc.usedBitset.Load(); p != nil {
currentBits = make([]uint64, len(*p))
copy(currentBits, *p)
newBits := make([]uint64, len(*p))
lc.usedBitset.Store(&newBits)
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
for _, idx := range src {
label, ok := lc.idxToLabel.Load(idx)
if !ok {
logger.Panicf("BUG: missing label for idx=%d", idx)
}
dst = append(dst, label)
}
return dst
}
// Skip if nothing was used; all-zero snapshots from rapid rotation would
// advance pendingIdxs and reclaim slots before in-flight keys drain.
anyUsed := false
for _, w := range currentBits {
if w != 0 {
anyUsed = true
break
// labelsMap maps uint64 key to prompb.Label
//
// uint64 keys must be packed close to 0. Otherwise the labelsMap structure will consume too much memory.
type labelsMap struct {
readOnly atomic.Pointer[[]*prompb.Label]
mutableLock sync.Mutex
mutable map[uint64]*prompb.Label
misses uint64
}
// Store stores label under the given idx.
//
// It is safe calling Store from concurrent goroutines.
func (lm *labelsMap) Store(idx uint64, label prompb.Label) {
lm.mutableLock.Lock()
if lm.mutable == nil {
lm.mutable = make(map[uint64]*prompb.Label)
}
lm.mutable[idx] = &label
lm.mutableLock.Unlock()
}
// Load returns the label for the given idx.
//
// Load returns false if lm doesn't contain label for the given idx.
//
// It is safe calling Load from concurrent goroutines.
//
// The performance of Load() scales linearly with CPU cores.
func (lm *labelsMap) Load(idx uint64) (prompb.Label, bool) {
if pReadOnly := lm.readOnly.Load(); pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
// Fast path - the label for the given idx has been found in lm.readOnly.
return *pLabel, true
}
}
if !anyUsed {
lc.mu.Unlock()
// Slow path - search in lm.mutable.
return lm.loadSlow(idx)
}
func (lm *labelsMap) loadSlow(idx uint64) (prompb.Label, bool) {
lm.mutableLock.Lock()
// Try loading label from readOnly, since it could be updated while acquiring mutableLock.
pReadOnly := lm.readOnly.Load()
if pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
lm.mutableLock.Unlock()
return *pLabel, true
}
}
// The label for the idx wasn't found in readOnly. Search it in mutable.
lm.misses++
pLabel := lm.mutable[idx]
if pReadOnly == nil || lm.misses > uint64(len(*pReadOnly)) {
lm.moveMutableToReadOnlyLocked(pReadOnly)
lm.misses = 0
}
lm.mutableLock.Unlock()
if pLabel == nil {
return prompb.Label{}, false
}
return *pLabel, true
}
func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
if len(lm.mutable) == 0 {
// Nothing to move
return
}
// Advance prevBitset; labels missing from a partial snapshot are still covered by it.
prevSnap := lc.prevBitset
lc.prevBitset = currentBits
pendingIdxs := lc.pendingIdxs
idxSnap := lc.idxToLabel.Load()
lc.mu.Unlock()
// A label is used if present in either the current or previous period snapshot.
isUsed := func(idx uint32) bool {
word := idx >> 6
inCurrent := int(word) < len(currentBits) && currentBits[word]>>(idx&63)&1 == 1
inPrev := int(word) < len(prevSnap) && prevSnap[word]>>(idx&63)&1 == 1
return inCurrent || inPrev
var labels []*prompb.Label
if pReadOnly != nil {
labels = append(labels, *pReadOnly...)
}
// Phase 2: reclaim slots evicted last rotation if still unused.
var toReclaim []uint32
var nextRelease map[uint32]struct{}
for idx := range pendingIdxs {
if isUsed(idx) {
if nextRelease == nil {
nextRelease = make(map[uint32]struct{})
}
nextRelease[idx] = struct{}{}
for idx, pLabel := range lm.mutable {
if idx < uint64(len(labels)) {
labels[idx] = pLabel
} else {
toReclaim = append(toReclaim, idx)
for idx > uint64(len(labels)) {
labels = append(labels, nil)
}
labels = append(labels, pLabel)
}
}
// Phase 1: scan idxToLabel rather than sync.Map.Range to avoid
// cache-line contention with concurrent compressFast lookups.
phase1Evicted := false
if idxSnap != nil {
for i, label := range *idxSnap {
if label == (prompb.Label{}) {
continue // freed slot
}
idx := uint32(i)
if _, inPending := pendingIdxs[idx]; inPending {
continue // already queued for release
}
if isUsed(idx) {
continue
}
key := makeLabelKey(label)
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
shard := &lc.labelToIdxShards[h%lcShardCount]
shard.mu.Lock()
delete(shard.m, key)
shard.mu.Unlock()
phase1Evicted = true
if nextRelease == nil {
nextRelease = make(map[uint32]struct{})
}
nextRelease[idx] = struct{}{}
}
}
if phase1Evicted {
lc.generation.Add(1)
}
lc.mu.Lock()
if len(toReclaim) > 0 {
p := lc.idxToLabel.Load()
var newIdxToLabel []prompb.Label
if p != nil {
newIdxToLabel = *p
}
next := make([]prompb.Label, len(newIdxToLabel))
copy(next, newIdxToLabel)
for _, idx := range toReclaim {
label := next[idx]
entrySize := uint64(len(label.Name)+len(label.Value)) + uint64(unsafe.Sizeof(label)) + 8
if lc.totalSizeBytes >= entrySize {
lc.totalSizeBytes -= entrySize
}
next[idx] = prompb.Label{}
lc.freeIdxs = append(lc.freeIdxs, idx)
}
lc.idxToLabel.Store(&next)
}
lc.pendingIdxs = nextRelease
lc.mu.Unlock()
clear(lm.mutable)
lm.readOnly.Store(&labels)
}

View File

@@ -8,96 +8,47 @@ import (
)
func BenchmarkLabelsCompressorCompress(b *testing.B) {
var lc LabelsCompressor
series := newTestSeries(100, 10)
run := func(b *testing.B, withRotate bool) {
var lc LabelsCompressor
for _, labels := range series {
lc.Compress(nil, labels)
}
var rotations atomic.Int64
if withRotate {
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
default:
lc.rotate()
rotations.Add(1)
}
}
}()
defer close(done)
}
rotations.Store(0)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var dst []byte
for pb.Next() {
dst = dst[:0]
for _, labels := range series {
dst = lc.Compress(dst, labels)
}
Sink.Add(uint64(len(dst)))
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var dst []byte
for pb.Next() {
dst = dst[:0]
for _, labels := range series {
dst = lc.Compress(dst, labels)
}
})
if withRotate {
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
Sink.Add(uint64(len(dst)))
}
}
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
})
}
func BenchmarkLabelsCompressorDecompress(b *testing.B) {
var lc LabelsCompressor
series := newTestSeries(100, 10)
run := func(b *testing.B, withRotate bool) {
var lc LabelsCompressor
datas := make([][]byte, len(series))
var buf []byte
for i, labels := range series {
bufLen := len(buf)
buf = lc.Compress(buf, labels)
datas[i] = buf[bufLen:]
}
var rotations atomic.Int64
if withRotate {
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
default:
lc.rotate()
rotations.Add(1)
}
}
}()
defer close(done)
}
rotations.Store(0)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var labels []prompb.Label
for pb.Next() {
for _, data := range datas {
labels = lc.Decompress(labels[:0], data)
}
Sink.Add(uint64(len(labels)))
}
})
if withRotate {
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
}
datas := make([][]byte, len(series))
var dst []byte
for i, labels := range series {
dstLen := len(dst)
dst = lc.Compress(dst, labels)
datas[i] = dst[dstLen:]
}
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var labels []prompb.Label
for pb.Next() {
for _, data := range datas {
labels = lc.Decompress(labels[:0], data)
}
Sink.Add(uint64(len(labels)))
}
})
}
var Sink atomic.Uint64

View File

@@ -764,7 +764,7 @@ func filterLabelValues(lvs map[string]struct{}, tf *tagFilter, key string) {
b = marshalTagValue(b, bytesutil.ToUnsafeBytes(lv))
ok, err := tf.match(b)
if err != nil {
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %w", key, lv, tf.String(), err)
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %s", key, lv, tf.String(), err)
}
if !ok {
delete(lvs, lv)

View File

@@ -141,7 +141,7 @@ func (is *indexSearch) legacyContainsTimeRangeSlow(prefixBuf *bytesutil.ByteBuff
ts.Seek(prefixBuf.B)
if !ts.NextItem() {
if err := ts.Error(); err != nil {
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %w", minDate, prefixBuf.B, err)
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %s", minDate, prefixBuf.B, err)
}
return false
}

View File

@@ -106,7 +106,7 @@ func loadFrom(loadPath string, maxSizeBytes uint64) (*Tracker, error) {
}
defer func() {
if err := zr.Close(); err != nil {
logger.Panicf("FATAL: cannot close gzip reader: %w", err)
logger.Panicf("FATAL: cannot close gzip reader: %s", err)
}
}()

View File

@@ -517,7 +517,7 @@ func (tb *table) historicalMergeWatcher() {
logger.Infof("start %s for partition (%s, %s)", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath)
if err := pt.ForceMergeAllParts(tb.stopCh); err != nil {
logger.Errorf("cannot %s for partition (%s, %s): %w", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
logger.Errorf("cannot %s for partition (%s, %s): %s", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
}
logger.Infof("finished %s for partition (%s, %s) in %.3f seconds", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath, time.Since(t).Seconds())

View File

@@ -6,7 +6,6 @@ import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
)
func BenchmarkDedupAggr(b *testing.B) {
@@ -64,15 +63,14 @@ func newBenchSamples(count int) []pushSample {
}
labelsLen := len(labels)
samples := make([]pushSample, count)
var lc promutil.LabelsCompressor
var keyBuf []byte
for i := range samples {
sample := &samples[i]
labels := append(labels[:labelsLen:labelsLen], prompb.Label{
labels = append(labels[:labelsLen], prompb.Label{
Name: "app",
Value: fmt.Sprintf("instance-%d", i),
})
keyBuf = lc.Compress(keyBuf[:0], labels)
keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:])
sample.key = string(keyBuf)
sample.value = float64(i)
}

View File

@@ -77,8 +77,6 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
metrics.RegisterSet(ms)
lc.Register(2 * interval)
d.wg.Go(func() {
d.runFlusher(pushFunc)
})
@@ -88,8 +86,6 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
// MustStop stops d.
func (d *Deduplicator) MustStop() {
lc.Unregister(2 * d.interval)
metrics.UnregisterSet(d.ms, true)
d.ms = nil
@@ -207,7 +203,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
dstSamples := ctx.samples
for _, ps := range samples {
labelsLen := len(labels)
labels = lc.Decompress(labels, bytesutil.ToUnsafeBytes(ps.key))
labels = decompressLabels(labels, ps.key)
dstSamplesLen := len(dstSamples)
dstSamples = append(dstSamples, prompb.Sample{

View File

@@ -17,27 +17,29 @@ type aggrOutputs struct {
outputSamples *metrics.Counter
}
func (ao *aggrOutputs) getInputOutputKey(key string) (outputKey, inputKey string) {
func (ao *aggrOutputs) getInputOutputKey(key string) (string, string) {
src := bytesutil.ToUnsafeBytes(key)
outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
}
src = src[nSize:]
outputKey = bytesutil.ToUnsafeString(src[:outputKeyLen])
if !ao.useInputKey || int(outputKeyLen) == len(src) {
return outputKey, outputKey
outputKey := src[:outputKeyLen]
if !ao.useInputKey {
return key, bytesutil.ToUnsafeString(outputKey)
}
inputKey = bytesutil.ToUnsafeString(src[outputKeyLen:])
return outputKey, inputKey
inputKey := src[outputKeyLen:]
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
}
func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, isGreen bool) {
var inputKey, outputKey string
var sample *pushSample
var outputs []aggrValue
var nv *aggrValues
for i := range samples {
sample := &samples[i]
outputKey, inputKey := ao.getInputOutputKey(sample.key)
sample = &samples[i]
inputKey, outputKey = ao.getInputOutputKey(sample.key)
again:
v, ok := ao.m.Load(outputKey)
@@ -79,7 +81,7 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
}
av.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to flush.
// The entry has been deleted by the concurrent call to flush
// Try obtaining and updating the entry again.
goto again
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"math"
"slices"
"sort"
"strconv"
"strings"
@@ -23,7 +24,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
"gopkg.in/yaml.v2"
)
@@ -257,10 +257,6 @@ type Config struct {
type Aggregators struct {
as []*aggregator
// workCh limits the number of concurrent aggregator.Push calls.
// Pre-allocated once to avoid per-Push channel allocation.
workCh chan struct{}
// configData contains marshaled configs.
// It is used in Equal() for comparing Aggregators.
configData []byte
@@ -291,7 +287,6 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
}
ms := metrics.NewSet()
as := make([]*aggregator, len(cfgs))
for i, cfg := range cfgs {
a, err := newAggregator(cfg, filePath, pushFunc, ms, opts, alias, i+1)
@@ -312,7 +307,6 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
metrics.RegisterSet(ms)
return &Aggregators{
as: as,
workCh: make(chan struct{}, cgroup.AvailableCPUs()),
configData: configData,
filePath: filePath,
ms: ms,
@@ -370,21 +364,19 @@ func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []uint32) []uint32
return matchIdxs
}
if len(a.as) == 1 {
a.as[0].Push(tss, matchIdxs)
return matchIdxs
}
// Use all available CPU cores to push time-series into aggregators in parallel.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
// use all available CPU cores to copy time-series into aggregators
// See this issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
var wg sync.WaitGroup
concurrencyChan := make(chan struct{}, cgroup.AvailableCPUs())
for _, aggr := range a.as {
a.workCh <- struct{}{}
concurrencyChan <- struct{}{}
wg.Go(func() {
aggr.Push(tss, matchIdxs)
<-a.workCh
<-concurrencyChan
})
}
wg.Wait()
return matchIdxs
@@ -421,8 +413,8 @@ type aggregator struct {
ignoreOldSamples bool
enableWindows bool
bySet map[string]struct{}
withoutSet map[string]struct{}
by []string
without []string
aggregateOnlyByTime bool
// interval is the interval between flushes
@@ -454,8 +446,6 @@ type aggregator struct {
// for `interval: 1m`, `by: [job]`
suffix string
seriesKeyCache seriesKeyCache
wg sync.WaitGroup
stopCh chan struct{}
@@ -655,8 +645,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
enableWindows: enableWindows,
stalenessInterval: stalenessInterval,
bySet: makeStringSet(by),
withoutSet: makeStringSet(without),
by: by,
without: without,
aggregateOnlyByTime: aggregateOnlyByTime,
interval: interval,
@@ -719,8 +709,6 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
a.cs.Store(cs)
lc.Register(a.stalenessInterval)
a.wg.Go(func() {
a.runFlusher(pushFunc, alignFlushToInterval, skipFlushOnShutdown, ignoreFirstIntervals)
})
@@ -927,7 +915,6 @@ func (a *aggregator) dedupFlush(dedupTime time.Time, cs *currentState) {
//
// If pushFunc is nil, then the aggregator state is just reset.
func (a *aggregator) flush(pushFunc PushFunc, flushTime time.Time, cs *currentState, isLast bool) {
a.seriesKeyCache.reset()
startTime := time.Now()
ao := a.aggrOutputs
@@ -956,7 +943,6 @@ var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
//
// The aggregator stops pushing the aggregated metrics after this call.
func (a *aggregator) MustStop() {
lc.Unregister(a.stalenessInterval)
close(a.stopCh)
a.wg.Wait()
}
@@ -966,6 +952,7 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
ctx := getPushCtx()
defer putPushCtx(ctx)
buf := ctx.buf
labels := &ctx.labels
inputLabels := &ctx.inputLabels
outputLabels := &ctx.outputLabels
@@ -999,22 +986,19 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
}
labels.Sort()
h := computeLabelsFingerprint(labels.Labels)
gen := lc.Generation()
key, ok := a.seriesKeyCache.get(h, gen)
if !ok {
inputLabels.Reset()
outputLabels.Reset()
if !a.aggregateOnlyByTime {
inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.bySet, a.withoutSet)
} else {
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
}
bufLen := len(ctx.buf)
ctx.compressLabels(inputLabels.Labels, outputLabels.Labels)
key = string(ctx.buf[bufLen:])
a.seriesKeyCache.set(h, gen, key)
inputLabels.Reset()
outputLabels.Reset()
if !a.aggregateOnlyByTime {
inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.by, a.without)
} else {
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
}
bufLen := len(buf)
buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels)
// key remains valid only by the end of this function and can't be reused after
// do not intern key because number of unique keys could be too high
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, s := range ts.Samples {
if math.IsNaN(s.Value) {
// Skip NaN values
@@ -1058,6 +1042,8 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
}
a.samplesLag.Update(float64(maxLagMsec) / 1_000)
ctx.buf = buf
pushSamples := a.aggrOutputs.pushSamples
if a.da != nil {
pushSamples = a.da.pushSamples
@@ -1074,13 +1060,18 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
}
}
func (ctx *pushCtx) compressLabels(inputLabels, outputLabels []prompb.Label) {
ctx.keybuf = lc.Compress(ctx.keybuf[:0], outputLabels)
ctx.buf = encoding.MarshalVarUint64(ctx.buf, uint64(len(ctx.keybuf)))
ctx.buf = append(ctx.buf, ctx.keybuf...)
if len(inputLabels) > 0 {
ctx.buf = lc.Compress(ctx.buf, inputLabels)
}
// compressLabels appends the aggregation key built from outputLabels and inputLabels
// to dst and returns the extended buffer.
//
// The appended bytes are laid out as follows:
//   - the length of the compressed outputLabels, written via encoding.MarshalVarUint64;
//   - the compressed outputLabels;
//   - the compressed inputLabels.
func compressLabels(dst []byte, inputLabels, outputLabels []prompb.Label) []byte {
	// The output labels go into a pooled scratch buffer first, because their
	// encoded length has to be written to dst before the bytes themselves.
	scratch := bbPool.Get()
	scratch.B = lc.Compress(scratch.B, outputLabels)
	outputKey := scratch.B
	dst = append(encoding.MarshalVarUint64(dst, uint64(len(outputKey))), outputKey...)
	bbPool.Put(scratch)

	// The input labels follow the output key without any length prefix.
	return lc.Compress(dst, inputLabels)
}
// decompressLabels decodes the compressed labels stored in key via lc.Decompress,
// passing dst through as the destination, and returns the resulting slice.
//
// NOTE(review): callers record len(dst) before the call and read the labels past
// that offset, so lc.Decompress is presumably append-style - confirm.
func decompressLabels(dst []prompb.Label, key string) []prompb.Label {
	data := bytesutil.ToUnsafeBytes(key)
	return lc.Decompress(dst, data)
}
type pushCtx struct {
@@ -1090,7 +1081,6 @@ type pushCtx struct {
inputLabels promutil.Labels
outputLabels promutil.Labels
buf []byte
keybuf []byte
}
func (ctx *pushCtx) reset() {
@@ -1125,10 +1115,10 @@ func putPushCtx(ctx *pushCtx) {
var pushCtxPool sync.Pool
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, bySet, withoutSet map[string]struct{}) ([]prompb.Label, []prompb.Label) {
if len(withoutSet) > 0 {
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, without []string) ([]prompb.Label, []prompb.Label) {
if len(without) > 0 {
for _, label := range labels {
if _, ok := withoutSet[label.Name]; ok {
if slices.Contains(without, label.Name) {
dstInput = append(dstInput, label)
} else {
dstOutput = append(dstOutput, label)
@@ -1136,7 +1126,7 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, bySet, wit
}
} else {
for _, label := range labels {
if _, ok := bySet[label.Name]; !ok {
if !slices.Contains(by, label.Name) {
dstInput = append(dstInput, label)
} else {
dstOutput = append(dstOutput, label)
@@ -1146,17 +1136,6 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, bySet, wit
return dstInput, dstOutput
}
// makeStringSet builds a set containing every string from keys.
//
// It returns nil when keys is empty. Duplicate entries in keys collapse
// into a single set member.
func makeStringSet(keys []string) map[string]struct{} {
	n := len(keys)
	if n == 0 {
		return nil
	}
	set := make(map[string]struct{}, n)
	for i := range keys {
		set[keys[i]] = struct{}{}
	}
	return set
}
func getFlushCtx(a *aggregator, ao *aggrOutputs, pushFunc PushFunc, flushTimestamp int64, isLast bool) *flushCtx {
v := flushCtxPool.Get()
if v == nil {
@@ -1256,7 +1235,7 @@ func (ctx *flushCtx) flushSeries() {
func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
ctx.labels = decompressLabels(ctx.labels, key)
if !ctx.a.keepMetricNames {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
}
@@ -1278,7 +1257,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, value float64, extraName, extraValue string) {
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
ctx.labels = decompressLabels(ctx.labels, key)
if !ctx.a.keepMetricNames {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
}
@@ -1366,74 +1345,3 @@ func sortAndRemoveDuplicates(a []string) []string {
}
var bbPool bytesutil.ByteBufferPool
// seriesKeyCache maps a 64-bit fingerprint to a previously computed key string.
//
// Entries are spread over 64 independently locked shards; the shard is selected
// by the top 6 bits of the fingerprint. Entries are keyed by the fingerprint alone,
// so two different inputs with the same fingerprint share a single slot.
type seriesKeyCache struct {
	shards [64]seriesKeyCacheShard
}

// seriesKeyCacheShard is a single lock-protected part of seriesKeyCache.
type seriesKeyCacheShard struct {
	// mu guards m.
	mu sync.RWMutex
	// m is allocated lazily on the first set call and dropped on reset.
	m map[uint64]seriesCacheEntry
	_ [32]byte // cache-line padding
}

// seriesCacheEntry is a cached key together with the generation it was stored under.
type seriesCacheEntry struct {
	generation uint64
	key        string
}

// shardFor returns the shard responsible for the fingerprint h.
func (c *seriesKeyCache) shardFor(h uint64) *seriesKeyCacheShard {
	// h>>58 keeps the top 6 bits of h, i.e. a value in the range [0..63].
	return &c.shards[h>>58]
}

// get returns the key cached for the fingerprint h.
//
// The second result is false when there is no entry for h or when the entry
// was stored under a generation other than the given one.
func (c *seriesKeyCache) get(h, generation uint64) (string, bool) {
	s := c.shardFor(h)

	s.mu.RLock()
	entry, found := s.m[h]
	s.mu.RUnlock()

	if found && entry.generation == generation {
		return entry.key, true
	}
	return "", false
}

// set stores key under the fingerprint h and the given generation,
// replacing any entry previously stored for h.
func (c *seriesKeyCache) set(h, generation uint64, key string) {
	entry := seriesCacheEntry{
		generation: generation,
		key:        key,
	}
	s := c.shardFor(h)

	s.mu.Lock()
	if s.m == nil {
		s.m = make(map[uint64]seriesCacheEntry)
	}
	s.m[h] = entry
	s.mu.Unlock()
}

// reset drops all the cached entries by releasing the map of every shard.
func (c *seriesKeyCache) reset() {
	for i := 0; i < len(c.shards); i++ {
		s := &c.shards[i]
		s.mu.Lock()
		s.m = nil
		s.mu.Unlock()
	}
}
// computeLabelsFingerprint returns the xxhash.Sum64 of the given labels.
//
// The hashed byte sequence is Name, 0x00, Value, 0x00 for every label in order.
// The sequence is assembled in a 512-byte array declared inside the function;
// if the labels do not fit there, the bytes collected so far are copied into
// a slice created with make and the remaining labels are appended to it.
func computeLabelsFingerprint(labels []prompb.Label) uint64 {
	var scratch [512]byte
	used := 0
	for i := range labels {
		name, value := labels[i].Name, labels[i].Value
		// Every label occupies its name and value plus two zero separators.
		size := len(name) + len(value) + 2
		if used+size > len(scratch) {
			// The fixed-size array is exhausted: continue in a slice, which is
			// seeded with the already collected prefix, and finish there.
			spill := make([]byte, used, used+2*size)
			copy(spill, scratch[:used])
			for _, rest := range labels[i:] {
				spill = append(spill, rest.Name...)
				spill = append(spill, 0)
				spill = append(spill, rest.Value...)
				spill = append(spill, 0)
			}
			return xxhash.Sum64(spill)
		}
		used += copy(scratch[used:], name)
		scratch[used] = 0
		used++
		used += copy(scratch[used:], value)
		scratch[used] = 0
		used++
	}
	return xxhash.Sum64(scratch[:used])
}

View File

@@ -45,16 +45,13 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
a := newBenchAggregators([]string{output}, pushFunc)
defer a.MustStop()
// Warm up the LabelsCompressor so benchmark measures steady-state performance.
a.Push(benchSeries, nil)
const loops = 100
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
b.RunParallel(func(pb *testing.PB) {
matchIdxs := make([]uint32, len(benchSeries))
var matchIdxs []uint32
for pb.Next() {
for range loops {
matchIdxs = a.Push(benchSeries, matchIdxs)
@@ -85,8 +82,8 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
func newBenchSeries(seriesCount int) []prompb.TimeSeries {
a := make([]string, 0, seriesCount)
for j := range seriesCount {
s := fmt.Sprintf(`http_requests_total{environment="prod",instance="bar",job="foo_%d",label1="value1",label2="value2",label3="value3",`+
`namespace="kube-foo-bar",node="node-123-3434-443",path="/foo/%d",pod="pod-123232312",some_other_label="foo-bar-baz"} %d`, j%100, j, j*1000)
s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo_%d",instance="bar",pod="pod-123232312",namespace="kube-foo-bar",node="node-123-3434-443",`+
`some_other_label="foo-bar-baz",environment="prod",label1="value1",label2="value2",label3="value3"} %d`, j, j%100, j*1000)
a = append(a, s)
}
metrics := strings.Join(a, "\n")
@@ -104,16 +101,13 @@ func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
a := newPerOutputBenchAggregators(benchOutputs, pushFunc)
defer a.MustStop()
// Warm up the LabelsCompressor so benchmark measures steady-state performance.
a.Push(benchSeries, nil)
const loops = 100
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
b.RunParallel(func(pb *testing.PB) {
matchIdxs := make([]uint32, len(benchSeries))
var matchIdxs []uint32
for pb.Next() {
for range loops {
matchIdxs = a.Push(benchSeries, matchIdxs)
@@ -123,17 +117,21 @@ func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
}
func newPerOutputBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
var b strings.Builder
for _, output := range outputs {
fmt.Fprintf(&b, `
outputsQuoted := make([]string, len(outputs))
var config string
for i := range outputs {
outputsQuoted[i] = stringsutil.JSONString(outputs[i])
cfg := fmt.Sprintf(`
- match: http_requests_total
interval: 24h
by: [job]
outputs: [%s]
`, stringsutil.JSONString(output))
`, stringsutil.JSONString(outputs[i]))
config += cfg
}
a, err := LoadFromData([]byte(b.String()), pushFunc, nil, "some_alias")
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
}