Compare commits

...

1 Commits

Author SHA1 Message Date
func25
ed784e8c51 fix 2025-06-17 00:57:47 +07:00
5 changed files with 116 additions and 14 deletions

View File

@@ -16,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
)
@@ -155,6 +156,7 @@ type LogMessageProcessor interface {
}
type logMessageProcessor struct {
isStreamMode bool
mu sync.Mutex
wg sync.WaitGroup
stopCh chan struct{}
@@ -183,6 +185,10 @@ func (lmp *logMessageProcessor) initPeriodicFlush() {
case <-lmp.stopCh:
return
case <-ticker.C:
if vlstorage.CanWriteData() != nil {
continue
}
lmp.mu.Lock()
if time.Since(lmp.lastFlushTime) >= d {
lmp.flushLocked()
@@ -197,6 +203,10 @@ func (lmp *logMessageProcessor) initPeriodicFlush() {
//
// If streamFields is non-nil, then it is used as log stream fields instead of the pre-configured stream fields.
func (lmp *logMessageProcessor) AddRow(timestamp int64, fields, streamFields []logstorage.Field) {
if !lmp.canWriteData() {
lmp.waitUntilStorageIsAvailable()
}
lmp.rowsIngestedTotal.Inc()
n := logstorage.EstimatedJSONRowLen(fields)
lmp.bytesIngestedTotal.Add(n)
@@ -225,6 +235,23 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields, streamFields []l
}
}
func (lmp *logMessageProcessor) waitUntilStorageIsAvailable() {
t := timerpool.Get(time.Second)
for lmp.canWriteData() {
select {
case <-lmp.stopCh:
timerpool.Put(t)
return
case <-t.C:
timerpool.Put(t)
}
}
}
func (lmp *logMessageProcessor) canWriteData() bool {
return !lmp.isStreamMode || vlstorage.CanWriteData() == nil
}
// InsertRowProcessor is used by native data ingestion protocol parser.
type InsertRowProcessor interface {
// AddInsertRow must add r to the underlying storage.
@@ -286,8 +313,9 @@ func (cp *CommonParams) NewLogMessageProcessor(protocolName string, isStreamMode
rowsIngestedTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_rows_ingested_total{type=%q}", protocolName))
bytesIngestedTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_bytes_ingested_total{type=%q}", protocolName))
lmp := &logMessageProcessor{
cp: cp,
lr: lr,
isStreamMode: isStreamMode,
cp: cp,
lr: lr,
rowsIngestedTotal: rowsIngestedTotal,
bytesIngestedTotal: bytesIngestedTotal,

View File

@@ -255,18 +255,24 @@ func processForceFlush(w http.ResponseWriter, r *http.Request) bool {
// CanWriteData returns non-nil error if it cannot write data to vlstorage
func CanWriteData() error {
if localStorage == nil {
// The data can be always written in non-local mode.
if netstorageInsert != nil {
if netstorageInsert.IsBroken() {
return &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot write data to vlstorage because all the storage nodes are broken"),
StatusCode: http.StatusTooManyRequests,
}
}
return nil
}
if localStorage.IsReadOnly() {
if localStorage != nil && localStorage.IsReadOnly() {
return &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot add rows into storage in read-only mode; the storage can be in read-only mode "+
"because of lack of free disk space at -storageDataPath=%s", *storageDataPath),
StatusCode: http.StatusTooManyRequests,
}
}
return nil
}

View File

@@ -2,6 +2,7 @@ package netinsert
import (
"errors"
"flag"
"fmt"
"io"
"net/http"
@@ -23,6 +24,11 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
)
var (
maxInsertRetries = flag.Int("insert.maxRetries", 20, "The maximum number of retry attempts when sending data to storage nodes. "+
"After exhausting retries, the data is queued in a retry buffer and new ingestion requests are rejected with HTTP 429 until storage nodes recover.")
)
// the maximum size of a single data block sent to storage node.
const maxInsertBlockSize = 2 * 1024 * 1024
@@ -41,6 +47,10 @@ type Storage struct {
pendingDataBuffers chan *bytesutil.ByteBuffer
retryDataBuffersMu sync.Mutex
needDrainRetryData atomic.Bool
retryDataBuffers []*bytesutil.ByteBuffer
stopCh chan struct{}
wg sync.WaitGroup
}
@@ -110,6 +120,10 @@ func (sn *storageNode) backgroundFlusher() {
case <-sn.s.stopCh:
return
case <-t.C:
if sn.s.needDrainRetryData.Load() {
sn.flushRetryData()
continue
}
sn.flushPendingData()
}
}
@@ -126,7 +140,31 @@ func (sn *storageNode) flushPendingData() {
pendingData := sn.grabPendingDataForFlushLocked()
sn.pendingDataMu.Unlock()
sn.mustSendInsertRequest(pendingData)
_ = sn.mustSendInsertRequest(pendingData)
}
func (sn *storageNode) flushRetryData() {
sn.s.retryDataBuffersMu.Lock()
defer sn.s.retryDataBuffersMu.Unlock()
for len(sn.s.retryDataBuffers) > 0 {
lastIdx := len(sn.s.retryDataBuffers) - 1
pendingData := sn.s.retryDataBuffers[lastIdx]
sn.s.retryDataBuffers = sn.s.retryDataBuffers[:lastIdx]
if !sn.mustSendInsertRequest(pendingData) {
return
}
}
sn.s.needDrainRetryData.Store(len(sn.s.retryDataBuffers) > 0)
}
func (sn *storageNode) addRetryData(pendingData *bytesutil.ByteBuffer) {
sn.s.retryDataBuffersMu.Lock()
defer sn.s.retryDataBuffersMu.Unlock()
sn.s.retryDataBuffers = append(sn.s.retryDataBuffers, pendingData)
sn.s.needDrainRetryData.Store(true)
}
func (sn *storageNode) addRow(r *logstorage.InsertRow) {
@@ -152,7 +190,7 @@ func (sn *storageNode) addRow(r *logstorage.InsertRow) {
bbPool.Put(bb)
if pendingData != nil {
sn.mustSendInsertRequest(pendingData)
_ = sn.mustSendInsertRequest(pendingData)
}
}
@@ -166,21 +204,37 @@ func (sn *storageNode) grabPendingDataForFlushLocked() *bytesutil.ByteBuffer {
return pendingData
}
func (sn *storageNode) mustSendInsertRequest(pendingData *bytesutil.ByteBuffer) {
// mustSendInsertRequest guarantees that data will be sent to storage nodes or buffered for retry.
// It attempts to send pendingData to storage nodes with retry logic and returns:
// - true: data was handled (successfully sent to a storage node, or operation was cancelled during shutdown)
// - false: all storage nodes are unavailable after maxInsertRetries attempts, data has been added to retry buffer
//
// When this method returns false, it indicates that the storages are temporarily unavailable
// and the data has been queued in the retry buffer for later processing when nodes become available.
// The retry buffer prevents data loss while protecting against infinite memory accumulation.
func (sn *storageNode) mustSendInsertRequest(pendingData *bytesutil.ByteBuffer) (handled bool) {
defer func() {
pendingData.Reset()
sn.s.pendingDataBuffers <- pendingData
if handled {
pendingData.Reset()
sn.s.pendingDataBuffers <- pendingData
}
}()
err := sn.sendInsertRequest(pendingData)
if err == nil {
return
return true
}
if !errors.Is(err, errTemporarilyDisabled) {
logger.Warnf("%s; re-routing the data block to the remaining nodes", err)
}
for !sn.s.sendInsertRequestToAnyNode(pendingData) {
for i := 0; !sn.s.sendInsertRequestToAnyNode(pendingData); i++ {
if *maxInsertRetries > 0 && i >= *maxInsertRetries {
sn.addRetryData(pendingData)
return false
}
logger.Errorf("cannot send pending data to all storage nodes, since all of them are unavailable; re-trying to send the data in a second")
t := timerpool.Get(time.Second)
@@ -188,11 +242,13 @@ func (sn *storageNode) mustSendInsertRequest(pendingData *bytesutil.ByteBuffer)
case <-sn.s.stopCh:
timerpool.Put(t)
logger.Errorf("dropping %d bytes of data, since there are no available storage nodes", pendingData.Len())
return
return true
case <-t.C:
timerpool.Put(t)
}
}
return true
}
func (sn *storageNode) sendInsertRequest(pendingData *bytesutil.ByteBuffer) error {
@@ -272,13 +328,16 @@ var zstdBufPool bytesutil.ByteBufferPool
// Call MustStop on the returned storage when it is no longer needed.
func NewStorage(addrs []string, authCfgs []*promauth.Config, isTLSs []bool, concurrency int, disableCompression bool) *Storage {
pendingDataBuffers := make(chan *bytesutil.ByteBuffer, concurrency*len(addrs))
for i := 0; i < cap(pendingDataBuffers); i++ {
for range cap(pendingDataBuffers) {
pendingDataBuffers <- &bytesutil.ByteBuffer{}
}
retryDataBuffers := []*bytesutil.ByteBuffer{}
s := &Storage{
disableCompression: disableCompression,
pendingDataBuffers: pendingDataBuffers,
retryDataBuffers: retryDataBuffers,
stopCh: make(chan struct{}),
}
@@ -307,6 +366,12 @@ func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) {
sn.addRow(r)
}
// IsBroken returns true if the storage is in a broken state where retry data needs to be drained first.
// When true, it indicates that all storage nodes are temporarily unavailable and data is being buffered for retry.
func (s *Storage) IsBroken() bool {
return s.needDrainRetryData.Load()
}
func (s *Storage) sendInsertRequestToAnyNode(pendingData *bytesutil.ByteBuffer) bool {
startIdx := int(fastrand.Uint32n(uint32(len(s.sns))))
for i := range s.sns {

View File

@@ -18,6 +18,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: [vlinsert](https://docs.victoriametrics.com/victorialogs/data-ingestion/): add `-insert.maxRetries` command-line flag for limiting retry attempts when sending data to storage nodes. After exhausting retries, data is queued in a retry buffer and new ingestion requests are rejected with HTTP 429 until storage nodes recover. See [#9121](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9121).
* BUGFIX: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): support `-` as a timestamp value, as described in [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.3).
* FEATURE: [`delete` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#delete-pipe): allow deleting all the fields with common prefix via `... | delete prefix*` syntax.
* FEATURE: [`fields` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#fields-pipe): allow keeping all the fields with common prefix via `... | fields prefix*` syntax.

View File

@@ -444,6 +444,8 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 262144)
-insert.maxQueueDuration duration
The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s)
-insert.maxRetries int
The maximum number of retry attempts when sending data to storage nodes. After exhausting retries, the data is queued in a retry buffer and new ingestion requests are rejected with HTTP 429 until storage nodes recover. (default 20)
-internStringCacheExpireDuration duration
The expiry duration for caches for interned strings. See https://en.wikipedia.org/wiki/String_interning . See also -internStringMaxLen and -internStringDisableCache (default 6m0s)
-internStringDisableCache