all: consistently use encoding.DecompressZSTD* instead of zstd.Decompress* across the codebase

The encoding.DecompressZSTD* consistently updates the vm_zstd_block_decompress_calls_total metric.

Also make the follwing improvements after the commit 10f7cd2ffc:

- Add encoding.DecompressZSTDLimited() function and use it instead of zstd.DecompressLimited,
  so it properly updates vm_zstd_block_decompress_calls_total metric.

- Clarify description for the encoding.DecompressZSTD* and zstd.Decompress* functions.
This commit is contained in:
Aliaksandr Valialkin
2025-12-17 16:39:19 +01:00
parent d9c07dbc0b
commit bed7cbd0a4
6 changed files with 39 additions and 21 deletions

View File

@@ -15,7 +15,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -554,9 +553,9 @@ func getRetryDuration(retryAfterDuration, retryDuration, maxRetryDuration time.D
// For more details, see: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9417
func repackBlockFromZstdToSnappy(zstdBlock []byte) ([]byte, error) {
plainBlock := make([]byte, 0, len(zstdBlock)*2)
plainBlock, err := zstd.Decompress(plainBlock, zstdBlock)
plainBlock, err := encoding.DecompressZSTD(plainBlock, zstdBlock)
if err != nil {
return nil, fmt.Errorf("zstd: decompress: %s", err)
return nil, err
}
return snappy.Encode(nil, plainBlock), nil

View File

@@ -7,8 +7,7 @@ import (
"github.com/VictoriaMetrics/metrics"
)
// CompressZSTDLevel appends compressed src to dst and returns
// the appended dst.
// CompressZSTDLevel appends compressed src to dst and returns the appended dst.
//
// The given compressLevel is used for the compression.
func CompressZSTDLevel(dst, src []byte, compressLevel int) []byte {
@@ -20,15 +19,27 @@ func CompressZSTDLevel(dst, src []byte, compressLevel int) []byte {
return dst
}
// DecompressZSTD decompresses src, appends the result to dst and returns
// the appended dst.
// DecompressZSTD decompresses src, appends the result to dst and returns the appended dst.
//
// This function must be called only for the trusted src.
// Use DecompressZSTDLimited for untrusted src.
func DecompressZSTD(dst, src []byte) ([]byte, error) {
decompressCalls.Inc()
b, err := zstd.Decompress(dst, src)
if err != nil {
return b, fmt.Errorf("cannot decompress zstd block with len=%d to a buffer with len=%d: %w; block data (hex): %X", len(src), len(dst), err, src)
return b, fmt.Errorf("cannot decompress zstd block with len=%d: %w; block data (hex): %X", len(src), err, src)
}
return b, nil
}
// DecompressZSTDLimited decompresses src, appends the result to dst and returns the appended dst.
//
// If the decompressed result exceeds maxDataSizeBytes, then error is returned.
func DecompressZSTDLimited(dst, src []byte, maxDataSizeBytes int) ([]byte, error) {
decompressCalls.Inc()
b, err := zstd.DecompressLimited(dst, src, maxDataSizeBytes)
if err != nil {
return b, fmt.Errorf("cannot decompress zstd block with len=%d and maxDataSizeBytes=%d: %w", len(src), maxDataSizeBytes, err)
}
return b, nil
}

View File

@@ -9,11 +9,14 @@ import (
// Decompress appends decompressed src to dst and returns the result.
//
// This function must be called only for the trusted src.
// Otherwise use DecompressLimited function.
func Decompress(dst, src []byte) ([]byte, error) {
return gozstd.Decompress(dst, src)
}
// Decompress appends decompressed src to dst and returns the result.
//
// If the decompressed result exceeds maxDataSizeBytes, then error is returned.
func DecompressLimited(dst, src []byte, maxDataSizeBytes int) ([]byte, error) {
return gozstd.DecompressLimited(dst, src, maxDataSizeBytes)
}

View File

@@ -37,12 +37,16 @@ func init() {
// Decompress appends decompressed src to dst and returns the result.
//
// This function must be called only for the trusted src.
//
// Otherwise use DecompressLimited function.
func Decompress(dst, src []byte) ([]byte, error) {
d := getDecoder(0)
return d.DecodeAll(src, dst)
}
// Decompress appends decompressed src to dst and returns the result.
//
// If the decompressed result exceeds maxDataSizeBytes, then error is returned.
func DecompressLimited(dst, src []byte, maxDataSizeBytes int) ([]byte, error) {
d := getDecoder(maxDataSizeBytes)
return d.DecodeAll(src, dst)

View File

@@ -7,8 +7,8 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ioutil"
@@ -40,7 +40,7 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer
defer bodyBufferPool.Put(bb)
var err error
if isVMRemoteWrite {
bb.B, err = zstd.DecompressLimited(bb.B[:0], ctx.reqBuf.B, maxInsertRequestSize.IntN())
bb.B, err = encoding.DecompressZSTDLimited(bb.B[:0], ctx.reqBuf.B, maxInsertRequestSize.IntN())
if err != nil {
// Fall back to Snappy decompression, since vmagent may send snappy-encoded messages
// with 'Content-Encoding: zstd' header if they were put into persistent queue before vmagent restart.
@@ -66,7 +66,7 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer
// The logic is preserved for backwards compatibility.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8650
snappyErr := err
bb.B, err = zstd.DecompressLimited(bb.B[:0], ctx.reqBuf.B, maxInsertRequestSize.IntN())
bb.B, err = encoding.DecompressZSTDLimited(bb.B[:0], ctx.reqBuf.B, maxInsertRequestSize.IntN())
if err != nil {
return fmt.Errorf("cannot decompress snappy-encoded request with length %d: %w", len(ctx.reqBuf.B), snappyErr)
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/klauspost/compress/zlib"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@@ -29,18 +30,18 @@ const maxSnappyBlockSize = 56_000_000
// The maxDataSize limits the maximum data size, which can be read from r.
//
// The callback must not hold references to the data after returning.
func ReadUncompressedData(r io.Reader, encoding string, maxDataSize *flagutil.Bytes, callback func(data []byte) error) error {
func ReadUncompressedData(r io.Reader, contentType string, maxDataSize *flagutil.Bytes, callback func(data []byte) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
if encoding == "zstd" {
// Fast path for zstd encoding - read the data in full and then decompress it by a single call.
if contentType == "zstd" {
// Fast path for zstd contentType - read the data in full and then decompress it by a single call.
dcompress := func(dst, src []byte) ([]byte, error) {
return zstd.DecompressLimited(dst, src, maxDataSize.IntN())
return encoding.DecompressZSTDLimited(dst, src, maxDataSize.IntN())
}
return readUncompressedData(wcr, maxDataSize, dcompress, callback)
}
if encoding == "snappy" {
if contentType == "snappy" {
// Special case for snappy. The snappy data must be read in full and then decompressed,
// since streaming snappy encoding is incompatible with block snappy encoding.
decompress := func(dst, src []byte) ([]byte, error) {
@@ -50,7 +51,7 @@ func ReadUncompressedData(r io.Reader, encoding string, maxDataSize *flagutil.By
}
// Slow path for other supported protocol encoders.
reader, err := GetUncompressedReader(wcr, encoding)
reader, err := GetUncompressedReader(wcr, contentType)
if err != nil {
return err
}
@@ -114,11 +115,11 @@ var (
decompressedBufPool bytesutil.ByteBufferPool
)
// GetUncompressedReader returns uncompressed reader for r and the given encoding
// GetUncompressedReader returns uncompressed reader for r and the given contentType
//
// The returned reader must be passed to PutUncompressedReader when no longer needed.
func GetUncompressedReader(r io.Reader, encoding string) (io.Reader, error) {
switch encoding {
func GetUncompressedReader(r io.Reader, contentType string) (io.Reader, error) {
switch contentType {
case "zstd":
return zstd.GetReader(r), nil
case "snappy":
@@ -132,7 +133,7 @@ func GetUncompressedReader(r io.Reader, encoding string) (io.Reader, error) {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8649
return getPlainReader(r), nil
default:
return nil, fmt.Errorf("unsupported encoding: %s", encoding)
return nil, fmt.Errorf("unsupported contentType: %s", contentType)
}
}