mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Compare commits
10 Commits
query-debu
...
enhance-re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d86b1409f | ||
|
|
6ed3a2fc10 | ||
|
|
ecda474534 | ||
|
|
26e8edb6d7 | ||
|
|
e46bdd946b | ||
|
|
824add0766 | ||
|
|
863f85c843 | ||
|
|
a64bb1a5e0 | ||
|
|
69dd4759d1 | ||
|
|
f440f2375f |
@@ -186,8 +186,16 @@ func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompbmarshal.L
|
||||
h := xxhash.Sum64(buf)
|
||||
ctx.labelsBuf = buf
|
||||
|
||||
// Do not exclude unavailable storage nodes in order to properly account for rerouted rows in storageNode.push().
|
||||
idx := ctx.snb.nodesHash.getNodeIdx(h, nil)
|
||||
// Exclude long-broken storage nodes.
|
||||
var excludeIdxs []int
|
||||
if !*dropSamplesOnOverload {
|
||||
for i := range ctx.snb.sns {
|
||||
if ctx.snb.sns[i].isExcluded() {
|
||||
excludeIdxs = append(excludeIdxs, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
idx := ctx.snb.nodesHash.getNodeIdx(h, excludeIdxs)
|
||||
return idx
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ var (
|
||||
replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+
|
||||
"Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+
|
||||
"Higher values for -dedup.minScrapeInterval at vmselect is OK")
|
||||
disableRerouting = flag.Bool("disableRerouting", true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate. See also -disableReroutingOnUnavailable and -dropSamplesOnOverload")
|
||||
_ = flag.Bool("disableRerouting", true, "This option is deprecated and has no effect. See also -disableReroutingOnUnavailable and -dropSamplesOnOverload.")
|
||||
dropSamplesOnOverload = flag.Bool("dropSamplesOnOverload", false, "Whether to drop incoming samples if the destination vmstorage node is overloaded and/or unavailable. This prioritizes cluster availability over consistency, e.g. the cluster continues accepting all the ingested samples, but some of them may be dropped if vmstorage nodes are temporarily unavailable and/or overloaded. The drop of samples happens before the replication, so it's not recommended to use this flag with -replicationFactor enabled.")
|
||||
vmstorageDialTimeout = flag.Duration("vmstorageDialTimeout", 3*time.Second, "Timeout for establishing RPC connections from vminsert to vmstorage. "+
|
||||
"See also -vmstorageUserTimeout")
|
||||
@@ -44,6 +44,7 @@ var (
|
||||
"On the other side, disabled re-routing minimizes the number of active time series in the cluster "+
|
||||
"during rolling restarts and during spikes in series churn rate. "+
|
||||
"See also -disableRerouting")
|
||||
rerouteDelay = flag.Duration("rerouteDelay", 20*time.Second, "The maximum time the system waits for vmstorage nodes to become available before re-routing the data to other vmstorage nodes, minimum value is 1s and rounding to seconds")
|
||||
)
|
||||
|
||||
var errStorageReadOnly = errors.New("storage node is read only")
|
||||
@@ -52,6 +53,16 @@ func (sn *storageNode) isReady() bool {
|
||||
return !sn.isBroken.Load() && !sn.isReadOnly.Load()
|
||||
}
|
||||
|
||||
func (sn *storageNode) isExcluded() bool {
|
||||
return (sn.isBroken.Load() && fasttime.UnixTimestamp()-sn.brokenAt.Load() > uint64(*rerouteDelay/time.Second)) || sn.isReadOnly.Load()
|
||||
}
|
||||
|
||||
func (sn *storageNode) setBroken(isBroken bool) {
|
||||
if !sn.isBroken.Swap(isBroken) && isBroken {
|
||||
sn.brokenAt.Store(fasttime.UnixTimestamp())
|
||||
}
|
||||
}
|
||||
|
||||
// push pushes buf to sn internal bufs.
|
||||
//
|
||||
// This function doesn't block on fast path.
|
||||
@@ -108,15 +119,35 @@ again:
|
||||
sn.brCond.Wait()
|
||||
goto again
|
||||
}
|
||||
sn.brLock.Unlock()
|
||||
|
||||
// The vmstorage node isn't ready for data processing. Re-route buf to healthy vmstorage nodes even if disableRerouting is set.
|
||||
rowsProcessed, err := rerouteRowsToReadyStorageNodes(snb, sn, buf)
|
||||
rows -= rowsProcessed
|
||||
if err != nil {
|
||||
return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err)
|
||||
// Reroute buf to healthy vmstorage nodes if the current node is broken for too long.
|
||||
timeoutAt := uint64(*rerouteDelay/time.Second) + sn.brokenAt.Load()
|
||||
if timeoutAt <= fasttime.UnixTimestamp() || sn.isReadOnly.Load() {
|
||||
sn.brLock.Unlock()
|
||||
|
||||
rowsProcessed, err := rerouteRowsToReadyStorageNodes(snb, sn, buf)
|
||||
rows -= rowsProcessed
|
||||
if err != nil {
|
||||
return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
|
||||
// Wait for the vmstorage node to change its state to ready, or timeout.
|
||||
// sn.brCond.Wait() will be woken up at ~200ms intervals by the health checker.
|
||||
waitLoop:
|
||||
for sn.isBroken.Load() && timeoutAt > fasttime.UnixTimestamp() {
|
||||
sn.brCond.Wait()
|
||||
|
||||
select {
|
||||
case <-sn.stopCh:
|
||||
break waitLoop
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
goto again
|
||||
}
|
||||
|
||||
if len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
|
||||
@@ -126,17 +157,20 @@ again:
|
||||
sn.brLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Slow path: the buf contents doesn't fit sn.buf, so try re-routing it to other vmstorage nodes.
|
||||
if *disableRerouting || len(sns) == 1 {
|
||||
if len(sns) == 1 {
|
||||
sn.brCond.Wait()
|
||||
goto again
|
||||
}
|
||||
sn.brLock.Unlock()
|
||||
|
||||
rowsProcessed, err := rerouteRowsToFreeStorageNodes(snb, sn, buf)
|
||||
rows -= rowsProcessed
|
||||
if err != nil {
|
||||
return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -194,7 +228,8 @@ func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
|
||||
continue
|
||||
}
|
||||
// Send br to replicas storage nodes starting from snIdx.
|
||||
for !sendBufToReplicasNonblocking(snb, &br, snIdx, replicas) {
|
||||
usedStorageNodes := make(map[*storageNode]struct{}, replicas)
|
||||
for !trySendBufToStorages(snb, &br, snIdx, replicas, usedStorageNodes) {
|
||||
d := timeutil.AddJitterToDuration(time.Millisecond * 200)
|
||||
t := timerpool.Get(d)
|
||||
select {
|
||||
@@ -211,9 +246,25 @@ func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
|
||||
}
|
||||
}
|
||||
|
||||
func sendBufToReplicasNonblocking(snb *storageNodesBucket, br *bufRows, snIdx, replicas int) bool {
|
||||
usedStorageNodes := make(map[*storageNode]struct{}, replicas)
|
||||
func trySendBufToStorages(snb *storageNodesBucket, br *bufRows, snIdx, replicas int, usedStorageNodes map[*storageNode]struct{}) bool {
|
||||
sns := snb.sns
|
||||
|
||||
// If the current storage node is broken, wait for it to be ready or timeout
|
||||
if sns[snIdx].isBroken.Load() {
|
||||
timeoutAt := uint64(*rerouteDelay/time.Second) + sns[snIdx].brokenAt.Load()
|
||||
if timeoutAt > fasttime.UnixTimestamp() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if *dropSamplesOnOverload {
|
||||
return tryReplicateBufToStorages(sns, br, snIdx, replicas, usedStorageNodes)
|
||||
}
|
||||
|
||||
return tryReplicateBufToStoragesUntilExhausted(sns, br, snIdx, replicas, usedStorageNodes)
|
||||
}
|
||||
|
||||
func tryReplicateBufToStoragesUntilExhausted(sns []*storageNode, br *bufRows, snIdx, replicas int, usedStorageNodes map[*storageNode]struct{}) bool {
|
||||
for i := 0; i < replicas; i++ {
|
||||
idx := snIdx + i
|
||||
attempts := 0
|
||||
@@ -255,6 +306,40 @@ func sendBufToReplicasNonblocking(snb *storageNodesBucket, br *bufRows, snIdx, r
|
||||
return true
|
||||
}
|
||||
|
||||
func tryReplicateBufToStorages(sns []*storageNode, br *bufRows, snIdx, replicas int, usedStorageNodes map[*storageNode]struct{}) bool {
|
||||
previousSuccessLen := len(usedStorageNodes)
|
||||
|
||||
for i := 0; i < replicas; i++ {
|
||||
idx := snIdx + i
|
||||
for {
|
||||
if idx >= len(sns) {
|
||||
idx %= len(sns)
|
||||
}
|
||||
sn := sns[idx]
|
||||
idx++
|
||||
if _, ok := usedStorageNodes[sn]; ok {
|
||||
continue
|
||||
}
|
||||
if !sn.sendBufRowsNonblocking(br) {
|
||||
continue
|
||||
}
|
||||
usedStorageNodes[sn] = struct{}{}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := usedStorageNodes[sns[snIdx]]; !ok {
|
||||
cannotReplicateLogger.Warnf("cannot push %d bytes with %d rows to degraded node %s, %d/%d nodes are replicated", len(br.buf), br.rows, sns[snIdx].dialer.Addr(), len(usedStorageNodes), replicas)
|
||||
return false
|
||||
} else if previousSuccessLen != len(usedStorageNodes) && len(usedStorageNodes) < replicas {
|
||||
rowsIncompletelyReplicatedTotal.Add(br.rows)
|
||||
incompleteReplicationLogger.Warnf("dropping %d rows (%d bytes) as cannot make a copy #%d out of %d copies according to -replicationFactor=%d, since a part of storage nodes is temporarily unavailable", br.rows, len(br.buf), len(usedStorageNodes), replicas, *replicationFactor)
|
||||
return true
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
var (
|
||||
cannotReplicateLogger = logger.WithThrottler("cannotReplicateDataBecauseNoStorageNodes", 5*time.Second)
|
||||
incompleteReplicationLogger = logger.WithThrottler("incompleteReplication", 5*time.Second)
|
||||
@@ -270,7 +355,7 @@ func (sn *storageNode) checkHealth() {
|
||||
}
|
||||
bc, err := sn.dial()
|
||||
if err != nil {
|
||||
sn.isBroken.Store(true)
|
||||
sn.setBroken(true)
|
||||
sn.brCond.Broadcast()
|
||||
if sn.lastDialErr == nil {
|
||||
// Log the error only once.
|
||||
@@ -282,7 +367,7 @@ func (sn *storageNode) checkHealth() {
|
||||
logger.Infof("successfully dialed -storageNode=%q", sn.dialer.Addr())
|
||||
sn.lastDialErr = nil
|
||||
sn.bc = bc
|
||||
sn.isBroken.Store(false)
|
||||
sn.setBroken(false)
|
||||
sn.brCond.Broadcast()
|
||||
}
|
||||
|
||||
@@ -324,7 +409,7 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
|
||||
cannotCloseStorageNodeConnLogger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
|
||||
}
|
||||
sn.bc = nil
|
||||
sn.isBroken.Store(true)
|
||||
sn.setBroken(true)
|
||||
sn.brCond.Broadcast()
|
||||
sn.connectionErrors.Inc()
|
||||
return false
|
||||
@@ -413,6 +498,7 @@ type storageNode struct {
|
||||
// isBroken is set to true if the given vmstorage node is temporarily unhealthy.
|
||||
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
|
||||
isBroken atomic.Bool
|
||||
brokenAt atomic.Uint64
|
||||
|
||||
// isReadOnly is set to true if the given vmstorage node is read only
|
||||
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
|
||||
@@ -582,6 +668,11 @@ func initStorageNodes(unsortedAddrs []string, hashSeed uint64) *storageNodesBuck
|
||||
maxBufSizePerStorageNode = consts.MaxInsertPacketSizeForVMInsert
|
||||
}
|
||||
|
||||
*rerouteDelay = (*rerouteDelay).Round(time.Second)
|
||||
if *rerouteDelay < time.Second {
|
||||
*rerouteDelay = time.Second
|
||||
}
|
||||
|
||||
metrics.RegisterSet(ms)
|
||||
var wg sync.WaitGroup
|
||||
snb := &storageNodesBucket{
|
||||
@@ -649,17 +740,6 @@ func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNo
|
||||
// re-generate idxsExclude list, since sn must be put there.
|
||||
idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0])
|
||||
}
|
||||
if *disableRerouting {
|
||||
if !sn.sendBufMayBlock(rowBuf) {
|
||||
return rowsProcessed, fmt.Errorf("graceful shutdown started")
|
||||
}
|
||||
rowsProcessed++
|
||||
if sn != snSource {
|
||||
snSource.rowsReroutedFromHere.Inc()
|
||||
sn.rowsReroutedToHere.Inc()
|
||||
}
|
||||
continue
|
||||
}
|
||||
again:
|
||||
if sn.trySendBuf(rowBuf, 1) {
|
||||
rowsProcessed++
|
||||
@@ -694,9 +774,6 @@ func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNo
|
||||
// It is expected than *disableRerouting isn't set when calling this function.
|
||||
// It is expected that len(snb.sns) >= 2
|
||||
func rerouteRowsToFreeStorageNodes(snb *storageNodesBucket, snSource *storageNode, src []byte) (int, error) {
|
||||
if *disableRerouting {
|
||||
logger.Panicf("BUG: disableRerouting must be disabled when calling rerouteRowsToFreeStorageNodes")
|
||||
}
|
||||
sns := snb.sns
|
||||
if len(sns) < 2 {
|
||||
logger.Panicf("BUG: the number of storage nodes is too small for calling rerouteRowsToFreeStorageNodes: %d", len(sns))
|
||||
@@ -802,23 +879,6 @@ func (sn *storageNode) trySendBuf(buf []byte, rows int) bool {
|
||||
return sent
|
||||
}
|
||||
|
||||
func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
|
||||
sn.brLock.Lock()
|
||||
for len(sn.br.buf)+len(buf) > maxBufSizePerStorageNode {
|
||||
select {
|
||||
case <-sn.stopCh:
|
||||
sn.brLock.Unlock()
|
||||
return false
|
||||
default:
|
||||
}
|
||||
sn.brCond.Wait()
|
||||
}
|
||||
sn.br.buf = append(sn.br.buf, buf...)
|
||||
sn.br.rows++
|
||||
sn.brLock.Unlock()
|
||||
return true
|
||||
}
|
||||
|
||||
func (sn *storageNode) readOnlyChecker() {
|
||||
d := timeutil.AddJitterToDuration(time.Second * 30)
|
||||
ticker := time.NewTicker(d)
|
||||
@@ -864,7 +924,7 @@ func (sn *storageNode) checkReadOnlyMode() {
|
||||
cannotCloseStorageNodeConnLogger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
|
||||
}
|
||||
sn.bc = nil
|
||||
sn.isBroken.Store(true)
|
||||
sn.setBroken(true)
|
||||
sn.brCond.Broadcast()
|
||||
sn.connectionErrors.Inc()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user