mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-05 10:02:22 +03:00
Compare commits
1 Commits
debug-grou
...
vmagent-ca
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4ec4c72e7a |
@@ -83,6 +83,11 @@ var (
|
||||
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter")
|
||||
maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+
|
||||
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter")
|
||||
maxHourlySeriesPerURL = flagutil.NewArrayInt("remoteWrite.urlMaxHourlySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last hour. "+
|
||||
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter")
|
||||
maxDailySeriesPerURL = flagutil.NewArrayInt("remoteWrite.urlMaxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+
|
||||
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter")
|
||||
|
||||
maxIngestionRate = flag.Int("maxIngestionRate", 0, "The maximum number of samples vmagent can receive per second. Data ingestion is paused when the limit is exceeded. "+
|
||||
"By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit")
|
||||
|
||||
@@ -836,6 +841,12 @@ type remoteWriteCtx struct {
|
||||
pss []*pendingSeries
|
||||
pssNextIdx atomic.Uint64
|
||||
|
||||
hourlySeriesLimiter *bloomfilter.Limiter
|
||||
hourlySeriesLimitRowsDropped *metrics.Counter
|
||||
|
||||
dailySeriesLimiter *bloomfilter.Limiter
|
||||
dailySeriesLimitRowsDropped *metrics.Counter
|
||||
|
||||
rowsPushedAfterRelabel *metrics.Counter
|
||||
rowsDroppedByRelabel *metrics.Counter
|
||||
|
||||
@@ -873,6 +884,30 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
|
||||
return 0
|
||||
})
|
||||
|
||||
var hourlyLimiter *bloomfilter.Limiter
|
||||
maxHourlySeriesLimit := maxHourlySeriesPerURL.GetOptionalArg(argIdx)
|
||||
if maxHourlySeriesLimit > 0 {
|
||||
hourlyLimiter = bloomfilter.NewLimiter(maxHourlySeriesLimit, time.Hour)
|
||||
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_hourly_series_limit_max_series{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||
return float64(hourlyLimiter.MaxItems())
|
||||
})
|
||||
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_hourly_series_limit_current_series{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||
return float64(hourlyLimiter.CurrentItems())
|
||||
})
|
||||
}
|
||||
|
||||
var dailyLimiter *bloomfilter.Limiter
|
||||
maxDailySeriesLimit := maxDailySeriesPerURL.GetOptionalArg(argIdx)
|
||||
if maxHourlySeriesLimit > 0 {
|
||||
dailyLimiter = bloomfilter.NewLimiter(maxDailySeriesLimit, time.Hour)
|
||||
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_daily_series_limit_max_series{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||
return float64(dailyLimiter.MaxItems())
|
||||
})
|
||||
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_daily_series_limit_current_series{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||
return float64(dailyLimiter.CurrentItems())
|
||||
})
|
||||
}
|
||||
|
||||
var c *client
|
||||
switch remoteWriteURL.Scheme {
|
||||
case "http", "https":
|
||||
@@ -902,6 +937,12 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
|
||||
c: c,
|
||||
pss: pss,
|
||||
|
||||
hourlySeriesLimiter: hourlyLimiter,
|
||||
hourlySeriesLimitRowsDropped: metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_hourly_series_limit_rows_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
|
||||
dailySeriesLimiter: dailyLimiter,
|
||||
dailySeriesLimitRowsDropped: metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_daily_series_limit_rows_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
|
||||
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
|
||||
@@ -937,6 +978,13 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
rwctx.fq.MustClose()
|
||||
rwctx.fq = nil
|
||||
|
||||
if rwctx.hourlySeriesLimiter != nil {
|
||||
rwctx.hourlySeriesLimiter.MustStop()
|
||||
}
|
||||
if rwctx.dailySeriesLimiter != nil {
|
||||
rwctx.dailySeriesLimiter.MustStop()
|
||||
}
|
||||
|
||||
rwctx.rowsPushedAfterRelabel = nil
|
||||
rwctx.rowsDroppedByRelabel = nil
|
||||
}
|
||||
@@ -1011,6 +1059,29 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
return false
|
||||
}
|
||||
|
||||
func (rwctx *remoteWriteCtx) limitSeriesCardinality(tss []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
if rwctx.hourlySeriesLimiter == nil && rwctx.dailySeriesLimiter == nil {
|
||||
return tss
|
||||
}
|
||||
dst := make([]prompb.TimeSeries, 0, len(tss))
|
||||
for i := range tss {
|
||||
labels := tss[i].Labels
|
||||
h := getLabelsHash(labels)
|
||||
if rwctx.hourlySeriesLimiter != nil && !rwctx.hourlySeriesLimiter.Add(h) {
|
||||
rwctx.hourlySeriesLimitRowsDropped.Add(len(tss[i].Samples))
|
||||
logSkippedSeries(labels, "-remoteWrite.maxHourlySeriesPerURL", rwctx.hourlySeriesLimiter.MaxItems())
|
||||
continue
|
||||
}
|
||||
if rwctx.dailySeriesLimiter != nil && !rwctx.dailySeriesLimiter.Add(h) {
|
||||
rwctx.dailySeriesLimitRowsDropped.Add(len(tss[i].Samples))
|
||||
logSkippedSeries(labels, "-remoteWrite.maxDailySeriesPerURL", rwctx.dailySeriesLimiter.MaxItems())
|
||||
continue
|
||||
}
|
||||
dst = append(dst, tss[i])
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
var matchIdxsPool bytesutil.ByteBufferPool
|
||||
|
||||
func dropAggregatedSeries(src []prompb.TimeSeries, matchIdxs []byte, dropInput bool) []prompb.TimeSeries {
|
||||
@@ -1068,6 +1139,9 @@ func (rwctx *remoteWriteCtx) tryPushTimeSeriesInternal(tss []prompb.TimeSeries)
|
||||
rctx.appendExtraLabels(tss, labelsGlobal)
|
||||
}
|
||||
|
||||
sortLabelsIfNeeded(tss)
|
||||
tss = rwctx.limitSeriesCardinality(tss)
|
||||
|
||||
pss := rwctx.pss
|
||||
idx := rwctx.pssNextIdx.Add(1) % uint64(len(pss))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user