app/vmagent: attempt to send in memory blocks to rw during shutdown

vmagent would try to flush in-memory blocks to rw for 5 seconds only
after falling back and store them to the persisted queue

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9996
This commit is contained in:
Max Kotliar
2026-05-08 23:49:22 +03:00
parent 17c95e59e3
commit 7214ca1a8b
3 changed files with 57 additions and 11 deletions

View File

@@ -2,6 +2,7 @@ package remotewrite
import (
"bytes"
"context"
"errors"
"fmt"
"io"
@@ -327,17 +328,20 @@ func (c *client) runWorker() {
return
case <-c.stopCh:
// c must be stopped. Wait for a while in the hope the block will be sent.
graceDuration := 5 * time.Second
stopCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
select {
case ok := <-ch:
if !ok {
// Return unsent block to the queue.
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
}
case <-time.After(graceDuration):
case <-stopCtx.Done():
// Return unsent block to the queue.
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
}
c.drainInMemoryQueue(stopCtx, block[:0])
return
}
}
@@ -504,6 +508,32 @@ again:
goto again
}
func (c *client) drainInMemoryQueue(stopCtx context.Context, block []byte) {
var ok bool
for {
select {
case <-stopCtx.Done():
return
default:
}
block, ok = c.fq.MustReadInMemoryBlock(block[:0])
if !ok {
// The in memory queue has already been drained,
// or persisted queue is being used.
// In this case it is guaranteed that fq will be empty
return
}
// at this stage c.stopCh should be closed
// so sendBlock function should not perform retries
if ok := c.sendBlock(block); !ok {
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
return
}
}
}
var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second)
var remoteWriteRetryLogger = logger.WithThrottler("remoteWriteRetry", 5*time.Second)

View File

@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): drain in-memory remote write queue on shutdown within the 5-second grace period before falling back to persisting blocks to disk. See [#9996](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9996)
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. Thanks to @alexei38 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): extend delay on aggregation windows flush by the biggest lag among pushed samples. Before, the delay was calculated as 95th percentile across samples, which could underrepresent outliers and reject them from aggregation as "too old". See [#10402](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10402).

View File

@@ -238,15 +238,7 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
return dst, false
}
if len(fq.ch) > 0 {
if n := fq.pq.GetPendingBytes(); n > 0 {
logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is non-empty; it contains %d pending bytes", n)
}
bb := <-fq.ch
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
dst = append(dst, bb.B...)
blockBufPool.Put(bb)
return dst, true
return fq.mustReadInMemoryBlockLocked(dst)
}
if n := fq.pq.GetPendingBytes(); n > 0 {
data, ok := fq.pq.MustReadBlockNonblocking(dst)
@@ -265,6 +257,28 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
}
}
func (fq *FastQueue) MustReadInMemoryBlock(dst []byte) ([]byte, bool) {
fq.mu.Lock()
defer fq.mu.Unlock()
return fq.mustReadInMemoryBlockLocked(dst)
}
func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) ([]byte, bool) {
if len(fq.ch) == 0 {
return dst, false
}
if n := fq.pq.GetPendingBytes(); n > 0 {
logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is non-empty; it contains %d pending bytes", n)
}
bb := <-fq.ch
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
dst = append(dst, bb.B...)
blockBufPool.Put(bb)
return dst, true
}
// Dirname returns the directory name for persistent queue.
func (fq *FastQueue) Dirname() string {
return filepath.Base(fq.pq.dir)