Compare commits

...

1 Commits

Author SHA1 Message Date
Haley Wang
23e1b64820 stream aggregation: add sum_sample_total output function 2026-06-17 22:26:38 +08:00
6 changed files with 72 additions and 21 deletions

View File

@@ -507,6 +507,13 @@ See also:
- [count_samples](#count_samples)
- [count_series](#count_series)
### `sum_samples_total`
`sum_samples_total` sums input delta values into a cumulative [counter](https://docs.victoriametrics.com/victoriametrics/keyconcepts/index.html#counter) and outputs the result at the given `interval`.
`sum_samples_total` makes sense only for aggregating delta values from clients such as [StatsD counter](https://github.com/statsd/statsd/blob/master/docs/metric_types.md#counting).
>Note: The aggregator will forget the cumulative counter if it has not seen input samples for `staleness_interval`(set to `interval` by default) per output result, so the output counter will start from `0` the next time it sees the input again. Increase the `staleness_interval` option if you want to extend the window to tolerate bigger gaps.
### total
`total` generates output [counter](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#counter) by summing the input counters over the given `interval`.

View File

@@ -43,6 +43,7 @@ var supportedOutputs = []string{
"stddev",
"stdvar",
"sum_samples",
"sum_samples_total",
"total",
"total_prometheus",
"unique_samples",
@@ -780,7 +781,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "stdvar":
return newStdvarAggrConfig(), nil
case "sum_samples":
return newSumSamplesAggrConfig(), nil
return newSumSamplesAggrConfig(true), nil
case "sum_samples_total":
return newSumSamplesAggrConfig(false), nil
case "total":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "total_prometheus":

View File

@@ -475,26 +475,53 @@ foo:1m_increase_prometheus{baz="qwe"} 15
outputs: [increase_prometheus]
`, "11111111")
// multiple aggregate configs
// sum_sample and sum_samples_total outputs with different staleness intervals
f([]string{`
foo 1
foo 2 1
foo{bar="baz"} 2
foo 3.3
`, ``, ``, ``, ``}, time.Minute, `foo:1m_count_series 1
foo:1m_count_series{bar="baz"} 1
foo:1m_sum_samples 0
foo:1m_sum_samples 4.3
foo:1m_sum_samples{bar="baz"} 0
`, `
foo 4
`, ``, ``, `
foo 6
`, ``, ``}, time.Minute, `foo:1m_sum_samples 3
foo:1m_sum_samples 4
foo:1m_sum_samples 6
foo:1m_sum_samples_total 3
foo:1m_sum_samples_total 7
foo:1m_sum_samples_total 6
foo:1m_sum_samples_total{bar="baz"} 2
foo:1m_sum_samples{bar="baz"} 2
foo:5m_by_bar_sum_samples 4.3
foo:1m_without_non-existing-label_sum_samples 3
foo:1m_without_non-existing-label_sum_samples 4
foo:1m_without_non-existing-label_sum_samples 0
foo:1m_without_non-existing-label_sum_samples 6
foo:1m_without_non-existing-label_sum_samples 0
foo:1m_without_non-existing-label_sum_samples_total 3
foo:1m_without_non-existing-label_sum_samples_total 7
foo:1m_without_non-existing-label_sum_samples_total 7
foo:1m_without_non-existing-label_sum_samples_total 6
foo:1m_without_non-existing-label_sum_samples_total 6
foo:1m_without_non-existing-label_sum_samples_total{bar="baz"} 2
foo:1m_without_non-existing-label_sum_samples_total{bar="baz"} 2
foo:1m_without_non-existing-label_sum_samples{bar="baz"} 2
foo:1m_without_non-existing-label_sum_samples{bar="baz"} 0
foo:5m_by_bar_sum_samples 13
foo:5m_by_bar_sum_samples_total 13
foo:5m_by_bar_sum_samples_total{bar="baz"} 2
foo:5m_by_bar_sum_samples{bar="baz"} 2
`, `
- interval: 1m
outputs: [count_series, sum_samples]
staleness_interval: 1m
outputs: [ sum_samples, sum_samples_total]
- interval: 1m
staleness_interval: 2m
without: [non-existing-label]
outputs: [ sum_samples, sum_samples_total]
- interval: 5m
by: [bar]
outputs: [sum_samples]
`, "111")
outputs: [sum_samples, sum_samples_total]
`, "11111")
// min and max outputs
f([]string{`

View File

@@ -252,11 +252,15 @@ func TestAggregatorsEqual(t *testing.T) {
}
func timeSeriessToString(tss []prompb.TimeSeries) string {
a := make([]string, len(tss))
for i, ts := range tss {
sorted := make([]prompb.TimeSeries, len(tss))
copy(sorted, tss)
sort.SliceStable(sorted, func(i, j int) bool {
return promrelabel.LabelsToString(sorted[i].Labels) < promrelabel.LabelsToString(sorted[j].Labels)
})
a := make([]string, len(sorted))
for i, ts := range sorted {
a[i] = timeSeriesToString(ts)
}
sort.Strings(a)
return strings.Join(a, "")
}

View File

@@ -27,6 +27,7 @@ var benchOutputs = []string{
"stddev",
"stdvar",
"sum_samples",
"sum_samples_total",
"total",
"total_prometheus",
"unique_samples",

View File

@@ -8,20 +8,29 @@ func (av *sumSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ st
av.sum += sample.value
}
func (av *sumSamplesAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
ctx.appendSeries(key, "sum_samples", av.sum)
av.sum = 0
func (av *sumSamplesAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
ac := c.(*sumSamplesAggrConfig)
if ac.resetTotalOnFlush {
ctx.appendSeries(key, "sum_samples", av.sum)
av.sum = 0
return
}
ctx.appendSeries(key, "sum_samples_total", av.sum)
}
func (*sumSamplesAggrValue) state() any {
return nil
}
func newSumSamplesAggrConfig() aggrConfig {
return &sumSamplesAggrConfig{}
func newSumSamplesAggrConfig(resetTotalOnFlush bool) aggrConfig {
return &sumSamplesAggrConfig{
resetTotalOnFlush: resetTotalOnFlush,
}
}
type sumSamplesAggrConfig struct{}
type sumSamplesAggrConfig struct {
resetTotalOnFlush bool
}
func (*sumSamplesAggrConfig) getValue(_ any) aggrValue {
return &sumSamplesAggrValue{}