From 3f5bf4bd03071385f2172fd279c9099fa682fdb2 Mon Sep 17 00:00:00 2001 From: Max Kotliar Date: Tue, 8 Apr 2025 17:12:06 +0300 Subject: [PATCH] vmagent/remotewrite: set content encoding header based on actual body Improve remote write handling in vmagent by setting the `Content-Encoding` header based on the actual request body, rather than relying on configuration. - Detects Zstd compression via the Zstd magic number. - Falls back to Snappy if Zstd is not detected. - Persistent queue may now contain mixed-encoding content. - Add basic vmagent integration tests Follow up on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5344 and https://github.com/VictoriaMetrics/VictoriaMetrics/commit/12cd32fd75706969f972a328e8583ca1da9e68c3. Extracted from https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8462 Related issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301 --- app/vmagent/remotewrite/client.go | 3 +- apptest/testcase.go | 17 +++ apptest/tests/vmagent_remotewrite_test.go | 54 +++++++++ apptest/vmagent.go | 107 ++++++++++++++++++ apptest/vmsingle.go | 6 + docs/victoriametrics/changelog/CHANGELOG.md | 1 + lib/encoding/util.go | 12 ++ lib/encoding/util_test.go | 45 ++++++++ .../promremotewrite/stream/streamparser.go | 8 ++ 9 files changed, 252 insertions(+), 1 deletion(-) create mode 100644 apptest/tests/vmagent_remotewrite_test.go create mode 100644 apptest/vmagent.go create mode 100644 lib/encoding/util.go create mode 100644 lib/encoding/util_test.go diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 5ea7af61cd..5f04f1912b 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -13,6 +13,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -384,7 +385,7 @@ func (c *client) newRequest(url string, body []byte) (*http.Request, error) { h := req.Header h.Set("User-Agent", "vmagent") h.Set("Content-Type", "application/x-protobuf") - if c.useVMProto { + if encoding.IsZstd(body) { h.Set("Content-Encoding", "zstd") h.Set("X-VictoriaMetrics-Remote-Write-Version", "1") } else { diff --git a/apptest/testcase.go b/apptest/testcase.go index 05e827381f..b52f2a76af 100644 --- a/apptest/testcase.go +++ b/apptest/testcase.go @@ -124,6 +124,23 @@ func (tc *TestCase) MustStartVminsert(instance string, flags []string) *Vminsert return app } +// MustStartVmagent is a test helper function that starts an instance of +// vmagent and fails the test if the app fails to start. +func (tc *TestCase) MustStartVmagent(instance string, flags []string, promScrapeConfigFileYAML string) *Vmagent { + tc.t.Helper() + + promScrapeConfigFilePath := path.Join(tc.t.TempDir(), "prometheus.yml") + if err := os.WriteFile(promScrapeConfigFilePath, []byte(promScrapeConfigFileYAML), os.ModePerm); err != nil { + tc.t.Fatalf("cannot init vmagent: prom config file write failed: %s", err) + } + app, err := StartVmagent(instance, flags, tc.cli, promScrapeConfigFilePath) + if err != nil { + tc.t.Fatalf("Could not start %s: %v", instance, err) + } + tc.addApp(instance, app) + return app +} + // Vmcluster represents a typical cluster setup: several vmstorage replicas, one // vminsert, and one vmselect. // diff --git a/apptest/tests/vmagent_remotewrite_test.go b/apptest/tests/vmagent_remotewrite_test.go new file mode 100644 index 0000000000..6f50c74f6d --- /dev/null +++ b/apptest/tests/vmagent_remotewrite_test.go @@ -0,0 +1,54 @@ +package tests + +import ( + "fmt" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/apptest" + at "github.com/VictoriaMetrics/VictoriaMetrics/apptest" +) + +// TestSingleVMAgentZstdRemoteWrite verifies that vmagent can successfully perform +// a remote write to vmsingle using VM protocol (zstd). +func TestSingleVMAgentZstdRemoteWrite(t *testing.T) { + testSingleVMAgentRemoteWrite(t, false) +} + +// TestSingleVMAgentSnappyRemoteWrite verifies that vmagent can successfully perform +// a remote write to vmsingle using Prometheus protocol (snappy). +func TestSingleVMAgentSnappyRemoteWrite(t *testing.T) { + testSingleVMAgentRemoteWrite(t, true) +} + +func testSingleVMAgentRemoteWrite(t *testing.T, forcePromProto bool) { + tc := apptest.NewTestCase(t) + defer tc.Stop() + + vmsingle := tc.MustStartDefaultVmsingle() + + vmagent := tc.MustStartVmagent("vmagent", []string{ + `-remoteWrite.flushInterval=50ms`, + fmt.Sprintf(`-remoteWrite.forcePromProto=%v`, forcePromProto), + fmt.Sprintf(`-remoteWrite.url=http://%s/api/v1/write`, vmsingle.HTTPAddr()), + }, ``) + + vmagent.APIV1ImportPrometheus(t, []string{ + "foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z + }, apptest.QueryOpts{}) + + vmsingle.ForceFlush(t) + + tc.Assert(&at.AssertOptions{ + Msg: `unexpected metrics stored on vmagent remote write`, + Got: func() any { + return vmsingle.PrometheusAPIV1Series(t, `{__name__="foo_bar"}`, at.QueryOpts{ + Start: "2022-05-10T00:00:00Z", + End: "2022-05-10T23:59:59Z", + }).Sort() + }, + Want: &at.PrometheusAPIV1SeriesResponse{ + Status: "success", + Data: []map[string]string{{"__name__": "foo_bar"}}, + }, + }) +} diff --git a/apptest/vmagent.go b/apptest/vmagent.go new file mode 100644 index 0000000000..3947ae0dce --- /dev/null +++ b/apptest/vmagent.go @@ -0,0 +1,107 @@ +package apptest + +import ( + "fmt" + "net/http" + "regexp" + "strings" + "testing" + "time" +) + +// Vmagent holds the state of a vmagent app and provides vmagent-specific functions +type Vmagent struct { + *app + *ServesMetrics + + httpListenAddr string + apiV1ImportPrometheusURL string +} + +// StartVmagent starts an instance of vmagent with the given flags. It also +// sets the default flags and populates the app instance state with runtime +// values extracted from the application log (such as httpListenAddr) +func StartVmagent(instance string, flags []string, cli *Client, promScrapeConfigFilePath string) (*Vmagent, error) { + extractREs := []*regexp.Regexp{ + httpListenAddrRE, + } + + app, stderrExtracts, err := startApp(instance, "../../bin/vmagent", flags, &appOptions{ + defaultFlags: map[string]string{ + "-httpListenAddr": "127.0.0.1:0", + "-promscrape.config": promScrapeConfigFilePath, + }, + extractREs: extractREs, + }) + if err != nil { + return nil, err + } + + return &Vmagent{ + app: app, + ServesMetrics: &ServesMetrics{ + metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]), + cli: cli, + }, + httpListenAddr: stderrExtracts[0], + apiV1ImportPrometheusURL: fmt.Sprintf("http://%s/api/v1/import/prometheus", stderrExtracts[0]), + }, nil +} + +// APIV1ImportPrometheus is a test helper function that inserts a +// collection of records in Prometheus text exposition format for the given +// tenant by sending a HTTP POST request to /api/v1/import/prometheus vmagent endpoint. +// +// The call is blocked until the data is flushed to vmstorage or the timeout is reached. +// +// See https://docs.victoriametrics.com/url-examples/#apiv1importprometheus +func (app *Vmagent) APIV1ImportPrometheus(t *testing.T, records []string, _ QueryOpts) { + t.Helper() + + data := []byte(strings.Join(records, "\n")) + app.sendBlocking(t, len(records), func() { + _, statusCode := app.cli.Post(t, app.apiV1ImportPrometheusURL, "text/plain", data) + if statusCode != http.StatusNoContent { + t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) + } + }) +} + +// sendBlocking sends the data to vmstorage by executing `send` function and +// waits until the data is actually sent. +// +// vmagent does not send the data immediately. It first puts the data into a +// buffer. Then a background goroutine takes the data from the buffer sends it +// to the vmstorage. This happens every 1s by default. +// +// Waiting is implemented a retrieving the value of `vmagent_remotewrite_requests_total` +// metric and checking whether it is equal or greater than the wanted value. +// If it is, then the data has been sent to vmstorage. +// +// Unreliable if the records are inserted concurrently. +func (app *Vmagent) sendBlocking(t *testing.T, numRecordsToSend int, send func()) { + t.Helper() + + send() + + const ( + retries = 20 + period = 100 * time.Millisecond + ) + wantRowsSentCount := app.remoteWriteRequestsTotal(t) + numRecordsToSend + for range retries { + if app.remoteWriteRequestsTotal(t) >= wantRowsSentCount { + return + } + time.Sleep(period) + } + t.Fatalf("timed out while waiting for inserted rows to be sent to vmstorage") +} + +func (app *Vmagent) remoteWriteRequestsTotal(t *testing.T) int { + total := 0.0 + for _, v := range app.GetMetricsByPrefix(t, "vmagent_remotewrite_requests_total") { + total += v + } + return int(total) +} diff --git a/apptest/vmsingle.go b/apptest/vmsingle.go index cedc17812c..bccc4cbb6a 100644 --- a/apptest/vmsingle.go +++ b/apptest/vmsingle.go @@ -350,6 +350,12 @@ func (app *Vmsingle) SnapshotDeleteAll(t *testing.T) *SnapshotDeleteAllResponse return &res } +// HTTPAddr returns the address at which the vmstorage process is listening +// for http connections. +func (app *Vmsingle) HTTPAddr() string { + return app.httpListenAddr +} + // String returns the string representation of the vmsingle app state. func (app *Vmsingle) String() string { return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q}", []any{ diff --git a/docs/victoriametrics/changelog/CHANGELOG.md b/docs/victoriametrics/changelog/CHANGELOG.md index 63e2534f46..e961ea5e1f 100644 --- a/docs/victoriametrics/changelog/CHANGELOG.md +++ b/docs/victoriametrics/changelog/CHANGELOG.md @@ -36,6 +36,7 @@ Released at 2025-04-04 * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): return `502 Bad Gateway` status code in case request failed due to a network timeout. Previously, vmauth would incorrectly return `200 OK`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8621). * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation): fix panic on `rate` output. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8634). * BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmrestore](https://docs.victoriametrics.com/vmrestore/), [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): fix compatibility with S3-compatible storages which do not support data integrity checks. See [this issue](https://github.com/victoriaMetrics/victoriaMetrics/issues/8622). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): remote write client sets correct content encoding header based on actual body content, rather than relying on configuration. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8650). ## [v1.114.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.114.0) diff --git a/lib/encoding/util.go b/lib/encoding/util.go new file mode 100644 index 0000000000..8151aaed92 --- /dev/null +++ b/lib/encoding/util.go @@ -0,0 +1,12 @@ +package encoding + +import "encoding/binary" + +// IsZstd checks if the given data is compressed using the zstd format. +// It does this by verifying the presence of the zstd magic number (0xFD2FB528) +// at the beginning of the byte slice. +// +// See: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#zstandard-frames +func IsZstd(data []byte) bool { + return len(data) >= 4 && binary.LittleEndian.Uint32(data) == 0xFD2FB528 +} diff --git a/lib/encoding/util_test.go b/lib/encoding/util_test.go new file mode 100644 index 0000000000..9ccdb971fa --- /dev/null +++ b/lib/encoding/util_test.go @@ -0,0 +1,45 @@ +package encoding_test + +import ( + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/golang/snappy" +) + +func TestIsZstd(t *testing.T) { + // nil + if encoding.IsZstd(nil) { + t.Fatalf("unexpected IsZstd result; got true; expecting false") + } + + // empty + if encoding.IsZstd([]byte{}) { + t.Fatalf("unexpected IsZstd result; got true; expecting false") + } + + // less than 4 bytes + if encoding.IsZstd([]byte(`foo`)) { + t.Fatalf("unexpected IsZstd result; got true; expecting false") + } + + // plain text + if encoding.IsZstd([]byte(`foobar`)) { + t.Fatalf("unexpected IsZstd result; got true; expecting false") + } + + // snappy compressed + if encoding.IsZstd(snappy.Encode(nil, []byte(`foobar`))) { + t.Fatalf("unexpected IsZstd result; got true; expecting false") + } + + // zstd minimum compressed level + if !encoding.IsZstd(encoding.CompressZSTDLevel(nil, []byte(`foobar`), -22)) { + t.Fatalf("unexpected IsZstd result; got false; expecting true") + } + + // zstd maximum compressed level + if !encoding.IsZstd(encoding.CompressZSTDLevel(nil, []byte(`foobar`), 22)) { + t.Fatalf("unexpected IsZstd result; got false; expecting true") + } +} diff --git a/lib/protoparser/promremotewrite/stream/streamparser.go b/lib/protoparser/promremotewrite/stream/streamparser.go index 33dd4efc42..93a4987654 100644 --- a/lib/protoparser/promremotewrite/stream/streamparser.go +++ b/lib/protoparser/promremotewrite/stream/streamparser.go @@ -44,6 +44,10 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer // Fall back to Snappy decompression, since vmagent may send snappy-encoded messages // with 'Content-Encoding: zstd' header if they were put into persistent queue before vmagent restart. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301 + // + // Newer vmagent sends proper 'Content-Encoding' header. + // The logic is preserved for backwards compatibility. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8650 zstdErr := err bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], ctx.reqBuf.B) if err != nil { @@ -56,6 +60,10 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer // Fall back to zstd decompression, since vmagent may send zstd-encoded messages // without 'Content-Encoding: zstd' header if they were put into persistent queue before vmagent restart. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301#issuecomment-1815871992 + // + // Newer vmagent sends proper 'Content-Encoding' header. + // The logic is preserved for backwards compatibility. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8650 snappyErr := err bb.B, err = zstd.Decompress(bb.B[:0], ctx.reqBuf.B) if err != nil {