lib/streamaggr: update sync tests (#10939)

synctest runs inner closure in a new goroutine, which makes `t.Helper` instruction
useless on `t.Fatalf` checks. So when test fails we observe the log line where `t.Fatalf`
was called, instead of where `f()` was called.

Moving checks out of synctest closure makes `t.Helper` useful again.

--

In the synctest we were waiting for ingest a new batch of samples for aggregation interval.
Because of this, the new batch had 50% chance to be ingested in the previous or current
aggregation interval, depending on whether go run time initiated flush() call or no.

This change waits for additional 1ms for flush to happen. Locally, it stopped producing
flaky tests.
---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Roman Khavronenko
2026-05-11 13:06:36 +02:00
committed by GitHub
parent 45177e2683
commit b30c307bbb

View File

@@ -17,11 +17,10 @@ func TestAggregatorsSuccess(t *testing.T) {
f := func(inputMetrics []string, interval time.Duration, outputMetricsExpected, config, matchIdxsStrExpected string) {
t.Helper()
var matchIdxs []uint32
var tssOutput []prompb.TimeSeries
synctest.Test(t, func(t *testing.T) {
var matchIdxs []uint32
var tssOutput []prompb.TimeSeries
var tssOutputLock sync.Mutex
// Initialize Aggregators
pushFunc := func(tss []prompb.TimeSeries) {
tssOutputLock.Lock()
@@ -32,31 +31,31 @@ func TestAggregatorsSuccess(t *testing.T) {
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
offsetMsecs := time.Now().UnixMilli()
for _, metrics := range inputMetrics {
// Push the inputMetrics to Aggregators
offsetMsecs := time.Now().UnixMilli()
tssInput := prometheus.MustParsePromMetrics(metrics, offsetMsecs)
matchIdxs = append(matchIdxs, a.Push(tssInput, nil)...)
time.Sleep(interval)
time.Sleep(interval + time.Millisecond) // shift by 1ms from flush border to avoid flaky tests
offsetMsecs += interval.Milliseconds()
}
a.MustStop()
// Verify matchIdxs equals to matchIdxsExpected
matchIdxsStr := ""
for _, v := range matchIdxs {
matchIdxsStr += strconv.Itoa(int(v))
}
if matchIdxsStr != matchIdxsStrExpected {
t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected)
}
// Verify the tssOutput contains the expected metrics
outputMetrics := timeSeriessToString(tssOutput)
if outputMetrics != outputMetricsExpected {
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
}
})
// Verify matchIdxs equals to matchIdxsExpected
matchIdxsStr := ""
for _, v := range matchIdxs {
matchIdxsStr += strconv.Itoa(int(v))
}
if matchIdxsStr != matchIdxsStrExpected {
t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected)
}
// Verify the tssOutput contains the expected metrics
outputMetrics := timeSeriessToString(tssOutput)
if outputMetrics != outputMetricsExpected {
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
}
}
// Empty config