From 7f896dc907e4b871fff0853e3b6f7f145c7ef8cb Mon Sep 17 00:00:00 2001 From: Max Kotliar Date: Wed, 13 May 2026 14:34:12 +0300 Subject: [PATCH] address review comment --- lib/persistentqueue/fastqueue.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index e829bb6a4e..bc534be716 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -238,7 +238,7 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) { return dst, false } if len(fq.ch) > 0 { - return fq.mustReadInMemoryBlockLocked(dst) + return fq.mustReadInMemoryBlockLocked(dst), true } if n := fq.pq.GetPendingBytes(); n > 0 { data, ok := fq.pq.MustReadBlockNonblocking(dst) @@ -261,22 +261,26 @@ func (fq *FastQueue) MustReadInMemoryBlock(dst []byte) ([]byte, bool) { fq.mu.Lock() defer fq.mu.Unlock() - return fq.mustReadInMemoryBlockLocked(dst) + if len(fq.ch) > 0 { + return fq.mustReadInMemoryBlockLocked(dst), true + } + + return nil, false } -func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) ([]byte, bool) { +func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) []byte { if len(fq.ch) == 0 { - return dst, false + logger.Panicf("BUG: the function must not be called when in-memory queue is empty. Caller should verify the queue len upfront") } if n := fq.pq.GetPendingBytes(); n > 0 { - logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is non-empty; it contains %d pending bytes", n) + logger.Panicf("BUG: the file-based queue must be empty when the in-memory queue is non-empty; it contains %d pending bytes", n) } bb := <-fq.ch fq.pendingInmemoryBytes -= uint64(len(bb.B)) fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp() dst = append(dst, bb.B...) blockBufPool.Put(bb) - return dst, true + return dst } // Dirname returns the directory name for persistent queue.