lib/encoding/zstd: use sync stream decoder (purego) (#9518)

### Describe Your Changes

By default `zstd.Reader` creates multiple goroutines to process a single
connection:
- It doesn't match cgo behavior, which works synchronously, and creates
a lot more concurrent goroutines (0.5k -> 5k on my workload)
- It results in non-zero `vm_tcpdialer_errors_total{type="read"}` errors
on vmselect because an underlying connection is closed while a goroutine
is still reading from it. The goroutine created by
`zstd.NewReader`/`zstd.Reset`

abb348e4db/lib/handshake/buffered_conn.go (L113-L120)
- vmselect (and vmagent) doesn't benefit from async mode since it has
multiple readers in-use at the same time, which usually exceeds the
number of cpu cores

Partly related to #9218

### Checklist

The following checks are **mandatory**:

- [x] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [x] My change adheres to [VictoriaMetrics development
goals](https://docs.victoriametrics.com/victoriametrics/goals/).
This commit is contained in:
Alexander Frolov
2025-07-31 17:01:09 +02:00
committed by Max Kotliar
parent 8a7045e206
commit d0045ba51d

View File

@@ -18,7 +18,7 @@ type Reader struct {
// NewReader returns zstd reader for the given r.
func NewReader(r io.Reader) *Reader {
d, err := zstd.NewReader(r)
d, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(1))
if err != nil {
logger.Panicf("BUG: unexpected error returned when creating ZSTD reader: %s", err)
}