mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-11 21:04:54 +03:00
Compare commits
1 Commits
enable-err
...
increase-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
58124ac80e |
@@ -14,16 +14,10 @@ func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample,
|
||||
av.h.Update(sample.value)
|
||||
}
|
||||
|
||||
func (av *histogramBucketAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
|
||||
ac := c.(*histogramBucketAggrConfig)
|
||||
shared := av.shared
|
||||
if ac.useSharedState {
|
||||
shared.Merge(&av.h)
|
||||
av.h.Reset()
|
||||
} else {
|
||||
shared = &av.h
|
||||
}
|
||||
shared.VisitNonZeroBuckets(func(vmrange string, count uint64) {
|
||||
func (av *histogramBucketAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
|
||||
av.shared.Merge(&av.h)
|
||||
av.h.Reset()
|
||||
av.shared.VisitNonZeroBuckets(func(vmrange string, count uint64) {
|
||||
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange)
|
||||
})
|
||||
}
|
||||
@@ -32,26 +26,17 @@ func (av *histogramBucketAggrValue) state() any {
|
||||
return av.shared
|
||||
}
|
||||
|
||||
func newHistogramBucketAggrConfig(useSharedState bool) aggrConfig {
|
||||
return &histogramBucketAggrConfig{
|
||||
useSharedState: useSharedState,
|
||||
}
|
||||
func newHistogramBucketAggrConfig() aggrConfig {
|
||||
return &histogramBucketAggrConfig{}
|
||||
}
|
||||
|
||||
type histogramBucketAggrConfig struct {
|
||||
useSharedState bool
|
||||
}
|
||||
type histogramBucketAggrConfig struct{}
|
||||
|
||||
func (ac *histogramBucketAggrConfig) getValue(s any) aggrValue {
|
||||
var shared *metrics.Histogram
|
||||
if ac.useSharedState {
|
||||
if s == nil {
|
||||
shared = &metrics.Histogram{}
|
||||
} else {
|
||||
shared = s.(*metrics.Histogram)
|
||||
}
|
||||
func (*histogramBucketAggrConfig) getValue(s any) aggrValue {
|
||||
if s == nil {
|
||||
s = &metrics.Histogram{}
|
||||
}
|
||||
return &histogramBucketAggrValue{
|
||||
shared: shared,
|
||||
shared: s.(*metrics.Histogram),
|
||||
}
|
||||
}
|
||||
|
||||
109
lib/streamaggr/increase.go
Normal file
109
lib/streamaggr/increase.go
Normal file
@@ -0,0 +1,109 @@
|
||||
package streamaggr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
type increaseLastValue struct {
|
||||
value float64
|
||||
timestamp int64
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
type increaseAggrConfig struct {
|
||||
keepFirstSample bool
|
||||
|
||||
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
|
||||
// This allows avoiding an initial spike of the output values at startup when new time series
|
||||
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
|
||||
ignoreFirstSampleDeadline uint64
|
||||
counterResetsTotal *metrics.Counter
|
||||
}
|
||||
|
||||
type increaseAggrValue struct {
|
||||
total *float64
|
||||
shared map[string]increaseLastValue
|
||||
}
|
||||
|
||||
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
|
||||
ac := c.(*increaseAggrConfig)
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
|
||||
lv, ok := av.shared[key]
|
||||
if av.total == nil {
|
||||
av.total = new(float64)
|
||||
}
|
||||
if ok {
|
||||
if sample.timestamp < lv.timestamp {
|
||||
// Skip out of order sample
|
||||
return
|
||||
}
|
||||
if sample.value >= lv.value {
|
||||
*av.total += sample.value - lv.value
|
||||
} else {
|
||||
// counter reset
|
||||
*av.total += sample.value
|
||||
ac.counterResetsTotal.Inc()
|
||||
}
|
||||
} else if keepFirstSample {
|
||||
*av.total += sample.value
|
||||
}
|
||||
lv.value = sample.value
|
||||
lv.timestamp = sample.timestamp
|
||||
lv.deleteDeadline = deleteDeadline
|
||||
key = bytesutil.InternString(key)
|
||||
av.shared[key] = lv
|
||||
}
|
||||
|
||||
func (av *increaseAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
|
||||
ac := c.(*increaseAggrConfig)
|
||||
for lk, lv := range av.shared {
|
||||
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
|
||||
delete(av.shared, lk)
|
||||
}
|
||||
}
|
||||
if av.total == nil {
|
||||
return
|
||||
}
|
||||
total := *av.total
|
||||
av.total = nil
|
||||
ctx.appendSeries(key, ac.getSuffix(), total)
|
||||
}
|
||||
|
||||
func (av *increaseAggrValue) state() any {
|
||||
return av.shared
|
||||
}
|
||||
|
||||
func newIncreaseAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
|
||||
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
|
||||
cfg := &increaseAggrConfig{
|
||||
keepFirstSample: keepFirstSample,
|
||||
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
|
||||
}
|
||||
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
||||
return cfg
|
||||
}
|
||||
|
||||
func (*increaseAggrConfig) getValue(s any) aggrValue {
|
||||
var shared map[string]increaseLastValue
|
||||
if s == nil {
|
||||
shared = make(map[string]increaseLastValue)
|
||||
} else {
|
||||
shared = s.(map[string]increaseLastValue)
|
||||
}
|
||||
return &increaseAggrValue{
|
||||
shared: shared,
|
||||
}
|
||||
}
|
||||
|
||||
func (ac *increaseAggrConfig) getSuffix() string {
|
||||
if ac.keepFirstSample {
|
||||
return "increase"
|
||||
}
|
||||
return "increase_prometheus"
|
||||
}
|
||||
@@ -75,6 +75,9 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
|
||||
outputs = av.blue
|
||||
}
|
||||
for idx, o := range outputs {
|
||||
if o == nil {
|
||||
o = av.blue[idx]
|
||||
}
|
||||
o.pushSample(ao.configs[idx], sample, inputKey, deleteDeadline)
|
||||
}
|
||||
av.deleteDeadline = deleteDeadline
|
||||
@@ -112,6 +115,9 @@ func (ao *aggrOutputs) flushState(ctx *flushCtx) {
|
||||
outputs = av.blue
|
||||
}
|
||||
for i, o := range outputs {
|
||||
if o == nil {
|
||||
o = av.blue[i]
|
||||
}
|
||||
o.flush(ao.configs[i], ctx, outputKey, ctx.isLast)
|
||||
}
|
||||
av.mu.Unlock()
|
||||
|
||||
@@ -609,7 +609,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
|
||||
for i, output := range cfg.Outputs {
|
||||
outputMetricLabels := fmt.Sprintf(`output=%q,name=%q,path=%q,url=%q,position="%d"`, output, name, path, alias, aggrID)
|
||||
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
|
||||
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, ignoreFirstSampleInterval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -716,7 +716,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
|
||||
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
|
||||
// check for duplicated output
|
||||
if _, ok := outputsSeen[output]; ok {
|
||||
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
|
||||
@@ -760,11 +760,11 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
|
||||
case "count_series":
|
||||
return newCountSeriesAggrConfig(), nil
|
||||
case "histogram_bucket":
|
||||
return newHistogramBucketAggrConfig(useSharedState), nil
|
||||
return newHistogramBucketAggrConfig(), nil
|
||||
case "increase":
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
|
||||
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
|
||||
case "increase_prometheus":
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
|
||||
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
|
||||
case "last":
|
||||
return newLastAggrConfig(), nil
|
||||
case "max":
|
||||
@@ -782,9 +782,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
|
||||
case "sum_samples":
|
||||
return newSumSamplesAggrConfig(), nil
|
||||
case "total":
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
|
||||
case "total_prometheus":
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
|
||||
case "unique_samples":
|
||||
return newUniqueSamplesAggrConfig(), nil
|
||||
default:
|
||||
|
||||
@@ -53,36 +53,30 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
|
||||
|
||||
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
|
||||
ac := c.(*totalAggrConfig)
|
||||
suffix := ac.getSuffix()
|
||||
// check for stale entries
|
||||
total := av.shared.total + av.total
|
||||
av.total = 0
|
||||
lvs := av.shared.lastValues
|
||||
for lk, lv := range lvs {
|
||||
for lk, lv := range av.shared.lastValues {
|
||||
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
|
||||
delete(lvs, lk)
|
||||
delete(av.shared.lastValues, lk)
|
||||
}
|
||||
}
|
||||
if ac.resetTotalOnFlush {
|
||||
av.shared.total = 0
|
||||
} else if math.Abs(total) >= (1 << 53) {
|
||||
if math.Abs(total) >= (1 << 53) {
|
||||
// It is time to reset the entry, since it starts losing float64 precision
|
||||
av.shared.total = 0
|
||||
} else {
|
||||
av.shared.total = total
|
||||
}
|
||||
ctx.appendSeries(key, suffix, total)
|
||||
ctx.appendSeries(key, ac.getSuffix(), total)
|
||||
}
|
||||
|
||||
func (av *totalAggrValue) state() any {
|
||||
return av.shared
|
||||
}
|
||||
|
||||
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
|
||||
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
|
||||
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
|
||||
cfg := &totalAggrConfig{
|
||||
keepFirstSample: keepFirstSample,
|
||||
resetTotalOnFlush: resetTotalOnFlush,
|
||||
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
|
||||
}
|
||||
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
||||
@@ -90,8 +84,6 @@ func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleI
|
||||
}
|
||||
|
||||
type totalAggrConfig struct {
|
||||
resetTotalOnFlush bool
|
||||
|
||||
// Whether to take into account the first sample in new time series when calculating the output value.
|
||||
keepFirstSample bool
|
||||
|
||||
@@ -117,12 +109,6 @@ func (*totalAggrConfig) getValue(s any) aggrValue {
|
||||
}
|
||||
|
||||
func (ac *totalAggrConfig) getSuffix() string {
|
||||
if ac.resetTotalOnFlush {
|
||||
if ac.keepFirstSample {
|
||||
return "increase"
|
||||
}
|
||||
return "increase_prometheus"
|
||||
}
|
||||
if ac.keepFirstSample {
|
||||
return "total"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user