mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
address review comment
This commit is contained in:
@@ -238,7 +238,7 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
||||
return dst, false
|
||||
}
|
||||
if len(fq.ch) > 0 {
|
||||
return fq.mustReadInMemoryBlockLocked(dst)
|
||||
return fq.mustReadInMemoryBlockLocked(dst), true
|
||||
}
|
||||
if n := fq.pq.GetPendingBytes(); n > 0 {
|
||||
data, ok := fq.pq.MustReadBlockNonblocking(dst)
|
||||
@@ -261,22 +261,26 @@ func (fq *FastQueue) MustReadInMemoryBlock(dst []byte) ([]byte, bool) {
|
||||
fq.mu.Lock()
|
||||
defer fq.mu.Unlock()
|
||||
|
||||
return fq.mustReadInMemoryBlockLocked(dst)
|
||||
if len(fq.ch) > 0 {
|
||||
return fq.mustReadInMemoryBlockLocked(dst), true
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) ([]byte, bool) {
|
||||
func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) []byte {
|
||||
if len(fq.ch) == 0 {
|
||||
return dst, false
|
||||
logger.Panicf("BUG: the function must not be called when in-memory queue is empty. Caller should verify the queue len upfront")
|
||||
}
|
||||
if n := fq.pq.GetPendingBytes(); n > 0 {
|
||||
logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is non-empty; it contains %d pending bytes", n)
|
||||
logger.Panicf("BUG: the file-based queue must be empty when the in-memory queue is non-empty; it contains %d pending bytes", n)
|
||||
}
|
||||
bb := <-fq.ch
|
||||
fq.pendingInmemoryBytes -= uint64(len(bb.B))
|
||||
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
||||
dst = append(dst, bb.B...)
|
||||
blockBufPool.Put(bb)
|
||||
return dst, true
|
||||
return dst
|
||||
}
|
||||
|
||||
// Dirname returns the directory name for persistent queue.
|
||||
|
||||
Reference in New Issue
Block a user