mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-19 16:53:32 +03:00
Compare commits
1 Commits
master
...
revendor-v
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d6706c3bc9 |
2
go.mod
2
go.mod
@@ -7,7 +7,7 @@ require (
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1
|
||||
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4
|
||||
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260415124154-6b7a6357aec0
|
||||
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260616132739-c901a1e31cb3
|
||||
github.com/VictoriaMetrics/easyproto v1.2.0
|
||||
github.com/VictoriaMetrics/fastcache v1.13.3
|
||||
github.com/VictoriaMetrics/metrics v1.43.2
|
||||
|
||||
4
go.sum
4
go.sum
@@ -52,8 +52,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp
|
||||
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.56.0/go.mod h1:6ZZMQhZKDvUvkJw2rc+oDP90tMMzuU/J+5HG1ZmPOmE=
|
||||
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
|
||||
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
|
||||
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260415124154-6b7a6357aec0 h1:2x1Tszv41PnCdSMumEtejz/On1RQ45kHQ+hhKT53sOk=
|
||||
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260415124154-6b7a6357aec0/go.mod h1:fQtmzaSUL+HJmHozeAKmnTJTOMBT+vBccv/VWQEwhUQ=
|
||||
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260616132739-c901a1e31cb3 h1:W5gA6Jo/kvi/LyAgmm1D5C1nTcCouKtzNy2pgRHpPoE=
|
||||
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260616132739-c901a1e31cb3/go.mod h1:H4sDxcvk6OmC6zOt++IlDyrwfbn4F1eSLwMpR+kpRt8=
|
||||
github.com/VictoriaMetrics/easyproto v1.2.0 h1:FJT9uNXA2isppFuJErbLqD306KoFlehl7Wn2dg/6oIE=
|
||||
github.com/VictoriaMetrics/easyproto v1.2.0/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo3/GThPs2KH23mv710=
|
||||
github.com/VictoriaMetrics/fastcache v1.13.3 h1:rBabE0iIxcqKEMCwUmwHZ9dgEqXerg8FRbRDUvC7OVc=
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/block.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/block.go
generated
vendored
@@ -150,7 +150,7 @@ func (c *column) mustWriteTo(ch *columnHeader, sw *streamWriters) {
|
||||
putValuesEncoder(ve)
|
||||
ch.valuesSize = uint64(len(bb.B))
|
||||
if ch.valuesSize > maxValuesBlockSize {
|
||||
logger.Panicf("BUG: too valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize)
|
||||
logger.Panicf("BUG: too big valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize)
|
||||
}
|
||||
ch.valuesOffset = bloomValuesWriter.values.bytesWritten
|
||||
bloomValuesWriter.values.MustWrite(bb.B)
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/filter_string_range.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/filter_string_range.go
generated
vendored
@@ -6,7 +6,7 @@ import (
|
||||
|
||||
var maxStringRangeValue = string([]byte{255, 255, 255, 255})
|
||||
|
||||
// filterStringRange matches tie given string range [minValue..maxValue)
|
||||
// filterStringRange matches the given string range [minValue..maxValue)
|
||||
//
|
||||
// Note that the minValue is included in the range, while the maxValue isn't included in the range.
|
||||
// This simplifies querying distinct log sets with string_range(A, B), string_range(B, C), etc.
|
||||
|
||||
3
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/if_filter.go
generated
vendored
3
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/if_filter.go
generated
vendored
@@ -34,6 +34,9 @@ func parseIfFilter(lex *lexer) (*ifFilter, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'if' filter: %w", err)
|
||||
}
|
||||
if lex.isKeyword(";") {
|
||||
lex.nextToken()
|
||||
}
|
||||
if !lex.isKeyword(")") {
|
||||
return nil, fmt.Errorf("unexpected token %q after 'if' filter; expecting ')'", lex.token)
|
||||
}
|
||||
|
||||
3
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/indexdb.go
generated
vendored
3
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/indexdb.go
generated
vendored
@@ -7,6 +7,7 @@ import (
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
@@ -105,7 +106,7 @@ func mustOpenIndexdb(path, partitionName string, s *Storage) *indexdb {
|
||||
s: s,
|
||||
}
|
||||
var isReadOnly atomic.Bool
|
||||
idb.tb = mergeset.MustOpenTable(path, s.flushInterval, idb.invalidateStreamFilterCache, mergeTagToStreamIDsRows, &isReadOnly)
|
||||
idb.tb = mergeset.MustOpenTable(path, s.flushInterval, idb.invalidateStreamFilterCache, time.Second, mergeTagToStreamIDsRows, &isReadOnly)
|
||||
return idb
|
||||
}
|
||||
|
||||
|
||||
106
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/parser.go
generated
vendored
106
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/parser.go
generated
vendored
@@ -226,6 +226,24 @@ func (lex *lexer) checkPrevAdjacentToken(tokens ...string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lex *lexer) isQueryPartTrailer() bool {
|
||||
return lex.isKeywordAny(queryPartTrailers)
|
||||
}
|
||||
|
||||
var queryPartTrailers = []string{
|
||||
// filters and pipes are delimited by |
|
||||
"|",
|
||||
|
||||
// query finish with )
|
||||
")",
|
||||
|
||||
// query finish with ;
|
||||
";",
|
||||
|
||||
// query finish with EOF
|
||||
"",
|
||||
}
|
||||
|
||||
func (lex *lexer) isKeyword(keywords ...string) bool {
|
||||
return lex.isKeywordAny(keywords)
|
||||
}
|
||||
@@ -883,10 +901,7 @@ func (q *Query) addTimeFilterNoSubqueries(start, end int64) {
|
||||
|
||||
q.f = addTimeFilter(q.f, start, end, q.opts.timeOffset)
|
||||
|
||||
// Initialize rate functions with the step calculated from _time:[start, end] filter.
|
||||
// This fixes the bug where rate_sum() doesn't divide by stepSeconds when
|
||||
// time filter is specified via HTTP params instead of LogsQL expression
|
||||
q.initStatsRateFuncsFromTimeFilter()
|
||||
q.initStatsRateFuncStepsNoSubqueries()
|
||||
}
|
||||
|
||||
func addTimeFilter(f filter, start, end, offset int64) filter {
|
||||
@@ -1176,8 +1191,7 @@ func (q *Query) GetStatsLabelsAddGroupingByTime(step, offset int64) ([]string, e
|
||||
// add _time:step to by (...) list at stats pipes.
|
||||
q.addByTimeFieldToStatsPipes(step, offset)
|
||||
|
||||
// propagate the step into rate* funcs at stats pipes.
|
||||
q.initStatsRateFuncs(step)
|
||||
q.initStatsRateFuncStepsNoSubqueries()
|
||||
|
||||
// add 'partition by (_time)' to 'sort', 'first' and 'last' pipes.
|
||||
q.addPartitionByTime(step)
|
||||
@@ -1829,28 +1843,34 @@ func ParseQueryAtTimestamp(s string, timestamp int64) (*Query, error) {
|
||||
return nil, fmt.Errorf("unexpected unparsed tail after [%s]; context: [%s]; tail: [%s]", q, lex.context(), lex.rawToken+lex.s)
|
||||
}
|
||||
q.optimize()
|
||||
q.initStatsRateFuncsFromTimeFilter()
|
||||
q.initStatsRateFuncSteps()
|
||||
|
||||
return q, nil
|
||||
}
|
||||
|
||||
func (q *Query) initStatsRateFuncsFromTimeFilter() {
|
||||
start, end := q.GetFilterTimeRange()
|
||||
if start != math.MinInt64 && end != math.MaxInt64 {
|
||||
step := end - start
|
||||
|
||||
// The increment of the step is needed in order to cover the
|
||||
// last nanosecond in the selected time range [start, end].
|
||||
step++
|
||||
|
||||
q.initStatsRateFuncs(step)
|
||||
}
|
||||
func (q *Query) initStatsRateFuncSteps() {
|
||||
q.visitSubqueries(func(q *Query) {
|
||||
q.initStatsRateFuncStepsNoSubqueries()
|
||||
})
|
||||
}
|
||||
|
||||
func (q *Query) initStatsRateFuncs(step int64) {
|
||||
func (q *Query) initStatsRateFuncStepsNoSubqueries() {
|
||||
start, end := q.GetFilterTimeRange()
|
||||
step := int64(0)
|
||||
if start != math.MinInt64 && end != math.MaxInt64 {
|
||||
step = end - start
|
||||
|
||||
// The HTTP layer already converted the exclusive end into end-1, and the _time
|
||||
// filter is inclusive ([start, end]). So (end - start) is 1ns short of the real
|
||||
// window, and step++ adds that 1ns back.
|
||||
step++
|
||||
}
|
||||
|
||||
for _, p := range q.pipes {
|
||||
if ps, ok := p.(*pipeStats); ok {
|
||||
ps.initRateFuncs(step)
|
||||
if !ps.initRateFuncsFromTimeBucket() {
|
||||
ps.initRateFuncs(step)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1925,6 +1945,11 @@ func parseQuery(lex *lexer) (*Query, error) {
|
||||
q.pipes = pipes
|
||||
}
|
||||
|
||||
// Skip optional trailing semicolon
|
||||
if lex.isKeyword(";") {
|
||||
lex.nextToken()
|
||||
}
|
||||
|
||||
return &q, nil
|
||||
}
|
||||
|
||||
@@ -2089,7 +2114,7 @@ func parseQueryOptions(dstOpts *queryOptions, lex *lexer) error {
|
||||
}
|
||||
|
||||
func parseFilter(lex *lexer, allowPipeKeywords bool) (filter, error) {
|
||||
if lex.isKeyword("|", ")", "") {
|
||||
if lex.isQueryPartTrailer() {
|
||||
return nil, fmt.Errorf("missing query")
|
||||
}
|
||||
|
||||
@@ -2118,7 +2143,7 @@ func parseFilterOr(lex *lexer, fieldName string) (filter, error) {
|
||||
}
|
||||
filters = append(filters, f)
|
||||
switch {
|
||||
case lex.isKeyword("|", ")", ""):
|
||||
case lex.isQueryPartTrailer():
|
||||
if len(filters) == 1 {
|
||||
return filters[0], nil
|
||||
}
|
||||
@@ -2139,7 +2164,7 @@ func parseFilterAnd(lex *lexer, fieldName string) (filter, error) {
|
||||
}
|
||||
filters = append(filters, f)
|
||||
switch {
|
||||
case lex.isKeyword("or", "|", ")", ""):
|
||||
case lex.isKeyword("or") || lex.isQueryPartTrailer():
|
||||
if len(filters) == 1 {
|
||||
return filters[0], nil
|
||||
}
|
||||
@@ -2152,6 +2177,10 @@ func parseFilterAnd(lex *lexer, fieldName string) (filter, error) {
|
||||
}
|
||||
|
||||
func parseFilterGeneric(lex *lexer, fieldName string) (filter, error) {
|
||||
if lex.isKeyword("") {
|
||||
return nil, fmt.Errorf("unexpected end of query after %q; expecting a filter", lex.prevRawToken)
|
||||
}
|
||||
|
||||
// Verify the previous adjacent token
|
||||
if lex.isKeyword("(") {
|
||||
if err := lex.checkPrevAdjacentToken("|", ":", "(", "!", "-", "not", "and", "or"); err != nil {
|
||||
@@ -2277,13 +2306,6 @@ func parseFilterPhrase(lex *lexer, fieldName string) (filter, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// The phrase is either a search phrase or a search prefix.
|
||||
if !lex.isSkippedSpace && lex.isKeyword("*") {
|
||||
// The phrase is a search prefix in the form `foo*`.
|
||||
lex.nextToken()
|
||||
return newFilterPrefix(fieldName, phrase), nil
|
||||
}
|
||||
|
||||
// The phrase is a search phrase.
|
||||
return newFilterPhrase(fieldName, phrase), nil
|
||||
}
|
||||
@@ -2330,7 +2352,7 @@ func parseAnyCaseFilter(lex *lexer, fieldName string) (filter, error) {
|
||||
})
|
||||
}
|
||||
|
||||
func parseFuncArgMaybePrefix(lex *lexer, fieldName string, callback func(arg string, isPrefiFilter bool) (filter, error)) (filter, error) {
|
||||
func parseFuncArgMaybePrefix(lex *lexer, fieldName string, callback func(arg string, isPrefixFilter bool) (filter, error)) (filter, error) {
|
||||
lexState := lex.backupState()
|
||||
|
||||
funcName := lex.token
|
||||
@@ -2543,19 +2565,19 @@ func tryParseIPv6CIDR(s string) ([16]byte, [16]byte, bool) {
|
||||
func parseFilterContainsAll(lex *lexer, fieldName string) (filter, error) {
|
||||
var fi filterContainsAll
|
||||
fg := newFilterGeneric(fieldName, &fi)
|
||||
return parseInValues(lex, fieldName, fg, &fi.values)
|
||||
return parseInValues(lex, fg, &fi.values)
|
||||
}
|
||||
|
||||
func parseFilterContainsAny(lex *lexer, fieldName string) (filter, error) {
|
||||
var fi filterContainsAny
|
||||
fg := newFilterGeneric(fieldName, &fi)
|
||||
return parseInValues(lex, fieldName, fg, &fi.values)
|
||||
return parseInValues(lex, fg, &fi.values)
|
||||
}
|
||||
|
||||
func parseFilterIn(lex *lexer, fieldName string) (filter, error) {
|
||||
var fi filterIn
|
||||
fg := newFilterGeneric(fieldName, &fi)
|
||||
return parseInValues(lex, fieldName, fg, &fi.values)
|
||||
return parseInValues(lex, fg, &fi.values)
|
||||
}
|
||||
|
||||
func parseFilterContainsCommonCase(lex *lexer, fieldName string) (filter, error) {
|
||||
@@ -2588,10 +2610,10 @@ func parseFilterEqualsCommonCase(lex *lexer, fieldName string) (filter, error) {
|
||||
return fi, nil
|
||||
}
|
||||
|
||||
func parseInValues(lex *lexer, fieldName string, f filter, iv *inValues) (filter, error) {
|
||||
func parseInValues(lex *lexer, f filter, iv *inValues) (filter, error) {
|
||||
// Try parsing in(arg1, ..., argN) at first
|
||||
lexState := lex.backupState()
|
||||
fi, err := parseFuncArgsPossibleWildcard(lex, fieldName, func(args []string) (filter, error) {
|
||||
fi, err := parseFuncArgsPossibleWildcard(lex, func(args []string) (filter, error) {
|
||||
iv.values = args
|
||||
return f, nil
|
||||
})
|
||||
@@ -2691,7 +2713,7 @@ func parseFilterStar(lex *lexer, fieldName string) (filter, error) {
|
||||
return parseFilterGeneric(lex, "*")
|
||||
}
|
||||
|
||||
if lex.isSkippedSpace || lex.isKeyword("", ")", "|") {
|
||||
if lex.isSkippedSpace || lex.isQueryPartTrailer() {
|
||||
// '*' or 'fieldName:*' filter
|
||||
return newFilterPrefix(fieldName, ""), nil
|
||||
}
|
||||
@@ -2706,7 +2728,7 @@ func parseFilterStar(lex *lexer, fieldName string) (filter, error) {
|
||||
}
|
||||
lex.nextToken()
|
||||
|
||||
if !lex.isSkippedSpace && !lex.isKeyword("", ")", "|") {
|
||||
if !lex.isSkippedSpace && !lex.isQueryPartTrailer() {
|
||||
return nil, fmt.Errorf("missing whitespace between *%q* and %q", phrase, lex.token)
|
||||
}
|
||||
return newFilterSubstring(fieldName, phrase), nil
|
||||
@@ -2969,7 +2991,7 @@ func parseFuncArgs(lex *lexer, fieldName string, callback func(funcName string,
|
||||
return callback(funcName, args)
|
||||
}
|
||||
|
||||
func parseFuncArgsPossibleWildcard(lex *lexer, fieldName string, callback func(args []string) (filter, error)) (filter, error) {
|
||||
func parseFuncArgsPossibleWildcard(lex *lexer, callback func(args []string) (filter, error)) (filter, error) {
|
||||
funcName := lex.token
|
||||
lex.nextToken()
|
||||
|
||||
@@ -3004,7 +3026,7 @@ func parseArgsInParens(lex *lexer) ([]string, error) {
|
||||
}
|
||||
arg, err := lex.nextCompoundToken()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse arg")
|
||||
return nil, fmt.Errorf("cannot parse arg: %w", err)
|
||||
}
|
||||
args = append(args, arg)
|
||||
if lex.isKeyword(")") {
|
||||
@@ -3042,7 +3064,7 @@ func parseArgsInParensPossibleWildcard(lex *lexer) ([]string, bool, error) {
|
||||
} else {
|
||||
token, err := lex.nextCompoundToken()
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("cannot parse arg")
|
||||
return nil, false, fmt.Errorf("cannot parse arg: %w", err)
|
||||
}
|
||||
arg = token
|
||||
}
|
||||
@@ -3676,7 +3698,7 @@ func parseFilterStreamIDIn(lex *lexer) (filter, error) {
|
||||
|
||||
// Try parsing in(arg1, ..., argN) at first
|
||||
lexState := lex.backupState()
|
||||
fs, err := parseFuncArgsPossibleWildcard(lex, "_stream_id", func(args []string) (filter, error) {
|
||||
fs, err := parseFuncArgsPossibleWildcard(lex, func(args []string) (filter, error) {
|
||||
streamIDs := make([]streamID, len(args))
|
||||
for i, arg := range args {
|
||||
if !streamIDs[i].tryUnmarshalFromString(arg) {
|
||||
|
||||
10
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe.go
generated
vendored
10
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe.go
generated
vendored
@@ -122,12 +122,13 @@ func parsePipes(lex *lexer) ([]pipe, error) {
|
||||
pipes = append(pipes, p)
|
||||
|
||||
switch {
|
||||
case lex.isKeyword("|"):
|
||||
case lex.isQueryPartTrailer():
|
||||
if !lex.isKeyword("|") {
|
||||
return pipes, nil
|
||||
}
|
||||
lex.nextToken()
|
||||
case lex.isKeyword(")", ""):
|
||||
return pipes, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected token after [%s]: %q; expecting '|' or ')'", pipes[len(pipes)-1], lex.token)
|
||||
return nil, fmt.Errorf("unexpected token after [%s]: %q; expecting '|', ';' or ')'", pipes[len(pipes)-1], lex.token)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -178,6 +179,7 @@ func initPipeParsers() {
|
||||
pipeParsers = map[string]pipeParseFunc{
|
||||
"block_stats": parsePipeBlockStats,
|
||||
"blocks_count": parsePipeBlocksCount,
|
||||
"coalesce": parsePipeCoalesce,
|
||||
"collapse_nums": parsePipeCollapseNums,
|
||||
"copy": parsePipeCopy,
|
||||
"cp": parsePipeCopy,
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_blocks_count.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_blocks_count.go
generated
vendored
@@ -133,7 +133,7 @@ func parsePipeBlocksCount(lex *lexer) (pipe, error) {
|
||||
return nil, fmt.Errorf("cannot parse result name for 'blocks_count': %w", err)
|
||||
}
|
||||
resultName = name
|
||||
} else if !lex.isKeyword("", "|") {
|
||||
} else if !lex.isQueryPartTrailer() {
|
||||
name, err := parseFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse result name for 'blocks_count': %w", err)
|
||||
|
||||
204
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_coalesce.go
generated
vendored
Normal file
204
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_coalesce.go
generated
vendored
Normal file
@@ -0,0 +1,204 @@
|
||||
package logstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter"
|
||||
)
|
||||
|
||||
// pipeCoalesce implements '| coalesce (...) as ...' pipe.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victorialogs/logsql/#coalesce-pipe
|
||||
type pipeCoalesce struct {
|
||||
srcFieldFilters []string
|
||||
dstField string
|
||||
defaultValue string
|
||||
}
|
||||
|
||||
func (pc *pipeCoalesce) String() string {
|
||||
if len(pc.srcFieldFilters) == 0 {
|
||||
logger.Panicf("BUG: pipeCoalesce must contain at least one srcField")
|
||||
}
|
||||
|
||||
s := "coalesce(" + fieldNamesString(pc.srcFieldFilters) + ")"
|
||||
if pc.defaultValue != "" {
|
||||
s += " default " + quoteTokenIfNeeded(pc.defaultValue)
|
||||
}
|
||||
if pc.dstField != "_msg" {
|
||||
s += " as " + quoteTokenIfNeeded(pc.dstField)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (pc *pipeCoalesce) splitToRemoteAndLocal(_ int64) (pipe, []pipe) {
|
||||
return pc, nil
|
||||
}
|
||||
|
||||
func (pc *pipeCoalesce) canLiveTail() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (pc *pipeCoalesce) canReturnLastNResults() bool {
|
||||
return pc.dstField != "_time"
|
||||
}
|
||||
|
||||
func (pc *pipeCoalesce) isFixedOutputFieldsOrder() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (pc *pipeCoalesce) updateNeededFields(pf *prefixfilter.Filter) {
|
||||
if pf.MatchString(pc.dstField) {
|
||||
pf.AddDenyFilter(pc.dstField)
|
||||
pf.AddAllowFilters(pc.srcFieldFilters)
|
||||
}
|
||||
}
|
||||
|
||||
func (pc *pipeCoalesce) hasFilterInWithQuery() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (pc *pipeCoalesce) initFilterInValues(_ *inValuesCache, _ getFieldValuesFunc) (pipe, error) {
|
||||
return pc, nil
|
||||
}
|
||||
|
||||
func (pc *pipeCoalesce) visitSubqueries(_ func(q *Query)) {
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
func (pc *pipeCoalesce) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
||||
return &pipeCoalesceProcessor{
|
||||
pc: pc,
|
||||
ppNext: ppNext,
|
||||
}
|
||||
}
|
||||
|
||||
// pipeCoalesceProcessor processes the coalesce pipe
|
||||
type pipeCoalesceProcessor struct {
|
||||
pc *pipeCoalesce
|
||||
ppNext pipeProcessor
|
||||
|
||||
shards atomicutil.Slice[pipeCoalesceProcessorShard]
|
||||
}
|
||||
|
||||
type pipeCoalesceProcessorShard struct {
|
||||
rc resultColumn
|
||||
|
||||
cs []*blockResultColumn
|
||||
}
|
||||
|
||||
func (pcp *pipeCoalesceProcessor) writeBlock(workerID uint, br *blockResult) {
|
||||
if br.rowsLen == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
shard := pcp.shards.Get(workerID)
|
||||
pc := pcp.pc
|
||||
|
||||
// Initialize shard.cs
|
||||
cs := br.getColumns()
|
||||
for _, ff := range pc.srcFieldFilters {
|
||||
if !prefixfilter.IsWildcardFilter(ff) {
|
||||
c := br.getColumnByName(ff)
|
||||
shard.addColumn(c)
|
||||
continue
|
||||
}
|
||||
for _, c := range cs {
|
||||
if prefixfilter.MatchFilter(ff, c.name) {
|
||||
shard.addColumn(c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fill the shard.rc
|
||||
for rowIdx := range br.rowsLen {
|
||||
value := ""
|
||||
for _, c := range shard.cs {
|
||||
v := c.getValueAtRow(br, rowIdx)
|
||||
if v != "" {
|
||||
value = v
|
||||
break
|
||||
}
|
||||
}
|
||||
if value == "" {
|
||||
value = pc.defaultValue
|
||||
}
|
||||
|
||||
shard.rc.addValue(value)
|
||||
}
|
||||
|
||||
shard.rc.name = pc.dstField
|
||||
br.addResultColumn(shard.rc)
|
||||
pcp.ppNext.writeBlock(workerID, br)
|
||||
|
||||
shard.rc.reset()
|
||||
|
||||
clear(shard.cs)
|
||||
shard.cs = shard.cs[:0]
|
||||
}
|
||||
|
||||
func (shard *pipeCoalesceProcessorShard) addColumn(c *blockResultColumn) {
|
||||
// verify whether the given column already exists in shard.cs
|
||||
for _, col := range shard.cs {
|
||||
if col.name == c.name {
|
||||
// Nothing to add - the column already exists
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Add the column to cs.
|
||||
shard.cs = append(shard.cs, c)
|
||||
}
|
||||
|
||||
func (pcp *pipeCoalesceProcessor) flush() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// parsePipeCoalesce parses '| coalesce(field1, field2, field3) default "default value" as result_field'
|
||||
func parsePipeCoalesce(lex *lexer) (pipe, error) {
|
||||
if !lex.isKeyword("coalesce") {
|
||||
return nil, fmt.Errorf("expecting 'coalesce'; got %q", lex.token)
|
||||
}
|
||||
lex.nextToken()
|
||||
|
||||
srcFieldFilters, err := parseFieldFiltersInParens(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse field names: %w", err)
|
||||
}
|
||||
|
||||
if len(srcFieldFilters) == 0 {
|
||||
return nil, fmt.Errorf("coalesce requires at least one field name")
|
||||
}
|
||||
|
||||
// Parse optional 'default' keyword and value
|
||||
defaultValue := ""
|
||||
if lex.isKeyword("default") {
|
||||
lex.nextToken()
|
||||
v, err := lex.nextCompoundToken()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse default value: %w", err)
|
||||
}
|
||||
defaultValue = v
|
||||
}
|
||||
|
||||
// Parse 'as' token
|
||||
dstField := "_msg"
|
||||
if lex.isKeyword("as") {
|
||||
lex.nextToken()
|
||||
v, err := parseFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse result field name: %w", err)
|
||||
}
|
||||
dstField = v
|
||||
}
|
||||
|
||||
pc := &pipeCoalesce{
|
||||
srcFieldFilters: srcFieldFilters,
|
||||
dstField: dstField,
|
||||
defaultValue: defaultValue,
|
||||
}
|
||||
|
||||
return pc, nil
|
||||
}
|
||||
4
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_copy.go
generated
vendored
4
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_copy.go
generated
vendored
@@ -126,7 +126,7 @@ func parsePipeCopy(lex *lexer) (pipe, error) {
|
||||
dstFieldFilters = append(dstFieldFilters, dstFieldFilter)
|
||||
|
||||
switch {
|
||||
case lex.isKeyword("|", ")", ""):
|
||||
case lex.isQueryPartTrailer():
|
||||
pc := &pipeCopy{
|
||||
srcFieldFilters: srcFieldFilters,
|
||||
dstFieldFilters: dstFieldFilters,
|
||||
@@ -134,7 +134,7 @@ func parsePipeCopy(lex *lexer) (pipe, error) {
|
||||
return pc, nil
|
||||
case lex.isKeyword(","):
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected token: %q; expecting ',', '|' or ')'", lex.token)
|
||||
return nil, fmt.Errorf("unexpected token: %q; expecting ',', '|', ';' or ')'", lex.token)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_decolorize.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_decolorize.go
generated
vendored
@@ -72,7 +72,7 @@ func parsePipeDecolorize(lex *lexer) (pipe, error) {
|
||||
lex.nextToken()
|
||||
|
||||
field := "_msg"
|
||||
if !lex.isKeyword("|", ")", "") {
|
||||
if !lex.isQueryPartTrailer() {
|
||||
f, err := parseFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse field name after 'decolorize': %w", err)
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_field_names.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_field_names.go
generated
vendored
@@ -264,7 +264,7 @@ func parsePipeFieldNames(lex *lexer) (pipe, error) {
|
||||
return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err)
|
||||
}
|
||||
resultName = name
|
||||
} else if !lex.isKeyword("", "|") {
|
||||
} else if !lex.isQueryPartTrailer() {
|
||||
name, err := parseFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err)
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_hash.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_hash.go
generated
vendored
@@ -173,7 +173,7 @@ func parsePipeHash(lex *lexer) (pipe, error) {
|
||||
if lex.isKeyword("as") {
|
||||
lex.nextToken()
|
||||
}
|
||||
if !lex.isKeyword("|", ")", "") {
|
||||
if !lex.isQueryPartTrailer() {
|
||||
field, err := parseFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse result field after 'hash(%s)': %w", quoteTokenIfNeeded(fieldName), err)
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_json_array_len.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_json_array_len.go
generated
vendored
@@ -182,7 +182,7 @@ func parsePipeJSONArrayLen(lex *lexer) (pipe, error) {
|
||||
if lex.isKeyword("as") {
|
||||
lex.nextToken()
|
||||
}
|
||||
if !lex.isKeyword("|", ")", "") {
|
||||
if !lex.isQueryPartTrailer() {
|
||||
field, err := parseFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse result field after 'len(%s)': %w", quoteTokenIfNeeded(fieldName), err)
|
||||
|
||||
4
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_last.go
generated
vendored
4
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_last.go
generated
vendored
@@ -102,7 +102,7 @@ func parsePipeLast(lex *lexer) (pipe, error) {
|
||||
func parsePipeLastFirst(lex *lexer) (*pipeSort, error) {
|
||||
var ps pipeSort
|
||||
ps.limit = 1
|
||||
if !lex.isKeyword("by", "partition", "rank", "(", "|", ")", "") {
|
||||
if !lex.isKeyword("by", "partition", "rank", "(") && !lex.isQueryPartTrailer() {
|
||||
s, err := lex.nextCompoundToken()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse number: %w", err)
|
||||
@@ -144,7 +144,7 @@ func parsePipeLastFirst(lex *lexer) (*pipeSort, error) {
|
||||
if lex.isKeyword("rank") {
|
||||
rankFieldName, err := parseRankFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read rank field name: %s", err)
|
||||
return nil, fmt.Errorf("cannot read rank field name: %w", err)
|
||||
}
|
||||
ps.rankFieldName = rankFieldName
|
||||
}
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_len.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_len.go
generated
vendored
@@ -165,7 +165,7 @@ func parsePipeLen(lex *lexer) (pipe, error) {
|
||||
if lex.isKeyword("as") {
|
||||
lex.nextToken()
|
||||
}
|
||||
if !lex.isKeyword("|", ")", "") {
|
||||
if !lex.isQueryPartTrailer() {
|
||||
field, err := parseFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse result field after 'len(%s)': %w", quoteTokenIfNeeded(fieldName), err)
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_limit.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_limit.go
generated
vendored
@@ -113,7 +113,7 @@ func parsePipeLimit(lex *lexer) (pipe, error) {
|
||||
lex.nextToken()
|
||||
|
||||
limit := uint64(10)
|
||||
if !lex.isKeyword("|", ")", "") {
|
||||
if !lex.isQueryPartTrailer() {
|
||||
limitStr, err := lex.nextCompoundToken()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse rows limit: %w", err)
|
||||
|
||||
6
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_math.go
generated
vendored
6
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_math.go
generated
vendored
@@ -460,7 +460,7 @@ func parsePipeMath(lex *lexer) (pipe, error) {
|
||||
switch {
|
||||
case lex.isKeyword(","):
|
||||
lex.nextToken()
|
||||
case lex.isKeyword("|", ")", ""):
|
||||
case lex.isQueryPartTrailer():
|
||||
if len(mes) == 0 {
|
||||
return nil, fmt.Errorf("missing 'math' expressions")
|
||||
}
|
||||
@@ -469,7 +469,7 @@ func parsePipeMath(lex *lexer) (pipe, error) {
|
||||
}
|
||||
return pm, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected token after 'math' expression [%s]: %q; expecting ',', '|' or ')'", mes[len(mes)-1], lex.token)
|
||||
return nil, fmt.Errorf("unexpected token after 'math' expression [%s]: %q; expecting ',', '|', ';' or ')'", mes[len(mes)-1], lex.token)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -481,7 +481,7 @@ func parseMathEntry(lex *lexer) (*mathEntry, error) {
|
||||
}
|
||||
|
||||
resultField := ""
|
||||
if lex.isKeyword(",", "|", ")", "") {
|
||||
if lex.isKeyword(",") || lex.isQueryPartTrailer() {
|
||||
resultField = me.String()
|
||||
} else {
|
||||
if lex.isKeyword("as") {
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_pack_json.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_pack_json.go
generated
vendored
@@ -88,7 +88,7 @@ func parsePipePackJSON(lex *lexer) (pipe, error) {
|
||||
if lex.isKeyword("as") {
|
||||
lex.nextToken()
|
||||
}
|
||||
if !lex.isKeyword("|", ")", "") {
|
||||
if !lex.isQueryPartTrailer() {
|
||||
field, err := parseFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse result field for 'pack_json': %w", err)
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_pack_logfmt.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_pack_logfmt.go
generated
vendored
@@ -88,7 +88,7 @@ func parsePipePackLogfmt(lex *lexer) (pipe, error) {
|
||||
if lex.isKeyword("as") {
|
||||
lex.nextToken()
|
||||
}
|
||||
if !lex.isKeyword("|", ")", "") {
|
||||
if !lex.isQueryPartTrailer() {
|
||||
field, err := parseFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse result field for 'pack_logfmt': %w", err)
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_rename.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_rename.go
generated
vendored
@@ -135,7 +135,7 @@ func parsePipeRename(lex *lexer) (pipe, error) {
|
||||
dstFieldFilters = append(dstFieldFilters, dstFieldFilter)
|
||||
|
||||
switch {
|
||||
case lex.isKeyword("|", ")", ""):
|
||||
case lex.isQueryPartTrailer():
|
||||
pr := &pipeRename{
|
||||
srcFieldFilters: srcFieldFilters,
|
||||
dstFieldFilters: dstFieldFilters,
|
||||
|
||||
4
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_running_stats.go
generated
vendored
4
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_running_stats.go
generated
vendored
@@ -415,7 +415,7 @@ func parsePipeRunningStatsExt(lex *lexer, pipeName string) (pipe, error) {
|
||||
f.f = sf
|
||||
|
||||
resultName := ""
|
||||
if lex.isKeyword(",", "|", ")", "") {
|
||||
if lex.isKeyword(",") || lex.isQueryPartTrailer() {
|
||||
resultName = sf.String()
|
||||
} else {
|
||||
if lex.isKeyword("as") {
|
||||
@@ -435,7 +435,7 @@ func parsePipeRunningStatsExt(lex *lexer, pipeName string) (pipe, error) {
|
||||
|
||||
funcs = append(funcs, f)
|
||||
|
||||
if lex.isKeyword("|", ")", "") {
|
||||
if lex.isQueryPartTrailer() {
|
||||
ps.funcs = funcs
|
||||
return &ps, nil
|
||||
}
|
||||
|
||||
6
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_sort.go
generated
vendored
6
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_sort.go
generated
vendored
@@ -836,7 +836,7 @@ func parsePipeSort(lex *lexer) (pipe, error) {
|
||||
case lex.isKeyword("rank"):
|
||||
rankFieldName, err := parseRankFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read rank field name: %s", err)
|
||||
return nil, fmt.Errorf("cannot read rank field name: %w", err)
|
||||
}
|
||||
ps.rankFieldName = rankFieldName
|
||||
case lex.isKeyword("partition"):
|
||||
@@ -930,7 +930,7 @@ func parseLimit(lex *lexer) (uint64, error) {
|
||||
|
||||
limitStr, err := lex.nextCompoundToken()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("cannot parse 'limit': %s", err)
|
||||
return 0, fmt.Errorf("cannot parse 'limit': %w", err)
|
||||
}
|
||||
|
||||
n, ok := tryParseUint64(limitStr)
|
||||
@@ -949,7 +949,7 @@ func parseOffset(lex *lexer) (uint64, error) {
|
||||
|
||||
limitStr, err := lex.nextCompoundToken()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("cannot parse 'offset': %s", err)
|
||||
return 0, fmt.Errorf("cannot parse 'offset': %w", err)
|
||||
}
|
||||
|
||||
n, ok := tryParseUint64(limitStr)
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_sort_topk.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_sort_topk.go
generated
vendored
@@ -334,7 +334,7 @@ func (shard *pipeTopkProcessorShard) getRowsByPartition(partition string) *pipeT
|
||||
}
|
||||
partition = strings.Clone(partition)
|
||||
shard.rowsByPartition[partition] = rs
|
||||
shard.stateSizeBudget += int(unsafe.Sizeof(*rs)+unsafe.Sizeof(rs)) + len(partition)
|
||||
shard.stateSizeBudget -= int(unsafe.Sizeof(*rs)+unsafe.Sizeof(rs)) + len(partition)
|
||||
}
|
||||
return rs
|
||||
}
|
||||
|
||||
4
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_split.go
generated
vendored
4
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_split.go
generated
vendored
@@ -142,7 +142,7 @@ func parsePipeSplit(lex *lexer) (pipe, error) {
|
||||
}
|
||||
|
||||
srcField := "_msg"
|
||||
if !lex.isKeyword("as", ")", "|", "") {
|
||||
if !lex.isKeyword("as") && !lex.isQueryPartTrailer() {
|
||||
if lex.isKeyword("from") {
|
||||
lex.nextToken()
|
||||
}
|
||||
@@ -154,7 +154,7 @@ func parsePipeSplit(lex *lexer) (pipe, error) {
|
||||
}
|
||||
|
||||
dstField := srcField
|
||||
if !lex.isKeyword(")", "|", "") {
|
||||
if !lex.isQueryPartTrailer() {
|
||||
if lex.isKeyword("as") {
|
||||
lex.nextToken()
|
||||
}
|
||||
|
||||
16
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_stats.go
generated
vendored
16
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_stats.go
generated
vendored
@@ -415,6 +415,16 @@ func (ps *pipeStats) initRateFuncs(step int64) {
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *pipeStats) initRateFuncsFromTimeBucket() bool {
|
||||
for _, bf := range ps.byFields {
|
||||
if bf.name == "_time" && bf.bucketSize > 0 {
|
||||
ps.initRateFuncs(int64(bf.bucketSize))
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
const stateSizeBudgetChunk = 1 << 20
|
||||
|
||||
func (ps *pipeStats) newPipeProcessor(concurrency int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
|
||||
@@ -1389,11 +1399,11 @@ func parsePipeStatsExt(lex *lexer, needStatsKeyword bool) (pipe, error) {
|
||||
}
|
||||
ps.entries = append(ps.entries, e)
|
||||
|
||||
if lex.isKeyword("|", ")", "") {
|
||||
if lex.isQueryPartTrailer() {
|
||||
break
|
||||
}
|
||||
if !lex.isKeyword(",") {
|
||||
return nil, fmt.Errorf("unexpected token %q after [%s]; want ',', '|' or ')'", lex.token, e)
|
||||
return nil, fmt.Errorf("unexpected token %q after [%s]; want ',', '|', ';' or ')'", lex.token, e)
|
||||
}
|
||||
lex.nextToken()
|
||||
}
|
||||
@@ -1442,7 +1452,7 @@ func parseStatsEntry(lex *lexer) (pipeStatsEntry, error) {
|
||||
}
|
||||
|
||||
resultName := ""
|
||||
if lex.isKeyword(",", "|", ")", "") {
|
||||
if lex.isKeyword(",") || lex.isQueryPartTrailer() {
|
||||
resultName = sf.String()
|
||||
if iff != nil {
|
||||
resultName += " " + iff.String()
|
||||
|
||||
6
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_top.go
generated
vendored
6
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_top.go
generated
vendored
@@ -620,7 +620,7 @@ func parsePipeTop(lex *lexer) (pipe, error) {
|
||||
return nil, fmt.Errorf("cannot parse 'by(...)': %w", err)
|
||||
}
|
||||
byFields = bfs
|
||||
} else if !lex.isKeyword("hits", "rank", ")", "|", "") {
|
||||
} else if !lex.isKeyword("hits", "rank") && !lex.isQueryPartTrailer() {
|
||||
bfs, err := parseCommaSeparatedFields(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'by ...': %w", err)
|
||||
@@ -674,11 +674,11 @@ func parseRankFieldName(lex *lexer) (string, error) {
|
||||
rankFieldName := "rank"
|
||||
if lex.isKeyword("as") {
|
||||
lex.nextToken()
|
||||
if lex.isKeyword("", "|", ")", "(") {
|
||||
if lex.isKeyword("(") || lex.isQueryPartTrailer() {
|
||||
return "", fmt.Errorf("missing rank name")
|
||||
}
|
||||
}
|
||||
if !lex.isKeyword("", "|", ")", "limit") {
|
||||
if !lex.isKeyword("limit") && !lex.isQueryPartTrailer() {
|
||||
s, err := parseFieldName(lex)
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_uniq.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_uniq.go
generated
vendored
@@ -550,7 +550,7 @@ func parsePipeUniq(lex *lexer) (pipe, error) {
|
||||
return nil, fmt.Errorf("cannot parse 'by(...)': %w", err)
|
||||
}
|
||||
byFields = bfs
|
||||
} else if !lex.isKeyword("filter", "with", "hits", "limit", ")", "|", "") {
|
||||
} else if !lex.isKeyword("filter", "with", "hits", "limit") && !lex.isQueryPartTrailer() {
|
||||
bfs, err := parseCommaSeparatedFields(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'by ...': %w", err)
|
||||
|
||||
3
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_unpack_json.go
generated
vendored
3
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_unpack_json.go
generated
vendored
@@ -100,6 +100,7 @@ func (pu *pipeUnpackJSON) visitSubqueries(visitFunc func(q *Query)) {
|
||||
|
||||
func (pu *pipeUnpackJSON) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
||||
unpackJSON := func(uctx *fieldsUnpackerContext, s string) {
|
||||
s = trimJSONWhitespace(s)
|
||||
if len(s) == 0 || s[0] != '{' {
|
||||
// This isn't a JSON object
|
||||
return
|
||||
@@ -159,7 +160,7 @@ func parsePipeUnpackJSON(lex *lexer) (pipe, error) {
|
||||
}
|
||||
|
||||
fromField := "_msg"
|
||||
if !lex.isKeyword("fields", "preserve_keys", "result_prefix", "keep_original_fields", "skip_empty_results", ")", "|", "") {
|
||||
if !lex.isKeyword("fields", "preserve_keys", "result_prefix", "keep_original_fields", "skip_empty_results") && !lex.isQueryPartTrailer() {
|
||||
if lex.isKeyword("from") {
|
||||
lex.nextToken()
|
||||
}
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_unpack_logfmt.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_unpack_logfmt.go
generated
vendored
@@ -142,7 +142,7 @@ func parsePipeUnpackLogfmt(lex *lexer) (pipe, error) {
|
||||
}
|
||||
|
||||
fromField := "_msg"
|
||||
if !lex.isKeyword("fields", "result_prefix", "keep_original_fields", "skip_empty_results", ")", "|", "") {
|
||||
if !lex.isKeyword("fields", "result_prefix", "keep_original_fields", "skip_empty_results") && !lex.isQueryPartTrailer() {
|
||||
if lex.isKeyword("from") {
|
||||
lex.nextToken()
|
||||
}
|
||||
|
||||
4
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_unpack_syslog.go
generated
vendored
4
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_unpack_syslog.go
generated
vendored
@@ -2,6 +2,7 @@ package logstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -91,6 +92,7 @@ func (pu *pipeUnpackSyslog) newPipeProcessor(_ int, _ <-chan struct{}, _ func(),
|
||||
year := currentYear.Load()
|
||||
p := GetSyslogParser(int(year), pu.offsetTimezone)
|
||||
|
||||
s = strings.TrimLeft(s, " \t\n\r")
|
||||
p.Parse(s)
|
||||
for _, f := range p.Fields {
|
||||
uctx.addField(f.Name, f.Value)
|
||||
@@ -135,7 +137,7 @@ func parsePipeUnpackSyslog(lex *lexer) (pipe, error) {
|
||||
}
|
||||
|
||||
fromField := "_msg"
|
||||
if !lex.isKeyword("offset", "result_prefix", "keep_original_fields", ")", "|", "") {
|
||||
if !lex.isKeyword("offset", "result_prefix", "keep_original_fields") && !lex.isQueryPartTrailer() {
|
||||
if lex.isKeyword("from") {
|
||||
lex.nextToken()
|
||||
}
|
||||
|
||||
4
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_unpack_words.go
generated
vendored
4
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_unpack_words.go
generated
vendored
@@ -139,7 +139,7 @@ func parsePipeUnpackWords(lex *lexer) (pipe, error) {
|
||||
lex.nextToken()
|
||||
|
||||
srcField := "_msg"
|
||||
if !lex.isKeyword("drop_duplicates", "as", ")", "|", "") {
|
||||
if !lex.isKeyword("drop_duplicates", "as") && !lex.isQueryPartTrailer() {
|
||||
if lex.isKeyword("from") {
|
||||
lex.nextToken()
|
||||
}
|
||||
@@ -151,7 +151,7 @@ func parsePipeUnpackWords(lex *lexer) (pipe, error) {
|
||||
}
|
||||
|
||||
dstField := srcField
|
||||
if !lex.isKeyword("drop_duplicates", ")", "|", "") {
|
||||
if !lex.isKeyword("drop_duplicates") && !lex.isQueryPartTrailer() {
|
||||
if lex.isKeyword("as") {
|
||||
lex.nextToken()
|
||||
}
|
||||
|
||||
1
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_unroll.go
generated
vendored
1
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/pipe_unroll.go
generated
vendored
@@ -256,6 +256,7 @@ func parsePipeUnroll(lex *lexer) (pipe, error) {
|
||||
}
|
||||
|
||||
func unpackJSONArray(dst []string, a *arena, s string) []string {
|
||||
s = trimJSONWhitespace(s)
|
||||
if s == "" || s[0] != '[' {
|
||||
return dst
|
||||
}
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/running_stats_last.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/running_stats_last.go
generated
vendored
@@ -67,7 +67,7 @@ func parseRunningStatsLast(lex *lexer) (runningStatsFunc, error) {
|
||||
return nil, err
|
||||
}
|
||||
if len(args) != 1 {
|
||||
return nil, fmt.Errorf("unexpeccted number of args for the last() function; got %d; want 1; args: %q", len(args), args)
|
||||
return nil, fmt.Errorf("unexpected number of args for the last() function; got %d; want 1; args: %q", len(args), args)
|
||||
}
|
||||
|
||||
fieldName := args[0]
|
||||
|
||||
14
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/storage.go
generated
vendored
14
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/storage.go
generated
vendored
@@ -170,7 +170,7 @@ type Storage struct {
|
||||
// partitions are sorted by time, e.g. partitions[0] has the smallest time.
|
||||
partitions []*partitionWrapper
|
||||
|
||||
// ptwHot is the "hot" partition, were the last rows were ingested.
|
||||
// ptwHot is the "hot" partition, where the last rows were ingested.
|
||||
//
|
||||
// It must be accessed under partitionsLock.
|
||||
ptwHot *partitionWrapper
|
||||
@@ -197,7 +197,7 @@ type Storage struct {
|
||||
// the check whether the given stream is already registered in the persistent storage.
|
||||
streamIDCache *cache
|
||||
|
||||
// filterStreamCache caches streamIDs keyed by (partition, []TenanID, StreamFilter).
|
||||
// filterStreamCache caches streamIDs keyed by (partition, []TenantID, StreamFilter).
|
||||
//
|
||||
// It reduces the load on persistent storage during querying by _stream:{...} filter.
|
||||
filterStreamCache *cache
|
||||
@@ -235,7 +235,7 @@ func (s *Storage) PartitionAttach(name string) error {
|
||||
// Verify whether the given partition already exists in the attached partitions list.
|
||||
for _, ptw := range s.partitions {
|
||||
if ptw.pt.name == name {
|
||||
return fmt.Errorf("cannot attach the partition %q, because it is arleady attached", name)
|
||||
return fmt.Errorf("cannot attach the partition %q, because it is already attached", name)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -381,7 +381,7 @@ func getSnapshotPaths(ptws []*partitionWrapper) []string {
|
||||
func (s *Storage) PartitionSnapshotDelete(snapshotPath string) error {
|
||||
snapshotName := filepath.Base(snapshotPath)
|
||||
if err := snapshotutil.Validate(snapshotName); err != nil {
|
||||
return fmt.Errorf("unsupported snapshot name %q at %q: %s", snapshotName, snapshotPath, err)
|
||||
return fmt.Errorf("unsupported snapshot name %q at %q: %w", snapshotName, snapshotPath, err)
|
||||
}
|
||||
|
||||
snapshotDir := filepath.Dir(snapshotPath)
|
||||
@@ -442,7 +442,7 @@ func (s *Storage) MustDeleteStalePartitionSnapshots(maxAge time.Duration) []stri
|
||||
|
||||
// DeleteRunTask starts deletion of logs according to the given filter f for the given tenantIDs.
|
||||
//
|
||||
// The taskID must contain an unique id of the task. It is used for tracking the task at the list returned by DeleteActiveTasks().
|
||||
// The taskID must contain a unique id of the task. It is used for tracking the task at the list returned by DeleteActiveTasks().
|
||||
// The timestamp must contain the timestamp in seconds when the task is started.
|
||||
func (s *Storage) DeleteRunTask(_ context.Context, taskID string, timestamp int64, tenantIDs []TenantID, f *Filter) error {
|
||||
// Register the task in the list of active delete tasks, so it survives application restarts and crashes.
|
||||
@@ -968,7 +968,7 @@ func (s *Storage) watchDeleteTasks() {
|
||||
|
||||
s.deleteTasks = s.deleteTasks[1:]
|
||||
if !ok {
|
||||
// The delete task coudn't be completed now. Try it later.
|
||||
// The delete task couldn't be completed now. Try it later.
|
||||
s.deleteTasks = append(s.deleteTasks, dt)
|
||||
}
|
||||
s.mustSaveDeleteTasksLocked()
|
||||
@@ -1031,7 +1031,7 @@ func (s *Storage) processDeleteTask(ctx context.Context, dt *DeleteTask) bool {
|
||||
}
|
||||
|
||||
// The task couldn't be processed at the moment
|
||||
logger.Warnf("cannot proceeed with the delete task with task_id=%q in %.3f seconds; retrying it later", dt.TaskID, time.Since(startTime).Seconds())
|
||||
logger.Warnf("cannot proceed with the delete task with task_id=%q in %.3f seconds; retrying it later", dt.TaskID, time.Since(startTime).Seconds())
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
13
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/storage_search.go
generated
vendored
13
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/storage_search.go
generated
vendored
@@ -23,7 +23,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter"
|
||||
)
|
||||
|
||||
// QueryContext is used for execting the query passed to NewQueryContext()
|
||||
// QueryContext is used for executing the query passed to NewQueryContext().
|
||||
type QueryContext struct {
|
||||
// Context is the context for executing the Query.
|
||||
Context context.Context
|
||||
@@ -43,12 +43,12 @@ type QueryContext struct {
|
||||
// HiddenFieldsFilters is an optional list of field filters, which must be hidden during query execution.
|
||||
//
|
||||
// The list may contain full field names and field prefixes ending with *.
|
||||
// Prefix match all the fields starting with the given prefix.
|
||||
// Prefixes match all the fields starting with the given prefix.
|
||||
HiddenFieldsFilters []string
|
||||
|
||||
// startTime is creation time for the QueryContext.
|
||||
//
|
||||
// It is used for calculating query druation.
|
||||
// It is used for calculating query duration.
|
||||
startTime time.Time
|
||||
}
|
||||
|
||||
@@ -128,13 +128,14 @@ type storageSearchOptions struct {
|
||||
// hiddenFieldsFilter is the filter of fields, which must be hidden during query
|
||||
hiddenFieldsFilter *prefixfilter.Filter
|
||||
|
||||
// timeOffset is the offset in nanoseconds, which must be subtracted from the selected the _time values before these values are passed to query pipes.
|
||||
// timeOffset is the offset in nanoseconds, which must be subtracted from the selected _time values
|
||||
// before these values are passed to query pipes.
|
||||
timeOffset int64
|
||||
}
|
||||
|
||||
// partitionSearchOptions is search options for the partition.
|
||||
//
|
||||
// this struct must be created via partition.getSearchOptions() call.
|
||||
// This struct must be created via partition.getSearchOptions() call.
|
||||
type partitionSearchOptions struct {
|
||||
// Optional sorted list of tenantIDs for the search.
|
||||
// If it is empty, then the search is performed by streamIDs
|
||||
@@ -1126,7 +1127,7 @@ func (db *DataBlock) RowsCount() int {
|
||||
|
||||
// GetColumns returns columns from db.
|
||||
//
|
||||
// If needSortSolumns is set, then the returned columns are sorted in alphabetical order
|
||||
// If needSortColumns is set, then the returned columns are sorted in alphabetical order.
|
||||
func (db *DataBlock) GetColumns(needSortColumns bool) []BlockColumn {
|
||||
if needSortColumns {
|
||||
sort.Slice(db.columns, func(i, j int) bool {
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/tenant_id.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/tenant_id.go
generated
vendored
@@ -135,7 +135,7 @@ func MarshalTenantIDsToJSON(tenantIDs []TenantID) []byte {
|
||||
func UnmarshalTenantIDsFromJSON(src []byte) ([]TenantID, error) {
|
||||
var tenantIDs []TenantID
|
||||
if err := json.Unmarshal(src, &tenantIDs); err != nil {
|
||||
return nil, fmt.Errorf("cannot unmarshal tenantIDs from JSON array: %s", err)
|
||||
return nil, fmt.Errorf("cannot unmarshal tenantIDs from JSON array: %w", err)
|
||||
}
|
||||
return tenantIDs, nil
|
||||
}
|
||||
|
||||
4
vendor/modules.txt
vendored
4
vendor/modules.txt
vendored
@@ -132,8 +132,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric
|
||||
# github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.56.0
|
||||
## explicit; go 1.24.0
|
||||
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping
|
||||
# github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260415124154-6b7a6357aec0
|
||||
## explicit; go 1.26.2
|
||||
# github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260616132739-c901a1e31cb3
|
||||
## explicit; go 1.26.4
|
||||
github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage
|
||||
github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter
|
||||
# github.com/VictoriaMetrics/easyproto v1.2.0
|
||||
|
||||
Reference in New Issue
Block a user