app/vmalert: support sending data to -remoteWrite.url via zstd

In most cases, vmalert is configured to write to vm components like
vminsert or vmagent, using VictoriaMetrics remote write protocol can
save network bandwidth.
The VictoriaMetrics remote write protocol is used by default, and the
protocol is downgraded from VictoriaMetrics to Prometheus remote write
if one request fails with protocol error.

Related PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10929
This commit is contained in:
Hui Wang
2026-05-13 04:19:29 +08:00
committed by GitHub
parent 71716e7201
commit 2d33493009
8 changed files with 89 additions and 33 deletions

View File

@@ -466,7 +466,7 @@ again:
goto again
}
logger.Warnf("failed to repack zstd block (%s bytes) to snappy: %s; The block will be rejected. "+
logger.Warnf("failed to repack zstd block (%d bytes) to snappy: %s; The block will be rejected. "+
"Possible cause: ungraceful shutdown leading to persisted queue corruption.",
zstdBlockLen, err)
}

View File

@@ -11,6 +11,7 @@ import (
"path"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/cespare/xxhash/v2"
@@ -18,6 +19,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
@@ -57,6 +60,11 @@ type Client struct {
wg sync.WaitGroup
doneCh chan struct{}
// Whether to encode the write request with VictoriaMetrics remote write protocol.
// It is set to true by default, and will be switched to false if the client
// receives specific errors indicating that the remote storage doesn't support VictoriaMetrics remote write protocol.
isVMRemoteWrite atomic.Bool
}
// Config is config for remote write client.
@@ -116,6 +124,7 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
doneCh: make(chan struct{}),
input: make(chan prompb.TimeSeries, cfg.MaxQueueSize),
}
c.isVMRemoteWrite.Store(true)
for i := 0; i < cc; i++ {
c.wg.Go(func() {
@@ -265,8 +274,16 @@ func (c *Client) flush(ctx context.Context, wr *prompb.WriteRequest) {
defer wr.Reset()
defer bufferFlushDuration.UpdateDuration(time.Now())
data := wr.MarshalProtobuf(nil)
b := snappy.Encode(nil, data)
bb := writeRequestBufPool.Get()
bb.B = wr.MarshalProtobuf(bb.B[:0])
zb := compressBufPool.Get()
defer compressBufPool.Put(zb)
if c.isVMRemoteWrite.Load() {
zb.B = zstd.CompressLevel(zb.B[:0], bb.B, 0)
} else {
zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B)
}
writeRequestBufPool.Put(bb)
maxRetryInterval := *retryMaxTime
bt := timeutil.NewBackoffTimer(*retryMinInterval, maxRetryInterval)
@@ -278,17 +295,17 @@ func (c *Client) flush(ctx context.Context, wr *prompb.WriteRequest) {
attempts := 0
L:
for {
err := c.send(ctx, b)
err := c.send(ctx, zb.B)
if err != nil && (errors.Is(err, io.EOF) || netutil.IsTrivialNetworkError(err)) {
// Something in the middle between client and destination might be closing
// the connection. So we do a one more attempt in hope request will succeed.
err = c.send(ctx, b)
err = c.send(ctx, zb.B)
}
if err == nil {
sentRows.Add(len(wr.Timeseries))
sentBytes.Add(len(b))
sentBytes.Add(len(zb.B))
flushedRows.Update(float64(len(wr.Timeseries)))
flushedBytes.Update(float64(len(b)))
flushedBytes.Update(float64(len(zb.B)))
return
}
@@ -340,12 +357,16 @@ func (c *Client) send(ctx context.Context, data []byte) error {
return fmt.Errorf("failed to create new HTTP request: %w", err)
}
// RFC standard compliant headers
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("User-Agent", "vmalert")
req.Header.Set("Content-Type", "application/x-protobuf")
// Prometheus compliant headers
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if encoding.IsZstd(data) {
req.Header.Set("Content-Encoding", "zstd")
req.Header.Set("X-VictoriaMetrics-Remote-Write-Version", "1")
} else {
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
}
if c.authCfg != nil {
err = c.authCfg.SetHeaders(req, true)
@@ -374,6 +395,29 @@ func (c *Client) send(ctx context.Context, data []byte) error {
// respond with HTTP 2xx status code when write is successful.
return nil
case 4:
// - Remote Write v1 specification implicitly expects a `400 Bad Request` when the encoding is not supported.
// - Remote Write v2 specification explicitly specifies a `415 Unsupported Media Type` for unsupported encodings.
// - Real-world implementations of v1 use both 400 and 415 status codes.
// See more in research: https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8462#issuecomment-2786918054
if resp.StatusCode == http.StatusUnsupportedMediaType || resp.StatusCode == http.StatusBadRequest {
if encoding.IsZstd(data) {
logger.Infof("received unsupported media type or bad request from remote storage at %q. Re-packing the block to Prometheus remote write and retrying."+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol", req.URL.Redacted())
zstdBlockLen := len(data)
data, err = repackBlockFromZstdToSnappy(data)
if err == nil {
logger.Infof("received unsupported media type or bad request from remote storage at %q. Downgrading protocol from VictoriaMetrics to Prometheus remote write for all future requests. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol", req.URL.Redacted())
c.isVMRemoteWrite.Store(false)
return c.send(ctx, data)
}
logger.Warnf("failed to repack zstd block (%d bytes) to snappy: %s; The block will be rejected. "+
"Possible cause: ungraceful shutdown leading to persisted queue corruption.",
zstdBlockLen, err)
}
}
if resp.StatusCode != http.StatusTooManyRequests {
// MUST NOT retry write requests on HTTP 4xx responses other than 429
return &nonRetriableError{
@@ -394,3 +438,19 @@ type nonRetriableError struct {
func (e *nonRetriableError) Error() string {
return e.err.Error()
}
var (
writeRequestBufPool bytesutil.ByteBufferPool
compressBufPool bytesutil.ByteBufferPool
)
// repackBlockFromZstdToSnappy repacks the given zstd-compressed block to snappy-compressed block.
func repackBlockFromZstdToSnappy(zstdBlock []byte) ([]byte, error) {
plainBlock := make([]byte, 0, len(zstdBlock)*2)
plainBlock, err := encoding.DecompressZSTD(plainBlock, zstdBlock)
if err != nil {
return nil, err
}
return snappy.Encode(nil, plainBlock), nil
}

View File

@@ -12,8 +12,7 @@ import (
"testing"
"time"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
@@ -159,8 +158,8 @@ func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) {
}
h := r.Header.Get("Content-Encoding")
if h != "snappy" {
rw.err(w, fmt.Errorf("header read error: Content-Encoding is not snappy (%q)", h))
if h != "zstd" {
rw.err(w, fmt.Errorf("header read error: Content-Encoding is not zstd (%q)", h))
}
h = r.Header.Get("Content-Type")
@@ -168,9 +167,9 @@ func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) {
rw.err(w, fmt.Errorf("header read error: Content-Type is not x-protobuf (%q)", h))
}
h = r.Header.Get("X-Prometheus-Remote-Write-Version")
if h != "0.1.0" {
rw.err(w, fmt.Errorf("header read error: X-Prometheus-Remote-Write-Version is not 0.1.0 (%q)", h))
h = r.Header.Get("X-VictoriaMetrics-Remote-Write-Version")
if h != "1" {
rw.err(w, fmt.Errorf("header read error: X-VictoriaMetrics-Remote-Write-Version is not 1 (%q)", h))
}
data, err := io.ReadAll(r.Body)
@@ -180,7 +179,7 @@ func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) {
}
defer func() { _ = r.Body.Close() }()
b, err := snappy.Decode(nil, data)
b, err := zstd.Decompress(nil, data)
if err != nil {
rw.err(w, fmt.Errorf("decode err: %w", err))
return

View File

@@ -9,8 +9,7 @@ import (
"strings"
"sync"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
@@ -64,19 +63,17 @@ func (c *DebugClient) Close() error {
}
func (c *DebugClient) send(data []byte) error {
b := snappy.Encode(nil, data)
b := zstd.CompressLevel(nil, data, 0)
r := bytes.NewReader(b)
req, err := http.NewRequest(http.MethodPost, c.addr, r)
if err != nil {
return fmt.Errorf("failed to create new HTTP request: %w", err)
}
// RFC standard compliant headers
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Encoding", "zstd")
req.Header.Set("Content-Type", "application/x-protobuf")
// Prometheus compliant headers
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Set("X-VictoriaMetrics-Remote-Write-Version", "1")
if !*disablePathAppend {
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")

View File

@@ -13,8 +13,8 @@ import (
)
var (
addr = flag.String("remoteWrite.url", "", "Optional URL to VictoriaMetrics or vminsert where to persist alerts state "+
"and recording rules results in form of timeseries. "+
addr = flag.String("remoteWrite.url", "", "Optional URL to persist alerts state and recording rules results in form of timeseries. "+
"It must support either VictoriaMetrics remote write protocol or Prometheus remote_write protocol. "+
"Supports address in the form of IP address with a port (e.g., http://127.0.0.1:8428) or DNS SRV record. "+
"For example, if -remoteWrite.url=http://127.0.0.1:8428 is specified, "+
"then the alerts state will be written to http://127.0.0.1:8428/api/v1/write . See also -remoteWrite.disablePathAppend, '-remoteWrite.showURL'.")

View File

@@ -47,6 +47,7 @@ Released at 2026-05-08
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add support for [Prometheus native histogram](https://prometheus.io/docs/specs/native_histograms/) during ingestion. See [#10743](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10743).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/), `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): introduce the `vm_fs_info` metric. It exposes the filesystem type (e.g., ext4, xfs, nfs) used for `-*Path` related flags. See [#10482](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10482).
* FEATURE: [dashboards/vmagent](https://grafana.com/grafana/dashboards/12683): add `Kafka (Enterprise)` row with panels for monitoring traffic (bytes), messages in/out, producer and consumer errors. See [#10728](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10728).
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): support sending data to the configured `-remoteWrite.url` via [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol). See [#10929](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10929).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): properly obtain `__meta_hetzner_hcloud_location` and `__meta_hetzner_hcloud_location_network_zone` labels for [hetzner_sd_configs](https://docs.victoriametrics.com/victoriametrics/sd_configs/#hetzner_sd_configs). Hetzner changed discovery [API response](https://docs.hetzner.cloud/changelog#2025-12-16-phasing-out-datacenters) and returns `location` information from different field. See [#10909](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10909). Thanks to @juliusrickert for contribution.
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly discover addresses of storage nodes with [enterprise automatic-vmstorage-discovery](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#automatic-vmstorage-discovery). Bug was introduced at [v1.141.0](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/changelog/CHANGELOG.md#v11410).

View File

@@ -17,8 +17,7 @@ aliases:
or [recording](https://docs.victoriametrics.com/victoriametrics/vmalert/#recording-rules)
rules against configured `-datasource.url`. For sending alerting notifications
`vmalert` relies on [Alertmanager](https://github.com/prometheus/alertmanager) configured via `-notifier.url` flag.
Recording rules results are persisted via [remote write](https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations)
protocol and require `-remoteWrite.url` to be configured.
Recording rules results are persisted via remote write protocols and require `-remoteWrite.url` to be configured.
`vmalert` is heavily inspired by [Prometheus](https://prometheus.io/docs/alerting/latest/overview/)
implementation and aims to be compatible with its syntax.
@@ -68,7 +67,7 @@ To start using `vmalert` you will need the following things:
* notifier address [optional] - reachable [Alert Manager](https://github.com/prometheus/alertmanager) instance for processing,
aggregating alerts, and sending notifications. Please note, notifier address also supports Consul and DNS Service Discovery via
[config file](https://docs.victoriametrics.com/victoriametrics/vmalert/#notifier-configuration-file).
* remote write address [optional] - [remote write](https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations)
* remote write address [optional] - [remote write](https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol)
compatible storage to persist rules and alerts state info. To persist results to multiple destinations use vmagent
configured with multiple remote writes as a proxy;
* remote read address [optional] - MetricsQL compatible datasource to restore alerts state from.
@@ -848,7 +847,7 @@ If you want rules to run concurrently based on the `concurrency` setting, set `-
vmalert sends rule's expression to [/query_range](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#range-query) endpoint
of the configured `-datasource.url`. Returned data is then processed according to the rule type and
backfilled to `-remoteWrite.url` via [remote Write protocol](https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations).
backfilled to `-remoteWrite.url` via [remote Write protocol](https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol).
vmalert respects `evaluationInterval` value set by flag or per-group during the replay.
vmalert automatically disables caching on VictoriaMetrics side by sending `nocache=1` param. It allows
to prevent cache pollution and unwanted time range boundaries adjustment during backfilling.

View File

@@ -391,7 +391,7 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmalert/ .
-remoteWrite.tlsServerName string
Optional TLS server name to use for connections to -remoteWrite.url. By default, the server name from -remoteWrite.url is used
-remoteWrite.url string
Optional URL to VictoriaMetrics or vminsert where to persist alerts state and recording rules results in form of timeseries. Supports address in the form of IP address with a port (e.g., http://127.0.0.1:8428) or DNS SRV record. For example, if -remoteWrite.url=http://127.0.0.1:8428 is specified, then the alerts state will be written to http://127.0.0.1:8428/api/v1/write . See also -remoteWrite.disablePathAppend, '-remoteWrite.showURL'.
Optional URL to persist alerts state and recording rules results in form of timeseries. It must support either VictoriaMetrics remote write protocol or Prometheus remote_write protocol. Supports address in the form of IP address with a port (e.g., http://127.0.0.1:8428) or DNS SRV record. For example, if -remoteWrite.url=http://127.0.0.1:8428 is specified, then the alerts state will be written to http://127.0.0.1:8428/api/v1/write . See also -remoteWrite.disablePathAppend, '-remoteWrite.showURL'.
-replay.disableProgressBar
Whether to disable rendering progress bars during the replay. Progress bar rendering might be verbose or break the logs parsing, so it is recommended to be disabled when not used in interactive mode.
-replay.maxDatapointsPerQuery int