mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
lib: properly apply snappy Decode limits
Previously, snappy Decoder didn't take in account Request Size limits applied by VictoriaMetrics components. And in case of incorrectly formed snappy block, VictoriaMetrics component may allocate extra memory. Which may lead to the OOM errors. This commit makes ingest endpoints check block size header based on MaxRequest Limits.
This commit is contained in:
@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
## tip
|
||||
|
||||
* BUGFIX: `vminsert`, [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): properly apply `maxDataSize` memory limits to the `snappy` encoded requests. It protects ingest endpoints from malicious requests.
|
||||
|
||||
## [v1.129.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.129.0)
|
||||
|
||||
Released at 2025-10-31
|
||||
|
||||
26
lib/encoding/snappy/snappy.go
Normal file
26
lib/encoding/snappy/snappy.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package snappy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
)
|
||||
|
||||
// Decode returns the decoded form of src with provided max block data size.
|
||||
// The returned slice may be a sub-slice of dst
|
||||
// if dst was large enough to hold the entire decoded block.
|
||||
// Otherwise, a newly allocated slice will be returned.
|
||||
//
|
||||
// The dst and src must not overlap. It is valid to pass a nil dst.
|
||||
//
|
||||
// Decode handles the Snappy block format, not the Snappy stream format.
|
||||
func Decode(dst []byte, src []byte, maxDataSizeBytes int) ([]byte, error) {
|
||||
dstLen, err := snappy.DecodedLen(src)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read snappy header: %w", err)
|
||||
}
|
||||
if maxDataSizeBytes > 0 && dstLen > maxDataSizeBytes {
|
||||
return nil, fmt.Errorf("too big data size %d exceeding %d bytes", dstLen, maxDataSizeBytes)
|
||||
}
|
||||
return snappy.Decode(dst[:cap(dst)], src)
|
||||
}
|
||||
53
lib/encoding/snappy/snappy_test.go
Normal file
53
lib/encoding/snappy/snappy_test.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package snappy
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
)
|
||||
|
||||
func TestDecodeOk(t *testing.T) {
|
||||
f := func(src []byte, want []byte, maxMemoryLimit int) {
|
||||
t.Helper()
|
||||
got, err := Decode(nil, src, maxMemoryLimit)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
if diff := cmp.Diff(string(want), string(got)); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
}
|
||||
// regular block, no limit
|
||||
data := make([]byte, 32*1024)
|
||||
encoded := snappy.Encode(nil, data)
|
||||
f(encoded, data, 0)
|
||||
|
||||
// regular block, fits limit
|
||||
f(encoded, data, 68*1024)
|
||||
}
|
||||
|
||||
func TestDecodeFail(t *testing.T) {
|
||||
f := func(src []byte, maxMemoryLimit int) {
|
||||
t.Helper()
|
||||
_, err := Decode(nil, src, maxMemoryLimit)
|
||||
if err == nil {
|
||||
t.Fatal("unexpected empty error")
|
||||
}
|
||||
}
|
||||
// mailformed block
|
||||
mailformed, err := hex.DecodeString("97eab4890a170a085f5f6e616d655f5f120b746573745f6d6574726963121009000000000000f03f10d48fc9b2a333")
|
||||
if err != nil {
|
||||
t.Fatalf("BUG: unexpected hex encoded input: %s", err)
|
||||
}
|
||||
f(mailformed, 32*1024*1024)
|
||||
|
||||
// valid block exceeds maxMemoryLimit
|
||||
data := make([]byte, 32*1024)
|
||||
encoded := snappy.Encode(nil, data)
|
||||
f(encoded, 1024)
|
||||
|
||||
// invalid block
|
||||
f(nil, 0)
|
||||
}
|
||||
@@ -7,13 +7,13 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/snappy"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/golang/snappy"
|
||||
)
|
||||
|
||||
var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request")
|
||||
@@ -49,13 +49,13 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer
|
||||
// The logic is preserved for backwards compatibility.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8650
|
||||
zstdErr := err
|
||||
bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], ctx.reqBuf.B)
|
||||
bb.B, err = snappy.Decode(bb.B, ctx.reqBuf.B, maxInsertRequestSize.IntN())
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot decompress zstd-encoded request with length %d: %w", len(ctx.reqBuf.B), zstdErr)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], ctx.reqBuf.B)
|
||||
bb.B, err = snappy.Decode(bb.B, ctx.reqBuf.B, maxInsertRequestSize.IntN())
|
||||
if err != nil {
|
||||
// Fall back to zstd decompression, since vmagent may send zstd-encoded messages
|
||||
// without 'Content-Encoding: zstd' header if they were put into persistent queue before vmagent restart.
|
||||
|
||||
@@ -5,17 +5,24 @@ import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
"github.com/klauspost/compress/gzip"
|
||||
"github.com/klauspost/compress/zlib"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/snappy"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||
)
|
||||
|
||||
// snappy has default limit of 2_704_094_487 ( 2 GB)
|
||||
// which is too high for common VictoriaMetrics insert requests
|
||||
// limit to 56MB in order to prevent possible memory allocation attacks
|
||||
//
|
||||
// Later we could consider to make this limit configurable
|
||||
const maxSnappyBlockSize = 56_000_000
|
||||
|
||||
// ReadUncompressedData reads uncompressed data from r using the given encoding and then passes it to the callback.
|
||||
//
|
||||
// The maxDataSize limits the maximum data size, which can be read from r.
|
||||
@@ -33,7 +40,7 @@ func ReadUncompressedData(r io.Reader, encoding string, maxDataSize *flagutil.By
|
||||
// Special case for snappy. The snappy data must be read in full and then decompressed,
|
||||
// since streaming snappy encoding is incompatible with block snappy encoding.
|
||||
decompress := func(dst, src []byte) ([]byte, error) {
|
||||
return snappy.Decode(dst[:cap(dst)], src)
|
||||
return snappy.Decode(dst, src, maxDataSize.IntN())
|
||||
}
|
||||
return readUncompressedData(wcr, maxDataSize, decompress, callback)
|
||||
}
|
||||
@@ -198,7 +205,7 @@ func (sr *snappyReader) Reset(r io.Reader) error {
|
||||
compressedBufPool.Put(cbb)
|
||||
return fmt.Errorf("cannot read snappy-encoded data block: %w", err)
|
||||
}
|
||||
sr.b, err = snappy.Decode(sr.b[:cap(sr.b)], cbb.B)
|
||||
sr.b, err = snappy.Decode(sr.b, cbb.B, maxSnappyBlockSize)
|
||||
compressedBufPool.Put(cbb)
|
||||
sr.offset = 0
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user