Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2026-05-17 08:36:55 +03:00.
File: 306 lines, 8.6 KiB, Go.
package protoparserutil
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
|
|
"github.com/klauspost/compress/gzip"
|
|
"github.com/klauspost/compress/zlib"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/snappy"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ioutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
|
)
|
|
|
|
// snappy has a default decoded-block limit of 2_704_094_487 bytes (~2.5GiB),
// which is too high for common VictoriaMetrics insert requests.
// Limit it to 56MB in order to prevent possible memory allocation attacks.
//
// Later we could consider making this limit configurable.
const maxSnappyBlockSize = 56_000_000
|
|
|
|
// ReadUncompressedData reads uncompressed data from r using the given contentType and then passes it to the callback.
|
|
//
|
|
// The maxDataSize limits the maximum data size, which can be read from r.
|
|
//
|
|
// The callback must not hold references to the data after returning.
|
|
func ReadUncompressedData(r io.Reader, contentType string, maxDataSize *flagutil.Bytes, callback func(data []byte) error) error {
|
|
fbr := ioutil.GetFirstByteReader(r)
|
|
defer ioutil.PutFirstByteReader(fbr)
|
|
|
|
// Wait for the first byte before obtaining the concurrency token
|
|
// and allocating resources needed for reading and processing the data from r.
|
|
// This should prevent from allocating concurrency tokens and memory
|
|
// for connections without incoming data.
|
|
fbr.WaitForData()
|
|
|
|
if err := writeconcurrencylimiter.IncConcurrency(); err != nil {
|
|
return err
|
|
}
|
|
defer writeconcurrencylimiter.DecConcurrency()
|
|
|
|
if contentType == "zstd" {
|
|
// Fast path for zstd contentType - read the data in full and then decompress it by a single call.
|
|
dcompress := func(dst, src []byte) ([]byte, error) {
|
|
return encoding.DecompressZSTDLimited(dst, src, maxDataSize.IntN())
|
|
}
|
|
return readUncompressedData(fbr, maxDataSize, dcompress, callback)
|
|
}
|
|
if contentType == "snappy" {
|
|
// Special case for snappy. The snappy data must be read in full and then decompressed,
|
|
// since streaming snappy encoding is incompatible with block snappy encoding.
|
|
decompress := func(dst, src []byte) ([]byte, error) {
|
|
return snappy.Decode(dst, src, maxDataSize.IntN())
|
|
}
|
|
return readUncompressedData(fbr, maxDataSize, decompress, callback)
|
|
}
|
|
|
|
// Slow path for other supported protocol encoders.
|
|
reader, err := GetUncompressedReader(fbr, contentType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer PutUncompressedReader(reader)
|
|
|
|
return readFull(reader, maxDataSize, callback)
|
|
}
|
|
|
|
func readUncompressedData(r io.Reader, maxDataSize *flagutil.Bytes, decompress func(dst, src []byte) ([]byte, error), callback func(data []byte) error) error {
|
|
return readFull(r, maxDataSize, func(data []byte) error {
|
|
dbb := decompressedBufPool.Get()
|
|
defer func() {
|
|
if cap(dbb.B) > 1024*1024 && cap(dbb.B) > 4*len(dbb.B) {
|
|
// Do not store too big dbb to the pool if only a small part of the buffer is used last time.
|
|
// This should reduce memory waste.
|
|
return
|
|
}
|
|
decompressedBufPool.Put(dbb)
|
|
}()
|
|
|
|
var err error
|
|
dbb.B, err = decompress(dbb.B, data)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot decompress data: %w", err)
|
|
}
|
|
if int64(len(dbb.B)) > maxDataSize.N {
|
|
return fmt.Errorf("too big decompressed data size exceeding -%s=%d bytes", maxDataSize.Name, maxDataSize.N)
|
|
}
|
|
|
|
return callback(dbb.B)
|
|
})
|
|
}
|
|
|
|
func readFull(r io.Reader, maxDataSize *flagutil.Bytes, callback func(data []byte) error) error {
|
|
lr := ioutil.GetLimitedReader(r, maxDataSize.N+1)
|
|
defer ioutil.PutLimitedReader(lr)
|
|
|
|
// Use chunkedbuffer for reading the data from potentially slow lr.
|
|
// This should reduce memory fragmentation and memory usage when reading large amounts of data from slow lr.
|
|
cb := chunkedbuffer.Get()
|
|
|
|
if _, err := cb.ReadFrom(lr); err != nil {
|
|
chunkedbuffer.Put(cb)
|
|
return err
|
|
}
|
|
|
|
if int64(cb.Len()) > maxDataSize.N {
|
|
return fmt.Errorf("too big data size exceeding -%s=%d bytes", maxDataSize.Name, maxDataSize.N)
|
|
}
|
|
|
|
// The data is read to cb.
|
|
// Copy it to contiguous bb.B and pass to callback for CPU-bound processing.
|
|
bb := fullReaderBufPool.Get()
|
|
defer func() {
|
|
if cap(bb.B) > 1024*1024 && cap(bb.B) > 4*len(bb.B) {
|
|
// Do not store too big bb to the pool if only a small part of the buffer is used last time.
|
|
// This should reduce memory waste.
|
|
return
|
|
}
|
|
fullReaderBufPool.Put(bb)
|
|
}()
|
|
|
|
cb.MustWriteTo(bb)
|
|
chunkedbuffer.Put(cb)
|
|
|
|
return callback(bb.B)
|
|
}
|
|
|
|
// fullReaderBufPool holds reusable contiguous buffers used by readFull().
var fullReaderBufPool bytesutil.ByteBufferPool
|
|
|
|
var (
	// compressedBufPool holds reusable buffers with compressed data read by snappyReader.Reset.
	compressedBufPool bytesutil.ByteBufferPool
	// decompressedBufPool holds reusable buffers with decompressed data used by readUncompressedData.
	decompressedBufPool bytesutil.ByteBufferPool
)
|
|
|
|
// GetUncompressedReader returns uncompressed reader for r and the given contentType
|
|
//
|
|
// The returned reader must be passed to PutUncompressedReader when no longer needed.
|
|
func GetUncompressedReader(r io.Reader, contentType string) (io.Reader, error) {
|
|
switch contentType {
|
|
case "zstd":
|
|
return zstd.GetReader(r), nil
|
|
case "snappy":
|
|
return getSnappyReader(r)
|
|
case "gzip":
|
|
return getGzipReader(r)
|
|
case "deflate":
|
|
return getZlibReader(r)
|
|
case "", "none", "identity":
|
|
// Datadog extensions sends Content-Encoding: identity, which is not supported by RFC 2616
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8649
|
|
return getPlainReader(r), nil
|
|
default:
|
|
return nil, fmt.Errorf("unsupported contentType: %s", contentType)
|
|
}
|
|
}
|
|
|
|
// PutUncompressedReader puts r to the pool, so it could be reused via GetUncompressedReader()
|
|
func PutUncompressedReader(r io.Reader) {
|
|
switch t := r.(type) {
|
|
case *snappyReader:
|
|
putSnappyReader(t)
|
|
case *zstd.Reader:
|
|
zstd.PutReader(t)
|
|
case *gzip.Reader:
|
|
putGzipReader(t)
|
|
case zlib.Resetter:
|
|
putZlibReader(t)
|
|
case *plainReader:
|
|
putPlainReader(t)
|
|
default:
|
|
logger.Panicf("BUG: unsupported reader passed to PutUncompressedReader: %T", r)
|
|
}
|
|
}
|
|
|
|
type plainReader struct {
|
|
r io.Reader
|
|
}
|
|
|
|
func (pr *plainReader) Read(p []byte) (int, error) {
|
|
return pr.r.Read(p)
|
|
}
|
|
|
|
func getPlainReader(r io.Reader) *plainReader {
|
|
v := plainReaderPool.Get()
|
|
if v == nil {
|
|
v = &plainReader{}
|
|
}
|
|
pr := v.(*plainReader)
|
|
pr.r = r
|
|
return pr
|
|
}
|
|
|
|
func putPlainReader(pr *plainReader) {
|
|
pr.r = nil
|
|
plainReaderPool.Put(pr)
|
|
}
|
|
|
|
var plainReaderPool sync.Pool
|
|
|
|
func getGzipReader(r io.Reader) (*gzip.Reader, error) {
|
|
v := gzipReaderPool.Get()
|
|
if v == nil {
|
|
return gzip.NewReader(r)
|
|
}
|
|
zr := v.(*gzip.Reader)
|
|
if err := zr.Reset(r); err != nil {
|
|
return nil, err
|
|
}
|
|
return zr, nil
|
|
}
|
|
|
|
func putGzipReader(zr *gzip.Reader) {
|
|
gzipReaderPool.Put(zr)
|
|
}
|
|
|
|
var gzipReaderPool sync.Pool
|
|
|
|
func getZlibReader(r io.Reader) (io.ReadCloser, error) {
|
|
v := zlibReaderPool.Get()
|
|
if v == nil {
|
|
return zlib.NewReader(r)
|
|
}
|
|
zr := v.(zlib.Resetter)
|
|
if err := zr.Reset(r, nil); err != nil {
|
|
return nil, err
|
|
}
|
|
return zr.(io.ReadCloser), nil
|
|
}
|
|
|
|
func putZlibReader(zr zlib.Resetter) {
|
|
zlibReaderPool.Put(zr)
|
|
}
|
|
|
|
var zlibReaderPool sync.Pool
|
|
|
|
// snappyReader decompresses a whole block of snappy-encoded data on Reset
// and then serves the decompressed bytes via Read.
type snappyReader struct {
	// b contains the decompressed data, which must be read by snappy reader
	b []byte

	// offset is an offset at b for the remaining data to read
	offset int
}
|
|
|
|
func (sr *snappyReader) Reset(r io.Reader) error {
|
|
// Read the whole data in one go, since it is expected that Snappy data
|
|
// is compressed in block mode instead of stream mode.
|
|
// See https://pkg.go.dev/github.com/golang/snappy
|
|
|
|
lr := ioutil.GetLimitedReader(r, maxSnappyBlockSize+1)
|
|
defer ioutil.PutLimitedReader(lr)
|
|
|
|
cbb := compressedBufPool.Get()
|
|
defer compressedBufPool.Put(cbb)
|
|
|
|
_, err := cbb.ReadFrom(lr)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read snappy-encoded data block: %w", err)
|
|
}
|
|
if len(cbb.B) > maxSnappyBlockSize {
|
|
return fmt.Errorf("cannot read snappy-encoded data block because its' size exceeds %d bytes", maxSnappyBlockSize)
|
|
}
|
|
sr.b, err = snappy.Decode(sr.b, cbb.B, maxSnappyBlockSize)
|
|
sr.offset = 0
|
|
if err != nil {
|
|
return fmt.Errorf("cannot decode snappy-encoded data block: %w", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (sr *snappyReader) Read(p []byte) (int, error) {
|
|
if sr.offset >= len(sr.b) {
|
|
return 0, io.EOF
|
|
}
|
|
n := copy(p, sr.b[sr.offset:])
|
|
sr.offset += n
|
|
if n == len(p) {
|
|
return n, nil
|
|
}
|
|
return n, io.EOF
|
|
}
|
|
|
|
func getSnappyReader(r io.Reader) (*snappyReader, error) {
|
|
v := snappyReaderPool.Get()
|
|
if v == nil {
|
|
v = &snappyReader{}
|
|
}
|
|
zr := v.(*snappyReader)
|
|
if err := zr.Reset(r); err != nil {
|
|
return nil, err
|
|
}
|
|
return zr, nil
|
|
}
|
|
|
|
func putSnappyReader(zr *snappyReader) {
|
|
snappyReaderPool.Put(zr)
|
|
}
|
|
|
|
var snappyReaderPool sync.Pool
|