diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go
index 98f5742d43..b643a50c12 100644
--- a/app/vmagent/remotewrite/client.go
+++ b/app/vmagent/remotewrite/client.go
@@ -2,6 +2,7 @@ package remotewrite
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"fmt"
 	"io"
@@ -327,17 +328,25 @@ func (c *client) runWorker() {
 			return
 		case <-c.stopCh:
 			// c must be stopped. Wait for a while in the hope the block will be sent.
-			graceDuration := 5 * time.Second
+			stopCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+			defer cancel()
+
 			select {
 			case ok := <-ch:
 				if !ok {
 					// Return unsent block to the queue.
 					c.fq.MustWriteBlockIgnoreDisabledPQ(block)
+					// The send failed, so do not try sending the remaining in-memory blocks.
+					return
 				}
-			case <-time.After(graceDuration):
+			case <-stopCtx.Done():
 				// Return unsent block to the queue.
 				c.fq.MustWriteBlockIgnoreDisabledPQ(block)
+				// The grace period is over, so there is no time left for draining the queue.
+				return
 			}
+			// The block has been sent. Spend the rest of the grace period on the remaining in-memory blocks.
+			c.drainInMemoryQueue(stopCtx, block[:0])
 			return
 		}
 	}
@@ -504,6 +513,51 @@ again:
 	goto again
 }
 
+// drainInMemoryQueue sends blocks from the in-memory part of c.fq to the remote storage
+// until the in-memory queue becomes empty, a block fails to be sent or stopCtx is done.
+//
+// The block that could not be delivered is returned to c.fq.
+// block is used as a scratch buffer for reading blocks from the queue.
+//
+// c.stopCh is expected to be closed at this point, so sendBlock shouldn't perform retries.
+func (c *client) drainInMemoryQueue(stopCtx context.Context, block []byte) {
+	var ok bool
+	ch := make(chan bool, 1)
+	for {
+		if stopCtx.Err() != nil {
+			// The grace period is over.
+			return
+		}
+
+		block, ok = c.fq.MustReadInMemoryBlock(block[:0])
+		if !ok {
+			// The in-memory queue has already been drained,
+			// or the file-based queue is in use, in which case
+			// the in-memory queue is guaranteed to be empty.
+			return
+		}
+
+		// Send the block in a separate goroutine, so that a slow request cannot
+		// delay the shutdown beyond the deadline of stopCtx.
+		// ch is buffered, so the goroutine can finish even if nobody reads the result.
+		go func() {
+			ch <- c.sendBlock(block)
+		}()
+		select {
+		case sent := <-ch:
+			if !sent {
+				// Return unsent block to the queue.
+				c.fq.MustWriteBlockIgnoreDisabledPQ(block)
+				return
+			}
+		case <-stopCtx.Done():
+			// Return unsent block to the queue.
+			c.fq.MustWriteBlockIgnoreDisabledPQ(block)
+			return
+		}
+	}
+}
+
 var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second)
 var remoteWriteRetryLogger = logger.WithThrottler("remoteWriteRetry", 5*time.Second)
 
diff --git a/docs/victoriametrics/changelog/CHANGELOG.md b/docs/victoriametrics/changelog/CHANGELOG.md
index 2139f5155f..ec18164da0 100644
--- a/docs/victoriametrics/changelog/CHANGELOG.md
+++ b/docs/victoriametrics/changelog/CHANGELOG.md
@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
 
 ## tip
 
+* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): drain in-memory remote write queue on shutdown within the 5-second grace period before falling back to persisting blocks to disk. See [#9996](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9996).
+
 * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. Thanks to @alexei38 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918).
 * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): extend delay on aggregation windows flush by the biggest lag among pushed samples. Before, the delay was calculated as 95th percentile across samples, which could underrepresent outliers and reject them from aggregation as "too old". See [#10402](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10402).
 
diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go
index f2d2163132..e829bb6a4e 100644
--- a/lib/persistentqueue/fastqueue.go
+++ b/lib/persistentqueue/fastqueue.go
@@ -238,15 +238,7 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
 			return dst, false
 		}
 		if len(fq.ch) > 0 {
-			if n := fq.pq.GetPendingBytes(); n > 0 {
-				logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is non-empty; it contains %d pending bytes", n)
-			}
-			bb := <-fq.ch
-			fq.pendingInmemoryBytes -= uint64(len(bb.B))
-			fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
-			dst = append(dst, bb.B...)
-			blockBufPool.Put(bb)
-			return dst, true
+			return fq.mustReadInMemoryBlockLocked(dst)
 		}
 		if n := fq.pq.GetPendingBytes(); n > 0 {
 			data, ok := fq.pq.MustReadBlockNonblocking(dst)
@@ -265,6 +257,35 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
 	}
 }
 
+// MustReadInMemoryBlock appends the next block from the in-memory queue to dst and returns the result.
+//
+// It returns dst unchanged together with false if the in-memory queue is empty.
+// Unlike MustReadBlock, it never reads blocks from the file-based queue.
+func (fq *FastQueue) MustReadInMemoryBlock(dst []byte) ([]byte, bool) {
+	fq.mu.Lock()
+	defer fq.mu.Unlock()
+
+	return fq.mustReadInMemoryBlockLocked(dst)
+}
+
+// mustReadInMemoryBlockLocked pops a block from fq.ch, appends it to dst and returns the result.
+// It returns dst unchanged together with false if fq.ch is empty.
+// The caller must hold fq.mu.
+func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) ([]byte, bool) {
+	if len(fq.ch) == 0 {
+		return dst, false
+	}
+	if n := fq.pq.GetPendingBytes(); n > 0 {
+		logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is non-empty; it contains %d pending bytes", n)
+	}
+	bb := <-fq.ch
+	fq.pendingInmemoryBytes -= uint64(len(bb.B))
+	fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
+	dst = append(dst, bb.B...)
+	blockBufPool.Put(bb)
+	return dst, true
+}
+
 // Dirname returns the directory name for persistent queue.
 func (fq *FastQueue) Dirname() string {
 	return filepath.Base(fq.pq.dir)