mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
lib/writeconcurrencylimiter: prevent deadlock at IncConcurrency
Previously (*writeconcurrencylimiter.Reader).Read() could permanently leak concurrency tokens from the -maxConcurrentInserts semaphore. Consider the following example: * GetReader() acquires a token, then PutReader() unconditionally releases it. * Read() calls DecConcurrency() before the underlying I/O and IncConcurrency() after it. If IncConcurrency() returns an error, Read() returns without holding a token. * Each such failure permanently removes one slot from the concurrencyLimitCh semaphore. Slots leak one by one until the channel is fully drained, at which point DecConcurrency() blocks forever, deadlocking ingestion on vmstorage. This commit adds tracking for obtained tokens to the reader. Which prevents possible tokens leakage. Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10784
This commit is contained in:
@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
## tip
|
||||
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): prevent data ingestion from getting completely stuck when storage is under heavy load. See [#10784](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10784). Thanks to @fxrlv for the contribution.
|
||||
|
||||
## [v1.140.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.140.0)
|
||||
|
||||
Released at 2026-04-10
|
||||
|
||||
@@ -32,7 +32,8 @@ var (
|
||||
//
|
||||
// The Reader must be obtained via GetReader() call.
|
||||
type Reader struct {
|
||||
r io.Reader
|
||||
r io.Reader
|
||||
increasedConcurrency bool
|
||||
}
|
||||
|
||||
// GetReader returns the Reader for r.
|
||||
@@ -49,6 +50,7 @@ func GetReader(r io.Reader) (*Reader, error) {
|
||||
}
|
||||
rr := v.(*Reader)
|
||||
rr.r = r
|
||||
rr.increasedConcurrency = true
|
||||
|
||||
return rr, nil
|
||||
}
|
||||
@@ -58,9 +60,11 @@ func GetReader(r io.Reader) (*Reader, error) {
|
||||
// It decreases the concurrency.
|
||||
func PutReader(r *Reader) {
|
||||
r.r = nil
|
||||
if r.increasedConcurrency {
|
||||
DecConcurrency()
|
||||
r.increasedConcurrency = false
|
||||
}
|
||||
readerPool.Put(r)
|
||||
|
||||
DecConcurrency()
|
||||
}
|
||||
|
||||
var readerPool sync.Pool
|
||||
@@ -68,12 +72,14 @@ var readerPool sync.Pool
|
||||
// Read implements io.Reader.
|
||||
func (r *Reader) Read(p []byte) (int, error) {
|
||||
DecConcurrency()
|
||||
r.increasedConcurrency = false
|
||||
|
||||
n, err := r.r.Read(p)
|
||||
|
||||
if errC := IncConcurrency(); errC != nil {
|
||||
return n, errC
|
||||
}
|
||||
r.increasedConcurrency = true
|
||||
|
||||
if errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8704
|
||||
|
||||
Reference in New Issue
Block a user