lib/streamaggr: fixed streamaggr panic (#8471)

### Describe Your Changes

fixes #8469

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).
This commit is contained in:
Andrii Chubatiuk
2025-03-10 14:50:55 +02:00
committed by GitHub
parent 67f8fa66ed
commit c174a046e2
15 changed files with 28 additions and 19 deletions

View File

@@ -18,6 +18,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
## tip
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation): fix panic on `rate` output. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8469).
## [v1.113.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.113.0)
Released at 2025-03-07

View File

@@ -10,7 +10,7 @@ func (av *avgAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _
av.count++
}
func (av *avgAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string) {
func (av *avgAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
if av.count > 0 {
avg := av.sum / av.count
ctx.appendSeries(key, "avg", avg)

View File

@@ -8,7 +8,7 @@ func (av *countSamplesAggrValue) pushSample(_ aggrConfig, _ *pushSample, _ strin
av.count++
}
func (av *countSamplesAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string) {
func (av *countSamplesAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
if av.count > 0 {
ctx.appendSeries(key, "count_samples", float64(av.count))
av.count = 0

View File

@@ -18,7 +18,7 @@ func (av *countSeriesAggrValue) pushSample(_ aggrConfig, _ *pushSample, key stri
}
}
func (av *countSeriesAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string) {
func (av *countSeriesAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
if len(av.samples) > 0 {
ctx.appendSeries(key, "count_series", float64(len(av.samples)))
clear(av.samples)

View File

@@ -14,7 +14,7 @@ func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample,
av.h.Update(sample.value)
}
func (av *histogramBucketAggrValue) flush(c aggrConfig, ctx *flushCtx, key string) {
func (av *histogramBucketAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
ac := c.(*histogramBucketAggrConfig)
shared := av.shared
if ac.useSharedState {

View File

@@ -12,7 +12,7 @@ func (av *lastAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string,
}
}
func (av *lastAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string) {
func (av *lastAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
if av.timestamp > 0 {
ctx.appendSeries(key, "last", av.last)
av.timestamp = 0

View File

@@ -14,7 +14,7 @@ func (av *maxAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _
}
}
func (av *maxAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string) {
func (av *maxAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
if av.defined {
ctx.appendSeries(key, "max", av.max)
av.max = 0

View File

@@ -14,7 +14,7 @@ func (av *minAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _
}
}
func (av *minAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string) {
func (av *minAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
if av.defined {
ctx.appendSeries(key, "min", av.min)
av.min = 0

View File

@@ -112,7 +112,7 @@ func (ao *aggrOutputs) flushState(ctx *flushCtx) {
outputs = av.blue
}
for i, o := range outputs {
o.flush(ao.configs[i], ctx, outputKey)
o.flush(ao.configs[i], ctx, outputKey, ctx.isLast)
}
av.mu.Unlock()
if ctx.isLast {
@@ -135,6 +135,6 @@ type aggrConfig interface {
type aggrValue interface {
pushSample(aggrConfig, *pushSample, string, int64)
flush(aggrConfig, *flushCtx, string)
flush(aggrConfig, *flushCtx, string, bool)
state() any
}

View File

@@ -18,7 +18,7 @@ func (av *quantilesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ str
av.h.Update(sample.value)
}
func (av *quantilesAggrValue) flush(c aggrConfig, ctx *flushCtx, key string) {
func (av *quantilesAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
ac := c.(*quantilesAggrConfig)
if av.h != nil {
ac.quantiles = av.h.Quantiles(ac.quantiles[:0], ac.phis)

View File

@@ -71,9 +71,11 @@ func (v *rateAggrSharedValue) reset() {
v.deleteDeadline = 0
v.prevTimestamp = 0
putRateAggrStateValue(v.blue)
putRateAggrStateValue(v.green)
v.blue = nil
v.green = nil
if v.green != nil {
putRateAggrStateValue(v.green)
v.green = nil
}
}
type rateAggrStateValue struct {
@@ -114,7 +116,7 @@ func (av *rateAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string
state.timestamp = sample.timestamp
}
func (av *rateAggrValue) flush(c aggrConfig, ctx *flushCtx, key string) {
func (av *rateAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*rateAggrConfig)
var state *rateAggrStateValue
suffix := ac.getSuffix()
@@ -142,7 +144,12 @@ func (av *rateAggrValue) flush(c aggrConfig, ctx *flushCtx, key string) {
sv.prevTimestamp = state.timestamp
state.timestamp = 0
state.increase = 0
av.shared[sk] = sv
if isLast {
delete(av.shared, sk)
putRateAggrSharedValue(sv)
} else {
av.shared[sk] = sv
}
}
if countSeries == 0 {

View File

@@ -18,7 +18,7 @@ func (av *stdAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _
av.avg = avg
}
func (av *stdAggrValue) flush(c aggrConfig, ctx *flushCtx, key string) {
func (av *stdAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
ac := c.(*stdAggrConfig)
if av.count > 0 {
suffix := ac.getSuffix()

View File

@@ -8,7 +8,7 @@ func (av *sumSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ st
av.sum += sample.value
}
func (av *sumSamplesAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string) {
func (av *sumSamplesAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
ctx.appendSeries(key, "sum_samples", av.sum)
av.sum = 0
}

View File

@@ -47,7 +47,7 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
av.shared.lastValues[key] = lv
}
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string) {
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*totalAggrConfig)
suffix := ac.getSuffix()
// check for stale entries
@@ -55,7 +55,7 @@ func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string) {
av.total = 0
lvs := av.shared.lastValues
for lk, lv := range lvs {
if ctx.flushTimestamp > lv.deleteDeadline {
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
delete(lvs, lk)
}
}

View File

@@ -10,7 +10,7 @@ func (av *uniqueSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _
}
}
func (av *uniqueSamplesAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string) {
func (av *uniqueSamplesAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
if len(av.samples) > 0 {
ctx.appendSeries(key, "unique_samples", float64(len(av.samples)))
clear(av.samples)