Compare commits

...

1 Commits

Author SHA1 Message Date
func25
d6706c3bc9 update 2026-06-16 21:51:28 +07:00
39 changed files with 353 additions and 106 deletions

2
go.mod
View File

@@ -7,7 +7,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260415124154-6b7a6357aec0
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260616132739-c901a1e31cb3
github.com/VictoriaMetrics/easyproto v1.2.0
github.com/VictoriaMetrics/fastcache v1.13.3
github.com/VictoriaMetrics/metrics v1.43.2

4
go.sum
View File

@@ -52,8 +52,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.56.0/go.mod h1:6ZZMQhZKDvUvkJw2rc+oDP90tMMzuU/J+5HG1ZmPOmE=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260415124154-6b7a6357aec0 h1:2x1Tszv41PnCdSMumEtejz/On1RQ45kHQ+hhKT53sOk=
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260415124154-6b7a6357aec0/go.mod h1:fQtmzaSUL+HJmHozeAKmnTJTOMBT+vBccv/VWQEwhUQ=
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260616132739-c901a1e31cb3 h1:W5gA6Jo/kvi/LyAgmm1D5C1nTcCouKtzNy2pgRHpPoE=
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260616132739-c901a1e31cb3/go.mod h1:H4sDxcvk6OmC6zOt++IlDyrwfbn4F1eSLwMpR+kpRt8=
github.com/VictoriaMetrics/easyproto v1.2.0 h1:FJT9uNXA2isppFuJErbLqD306KoFlehl7Wn2dg/6oIE=
github.com/VictoriaMetrics/easyproto v1.2.0/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo3/GThPs2KH23mv710=
github.com/VictoriaMetrics/fastcache v1.13.3 h1:rBabE0iIxcqKEMCwUmwHZ9dgEqXerg8FRbRDUvC7OVc=

View File

@@ -150,7 +150,7 @@ func (c *column) mustWriteTo(ch *columnHeader, sw *streamWriters) {
putValuesEncoder(ve)
ch.valuesSize = uint64(len(bb.B))
if ch.valuesSize > maxValuesBlockSize {
logger.Panicf("BUG: too valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize)
logger.Panicf("BUG: too big valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize)
}
ch.valuesOffset = bloomValuesWriter.values.bytesWritten
bloomValuesWriter.values.MustWrite(bb.B)

View File

@@ -6,7 +6,7 @@ import (
var maxStringRangeValue = string([]byte{255, 255, 255, 255})
// filterStringRange matches tie given string range [minValue..maxValue)
// filterStringRange matches the given string range [minValue..maxValue)
//
// Note that the minValue is included in the range, while the maxValue isn't included in the range.
// This simplifies querying distinct log sets with string_range(A, B), string_range(B, C), etc.

View File

@@ -34,6 +34,9 @@ func parseIfFilter(lex *lexer) (*ifFilter, error) {
if err != nil {
return nil, fmt.Errorf("cannot parse 'if' filter: %w", err)
}
if lex.isKeyword(";") {
lex.nextToken()
}
if !lex.isKeyword(")") {
return nil, fmt.Errorf("unexpected token %q after 'if' filter; expecting ')'", lex.token)
}

View File

@@ -7,6 +7,7 @@ import (
"sort"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@@ -105,7 +106,7 @@ func mustOpenIndexdb(path, partitionName string, s *Storage) *indexdb {
s: s,
}
var isReadOnly atomic.Bool
idb.tb = mergeset.MustOpenTable(path, s.flushInterval, idb.invalidateStreamFilterCache, mergeTagToStreamIDsRows, &isReadOnly)
idb.tb = mergeset.MustOpenTable(path, s.flushInterval, idb.invalidateStreamFilterCache, time.Second, mergeTagToStreamIDsRows, &isReadOnly)
return idb
}

View File

@@ -226,6 +226,24 @@ func (lex *lexer) checkPrevAdjacentToken(tokens ...string) error {
return nil
}
// isQueryPartTrailer passes the queryPartTrailers list to lex.isKeywordAny and returns its result.
//
// It is used by parsers for detecting the end of the current filter or pipe.
func (lex *lexer) isQueryPartTrailer() bool {
return lex.isKeywordAny(queryPartTrailers)
}
// queryPartTrailers lists the tokens checked by isQueryPartTrailer.
var queryPartTrailers = []string{
// filters and pipes are delimited by '|'
"|",
// a query may finish with ')'
")",
// a query may finish with ';'
";",
// a query may finish with EOF, which is represented by an empty token
"",
}
// isKeyword is a variadic wrapper around lex.isKeywordAny: it passes the given keywords to it and returns its result.
func (lex *lexer) isKeyword(keywords ...string) bool {
return lex.isKeywordAny(keywords)
}
@@ -883,10 +901,7 @@ func (q *Query) addTimeFilterNoSubqueries(start, end int64) {
q.f = addTimeFilter(q.f, start, end, q.opts.timeOffset)
// Initialize rate functions with the step calculated from _time:[start, end] filter.
// This fixes the bug where rate_sum() doesn't divide by stepSeconds when
// time filter is specified via HTTP params instead of LogsQL expression
q.initStatsRateFuncsFromTimeFilter()
q.initStatsRateFuncStepsNoSubqueries()
}
func addTimeFilter(f filter, start, end, offset int64) filter {
@@ -1176,8 +1191,7 @@ func (q *Query) GetStatsLabelsAddGroupingByTime(step, offset int64) ([]string, e
// add _time:step to by (...) list at stats pipes.
q.addByTimeFieldToStatsPipes(step, offset)
// propagate the step into rate* funcs at stats pipes.
q.initStatsRateFuncs(step)
q.initStatsRateFuncStepsNoSubqueries()
// add 'partition by (_time)' to 'sort', 'first' and 'last' pipes.
q.addPartitionByTime(step)
@@ -1829,28 +1843,34 @@ func ParseQueryAtTimestamp(s string, timestamp int64) (*Query, error) {
return nil, fmt.Errorf("unexpected unparsed tail after [%s]; context: [%s]; tail: [%s]", q, lex.context(), lex.rawToken+lex.s)
}
q.optimize()
q.initStatsRateFuncsFromTimeFilter()
q.initStatsRateFuncSteps()
return q, nil
}
func (q *Query) initStatsRateFuncsFromTimeFilter() {
start, end := q.GetFilterTimeRange()
if start != math.MinInt64 && end != math.MaxInt64 {
step := end - start
// The increment of the step is needed in order to cover the
// last nanosecond in the selected time range [start, end].
step++
q.initStatsRateFuncs(step)
}
func (q *Query) initStatsRateFuncSteps() {
q.visitSubqueries(func(q *Query) {
q.initStatsRateFuncStepsNoSubqueries()
})
}
func (q *Query) initStatsRateFuncs(step int64) {
func (q *Query) initStatsRateFuncStepsNoSubqueries() {
start, end := q.GetFilterTimeRange()
step := int64(0)
if start != math.MinInt64 && end != math.MaxInt64 {
step = end - start
// The HTTP layer already converted the exclusive end into end-1, and the _time
// filter is inclusive ([start, end]). So (end - start) is 1ns short of the real
// window, and step++ adds that 1ns back.
step++
}
for _, p := range q.pipes {
if ps, ok := p.(*pipeStats); ok {
ps.initRateFuncs(step)
if !ps.initRateFuncsFromTimeBucket() {
ps.initRateFuncs(step)
}
}
}
}
@@ -1925,6 +1945,11 @@ func parseQuery(lex *lexer) (*Query, error) {
q.pipes = pipes
}
// Skip optional trailing semicolon
if lex.isKeyword(";") {
lex.nextToken()
}
return &q, nil
}
@@ -2089,7 +2114,7 @@ func parseQueryOptions(dstOpts *queryOptions, lex *lexer) error {
}
func parseFilter(lex *lexer, allowPipeKeywords bool) (filter, error) {
if lex.isKeyword("|", ")", "") {
if lex.isQueryPartTrailer() {
return nil, fmt.Errorf("missing query")
}
@@ -2118,7 +2143,7 @@ func parseFilterOr(lex *lexer, fieldName string) (filter, error) {
}
filters = append(filters, f)
switch {
case lex.isKeyword("|", ")", ""):
case lex.isQueryPartTrailer():
if len(filters) == 1 {
return filters[0], nil
}
@@ -2139,7 +2164,7 @@ func parseFilterAnd(lex *lexer, fieldName string) (filter, error) {
}
filters = append(filters, f)
switch {
case lex.isKeyword("or", "|", ")", ""):
case lex.isKeyword("or") || lex.isQueryPartTrailer():
if len(filters) == 1 {
return filters[0], nil
}
@@ -2152,6 +2177,10 @@ func parseFilterAnd(lex *lexer, fieldName string) (filter, error) {
}
func parseFilterGeneric(lex *lexer, fieldName string) (filter, error) {
if lex.isKeyword("") {
return nil, fmt.Errorf("unexpected end of query after %q; expecting a filter", lex.prevRawToken)
}
// Verify the previous adjacent token
if lex.isKeyword("(") {
if err := lex.checkPrevAdjacentToken("|", ":", "(", "!", "-", "not", "and", "or"); err != nil {
@@ -2277,13 +2306,6 @@ func parseFilterPhrase(lex *lexer, fieldName string) (filter, error) {
}
}
// The phrase is either a search phrase or a search prefix.
if !lex.isSkippedSpace && lex.isKeyword("*") {
// The phrase is a search prefix in the form `foo*`.
lex.nextToken()
return newFilterPrefix(fieldName, phrase), nil
}
// The phrase is a search phrase.
return newFilterPhrase(fieldName, phrase), nil
}
@@ -2330,7 +2352,7 @@ func parseAnyCaseFilter(lex *lexer, fieldName string) (filter, error) {
})
}
func parseFuncArgMaybePrefix(lex *lexer, fieldName string, callback func(arg string, isPrefiFilter bool) (filter, error)) (filter, error) {
func parseFuncArgMaybePrefix(lex *lexer, fieldName string, callback func(arg string, isPrefixFilter bool) (filter, error)) (filter, error) {
lexState := lex.backupState()
funcName := lex.token
@@ -2543,19 +2565,19 @@ func tryParseIPv6CIDR(s string) ([16]byte, [16]byte, bool) {
func parseFilterContainsAll(lex *lexer, fieldName string) (filter, error) {
var fi filterContainsAll
fg := newFilterGeneric(fieldName, &fi)
return parseInValues(lex, fieldName, fg, &fi.values)
return parseInValues(lex, fg, &fi.values)
}
func parseFilterContainsAny(lex *lexer, fieldName string) (filter, error) {
var fi filterContainsAny
fg := newFilterGeneric(fieldName, &fi)
return parseInValues(lex, fieldName, fg, &fi.values)
return parseInValues(lex, fg, &fi.values)
}
func parseFilterIn(lex *lexer, fieldName string) (filter, error) {
var fi filterIn
fg := newFilterGeneric(fieldName, &fi)
return parseInValues(lex, fieldName, fg, &fi.values)
return parseInValues(lex, fg, &fi.values)
}
func parseFilterContainsCommonCase(lex *lexer, fieldName string) (filter, error) {
@@ -2588,10 +2610,10 @@ func parseFilterEqualsCommonCase(lex *lexer, fieldName string) (filter, error) {
return fi, nil
}
func parseInValues(lex *lexer, fieldName string, f filter, iv *inValues) (filter, error) {
func parseInValues(lex *lexer, f filter, iv *inValues) (filter, error) {
// Try parsing in(arg1, ..., argN) at first
lexState := lex.backupState()
fi, err := parseFuncArgsPossibleWildcard(lex, fieldName, func(args []string) (filter, error) {
fi, err := parseFuncArgsPossibleWildcard(lex, func(args []string) (filter, error) {
iv.values = args
return f, nil
})
@@ -2691,7 +2713,7 @@ func parseFilterStar(lex *lexer, fieldName string) (filter, error) {
return parseFilterGeneric(lex, "*")
}
if lex.isSkippedSpace || lex.isKeyword("", ")", "|") {
if lex.isSkippedSpace || lex.isQueryPartTrailer() {
// '*' or 'fieldName:*' filter
return newFilterPrefix(fieldName, ""), nil
}
@@ -2706,7 +2728,7 @@ func parseFilterStar(lex *lexer, fieldName string) (filter, error) {
}
lex.nextToken()
if !lex.isSkippedSpace && !lex.isKeyword("", ")", "|") {
if !lex.isSkippedSpace && !lex.isQueryPartTrailer() {
return nil, fmt.Errorf("missing whitespace between *%q* and %q", phrase, lex.token)
}
return newFilterSubstring(fieldName, phrase), nil
@@ -2969,7 +2991,7 @@ func parseFuncArgs(lex *lexer, fieldName string, callback func(funcName string,
return callback(funcName, args)
}
func parseFuncArgsPossibleWildcard(lex *lexer, fieldName string, callback func(args []string) (filter, error)) (filter, error) {
func parseFuncArgsPossibleWildcard(lex *lexer, callback func(args []string) (filter, error)) (filter, error) {
funcName := lex.token
lex.nextToken()
@@ -3004,7 +3026,7 @@ func parseArgsInParens(lex *lexer) ([]string, error) {
}
arg, err := lex.nextCompoundToken()
if err != nil {
return nil, fmt.Errorf("cannot parse arg")
return nil, fmt.Errorf("cannot parse arg: %w", err)
}
args = append(args, arg)
if lex.isKeyword(")") {
@@ -3042,7 +3064,7 @@ func parseArgsInParensPossibleWildcard(lex *lexer) ([]string, bool, error) {
} else {
token, err := lex.nextCompoundToken()
if err != nil {
return nil, false, fmt.Errorf("cannot parse arg")
return nil, false, fmt.Errorf("cannot parse arg: %w", err)
}
arg = token
}
@@ -3676,7 +3698,7 @@ func parseFilterStreamIDIn(lex *lexer) (filter, error) {
// Try parsing in(arg1, ..., argN) at first
lexState := lex.backupState()
fs, err := parseFuncArgsPossibleWildcard(lex, "_stream_id", func(args []string) (filter, error) {
fs, err := parseFuncArgsPossibleWildcard(lex, func(args []string) (filter, error) {
streamIDs := make([]streamID, len(args))
for i, arg := range args {
if !streamIDs[i].tryUnmarshalFromString(arg) {

View File

@@ -122,12 +122,13 @@ func parsePipes(lex *lexer) ([]pipe, error) {
pipes = append(pipes, p)
switch {
case lex.isKeyword("|"):
case lex.isQueryPartTrailer():
if !lex.isKeyword("|") {
return pipes, nil
}
lex.nextToken()
case lex.isKeyword(")", ""):
return pipes, nil
default:
return nil, fmt.Errorf("unexpected token after [%s]: %q; expecting '|' or ')'", pipes[len(pipes)-1], lex.token)
return nil, fmt.Errorf("unexpected token after [%s]: %q; expecting '|', ';' or ')'", pipes[len(pipes)-1], lex.token)
}
}
}
@@ -178,6 +179,7 @@ func initPipeParsers() {
pipeParsers = map[string]pipeParseFunc{
"block_stats": parsePipeBlockStats,
"blocks_count": parsePipeBlocksCount,
"coalesce": parsePipeCoalesce,
"collapse_nums": parsePipeCollapseNums,
"copy": parsePipeCopy,
"cp": parsePipeCopy,

View File

@@ -133,7 +133,7 @@ func parsePipeBlocksCount(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse result name for 'blocks_count': %w", err)
}
resultName = name
} else if !lex.isKeyword("", "|") {
} else if !lex.isQueryPartTrailer() {
name, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for 'blocks_count': %w", err)

View File

@@ -0,0 +1,204 @@
package logstorage
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter"
)
// pipeCoalesce implements '| coalesce (...) as ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#coalesce-pipe
type pipeCoalesce struct {
// srcFieldFilters contains source field names or wildcard filters.
// They are probed for a non-empty value in the listed order.
srcFieldFilters []string
// dstField is the name of the field, which receives the result.
// The parser sets it to "_msg" when the 'as' clause is missing.
dstField string
// defaultValue is written to dstField when all the source fields are empty for the given row.
defaultValue string
}
// String returns the LogsQL representation of pc.
//
// The 'default' clause is emitted only for non-empty pc.defaultValue,
// while the 'as' clause is emitted only when pc.dstField differs from "_msg".
func (pc *pipeCoalesce) String() string {
	if len(pc.srcFieldFilters) == 0 {
		logger.Panicf("BUG: pipeCoalesce must contain at least one srcField")
	}

	fields := fieldNamesString(pc.srcFieldFilters)

	defaultClause := ""
	if pc.defaultValue != "" {
		defaultClause = " default " + quoteTokenIfNeeded(pc.defaultValue)
	}

	asClause := ""
	if pc.dstField != "_msg" {
		asClause = " as " + quoteTokenIfNeeded(pc.dstField)
	}

	return "coalesce(" + fields + ")" + defaultClause + asClause
}
// splitToRemoteAndLocal returns pc itself as the first result and nil as the second result.
//
// NOTE(review): presumably this means the whole pipe is executed remotely without additional local pipes - confirm against the pipe interface docs.
func (pc *pipeCoalesce) splitToRemoteAndLocal(_ int64) (pipe, []pipe) {
return pc, nil
}
// canLiveTail always returns true for the coalesce pipe.
func (pc *pipeCoalesce) canLiveTail() bool {
return true
}
// canReturnLastNResults returns false only if the pipe writes its result into the _time field.
func (pc *pipeCoalesce) canReturnLastNResults() bool {
return pc.dstField != "_time"
}
// isFixedOutputFieldsOrder always returns false for the coalesce pipe.
func (pc *pipeCoalesce) isFixedOutputFieldsOrder() bool {
return false
}
// updateNeededFields adjusts pf according to the fields read and written by the pipe.
//
// If pf matches pc.dstField, then a deny filter is added for pc.dstField
// and allow filters are added for all the pc.srcFieldFilters.
// Otherwise pf is left untouched.
func (pc *pipeCoalesce) updateNeededFields(pf *prefixfilter.Filter) {
	if !pf.MatchString(pc.dstField) {
		// The destination field isn't matched by pf, so there is nothing to update.
		return
	}

	pf.AddDenyFilter(pc.dstField)
	pf.AddAllowFilters(pc.srcFieldFilters)
}
// hasFilterInWithQuery always returns false for the coalesce pipe.
func (pc *pipeCoalesce) hasFilterInWithQuery() bool {
return false
}
// initFilterInValues ignores its arguments and returns pc unchanged with nil error.
func (pc *pipeCoalesce) initFilterInValues(_ *inValuesCache, _ getFieldValuesFunc) (pipe, error) {
return pc, nil
}
// visitSubqueries never calls the passed function for the coalesce pipe.
func (pc *pipeCoalesce) visitSubqueries(_ func(q *Query)) {
// nothing to do - pipeCoalesce has no fields of the Query type
}
// newPipeProcessor returns a processor for pc, which passes the processed blocks to ppNext.
//
// The concurrency, stop channel and cancel func args are ignored.
func (pc *pipeCoalesce) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
	pcp := &pipeCoalesceProcessor{}
	pcp.pc = pc
	pcp.ppNext = ppNext
	return pcp
}
// pipeCoalesceProcessor processes the coalesce pipe
type pipeCoalesceProcessor struct {
// pc is the pipe definition to apply to the incoming blocks.
pc *pipeCoalesce
// ppNext is the next pipe processor, which receives the blocks with the added result column.
ppNext pipeProcessor
// shards holds the per-worker state; writeBlock obtains the shard by workerID.
shards atomicutil.Slice[pipeCoalesceProcessorShard]
}
// pipeCoalesceProcessorShard is the state used by writeBlock for a single workerID.
type pipeCoalesceProcessorShard struct {
// rc accumulates the result column values for the currently processed block.
// It is reset after the block is passed to the next pipe processor.
rc resultColumn
// cs contains the source columns for the currently processed block in the order they are probed.
// It is cleared after the block is passed to the next pipe processor.
cs []*blockResultColumn
}
// writeBlock adds the pc.dstField column to br and passes br to the next pipe processor.
//
// The added column contains the first non-empty value among the source columns
// for every row, or pc.defaultValue if all the source columns are empty at this row.
// Blocks without rows are dropped.
func (pcp *pipeCoalesceProcessor) writeBlock(workerID uint, br *blockResult) {
	if br.rowsLen == 0 {
		return
	}

	pc := pcp.pc
	shard := pcp.shards.Get(workerID)

	// Collect the source columns into shard.cs in the order of pc.srcFieldFilters.
	allColumns := br.getColumns()
	for _, fieldFilter := range pc.srcFieldFilters {
		if prefixfilter.IsWildcardFilter(fieldFilter) {
			// A wildcard filter may select multiple columns.
			for _, col := range allColumns {
				if prefixfilter.MatchFilter(fieldFilter, col.name) {
					shard.addColumn(col)
				}
			}
		} else {
			shard.addColumn(br.getColumnByName(fieldFilter))
		}
	}

	// Build the result column: start from the default value
	// and override it with the first non-empty source value.
	for rowIdx := range br.rowsLen {
		result := pc.defaultValue
		for _, col := range shard.cs {
			if v := col.getValueAtRow(br, rowIdx); v != "" {
				result = v
				break
			}
		}
		shard.rc.addValue(result)
	}
	shard.rc.name = pc.dstField

	br.addResultColumn(shard.rc)
	pcp.ppNext.writeBlock(workerID, br)

	// Release per-block state, so the shard can be re-used for the next block.
	shard.rc.reset()
	clear(shard.cs)
	shard.cs = shard.cs[:0]
}
// addColumn appends c to shard.cs unless a column with the same name is already registered there.
func (shard *pipeCoalesceProcessorShard) addColumn(c *blockResultColumn) {
	for i := range shard.cs {
		if shard.cs[i].name == c.name {
			// The column with the given name is already registered.
			return
		}
	}

	shard.cs = append(shard.cs, c)
}
// flush does nothing and always returns nil, since writeBlock passes every block to the next pipe processor immediately.
func (pcp *pipeCoalesceProcessor) flush() error {
return nil
}
// parsePipeCoalesce parses '| coalesce(field1, field2, field3) default "default value" as result_field'
//
// Both the 'default ...' and the 'as ...' clauses are optional.
// The default value is empty and the result field is "_msg" when the corresponding clause is missing.
func parsePipeCoalesce(lex *lexer) (pipe, error) {
	if !lex.isKeyword("coalesce") {
		return nil, fmt.Errorf("expecting 'coalesce'; got %q", lex.token)
	}
	lex.nextToken()

	fieldFilters, err := parseFieldFiltersInParens(lex)
	switch {
	case err != nil:
		return nil, fmt.Errorf("cannot parse field names: %w", err)
	case len(fieldFilters) == 0:
		return nil, fmt.Errorf("coalesce requires at least one field name")
	}

	pc := &pipeCoalesce{
		srcFieldFilters: fieldFilters,
		dstField:        "_msg",
	}

	// Optional 'default <value>' clause.
	if lex.isKeyword("default") {
		lex.nextToken()
		value, err := lex.nextCompoundToken()
		if err != nil {
			return nil, fmt.Errorf("cannot parse default value: %w", err)
		}
		pc.defaultValue = value
	}

	// Optional 'as <field>' clause.
	if lex.isKeyword("as") {
		lex.nextToken()
		name, err := parseFieldName(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse result field name: %w", err)
		}
		pc.dstField = name
	}

	return pc, nil
}

View File

@@ -126,7 +126,7 @@ func parsePipeCopy(lex *lexer) (pipe, error) {
dstFieldFilters = append(dstFieldFilters, dstFieldFilter)
switch {
case lex.isKeyword("|", ")", ""):
case lex.isQueryPartTrailer():
pc := &pipeCopy{
srcFieldFilters: srcFieldFilters,
dstFieldFilters: dstFieldFilters,
@@ -134,7 +134,7 @@ func parsePipeCopy(lex *lexer) (pipe, error) {
return pc, nil
case lex.isKeyword(","):
default:
return nil, fmt.Errorf("unexpected token: %q; expecting ',', '|' or ')'", lex.token)
return nil, fmt.Errorf("unexpected token: %q; expecting ',', '|', ';' or ')'", lex.token)
}
}
}

View File

@@ -72,7 +72,7 @@ func parsePipeDecolorize(lex *lexer) (pipe, error) {
lex.nextToken()
field := "_msg"
if !lex.isKeyword("|", ")", "") {
if !lex.isQueryPartTrailer() {
f, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse field name after 'decolorize': %w", err)

View File

@@ -264,7 +264,7 @@ func parsePipeFieldNames(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err)
}
resultName = name
} else if !lex.isKeyword("", "|") {
} else if !lex.isQueryPartTrailer() {
name, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err)

View File

@@ -173,7 +173,7 @@ func parsePipeHash(lex *lexer) (pipe, error) {
if lex.isKeyword("as") {
lex.nextToken()
}
if !lex.isKeyword("|", ")", "") {
if !lex.isQueryPartTrailer() {
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field after 'hash(%s)': %w", quoteTokenIfNeeded(fieldName), err)

View File

@@ -182,7 +182,7 @@ func parsePipeJSONArrayLen(lex *lexer) (pipe, error) {
if lex.isKeyword("as") {
lex.nextToken()
}
if !lex.isKeyword("|", ")", "") {
if !lex.isQueryPartTrailer() {
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field after 'len(%s)': %w", quoteTokenIfNeeded(fieldName), err)

View File

@@ -102,7 +102,7 @@ func parsePipeLast(lex *lexer) (pipe, error) {
func parsePipeLastFirst(lex *lexer) (*pipeSort, error) {
var ps pipeSort
ps.limit = 1
if !lex.isKeyword("by", "partition", "rank", "(", "|", ")", "") {
if !lex.isKeyword("by", "partition", "rank", "(") && !lex.isQueryPartTrailer() {
s, err := lex.nextCompoundToken()
if err != nil {
return nil, fmt.Errorf("cannot parse number: %w", err)
@@ -144,7 +144,7 @@ func parsePipeLastFirst(lex *lexer) (*pipeSort, error) {
if lex.isKeyword("rank") {
rankFieldName, err := parseRankFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot read rank field name: %s", err)
return nil, fmt.Errorf("cannot read rank field name: %w", err)
}
ps.rankFieldName = rankFieldName
}

View File

@@ -165,7 +165,7 @@ func parsePipeLen(lex *lexer) (pipe, error) {
if lex.isKeyword("as") {
lex.nextToken()
}
if !lex.isKeyword("|", ")", "") {
if !lex.isQueryPartTrailer() {
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field after 'len(%s)': %w", quoteTokenIfNeeded(fieldName), err)

View File

@@ -113,7 +113,7 @@ func parsePipeLimit(lex *lexer) (pipe, error) {
lex.nextToken()
limit := uint64(10)
if !lex.isKeyword("|", ")", "") {
if !lex.isQueryPartTrailer() {
limitStr, err := lex.nextCompoundToken()
if err != nil {
return nil, fmt.Errorf("cannot parse rows limit: %w", err)

View File

@@ -460,7 +460,7 @@ func parsePipeMath(lex *lexer) (pipe, error) {
switch {
case lex.isKeyword(","):
lex.nextToken()
case lex.isKeyword("|", ")", ""):
case lex.isQueryPartTrailer():
if len(mes) == 0 {
return nil, fmt.Errorf("missing 'math' expressions")
}
@@ -469,7 +469,7 @@ func parsePipeMath(lex *lexer) (pipe, error) {
}
return pm, nil
default:
return nil, fmt.Errorf("unexpected token after 'math' expression [%s]: %q; expecting ',', '|' or ')'", mes[len(mes)-1], lex.token)
return nil, fmt.Errorf("unexpected token after 'math' expression [%s]: %q; expecting ',', '|', ';' or ')'", mes[len(mes)-1], lex.token)
}
}
}
@@ -481,7 +481,7 @@ func parseMathEntry(lex *lexer) (*mathEntry, error) {
}
resultField := ""
if lex.isKeyword(",", "|", ")", "") {
if lex.isKeyword(",") || lex.isQueryPartTrailer() {
resultField = me.String()
} else {
if lex.isKeyword("as") {

View File

@@ -88,7 +88,7 @@ func parsePipePackJSON(lex *lexer) (pipe, error) {
if lex.isKeyword("as") {
lex.nextToken()
}
if !lex.isKeyword("|", ")", "") {
if !lex.isQueryPartTrailer() {
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field for 'pack_json': %w", err)

View File

@@ -88,7 +88,7 @@ func parsePipePackLogfmt(lex *lexer) (pipe, error) {
if lex.isKeyword("as") {
lex.nextToken()
}
if !lex.isKeyword("|", ")", "") {
if !lex.isQueryPartTrailer() {
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field for 'pack_logfmt': %w", err)

View File

@@ -135,7 +135,7 @@ func parsePipeRename(lex *lexer) (pipe, error) {
dstFieldFilters = append(dstFieldFilters, dstFieldFilter)
switch {
case lex.isKeyword("|", ")", ""):
case lex.isQueryPartTrailer():
pr := &pipeRename{
srcFieldFilters: srcFieldFilters,
dstFieldFilters: dstFieldFilters,

View File

@@ -415,7 +415,7 @@ func parsePipeRunningStatsExt(lex *lexer, pipeName string) (pipe, error) {
f.f = sf
resultName := ""
if lex.isKeyword(",", "|", ")", "") {
if lex.isKeyword(",") || lex.isQueryPartTrailer() {
resultName = sf.String()
} else {
if lex.isKeyword("as") {
@@ -435,7 +435,7 @@ func parsePipeRunningStatsExt(lex *lexer, pipeName string) (pipe, error) {
funcs = append(funcs, f)
if lex.isKeyword("|", ")", "") {
if lex.isQueryPartTrailer() {
ps.funcs = funcs
return &ps, nil
}

View File

@@ -836,7 +836,7 @@ func parsePipeSort(lex *lexer) (pipe, error) {
case lex.isKeyword("rank"):
rankFieldName, err := parseRankFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot read rank field name: %s", err)
return nil, fmt.Errorf("cannot read rank field name: %w", err)
}
ps.rankFieldName = rankFieldName
case lex.isKeyword("partition"):
@@ -930,7 +930,7 @@ func parseLimit(lex *lexer) (uint64, error) {
limitStr, err := lex.nextCompoundToken()
if err != nil {
return 0, fmt.Errorf("cannot parse 'limit': %s", err)
return 0, fmt.Errorf("cannot parse 'limit': %w", err)
}
n, ok := tryParseUint64(limitStr)
@@ -949,7 +949,7 @@ func parseOffset(lex *lexer) (uint64, error) {
limitStr, err := lex.nextCompoundToken()
if err != nil {
return 0, fmt.Errorf("cannot parse 'offset': %s", err)
return 0, fmt.Errorf("cannot parse 'offset': %w", err)
}
n, ok := tryParseUint64(limitStr)

View File

@@ -334,7 +334,7 @@ func (shard *pipeTopkProcessorShard) getRowsByPartition(partition string) *pipeT
}
partition = strings.Clone(partition)
shard.rowsByPartition[partition] = rs
shard.stateSizeBudget += int(unsafe.Sizeof(*rs)+unsafe.Sizeof(rs)) + len(partition)
shard.stateSizeBudget -= int(unsafe.Sizeof(*rs)+unsafe.Sizeof(rs)) + len(partition)
}
return rs
}

View File

@@ -142,7 +142,7 @@ func parsePipeSplit(lex *lexer) (pipe, error) {
}
srcField := "_msg"
if !lex.isKeyword("as", ")", "|", "") {
if !lex.isKeyword("as") && !lex.isQueryPartTrailer() {
if lex.isKeyword("from") {
lex.nextToken()
}
@@ -154,7 +154,7 @@ func parsePipeSplit(lex *lexer) (pipe, error) {
}
dstField := srcField
if !lex.isKeyword(")", "|", "") {
if !lex.isQueryPartTrailer() {
if lex.isKeyword("as") {
lex.nextToken()
}

View File

@@ -415,6 +415,16 @@ func (ps *pipeStats) initRateFuncs(step int64) {
}
}
// initRateFuncsFromTimeBucket initializes rate funcs with the bucket size of the first
// `by (...)` field named _time with a positive bucket size.
//
// It returns true if such a field is found. Otherwise rate funcs are left untouched and false is returned.
func (ps *pipeStats) initRateFuncsFromTimeBucket() bool {
	for i := range ps.byFields {
		bf := ps.byFields[i]
		if bf.name != "_time" {
			continue
		}
		if bf.bucketSize > 0 {
			ps.initRateFuncs(int64(bf.bucketSize))
			return true
		}
	}
	return false
}
const stateSizeBudgetChunk = 1 << 20
func (ps *pipeStats) newPipeProcessor(concurrency int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
@@ -1389,11 +1399,11 @@ func parsePipeStatsExt(lex *lexer, needStatsKeyword bool) (pipe, error) {
}
ps.entries = append(ps.entries, e)
if lex.isKeyword("|", ")", "") {
if lex.isQueryPartTrailer() {
break
}
if !lex.isKeyword(",") {
return nil, fmt.Errorf("unexpected token %q after [%s]; want ',', '|' or ')'", lex.token, e)
return nil, fmt.Errorf("unexpected token %q after [%s]; want ',', '|', ';' or ')'", lex.token, e)
}
lex.nextToken()
}
@@ -1442,7 +1452,7 @@ func parseStatsEntry(lex *lexer) (pipeStatsEntry, error) {
}
resultName := ""
if lex.isKeyword(",", "|", ")", "") {
if lex.isKeyword(",") || lex.isQueryPartTrailer() {
resultName = sf.String()
if iff != nil {
resultName += " " + iff.String()

View File

@@ -620,7 +620,7 @@ func parsePipeTop(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'by(...)': %w", err)
}
byFields = bfs
} else if !lex.isKeyword("hits", "rank", ")", "|", "") {
} else if !lex.isKeyword("hits", "rank") && !lex.isQueryPartTrailer() {
bfs, err := parseCommaSeparatedFields(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'by ...': %w", err)
@@ -674,11 +674,11 @@ func parseRankFieldName(lex *lexer) (string, error) {
rankFieldName := "rank"
if lex.isKeyword("as") {
lex.nextToken()
if lex.isKeyword("", "|", ")", "(") {
if lex.isKeyword("(") || lex.isQueryPartTrailer() {
return "", fmt.Errorf("missing rank name")
}
}
if !lex.isKeyword("", "|", ")", "limit") {
if !lex.isKeyword("limit") && !lex.isQueryPartTrailer() {
s, err := parseFieldName(lex)
if err != nil {
return "", err

View File

@@ -550,7 +550,7 @@ func parsePipeUniq(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'by(...)': %w", err)
}
byFields = bfs
} else if !lex.isKeyword("filter", "with", "hits", "limit", ")", "|", "") {
} else if !lex.isKeyword("filter", "with", "hits", "limit") && !lex.isQueryPartTrailer() {
bfs, err := parseCommaSeparatedFields(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'by ...': %w", err)

View File

@@ -100,6 +100,7 @@ func (pu *pipeUnpackJSON) visitSubqueries(visitFunc func(q *Query)) {
func (pu *pipeUnpackJSON) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
unpackJSON := func(uctx *fieldsUnpackerContext, s string) {
s = trimJSONWhitespace(s)
if len(s) == 0 || s[0] != '{' {
// This isn't a JSON object
return
@@ -159,7 +160,7 @@ func parsePipeUnpackJSON(lex *lexer) (pipe, error) {
}
fromField := "_msg"
if !lex.isKeyword("fields", "preserve_keys", "result_prefix", "keep_original_fields", "skip_empty_results", ")", "|", "") {
if !lex.isKeyword("fields", "preserve_keys", "result_prefix", "keep_original_fields", "skip_empty_results") && !lex.isQueryPartTrailer() {
if lex.isKeyword("from") {
lex.nextToken()
}

View File

@@ -142,7 +142,7 @@ func parsePipeUnpackLogfmt(lex *lexer) (pipe, error) {
}
fromField := "_msg"
if !lex.isKeyword("fields", "result_prefix", "keep_original_fields", "skip_empty_results", ")", "|", "") {
if !lex.isKeyword("fields", "result_prefix", "keep_original_fields", "skip_empty_results") && !lex.isQueryPartTrailer() {
if lex.isKeyword("from") {
lex.nextToken()
}

View File

@@ -2,6 +2,7 @@ package logstorage
import (
"fmt"
"strings"
"sync/atomic"
"time"
@@ -91,6 +92,7 @@ func (pu *pipeUnpackSyslog) newPipeProcessor(_ int, _ <-chan struct{}, _ func(),
year := currentYear.Load()
p := GetSyslogParser(int(year), pu.offsetTimezone)
s = strings.TrimLeft(s, " \t\n\r")
p.Parse(s)
for _, f := range p.Fields {
uctx.addField(f.Name, f.Value)
@@ -135,7 +137,7 @@ func parsePipeUnpackSyslog(lex *lexer) (pipe, error) {
}
fromField := "_msg"
if !lex.isKeyword("offset", "result_prefix", "keep_original_fields", ")", "|", "") {
if !lex.isKeyword("offset", "result_prefix", "keep_original_fields") && !lex.isQueryPartTrailer() {
if lex.isKeyword("from") {
lex.nextToken()
}

View File

@@ -139,7 +139,7 @@ func parsePipeUnpackWords(lex *lexer) (pipe, error) {
lex.nextToken()
srcField := "_msg"
if !lex.isKeyword("drop_duplicates", "as", ")", "|", "") {
if !lex.isKeyword("drop_duplicates", "as") && !lex.isQueryPartTrailer() {
if lex.isKeyword("from") {
lex.nextToken()
}
@@ -151,7 +151,7 @@ func parsePipeUnpackWords(lex *lexer) (pipe, error) {
}
dstField := srcField
if !lex.isKeyword("drop_duplicates", ")", "|", "") {
if !lex.isKeyword("drop_duplicates") && !lex.isQueryPartTrailer() {
if lex.isKeyword("as") {
lex.nextToken()
}

View File

@@ -256,6 +256,7 @@ func parsePipeUnroll(lex *lexer) (pipe, error) {
}
func unpackJSONArray(dst []string, a *arena, s string) []string {
s = trimJSONWhitespace(s)
if s == "" || s[0] != '[' {
return dst
}

View File

@@ -67,7 +67,7 @@ func parseRunningStatsLast(lex *lexer) (runningStatsFunc, error) {
return nil, err
}
if len(args) != 1 {
return nil, fmt.Errorf("unexpeccted number of args for the last() function; got %d; want 1; args: %q", len(args), args)
return nil, fmt.Errorf("unexpected number of args for the last() function; got %d; want 1; args: %q", len(args), args)
}
fieldName := args[0]

View File

@@ -170,7 +170,7 @@ type Storage struct {
// partitions are sorted by time, e.g. partitions[0] has the smallest time.
partitions []*partitionWrapper
// ptwHot is the "hot" partition, were the last rows were ingested.
// ptwHot is the "hot" partition, where the last rows were ingested.
//
// It must be accessed under partitionsLock.
ptwHot *partitionWrapper
@@ -197,7 +197,7 @@ type Storage struct {
// the check whether the given stream is already registered in the persistent storage.
streamIDCache *cache
// filterStreamCache caches streamIDs keyed by (partition, []TenanID, StreamFilter).
// filterStreamCache caches streamIDs keyed by (partition, []TenantID, StreamFilter).
//
// It reduces the load on persistent storage during querying by _stream:{...} filter.
filterStreamCache *cache
@@ -235,7 +235,7 @@ func (s *Storage) PartitionAttach(name string) error {
// Verify whether the given partition already exists in the attached partitions list.
for _, ptw := range s.partitions {
if ptw.pt.name == name {
return fmt.Errorf("cannot attach the partition %q, because it is arleady attached", name)
return fmt.Errorf("cannot attach the partition %q, because it is already attached", name)
}
}
@@ -381,7 +381,7 @@ func getSnapshotPaths(ptws []*partitionWrapper) []string {
func (s *Storage) PartitionSnapshotDelete(snapshotPath string) error {
snapshotName := filepath.Base(snapshotPath)
if err := snapshotutil.Validate(snapshotName); err != nil {
return fmt.Errorf("unsupported snapshot name %q at %q: %s", snapshotName, snapshotPath, err)
return fmt.Errorf("unsupported snapshot name %q at %q: %w", snapshotName, snapshotPath, err)
}
snapshotDir := filepath.Dir(snapshotPath)
@@ -442,7 +442,7 @@ func (s *Storage) MustDeleteStalePartitionSnapshots(maxAge time.Duration) []stri
// DeleteRunTask starts deletion of logs according to the given filter f for the given tenantIDs.
//
// The taskID must contain an unique id of the task. It is used for tracking the task at the list returned by DeleteActiveTasks().
// The taskID must contain a unique id of the task. It is used for tracking the task at the list returned by DeleteActiveTasks().
// The timestamp must contain the timestamp in seconds when the task is started.
func (s *Storage) DeleteRunTask(_ context.Context, taskID string, timestamp int64, tenantIDs []TenantID, f *Filter) error {
// Register the task in the list of active delete tasks, so it survives application restarts and crashes.
@@ -968,7 +968,7 @@ func (s *Storage) watchDeleteTasks() {
s.deleteTasks = s.deleteTasks[1:]
if !ok {
// The delete task coudn't be completed now. Try it later.
// The delete task couldn't be completed now. Try it later.
s.deleteTasks = append(s.deleteTasks, dt)
}
s.mustSaveDeleteTasksLocked()
@@ -1031,7 +1031,7 @@ func (s *Storage) processDeleteTask(ctx context.Context, dt *DeleteTask) bool {
}
// The task couldn't be processed at the moment
logger.Warnf("cannot proceeed with the delete task with task_id=%q in %.3f seconds; retrying it later", dt.TaskID, time.Since(startTime).Seconds())
logger.Warnf("cannot proceed with the delete task with task_id=%q in %.3f seconds; retrying it later", dt.TaskID, time.Since(startTime).Seconds())
return false
}

View File

@@ -23,7 +23,7 @@ import (
"github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter"
)
// QueryContext is used for execting the query passed to NewQueryContext()
// QueryContext is used for executing the query passed to NewQueryContext().
type QueryContext struct {
// Context is the context for executing the Query.
Context context.Context
@@ -43,12 +43,12 @@ type QueryContext struct {
// HiddenFieldsFilters is an optional list of field filters, which must be hidden during query execution.
//
// The list may contain full field names and field prefixes ending with *.
// Prefix match all the fields starting with the given prefix.
// Prefixes match all the fields starting with the given prefix.
HiddenFieldsFilters []string
// startTime is creation time for the QueryContext.
//
// It is used for calculating query druation.
// It is used for calculating query duration.
startTime time.Time
}
@@ -128,13 +128,14 @@ type storageSearchOptions struct {
// hiddenFieldsFilter is the filter of fields, which must be hidden during query
hiddenFieldsFilter *prefixfilter.Filter
// timeOffset is the offset in nanoseconds, which must be subtracted from the selected the _time values before these values are passed to query pipes.
// timeOffset is the offset in nanoseconds, which must be subtracted from the selected _time values
// before these values are passed to query pipes.
timeOffset int64
}
// partitionSearchOptions is search options for the partition.
//
// this struct must be created via partition.getSearchOptions() call.
// This struct must be created via partition.getSearchOptions() call.
type partitionSearchOptions struct {
// Optional sorted list of tenantIDs for the search.
// If it is empty, then the search is performed by streamIDs
@@ -1126,7 +1127,7 @@ func (db *DataBlock) RowsCount() int {
// GetColumns returns columns from db.
//
// If needSortSolumns is set, then the returned columns are sorted in alphabetical order
// If needSortColumns is set, then the returned columns are sorted in alphabetical order.
func (db *DataBlock) GetColumns(needSortColumns bool) []BlockColumn {
if needSortColumns {
sort.Slice(db.columns, func(i, j int) bool {

View File

@@ -135,7 +135,7 @@ func MarshalTenantIDsToJSON(tenantIDs []TenantID) []byte {
// UnmarshalTenantIDsFromJSON parses src as a JSON array of tenant IDs.
//
// On success it returns the decoded slice. On failure it returns a nil slice
// and an error wrapping the underlying json.Unmarshal error via %w, so callers
// can inspect the cause with errors.Is / errors.As.
func UnmarshalTenantIDsFromJSON(src []byte) ([]TenantID, error) {
	var tenantIDs []TenantID
	if err := json.Unmarshal(src, &tenantIDs); err != nil {
		// Wrap with %w (not %s) in order to keep the original error in the chain.
		return nil, fmt.Errorf("cannot unmarshal tenantIDs from JSON array: %w", err)
	}
	return tenantIDs, nil
}

4
vendor/modules.txt vendored
View File

@@ -132,8 +132,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric
# github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.56.0
## explicit; go 1.24.0
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping
# github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260415124154-6b7a6357aec0
## explicit; go 1.26.2
# github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260616132739-c901a1e31cb3
## explicit; go 1.26.4
github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage
github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter
# github.com/VictoriaMetrics/easyproto v1.2.0