go.mod: update github.com/VictoriaMetrics/VictoriaLogs to v0.0.0-20250725215216-8de283002ba8

This commit is contained in:
Aliaksandr Valialkin
2025-07-26 00:04:13 +02:00
parent eb2235d354
commit da5c065f29
9 changed files with 130 additions and 92 deletions

2
go.mod
View File

@@ -32,7 +32,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.1
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.10.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.1
github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20250714230749-82aca88cecd8
github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20250725215216-8de283002ba8
github.com/VictoriaMetrics/easyproto v0.1.4
github.com/VictoriaMetrics/fastcache v1.12.5
github.com/VictoriaMetrics/metrics v1.38.0

4
go.sum
View File

@@ -36,8 +36,8 @@ github.com/Code-Hex/go-generics-cache v1.5.1 h1:6vhZGc5M7Y/YD8cIUcY8kcuQLB4cHR7U
github.com/Code-Hex/go-generics-cache v1.5.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20250714230749-82aca88cecd8 h1:WBEsRwIGccNaGTLCdhqxCg+TsnW/sEnhnnkVCtpKlSM=
github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20250714230749-82aca88cecd8/go.mod h1:VxFZ5YvwAvkgk8MYelZ24VyYvwVyQXKBW7dnmAxeMFM=
github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20250725215216-8de283002ba8 h1:fGtN88orLtkOa4AULJAFfqny35q0uQHUca2bXGCdtNA=
github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20250725215216-8de283002ba8/go.mod h1:owxthqh9bGf0k0juol651PUEzBpuv7ULLYcAPb3ABAg=
github.com/VictoriaMetrics/easyproto v0.1.4 h1:r8cNvo8o6sR4QShBXQd1bKw/VVLSQma/V2KhTBPf+Sc=
github.com/VictoriaMetrics/easyproto v0.1.4/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo3/GThPs2KH23mv710=
github.com/VictoriaMetrics/fastcache v1.12.5 h1:966OX9JjqYmDAFdp3wEXLwzukiHIm+GVlZHv6B8KW3k=

View File

@@ -41,70 +41,51 @@ func (b *block) reset() {
// uncompressedSizeBytes returns the total size of the original log entries stored in b.
//
// It is supposed that every log entry has the following format:
// It uses JSON format to calculate the size as if each log entry were represented as JSON.
//
// 2006-01-02T15:04:05.999999999Z07:00 field1=value1 ... fieldN=valueN
func (b *block) uncompressedSizeBytes() uint64 {
rowsCount := uint64(b.Len())
// The calculation logic must stay in sync with EstimatedJSONRowLen() in log_rows.go.
// If you change logic here, update EstimatedJSONRowLen() accordingly and vice versa.
func (b *block) uncompressedSizeBytes() int {
rowsCount := b.Len()
if rowsCount == 0 {
return 0
}
// Take into account timestamps
n := rowsCount * uint64(len(time.RFC3339Nano))
totalSize := (len("{}\n") + len(`"_time":""`) + len(time.RFC3339Nano)) * rowsCount
// size of constant fields (included in every row)
for i := range b.constColumns {
cc := &b.constColumns[i]
name := getCanonicalColumnName(cc.Name)
totalSize += estimatedJSONFieldLen(name, cc.Value) * rowsCount
}
// add size of variable fields
for i := range b.columns {
c := &b.columns[i]
name := getCanonicalColumnName(c.name)
// Take into account columns
cs := b.columns
for i := range cs {
c := &cs[i]
nameLen := uint64(len(c.name))
if nameLen == 0 {
nameLen = uint64(len("_msg"))
}
for _, v := range c.values {
if len(v) > 0 {
n += nameLen + 2 + uint64(len(v))
// VictoriaLogs data model (https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
// treats empty values as non-existing values
if len(v) == 0 {
continue
}
totalSize += estimatedJSONFieldLen(name, v)
}
}
// Take into account constColumns
ccs := b.constColumns
for i := range ccs {
cc := &ccs[i]
nameLen := uint64(len(cc.Name))
if nameLen == 0 {
nameLen = uint64(len("_msg"))
}
n += rowsCount * (2 + nameLen + uint64(len(cc.Value)))
}
return n
return totalSize
}
// uncompressedRowsSizeBytes returns the size of the uncompressed rows.
//
// It is supposed that every row has the following format:
//
// 2006-01-02T15:04:05.999999999Z07:00 field1=value1 ... fieldN=valueN
// It is assumed that each row is in JSON format.
func uncompressedRowsSizeBytes(rows [][]Field) uint64 {
n := uint64(0)
for _, fields := range rows {
n += uncompressedRowSizeBytes(fields)
}
return n
}
// uncompressedRowSizeBytes returns the size of uncompressed row.
//
// It is supposed that the row has the following format:
//
// 2006-01-02T15:04:05.999999999Z07:00 field1=value1 ... fieldN=valueN
func uncompressedRowSizeBytes(fields []Field) uint64 {
n := uint64(len(time.RFC3339Nano)) // log timestamp
for _, f := range fields {
nameLen := len(f.Name)
if nameLen == 0 {
nameLen = len("_msg")
}
n += uint64(2 + nameLen + len(f.Value))
n += uint64(EstimatedJSONRowLen(fields))
}
return n
}
@@ -478,7 +459,7 @@ func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) {
bh.reset()
bh.streamID = *sid
bh.uncompressedSizeBytes = b.uncompressedSizeBytes()
bh.uncompressedSizeBytes = uint64(b.uncompressedSizeBytes())
bh.rowsCount = uint64(b.Len())
// Marshal timestamps

View File

@@ -2707,13 +2707,6 @@ func putValuesBuf(vb *valuesBuf) {
var valuesBufPool sync.Pool
// getCanonicalColumnName returns "_msg" when columnName is empty,
// since the log message is stored under the implicit _msg column;
// non-empty names are returned unchanged.
func getCanonicalColumnName(columnName string) string {
if columnName == "" {
return "_msg"
}
return columnName
}
func tryParseNumber(s string) (float64, bool) {
if len(s) == 0 {
return 0, false

View File

@@ -168,9 +168,8 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *da
partPath := filepath.Join(path, partName)
if !fs.IsPathExist(partPath) {
partsFile := filepath.Join(path, partsFilename)
logger.Panicf("FATAL: part %q is listed in %q, but is missing on disk; "+
"ensure %q contents is not corrupted; remove %q to rebuild its content from the list of existing parts",
partPath, partsFile, partsFile, partsFile)
logger.Panicf("FATAL: part %q is listed in %q, but is missing on disk; ensure %q contents is not corrupted; remove %q from %q in order to fix this error",
partPath, partsFile, partsFile, partPath, partsFile)
}
p := mustOpenFilePart(pt, partPath)
@@ -564,8 +563,8 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
mpNew.ph = ph
} else {
ph.mustWriteMetadata(dstPartPath)
// Make sure the created part directory listing is synced.
fs.MustSyncPath(dstPartPath)
// Make sure the created part directory contents is synced and visible in case of unclean shutdown.
fs.MustSyncPathAndParentDir(dstPartPath)
}
if needStop(stopCh) {
// Remove incomplete destination part

View File

@@ -96,7 +96,7 @@ func (mp *inmemoryPart) mustInitFromRows(lr *logRows) {
fields := rows[i]
trs.timestamps = append(trs.timestamps, timestamps[i])
trs.rows = append(trs.rows, fields)
uncompressedBlockSizeBytes += uncompressedRowSizeBytes(fields)
uncompressedBlockSizeBytes += uint64(EstimatedJSONRowLen(fields))
}
bsw.MustWriteRows(sidPrev, trs.timestamps, trs.rows)
putTmpRows(trs)
@@ -138,8 +138,9 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
mp.ph.mustWriteMetadata(path)
fs.MustSyncPath(path)
// Do not sync parent directory - it must be synced by the caller.
// Sync the path contents and the path parent dir in order to guarantee
// all the path contents is visible in case of unclean shutdown.
fs.MustSyncPathAndParentDir(path)
}
// tmpRows is used as a helper for inmemoryPart.mustInitFromRows()

View File

@@ -359,7 +359,7 @@ func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields, streamFie
return
}
}
rowLen := uncompressedRowSizeBytes(fields)
rowLen := EstimatedJSONRowLen(fields)
if rowLen > maxUncompressedBlockSize {
line := MarshalFieldsToJSON(nil, fields)
logger.Warnf("ignoring too long log entry with the estimated length of %d bytes, since it exceeds the limit %d bytes; "+
@@ -512,6 +512,13 @@ func getCanonicalFieldName(fieldName string) string {
return fieldName
}
// getCanonicalColumnName maps an empty field name to the implicit "_msg"
// column name; any non-empty name passes through unchanged.
func getCanonicalColumnName(fieldName string) string {
	if fieldName != "" {
		return fieldName
	}
	return "_msg"
}
// GetRowString returns string representation of the row with the given idx.
func (lr *LogRows) GetRowString(idx int) string {
tf := TimeFormatter(lr.timestamps[idx])
@@ -610,19 +617,32 @@ func PutLogRows(lr *LogRows) {
var logRowsPool sync.Pool
// EstimatedJSONRowLen returns an approximate length of the log entry with the given fields if represented as JSON.
//
// The calculation logic must stay in sync with block.uncompressedSizeBytes() in block.go.
// If you change logic here, update block.uncompressedSizeBytes() accordingly and vice versa.
func EstimatedJSONRowLen(fields []Field) int {
n := len("{}\n")
n += len(`"_time":""`) + len(time.RFC3339Nano)
for _, f := range fields {
nameLen := len(f.Name)
if nameLen == 0 {
nameLen = len("_msg")
// VictoriaLogs data model (https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
// treats empty values as non-existing values
if f.Value == "" {
continue
}
n += len(`,"":""`) + nameLen + len(f.Value)
name := getCanonicalColumnName(f.Name)
n += estimatedJSONFieldLen(name, f.Value)
}
return n
}
// estimatedJSONFieldLen returns an approximate length of the given field
// if it were serialized as a JSON object member: ,"name":"value"
//
// The name is expected to be already canonicalized (an empty name mapped
// to "_msg") — see getCanonicalColumnName.
func estimatedJSONFieldLen(name, value string) int {
	// Per-field JSON overhead: leading comma, two quoted strings and a colon.
	const overhead = len(`,"":""`)
	return overhead + len(name) + len(value)
}
// GetInsertRow returns InsertRow from a pool.
//
// Pass the returned row to PutInsertRow when it is no longer needed, so it could be reused.

View File

@@ -3,6 +3,7 @@ package logstorage
import (
"os"
"path/filepath"
"slices"
"sort"
"strings"
"sync"
@@ -114,7 +115,7 @@ type Storage struct {
//
// It must be accessed under partitionsLock.
//
// partitions are sorted by time.
// partitions are sorted by time, e.g. partitions[0] has the smallest time.
partitions []*partitionWrapper
// ptwHot is the "hot" partition, were the last rows were ingested.
@@ -141,6 +142,12 @@ type Storage struct {
//
// It reduces the load on persistent storage during querying by _stream:{...} filter.
filterStreamCache *cache
// minRetentionDay is the minimum allowed day for logs' ingestion because of the configured retention.
// Older logs are rejected during data ingestion.
//
// minRetentionDay must be accessed under the partitionsLock.
minRetentionDay int64
}
type partitionWrapper struct {
@@ -363,26 +370,33 @@ func (s *Storage) watchRetention() {
// Delete outdated partitions.
// s.partitions are sorted by day, so the partitions, which can become outdated, are located at the beginning of the list
for _, ptw := range s.partitions {
if ptw.day >= minAllowedDay {
break
ptws := s.partitions
for i, ptw := range ptws {
if ptw.day < minAllowedDay {
continue
}
ptwsToDelete = append(ptwsToDelete, ptw)
if ptw == s.ptwHot {
// ptws are sorted by time, so just drop all the partitions until i.
ptwsToDelete = ptws[:i]
s.partitions = ptws[i:]
s.updateMinRetentionDayLocked(ptwsToDelete)
// Remove reference to deleted partitions from s.ptwHot
if slices.Contains(ptwsToDelete, s.ptwHot) {
s.ptwHot = nil
}
break
}
for i := range ptwsToDelete {
s.partitions[i] = nil
}
s.partitions = s.partitions[len(ptwsToDelete):]
s.partitionsLock.Unlock()
for _, ptw := range ptwsToDelete {
for i, ptw := range ptwsToDelete {
logger.Infof("the partition %s is scheduled to be deleted because it is outside the -retentionPeriod=%dd", ptw.pt.path, durationToDays(s.retention))
ptw.mustDrop.Store(true)
ptw.decRef()
ptwsToDelete[i] = nil
}
select {
@@ -399,6 +413,7 @@ func (s *Storage) watchMaxDiskSpaceUsage() {
defer ticker.Stop()
for {
s.partitionsLock.Lock()
var n uint64
ptws := s.partitions
var ptwsToDelete []*partitionWrapper
@@ -419,17 +434,16 @@ func (s *Storage) watchMaxDiskSpaceUsage() {
i++
ptwsToDelete = ptws[:i]
s.partitions = ptws[i:]
s.updateMinRetentionDayLocked(ptwsToDelete)
// Remove reference to deleted partitions from s.ptwHot
for _, ptw := range ptwsToDelete {
if ptw == s.ptwHot {
s.ptwHot = nil
break
}
if slices.Contains(ptwsToDelete, s.ptwHot) {
s.ptwHot = nil
}
break
}
s.partitionsLock.Unlock()
for i, ptw := range ptwsToDelete {
@@ -449,6 +463,18 @@ func (s *Storage) watchMaxDiskSpaceUsage() {
}
}
// updateMinRetentionDayLocked raises s.minRetentionDay to one day past the
// newest partition in ptwsToDelete, so that data ingestion rejects logs for
// days whose partitions have already been dropped by retention.
//
// It must be called with s.partitionsLock held, since minRetentionDay is
// accessed under that lock.
func (s *Storage) updateMinRetentionDayLocked(ptwsToDelete []*partitionWrapper) {
	n := len(ptwsToDelete)
	if n == 0 {
		// No partitions were dropped - nothing to update.
		return
	}
	// ptwsToDelete is sorted by time, so the last entry holds the newest dropped day.
	if lastDroppedDay := ptwsToDelete[n-1].day; s.minRetentionDay <= lastDroppedDay {
		s.minRetentionDay = lastDroppedDay + 1
	}
}
// getMinAllowedDay returns the oldest day (expressed in days since the Unix
// epoch) that is still covered by the configured retention period s.retention.
func (s *Storage) getMinAllowedDay() int64 {
	minAllowedTime := time.Now().UTC().Add(-s.retention)
	return minAllowedTime.UnixNano() / nsecsPerDay
}
@@ -582,8 +608,15 @@ func (s *Storage) MustAddRows(lr *LogRows) {
}
for day, lrPart := range m {
ptw := s.getPartitionForDay(day)
ptw.pt.mustAddRows(lrPart)
ptw.decRef()
if ptw != nil {
ptw.pt.mustAddRows(lrPart)
ptw.decRef()
} else {
// the lrPart must contain at least a single row, so log it.
line := MarshalFieldsToJSON(nil, lrPart.rows[0])
tooSmallTimestampLogger.Warnf("skipping log entry with too small timestamp because of -retention.maxDiskSpaceUsageBytes=%d; log entry: %s",
s.maxDiskSpaceUsageBytes, line)
}
PutLogRows(lrPart)
}
}
@@ -601,6 +634,10 @@ func (tf *TimeFormatter) String() string {
return t.Format(time.RFC3339Nano)
}
// getPartitionForDay returns the partition for the given day.
//
// It may return nil if the partition for the given day has been already dropped
// because of the disk-size based retention.
func (s *Storage) getPartitionForDay(day int64) *partitionWrapper {
s.partitionsLock.Lock()
@@ -618,6 +655,13 @@ func (s *Storage) getPartitionForDay(day int64) *partitionWrapper {
}
if ptw == nil {
// Missing partition for the given day. Create it.
if day < s.minRetentionDay {
// Cannot create the partition for the given day, since it has been already removed because of retention.
s.partitionsLock.Unlock()
return nil
}
fname := time.Unix(0, day*nsecsPerDay).UTC().Format(partitionNameFormat)
partitionPath := filepath.Join(s.path, partitionsDirname, fname)
mustCreatePartition(partitionPath)

2
vendor/modules.txt vendored
View File

@@ -111,7 +111,7 @@ github.com/AzureAD/microsoft-authentication-library-for-go/apps/internal/shared
github.com/AzureAD/microsoft-authentication-library-for-go/apps/internal/version
github.com/AzureAD/microsoft-authentication-library-for-go/apps/managedidentity
github.com/AzureAD/microsoft-authentication-library-for-go/apps/public
# github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20250714230749-82aca88cecd8
# github.com/VictoriaMetrics/VictoriaLogs v0.0.0-20250725215216-8de283002ba8
## explicit; go 1.24.5
github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage
github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter