Files
VictoriaMetrics/app/vmalert/remotewrite/client.go
Hui Wang 2d33493009 app/vmalert: support sending data to -remoteWrite.url via zstd
In most cases, vmalert is configured to write to vm components like
vminsert or vmagent, so using the VictoriaMetrics remote write protocol can
save network bandwidth.
The VictoriaMetrics remote write protocol is used by default, and the
protocol is downgraded from VictoriaMetrics to Prometheus remote write
if a request fails with a protocol error.

Related PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10929
2026-05-12 22:19:29 +02:00

457 lines
14 KiB
Go

package remotewrite
import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"io"
"net/http"
"path"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/cespare/xxhash/v2"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/metrics"
)
// defaultConcurrency is the number of workers started by NewClient when
// Config.Concurrency is not a positive number. It equals twice the value
// reported by cgroup.AvailableCPUs().
var defaultConcurrency = cgroup.AvailableCPUs() * 2
// Fallback values used when the corresponding setting is not provided.
const (
// defaultMaxBatchSize is the fallback for Config.MaxBatchSize.
defaultMaxBatchSize = 1e4
// defaultMaxQueueSize is the fallback for Config.MaxQueueSize.
defaultMaxQueueSize = 1e5
// defaultFlushInterval is the fallback for Config.FlushInterval.
defaultFlushInterval = 2 * time.Second
// defaultWriteTimeout bounds the context used by a worker for the final
// flushes it performs while shutting down.
defaultWriteTimeout = 30 * time.Second
)
// Command-line flags that tune how requests are sent to -remoteWrite.url.
var (
// disablePathAppend controls whether send appends '/api/v1/write' to the request path.
disablePathAppend = flag.Bool("remoteWrite.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url.")
// sendTimeout is used as the Timeout of the http.Client created by NewClient.
sendTimeout = flag.Duration("remoteWrite.sendTimeout", 30*time.Second, "Timeout for sending data to the configured -remoteWrite.url.")
// retryMinInterval is the initial delay of the backoff timer used by flush.
retryMinInterval = flag.Duration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxTime")
// retryMaxTime limits the total time flush spends retrying a single batch.
retryMaxTime = flag.Duration("remoteWrite.retryMaxTime", time.Second*30, "The max time spent on retry attempts for the failed remote-write request. Change this value if it is expected for remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
)
// Client is an asynchronous HTTP client for writing
// timeseries via remote write protocol.
//
// Timeseries are queued by Push and sent in batches by background
// workers started in NewClient. Close stops the workers after the
// queue has been drained.
type Client struct {
// addr is the remote storage address with a trailing "/" trimmed.
addr string
// c is the HTTP client used for sending write requests.
c *http.Client
// authCfg, when non-nil, is used for setting auth headers on every request.
authCfg *promauth.Config
// input is the buffered queue filled by Push and consumed by the workers.
input chan prompb.TimeSeries
// flushInterval is the period between time-based flushes of a worker's batch.
flushInterval time.Duration
// maxBatchSize is the number of buffered timeseries that triggers an immediate flush.
maxBatchSize int
// maxQueueSize is the capacity of input; it is reported in the "queue is full" error.
maxQueueSize int
// wg tracks the worker goroutines so Close can wait for them.
wg sync.WaitGroup
// doneCh is closed by Close to signal shutdown to workers and to Push.
doneCh chan struct{}
// Whether to encode the write request with VictoriaMetrics remote write protocol.
// It is set to true by default, and will be switched to false if the client
// receives specific errors indicating that the remote storage doesn't support VictoriaMetrics remote write protocol.
isVMRemoteWrite atomic.Bool
}
// Config is config for remote write client.
type Config struct {
// Addr of remote storage.
// It is required: NewClient returns an error when it is empty.
Addr string
// AuthCfg is an optional auth config. When set, it is used
// for setting headers on every outgoing request.
AuthCfg *promauth.Config
// Concurrency defines number of readers that
// concurrently read from the queue and flush data.
// defaultConcurrency is used when the value isn't positive.
Concurrency int
// MaxBatchSize defines max number of timeseries
// to be flushed at once.
// When left at zero, defaultMaxBatchSize is used.
MaxBatchSize int
// MaxQueueSize defines max length of input queue
// populated by Push method.
// Push will be rejected once queue is full.
// When left at zero, defaultMaxQueueSize is used.
MaxQueueSize int
// FlushInterval defines time interval for flushing batches.
// When left at zero, defaultFlushInterval is used.
FlushInterval time.Duration
// Transport will be used by the underlying http.Client.
// When nil, NewClient creates one via httputil.NewTransport.
Transport *http.Transport
}
// NewClient returns asynchronous client for
// writing timeseries via remotewrite protocol.
//
// It validates cfg, substitutes defaults for settings that are unset or
// non-positive, and starts the background workers that read from the
// queue and flush data. ctx is passed to every worker.
//
// An error is returned only when cfg.Addr is empty.
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
	if cfg.Addr == "" {
		return nil, fmt.Errorf("config.Addr can't be empty")
	}

	// Treat non-positive values as "not set". A negative queue size would
	// panic in make(chan) below, and a negative flush interval would panic
	// in time.NewTicker inside the worker.
	if cfg.MaxBatchSize <= 0 {
		cfg.MaxBatchSize = defaultMaxBatchSize
	}
	if cfg.MaxQueueSize <= 0 {
		cfg.MaxQueueSize = defaultMaxQueueSize
	}
	if cfg.FlushInterval <= 0 {
		cfg.FlushInterval = defaultFlushInterval
	}
	if cfg.Transport == nil {
		cfg.Transport = httputil.NewTransport(false, "vmalert_remotewrite")
	}
	cc := defaultConcurrency
	if cfg.Concurrency > 0 {
		cc = cfg.Concurrency
	}

	c := &Client{
		c: &http.Client{
			Timeout:   *sendTimeout,
			Transport: cfg.Transport,
		},
		addr:          strings.TrimSuffix(cfg.Addr, "/"),
		authCfg:       cfg.AuthCfg,
		flushInterval: cfg.FlushInterval,
		maxBatchSize:  cfg.MaxBatchSize,
		maxQueueSize:  cfg.MaxQueueSize,
		doneCh:        make(chan struct{}),
		input:         make(chan prompb.TimeSeries, cfg.MaxQueueSize),
	}
	// Start with the VictoriaMetrics remote write protocol; send switches
	// this flag off when the remote side rejects it.
	c.isVMRemoteWrite.Store(true)

	for i := 0; i < cc; i++ {
		c.wg.Go(func() {
			// The worker id is used for spreading flushes over the flush interval.
			c.run(ctx, i)
		})
	}
	return c, nil
}
// Push adds timeseries into queue for writing into remote storage.
// Push returns an error if client is stopped or if queue is full.
//
// Push never blocks: when the queue has no free slot, the timeseries is
// dropped and accounted in the droppedRows metric.
func (c *Client) Push(s prompb.TimeSeries) error {
	rwTotal.Inc()

	// Check for shutdown in a dedicated select. With a single select, a
	// closed doneCh and a send on the closed input channel are both ready
	// and one is picked at random, so Push could panic instead of
	// reporting that the client is closed.
	select {
	case <-c.doneCh:
		rwErrors.Inc()
		droppedRows.Add(len(s.Samples))
		return fmt.Errorf("client is closed")
	default:
	}

	// NOTE: a Push racing with a concurrent Close can still reach this send
	// after the input channel has been closed; callers must not call Push
	// concurrently with Close.
	select {
	case c.input <- s:
		return nil
	default:
		rwErrors.Inc()
		droppedRows.Add(len(s.Samples))
		return fmt.Errorf("failed to push timeseries - queue is full (%d entries). "+
			"Queue size is controlled by -remoteWrite.maxQueueSize flag",
			c.maxQueueSize)
	}
}
// Close stops the client and waits for all goroutines
// to exit.
//
// The workers flush the series remaining in the queue before exiting.
// Close returns an error if the client has not been initialized or has
// already been closed. Close must not be called concurrently with itself
// or with Push.
func (c *Client) Close() error {
	if c.doneCh == nil {
		return fmt.Errorf("client is already closed")
	}
	// doneCh is never reset to nil, so a previous Close is detected by
	// checking whether the channel has already been closed. Without this
	// check a second Close panics with "close of closed channel".
	select {
	case <-c.doneCh:
		return fmt.Errorf("client is already closed")
	default:
	}

	start := time.Now()
	logger.Infof("shutting down remote write client: flushing remained series")
	// Signal shutdown before closing the queue, so Push observes the
	// closed client before a send on the closed input channel is possible.
	close(c.doneCh)
	// Closing the queue ends the draining loop of every worker.
	close(c.input)
	c.wg.Wait()
	logger.Infof("shutting down remote write client: finished in %v", time.Since(start))
	return nil
}
// run is the body of a single worker goroutine.
//
// The worker first waits for a jitter delay derived from id, then
// accumulates timeseries from the input queue into a batch. The batch is
// flushed when it reaches maxBatchSize or on every flushInterval tick.
// When doneCh is closed or ctx is cancelled, the worker drains the input
// queue until it is closed, flushes what is left and exits.
func (c *Client) run(ctx context.Context, id int) {
	batch := &prompb.WriteRequest{}

	// drain reads the input queue until it gets closed and flushes
	// everything collected, using a fresh context with its own timeout.
	drain := func() {
		drainCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
		for ts := range c.input {
			batch.Timeseries = append(batch.Timeseries, ts)
			if len(batch.Timeseries) < c.maxBatchSize {
				continue
			}
			c.flush(drainCtx, batch)
		}
		// flush the last batch. `flush` will re-check and avoid flushing empty batch.
		c.flush(drainCtx, batch)
		cancel()
	}

	// Derive a per-worker delay within [0, flushInterval) from the worker id,
	// so flushes of different workers are spread over the flush interval
	// instead of hitting the remote write destination at the same moment.
	idHash := xxhash.Sum64(bytesutil.ToUnsafeBytes(fmt.Sprintf("%d", id)))
	jitter := uint64(float64(c.flushInterval) * (float64(idHash) / (1 << 64)))
	jitterTimer := time.NewTimer(time.Duration(jitter))

	// Wait for the jitter to elapse unless shutdown is requested earlier.
	select {
	case <-c.doneCh:
		jitterTimer.Stop()
		drain()
		return
	case <-ctx.Done():
		jitterTimer.Stop()
		drain()
		return
	case <-jitterTimer.C:
	}

	flushTicker := time.NewTicker(c.flushInterval)
	defer flushTicker.Stop()

	for {
		select {
		case <-c.doneCh:
			drain()
			return
		case <-ctx.Done():
			drain()
			return
		case <-flushTicker.C:
			c.flush(ctx, batch)
			// drain the potential stale tick to avoid small or empty flushes after a slow flush.
			select {
			case <-flushTicker.C:
			default:
			}
		case ts, ok := <-c.input:
			if ok {
				batch.Timeseries = append(batch.Timeseries, ts)
				if len(batch.Timeseries) >= c.maxBatchSize {
					c.flush(ctx, batch)
				}
			}
			// A closed queue is handled on the next iteration.
		}
	}
}
// Metrics describing the state of the remote write client.
var (
// rwErrors counts rejected Push calls and batches dropped by flush.
rwErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
// rwTotal counts all Push calls.
rwTotal = metrics.NewCounter(`vmalert_remotewrite_total`)
// sentRows and sentBytes are historical counters that can now be replaced by flushedRows and flushedBytes histograms. They may be deprecated in the future after the new histograms have been adopted for some time.
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
// flushedRows and flushedBytes are updated once per successful flush with
// the number of timeseries in the batch and the size of the compressed payload.
flushedRows = metrics.NewHistogram(`vmalert_remotewrite_sent_rows`)
flushedBytes = metrics.NewHistogram(`vmalert_remotewrite_sent_bytes`)
// droppedRows counts samples of timeseries that were rejected by Push or dropped by flush.
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
// sendDuration accumulates seconds spent by flush on sending, including retries.
sendDuration = metrics.NewFloatCounter(`vmalert_remotewrite_send_duration_seconds_total`)
// bufferFlushDuration is updated by flush for every non-empty batch.
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
// remoteWriteQueueSize is updated with the input queue length on every flush call.
remoteWriteQueueSize = metrics.NewHistogram(`vmalert_remotewrite_queue_size`)
// NOTE(review): maxQueueSize and concurrency are not declared in this part
// of the file; presumably they are flags defined elsewhere in the package.
_ = metrics.NewGauge(`vmalert_remotewrite_queue_capacity`, func() float64 {
return float64(*maxQueueSize)
})
_ = metrics.NewGauge(`vmalert_remotewrite_concurrency`, func() float64 {
return float64(*concurrency)
})
)
// GetDroppedRows reports the current value of the droppedRows
// metric converted to int.
func GetDroppedRows() int {
	dropped := droppedRows.Get()
	return int(dropped)
}
// flush is a blocking function that marshals WriteRequest and sends
// it to remote-write endpoint. Flush performs limited amount of retries
// if request fails.
//
// The payload is compressed with zstd while the VictoriaMetrics remote
// write protocol is enabled, and with snappy otherwise. Retries stop when
// the request succeeds, the error is non-retriable, ctx is cancelled or
// -remoteWrite.retryMaxTime has elapsed. A batch that could not be
// delivered is dropped and accounted in rwErrors and droppedRows.
// wr is always reset before flush returns, unless it was empty.
func (c *Client) flush(ctx context.Context, wr *prompb.WriteRequest) {
	remoteWriteQueueSize.Update(float64(len(c.input)))
	if len(wr.Timeseries) < 1 {
		return
	}
	defer wr.Reset()
	// time.Now() is evaluated right here, when the defer is registered.
	defer bufferFlushDuration.UpdateDuration(time.Now())

	bb := writeRequestBufPool.Get()
	bb.B = wr.MarshalProtobuf(bb.B[:0])

	zb := compressBufPool.Get()
	defer compressBufPool.Put(zb)
	if c.isVMRemoteWrite.Load() {
		zb.B = zstd.CompressLevel(zb.B[:0], bb.B, 0)
	} else {
		zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B)
	}
	// The marshaled request isn't needed once it has been compressed.
	writeRequestBufPool.Put(bb)

	maxRetryInterval := *retryMaxTime
	bt := timeutil.NewBackoffTimer(*retryMinInterval, maxRetryInterval)
	timeStart := time.Now()
	defer func() {
		sendDuration.Add(time.Since(timeStart).Seconds())
	}()

	attempts := 0
L:
	for {
		err := c.send(ctx, zb.B)
		if err != nil && (errors.Is(err, io.EOF) || netutil.IsTrivialNetworkError(err)) {
			// Something in the middle between client and destination might be closing
			// the connection. So we do a one more attempt in hope request will succeed.
			err = c.send(ctx, zb.B)
		}
		if err == nil {
			sentRows.Add(len(wr.Timeseries))
			sentBytes.Add(len(zb.B))
			flushedRows.Update(float64(len(wr.Timeseries)))
			flushedBytes.Update(float64(len(zb.B)))
			return
		}

		// errors.As also recognizes a nonRetriableError that has been
		// wrapped, which a plain type assertion would miss.
		var nre *nonRetriableError
		isNotRetriable := errors.As(err, &nre)
		logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable)

		if isNotRetriable {
			// exit fast if error isn't retriable
			break
		}

		// check if request has been cancelled before backoff
		select {
		case <-ctx.Done():
			logger.Errorf("interrupting retry attempt %d: context cancelled", attempts+1)
			break L
		default:
		}

		timeLeftForRetries := maxRetryInterval - time.Since(timeStart)
		if timeLeftForRetries < 0 {
			// the max retry time has passed, so we give up
			break
		}

		// Never sleep past the retry deadline.
		if bt.CurrentDelay() > timeLeftForRetries {
			bt.SetDelay(timeLeftForRetries)
		}
		// sleeping to prevent remote db hammering
		bt.Wait(ctx.Done())

		attempts++
	}

	// All attempts failed: account the batch as dropped.
	rwErrors.Inc()
	rows := 0
	for _, ts := range wr.Timeseries {
		rows += len(ts.Samples)
	}
	droppedRows.Add(rows)
	logger.Errorf("attempts to send remote-write request failed - dropping %d time series",
		len(wr.Timeseries))
}
// send performs a single HTTP POST of the compressed write request data
// to the remote storage.
//
// The protocol headers are chosen from the payload itself: zstd data is
// sent with the VictoriaMetrics remote write headers, anything else with
// the Prometheus (snappy) ones.
//
// It returns nil on a 2xx response. A *nonRetriableError is returned when
// auth headers cannot be set or on a 4xx response other than 429. Any other
// failure (transport error, 429, remaining status codes) is returned as a
// regular error, which the caller may retry.
//
// When a zstd payload is rejected with 400 or 415, the block is repacked to
// snappy, the client is switched to the Prometheus remote write protocol for
// all future requests, and the repacked block is sent again.
func (c *Client) send(ctx context.Context, data []byte) error {
	r := bytes.NewReader(data)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.addr, r)
	if err != nil {
		return fmt.Errorf("failed to create new HTTP request: %w", err)
	}

	req.Header.Set("User-Agent", "vmalert")
	req.Header.Set("Content-Type", "application/x-protobuf")
	if encoding.IsZstd(data) {
		req.Header.Set("Content-Encoding", "zstd")
		req.Header.Set("X-VictoriaMetrics-Remote-Write-Version", "1")
	} else {
		req.Header.Set("Content-Encoding", "snappy")
		req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
	}

	if c.authCfg != nil {
		err = c.authCfg.SetHeaders(req, true)
		if err != nil {
			return &nonRetriableError{
				err: err,
			}
		}
	}
	if !*disablePathAppend {
		req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
	}

	resp, err := c.c.Do(req)
	if err != nil {
		return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
			req.URL.Redacted(), err, len(data), r.Size())
	}
	defer func() { _ = resp.Body.Close() }()

	// The body is used only for error messages, so a read error is ignored
	// and whatever has been read is reported.
	body, _ := io.ReadAll(resp.Body)

	// according to https://prometheus.io/docs/concepts/remote_write_spec/
	// Prometheus remote Write compatible receivers MUST
	switch resp.StatusCode / 100 {
	case 2:
		// respond with HTTP 2xx status code when write is successful.
		return nil
	case 4:
		// - Remote Write v1 specification implicitly expects a `400 Bad Request` when the encoding is not supported.
		// - Remote Write v2 specification explicitly specifies a `415 Unsupported Media Type` for unsupported encodings.
		// - Real-world implementations of v1 use both 400 and 415 status codes.
		// See more in research: https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8462#issuecomment-2786918054
		if resp.StatusCode == http.StatusUnsupportedMediaType || resp.StatusCode == http.StatusBadRequest {
			if encoding.IsZstd(data) {
				logger.Infof("received unsupported media type or bad request from remote storage at %q. Re-packing the block to Prometheus remote write and retrying. "+
					"See https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol", req.URL.Redacted())
				zstdBlockLen := len(data)
				data, err = repackBlockFromZstdToSnappy(data)
				if err == nil {
					logger.Infof("received unsupported media type or bad request from remote storage at %q. Downgrading protocol from VictoriaMetrics to Prometheus remote write for all future requests. "+
						"See https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol", req.URL.Redacted())
					c.isVMRemoteWrite.Store(false)
					// The repacked block isn't zstd anymore, so this call cannot recurse again.
					return c.send(ctx, data)
				}
				logger.Warnf("failed to repack zstd block (%d bytes) to snappy: %s; The block will be rejected. "+
					"Possible cause: ungraceful shutdown leading to persisted queue corruption.",
					zstdBlockLen, err)
			}
		}
		if resp.StatusCode != http.StatusTooManyRequests {
			// MUST NOT retry write requests on HTTP 4xx responses other than 429
			return &nonRetriableError{
				err: fmt.Errorf("unexpected response code %d for %s. Response body %q", resp.StatusCode, req.URL.Redacted(), body),
			}
		}
		// 429 is reported as a retriable error below.
		fallthrough
	default:
		return fmt.Errorf("unexpected response code %d for %s. Response body %q",
			resp.StatusCode, req.URL.Redacted(), body)
	}
}
// nonRetriableError marks an error of a write request
// that must not be retried.
type nonRetriableError struct {
	// err is the underlying cause.
	err error
}

// Error returns the message of the underlying error.
func (e *nonRetriableError) Error() string {
	return e.err.Error()
}

// Unwrap returns the underlying error, so errors.Is and errors.As
// can inspect the cause of a non-retriable failure.
func (e *nonRetriableError) Unwrap() error {
	return e.err
}
// Pools of reusable byte buffers used by flush.
var (
// writeRequestBufPool holds buffers for the marshaled WriteRequest.
writeRequestBufPool bytesutil.ByteBufferPool
// compressBufPool holds buffers for the compressed request payload.
compressBufPool bytesutil.ByteBufferPool
)
// repackBlockFromZstdToSnappy converts a zstd-compressed block into
// a snappy-compressed one.
//
// It returns the decompression error when zstdBlock cannot be decoded.
func repackBlockFromZstdToSnappy(zstdBlock []byte) ([]byte, error) {
	// Pre-allocate room for twice the compressed size as a starting
	// capacity for the decompressed data.
	buf := make([]byte, 0, 2*len(zstdBlock))
	raw, err := encoding.DecompressZSTD(buf, zstdBlock)
	if err != nil {
		return nil, err
	}
	snappyBlock := snappy.Encode(nil, raw)
	return snappyBlock, nil
}