mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-19 09:46:57 +03:00
Compare commits
7 Commits
labelscomp
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f2bf5d82ce | ||
|
|
bd98a1d2fa | ||
|
|
4a1ceccee4 | ||
|
|
48a3eb0215 | ||
|
|
200c03416f | ||
|
|
33d8e02ea8 | ||
|
|
e613c3fd6b |
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/golang/snappy"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
)
|
||||
|
||||
func TestParseRetryAfterHeader(t *testing.T) {
|
||||
@@ -36,6 +37,34 @@ func TestParseRetryAfterHeader(t *testing.T) {
|
||||
f(time.Now().Add(10*time.Second).Format("Mon, 02 Jan 2006 15:04:05 FAKETZ"), 0)
|
||||
}
|
||||
|
||||
func TestInitSecretFlags(t *testing.T) {
|
||||
showRemoteWriteURLOrig := *showRemoteWriteURL
|
||||
defer func() {
|
||||
*showRemoteWriteURL = showRemoteWriteURLOrig
|
||||
flagutil.UnregisterAllSecretFlags()
|
||||
}()
|
||||
|
||||
flagutil.UnregisterAllSecretFlags()
|
||||
*showRemoteWriteURL = false
|
||||
InitSecretFlags()
|
||||
if !flagutil.IsSecretFlag("remotewrite.url") {
|
||||
t.Fatalf("expecting remoteWrite.url to be secret")
|
||||
}
|
||||
if !flagutil.IsSecretFlag("remotewrite.headers") {
|
||||
t.Fatalf("expecting remoteWrite.headers to be secret")
|
||||
}
|
||||
|
||||
flagutil.UnregisterAllSecretFlags()
|
||||
*showRemoteWriteURL = true
|
||||
InitSecretFlags()
|
||||
if flagutil.IsSecretFlag("remotewrite.url") {
|
||||
t.Fatalf("remoteWrite.url must remain visible when -remoteWrite.showURL is set")
|
||||
}
|
||||
if !flagutil.IsSecretFlag("remotewrite.headers") {
|
||||
t.Fatalf("expecting remoteWrite.headers to remain secret")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRepackBlockFromZstdToSnappy(t *testing.T) {
|
||||
expectedPlainBlock := []byte(`foobar`)
|
||||
|
||||
|
||||
@@ -151,6 +151,8 @@ func InitSecretFlags() {
|
||||
// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
|
||||
flagutil.RegisterSecretFlag("remoteWrite.url")
|
||||
}
|
||||
// remoteWrite.headers can contain auth headers such as Authorization and API keys.
|
||||
flagutil.RegisterSecretFlag("remoteWrite.headers")
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -167,6 +169,18 @@ func Init() {
|
||||
if len(*remoteWriteURLs) == 0 {
|
||||
logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set")
|
||||
}
|
||||
if *shardByURL && len(*disableOnDiskQueue) > 1 {
|
||||
disableOnDiskQueues := *disableOnDiskQueue
|
||||
|
||||
firstValue := disableOnDiskQueues[0]
|
||||
for _, v := range disableOnDiskQueues[1:] {
|
||||
if firstValue != v {
|
||||
logger.Fatalf("all -remoteWrite.url targets must have the same -remoteWrite.disableOnDiskQueue setting when -remoteWrite.shardByURL is enabled; " +
|
||||
"either enable or disable -remoteWrite.disableOnDiskQueue for all targets")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if limit := getMaxHourlySeries(); limit > 0 {
|
||||
hourlySeriesLimiter = bloomfilter.NewLimiter(limit, time.Hour)
|
||||
_ = metrics.NewGauge(`vmagent_hourly_series_limit_max_series`, func() float64 {
|
||||
@@ -499,7 +513,9 @@ func tryPush(at *auth.Token, wr *prompb.WriteRequest, forceDropSamplesOnFailure
|
||||
//
|
||||
// calculateHealthyRwctxIdx will rely on the order of rwctx to be in ascending order.
|
||||
func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) ([]*remoteWriteCtx, bool) {
|
||||
if !disableOnDiskQueueAny {
|
||||
// When -remoteWrite.shardByURL=true always use all configured remote writes to preserve stable metrics distribution across shards.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507
|
||||
if !disableOnDiskQueueAny || *shardByURL {
|
||||
return rwctxsGlobal, true
|
||||
}
|
||||
|
||||
@@ -514,12 +530,6 @@ func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailu
|
||||
return nil, false
|
||||
}
|
||||
rowsCount := getRowsCount(tss)
|
||||
if *shardByURL {
|
||||
// Todo: When shardByURL is enabled, the following metrics won't be 100% accurate. Because vmagent don't know
|
||||
// which rwctx should data be pushed to yet. Let's consider the hashing algorithm fair and will distribute
|
||||
// data to all rwctxs evenly.
|
||||
rowsCount = rowsCount / len(rwctxsGlobal)
|
||||
}
|
||||
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ func UnitTest(files []string, disableGroupLabel bool, externalLabels []string, e
|
||||
}
|
||||
eu, err := url.Parse(externalURL)
|
||||
if err != nil {
|
||||
logger.Fatalf("failed to parse external URL: %w", err)
|
||||
logger.Fatalf("failed to parse external URL: %s", err)
|
||||
}
|
||||
if err := templates.Load([]string{}, *eu); err != nil {
|
||||
logger.Fatalf("failed to load template: %v", err)
|
||||
|
||||
@@ -105,7 +105,7 @@ func (cw *configWatcher) add(typeK TargetType, interval time.Duration, targetsFn
|
||||
}
|
||||
targetMetadata, errors := getTargetMetadata(targetsFn, cw.cfg)
|
||||
for _, err := range errors {
|
||||
logger.Errorf("failed to init notifier for %q: %w", typeK, err)
|
||||
logger.Errorf("failed to init notifier for %q: %s", typeK, err)
|
||||
}
|
||||
cw.updateTargets(typeK, targetMetadata, cw.cfg, cw.genFn)
|
||||
}
|
||||
@@ -274,7 +274,7 @@ func (cw *configWatcher) updateTargets(key TargetType, targetMts map[string]targ
|
||||
for addr, metadata := range targetMts {
|
||||
am, err := NewAlertManager(addr, genFn, cfg.HTTPClientConfig, metadata.alertRelabelConfigs, cfg.Timeout.Duration())
|
||||
if err != nil {
|
||||
logger.Errorf("failed to init %s notifier with addr %q: %w", key, addr, err)
|
||||
logger.Errorf("failed to init %s notifier with addr %q: %s", key, addr, err)
|
||||
continue
|
||||
}
|
||||
updatedTargets = append(updatedTargets, Target{
|
||||
|
||||
@@ -3109,7 +3109,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -3406,7 +3406,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -3110,7 +3110,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -3407,7 +3407,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -2946,7 +2946,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -2324,7 +2324,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -2945,7 +2945,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -2323,7 +2323,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -31,10 +31,14 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-opentelemetry.labelNameUnderscoreSanitization` command-line flag to control whether to enable prepending of `key` to labels starting with `_` when `-opentelemetry.usePrometheusNaming` is enabled. See [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) docs and [#9663](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9663). Thanks to @andriibeee for the contribution.
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): improve the [Top Queries](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#top-queries) table UI. Duration columns now display human-readable values (e.g. `1.23s`) instead of raw seconds, memory column shows human-readable sizes (e.g. `1.23 MB`), instant queries are labeled as `instant` instead of empty string, and column headers now show tooltips with descriptions. See [#10790](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10790).
|
||||
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. Thanks to @alexei38 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918).
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. See [#10918](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918). Thanks to @alexei38 for the contribution.
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): extend delay on aggregation windows flush by the biggest lag among pushed samples. Before, the delay was calculated as 95th percentile across samples, which could underrepresent outliers and reject them from aggregation as "too old". See [#10402](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10402).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix a bug in [cardinality limiters](https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter) where series with different labels, like `{a="bc"}` and `{ab="c"}`, could be incorrectly treated as identical and dropped. See [#10937](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10937).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): hide values passed to `-remoteWrite.headers` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive HTTP headers such as `Authorization` and API keys. See [#10803](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10803).
|
||||
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly establish [mtls](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#mtls-protection) connection between vmstorage and vminsert. Regression was introduced in v1.130.0 release for the enterprise version of vmstorage. See [#10972](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10972).
|
||||
* BUGFIX: [vmrestore](https://docs.victoriametrics.com/victoriametrics/vmrestore/): fix a bug where specifying `-storageDataPath` with a trailing slash could cause `vmrestore` to panic. See [#10823](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10823). Thanks to @utafrali for the contribution.
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): prevent unintentional rerouting of samples to other sharding targets when one of the `-remoteWrite.url` targets with `-remoteWrite.disableOnDiskQueue` becomes blocked. Previously this could break the sharding guarantee by sending samples to wrong targets instead of dropping or retrying them. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): return error on startup if `-remoteWrite.disableOnDiskQueue` is not configured uniformly across all `-remoteWrite.url` targets when `-remoteWrite.shardByURL` is enabled. Either all targets must have it enabled or all must have it disabled. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
|
||||
|
||||
## [v1.143.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.143.0)
|
||||
|
||||
|
||||
@@ -1538,11 +1538,11 @@ func (tb *Table) MustCreateSnapshotAt(dstDir string) {
|
||||
srcDir := tb.path
|
||||
srcDir, err = filepath.Abs(srcDir)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", srcDir, err)
|
||||
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", srcDir, err)
|
||||
}
|
||||
dstDir, err = filepath.Abs(dstDir)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", dstDir, err)
|
||||
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", dstDir, err)
|
||||
}
|
||||
prefix := srcDir + string(filepath.Separator)
|
||||
if strings.HasPrefix(dstDir, prefix) {
|
||||
|
||||
@@ -3,173 +3,32 @@ package promutil
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
const lcShardCount = 128
|
||||
|
||||
// lcShard is one shard of the label-to-index map.
|
||||
// Padded to one cache line to prevent false sharing between adjacent shards.
|
||||
type lcShard struct {
|
||||
mu sync.RWMutex
|
||||
m map[string]uint32
|
||||
_ [32]byte
|
||||
}
|
||||
|
||||
// LabelsCompressor compresses []prompb.Label into short binary strings.
|
||||
// Zero value is ready to use. Each Register call must be paired with Unregister.
|
||||
// LabelsCompressor compresses []prompb.Label into short binary strings
|
||||
type LabelsCompressor struct {
|
||||
// labelToIdxShards stores the label-to-index mapping across lcShardCount shards
|
||||
// to reduce RWMutex contention in the concurrent compressFast hot path.
|
||||
labelToIdxShards [lcShardCount]lcShard
|
||||
labelToIdx sync.Map
|
||||
idxToLabel labelsMap
|
||||
|
||||
// generation is incremented after each rotate() call that removes labels.
|
||||
// Callers can use it to detect when cached compressed keys become stale.
|
||||
generation atomic.Uint64
|
||||
nextIdx atomic.Uint64
|
||||
|
||||
idxToLabel atomic.Pointer[[]prompb.Label]
|
||||
|
||||
// usedBitset tracks which indices were used in the current rotation period.
|
||||
// Bits are set atomically from compressFast without mu; grown under mu.
|
||||
usedBitset atomic.Pointer[[]uint64]
|
||||
|
||||
totalSizeBytes uint64
|
||||
mu sync.Mutex
|
||||
|
||||
// freeIdxs holds index slots available for reuse.
|
||||
freeIdxs []uint32
|
||||
|
||||
// pendingIdxs holds indices evicted from labelToIdx last rotate, not yet zeroed in idxToLabel.
|
||||
pendingIdxs map[uint32]struct{}
|
||||
|
||||
// prevBitset is the usedBitset snapshot from the previous non-zero rotation.
|
||||
// Requires absence from both prevBitset and usedBitset to evict a label,
|
||||
// guarding against partial snapshots when rotate fires mid-compress loop.
|
||||
// Only accessed in rotate, which runs in a single goroutine.
|
||||
prevBitset []uint64
|
||||
|
||||
// registry holds staleness intervals of active callers; rotation period = max(registry).
|
||||
registry []time.Duration
|
||||
|
||||
// tickerCh signals runRotate to re-evaluate the rotation period.
|
||||
tickerCh chan struct{}
|
||||
}
|
||||
|
||||
// idleRotationPeriod is the rotation period used when no callers are registered.
|
||||
// It must be long enough that a sleeping goroutine does not consume measurable resources.
|
||||
const idleRotationPeriod = time.Hour
|
||||
|
||||
// Register adds stalenessInterval to the registry and starts the rotation goroutine on first call.
|
||||
// Rotation period equals max(registry). Must be paired with Unregister.
|
||||
func (lc *LabelsCompressor) Register(stalenessInterval time.Duration) {
|
||||
lc.mu.Lock()
|
||||
if lc.tickerCh == nil {
|
||||
lc.tickerCh = make(chan struct{}, 1)
|
||||
go lc.runRotate()
|
||||
}
|
||||
lc.registry = append(lc.registry, stalenessInterval)
|
||||
tickerCh := lc.tickerCh
|
||||
lc.mu.Unlock()
|
||||
select {
|
||||
case tickerCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Unregister removes stalenessInterval from the registry.
|
||||
// The rotation goroutine keeps running at idleRotationPeriod when the registry is empty.
|
||||
func (lc *LabelsCompressor) Unregister(stalenessInterval time.Duration) {
|
||||
lc.mu.Lock()
|
||||
for i, d := range lc.registry {
|
||||
if d == stalenessInterval {
|
||||
lc.registry = append(lc.registry[:i], lc.registry[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
tickerCh := lc.tickerCh
|
||||
lc.mu.Unlock()
|
||||
if tickerCh != nil {
|
||||
select {
|
||||
case tickerCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) maxRegisteredStaleness() time.Duration {
|
||||
// must be called with lc.mu held
|
||||
var max time.Duration
|
||||
for _, d := range lc.registry {
|
||||
if d > max {
|
||||
max = d
|
||||
}
|
||||
}
|
||||
if max == 0 {
|
||||
return idleRotationPeriod
|
||||
}
|
||||
return max
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) runRotate() {
|
||||
lc.mu.Lock()
|
||||
period := lc.maxRegisteredStaleness()
|
||||
lc.mu.Unlock()
|
||||
|
||||
timer := time.NewTimer(period)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
lc.rotate()
|
||||
lc.mu.Lock()
|
||||
period = lc.maxRegisteredStaleness()
|
||||
lc.mu.Unlock()
|
||||
timer.Reset(period)
|
||||
case <-lc.tickerCh:
|
||||
lc.mu.Lock()
|
||||
newPeriod := lc.maxRegisteredStaleness()
|
||||
lc.mu.Unlock()
|
||||
if newPeriod != period {
|
||||
period = newPeriod
|
||||
if !timer.Stop() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
timer.Reset(period)
|
||||
}
|
||||
}
|
||||
}
|
||||
totalSizeBytes atomic.Uint64
|
||||
}
|
||||
|
||||
// SizeBytes returns the size of lc data in bytes
|
||||
func (lc *LabelsCompressor) SizeBytes() uint64 {
|
||||
lc.mu.Lock()
|
||||
n := uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes
|
||||
lc.mu.Unlock()
|
||||
return n
|
||||
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
|
||||
}
|
||||
|
||||
// ItemsCount returns the number of items in lc
|
||||
func (lc *LabelsCompressor) ItemsCount() uint64 {
|
||||
lc.mu.Lock()
|
||||
p := lc.idxToLabel.Load()
|
||||
var n uint64
|
||||
if p != nil && len(*p) > len(lc.freeIdxs)+len(lc.pendingIdxs) {
|
||||
n = uint64(len(*p) - len(lc.freeIdxs) - len(lc.pendingIdxs))
|
||||
}
|
||||
lc.mu.Unlock()
|
||||
return n
|
||||
return lc.nextIdx.Load()
|
||||
}
|
||||
|
||||
// Compress compresses labels, appends the compressed labels to dst and returns the result.
|
||||
@@ -183,139 +42,46 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
|
||||
|
||||
a := encoding.GetUint64s(len(labels) + 1)
|
||||
a.A[0] = uint64(len(labels))
|
||||
|
||||
if lc.compressFast(a.A[1:], labels) {
|
||||
dst = encoding.MarshalVarUint64s(dst, a.A)
|
||||
encoding.PutUint64s(a)
|
||||
return dst
|
||||
}
|
||||
|
||||
lc.mu.Lock()
|
||||
lc.compressSlow(a.A[1:], labels)
|
||||
lc.mu.Unlock()
|
||||
|
||||
lc.compress(a.A[1:], labels)
|
||||
dst = encoding.MarshalVarUint64s(dst, a.A)
|
||||
encoding.PutUint64s(a)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) compressFast(dst []uint64, labels []prompb.Label) bool {
|
||||
p := lc.usedBitset.Load()
|
||||
var bits []uint64
|
||||
if p != nil {
|
||||
bits = *p
|
||||
}
|
||||
var keyBuf [256]byte
|
||||
for i, label := range labels {
|
||||
// Build composite key name+'\x00'+value into the stack buffer.
|
||||
totalLen := len(label.Name) + 1 + len(label.Value)
|
||||
var key string
|
||||
if totalLen <= len(keyBuf) {
|
||||
n := copy(keyBuf[:], label.Name)
|
||||
keyBuf[n] = '\x00'
|
||||
copy(keyBuf[n+1:], label.Value)
|
||||
key = bytesutil.ToUnsafeString(keyBuf[:totalLen])
|
||||
} else {
|
||||
key = makeLabelKey(label)
|
||||
}
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
||||
shard := &lc.labelToIdxShards[h%lcShardCount]
|
||||
shard.mu.RLock()
|
||||
idx, ok := shard.m[key]
|
||||
shard.mu.RUnlock()
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
dst[i] = uint64(idx)
|
||||
word := idx >> 6
|
||||
if int(word) < len(bits) {
|
||||
mask := uint64(1) << (idx & 63)
|
||||
if atomic.LoadUint64(&bits[word])&mask == 0 {
|
||||
atomic.OrUint64(&bits[word], mask)
|
||||
}
|
||||
}
|
||||
}
|
||||
// usedBitset was swapped mid-loop; re-mark all indices in the new bitset.
|
||||
if lc.usedBitset.Load() != p {
|
||||
for _, idx64 := range dst {
|
||||
lc.markUsed(uint32(idx64))
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) compressSlow(dst []uint64, labels []prompb.Label) {
|
||||
for i, label := range labels {
|
||||
key := makeLabelKey(label)
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
||||
shard := &lc.labelToIdxShards[h%lcShardCount]
|
||||
|
||||
shard.mu.RLock()
|
||||
idx, ok := shard.m[key]
|
||||
shard.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
cloned := cloneLabel(label)
|
||||
idx = lc.newIndex(cloned)
|
||||
shard.mu.Lock()
|
||||
if shard.m == nil {
|
||||
shard.m = make(map[string]uint32)
|
||||
}
|
||||
shard.m[key] = idx
|
||||
shard.mu.Unlock()
|
||||
} else {
|
||||
lc.markUsed(idx)
|
||||
}
|
||||
dst[i] = uint64(idx)
|
||||
}
|
||||
}
|
||||
|
||||
// markUsed sets the bit for idx in usedBitset. Safe to call without mu.
|
||||
func (lc *LabelsCompressor) markUsed(idx uint32) {
|
||||
p := lc.usedBitset.Load()
|
||||
if p == nil {
|
||||
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
|
||||
if len(labels) == 0 {
|
||||
return
|
||||
}
|
||||
bits := *p
|
||||
word := idx >> 6
|
||||
if int(word) < len(bits) {
|
||||
mask := uint64(1) << (idx & 63)
|
||||
if atomic.LoadUint64(&bits[word])&mask == 0 {
|
||||
atomic.OrUint64(&bits[word], mask)
|
||||
_ = dst[len(labels)-1]
|
||||
for i, label := range labels {
|
||||
v, ok := lc.labelToIdx.Load(label)
|
||||
if !ok {
|
||||
idx := lc.nextIdx.Add(1)
|
||||
v = idx
|
||||
labelCopy := cloneLabel(label)
|
||||
|
||||
// Must store idxToLabel entry before labelToIdx,
|
||||
// so it can be found by possible concurrent goroutines.
|
||||
//
|
||||
// We might store duplicated entries for single label with different indexes,
|
||||
// and it's fine, see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118.
|
||||
lc.idxToLabel.Store(idx, labelCopy)
|
||||
vNew, loaded := lc.labelToIdx.LoadOrStore(labelCopy, v)
|
||||
if loaded {
|
||||
// This label has been stored by a concurrent goroutine with different index,
|
||||
// use it for key consistency in aggrState.
|
||||
v = vNew
|
||||
}
|
||||
|
||||
// Update lc.totalSizeBytes
|
||||
labelSizeBytes := uint64(len(label.Name) + len(label.Value))
|
||||
entrySizeBytes := labelSizeBytes + uint64(2*(unsafe.Sizeof(label)+unsafe.Sizeof(&label))+unsafe.Sizeof(v))
|
||||
lc.totalSizeBytes.Add(entrySizeBytes)
|
||||
}
|
||||
dst[i] = v.(uint64)
|
||||
}
|
||||
}
|
||||
|
||||
// growBitset extends usedBitset to cover idx. Must be called with lc.mu held.
|
||||
func (lc *LabelsCompressor) growBitset(idx uint32) {
|
||||
needed := int(idx>>6) + 1
|
||||
p := lc.usedBitset.Load()
|
||||
var bits []uint64
|
||||
if p != nil {
|
||||
bits = *p
|
||||
}
|
||||
if needed <= len(bits) {
|
||||
return
|
||||
}
|
||||
newBits := make([]uint64, needed)
|
||||
copy(newBits, bits)
|
||||
lc.usedBitset.Store(&newBits)
|
||||
}
|
||||
|
||||
func makeLabelKey(label prompb.Label) string {
|
||||
buf := make([]byte, 0, len(label.Name)+1+len(label.Value))
|
||||
buf = append(buf, label.Name...)
|
||||
buf = append(buf, '\x00')
|
||||
buf = append(buf, label.Value...)
|
||||
return string(buf)
|
||||
}
|
||||
|
||||
// Generation returns the current generation count.
|
||||
// It increments whenever labels are evicted, invalidating any cached compressed keys.
|
||||
func (lc *LabelsCompressor) Generation() uint64 {
|
||||
return lc.generation.Load()
|
||||
}
|
||||
|
||||
func cloneLabel(label prompb.Label) prompb.Label {
|
||||
// pre-allocate memory for label name and value
|
||||
n := len(label.Name) + len(label.Value)
|
||||
@@ -332,35 +98,6 @@ func cloneLabel(label prompb.Label) prompb.Label {
|
||||
}
|
||||
}
|
||||
|
||||
// newIndex assigns a new index for cloned and updates idxToLabel atomically.
|
||||
// Must be called with lc.mu held.
|
||||
func (lc *LabelsCompressor) newIndex(cloned prompb.Label) uint32 {
|
||||
var idx uint32
|
||||
p := lc.idxToLabel.Load()
|
||||
var newIdxToLabel []prompb.Label
|
||||
if p != nil {
|
||||
newIdxToLabel = *p
|
||||
}
|
||||
|
||||
if len(lc.freeIdxs) > 0 {
|
||||
idx = lc.freeIdxs[len(lc.freeIdxs)-1]
|
||||
lc.freeIdxs = lc.freeIdxs[:len(lc.freeIdxs)-1]
|
||||
next := make([]prompb.Label, len(newIdxToLabel))
|
||||
copy(next, newIdxToLabel)
|
||||
next[idx] = cloned
|
||||
lc.idxToLabel.Store(&next)
|
||||
} else {
|
||||
idx = uint32(len(newIdxToLabel))
|
||||
next := append(newIdxToLabel, cloned)
|
||||
lc.idxToLabel.Store(&next)
|
||||
}
|
||||
|
||||
lc.growBitset(idx)
|
||||
lc.markUsed(idx)
|
||||
lc.totalSizeBytes += uint64(len(cloned.Name)+len(cloned.Value)) + uint64(unsafe.Sizeof(cloned)) + 8
|
||||
return idx
|
||||
}
|
||||
|
||||
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
|
||||
//
|
||||
// It is safe calling Decompress from concurrent goroutines.
|
||||
@@ -387,139 +124,111 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
|
||||
if len(tail) > 0 {
|
||||
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
|
||||
}
|
||||
|
||||
p := lc.idxToLabel.Load()
|
||||
if p == nil {
|
||||
encoding.PutUint64s(a)
|
||||
logger.Panicf("BUG: idxToLabel is nil in Decompress")
|
||||
return dst
|
||||
}
|
||||
labels := *p
|
||||
for _, idx := range a.A {
|
||||
if int(idx) >= len(labels) {
|
||||
encoding.PutUint64s(a)
|
||||
logger.Panicf("BUG: missing label for idx=%d; idxToLabel len=%d", idx, len(labels))
|
||||
}
|
||||
dst = append(dst, labels[idx])
|
||||
}
|
||||
|
||||
dst = lc.decompress(dst, a.A)
|
||||
encoding.PutUint64s(a)
|
||||
return dst
|
||||
}
|
||||
|
||||
// rotate evicts unused labels and recycles their slots.
|
||||
// Phase 1 removes the label from labelToIdx (preventing new keys from referencing it).
|
||||
// Phase 2, one rotation later, zeroes idxToLabel and returns the slot to freeIdxs,
|
||||
// giving in-flight keys a full rotation period to drain via Decompress.
|
||||
// Must not be called concurrently with itself.
|
||||
func (lc *LabelsCompressor) rotate() {
|
||||
// Snapshot and reset usedBitset under mu to prevent races with growBitset/newIndex.
|
||||
// Also snapshot idxToLabel for the phase-1 scan.
|
||||
lc.mu.Lock()
|
||||
var currentBits []uint64
|
||||
if p := lc.usedBitset.Load(); p != nil {
|
||||
currentBits = make([]uint64, len(*p))
|
||||
copy(currentBits, *p)
|
||||
newBits := make([]uint64, len(*p))
|
||||
lc.usedBitset.Store(&newBits)
|
||||
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
|
||||
for _, idx := range src {
|
||||
label, ok := lc.idxToLabel.Load(idx)
|
||||
if !ok {
|
||||
logger.Panicf("BUG: missing label for idx=%d", idx)
|
||||
}
|
||||
dst = append(dst, label)
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
// Skip if nothing was used; all-zero snapshots from rapid rotation would
|
||||
// advance pendingIdxs and reclaim slots before in-flight keys drain.
|
||||
anyUsed := false
|
||||
for _, w := range currentBits {
|
||||
if w != 0 {
|
||||
anyUsed = true
|
||||
break
|
||||
// labelsMap maps uint64 key to prompb.Label
|
||||
//
|
||||
// uint64 keys must be packed close to 0. Otherwise the labelsMap structure will consume too much memory.
|
||||
type labelsMap struct {
|
||||
readOnly atomic.Pointer[[]*prompb.Label]
|
||||
|
||||
mutableLock sync.Mutex
|
||||
mutable map[uint64]*prompb.Label
|
||||
misses uint64
|
||||
}
|
||||
|
||||
// Store stores label under the given idx.
|
||||
//
|
||||
// It is safe calling Store from concurrent goroutines.
|
||||
func (lm *labelsMap) Store(idx uint64, label prompb.Label) {
|
||||
lm.mutableLock.Lock()
|
||||
if lm.mutable == nil {
|
||||
lm.mutable = make(map[uint64]*prompb.Label)
|
||||
}
|
||||
lm.mutable[idx] = &label
|
||||
lm.mutableLock.Unlock()
|
||||
}
|
||||
|
||||
// Load returns the label for the given idx.
|
||||
//
|
||||
// Load returns false if lm doesn't contain label for the given idx.
|
||||
//
|
||||
// It is safe calling Load from concurrent goroutines.
|
||||
//
|
||||
// The performance of Load() scales linearly with CPU cores.
|
||||
func (lm *labelsMap) Load(idx uint64) (prompb.Label, bool) {
|
||||
if pReadOnly := lm.readOnly.Load(); pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
|
||||
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
|
||||
// Fast path - the label for the given idx has been found in lm.readOnly.
|
||||
return *pLabel, true
|
||||
}
|
||||
}
|
||||
if !anyUsed {
|
||||
lc.mu.Unlock()
|
||||
|
||||
// Slow path - search in lm.mutable.
|
||||
return lm.loadSlow(idx)
|
||||
}
|
||||
|
||||
func (lm *labelsMap) loadSlow(idx uint64) (prompb.Label, bool) {
|
||||
lm.mutableLock.Lock()
|
||||
|
||||
// Try loading label from readOnly, since it could be updated while acquiring mutableLock.
|
||||
pReadOnly := lm.readOnly.Load()
|
||||
if pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
|
||||
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
|
||||
lm.mutableLock.Unlock()
|
||||
return *pLabel, true
|
||||
}
|
||||
}
|
||||
|
||||
// The label for the idx wasn't found in readOnly. Search it in mutable.
|
||||
lm.misses++
|
||||
pLabel := lm.mutable[idx]
|
||||
if pReadOnly == nil || lm.misses > uint64(len(*pReadOnly)) {
|
||||
lm.moveMutableToReadOnlyLocked(pReadOnly)
|
||||
lm.misses = 0
|
||||
}
|
||||
lm.mutableLock.Unlock()
|
||||
|
||||
if pLabel == nil {
|
||||
return prompb.Label{}, false
|
||||
}
|
||||
return *pLabel, true
|
||||
}
|
||||
|
||||
func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
|
||||
if len(lm.mutable) == 0 {
|
||||
// Nothing to move
|
||||
return
|
||||
}
|
||||
|
||||
// Advance prevBitset; labels missing from a partial snapshot are still covered by it.
|
||||
prevSnap := lc.prevBitset
|
||||
lc.prevBitset = currentBits
|
||||
pendingIdxs := lc.pendingIdxs
|
||||
idxSnap := lc.idxToLabel.Load()
|
||||
lc.mu.Unlock()
|
||||
|
||||
// A label is used if present in either the current or previous period snapshot.
|
||||
isUsed := func(idx uint32) bool {
|
||||
word := idx >> 6
|
||||
inCurrent := int(word) < len(currentBits) && currentBits[word]>>(idx&63)&1 == 1
|
||||
inPrev := int(word) < len(prevSnap) && prevSnap[word]>>(idx&63)&1 == 1
|
||||
return inCurrent || inPrev
|
||||
var labels []*prompb.Label
|
||||
if pReadOnly != nil {
|
||||
labels = append(labels, *pReadOnly...)
|
||||
}
|
||||
|
||||
// Phase 2: reclaim slots evicted last rotation if still unused.
|
||||
var toReclaim []uint32
|
||||
var nextRelease map[uint32]struct{}
|
||||
for idx := range pendingIdxs {
|
||||
if isUsed(idx) {
|
||||
if nextRelease == nil {
|
||||
nextRelease = make(map[uint32]struct{})
|
||||
}
|
||||
nextRelease[idx] = struct{}{}
|
||||
for idx, pLabel := range lm.mutable {
|
||||
if idx < uint64(len(labels)) {
|
||||
labels[idx] = pLabel
|
||||
} else {
|
||||
toReclaim = append(toReclaim, idx)
|
||||
for idx > uint64(len(labels)) {
|
||||
labels = append(labels, nil)
|
||||
}
|
||||
labels = append(labels, pLabel)
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 1: scan idxToLabel rather than sync.Map.Range to avoid
|
||||
// cache-line contention with concurrent compressFast lookups.
|
||||
phase1Evicted := false
|
||||
if idxSnap != nil {
|
||||
for i, label := range *idxSnap {
|
||||
if label == (prompb.Label{}) {
|
||||
continue // freed slot
|
||||
}
|
||||
idx := uint32(i)
|
||||
if _, inPending := pendingIdxs[idx]; inPending {
|
||||
continue // already queued for release
|
||||
}
|
||||
if isUsed(idx) {
|
||||
continue
|
||||
}
|
||||
key := makeLabelKey(label)
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
||||
shard := &lc.labelToIdxShards[h%lcShardCount]
|
||||
shard.mu.Lock()
|
||||
delete(shard.m, key)
|
||||
shard.mu.Unlock()
|
||||
phase1Evicted = true
|
||||
if nextRelease == nil {
|
||||
nextRelease = make(map[uint32]struct{})
|
||||
}
|
||||
nextRelease[idx] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
if phase1Evicted {
|
||||
lc.generation.Add(1)
|
||||
}
|
||||
|
||||
lc.mu.Lock()
|
||||
if len(toReclaim) > 0 {
|
||||
p := lc.idxToLabel.Load()
|
||||
var newIdxToLabel []prompb.Label
|
||||
if p != nil {
|
||||
newIdxToLabel = *p
|
||||
}
|
||||
next := make([]prompb.Label, len(newIdxToLabel))
|
||||
copy(next, newIdxToLabel)
|
||||
for _, idx := range toReclaim {
|
||||
label := next[idx]
|
||||
entrySize := uint64(len(label.Name)+len(label.Value)) + uint64(unsafe.Sizeof(label)) + 8
|
||||
if lc.totalSizeBytes >= entrySize {
|
||||
lc.totalSizeBytes -= entrySize
|
||||
}
|
||||
next[idx] = prompb.Label{}
|
||||
lc.freeIdxs = append(lc.freeIdxs, idx)
|
||||
}
|
||||
lc.idxToLabel.Store(&next)
|
||||
}
|
||||
lc.pendingIdxs = nextRelease
|
||||
lc.mu.Unlock()
|
||||
clear(lm.mutable)
|
||||
lm.readOnly.Store(&labels)
|
||||
}
|
||||
|
||||
@@ -8,96 +8,47 @@ import (
|
||||
)
|
||||
|
||||
func BenchmarkLabelsCompressorCompress(b *testing.B) {
|
||||
var lc LabelsCompressor
|
||||
series := newTestSeries(100, 10)
|
||||
run := func(b *testing.B, withRotate bool) {
|
||||
var lc LabelsCompressor
|
||||
for _, labels := range series {
|
||||
lc.Compress(nil, labels)
|
||||
}
|
||||
var rotations atomic.Int64
|
||||
if withRotate {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
lc.rotate()
|
||||
rotations.Add(1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer close(done)
|
||||
}
|
||||
rotations.Store(0)
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(series)))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var dst []byte
|
||||
for pb.Next() {
|
||||
dst = dst[:0]
|
||||
for _, labels := range series {
|
||||
dst = lc.Compress(dst, labels)
|
||||
}
|
||||
Sink.Add(uint64(len(dst)))
|
||||
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(series)))
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var dst []byte
|
||||
for pb.Next() {
|
||||
dst = dst[:0]
|
||||
for _, labels := range series {
|
||||
dst = lc.Compress(dst, labels)
|
||||
}
|
||||
})
|
||||
if withRotate {
|
||||
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
|
||||
Sink.Add(uint64(len(dst)))
|
||||
}
|
||||
}
|
||||
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
|
||||
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkLabelsCompressorDecompress(b *testing.B) {
|
||||
var lc LabelsCompressor
|
||||
series := newTestSeries(100, 10)
|
||||
run := func(b *testing.B, withRotate bool) {
|
||||
var lc LabelsCompressor
|
||||
datas := make([][]byte, len(series))
|
||||
var buf []byte
|
||||
for i, labels := range series {
|
||||
bufLen := len(buf)
|
||||
buf = lc.Compress(buf, labels)
|
||||
datas[i] = buf[bufLen:]
|
||||
}
|
||||
var rotations atomic.Int64
|
||||
if withRotate {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
lc.rotate()
|
||||
rotations.Add(1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer close(done)
|
||||
}
|
||||
rotations.Store(0)
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(series)))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var labels []prompb.Label
|
||||
for pb.Next() {
|
||||
for _, data := range datas {
|
||||
labels = lc.Decompress(labels[:0], data)
|
||||
}
|
||||
Sink.Add(uint64(len(labels)))
|
||||
}
|
||||
})
|
||||
if withRotate {
|
||||
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
|
||||
}
|
||||
datas := make([][]byte, len(series))
|
||||
var dst []byte
|
||||
for i, labels := range series {
|
||||
dstLen := len(dst)
|
||||
dst = lc.Compress(dst, labels)
|
||||
datas[i] = dst[dstLen:]
|
||||
}
|
||||
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
|
||||
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
|
||||
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(series)))
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var labels []prompb.Label
|
||||
for pb.Next() {
|
||||
for _, data := range datas {
|
||||
labels = lc.Decompress(labels[:0], data)
|
||||
}
|
||||
Sink.Add(uint64(len(labels)))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
var Sink atomic.Uint64
|
||||
|
||||
@@ -764,7 +764,7 @@ func filterLabelValues(lvs map[string]struct{}, tf *tagFilter, key string) {
|
||||
b = marshalTagValue(b, bytesutil.ToUnsafeBytes(lv))
|
||||
ok, err := tf.match(b)
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %w", key, lv, tf.String(), err)
|
||||
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %s", key, lv, tf.String(), err)
|
||||
}
|
||||
if !ok {
|
||||
delete(lvs, lv)
|
||||
|
||||
@@ -141,7 +141,7 @@ func (is *indexSearch) legacyContainsTimeRangeSlow(prefixBuf *bytesutil.ByteBuff
|
||||
ts.Seek(prefixBuf.B)
|
||||
if !ts.NextItem() {
|
||||
if err := ts.Error(); err != nil {
|
||||
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %w", minDate, prefixBuf.B, err)
|
||||
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %s", minDate, prefixBuf.B, err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -106,7 +106,7 @@ func loadFrom(loadPath string, maxSizeBytes uint64) (*Tracker, error) {
|
||||
}
|
||||
defer func() {
|
||||
if err := zr.Close(); err != nil {
|
||||
logger.Panicf("FATAL: cannot close gzip reader: %w", err)
|
||||
logger.Panicf("FATAL: cannot close gzip reader: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -517,7 +517,7 @@ func (tb *table) historicalMergeWatcher() {
|
||||
|
||||
logger.Infof("start %s for partition (%s, %s)", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath)
|
||||
if err := pt.ForceMergeAllParts(tb.stopCh); err != nil {
|
||||
logger.Errorf("cannot %s for partition (%s, %s): %w", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
|
||||
logger.Errorf("cannot %s for partition (%s, %s): %s", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
|
||||
}
|
||||
logger.Infof("finished %s for partition (%s, %s) in %.3f seconds", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath, time.Since(t).Seconds())
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
|
||||
)
|
||||
|
||||
func BenchmarkDedupAggr(b *testing.B) {
|
||||
@@ -64,15 +63,14 @@ func newBenchSamples(count int) []pushSample {
|
||||
}
|
||||
labelsLen := len(labels)
|
||||
samples := make([]pushSample, count)
|
||||
var lc promutil.LabelsCompressor
|
||||
var keyBuf []byte
|
||||
for i := range samples {
|
||||
sample := &samples[i]
|
||||
labels := append(labels[:labelsLen:labelsLen], prompb.Label{
|
||||
labels = append(labels[:labelsLen], prompb.Label{
|
||||
Name: "app",
|
||||
Value: fmt.Sprintf("instance-%d", i),
|
||||
})
|
||||
keyBuf = lc.Compress(keyBuf[:0], labels)
|
||||
keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:])
|
||||
sample.key = string(keyBuf)
|
||||
sample.value = float64(i)
|
||||
}
|
||||
|
||||
@@ -77,8 +77,6 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
|
||||
|
||||
metrics.RegisterSet(ms)
|
||||
|
||||
lc.Register(2 * interval)
|
||||
|
||||
d.wg.Go(func() {
|
||||
d.runFlusher(pushFunc)
|
||||
})
|
||||
@@ -88,8 +86,6 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
|
||||
|
||||
// MustStop stops d.
|
||||
func (d *Deduplicator) MustStop() {
|
||||
lc.Unregister(2 * d.interval)
|
||||
|
||||
metrics.UnregisterSet(d.ms, true)
|
||||
d.ms = nil
|
||||
|
||||
@@ -207,7 +203,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
|
||||
dstSamples := ctx.samples
|
||||
for _, ps := range samples {
|
||||
labelsLen := len(labels)
|
||||
labels = lc.Decompress(labels, bytesutil.ToUnsafeBytes(ps.key))
|
||||
labels = decompressLabels(labels, ps.key)
|
||||
|
||||
dstSamplesLen := len(dstSamples)
|
||||
dstSamples = append(dstSamples, prompb.Sample{
|
||||
|
||||
@@ -17,27 +17,29 @@ type aggrOutputs struct {
|
||||
outputSamples *metrics.Counter
|
||||
}
|
||||
|
||||
func (ao *aggrOutputs) getInputOutputKey(key string) (outputKey, inputKey string) {
|
||||
func (ao *aggrOutputs) getInputOutputKey(key string) (string, string) {
|
||||
src := bytesutil.ToUnsafeBytes(key)
|
||||
outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
|
||||
if nSize <= 0 {
|
||||
logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
|
||||
}
|
||||
src = src[nSize:]
|
||||
outputKey = bytesutil.ToUnsafeString(src[:outputKeyLen])
|
||||
if !ao.useInputKey || int(outputKeyLen) == len(src) {
|
||||
return outputKey, outputKey
|
||||
outputKey := src[:outputKeyLen]
|
||||
if !ao.useInputKey {
|
||||
return key, bytesutil.ToUnsafeString(outputKey)
|
||||
}
|
||||
inputKey = bytesutil.ToUnsafeString(src[outputKeyLen:])
|
||||
return outputKey, inputKey
|
||||
inputKey := src[outputKeyLen:]
|
||||
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
|
||||
}
|
||||
|
||||
func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, isGreen bool) {
|
||||
var inputKey, outputKey string
|
||||
var sample *pushSample
|
||||
var outputs []aggrValue
|
||||
var nv *aggrValues
|
||||
for i := range samples {
|
||||
sample := &samples[i]
|
||||
outputKey, inputKey := ao.getInputOutputKey(sample.key)
|
||||
sample = &samples[i]
|
||||
inputKey, outputKey = ao.getInputOutputKey(sample.key)
|
||||
|
||||
again:
|
||||
v, ok := ao.m.Load(outputKey)
|
||||
@@ -79,7 +81,7 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
|
||||
}
|
||||
av.mu.Unlock()
|
||||
if deleted {
|
||||
// The entry has been deleted by the concurrent call to flush.
|
||||
// The entry has been deleted by the concurrent call to flush
|
||||
// Try obtaining and updating the entry again.
|
||||
goto again
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"slices"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -23,7 +24,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
@@ -257,10 +257,6 @@ type Config struct {
|
||||
type Aggregators struct {
|
||||
as []*aggregator
|
||||
|
||||
// workCh limits the number of concurrent aggregator.Push calls.
|
||||
// Pre-allocated once to avoid per-Push channel allocation.
|
||||
workCh chan struct{}
|
||||
|
||||
// configData contains marshaled configs.
|
||||
// It is used in Equal() for comparing Aggregators.
|
||||
configData []byte
|
||||
@@ -291,7 +287,6 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
|
||||
}
|
||||
|
||||
ms := metrics.NewSet()
|
||||
|
||||
as := make([]*aggregator, len(cfgs))
|
||||
for i, cfg := range cfgs {
|
||||
a, err := newAggregator(cfg, filePath, pushFunc, ms, opts, alias, i+1)
|
||||
@@ -312,7 +307,6 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
|
||||
metrics.RegisterSet(ms)
|
||||
return &Aggregators{
|
||||
as: as,
|
||||
workCh: make(chan struct{}, cgroup.AvailableCPUs()),
|
||||
configData: configData,
|
||||
filePath: filePath,
|
||||
ms: ms,
|
||||
@@ -370,21 +364,19 @@ func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []uint32) []uint32
|
||||
return matchIdxs
|
||||
}
|
||||
|
||||
if len(a.as) == 1 {
|
||||
a.as[0].Push(tss, matchIdxs)
|
||||
return matchIdxs
|
||||
}
|
||||
|
||||
// Use all available CPU cores to push time-series into aggregators in parallel.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
|
||||
// use all available CPU cores to copy time-series into aggregators
|
||||
// See this issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
|
||||
var wg sync.WaitGroup
|
||||
concurrencyChan := make(chan struct{}, cgroup.AvailableCPUs())
|
||||
|
||||
for _, aggr := range a.as {
|
||||
a.workCh <- struct{}{}
|
||||
concurrencyChan <- struct{}{}
|
||||
wg.Go(func() {
|
||||
aggr.Push(tss, matchIdxs)
|
||||
<-a.workCh
|
||||
<-concurrencyChan
|
||||
})
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return matchIdxs
|
||||
@@ -421,8 +413,8 @@ type aggregator struct {
|
||||
ignoreOldSamples bool
|
||||
enableWindows bool
|
||||
|
||||
bySet map[string]struct{}
|
||||
withoutSet map[string]struct{}
|
||||
by []string
|
||||
without []string
|
||||
aggregateOnlyByTime bool
|
||||
|
||||
// interval is the interval between flushes
|
||||
@@ -454,8 +446,6 @@ type aggregator struct {
|
||||
// for `interval: 1m`, `by: [job]`
|
||||
suffix string
|
||||
|
||||
seriesKeyCache seriesKeyCache
|
||||
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
|
||||
@@ -655,8 +645,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
enableWindows: enableWindows,
|
||||
stalenessInterval: stalenessInterval,
|
||||
|
||||
bySet: makeStringSet(by),
|
||||
withoutSet: makeStringSet(without),
|
||||
by: by,
|
||||
without: without,
|
||||
aggregateOnlyByTime: aggregateOnlyByTime,
|
||||
|
||||
interval: interval,
|
||||
@@ -719,8 +709,6 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
}
|
||||
a.cs.Store(cs)
|
||||
|
||||
lc.Register(a.stalenessInterval)
|
||||
|
||||
a.wg.Go(func() {
|
||||
a.runFlusher(pushFunc, alignFlushToInterval, skipFlushOnShutdown, ignoreFirstIntervals)
|
||||
})
|
||||
@@ -927,7 +915,6 @@ func (a *aggregator) dedupFlush(dedupTime time.Time, cs *currentState) {
|
||||
//
|
||||
// If pushFunc is nil, then the aggregator state is just reset.
|
||||
func (a *aggregator) flush(pushFunc PushFunc, flushTime time.Time, cs *currentState, isLast bool) {
|
||||
a.seriesKeyCache.reset()
|
||||
startTime := time.Now()
|
||||
ao := a.aggrOutputs
|
||||
|
||||
@@ -956,7 +943,6 @@ var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
|
||||
//
|
||||
// The aggregator stops pushing the aggregated metrics after this call.
|
||||
func (a *aggregator) MustStop() {
|
||||
lc.Unregister(a.stalenessInterval)
|
||||
close(a.stopCh)
|
||||
a.wg.Wait()
|
||||
}
|
||||
@@ -966,6 +952,7 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
ctx := getPushCtx()
|
||||
defer putPushCtx(ctx)
|
||||
|
||||
buf := ctx.buf
|
||||
labels := &ctx.labels
|
||||
inputLabels := &ctx.inputLabels
|
||||
outputLabels := &ctx.outputLabels
|
||||
@@ -999,22 +986,19 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
}
|
||||
labels.Sort()
|
||||
|
||||
h := computeLabelsFingerprint(labels.Labels)
|
||||
gen := lc.Generation()
|
||||
key, ok := a.seriesKeyCache.get(h, gen)
|
||||
if !ok {
|
||||
inputLabels.Reset()
|
||||
outputLabels.Reset()
|
||||
if !a.aggregateOnlyByTime {
|
||||
inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.bySet, a.withoutSet)
|
||||
} else {
|
||||
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
|
||||
}
|
||||
bufLen := len(ctx.buf)
|
||||
ctx.compressLabels(inputLabels.Labels, outputLabels.Labels)
|
||||
key = string(ctx.buf[bufLen:])
|
||||
a.seriesKeyCache.set(h, gen, key)
|
||||
inputLabels.Reset()
|
||||
outputLabels.Reset()
|
||||
if !a.aggregateOnlyByTime {
|
||||
inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.by, a.without)
|
||||
} else {
|
||||
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
|
||||
}
|
||||
|
||||
bufLen := len(buf)
|
||||
buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels)
|
||||
// key remains valid only by the end of this function and can't be reused after
|
||||
// do not intern key because number of unique keys could be too high
|
||||
key := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
for _, s := range ts.Samples {
|
||||
if math.IsNaN(s.Value) {
|
||||
// Skip NaN values
|
||||
@@ -1058,6 +1042,8 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
}
|
||||
a.samplesLag.Update(float64(maxLagMsec) / 1_000)
|
||||
|
||||
ctx.buf = buf
|
||||
|
||||
pushSamples := a.aggrOutputs.pushSamples
|
||||
if a.da != nil {
|
||||
pushSamples = a.da.pushSamples
|
||||
@@ -1074,13 +1060,18 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
}
|
||||
}
|
||||
|
||||
func (ctx *pushCtx) compressLabels(inputLabels, outputLabels []prompb.Label) {
|
||||
ctx.keybuf = lc.Compress(ctx.keybuf[:0], outputLabels)
|
||||
ctx.buf = encoding.MarshalVarUint64(ctx.buf, uint64(len(ctx.keybuf)))
|
||||
ctx.buf = append(ctx.buf, ctx.keybuf...)
|
||||
if len(inputLabels) > 0 {
|
||||
ctx.buf = lc.Compress(ctx.buf, inputLabels)
|
||||
}
|
||||
func compressLabels(dst []byte, inputLabels, outputLabels []prompb.Label) []byte {
|
||||
bb := bbPool.Get()
|
||||
bb.B = lc.Compress(bb.B, outputLabels)
|
||||
dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
|
||||
dst = append(dst, bb.B...)
|
||||
bbPool.Put(bb)
|
||||
dst = lc.Compress(dst, inputLabels)
|
||||
return dst
|
||||
}
|
||||
|
||||
func decompressLabels(dst []prompb.Label, key string) []prompb.Label {
|
||||
return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
|
||||
}
|
||||
|
||||
type pushCtx struct {
|
||||
@@ -1090,7 +1081,6 @@ type pushCtx struct {
|
||||
inputLabels promutil.Labels
|
||||
outputLabels promutil.Labels
|
||||
buf []byte
|
||||
keybuf []byte
|
||||
}
|
||||
|
||||
func (ctx *pushCtx) reset() {
|
||||
@@ -1125,10 +1115,10 @@ func putPushCtx(ctx *pushCtx) {
|
||||
|
||||
var pushCtxPool sync.Pool
|
||||
|
||||
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, bySet, withoutSet map[string]struct{}) ([]prompb.Label, []prompb.Label) {
|
||||
if len(withoutSet) > 0 {
|
||||
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, without []string) ([]prompb.Label, []prompb.Label) {
|
||||
if len(without) > 0 {
|
||||
for _, label := range labels {
|
||||
if _, ok := withoutSet[label.Name]; ok {
|
||||
if slices.Contains(without, label.Name) {
|
||||
dstInput = append(dstInput, label)
|
||||
} else {
|
||||
dstOutput = append(dstOutput, label)
|
||||
@@ -1136,7 +1126,7 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, bySet, wit
|
||||
}
|
||||
} else {
|
||||
for _, label := range labels {
|
||||
if _, ok := bySet[label.Name]; !ok {
|
||||
if !slices.Contains(by, label.Name) {
|
||||
dstInput = append(dstInput, label)
|
||||
} else {
|
||||
dstOutput = append(dstOutput, label)
|
||||
@@ -1146,17 +1136,6 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, bySet, wit
|
||||
return dstInput, dstOutput
|
||||
}
|
||||
|
||||
func makeStringSet(keys []string) map[string]struct{} {
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
m := make(map[string]struct{}, len(keys))
|
||||
for _, k := range keys {
|
||||
m[k] = struct{}{}
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func getFlushCtx(a *aggregator, ao *aggrOutputs, pushFunc PushFunc, flushTimestamp int64, isLast bool) *flushCtx {
|
||||
v := flushCtxPool.Get()
|
||||
if v == nil {
|
||||
@@ -1256,7 +1235,7 @@ func (ctx *flushCtx) flushSeries() {
|
||||
func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
|
||||
labelsLen := len(ctx.labels)
|
||||
samplesLen := len(ctx.samples)
|
||||
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
|
||||
ctx.labels = decompressLabels(ctx.labels, key)
|
||||
if !ctx.a.keepMetricNames {
|
||||
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
|
||||
}
|
||||
@@ -1278,7 +1257,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
|
||||
func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, value float64, extraName, extraValue string) {
|
||||
labelsLen := len(ctx.labels)
|
||||
samplesLen := len(ctx.samples)
|
||||
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
|
||||
ctx.labels = decompressLabels(ctx.labels, key)
|
||||
if !ctx.a.keepMetricNames {
|
||||
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
|
||||
}
|
||||
@@ -1366,74 +1345,3 @@ func sortAndRemoveDuplicates(a []string) []string {
|
||||
}
|
||||
|
||||
var bbPool bytesutil.ByteBufferPool
|
||||
|
||||
type seriesKeyCache struct {
|
||||
shards [64]seriesKeyCacheShard
|
||||
}
|
||||
|
||||
type seriesKeyCacheShard struct {
|
||||
mu sync.RWMutex
|
||||
m map[uint64]seriesCacheEntry
|
||||
_ [32]byte // cache-line padding
|
||||
}
|
||||
|
||||
type seriesCacheEntry struct {
|
||||
generation uint64
|
||||
key string
|
||||
}
|
||||
|
||||
func (c *seriesKeyCache) get(h, generation uint64) (string, bool) {
|
||||
shard := &c.shards[h>>58]
|
||||
shard.mu.RLock()
|
||||
e, ok := shard.m[h]
|
||||
shard.mu.RUnlock()
|
||||
if !ok || e.generation != generation {
|
||||
return "", false
|
||||
}
|
||||
return e.key, true
|
||||
}
|
||||
|
||||
func (c *seriesKeyCache) set(h, generation uint64, key string) {
|
||||
shard := &c.shards[h>>58]
|
||||
shard.mu.Lock()
|
||||
if shard.m == nil {
|
||||
shard.m = make(map[uint64]seriesCacheEntry)
|
||||
}
|
||||
shard.m[h] = seriesCacheEntry{generation: generation, key: key}
|
||||
shard.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *seriesKeyCache) reset() {
|
||||
for i := range c.shards {
|
||||
shard := &c.shards[i]
|
||||
shard.mu.Lock()
|
||||
shard.m = nil
|
||||
shard.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func computeLabelsFingerprint(labels []prompb.Label) uint64 {
|
||||
var buf [512]byte
|
||||
n := 0
|
||||
for i, l := range labels {
|
||||
need := len(l.Name) + 1 + len(l.Value) + 1
|
||||
if n+need > len(buf) {
|
||||
b := make([]byte, n, n+need*2)
|
||||
copy(b, buf[:n])
|
||||
for _, l2 := range labels[i:] {
|
||||
b = append(b, l2.Name...)
|
||||
b = append(b, '\x00')
|
||||
b = append(b, l2.Value...)
|
||||
b = append(b, '\x00')
|
||||
}
|
||||
return xxhash.Sum64(b)
|
||||
}
|
||||
n += copy(buf[n:], l.Name)
|
||||
buf[n] = '\x00'
|
||||
n++
|
||||
n += copy(buf[n:], l.Value)
|
||||
buf[n] = '\x00'
|
||||
n++
|
||||
}
|
||||
return xxhash.Sum64(buf[:n])
|
||||
}
|
||||
|
||||
@@ -45,16 +45,13 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
|
||||
a := newBenchAggregators([]string{output}, pushFunc)
|
||||
defer a.MustStop()
|
||||
|
||||
// Warm up the LabelsCompressor so benchmark measures steady-state performance.
|
||||
a.Push(benchSeries, nil)
|
||||
|
||||
const loops = 100
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(benchSeries) * loops))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
matchIdxs := make([]uint32, len(benchSeries))
|
||||
var matchIdxs []uint32
|
||||
for pb.Next() {
|
||||
for range loops {
|
||||
matchIdxs = a.Push(benchSeries, matchIdxs)
|
||||
@@ -85,8 +82,8 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
|
||||
func newBenchSeries(seriesCount int) []prompb.TimeSeries {
|
||||
a := make([]string, 0, seriesCount)
|
||||
for j := range seriesCount {
|
||||
s := fmt.Sprintf(`http_requests_total{environment="prod",instance="bar",job="foo_%d",label1="value1",label2="value2",label3="value3",`+
|
||||
`namespace="kube-foo-bar",node="node-123-3434-443",path="/foo/%d",pod="pod-123232312",some_other_label="foo-bar-baz"} %d`, j%100, j, j*1000)
|
||||
s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo_%d",instance="bar",pod="pod-123232312",namespace="kube-foo-bar",node="node-123-3434-443",`+
|
||||
`some_other_label="foo-bar-baz",environment="prod",label1="value1",label2="value2",label3="value3"} %d`, j, j%100, j*1000)
|
||||
a = append(a, s)
|
||||
}
|
||||
metrics := strings.Join(a, "\n")
|
||||
@@ -104,16 +101,13 @@ func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
|
||||
a := newPerOutputBenchAggregators(benchOutputs, pushFunc)
|
||||
defer a.MustStop()
|
||||
|
||||
// Warm up the LabelsCompressor so benchmark measures steady-state performance.
|
||||
a.Push(benchSeries, nil)
|
||||
|
||||
const loops = 100
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(benchSeries) * loops))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
matchIdxs := make([]uint32, len(benchSeries))
|
||||
var matchIdxs []uint32
|
||||
for pb.Next() {
|
||||
for range loops {
|
||||
matchIdxs = a.Push(benchSeries, matchIdxs)
|
||||
@@ -123,17 +117,21 @@ func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
|
||||
}
|
||||
|
||||
func newPerOutputBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
|
||||
var b strings.Builder
|
||||
for _, output := range outputs {
|
||||
fmt.Fprintf(&b, `
|
||||
outputsQuoted := make([]string, len(outputs))
|
||||
var config string
|
||||
for i := range outputs {
|
||||
outputsQuoted[i] = stringsutil.JSONString(outputs[i])
|
||||
cfg := fmt.Sprintf(`
|
||||
- match: http_requests_total
|
||||
interval: 24h
|
||||
by: [job]
|
||||
outputs: [%s]
|
||||
`, stringsutil.JSONString(output))
|
||||
`, stringsutil.JSONString(outputs[i]))
|
||||
config += cfg
|
||||
|
||||
}
|
||||
|
||||
a, err := LoadFromData([]byte(b.String()), pushFunc, nil, "some_alias")
|
||||
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user