Files
VictoriaMetrics/lib/storage/block_stream_merger.go
Artem Fetishev d6dacd9771 lib/storage: introduce metricNameSearch type
Searching metricName by metricID happens many times during a single API
call. This requires getting the current set of idbs before those calls
happen. Which is fine but requires propagating idbs across the code
base. This is also fine in case of OSS version as it is used in Search
only.

Propagating idbs across the code base becomes a problem in Enterprise
version as it is used in at least 3 places. As a result it becomes very
difficult to merge things from OSS to Ent.

Localizing all the dependencies in one metricNameSearch type and
reusing this type everywhere should make things simpler.

Related enterprise changes:
https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/compare/search-metric-name-ent?expand=1

Related PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9756
2025-09-24 15:25:48 +02:00

156 lines
3.0 KiB
Go

package storage
import (
"container/heap"
"fmt"
"io"
)
// blockStreamMerger is used for merging block streams.
//
// It keeps the readers that still have blocks in a heap and exposes
// the block of the reader at the top of that heap via Block.
type blockStreamMerger struct {
	// The current block to work with.
	//
	// It points at the Block of the reader at the top of bsrHeap.
	// It is nil after reset and after nextBlock reports an error or io.EOF.
	Block *Block

	// Heap of the readers that still have blocks to return.
	// The ordering is defined by blockStreamReaderHeap.Less.
	bsrHeap blockStreamReaderHeap

	// Blocks with smaller timestamps are removed because of retention.
	//
	// The value is set by Init and returned by getRetentionDeadline.
	retentionDeadline int64

	// Whether the call to NextBlock must be no-op.
	//
	// Init sets it to true, since it already positions Block on the first block.
	nextBlockNoop bool

	// The last error.
	//
	// io.EOF marks the end of the merged stream; Error reports it as nil.
	err error
}
// reset returns bsm to its zero state while keeping the backing array
// of bsrHeap for reuse.
func (bsm *blockStreamMerger) reset() {
	// Nil out the stored reader pointers, so the retained backing array
	// does not keep them reachable.
	readers := bsm.bsrHeap
	for idx := range readers {
		readers[idx] = nil
	}
	bsm.bsrHeap = readers[:0]

	bsm.Block = nil
	bsm.retentionDeadline = 0
	bsm.nextBlockNoop = false
	bsm.err = nil
}
// Init prepares bsm for merging blocks from the given bsrs.
//
// Every reader is advanced to its first block. Readers without blocks are
// skipped. If a reader reports an error, it is stored in bsm and Init stops.
// If no reader has blocks, bsm is put into the io.EOF state.
// Otherwise bsm.Block points at the first block of the merged stream,
// and the first NextBlock call returns true without advancing.
func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline int64) {
	bsm.reset()
	bsm.retentionDeadline = retentionDeadline

	for _, bsr := range bsrs {
		if !bsr.NextBlock() {
			if err := bsr.Error(); err != nil {
				bsm.err = fmt.Errorf("cannot obtain the next block to merge: %w", err)
				return
			}
			// The reader is empty; there is nothing to merge from it.
			continue
		}
		bsm.bsrHeap = append(bsm.bsrHeap, bsr)
	}

	if len(bsm.bsrHeap) == 0 {
		bsm.err = io.EOF
		return
	}

	heap.Init(&bsm.bsrHeap)
	bsm.Block = &bsm.bsrHeap[0].Block
	// Block is already positioned on the first block,
	// so the first NextBlock call mustn't advance.
	bsm.nextBlockNoop = true
}
// getRetentionDeadline returns the retention deadline passed to Init.
//
// The block header argument is ignored, so the same deadline is returned
// for every block.
func (bsm *blockStreamMerger) getRetentionDeadline(_ *blockHeader) int64 {
	deadline := bsm.retentionDeadline
	return deadline
}
// NextBlock stores the next block in bsm.Block.
//
// It returns false when there are no more blocks or when an error occurs;
// call Error to tell the two cases apart.
//
// The blocks are sorted by (TSID, MinTimestamp). Two subsequent blocks
// for the same TSID may contain overlapped time ranges.
func (bsm *blockStreamMerger) NextBlock() bool {
	if bsm.err != nil {
		return false
	}
	if bsm.nextBlockNoop {
		// Init has already positioned bsm.Block on the first block.
		bsm.nextBlockNoop = false
		return true
	}

	err := bsm.nextBlock()
	if err != nil && err != io.EOF {
		// io.EOF is stored as is, since it marks the normal end of the stream.
		err = fmt.Errorf("cannot obtain the next block to merge: %w", err)
	}
	bsm.err = err
	return err == nil
}
// nextBlock advances the reader at the top of the heap and points bsm.Block
// at the block of the reader that is at the top afterwards.
//
// It returns io.EOF when all the readers are exhausted, or the error
// reported by the advanced reader. bsm.Block is set to nil in both cases.
func (bsm *blockStreamMerger) nextBlock() error {
	top := bsm.bsrHeap[0]
	if top.NextBlock() {
		// The top reader moved to a new block, so restore the heap order.
		heap.Fix(&bsm.bsrHeap, 0)
	} else {
		if err := top.Error(); err != nil {
			bsm.Block = nil
			return err
		}
		// The top reader is exhausted; drop it from the heap.
		heap.Pop(&bsm.bsrHeap)
		if len(bsm.bsrHeap) == 0 {
			bsm.Block = nil
			return io.EOF
		}
	}
	bsm.Block = &bsm.bsrHeap[0].Block
	return nil
}
// Error returns the last error stored in bsm.
//
// io.EOF is reported as nil, since it marks the normal end of the stream.
func (bsm *blockStreamMerger) Error() error {
	if err := bsm.err; err != io.EOF {
		return err
	}
	return nil
}
// blockStreamReaderHeap is a heap of block stream readers ordered by the
// header of each reader's current block; see Less for the exact ordering.
//
// Its methods satisfy heap.Interface from container/heap.
type blockStreamReaderHeap []*blockStreamReader
// Len returns the number of readers in the heap.
func (bsrh *blockStreamReaderHeap) Len() int {
	readers := *bsrh
	return len(readers)
}
// Less reports whether the current block of the reader at index i
// must be returned before the current block of the reader at index j.
//
// Blocks are ordered by TSID. Blocks with the same TSID.MetricID
// are ordered by MinTimestamp.
func (bsrh *blockStreamReaderHeap) Less(i, j int) bool {
	readers := *bsrh
	lhs := &readers[i].Block.bh
	rhs := &readers[j].Block.bh
	if lhs.TSID.MetricID != rhs.TSID.MetricID {
		// Slow path: distinct TSID values require the full TSID comparison.
		return lhs.TSID.Less(&rhs.TSID)
	}
	// Fast path: equal MetricID is treated as identical TSID.
	return lhs.MinTimestamp < rhs.MinTimestamp
}
// Swap exchanges the readers at indexes i and j.
func (bsrh *blockStreamReaderHeap) Swap(i, j int) {
	readers := *bsrh
	tmp := readers[i]
	readers[i] = readers[j]
	readers[j] = tmp
}
// Push appends x to the heap. x must be a *blockStreamReader.
func (bsrh *blockStreamReaderHeap) Push(x any) {
	bsr := x.(*blockStreamReader)
	*bsrh = append(*bsrh, bsr)
}
// Pop removes the last reader from the heap and returns it.
//
// The returned value has *blockStreamReader type.
func (bsrh *blockStreamReaderHeap) Pop() any {
	a := *bsrh
	n := len(a) - 1
	v := a[n]
	// Clear the vacated slot, so the backing array doesn't keep the removed
	// reader reachable. The slot ends up beyond len(*bsrh), where
	// blockStreamMerger.reset doesn't clear it.
	a[n] = nil
	*bsrh = a[:n]
	return v
}