mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-22 03:06:36 +03:00
Compare commits
7 Commits
weakpointe
...
vlogs-dash
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c422ab6f87 | ||
|
|
17c584147c | ||
|
|
f4474b9b00 | ||
|
|
d4e039f038 | ||
|
|
e9649fbd12 | ||
|
|
1d5d05c8b3 | ||
|
|
a0848f9235 |
@@ -7,6 +7,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
@@ -192,6 +193,7 @@ type logMessageProcessor struct {
|
||||
|
||||
rowsIngestedTotal *metrics.Counter
|
||||
bytesIngestedTotal *metrics.Counter
|
||||
flushDuration *metrics.Summary
|
||||
}
|
||||
|
||||
func (lmp *logMessageProcessor) initPeriodicFlush() {
|
||||
@@ -293,6 +295,7 @@ func (lmp *logMessageProcessor) flushLocked() {
|
||||
lmp.lastFlushTime = time.Now()
|
||||
logRowsStorage.MustAddRows(lmp.lr)
|
||||
lmp.lr.ResetKeepSettings()
|
||||
lmp.flushDuration.UpdateDuration(lmp.lastFlushTime)
|
||||
}
|
||||
|
||||
// MustClose flushes the remaining data to the underlying storage and closes lmp.
|
||||
@@ -303,6 +306,7 @@ func (lmp *logMessageProcessor) MustClose() {
|
||||
lmp.flushLocked()
|
||||
logstorage.PutLogRows(lmp.lr)
|
||||
lmp.lr = nil
|
||||
messageProcessorCount.Add(-1)
|
||||
}
|
||||
|
||||
// NewLogMessageProcessor returns new LogMessageProcessor for the given cp.
|
||||
@@ -312,12 +316,14 @@ func (cp *CommonParams) NewLogMessageProcessor(protocolName string, isStreamMode
|
||||
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields, cp.DecolorizeFields, cp.ExtraFields, *defaultMsgValue)
|
||||
rowsIngestedTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_rows_ingested_total{type=%q}", protocolName))
|
||||
bytesIngestedTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_bytes_ingested_total{type=%q}", protocolName))
|
||||
flushDuration := metrics.GetOrCreateSummary(fmt.Sprintf("vl_insert_flush_duration_seconds{type=%q}", protocolName))
|
||||
lmp := &logMessageProcessor{
|
||||
cp: cp,
|
||||
lr: lr,
|
||||
|
||||
rowsIngestedTotal: rowsIngestedTotal,
|
||||
bytesIngestedTotal: bytesIngestedTotal,
|
||||
flushDuration: flushDuration,
|
||||
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
@@ -326,10 +332,13 @@ func (cp *CommonParams) NewLogMessageProcessor(protocolName string, isStreamMode
|
||||
lmp.initPeriodicFlush()
|
||||
}
|
||||
|
||||
messageProcessorCount.Add(1)
|
||||
return lmp
|
||||
}
|
||||
|
||||
var (
|
||||
rowsDroppedTotalDebug = metrics.NewCounter(`vl_rows_dropped_total{reason="debug"}`)
|
||||
rowsDroppedTotalTooManyFields = metrics.NewCounter(`vl_rows_dropped_total{reason="too_many_fields"}`)
|
||||
_ = metrics.NewGauge(`vl_insert_processors_count`, func() float64 { return float64(messageProcessorCount.Load()) })
|
||||
messageProcessorCount atomic.Int64
|
||||
)
|
||||
|
||||
@@ -78,7 +78,10 @@ var localStorage *logstorage.Storage
|
||||
var localStorageMetrics *metrics.Set
|
||||
|
||||
var netstorageInsert *netinsert.Storage
|
||||
var netstorageInsertMetrics *metrics.Set
|
||||
|
||||
var netstorageSelect *netselect.Storage
|
||||
var netstorageSelectMetrics *metrics.Set
|
||||
|
||||
// Init initializes vlstorage.
|
||||
//
|
||||
@@ -141,9 +144,19 @@ func initNetworkStorage() {
|
||||
|
||||
logger.Infof("starting insert service for nodes %s", *storageNodeAddrs)
|
||||
netstorageInsert = netinsert.NewStorage(*storageNodeAddrs, authCfgs, isTLSs, *insertConcurrency, *insertDisableCompression)
|
||||
netstorageInsertMetrics = metrics.NewSet()
|
||||
netstorageInsertMetrics.RegisterMetricsWriter(func(w io.Writer) {
|
||||
netstorageInsert.WriteMetrics(w)
|
||||
})
|
||||
metrics.RegisterSet(netstorageInsertMetrics)
|
||||
|
||||
logger.Infof("initializing select service for nodes %s", *storageNodeAddrs)
|
||||
netstorageSelect = netselect.NewStorage(*storageNodeAddrs, authCfgs, isTLSs, *selectDisableCompression)
|
||||
netstorageSelectMetrics = metrics.NewSet()
|
||||
netstorageSelectMetrics.RegisterMetricsWriter(func(w io.Writer) {
|
||||
netstorageSelect.WriteMetrics(w)
|
||||
})
|
||||
metrics.RegisterSet(netstorageSelectMetrics)
|
||||
|
||||
logger.Infof("initialized all the network services")
|
||||
}
|
||||
@@ -354,6 +367,10 @@ func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
|
||||
var ss logstorage.StorageStats
|
||||
strg.UpdateStats(&ss)
|
||||
|
||||
if maxDiskSpaceUsageBytes.N > 0 {
|
||||
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vl_max_disk_space_usage_bytes{path=%q}`, *storageDataPath), uint64(maxDiskSpaceUsageBytes.N))
|
||||
}
|
||||
|
||||
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vl_free_disk_space_bytes{path=%q}`, *storageDataPath), fs.MustGetFreeSpace(*storageDataPath))
|
||||
|
||||
isReadOnly := uint64(0)
|
||||
@@ -365,10 +382,20 @@ func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
|
||||
metrics.WriteGaugeUint64(w, `vl_active_merges{type="storage/inmemory"}`, ss.InmemoryActiveMerges)
|
||||
metrics.WriteGaugeUint64(w, `vl_active_merges{type="storage/small"}`, ss.SmallPartActiveMerges)
|
||||
metrics.WriteGaugeUint64(w, `vl_active_merges{type="storage/big"}`, ss.BigPartActiveMerges)
|
||||
metrics.WriteGaugeUint64(w, `vl_active_merges{type="indexdb/inmemory"}`, ss.IndexdbActiveInmemoryMerges)
|
||||
metrics.WriteGaugeUint64(w, `vl_active_merges{type="indexdb/file"}`, ss.IndexdbActiveFileMerges)
|
||||
|
||||
metrics.WriteCounterUint64(w, `vl_merges_total{type="storage/inmemory"}`, ss.InmemoryMergesTotal)
|
||||
metrics.WriteCounterUint64(w, `vl_merges_total{type="storage/small"}`, ss.SmallPartMergesTotal)
|
||||
metrics.WriteCounterUint64(w, `vl_merges_total{type="storage/big"}`, ss.BigPartMergesTotal)
|
||||
metrics.WriteCounterUint64(w, `vl_merges_total{type="indexdb/inmemory"}`, ss.IndexdbInmemoryMergesTotal)
|
||||
metrics.WriteCounterUint64(w, `vl_merges_total{type="indexdb/file"}`, ss.IndexdbFileMergesTotal)
|
||||
|
||||
metrics.WriteCounterUint64(w, `vl_rows_merged_total{type="storage/inmemory"}`, ss.InmemoryMergeRowsTotal)
|
||||
metrics.WriteCounterUint64(w, `vl_rows_merged_total{type="storage/small"}`, ss.SmallPartMergeRowsTotal)
|
||||
metrics.WriteCounterUint64(w, `vl_rows_merged_total{type="storage/big"}`, ss.BigPartMergeRowsTotal)
|
||||
metrics.WriteCounterUint64(w, `vl_rows_merged_total{type="indexdb/inmemory"}`, ss.IndexdbInmemoryItemsMerged)
|
||||
metrics.WriteCounterUint64(w, `vl_rows_merged_total{type="indexdb/file"}`, ss.IndexdbFileItemsMerged)
|
||||
|
||||
metrics.WriteGaugeUint64(w, `vl_storage_rows{type="storage/inmemory"}`, ss.InmemoryRowsCount)
|
||||
metrics.WriteGaugeUint64(w, `vl_storage_rows{type="storage/small"}`, ss.SmallPartRowsCount)
|
||||
@@ -382,6 +409,9 @@ func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
|
||||
metrics.WriteGaugeUint64(w, `vl_storage_blocks{type="storage/small"}`, ss.SmallPartBlocks)
|
||||
metrics.WriteGaugeUint64(w, `vl_storage_blocks{type="storage/big"}`, ss.BigPartBlocks)
|
||||
|
||||
metrics.WriteGaugeUint64(w, `vl_pending_rows{type="storage"}`, ss.PendingRowsCount)
|
||||
metrics.WriteGaugeUint64(w, `vl_pending_rows{type="indexdb"}`, ss.IndexdbPendingItems)
|
||||
|
||||
metrics.WriteGaugeUint64(w, `vl_partitions`, ss.PartitionsCount)
|
||||
metrics.WriteCounterUint64(w, `vl_streams_created_total`, ss.StreamsCreatedTotal)
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// the maximum size of a single data block sent to storage node.
|
||||
@@ -33,6 +34,8 @@ const ProtocolVersion = "v1"
|
||||
|
||||
// Storage is a network storage for sending data to remote storage nodes in the cluster.
|
||||
type Storage struct {
|
||||
RemoteSendFailed atomic.Uint64 // Number of failed requests sent to remote storage nodes.
|
||||
|
||||
sns []*storageNode
|
||||
|
||||
disableCompression bool
|
||||
@@ -45,6 +48,16 @@ type Storage struct {
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func (s *Storage) WriteMetrics(w io.Writer) {
|
||||
activeStreams := s.GetActiveStreams()
|
||||
if activeStreams == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
metrics.WriteGaugeUint64(w, `vl_insert_active_streams`, activeStreams)
|
||||
metrics.WriteCounterUint64(w, `vl_insert_remote_send_errors_total`, s.RemoteSendFailed.Load())
|
||||
}
|
||||
|
||||
type storageNode struct {
|
||||
// scheme is http or https scheme to communicate with addr
|
||||
scheme string
|
||||
@@ -235,9 +248,7 @@ func (sn *storageNode) sendInsertRequest(pendingData *bytesutil.ByteBuffer) erro
|
||||
|
||||
resp, err := sn.c.Do(req)
|
||||
if err != nil {
|
||||
// Disable sn for data writing for 10 seconds.
|
||||
sn.disabledUntil.Store(fasttime.UnixTimestamp() + 10)
|
||||
|
||||
sn.setDisableTemporarily()
|
||||
return fmt.Errorf("cannot send data block with the length %d to %q: %s", pendingData.Len(), reqURL, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
@@ -251,8 +262,7 @@ func (sn *storageNode) sendInsertRequest(pendingData *bytesutil.ByteBuffer) erro
|
||||
respBody = []byte(fmt.Sprintf("%s", err))
|
||||
}
|
||||
|
||||
// Disable sn for data writing for 10 seconds.
|
||||
sn.disabledUntil.Store(fasttime.UnixTimestamp() + 10)
|
||||
sn.setDisableTemporarily()
|
||||
|
||||
return fmt.Errorf("unexpected status code returned when sending data block to %q: %d; want 2xx; response body: %q", reqURL, resp.StatusCode, respBody)
|
||||
}
|
||||
@@ -261,6 +271,11 @@ func (sn *storageNode) getRequestURL(path string) string {
|
||||
return fmt.Sprintf("%s://%s%s?version=%s", sn.scheme, sn.addr, path, url.QueryEscape(ProtocolVersion))
|
||||
}
|
||||
|
||||
func (sn *storageNode) setDisableTemporarily() {
|
||||
sn.disabledUntil.Store(fasttime.UnixTimestamp() + 10)
|
||||
sn.s.RemoteSendFailed.Add(1)
|
||||
}
|
||||
|
||||
var zstdBufPool bytesutil.ByteBufferPool
|
||||
|
||||
// NewStorage returns new Storage for the given addrs with the given authCfgs.
|
||||
@@ -307,6 +322,15 @@ func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) {
|
||||
sn.addRow(r)
|
||||
}
|
||||
|
||||
// GetActiveStreams returns the number of log streams being tracked since the Storage start.
|
||||
func (s *Storage) GetActiveStreams() uint64 {
|
||||
s.srt.mu.Lock()
|
||||
n := uint64(len(s.srt.rowsPerStream))
|
||||
s.srt.mu.Unlock()
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
func (s *Storage) sendInsertRequestToAnyNode(pendingData *bytesutil.ByteBuffer) bool {
|
||||
startIdx := int(fastrand.Uint32n(uint32(len(s.sns))))
|
||||
for i := range s.sns {
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
@@ -21,6 +22,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -65,6 +67,9 @@ type Storage struct {
|
||||
sns []*storageNode
|
||||
|
||||
disableCompression bool
|
||||
|
||||
// remoteSendErrors is the number of errors when sending request to the remote storage node
|
||||
remoteSendErrors atomic.Uint64
|
||||
}
|
||||
|
||||
type storageNode struct {
|
||||
@@ -313,6 +318,10 @@ func (s *Storage) MustStop() {
|
||||
s.sns = nil
|
||||
}
|
||||
|
||||
func (s *Storage) WriteMetrics(w io.Writer) {
|
||||
metrics.WriteGaugeUint64(w, `vl_select_remote_send_errors_total`, s.remoteSendErrors.Load())
|
||||
}
|
||||
|
||||
// RunQuery runs the given q and calls writeBlock for the returned data blocks
|
||||
func (s *Storage) RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, writeBlock logstorage.WriteDataBlockFunc) error {
|
||||
nqr, err := logstorage.NewNetQueryRunner(ctx, tenantIDs, q, s.RunQuery, writeBlock)
|
||||
@@ -345,6 +354,9 @@ func (s *Storage) runQuery(stopCh <-chan struct{}, tenantIDs []logstorage.Tenant
|
||||
})
|
||||
if err != nil {
|
||||
// Cancel the remaining parallel queries
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
s.remoteSendErrors.Add(1)
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -2,6 +2,7 @@ package logstorage
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
@@ -141,6 +142,9 @@ type blockSearch struct {
|
||||
// It is initialized lazily by calling getColumnsHeader().
|
||||
cshCache *columnsHeader
|
||||
|
||||
// bytesReadFromDisk tracks the total bytes read from disk files for this block search
|
||||
bytesReadFromDisk atomic.Uint64
|
||||
|
||||
// seenStreams contains seen streamIDs for the recent searches.
|
||||
//
|
||||
// It is used for speeding up fetching _stream column.
|
||||
@@ -197,6 +201,9 @@ func (bs *blockSearch) reset() {
|
||||
bs.cshCache = nil
|
||||
}
|
||||
|
||||
// Reset bytes read from disk counter
|
||||
bs.bytesReadFromDisk.Store(0)
|
||||
|
||||
// Do not reset seenStreams, since its' lifetime is managed by blockResult.addStreamColumn() code.
|
||||
}
|
||||
|
||||
@@ -344,6 +351,9 @@ func (bs *blockSearch) getColumnsHeaderIndex() *columnsHeaderIndex {
|
||||
if bs.cshIndexCache == nil {
|
||||
bs.cshIndexBlockCache = readColumnsHeaderIndexBlock(bs.cshIndexBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
|
||||
|
||||
// Track bytes read from disk
|
||||
bs.bytesReadFromDisk.Add(uint64(len(bs.cshIndexBlockCache)))
|
||||
|
||||
bs.cshIndexCache = getColumnsHeaderIndex()
|
||||
if err := bs.cshIndexCache.unmarshalInplace(bs.cshIndexBlockCache); err != nil {
|
||||
logger.Panicf("FATAL: %s: cannot unmarshal columns header index: %s", bs.bsw.p.path, err)
|
||||
@@ -376,6 +386,10 @@ func (bs *blockSearch) getColumnsHeader() *columnsHeader {
|
||||
func (bs *blockSearch) getColumnsHeaderBlock() []byte {
|
||||
if !bs.cshBlockInitialized {
|
||||
bs.cshBlockCache = readColumnsHeaderBlock(bs.cshBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
|
||||
|
||||
// Track bytes read from disk
|
||||
bs.bytesReadFromDisk.Add(uint64(len(bs.cshBlockCache)))
|
||||
|
||||
bs.cshBlockInitialized = true
|
||||
}
|
||||
return bs.cshBlockCache
|
||||
@@ -425,6 +439,10 @@ func (bs *blockSearch) getBloomFilterForColumn(ch *columnHeader) *bloomFilter {
|
||||
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(bloomFilterSize))
|
||||
|
||||
bloomValuesFile.bloom.MustReadAt(bb.B, int64(ch.bloomFilterOffset))
|
||||
|
||||
// Track bytes read from disk
|
||||
bs.bytesReadFromDisk.Add(uint64(len(bb.B)))
|
||||
|
||||
bf = getBloomFilter()
|
||||
if err := bf.unmarshal(bb.B); err != nil {
|
||||
logger.Panicf("FATAL: %s: cannot unmarshal bloom filter: %s", bs.partPath(), err)
|
||||
@@ -458,6 +476,9 @@ func (bs *blockSearch) getValuesForColumn(ch *columnHeader) []string {
|
||||
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(valuesSize))
|
||||
bloomValuesFile.values.MustReadAt(bb.B, int64(ch.valuesOffset))
|
||||
|
||||
// Track bytes read from disk
|
||||
bs.bytesReadFromDisk.Add(uint64(len(bb.B)))
|
||||
|
||||
values = getStringBucket()
|
||||
var err error
|
||||
values.a, err = bs.sbu.unmarshal(values.a[:0], bb.B, bs.bsw.bh.rowsCount)
|
||||
@@ -493,6 +514,9 @@ func (bs *blockSearch) getTimestamps() []int64 {
|
||||
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(blockSize))
|
||||
p.timestampsFile.MustReadAt(bb.B, int64(th.blockOffset))
|
||||
|
||||
// Track bytes read from disk
|
||||
bs.bytesReadFromDisk.Add(uint64(len(bb.B)))
|
||||
|
||||
rowsCount := int(bs.bsw.bh.rowsCount)
|
||||
timestamps = encoding.GetInt64s(rowsCount)
|
||||
var err error
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// The maximum size of big part.
|
||||
@@ -54,14 +55,25 @@ type datadb struct {
|
||||
// mergeIdx is used for generating unique directory names for parts
|
||||
mergeIdx atomic.Uint64
|
||||
|
||||
inmemoryMergesTotal atomic.Uint64
|
||||
inmemoryActiveMerges atomic.Int64
|
||||
inmemoryMergesTotal atomic.Uint64
|
||||
inmemoryActiveMerges atomic.Int64
|
||||
inmemoryMergeRowsTotal atomic.Uint64
|
||||
|
||||
smallPartMergesTotal atomic.Uint64
|
||||
smallPartActiveMerges atomic.Int64
|
||||
smallPartMergesTotal atomic.Uint64
|
||||
smallPartActiveMerges atomic.Int64
|
||||
smallPartMergeRowsTotal atomic.Uint64
|
||||
|
||||
bigPartMergesTotal atomic.Uint64
|
||||
bigPartActiveMerges atomic.Int64
|
||||
bigPartMergesTotal atomic.Uint64
|
||||
bigPartActiveMerges atomic.Int64
|
||||
bigPartMergeRowsTotal atomic.Uint64
|
||||
|
||||
// metrics that need to be updated directly
|
||||
inmemoryPartMergeDuration *metrics.Summary
|
||||
inmemoryPartMergeBytes *metrics.Summary
|
||||
smallPartMergeDuration *metrics.Summary
|
||||
smallPartMergeBytes *metrics.Summary
|
||||
bigPartMergeDuration *metrics.Summary
|
||||
bigPartMergeBytes *metrics.Summary
|
||||
|
||||
// pt is the partition the datadb belongs to
|
||||
pt *partition
|
||||
@@ -188,10 +200,18 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *da
|
||||
ddb := &datadb{
|
||||
pt: pt,
|
||||
flushInterval: flushInterval,
|
||||
path: path,
|
||||
smallParts: smallParts,
|
||||
bigParts: bigParts,
|
||||
stopCh: make(chan struct{}),
|
||||
|
||||
inmemoryPartMergeDuration: metrics.GetOrCreateSummary(`vl_merge_duration_seconds{type="storage/inmemory"}`),
|
||||
inmemoryPartMergeBytes: metrics.GetOrCreateSummary(`vl_merge_bytes{type="storage/inmemory"}`),
|
||||
smallPartMergeDuration: metrics.GetOrCreateSummary(`vl_merge_duration_seconds{type="storage/small"}`),
|
||||
smallPartMergeBytes: metrics.GetOrCreateSummary(`vl_merge_bytes{type="storage/small"}`),
|
||||
bigPartMergeDuration: metrics.GetOrCreateSummary(`vl_merge_duration_seconds{type="storage/big"}`),
|
||||
bigPartMergeBytes: metrics.GetOrCreateSummary(`vl_merge_bytes{type="storage/big"}`),
|
||||
|
||||
path: path,
|
||||
smallParts: smallParts,
|
||||
bigParts: bigParts,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
ddb.rb.init(&ddb.wg, ddb.mustFlushLogRows)
|
||||
ddb.mergeIdx.Store(uint64(time.Now().UnixNano()))
|
||||
@@ -593,6 +613,21 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
|
||||
|
||||
ddb.swapSrcWithDstParts(pws, pwNew, dstPartType)
|
||||
|
||||
switch dstPartType {
|
||||
case partInmemory:
|
||||
ddb.inmemoryMergeRowsTotal.Add(srcRowsCount)
|
||||
ddb.inmemoryPartMergeDuration.UpdateDuration(startTime)
|
||||
ddb.inmemoryPartMergeBytes.Update(float64(dstSize))
|
||||
case partSmall:
|
||||
ddb.smallPartMergeRowsTotal.Add(srcRowsCount)
|
||||
ddb.smallPartMergeDuration.UpdateDuration(startTime)
|
||||
ddb.smallPartMergeBytes.Update(float64(dstSize))
|
||||
case partBig:
|
||||
ddb.bigPartMergeRowsTotal.Add(srcRowsCount)
|
||||
ddb.bigPartMergeDuration.UpdateDuration(startTime)
|
||||
ddb.bigPartMergeBytes.Update(float64(dstSize))
|
||||
}
|
||||
|
||||
d := time.Since(startTime)
|
||||
if d <= time.Minute {
|
||||
return
|
||||
@@ -673,6 +708,21 @@ type rowsBuffer struct {
|
||||
nextIdx atomic.Uint64
|
||||
}
|
||||
|
||||
func (rb *rowsBuffer) Len() uint64 {
|
||||
shards := rb.shards
|
||||
n := uint64(0)
|
||||
for i := range shards {
|
||||
shard := &shards[i]
|
||||
shard.mu.Lock()
|
||||
if shard.lr != nil {
|
||||
n += uint64(shard.lr.Len())
|
||||
}
|
||||
shard.mu.Unlock()
|
||||
}
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
func (rb *rowsBuffer) init(wg *sync.WaitGroup, flushFunc func(lr *logRows)) {
|
||||
shards := make([]rowsBufferShard, cgroup.AvailableCPUs())
|
||||
for i := range shards {
|
||||
@@ -684,7 +734,7 @@ func (rb *rowsBuffer) init(wg *sync.WaitGroup, flushFunc func(lr *logRows)) {
|
||||
}
|
||||
|
||||
type rowsBufferShard struct {
|
||||
wg *sync.WaitGroup
|
||||
wg *sync.WaitGroup // wg is shared with datadb.
|
||||
flushFunc func(lr *logRows)
|
||||
|
||||
mu sync.Mutex
|
||||
@@ -774,18 +824,30 @@ type DatadbStats struct {
|
||||
// InmemoryActiveMerges is the number of currently active inmemory merges performed by the given datadb.
|
||||
InmemoryActiveMerges uint64
|
||||
|
||||
// InmemoryMergeRowsTotal is the number of rows merged to inmemory parts.
|
||||
InmemoryMergeRowsTotal uint64
|
||||
|
||||
// SmallPartMergesTotal is the number of small file merges performed in the given datadb.
|
||||
SmallPartMergesTotal uint64
|
||||
|
||||
// SmallPartActiveMerges is the number of currently active small file merges performed by the given datadb.
|
||||
SmallPartActiveMerges uint64
|
||||
|
||||
// SmallPartMergeRowsTotal is the number of rows merged to small parts.
|
||||
SmallPartMergeRowsTotal uint64
|
||||
|
||||
// BigPartMergesTotal is the number of big file merges performed in the given datadb.
|
||||
BigPartMergesTotal uint64
|
||||
|
||||
// BigPartActiveMerges is the number of currently active big file merges performed by the given datadb.
|
||||
BigPartActiveMerges uint64
|
||||
|
||||
// BigPartMergeRowsTotal is the number of rows merged to big parts.
|
||||
BigPartMergeRowsTotal uint64
|
||||
|
||||
// PendingRows is the number of rows, which weren't flushed to searchable part yet.
|
||||
PendingRowsCount uint64
|
||||
|
||||
// InmemoryRowsCount is the number of rows, which weren't flushed to disk yet.
|
||||
InmemoryRowsCount uint64
|
||||
|
||||
@@ -845,10 +907,15 @@ func (s *DatadbStats) RowsCount() uint64 {
|
||||
func (ddb *datadb) updateStats(s *DatadbStats) {
|
||||
s.InmemoryMergesTotal += ddb.inmemoryMergesTotal.Load()
|
||||
s.InmemoryActiveMerges += uint64(ddb.inmemoryActiveMerges.Load())
|
||||
s.InmemoryMergeRowsTotal += ddb.inmemoryMergeRowsTotal.Load()
|
||||
s.SmallPartMergesTotal += ddb.smallPartMergesTotal.Load()
|
||||
s.SmallPartActiveMerges += uint64(ddb.smallPartActiveMerges.Load())
|
||||
s.SmallPartMergeRowsTotal += ddb.smallPartMergeRowsTotal.Load()
|
||||
s.BigPartMergesTotal += ddb.bigPartMergesTotal.Load()
|
||||
s.BigPartActiveMerges += uint64(ddb.bigPartActiveMerges.Load())
|
||||
s.BigPartMergeRowsTotal += ddb.bigPartMergeRowsTotal.Load()
|
||||
|
||||
s.PendingRowsCount = ddb.rb.Len()
|
||||
|
||||
ddb.partsLock.Lock()
|
||||
|
||||
|
||||
@@ -206,7 +206,7 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi
|
||||
var results []result
|
||||
|
||||
const workersCount = 3
|
||||
s.search(workersCount, so, nil, func(_ uint, br *blockResult) {
|
||||
s.search(workersCount, &searchStats{}, so, nil, func(_ uint, br *blockResult) {
|
||||
// Verify columns
|
||||
cs := br.getColumns()
|
||||
if len(cs) != 2 {
|
||||
|
||||
@@ -46,6 +46,27 @@ type IndexdbStats struct {
|
||||
|
||||
// IndexdbPartsCount is the number of parts in indexdb.
|
||||
IndexdbPartsCount uint64
|
||||
|
||||
// IndexdbPendingItems is the number of pending items in IndexedDB before they are merged into the part.
|
||||
IndexdbPendingItems uint64
|
||||
|
||||
// IndexdbActiveFileMerges is the number of active merges in indexdb.
|
||||
IndexdbActiveFileMerges uint64
|
||||
|
||||
// IndexdbActiveInmemoryMerges is the number of active merges in indexdb.
|
||||
IndexdbActiveInmemoryMerges uint64
|
||||
|
||||
// IndexdbFileMergesTotal is the number of merges in indexdb.
|
||||
IndexdbFileMergesTotal uint64
|
||||
|
||||
// IndexdbInmemoryMergesTotal is the number of merges in indexdb.
|
||||
IndexdbInmemoryMergesTotal uint64
|
||||
|
||||
// IndexdbFileItemsMerged is the number of items merged in indexdb.
|
||||
IndexdbFileItemsMerged uint64
|
||||
|
||||
// IndexdbInmemoryItemsMerged is the number of items merged in indexdb.
|
||||
IndexdbInmemoryItemsMerged uint64
|
||||
}
|
||||
|
||||
type indexdb struct {
|
||||
@@ -107,8 +128,15 @@ func (idb *indexdb) updateStats(d *IndexdbStats) {
|
||||
|
||||
d.IndexdbSizeBytes += tm.InmemorySizeBytes + tm.FileSizeBytes
|
||||
d.IndexdbItemsCount += tm.InmemoryItemsCount + tm.FileItemsCount
|
||||
d.IndexdbPendingItems += tm.PendingItems
|
||||
d.IndexdbPartsCount += tm.InmemoryPartsCount + tm.FilePartsCount
|
||||
d.IndexdbBlocksCount += tm.InmemoryBlocksCount + tm.FileBlocksCount
|
||||
d.IndexdbActiveFileMerges = tm.ActiveFileMerges
|
||||
d.IndexdbActiveInmemoryMerges = tm.ActiveInmemoryMerges
|
||||
d.IndexdbFileMergesTotal += tm.FileMergesCount
|
||||
d.IndexdbInmemoryMergesTotal += tm.InmemoryMergesCount
|
||||
d.IndexdbFileItemsMerged += tm.FileItemsMerged
|
||||
d.IndexdbInmemoryItemsMerged += tm.InmemoryItemsMerged
|
||||
}
|
||||
|
||||
func (idb *indexdb) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte {
|
||||
|
||||
@@ -255,6 +255,8 @@ type Query struct {
|
||||
|
||||
// timestamp is the timestamp context used for parsing the query.
|
||||
timestamp int64
|
||||
|
||||
searchStats *searchStats
|
||||
}
|
||||
|
||||
type queryOptions struct {
|
||||
@@ -1281,9 +1283,10 @@ func parseQuery(lex *lexer) (*Query, error) {
|
||||
return nil, fmt.Errorf("%w; context: [%s]", err, lex.context())
|
||||
}
|
||||
q := &Query{
|
||||
opts: opts,
|
||||
f: f,
|
||||
timestamp: lex.currentTimestamp,
|
||||
opts: opts,
|
||||
f: f,
|
||||
timestamp: lex.currentTimestamp,
|
||||
searchStats: &searchStats{},
|
||||
}
|
||||
|
||||
if lex.isKeyword("|") {
|
||||
|
||||
@@ -8,6 +8,11 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
streamIDCacheHitRate = metrics.NewSummary(`vl_cache_hit_rate{type="stream_id"}`)
|
||||
)
|
||||
|
||||
// PartitionStats contains stats for the partition.
|
||||
@@ -118,18 +123,23 @@ func mustClosePartition(pt *partition) {
|
||||
}
|
||||
|
||||
func (pt *partition) mustAddRows(lr *LogRows) {
|
||||
var cacheMisses uint64
|
||||
var cacheHits uint64
|
||||
|
||||
// Register rows in indexdb
|
||||
var pendingRows []int
|
||||
streamIDs := lr.streamIDs
|
||||
for i := range lr.timestamps {
|
||||
streamID := &streamIDs[i]
|
||||
if pt.hasStreamIDInCache(streamID) {
|
||||
cacheHits++
|
||||
continue
|
||||
}
|
||||
if len(pendingRows) == 0 || !streamIDs[pendingRows[len(pendingRows)-1]].equal(streamID) {
|
||||
pendingRows = append(pendingRows, i)
|
||||
}
|
||||
}
|
||||
|
||||
if len(pendingRows) > 0 {
|
||||
logNewStreams := pt.s.logNewStreams
|
||||
streamTagsCanonicals := lr.streamTagsCanonicals
|
||||
@@ -142,11 +152,14 @@ func (pt *partition) mustAddRows(lr *LogRows) {
|
||||
continue
|
||||
}
|
||||
if pt.hasStreamIDInCache(streamID) {
|
||||
cacheHits++
|
||||
continue
|
||||
}
|
||||
cacheMisses++
|
||||
if !pt.idb.hasStreamID(streamID) {
|
||||
streamTagsCanonical := streamTagsCanonicals[rowIdx]
|
||||
pt.idb.mustRegisterStream(streamID, streamTagsCanonical)
|
||||
|
||||
if logNewStreams {
|
||||
pt.logNewStream(streamTagsCanonical, lr.rows[rowIdx])
|
||||
}
|
||||
@@ -155,6 +168,8 @@ func (pt *partition) mustAddRows(lr *LogRows) {
|
||||
}
|
||||
}
|
||||
|
||||
streamIDCacheHitRate.Update(float64(cacheHits) / float64(cacheHits+cacheMisses))
|
||||
|
||||
// Add rows to datadb
|
||||
pt.ddb.mustAddRows(lr)
|
||||
if pt.s.logIngestedRows {
|
||||
|
||||
@@ -17,13 +17,13 @@ import (
|
||||
|
||||
// StorageStats represents stats for the storage. It may be obtained by calling Storage.UpdateStats().
|
||||
type StorageStats struct {
|
||||
// RowsDroppedTooBigTimestamp is the number of rows dropped during data ingestion because their timestamp is smaller than the minimum allowed
|
||||
// RowsDroppedTooBigTimestamp is the number of rows dropped during data ingestion because their timestamp is smaller than the minimum allowed.
|
||||
RowsDroppedTooBigTimestamp uint64
|
||||
|
||||
// RowsDroppedTooSmallTimestamp is the number of rows dropped during data ingestion because their timestamp is bigger than the maximum allowed
|
||||
// RowsDroppedTooSmallTimestamp is the number of rows dropped during data ingestion because their timestamp is bigger than the maximum allowed.
|
||||
RowsDroppedTooSmallTimestamp uint64
|
||||
|
||||
// PartitionsCount is the number of partitions in the storage
|
||||
// PartitionsCount is the number of partitions in the storage.
|
||||
PartitionsCount uint64
|
||||
|
||||
// IsReadOnly indicates whether the storage is read-only.
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
@@ -16,6 +17,14 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prefixfilter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
rowsPerQuery = metrics.NewHistogram(`vl_storage_rows_read_per_query`)
|
||||
bytesPerQuery = metrics.NewHistogram(`vl_storage_bytes_read_per_query`)
|
||||
blocksPerQuery = metrics.NewHistogram(`vl_storage_blocks_read_per_query`)
|
||||
streamsUsedPerQuery = metrics.NewHistogram(`vl_storage_streams_used_per_query`)
|
||||
)
|
||||
|
||||
// genericSearchOptions contain options used for search.
|
||||
@@ -100,7 +109,16 @@ func (f writeBlockResultFunc) newDataBlockWriter() WriteDataBlockFunc {
|
||||
// RunQuery runs the given q and calls writeBlock for results.
|
||||
func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock WriteDataBlockFunc) error {
|
||||
writeBlockResult := writeBlock.newBlockResultWriter()
|
||||
return s.runQuery(ctx, tenantIDs, q, writeBlockResult)
|
||||
|
||||
err := s.runQuery(ctx, tenantIDs, q, writeBlockResult)
|
||||
|
||||
// Update metrics regardless of the error
|
||||
bytesPerQuery.Update(float64(q.searchStats.totalBytesFromDisk.Load()))
|
||||
rowsPerQuery.Update(float64(q.searchStats.totalRows.Load()))
|
||||
blocksPerQuery.Update(float64(q.searchStats.totalBlocks.Load()))
|
||||
streamsUsedPerQuery.Update(float64(q.searchStats.fetchStreams.Load()))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// runQueryFunc must run the given q and pass query results to writeBlock
|
||||
@@ -133,7 +151,7 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
|
||||
workersCount := q.GetConcurrency()
|
||||
|
||||
search := func(stopCh <-chan struct{}, writeBlockToPipes writeBlockResultFunc) error {
|
||||
s.search(workersCount, so, stopCh, writeBlockToPipes)
|
||||
s.search(workersCount, q.searchStats, so, stopCh, writeBlockToPipes)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1031,13 +1049,17 @@ func (db *DataBlock) initFromBlockResult(br *blockResult) {
|
||||
// search searches for the matching rows according to so.
|
||||
//
|
||||
// It calls writeBlock for each matching block.
|
||||
func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, writeBlock writeBlockResultFunc) {
|
||||
func (s *Storage) search(workersCount int, ss *searchStats, so *genericSearchOptions, stopCh <-chan struct{}, writeBlock writeBlockResultFunc) {
|
||||
// Spin up workers
|
||||
var wgWorkers sync.WaitGroup
|
||||
workCh := make(chan *blockSearchWorkBatch, workersCount)
|
||||
wgWorkers.Add(workersCount)
|
||||
|
||||
for i := 0; i < workersCount; i++ {
|
||||
go func(workerID uint) {
|
||||
var totalBytesFromDisk uint64
|
||||
var totalRows, totalBlocks int
|
||||
|
||||
bs := getBlockSearch()
|
||||
bm := getBitmap(0)
|
||||
for bswb := range workCh {
|
||||
@@ -1051,6 +1073,10 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
|
||||
}
|
||||
|
||||
bs.search(bsw, bm)
|
||||
totalBytesFromDisk += bs.bytesReadFromDisk.Load()
|
||||
totalRows += bs.br.rowsLen
|
||||
totalBlocks++
|
||||
|
||||
if bs.br.rowsLen > 0 {
|
||||
writeBlock(workerID, &bs.br)
|
||||
}
|
||||
@@ -1059,6 +1085,11 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
|
||||
bswb.bsws = bswb.bsws[:0]
|
||||
putBlockSearchWorkBatch(bswb)
|
||||
}
|
||||
|
||||
ss.totalBytesFromDisk.Add(totalBytesFromDisk)
|
||||
ss.totalRows.Add(uint64(totalRows))
|
||||
ss.totalBlocks.Add(uint64(totalBlocks))
|
||||
|
||||
putBlockSearch(bs)
|
||||
putBitmap(bm)
|
||||
wgWorkers.Done()
|
||||
@@ -1097,7 +1128,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
|
||||
partitionSearchConcurrencyLimitCh <- struct{}{}
|
||||
wgSearchers.Add(1)
|
||||
go func(idx int, pt *partition) {
|
||||
psfs[idx] = pt.search(sf, f, so, workCh, stopCh)
|
||||
psfs[idx] = pt.search(sf, ss, f, so, workCh, stopCh)
|
||||
wgSearchers.Done()
|
||||
<-partitionSearchConcurrencyLimitCh
|
||||
}(i, ptw.pt)
|
||||
@@ -1126,7 +1157,14 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs
|
||||
|
||||
type partitionSearchFinalizer func()
|
||||
|
||||
func (pt *partition) search(sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
|
||||
type searchStats struct {
|
||||
fetchStreams atomic.Uint64
|
||||
totalBytesFromDisk atomic.Uint64
|
||||
totalRows atomic.Uint64
|
||||
totalBlocks atomic.Uint64
|
||||
}
|
||||
|
||||
func (pt *partition) search(sf *StreamFilter, ss *searchStats, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
|
||||
if needStop(stopCh) {
|
||||
// Do not spend CPU time on search, since it is already stopped.
|
||||
return func() {}
|
||||
@@ -1144,6 +1182,8 @@ func (pt *partition) search(sf *StreamFilter, f filter, so *genericSearchOptions
|
||||
streamIDs = getStreamIDsForTenantIDs(so.streamIDs, tenantIDs)
|
||||
tenantIDs = nil
|
||||
}
|
||||
ss.fetchStreams.Add(uint64(len(streamIDs)))
|
||||
|
||||
if hasStreamFilters(f) {
|
||||
f = initStreamFilters(so.tenantIDs, pt.idb, f)
|
||||
}
|
||||
|
||||
@@ -937,7 +937,7 @@ func TestStorageSearch(t *testing.T) {
|
||||
processBlock := func(_ uint, _ *blockResult) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
s.search(workersCount, &searchStats{}, so, nil, processBlock)
|
||||
})
|
||||
t.Run("missing-tenant-bigger-than-existing", func(_ *testing.T) {
|
||||
tenantID := TenantID{
|
||||
@@ -951,7 +951,7 @@ func TestStorageSearch(t *testing.T) {
|
||||
processBlock := func(_ uint, _ *blockResult) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
s.search(workersCount, &searchStats{}, so, nil, processBlock)
|
||||
})
|
||||
t.Run("missing-tenant-middle", func(_ *testing.T) {
|
||||
tenantID := TenantID{
|
||||
@@ -965,7 +965,7 @@ func TestStorageSearch(t *testing.T) {
|
||||
processBlock := func(_ uint, _ *blockResult) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
s.search(workersCount, &searchStats{}, so, nil, processBlock)
|
||||
})
|
||||
t.Run("matching-tenant-id", func(t *testing.T) {
|
||||
for i := 0; i < tenantsCount; i++ {
|
||||
@@ -981,7 +981,7 @@ func TestStorageSearch(t *testing.T) {
|
||||
processBlock := func(_ uint, br *blockResult) {
|
||||
rowsCountTotal.Add(uint32(br.rowsLen))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
s.search(workersCount, &searchStats{}, so, nil, processBlock)
|
||||
|
||||
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
|
||||
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||
@@ -998,7 +998,7 @@ func TestStorageSearch(t *testing.T) {
|
||||
processBlock := func(_ uint, br *blockResult) {
|
||||
rowsCountTotal.Add(uint32(br.rowsLen))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
s.search(workersCount, &searchStats{}, so, nil, processBlock)
|
||||
|
||||
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
|
||||
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||
@@ -1014,7 +1014,7 @@ func TestStorageSearch(t *testing.T) {
|
||||
processBlock := func(_ uint, _ *blockResult) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
s.search(workersCount, &searchStats{}, so, nil, processBlock)
|
||||
})
|
||||
t.Run("matching-stream-id", func(t *testing.T) {
|
||||
for i := 0; i < streamsPerTenant; i++ {
|
||||
@@ -1031,7 +1031,7 @@ func TestStorageSearch(t *testing.T) {
|
||||
processBlock := func(_ uint, br *blockResult) {
|
||||
rowsCountTotal.Add(uint32(br.rowsLen))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
s.search(workersCount, &searchStats{}, so, nil, processBlock)
|
||||
|
||||
expectedRowsCount := blocksPerStream * rowsPerBlock
|
||||
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||
@@ -1053,7 +1053,7 @@ func TestStorageSearch(t *testing.T) {
|
||||
processBlock := func(_ uint, br *blockResult) {
|
||||
rowsCountTotal.Add(uint32(br.rowsLen))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
s.search(workersCount, &searchStats{}, so, nil, processBlock)
|
||||
|
||||
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
|
||||
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||
@@ -1083,7 +1083,7 @@ func TestStorageSearch(t *testing.T) {
|
||||
processBlock := func(_ uint, br *blockResult) {
|
||||
rowsCountTotal.Add(uint32(br.rowsLen))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
s.search(workersCount, &searchStats{}, so, nil, processBlock)
|
||||
|
||||
expectedRowsCount := streamsPerTenant * blocksPerStream * 2
|
||||
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||
@@ -1104,7 +1104,7 @@ func TestStorageSearch(t *testing.T) {
|
||||
processBlock := func(_ uint, br *blockResult) {
|
||||
rowsCountTotal.Add(uint32(br.rowsLen))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
s.search(workersCount, &searchStats{}, so, nil, processBlock)
|
||||
|
||||
expectedRowsCount := blocksPerStream
|
||||
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||
@@ -1124,7 +1124,7 @@ func TestStorageSearch(t *testing.T) {
|
||||
processBlock := func(_ uint, _ *blockResult) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
s.search(workersCount, &searchStats{}, so, nil, processBlock)
|
||||
})
|
||||
|
||||
s.MustClose()
|
||||
|
||||
Reference in New Issue
Block a user