vmagent/remotewrite: set content encoding header based on actual body

Improve remote write handling in vmagent by setting the
`Content-Encoding` header based on the actual request body, rather than
relying on configuration.

- Detects Zstd compression via the Zstd magic number.
- Falls back to Snappy if Zstd is not detected.
- Persistent queue may now contain mixed-encoding content.
- Add basic vmagent integration tests

Follow up on
https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5344 and
12cd32fd75.

Extracted from
https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8462

Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301
This commit is contained in:
Max Kotliar
2025-04-08 17:12:06 +03:00
committed by GitHub
parent 038419663b
commit 3f5bf4bd03
9 changed files with 252 additions and 1 deletions

View File

@@ -13,6 +13,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -384,7 +385,7 @@ func (c *client) newRequest(url string, body []byte) (*http.Request, error) {
h := req.Header
h.Set("User-Agent", "vmagent")
h.Set("Content-Type", "application/x-protobuf")
if c.useVMProto {
if encoding.IsZstd(body) {
h.Set("Content-Encoding", "zstd")
h.Set("X-VictoriaMetrics-Remote-Write-Version", "1")
} else {

View File

@@ -124,6 +124,23 @@ func (tc *TestCase) MustStartVminsert(instance string, flags []string) *Vminsert
return app
}
// MustStartVmagent is a test helper function that starts an instance of
// vmagent and fails the test if the app fails to start.
func (tc *TestCase) MustStartVmagent(instance string, flags []string, promScrapeConfigFileYAML string) *Vmagent {
tc.t.Helper()
promScrapeConfigFilePath := path.Join(tc.t.TempDir(), "prometheus.yml")
if err := os.WriteFile(promScrapeConfigFilePath, []byte(promScrapeConfigFileYAML), os.ModePerm); err != nil {
tc.t.Fatalf("cannot init vmagent: prom config file write failed: %s", err)
}
app, err := StartVmagent(instance, flags, tc.cli, promScrapeConfigFilePath)
if err != nil {
tc.t.Fatalf("Could not start %s: %v", instance, err)
}
tc.addApp(instance, app)
return app
}
// Vmcluster represents a typical cluster setup: several vmstorage replicas, one
// vminsert, and one vmselect.
//

View File

@@ -0,0 +1,54 @@
package tests
import (
"fmt"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
at "github.com/VictoriaMetrics/VictoriaMetrics/apptest"
)
// TestSingleVMAgentZstdRemoteWrite verifies that vmagent can successfully perform
// a remote write to vmsingle using VM protocol (zstd).
func TestSingleVMAgentZstdRemoteWrite(t *testing.T) {
testSingleVMAgentRemoteWrite(t, false)
}
// TestSingleVMAgentSnappyRemoteWrite verifies that vmagent can successfully perform
// a remote write to vmsingle using Prometheus protocol (snappy).
func TestSingleVMAgentSnappyRemoteWrite(t *testing.T) {
testSingleVMAgentRemoteWrite(t, true)
}
func testSingleVMAgentRemoteWrite(t *testing.T, forcePromProto bool) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
vmsingle := tc.MustStartDefaultVmsingle()
vmagent := tc.MustStartVmagent("vmagent", []string{
`-remoteWrite.flushInterval=50ms`,
fmt.Sprintf(`-remoteWrite.forcePromProto=%v`, forcePromProto),
fmt.Sprintf(`-remoteWrite.url=http://%s/api/v1/write`, vmsingle.HTTPAddr()),
}, ``)
vmagent.APIV1ImportPrometheus(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
vmsingle.ForceFlush(t)
tc.Assert(&at.AssertOptions{
Msg: `unexpected metrics stored on vmagent remote write`,
Got: func() any {
return vmsingle.PrometheusAPIV1Series(t, `{__name__="foo_bar"}`, at.QueryOpts{
Start: "2022-05-10T00:00:00Z",
End: "2022-05-10T23:59:59Z",
}).Sort()
},
Want: &at.PrometheusAPIV1SeriesResponse{
Status: "success",
Data: []map[string]string{{"__name__": "foo_bar"}},
},
})
}

107
apptest/vmagent.go Normal file
View File

@@ -0,0 +1,107 @@
package apptest
import (
"fmt"
"net/http"
"regexp"
"strings"
"testing"
"time"
)
// Vmagent holds the state of a vmagent app and provides vmagent-specific functions
type Vmagent struct {
*app
*ServesMetrics
httpListenAddr string
apiV1ImportPrometheusURL string
}
// StartVmagent starts an instance of vmagent with the given flags. It also
// sets the default flags and populates the app instance state with runtime
// values extracted from the application log (such as httpListenAddr)
func StartVmagent(instance string, flags []string, cli *Client, promScrapeConfigFilePath string) (*Vmagent, error) {
extractREs := []*regexp.Regexp{
httpListenAddrRE,
}
app, stderrExtracts, err := startApp(instance, "../../bin/vmagent", flags, &appOptions{
defaultFlags: map[string]string{
"-httpListenAddr": "127.0.0.1:0",
"-promscrape.config": promScrapeConfigFilePath,
},
extractREs: extractREs,
})
if err != nil {
return nil, err
}
return &Vmagent{
app: app,
ServesMetrics: &ServesMetrics{
metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]),
cli: cli,
},
httpListenAddr: stderrExtracts[0],
apiV1ImportPrometheusURL: fmt.Sprintf("http://%s/api/v1/import/prometheus", stderrExtracts[0]),
}, nil
}
// APIV1ImportPrometheus is a test helper function that inserts a
// collection of records in Prometheus text exposition format for the given
// tenant by sending a HTTP POST request to /api/v1/import/prometheus vmagent endpoint.
//
// The call is blocked until the data is flushed to vmstorage or the timeout is reached.
//
// See https://docs.victoriametrics.com/url-examples/#apiv1importprometheus
func (app *Vmagent) APIV1ImportPrometheus(t *testing.T, records []string, _ QueryOpts) {
t.Helper()
data := []byte(strings.Join(records, "\n"))
app.sendBlocking(t, len(records), func() {
_, statusCode := app.cli.Post(t, app.apiV1ImportPrometheusURL, "text/plain", data)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
})
}
// sendBlocking sends the data to vmstorage by executing `send` function and
// waits until the data is actually sent.
//
// vmagent does not send the data immediately. It first puts the data into a
// buffer. Then a background goroutine takes the data from the buffer sends it
// to the vmstorage. This happens every 1s by default.
//
// Waiting is implemented a retrieving the value of `vmagent_remotewrite_requests_total`
// metric and checking whether it is equal or greater than the wanted value.
// If it is, then the data has been sent to vmstorage.
//
// Unreliable if the records are inserted concurrently.
func (app *Vmagent) sendBlocking(t *testing.T, numRecordsToSend int, send func()) {
t.Helper()
send()
const (
retries = 20
period = 100 * time.Millisecond
)
wantRowsSentCount := app.remoteWriteRequestsTotal(t) + numRecordsToSend
for range retries {
if app.remoteWriteRequestsTotal(t) >= wantRowsSentCount {
return
}
time.Sleep(period)
}
t.Fatalf("timed out while waiting for inserted rows to be sent to vmstorage")
}
func (app *Vmagent) remoteWriteRequestsTotal(t *testing.T) int {
total := 0.0
for _, v := range app.GetMetricsByPrefix(t, "vmagent_remotewrite_requests_total") {
total += v
}
return int(total)
}

View File

@@ -350,6 +350,12 @@ func (app *Vmsingle) SnapshotDeleteAll(t *testing.T) *SnapshotDeleteAllResponse
return &res
}
// HTTPAddr returns the address at which the vmstorage process is listening
// for http connections.
func (app *Vmsingle) HTTPAddr() string {
return app.httpListenAddr
}
// String returns the string representation of the vmsingle app state.
func (app *Vmsingle) String() string {
return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q}", []any{

View File

@@ -36,6 +36,7 @@ Released at 2025-04-04
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): return `502 Bad Gateway` status code in case request failed due to a network timeout. Previously, vmauth would incorrectly return `200 OK`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8621).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation): fix panic on `rate` output. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8634).
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmrestore](https://docs.victoriametrics.com/vmrestore/), [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): fix compatibility with S3-compatible storages which do not support data integrity checks. See [this issue](https://github.com/victoriaMetrics/victoriaMetrics/issues/8622).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): remote write client sets correct content encoding header based on actual body content, rather than relying on configuration. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8650).
## [v1.114.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.114.0)

12
lib/encoding/util.go Normal file
View File

@@ -0,0 +1,12 @@
package encoding
import "encoding/binary"
// IsZstd checks if the given data is compressed using the zstd format.
// It does this by verifying the presence of the zstd magic number (0xFD2FB528)
// at the beginning of the byte slice.
//
// See: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#zstandard-frames
func IsZstd(data []byte) bool {
return len(data) >= 4 && binary.LittleEndian.Uint32(data) == 0xFD2FB528
}

45
lib/encoding/util_test.go Normal file
View File

@@ -0,0 +1,45 @@
package encoding_test
import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/golang/snappy"
)
func TestIsZstd(t *testing.T) {
// nil
if encoding.IsZstd(nil) {
t.Fatalf("unexpected IsZstd result; got true; expecting false")
}
// empty
if encoding.IsZstd([]byte{}) {
t.Fatalf("unexpected IsZstd result; got true; expecting false")
}
// less than 4 bytes
if encoding.IsZstd([]byte(`foo`)) {
t.Fatalf("unexpected IsZstd result; got true; expecting false")
}
// plain text
if encoding.IsZstd([]byte(`foobar`)) {
t.Fatalf("unexpected IsZstd result; got true; expecting false")
}
// snappy compressed
if encoding.IsZstd(snappy.Encode(nil, []byte(`foobar`))) {
t.Fatalf("unexpected IsZstd result; got true; expecting false")
}
// zstd minimum compressed level
if !encoding.IsZstd(encoding.CompressZSTDLevel(nil, []byte(`foobar`), -22)) {
t.Fatalf("unexpected IsZstd result; got false; expecting true")
}
// zstd maximum compressed level
if !encoding.IsZstd(encoding.CompressZSTDLevel(nil, []byte(`foobar`), 22)) {
t.Fatalf("unexpected IsZstd result; got false; expecting true")
}
}

View File

@@ -44,6 +44,10 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer
// Fall back to Snappy decompression, since vmagent may send snappy-encoded messages
// with 'Content-Encoding: zstd' header if they were put into persistent queue before vmagent restart.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301
//
// Newer vmagent sends proper 'Content-Encoding' header.
// The logic is preserved for backwards compatibility.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8650
zstdErr := err
bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], ctx.reqBuf.B)
if err != nil {
@@ -56,6 +60,10 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer
// Fall back to zstd decompression, since vmagent may send zstd-encoded messages
// without 'Content-Encoding: zstd' header if they were put into persistent queue before vmagent restart.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301#issuecomment-1815871992
//
// Newer vmagent sends proper 'Content-Encoding' header.
// The logic is preserved for backwards compatibility.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8650
snappyErr := err
bb.B, err = zstd.Decompress(bb.B[:0], ctx.reqBuf.B)
if err != nil {