Files
VictoriaMetrics/lib/writeconcurrencylimiter/concurrencylimiter.go
Alexander Frolov d07c1c73d1 lib/writeconcurrencylimiter: prevent deadlock at IncConcurrency
Previously (*writeconcurrencylimiter.Reader).Read() could permanently leak concurrency tokens from the -maxConcurrentInserts semaphore.
 
 Consider the following example:
* GetReader() acquires a token, then PutReader() unconditionally releases it.
* Read() calls DecConcurrency() before the underlying I/O and IncConcurrency() after it. If IncConcurrency() returns an error, Read() returns without holding a token.
* Each such failure permanently removes one slot from the concurrencyLimitCh semaphore. Slots leak one by one until the channel is fully drained, at which point DecConcurrency() blocks forever, deadlocking ingestion on vmstorage.

 This commit adds tracking of obtained tokens to the reader, which prevents possible token leakage.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10784
2026-04-10 19:35:59 +02:00

151 lines
4.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package writeconcurrencylimiter
import (
"errors"
"flag"
"fmt"
"io"
"net/http"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
)
var (
// maxConcurrentInserts bounds the number of insert requests processed in parallel.
// The default of 2x available CPU cores minimizes resource usage on typical setups.
maxConcurrentInserts = flag.Int("maxConcurrentInserts", 2*cgroup.AvailableCPUs(), "The maximum number of concurrent insert requests. "+
"Set higher value when clients send data over slow networks. "+
"Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage. "+
"See also -insert.maxQueueDuration")
// maxQueueDuration is how long IncConcurrency() waits for a free slot
// before failing the request with 503 Service Unavailable.
maxQueueDuration = flag.Duration("insert.maxQueueDuration", time.Minute, "The maximum duration to wait in the queue when -maxConcurrentInserts "+
"concurrent insert requests are executed")
)
// Reader is a reader, which decreases the concurrency before every Read() call
// and increases the concurrency after Read() call.
//
// It effectively limits the number of concurrent goroutines,
// which may process results returned by concurrently processed Reader structs.
//
// The Reader must be obtained via GetReader() call.
type Reader struct {
// r is the underlying reader all Read() calls are delegated to.
r io.Reader
// increasedConcurrency reports whether this Reader currently holds a token
// from the -maxConcurrentInserts semaphore. PutReader() releases the token
// only when this flag is set, which prevents over-releasing the semaphore
// when Read() fails to re-acquire a token.
increasedConcurrency bool
}
// GetReader returns the Reader for r.
//
// The returned Reader holds a concurrency token obtained from the
// -maxConcurrentInserts semaphore; an error is returned when no token
// can be obtained within -insert.maxQueueDuration.
//
// The PutReader() must be called when the returned Reader is no longer needed.
func GetReader(r io.Reader) (*Reader, error) {
	if err := IncConcurrency(); err != nil {
		return nil, err
	}
	// Reuse a pooled Reader when available; the comma-ok assertion yields
	// ok=false on the nil returned by an empty pool.
	rr, ok := readerPool.Get().(*Reader)
	if !ok {
		rr = &Reader{}
	}
	rr.r = r
	rr.increasedConcurrency = true
	return rr, nil
}
// PutReader returns the r to the pool.
//
// It releases r's concurrency token back to the -maxConcurrentInserts
// semaphore, but only if r still holds one — Read() may have already
// lost the token on a failed re-acquisition.
func PutReader(r *Reader) {
	r.r = nil
	if r.increasedConcurrency {
		r.increasedConcurrency = false
		DecConcurrency()
	}
	readerPool.Put(r)
}
// readerPool recycles Reader structs in order to reduce allocations
// on the ingestion hot path.
var readerPool sync.Pool
// Read implements io.Reader.
//
// It releases the concurrency token for the duration of the underlying read,
// so other insert requests can execute while this one waits on network I/O,
// and re-acquires the token before returning. The increasedConcurrency flag
// mirrors whether the token is held, so PutReader() does not release a token
// that was never re-acquired (this is the fix for the semaphore leak /
// deadlock described in issue 10784).
func (r *Reader) Read(p []byte) (int, error) {
DecConcurrency()
r.increasedConcurrency = false
n, err := r.r.Read(p)
// Re-acquire the token. On failure (queue timeout) return the acquisition
// error with the flag left false, so PutReader() won't over-release the
// semaphore. NOTE(review): the original read error err is discarded on
// this path in favor of errC — presumably intentional, since the request
// is being rejected anyway; confirm against issue discussion.
if errC := IncConcurrency(); errC != nil {
return n, errC
}
r.increasedConcurrency = true
if errors.Is(err, io.ErrUnexpectedEOF) {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8704
err = fmt.Errorf("%w: while reading the request body. This might be caused by a timeout on the client side. "+
"Possible solutions: to lower -insert.maxQueueDuration below the clients timeout; to increase the client-side timeout; "+
"to increase compute resources at the server; to increase -maxConcurrentInserts", err)
}
return n, err
}
// initConcurrencyLimitCh allocates the semaphore channel with capacity equal
// to -maxConcurrentInserts. It must run exactly once, via concurrencyLimitChOnce,
// after flags have been parsed.
func initConcurrencyLimitCh() {
concurrencyLimitCh = make(chan struct{}, *maxConcurrentInserts)
}
var (
// concurrencyLimitCh is the counting semaphore for -maxConcurrentInserts:
// sending acquires a token, receiving releases one.
concurrencyLimitCh chan struct{}
// concurrencyLimitChOnce guards the lazy initialization of concurrencyLimitCh.
concurrencyLimitChOnce sync.Once
)
// IncConcurrency obtains a concurrency token from -maxConcurrentInserts.
//
// It waits up to -insert.maxQueueDuration for a free slot and returns an
// ErrorWithStatusCode carrying 503 Service Unavailable on timeout.
//
// The obtained token must be returned back via DecConcurrency() call.
func IncConcurrency() error {
	concurrencyLimitChOnce.Do(initConcurrencyLimitCh)

	// Fast path: a slot is free right now.
	select {
	case concurrencyLimitCh <- struct{}{}:
		return nil
	default:
	}

	// Slow path: queue up for at most -insert.maxQueueDuration.
	concurrencyLimitReached.Inc()
	timer := timerpool.Get(*maxQueueDuration)
	defer timerpool.Put(timer)
	select {
	case concurrencyLimitCh <- struct{}{}:
		return nil
	case <-timer.C:
		concurrencyLimitTimeout.Inc()
		return &httpserver.ErrorWithStatusCode{
			Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are executed. "+
				"Possible solutions: to reduce workload; to increase compute resources at the server; "+
				"to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts",
				maxQueueDuration.Seconds(), *maxConcurrentInserts),
			StatusCode: http.StatusServiceUnavailable,
		}
	}
}
// DecConcurrency returns the token obtained via IncConcurrency(), so other goroutines could obtain it.
//
// The caller must actually hold a token: this receive blocks until one is
// present in concurrencyLimitCh, so unbalanced calls can block forever once
// the semaphore is drained.
func DecConcurrency() {
<-concurrencyLimitCh
}
var (
// concurrencyLimitReached counts requests that could not obtain a token immediately.
concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_insert_limit_reached_total`)
// concurrencyLimitTimeout counts requests that gave up after -insert.maxQueueDuration.
concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_insert_limit_timeout_total`)
// vm_concurrent_insert_capacity exports the semaphore capacity (-maxConcurrentInserts).
_ = metrics.NewGauge(`vm_concurrent_insert_capacity`, func() float64 {
concurrencyLimitChOnce.Do(initConcurrencyLimitCh)
return float64(cap(concurrencyLimitCh))
})
// vm_concurrent_insert_current exports the number of tokens currently held.
_ = metrics.NewGauge(`vm_concurrent_insert_current`, func() float64 {
concurrencyLimitChOnce.Do(initConcurrencyLimitCh)
return float64(len(concurrencyLimitCh))
})
)