mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
Searching metricName by metricID happens many times during a single API call. This requires getting the current set of idbs before those calls happen, which is fine but requires propagating idbs across the code base. This is also fine in the OSS version, as it is used in Search only. Propagating idbs across the code base becomes a problem in the Enterprise version, as it is used in at least 3 places. As a result, it becomes very difficult to merge things from OSS to Ent. Localizing all the dependencies in one searchMetricName type and reusing this type everywhere should make things simpler. Related enterprise changes: https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/compare/search-metric-name-ent?expand=1 Related PR: https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9756
156 lines
3.0 KiB
Go
156 lines
3.0 KiB
Go
package storage
|
|
|
|
import (
|
|
"container/heap"
|
|
"fmt"
|
|
"io"
|
|
)
|
|
|
|
// blockStreamMerger is used for merging block streams.
|
|
type blockStreamMerger struct {
|
|
// The current block to work with.
|
|
Block *Block
|
|
|
|
bsrHeap blockStreamReaderHeap
|
|
|
|
// Blocks with smaller timestamps are removed because of retention.
|
|
retentionDeadline int64
|
|
|
|
// Whether the call to NextBlock must be no-op.
|
|
nextBlockNoop bool
|
|
|
|
// The last error
|
|
err error
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) reset() {
|
|
bsm.Block = nil
|
|
|
|
for i := range bsm.bsrHeap {
|
|
bsm.bsrHeap[i] = nil
|
|
}
|
|
bsm.bsrHeap = bsm.bsrHeap[:0]
|
|
|
|
bsm.retentionDeadline = 0
|
|
bsm.nextBlockNoop = false
|
|
bsm.err = nil
|
|
}
|
|
|
|
// Init initializes bsm with the given bsrs.
|
|
func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline int64) {
|
|
bsm.reset()
|
|
bsm.retentionDeadline = retentionDeadline
|
|
for _, bsr := range bsrs {
|
|
if bsr.NextBlock() {
|
|
bsm.bsrHeap = append(bsm.bsrHeap, bsr)
|
|
continue
|
|
}
|
|
if err := bsr.Error(); err != nil {
|
|
bsm.err = fmt.Errorf("cannot obtain the next block to merge: %w", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
if len(bsm.bsrHeap) == 0 {
|
|
bsm.err = io.EOF
|
|
return
|
|
}
|
|
|
|
heap.Init(&bsm.bsrHeap)
|
|
bsm.Block = &bsm.bsrHeap[0].Block
|
|
bsm.nextBlockNoop = true
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) getRetentionDeadline(_ *blockHeader) int64 {
|
|
return bsm.retentionDeadline
|
|
}
|
|
|
|
// NextBlock stores the next block in bsm.Block.
|
|
//
|
|
// The blocks are sorted by (TDIS, MinTimestamp). Two subsequent blocks
|
|
// for the same TSID may contain overlapped time ranges.
|
|
func (bsm *blockStreamMerger) NextBlock() bool {
|
|
if bsm.err != nil {
|
|
return false
|
|
}
|
|
if bsm.nextBlockNoop {
|
|
bsm.nextBlockNoop = false
|
|
return true
|
|
}
|
|
|
|
bsm.err = bsm.nextBlock()
|
|
switch bsm.err {
|
|
case nil:
|
|
return true
|
|
case io.EOF:
|
|
return false
|
|
default:
|
|
bsm.err = fmt.Errorf("cannot obtain the next block to merge: %w", bsm.err)
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) nextBlock() error {
|
|
bsrMin := bsm.bsrHeap[0]
|
|
if bsrMin.NextBlock() {
|
|
heap.Fix(&bsm.bsrHeap, 0)
|
|
bsm.Block = &bsm.bsrHeap[0].Block
|
|
return nil
|
|
}
|
|
|
|
if err := bsrMin.Error(); err != nil {
|
|
bsm.Block = nil
|
|
return err
|
|
}
|
|
|
|
heap.Pop(&bsm.bsrHeap)
|
|
|
|
if len(bsm.bsrHeap) == 0 {
|
|
bsm.Block = nil
|
|
return io.EOF
|
|
}
|
|
|
|
bsm.Block = &bsm.bsrHeap[0].Block
|
|
return nil
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) Error() error {
|
|
if bsm.err == io.EOF {
|
|
return nil
|
|
}
|
|
return bsm.err
|
|
}
|
|
|
|
// blockStreamReaderHeap is a heap of block stream readers for use with
// the container/heap package.
//
// Readers are ordered by the header of their current block,
// see the Less method.
type blockStreamReaderHeap []*blockStreamReader
|
|
|
|
func (bsrh *blockStreamReaderHeap) Len() int {
|
|
return len(*bsrh)
|
|
}
|
|
|
|
func (bsrh *blockStreamReaderHeap) Less(i, j int) bool {
|
|
x := *bsrh
|
|
a, b := &x[i].Block.bh, &x[j].Block.bh
|
|
if a.TSID.MetricID == b.TSID.MetricID {
|
|
// Fast path for identical TSID values.
|
|
return a.MinTimestamp < b.MinTimestamp
|
|
}
|
|
// Slow path for distinct TSID values.
|
|
return a.TSID.Less(&b.TSID)
|
|
}
|
|
|
|
func (bsrh *blockStreamReaderHeap) Swap(i, j int) {
|
|
x := *bsrh
|
|
x[i], x[j] = x[j], x[i]
|
|
}
|
|
|
|
func (bsrh *blockStreamReaderHeap) Push(x any) {
|
|
*bsrh = append(*bsrh, x.(*blockStreamReader))
|
|
}
|
|
|
|
func (bsrh *blockStreamReaderHeap) Pop() any {
|
|
a := *bsrh
|
|
v := a[len(a)-1]
|
|
*bsrh = a[:len(a)-1]
|
|
return v
|
|
}
|