Files
VictoriaMetrics/lib/encoding/zstd/stream_pure.go
Alexander Frolov d0045ba51d lib/encoding/zstd: use sync stream decoder (purego) (#9518)
### Describe Your Changes

By default `zstd.Reader` creates multiple goroutines to process a single
connection:
- It doesn't match the cgo behavior, which works synchronously, and it
creates a lot more concurrent goroutines (0.5k -> 5k on my workload)
- It results in non-zero `vm_tcpdialer_errors_total{type="read"}` errors
on vmselect because an underlying connection is closed while a goroutine
is still reading from it. The goroutine is created by
`zstd.NewReader`/`zstd.Reset`:

abb348e4db/lib/handshake/buffered_conn.go (L113-L120)
- vmselect (and vmagent) doesn't benefit from async mode since it has
multiple readers in use at the same time, which usually exceeds the
number of CPU cores

Partly related to #9218

### Checklist

The following checks are **mandatory**:

- [x] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [x] My change adheres to [VictoriaMetrics development
goals](https://docs.victoriametrics.com/victoriametrics/goals/).
2025-07-31 20:40:17 +03:00

147 lines
3.0 KiB
Go

//go:build !cgo
package zstd
import (
"io"
"sync"
"github.com/klauspost/compress/zstd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// Reader is a zstd reader.
//
// It wraps a *zstd.Decoder from github.com/klauspost/compress/zstd.
type Reader struct {
// d is the underlying decoder; Release sets it to nil.
d *zstd.Decoder
}
// NewReader creates a Reader that reads the zstd stream supplied by r.
//
// The underlying decoder is created with a decoder concurrency of 1.
// An error returned by the decoder constructor is treated as a bug
// and reported via logger.Panicf.
func NewReader(r io.Reader) *Reader {
	dec, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(1))
	if err != nil {
		logger.Panicf("BUG: unexpected error returned when creating ZSTD reader: %s", err)
	}
	return &Reader{d: dec}
}
// Read fills p with up to len(p) bytes obtained from the underlying decoder.
//
// It returns the byte count and error reported by the decoder unchanged.
func (r *Reader) Read(p []byte) (int, error) {
	n, err := r.d.Read(p)
	return n, err
}
// Release closes the underlying decoder and drops the reference to it.
//
// r holds no decoder after the call, so it must not be used again.
func (r *Reader) Release() {
	dec := r.d
	dec.Close()
	r.d = nil
}
// GetReader returns a Reader for reading zstd-uncompressed data from r.
//
// A Reader taken from the pool is reset onto r; if the pool is empty,
// a new Reader is created via NewReader.
//
// When the reader is no longer needed, return it to the pool via PutReader().
func GetReader(r io.Reader) *Reader {
	if pooled := readerPool.Get(); pooled != nil {
		zr := pooled.(*Reader)
		err := zr.d.Reset(r)
		if err != nil {
			logger.Panicf("BUG: unexpected error when resetting ZSTD reader: %s", err)
		}
		return zr
	}
	return NewReader(r)
}
// PutReader hands zr back to the pool, so a later GetReader call can reuse it.
//
// The decoder is reset with a nil input before zr is put into the pool.
// An error from that reset is treated as a bug and reported via logger.Panicf.
func PutReader(zr *Reader) {
	err := zr.d.Reset(nil)
	if err != nil {
		logger.Panicf("BUG: unexpected error when resetting ZSTD reader: %s", err)
	}
	readerPool.Put(zr)
}
// readerPool holds *Reader values returned via PutReader for reuse by GetReader.
var readerPool sync.Pool
// Writer is a zstd writer.
//
// It wraps a *zstd.Encoder from github.com/klauspost/compress/zstd.
type Writer struct {
// e is the underlying encoder; Release sets it to nil.
e *zstd.Encoder
// level is the level passed to NewWriterLevel; PutWriter uses it to pick the pool.
level int
}
// NewWriterLevel creates a Writer that writes zstd-compressed data to w.
//
// level is converted to an encoder level with zstd.EncoderLevelFromZstd and
// is also stored in the returned Writer. An error returned by the encoder
// constructor is treated as a bug and reported via logger.Panicf.
func NewWriterLevel(w io.Writer, level int) *Writer {
	enc, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(level)))
	if err != nil {
		logger.Panicf("BUG: failed to create ZSTD writer: %s", err)
	}
	return &Writer{e: enc, level: level}
}
// Write passes p to the underlying encoder.
//
// It returns the byte count and error reported by the encoder unchanged.
func (w *Writer) Write(p []byte) (int, error) {
	n, err := w.e.Write(p)
	return n, err
}
// Flush pushes all the data pending in w to the underlying writer.
//
// It returns the error reported by the encoder's Flush unchanged.
func (w *Writer) Flush() error {
	err := w.e.Flush()
	return err
}
// Close finishes the compressed stream, flushing the pending data
// to the underlying writer.
//
// It returns the error reported by the encoder's Close unchanged.
func (w *Writer) Close() error {
	err := w.e.Close()
	return err
}
// Release resets the underlying encoder with a nil destination
// and drops the reference to it.
//
// w holds no encoder after the call, so it must not be used again.
func (w *Writer) Release() {
	enc := w.e
	enc.Reset(nil)
	w.e = nil
}
// GetWriter returns a Writer for writing zstd-compressed data to w.
//
// A Writer taken from the pool for the given level is reset onto w;
// if that pool is empty, a new Writer is created via NewWriterLevel.
//
// When the writer is no longer needed, return it to the pool via PutWriter.
func GetWriter(w io.Writer, level int) *Writer {
	if pooled := getWriterPool(level).Get(); pooled != nil {
		zw := pooled.(*Writer)
		zw.e.Reset(w)
		return zw
	}
	return NewWriterLevel(w, level)
}
// PutWriter hands zw back to the pool, so a later GetWriter call can reuse it.
//
// The encoder is reset with a nil destination first; the pool is then
// selected by the level zw was created with.
func PutWriter(zw *Writer) {
	zw.e.Reset(nil)
	getWriterPool(zw.level).Put(zw)
}
// getWriterPool returns the pool of writers for the given zstd level.
//
// Pools are keyed by the encoder level obtained from zstd.EncoderLevelFromZstd.
// A pool is created and registered on the first request for its key.
// Access to the registry is serialized with writersPoolLock.
func getWriterPool(level int) *sync.Pool {
	key := zstd.EncoderLevelFromZstd(level)

	writersPoolLock.Lock()
	defer writersPoolLock.Unlock()

	if pool := writersPool[key]; pool != nil {
		return pool
	}
	pool := new(sync.Pool)
	writersPool[key] = pool
	return pool
}
var (
// writersPoolLock guards access to writersPool in getWriterPool.
writersPoolLock sync.Mutex
// writersPool maps an encoder level to the pool of writers used for that level.
writersPool = make(map[zstd.EncoderLevel]*sync.Pool)
)