vendor: update github.com/VictoriaMetrics/VictoriaLogs from v0.0.0-20260125191521-bc89d84cd61d to v0.0.0-20260218111324-95b48d57d032

This commit is contained in:
Aliaksandr Valialkin
2026-02-18 20:39:19 +01:00
parent 8531d86da0
commit 53514febdc
95 changed files with 1251 additions and 601 deletions

2
go.mod
View File

@@ -7,7 +7,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4
github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20260125191521-bc89d84cd61d
github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20260218111324-95b48d57d032
github.com/VictoriaMetrics/easyproto v1.2.0
github.com/VictoriaMetrics/fastcache v1.13.3
github.com/VictoriaMetrics/metrics v1.41.2

4
go.sum
View File

@@ -52,8 +52,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.55.0/go.mod h1:Mf6O40IAyB9zR/1J8nGDDPirZQQPbYJni8Yisy7NTMc=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20260125191521-bc89d84cd61d h1:KXP2mkLdLJZDNmw4s7SxWBziDEyaWs57EQcgP9r19mc=
github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20260125191521-bc89d84cd61d/go.mod h1:FWICi4QDCcNA5KoPp1trxCNojzM/ggLADz/B97HwIlY=
github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20260218111324-95b48d57d032 h1:kKVeXC+HAcMeMLefoKCWf934y9MoLU8V3Da7k6WP4K8=
github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20260218111324-95b48d57d032/go.mod h1:WQ8hGgfKx1lXCCcS1SJSOklN9fToSbshtvKHp3xsv4w=
github.com/VictoriaMetrics/easyproto v1.2.0 h1:FJT9uNXA2isppFuJErbLqD306KoFlehl7Wn2dg/6oIE=
github.com/VictoriaMetrics/easyproto v1.2.0/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo3/GThPs2KH23mv710=
github.com/VictoriaMetrics/fastcache v1.13.3 h1:rBabE0iIxcqKEMCwUmwHZ9dgEqXerg8FRbRDUvC7OVc=

View File

@@ -133,7 +133,7 @@ func (bm *bitmap) forEachSetBit(f func(idx int) bool) {
continue
}
wordNew := word
for j := 0; j < 64; j++ {
for j := range 64 {
mask := uint64(1) << j
if (word & mask) == 0 {
continue
@@ -156,7 +156,7 @@ func (bm *bitmap) forEachSetBit(f func(idx int) bool) {
func (bm *bitmap) forEachSetBitReadonly(f func(idx int)) {
if bm.areAllBitsSet() {
n := bm.bitsLen
for i := 0; i < n; i++ {
for i := range n {
f(i)
}
return
@@ -168,7 +168,7 @@ func (bm *bitmap) forEachSetBitReadonly(f func(idx int)) {
if word == 0 {
continue
}
for j := 0; j < 64; j++ {
for j := range 64 {
mask := uint64(1) << j
if (word & mask) == 0 {
continue

View File

@@ -324,7 +324,7 @@ func unmarshalColumnHeadersRefsInplace(dst []columnHeaderRef, src []byte) ([]col
}
src = src[nSize:]
for i := uint64(0); i < n; i++ {
for i := range n {
columnNameID, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
return dst, srcOrig, fmt.Errorf("cannot unmarshal column name ID number %d out of %d", i, n)
@@ -988,8 +988,8 @@ func (th *timestampsHeader) copyFrom(src *timestampsHeader) {
func (th *timestampsHeader) subTimeOffset(timeOffset int64) {
if timeOffset != 0 {
th.minTimestamp = subNoOverflowInt64(th.minTimestamp, timeOffset)
th.maxTimestamp = subNoOverflowInt64(th.maxTimestamp, timeOffset)
th.minTimestamp = SubInt64NoOverflow(th.minTimestamp, timeOffset)
th.maxTimestamp = SubInt64NoOverflow(th.maxTimestamp, timeOffset)
}
}

View File

@@ -2,6 +2,7 @@ package logstorage
import (
"math"
"slices"
"sort"
"strconv"
"strings"
@@ -233,8 +234,9 @@ func (br *blockResult) initFromDataBlock(db *DataBlock) {
br.rowsLen = db.RowsCount()
for i := range db.Columns {
c := &db.Columns[i]
columns := db.columns
for i := range columns {
c := &columns[i]
if c.Name == "_time" {
var ok bool
br.timestampsBuf, ok = tryParseTimestamps(br.timestampsBuf[:0], c.Values)
@@ -818,7 +820,7 @@ func (br *blockResult) getBucketedTimestampValues(bf *byStatsField) []string {
func truncateTimestamp(ts, bucketSizeInt, bucketOffsetInt int64, bucketSizeStr string) int64 {
if bucketSizeStr == "week" {
// Adjust the week to be started from Monday.
bucketOffsetInt += 4 * nsecsPerDay
bucketOffsetInt += 3 * nsecsPerDay
}
if bucketOffsetInt == 0 && bucketSizeStr != "month" && bucketSizeStr != "year" {
// Fast path for timestamps without offsets
@@ -829,7 +831,7 @@ func truncateTimestamp(ts, bucketSizeInt, bucketOffsetInt int64, bucketSizeStr s
return ts - r
}
ts -= bucketOffsetInt
ts += bucketOffsetInt
switch bucketSizeStr {
case "month":
ts = truncateTimestampToMonth(ts)
@@ -842,7 +844,7 @@ func truncateTimestamp(ts, bucketSizeInt, bucketOffsetInt int64, bucketSizeStr s
}
ts -= r
}
ts += bucketOffsetInt
ts -= bucketOffsetInt
return ts
}
@@ -1242,9 +1244,9 @@ func truncateUint64(n, bucketSizeInt, bucketOffsetInt uint64) uint64 {
return 0
}
n -= bucketOffsetInt
n -= n % bucketSizeInt
n += bucketOffsetInt
n -= n % bucketSizeInt
n -= bucketOffsetInt
return n
}
@@ -1339,13 +1341,13 @@ func truncateInt64(n, bucketSizeInt, bucketOffsetInt int64) int64 {
return n - r
}
n -= bucketOffsetInt
n += bucketOffsetInt
r := n % bucketSizeInt
if r < 0 {
r += bucketSizeInt
}
n -= r
n += bucketOffsetInt
n -= bucketOffsetInt
return n
}
@@ -1443,14 +1445,14 @@ func truncateFloat64(f float64, p10 float64, bucketSizeP10 int64, bucketOffset f
return float64(fP10) / p10
}
f -= bucketOffset
f += bucketOffset
fP10 := int64(math.Floor(f * p10))
r := fP10 % bucketSizeP10
fP10 -= r
f = float64(fP10) / p10
f += bucketOffset
f -= bucketOffset
return f
}
@@ -1545,9 +1547,9 @@ func truncateUint32(n, bucketSizeInt, bucketOffsetInt uint32) uint32 {
return 0
}
n -= bucketOffsetInt
n -= n % bucketSizeInt
n += bucketOffsetInt
n -= n % bucketSizeInt
n -= bucketOffsetInt
return n
}
@@ -1868,14 +1870,35 @@ func (br *blockResult) deleteColumnsByFilters(columnFilters []string) {
// setColumnFilters sets the resulting columns according to the given columnFilters.
func (br *blockResult) setColumnFilters(columnFilters []string) {
if br.areSameColumns(columnFilters) {
cs := br.getColumns()
if !hasWildcardFilters(columnFilters) {
if areSameColumns(cs, columnFilters) {
// Fast path - nothing to change.
return
}
// Slow path - construct the requested columns in the requested order.
br.csInitialized = false
csBufLen := len(br.csBuf)
for _, field := range columnFilters {
idx := getBlockResultColumnIdxByName(cs, field)
if idx >= 0 {
br.csBuf = append(br.csBuf, *cs[idx])
} else {
br.addConstColumn(field, "")
}
}
br.csBuf = append(br.csBuf[:0], br.csBuf[csBufLen:]...)
return
}
if areSameWildcardColumns(cs, columnFilters) {
// Fast path - nothing to change.
return
}
// Slow path - construct the requested columns
cs := br.getColumns()
br.csInitialized = false
csBuf := br.csBuf
csBufLen := len(csBuf)
@@ -1899,8 +1922,19 @@ func (br *blockResult) setColumnFilters(columnFilters []string) {
}
}
func (br *blockResult) areSameColumns(columnFilters []string) bool {
cs := br.getColumns()
func areSameColumns(cs []*blockResultColumn, columnFilters []string) bool {
if len(cs) != len(columnFilters) {
return false
}
for i, c := range cs {
if columnFilters[i] != c.name {
return false
}
}
return true
}
func areSameWildcardColumns(cs []*blockResultColumn, columnFilters []string) bool {
for _, c := range cs {
if !prefixfilter.MatchFilters(columnFilters, c.name) {
return false
@@ -1915,10 +1949,13 @@ func (br *blockResult) areSameColumns(columnFilters []string) bool {
return false
}
}
return true
}
func hasWildcardFilters(columnFilters []string) bool {
return slices.ContainsFunc(columnFilters, prefixfilter.IsWildcardFilter)
}
func getMatchingColumns(br *blockResult, filters []string) *matchingColumns {
v := matchingColumnsPool.Get()
if v == nil {

View File

@@ -517,7 +517,7 @@ func (bs *blockSearch) subTimeOffsetToTimestamps(timeOffset int64) {
func subTimeOffset(timestamps []int64, timeOffset int64) {
for i := range timestamps {
timestamps[i] = subNoOverflowInt64(timestamps[i], timeOffset)
timestamps[i] = SubInt64NoOverflow(timestamps[i], timeOffset)
}
}

View File

@@ -134,7 +134,7 @@ func appendTokensHashes(dst []uint64, tokens []string) []uint64 {
hp := (*uint64)(unsafe.Pointer(&buf[0]))
for _, token := range tokens {
*hp = xxhash.Sum64(bytesutil.ToUnsafeBytes(token))
for i := 0; i < bloomFilterHashesCount; i++ {
for range bloomFilterHashesCount {
h := xxhash.Sum64(buf[:])
(*hp)++
dst = append(dst, h)
@@ -160,7 +160,7 @@ func appendHashesHashes(dst, hashes []uint64) []uint64 {
hp := (*uint64)(unsafe.Pointer(&buf[0]))
for _, h := range hashes {
*hp = h
for i := 0; i < bloomFilterHashesCount; i++ {
for range bloomFilterHashesCount {
h := xxhash.Sum64(buf[:])
(*hp)++
dst = append(dst, h)

View File

@@ -24,11 +24,7 @@ func newCache() *cache {
c.prev.Store(&sync.Map{})
c.stopCh = make(chan struct{})
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.runCleaner()
}()
c.wg.Go(c.runCleaner)
return &c
}

View File

@@ -50,7 +50,7 @@ func unmarshalColumnIdxs(src []byte, columnNames []string, shardsCount uint64) (
}
shardIdxs := make(map[string]uint64, n)
for i := uint64(0); i < n; i++ {
for i := range n {
columnID, nBytes := encoding.UnmarshalVarUint64(src)
if nBytes <= 0 {
return nil, fmt.Errorf("cannot parse columnID #%d", i)
@@ -126,7 +126,7 @@ func unmarshalColumnNames(src []byte) ([]string, map[string]uint64, error) {
columnNameIDs := make(map[string]uint64, n)
columnNames := make([]string, n)
for id := uint64(0); id < n; id++ {
for id := range n {
name, nBytes := encoding.UnmarshalBytes(src)
if nBytes <= 0 {
return nil, nil, fmt.Errorf("cannot parse column name number %d out of %d", id, n)

View File

@@ -235,7 +235,7 @@ var (
func (ddb *datadb) startSmallPartsMergers() {
ddb.partsLock.Lock()
for i := 0; i < cap(smallPartsConcurrencyCh); i++ {
for range cap(smallPartsConcurrencyCh) {
ddb.startSmallPartsMergerLocked()
}
ddb.partsLock.Unlock()
@@ -243,7 +243,7 @@ func (ddb *datadb) startSmallPartsMergers() {
func (ddb *datadb) startBigPartsMergers() {
ddb.partsLock.Lock()
for i := 0; i < cap(bigPartsConcurrencyCh); i++ {
for range cap(bigPartsConcurrencyCh) {
ddb.startBigPartsMergerLocked()
}
ddb.partsLock.Unlock()
@@ -253,41 +253,25 @@ func (ddb *datadb) startInmemoryPartsMergerLocked() {
if needStop(ddb.stopCh) {
return
}
ddb.wg.Add(1)
go func() {
ddb.inmemoryPartsMerger()
ddb.wg.Done()
}()
ddb.wg.Go(ddb.inmemoryPartsMerger)
}
func (ddb *datadb) startSmallPartsMergerLocked() {
if needStop(ddb.stopCh) {
return
}
ddb.wg.Add(1)
go func() {
ddb.smallPartsMerger()
ddb.wg.Done()
}()
ddb.wg.Go(ddb.smallPartsMerger)
}
func (ddb *datadb) startBigPartsMergerLocked() {
if needStop(ddb.stopCh) {
return
}
ddb.wg.Add(1)
go func() {
ddb.bigPartsMerger()
ddb.wg.Done()
}()
ddb.wg.Go(ddb.bigPartsMerger)
}
func (ddb *datadb) startInmemoryPartsFlusher() {
ddb.wg.Add(1)
go func() {
ddb.inmemoryPartsFlusher()
ddb.wg.Done()
}()
ddb.wg.Go(ddb.inmemoryPartsFlusher)
}
func (ddb *datadb) inmemoryPartsFlusher() {
@@ -324,16 +308,13 @@ func (ddb *datadb) mustMergePartsToFiles(pws []*partWrapper) {
wg := getWaitGroup()
for len(pws) > 0 {
pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws)
wg.Add(1)
inmemoryPartsConcurrencyCh <- struct{}{}
go func(pwsChunk []*partWrapper) {
defer func() {
<-inmemoryPartsConcurrencyCh
wg.Done()
}()
ddb.mustMergeParts(pwsChunk, true)
}(pwsToMerge)
wg.Go(func() {
ddb.mustMergeParts(pwsToMerge, true)
<-inmemoryPartsConcurrencyCh
})
pws = pwsRemaining
}
wg.Wait()
@@ -1040,39 +1021,41 @@ func (ddb *datadb) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, d
removedSmallParts := 0
removedBigParts := 0
ddb.partsLock.Lock()
func() {
// Prevent from deadlock mentioned at https://github.com/VictoriaMetrics/VictoriaLogs/issues/1020#issuecomment-3763912067
ddb.partsLock.Lock()
defer ddb.partsLock.Unlock()
ddb.inmemoryParts, removedInmemoryParts = removeParts(ddb.inmemoryParts, partsToRemove)
ddb.smallParts, removedSmallParts = removeParts(ddb.smallParts, partsToRemove)
ddb.bigParts, removedBigParts = removeParts(ddb.bigParts, partsToRemove)
ddb.inmemoryParts, removedInmemoryParts = removeParts(ddb.inmemoryParts, partsToRemove)
ddb.smallParts, removedSmallParts = removeParts(ddb.smallParts, partsToRemove)
ddb.bigParts, removedBigParts = removeParts(ddb.bigParts, partsToRemove)
if pwNew != nil {
switch dstPartType {
case partInmemory:
ddb.inmemoryParts = append(ddb.inmemoryParts, pwNew)
ddb.startInmemoryPartsMergerLocked()
case partSmall:
ddb.smallParts = append(ddb.smallParts, pwNew)
ddb.startSmallPartsMergerLocked()
case partBig:
ddb.bigParts = append(ddb.bigParts, pwNew)
ddb.startBigPartsMergerLocked()
default:
logger.Panicf("BUG: unknown partType=%d", dstPartType)
if pwNew != nil {
switch dstPartType {
case partInmemory:
ddb.inmemoryParts = append(ddb.inmemoryParts, pwNew)
ddb.startInmemoryPartsMergerLocked()
case partSmall:
ddb.smallParts = append(ddb.smallParts, pwNew)
ddb.startSmallPartsMergerLocked()
case partBig:
ddb.bigParts = append(ddb.bigParts, pwNew)
ddb.startBigPartsMergerLocked()
default:
logger.Panicf("BUG: unknown partType=%d", dstPartType)
}
}
}
// Atomically store the updated list of file-based parts on disk.
// This must be performed under partsLock in order to prevent from races
// when multiple concurrently running goroutines update the list.
if removedSmallParts > 0 || removedBigParts > 0 || pwNew != nil && dstPartType != partInmemory {
smallPartNames := getPartNames(ddb.smallParts)
bigPartNames := getPartNames(ddb.bigParts)
partNames := append(smallPartNames, bigPartNames...)
mustWritePartNames(ddb.path, partNames)
}
ddb.partsLock.Unlock()
// Atomically store the updated list of file-based parts on disk.
// This must be performed under partsLock in order to prevent from races
// when multiple concurrently running goroutines update the list.
if removedSmallParts > 0 || removedBigParts > 0 || (pwNew != nil && dstPartType != partInmemory) {
smallPartNames := getPartNames(ddb.smallParts)
bigPartNames := getPartNames(ddb.bigParts)
partNames := append(smallPartNames, bigPartNames...)
mustWritePartNames(ddb.path, partNames)
}
}()
removedParts := removedInmemoryParts + removedSmallParts + removedBigParts
if removedParts != len(partsToRemove) {
@@ -1151,10 +1134,7 @@ func (ddb *datadb) getFlushToDiskDeadline(pws []*partWrapper) time.Time {
func getMaxInmemoryPartSize() uint64 {
// Allocate 10% of allowed memory for in-memory parts.
n := uint64(0.1 * float64(memory.Allowed()) / maxInmemoryPartsPerPartition)
if n < 1e6 {
n = 1e6
}
n := max(uint64(0.1*float64(memory.Allowed())/maxInmemoryPartsPerPartition), 1e6)
return n
}
@@ -1186,10 +1166,7 @@ func (ddb *datadb) getMaxSmallPartSize() uint64 {
// Small parts are cached in the OS page cache,
// so limit their size by the remaining free RAM.
mem := memory.Remaining()
n := uint64(mem) / defaultPartsToMerge
if n < 10e6 {
n = 10e6
}
n := max(uint64(mem)/defaultPartsToMerge, 10e6)
// Make sure the output part fits available disk space for small parts.
sizeLimit := getMaxOutBytes(ddb.path)
if n > sizeLimit {
@@ -1199,10 +1176,7 @@ func (ddb *datadb) getMaxSmallPartSize() uint64 {
}
func getMaxOutBytes(path string) uint64 {
n := availableDiskSpace(path)
if n > maxBigPartSize {
n = maxBigPartSize
}
n := min(availableDiskSpace(path), maxBigPartSize)
return n
}
@@ -1403,20 +1377,14 @@ func appendPartsToMerge(dst, src []*partWrapper, maxOutBytes uint64) []*partWrap
sortPartsForOptimalMerge(src)
maxSrcParts := defaultPartsToMerge
if maxSrcParts > len(src) {
maxSrcParts = len(src)
}
minSrcParts := (maxSrcParts + 1) / 2
if minSrcParts < 2 {
minSrcParts = 2
}
maxSrcParts := min(defaultPartsToMerge, len(src))
minSrcParts := max((maxSrcParts+1)/2, 2)
// Exhaustive search for parts giving the lowest write amplification when merged.
var pws []*partWrapper
maxM := float64(0)
for i := minSrcParts; i <= maxSrcParts; i++ {
for j := 0; j <= len(src)-i; j++ {
for j := range len(src) - i + 1 {
a := src[j : j+i]
if a[0].p.ph.CompressedSizeBytes*uint64(len(a)) < a[len(a)-1].p.ph.CompressedSizeBytes {
// Do not merge parts with too big difference in size,
@@ -1513,16 +1481,13 @@ func (ddb *datadb) mustForceMergeAllParts() {
wg := getWaitGroup()
for len(pws) > 0 {
pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws)
wg.Add(1)
bigPartsConcurrencyCh <- struct{}{}
go func(pwsChunk []*partWrapper) {
defer func() {
<-bigPartsConcurrencyCh
wg.Done()
}()
ddb.mustMergeParts(pwsChunk, false)
}(pwsToMerge)
wg.Go(func() {
ddb.mustMergeParts(pwsToMerge, false)
<-bigPartsConcurrencyCh
})
pws = pwsRemaining
}
wg.Wait()

View File

@@ -187,7 +187,7 @@ func matchAnyCasePhrase(s, phraseLowercase string) bool {
}
func isASCIILowercase(s string) bool {
for i := 0; i < len(s); i++ {
for i := range len(s) {
c := s[i]
if c >= utf8.RuneSelf || (c >= 'A' && c <= 'Z') {
return false

View File

@@ -124,7 +124,7 @@ func (fr *filterDayRange) matchTimestampValue(timestamp int64) bool {
}
func (fr *filterDayRange) dayRangeOffset(timestamp int64) int64 {
timestamp = subNoOverflowInt64(timestamp, -fr.offset)
timestamp = SubInt64NoOverflow(timestamp, -fr.offset)
return timestamp % nsecsPerDay
}

View File

@@ -2,6 +2,7 @@ package logstorage
import (
"fmt"
"slices"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -216,12 +217,7 @@ func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, commonTokens
return true
}
bf := bs.getBloomFilterForColumn(ch)
for _, tokens := range tokenSets {
if bf.containsAll(tokens) {
return true
}
}
return false
return slices.ContainsFunc(tokenSets, bf.containsAll)
}
// It is faster to match every row in the block instead of checking too big number of tokenSets against bloom filter.

View File

@@ -0,0 +1,253 @@
package logstorage
import (
"fmt"
"net/netip"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter"
)
// filterIPv6Range matches the given ipv6 range [minValue..maxValue].
//
// Example LogsQL: `fieldName:ipv6_range(::1, ::2)`
type filterIPv6Range struct {
	// fieldName is the name of the log field matched against the range.
	fieldName string

	// minValue and maxValue are the inclusive range bounds as raw 16-byte IPv6 addresses.
	minValue [16]byte
	maxValue [16]byte

	// minMaxIPv4ValuesOnce guards the lazy one-time initialization of the IPv4 projection below.
	minMaxIPv4ValuesOnce sync.Once

	// minIPv4Value and maxIPv4Value are the range bounds clamped to IPv4-mapped space;
	// valid only when isIPv4 is true. See initMinMaxIPv4Values.
	minIPv4Value uint32
	maxIPv4Value uint32

	// isIPv4 is true when [minValue..maxValue] intersects the IPv4-mapped IPv6 address range.
	isIPv4 bool
}
// String returns LogsQL representation of the filter.
func (fr *filterIPv6Range) String() string {
	lower := netip.AddrFrom16(fr.minValue).String()
	upper := netip.AddrFrom16(fr.maxValue).String()
	return fmt.Sprintf("%sipv6_range(%s, %s)", quoteFieldNameIfNeeded(fr.fieldName), lower, upper)
}
// getMinMaxIPv4Values returns the filter range projected onto IPv4 space.
//
// The last returned value is false when the range doesn't intersect
// the IPv4-mapped IPv6 address space; the first two values are then meaningless.
func (fr *filterIPv6Range) getMinMaxIPv4Values() (uint32, uint32, bool) {
	// The projection is computed lazily at most once - it is needed only
	// when the filter is applied to valueTypeIPv4 columns.
	fr.minMaxIPv4ValuesOnce.Do(fr.initMinMaxIPv4Values)
	return fr.minIPv4Value, fr.maxIPv4Value, fr.isIPv4
}
// initMinMaxIPv4Values clamps [fr.minValue..fr.maxValue] to the IPv4-mapped
// IPv6 address range and stores the resulting IPv4 bounds in fr.
//
// fr.isIPv4 stays false when either clamped bound isn't an IPv4-mapped
// address, i.e. when the filter range doesn't intersect IPv4 space at all.
func (fr *filterIPv6Range) initMinMaxIPv4Values() {
	lo := fr.minValue
	if ipv6Less(lo, minIPv6ForIPv4Value) {
		lo = minIPv6ForIPv4Value
	}
	hi := fr.maxValue
	if ipv6Less(maxIPv6ForIPv4Value, hi) {
		hi = maxIPv6ForIPv4Value
	}
	lo4, okLo := getIPv4ValueFrom16(lo)
	hi4, okHi := getIPv4ValueFrom16(hi)
	if okLo && okHi {
		fr.minIPv4Value = lo4
		fr.maxIPv4Value = hi4
		fr.isIPv4 = true
	}
}
var (
	// minIPv6ForIPv4Value is ::ffff:0.0.0.0 - the smallest IPv4-mapped IPv6 address.
	minIPv6ForIPv4Value = [16]byte{10: 255, 11: 255}

	// maxIPv6ForIPv4Value is ::ffff:255.255.255.255 - the biggest IPv4-mapped IPv6 address.
	maxIPv6ForIPv4Value = [16]byte{10: 255, 11: 255, 12: 255, 13: 255, 14: 255, 15: 255}
)
// getIPv4ValueFrom16 converts the 16-byte IPv6 address a into its uint32 IPv4 value.
//
// It returns false when a isn't an IPv4-mapped IPv6 address.
func getIPv4ValueFrom16(a [16]byte) (uint32, bool) {
	unmapped := netip.AddrFrom16(a).Unmap()
	if !unmapped.Is4() {
		return 0, false
	}
	b := unmapped.As4()
	return encoding.UnmarshalUint32(b[:]), true
}
// updateNeededFields registers the field read by the filter in pf.
func (fr *filterIPv6Range) updateNeededFields(pf *prefixfilter.Filter) {
	pf.AddAllowFilter(fr.fieldName)
}
// matchRow returns true if the fr.fieldName value in fields is an IPv6 address
// inside [fr.minValue..fr.maxValue].
func (fr *filterIPv6Range) matchRow(fields []Field) bool {
	v := getFieldValueByName(fields, fr.fieldName)
	return matchIPv6Range(v, fr.minValue, fr.maxValue)
}
// applyToBlockResult clears bits in bm for rows of br whose fr.fieldName value
// falls outside the [fr.minValue..fr.maxValue] IPv6 range.
func (fr *filterIPv6Range) applyToBlockResult(br *blockResult, bm *bitmap) {
	lower := fr.minValue
	upper := fr.maxValue
	if ipv6Less(upper, lower) {
		// Empty range - nothing can match.
		bm.resetBits()
		return
	}

	c := br.getColumnByName(fr.fieldName)
	if c.isConst {
		// A single shared value decides the whole block.
		if !matchIPv6Range(c.valuesEncoded[0], lower, upper) {
			bm.resetBits()
		}
		return
	}
	if c.isTime {
		// Timestamps are never IPv6 addresses.
		bm.resetBits()
		return
	}

	switch c.valueType {
	case valueTypeString:
		values := c.getValues(br)
		bm.forEachSetBit(func(idx int) bool {
			return matchIPv6Range(values[idx], lower, upper)
		})
	case valueTypeDict:
		// Match every dictionary entry once, then look up per-row results by dict index.
		bb := bbPool.Get()
		for _, dictValue := range c.dictValues {
			matchFlag := byte(0)
			if matchIPv6Range(dictValue, lower, upper) {
				matchFlag = 1
			}
			bb.B = append(bb.B, matchFlag)
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			return bb.B[valuesEncoded[idx][0]] == 1
		})
		bbPool.Put(bb)
	case valueTypeIPv4:
		minValue4, maxValue4, ok := fr.getMinMaxIPv4Values()
		if !ok {
			// The filter range doesn't intersect IPv4 space.
			bm.resetBits()
			return
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			ip := unmarshalIPv4(valuesEncoded[idx])
			return ip >= minValue4 && ip <= maxValue4
		})
	case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64,
		valueTypeInt64, valueTypeFloat64, valueTypeTimestampISO8601:
		// Numeric and timestamp columns cannot contain IPv6 addresses.
		bm.resetBits()
	default:
		logger.Panicf("FATAL: unknown valueType=%d", c.valueType)
	}
}
// applyToBlockSearch clears bits in bm for rows in the searched block whose
// fr.fieldName value falls outside the [fr.minValue..fr.maxValue] IPv6 range.
func (fr *filterIPv6Range) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
	lower := fr.minValue
	upper := fr.maxValue
	if ipv6Less(upper, lower) {
		// Empty range - nothing can match.
		bm.resetBits()
		return
	}

	fieldName := fr.fieldName
	if v := bs.getConstColumnValue(fieldName); v != "" {
		// A single shared value decides the whole block.
		if !matchIPv6Range(v, lower, upper) {
			bm.resetBits()
		}
		return
	}

	ch := bs.getColumnHeader(fieldName)
	if ch == nil {
		// Fast path - the column is missing from the block, so nothing matches.
		bm.resetBits()
		return
	}

	switch ch.valueType {
	case valueTypeString:
		matchStringByIPv6Range(bs, ch, bm, lower, upper)
	case valueTypeDict:
		matchValuesDictByIPv6Range(bs, ch, bm, lower, upper)
	case valueTypeIPv4:
		minValue4, maxValue4, ok := fr.getMinMaxIPv4Values()
		if !ok {
			// The filter range doesn't intersect IPv4 space.
			bm.resetBits()
			return
		}
		matchIPv4ByRange(bs, ch, bm, minValue4, maxValue4)
	case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64,
		valueTypeInt64, valueTypeFloat64, valueTypeTimestampISO8601:
		// Numeric and timestamp columns cannot contain IPv6 addresses.
		bm.resetBits()
	default:
		logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType)
	}
}
// matchValuesDictByIPv6Range evaluates the IPv6 range filter once per distinct
// dictionary value of ch, then applies the per-value results to bm.
func matchValuesDictByIPv6Range(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, maxValue [16]byte) {
	bb := bbPool.Get()
	for _, dictValue := range ch.valuesDict.values {
		matchFlag := byte(0)
		if matchIPv6Range(dictValue, minValue, maxValue) {
			matchFlag = 1
		}
		bb.B = append(bb.B, matchFlag)
	}
	matchEncodedValuesDict(bs, ch, bm, bb.B)
	bbPool.Put(bb)
}
// matchStringByIPv6Range leaves set only those bits in bm whose string values
// parse as IPv6 addresses inside [minValue..maxValue].
func matchStringByIPv6Range(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, maxValue [16]byte) {
	matchValue := func(v string) bool {
		return matchIPv6Range(v, minValue, maxValue)
	}
	visitValues(bs, ch, bm, matchValue)
}
// matchIPv6Range returns true if s is an IPv6 address inside [minValue..maxValue].
//
// Strings that cannot be parsed as IPv6 addresses never match.
func matchIPv6Range(s string, minValue, maxValue [16]byte) bool {
	ip, ok := tryParseIPv6(s)
	return ok && !ipv6Less(ip, minValue) && !ipv6Less(maxValue, ip)
}
// ipv6Less returns true if the 16-byte IPv6 address a is lexicographically
// (and therefore numerically) smaller than b.
func ipv6Less(a, b [16]byte) bool {
	for i, av := range a {
		if av != b[i] {
			return av < b[i]
		}
	}
	// All bytes are equal.
	return false
}

View File

@@ -126,7 +126,7 @@ func (fr *filterWeekRange) matchTimestampValue(timestamp int64) bool {
}
func (fr *filterWeekRange) weekday(timestamp int64) time.Weekday {
timestamp = subNoOverflowInt64(timestamp, -fr.offset)
timestamp = SubInt64NoOverflow(timestamp, -fr.offset)
return time.Unix(0, timestamp).UTC().Weekday()
}

View File

@@ -1,6 +1,7 @@
package logstorage
import (
"slices"
"sync"
"github.com/cespare/xxhash/v2"
@@ -156,10 +157,8 @@ func (t *hashTokenizer) addToken(token string) (uint64, bool) {
if b.v == h {
return h, false
}
for _, v := range b.overflow {
if v == h {
return h, false
}
if slices.Contains(b.overflow, h) {
return h, false
}
b.overflow = append(b.overflow, h)
return h, true

View File

@@ -334,13 +334,10 @@ func hitsMapMergeParallel(hmas []*hitsMapAdaptive, stopCh <-chan struct{}, f fun
if hma.hmShards != nil {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
var a chunkedAllocator
hma.moveToShards(&a)
}()
})
}
wg.Wait()
if needStop(stopCh) {
@@ -349,11 +346,8 @@ func hitsMapMergeParallel(hmas []*hitsMapAdaptive, stopCh <-chan struct{}, f fun
cpusCount := len(hmas[0].hmShards)
for i := 0; i < cpusCount; i++ {
wg.Add(1)
go func(cpuIdx int) {
defer wg.Done()
for cpuIdx := range cpusCount {
wg.Go(func() {
hm := &hmas[0].hmShards[cpuIdx].hitsMap
for j := range hmas[1:] {
src := &hmas[1+j].hmShards[cpuIdx].hitsMap
@@ -361,7 +355,7 @@ func hitsMapMergeParallel(hmas []*hitsMapAdaptive, stopCh <-chan struct{}, f fun
src.reset()
}
f(hm)
}(i)
})
}
wg.Wait()
}

View File

@@ -145,9 +145,11 @@ func (idb *indexdb) updateStats(d *IndexdbStats) {
}
func (idb *indexdb) appendStreamString(dst []byte, sid *streamID) []byte {
dstLen := len(dst)
dst = idb.appendStreamTagsByStreamID(dst, sid)
if len(dst) == dstLen {
bb := bbPool.Get()
defer bbPool.Put(bb)
bb.B = idb.appendStreamTagsByStreamID(bb.B, sid)
if len(bb.B) == 0 {
// Couldn't find stream tags by sid. This may be the case when the corresponding log stream
// was recently registered and its tags aren't visible to search yet.
// The stream tags must become visible in a few seconds.
@@ -156,9 +158,9 @@ func (idb *indexdb) appendStreamString(dst []byte, sid *streamID) []byte {
}
st := GetStreamTags()
streamTagsCanonical := bytesutil.ToUnsafeString(dst[dstLen:])
mustUnmarshalStreamTags(st, streamTagsCanonical)
dst = st.marshalString(dst[:dstLen])
streamTagsCanonical := bytesutil.ToUnsafeString(bb.B)
mustUnmarshalStreamTagsInplace(st, streamTagsCanonical)
dst = st.marshalString(dst)
PutStreamTags(st)
return dst
@@ -366,8 +368,8 @@ func (is *indexSearch) getStreamIDsForNonEmptyTagValue(tenantID TenantID, tagNam
ts := &is.ts
kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagValue))
kb.B = marshalTagValue(kb.B, tagName)
kb.B = marshalTagValue(kb.B, tagValue)
prefix := kb.B
ts.Seek(prefix)
for ts.NextItem() {
@@ -430,7 +432,7 @@ func (is *indexSearch) getStreamIDsForTagName(tenantID TenantID, tagName string)
ts := &is.ts
kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
kb.B = marshalTagValue(kb.B, tagName)
prefix := kb.B
ts.Seek(prefix)
for ts.NextItem() {
@@ -462,7 +464,7 @@ func (is *indexSearch) getStreamIDsForTagRegexp(tenantID TenantID, tagName strin
ts := &is.ts
kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
kb.B = marshalTagValue(kb.B, tagName)
prefix := kb.B
ts.Seek(prefix)
for ts.NextItem() {
@@ -533,7 +535,7 @@ func (is *indexSearch) getTenantIDs() []TenantID {
func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical string) {
st := GetStreamTags()
mustUnmarshalStreamTags(st, streamTagsCanonical)
mustUnmarshalStreamTagsInplace(st, streamTagsCanonical)
tenantID := streamID.tenantID
bi := getBatchItems()
@@ -608,7 +610,7 @@ func (idb *indexdb) loadStreamIDsFromCache(tenantIDs []TenantID, sf *StreamFilte
}
src := data[nSize:]
streamIDs := make([]streamID, n)
for i := uint64(0); i < n; i++ {
for i := range n {
tail, err := streamIDs[i].unmarshal(src)
if err != nil {
logger.Panicf("BUG: unexpected error when unmarshaling streamID #%d: %s", i, err)
@@ -625,7 +627,7 @@ func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter
// marshal streamIDs
var b []byte
b = encoding.MarshalVarUint64(b, uint64(len(streamIDs)))
for i := 0; i < len(streamIDs); i++ {
for i := range streamIDs {
b = streamIDs[i].marshal(b)
}
@@ -872,6 +874,9 @@ type tagToStreamIDsRowParser struct {
// Tag contains parsed tag after Init call
Tag streamTag
// tagBuf is a buffer used during Tag parsing.
tagBuf []byte
// tail contains the remaining unparsed streamIDs
tail []byte
}
@@ -881,6 +886,7 @@ func (sp *tagToStreamIDsRowParser) Reset() {
sp.StreamIDs = sp.StreamIDs[:0]
sp.streamIDsParsed = false
sp.Tag.reset()
sp.tagBuf = sp.tagBuf[:0]
sp.tail = nil
}
@@ -897,7 +903,7 @@ func (sp *tagToStreamIDsRowParser) Init(b []byte) error {
if nsPrefix != nsPrefixTagToStreamIDs {
return fmt.Errorf("invalid prefix for tenantID:name:value -> streamIDs row %q; got %d; want %d", b, nsPrefix, nsPrefixTagToStreamIDs)
}
tail, err = sp.Tag.indexdbUnmarshal(tail)
tail, sp.tagBuf, err = sp.Tag.indexdbUnmarshal(tail, sp.tagBuf[:0])
if err != nil {
return fmt.Errorf("cannot unmarshal tag from tenantID:name:value -> streamIDs row %q: %w", b, err)
}
@@ -959,7 +965,7 @@ func (sp *tagToStreamIDsRowParser) ParseStreamIDs() {
sp.StreamIDs = slicesutil.SetLength(sp.StreamIDs, n)
streamIDs := sp.StreamIDs
_ = streamIDs[n-1]
for i := 0; i < n; i++ {
for i := range n {
var err error
tail, err = streamIDs[i].unmarshal(tail)
if err != nil {

View File

@@ -1,6 +1,7 @@
package logstorage
import (
"slices"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -29,6 +30,9 @@ type JSONParser struct {
// prefixBuf is used for holding the current key prefix
// when it is composed from multiple keys.
prefixBuf []byte
preserveKeys []string
maxFieldNameLen int
}
func (p *JSONParser) reset() {
@@ -36,6 +40,10 @@ func (p *JSONParser) reset() {
p.Fields = p.Fields[:0]
p.buf = p.buf[:0]
p.prefixBuf = p.prefixBuf[:0]
p.preserveKeys = nil
p.maxFieldNameLen = 0
}
// GetJSONParser returns JSONParser ready to parse JSON lines.
@@ -61,17 +69,20 @@ var parserPool sync.Pool
// ParseLogMessage parses the given JSON log message msg into p.Fields.
//
// JSON values for keys from the preserveKeys list are preserved without flattening.
//
// The p.Fields remains valid until the next call to ParseLogMessage() or PutJSONParser().
func (p *JSONParser) ParseLogMessage(msg []byte) error {
return p.parseLogMessage(msg, maxFieldNameSize)
func (p *JSONParser) ParseLogMessage(msg []byte, preserveKeys []string) error {
return p.parseLogMessage(msg, preserveKeys, maxFieldNameSize)
}
// ParseLogMessage parses the given JSON log message msg into p.Fields.
// parseLogMessage parses the given JSON log message msg into p.Fields.
//
// Items in nested objects are flattened with `k1.k2. ... .kN` key until its' length exceeds maxFieldNameLen.
// Items in nested objects are flattened with `k1.k2. ... .kN` key until the key matches one of the preserveKeys
// or its length exceeds maxFieldNameLen.
//
// The p.Fields remains valid until the next call to ParseLogMessage() or PutJSONParser().
func (p *JSONParser) parseLogMessage(msg []byte, maxFieldNameLen int) error {
func (p *JSONParser) parseLogMessage(msg []byte, preserveKeys []string, maxFieldNameLen int) error {
p.reset()
msgStr := bytesutil.ToUnsafeString(msg)
@@ -83,32 +94,16 @@ func (p *JSONParser) parseLogMessage(msg []byte, maxFieldNameLen int) error {
if err != nil {
return err
}
p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, o, maxFieldNameLen)
p.maxFieldNameLen = maxFieldNameLen
p.preserveKeys = preserveKeys
p.appendLogFields(o)
return nil
}
func appendLogFields(dst []Field, dstBuf, prefixBuf []byte, o *fastjson.Object, maxFieldNameLen int) ([]Field, []byte, []byte) {
maxKeyLen := 0
o.Visit(func(k []byte, _ *fastjson.Value) {
if len(k) > maxKeyLen {
maxKeyLen = len(k)
}
})
prefixLen := len(prefixBuf)
if prefixLen+maxKeyLen > maxFieldNameLen {
// Too long composite key. Convert o to string representation
if len(prefixBuf) > 0 && prefixBuf[len(prefixBuf)-1] == '.' {
// Drop trailing dot if needed
prefixBuf = prefixBuf[:len(prefixBuf)-1]
}
dstBufLen := len(dstBuf)
dstBuf = o.MarshalTo(dstBuf)
value := dstBuf[dstBufLen:]
dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, nil, value)
return dst, dstBuf, prefixBuf[:prefixLen]
func (p *JSONParser) appendLogFields(o *fastjson.Object) {
if p.isTooLongKey(o) || p.shouldPreserveKeyPrefix() {
p.appendPreservedLogField(o)
return
}
// Flatten JSON object o.
@@ -125,34 +120,71 @@ func appendLogFields(dst []Field, dstBuf, prefixBuf []byte, o *fastjson.Object,
logger.Panicf("BUG: unexpected error: %s", err)
}
prefixBuf = append(prefixBuf, k...)
prefixBuf = append(prefixBuf, '.')
dst, dstBuf, prefixBuf = appendLogFields(dst, dstBuf, prefixBuf, o, maxFieldNameLen)
prefixBuf = prefixBuf[:prefixLen]
prefixLen := len(p.prefixBuf)
p.prefixBuf = append(p.prefixBuf, k...)
p.prefixBuf = append(p.prefixBuf, '.')
p.appendLogFields(o)
p.prefixBuf = p.prefixBuf[:prefixLen]
case fastjson.TypeArray, fastjson.TypeNumber, fastjson.TypeTrue, fastjson.TypeFalse:
// Convert JSON arrays, numbers, true and false values to their string representation
dstBufLen := len(dstBuf)
dstBuf = v.MarshalTo(dstBuf)
value := dstBuf[dstBufLen:]
dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
bufLen := len(p.buf)
p.buf = v.MarshalTo(p.buf)
value := p.buf[bufLen:]
p.appendLogField(k, value)
case fastjson.TypeString:
// Decode JSON strings
dstBufLen := len(dstBuf)
dstBuf = append(dstBuf, v.GetStringBytes()...)
value := dstBuf[dstBufLen:]
dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
bufLen := len(p.buf)
p.buf = append(p.buf, v.GetStringBytes()...)
value := p.buf[bufLen:]
p.appendLogField(k, value)
default:
logger.Panicf("BUG: unexpected JSON type: %s", t)
}
})
return dst, dstBuf, prefixBuf
}
func appendLogField(dst []Field, dstBuf, prefixBuf, k, value []byte) ([]Field, []byte) {
dstBufLen := len(dstBuf)
dstBuf = append(dstBuf, prefixBuf...)
dstBuf = append(dstBuf, k...)
name := dstBuf[dstBufLen:]
func (p *JSONParser) isTooLongKey(o *fastjson.Object) bool {
maxKeyLen := 0
o.Visit(func(k []byte, _ *fastjson.Value) {
if len(k) > maxKeyLen {
maxKeyLen = len(k)
}
})
return len(p.prefixBuf)+maxKeyLen > p.maxFieldNameLen
}
func (p *JSONParser) shouldPreserveKeyPrefix() bool {
if len(p.prefixBuf) == 0 {
return false
}
key := bytesutil.ToUnsafeString(p.prefixBuf)
// Drop trailing dot
key = key[:len(key)-1]
return slices.Contains(p.preserveKeys, key)
}
func (p *JSONParser) appendPreservedLogField(o *fastjson.Object) {
prefixLen := len(p.prefixBuf)
if prefixLen > 0 {
// Drop trailing dot
p.prefixBuf = p.prefixBuf[:prefixLen-1]
}
bufLen := len(p.buf)
p.buf = o.MarshalTo(p.buf)
value := p.buf[bufLen:]
p.appendLogField(nil, value)
p.prefixBuf = p.prefixBuf[:prefixLen]
}
func (p *JSONParser) appendLogField(k, value []byte) {
bufLen := len(p.buf)
p.buf = append(p.buf, p.prefixBuf...)
p.buf = append(p.buf, k...)
name := p.buf[bufLen:]
nameStr := bytesutil.ToUnsafeString(name)
if nameStr == "" {
@@ -160,9 +192,8 @@ func appendLogField(dst []Field, dstBuf, prefixBuf, k, value []byte) ([]Field, [
}
valueStr := bytesutil.ToUnsafeString(value)
dst = append(dst, Field{
p.Fields = append(p.Fields, Field{
Name: nameStr,
Value: valueStr,
})
return dst, dstBuf
}

View File

@@ -38,7 +38,7 @@ type LogRows struct {
streamTagsCanonicals []string
// streamFields contains names for stream fields
streamFields map[string]struct{}
streamFields []string
// ignoreFields is a filter for fields, which must be ignored during data ingestion
ignoreFields prefixfilter.Filter
@@ -251,10 +251,7 @@ func (lr *LogRows) ForEachRow(callback func(streamHash uint64, r *InsertRow)) {
func (lr *LogRows) Reset() {
lr.ResetKeepSettings()
sfs := lr.streamFields
for k := range sfs {
delete(sfs, k)
}
lr.streamFields = lr.streamFields[:0]
lr.ignoreFields.Reset()
lr.decolorizeFields.Reset()
@@ -299,7 +296,7 @@ func (lr *LogRows) ResetKeepSettings() {
// NeedFlush returns true if lr contains too much data, so it must be flushed to the storage.
func (lr *LogRows) NeedFlush() bool {
return len(lr.a.b) > (maxUncompressedBlockSize/8)*7
return len(lr.a.b) > (maxUncompressedBlockSize/8)*7 || len(lr.rows) > maxUncompressedBlockSize/100
}
// MustAddInsertRow adds r to lr.
@@ -307,7 +304,7 @@ func (lr *LogRows) MustAddInsertRow(r *InsertRow) {
// verify r.StreamTagsCanonical
st := GetStreamTags()
streamTagsCanonical := bytesutil.ToUnsafeBytes(r.StreamTagsCanonical)
tail, err := st.UnmarshalCanonical(streamTagsCanonical)
tail, err := st.UnmarshalCanonicalInplace(streamTagsCanonical)
if err != nil {
line := MarshalFieldsToJSON(nil, r.Fields)
logger.Warnf("cannot unmarshal streamTagsCanonical: %w; skipping the log entry; log entry: %s", err, line)
@@ -374,10 +371,10 @@ func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields []Field, s
return
}
// Compose StreamTags from fields according to streamFieldsLen, lr.streamFields and lr.extraStreamFields
// Compose StreamTags from fields
st := GetStreamTags()
if streamFieldsLen >= 0 {
// lr.streamFields with fields[:streamFieldsLen]
// Compose StreamTags from fields[:streamFieldsLen] and ignore lr.streamFields with lr.extraStreamFields.
for _, f := range fields[:streamFieldsLen] {
fieldName := getCanonicalFieldName(f.Name)
if !lr.ignoreFields.MatchString(fieldName) {
@@ -385,9 +382,10 @@ func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields []Field, s
}
}
} else {
// Compose StreamTags from lr.streamFields and lr.extraStreamFields.
for _, f := range fields {
fieldName := getCanonicalFieldName(f.Name)
if _, ok := lr.streamFields[fieldName]; ok {
if slices.Contains(lr.streamFields, fieldName) {
st.Add(fieldName, f.Value)
}
}
@@ -588,15 +586,10 @@ func GetLogRows(streamFields, ignoreFields, decolorizeFields []string, extraFiel
}
// Initialize streamFields
sfs := lr.streamFields
if sfs == nil {
sfs = make(map[string]struct{}, len(streamFields))
lr.streamFields = sfs
}
for _, f := range streamFields {
f = getCanonicalFieldName(f)
if !lr.ignoreFields.MatchString(f) {
sfs[f] = struct{}{}
lr.streamFields = append(lr.streamFields, f)
}
}
@@ -605,7 +598,7 @@ func GetLogRows(streamFields, ignoreFields, decolorizeFields []string, extraFiel
fieldName := getCanonicalFieldName(f.Name)
if slices.Contains(streamFields, fieldName) {
lr.extraStreamFields = append(lr.extraStreamFields, f)
delete(sfs, fieldName)
lr.streamFields = slices.DeleteFunc(lr.streamFields, func(s string) bool { return s == fieldName })
}
}

View File

@@ -77,9 +77,11 @@ func splitQueryToRemoteAndLocal(q *Query) (*Query, []pipe) {
pipesRemote, pipesLocal := getRemoteAndLocalPipes(q)
qRemote.pipes = pipesRemote
// Limit fields to select at the remote storage.
pf := getNeededColumns(pipesLocal)
qRemote.addFieldsFilters(pf)
if !qRemote.IsFixedOutputFieldsOrder() {
// Limit fields to select at the remote storage if the output fields aren't fixed.
pf := getNeededColumns(pipesLocal)
qRemote.addFieldsFilters(pf)
}
return qRemote, pipesLocal
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"maps"
"math"
"net/netip"
"slices"
"strconv"
"strings"
@@ -210,12 +211,7 @@ var mathStopCompoundTokens = []string{
func (lex *lexer) isPrevRawToken(tokens []string) bool {
prevTokenLower := strings.ToLower(lex.prevRawToken)
for _, token := range tokens {
if token == prevTokenLower {
return true
}
}
return false
return slices.Contains(tokens, prevTokenLower)
}
func (lex *lexer) checkPrevAdjacentToken(tokens ...string) error {
@@ -239,12 +235,7 @@ func (lex *lexer) isKeywordAny(keywords []string) bool {
return false
}
tokenLower := strings.ToLower(lex.token)
for _, kw := range keywords {
if kw == tokenLower {
return true
}
}
return false
return slices.Contains(keywords, tokenLower)
}
func (lex *lexer) context() string {
@@ -759,6 +750,22 @@ func (q *Query) GetFilterTimeRange() (int64, int64) {
return getFilterTimeRange(q.f)
}
// IsFixedOutputFieldsOrder returns true if the query results have fixed order of fields.
func (q *Query) IsFixedOutputFieldsOrder() bool {
pipes := q.pipes
for i := len(pipes) - 1; i >= 0; i-- {
p := pipes[i]
if p.isFixedOutputFieldsOrder() {
return true
}
if pu, ok := p.(*pipeUnion); ok && !pu.q.IsFixedOutputFieldsOrder() {
return false
}
}
return false
}
func getFilterTimeRange(f filter) (int64, int64) {
switch t := f.(type) {
case *filterAnd:
@@ -815,8 +822,8 @@ func addTimeFilter(f filter, start, end, offset int64) filter {
endStr := marshalTimestampRFC3339NanoPreciseString(nil, end)
ft := &filterTime{
minTimestamp: subNoOverflowInt64(start, offset),
maxTimestamp: subNoOverflowInt64(end, offset),
minTimestamp: SubInt64NoOverflow(start, offset),
maxTimestamp: SubInt64NoOverflow(end, offset),
stringRepr: fmt.Sprintf("[%s,%s]", startStr, endStr),
}
@@ -1037,13 +1044,13 @@ func mergeFiltersStreamInternal(fss []*filterStream) []*filterStream {
//
// The remaining fields are considered metrics.
func (q *Query) GetStatsLabels() ([]string, error) {
return q.GetStatsLabelsAddGroupingByTime(0)
return q.GetStatsLabelsAddGroupingByTime(0, 0)
}
// GetStatsLabelsAddGroupingByTime returns stats labels from q for /select/logsql/stats_query and /select/logsql/stats_query_range endpoints
//
// if step > 0, then _time:step is added to the last `stats by (...)` pipe at q.
func (q *Query) GetStatsLabelsAddGroupingByTime(step int64) ([]string, error) {
func (q *Query) GetStatsLabelsAddGroupingByTime(step, offset int64) ([]string, error) {
idx := getLastPipeStatsIdx(q.pipes)
if idx < 0 {
return nil, fmt.Errorf("missing `| stats ...` pipe in the query [%s]", q)
@@ -1054,7 +1061,7 @@ func (q *Query) GetStatsLabelsAddGroupingByTime(step int64) ([]string, error) {
// do not modify or delete the `_time` field, since it is required for bucketing by step.
// For instant stats (step == 0), allow such pipes for broader query flexibility.
if step > 0 {
for i := 0; i < idx; i++ {
for i := range idx {
p := q.pipes[i]
if _, ok := p.(*pipeStats); ok {
// Skip `stats` pipe, since it is updated with the grouping by `_time` in the addByTimeFieldToStatsPipes() below.
@@ -1067,7 +1074,7 @@ func (q *Query) GetStatsLabelsAddGroupingByTime(step int64) ([]string, error) {
}
// add _time:step to by (...) list at stats pipes.
q.addByTimeFieldToStatsPipes(step)
q.addByTimeFieldToStatsPipes(step, offset)
// propagate the step into rate* funcs at stats pipes.
q.initStatsRateFuncs(step)
@@ -1282,16 +1289,16 @@ func updateFilterWithTimeOffset(f filter, timeOffset int64) filter {
switch ft := f.(type) {
case *filterTime:
ftCopy := *ft
ftCopy.minTimestamp = subNoOverflowInt64(ft.minTimestamp, timeOffset)
ftCopy.maxTimestamp = subNoOverflowInt64(ft.maxTimestamp, timeOffset)
ftCopy.minTimestamp = SubInt64NoOverflow(ft.minTimestamp, timeOffset)
ftCopy.maxTimestamp = SubInt64NoOverflow(ft.maxTimestamp, timeOffset)
return &ftCopy, nil
case *filterDayRange:
ftCopy := *ft
ftCopy.offset = subNoOverflowInt64(ft.offset, -timeOffset)
ftCopy.offset = SubInt64NoOverflow(ft.offset, -timeOffset)
return &ftCopy, nil
case *filterWeekRange:
ftCopy := *ft
ftCopy.offset = subNoOverflowInt64(ft.offset, -timeOffset)
ftCopy.offset = SubInt64NoOverflow(ft.offset, -timeOffset)
return &ftCopy, nil
default:
logger.Panicf("BUG: unexpected filter passed to copyFunc: %T; [%s]", f, f)
@@ -1467,7 +1474,7 @@ func optimizeOffsetLimitPipesInternal(pipes []pipe) []pipe {
// Replace '| offset X | limit Y' with '| limit X+Y | offset X'.
// This reduces the number of rows processed by remote storage.
// See: https://github.com/VictoriaMetrics/VictoriaLogs/issues/620#issuecomment-3276624504
for i := 0; i < len(pipes)-1; i++ {
for i := range len(pipes) - 1 {
po, ok := pipes[i].(*pipeOffset)
if !ok {
continue
@@ -1730,10 +1737,10 @@ func (q *Query) initStatsRateFuncs(step int64) {
}
}
func (q *Query) addByTimeFieldToStatsPipes(step int64) {
func (q *Query) addByTimeFieldToStatsPipes(step, offset int64) {
for _, p := range q.pipes {
if ps, ok := p.(*pipeStats); ok {
ps.addByTimeField(step)
ps.addByTimeField(step, offset)
}
}
}
@@ -2021,7 +2028,7 @@ func parseFilterGeneric(lex *lexer, fieldName string) (filter, error) {
// Detect the filter.
switch {
case lex.isKeyword("{"):
return parseFilterStream(lex, fieldName)
return parseFilterStreamInternal(lex, fieldName)
case lex.isKeyword("*"):
return parseFilterStar(lex, fieldName)
case lex.isKeyword("("):
@@ -2058,6 +2065,8 @@ func parseFilterGeneric(lex *lexer, fieldName string) (filter, error) {
return parseFilterIn(lex, fieldName)
case lex.isKeyword("ipv4_range"):
return parseFilterIPv4Range(lex, fieldName)
case lex.isKeyword("ipv6_range"):
return parseFilterIPv6Range(lex, fieldName)
case lex.isKeyword("le_field"):
return parseFilterLeField(lex, fieldName)
case lex.isKeyword("len_range"):
@@ -2087,19 +2096,7 @@ func parseFilterGeneric(lex *lexer, fieldName string) (filter, error) {
case lex.isKeyword("_stream_id"):
return parseFilterStreamID(lex, fieldName)
case lex.isKeyword("_stream"):
if fieldName != "" {
return parseFilterPhrase(lex, fieldName)
}
lexState := lex.backupState()
lex.nextToken()
if !lex.isKeyword(":") {
lex.restoreState(lexState)
return parseFilterPhrase(lex, "")
}
lex.nextToken()
return parseFilterStream(lex, "_stream")
return parseFilterStream(lex, fieldName)
default:
return parseFilterPhrase(lex, fieldName)
}
@@ -2118,7 +2115,17 @@ func parseFilterPhrase(lex *lexer, fieldName string) (filter, error) {
if fieldName == "" && lex.isKeyword(":") {
// The phrase contains a field name for the filter
lex.nextToken()
return parseFilterGeneric(lex, phrase)
switch phrase {
case "_time":
return parseFilterTimeInternal(lex)
case "_stream_id":
return parseFilterStreamIDInternal(lex)
case "_stream":
return parseFilterStreamInternal(lex, "_stream")
default:
return parseFilterGeneric(lex, phrase)
}
}
// The phrase is either a search phrase or a search prefix.
@@ -2318,6 +2325,40 @@ func parseFilterIPv4Range(lex *lexer, fieldName string) (filter, error) {
})
}
func parseFilterIPv6Range(lex *lexer, fieldName string) (filter, error) {
return parseFuncArgs(lex, fieldName, func(funcName string, args []string) (filter, error) {
if len(args) == 1 {
minValue, maxValue, ok := tryParseIPv6CIDR(args[0])
if !ok {
return nil, fmt.Errorf("cannot parse IPv6 address or IPv6 CIDR %q at %s()", args[0], funcName)
}
fr := &filterIPv6Range{
fieldName: getCanonicalColumnName(fieldName),
minValue: minValue,
maxValue: maxValue,
}
return fr, nil
}
if len(args) != 2 {
return nil, fmt.Errorf("unexpected number of args for %s(); got %d; want 2", funcName, len(args))
}
minValue, ok := tryParseIPv6(args[0])
if !ok {
return nil, fmt.Errorf("cannot parse lower bound ip %q in %s()", args[0], funcName)
}
maxValue, ok := tryParseIPv6(args[1])
if !ok {
return nil, fmt.Errorf("cannot parse upper bound ip %q in %s()", args[1], funcName)
}
fr := &filterIPv6Range{
fieldName: getCanonicalColumnName(fieldName),
minValue: minValue,
maxValue: maxValue,
}
return fr, nil
})
}
func tryParseIPv4CIDR(s string) (uint32, uint32, bool) {
n := strings.IndexByte(s, '/')
if n < 0 {
@@ -2338,6 +2379,59 @@ func tryParseIPv4CIDR(s string) (uint32, uint32, bool) {
return minValue, maxValue, true
}
// tryParseIPv6 tries parsing s as ipv6 address.
//
// It also returns ipv4 wrapped into ipv6 if s contains ipv4 address.
func tryParseIPv6(s string) ([16]byte, bool) {
// IPv6 and IPv4 string length must be between 2 and 45 characters.
// This quickly rejects obviously invalid strings before doing more expensive checks.
if len(s) < 2 || len(s) > 45 {
return [16]byte{}, false
}
addr, err := netip.ParseAddr(s)
if err != nil {
return [16]byte{}, false
}
return addr.As16(), true
}
func tryParseIPv6CIDR(s string) ([16]byte, [16]byte, bool) {
var zero [16]byte
n := strings.IndexByte(s, '/')
if n < 0 {
ip, ok := tryParseIPv6(s)
return ip, ip, ok
}
ip, ok := tryParseIPv6(s[:n])
if !ok {
return zero, zero, false
}
maskBits, ok := tryParseUint64(s[n+1:])
if !ok || maskBits > 128 {
return zero, zero, false
}
minValue := ip
maxValue := ip
byteIdx := maskBits / 8
bitIdx := maskBits % 8
if bitIdx > 0 {
mask := byte(0xff) << (8 - bitIdx)
minValue[byteIdx] &= mask
maxValue[byteIdx] |= ^mask
byteIdx++
}
for byteIdx < uint64(len(minValue)) {
minValue[byteIdx] = 0
maxValue[byteIdx] = 0xff
byteIdx++
}
return minValue, maxValue, true
}
func parseFilterContainsAll(lex *lexer, fieldName string) (filter, error) {
fi := &filterContainsAll{
fieldName: getCanonicalColumnName(fieldName),
@@ -2961,7 +3055,7 @@ func startsWithYear(s string) bool {
if len(s) < 4 {
return false
}
for i := 0; i < 4; i++ {
for i := range 4 {
c := s[i]
if c < '0' || c > '9' {
return false
@@ -2989,6 +3083,10 @@ func parseFilterTimeGeneric(lex *lexer, fieldName string) (filter, error) {
}
lex.nextToken()
return parseFilterTimeInternal(lex)
}
func parseFilterTimeInternal(lex *lexer) (filter, error) {
switch {
case lex.isKeyword("day_range"):
return parseFilterDayRange(lex)
@@ -3207,7 +3305,7 @@ func parseFilterTimeRange(lex *lexer) (*filterTime, error) {
if err != nil {
return nil, fmt.Errorf("cannot parse offset for _time filter []: %w", err)
}
ft.maxTimestamp = subNoOverflowInt64(ft.maxTimestamp, offset)
ft.maxTimestamp = SubInt64NoOverflow(ft.maxTimestamp, offset)
ft.stringRepr = offsetStr
return ft, nil
}
@@ -3224,8 +3322,8 @@ func parseFilterTimeRange(lex *lexer) (*filterTime, error) {
if err != nil {
return nil, fmt.Errorf("cannot parse offset for _time filter [%s]: %w", ft, err)
}
ft.minTimestamp = subNoOverflowInt64(ft.minTimestamp, offset)
ft.maxTimestamp = subNoOverflowInt64(ft.maxTimestamp, offset)
ft.minTimestamp = SubInt64NoOverflow(ft.minTimestamp, offset)
ft.maxTimestamp = SubInt64NoOverflow(ft.maxTimestamp, offset)
ft.stringRepr += " " + offsetStr
return ft, nil
}
@@ -3355,7 +3453,7 @@ func parseFilterTimeGt(lex *lexer) (*filterTime, error) {
}
ft := &filterTime{
minTimestamp: math.MinInt64,
maxTimestamp: subNoOverflowInt64(lex.currentTimestamp, d),
maxTimestamp: SubInt64NoOverflow(lex.currentTimestamp, d),
stringRepr: prefix + s,
}
@@ -3404,7 +3502,7 @@ func parseFilterTimeLt(lex *lexer) (*filterTime, error) {
d--
}
ft := &filterTime{
minTimestamp: subNoOverflowInt64(lex.currentTimestamp, d),
minTimestamp: SubInt64NoOverflow(lex.currentTimestamp, d),
maxTimestamp: lex.currentTimestamp,
stringRepr: prefix + s,
@@ -3446,7 +3544,7 @@ func parseFilterTimeEq(lex *lexer) (*filterTime, error) {
d = -d
}
ft := &filterTime{
minTimestamp: subNoOverflowInt64(lex.currentTimestamp, d),
minTimestamp: SubInt64NoOverflow(lex.currentTimestamp, d),
maxTimestamp: lex.currentTimestamp,
stringRepr: prefix + s,
@@ -3543,7 +3641,7 @@ func isAllDigits(s string) bool {
if len(s) == 0 {
return false
}
for i := 0; i < len(s); i++ {
for i := range len(s) {
if s[i] < '0' || s[i] > '9' {
return false
}
@@ -3582,6 +3680,10 @@ func parseFilterStreamID(lex *lexer, fieldName string) (filter, error) {
}
lex.nextToken()
return parseFilterStreamIDInternal(lex)
}
func parseFilterStreamIDInternal(lex *lexer) (filter, error) {
if lex.isKeyword("in") {
return parseFilterStreamIDIn(lex)
}
@@ -3705,7 +3807,23 @@ func parseStreamID(lex *lexer) (streamID, error) {
return sid, nil
}
func parseFilterStream(lex *lexer, fieldName string) (*filterStream, error) {
func parseFilterStream(lex *lexer, fieldName string) (filter, error) {
if fieldName != "" {
return parseFilterPhrase(lex, fieldName)
}
lexState := lex.backupState()
lex.nextToken()
if !lex.isKeyword(":") {
lex.restoreState(lexState)
return parseFilterPhrase(lex, "")
}
lex.nextToken()
return parseFilterStreamInternal(lex, "_stream")
}
func parseFilterStreamInternal(lex *lexer, fieldName string) (*filterStream, error) {
if fieldName != "" && fieldName != "_stream" {
return nil, fmt.Errorf("stream filter cannot be applied to %q field; it can be applied only to _stream field", fieldName)
}
@@ -3744,6 +3862,13 @@ func parseDuration(lex *lexer) (int64, string, error) {
return d, s, nil
}
// TryParseDuration tries parsing duration at s and returns the duration in nanoseconds.
//
// If the duration cannot be parsed, false is returned.
func TryParseDuration(s string) (int64, bool) {
return tryParseDuration(s)
}
func quoteStringTokenIfNeeded(s string) string {
if !needQuoteStringToken(s) {
return s
@@ -3863,6 +3988,7 @@ var reservedKeywords = func() map[string]struct{} {
"i",
"in",
"ipv4_range",
"ipv6_range",
"le_field",
"len_range",
"lt_field",
@@ -3947,18 +4073,25 @@ func toFieldsFilters(pf *prefixfilter.Filter) string {
return qStr
}
func subNoOverflowInt64(a, b int64) int64 {
if a == math.MinInt64 || a == math.MaxInt64 {
// Assume that a is either +Inf or -Inf.
// Subtracting any number from Inf must result in Inf.
return a
}
// SubInt64NoOverflow calculates a-b and makes sure that the result doesn't overlow int64.
//
// It clamps the result to the int64 value range.
func SubInt64NoOverflow(a, b int64) int64 {
if b >= 0 {
if a == math.MaxInt64 {
// Subtracting any number from +Inf must result in +Inf.
return a
}
if a < math.MinInt64+b {
return math.MinInt64
}
return a - b
}
if a == math.MinInt64 {
// Adding any number to -Inf must result in -Inf.
return a
}
if a > math.MaxInt64+b {
return math.MaxInt64
}

View File

@@ -399,7 +399,7 @@ func indexGenericPlaceholderEnd(s string, start int, nums int, separator byte) i
if end < 0 {
return -1
}
for i := 0; i < nums-1; i++ {
for range nums - 1 {
if end >= len(s) || s[end] != separator {
return -1
}

View File

@@ -33,6 +33,9 @@ type pipe interface {
// The pipe can return last N results if it doesn't modify the _time field.
canReturnLastNResults() bool
// isFixedOutputFieldsOrder must return true if the pipe returns the output log fields in a fixed order.
isFixedOutputFieldsOrder() bool
// updateNeededFields must update pf with fields it needs and not needs at the input.
updateNeededFields(pf *prefixfilter.Filter)

View File

@@ -32,6 +32,10 @@ func (ps *pipeBlockStats) canReturnLastNResults() bool {
return false
}
func (ps *pipeBlockStats) isFixedOutputFieldsOrder() bool {
return true
}
func (ps *pipeBlockStats) hasFilterInWithQuery() bool {
return false
}

View File

@@ -42,6 +42,10 @@ func (pc *pipeBlocksCount) canReturnLastNResults() bool {
return false
}
func (pc *pipeBlocksCount) isFixedOutputFieldsOrder() bool {
return true
}
func (pc *pipeBlocksCount) updateNeededFields(pf *prefixfilter.Filter) {
pf.Reset()
}

View File

@@ -49,6 +49,10 @@ func (pc *pipeCollapseNums) canReturnLastNResults() bool {
return true
}
func (pc *pipeCollapseNums) isFixedOutputFieldsOrder() bool {
return false
}
func (pc *pipeCollapseNums) updateNeededFields(pf *prefixfilter.Filter) {
updateNeededFieldsForUpdatePipe(pf, pc.field, pc.iff)
}
@@ -263,7 +267,7 @@ func canBeTreatedAsNum(s string) bool {
}
func hasHexChars(s string) bool {
for i := 0; i < len(s); i++ {
for i := range len(s) {
if isHexChar(s[i]) {
return true
}

View File

@@ -45,6 +45,10 @@ func (pc *pipeCopy) canReturnLastNResults() bool {
return !prefixfilter.MatchFilters(pc.dstFieldFilters, "_time")
}
func (pc *pipeCopy) isFixedOutputFieldsOrder() bool {
return false
}
func (pc *pipeCopy) updateNeededFields(f *prefixfilter.Filter) {
for i := len(pc.srcFieldFilters) - 1; i >= 0; i-- {
srcFieldFilter := pc.srcFieldFilters[i]

View File

@@ -35,6 +35,10 @@ func (pd *pipeDecolorize) canReturnLastNResults() bool {
return true
}
func (pd *pipeDecolorize) isFixedOutputFieldsOrder() bool {
return false
}
func (pd *pipeDecolorize) updateNeededFields(_ *prefixfilter.Filter) {
// nothing to do
}

View File

@@ -36,6 +36,10 @@ func (pd *pipeDelete) canReturnLastNResults() bool {
return !prefixfilter.MatchFilters(pd.fieldFilters, "_time")
}
func (pd *pipeDelete) isFixedOutputFieldsOrder() bool {
return false
}
func (pd *pipeDelete) updateNeededFields(pf *prefixfilter.Filter) {
pf.AddDenyFilters(pd.fieldFilters)
}

View File

@@ -2,6 +2,7 @@ package logstorage
import (
"fmt"
"slices"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
@@ -31,6 +32,10 @@ func (pd *pipeDropEmptyFields) canReturnLastNResults() bool {
return true
}
func (pd *pipeDropEmptyFields) isFixedOutputFieldsOrder() bool {
return false
}
func (pd *pipeDropEmptyFields) hasFilterInWithQuery() bool {
return false
}
@@ -91,7 +96,7 @@ func (pdp *pipeDropEmptyFieldsProcessor) writeBlock(workerID uint, br *blockResu
shard.wctx.init(workerID, pdp.ppNext)
fields := shard.fields
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
fields = fields[:0]
for i, values := range columnValues {
v := values[rowIdx]
@@ -218,10 +223,8 @@ func parsePipeDropEmptyFields(lex *lexer) (pipe, error) {
func hasEmptyValues(columnValues [][]string) bool {
for _, values := range columnValues {
for _, v := range values {
if v == "" {
return true
}
if slices.Contains(values, "") {
return true
}
}
return false

View File

@@ -57,6 +57,10 @@ func (pe *pipeExtract) canReturnLastNResults() bool {
return true
}
func (pe *pipeExtract) isFixedOutputFieldsOrder() bool {
return false
}
func (pe *pipeExtract) hasFilterInWithQuery() bool {
return pe.iff.hasFilterInWithQuery()
}

View File

@@ -64,6 +64,10 @@ func (pe *pipeExtractRegexp) canReturnLastNResults() bool {
return true
}
func (pe *pipeExtractRegexp) isFixedOutputFieldsOrder() bool {
return false
}
func (pe *pipeExtractRegexp) hasFilterInWithQuery() bool {
return pe.iff.hasFilterInWithQuery()
}

View File

@@ -68,7 +68,8 @@ func (pf *pipeFacets) splitToRemoteAndLocal(timestamp int64) (pipe, []pipe) {
| filter field_values_count:<=%d
| delete field_values_count
| sort by (hits desc) limit %d partition by (field_name)
| sort by (field_name, hits desc)`, pf.maxValuesPerField, pf.limit)
| sort by (field_name, hits desc, field_value)
| fields field_name, field_value, hits`, pf.maxValuesPerField, pf.limit)
psLocal := mustParsePipes(psLocalStr, timestamp)
return &pRemote, psLocal
@@ -82,6 +83,10 @@ func (pf *pipeFacets) canReturnLastNResults() bool {
return false
}
func (pf *pipeFacets) isFixedOutputFieldsOrder() bool {
return true
}
func (pf *pipeFacets) updateNeededFields(f *prefixfilter.Filter) {
f.AddAllowFilter("*")
}
@@ -221,7 +226,7 @@ func (shard *pipeFacetsProcessorShard) updateFacetsForColumn(br *blockResult, c
shard.updateStateInt64(fhs, n)
}
default:
for i := 0; i < br.rowsLen; i++ {
for i := range br.rowsLen {
v := c.getValueAtRow(br, i)
shard.updateStateGeneric(fhs, v, 1)
}

View File

@@ -43,6 +43,10 @@ func (pf *pipeFieldNames) canReturnLastNResults() bool {
return false
}
func (pf *pipeFieldNames) isFixedOutputFieldsOrder() bool {
return true
}
func (pf *pipeFieldNames) updateNeededFields(f *prefixfilter.Filter) {
if pf.isFirstPipe {
f.Reset()

View File

@@ -38,6 +38,10 @@ func (pf *pipeFieldValues) canReturnLastNResults() bool {
return false
}
func (pf *pipeFieldValues) isFixedOutputFieldsOrder() bool {
return true
}
func (pf *pipeFieldValues) updateNeededFields(f *prefixfilter.Filter) {
f.Reset()
f.AddAllowFilter(pf.field)

View File

@@ -38,6 +38,10 @@ func (pf *pipeFieldValuesLocal) canReturnLastNResults() bool {
return false
}
func (pf *pipeFieldValuesLocal) isFixedOutputFieldsOrder() bool {
return true
}
func (pf *pipeFieldValuesLocal) updateNeededFields(f *prefixfilter.Filter) {
f.Reset()

View File

@@ -35,6 +35,10 @@ func (pf *pipeFields) canReturnLastNResults() bool {
return prefixfilter.MatchFilters(pf.fieldFilters, "_time")
}
func (pf *pipeFields) isFixedOutputFieldsOrder() bool {
return !hasWildcardFilters(pf.fieldFilters)
}
func (pf *pipeFields) updateNeededFields(f *prefixfilter.Filter) {
fOrig := f.Clone()
f.Reset()

View File

@@ -32,6 +32,10 @@ func (pf *pipeFilter) canReturnLastNResults() bool {
return true
}
func (pf *pipeFilter) isFixedOutputFieldsOrder() bool {
return false
}
func (pf *pipeFilter) updateNeededFields(f *prefixfilter.Filter) {
pf.f.updateNeededFields(f)
}

View File

@@ -29,6 +29,10 @@ func (pf *pipeFirst) canReturnLastNResults() bool {
return false
}
func (pf *pipeFirst) isFixedOutputFieldsOrder() bool {
return false
}
func (pf *pipeFirst) updateNeededFields(f *prefixfilter.Filter) {
pf.ps.updateNeededFields(f)
}

View File

@@ -62,6 +62,10 @@ func (pf *pipeFormat) canReturnLastNResults() bool {
return pf.resultField != "_time"
}
func (pf *pipeFormat) isFixedOutputFieldsOrder() bool {
return false
}
func (pf *pipeFormat) updateNeededFields(f *prefixfilter.Filter) {
if !f.MatchString(pf.resultField) {
return
@@ -140,7 +144,7 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) {
shard.rc.name = pf.resultField
resultColumn := br.getColumnByName(pf.resultField)
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
v := ""
if pf.iff == nil || bm.isSetBit(rowIdx) {
v = shard.formatRow(pf, br, rowIdx)
@@ -363,7 +367,7 @@ func appendURLDecode(dst []byte, s string) []byte {
func appendURLEncode(dst []byte, s string) []byte {
n := len(s)
for i := 0; i < n; i++ {
for i := range n {
c := s[i]
// See http://www.w3.org/TR/html5/forms.html#form-submission-algorithm
@@ -426,7 +430,7 @@ func appendHexUint64Decode(dst []byte, s string) []byte {
}
func appendHexEncode(dst []byte, s string) []byte {
for i := 0; i < len(s); i++ {
for i := range len(s) {
c := s[i]
hi := hexCharUpper(c >> 4)
lo := hexCharUpper(c & 15)

View File

@@ -32,6 +32,10 @@ func (pg *pipeGenerateSequence) canReturnLastNResults() bool {
return false
}
func (pg *pipeGenerateSequence) isFixedOutputFieldsOrder() bool {
return true
}
func (pg *pipeGenerateSequence) updateNeededFields(pf *prefixfilter.Filter) {
pf.Reset()
}
@@ -79,7 +83,7 @@ func (pgp *pipeGenerateSequenceProcessor) flush() error {
var br blockResult
var buf []byte
for i := uint64(0); i < pgp.pg.n; i++ {
for i := range pgp.pg.n {
if needStop(pgp.stopCh) {
return nil
}

View File

@@ -39,6 +39,10 @@ func (ph *pipeHash) canReturnLastNResults() bool {
return ph.resultField != "_time"
}
func (ph *pipeHash) isFixedOutputFieldsOrder() bool {
return false
}
func (ph *pipeHash) updateNeededFields(pf *prefixfilter.Filter) {
if pf.MatchString(ph.resultField) {
pf.AddDenyFilter(ph.resultField)

View File

@@ -54,6 +54,10 @@ func (pj *pipeJoin) canReturnLastNResults() bool {
return false
}
func (pj *pipeJoin) isFixedOutputFieldsOrder() bool {
return false
}
func (pj *pipeJoin) hasFilterInWithQuery() bool {
// Do not check for in(...) filters at pj.q, since they are checked separately during pj.q execution.
return false
@@ -127,7 +131,7 @@ func (pjp *pipeJoinProcessor) writeBlock(workerID uint, br *blockResult) {
}
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
clear(byValues)
for j := range cs {
if cIdx := byValuesIdxs[j]; cIdx >= 0 {

View File

@@ -38,6 +38,10 @@ func (pl *pipeJSONArrayLen) canReturnLastNResults() bool {
return pl.resultField != "_time"
}
func (pl *pipeJSONArrayLen) isFixedOutputFieldsOrder() bool {
return false
}
func (pl *pipeJSONArrayLen) updateNeededFields(pf *prefixfilter.Filter) {
if pf.MatchString(pl.resultField) {
pf.AddDenyFilter(pl.resultField)

View File

@@ -54,6 +54,10 @@ func (pl *pipeLast) canReturnLastNResults() bool {
return false
}
func (pl *pipeLast) isFixedOutputFieldsOrder() bool {
return false
}
func (pl *pipeLast) updateNeededFields(pf *prefixfilter.Filter) {
pl.ps.updateNeededFields(pf)
}

View File

@@ -38,6 +38,10 @@ func (pl *pipeLen) canReturnLastNResults() bool {
return pl.resultField != "_time"
}
func (pl *pipeLen) isFixedOutputFieldsOrder() bool {
return false
}
func (pl *pipeLen) updateNeededFields(pf *prefixfilter.Filter) {
if pf.MatchString(pl.resultField) {
pf.AddDenyFilter(pl.resultField)

View File

@@ -30,6 +30,10 @@ func (pl *pipeLimit) canReturnLastNResults() bool {
return false
}
func (pl *pipeLimit) isFixedOutputFieldsOrder() bool {
return false
}
func (pl *pipeLimit) updateNeededFields(_ *prefixfilter.Filter) {
// nothing to do
}

View File

@@ -86,6 +86,10 @@ func (pm *pipeMath) canReturnLastNResults() bool {
return true
}
func (pm *pipeMath) isFixedOutputFieldsOrder() bool {
return false
}
func (me *mathEntry) String() string {
s := me.expr.String()
if isMathBinaryOp(me.expr.op) {
@@ -311,7 +315,7 @@ func (shard *pipeMathProcessorShard) executeExpr(me *mathExpr, br *blockResult)
if me.isConst {
r := shard.rs[rIdx]
for i := 0; i < br.rowsLen; i++ {
for i := range br.rowsLen {
r[i] = me.constValue
}
return
@@ -908,7 +912,11 @@ func mathFuncMod(result []float64, args [][]float64) {
yInt := int64(y)
if float64(xInt) == x && float64(yInt) == y {
// Fast path - integer modulo
result[i] = float64(xInt % yInt)
if yInt == 0 {
result[i] = nan
} else {
result[i] = float64(xInt % yInt)
}
} else {
// Slow path - floating point modulo
result[i] = math.Mod(x, y)

View File

@@ -34,6 +34,10 @@ func (po *pipeOffset) canReturnLastNResults() bool {
return false
}
func (po *pipeOffset) isFixedOutputFieldsOrder() bool {
return false
}
func (po *pipeOffset) updateNeededFields(_ *prefixfilter.Filter) {
// nothing to do
}

View File

@@ -73,7 +73,7 @@ func (ppp *pipePackProcessor) writeBlock(workerID uint, br *blockResult) {
buf := shard.buf[:0]
fields := shard.fields
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
fields = fields[:0]
for _, c := range cs {
v := c.getValueAtRow(br, rowIdx)

View File

@@ -40,6 +40,10 @@ func (pp *pipePackJSON) canReturnLastNResults() bool {
return pp.resultField != "_time"
}
func (pp *pipePackJSON) isFixedOutputFieldsOrder() bool {
return false
}
func (pp *pipePackJSON) updateNeededFields(pf *prefixfilter.Filter) {
updateNeededFieldsForPipePack(pf, pp.resultField, pp.fieldFilters)
}

View File

@@ -40,6 +40,10 @@ func (pp *pipePackLogfmt) canReturnLastNResults() bool {
return pp.resultField != "_time"
}
func (pp *pipePackLogfmt) isFixedOutputFieldsOrder() bool {
return false
}
func (pp *pipePackLogfmt) updateNeededFields(pf *prefixfilter.Filter) {
updateNeededFieldsForPipePack(pf, pp.resultField, pp.fieldFilters)
}

View File

@@ -31,6 +31,10 @@ func (ps *pipeQueryStats) canReturnLastNResults() bool {
return false
}
func (ps *pipeQueryStats) isFixedOutputFieldsOrder() bool {
return true
}
func (ps *pipeQueryStats) updateNeededFields(pf *prefixfilter.Filter) {
pf.AddAllowFilter("*")
}

View File

@@ -27,6 +27,10 @@ func (ps *pipeQueryStatsLocal) canReturnLastNResults() bool {
return false
}
func (ps *pipeQueryStatsLocal) isFixedOutputFieldsOrder() bool {
return true
}
func (ps *pipeQueryStatsLocal) updateNeededFields(_ *prefixfilter.Filter) {
// Nothing to do
}

View File

@@ -51,6 +51,10 @@ func (pr *pipeRename) canReturnLastNResults() bool {
return true
}
func (pr *pipeRename) isFixedOutputFieldsOrder() bool {
return false
}
func (pr *pipeRename) updateNeededFields(pf *prefixfilter.Filter) {
for i := len(pr.srcFieldFilters) - 1; i >= 0; i-- {
srcFieldFilter := pr.srcFieldFilters[i]

View File

@@ -51,6 +51,10 @@ func (pr *pipeReplace) canReturnLastNResults() bool {
return true
}
func (pr *pipeReplace) isFixedOutputFieldsOrder() bool {
return false
}
func (pr *pipeReplace) updateNeededFields(pf *prefixfilter.Filter) {
updateNeededFieldsForUpdatePipe(pf, pr.field, pr.iff)
}

View File

@@ -58,6 +58,10 @@ func (pr *pipeReplaceRegexp) canReturnLastNResults() bool {
return true
}
func (pr *pipeReplaceRegexp) isFixedOutputFieldsOrder() bool {
return false
}
func (pr *pipeReplaceRegexp) updateNeededFields(pf *prefixfilter.Filter) {
updateNeededFieldsForUpdatePipe(pf, pr.field, pr.iff)
}

View File

@@ -98,6 +98,10 @@ func (ps *pipeRunningStats) canReturnLastNResults() bool {
return false
}
func (ps *pipeRunningStats) isFixedOutputFieldsOrder() bool {
return false
}
func (ps *pipeRunningStats) updateNeededFields(pf *prefixfilter.Filter) {
pfOrig := pf.Clone()
@@ -173,7 +177,7 @@ func (shard *pipeRunningStatsProcessorShard) writeBlock(br *blockResult) {
}
shard.columnValues = columnValues
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
fields := make([]Field, len(cs))
shard.stateSizeBudget -= int(unsafe.Sizeof(fields[0])) * len(fields)

View File

@@ -37,6 +37,10 @@ func (ps *pipeSample) canReturnLastNResults() bool {
return false
}
func (ps *pipeSample) isFixedOutputFieldsOrder() bool {
return false
}
func (ps *pipeSample) updateNeededFields(_ *prefixfilter.Filter) {
// nothing to do
}

View File

@@ -41,6 +41,10 @@ func (ps *pipeSetStreamFields) canReturnLastNResults() bool {
return true
}
func (ps *pipeSetStreamFields) isFixedOutputFieldsOrder() bool {
return false
}
func (ps *pipeSetStreamFields) updateNeededFields(f *prefixfilter.Filter) {
if !f.MatchString("_stream") {
return
@@ -117,7 +121,7 @@ func (psp *pipeSetStreamFieldsProcessor) writeBlock(workerID uint, br *blockResu
streamColumn := br.getColumnByName("_stream")
streamIDColumn := br.getColumnByName("_stream_id")
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
stream := ""
streamID := ""
if ps.iff == nil || bm.isSetBit(rowIdx) {
@@ -159,6 +163,7 @@ func (shard *pipeSetStreamFieldsProcessorShard) setLogStreamFields(ps *pipeSetSt
bLen := len(shard.a.b)
sort.Sort(st)
shard.a.b = st.marshalString(shard.a.b)
PutStreamTags(st)
return bytesutil.ToUnsafeString(shard.a.b[bLen:])

View File

@@ -93,6 +93,10 @@ func (ps *pipeSort) canReturnLastNResults() bool {
return false
}
func (ps *pipeSort) isFixedOutputFieldsOrder() bool {
return false
}
func (ps *pipeSort) updateNeededFields(pf *prefixfilter.Filter) {
if pf.MatchNothing() {
// There is no need in fetching any fields, since all of them are ignored by the caller.
@@ -268,7 +272,7 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
shard.stateSizeBudget -= len(valuesEncoded) * int(unsafe.Sizeof(valuesEncoded[0]))
bb := bbPool.Get()
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
// Marshal all the columns per each row into a single string
// and sort rows by the resulting string.
bb.B = bb.B[:0]
@@ -366,7 +370,7 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
blockIdx := len(shard.blocks) - 1
rowRefs := shard.rowRefs
rowRefsLen := len(rowRefs)
for i := 0; i < br.rowsLen; i++ {
for i := range br.rowsLen {
rowRefs = append(rowRefs, sortRowRef{
blockIdx: blockIdx,
rowIdx: i,
@@ -464,10 +468,7 @@ func (psp *pipeSortProcessor) flush() error {
var wg sync.WaitGroup
for _, shard := range shards {
wg.Add(1)
go func(shard *pipeSortProcessorShard) {
defer wg.Done()
wg.Go(func() {
// TODO: interrupt long sorting when psp.stopCh is closed.
if sort.IsSorted(shard) {
@@ -476,7 +477,7 @@ func (psp *pipeSortProcessor) flush() error {
return
}
sort.Sort(shard)
}(shard)
})
}
wg.Wait()

View File

@@ -199,7 +199,7 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
byColumns := slicesutil.SetLength(shard.byColumns, 1)
byColumnsIsTime := slicesutil.SetLength(shard.byColumnsIsTime, 1)
bb := bbPool.Get()
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
bb.B = bb.B[:0]
for i, values := range byColumnValues {
v := values[rowIdx]
@@ -254,7 +254,7 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
if slices.Contains(byColumnsIsTime, true) {
timestamps = br.getTimestamps()
}
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
for i, values := range byColumnValues {
v := ""
if !byColumnsIsTime[i] {
@@ -395,11 +395,9 @@ func (ptp *pipeTopkProcessor) flush() error {
var wg sync.WaitGroup
for _, shard := range shards {
wg.Add(1)
go func(shard *pipeTopkProcessorShard) {
defer wg.Done()
wg.Go(func() {
shard.sortRows(ptp.stopCh)
}(shard)
})
}
wg.Wait()

View File

@@ -47,6 +47,10 @@ func (ps *pipeSplit) canReturnLastNResults() bool {
return ps.dstField != "_time"
}
func (ps *pipeSplit) isFixedOutputFieldsOrder() bool {
return false
}
func (ps *pipeSplit) hasFilterInWithQuery() bool {
return false
}

View File

@@ -202,6 +202,10 @@ func (ps *pipeStats) canReturnLastNResults() bool {
return false
}
func (ps *pipeStats) isFixedOutputFieldsOrder() bool {
return true
}
func (ps *pipeStats) updateNeededFields(pf *prefixfilter.Filter) {
if ps.mode.needImportState() {
ps.updateNeededFieldsLocal(pf)
@@ -269,33 +273,30 @@ func (ps *pipeStats) visitSubqueries(visitFunc func(q *Query)) {
}
}
func (ps *pipeStats) addByTimeField(step int64) {
func (ps *pipeStats) addByTimeField(step, offset int64) {
if step <= 0 {
return
}
// add step to byFields
stepStr := fmt.Sprintf("%d", step)
bf := &byStatsField{
name: "_time",
bucketSizeStr: fmt.Sprintf("%d", step),
bucketSize: float64(step),
}
if offset != 0 {
bf.bucketOffsetStr = fmt.Sprintf("%d", offset)
bf.bucketOffset = float64(offset)
}
dstFields := make([]*byStatsField, 0, len(ps.byFields)+1)
hasByTime := false
dstFields = append(dstFields, bf)
for _, f := range ps.byFields {
if f.name == "_time" {
f = &byStatsField{
name: "_time",
bucketSizeStr: stepStr,
bucketSize: float64(step),
}
hasByTime = true
if f.name != "_time" {
dstFields = append(dstFields, f)
}
dstFields = append(dstFields, f)
}
if !hasByTime {
dstFields = append(dstFields, &byStatsField{
name: "_time",
bucketSizeStr: stepStr,
bucketSize: float64(step),
})
}
ps.byFields = dstFields
}
@@ -607,7 +608,7 @@ func (shard *pipeStatsProcessorShard) writeBlockDefault(br *blockResult) {
// The slowest path - group by multiple columns with different values across rows.
var psg *pipeStatsGroup
keyBuf := shard.keyBuf[:0]
for i := 0; i < br.rowsLen; i++ {
for i := range br.rowsLen {
// Verify whether the key for 'by (...)' fields equals the previous key
sameValue := i > 0
for _, values := range columnValues {
@@ -663,7 +664,7 @@ func (shard *pipeStatsProcessorShard) writeBlockLocal(br *blockResult) {
return
}
if len(byFields) == 1 {
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
v := byFieldValues[0][rowIdx]
psg := shard.getPipeStatsGroupGeneric(v)
stateSize, err := psg.importStateFromRow(columnValues, rowIdx, stopCh)
@@ -681,7 +682,7 @@ func (shard *pipeStatsProcessorShard) writeBlockLocal(br *blockResult) {
}
keyBuf := shard.keyBuf
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
keyBuf = keyBuf[:0]
for _, values := range byFieldValues {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[rowIdx]))
@@ -724,7 +725,7 @@ func (shard *pipeStatsProcessorShard) updateStatsSingleColumn(br *blockResult, b
}
var psg *pipeStatsGroup
for i := 0; i < br.rowsLen; i++ {
for i := range br.rowsLen {
if i <= 0 || values[i-1] != values[i] {
psg = shard.getPipeStatsGroupGeneric(values[i])
}
@@ -795,7 +796,7 @@ func (shard *pipeStatsProcessorShard) updateStatsSingleColumn(br *blockResult, b
values := c.getValues(br)
var psg *pipeStatsGroup
for i := 0; i < br.rowsLen; i++ {
for i := range br.rowsLen {
if i <= 0 || values[i-1] != values[i] {
psg = shard.getPipeStatsGroupGeneric(values[i])
}
@@ -1039,15 +1040,12 @@ func (psp *pipeStatsProcessor) flush() error {
// Write the calculated stats in parallel to the next pipe.
var wg sync.WaitGroup
for i := range psms {
wg.Add(1)
go func(workerID uint) {
defer wg.Done()
psw := newPipeStatsWriter(psp, workerID)
for workerID := range psms {
wg.Go(func() {
psw := newPipeStatsWriter(psp, uint(workerID))
psw.writeShardData(psms[workerID])
psw.flush()
}(uint(i))
})
}
wg.Wait()
@@ -1207,13 +1205,10 @@ func (psp *pipeStatsProcessor) mergeShardsParallel() []*pipeStatsGroupMap {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
var a chunkedAllocator
shard.moveGroupMapToShards(&a)
}()
})
}
wg.Wait()
if needStop(psp.stopCh) {
@@ -1222,11 +1217,8 @@ func (psp *pipeStatsProcessor) mergeShardsParallel() []*pipeStatsGroupMap {
psms := shards[0].groupMapShards
shards = shards[1:]
for i := range psms {
wg.Add(1)
go func(cpuIdx int) {
defer wg.Done()
for cpuIdx := range psms {
wg.Go(func() {
var a chunkedAllocator
psm := &psms[cpuIdx].pipeStatsGroupMap
for _, shard := range shards {
@@ -1234,7 +1226,7 @@ func (psp *pipeStatsProcessor) mergeShardsParallel() []*pipeStatsGroupMap {
psm.mergeState(&a, src, psp.stopCh)
src.reset()
}
}(i)
})
}
wg.Wait()
if needStop(psp.stopCh) {

View File

@@ -4,6 +4,7 @@ import (
"container/heap"
"fmt"
"math"
"slices"
"sort"
"strings"
"sync"
@@ -69,6 +70,10 @@ func (pc *pipeStreamContext) canReturnLastNResults() bool {
return false
}
func (ps *pipeStreamContext) isFixedOutputFieldsOrder() bool {
return false
}
func (pc *pipeStreamContext) withRunQuery(qctx *QueryContext, runQuery runQueryFunc, fieldsFilter *prefixfilter.Filter) pipe {
pcNew := *pc
pcNew.qctx = qctx
@@ -136,9 +141,7 @@ func (pcp *pipeStreamContextProcessor) getStreamRowss(streamID string, neededRow
for i := range neededRows {
neededTimestamps[i] = neededRows[i].timestamp
}
sort.Slice(neededTimestamps, func(i, j int) bool {
return neededTimestamps[i] < neededTimestamps[j]
})
slices.Sort(neededTimestamps)
trs, stateSize, err := pcp.getTimeRangesForStreamRowss(streamID, neededTimestamps, stateSizeBudget)
if err != nil {

View File

@@ -39,6 +39,10 @@ func (pa *pipeTimeAdd) canReturnLastNResults() bool {
return true
}
func (pa *pipeTimeAdd) isFixedOutputFieldsOrder() bool {
return false
}
func (pa *pipeTimeAdd) updateNeededFields(_ *prefixfilter.Filter) {
// do nothing
}
@@ -85,11 +89,11 @@ func (pap *pipeTimeAddProcessor) writeBlock(workerID uint, br *blockResult) {
shard.rc.name = pa.field
c := br.getColumnByName(pa.field)
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
v := c.getValueAtRow(br, rowIdx)
ts, ok := TryParseTimestampRFC3339Nano(v)
if ok {
ts = subNoOverflowInt64(ts, pa.offset)
ts = SubInt64NoOverflow(ts, pa.offset)
bufLen := len(shard.buf)
shard.buf = marshalTimestampRFC3339NanoString(shard.buf, ts)
v = bytesutil.ToUnsafeString(shard.buf[bufLen:])

View File

@@ -65,6 +65,10 @@ func (pt *pipeTop) splitToRemoteAndLocal(timestamp int64) (pipe, []pipe) {
if pt.rankFieldName != "" {
pLocalStr += rankFieldNameString(pt.rankFieldName)
}
pLocalStr += fmt.Sprintf(` | fields %s, %s`, fieldsQuoted, hitsQuoted)
if pt.rankFieldName != "" {
pLocalStr += ", " + quoteTokenIfNeeded(pt.rankFieldName)
}
psLocal := mustParsePipes(pLocalStr, timestamp)
@@ -82,6 +86,10 @@ func (pt *pipeTop) canReturnLastNResults() bool {
return false
}
func (pt *pipeTop) isFixedOutputFieldsOrder() bool {
return true
}
func (pt *pipeTop) updateNeededFields(pf *prefixfilter.Filter) {
pf.Reset()
pf.AddAllowFilters(pt.byFields)

View File

@@ -41,6 +41,10 @@ func (pu *pipeUnion) canReturnLastNResults() bool {
return false
}
func (pu *pipeUnion) isFixedOutputFieldsOrder() bool {
return false
}
func (pu *pipeUnion) hasFilterInWithQuery() bool {
// The pu.q query with possible in(...) filters is processed independently at pu.flush(), so return false here.
return false

View File

@@ -60,6 +60,10 @@ func (pu *pipeUniq) canReturnLastNResults() bool {
return false
}
func (pu *pipeUniq) isFixedOutputFieldsOrder() bool {
return true
}
func (pu *pipeUniq) updateNeededFields(pf *prefixfilter.Filter) {
pf.Reset()
pf.AddAllowFilters(pu.byFields)
@@ -153,7 +157,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
shard.columnValues = columnValues
keyBuf := shard.keyBuf
for i := 0; i < br.rowsLen; i++ {
for i := range br.rowsLen {
seenValue := true
for _, values := range columnValues {
if needHits || i == 0 || values[i-1] != values[i] {
@@ -310,12 +314,10 @@ func (pup *pipeUniqProcessor) flush() error {
// Write the calculated stats in parallel to the next pipe.
var wg sync.WaitGroup
for i := range hms {
wg.Add(1)
go func(workerID uint) {
defer wg.Done()
pup.writeShardData(workerID, hms[workerID], resetHits)
}(uint(i))
for workerID := range hms {
wg.Go(func() {
pup.writeShardData(uint(workerID), hms[workerID], resetHits)
})
}
wg.Wait()

View File

@@ -39,6 +39,10 @@ func (pu *pipeUniqLocal) canReturnLastNResults() bool {
return false
}
func (pu *pipeUniqLocal) isFixedOutputFieldsOrder() bool {
return true
}
func (pu *pipeUniqLocal) updateNeededFields(pf *prefixfilter.Filter) {
pf.Reset()
@@ -93,7 +97,7 @@ func (pup *pipeUniqLocalProcessor) writeBlock(workerID uint, br *blockResult) {
hits := cHits.getValues(br)
var buf []byte
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
buf = buf[:0]
for _, columnValues := range columnValuess {
v := columnValues[rowIdx]
@@ -150,7 +154,7 @@ func (pup *pipeUniqLocalProcessor) flush() error {
rowValues := make([]string, len(pu.byFields)+1)
for i := range result {
src := bytesutil.ToUnsafeBytes(result[i].Value)
for i := 0; i < len(rowValues)-1; i++ {
for i := range len(rowValues) - 1 {
v, n := encoding.UnmarshalBytes(src)
if n <= 0 {
logger.Panicf("BUG: cannot unmarshal field value")

View File

@@ -142,7 +142,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
v := c.valuesEncoded[0]
shard.uctx.resetFields()
pup.unpackFunc(&shard.uctx, v)
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
if pup.iff == nil || bm.isSetBit(rowIdx) {
shard.wctx.writeRow(rowIdx, shard.uctx.fields)
} else {

View File

@@ -19,6 +19,9 @@ type pipeUnpackJSON struct {
// fieldFilters is a list of field filters to extract from json.
fieldFilters []string
// preserveKeys is a list of JSON keys for preserving JSON values.
preserveKeys []string
// resultPrefix is prefix to add to unpacked field names
resultPrefix string
@@ -40,6 +43,9 @@ func (pu *pipeUnpackJSON) String() string {
if !prefixfilter.MatchAll(pu.fieldFilters) {
s += " fields (" + fieldNamesString(pu.fieldFilters) + ")"
}
if len(pu.preserveKeys) > 0 {
s += " preserve_keys (" + fieldNamesString(pu.preserveKeys) + ")"
}
if pu.resultPrefix != "" {
s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
}
@@ -66,6 +72,10 @@ func (pu *pipeUnpackJSON) canReturnLastNResults() bool {
return true
}
func (pu *pipeUnpackJSON) isFixedOutputFieldsOrder() bool {
return false
}
func (pu *pipeUnpackJSON) updateNeededFields(pf *prefixfilter.Filter) {
updateNeededFieldsForUnpackPipe(pu.fromField, pu.resultPrefix, pu.fieldFilters, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff, pf)
}
@@ -95,7 +105,7 @@ func (pu *pipeUnpackJSON) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), p
return
}
p := GetJSONParser()
err := p.parseLogMessage(bytesutil.ToUnsafeBytes(s), math.MaxInt)
err := p.parseLogMessage(bytesutil.ToUnsafeBytes(s), pu.preserveKeys, math.MaxInt)
if err != nil {
for _, filter := range pu.fieldFilters {
if !prefixfilter.IsWildcardFilter(filter) {
@@ -149,7 +159,7 @@ func parsePipeUnpackJSON(lex *lexer) (pipe, error) {
}
fromField := "_msg"
if !lex.isKeyword("fields", "result_prefix", "keep_original_fields", "skip_empty_results", ")", "|", "") {
if !lex.isKeyword("fields", "preserve_keys", "result_prefix", "keep_original_fields", "skip_empty_results", ")", "|", "") {
if lex.isKeyword("from") {
lex.nextToken()
}
@@ -173,6 +183,16 @@ func parsePipeUnpackJSON(lex *lexer) (pipe, error) {
fieldFilters = []string{"*"}
}
var preserveKeys []string
if lex.isKeyword("preserve_keys") {
lex.nextToken()
fn, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'preserve_keys': %w", err)
}
preserveKeys = fn
}
resultPrefix := ""
if lex.isKeyword("result_prefix") {
lex.nextToken()
@@ -197,6 +217,7 @@ func parsePipeUnpackJSON(lex *lexer) (pipe, error) {
pu := &pipeUnpackJSON{
fromField: fromField,
fieldFilters: fieldFilters,
preserveKeys: preserveKeys,
resultPrefix: resultPrefix,
keepOriginalFields: keepOriginalFields,
skipEmptyResults: skipEmptyResults,

View File

@@ -63,6 +63,10 @@ func (pu *pipeUnpackLogfmt) canReturnLastNResults() bool {
return true
}
func (pu *pipeUnpackLogfmt) isFixedOutputFieldsOrder() bool {
return false
}
func (pu *pipeUnpackLogfmt) updateNeededFields(pf *prefixfilter.Filter) {
updateNeededFieldsForUnpackPipe(pu.fromField, pu.resultPrefix, pu.fieldFilters, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff, pf)
}

View File

@@ -60,6 +60,10 @@ func (pu *pipeUnpackSyslog) canReturnLastNResults() bool {
return true
}
func (pu *pipeUnpackSyslog) isFixedOutputFieldsOrder() bool {
return false
}
func (pu *pipeUnpackSyslog) updateNeededFields(pf *prefixfilter.Filter) {
updateNeededFieldsForUnpackPipe(pu.fromField, pu.resultPrefix, nil, pu.keepOriginalFields, false, pu.iff, pf)
}

View File

@@ -49,6 +49,10 @@ func (pu *pipeUnpackWords) canReturnLastNResults() bool {
return pu.dstField != "_time"
}
func (pu *pipeUnpackWords) isFixedOutputFieldsOrder() bool {
return false
}
func (pu *pipeUnpackWords) hasFilterInWithQuery() bool {
return false
}

View File

@@ -45,6 +45,10 @@ func (pu *pipeUnroll) canReturnLastNResults() bool {
return true
}
func (pu *pipeUnroll) isFixedOutputFieldsOrder() bool {
return false
}
func (pu *pipeUnroll) hasFilterInWithQuery() bool {
return pu.iff.hasFilterInWithQuery()
}
@@ -126,7 +130,7 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) {
}
fields := shard.fields
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
if needStop(pup.stopCh) {
return
}
@@ -180,7 +184,7 @@ func (shard *pipeUnrollProcessorShard) writeUnrolledFields(fieldNames []string,
// write unrolled values to the next pipe.
fields := shard.fields
for unrollIdx := 0; unrollIdx < rows; unrollIdx++ {
for unrollIdx := range rows {
fields = fields[:0]
for i, values := range unrolledValues {
v := ""

View File

@@ -122,7 +122,7 @@ func (qs *QueryStats) CreateDataBlock(queryDurationNsecs int64) *DataBlock {
qs.addEntries(addUint64Entry, queryDurationNsecs)
return &DataBlock{
Columns: cs,
columns: cs,
}
}

View File

@@ -151,7 +151,7 @@ func unmarshalUint64Set(dst *map[uint64]struct{}, src []byte, stopCh <-chan stru
return src, 0, fmt.Errorf("cannot unmarshal %d uint64 values from %d bytes; need %d bytes", entriesLen, len(src), 8*entriesLen)
}
m := make(map[uint64]struct{}, entriesLen)
for i := uint64(0); i < entriesLen; i++ {
for range entriesLen {
u64 := encoding.UnmarshalUint64(src)
src = src[8:]
@@ -195,7 +195,7 @@ func unmarshalStringSet(a *chunkedAllocator, dst *map[string]struct{}, src []byt
stateSize := 0
m := make(map[string]struct{}, entriesLen)
for i := uint64(0); i < entriesLen; i++ {
for range entriesLen {
v, n := encoding.UnmarshalBytes(src)
if n <= 0 {
return src, 0, fmt.Errorf("cannot unmarshal string entry")
@@ -346,7 +346,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(sf statsFunc, br *bloc
sup.columnValues = columnValues
keyBuf := sup.keyBuf[:0]
for i := 0; i < br.rowsLen; i++ {
for i := range br.rowsLen {
seenKey := true
for _, values := range columnValues {
if i == 0 || values[i-1] != values[i] {
@@ -719,18 +719,15 @@ func (sup *statsCountUniqProcessor) mergeShardssParallel(stopCh <-chan struct{})
result := make([]statsCountUniqSet, len(shardss[0]))
var wg sync.WaitGroup
for i := range result {
wg.Add(1)
go func(cpuIdx int) {
defer wg.Done()
for cpuIdx := range result {
wg.Go(func() {
sus := &shardss[0][cpuIdx]
for _, perCPU := range shardss[1:] {
sus.mergeState(&perCPU[cpuIdx], stopCh)
perCPU[cpuIdx].reset()
}
result[cpuIdx] = *sus
}(i)
})
}
wg.Wait()

View File

@@ -173,7 +173,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForAllRows(sf statsFunc, br *
sup.columnValues = columnValues
keyBuf := sup.keyBuf[:0]
for i := 0; i < br.rowsLen; i++ {
for i := range br.rowsLen {
seenKey := true
for _, values := range columnValues {
if i == 0 || values[i-1] != values[i] {
@@ -547,18 +547,15 @@ func (sup *statsCountUniqHashProcessor) mergeShardssParallel(stopCh <-chan struc
result := make([]statsCountUniqHashSet, len(shardss[0]))
var wg sync.WaitGroup
for i := range result {
wg.Add(1)
go func(cpuIdx int) {
defer wg.Done()
for cpuIdx := range result {
wg.Go(func() {
sus := &shardss[0][cpuIdx]
for _, perCPU := range shardss[1:] {
sus.mergeState(&perCPU[cpuIdx], stopCh)
perCPU[cpuIdx].reset()
}
result[cpuIdx] = *sus
}(i)
})
}
wg.Wait()

View File

@@ -47,7 +47,7 @@ func (shp *statsHistogramProcessor) updateStatsForAllRows(sf statsFunc, br *bloc
v := c.valuesEncoded[0]
f, ok := tryParseNumber(v)
if ok {
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for range br.rowsLen {
shp.h.Update(f)
}
}
@@ -202,7 +202,7 @@ func (shp *statsHistogramProcessor) importState(src []byte, _ <-chan struct{}) (
stateSizeIncrease := 0
m := make(map[string]uint64, bucketsLen)
for i := uint64(0); i < bucketsLen; i++ {
for range bucketsLen {
v, n := encoding.UnmarshalBytes(src)
if n <= 0 {
return 0, fmt.Errorf("cannot unmarshal vmrange")

View File

@@ -86,7 +86,7 @@ func (svp *statsJSONValuesProcessor) updateStatsForAllRows(sf statsFunc, br *blo
stateSizeIncrease := 0
mc := getMatchingColumns(br, sv.fieldFilters)
mc.sort()
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
stateSizeIncrease += svp.updateStateForRow(br, mc.cs, rowIdx)
}
putMatchingColumns(mc)

View File

@@ -61,7 +61,7 @@ func (svp *statsJSONValuesSortedProcessor) updateStatsForAllRows(sf statsFunc, b
stateSizeIncrease := 0
mc := getMatchingColumns(br, sv.fieldFilters)
mc.sort()
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
stateSizeIncrease += svp.updateStateForRow(br, mc.cs, rowIdx)
}
putMatchingColumns(mc)

View File

@@ -24,7 +24,7 @@ func (svp *statsJSONValuesTopkProcessor) updateStatsForAllRows(sf statsFunc, br
stateSizeIncrease := 0
mc := getMatchingColumns(br, sv.fieldFilters)
mc.sort()
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
stateSizeIncrease += svp.updateStateForRow(sv, br, mc.cs, rowIdx)
}
putMatchingColumns(mc)

View File

@@ -78,7 +78,7 @@ func (sqp *statsQuantileProcessor) updateStateForColumn(br *blockResult, c *bloc
if c.isConst {
v := c.valuesEncoded[0]
for i := 0; i < br.rowsLen; i++ {
for range br.rowsLen {
stateSizeIncrease += h.update(v)
}
return stateSizeIncrease

View File

@@ -179,7 +179,7 @@ func (sup *statsUniqValuesProcessor) importState(src []byte, stopCh <-chan struc
m := make(map[string]struct{}, itemsLen)
stateSize := 0
for i := uint64(0); i < itemsLen; i++ {
for range itemsLen {
v, n := encoding.UnmarshalBytes(src)
if n <= 0 {
return 0, fmt.Errorf("cannot unmarshal item")
@@ -233,11 +233,8 @@ func mergeSetsParallel(ms []map[string]struct{}, concurrency uint, stopCh <-chan
var wg sync.WaitGroup
msShards := make([][]map[string]struct{}, shardsLen)
for i := range msShards {
wg.Add(1)
go func(idx int) {
defer wg.Done()
for idx := range msShards {
wg.Go(func() {
perCPU := make([]map[string]struct{}, cpusCount)
for i := range perCPU {
perCPU[i] = make(map[string]struct{})
@@ -253,16 +250,13 @@ func mergeSetsParallel(ms []map[string]struct{}, concurrency uint, stopCh <-chan
}
msShards[idx] = perCPU
}(i)
})
}
wg.Wait()
perCPUItems := make([][]string, cpusCount)
for i := range perCPUItems {
wg.Add(1)
go func(cpuIdx int) {
defer wg.Done()
for cpuIdx := range perCPUItems {
wg.Go(func() {
m := msShards[0][cpuIdx]
for _, perCPU := range msShards[1:] {
for s := range perCPU[cpuIdx] {
@@ -278,7 +272,7 @@ func mergeSetsParallel(ms []map[string]struct{}, concurrency uint, stopCh <-chan
items := setToSortedSlice(m)
perCPUItems[cpuIdx] = items
}(i)
})
}
wg.Wait()

View File

@@ -61,7 +61,7 @@ func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColum
stateSizeIncrease += len(v)
values := svp.values
for i := 0; i < br.rowsLen; i++ {
for range br.rowsLen {
values = append(values, v)
}
svp.values = values

View File

@@ -89,6 +89,12 @@ type StorageConfig struct {
// Log entries with timestamps older than now-MaxBackfillAge are ignored.
MaxBackfillAge time.Duration
// SnapshotsMaxAge is the maximum age for the created partition snapshots.
//
// Snapshots are automatically dropped after that duration.
// See https://docs.victoriametrics.com/victorialogs/#partitions-lifecycle
SnapshotsMaxAge time.Duration
// MinFreeDiskSpaceBytes is the minimum free disk space at storage path after which the storage stops accepting new data
// and enters read-only mode.
MinFreeDiskSpaceBytes int64
@@ -140,6 +146,11 @@ type Storage struct {
// maxBackfillAge is the maximum age of logs with historical timestamps to accept
maxBackfillAge time.Duration
// snapshotsMaxAge is the maximum age for the created partition snapshots.
//
// Older snapshots are automatically deleted. See https://docs.victoriametrics.com/victorialogs/#partitions-lifecycle
snapshotsMaxAge time.Duration
// minFreeDiskSpaceBytes is the minimum free disk space at path after which the storage stops accepting new data
minFreeDiskSpaceBytes uint64
@@ -295,48 +306,47 @@ func (s *Storage) PartitionList() []string {
return ptNames
}
// PartitionSnapshotCreate creates a snapshot for the partition with the given name
// PartitionSnapshotMustCreate creates snapshots for partitions with the given partitionPrefix
//
// The snaphsot name must have YYYYMMDD format.
// The partitionPrefix must match one of the following formats:
// - YYYYMMDD - matches partitions for the given day
// - YYYYMM - matches partitions for the given month
// - YYYY - matches partitions for the given year
// - an empty string - matches all the partitions
//
// The function returns path to the created snapshot on success.
func (s *Storage) PartitionSnapshotCreate(name string) (string, error) {
ptw := func() *partitionWrapper {
s.partitionsLock.Lock()
defer s.partitionsLock.Unlock()
// The function returns paths to created snapshots
func (s *Storage) PartitionSnapshotMustCreate(partitionPrefix string) []string {
ptws := s.getPartitions()
defer s.putPartitions(ptws)
for _, ptw := range s.partitions {
if ptw.pt.name == name {
ptw.incRef()
return ptw
}
var snapshotPaths []string
for _, ptw := range ptws {
if strings.HasPrefix(ptw.pt.name, partitionPrefix) {
snapshotPath := ptw.pt.mustCreateSnapshot()
snapshotPaths = append(snapshotPaths, snapshotPath)
}
return nil
}()
if ptw == nil {
return "", fmt.Errorf("cannot create snapshot from partition %q, because it is missing", name)
}
snapshotPath := ptw.pt.mustCreateSnapshot()
ptw.decRef()
return snapshotPath, nil
return snapshotPaths
}
// PartitionSnapshotList returns a list of paths to all the snapshots across active partitions.
func (s *Storage) PartitionSnapshotList() []string {
s.partitionsLock.Lock()
ptws := append([]*partitionWrapper{}, s.partitions...)
for _, ptw := range ptws {
ptw.incRef()
}
s.partitionsLock.Unlock()
ptws := s.getPartitions()
defer s.putPartitions(ptws)
snapshotPaths := getSnapshotPaths(ptws)
sort.Strings(snapshotPaths)
return snapshotPaths
}
func getSnapshotPaths(ptws []*partitionWrapper) []string {
var snapshotPaths []string
for _, ptw := range ptws {
ptPath := ptw.pt.path
snapshotsPath := filepath.Join(ptPath, snapshotsDirname)
snapshotsPath := filepath.Join(ptw.pt.path, snapshotsDirname)
if !fs.IsPathExist(snapshotsPath) {
continue
}
@@ -349,17 +359,11 @@ func (s *Storage) PartitionSnapshotList() []string {
continue
}
path := filepath.Join(snapshotsPath, name)
snapshotPaths = append(snapshotPaths, path)
snapshotPath := filepath.Join(snapshotsPath, name)
snapshotPaths = append(snapshotPaths, snapshotPath)
}
}
for _, ptw := range ptws {
ptw.decRef()
}
sort.Strings(snapshotPaths)
return snapshotPaths
}
@@ -376,13 +380,12 @@ func (s *Storage) PartitionSnapshotDelete(snapshotPath string) error {
}
partitionPath := filepath.Dir(snapshotDir)
ptw := func() *partitionWrapper {
s.partitionsLock.Lock()
defer s.partitionsLock.Unlock()
ptws := s.getPartitions()
defer s.putPartitions(ptws)
for _, ptw := range s.partitions {
if partitionPath == ptw.pt.path {
ptw.incRef()
ptw := func() *partitionWrapper {
for _, ptw := range ptws {
if ptw.pt.path == partitionPath {
return ptw
}
}
@@ -392,11 +395,41 @@ func (s *Storage) PartitionSnapshotDelete(snapshotPath string) error {
if ptw == nil {
return fmt.Errorf("partition path %q cannot be found across active partitions", partitionPath)
}
defer ptw.decRef()
return ptw.pt.deleteSnapshot(snapshotName)
}
// MustDeleteStalePartitionSnapshots deletes snapshots older than maxAge.
//
// The list of paths to deleted snapshots is returned from this function.
func (s *Storage) MustDeleteStalePartitionSnapshots(maxAge time.Duration) []string {
	var deletedSnapshotPaths []string
	currentTime := time.Now()

	// Hold a reference to the active partitions for the duration of the scan,
	// so they cannot be dropped concurrently while their snapshots are inspected.
	ptws := s.getPartitions()
	defer s.putPartitions(ptws)

	snapshotPaths := getSnapshotPaths(ptws)
	for _, snapshotPath := range snapshotPaths {
		fi, err := os.Stat(snapshotPath)
		if err != nil {
			// Best-effort: the snapshot may have been removed concurrently; skip it.
			logger.Warnf("skipping snapshot at %s since cannot access it: %s", snapshotPath, err)
			continue
		}
		// The snapshot directory's modification time is treated as its creation time.
		creationTime := fi.ModTime()
		if currentTime.Sub(creationTime) > maxAge {
			logger.Infof("deleting snapshot at %s because it became older than maxAge=%s (snapshot creation time: %s)", snapshotPath, maxAge, creationTime)
			fs.MustRemoveDir(snapshotPath)
			deletedSnapshotPaths = append(deletedSnapshotPaths, snapshotPath)
			logger.Infof("deleted snapshot at %s", snapshotPath)
		}
	}
	return deletedSnapshotPaths
}
// DeleteRunTask starts deletion of logs according to the given filter f for the given tenantIDs.
//
// The taskID must contain a unique id of the task. It is used for tracking the task in the list returned by DeleteActiveTasks().
@@ -583,20 +616,11 @@ func mustCreateStorage(path string) {
//
// MustClose must be called on the returned Storage when it is no longer needed.
func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
flushInterval := cfg.FlushInterval
if flushInterval < time.Second {
flushInterval = time.Second
}
flushInterval := max(cfg.FlushInterval, time.Second)
retention := cfg.Retention
if retention < 24*time.Hour {
retention = 24 * time.Hour
}
retention := max(cfg.Retention, 24*time.Hour)
futureRetention := cfg.FutureRetention
if futureRetention < 24*time.Hour {
futureRetention = 24 * time.Hour
}
futureRetention := max(cfg.FutureRetention, 24*time.Hour)
maxBackfillAge := cfg.MaxBackfillAge
if maxBackfillAge <= 0 || maxBackfillAge > retention {
@@ -631,6 +655,7 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
flushInterval: flushInterval,
futureRetention: futureRetention,
maxBackfillAge: maxBackfillAge,
snapshotsMaxAge: cfg.SnapshotsMaxAge,
minFreeDiskSpaceBytes: minFreeDiskSpaceBytes,
logIngestedRows: cfg.LogIngestedRows,
flockF: flockF,
@@ -654,7 +679,7 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
// when it opens many partitions.
var wg sync.WaitGroup
concurrencyLimiterCh := make(chan struct{}, cgroup.AvailableCPUs())
for i, de := range des {
for idx, de := range des {
fname := de.Name()
partitionDir := filepath.Join(partitionsPath, fname)
@@ -664,14 +689,8 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
continue
}
wg.Add(1)
concurrencyLimiterCh <- struct{}{}
go func(idx int) {
defer func() {
<-concurrencyLimiterCh
wg.Done()
}()
wg.Go(func() {
day, err := getPartitionDayFromName(fname)
if err != nil {
logger.Panicf("FATAL: cannot parse partition filename %q at %q: %s", fname, partitionsPath, err)
@@ -680,7 +699,9 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
partitionPath := filepath.Join(partitionsPath, fname)
pt := mustOpenPartition(s, partitionPath)
ptws[idx] = newPartitionWrapper(pt, day)
}(i)
<-concurrencyLimiterCh
})
}
wg.Wait()
@@ -710,6 +731,7 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
s.runRetentionWatcher()
s.runMaxDiskSpaceUsageWatcher()
s.runDeleteTasksWatcher()
s.runSnapshotsMaxAgeWatcher()
return s
}
@@ -720,30 +742,22 @@ func sortPartitions(ptws []*partitionWrapper) {
}
func (s *Storage) runRetentionWatcher() {
s.wg.Add(1)
go func() {
s.watchRetention()
s.wg.Done()
}()
s.wg.Go(s.watchRetention)
}
func (s *Storage) runMaxDiskSpaceUsageWatcher() {
if s.maxDiskSpaceUsageBytes <= 0 && s.maxDiskUsagePercent <= 0 {
return // nothing to watch
}
s.wg.Add(1)
go func() {
s.watchMaxDiskSpaceUsage()
s.wg.Done()
}()
s.wg.Go(s.watchMaxDiskSpaceUsage)
}
func (s *Storage) runDeleteTasksWatcher() {
s.wg.Add(1)
go func() {
s.watchDeleteTasks()
s.wg.Done()
}()
s.wg.Go(s.watchDeleteTasks)
}
// runSnapshotsMaxAgeWatcher starts the background goroutine that periodically
// deletes stale partition snapshots; it is tracked by s.wg for shutdown.
func (s *Storage) runSnapshotsMaxAgeWatcher() {
	s.wg.Go(s.watchSnapshotsMaxAge)
}
func (s *Storage) watchRetention() {
@@ -874,6 +888,26 @@ func (s *Storage) watchMaxDiskSpaceUsage() {
}
}
// watchSnapshotsMaxAge periodically deletes partition snapshots older than
// s.snapshotsMaxAge until s.stopCh is closed.
//
// It is a no-op if s.snapshotsMaxAge isn't set to a positive duration.
func (s *Storage) watchSnapshotsMaxAge() {
	if s.snapshotsMaxAge <= 0 {
		return
	}
	// Jitter the check interval so multiple watchers don't fire in lockstep.
	d := timeutil.AddJitterToDuration(time.Minute)
	ticker := time.NewTicker(d)
	defer ticker.Stop()
	for {
		select {
		case <-s.stopCh:
			return
		case <-ticker.C:
		}
		s.MustDeleteStalePartitionSnapshots(s.snapshotsMaxAge)
	}
}
func (s *Storage) watchDeleteTasks() {
d := timeutil.AddJitterToDuration(time.Second)
ticker := time.NewTicker(d)
@@ -1064,29 +1098,24 @@ func (s *Storage) MustClose() {
s.path = ""
}
// MustForceMerge force-merges parts in s partitions with names starting from the given partitionNamePrefix.
// MustForceMerge force-merges parts in s partitions with names starting from the given partitionPrefix.
//
// Partitions are merged sequentially in order to reduce load on the system.
func (s *Storage) MustForceMerge(partitionNamePrefix string) {
var ptws []*partitionWrapper
s.partitionsLock.Lock()
for _, ptw := range s.partitions {
if strings.HasPrefix(ptw.pt.name, partitionNamePrefix) {
ptw.incRef()
ptws = append(ptws, ptw)
}
}
s.partitionsLock.Unlock()
func (s *Storage) MustForceMerge(partitionPrefix string) {
ptws := s.getPartitions()
defer s.putPartitions(ptws)
s.wg.Add(1)
defer s.wg.Done()
for _, ptw := range ptws {
if !strings.HasPrefix(ptw.pt.name, partitionPrefix) {
continue
}
logger.Infof("started force merge for partition %s", ptw.pt.name)
startTime := time.Now()
ptw.pt.mustForceMerge()
ptw.decRef()
logger.Infof("finished force merge for partition %s in %.3fs", ptw.pt.name, time.Since(startTime).Seconds())
}
}
@@ -1294,6 +1323,15 @@ func (s *Storage) IsReadOnly() bool {
//
// This function is for debugging and testing purposes only, since it is slow.
func (s *Storage) DebugFlush() {
ptws := s.getPartitions()
defer s.putPartitions(ptws)
for _, ptw := range ptws {
ptw.pt.debugFlush()
}
}
func (s *Storage) getPartitions() []*partitionWrapper {
s.partitionsLock.Lock()
ptws := append([]*partitionWrapper{}, s.partitions...)
for _, ptw := range ptws {
@@ -1301,8 +1339,11 @@ func (s *Storage) DebugFlush() {
}
s.partitionsLock.Unlock()
return ptws
}
func (s *Storage) putPartitions(ptws []*partitionWrapper) {
for _, ptw := range ptws {
ptw.pt.debugFlush()
ptw.decRef()
}
}

View File

@@ -378,7 +378,7 @@ func getJoinMapGeneric(qctx *QueryContext, runQuery runQueryFunc, byFields []str
byValues := make([]string, len(byFields))
var tmpBuf []byte
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
for rowIdx := range br.rowsLen {
fields := make([]Field, 0, len(cs))
clear(byValues)
for j := range cs {
@@ -651,10 +651,8 @@ func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]TenantI
// spin up workers
var wg sync.WaitGroup
workCh := make(chan *partition, workersCount)
for i := 0; i < workersCount; i++ {
wg.Add(1)
go func(workerID uint) {
defer wg.Done()
for workerID := range workersCount {
wg.Go(func() {
for pt := range workCh {
if needStop(stopCh) {
// The search has been canceled. Just skip all the scheduled work in order to save CPU time.
@@ -663,7 +661,7 @@ func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]TenantI
tenantIDs := pt.idb.searchTenants()
tenantIDByWorker[workerID] = append(tenantIDByWorker[workerID], tenantIDs...)
}
}(uint(i))
})
}
// Select partitions according to the selected time range
@@ -1093,25 +1091,44 @@ type BlockColumn struct {
// DataBlock is a single block of data
type DataBlock struct {
// Columns represents columns in the data block.
Columns []BlockColumn
// columns represents columns in the data block.
columns []BlockColumn
}
// Reset resets db
func (db *DataBlock) Reset() {
clear(db.Columns)
db.Columns = db.Columns[:0]
clear(db.columns)
db.columns = db.columns[:0]
}
// RowsCount returns the number of rows stored in db.
//
// All columns in db hold the same number of values, so the length of the
// first column's Values gives the row count; an empty block has zero rows.
func (db *DataBlock) RowsCount() int {
	if len(db.columns) == 0 {
		return 0
	}
	return len(db.columns[0].Values)
}
// GetColumns returns columns from db.
//
// If needSortColumns is set, then the returned columns are sorted in alphabetical order
// by column name. The sort is performed in place, so it also re-orders db.columns.
// The returned slice aliases db's internal storage; it is valid until db is modified.
func (db *DataBlock) GetColumns(needSortColumns bool) []BlockColumn {
	if needSortColumns {
		sort.Slice(db.columns, func(i, j int) bool {
			return db.columns[i].Name < db.columns[j].Name
		})
	}
	return db.columns
}
// SetColumns sets columns to db.
//
// db owns columns after returning from the call, so the caller must not
// modify the passed slice afterwards.
func (db *DataBlock) SetColumns(columns []BlockColumn) {
	db.columns = columns
}
// GetTimestamps appends _time column values from db to dst and returns the result.
//
// It returns false if db doesn't have _time column or this column has invalid timestamps.
@@ -1127,7 +1144,7 @@ func (db *DataBlock) GetTimestamps(dst []int64) ([]int64, bool) {
//
// nil is returned if there is no such column.
func (db *DataBlock) GetColumnByName(name string) *BlockColumn {
columns := db.Columns
columns := db.columns
for i := range columns {
c := &columns[i]
if c.Name == name {
@@ -1142,7 +1159,7 @@ func (db *DataBlock) Marshal(dst []byte) []byte {
rowsCount := db.RowsCount()
dst = encoding.MarshalVarUint64(dst, uint64(rowsCount))
columns := db.Columns
columns := db.columns
dst = encoding.MarshalVarUint64(dst, uint64(len(columns)))
for i := range columns {
c := &columns[i]
@@ -1175,7 +1192,7 @@ const (
// UnmarshalInplace unmarshals db from src and returns the tail
//
// db is valid until src is changed.
// valuesBuf holds all the values in the unmarshaled db.Columns.
// valuesBuf holds all the values in the unmarshaled db.columns.
func (db *DataBlock) UnmarshalInplace(src []byte, valuesBuf []string) ([]byte, []string, error) {
srcOrig := src
@@ -1203,7 +1220,7 @@ func (db *DataBlock) UnmarshalInplace(src []byte, valuesBuf []string) ([]byte, [
src = src[n:]
// Unmarshal columns
columns := slicesutil.SetLength(db.Columns, int(columnsLen))
columns := slicesutil.SetLength(db.columns, int(columnsLen))
for i := range columns {
name, n := encoding.UnmarshalBytes(src)
if n <= 0 {
@@ -1229,11 +1246,11 @@ func (db *DataBlock) UnmarshalInplace(src []byte, valuesBuf []string) ([]byte, [
src = src[n:]
value := bytesutil.ToUnsafeString(v)
for j := 0; j < rowsCount; j++ {
for j := range rowsCount {
valuesBufA[j] = value
}
case valuesTypeRegular:
for j := 0; j < rowsCount; j++ {
for j := range rowsCount {
v, n := encoding.UnmarshalBytes(src)
if n <= 0 {
return srcOrig, valuesBuf, fmt.Errorf("cannot unmarshal value #%d out of %d values for column #%d with name %q from len(src)=%d",
@@ -1252,7 +1269,7 @@ func (db *DataBlock) UnmarshalInplace(src []byte, valuesBuf []string) ([]byte, [
Values: valuesBufA,
}
}
db.Columns = columns
db.columns = columns
return src, valuesBuf, nil
}
@@ -1263,7 +1280,7 @@ func (db *DataBlock) initFromBlockResult(br *blockResult) {
cs := br.getColumns()
for _, c := range cs {
values := c.getValues(br)
db.Columns = append(db.Columns, BlockColumn{
db.columns = append(db.columns, BlockColumn{
Name: c.name,
Values: values,
})
@@ -1277,11 +1294,8 @@ func (s *Storage) searchParallel(workersCount int, sso *storageSearchOptions, qs
// spin up workers
var wg sync.WaitGroup
workCh := make(chan *blockSearchWorkBatch, workersCount)
for workerID := 0; workerID < workersCount; workerID++ {
wg.Add(1)
go func(workerID uint) {
defer wg.Done()
for workerID := range workersCount {
wg.Go(func() {
qsLocal := &QueryStats{}
bs := getBlockSearch()
bm := getBitmap(0)
@@ -1303,7 +1317,7 @@ func (s *Storage) searchParallel(workersCount int, sso *storageSearchOptions, qs
if sso.timeOffset != 0 {
bs.subTimeOffsetToTimestamps(sso.timeOffset)
}
writeBlock(workerID, &bs.br)
writeBlock(uint(workerID), &bs.br)
}
bsw.reset()
@@ -1319,7 +1333,7 @@ func (s *Storage) searchParallel(workersCount int, sso *storageSearchOptions, qs
putBitmap(bm)
qs.UpdateAtomic(qsLocal)
}(uint(workerID))
})
}
// Select partitions according to the selected time range
@@ -1329,19 +1343,17 @@ func (s *Storage) searchParallel(workersCount int, sso *storageSearchOptions, qs
// Schedule concurrent search across matching partitions.
psfs := make([]partitionSearchFinalizer, len(ptws))
var wgSearchers sync.WaitGroup
for i, ptw := range ptws {
for idx, ptw := range ptws {
partitionSearchConcurrencyLimitCh <- struct{}{}
wgSearchers.Add(1)
go func(idx int, pt *partition) {
wgSearchers.Go(func() {
qsLocal := &QueryStats{}
psfs[idx] = pt.search(sso, qsLocal, workCh, stopCh)
psfs[idx] = ptw.pt.search(sso, qsLocal, workCh, stopCh)
qs.UpdateAtomic(qsLocal)
wgSearchers.Done()
<-partitionSearchConcurrencyLimitCh
}(i, ptw.pt)
})
}
wgSearchers.Wait()
@@ -1550,11 +1562,8 @@ func (p *part) hasMatchingRows(pso *partitionSearchOptions, stopCh <-chan struct
var wg sync.WaitGroup
workersCount := cgroup.AvailableCPUs()
workCh := make(chan *blockSearchWorkBatch, workersCount)
for workerID := 0; workerID < workersCount; workerID++ {
wg.Add(1)
go func(workerID uint) {
defer wg.Done()
for range workersCount {
wg.Go(func() {
qsLocal := &QueryStats{}
bs := getBlockSearch()
bm := getBitmap(0)
@@ -1580,7 +1589,7 @@ func (p *part) hasMatchingRows(pso *partitionSearchOptions, stopCh <-chan struct
putBlockSearch(bs)
putBitmap(bm)
}(uint(workerID))
})
}
// execute the search

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"sort"
"strconv"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -31,20 +32,17 @@ var streamTagsPool sync.Pool
// StreamTags contains stream tags.
type StreamTags struct {
// buf holds all the data backed by tags
buf []byte
// tags contains added tags.
tags []streamTag
}
// Reset resets st for reuse
func (st *StreamTags) Reset() {
st.buf = st.buf[:0]
// Clear references to strings, which can belong to external buffers.
// This guarantees that these buffers can be cleared by GC.
clear(st.tags)
tags := st.tags
clear(tags)
st.tags = tags[:0]
st.tags = st.tags[:0]
}
// String returns string representation of st.
@@ -72,6 +70,8 @@ func (st *StreamTags) marshalString(dst []byte) []byte {
}
// Add adds (name:value) tag to st.
//
// name and value mustn't be changed while st is in use.
func (st *StreamTags) Add(name, value string) {
if len(value) == 0 {
return
@@ -81,21 +81,9 @@ func (st *StreamTags) Add(name, value string) {
name = "_msg"
}
buf := st.buf
bufLen := len(buf)
buf = append(buf, name...)
bName := buf[bufLen:]
bufLen = len(buf)
buf = append(buf, value...)
bValue := buf[bufLen:]
st.buf = buf
st.tags = append(st.tags, streamTag{
Name: bName,
Value: bValue,
Name: name,
Value: value,
})
}
@@ -107,14 +95,16 @@ func (st *StreamTags) MarshalCanonical(dst []byte) []byte {
dst = encoding.MarshalVarUint64(dst, uint64(len(tags)))
for i := range tags {
tag := &tags[i]
dst = encoding.MarshalBytes(dst, tag.Name)
dst = encoding.MarshalBytes(dst, tag.Value)
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(tag.Name))
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(tag.Value))
}
return dst
}
// UnmarshalCanonical unmarshals st from src marshaled with MarshalCanonical.
func (st *StreamTags) UnmarshalCanonical(src []byte) ([]byte, error) {
// UnmarshalCanonicalInplace unmarshals st from src marshaled with MarshalCanonical.
//
// st points to src, so src mustn't be changed while st is in use.
func (st *StreamTags) UnmarshalCanonicalInplace(src []byte) ([]byte, error) {
st.Reset()
srcOrig := src
@@ -124,7 +114,7 @@ func (st *StreamTags) UnmarshalCanonical(src []byte) ([]byte, error) {
return srcOrig, fmt.Errorf("cannot unmarshal tags len")
}
src = src[nSize:]
for i := uint64(0); i < n; i++ {
for range n {
name, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal tag name")
@@ -151,16 +141,16 @@ func (st *StreamTags) UnmarshalCanonical(src []byte) ([]byte, error) {
func getStreamTagsString(streamTagsCanonical string) string {
st := GetStreamTags()
mustUnmarshalStreamTags(st, streamTagsCanonical)
mustUnmarshalStreamTagsInplace(st, streamTagsCanonical)
s := st.String()
PutStreamTags(st)
return s
}
func mustUnmarshalStreamTags(dst *StreamTags, streamTagsCanonical string) {
func mustUnmarshalStreamTagsInplace(dst *StreamTags, streamTagsCanonical string) {
src := bytesutil.ToUnsafeBytes(streamTagsCanonical)
tail, err := dst.UnmarshalCanonical(src)
tail, err := dst.UnmarshalCanonicalInplace(src)
if err != nil {
logger.Panicf("FATAL: cannot unmarshal StreamTags: %s", err)
}
@@ -188,14 +178,14 @@ func (st *StreamTags) Swap(i, j int) {
// streamTag represents a (name:value) tag for stream.
type streamTag struct {
Name []byte
Value []byte
Name string
Value string
}
func (tag *streamTag) marshalString(dst []byte) []byte {
dst = append(dst, tag.Name...)
dst = append(dst, '=')
dst = strconv.AppendQuote(dst, bytesutil.ToUnsafeString(tag.Value))
dst = strconv.AppendQuote(dst, tag.Value)
return dst
}
@@ -222,17 +212,24 @@ func (tag *streamTag) indexdbMarshal(dst []byte) []byte {
return dst
}
func (tag *streamTag) indexdbUnmarshal(src []byte) ([]byte, error) {
func (tag *streamTag) indexdbUnmarshal(src, buf []byte) ([]byte, []byte, error) {
var err error
src, tag.Name, err = unmarshalTagValue(tag.Name[:0], src)
bufLen := len(buf)
src, buf, err = unmarshalTagValue(buf, src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal key: %w", err)
return src, buf, fmt.Errorf("cannot unmarshal key: %w", err)
}
src, tag.Value, err = unmarshalTagValue(tag.Value[:0], src)
tag.Name = bytesutil.ToUnsafeString(buf[bufLen:])
bufLen = len(buf)
src, buf, err = unmarshalTagValue(buf, src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal value: %w", err)
return src, buf, fmt.Errorf("cannot unmarshal value: %w", err)
}
return src, nil
tag.Value = bytesutil.ToUnsafeString(buf[bufLen:])
return src, buf, nil
}
const (
@@ -241,10 +238,10 @@ const (
kvSeparatorChar = 2
)
func marshalTagValue(dst, src []byte) []byte {
n1 := bytes.IndexByte(src, escapeChar)
n2 := bytes.IndexByte(src, tagSeparatorChar)
n3 := bytes.IndexByte(src, kvSeparatorChar)
func marshalTagValue(dst []byte, src string) []byte {
n1 := strings.IndexByte(src, escapeChar)
n2 := strings.IndexByte(src, tagSeparatorChar)
n3 := strings.IndexByte(src, kvSeparatorChar)
if n1 < 0 && n2 < 0 && n3 < 0 {
// Fast path.
dst = append(dst, src...)
@@ -253,7 +250,8 @@ func marshalTagValue(dst, src []byte) []byte {
}
// Slow path.
for _, ch := range src {
for i := range len(src) {
ch := src[i]
switch ch {
case escapeChar:
dst = append(dst, escapeChar, '0')

View File

@@ -124,7 +124,7 @@ func (p *SyslogParser) parseCEEMessage(s string) bool {
if p.jsonParser == nil {
p.jsonParser = GetJSONParser()
}
err := p.jsonParser.ParseLogMessage(bytesutil.ToUnsafeBytes(s))
err := p.jsonParser.ParseLogMessage(bytesutil.ToUnsafeBytes(s), nil)
if err != nil {
return false
}

View File

@@ -96,7 +96,7 @@ func ParseTenantID(s string) (TenantID, error) {
return tenantID, nil
}
n := strings.Index(s, ":")
n := strings.IndexByte(s, ':')
if n < 0 {
account, err := getUint32FromString(s)
if err != nil {

View File

@@ -131,7 +131,7 @@ func isTokenChar(c byte) bool {
var tokenCharTable = func() *[256]byte {
var a [256]byte
for c := uint(0); c < 256; c++ {
for c := range uint(256) {
if c >= 'a' && c <= 'z' || c >= 'A' && c <= 'Z' || c >= '0' && c <= '9' || c == '_' {
a[c] = 1
}

View File

@@ -354,7 +354,7 @@ func TryParseTimestampRFC3339Nano(s string) (int64, bool) {
if !ok {
return 0, false
}
nsecs = subNoOverflowInt64(nsecs, offsetNsecs)
nsecs = SubInt64NoOverflow(nsecs, offsetNsecs)
s = prefix
// Parse optional fractional part of seconds.
@@ -566,7 +566,7 @@ func tryParseUint64(s string) (uint64, bool) {
}
n := uint64(0)
for i := 0; i < len(s); i++ {
for i := range len(s) {
ch := s[i]
if ch == '_' {
continue
@@ -606,7 +606,7 @@ func tryParseDateUint64(s string) (uint64, bool) {
}
n := uint64(0)
for i := 0; i < len(s); i++ {
for i := range len(s) {
ch := s[i]
if ch < '0' || ch > '9' {
return 0, false
@@ -1314,7 +1314,7 @@ func (vd *valuesDict) unmarshalInplace(src []byte) ([]byte, error) {
}
dictLen := int(src[0])
src = src[1:]
for i := 0; i < dictLen; i++ {
for i := range dictLen {
data, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot umarshal value %d out of %d from dict", i, dictLen)

4
vendor/modules.txt vendored
View File

@@ -132,8 +132,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric
# github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.55.0
## explicit; go 1.24.0
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping
# github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20260125191521-bc89d84cd61d
## explicit; go 1.25.6
# github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20260218111324-95b48d57d032
## explicit; go 1.26.0
github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage
github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter
# github.com/VictoriaMetrics/easyproto v1.2.0