diff --git a/docs/victoriametrics/changelog/CHANGELOG.md b/docs/victoriametrics/changelog/CHANGELOG.md index 5772cbbbdc..ca0bcbe711 100644 --- a/docs/victoriametrics/changelog/CHANGELOG.md +++ b/docs/victoriametrics/changelog/CHANGELOG.md @@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel ## tip +* BUGFIX: `vminsert`, [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): properly apply `maxDataSize` memory limits to the `snappy` encoded requests. It protects ingest endpoints from malicious requests. + ## [v1.129.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.129.0) Released at 2025-10-31 diff --git a/lib/encoding/snappy/snappy.go b/lib/encoding/snappy/snappy.go new file mode 100644 index 0000000000..5e37568fcc --- /dev/null +++ b/lib/encoding/snappy/snappy.go @@ -0,0 +1,26 @@ +package snappy + +import ( + "fmt" + + "github.com/golang/snappy" +) + +// Decode returns the decoded form of src with provided max block data size. +// The returned slice may be a sub-slice of dst +// if dst was large enough to hold the entire decoded block. +// Otherwise, a newly allocated slice will be returned. +// +// The dst and src must not overlap. It is valid to pass a nil dst. +// +// Decode handles the Snappy block format, not the Snappy stream format. +func Decode(dst []byte, src []byte, maxDataSizeBytes int) ([]byte, error) { + dstLen, err := snappy.DecodedLen(src) + if err != nil { + return nil, fmt.Errorf("cannot read snappy header: %w", err) + } + if maxDataSizeBytes > 0 && dstLen > maxDataSizeBytes { + return nil, fmt.Errorf("too big data size %d exceeding %d bytes", dstLen, maxDataSizeBytes) + } + return snappy.Decode(dst[:cap(dst)], src) +} diff --git a/lib/encoding/snappy/snappy_test.go b/lib/encoding/snappy/snappy_test.go new file mode 100644 index 0000000000..063b93cdae --- /dev/null +++ b/lib/encoding/snappy/snappy_test.go @@ -0,0 +1,53 @@ +package snappy + +import ( + "encoding/hex" + "testing" + + "github.com/golang/snappy" + "github.com/google/go-cmp/cmp" +) + +func TestDecodeOk(t *testing.T) { + f := func(src []byte, want []byte, maxMemoryLimit int) { + t.Helper() + got, err := Decode(nil, src, maxMemoryLimit) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if diff := cmp.Diff(string(want), string(got)); diff != "" { + t.Errorf("unexpected response (-want, +got):\n%s", diff) + } + } + // regular block, no limit + data := make([]byte, 32*1024) + encoded := snappy.Encode(nil, data) + f(encoded, data, 0) + + // regular block, fits limit + f(encoded, data, 68*1024) +} + +func TestDecodeFail(t *testing.T) { + f := func(src []byte, maxMemoryLimit int) { + t.Helper() + _, err := Decode(nil, src, maxMemoryLimit) + if err == nil { + t.Fatal("unexpected empty error") + } + } + // mailformed block + mailformed, err := hex.DecodeString("97eab4890a170a085f5f6e616d655f5f120b746573745f6d6574726963121009000000000000f03f10d48fc9b2a333") + if err != nil { + t.Fatalf("BUG: unexpected hex encoded input: %s", err) + } + f(mailformed, 32*1024*1024) + + // valid block exceeds maxMemoryLimit + data := make([]byte, 32*1024) + encoded := snappy.Encode(nil, data) + f(encoded, 1024) + + // invalid block + f(nil, 0) +} diff --git a/lib/protoparser/promremotewrite/stream/streamparser.go b/lib/protoparser/promremotewrite/stream/streamparser.go index 2087811ae0..f3d6c81b43 100644 --- a/lib/protoparser/promremotewrite/stream/streamparser.go +++ b/lib/protoparser/promremotewrite/stream/streamparser.go @@ -7,13 +7,13 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/snappy" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" - "github.com/golang/snappy" ) var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request") @@ -49,13 +49,13 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer // The logic is preserved for backwards compatibility. // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8650 zstdErr := err - bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], ctx.reqBuf.B) + bb.B, err = snappy.Decode(bb.B, ctx.reqBuf.B, maxInsertRequestSize.IntN()) if err != nil { return fmt.Errorf("cannot decompress zstd-encoded request with length %d: %w", len(ctx.reqBuf.B), zstdErr) } } } else { - bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], ctx.reqBuf.B) + bb.B, err = snappy.Decode(bb.B, ctx.reqBuf.B, maxInsertRequestSize.IntN()) if err != nil { // Fall back to zstd decompression, since vmagent may send zstd-encoded messages // without 'Content-Encoding: zstd' header if they were put into persistent queue before vmagent restart. diff --git a/lib/protoparser/protoparserutil/compress_reader.go b/lib/protoparser/protoparserutil/compress_reader.go index 309bcd47c3..f5b384c04f 100644 --- a/lib/protoparser/protoparserutil/compress_reader.go +++ b/lib/protoparser/protoparserutil/compress_reader.go @@ -5,17 +5,24 @@ import ( "io" "sync" - "github.com/golang/snappy" "github.com/klauspost/compress/gzip" "github.com/klauspost/compress/zlib" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/snappy" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" ) +// snappy has default limit of 2_704_094_487 ( 2 GB) +// which is too high for common VictoriaMetrics insert requests +// limit to 56MB in order to prevent possible memory allocation attacks +// +// Later we could consider to make this limit configurable +const maxSnappyBlockSize = 56_000_000 + // ReadUncompressedData reads uncompressed data from r using the given encoding and then passes it to the callback. // // The maxDataSize limits the maximum data size, which can be read from r. @@ -33,7 +40,7 @@ func ReadUncompressedData(r io.Reader, encoding string, maxDataSize *flagutil.By // Special case for snappy. The snappy data must be read in full and then decompressed, // since streaming snappy encoding is incompatible with block snappy encoding. decompress := func(dst, src []byte) ([]byte, error) { - return snappy.Decode(dst[:cap(dst)], src) + return snappy.Decode(dst, src, maxDataSize.IntN()) } return readUncompressedData(wcr, maxDataSize, decompress, callback) } @@ -198,7 +205,7 @@ func (sr *snappyReader) Reset(r io.Reader) error { compressedBufPool.Put(cbb) return fmt.Errorf("cannot read snappy-encoded data block: %w", err) } - sr.b, err = snappy.Decode(sr.b[:cap(sr.b)], cbb.B) + sr.b, err = snappy.Decode(sr.b, cbb.B, maxSnappyBlockSize) compressedBufPool.Put(cbb) sr.offset = 0 if err != nil {