Compare commits

...

10 Commits

Author SHA1 Message Date
func25
3d86b1409f convention 2025-01-15 10:36:49 +07:00
func25
6ed3a2fc10 new dropSamplesOnOverload rerouting 2025-01-15 10:31:11 +07:00
func25
ecda474534 delay sending bufs from broken node to other nodes 2025-01-09 22:08:38 +07:00
func25
26e8edb6d7 optimize 2025-01-09 21:41:15 +07:00
func25
e46bdd946b optimize 2025-01-09 21:16:42 +07:00
func25
824add0766 testing 2025-01-06 17:43:41 +07:00
func25
863f85c843 readonly nodes will be excluded immediately 2025-01-06 15:00:38 +07:00
func25
a64bb1a5e0 remove unused func 2025-01-06 15:00:38 +07:00
func25
69dd4759d1 fix setBroken 2025-01-06 15:00:38 +07:00
func25
f440f2375f drop disableRerouting, add rerouteDelay flag 2025-01-06 15:00:38 +07:00
2 changed files with 117 additions and 49 deletions

View File

@@ -186,8 +186,16 @@ func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompbmarshal.L
h := xxhash.Sum64(buf)
ctx.labelsBuf = buf
// Do not exclude unavailable storage nodes in order to properly account for rerouted rows in storageNode.push().
idx := ctx.snb.nodesHash.getNodeIdx(h, nil)
// Exclude long-broken storage nodes.
var excludeIdxs []int
if !*dropSamplesOnOverload {
for i := range ctx.snb.sns {
if ctx.snb.sns[i].isExcluded() {
excludeIdxs = append(excludeIdxs, i)
}
}
}
idx := ctx.snb.nodesHash.getNodeIdx(h, excludeIdxs)
return idx
}

View File

@@ -31,7 +31,7 @@ var (
replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+
"Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+
"Higher values for -dedup.minScrapeInterval at vmselect is OK")
disableRerouting = flag.Bool("disableRerouting", true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate. See also -disableReroutingOnUnavailable and -dropSamplesOnOverload")
_ = flag.Bool("disableRerouting", true, "This option is deprecated and has no effect. See also -disableReroutingOnUnavailable and -dropSamplesOnOverload.")
dropSamplesOnOverload = flag.Bool("dropSamplesOnOverload", false, "Whether to drop incoming samples if the destination vmstorage node is overloaded and/or unavailable. This prioritizes cluster availability over consistency, e.g. the cluster continues accepting all the ingested samples, but some of them may be dropped if vmstorage nodes are temporarily unavailable and/or overloaded. The drop of samples happens before the replication, so it's not recommended to use this flag with -replicationFactor enabled.")
vmstorageDialTimeout = flag.Duration("vmstorageDialTimeout", 3*time.Second, "Timeout for establishing RPC connections from vminsert to vmstorage. "+
"See also -vmstorageUserTimeout")
@@ -44,6 +44,7 @@ var (
"On the other side, disabled re-routing minimizes the number of active time series in the cluster "+
"during rolling restarts and during spikes in series churn rate. "+
"See also -disableRerouting")
rerouteDelay = flag.Duration("rerouteDelay", 20*time.Second, "The maximum time the system waits for vmstorage nodes to become available before re-routing the data to other vmstorage nodes, minimum value is 1s and rounding to seconds")
)
var errStorageReadOnly = errors.New("storage node is read only")
@@ -52,6 +53,16 @@ func (sn *storageNode) isReady() bool {
return !sn.isBroken.Load() && !sn.isReadOnly.Load()
}
// isExcluded reports whether sn must be excluded from node selection:
// the node is either read-only, or it has stayed broken for longer than
// the -rerouteDelay grace period.
func (sn *storageNode) isExcluded() bool {
	if sn.isReadOnly.Load() {
		return true
	}
	if !sn.isBroken.Load() {
		return false
	}
	return fasttime.UnixTimestamp()-sn.brokenAt.Load() > uint64(*rerouteDelay/time.Second)
}
// setBroken updates the broken state of sn. On a false->true transition it
// records the current timestamp in sn.brokenAt, which is later used to decide
// when the -rerouteDelay grace period has expired.
func (sn *storageNode) setBroken(isBroken bool) {
	wasBroken := sn.isBroken.Swap(isBroken)
	if isBroken && !wasBroken {
		sn.brokenAt.Store(fasttime.UnixTimestamp())
	}
}
// push pushes buf to sn internal bufs.
//
// This function doesn't block on fast path.
@@ -108,15 +119,35 @@ again:
sn.brCond.Wait()
goto again
}
sn.brLock.Unlock()
// The vmstorage node isn't ready for data processing. Re-route buf to healthy vmstorage nodes even if disableRerouting is set.
rowsProcessed, err := rerouteRowsToReadyStorageNodes(snb, sn, buf)
rows -= rowsProcessed
if err != nil {
return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err)
// Reroute buf to healthy vmstorage nodes if the current node is broken for too long.
timeoutAt := uint64(*rerouteDelay/time.Second) + sn.brokenAt.Load()
if timeoutAt <= fasttime.UnixTimestamp() || sn.isReadOnly.Load() {
sn.brLock.Unlock()
rowsProcessed, err := rerouteRowsToReadyStorageNodes(snb, sn, buf)
rows -= rowsProcessed
if err != nil {
return fmt.Errorf("%d rows dropped because the current vmstorage is unavailable and %w", rows, err)
}
return nil
}
return nil
// Wait for the vmstorage node to change its state to ready, or timeout.
// sn.brCond.Wait() will be woken up at ~200ms intervals by the health checker.
waitLoop:
for sn.isBroken.Load() && timeoutAt > fasttime.UnixTimestamp() {
sn.brCond.Wait()
select {
case <-sn.stopCh:
break waitLoop
default:
}
}
goto again
}
if len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
@@ -126,17 +157,20 @@ again:
sn.brLock.Unlock()
return nil
}
// Slow path: the buf contents doesn't fit sn.buf, so try re-routing it to other vmstorage nodes.
if *disableRerouting || len(sns) == 1 {
if len(sns) == 1 {
sn.brCond.Wait()
goto again
}
sn.brLock.Unlock()
rowsProcessed, err := rerouteRowsToFreeStorageNodes(snb, sn, buf)
rows -= rowsProcessed
if err != nil {
return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err)
}
return nil
}
@@ -194,7 +228,8 @@ func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
continue
}
// Send br to replicas storage nodes starting from snIdx.
for !sendBufToReplicasNonblocking(snb, &br, snIdx, replicas) {
usedStorageNodes := make(map[*storageNode]struct{}, replicas)
for !trySendBufToStorages(snb, &br, snIdx, replicas, usedStorageNodes) {
d := timeutil.AddJitterToDuration(time.Millisecond * 200)
t := timerpool.Get(d)
select {
@@ -211,9 +246,25 @@ func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
}
}
func sendBufToReplicasNonblocking(snb *storageNodesBucket, br *bufRows, snIdx, replicas int) bool {
usedStorageNodes := make(map[*storageNode]struct{}, replicas)
// trySendBufToStorages tries to deliver br, together with its replicas,
// to the storage nodes in snb, starting from the node at snIdx.
//
// It returns false without sending anything while sns[snIdx] is broken and
// the -rerouteDelay grace period since it broke hasn't elapsed yet, so the
// caller can retry shortly. Otherwise the buf is handed to the replication
// helper selected by the -dropSamplesOnOverload flag.
//
// usedStorageNodes tracks the nodes that already received a copy of br;
// it is shared across retries by the caller.
func trySendBufToStorages(snb *storageNodesBucket, br *bufRows, snIdx, replicas int, usedStorageNodes map[*storageNode]struct{}) bool {
	sns := snb.sns
	primary := sns[snIdx]
	if primary.isBroken.Load() {
		// Give the broken node up to -rerouteDelay seconds to recover
		// before re-routing its data to other nodes.
		deadline := primary.brokenAt.Load() + uint64(*rerouteDelay/time.Second)
		if fasttime.UnixTimestamp() < deadline {
			return false
		}
	}
	if !*dropSamplesOnOverload {
		return tryReplicateBufToStoragesUntilExhausted(sns, br, snIdx, replicas, usedStorageNodes)
	}
	return tryReplicateBufToStorages(sns, br, snIdx, replicas, usedStorageNodes)
}
func tryReplicateBufToStoragesUntilExhausted(sns []*storageNode, br *bufRows, snIdx, replicas int, usedStorageNodes map[*storageNode]struct{}) bool {
for i := 0; i < replicas; i++ {
idx := snIdx + i
attempts := 0
@@ -255,6 +306,40 @@ func sendBufToReplicasNonblocking(snb *storageNodesBucket, br *bufRows, snIdx, r
return true
}
// tryReplicateBufToStorages tries to write `replicas` copies of br to distinct
// storage nodes, starting the search at sns[snIdx]. It is the replication path
// taken when -dropSamplesOnOverload is set (see trySendBufToStorages).
//
// usedStorageNodes holds the nodes that already accepted a copy of br on a
// previous attempt; they are skipped so a node never receives the same buf twice.
//
// Returns false when the primary node sns[snIdx] didn't receive a copy, so the
// caller retries later; returns true otherwise, possibly after accepting an
// incomplete replication (the shortfall is counted and logged).
func tryReplicateBufToStorages(sns []*storageNode, br *bufRows, snIdx, replicas int, usedStorageNodes map[*storageNode]struct{}) bool {
	// Remember how many copies existed before this attempt in order to detect
	// whether this call made any progress.
	previousSuccessLen := len(usedStorageNodes)
	for i := 0; i < replicas; i++ {
		idx := snIdx + i
		for {
			// Wrap around the node list.
			if idx >= len(sns) {
				idx %= len(sns)
			}
			sn := sns[idx]
			idx++
			if _, ok := usedStorageNodes[sn]; ok {
				// This node already holds a copy of br; try the next one.
				continue
			}
			if !sn.sendBufRowsNonblocking(br) {
				// The node rejected the buf; try the next one.
				// NOTE(review): if every not-yet-used node keeps rejecting the
				// buf, this inner loop has no visible exit — confirm that an
				// attempt bound or success guarantee is enforced elsewhere.
				continue
			}
			usedStorageNodes[sn] = struct{}{}
			break
		}
	}
	if _, ok := usedStorageNodes[sns[snIdx]]; !ok {
		// The primary node still has no copy; ask the caller to retry later.
		cannotReplicateLogger.Warnf("cannot push %d bytes with %d rows to degraded node %s, %d/%d nodes are replicated", len(br.buf), br.rows, sns[snIdx].dialer.Addr(), len(usedStorageNodes), replicas)
		return false
	} else if previousSuccessLen != len(usedStorageNodes) && len(usedStorageNodes) < replicas {
		// Progress was made on this attempt but the replication factor wasn't
		// reached; account for the dropped copies and accept the buf.
		rowsIncompletelyReplicatedTotal.Add(br.rows)
		incompleteReplicationLogger.Warnf("dropping %d rows (%d bytes) as cannot make a copy #%d out of %d copies according to -replicationFactor=%d, since a part of storage nodes is temporarily unavailable", br.rows, len(br.buf), len(usedStorageNodes), replicas, *replicationFactor)
		return true
	}
	return true
}
var (
cannotReplicateLogger = logger.WithThrottler("cannotReplicateDataBecauseNoStorageNodes", 5*time.Second)
incompleteReplicationLogger = logger.WithThrottler("incompleteReplication", 5*time.Second)
@@ -270,7 +355,7 @@ func (sn *storageNode) checkHealth() {
}
bc, err := sn.dial()
if err != nil {
sn.isBroken.Store(true)
sn.setBroken(true)
sn.brCond.Broadcast()
if sn.lastDialErr == nil {
// Log the error only once.
@@ -282,7 +367,7 @@ func (sn *storageNode) checkHealth() {
logger.Infof("successfully dialed -storageNode=%q", sn.dialer.Addr())
sn.lastDialErr = nil
sn.bc = bc
sn.isBroken.Store(false)
sn.setBroken(false)
sn.brCond.Broadcast()
}
@@ -324,7 +409,7 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
cannotCloseStorageNodeConnLogger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
}
sn.bc = nil
sn.isBroken.Store(true)
sn.setBroken(true)
sn.brCond.Broadcast()
sn.connectionErrors.Inc()
return false
@@ -413,6 +498,7 @@ type storageNode struct {
// isBroken is set to true if the given vmstorage node is temporarily unhealthy.
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
isBroken atomic.Bool
brokenAt atomic.Uint64
// isReadOnly is set to true if the given vmstorage node is read only
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
@@ -582,6 +668,11 @@ func initStorageNodes(unsortedAddrs []string, hashSeed uint64) *storageNodesBuck
maxBufSizePerStorageNode = consts.MaxInsertPacketSizeForVMInsert
}
*rerouteDelay = (*rerouteDelay).Round(time.Second)
if *rerouteDelay < time.Second {
*rerouteDelay = time.Second
}
metrics.RegisterSet(ms)
var wg sync.WaitGroup
snb := &storageNodesBucket{
@@ -649,17 +740,6 @@ func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNo
// re-generate idxsExclude list, since sn must be put there.
idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0])
}
if *disableRerouting {
if !sn.sendBufMayBlock(rowBuf) {
return rowsProcessed, fmt.Errorf("graceful shutdown started")
}
rowsProcessed++
if sn != snSource {
snSource.rowsReroutedFromHere.Inc()
sn.rowsReroutedToHere.Inc()
}
continue
}
again:
if sn.trySendBuf(rowBuf, 1) {
rowsProcessed++
@@ -694,9 +774,6 @@ func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNo
It is expected that *disableRerouting isn't set when calling this function.
// It is expected that len(snb.sns) >= 2
func rerouteRowsToFreeStorageNodes(snb *storageNodesBucket, snSource *storageNode, src []byte) (int, error) {
if *disableRerouting {
logger.Panicf("BUG: disableRerouting must be disabled when calling rerouteRowsToFreeStorageNodes")
}
sns := snb.sns
if len(sns) < 2 {
logger.Panicf("BUG: the number of storage nodes is too small for calling rerouteRowsToFreeStorageNodes: %d", len(sns))
@@ -802,23 +879,6 @@ func (sn *storageNode) trySendBuf(buf []byte, rows int) bool {
return sent
}
// sendBufMayBlock appends buf to sn's internal buffer, blocking until enough
// room is available. It returns false if graceful shutdown started (sn.stopCh
// is closed) while waiting, and true once buf has been queued.
func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
	sn.brLock.Lock()
	defer sn.brLock.Unlock()
	for len(sn.br.buf)+len(buf) > maxBufSizePerStorageNode {
		// Bail out on shutdown instead of waiting forever.
		select {
		case <-sn.stopCh:
			return false
		default:
		}
		// Wait until some buffered data is flushed; brCond is broadcast by
		// the code that drains sn.br.
		sn.brCond.Wait()
	}
	sn.br.buf = append(sn.br.buf, buf...)
	sn.br.rows++
	return true
}
func (sn *storageNode) readOnlyChecker() {
d := timeutil.AddJitterToDuration(time.Second * 30)
ticker := time.NewTicker(d)
@@ -864,7 +924,7 @@ func (sn *storageNode) checkReadOnlyMode() {
cannotCloseStorageNodeConnLogger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
}
sn.bc = nil
sn.isBroken.Store(true)
sn.setBroken(true)
sn.brCond.Broadcast()
sn.connectionErrors.Inc()
}