mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Compare commits
1 Commits
v1.143.0
...
streamaggr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f98b53c9b1 |
@@ -12,7 +12,6 @@ import (
|
||||
type aggrOutputs struct {
|
||||
m sync.Map
|
||||
useSharedState bool
|
||||
useInputKey bool
|
||||
configs []aggrConfig
|
||||
outputSamples *metrics.Counter
|
||||
}
|
||||
@@ -25,10 +24,10 @@ func (ao *aggrOutputs) getInputOutputKey(key string) (string, string) {
|
||||
}
|
||||
src = src[nSize:]
|
||||
outputKey := src[:outputKeyLen]
|
||||
if !ao.useInputKey {
|
||||
inputKey := src[outputKeyLen:]
|
||||
if len(inputKey) == 0 {
|
||||
return key, bytesutil.ToUnsafeString(outputKey)
|
||||
}
|
||||
inputKey := src[outputKeyLen:]
|
||||
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
package streamaggr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
)
|
||||
|
||||
@@ -92,7 +95,8 @@ type rateAggrValue struct {
|
||||
isGreen bool
|
||||
}
|
||||
|
||||
func (av *rateAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
|
||||
func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
|
||||
ac := c.(*rateAggrConfig)
|
||||
var state *rateAggrStateValue
|
||||
sv, ok := av.shared[key]
|
||||
if ok {
|
||||
@@ -106,6 +110,7 @@ func (av *rateAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string
|
||||
} else {
|
||||
// counter reset
|
||||
state.increase += sample.value
|
||||
ac.counterResetsTotal.Inc()
|
||||
}
|
||||
} else {
|
||||
sv = getRateAggrSharedValue(av.isGreen)
|
||||
@@ -169,14 +174,17 @@ func (av *rateAggrValue) state() any {
|
||||
return av.shared
|
||||
}
|
||||
|
||||
func newRateAggrConfig(isAvg bool) aggrConfig {
|
||||
return &rateAggrConfig{
|
||||
func newRateAggrConfig(ms *metrics.Set, metricLabels string, isAvg bool) aggrConfig {
|
||||
cfg := rateAggrConfig{
|
||||
isAvg: isAvg,
|
||||
}
|
||||
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
||||
return &cfg
|
||||
}
|
||||
|
||||
type rateAggrConfig struct {
|
||||
isAvg bool
|
||||
isAvg bool
|
||||
counterResetsTotal *metrics.Counter
|
||||
}
|
||||
|
||||
func (*rateAggrConfig) getValue(s any) aggrValue {
|
||||
|
||||
@@ -604,16 +604,15 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
|
||||
"see https://docs.victoriametrics.com/victoriametrics/stream-aggregation/", supportedOutputs)
|
||||
}
|
||||
useInputKey := dedupInterval <= 0
|
||||
useSharedState := enableWindows && useInputKey
|
||||
useSharedState := enableWindows && dedupInterval <= 0
|
||||
aggrOutputs := &aggrOutputs{
|
||||
configs: make([]aggrConfig, len(cfg.Outputs)),
|
||||
useSharedState: useSharedState,
|
||||
useInputKey: useInputKey,
|
||||
}
|
||||
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
|
||||
for i, output := range cfg.Outputs {
|
||||
ac, err := newOutputConfig(output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
|
||||
outputMetricLabels := fmt.Sprintf(`output=%q,name=%q,path=%q,url=%q,position="%d"`, output, name, path, alias, aggrID)
|
||||
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -723,7 +722,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
|
||||
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
|
||||
// check for duplicated output
|
||||
if _, ok := outputsSeen[output]; ok {
|
||||
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
|
||||
@@ -769,9 +768,9 @@ func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedSt
|
||||
case "histogram_bucket":
|
||||
return newHistogramBucketAggrConfig(useSharedState), nil
|
||||
case "increase":
|
||||
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, true, true), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
|
||||
case "increase_prometheus":
|
||||
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, true, false), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
|
||||
case "last":
|
||||
return newLastAggrConfig(), nil
|
||||
case "max":
|
||||
@@ -779,9 +778,9 @@ func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedSt
|
||||
case "min":
|
||||
return newMinAggrConfig(), nil
|
||||
case "rate_avg":
|
||||
return newRateAggrConfig(true), nil
|
||||
return newRateAggrConfig(ms, metricLabels, true), nil
|
||||
case "rate_sum":
|
||||
return newRateAggrConfig(false), nil
|
||||
return newRateAggrConfig(ms, metricLabels, false), nil
|
||||
case "stddev":
|
||||
return newStddevAggrConfig(), nil
|
||||
case "stdvar":
|
||||
@@ -789,9 +788,9 @@ func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedSt
|
||||
case "sum_samples":
|
||||
return newSumSamplesAggrConfig(), nil
|
||||
case "total":
|
||||
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, false, true), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
|
||||
case "total_prometheus":
|
||||
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, false, false), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
|
||||
case "unique_samples":
|
||||
return newUniqueSamplesAggrConfig(), nil
|
||||
default:
|
||||
@@ -802,44 +801,25 @@ func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedSt
|
||||
func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlushOnShutdown bool, ignoreFirstIntervals int) {
|
||||
minTime := time.UnixMilli(a.minDeadline.Load())
|
||||
flushTime := minTime.Add(a.interval)
|
||||
|
||||
if !a.alignedSleep(minTime) {
|
||||
return
|
||||
}
|
||||
|
||||
interval := a.interval
|
||||
if a.dedupInterval > 0 {
|
||||
interval = a.dedupInterval
|
||||
}
|
||||
alignedSleep := func() {
|
||||
dSleep := time.Until(minTime)
|
||||
if dSleep <= 0 {
|
||||
return
|
||||
}
|
||||
timer := timerpool.Get(dSleep)
|
||||
defer timerpool.Put(timer)
|
||||
select {
|
||||
case <-a.stopCh:
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
|
||||
tickerWait := func(t *time.Ticker) bool {
|
||||
select {
|
||||
case <-a.stopCh:
|
||||
return false
|
||||
case <-t.C:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
alignedSleep()
|
||||
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
|
||||
var fa *histogram.Fast
|
||||
for tickerWait(t) {
|
||||
pf := pushFunc
|
||||
lastCS := a.cs.Load()
|
||||
for a.tickerWait(t) {
|
||||
if a.enableWindows {
|
||||
// Calculate delay and wait
|
||||
a.muFlushAfter.Lock()
|
||||
fa, a.flushAfter = a.flushAfter, histogram.GetFast()
|
||||
fa := a.flushAfter
|
||||
a.flushAfter = histogram.GetFast()
|
||||
a.muFlushAfter.Unlock()
|
||||
delay := time.Duration(fa.Quantile(flushQuantile)) * time.Millisecond
|
||||
histogram.PutFast(fa)
|
||||
@@ -852,12 +832,13 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlu
|
||||
|
||||
if !flushTime.After(deadlineTime) {
|
||||
// It is time to flush the aggregated state
|
||||
pf := pushFunc
|
||||
if ignoreFirstIntervals > 0 {
|
||||
a.flush(nil, flushTime, cs, false)
|
||||
pf = nil
|
||||
ignoreFirstIntervals--
|
||||
} else {
|
||||
a.flush(pf, flushTime, cs, false)
|
||||
}
|
||||
a.flush(pf, flushTime, cs, false)
|
||||
flushTime = flushTime.Add(a.interval)
|
||||
for time.Now().After(flushTime) {
|
||||
flushTime = flushTime.Add(a.interval)
|
||||
}
|
||||
@@ -869,6 +850,8 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlu
|
||||
cs.isGreen = !cs.isGreen
|
||||
}
|
||||
a.cs.Store(cs)
|
||||
lastCS = cs
|
||||
|
||||
if alignFlushToInterval {
|
||||
select {
|
||||
case <-t.C:
|
||||
@@ -877,7 +860,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlu
|
||||
}
|
||||
}
|
||||
|
||||
cs := a.cs.Load()
|
||||
cs := lastCS
|
||||
var dedupTime time.Time
|
||||
if alignFlushToInterval {
|
||||
if a.dedupInterval > 0 {
|
||||
@@ -889,8 +872,8 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlu
|
||||
dedupTime = flushTime
|
||||
}
|
||||
}
|
||||
|
||||
a.dedupFlush(dedupTime, cs)
|
||||
|
||||
pf := pushFunc
|
||||
if skipFlushOnShutdown || ignoreFirstIntervals > 0 {
|
||||
pf = nil
|
||||
@@ -898,6 +881,30 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlu
|
||||
a.flush(pf, flushTime, cs, true)
|
||||
}
|
||||
|
||||
func (a *aggregator) tickerWait(t *time.Ticker) bool {
|
||||
select {
|
||||
case <-a.stopCh:
|
||||
return false
|
||||
case <-t.C:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (a *aggregator) alignedSleep(minTime time.Time) bool {
|
||||
dSleep := time.Until(minTime)
|
||||
if dSleep <= 0 {
|
||||
return true
|
||||
}
|
||||
timer := timerpool.Get(dSleep)
|
||||
defer timerpool.Put(timer)
|
||||
select {
|
||||
case <-a.stopCh:
|
||||
return false
|
||||
case <-timer.C:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (a *aggregator) dedupFlush(dedupTime time.Time, cs *currentState) {
|
||||
if a.dedupInterval <= 0 {
|
||||
// The de-duplication is disabled.
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
package streamaggr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
@@ -38,6 +41,7 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
|
||||
} else {
|
||||
// counter reset
|
||||
av.total += sample.value
|
||||
ac.counterResetsTotal.Inc()
|
||||
}
|
||||
}
|
||||
lv.value = sample.value
|
||||
@@ -74,13 +78,15 @@ func (av *totalAggrValue) state() any {
|
||||
return av.shared
|
||||
}
|
||||
|
||||
func newTotalAggrConfig(ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
|
||||
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
|
||||
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
|
||||
return &totalAggrConfig{
|
||||
cfg := &totalAggrConfig{
|
||||
keepFirstSample: keepFirstSample,
|
||||
resetTotalOnFlush: resetTotalOnFlush,
|
||||
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
|
||||
}
|
||||
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
||||
return cfg
|
||||
}
|
||||
|
||||
type totalAggrConfig struct {
|
||||
@@ -93,6 +99,7 @@ type totalAggrConfig struct {
|
||||
// This allows avoiding an initial spike of the output values at startup when new time series
|
||||
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
|
||||
ignoreFirstSampleDeadline uint64
|
||||
counterResetsTotal *metrics.Counter
|
||||
}
|
||||
|
||||
func (*totalAggrConfig) getValue(s any) aggrValue {
|
||||
|
||||
Reference in New Issue
Block a user