From 10f7cd2ffc866d1ec3ca955b2feeca6daa08b5ed Mon Sep 17 00:00:00 2001 From: Nikolay Date: Thu, 13 Nov 2025 18:11:18 +0100 Subject: [PATCH] lib/encoding/zstd: properly apply size limits Previously, zstd Decoder didn't take in account Request Size limits applied by VictoriaMetrics components. And in case of incorrectly formed zstd block, VictoriaMetrics component may allocate extra memory. Which may lead to the OOM errors. This commit makes ingest endpoints check frame content size and window size headers based on MaxRequest Limits. --- docs/victoriametrics/changelog/CHANGELOG.md | 3 +- lib/encoding/compress.go | 2 + lib/encoding/zstd/zstd_cgo.go | 7 ++ lib/encoding/zstd/zstd_pure.go | 47 ++++++++++++- lib/encoding/zstd/zstd_pure_test.go | 69 +++++++++++++++++++ lib/encoding/zstd/zstd_test.go | 63 +++++++++++++++++ .../promremotewrite/stream/streamparser.go | 4 +- .../protoparserutil/compress_reader.go | 5 +- 8 files changed, 193 insertions(+), 7 deletions(-) create mode 100644 lib/encoding/zstd/zstd_pure_test.go diff --git a/docs/victoriametrics/changelog/CHANGELOG.md b/docs/victoriametrics/changelog/CHANGELOG.md index 72cdd38ba4..a11965f68f 100644 --- a/docs/victoriametrics/changelog/CHANGELOG.md +++ b/docs/victoriametrics/changelog/CHANGELOG.md @@ -38,10 +38,11 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel * BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): prevent early exit when one of multiple service discovery configs (under the same service discovery type) fails. see this issue [#9949](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9949) for details. * BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): properly access service discovery servers with HTTP/2 protocol enabled. See this issue [#9981](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9981) for details. Thanks to the @JayiceZ * BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): fix a potential race condition in the `/api/v1/rule`, `/api/v1/alert` and `/api/v1/alerts` APIs during rule hot reload. See this issue [#9551](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9551) for details. +* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): drop labels with empty values in generated alerts and time series. See [#9984](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9984). +* BUGFIX: `vminsert`, [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): properly apply `maxDataSize` memory limits to the `zstd` encoded requests. It protects ingest endpoints from malicious requests. * BUGFIX: all VictoriaMetrics components: prevent from misleading log messages containing `init new cache` substring when opening various cache files at startup. See [9750](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9750). * BUGFIX: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): fix display of isolated points on the chart. See [#9666](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9666). * BUGFIX: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): fix median value calculation displayed below series graph. See [#9926](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9926). -* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): drop labels with empty values in generated alerts and time series. See [#9984](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9984). ## [v1.129.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.129.1) diff --git a/lib/encoding/compress.go b/lib/encoding/compress.go index b8acf541e6..1bf0b593bb 100644 --- a/lib/encoding/compress.go +++ b/lib/encoding/compress.go @@ -22,6 +22,8 @@ func CompressZSTDLevel(dst, src []byte, compressLevel int) []byte { // DecompressZSTD decompresses src, appends the result to dst and returns // the appended dst. +// +// This function must be called only for the trusted src. func DecompressZSTD(dst, src []byte) ([]byte, error) { decompressCalls.Inc() b, err := zstd.Decompress(dst, src) diff --git a/lib/encoding/zstd/zstd_cgo.go b/lib/encoding/zstd/zstd_cgo.go index 0917f0e219..4672cee0b7 100644 --- a/lib/encoding/zstd/zstd_cgo.go +++ b/lib/encoding/zstd/zstd_cgo.go @@ -7,10 +7,17 @@ import ( ) // Decompress appends decompressed src to dst and returns the result. +// +// This function must be called only for the trusted src. func Decompress(dst, src []byte) ([]byte, error) { return gozstd.Decompress(dst, src) } +// Decompress appends decompressed src to dst and returns the result. +func DecompressLimited(dst, src []byte, maxDataSizeBytes int) ([]byte, error) { + return gozstd.DecompressLimited(dst, src, maxDataSizeBytes) +} + // CompressLevel appends compressed src to dst and returns the result. // // The given compressionLevel is used for the compression. diff --git a/lib/encoding/zstd/zstd_pure.go b/lib/encoding/zstd/zstd_pure.go index 9e77a3d2bc..c03ee9c0cf 100644 --- a/lib/encoding/zstd/zstd_pure.go +++ b/lib/encoding/zstd/zstd_pure.go @@ -11,7 +11,8 @@ import ( ) var ( - decoder *zstd.Decoder + decodersMu sync.Mutex + decoders atomic.Value mu sync.Mutex @@ -24,15 +25,27 @@ func init() { av.Store(r) var err error - decoder, err = zstd.NewReader(nil) + decoder, err := zstd.NewReader(nil) if err != nil { logger.Panicf("BUG: failed to create ZSTD reader: %s", err) } + d := make(map[int]*zstd.Decoder) + d[0] = decoder + decoders.Store(d) } // Decompress appends decompressed src to dst and returns the result. +// +// This function must be called only for the trusted src. func Decompress(dst, src []byte) ([]byte, error) { - return decoder.DecodeAll(src, dst) + d := getDecoder(0) + return d.DecodeAll(src, dst) +} + +// Decompress appends decompressed src to dst and returns the result. +func DecompressLimited(dst, src []byte, maxDataSizeBytes int) ([]byte, error) { + d := getDecoder(maxDataSizeBytes) + return d.DecodeAll(src, dst) } // CompressLevel appends compressed src to dst and returns the result. @@ -50,6 +63,34 @@ func CompressLevel(dst, src []byte, compressionLevel int) []byte { return e.EncodeAll(src, dst) } +func getDecoder(maxMemory int) *zstd.Decoder { + r := decoders.Load().(map[int]*zstd.Decoder) + d := r[maxMemory] + if d != nil { + return d + } + decodersMu.Lock() + // Create the decoder under lock in order to prevent from wasted work + // when concurrent goroutines create decoder for the same compressionLevel. + r1 := decoders.Load().(map[int]*zstd.Decoder) + if d = r1[maxMemory]; d == nil { + var err error + d, err = zstd.NewReader(nil, zstd.WithDecoderMaxMemory(uint64(maxMemory))) + if err != nil { + logger.Panicf("BUG: failed to create ZSTD reader: %s", err) + } + r2 := make(map[int]*zstd.Decoder) + for k, v := range r1 { + r2[k] = v + } + r2[maxMemory] = d + decoders.Store(r2) + } + decodersMu.Unlock() + + return d +} + func getEncoder(compressionLevel zstd.EncoderLevel) *zstd.Encoder { r := av.Load().(map[zstd.EncoderLevel]*zstd.Encoder) e := r[compressionLevel] diff --git a/lib/encoding/zstd/zstd_pure_test.go b/lib/encoding/zstd/zstd_pure_test.go new file mode 100644 index 0000000000..1a42e699d8 --- /dev/null +++ b/lib/encoding/zstd/zstd_pure_test.go @@ -0,0 +1,69 @@ +//go:build !cgo + +package zstd + +import ( + "bytes" + "encoding/hex" + "fmt" + "testing" +) + +func TestDecomrpessLimitedOk(t *testing.T) { + f := func(compressedData []byte, limit int) { + t.Helper() + + _, err := DecompressLimited(nil, compressedData, limit) + if err != nil { + t.Fatalf("cannot decompress data with limit=%d: %s", limit, err) + } + } + + var bb bytes.Buffer + for bb.Len() < 12*128*1024 { + fmt.Fprintf(&bb, "compress/decompress big data %d, ", bb.Len()) + } + originData := bb.Bytes() + // block decompression + cd := CompressLevel(nil, originData, 0) + + // decompressed size matches block limit + f(cd, len(originData)) + + // unlimited + f(cd, 0) + +} + +func TestDecompressLimitedFail(t *testing.T) { + f := func(input []byte, limit int) { + t.Helper() + _, err := DecompressLimited(nil, input, limit) + if err == nil { + t.Errorf("unexpected nil-error for decompress with limit: %d", limit) + } + + } + + var bb bytes.Buffer + for bb.Len() < 12*128*1024 { + fmt.Fprintf(&bb, "compress/decompress big data %d, ", bb.Len()) + } + + // valid input bigger than limit + f(bb.Bytes(), 1024) + + input, err := hex.DecodeString("28b52ffd8400005ed0b209000030ecaf4412") + if err != nil { + t.Fatalf("BUG: unexpected hex input: %s", err) + } + // input with framecontent bigger than actual payload + f(input, 512) + + // input with stream windowSize bigger than limit + input, err = hex.DecodeString("28b52ffd04981900003030304e8da22b") + if err != nil { + t.Fatalf("BUG: unexpected hex input: %s", err) + } + f(input, 8*1e6*10) +} diff --git a/lib/encoding/zstd/zstd_test.go b/lib/encoding/zstd/zstd_test.go index 17f2a91015..2ea216faa8 100644 --- a/lib/encoding/zstd/zstd_test.go +++ b/lib/encoding/zstd/zstd_test.go @@ -3,6 +3,9 @@ package zstd import ( + "bytes" + "encoding/hex" + "fmt" "math/rand" "testing" @@ -10,6 +13,66 @@ import ( cgo "github.com/valyala/gozstd" ) +func TestDecomrpessLimitedOK(t *testing.T) { + f := func(compressedData []byte, limit int) { + t.Helper() + + _, err := DecompressLimited(nil, compressedData, limit) + if err != nil { + t.Fatalf("cannot decompress data with limit=%d: %s", limit, err) + } + } + + var bb bytes.Buffer + for bb.Len() < 12*128*1024 { + fmt.Fprintf(&bb, "compress/decompress big data %d, ", bb.Len()) + } + originData := bb.Bytes() + // block decompression + cd := CompressLevel(nil, originData, 0) + + // decompressed size matches block limit + f(cd, len(originData)) + + // unlimited + f(cd, 0) + +} + +func TestDecompressLimitedFail(t *testing.T) { + + f := func(input []byte, limit int) { + t.Helper() + _, err := DecompressLimited(nil, input, limit) + if err == nil { + t.Errorf("unexpected nil-error for decompress with limit: %d", limit) + } + + } + + var bb bytes.Buffer + for bb.Len() < 12*128*1024 { + fmt.Fprintf(&bb, "compress/decompress big data %d, ", bb.Len()) + } + + // valid input bigger than limit + f(bb.Bytes(), 1024) + + input, err := hex.DecodeString("28b52ffd8400005ed0b209000030ecaf4412") + if err != nil { + t.Fatalf("BUG: unexpected hex input: %s", err) + } + // input with framecontent bigger than actual payload + f(input, 512) + + // input with stream windowSize bigger than limit + input, err = hex.DecodeString("28b52ffd04981900003030304e8da22b") + if err != nil { + t.Fatalf("BUG: unexpected hex input: %s", err) + } + f(input, 8*1e6*10) +} + func TestCompressDecompress(t *testing.T) { testCrossCompressDecompress(t, []byte("a")) testCrossCompressDecompress(t, []byte("foobarbaz")) diff --git a/lib/protoparser/promremotewrite/stream/streamparser.go b/lib/protoparser/promremotewrite/stream/streamparser.go index f3d6c81b43..83558c1a0b 100644 --- a/lib/protoparser/promremotewrite/stream/streamparser.go +++ b/lib/protoparser/promremotewrite/stream/streamparser.go @@ -39,7 +39,7 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer defer bodyBufferPool.Put(bb) var err error if isVMRemoteWrite { - bb.B, err = zstd.Decompress(bb.B[:0], ctx.reqBuf.B) + bb.B, err = zstd.DecompressLimited(bb.B[:0], ctx.reqBuf.B, maxInsertRequestSize.IntN()) if err != nil { // Fall back to Snappy decompression, since vmagent may send snappy-encoded messages // with 'Content-Encoding: zstd' header if they were put into persistent queue before vmagent restart. @@ -65,7 +65,7 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer // The logic is preserved for backwards compatibility. // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8650 snappyErr := err - bb.B, err = zstd.Decompress(bb.B[:0], ctx.reqBuf.B) + bb.B, err = zstd.DecompressLimited(bb.B[:0], ctx.reqBuf.B, maxInsertRequestSize.IntN()) if err != nil { return fmt.Errorf("cannot decompress snappy-encoded request with length %d: %w", len(ctx.reqBuf.B), snappyErr) } diff --git a/lib/protoparser/protoparserutil/compress_reader.go b/lib/protoparser/protoparserutil/compress_reader.go index f5b384c04f..c4681271f5 100644 --- a/lib/protoparser/protoparserutil/compress_reader.go +++ b/lib/protoparser/protoparserutil/compress_reader.go @@ -34,7 +34,10 @@ func ReadUncompressedData(r io.Reader, encoding string, maxDataSize *flagutil.By if encoding == "zstd" { // Fast path for zstd encoding - read the data in full and then decompress it by a single call. - return readUncompressedData(wcr, maxDataSize, zstd.Decompress, callback) + dcompress := func(dst, src []byte) ([]byte, error) { + return zstd.DecompressLimited(dst, src, maxDataSize.IntN()) + } + return readUncompressedData(wcr, maxDataSize, dcompress, callback) } if encoding == "snappy" { // Special case for snappy. The snappy data must be read in full and then decompressed,