app/vmalert: add random jitter to concurrent periodical flushers targeting the remote write destination

I expect the change to help in two ways:
1. Spreading remote write flushes over the flush interval to avoid
congestion at the remote write destination;
2. Enhance queue data consumption. Currently, all flushers may always
flush data simultaneously, resulting in periods where no flushers are
consuming data from the queue, which increases the risk of reaching the
queue limit `remoteWrite.maxQueueSize` even when a increased
`remoteWrite.concurrency`. By making the flushers more dispersed, it is
more likely that some flushers are consistently consuming data from the
queue, which should make queue management easier.

Related PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10729/
This commit is contained in:
Hui Wang
2026-04-06 15:50:20 +08:00
committed by f41gh7
parent dba05bd4b6
commit 99ec1f0da7
2 changed files with 50 additions and 27 deletions

View File

@@ -13,8 +13,10 @@ import (
"sync"
"time"
"github.com/cespare/xxhash/v2"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -115,8 +117,10 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
input: make(chan prompb.TimeSeries, cfg.MaxQueueSize),
}
for range cc {
c.run(ctx)
for i := 0; i < cc; i++ {
c.wg.Go(func() {
c.run(ctx, i)
})
}
return c, nil
}
@@ -158,8 +162,7 @@ func (c *Client) Close() error {
return nil
}
func (c *Client) run(ctx context.Context) {
ticker := time.NewTicker(c.flushInterval)
func (c *Client) run(ctx context.Context, id int) {
wr := &prompb.WriteRequest{}
shutdown := func() {
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
@@ -176,34 +179,53 @@ func (c *Client) run(ctx context.Context) {
cancel()
}
c.wg.Go(func() {
defer ticker.Stop()
for {
// add jitter to spread remote write flushes over the flush interval to avoid congestion at the remote write destination
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(fmt.Sprintf("%d", id)))
randJitter := uint64(float64(c.flushInterval) * (float64(h) / (1 << 64)))
timer := time.NewTimer(time.Duration(randJitter))
addJitter:
for {
select {
case <-c.doneCh:
timer.Stop()
shutdown()
return
case <-ctx.Done():
timer.Stop()
shutdown()
return
case <-timer.C:
break addJitter
}
}
ticker := time.NewTicker(c.flushInterval)
defer ticker.Stop()
for {
select {
case <-c.doneCh:
shutdown()
return
case <-ctx.Done():
shutdown()
return
case <-ticker.C:
c.flush(ctx, wr)
// drain the potential stale tick to avoid small or empty flushes after a slow flush.
select {
case <-c.doneCh:
shutdown()
return
case <-ctx.Done():
shutdown()
return
case <-ticker.C:
default:
}
case ts, ok := <-c.input:
if !ok {
continue
}
wr.Timeseries = append(wr.Timeseries, ts)
if len(wr.Timeseries) >= c.maxBatchSize {
c.flush(ctx, wr)
// drain the potential stale tick to avoid small or empty flushes after a slow flush.
select {
case <-ticker.C:
default:
}
case ts, ok := <-c.input:
if !ok {
continue
}
wr.Timeseries = append(wr.Timeseries, ts)
if len(wr.Timeseries) >= c.maxBatchSize {
c.flush(ctx, wr)
}
}
}
})
}
}
var (

View File

@@ -30,6 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: introduce `vm_filestream_fsync_duration_seconds_total` and `vm_filestream_fsync_calls_total` metrics, which can be used for detecting slow storage if it cannot keep up with the current data ingestion rate. See [#10432](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10432). Thanks to @mehrdadbn9 for the contribution.
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add dedicated `thanos` mode for [migrating data from Thanos](https://docs.victoriametrics.com/victoriametrics/vmctl/thanos/). This mode supports both raw and downsampled Thanos blocks, including all aggregate types (count, sum, min, max, counter). Each aggregate is imported as a separate metric with resolution and aggregate type suffixes (e.g., `metric_name:5m:count`). The new mode uses `--thanos-*` prefixed flags: `--thanos-snapshot`, `--thanos-concurrency`, `--thanos-filter-time-start`, `--thanos-filter-time-end`, `--thanos-filter-label`, `--thanos-filter-label-value`, and `--thanos-aggr-types`. See [#9262](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9262).
* FEATURE: [dashboards/alert-statistics](https://grafana.com/grafana/dashboards/24553): add pending and firing alerts stats; fix query in `FIRING over time by group` panel. See [#10571](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10571). Thanks to @sias32 for the contribution.
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add random jitter to concurrent periodical flushers targeting the remote write destination. This helps spread remote write flushes across the flush interval, avoiding congestion at the remote write destination and enhancing queue data consumption. See [#10729](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10729).
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): expose `vmalert_remotewrite_sent_rows` and `vmalert_remotewrite_sent_bytes` histograms to provide better visibility into remote write request sizes. See [#10727](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10727).
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): retry the requests that failed with unexpected EOF due to unstable network to S3 service. See [#10699](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10699).