mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
app/vmalert: add random jitter to concurrent periodical flushers targeting the remote write destination
I expect the change to help in two ways: 1. Spreading remote write flushes over the flush interval to avoid congestion at the remote write destination; 2. Enhance queue data consumption. Currently, all flushers may always flush data simultaneously, resulting in periods where no flushers are consuming data from the queue, which increases the risk of reaching the queue limit `remoteWrite.maxQueueSize` even when a increased `remoteWrite.concurrency`. By making the flushers more dispersed, it is more likely that some flushers are consistently consuming data from the queue, which should make queue management easier. Related PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10729/
This commit is contained in:
@@ -13,8 +13,10 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
"github.com/golang/snappy"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
@@ -115,8 +117,10 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
|
||||
input: make(chan prompb.TimeSeries, cfg.MaxQueueSize),
|
||||
}
|
||||
|
||||
for range cc {
|
||||
c.run(ctx)
|
||||
for i := 0; i < cc; i++ {
|
||||
c.wg.Go(func() {
|
||||
c.run(ctx, i)
|
||||
})
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
@@ -158,8 +162,7 @@ func (c *Client) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) run(ctx context.Context) {
|
||||
ticker := time.NewTicker(c.flushInterval)
|
||||
func (c *Client) run(ctx context.Context, id int) {
|
||||
wr := &prompb.WriteRequest{}
|
||||
shutdown := func() {
|
||||
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
|
||||
@@ -176,34 +179,53 @@ func (c *Client) run(ctx context.Context) {
|
||||
cancel()
|
||||
}
|
||||
|
||||
c.wg.Go(func() {
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
// add jitter to spread remote write flushes over the flush interval to avoid congestion at the remote write destination
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(fmt.Sprintf("%d", id)))
|
||||
randJitter := uint64(float64(c.flushInterval) * (float64(h) / (1 << 64)))
|
||||
timer := time.NewTimer(time.Duration(randJitter))
|
||||
addJitter:
|
||||
for {
|
||||
select {
|
||||
case <-c.doneCh:
|
||||
timer.Stop()
|
||||
shutdown()
|
||||
return
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
shutdown()
|
||||
return
|
||||
case <-timer.C:
|
||||
break addJitter
|
||||
}
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(c.flushInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-c.doneCh:
|
||||
shutdown()
|
||||
return
|
||||
case <-ctx.Done():
|
||||
shutdown()
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.flush(ctx, wr)
|
||||
// drain the potential stale tick to avoid small or empty flushes after a slow flush.
|
||||
select {
|
||||
case <-c.doneCh:
|
||||
shutdown()
|
||||
return
|
||||
case <-ctx.Done():
|
||||
shutdown()
|
||||
return
|
||||
case <-ticker.C:
|
||||
default:
|
||||
}
|
||||
case ts, ok := <-c.input:
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
wr.Timeseries = append(wr.Timeseries, ts)
|
||||
if len(wr.Timeseries) >= c.maxBatchSize {
|
||||
c.flush(ctx, wr)
|
||||
// drain the potential stale tick to avoid small or empty flushes after a slow flush.
|
||||
select {
|
||||
case <-ticker.C:
|
||||
default:
|
||||
}
|
||||
case ts, ok := <-c.input:
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
wr.Timeseries = append(wr.Timeseries, ts)
|
||||
if len(wr.Timeseries) >= c.maxBatchSize {
|
||||
c.flush(ctx, wr)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
|
||||
@@ -30,6 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: introduce `vm_filestream_fsync_duration_seconds_total` and `vm_filestream_fsync_calls_total` metrics, which can be used for detecting slow storage if it cannot keep up with the current data ingestion rate. See [#10432](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10432). Thanks to @mehrdadbn9 for the contribution.
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add dedicated `thanos` mode for [migrating data from Thanos](https://docs.victoriametrics.com/victoriametrics/vmctl/thanos/). This mode supports both raw and downsampled Thanos blocks, including all aggregate types (count, sum, min, max, counter). Each aggregate is imported as a separate metric with resolution and aggregate type suffixes (e.g., `metric_name:5m:count`). The new mode uses `--thanos-*` prefixed flags: `--thanos-snapshot`, `--thanos-concurrency`, `--thanos-filter-time-start`, `--thanos-filter-time-end`, `--thanos-filter-label`, `--thanos-filter-label-value`, and `--thanos-aggr-types`. See [#9262](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9262).
|
||||
* FEATURE: [dashboards/alert-statistics](https://grafana.com/grafana/dashboards/24553): add pending and firing alerts stats; fix query in `FIRING over time by group` panel. See [#10571](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10571). Thanks to @sias32 for the contribution.
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add random jitter to concurrent periodical flushers targeting the remote write destination. This helps spread remote write flushes across the flush interval, avoiding congestion at the remote write destination and enhancing queue data consumption. See [#10729](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10729).
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): expose `vmalert_remotewrite_sent_rows` and `vmalert_remotewrite_sent_bytes` histograms to provide better visibility into remote write request sizes. See [#10727](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10727).
|
||||
|
||||
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): retry the requests that failed with unexpected EOF due to unstable network to S3 service. See [#10699](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10699).
|
||||
|
||||
Reference in New Issue
Block a user