Compare commits

...

12 Commits

Author SHA1 Message Date
Max Kotliar
f884ffb1e3 lib/netutil: use watch gourouting appraoch
This one actually worked. Unfortunatly, isConnAlive before get does not
work
2026-05-26 18:50:18 +03:00
Max Kotliar
85a5429a2a fix changelog 2026-05-26 17:14:27 +03:00
Max Kotliar
bd836031ca check ErrDeadlineExceeded error 2026-05-26 17:00:37 +03:00
Max Kotliar
0d3bd2d4e6 Update docs/victoriametrics/changelog/CHANGELOG.md
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
Signed-off-by: Max Kotliar <kotlyar.maksim@gmail.com>
2026-05-26 16:52:19 +03:00
Max Kotliar
1fe747cf35 remove debug code 2026-05-26 15:48:21 +03:00
Max Kotliar
572f48c110 remove debug code 2026-05-26 15:24:40 +03:00
Max Kotliar
78883a3f98 remove debug code 2026-05-26 15:23:14 +03:00
Max Kotliar
96368dfcfc lib/netutil: watch idle conns proactively
lib/netutil: check pooled conn liveness before reuse

Probe each connection with a non-blocking read before returning it to a
caller. Connections silently dropped at the network layer (no RST/FIN)
stay alive from the OS perspective until TCP_USER_TIMEOUT fires. This
causes the next write to fail with ETIMEDOUT immediately, producing
user-visible query/insert errors.

```
{"ts":"2026-03-18T12:13:37.303Z","level":"warn","caller":"VictoriaMetrics/app/vmselect/main.go:357","msg":"remoteAddr:
\"10.42.156.237:54634\"; requestURI:
/select/0/prometheus/api/v1/query?query=X&time=1773836017.264876; error
when executing query="X" for (time=1773835987281, step=300000): cannot
execute \"X\": cannot evaluateX\": error occured during search: cannot
fetch query results from vmstorage nodes: cannot perform search on
vmstorage vmstorage-vmc-metrics-1.vmstorage-vmc-metrics.metrics:8401:
cannot execute funcName=\"search_v7\" on vmstorage
\"10.42.154.239:8401\": cannot flush requestData to conn: write tcp4
10.42.104.242:52460->10.42.154.239:8401: write: connection timed out"}
```

The probe sets ReadDeadline=now and reads 1 byte from the underlying TCP
conn. A timeout error means the conn is alive; any other result
(ETIMEDOUT, EOF, unexpected data) means it is dead and is discarded.

The comment
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10735#issuecomment-4535832301
described one user case where it happened.

The change would be useful by users with unstable network
infrastructure. Which could broken established connections that sits in
the pool. Check the logs for write: connection timedout` errors or
execute query:

```
sum(increase(vm_request_errors_total{action="search",type="rpcClient",name="vmselect",job="vmselect-aggregated"}[1m]))
by (job)
/
sum(increase(vm_requests_total{action="search",type="rpcClient",name="vmselect",job="vmselect-aggregated"}[1m]))
by (job)
*
100
```
2026-05-26 15:17:58 +03:00
Max Kotliar
a0220db978 Merge remote-tracking branch 'opensource/cluster' into v1-143-0-debug-rpc-write-timeout 2026-05-25 19:20:14 +03:00
Max Kotliar
ef5e42cfa0 add debug flush log message 2026-05-20 19:42:11 +03:00
Max Kotliar
1730051e27 cleanup pool completly 2026-05-20 18:46:51 +03:00
Max Kotliar
74ae5b2f0d cleanup pool completly 2026-05-20 18:04:54 +03:00
2 changed files with 65 additions and 12 deletions

View File

@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix intermittent `write: connection timed out` errors caused by silently dropped TCP connections being reused from the connection pool. See [#10735-comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10735#issuecomment-4535832301).
## [v1.144.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.144.0)
Released at 2026-05-22

View File

@@ -1,7 +1,9 @@
package netutil
import (
"errors"
"fmt"
"os"
"sync"
"time"
@@ -40,6 +42,7 @@ type ConnPool struct {
type connWithTimestamp struct {
bc *handshake.BufferedConn
lastActiveTime uint64
watchCh chan bool
}
var (
@@ -202,20 +205,27 @@ func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
}
func (cp *ConnPool) tryGetConn() (*handshake.BufferedConn, error) {
cp.mu.Lock()
defer cp.mu.Unlock()
for {
cp.mu.Lock()
if cp.isStopped {
cp.mu.Unlock()
return nil, fmt.Errorf("conn pool to %s cannot be used, since it is stopped", cp.d.addr)
}
if len(cp.conns) == 0 {
err := cp.lastDialError
cp.mu.Unlock()
return nil, err
}
c := cp.conns[len(cp.conns)-1]
cp.conns = cp.conns[:len(cp.conns)-1]
cp.mu.Unlock()
if cp.isStopped {
return nil, fmt.Errorf("conn pool to %s cannot be used, since it is stopped", cp.d.addr)
bc := c.stopWatcher()
if bc == nil {
continue
}
return bc, nil
}
if len(cp.conns) == 0 {
return nil, cp.lastDialError
}
c := cp.conns[len(cp.conns)-1]
bc := c.bc
c.bc = nil
cp.conns = cp.conns[:len(cp.conns)-1]
return bc, nil
}
// Put puts bc back to the pool.
@@ -228,6 +238,8 @@ func (cp *ConnPool) Put(bc *handshake.BufferedConn) {
_ = bc.Close()
return
}
watchCh := make(chan bool, 1)
go watchConn(bc, watchCh)
cp.mu.Lock()
if cp.isStopped {
_ = bc.Close()
@@ -235,6 +247,7 @@ func (cp *ConnPool) Put(bc *handshake.BufferedConn) {
cp.conns = append(cp.conns, connWithTimestamp{
bc: bc,
lastActiveTime: fasttime.UnixTimestamp(),
watchCh: watchCh,
})
}
cp.mu.Unlock()
@@ -325,3 +338,41 @@ func forEachConnPool(f func(cp *ConnPool)) {
wg.Wait()
connPoolsMu.Unlock()
}
// watchConn blocks on Read until the connection is closed or enters an error state.
// It sends true to watchCh if the connection is dead, false if it was stopped by a deadline.
func watchConn(bc *handshake.BufferedConn, watchCh chan<- bool) {
var buf [1]byte
_, err := bc.Conn.Read(buf[:])
watchCh <- !errors.Is(err, os.ErrDeadlineExceeded)
}
// stopWatcher stops the watchConn goroutine and returns the connection if it is still alive.
// Returns nil if the connection is dead.
func (c *connWithTimestamp) stopWatcher() *handshake.BufferedConn {
// Fast path: goroutine already reported the connection state.
select {
case dead := <-c.watchCh:
if dead {
_ = c.bc.Close()
return nil
}
default:
}
// Stop the goroutine by setting a deadline in the past, then wait for its response.
if err := c.bc.Conn.SetReadDeadline(time.Now()); err != nil {
_ = c.bc.Close()
return nil
}
dead := <-c.watchCh
if err := c.bc.Conn.SetReadDeadline(time.Time{}); err != nil {
_ = c.bc.Close()
return nil
}
if dead {
_ = c.bc.Close()
return nil
}
return c.bc
}