Compare commits

...

7 Commits

Author SHA1 Message Date
func25
c422ab6f87 WIP 2025-06-29 01:51:36 +07:00
func25
17c584147c WIP 2025-06-27 16:37:45 +07:00
func25
f4474b9b00 WIP 2025-06-27 16:31:16 +07:00
func25
d4e039f038 cache_hit_rate metric 2025-06-25 19:05:00 +07:00
func25
e9649fbd12 add vl_merge_bytes{type} 2025-06-25 19:05:00 +07:00
func25
1d5d05c8b3 copilot 2025-06-25 19:05:00 +07:00
func25
a0848f9235 add metrics 2025-06-25 19:05:00 +07:00
15 changed files with 11734 additions and 4014 deletions

View File

@@ -7,6 +7,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/metrics"
@@ -192,6 +193,7 @@ type logMessageProcessor struct {
rowsIngestedTotal *metrics.Counter
bytesIngestedTotal *metrics.Counter
flushDuration *metrics.Summary
}
func (lmp *logMessageProcessor) initPeriodicFlush() {
@@ -293,6 +295,7 @@ func (lmp *logMessageProcessor) flushLocked() {
lmp.lastFlushTime = time.Now()
logRowsStorage.MustAddRows(lmp.lr)
lmp.lr.ResetKeepSettings()
lmp.flushDuration.UpdateDuration(lmp.lastFlushTime)
}
// MustClose flushes the remaining data to the underlying storage and closes lmp.
@@ -303,6 +306,7 @@ func (lmp *logMessageProcessor) MustClose() {
lmp.flushLocked()
logstorage.PutLogRows(lmp.lr)
lmp.lr = nil
messageProcessorCount.Add(-1)
}
// NewLogMessageProcessor returns new LogMessageProcessor for the given cp.
@@ -312,12 +316,14 @@ func (cp *CommonParams) NewLogMessageProcessor(protocolName string, isStreamMode
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields, cp.DecolorizeFields, cp.ExtraFields, *defaultMsgValue)
rowsIngestedTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_rows_ingested_total{type=%q}", protocolName))
bytesIngestedTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_bytes_ingested_total{type=%q}", protocolName))
flushDuration := metrics.GetOrCreateSummary(fmt.Sprintf("vl_insert_flush_duration_seconds{type=%q}", protocolName))
lmp := &logMessageProcessor{
cp: cp,
lr: lr,
rowsIngestedTotal: rowsIngestedTotal,
bytesIngestedTotal: bytesIngestedTotal,
flushDuration: flushDuration,
stopCh: make(chan struct{}),
}
@@ -326,10 +332,13 @@ func (cp *CommonParams) NewLogMessageProcessor(protocolName string, isStreamMode
lmp.initPeriodicFlush()
}
messageProcessorCount.Add(1)
return lmp
}
var (
// Counters of dropped rows, distinguished by the `reason` label.
rowsDroppedTotalDebug = metrics.NewCounter(`vl_rows_dropped_total{reason="debug"}`)
rowsDroppedTotalTooManyFields = metrics.NewCounter(`vl_rows_dropped_total{reason="too_many_fields"}`)
// vl_insert_processors_count reports the current value of messageProcessorCount via its callback.
// The returned gauge object isn't used directly, so it is discarded.
_ = metrics.NewGauge(`vl_insert_processors_count`, func() float64 { return float64(messageProcessorCount.Load()) })
// messageProcessorCount is incremented in NewLogMessageProcessor and decremented in logMessageProcessor.MustClose.
messageProcessorCount atomic.Int64
)

View File

@@ -78,7 +78,10 @@ var localStorage *logstorage.Storage
var localStorageMetrics *metrics.Set
var netstorageInsert *netinsert.Storage
var netstorageInsertMetrics *metrics.Set
var netstorageSelect *netselect.Storage
var netstorageSelectMetrics *metrics.Set
// Init initializes vlstorage.
//
@@ -141,9 +144,19 @@ func initNetworkStorage() {
logger.Infof("starting insert service for nodes %s", *storageNodeAddrs)
netstorageInsert = netinsert.NewStorage(*storageNodeAddrs, authCfgs, isTLSs, *insertConcurrency, *insertDisableCompression)
netstorageInsertMetrics = metrics.NewSet()
netstorageInsertMetrics.RegisterMetricsWriter(func(w io.Writer) {
netstorageInsert.WriteMetrics(w)
})
metrics.RegisterSet(netstorageInsertMetrics)
logger.Infof("initializing select service for nodes %s", *storageNodeAddrs)
netstorageSelect = netselect.NewStorage(*storageNodeAddrs, authCfgs, isTLSs, *selectDisableCompression)
netstorageSelectMetrics = metrics.NewSet()
netstorageSelectMetrics.RegisterMetricsWriter(func(w io.Writer) {
netstorageSelect.WriteMetrics(w)
})
metrics.RegisterSet(netstorageSelectMetrics)
logger.Infof("initialized all the network services")
}
@@ -354,6 +367,10 @@ func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
var ss logstorage.StorageStats
strg.UpdateStats(&ss)
if maxDiskSpaceUsageBytes.N > 0 {
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vl_max_disk_space_usage_bytes{path=%q}`, *storageDataPath), uint64(maxDiskSpaceUsageBytes.N))
}
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vl_free_disk_space_bytes{path=%q}`, *storageDataPath), fs.MustGetFreeSpace(*storageDataPath))
isReadOnly := uint64(0)
@@ -365,10 +382,20 @@ func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
metrics.WriteGaugeUint64(w, `vl_active_merges{type="storage/inmemory"}`, ss.InmemoryActiveMerges)
metrics.WriteGaugeUint64(w, `vl_active_merges{type="storage/small"}`, ss.SmallPartActiveMerges)
metrics.WriteGaugeUint64(w, `vl_active_merges{type="storage/big"}`, ss.BigPartActiveMerges)
metrics.WriteGaugeUint64(w, `vl_active_merges{type="indexdb/inmemory"}`, ss.IndexdbActiveInmemoryMerges)
metrics.WriteGaugeUint64(w, `vl_active_merges{type="indexdb/file"}`, ss.IndexdbActiveFileMerges)
metrics.WriteCounterUint64(w, `vl_merges_total{type="storage/inmemory"}`, ss.InmemoryMergesTotal)
metrics.WriteCounterUint64(w, `vl_merges_total{type="storage/small"}`, ss.SmallPartMergesTotal)
metrics.WriteCounterUint64(w, `vl_merges_total{type="storage/big"}`, ss.BigPartMergesTotal)
metrics.WriteCounterUint64(w, `vl_merges_total{type="indexdb/inmemory"}`, ss.IndexdbInmemoryMergesTotal)
metrics.WriteCounterUint64(w, `vl_merges_total{type="indexdb/file"}`, ss.IndexdbFileMergesTotal)
metrics.WriteCounterUint64(w, `vl_rows_merged_total{type="storage/inmemory"}`, ss.InmemoryMergeRowsTotal)
metrics.WriteCounterUint64(w, `vl_rows_merged_total{type="storage/small"}`, ss.SmallPartMergeRowsTotal)
metrics.WriteCounterUint64(w, `vl_rows_merged_total{type="storage/big"}`, ss.BigPartMergeRowsTotal)
metrics.WriteCounterUint64(w, `vl_rows_merged_total{type="indexdb/inmemory"}`, ss.IndexdbInmemoryItemsMerged)
metrics.WriteCounterUint64(w, `vl_rows_merged_total{type="indexdb/file"}`, ss.IndexdbFileItemsMerged)
metrics.WriteGaugeUint64(w, `vl_storage_rows{type="storage/inmemory"}`, ss.InmemoryRowsCount)
metrics.WriteGaugeUint64(w, `vl_storage_rows{type="storage/small"}`, ss.SmallPartRowsCount)
@@ -382,6 +409,9 @@ func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
metrics.WriteGaugeUint64(w, `vl_storage_blocks{type="storage/small"}`, ss.SmallPartBlocks)
metrics.WriteGaugeUint64(w, `vl_storage_blocks{type="storage/big"}`, ss.BigPartBlocks)
metrics.WriteGaugeUint64(w, `vl_pending_rows{type="storage"}`, ss.PendingRowsCount)
metrics.WriteGaugeUint64(w, `vl_pending_rows{type="indexdb"}`, ss.IndexdbPendingItems)
metrics.WriteGaugeUint64(w, `vl_partitions`, ss.PartitionsCount)
metrics.WriteCounterUint64(w, `vl_streams_created_total`, ss.StreamsCreatedTotal)

View File

@@ -21,6 +21,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
)
// the maximum size of a single data block sent to storage node.
@@ -33,6 +34,8 @@ const ProtocolVersion = "v1"
// Storage is a network storage for sending data to remote storage nodes in the cluster.
type Storage struct {
RemoteSendFailed atomic.Uint64 // Number of failed requests sent to remote storage nodes.
sns []*storageNode
disableCompression bool
@@ -45,6 +48,16 @@ type Storage struct {
wg sync.WaitGroup
}
// WriteMetrics writes insert-related metrics for s to w.
//
// vl_insert_active_streams is written only when at least one stream is tracked.
// vl_insert_remote_send_errors_total is always written, so the counter does not
// disappear from scrapes while there are no active streams.
func (s *Storage) WriteMetrics(w io.Writer) {
	if activeStreams := s.GetActiveStreams(); activeStreams > 0 {
		metrics.WriteGaugeUint64(w, `vl_insert_active_streams`, activeStreams)
	}

	// Do not gate the error counter on the number of active streams:
	// send failures must stay visible even when nothing is tracked right now.
	metrics.WriteCounterUint64(w, `vl_insert_remote_send_errors_total`, s.RemoteSendFailed.Load())
}
type storageNode struct {
// scheme is http or https scheme to communicate with addr
scheme string
@@ -235,9 +248,7 @@ func (sn *storageNode) sendInsertRequest(pendingData *bytesutil.ByteBuffer) erro
resp, err := sn.c.Do(req)
if err != nil {
// Disable sn for data writing for 10 seconds.
sn.disabledUntil.Store(fasttime.UnixTimestamp() + 10)
sn.setDisableTemporarily()
return fmt.Errorf("cannot send data block with the length %d to %q: %s", pendingData.Len(), reqURL, err)
}
defer resp.Body.Close()
@@ -251,8 +262,7 @@ func (sn *storageNode) sendInsertRequest(pendingData *bytesutil.ByteBuffer) erro
respBody = []byte(fmt.Sprintf("%s", err))
}
// Disable sn for data writing for 10 seconds.
sn.disabledUntil.Store(fasttime.UnixTimestamp() + 10)
sn.setDisableTemporarily()
return fmt.Errorf("unexpected status code returned when sending data block to %q: %d; want 2xx; response body: %q", reqURL, resp.StatusCode, respBody)
}
@@ -261,6 +271,11 @@ func (sn *storageNode) getRequestURL(path string) string {
return fmt.Sprintf("%s://%s%s?version=%s", sn.scheme, sn.addr, path, url.QueryEscape(ProtocolVersion))
}
// setDisableTemporarily disables sn for data writing for the next 10 seconds
// and registers the failure in the RemoteSendFailed counter of the parent Storage.
func (sn *storageNode) setDisableTemporarily() {
	const disableSeconds = 10

	resumeAt := fasttime.UnixTimestamp() + disableSeconds
	sn.disabledUntil.Store(resumeAt)

	sn.s.RemoteSendFailed.Add(1)
}
var zstdBufPool bytesutil.ByteBufferPool
// NewStorage returns new Storage for the given addrs with the given authCfgs.
@@ -307,6 +322,15 @@ func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) {
sn.addRow(r)
}
// GetActiveStreams returns the number of entries currently held in the per-stream rows tracker of s.
//
// The tracker mutex is held while the entries are counted.
func (s *Storage) GetActiveStreams() uint64 {
	s.srt.mu.Lock()
	defer s.srt.mu.Unlock()

	return uint64(len(s.srt.rowsPerStream))
}
func (s *Storage) sendInsertRequestToAnyNode(pendingData *bytesutil.ByteBuffer) bool {
startIdx := int(fastrand.Uint32n(uint32(len(s.sns))))
for i := range s.sns {

View File

@@ -10,6 +10,7 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -21,6 +22,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/metrics"
)
const (
@@ -65,6 +67,9 @@ type Storage struct {
sns []*storageNode
disableCompression bool
// remoteSendErrors is the number of errors when sending request to the remote storage node
remoteSendErrors atomic.Uint64
}
type storageNode struct {
@@ -313,6 +318,10 @@ func (s *Storage) MustStop() {
s.sns = nil
}
// WriteMetrics writes select-related metrics for s to w.
func (s *Storage) WriteMetrics(w io.Writer) {
	// remoteSendErrors is only ever incremented and the metric name carries the _total suffix,
	// so it must be exposed as a counter rather than a gauge.
	metrics.WriteCounterUint64(w, `vl_select_remote_send_errors_total`, s.remoteSendErrors.Load())
}
// RunQuery runs the given q and calls writeBlock for the returned data blocks
func (s *Storage) RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, writeBlock logstorage.WriteDataBlockFunc) error {
nqr, err := logstorage.NewNetQueryRunner(ctx, tenantIDs, q, s.RunQuery, writeBlock)
@@ -345,6 +354,9 @@ func (s *Storage) runQuery(stopCh <-chan struct{}, tenantIDs []logstorage.Tenant
})
if err != nil {
// Cancel the remaining parallel queries
if !errors.Is(err, context.Canceled) {
s.remoteSendErrors.Add(1)
}
cancel()
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,7 @@ package logstorage
import (
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@@ -141,6 +142,9 @@ type blockSearch struct {
// It is initialized lazily by calling getColumnsHeader().
cshCache *columnsHeader
// bytesReadFromDisk tracks the total bytes read from disk files for this block search
bytesReadFromDisk atomic.Uint64
// seenStreams contains seen streamIDs for the recent searches.
//
// It is used for speeding up fetching _stream column.
@@ -197,6 +201,9 @@ func (bs *blockSearch) reset() {
bs.cshCache = nil
}
// Reset bytes read from disk counter
bs.bytesReadFromDisk.Store(0)
// Do not reset seenStreams, since its lifetime is managed by blockResult.addStreamColumn() code.
}
@@ -344,6 +351,9 @@ func (bs *blockSearch) getColumnsHeaderIndex() *columnsHeaderIndex {
if bs.cshIndexCache == nil {
bs.cshIndexBlockCache = readColumnsHeaderIndexBlock(bs.cshIndexBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
// Track bytes read from disk
bs.bytesReadFromDisk.Add(uint64(len(bs.cshIndexBlockCache)))
bs.cshIndexCache = getColumnsHeaderIndex()
if err := bs.cshIndexCache.unmarshalInplace(bs.cshIndexBlockCache); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columns header index: %s", bs.bsw.p.path, err)
@@ -376,6 +386,10 @@ func (bs *blockSearch) getColumnsHeader() *columnsHeader {
func (bs *blockSearch) getColumnsHeaderBlock() []byte {
if !bs.cshBlockInitialized {
bs.cshBlockCache = readColumnsHeaderBlock(bs.cshBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
// Track bytes read from disk
bs.bytesReadFromDisk.Add(uint64(len(bs.cshBlockCache)))
bs.cshBlockInitialized = true
}
return bs.cshBlockCache
@@ -425,6 +439,10 @@ func (bs *blockSearch) getBloomFilterForColumn(ch *columnHeader) *bloomFilter {
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(bloomFilterSize))
bloomValuesFile.bloom.MustReadAt(bb.B, int64(ch.bloomFilterOffset))
// Track bytes read from disk
bs.bytesReadFromDisk.Add(uint64(len(bb.B)))
bf = getBloomFilter()
if err := bf.unmarshal(bb.B); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal bloom filter: %s", bs.partPath(), err)
@@ -458,6 +476,9 @@ func (bs *blockSearch) getValuesForColumn(ch *columnHeader) []string {
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(valuesSize))
bloomValuesFile.values.MustReadAt(bb.B, int64(ch.valuesOffset))
// Track bytes read from disk
bs.bytesReadFromDisk.Add(uint64(len(bb.B)))
values = getStringBucket()
var err error
values.a, err = bs.sbu.unmarshal(values.a[:0], bb.B, bs.bsw.bh.rowsCount)
@@ -493,6 +514,9 @@ func (bs *blockSearch) getTimestamps() []int64 {
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(blockSize))
p.timestampsFile.MustReadAt(bb.B, int64(th.blockOffset))
// Track bytes read from disk
bs.bytesReadFromDisk.Add(uint64(len(bb.B)))
rowsCount := int(bs.bsw.bh.rowsCount)
timestamps = encoding.GetInt64s(rowsCount)
var err error

View File

@@ -16,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/metrics"
)
// The maximum size of big part.
@@ -54,14 +55,25 @@ type datadb struct {
// mergeIdx is used for generating unique directory names for parts
mergeIdx atomic.Uint64
inmemoryMergesTotal atomic.Uint64
inmemoryActiveMerges atomic.Int64
inmemoryMergesTotal atomic.Uint64
inmemoryActiveMerges atomic.Int64
inmemoryMergeRowsTotal atomic.Uint64
smallPartMergesTotal atomic.Uint64
smallPartActiveMerges atomic.Int64
smallPartMergesTotal atomic.Uint64
smallPartActiveMerges atomic.Int64
smallPartMergeRowsTotal atomic.Uint64
bigPartMergesTotal atomic.Uint64
bigPartActiveMerges atomic.Int64
bigPartMergesTotal atomic.Uint64
bigPartActiveMerges atomic.Int64
bigPartMergeRowsTotal atomic.Uint64
// metrics that need to be updated directly
inmemoryPartMergeDuration *metrics.Summary
inmemoryPartMergeBytes *metrics.Summary
smallPartMergeDuration *metrics.Summary
smallPartMergeBytes *metrics.Summary
bigPartMergeDuration *metrics.Summary
bigPartMergeBytes *metrics.Summary
// pt is the partition the datadb belongs to
pt *partition
@@ -188,10 +200,18 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *da
ddb := &datadb{
pt: pt,
flushInterval: flushInterval,
path: path,
smallParts: smallParts,
bigParts: bigParts,
stopCh: make(chan struct{}),
inmemoryPartMergeDuration: metrics.GetOrCreateSummary(`vl_merge_duration_seconds{type="storage/inmemory"}`),
inmemoryPartMergeBytes: metrics.GetOrCreateSummary(`vl_merge_bytes{type="storage/inmemory"}`),
smallPartMergeDuration: metrics.GetOrCreateSummary(`vl_merge_duration_seconds{type="storage/small"}`),
smallPartMergeBytes: metrics.GetOrCreateSummary(`vl_merge_bytes{type="storage/small"}`),
bigPartMergeDuration: metrics.GetOrCreateSummary(`vl_merge_duration_seconds{type="storage/big"}`),
bigPartMergeBytes: metrics.GetOrCreateSummary(`vl_merge_bytes{type="storage/big"}`),
path: path,
smallParts: smallParts,
bigParts: bigParts,
stopCh: make(chan struct{}),
}
ddb.rb.init(&ddb.wg, ddb.mustFlushLogRows)
ddb.mergeIdx.Store(uint64(time.Now().UnixNano()))
@@ -593,6 +613,21 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
ddb.swapSrcWithDstParts(pws, pwNew, dstPartType)
switch dstPartType {
case partInmemory:
ddb.inmemoryMergeRowsTotal.Add(srcRowsCount)
ddb.inmemoryPartMergeDuration.UpdateDuration(startTime)
ddb.inmemoryPartMergeBytes.Update(float64(dstSize))
case partSmall:
ddb.smallPartMergeRowsTotal.Add(srcRowsCount)
ddb.smallPartMergeDuration.UpdateDuration(startTime)
ddb.smallPartMergeBytes.Update(float64(dstSize))
case partBig:
ddb.bigPartMergeRowsTotal.Add(srcRowsCount)
ddb.bigPartMergeDuration.UpdateDuration(startTime)
ddb.bigPartMergeBytes.Update(float64(dstSize))
}
d := time.Since(startTime)
if d <= time.Minute {
return
@@ -673,6 +708,21 @@ type rowsBuffer struct {
nextIdx atomic.Uint64
}
// Len returns the total number of rows currently buffered across all the shards of rb.
//
// Every shard is locked only while its own rows are counted,
// so the result is not a consistent snapshot across shards.
func (rb *rowsBuffer) Len() uint64 {
	var total uint64

	all := rb.shards
	for i := 0; i < len(all); i++ {
		sh := &all[i]

		sh.mu.Lock()
		if lr := sh.lr; lr != nil {
			total += uint64(lr.Len())
		}
		sh.mu.Unlock()
	}

	return total
}
func (rb *rowsBuffer) init(wg *sync.WaitGroup, flushFunc func(lr *logRows)) {
shards := make([]rowsBufferShard, cgroup.AvailableCPUs())
for i := range shards {
@@ -684,7 +734,7 @@ func (rb *rowsBuffer) init(wg *sync.WaitGroup, flushFunc func(lr *logRows)) {
}
type rowsBufferShard struct {
wg *sync.WaitGroup
wg *sync.WaitGroup // wg is shared with datadb.
flushFunc func(lr *logRows)
mu sync.Mutex
@@ -774,18 +824,30 @@ type DatadbStats struct {
// InmemoryActiveMerges is the number of currently active inmemory merges performed by the given datadb.
InmemoryActiveMerges uint64
// InmemoryMergeRowsTotal is the number of rows merged to inmemory parts.
InmemoryMergeRowsTotal uint64
// SmallPartMergesTotal is the number of small file merges performed in the given datadb.
SmallPartMergesTotal uint64
// SmallPartActiveMerges is the number of currently active small file merges performed by the given datadb.
SmallPartActiveMerges uint64
// SmallPartMergeRowsTotal is the number of rows merged to small parts.
SmallPartMergeRowsTotal uint64
// BigPartMergesTotal is the number of big file merges performed in the given datadb.
BigPartMergesTotal uint64
// BigPartActiveMerges is the number of currently active big file merges performed by the given datadb.
BigPartActiveMerges uint64
// BigPartMergeRowsTotal is the number of rows merged to big parts.
BigPartMergeRowsTotal uint64
// PendingRowsCount is the number of rows, which weren't flushed to searchable part yet.
PendingRowsCount uint64
// InmemoryRowsCount is the number of rows, which weren't flushed to disk yet.
InmemoryRowsCount uint64
@@ -845,10 +907,15 @@ func (s *DatadbStats) RowsCount() uint64 {
func (ddb *datadb) updateStats(s *DatadbStats) {
s.InmemoryMergesTotal += ddb.inmemoryMergesTotal.Load()
s.InmemoryActiveMerges += uint64(ddb.inmemoryActiveMerges.Load())
s.InmemoryMergeRowsTotal += ddb.inmemoryMergeRowsTotal.Load()
s.SmallPartMergesTotal += ddb.smallPartMergesTotal.Load()
s.SmallPartActiveMerges += uint64(ddb.smallPartActiveMerges.Load())
s.SmallPartMergeRowsTotal += ddb.smallPartMergeRowsTotal.Load()
s.BigPartMergesTotal += ddb.bigPartMergesTotal.Load()
s.BigPartActiveMerges += uint64(ddb.bigPartActiveMerges.Load())
s.BigPartMergeRowsTotal += ddb.bigPartMergeRowsTotal.Load()
s.PendingRowsCount = ddb.rb.Len()
ddb.partsLock.Lock()

View File

@@ -206,7 +206,7 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi
var results []result
const workersCount = 3
s.search(workersCount, so, nil, func(_ uint, br *blockResult) {
s.search(workersCount, &searchStats{}, so, nil, func(_ uint, br *blockResult) {
// Verify columns
cs := br.getColumns()
if len(cs) != 2 {

View File

@@ -46,6 +46,27 @@ type IndexdbStats struct {
// IndexdbPartsCount is the number of parts in indexdb.
IndexdbPartsCount uint64
// IndexdbPendingItems is the number of pending items in indexdb, which aren't merged into parts yet.
IndexdbPendingItems uint64
// IndexdbActiveFileMerges is the number of currently active file merges in indexdb.
IndexdbActiveFileMerges uint64
// IndexdbActiveInmemoryMerges is the number of currently active in-memory merges in indexdb.
IndexdbActiveInmemoryMerges uint64
// IndexdbFileMergesTotal is the total number of file merges performed in indexdb.
IndexdbFileMergesTotal uint64
// IndexdbInmemoryMergesTotal is the total number of in-memory merges performed in indexdb.
IndexdbInmemoryMergesTotal uint64
// IndexdbFileItemsMerged is the total number of items merged by file merges in indexdb.
IndexdbFileItemsMerged uint64
// IndexdbInmemoryItemsMerged is the total number of items merged by in-memory merges in indexdb.
IndexdbInmemoryItemsMerged uint64
}
type indexdb struct {
@@ -107,8 +128,15 @@ func (idb *indexdb) updateStats(d *IndexdbStats) {
d.IndexdbSizeBytes += tm.InmemorySizeBytes + tm.FileSizeBytes
d.IndexdbItemsCount += tm.InmemoryItemsCount + tm.FileItemsCount
d.IndexdbPendingItems += tm.PendingItems
d.IndexdbPartsCount += tm.InmemoryPartsCount + tm.FilePartsCount
d.IndexdbBlocksCount += tm.InmemoryBlocksCount + tm.FileBlocksCount
d.IndexdbActiveFileMerges = tm.ActiveFileMerges
d.IndexdbActiveInmemoryMerges = tm.ActiveInmemoryMerges
d.IndexdbFileMergesTotal += tm.FileMergesCount
d.IndexdbInmemoryMergesTotal += tm.InmemoryMergesCount
d.IndexdbFileItemsMerged += tm.FileItemsMerged
d.IndexdbInmemoryItemsMerged += tm.InmemoryItemsMerged
}
func (idb *indexdb) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte {

View File

@@ -255,6 +255,8 @@ type Query struct {
// timestamp is the timestamp context used for parsing the query.
timestamp int64
searchStats *searchStats
}
type queryOptions struct {
@@ -1281,9 +1283,10 @@ func parseQuery(lex *lexer) (*Query, error) {
return nil, fmt.Errorf("%w; context: [%s]", err, lex.context())
}
q := &Query{
opts: opts,
f: f,
timestamp: lex.currentTimestamp,
opts: opts,
f: f,
timestamp: lex.currentTimestamp,
searchStats: &searchStats{},
}
if lex.isKeyword("|") {

View File

@@ -8,6 +8,11 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
var (
// streamIDCacheHitRate is a summary of stream ID cache hit ratios.
// partition.mustAddRows records one sample per call: hits / (hits + misses).
streamIDCacheHitRate = metrics.NewSummary(`vl_cache_hit_rate{type="stream_id"}`)
)
// PartitionStats contains stats for the partition.
@@ -118,18 +123,23 @@ func mustClosePartition(pt *partition) {
}
func (pt *partition) mustAddRows(lr *LogRows) {
var cacheMisses uint64
var cacheHits uint64
// Register rows in indexdb
var pendingRows []int
streamIDs := lr.streamIDs
for i := range lr.timestamps {
streamID := &streamIDs[i]
if pt.hasStreamIDInCache(streamID) {
cacheHits++
continue
}
if len(pendingRows) == 0 || !streamIDs[pendingRows[len(pendingRows)-1]].equal(streamID) {
pendingRows = append(pendingRows, i)
}
}
if len(pendingRows) > 0 {
logNewStreams := pt.s.logNewStreams
streamTagsCanonicals := lr.streamTagsCanonicals
@@ -142,11 +152,14 @@ func (pt *partition) mustAddRows(lr *LogRows) {
continue
}
if pt.hasStreamIDInCache(streamID) {
cacheHits++
continue
}
cacheMisses++
if !pt.idb.hasStreamID(streamID) {
streamTagsCanonical := streamTagsCanonicals[rowIdx]
pt.idb.mustRegisterStream(streamID, streamTagsCanonical)
if logNewStreams {
pt.logNewStream(streamTagsCanonical, lr.rows[rowIdx])
}
@@ -155,6 +168,8 @@ func (pt *partition) mustAddRows(lr *LogRows) {
}
}
streamIDCacheHitRate.Update(float64(cacheHits) / float64(cacheHits+cacheMisses))
// Add rows to datadb
pt.ddb.mustAddRows(lr)
if pt.s.logIngestedRows {

View File

@@ -17,13 +17,13 @@ import (
// StorageStats represents stats for the storage. It may be obtained by calling Storage.UpdateStats().
type StorageStats struct {
// RowsDroppedTooBigTimestamp is the number of rows dropped during data ingestion because their timestamp is smaller than the minimum allowed
// RowsDroppedTooBigTimestamp is the number of rows dropped during data ingestion because their timestamp is bigger than the maximum allowed.
RowsDroppedTooBigTimestamp uint64
// RowsDroppedTooSmallTimestamp is the number of rows dropped during data ingestion because their timestamp is bigger than the maximum allowed
// RowsDroppedTooSmallTimestamp is the number of rows dropped during data ingestion because their timestamp is smaller than the minimum allowed.
RowsDroppedTooSmallTimestamp uint64
// PartitionsCount is the number of partitions in the storage
// PartitionsCount is the number of partitions in the storage.
PartitionsCount uint64
// IsReadOnly indicates whether the storage is read-only.

View File

@@ -8,6 +8,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -16,6 +17,14 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prefixfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/metrics"
)
// Per-query histograms. Storage.RunQuery records one sample in each of them
// from the query's searchStats after the query finishes, regardless of the returned error.
var (
// rowsPerQuery receives searchStats.totalRows.
rowsPerQuery = metrics.NewHistogram(`vl_storage_rows_read_per_query`)
// bytesPerQuery receives searchStats.totalBytesFromDisk.
bytesPerQuery = metrics.NewHistogram(`vl_storage_bytes_read_per_query`)
// blocksPerQuery receives searchStats.totalBlocks.
blocksPerQuery = metrics.NewHistogram(`vl_storage_blocks_read_per_query`)
// streamsUsedPerQuery receives searchStats.fetchStreams.
streamsUsedPerQuery = metrics.NewHistogram(`vl_storage_streams_used_per_query`)
)
// genericSearchOptions contain options used for search.
@@ -100,7 +109,16 @@ func (f writeBlockResultFunc) newDataBlockWriter() WriteDataBlockFunc {
// RunQuery runs the given q and calls writeBlock for results.
func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock WriteDataBlockFunc) error {
writeBlockResult := writeBlock.newBlockResultWriter()
return s.runQuery(ctx, tenantIDs, q, writeBlockResult)
err := s.runQuery(ctx, tenantIDs, q, writeBlockResult)
// Update metrics regardless of the error
bytesPerQuery.Update(float64(q.searchStats.totalBytesFromDisk.Load()))
rowsPerQuery.Update(float64(q.searchStats.totalRows.Load()))
blocksPerQuery.Update(float64(q.searchStats.totalBlocks.Load()))
streamsUsedPerQuery.Update(float64(q.searchStats.fetchStreams.Load()))
return err
}
// runQueryFunc must run the given q and pass query results to writeBlock
@@ -133,7 +151,7 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
workersCount := q.GetConcurrency()
search := func(stopCh <-chan struct{}, writeBlockToPipes writeBlockResultFunc) error {
s.search(workersCount, so, stopCh, writeBlockToPipes)
s.search(workersCount, q.searchStats, so, stopCh, writeBlockToPipes)
return nil
}
@@ -1031,13 +1049,17 @@ func (db *DataBlock) initFromBlockResult(br *blockResult) {
// search searches for the matching rows according to so.
//
// It calls writeBlock for each matching block.
func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, writeBlock writeBlockResultFunc) {
func (s *Storage) search(workersCount int, ss *searchStats, so *genericSearchOptions, stopCh <-chan struct{}, writeBlock writeBlockResultFunc) {
// Spin up workers
var wgWorkers sync.WaitGroup
workCh := make(chan *blockSearchWorkBatch, workersCount)
wgWorkers.Add(workersCount)
for i := 0; i < workersCount; i++ {
go func(workerID uint) {
var totalBytesFromDisk uint64
var totalRows, totalBlocks int
bs := getBlockSearch()
bm := getBitmap(0)
for bswb := range workCh {
@@ -1051,6 +1073,10 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
}
bs.search(bsw, bm)
totalBytesFromDisk += bs.bytesReadFromDisk.Load()
totalRows += bs.br.rowsLen
totalBlocks++
if bs.br.rowsLen > 0 {
writeBlock(workerID, &bs.br)
}
@@ -1059,6 +1085,11 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
bswb.bsws = bswb.bsws[:0]
putBlockSearchWorkBatch(bswb)
}
ss.totalBytesFromDisk.Add(totalBytesFromDisk)
ss.totalRows.Add(uint64(totalRows))
ss.totalBlocks.Add(uint64(totalBlocks))
putBlockSearch(bs)
putBitmap(bm)
wgWorkers.Done()
@@ -1097,7 +1128,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
partitionSearchConcurrencyLimitCh <- struct{}{}
wgSearchers.Add(1)
go func(idx int, pt *partition) {
psfs[idx] = pt.search(sf, f, so, workCh, stopCh)
psfs[idx] = pt.search(sf, ss, f, so, workCh, stopCh)
wgSearchers.Done()
<-partitionSearchConcurrencyLimitCh
}(i, ptw.pt)
@@ -1126,7 +1157,14 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs
type partitionSearchFinalizer func()
func (pt *partition) search(sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
// searchStats accumulates counters for a single search.
//
// The fields are atomic, since they are updated concurrently
// by search worker goroutines and by per-partition searchers.
type searchStats struct {
// fetchStreams is increased by the number of streamIDs selected in every partition.search call.
fetchStreams atomic.Uint64
// totalBytesFromDisk is the sum of blockSearch.bytesReadFromDisk over the searched blocks.
totalBytesFromDisk atomic.Uint64
// totalRows is the sum of blockResult.rowsLen over the searched blocks.
totalRows atomic.Uint64
// totalBlocks is the number of blocks processed by the search workers.
totalBlocks atomic.Uint64
}
func (pt *partition) search(sf *StreamFilter, ss *searchStats, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
if needStop(stopCh) {
// Do not spend CPU time on search, since it is already stopped.
return func() {}
@@ -1144,6 +1182,8 @@ func (pt *partition) search(sf *StreamFilter, f filter, so *genericSearchOptions
streamIDs = getStreamIDsForTenantIDs(so.streamIDs, tenantIDs)
tenantIDs = nil
}
ss.fetchStreams.Add(uint64(len(streamIDs)))
if hasStreamFilters(f) {
f = initStreamFilters(so.tenantIDs, pt.idb, f)
}

View File

@@ -937,7 +937,7 @@ func TestStorageSearch(t *testing.T) {
processBlock := func(_ uint, _ *blockResult) {
panic(fmt.Errorf("unexpected match"))
}
s.search(workersCount, so, nil, processBlock)
s.search(workersCount, &searchStats{}, so, nil, processBlock)
})
t.Run("missing-tenant-bigger-than-existing", func(_ *testing.T) {
tenantID := TenantID{
@@ -951,7 +951,7 @@ func TestStorageSearch(t *testing.T) {
processBlock := func(_ uint, _ *blockResult) {
panic(fmt.Errorf("unexpected match"))
}
s.search(workersCount, so, nil, processBlock)
s.search(workersCount, &searchStats{}, so, nil, processBlock)
})
t.Run("missing-tenant-middle", func(_ *testing.T) {
tenantID := TenantID{
@@ -965,7 +965,7 @@ func TestStorageSearch(t *testing.T) {
processBlock := func(_ uint, _ *blockResult) {
panic(fmt.Errorf("unexpected match"))
}
s.search(workersCount, so, nil, processBlock)
s.search(workersCount, &searchStats{}, so, nil, processBlock)
})
t.Run("matching-tenant-id", func(t *testing.T) {
for i := 0; i < tenantsCount; i++ {
@@ -981,7 +981,7 @@ func TestStorageSearch(t *testing.T) {
processBlock := func(_ uint, br *blockResult) {
rowsCountTotal.Add(uint32(br.rowsLen))
}
s.search(workersCount, so, nil, processBlock)
s.search(workersCount, &searchStats{}, so, nil, processBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@@ -998,7 +998,7 @@ func TestStorageSearch(t *testing.T) {
processBlock := func(_ uint, br *blockResult) {
rowsCountTotal.Add(uint32(br.rowsLen))
}
s.search(workersCount, so, nil, processBlock)
s.search(workersCount, &searchStats{}, so, nil, processBlock)
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@@ -1014,7 +1014,7 @@ func TestStorageSearch(t *testing.T) {
processBlock := func(_ uint, _ *blockResult) {
panic(fmt.Errorf("unexpected match"))
}
s.search(workersCount, so, nil, processBlock)
s.search(workersCount, &searchStats{}, so, nil, processBlock)
})
t.Run("matching-stream-id", func(t *testing.T) {
for i := 0; i < streamsPerTenant; i++ {
@@ -1031,7 +1031,7 @@ func TestStorageSearch(t *testing.T) {
processBlock := func(_ uint, br *blockResult) {
rowsCountTotal.Add(uint32(br.rowsLen))
}
s.search(workersCount, so, nil, processBlock)
s.search(workersCount, &searchStats{}, so, nil, processBlock)
expectedRowsCount := blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@@ -1053,7 +1053,7 @@ func TestStorageSearch(t *testing.T) {
processBlock := func(_ uint, br *blockResult) {
rowsCountTotal.Add(uint32(br.rowsLen))
}
s.search(workersCount, so, nil, processBlock)
s.search(workersCount, &searchStats{}, so, nil, processBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@@ -1083,7 +1083,7 @@ func TestStorageSearch(t *testing.T) {
processBlock := func(_ uint, br *blockResult) {
rowsCountTotal.Add(uint32(br.rowsLen))
}
s.search(workersCount, so, nil, processBlock)
s.search(workersCount, &searchStats{}, so, nil, processBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * 2
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@@ -1104,7 +1104,7 @@ func TestStorageSearch(t *testing.T) {
processBlock := func(_ uint, br *blockResult) {
rowsCountTotal.Add(uint32(br.rowsLen))
}
s.search(workersCount, so, nil, processBlock)
s.search(workersCount, &searchStats{}, so, nil, processBlock)
expectedRowsCount := blocksPerStream
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@@ -1124,7 +1124,7 @@ func TestStorageSearch(t *testing.T) {
processBlock := func(_ uint, _ *blockResult) {
panic(fmt.Errorf("unexpected match"))
}
s.search(workersCount, so, nil, processBlock)
s.search(workersCount, &searchStats{}, so, nil, processBlock)
})
s.MustClose()