Compare commits

...

1 Commits

View File

@@ -83,6 +83,11 @@ var (
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter")
maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter")
maxHourlySeriesPerURL = flagutil.NewArrayInt("remoteWrite.urlMaxHourlySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last hour. "+
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter")
maxDailySeriesPerURL = flagutil.NewArrayInt("remoteWrite.urlMaxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter")
maxIngestionRate = flag.Int("maxIngestionRate", 0, "The maximum number of samples vmagent can receive per second. Data ingestion is paused when the limit is exceeded. "+
"By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit")
@@ -836,6 +841,12 @@ type remoteWriteCtx struct {
pss []*pendingSeries
pssNextIdx atomic.Uint64
hourlySeriesLimiter *bloomfilter.Limiter
hourlySeriesLimitRowsDropped *metrics.Counter
dailySeriesLimiter *bloomfilter.Limiter
dailySeriesLimitRowsDropped *metrics.Counter
rowsPushedAfterRelabel *metrics.Counter
rowsDroppedByRelabel *metrics.Counter
@@ -873,6 +884,30 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
return 0
})
var hourlyLimiter *bloomfilter.Limiter
maxHourlySeriesLimit := maxHourlySeriesPerURL.GetOptionalArg(argIdx)
if maxHourlySeriesLimit > 0 {
hourlyLimiter = bloomfilter.NewLimiter(maxHourlySeriesLimit, time.Hour)
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_hourly_series_limit_max_series{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(hourlyLimiter.MaxItems())
})
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_hourly_series_limit_current_series{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(hourlyLimiter.CurrentItems())
})
}
// dailyLimiter caps the number of unique series accepted for this remote write URL
// over a 24-hour window. It stays nil (limit disabled) unless a positive value is
// returned for argIdx by the -remoteWrite.urlMaxDailySeries flag.
var dailyLimiter *bloomfilter.Limiter
maxDailySeriesLimit := maxDailySeriesPerURL.GetOptionalArg(argIdx)
// The daily limiter must be gated on the daily limit, not on the hourly one;
// otherwise it is either never created or created with a wrong (possibly zero) limit.
if maxDailySeriesLimit > 0 {
	// The daily limit spans 24 hours, unlike the hourly limiter.
	dailyLimiter = bloomfilter.NewLimiter(maxDailySeriesLimit, 24*time.Hour)
	_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_daily_series_limit_max_series{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
		return float64(dailyLimiter.MaxItems())
	})
	_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_daily_series_limit_current_series{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
		return float64(dailyLimiter.CurrentItems())
	})
}
var c *client
switch remoteWriteURL.Scheme {
case "http", "https":
@@ -902,6 +937,12 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
c: c,
pss: pss,
hourlySeriesLimiter: hourlyLimiter,
hourlySeriesLimitRowsDropped: metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_hourly_series_limit_rows_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
dailySeriesLimiter: dailyLimiter,
dailySeriesLimitRowsDropped: metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_daily_series_limit_rows_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
@@ -937,6 +978,13 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.fq.MustClose()
rwctx.fq = nil
if rwctx.hourlySeriesLimiter != nil {
rwctx.hourlySeriesLimiter.MustStop()
}
if rwctx.dailySeriesLimiter != nil {
rwctx.dailySeriesLimiter.MustStop()
}
rwctx.rowsPushedAfterRelabel = nil
rwctx.rowsDroppedByRelabel = nil
}
@@ -1011,6 +1059,29 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
return false
}
// limitSeriesCardinality filters tss through the per-URL hourly and daily
// unique series limiters attached to rwctx.
//
// tss is returned as is when neither limiter is configured. Otherwise a newly
// allocated slice with the accepted series is returned; tss itself is only read.
//
// The hourly limiter is consulted first; a series it rejects is not offered to
// the daily limiter. For every rejected series the matching rows-dropped counter
// is increased by the number of samples in that series, and the series labels are
// passed to logSkippedSeries together with the name of the flag that sets the limit.
func (rwctx *remoteWriteCtx) limitSeriesCardinality(tss []prompb.TimeSeries) []prompb.TimeSeries {
	if rwctx.hourlySeriesLimiter == nil && rwctx.dailySeriesLimiter == nil {
		return tss
	}
	dst := make([]prompb.TimeSeries, 0, len(tss))
	for i := range tss {
		labels := tss[i].Labels
		h := getLabelsHash(labels)
		if rwctx.hourlySeriesLimiter != nil && !rwctx.hourlySeriesLimiter.Add(h) {
			rwctx.hourlySeriesLimitRowsDropped.Add(len(tss[i].Samples))
			// Report the flag under its registered command-line name.
			logSkippedSeries(labels, "-remoteWrite.urlMaxHourlySeries", rwctx.hourlySeriesLimiter.MaxItems())
			continue
		}
		if rwctx.dailySeriesLimiter != nil && !rwctx.dailySeriesLimiter.Add(h) {
			rwctx.dailySeriesLimitRowsDropped.Add(len(tss[i].Samples))
			// Report the flag under its registered command-line name.
			logSkippedSeries(labels, "-remoteWrite.urlMaxDailySeries", rwctx.dailySeriesLimiter.MaxItems())
			continue
		}
		dst = append(dst, tss[i])
	}
	return dst
}
var matchIdxsPool bytesutil.ByteBufferPool
func dropAggregatedSeries(src []prompb.TimeSeries, matchIdxs []byte, dropInput bool) []prompb.TimeSeries {
@@ -1068,6 +1139,9 @@ func (rwctx *remoteWriteCtx) tryPushTimeSeriesInternal(tss []prompb.TimeSeries)
rctx.appendExtraLabels(tss, labelsGlobal)
}
sortLabelsIfNeeded(tss)
tss = rwctx.limitSeriesCardinality(tss)
pss := rwctx.pss
idx := rwctx.pssNextIdx.Add(1) % uint64(len(pss))