mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-02 08:32:46 +03:00
Compare commits
12 Commits
shared-vms
...
v1-143-0-d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f884ffb1e3 | ||
|
|
85a5429a2a | ||
|
|
bd836031ca | ||
|
|
0d3bd2d4e6 | ||
|
|
1fe747cf35 | ||
|
|
572f48c110 | ||
|
|
78883a3f98 | ||
|
|
96368dfcfc | ||
|
|
a0220db978 | ||
|
|
ef5e42cfa0 | ||
|
|
1730051e27 | ||
|
|
74ae5b2f0d |
@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
## tip
|
||||
|
||||
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix intermittent `write: connection timed out` errors caused by silently dropped TCP connections being reused from the connection pool. See [#10735-comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10735#issuecomment-4535832301).
|
||||
|
||||
## [v1.144.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.144.0)
|
||||
|
||||
Released at 2026-05-22
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package netutil
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -40,6 +42,7 @@ type ConnPool struct {
|
||||
// connWithTimestamp is a single idle entry stored in ConnPool.conns.
type connWithTimestamp struct {
|
||||
// bc is the pooled connection handed back to callers.
bc *handshake.BufferedConn
|
||||
// lastActiveTime is the fasttime.UnixTimestamp() value recorded when bc was put into the pool.
lastActiveTime uint64
|
||||
// watchCh receives a single value from the watchConn goroutine started for bc:
// true if the connection is dead, false if the watcher was stopped by a read deadline.
watchCh chan bool
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -202,20 +205,27 @@ func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
|
||||
}
|
||||
|
||||
func (cp *ConnPool) tryGetConn() (*handshake.BufferedConn, error) {
|
||||
cp.mu.Lock()
|
||||
defer cp.mu.Unlock()
|
||||
for {
|
||||
cp.mu.Lock()
|
||||
if cp.isStopped {
|
||||
cp.mu.Unlock()
|
||||
return nil, fmt.Errorf("conn pool to %s cannot be used, since it is stopped", cp.d.addr)
|
||||
}
|
||||
if len(cp.conns) == 0 {
|
||||
err := cp.lastDialError
|
||||
cp.mu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
c := cp.conns[len(cp.conns)-1]
|
||||
cp.conns = cp.conns[:len(cp.conns)-1]
|
||||
cp.mu.Unlock()
|
||||
|
||||
if cp.isStopped {
|
||||
return nil, fmt.Errorf("conn pool to %s cannot be used, since it is stopped", cp.d.addr)
|
||||
bc := c.stopWatcher()
|
||||
if bc == nil {
|
||||
continue
|
||||
}
|
||||
return bc, nil
|
||||
}
|
||||
if len(cp.conns) == 0 {
|
||||
return nil, cp.lastDialError
|
||||
}
|
||||
c := cp.conns[len(cp.conns)-1]
|
||||
bc := c.bc
|
||||
c.bc = nil
|
||||
cp.conns = cp.conns[:len(cp.conns)-1]
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
// Put puts bc back to the pool.
|
||||
@@ -228,6 +238,8 @@ func (cp *ConnPool) Put(bc *handshake.BufferedConn) {
|
||||
_ = bc.Close()
|
||||
return
|
||||
}
|
||||
watchCh := make(chan bool, 1)
|
||||
go watchConn(bc, watchCh)
|
||||
cp.mu.Lock()
|
||||
if cp.isStopped {
|
||||
_ = bc.Close()
|
||||
@@ -235,6 +247,7 @@ func (cp *ConnPool) Put(bc *handshake.BufferedConn) {
|
||||
cp.conns = append(cp.conns, connWithTimestamp{
|
||||
bc: bc,
|
||||
lastActiveTime: fasttime.UnixTimestamp(),
|
||||
watchCh: watchCh,
|
||||
})
|
||||
}
|
||||
cp.mu.Unlock()
|
||||
@@ -325,3 +338,41 @@ func forEachConnPool(f func(cp *ConnPool)) {
|
||||
wg.Wait()
|
||||
connPoolsMu.Unlock()
|
||||
}
|
||||
|
||||
// watchConn blocks on Read until the connection is closed or enters an error state.
|
||||
// It sends true to watchCh if the connection is dead, false if it was stopped by a deadline.
|
||||
func watchConn(bc *handshake.BufferedConn, watchCh chan<- bool) {
|
||||
var buf [1]byte
|
||||
_, err := bc.Conn.Read(buf[:])
|
||||
watchCh <- !errors.Is(err, os.ErrDeadlineExceeded)
|
||||
}
|
||||
|
||||
// stopWatcher stops the watchConn goroutine and returns the connection if it is still alive.
|
||||
// Returns nil if the connection is dead.
|
||||
func (c *connWithTimestamp) stopWatcher() *handshake.BufferedConn {
|
||||
// Fast path: goroutine already reported the connection state.
|
||||
select {
|
||||
case dead := <-c.watchCh:
|
||||
if dead {
|
||||
_ = c.bc.Close()
|
||||
return nil
|
||||
}
|
||||
default:
|
||||
}
|
||||
|
||||
// Stop the goroutine by setting a deadline in the past, then wait for its response.
|
||||
if err := c.bc.Conn.SetReadDeadline(time.Now()); err != nil {
|
||||
_ = c.bc.Close()
|
||||
return nil
|
||||
}
|
||||
dead := <-c.watchCh
|
||||
if err := c.bc.Conn.SetReadDeadline(time.Time{}); err != nil {
|
||||
_ = c.bc.Close()
|
||||
return nil
|
||||
}
|
||||
if dead {
|
||||
_ = c.bc.Close()
|
||||
return nil
|
||||
}
|
||||
return c.bc
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user