mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
### Describe Your Changes Added the `vm_streamaggr_counter_resets_total` metric for `rate*`, `total*`, and `increase*` outputs, which is useful for investigating unpredictable output behaviour. ### Checklist The following checks are **mandatory**: - [ ] My change adheres to [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist). - [ ] My change adheres to [VictoriaMetrics development goals](https://docs.victoriametrics.com/victoriametrics/goals/). --------- Signed-off-by: Andrii Chubatiuk <andrew.chubatiuk@gmail.com> Signed-off-by: hagen1778 <roman@victoriametrics.com> Signed-off-by: Roman Khavronenko <hagen1778@gmail.com> Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com> Co-authored-by: hagen1778 <roman@victoriametrics.com>
209 lines
4.7 KiB
Go
209 lines
4.7 KiB
Go
package streamaggr
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
)
|
|
|
|
// rateAggrSharedValuePool recycles *rateAggrSharedValue objects:
// putRateAggrSharedValue stores reset values in it and
// getRateAggrSharedValue takes them back out.
var rateAggrSharedValuePool sync.Pool
|
|
|
|
func putRateAggrSharedValue(v *rateAggrSharedValue) {
|
|
v.reset()
|
|
rateAggrSharedValuePool.Put(v)
|
|
}
|
|
|
|
func getRateAggrSharedValue(isGreen bool) *rateAggrSharedValue {
|
|
v := rateAggrSharedValuePool.Get()
|
|
if v == nil {
|
|
v = &rateAggrSharedValue{}
|
|
}
|
|
av := v.(*rateAggrSharedValue)
|
|
if isGreen {
|
|
av.green = getRateAggrStateValue()
|
|
} else {
|
|
av.blue = getRateAggrStateValue()
|
|
}
|
|
return av
|
|
}
|
|
|
|
// rateAggrStateValuePool recycles *rateAggrStateValue objects:
// putRateAggrStateValue stores zeroed values in it and
// getRateAggrStateValue takes them back out.
var rateAggrStateValuePool sync.Pool
|
|
|
|
func putRateAggrStateValue(v *rateAggrStateValue) {
|
|
v.timestamp = 0
|
|
v.increase = 0
|
|
rateAggrStateValuePool.Put(v)
|
|
}
|
|
|
|
func getRateAggrStateValue() *rateAggrStateValue {
|
|
v := rateAggrStateValuePool.Get()
|
|
if v == nil {
|
|
return &rateAggrStateValue{}
|
|
}
|
|
return v.(*rateAggrStateValue)
|
|
}
|
|
|
|
// rateAggrSharedValue calculates output=rate_avg and rate_sum, e.g. the average per-second increase rate for counter metrics.
|
|
type rateAggrSharedValue struct {
|
|
value float64
|
|
deleteDeadline int64
|
|
|
|
// prevTimestamp is the timestamp of the last registered sample in the previous aggregation interval
|
|
prevTimestamp int64
|
|
blue *rateAggrStateValue
|
|
green *rateAggrStateValue
|
|
}
|
|
|
|
func (v *rateAggrSharedValue) getState(isGreen bool) *rateAggrStateValue {
|
|
if isGreen {
|
|
if v.green == nil {
|
|
v.green = getRateAggrStateValue()
|
|
}
|
|
return v.green
|
|
}
|
|
if v.blue == nil {
|
|
v.blue = getRateAggrStateValue()
|
|
}
|
|
return v.blue
|
|
}
|
|
|
|
func (v *rateAggrSharedValue) reset() {
|
|
v.value = 0
|
|
v.deleteDeadline = 0
|
|
v.prevTimestamp = 0
|
|
if v.blue != nil {
|
|
putRateAggrStateValue(v.blue)
|
|
v.blue = nil
|
|
}
|
|
if v.green != nil {
|
|
putRateAggrStateValue(v.green)
|
|
v.green = nil
|
|
}
|
|
}
|
|
|
|
// rateAggrStateValue accumulates the data of a single time series for one
// aggregation interval.
type rateAggrStateValue struct {
	// increase stores cumulative increase for the current time series on the current aggregation interval
	increase float64

	// timestamp is the timestamp of the last sample accepted into this state;
	// it is 0 while the state holds no samples.
	timestamp int64
}
|
|
|
|
type rateAggrValue struct {
|
|
shared map[string]*rateAggrSharedValue
|
|
isGreen bool
|
|
}
|
|
|
|
func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
|
|
ac := c.(*rateAggrConfig)
|
|
var state *rateAggrStateValue
|
|
sv, ok := av.shared[key]
|
|
if ok {
|
|
state = sv.getState(av.isGreen)
|
|
if sample.timestamp < state.timestamp {
|
|
// Skip out of order sample
|
|
return
|
|
}
|
|
if sample.value >= sv.value {
|
|
state.increase += sample.value - sv.value
|
|
} else {
|
|
// counter reset
|
|
state.increase += sample.value
|
|
ac.counterResetsTotal.Inc()
|
|
}
|
|
} else {
|
|
sv = getRateAggrSharedValue(av.isGreen)
|
|
sv.prevTimestamp = sample.timestamp
|
|
key = bytesutil.InternString(key)
|
|
av.shared[key] = sv
|
|
state = sv.getState(av.isGreen)
|
|
}
|
|
sv.value = sample.value
|
|
sv.deleteDeadline = deleteDeadline
|
|
state.timestamp = sample.timestamp
|
|
}
|
|
|
|
func (av *rateAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
|
|
ac := c.(*rateAggrConfig)
|
|
var state *rateAggrStateValue
|
|
suffix := ac.getSuffix()
|
|
rate := 0.0
|
|
countSeries := 0
|
|
for sk, sv := range av.shared {
|
|
if ctx.flushTimestamp > sv.deleteDeadline {
|
|
delete(av.shared, sk)
|
|
putRateAggrSharedValue(sv)
|
|
continue
|
|
}
|
|
if sv.prevTimestamp == 0 {
|
|
continue
|
|
}
|
|
state = sv.getState(av.isGreen)
|
|
if state.timestamp > 0 {
|
|
d := float64(state.timestamp-sv.prevTimestamp) / 1000
|
|
if d > 0 {
|
|
rate += state.increase / d
|
|
countSeries++
|
|
}
|
|
// modify prevTimestamp only if current state contains any samples.
|
|
// otherwise, sv.prevTimestamp=0 would skip the next flush even if state had samples.
|
|
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9017
|
|
sv.prevTimestamp = state.timestamp
|
|
state.timestamp = 0
|
|
state.increase = 0
|
|
}
|
|
if isLast {
|
|
delete(av.shared, sk)
|
|
putRateAggrSharedValue(sv)
|
|
} else {
|
|
av.shared[sk] = sv
|
|
}
|
|
}
|
|
|
|
if countSeries == 0 {
|
|
return
|
|
}
|
|
if ac.isAvg {
|
|
rate /= float64(countSeries)
|
|
}
|
|
ctx.appendSeries(key, suffix, rate)
|
|
}
|
|
|
|
func (av *rateAggrValue) state() any {
|
|
return av.shared
|
|
}
|
|
|
|
func newRateAggrConfig(ms *metrics.Set, metricLabels string, isAvg bool) aggrConfig {
|
|
cfg := rateAggrConfig{
|
|
isAvg: isAvg,
|
|
}
|
|
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
|
return &cfg
|
|
}
|
|
|
|
type rateAggrConfig struct {
|
|
isAvg bool
|
|
counterResetsTotal *metrics.Counter
|
|
}
|
|
|
|
func (*rateAggrConfig) getValue(s any) aggrValue {
|
|
var shared map[string]*rateAggrSharedValue
|
|
if s == nil {
|
|
shared = make(map[string]*rateAggrSharedValue)
|
|
} else {
|
|
shared = s.(map[string]*rateAggrSharedValue)
|
|
}
|
|
return &rateAggrValue{
|
|
shared: shared,
|
|
isGreen: s != nil,
|
|
}
|
|
}
|
|
|
|
func (ac *rateAggrConfig) getSuffix() string {
|
|
if ac.isAvg {
|
|
return "rate_avg"
|
|
}
|
|
return "rate_sum"
|
|
}
|