mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-23 03:36:31 +03:00
Compare commits
1 Commits
weakpointe
...
vlogs-grac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ed784e8c51 |
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
||||
)
|
||||
|
||||
@@ -155,6 +156,7 @@ type LogMessageProcessor interface {
|
||||
}
|
||||
|
||||
type logMessageProcessor struct {
|
||||
isStreamMode bool
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
@@ -183,6 +185,10 @@ func (lmp *logMessageProcessor) initPeriodicFlush() {
|
||||
case <-lmp.stopCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
if vlstorage.CanWriteData() != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
lmp.mu.Lock()
|
||||
if time.Since(lmp.lastFlushTime) >= d {
|
||||
lmp.flushLocked()
|
||||
@@ -197,6 +203,10 @@ func (lmp *logMessageProcessor) initPeriodicFlush() {
|
||||
//
|
||||
// If streamFields is non-nil, then it is used as log stream fields instead of the pre-configured stream fields.
|
||||
func (lmp *logMessageProcessor) AddRow(timestamp int64, fields, streamFields []logstorage.Field) {
|
||||
if !lmp.canWriteData() {
|
||||
lmp.waitUntilStorageIsAvailable()
|
||||
}
|
||||
|
||||
lmp.rowsIngestedTotal.Inc()
|
||||
n := logstorage.EstimatedJSONRowLen(fields)
|
||||
lmp.bytesIngestedTotal.Add(n)
|
||||
@@ -225,6 +235,23 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields, streamFields []l
|
||||
}
|
||||
}
|
||||
|
||||
func (lmp *logMessageProcessor) waitUntilStorageIsAvailable() {
|
||||
t := timerpool.Get(time.Second)
|
||||
for lmp.canWriteData() {
|
||||
select {
|
||||
case <-lmp.stopCh:
|
||||
timerpool.Put(t)
|
||||
return
|
||||
case <-t.C:
|
||||
timerpool.Put(t)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lmp *logMessageProcessor) canWriteData() bool {
|
||||
return !lmp.isStreamMode || vlstorage.CanWriteData() == nil
|
||||
}
|
||||
|
||||
// InsertRowProcessor is used by native data ingestion protocol parser.
|
||||
type InsertRowProcessor interface {
|
||||
// AddInsertRow must add r to the underlying storage.
|
||||
@@ -286,8 +313,9 @@ func (cp *CommonParams) NewLogMessageProcessor(protocolName string, isStreamMode
|
||||
rowsIngestedTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_rows_ingested_total{type=%q}", protocolName))
|
||||
bytesIngestedTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_bytes_ingested_total{type=%q}", protocolName))
|
||||
lmp := &logMessageProcessor{
|
||||
cp: cp,
|
||||
lr: lr,
|
||||
isStreamMode: isStreamMode,
|
||||
cp: cp,
|
||||
lr: lr,
|
||||
|
||||
rowsIngestedTotal: rowsIngestedTotal,
|
||||
bytesIngestedTotal: bytesIngestedTotal,
|
||||
|
||||
@@ -255,18 +255,24 @@ func processForceFlush(w http.ResponseWriter, r *http.Request) bool {
|
||||
|
||||
// CanWriteData returns non-nil error if it cannot write data to vlstorage
|
||||
func CanWriteData() error {
|
||||
if localStorage == nil {
|
||||
// The data can be always written in non-local mode.
|
||||
if netstorageInsert != nil {
|
||||
if netstorageInsert.IsBroken() {
|
||||
return &httpserver.ErrorWithStatusCode{
|
||||
Err: fmt.Errorf("cannot write data to vlstorage because all the storage nodes are broken"),
|
||||
StatusCode: http.StatusTooManyRequests,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if localStorage.IsReadOnly() {
|
||||
if localStorage != nil && localStorage.IsReadOnly() {
|
||||
return &httpserver.ErrorWithStatusCode{
|
||||
Err: fmt.Errorf("cannot add rows into storage in read-only mode; the storage can be in read-only mode "+
|
||||
"because of lack of free disk space at -storageDataPath=%s", *storageDataPath),
|
||||
StatusCode: http.StatusTooManyRequests,
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package netinsert
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@@ -23,6 +24,11 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
)
|
||||
|
||||
var (
|
||||
maxInsertRetries = flag.Int("insert.maxRetries", 20, "The maximum number of retry attempts when sending data to storage nodes. "+
|
||||
"After exhausting retries, the data is queued in a retry buffer and new ingestion requests are rejected with HTTP 429 until storage nodes recover.")
|
||||
)
|
||||
|
||||
// the maximum size of a single data block sent to storage node.
|
||||
const maxInsertBlockSize = 2 * 1024 * 1024
|
||||
|
||||
@@ -41,6 +47,10 @@ type Storage struct {
|
||||
|
||||
pendingDataBuffers chan *bytesutil.ByteBuffer
|
||||
|
||||
retryDataBuffersMu sync.Mutex
|
||||
needDrainRetryData atomic.Bool
|
||||
retryDataBuffers []*bytesutil.ByteBuffer
|
||||
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
@@ -110,6 +120,10 @@ func (sn *storageNode) backgroundFlusher() {
|
||||
case <-sn.s.stopCh:
|
||||
return
|
||||
case <-t.C:
|
||||
if sn.s.needDrainRetryData.Load() {
|
||||
sn.flushRetryData()
|
||||
continue
|
||||
}
|
||||
sn.flushPendingData()
|
||||
}
|
||||
}
|
||||
@@ -126,7 +140,31 @@ func (sn *storageNode) flushPendingData() {
|
||||
pendingData := sn.grabPendingDataForFlushLocked()
|
||||
sn.pendingDataMu.Unlock()
|
||||
|
||||
sn.mustSendInsertRequest(pendingData)
|
||||
_ = sn.mustSendInsertRequest(pendingData)
|
||||
}
|
||||
|
||||
func (sn *storageNode) flushRetryData() {
|
||||
sn.s.retryDataBuffersMu.Lock()
|
||||
defer sn.s.retryDataBuffersMu.Unlock()
|
||||
|
||||
for len(sn.s.retryDataBuffers) > 0 {
|
||||
lastIdx := len(sn.s.retryDataBuffers) - 1
|
||||
pendingData := sn.s.retryDataBuffers[lastIdx]
|
||||
sn.s.retryDataBuffers = sn.s.retryDataBuffers[:lastIdx]
|
||||
if !sn.mustSendInsertRequest(pendingData) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
sn.s.needDrainRetryData.Store(len(sn.s.retryDataBuffers) > 0)
|
||||
}
|
||||
|
||||
func (sn *storageNode) addRetryData(pendingData *bytesutil.ByteBuffer) {
|
||||
sn.s.retryDataBuffersMu.Lock()
|
||||
defer sn.s.retryDataBuffersMu.Unlock()
|
||||
|
||||
sn.s.retryDataBuffers = append(sn.s.retryDataBuffers, pendingData)
|
||||
sn.s.needDrainRetryData.Store(true)
|
||||
}
|
||||
|
||||
func (sn *storageNode) addRow(r *logstorage.InsertRow) {
|
||||
@@ -152,7 +190,7 @@ func (sn *storageNode) addRow(r *logstorage.InsertRow) {
|
||||
bbPool.Put(bb)
|
||||
|
||||
if pendingData != nil {
|
||||
sn.mustSendInsertRequest(pendingData)
|
||||
_ = sn.mustSendInsertRequest(pendingData)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,21 +204,37 @@ func (sn *storageNode) grabPendingDataForFlushLocked() *bytesutil.ByteBuffer {
|
||||
return pendingData
|
||||
}
|
||||
|
||||
func (sn *storageNode) mustSendInsertRequest(pendingData *bytesutil.ByteBuffer) {
|
||||
// mustSendInsertRequest guarantees that data will be sent to storage nodes or buffered for retry.
|
||||
// It attempts to send pendingData to storage nodes with retry logic and returns:
|
||||
// - true: data was handled (successfully sent to a storage node, or operation was cancelled during shutdown)
|
||||
// - false: all storage nodes are unavailable after maxInsertRetries attempts, data has been added to retry buffer
|
||||
//
|
||||
// When this method returns false, it indicates that the storages are temporarily unavailable
|
||||
// and the data has been queued in the retry buffer for later processing when nodes become available.
|
||||
// The retry buffer prevents data loss while protecting against infinite memory accumulation.
|
||||
func (sn *storageNode) mustSendInsertRequest(pendingData *bytesutil.ByteBuffer) (handled bool) {
|
||||
defer func() {
|
||||
pendingData.Reset()
|
||||
sn.s.pendingDataBuffers <- pendingData
|
||||
if handled {
|
||||
pendingData.Reset()
|
||||
sn.s.pendingDataBuffers <- pendingData
|
||||
}
|
||||
}()
|
||||
|
||||
err := sn.sendInsertRequest(pendingData)
|
||||
if err == nil {
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
if !errors.Is(err, errTemporarilyDisabled) {
|
||||
logger.Warnf("%s; re-routing the data block to the remaining nodes", err)
|
||||
}
|
||||
for !sn.s.sendInsertRequestToAnyNode(pendingData) {
|
||||
|
||||
for i := 0; !sn.s.sendInsertRequestToAnyNode(pendingData); i++ {
|
||||
if *maxInsertRetries > 0 && i >= *maxInsertRetries {
|
||||
sn.addRetryData(pendingData)
|
||||
return false
|
||||
}
|
||||
|
||||
logger.Errorf("cannot send pending data to all storage nodes, since all of them are unavailable; re-trying to send the data in a second")
|
||||
|
||||
t := timerpool.Get(time.Second)
|
||||
@@ -188,11 +242,13 @@ func (sn *storageNode) mustSendInsertRequest(pendingData *bytesutil.ByteBuffer)
|
||||
case <-sn.s.stopCh:
|
||||
timerpool.Put(t)
|
||||
logger.Errorf("dropping %d bytes of data, since there are no available storage nodes", pendingData.Len())
|
||||
return
|
||||
return true
|
||||
case <-t.C:
|
||||
timerpool.Put(t)
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (sn *storageNode) sendInsertRequest(pendingData *bytesutil.ByteBuffer) error {
|
||||
@@ -272,13 +328,16 @@ var zstdBufPool bytesutil.ByteBufferPool
|
||||
// Call MustStop on the returned storage when it is no longer needed.
|
||||
func NewStorage(addrs []string, authCfgs []*promauth.Config, isTLSs []bool, concurrency int, disableCompression bool) *Storage {
|
||||
pendingDataBuffers := make(chan *bytesutil.ByteBuffer, concurrency*len(addrs))
|
||||
for i := 0; i < cap(pendingDataBuffers); i++ {
|
||||
for range cap(pendingDataBuffers) {
|
||||
pendingDataBuffers <- &bytesutil.ByteBuffer{}
|
||||
}
|
||||
|
||||
retryDataBuffers := []*bytesutil.ByteBuffer{}
|
||||
|
||||
s := &Storage{
|
||||
disableCompression: disableCompression,
|
||||
pendingDataBuffers: pendingDataBuffers,
|
||||
retryDataBuffers: retryDataBuffers,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
@@ -307,6 +366,12 @@ func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) {
|
||||
sn.addRow(r)
|
||||
}
|
||||
|
||||
// IsBroken returns true if the storage is in a broken state where retry data needs to be drained first.
|
||||
// When true, it indicates that all storage nodes are temporarily unavailable and data is being buffered for retry.
|
||||
func (s *Storage) IsBroken() bool {
|
||||
return s.needDrainRetryData.Load()
|
||||
}
|
||||
|
||||
func (s *Storage) sendInsertRequestToAnyNode(pendingData *bytesutil.ByteBuffer) bool {
|
||||
startIdx := int(fastrand.Uint32n(uint32(len(s.sns))))
|
||||
for i := range s.sns {
|
||||
|
||||
@@ -18,6 +18,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
||||
|
||||
## tip
|
||||
|
||||
* FEATURE: [vlinsert](https://docs.victoriametrics.com/victorialogs/data-ingestion/): add `-insert.maxRetries` command-line flag for limiting retry attempts when sending data to storage nodes. After exhausting retries, data is queued in a retry buffer and new ingestion requests are rejected with HTTP 429 until storage nodes recover. See [#9121](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9121).
|
||||
* BUGFIX: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): support `-` as a timestamp value, as described in [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.3).
|
||||
* FEATURE: [`delete` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#delete-pipe): allow deleting all the fields with common prefix via `... | delete prefix*` syntax.
|
||||
* FEATURE: [`fields` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#fields-pipe): allow keeping all the fields with common prefix via `... | fields prefix*` syntax.
|
||||
|
||||
@@ -444,6 +444,8 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 262144)
|
||||
-insert.maxQueueDuration duration
|
||||
The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s)
|
||||
-insert.maxRetries int
|
||||
The maximum number of retry attempts when sending data to storage nodes. After exhausting retries, the data is queued in a retry buffer and new ingestion requests are rejected with HTTP 429 until storage nodes recover. (default 20)
|
||||
-internStringCacheExpireDuration duration
|
||||
The expiry duration for caches for interned strings. See https://en.wikipedia.org/wiki/String_interning . See also -internStringMaxLen and -internStringDisableCache (default 6m0s)
|
||||
-internStringDisableCache
|
||||
|
||||
Reference in New Issue
Block a user