mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Compare commits
4 Commits
query-debu
...
cond-based
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
966eecc7cf | ||
|
|
5d476dfcab | ||
|
|
0f0bd28510 | ||
|
|
00b3189646 |
@@ -29,6 +29,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and [vmselect](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add ability to proxy `/api/v1/notifiers` to vmalert when `-vmalert.proxyURL` is set. See [9267](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9267) PR for details.
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `vm_cache_eviction_bytes_total` counter metrics to reflect cache evictions due to expiration, misses and cache size. See [9293](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9293) PR for details. Thanks to @BenNF.
|
||||
* FEATURE: all the [VictoriaMetrics Enterprise](https://docs.victoriametrics.com/enterprise.html) components: improve error message when an empty license is provided via the `-license` or `-licenseFile` command-line flags. See [#9337](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9337) for the details.
|
||||
* FEATURE: [vmselect](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): attempt to reuse an existing connection from the pool on dial or handshake failure when creating a new connection. The change should reduce transient handshake or dial errors between `vmselect` and `vmstorage`. See [#9345](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9345).
|
||||
|
||||
* BUGFIX: [dashboards/vmagent](https://grafana.com/grafana/dashboards/12683) and [dashboards/vmalert](https://grafana.com/grafana/dashboards/14950): fix ad-hoc filters auto-complete and filtering on panels that use MetricsQL specific expressions. See [#8657](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8657).
|
||||
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): automatically retry requests failing with `Expired Token` errors. This helps to avoid failed backups when using [EKS Pod Identity](https://docs.aws.amazon.com/eks/latest/userguide/pod-id-how-it-works.html) for authentication. See [#9280](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9280).
|
||||
|
||||
@@ -16,6 +16,8 @@ type ConnPool struct {
|
||||
mu sync.Mutex
|
||||
d *TCPDialer
|
||||
|
||||
cond sync.Cond
|
||||
|
||||
// concurrentDialsCh limits the number of concurrent dials the ConnPool can make.
|
||||
// This should prevent creating an excess number of connections during temporary
|
||||
// spikes in workload at vmselect and vmstorage nodes.
|
||||
@@ -29,6 +31,7 @@ type ConnPool struct {
|
||||
conns []connWithTimestamp
|
||||
|
||||
isStopped bool
|
||||
stopCh chan struct{}
|
||||
|
||||
// lastDialError contains the last error seen when dialing remote addr.
|
||||
// When it is non-nil and conns is empty, then ConnPool.Get() returns this error.
|
||||
@@ -70,7 +73,11 @@ func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Fun
|
||||
name: name,
|
||||
handshakeFunc: handshakeFunc,
|
||||
compressionLevel: compressionLevel,
|
||||
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
cp.cond.L = &cp.mu
|
||||
|
||||
cp.checkAvailability(true)
|
||||
_ = ms.NewGauge(fmt.Sprintf(`vm_tcpdialer_conns_idle{name=%q, addr=%q}`, name, addr), func() float64 {
|
||||
cp.mu.Lock()
|
||||
@@ -101,6 +108,7 @@ func (cp *ConnPool) MustStop() {
|
||||
cp.mu.Lock()
|
||||
isStopped := cp.isStopped
|
||||
cp.isStopped = true
|
||||
close(cp.stopCh)
|
||||
for _, c := range cp.conns {
|
||||
_ = c.bc.Close()
|
||||
}
|
||||
@@ -134,41 +142,57 @@ func (cp *ConnPool) Addr() string {
|
||||
|
||||
// Get returns free connection from the pool.
|
||||
func (cp *ConnPool) Get() (*handshake.BufferedConn, error) {
|
||||
bc, err := cp.tryGetConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if bc != nil {
|
||||
// Fast path - obtained the connection from pool.
|
||||
return bc, nil
|
||||
}
|
||||
return cp.getConnSlow()
|
||||
}
|
||||
cp.cond.L.Lock()
|
||||
defer cp.cond.L.Unlock()
|
||||
|
||||
func (cp *ConnPool) getConnSlow() (*handshake.BufferedConn, error) {
|
||||
for {
|
||||
select {
|
||||
// Limit the number of concurrent dials.
|
||||
// This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552
|
||||
case cp.concurrentDialsCh <- struct{}{}:
|
||||
// Create new connection.
|
||||
conn, err := cp.dialAndHandshake()
|
||||
<-cp.concurrentDialsCh
|
||||
return conn, err
|
||||
default:
|
||||
// Make attempt to get already established connections from the pool.
|
||||
// It may appear there while waiting for cp.concurrentDialsCh.
|
||||
bc, err := cp.tryGetConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if bc == nil {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
maxAttempts := 5
|
||||
for i := 0; i < maxAttempts; i++ {
|
||||
bc, err := cp.tryGetConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if bc != nil {
|
||||
// Fast path - obtained the connection from pool.
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
// Slow path - no free connections in the pool.
|
||||
go func() {
|
||||
// notify waiting goroutines about the new connection
|
||||
// notify even if err != nil, so that they can unblock and try to get a connection again
|
||||
defer cp.cond.Signal()
|
||||
|
||||
select {
|
||||
// Limit the number of concurrent dials.
|
||||
// This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552
|
||||
case cp.concurrentDialsCh <- struct{}{}:
|
||||
// Create new connection.
|
||||
bc, err := cp.dialAndHandshake()
|
||||
<-cp.concurrentDialsCh
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
cp.mu.Lock()
|
||||
defer cp.mu.Unlock()
|
||||
if cp.isStopped {
|
||||
_ = bc.Close()
|
||||
return
|
||||
}
|
||||
cp.conns = append(cp.conns, connWithTimestamp{
|
||||
bc: bc,
|
||||
lastActiveTime: fasttime.UnixTimestamp(),
|
||||
})
|
||||
case <-cp.stopCh:
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
cp.cond.Wait()
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("cannot get connection from pool %s: no free connections after %d attempts", cp.name, maxAttempts)
|
||||
}
|
||||
|
||||
func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
|
||||
@@ -194,9 +218,6 @@ func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
|
||||
}
|
||||
|
||||
func (cp *ConnPool) tryGetConn() (*handshake.BufferedConn, error) {
|
||||
cp.mu.Lock()
|
||||
defer cp.mu.Unlock()
|
||||
|
||||
if cp.isStopped {
|
||||
return nil, fmt.Errorf("conn pool to %s cannot be used, since it is stopped", cp.d.addr)
|
||||
}
|
||||
@@ -228,6 +249,7 @@ func (cp *ConnPool) Put(bc *handshake.BufferedConn) {
|
||||
bc: bc,
|
||||
lastActiveTime: fasttime.UnixTimestamp(),
|
||||
})
|
||||
cp.cond.Signal()
|
||||
}
|
||||
cp.mu.Unlock()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user