Compare commits

...

4 Commits

Author SHA1 Message Date
Max Kotliar
966eecc7cf wip 2025-07-03 21:36:48 +03:00
Max Kotliar
5d476dfcab wip 2025-07-03 19:42:54 +03:00
Max Kotliar
0f0bd28510 lib/netutil: connection pool based on sync.Cond synchronization 2025-07-03 18:49:24 +03:00
Max Kotliar
00b3189646 lib/netutil: try get conn from pool on dial or handshake error
Dial and handshake take some time, during which a connection may appear
in the pool. So, if we fail to establish a new connection, we can try our luck
getting one from the pool; otherwise, return the error.

Follow up on
https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8922 and attempt
to partially address
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9345
2025-07-03 15:35:07 +03:00
2 changed files with 57 additions and 34 deletions

View File

@@ -29,6 +29,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and [vmselect](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add ability to proxy `/api/v1/notifiers` to vmalert when `-vmalert.proxyURL` is set. See [9267](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9267) PR for details.
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `vm_cache_eviction_bytes_total` counter metrics to reflect cache evictions due to expiration, misses and cache size. See [9293](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9293) PR for details. Thanks to @BenNF.
* FEATURE: all the [VictoriaMetrics Enterprise](https://docs.victoriametrics.com/enterprise.html) components: improve error message when an empty license is provided via the `-license` or `-licenseFile` command-line flags. See [#9337](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9337) for the details.
* FEATURE: [vmselect](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): attempt to reuse an existing connection from the pool on dial or handshake failure when creating a new connection. The change should reduce transient handshake or dial errors between `vmselect` and `vmstorage`. See [#9345](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9345).
* BUGFIX: [dashboards/vmagent](https://grafana.com/grafana/dashboards/12683) and [dashboards/vmalert](https://grafana.com/grafana/dashboards/14950): fix ad-hoc filters auto-complete and filtering on panels that use MetricsQL specific expressions. See [#8657](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8657).
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): automatically retry requests failing with `Expired Token` errors. This helps to avoid failed backups when using [EKS Pod Identity](https://docs.aws.amazon.com/eks/latest/userguide/pod-id-how-it-works.html) for authentication. See [#9280](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9280).

View File

@@ -16,6 +16,8 @@ type ConnPool struct {
mu sync.Mutex
d *TCPDialer
cond sync.Cond
// concurrentDialsCh limits the number of concurrent dials the ConnPool can make.
// This should prevent creating an excessive number of connections during temporary
// spikes in workload at vmselect and vmstorage nodes.
@@ -29,6 +31,7 @@ type ConnPool struct {
conns []connWithTimestamp
isStopped bool
stopCh chan struct{}
// lastDialError contains the last error seen when dialing remote addr.
// When it is non-nil and conns is empty, then ConnPool.Get() returns this error.
@@ -70,7 +73,11 @@ func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Fun
name: name,
handshakeFunc: handshakeFunc,
compressionLevel: compressionLevel,
stopCh: make(chan struct{}),
}
cp.cond.L = &cp.mu
cp.checkAvailability(true)
_ = ms.NewGauge(fmt.Sprintf(`vm_tcpdialer_conns_idle{name=%q, addr=%q}`, name, addr), func() float64 {
cp.mu.Lock()
@@ -101,6 +108,7 @@ func (cp *ConnPool) MustStop() {
cp.mu.Lock()
isStopped := cp.isStopped
cp.isStopped = true
close(cp.stopCh)
for _, c := range cp.conns {
_ = c.bc.Close()
}
@@ -134,41 +142,57 @@ func (cp *ConnPool) Addr() string {
// Get returns free connection from the pool.
func (cp *ConnPool) Get() (*handshake.BufferedConn, error) {
bc, err := cp.tryGetConn()
if err != nil {
return nil, err
}
if bc != nil {
// Fast path - obtained the connection from pool.
return bc, nil
}
return cp.getConnSlow()
}
cp.cond.L.Lock()
defer cp.cond.L.Unlock()
func (cp *ConnPool) getConnSlow() (*handshake.BufferedConn, error) {
for {
select {
// Limit the number of concurrent dials.
// This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552
case cp.concurrentDialsCh <- struct{}{}:
// Create new connection.
conn, err := cp.dialAndHandshake()
<-cp.concurrentDialsCh
return conn, err
default:
// Make attempt to get already established connections from the pool.
// It may appear there while waiting for cp.concurrentDialsCh.
bc, err := cp.tryGetConn()
if err != nil {
return nil, err
}
if bc == nil {
time.Sleep(100 * time.Millisecond)
continue
}
maxAttempts := 5
for i := 0; i < maxAttempts; i++ {
bc, err := cp.tryGetConn()
if err != nil {
return nil, err
}
if bc != nil {
// Fast path - obtained the connection from pool.
return bc, nil
}
// Slow path - no free connections in the pool.
go func() {
// notify waiting goroutines about the new connection
// notify even if err != nil, so that they can unblock and try to get a connection again
defer cp.cond.Signal()
select {
// Limit the number of concurrent dials.
// This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552
case cp.concurrentDialsCh <- struct{}{}:
// Create new connection.
bc, err := cp.dialAndHandshake()
<-cp.concurrentDialsCh
if err != nil {
return
}
cp.mu.Lock()
defer cp.mu.Unlock()
if cp.isStopped {
_ = bc.Close()
return
}
cp.conns = append(cp.conns, connWithTimestamp{
bc: bc,
lastActiveTime: fasttime.UnixTimestamp(),
})
case <-cp.stopCh:
return
}
}()
cp.cond.Wait()
}
return nil, fmt.Errorf("cannot get connection from pool %s: no free connections after %d attempts", cp.name, maxAttempts)
}
func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
@@ -194,9 +218,6 @@ func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
}
func (cp *ConnPool) tryGetConn() (*handshake.BufferedConn, error) {
cp.mu.Lock()
defer cp.mu.Unlock()
if cp.isStopped {
return nil, fmt.Errorf("conn pool to %s cannot be used, since it is stopped", cp.d.addr)
}
@@ -228,6 +249,7 @@ func (cp *ConnPool) Put(bc *handshake.BufferedConn) {
bc: bc,
lastActiveTime: fasttime.UnixTimestamp(),
})
cp.cond.Signal()
}
cp.mu.Unlock()
}