Compare commits

...

1 Commits

Author SHA1 Message Date
Haley Wang
c137b9c3a6 stream aggregation: add sum_sample_total output function 2026-06-11 22:43:59 +08:00
4 changed files with 36 additions and 14 deletions

View File

@@ -780,7 +780,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "stdvar":
return newStdvarAggrConfig(), nil
case "sum_samples":
return newSumSamplesAggrConfig(), nil
return newSumSamplesAggrConfig(true), nil
case "sum_samples_total":
return newSumSamplesAggrConfig(false), nil
case "total":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
case "total_prometheus":

View File

@@ -479,22 +479,32 @@ foo:1m_increase_prometheus{baz="qwe"} 15
f([]string{`
foo 1
foo{bar="baz"} 2
foo 3.3
`, ``, ``, ``, ``}, time.Minute, `foo:1m_count_series 1
foo 1.3
`,
`foo 2.7`, ``, ``, ``}, time.Minute, `foo:1m_count_series 1
foo:1m_count_series 1
foo:1m_count_series{bar="baz"} 1
foo:1m_sum_samples 0
foo:1m_sum_samples 4.3
foo:1m_sum_samples 2.3
foo:1m_sum_samples 2.7
foo:1m_sum_samples_total 2.3
foo:1m_sum_samples_total 5
foo:1m_sum_samples_total 5
foo:1m_sum_samples_total{bar="baz"} 2
foo:1m_sum_samples_total{bar="baz"} 2
foo:1m_sum_samples{bar="baz"} 0
foo:1m_sum_samples{bar="baz"} 2
foo:5m_by_bar_sum_samples 4.3
foo:5m_by_bar_sum_samples 5
foo:5m_by_bar_sum_samples_total 5
foo:5m_by_bar_sum_samples_total{bar="baz"} 2
foo:5m_by_bar_sum_samples{bar="baz"} 2
`, `
- interval: 1m
outputs: [count_series, sum_samples]
outputs: [count_series, sum_samples, sum_samples_total]
- interval: 5m
by: [bar]
outputs: [sum_samples]
`, "111")
outputs: [sum_samples, sum_samples_total]
`, "1111")
// min and max outputs
f([]string{`

View File

@@ -27,6 +27,7 @@ var benchOutputs = []string{
"stddev",
"stdvar",
"sum_samples",
"sum_samples_total",
"total",
"total_prometheus",
"unique_samples",

View File

@@ -8,20 +8,29 @@ func (av *sumSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ st
av.sum += sample.value
}
func (av *sumSamplesAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
ctx.appendSeries(key, "sum_samples", av.sum)
av.sum = 0
func (av *sumSamplesAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
ac := c.(*sumSamplesAggrConfig)
if ac.resetTotalOnFlush {
ctx.appendSeries(key, "sum_samples", av.sum)
av.sum = 0
return
}
ctx.appendSeries(key, "sum_samples_total", av.sum)
}
func (*sumSamplesAggrValue) state() any {
return nil
}
func newSumSamplesAggrConfig() aggrConfig {
return &sumSamplesAggrConfig{}
func newSumSamplesAggrConfig(resetTotalOnFlush bool) aggrConfig {
return &sumSamplesAggrConfig{
resetTotalOnFlush: resetTotalOnFlush,
}
}
type sumSamplesAggrConfig struct{}
type sumSamplesAggrConfig struct {
resetTotalOnFlush bool
}
func (*sumSamplesAggrConfig) getValue(_ any) aggrValue {
return &sumSamplesAggrValue{}