mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-02 08:32:46 +03:00
Compare commits
1 Commits
shared-vms
...
streamaggr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b3eefe6483 |
@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
## tip
|
||||
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): add `vm_streamaggr_counter_resets_total` metric for `total*`, `increase*` and `rate*` outputs that is useful for aggregation behaviour tracking.
|
||||
|
||||
## [v1.140.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.140.0)
|
||||
|
||||
Released at 2026-04-10
|
||||
|
||||
@@ -571,6 +571,7 @@ Below is an example of an `aggr.yaml` configuration that drops the `replica` and
|
||||
# Troubleshooting
|
||||
|
||||
- [Unexpected spikes for `total` or `increase` outputs](#staleness).
|
||||
- [Excessively large values for `total*`, `increase*`, and `rate*` outputs](#counter-resets).
|
||||
- [Lower than expected values for `total_prometheus` and `increase_prometheus` outputs](#staleness).
|
||||
- [High memory usage and CPU usage](#high-resource-usage).
|
||||
- [Unexpected results in vmagent cluster mode](#cluster-mode).
|
||||
@@ -601,6 +602,10 @@ the following settings:
|
||||
- `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#stream-aggregation-config).
|
||||
It allows enabling aggregation windows for a specific aggregator.
|
||||
|
||||
## Counter resets
|
||||
|
||||
If counter-specific outputs, such as `total*`, `rate*`, and `increase*`, exhibit values that are significantly higher than anticipated, it is advisable to examine the `vm_streamaggr_counter_resets` metric. The observation of frequent counter resets may indicate potential issues with the raw data, including possible series collisions.
|
||||
|
||||
## Staleness
|
||||
|
||||
The following outputs track the last seen per-series values in order to properly calculate output values:
|
||||
|
||||
232
lib/streamaggr/dedup
Normal file
232
lib/streamaggr/dedup
Normal file
@@ -0,0 +1,232 @@
|
||||
diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go
|
||||
index ee87c7786f..e1038b2706 100644
|
||||
--- a/lib/streamaggr/dedup.go
|
||||
+++ b/lib/streamaggr/dedup.go
|
||||
@@ -1,6 +1,7 @@
|
||||
package streamaggr
|
||||
|
||||
import (
|
||||
+ "math/bits"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
@@ -11,7 +12,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||
- "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
)
|
||||
|
||||
const dedupAggrShardsCount = 128
|
||||
@@ -30,9 +30,11 @@ type dedupAggrShard struct {
|
||||
}
|
||||
|
||||
type dedupAggrState struct {
|
||||
- m map[string]*dedupAggrSample
|
||||
mu sync.Mutex
|
||||
samplesBuf []dedupAggrSample
|
||||
+ mask uint64
|
||||
+ keys []string
|
||||
+ count int
|
||||
sizeBytes atomic.Uint64
|
||||
itemsCount atomic.Uint64
|
||||
}
|
||||
@@ -43,6 +45,9 @@ type dedupAggrShardNopad struct {
|
||||
}
|
||||
|
||||
type dedupAggrSample struct {
|
||||
+ hash uint64
|
||||
+ keyIdx uint32
|
||||
+ _ uint32 // padding so value stays 8-byte aligned
|
||||
value float64
|
||||
timestamp int64
|
||||
}
|
||||
@@ -55,9 +60,8 @@ func newDedupAggr() *dedupAggr {
|
||||
|
||||
func (da *dedupAggr) sizeBytes() uint64 {
|
||||
n := uint64(unsafe.Sizeof(*da))
|
||||
- var shard *dedupAggrShard
|
||||
for i := range da.shards {
|
||||
- shard = &da.shards[i]
|
||||
+ shard := &da.shards[i]
|
||||
n += shard.blue.sizeBytes.Load()
|
||||
n += shard.green.sizeBytes.Load()
|
||||
}
|
||||
@@ -66,9 +70,8 @@ func (da *dedupAggr) sizeBytes() uint64 {
|
||||
|
||||
func (da *dedupAggr) itemsCount() uint64 {
|
||||
n := uint64(0)
|
||||
- var shard *dedupAggrShard
|
||||
for i := range da.shards {
|
||||
- shard = &da.shards[i]
|
||||
+ shard := &da.shards[i]
|
||||
n += shard.blue.itemsCount.Load()
|
||||
n += shard.green.itemsCount.Load()
|
||||
}
|
||||
@@ -169,7 +172,6 @@ var perShardSamplesPool sync.Pool
|
||||
|
||||
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
|
||||
var state *dedupAggrState
|
||||
-
|
||||
if isGreen {
|
||||
state = &das.green
|
||||
} else {
|
||||
@@ -177,29 +179,81 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
|
||||
}
|
||||
|
||||
state.mu.Lock()
|
||||
- defer state.mu.Unlock()
|
||||
- if state.m == nil {
|
||||
- state.m = make(map[string]*dedupAggrSample, len(samples))
|
||||
+ if state.samplesBuf == nil {
|
||||
+ n := nextPow2(max(16, len(samples)*2))
|
||||
+ state.samplesBuf = make([]dedupAggrSample, n)
|
||||
+ state.mask = uint64(n - 1)
|
||||
}
|
||||
- samplesBuf := state.samplesBuf
|
||||
- for _, sample := range samples {
|
||||
- s, ok := state.m[sample.key]
|
||||
- if !ok {
|
||||
- samplesBuf = slicesutil.SetLength(samplesBuf, len(samplesBuf)+1)
|
||||
- s = &samplesBuf[len(samplesBuf)-1]
|
||||
- s.value = sample.value
|
||||
- s.timestamp = sample.timestamp
|
||||
-
|
||||
- key := bytesutil.InternString(sample.key)
|
||||
- state.m[key] = s
|
||||
-
|
||||
- state.itemsCount.Add(1)
|
||||
- state.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
|
||||
+ for i := range samples {
|
||||
+ state.push(&samples[i])
|
||||
+ }
|
||||
+ sz := uint64(state.count) * uint64(unsafe.Sizeof(dedupAggrSample{}))
|
||||
+ cnt := uint64(state.count)
|
||||
+ state.sizeBytes.Store(sz)
|
||||
+ state.itemsCount.Store(cnt)
|
||||
+ state.mu.Unlock()
|
||||
+}
|
||||
+
|
||||
+// push inserts or deduplicates a single sample into the hash table.
|
||||
+// Must be called with state.mu held.
|
||||
+func (s *dedupAggrState) push(sample *pushSample) {
|
||||
+ if s.count*4 >= len(s.samplesBuf)*3 {
|
||||
+ s.grow()
|
||||
+ }
|
||||
+
|
||||
+ key := sample.key
|
||||
+ h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
||||
+ idx := h & s.mask
|
||||
+
|
||||
+ for {
|
||||
+ e := &s.samplesBuf[idx]
|
||||
+ if e.keyIdx == 0 {
|
||||
+ s.keys = append(s.keys, key)
|
||||
+ e.hash = h
|
||||
+ e.keyIdx = uint32(len(s.keys)) // 1-based
|
||||
+ e.value = sample.value
|
||||
+ e.timestamp = sample.timestamp
|
||||
+ s.count++
|
||||
+ return
|
||||
+ }
|
||||
+ // Hash check first to skip the string comparison in the common case.
|
||||
+ if e.hash == h && s.keys[e.keyIdx-1] == key {
|
||||
+ e.timestamp, e.value = deduplicateSamples(e.timestamp, sample.timestamp, e.value, sample.value)
|
||||
+ return
|
||||
+ }
|
||||
+ idx = (idx + 1) & s.mask
|
||||
+ }
|
||||
+}
|
||||
+
|
||||
+// grow doubles the hash table capacity and rehashes all existing entries.
|
||||
+// Must be called with state.mu held.
|
||||
+func (s *dedupAggrState) grow() {
|
||||
+ newSize := len(s.samplesBuf) * 2
|
||||
+ if newSize == 0 {
|
||||
+ newSize = 16
|
||||
+ }
|
||||
+ newSamplesBuf := make([]dedupAggrSample, newSize)
|
||||
+ newMask := uint64(newSize - 1)
|
||||
+ for _, e := range s.samplesBuf {
|
||||
+ if e.keyIdx == 0 {
|
||||
continue
|
||||
}
|
||||
- s.timestamp, s.value = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
|
||||
+ idx := e.hash & newMask
|
||||
+ for newSamplesBuf[idx].keyIdx != 0 {
|
||||
+ idx = (idx + 1) & newMask
|
||||
+ }
|
||||
+ newSamplesBuf[idx] = e
|
||||
}
|
||||
- state.samplesBuf = samplesBuf
|
||||
+ s.samplesBuf = newSamplesBuf
|
||||
+ s.mask = newMask
|
||||
+}
|
||||
+
|
||||
+// nextPow2 returns the smallest power of two >= n.
|
||||
+func nextPow2(n int) int {
|
||||
+ if n <= 1 {
|
||||
+ return 1
|
||||
+ }
|
||||
+ return 1 << bits.Len(uint(n-1))
|
||||
}
|
||||
|
||||
// deduplicateSamples returns deduplicated timestamp and value results.
|
||||
@@ -208,8 +262,6 @@ func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64) {
|
||||
if newT > oldT {
|
||||
return newT, newV
|
||||
}
|
||||
- // if both samples have the same timestamp, choose the maximum value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333;
|
||||
- // always prefer a non-decimal.StaleNaN value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10196
|
||||
if newT == oldT {
|
||||
if decimal.IsStaleNaN(oldV) {
|
||||
return newT, newV
|
||||
@@ -222,7 +274,6 @@ func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64) {
|
||||
}
|
||||
|
||||
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
|
||||
- var m map[string]*dedupAggrSample
|
||||
var state *dedupAggrState
|
||||
if ctx.isGreen {
|
||||
state = &das.green
|
||||
@@ -231,23 +282,28 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
|
||||
}
|
||||
|
||||
state.mu.Lock()
|
||||
- if len(state.m) > 0 {
|
||||
- m = state.m
|
||||
- state.m = make(map[string]*dedupAggrSample, len(state.m))
|
||||
- state.samplesBuf = make([]dedupAggrSample, 0, len(state.samplesBuf))
|
||||
- state.sizeBytes.Store(0)
|
||||
- state.itemsCount.Store(0)
|
||||
- }
|
||||
- state.mu.Unlock()
|
||||
-
|
||||
- if len(m) == 0 {
|
||||
+ if state.count == 0 {
|
||||
+ state.mu.Unlock()
|
||||
return
|
||||
}
|
||||
+ samplesBuf := state.samplesBuf
|
||||
+ keys := state.keys
|
||||
+ n := len(samplesBuf)
|
||||
+ state.samplesBuf = make([]dedupAggrSample, n)
|
||||
+ state.mask = uint64(n - 1)
|
||||
+ state.keys = make([]string, 0, len(keys))
|
||||
+ state.count = 0
|
||||
+ state.sizeBytes.Store(0)
|
||||
+ state.itemsCount.Store(0)
|
||||
+ state.mu.Unlock()
|
||||
|
||||
dstSamples := ctx.samples
|
||||
- for key, s := range m {
|
||||
+ for _, s := range samplesBuf {
|
||||
+ if s.keyIdx == 0 {
|
||||
+ continue
|
||||
+ }
|
||||
dstSamples = append(dstSamples, pushSample{
|
||||
- key: key,
|
||||
+ key: keys[s.keyIdx-1],
|
||||
value: s.value,
|
||||
timestamp: s.timestamp,
|
||||
})
|
||||
@@ -1,8 +1,11 @@
|
||||
package streamaggr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
)
|
||||
|
||||
@@ -92,7 +95,8 @@ type rateAggrValue struct {
|
||||
isGreen bool
|
||||
}
|
||||
|
||||
func (av *rateAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
|
||||
func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
|
||||
ac := c.(*rateAggrConfig)
|
||||
var state *rateAggrStateValue
|
||||
sv, ok := av.shared[key]
|
||||
if ok {
|
||||
@@ -106,6 +110,7 @@ func (av *rateAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string
|
||||
} else {
|
||||
// counter reset
|
||||
state.increase += sample.value
|
||||
ac.counterResetsTotal.Inc()
|
||||
}
|
||||
} else {
|
||||
sv = getRateAggrSharedValue(av.isGreen)
|
||||
@@ -169,14 +174,17 @@ func (av *rateAggrValue) state() any {
|
||||
return av.shared
|
||||
}
|
||||
|
||||
func newRateAggrConfig(isAvg bool) aggrConfig {
|
||||
return &rateAggrConfig{
|
||||
func newRateAggrConfig(ms *metrics.Set, metricLabels string, isAvg bool) aggrConfig {
|
||||
cfg := rateAggrConfig{
|
||||
isAvg: isAvg,
|
||||
}
|
||||
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
||||
return &cfg
|
||||
}
|
||||
|
||||
type rateAggrConfig struct {
|
||||
isAvg bool
|
||||
isAvg bool
|
||||
counterResetsTotal *metrics.Counter
|
||||
}
|
||||
|
||||
func (*rateAggrConfig) getValue(s any) aggrValue {
|
||||
|
||||
@@ -613,7 +613,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
}
|
||||
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
|
||||
for i, output := range cfg.Outputs {
|
||||
ac, err := newOutputConfig(output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
|
||||
outputMetricLabels := fmt.Sprintf(`output=%q,name=%q,path=%q,url=%q,position="%d"`, output, name, path, alias, aggrID)
|
||||
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -723,7 +724,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
|
||||
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
|
||||
// check for duplicated output
|
||||
if _, ok := outputsSeen[output]; ok {
|
||||
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
|
||||
@@ -769,9 +770,9 @@ func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedSt
|
||||
case "histogram_bucket":
|
||||
return newHistogramBucketAggrConfig(useSharedState), nil
|
||||
case "increase":
|
||||
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, true, true), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
|
||||
case "increase_prometheus":
|
||||
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, true, false), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
|
||||
case "last":
|
||||
return newLastAggrConfig(), nil
|
||||
case "max":
|
||||
@@ -779,9 +780,9 @@ func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedSt
|
||||
case "min":
|
||||
return newMinAggrConfig(), nil
|
||||
case "rate_avg":
|
||||
return newRateAggrConfig(true), nil
|
||||
return newRateAggrConfig(ms, metricLabels, true), nil
|
||||
case "rate_sum":
|
||||
return newRateAggrConfig(false), nil
|
||||
return newRateAggrConfig(ms, metricLabels, false), nil
|
||||
case "stddev":
|
||||
return newStddevAggrConfig(), nil
|
||||
case "stdvar":
|
||||
@@ -789,9 +790,9 @@ func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedSt
|
||||
case "sum_samples":
|
||||
return newSumSamplesAggrConfig(), nil
|
||||
case "total":
|
||||
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, false, true), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
|
||||
case "total_prometheus":
|
||||
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, false, false), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
|
||||
case "unique_samples":
|
||||
return newUniqueSamplesAggrConfig(), nil
|
||||
default:
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
package streamaggr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
@@ -38,6 +41,7 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
|
||||
} else {
|
||||
// counter reset
|
||||
av.total += sample.value
|
||||
ac.counterResetsTotal.Inc()
|
||||
}
|
||||
}
|
||||
lv.value = sample.value
|
||||
@@ -74,13 +78,15 @@ func (av *totalAggrValue) state() any {
|
||||
return av.shared
|
||||
}
|
||||
|
||||
func newTotalAggrConfig(ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
|
||||
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
|
||||
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
|
||||
return &totalAggrConfig{
|
||||
cfg := &totalAggrConfig{
|
||||
keepFirstSample: keepFirstSample,
|
||||
resetTotalOnFlush: resetTotalOnFlush,
|
||||
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
|
||||
}
|
||||
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
||||
return cfg
|
||||
}
|
||||
|
||||
type totalAggrConfig struct {
|
||||
@@ -93,6 +99,7 @@ type totalAggrConfig struct {
|
||||
// This allows avoiding an initial spike of the output values at startup when new time series
|
||||
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
|
||||
ignoreFirstSampleDeadline uint64
|
||||
counterResetsTotal *metrics.Counter
|
||||
}
|
||||
|
||||
func (*totalAggrConfig) getValue(s any) aggrValue {
|
||||
|
||||
Reference in New Issue
Block a user