mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-13 05:44:02 +03:00
Compare commits
2 Commits
dependabot
...
add-dedup-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
671a5ee85d | ||
|
|
37737c560d |
@@ -6201,7 +6201,7 @@
|
||||
"type": "victoriametrics-metrics-datasource",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
|
||||
"description": "The rate of dropped samples during aggregation. \nStream aggregation will drop samples with NaN values, too old timestamps or samples identified as duplicates during deduplication. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -6282,14 +6282,14 @@
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
|
||||
"expr": "sum(rate({__name__=~\"vm_streamaggr_ignored_samples_total|vm_streamaggr_dedup_dropped_samples_total\", job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod)",
|
||||
"instant": false,
|
||||
"legendFormat": "__auto",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Ignored samples ($instance)",
|
||||
"title": "Dropped samples ($instance)",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
|
||||
@@ -6200,7 +6200,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
|
||||
"description": "The rate of dropped samples during aggregation. \nStream aggregation will drop samples with NaN values, too old timestamps or samples identified as duplicates during deduplication. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -6281,14 +6281,14 @@
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
|
||||
"expr": "sum(rate({__name__=~\"vm_streamaggr_ignored_samples_total|vm_streamaggr_dedup_dropped_samples_total\", job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod)",
|
||||
"instant": false,
|
||||
"legendFormat": "__auto",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Ignored samples ($instance)",
|
||||
"title": "Dropped samples ($instance)",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
|
||||
@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
## tip
|
||||
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): expose `vm_streamaggr_dedup_dropped_samples_total` to allow tracking dropped old samples during [deduplication](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication).
|
||||
|
||||
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
|
||||
|
||||
Released at 2026-06-08
|
||||
|
||||
@@ -396,7 +396,7 @@ before sending them to the configured `-remoteWrite.url`. The deduplication can
|
||||
|
||||
Labels can be dropped before deduplication is applied. See [these docs](#dropping-unneeded-labels).
|
||||
|
||||
Stream aggregation deduplication is applied before aggregation rules, so duplicate samples are dropped before aggregation.
|
||||
Stream aggregation deduplication is applied before aggregation rules, so duplicate samples are dropped before aggregation. The dropped old samples can be tracked with the `vm_streamaggr_dedup_dropped_samples_total` metric.
|
||||
|
||||
# Relabeling
|
||||
|
||||
@@ -444,7 +444,9 @@ outside the current [aggregation interval](https://docs.victoriametrics.com/vict
|
||||
|
||||
- To enable [aggregation windows](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#aggregation-windows).
|
||||
|
||||
The dropped old samples can be tracked with the `vm_streamaggr_ignored_samples_total{reason="too_old"}` metric.
|
||||
- To enable [deduplication](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication).
|
||||
|
||||
The dropped old samples can be tracked with the `vm_streamaggr_ignored_samples_total{reason="too_old"}` and `vm_streamaggr_dedup_dropped_samples_total` metrics.
|
||||
|
||||
## Ignore aggregation intervals on start
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package streamaggr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
@@ -17,9 +18,10 @@ import (
|
||||
const dedupAggrShardsCount = 128
|
||||
|
||||
type dedupAggr struct {
|
||||
shards []dedupAggrShard
|
||||
flushDuration *metrics.Histogram
|
||||
flushTimeouts *metrics.Counter
|
||||
shards []dedupAggrShard
|
||||
flushDuration *metrics.Histogram
|
||||
flushTimeouts *metrics.Counter
|
||||
droppedSamples *metrics.Counter
|
||||
}
|
||||
|
||||
type dedupAggrShard struct {
|
||||
@@ -47,10 +49,20 @@ type dedupAggrSample struct {
|
||||
timestamp int64
|
||||
}
|
||||
|
||||
func newDedupAggr() *dedupAggr {
|
||||
return &dedupAggr{
|
||||
shards: make([]dedupAggrShard, dedupAggrShardsCount),
|
||||
}
|
||||
func newDedupAggr(ms *metrics.Set, metricLabels string) *dedupAggr {
|
||||
var d dedupAggr
|
||||
d.shards = make([]dedupAggrShard, dedupAggrShardsCount)
|
||||
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
|
||||
return float64(d.sizeBytes())
|
||||
})
|
||||
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
|
||||
return float64(d.itemsCount())
|
||||
})
|
||||
|
||||
d.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
|
||||
d.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
|
||||
d.droppedSamples = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_dropped_samples_total{%s}`, metricLabels))
|
||||
return &d
|
||||
}
|
||||
|
||||
func (da *dedupAggr) sizeBytes() uint64 {
|
||||
@@ -87,7 +99,8 @@ func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, isGreen bool) {
|
||||
if len(shardSamples) == 0 {
|
||||
continue
|
||||
}
|
||||
da.shards[i].pushSamples(shardSamples, isGreen)
|
||||
deduplicatedSamples := da.shards[i].pushSamples(shardSamples, isGreen)
|
||||
da.droppedSamples.Add(deduplicatedSamples)
|
||||
}
|
||||
putPerShardSamples(pss)
|
||||
}
|
||||
@@ -167,8 +180,9 @@ func putPerShardSamples(pss *perShardSamples) {
|
||||
|
||||
var perShardSamplesPool sync.Pool
|
||||
|
||||
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
|
||||
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) int {
|
||||
var state *dedupAggrState
|
||||
var deduplicatedSamples int
|
||||
|
||||
if isGreen {
|
||||
state = &das.green
|
||||
@@ -198,8 +212,10 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
|
||||
continue
|
||||
}
|
||||
s.timestamp, s.value = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
|
||||
deduplicatedSamples++
|
||||
}
|
||||
state.samplesBuf = samplesBuf
|
||||
return deduplicatedSamples
|
||||
}
|
||||
|
||||
// deduplicateSamples returns deduplicated timestamp and value results.
|
||||
|
||||
@@ -7,11 +7,13 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||
)
|
||||
|
||||
func TestDedupAggrSerial(t *testing.T) {
|
||||
da := newDedupAggr()
|
||||
da := newDedupAggr(metrics.NewSet(), "")
|
||||
|
||||
const seriesCount = 100_000
|
||||
expectedSamplesMap := make(map[string]pushSample)
|
||||
@@ -59,7 +61,7 @@ func TestDedupAggrSerial(t *testing.T) {
|
||||
func TestDedupAggrConcurrent(_ *testing.T) {
|
||||
const concurrency = 5
|
||||
const seriesCount = 10_000
|
||||
da := newDedupAggr()
|
||||
da := newDedupAggr(metrics.NewSet(), "")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for range concurrency {
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
@@ -23,7 +25,7 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
|
||||
|
||||
const loops = 2
|
||||
benchSamples := newBenchSamples(samplesPerPush)
|
||||
da := newDedupAggr()
|
||||
da := newDedupAggr(metrics.NewSet(), "")
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
@@ -44,7 +44,6 @@ type Deduplicator struct {
|
||||
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
|
||||
func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Duration, dropLabels []string, alias string) *Deduplicator {
|
||||
d := &Deduplicator{
|
||||
da: newDedupAggr(),
|
||||
dropLabels: dropLabels,
|
||||
interval: interval,
|
||||
enableWindows: enableWindows,
|
||||
@@ -64,16 +63,7 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
|
||||
ms := d.ms
|
||||
|
||||
metricLabels := fmt.Sprintf(`name="dedup",url=%q`, alias)
|
||||
|
||||
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
|
||||
return float64(d.da.sizeBytes())
|
||||
})
|
||||
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
|
||||
return float64(d.da.itemsCount())
|
||||
})
|
||||
|
||||
d.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
|
||||
d.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
|
||||
d.da = newDedupAggr(ms, metricLabels)
|
||||
|
||||
metrics.RegisterSet(ms)
|
||||
|
||||
@@ -120,6 +110,7 @@ func (d *Deduplicator) Push(tss []prompb.TimeSeries) {
|
||||
key := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
for _, s := range ts.Samples {
|
||||
if d.enableWindows && minDeadline > s.Timestamp {
|
||||
d.da.droppedSamples.Inc()
|
||||
continue
|
||||
} else if d.enableWindows && s.Timestamp <= cs.maxDeadline == cs.isGreen {
|
||||
ctx.green = append(ctx.green, pushSample{
|
||||
|
||||
@@ -668,18 +668,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
}
|
||||
|
||||
if dedupInterval > 0 {
|
||||
a.da = newDedupAggr()
|
||||
a.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
|
||||
a.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
|
||||
|
||||
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
|
||||
n := a.da.sizeBytes()
|
||||
return float64(n)
|
||||
})
|
||||
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
|
||||
n := a.da.itemsCount()
|
||||
return float64(n)
|
||||
})
|
||||
a.da = newDedupAggr(ms, metricLabels)
|
||||
}
|
||||
|
||||
alignFlushToInterval := !opts.NoAlignFlushToInterval
|
||||
|
||||
Reference in New Issue
Block a user