Compare commits

...

3 Commits

Author SHA1 Message Date
Haley Wang
06054b8a73 include group first evaluation into interval ticker 2026-04-08 20:48:29 +08:00
Haley Wang
9197cc8c4c fix data race in group lastEvaluation 2026-04-03 20:50:56 +08:00
Haley Wang
1998469cb6 vmalert: add new metric to help debug group eval timestamp 2026-03-31 20:26:56 +08:00
2 changed files with 39 additions and 16 deletions

View File

@@ -789,16 +789,7 @@ func firingAlertStaleTimeSeries(ls map[string]string, timestamp int64) []prompb.
// restore restores the value of ActiveAt field for active alerts,
// based on previously written time series `alertForStateMetricName`.
// Only rules with For > 0 can be restored.
func (ar *AlertingRule) restore(ctx context.Context, q datasource.Querier, ts time.Time, lookback time.Duration) error {
if ar.For < 1 {
return nil
}
if len(ar.alerts) < 1 {
return nil
}
nameStr := fmt.Sprintf("%s=%q", alertNameLabel, ar.Name)
if !*disableAlertGroupLabel {
nameStr = fmt.Sprintf("%s=%q,%s=%q", alertGroupNameLabel, ar.GroupName, alertNameLabel, ar.Name)

View File

@@ -8,6 +8,7 @@ import (
"hash/fnv"
"maps"
"net/url"
"os"
"sync"
"time"
@@ -213,6 +214,7 @@ func (g *Group) CreateID() uint64 {
// restore restores alerts state for group rules
func (g *Group) restore(ctx context.Context, qb datasource.QuerierBuilder, ts time.Time, lookback time.Duration) error {
for _, rule := range g.Rules {
// Only alerting rules with `for` > 0 that have active alerts from the first evaluation can be restored
ar, ok := rule.(*AlertingRule)
if !ok {
continue
@@ -220,6 +222,9 @@ func (g *Group) restore(ctx context.Context, qb datasource.QuerierBuilder, ts ti
if ar.For < 1 {
continue
}
if len(ar.alerts) < 1 {
return nil
}
q := qb.BuildWithParams(datasource.QuerierParams{
EvaluationInterval: g.Interval,
QueryParams: g.Params,
@@ -333,6 +338,11 @@ func (g *Group) Init() {
// Start starts group's evaluation
func (g *Group) Start(ctx context.Context, rw remotewrite.RWClient, rr datasource.QuerierBuilder) {
defer func() { close(g.finishedCh) }()
e := &executor{
Rw: rw,
notifierHeaders: g.NotifierHeaders,
}
evalTS := time.Now()
// sleep random duration to spread group rules evaluation
// over maxStartDelay to reduce the load on datasource.
@@ -367,11 +377,6 @@ func (g *Group) Start(ctx context.Context, rw remotewrite.RWClient, rr datasourc
evalTS = evalTS.Add(sleepBeforeStart)
}
e := &executor{
Rw: rw,
notifierHeaders: g.NotifierHeaders,
}
g.infof("started")
eval := func(ctx context.Context, ts time.Time) time.Time {
@@ -381,7 +386,9 @@ func (g *Group) Start(ctx context.Context, rw remotewrite.RWClient, rr datasourc
if len(g.Rules) < 1 {
g.metrics.iterationDuration.UpdateDuration(start)
g.mu.Lock()
g.LastEvaluation = start
g.mu.Unlock()
return ts
}
@@ -395,7 +402,32 @@ func (g *Group) Start(ctx context.Context, rw remotewrite.RWClient, rr datasourc
}
}
g.metrics.iterationDuration.UpdateDuration(start)
g.mu.Lock()
g.LastEvaluation = start
g.mu.Unlock()
if g.EvalOffset != nil && e.Rw != nil {
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown"
}
labels := map[string]string{
"__name__": "vmalert_eval_timestamp",
"host": hostname,
"group": g.Name,
"file": g.File,
}
var ls []prompb.Label
for k, v := range labels {
ls = append(ls, prompb.Label{
Name: k,
Value: v,
})
}
ts := newTimeSeries([]float64{float64(ts.Unix())}, []int64{start.Unix()}, ls)
if err := e.Rw.Push(ts); err != nil {
logger.Errorf("group %q: failed to push evaluation timestamp: %s", g.Name, err)
}
}
return ts
}
@@ -405,11 +437,11 @@ func (g *Group) Start(ctx context.Context, rw remotewrite.RWClient, rr datasourc
g.mu.Unlock()
defer g.evalCancel()
realEvalTS := eval(evalCtx, evalTS)
t := time.NewTicker(g.Interval)
defer t.Stop()
realEvalTS := eval(evalCtx, evalTS)
// restore the rules state after the first evaluation
// so only active alerts can be restored.
if rr != nil {