mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-20 02:06:31 +03:00
Compare commits
3 Commits
issue-1065
...
debug-grou
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
06054b8a73 | ||
|
|
9197cc8c4c | ||
|
|
1998469cb6 |
@@ -789,16 +789,7 @@ func firingAlertStaleTimeSeries(ls map[string]string, timestamp int64) []prompb.
|
||||
|
||||
// restore restores the value of ActiveAt field for active alerts,
|
||||
// based on previously written time series `alertForStateMetricName`.
|
||||
// Only rules with For > 0 can be restored.
|
||||
func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts time.Time, lookback time.Duration) error {
|
||||
if ar.For < 1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(ar.alerts) < 1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
nameStr := fmt.Sprintf("%s=%q", alertNameLabel, ar.Name)
|
||||
if !*disableAlertGroupLabel {
|
||||
nameStr = fmt.Sprintf("%s=%q,%s=%q", alertGroupNameLabel, ar.GroupName, alertNameLabel, ar.Name)
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"hash/fnv"
|
||||
"maps"
|
||||
"net/url"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -213,6 +214,7 @@ func (g *Group) CreateID() uint64 {
|
||||
// restore restores alerts state for group rules
|
||||
func (g *Group) restore(ctx context.Context, qb datasource.QuerierBuilder, ts time.Time, lookback time.Duration) error {
|
||||
for _, rule := range g.Rules {
|
||||
// Only alerting rule with for > 0 and has active alerts from the first evaluation can be restored
|
||||
ar, ok := rule.(*AlertingRule)
|
||||
if !ok {
|
||||
continue
|
||||
@@ -220,6 +222,9 @@ func (g *Group) restore(ctx context.Context, qb datasource.QuerierBuilder, ts ti
|
||||
if ar.For < 1 {
|
||||
continue
|
||||
}
|
||||
if len(ar.alerts) < 1 {
|
||||
return nil
|
||||
}
|
||||
q := qb.BuildWithParams(datasource.QuerierParams{
|
||||
EvaluationInterval: g.Interval,
|
||||
QueryParams: g.Params,
|
||||
@@ -333,6 +338,11 @@ func (g *Group) Init() {
|
||||
// Start starts group's evaluation
|
||||
func (g *Group) Start(ctx context.Context, rw remotewrite.RWClient, rr datasource.QuerierBuilder) {
|
||||
defer func() { close(g.finishedCh) }()
|
||||
e := &executor{
|
||||
Rw: rw,
|
||||
notifierHeaders: g.NotifierHeaders,
|
||||
}
|
||||
|
||||
evalTS := time.Now()
|
||||
// sleep random duration to spread group rules evaluation
|
||||
// over maxStartDelay to reduce the load on datasource.
|
||||
@@ -367,11 +377,6 @@ func (g *Group) Start(ctx context.Context, rw remotewrite.RWClient, rr datasourc
|
||||
evalTS = evalTS.Add(sleepBeforeStart)
|
||||
}
|
||||
|
||||
e := &executor{
|
||||
Rw: rw,
|
||||
notifierHeaders: g.NotifierHeaders,
|
||||
}
|
||||
|
||||
g.infof("started")
|
||||
|
||||
eval := func(ctx context.Context, ts time.Time) time.Time {
|
||||
@@ -381,7 +386,9 @@ func (g *Group) Start(ctx context.Context, rw remotewrite.RWClient, rr datasourc
|
||||
|
||||
if len(g.Rules) < 1 {
|
||||
g.metrics.iterationDuration.UpdateDuration(start)
|
||||
g.mu.Lock()
|
||||
g.LastEvaluation = start
|
||||
g.mu.Unlock()
|
||||
return ts
|
||||
}
|
||||
|
||||
@@ -395,7 +402,32 @@ func (g *Group) Start(ctx context.Context, rw remotewrite.RWClient, rr datasourc
|
||||
}
|
||||
}
|
||||
g.metrics.iterationDuration.UpdateDuration(start)
|
||||
g.mu.Lock()
|
||||
g.LastEvaluation = start
|
||||
g.mu.Unlock()
|
||||
if g.EvalOffset != nil && e.Rw != nil {
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
hostname = "unknown"
|
||||
}
|
||||
labels := map[string]string{
|
||||
"__name__": "vmalert_eval_timestamp",
|
||||
"host": hostname,
|
||||
"group": g.Name,
|
||||
"file": g.File,
|
||||
}
|
||||
var ls []prompb.Label
|
||||
for k, v := range labels {
|
||||
ls = append(ls, prompb.Label{
|
||||
Name: k,
|
||||
Value: v,
|
||||
})
|
||||
}
|
||||
ts := newTimeSeries([]float64{float64(ts.Unix())}, []int64{start.Unix()}, ls)
|
||||
if err := e.Rw.Push(ts); err != nil {
|
||||
logger.Errorf("group %q: failed to push evaluation timestamp: %s", g.Name, err)
|
||||
}
|
||||
}
|
||||
return ts
|
||||
}
|
||||
|
||||
@@ -405,11 +437,11 @@ func (g *Group) Start(ctx context.Context, rw remotewrite.RWClient, rr datasourc
|
||||
g.mu.Unlock()
|
||||
defer g.evalCancel()
|
||||
|
||||
realEvalTS := eval(evalCtx, evalTS)
|
||||
|
||||
t := time.NewTicker(g.Interval)
|
||||
defer t.Stop()
|
||||
|
||||
realEvalTS := eval(evalCtx, evalTS)
|
||||
|
||||
// restore the rules state after the first evaluation
|
||||
// so only active alerts can be restored.
|
||||
if rr != nil {
|
||||
|
||||
@@ -40,7 +40,6 @@ func TestAggregatorsSuccess(t *testing.T) {
|
||||
time.Sleep(interval)
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond * 500)
|
||||
a.MustStop()
|
||||
|
||||
// Verify matchIdxs equals to matchIdxsExpected
|
||||
|
||||
Reference in New Issue
Block a user