mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-27 12:37:29 +03:00
Compare commits
1 Commits
dependabot
...
state-only
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
90029442cb |
@@ -27,6 +27,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
|||||||
## tip
|
## tip
|
||||||
|
|
||||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
|
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
|
||||||
|
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix `increase` and `increase_prometheus` outputs producing inflated values when old samples update the baseline across interval boundaries with `ignore_old_samples: true` or `enable_windows: true`.
|
||||||
|
|
||||||
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
|
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,9 @@ type avgAggrValue struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (av *avgAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
func (av *avgAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||||
|
if sample.stateOnly {
|
||||||
|
return
|
||||||
|
}
|
||||||
av.sum += sample.value
|
av.sum += sample.value
|
||||||
av.count++
|
av.count++
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,10 @@ type countSamplesAggrValue struct {
|
|||||||
count uint64
|
count uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (av *countSamplesAggrValue) pushSample(_ aggrConfig, _ *pushSample, _ string, _ int64) {
|
func (av *countSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||||
|
if sample.stateOnly {
|
||||||
|
return
|
||||||
|
}
|
||||||
av.count++
|
av.count++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,10 @@ type countSeriesAggrValue struct {
|
|||||||
samples map[uint64]struct{}
|
samples map[uint64]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (av *countSeriesAggrValue) pushSample(_ aggrConfig, _ *pushSample, key string, _ int64) {
|
func (av *countSeriesAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string, _ int64) {
|
||||||
|
if sample.stateOnly {
|
||||||
|
return
|
||||||
|
}
|
||||||
// Count unique hashes over the keys instead of unique key values.
|
// Count unique hashes over the keys instead of unique key values.
|
||||||
// This reduces memory usage at the cost of possible hash collisions for distinct key values.
|
// This reduces memory usage at the cost of possible hash collisions for distinct key values.
|
||||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
||||||
|
|||||||
@@ -45,6 +45,7 @@ type dedupAggrShardNopad struct {
|
|||||||
type dedupAggrSample struct {
|
type dedupAggrSample struct {
|
||||||
value float64
|
value float64
|
||||||
timestamp int64
|
timestamp int64
|
||||||
|
stateOnly bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDedupAggr() *dedupAggr {
|
func newDedupAggr() *dedupAggr {
|
||||||
@@ -189,6 +190,7 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
|
|||||||
s = &samplesBuf[len(samplesBuf)-1]
|
s = &samplesBuf[len(samplesBuf)-1]
|
||||||
s.value = sample.value
|
s.value = sample.value
|
||||||
s.timestamp = sample.timestamp
|
s.timestamp = sample.timestamp
|
||||||
|
s.stateOnly = sample.stateOnly
|
||||||
|
|
||||||
key := bytesutil.InternString(sample.key)
|
key := bytesutil.InternString(sample.key)
|
||||||
state.m[key] = s
|
state.m[key] = s
|
||||||
@@ -197,28 +199,33 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
|
|||||||
state.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
|
state.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
s.timestamp, s.value = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
|
var newWins bool
|
||||||
|
s.timestamp, s.value, newWins = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
|
||||||
|
if newWins {
|
||||||
|
s.stateOnly = sample.stateOnly
|
||||||
|
}
|
||||||
}
|
}
|
||||||
state.samplesBuf = samplesBuf
|
state.samplesBuf = samplesBuf
|
||||||
}
|
}
|
||||||
|
|
||||||
// deduplicateSamples returns deduplicated timestamp and value results.
|
// deduplicateSamples returns deduplicated timestamp and value results,
|
||||||
|
// along with a boolean indicating whether the new sample won.
|
||||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#deduplication
|
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#deduplication
|
||||||
func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64) {
|
func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64, bool) {
|
||||||
if newT > oldT {
|
if newT > oldT {
|
||||||
return newT, newV
|
return newT, newV, true
|
||||||
}
|
}
|
||||||
// if both samples have the same timestamp, choose the maximum value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333;
|
// if both samples have the same timestamp, choose the maximum value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333;
|
||||||
// always prefer a non-decimal.StaleNaN value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10196
|
// always prefer a non-decimal.StaleNaN value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10196
|
||||||
if newT == oldT {
|
if newT == oldT {
|
||||||
if decimal.IsStaleNaN(oldV) {
|
if decimal.IsStaleNaN(oldV) {
|
||||||
return newT, newV
|
return newT, newV, true
|
||||||
}
|
}
|
||||||
if newV > oldV {
|
if newV > oldV {
|
||||||
return newT, newV
|
return newT, newV, true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return oldT, oldV
|
return oldT, oldV, false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
|
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
|
||||||
@@ -250,6 +257,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
|
|||||||
key: key,
|
key: key,
|
||||||
value: s.value,
|
value: s.value,
|
||||||
timestamp: s.timestamp,
|
timestamp: s.timestamp,
|
||||||
|
stateOnly: s.stateOnly,
|
||||||
})
|
})
|
||||||
|
|
||||||
// Limit the number of samples per each flush in order to limit memory usage.
|
// Limit the number of samples per each flush in order to limit memory usage.
|
||||||
|
|||||||
@@ -24,8 +24,8 @@ func TestDedupAggrSerial(t *testing.T) {
|
|||||||
}
|
}
|
||||||
da.pushSamples(samples, 0, false)
|
da.pushSamples(samples, 0, false)
|
||||||
|
|
||||||
if n := da.sizeBytes(); n > 5_000_000 {
|
if n := da.sizeBytes(); n > 6_000_000 {
|
||||||
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 5_000_000 bytes", n)
|
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 6_000_000 bytes", n)
|
||||||
}
|
}
|
||||||
if n := da.itemsCount(); n != seriesCount {
|
if n := da.itemsCount(); n != seriesCount {
|
||||||
t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)
|
t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)
|
||||||
@@ -81,7 +81,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
|
|||||||
func TestDeduplicateSamples(t *testing.T) {
|
func TestDeduplicateSamples(t *testing.T) {
|
||||||
f := func(oldT, newT int64, oldV, newV float64, expectedT int64, expectedV float64) {
|
f := func(oldT, newT int64, oldV, newV float64, expectedT int64, expectedV float64) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
dedupT, dedupV := deduplicateSamples(oldT, newT, oldV, newV)
|
dedupT, dedupV, _ := deduplicateSamples(oldT, newT, oldV, newV)
|
||||||
if dedupT != expectedT || dedupV != expectedV {
|
if dedupT != expectedT || dedupV != expectedV {
|
||||||
t.Fatalf("unexpected deduplicated result for oldT=%d, newT=%d, oldV=%f, newV=%f; got dedupT=%d, dedupV=%f; want dedupT=%d, dedupV=%f",
|
t.Fatalf("unexpected deduplicated result for oldT=%d, newT=%d, oldV=%f, newV=%f; got dedupT=%d, dedupV=%f; want dedupT=%d, dedupV=%f",
|
||||||
oldT, newT, oldV, newV, dedupT, dedupV, expectedT, expectedV)
|
oldT, newT, oldV, newV, dedupT, dedupV, expectedT, expectedV)
|
||||||
|
|||||||
@@ -11,6 +11,9 @@ type histogramBucketAggrValue struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||||
|
if sample.stateOnly {
|
||||||
|
return
|
||||||
|
}
|
||||||
av.h.Update(sample.value)
|
av.h.Update(sample.value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
109
lib/streamaggr/increase.go
Normal file
109
lib/streamaggr/increase.go
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
package streamaggr
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
|
)
|
||||||
|
|
||||||
|
type increaseLastValue struct {
|
||||||
|
value float64
|
||||||
|
timestamp int64
|
||||||
|
deleteDeadline int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type increaseAggrValueShared struct {
|
||||||
|
lastValues map[string]increaseLastValue
|
||||||
|
}
|
||||||
|
|
||||||
|
type increaseAggrValue struct {
|
||||||
|
total float64
|
||||||
|
shared *increaseAggrValueShared
|
||||||
|
}
|
||||||
|
|
||||||
|
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
|
||||||
|
ac := c.(*increaseAggrConfig)
|
||||||
|
currentTime := fasttime.UnixTimestamp()
|
||||||
|
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
|
||||||
|
lv, ok := av.shared.lastValues[key]
|
||||||
|
if ok || keepFirstSample {
|
||||||
|
if sample.timestamp < lv.timestamp {
|
||||||
|
// Skip out of order sample
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !sample.stateOnly {
|
||||||
|
if sample.value >= lv.value {
|
||||||
|
av.total += sample.value - lv.value
|
||||||
|
} else {
|
||||||
|
// counter reset
|
||||||
|
av.total += sample.value
|
||||||
|
ac.counterResetsTotal.Inc()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lv.value = sample.value
|
||||||
|
lv.timestamp = sample.timestamp
|
||||||
|
lv.deleteDeadline = deleteDeadline
|
||||||
|
key = bytesutil.InternString(key)
|
||||||
|
av.shared.lastValues[key] = lv
|
||||||
|
}
|
||||||
|
|
||||||
|
func (av *increaseAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
|
||||||
|
ac := c.(*increaseAggrConfig)
|
||||||
|
suffix := ac.getSuffix()
|
||||||
|
total := av.total
|
||||||
|
av.total = 0
|
||||||
|
lvs := av.shared.lastValues
|
||||||
|
for lk, lv := range lvs {
|
||||||
|
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
|
||||||
|
delete(lvs, lk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ctx.appendSeries(key, suffix, total)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (av *increaseAggrValue) state() any {
|
||||||
|
return av.shared
|
||||||
|
}
|
||||||
|
|
||||||
|
func newIncreaseAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
|
||||||
|
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
|
||||||
|
cfg := &increaseAggrConfig{
|
||||||
|
keepFirstSample: keepFirstSample,
|
||||||
|
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
|
||||||
|
}
|
||||||
|
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
||||||
|
return cfg
|
||||||
|
}
|
||||||
|
|
||||||
|
type increaseAggrConfig struct {
|
||||||
|
keepFirstSample bool
|
||||||
|
|
||||||
|
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
|
||||||
|
ignoreFirstSampleDeadline uint64
|
||||||
|
counterResetsTotal *metrics.Counter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*increaseAggrConfig) getValue(s any) aggrValue {
|
||||||
|
var shared *increaseAggrValueShared
|
||||||
|
if s == nil {
|
||||||
|
shared = &increaseAggrValueShared{
|
||||||
|
lastValues: make(map[string]increaseLastValue),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
shared = s.(*increaseAggrValueShared)
|
||||||
|
}
|
||||||
|
return &increaseAggrValue{
|
||||||
|
shared: shared,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ac *increaseAggrConfig) getSuffix() string {
|
||||||
|
if ac.keepFirstSample {
|
||||||
|
return "increase"
|
||||||
|
}
|
||||||
|
return "increase_prometheus"
|
||||||
|
}
|
||||||
@@ -6,6 +6,9 @@ type lastAggrValue struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (av *lastAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
func (av *lastAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||||
|
if sample.stateOnly {
|
||||||
|
return
|
||||||
|
}
|
||||||
if sample.timestamp >= av.timestamp {
|
if sample.timestamp >= av.timestamp {
|
||||||
av.last = sample.value
|
av.last = sample.value
|
||||||
av.timestamp = sample.timestamp
|
av.timestamp = sample.timestamp
|
||||||
|
|||||||
@@ -6,6 +6,9 @@ type maxAggrValue struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (av *maxAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
func (av *maxAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||||
|
if sample.stateOnly {
|
||||||
|
return
|
||||||
|
}
|
||||||
if sample.value > av.max || !av.defined {
|
if sample.value > av.max || !av.defined {
|
||||||
av.max = sample.value
|
av.max = sample.value
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,9 @@ type minAggrValue struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (av *minAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
func (av *minAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||||
|
if sample.stateOnly {
|
||||||
|
return
|
||||||
|
}
|
||||||
if sample.value < av.min || !av.defined {
|
if sample.value < av.min || !av.defined {
|
||||||
av.min = sample.value
|
av.min = sample.value
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,6 +13,9 @@ type quantilesAggrValue struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (av *quantilesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
func (av *quantilesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||||
|
if sample.stateOnly {
|
||||||
|
return
|
||||||
|
}
|
||||||
if av.h == nil {
|
if av.h == nil {
|
||||||
av.h = histogram.GetFast()
|
av.h = histogram.GetFast()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ var rateAggrStateValuePool sync.Pool
|
|||||||
|
|
||||||
func putRateAggrStateValue(v *rateAggrStateValue) {
|
func putRateAggrStateValue(v *rateAggrStateValue) {
|
||||||
v.timestamp = 0
|
v.timestamp = 0
|
||||||
|
v.lastTimestamp = 0
|
||||||
v.increase = 0
|
v.increase = 0
|
||||||
rateAggrStateValuePool.Put(v)
|
rateAggrStateValuePool.Put(v)
|
||||||
}
|
}
|
||||||
@@ -88,6 +89,10 @@ type rateAggrStateValue struct {
|
|||||||
// increase stores cumulative increase for the current time series on the current aggregation interval
|
// increase stores cumulative increase for the current time series on the current aggregation interval
|
||||||
increase float64
|
increase float64
|
||||||
timestamp int64
|
timestamp int64
|
||||||
|
// lastTimestamp is the latest timestamp seen for this series including state-only samples.
|
||||||
|
// It is used for out-of-order detection, while timestamp (above) is only updated by
|
||||||
|
// non-state-only samples and is used for rate calculation.
|
||||||
|
lastTimestamp int64
|
||||||
}
|
}
|
||||||
|
|
||||||
type rateAggrValue struct {
|
type rateAggrValue struct {
|
||||||
@@ -101,16 +106,20 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
|
|||||||
sv, ok := av.shared[key]
|
sv, ok := av.shared[key]
|
||||||
if ok {
|
if ok {
|
||||||
state = sv.getState(av.isGreen)
|
state = sv.getState(av.isGreen)
|
||||||
if sample.timestamp < state.timestamp {
|
if sample.timestamp < state.lastTimestamp {
|
||||||
// Skip out of order sample
|
// Skip out of order sample
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if sample.value >= sv.value {
|
if !sample.stateOnly {
|
||||||
state.increase += sample.value - sv.value
|
if sample.value >= sv.value {
|
||||||
|
state.increase += sample.value - sv.value
|
||||||
|
} else {
|
||||||
|
// counter reset
|
||||||
|
state.increase += sample.value
|
||||||
|
ac.counterResetsTotal.Inc()
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// counter reset
|
sv.prevTimestamp = sample.timestamp
|
||||||
state.increase += sample.value
|
|
||||||
ac.counterResetsTotal.Inc()
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
sv = getRateAggrSharedValue(av.isGreen)
|
sv = getRateAggrSharedValue(av.isGreen)
|
||||||
@@ -121,7 +130,10 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
|
|||||||
}
|
}
|
||||||
sv.value = sample.value
|
sv.value = sample.value
|
||||||
sv.deleteDeadline = deleteDeadline
|
sv.deleteDeadline = deleteDeadline
|
||||||
state.timestamp = sample.timestamp
|
state.lastTimestamp = sample.timestamp
|
||||||
|
if !sample.stateOnly {
|
||||||
|
state.timestamp = sample.timestamp
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (av *rateAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
|
func (av *rateAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
|
||||||
|
|||||||
@@ -12,6 +12,9 @@ type stdAggrValue struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (av *stdAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
func (av *stdAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||||
|
if sample.stateOnly {
|
||||||
|
return
|
||||||
|
}
|
||||||
av.count++
|
av.count++
|
||||||
avg := av.avg + (sample.value-av.avg)/av.count
|
avg := av.avg + (sample.value-av.avg)/av.count
|
||||||
av.q += (sample.value - av.avg) * (sample.value - avg)
|
av.q += (sample.value - av.avg) * (sample.value - avg)
|
||||||
|
|||||||
@@ -762,9 +762,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
|
|||||||
case "histogram_bucket":
|
case "histogram_bucket":
|
||||||
return newHistogramBucketAggrConfig(useSharedState), nil
|
return newHistogramBucketAggrConfig(useSharedState), nil
|
||||||
case "increase":
|
case "increase":
|
||||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
|
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
|
||||||
case "increase_prometheus":
|
case "increase_prometheus":
|
||||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
|
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
|
||||||
case "last":
|
case "last":
|
||||||
return newLastAggrConfig(), nil
|
return newLastAggrConfig(), nil
|
||||||
case "max":
|
case "max":
|
||||||
@@ -782,9 +782,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
|
|||||||
case "sum_samples":
|
case "sum_samples":
|
||||||
return newSumSamplesAggrConfig(), nil
|
return newSumSamplesAggrConfig(), nil
|
||||||
case "total":
|
case "total":
|
||||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
|
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
|
||||||
case "total_prometheus":
|
case "total_prometheus":
|
||||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
|
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
|
||||||
case "unique_samples":
|
case "unique_samples":
|
||||||
return newUniqueSamplesAggrConfig(), nil
|
return newUniqueSamplesAggrConfig(), nil
|
||||||
default:
|
default:
|
||||||
@@ -1006,26 +1006,28 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
|||||||
a.ignoredNaNSamples.Inc()
|
a.ignoredNaNSamples.Inc()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if (ignoreOldSamples || enableWindows) && s.Timestamp < minDeadline {
|
stateOnly := (ignoreOldSamples || enableWindows) && s.Timestamp < minDeadline
|
||||||
// Skip old samples outside the current aggregation interval
|
if stateOnly {
|
||||||
a.ignoredOldSamples.Inc()
|
a.ignoredOldSamples.Inc()
|
||||||
continue
|
} else {
|
||||||
}
|
lagMsec := nowMsec - s.Timestamp
|
||||||
lagMsec := nowMsec - s.Timestamp
|
if lagMsec > maxLagMsec {
|
||||||
if lagMsec > maxLagMsec {
|
maxLagMsec = lagMsec
|
||||||
maxLagMsec = lagMsec
|
}
|
||||||
}
|
}
|
||||||
if enableWindows && s.Timestamp <= cs.maxDeadline == cs.isGreen {
|
if enableWindows && s.Timestamp <= cs.maxDeadline == cs.isGreen {
|
||||||
ctx.green = append(ctx.green, pushSample{
|
ctx.green = append(ctx.green, pushSample{
|
||||||
key: key,
|
key: key,
|
||||||
value: s.Value,
|
value: s.Value,
|
||||||
timestamp: s.Timestamp,
|
timestamp: s.Timestamp,
|
||||||
|
stateOnly: stateOnly,
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
ctx.blue = append(ctx.blue, pushSample{
|
ctx.blue = append(ctx.blue, pushSample{
|
||||||
key: key,
|
key: key,
|
||||||
value: s.Value,
|
value: s.Value,
|
||||||
timestamp: s.Timestamp,
|
timestamp: s.Timestamp,
|
||||||
|
stateOnly: stateOnly,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1099,6 +1101,10 @@ type pushSample struct {
|
|||||||
key string
|
key string
|
||||||
value float64
|
value float64
|
||||||
timestamp int64
|
timestamp int64
|
||||||
|
|
||||||
|
// stateOnly marks samples older than minDeadline: update tracking state in stateful outputs
|
||||||
|
// (total, rate, increase) but do not contribute to the aggregation output.
|
||||||
|
stateOnly bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func getPushCtx() *pushCtx {
|
func getPushCtx() *pushCtx {
|
||||||
|
|||||||
@@ -688,7 +688,9 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.125
|
|||||||
outputs: [rate_sum, rate_avg]
|
outputs: [rate_sum, rate_avg]
|
||||||
`, "11111")
|
`, "11111")
|
||||||
|
|
||||||
// test rate_sum and rate_avg, when two aggregation intervals are empty
|
// test rate_sum and rate_avg, when two aggregation intervals are empty.
|
||||||
|
// abc=777 arrives slightly before the start of each interval (-10ms) but still
|
||||||
|
// updates prevTimestamp, so it contributes to rate_sum alongside abc=123 and abc=456.
|
||||||
f([]string{`
|
f([]string{`
|
||||||
foo{abc="123", cde="1"} 1
|
foo{abc="123", cde="1"} 1
|
||||||
foo{abc="123", cde="1"} 2 1
|
foo{abc="123", cde="1"} 2 1
|
||||||
@@ -807,4 +809,55 @@ foo:1m_sum_samples{baz="qwe"} 10
|
|||||||
dedup_interval: 30s
|
dedup_interval: 30s
|
||||||
outputs: [sum_samples]
|
outputs: [sum_samples]
|
||||||
`, "11111111")
|
`, "11111111")
|
||||||
|
|
||||||
|
// total with ignore_old_samples: an old sample (30s before the interval boundary) must
|
||||||
|
// update the state reference without contributing to the interval total, so the subsequent
|
||||||
|
// current-interval sample (250) computes increase 250-150=100 instead of 250-100=150.
|
||||||
|
// Cumulative total: 100 (interval1) + 100 (interval2) = 200.
|
||||||
|
f([]string{`
|
||||||
|
foo 100
|
||||||
|
`, `
|
||||||
|
foo 150 -30
|
||||||
|
foo 250
|
||||||
|
`}, time.Minute, `foo:1m_total 100
|
||||||
|
foo:1m_total 200
|
||||||
|
`, `
|
||||||
|
- interval: 1m
|
||||||
|
outputs: [total]
|
||||||
|
ignore_old_samples: true
|
||||||
|
ignore_first_sample_interval: 0s
|
||||||
|
`, "111")
|
||||||
|
|
||||||
|
// increase with ignore_old_samples: same correctness check for increase output.
|
||||||
|
// Per-interval: 100 (first sample from 0) and 100 (250-150=100 thanks to stateOnly update).
|
||||||
|
f([]string{`
|
||||||
|
foo 100
|
||||||
|
`, `
|
||||||
|
foo 150 -30
|
||||||
|
foo 250
|
||||||
|
`}, time.Minute, `foo:1m_increase 100
|
||||||
|
foo:1m_increase 100
|
||||||
|
`, `
|
||||||
|
- interval: 1m
|
||||||
|
outputs: [increase]
|
||||||
|
ignore_old_samples: true
|
||||||
|
ignore_first_sample_interval: 0s
|
||||||
|
`, "111")
|
||||||
|
|
||||||
|
// rate with ignore_old_samples: out-of-order stateOnly samples must not overwrite sv.value,
|
||||||
|
// and the winning stateOnly sample's timestamp is used as the denominator start.
|
||||||
|
// foo 120 -40 (ts=T0+20s) is rejected as OOO after foo 150 -30 (ts=T0+30s),
|
||||||
|
// so the baseline is 150 at T0+30s, giving rate=(200-150)/30 ≈ 1.667.
|
||||||
|
f([]string{`
|
||||||
|
foo 100
|
||||||
|
`, `
|
||||||
|
foo 150 -30
|
||||||
|
foo 120 -40
|
||||||
|
foo 200
|
||||||
|
`}, time.Minute, `foo:1m_rate_sum 1.6666666666666667
|
||||||
|
`, `
|
||||||
|
- interval: 1m
|
||||||
|
outputs: [rate_sum]
|
||||||
|
ignore_old_samples: true
|
||||||
|
`, "1111")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,9 @@ type sumSamplesAggrValue struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (av *sumSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
func (av *sumSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||||
|
if sample.stateOnly {
|
||||||
|
return
|
||||||
|
}
|
||||||
av.sum += sample.value
|
av.sum += sample.value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -36,12 +36,14 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
|
|||||||
// Skip out of order sample
|
// Skip out of order sample
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if sample.value >= lv.value {
|
if !sample.stateOnly {
|
||||||
av.total += sample.value - lv.value
|
if sample.value >= lv.value {
|
||||||
} else {
|
av.total += sample.value - lv.value
|
||||||
// counter reset
|
} else {
|
||||||
av.total += sample.value
|
// counter reset
|
||||||
ac.counterResetsTotal.Inc()
|
av.total += sample.value
|
||||||
|
ac.counterResetsTotal.Inc()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
lv.value = sample.value
|
lv.value = sample.value
|
||||||
@@ -54,7 +56,6 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
|
|||||||
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
|
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
|
||||||
ac := c.(*totalAggrConfig)
|
ac := c.(*totalAggrConfig)
|
||||||
suffix := ac.getSuffix()
|
suffix := ac.getSuffix()
|
||||||
// check for stale entries
|
|
||||||
total := av.shared.total + av.total
|
total := av.shared.total + av.total
|
||||||
av.total = 0
|
av.total = 0
|
||||||
lvs := av.shared.lastValues
|
lvs := av.shared.lastValues
|
||||||
@@ -63,9 +64,7 @@ func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast
|
|||||||
delete(lvs, lk)
|
delete(lvs, lk)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ac.resetTotalOnFlush {
|
if math.Abs(total) >= (1 << 53) {
|
||||||
av.shared.total = 0
|
|
||||||
} else if math.Abs(total) >= (1 << 53) {
|
|
||||||
// It is time to reset the entry, since it starts losing float64 precision
|
// It is time to reset the entry, since it starts losing float64 precision
|
||||||
av.shared.total = 0
|
av.shared.total = 0
|
||||||
} else {
|
} else {
|
||||||
@@ -78,11 +77,10 @@ func (av *totalAggrValue) state() any {
|
|||||||
return av.shared
|
return av.shared
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
|
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
|
||||||
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
|
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
|
||||||
cfg := &totalAggrConfig{
|
cfg := &totalAggrConfig{
|
||||||
keepFirstSample: keepFirstSample,
|
keepFirstSample: keepFirstSample,
|
||||||
resetTotalOnFlush: resetTotalOnFlush,
|
|
||||||
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
|
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
|
||||||
}
|
}
|
||||||
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
||||||
@@ -90,8 +88,6 @@ func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleI
|
|||||||
}
|
}
|
||||||
|
|
||||||
type totalAggrConfig struct {
|
type totalAggrConfig struct {
|
||||||
resetTotalOnFlush bool
|
|
||||||
|
|
||||||
// Whether to take into account the first sample in new time series when calculating the output value.
|
// Whether to take into account the first sample in new time series when calculating the output value.
|
||||||
keepFirstSample bool
|
keepFirstSample bool
|
||||||
|
|
||||||
@@ -117,12 +113,6 @@ func (*totalAggrConfig) getValue(s any) aggrValue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ac *totalAggrConfig) getSuffix() string {
|
func (ac *totalAggrConfig) getSuffix() string {
|
||||||
if ac.resetTotalOnFlush {
|
|
||||||
if ac.keepFirstSample {
|
|
||||||
return "increase"
|
|
||||||
}
|
|
||||||
return "increase_prometheus"
|
|
||||||
}
|
|
||||||
if ac.keepFirstSample {
|
if ac.keepFirstSample {
|
||||||
return "total"
|
return "total"
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,9 @@ type uniqueSamplesAggrValue struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (av *uniqueSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
func (av *uniqueSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||||
|
if sample.stateOnly {
|
||||||
|
return
|
||||||
|
}
|
||||||
if _, ok := av.samples[sample.value]; !ok {
|
if _, ok := av.samples[sample.value]; !ok {
|
||||||
av.samples[sample.value] = struct{}{}
|
av.samples[sample.value] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user