Compare commits

...

2 Commits

9 changed files with 47 additions and 43 deletions

View File

@@ -6201,7 +6201,7 @@
"type": "victoriametrics-metrics-datasource",
"uid": "$ds"
},
"description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"description": "The rate of dropped samples during aggregation. \nStream aggregation drops samples with NaN values, samples with too old timestamps, and samples identified as duplicates during deduplication. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"fieldConfig": {
"defaults": {
"color": {
@@ -6282,14 +6282,14 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
"expr": "sum(rate({__name__=~\"vm_streamaggr_ignored_samples_total|vm_streamaggr_dedup_dropped_samples_total\", job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod)",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Ignored samples ($instance)",
"title": "Dropped samples ($instance)",
"type": "timeseries"
},
{

View File

@@ -6200,7 +6200,7 @@
"type": "prometheus",
"uid": "$ds"
},
"description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"description": "The rate of dropped samples during aggregation. \nStream aggregation drops samples with NaN values, samples with too old timestamps, and samples identified as duplicates during deduplication. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"fieldConfig": {
"defaults": {
"color": {
@@ -6281,14 +6281,14 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
"expr": "sum(rate({__name__=~\"vm_streamaggr_ignored_samples_total|vm_streamaggr_dedup_dropped_samples_total\", job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod)",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Ignored samples ($instance)",
"title": "Dropped samples ($instance)",
"type": "timeseries"
},
{

View File

@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): expose the `vm_streamaggr_dedup_dropped_samples_total` metric for tracking samples dropped during [deduplication](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication).
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
Released at 2026-06-08

View File

@@ -396,7 +396,7 @@ before sending them to the configured `-remoteWrite.url`. The deduplication can
Labels can be dropped before deduplication is applied. See [these docs](#dropping-unneeded-labels).
Stream aggregation deduplication is applied before aggregation rules, so duplicate samples are dropped before aggregation.
Stream aggregation deduplication is applied before aggregation rules, so duplicate samples are dropped before aggregation. The number of samples dropped during deduplication can be tracked with the `vm_streamaggr_dedup_dropped_samples_total` metric.
# Relabeling
@@ -444,7 +444,9 @@ outside the current [aggregation interval](https://docs.victoriametrics.com/vict
- To enable [aggregation windows](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#aggregation-windows).
The dropped old samples can be tracked with the `vm_streamaggr_ignored_samples_total{reason="too_old"}` metric.
- To enable [deduplication](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication).
The dropped old samples can be tracked with the `vm_streamaggr_ignored_samples_total{reason="too_old"}` and `vm_streamaggr_dedup_dropped_samples_total` metrics.
## Ignore aggregation intervals on start

View File

@@ -1,6 +1,7 @@
package streamaggr
import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
@@ -17,9 +18,10 @@ import (
const dedupAggrShardsCount = 128
type dedupAggr struct {
shards []dedupAggrShard
flushDuration *metrics.Histogram
flushTimeouts *metrics.Counter
shards []dedupAggrShard
flushDuration *metrics.Histogram
flushTimeouts *metrics.Counter
droppedSamples *metrics.Counter
}
type dedupAggrShard struct {
@@ -47,10 +49,20 @@ type dedupAggrSample struct {
timestamp int64
}
func newDedupAggr() *dedupAggr {
return &dedupAggr{
shards: make([]dedupAggrShard, dedupAggrShardsCount),
}
// newDedupAggr returns a dedupAggr with dedupAggrShardsCount shards and
// registers its metrics in ms.
//
// metricLabels is inserted verbatim between the curly braces of every
// registered metric name.
func newDedupAggr(ms *metrics.Set, metricLabels string) *dedupAggr {
var d dedupAggr
d.shards = make([]dedupAggrShard, dedupAggrShardsCount)
// The gauge callbacks compute their values from d each time they are called.
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
return float64(d.sizeBytes())
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
return float64(d.itemsCount())
})
// The histogram and counters are stored on d so its methods can update them.
d.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
d.droppedSamples = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_dropped_samples_total{%s}`, metricLabels))
return &d
}
func (da *dedupAggr) sizeBytes() uint64 {
@@ -87,7 +99,8 @@ func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, isGreen bool) {
if len(shardSamples) == 0 {
continue
}
da.shards[i].pushSamples(shardSamples, isGreen)
deduplicatedSamples := da.shards[i].pushSamples(shardSamples, isGreen)
da.droppedSamples.Add(deduplicatedSamples)
}
putPerShardSamples(pss)
}
@@ -167,8 +180,9 @@ func putPerShardSamples(pss *perShardSamples) {
var perShardSamplesPool sync.Pool
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) int {
var state *dedupAggrState
var deduplicatedSamples int
if isGreen {
state = &das.green
@@ -198,8 +212,10 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
continue
}
s.timestamp, s.value = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
deduplicatedSamples++
}
state.samplesBuf = samplesBuf
return deduplicatedSamples
}
// deduplicateSamples returns deduplicated timestamp and value results.

View File

@@ -7,11 +7,13 @@ import (
"testing"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
)
func TestDedupAggrSerial(t *testing.T) {
da := newDedupAggr()
da := newDedupAggr(metrics.NewSet(), "")
const seriesCount = 100_000
expectedSamplesMap := make(map[string]pushSample)
@@ -59,7 +61,7 @@ func TestDedupAggrSerial(t *testing.T) {
func TestDedupAggrConcurrent(_ *testing.T) {
const concurrency = 5
const seriesCount = 10_000
da := newDedupAggr()
da := newDedupAggr(metrics.NewSet(), "")
var wg sync.WaitGroup
for range concurrency {

View File

@@ -5,6 +5,8 @@ import (
"sync/atomic"
"testing"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
@@ -23,7 +25,7 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
const loops = 2
benchSamples := newBenchSamples(samplesPerPush)
da := newDedupAggr()
da := newDedupAggr(metrics.NewSet(), "")
b.ResetTimer()
b.ReportAllocs()

View File

@@ -44,7 +44,6 @@ type Deduplicator struct {
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Duration, dropLabels []string, alias string) *Deduplicator {
d := &Deduplicator{
da: newDedupAggr(),
dropLabels: dropLabels,
interval: interval,
enableWindows: enableWindows,
@@ -64,16 +63,7 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
ms := d.ms
metricLabels := fmt.Sprintf(`name="dedup",url=%q`, alias)
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
return float64(d.da.sizeBytes())
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
return float64(d.da.itemsCount())
})
d.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
d.da = newDedupAggr(ms, metricLabels)
metrics.RegisterSet(ms)
@@ -120,6 +110,7 @@ func (d *Deduplicator) Push(tss []prompb.TimeSeries) {
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, s := range ts.Samples {
if d.enableWindows && minDeadline > s.Timestamp {
d.da.droppedSamples.Inc()
continue
} else if d.enableWindows && s.Timestamp <= cs.maxDeadline == cs.isGreen {
ctx.green = append(ctx.green, pushSample{

View File

@@ -668,18 +668,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
if dedupInterval > 0 {
a.da = newDedupAggr()
a.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
a.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
n := a.da.sizeBytes()
return float64(n)
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
n := a.da.itemsCount()
return float64(n)
})
a.da = newDedupAggr(ms, metricLabels)
}
alignFlushToInterval := !opts.NoAlignFlushToInterval