diff --git a/lib/streamaggr/streamaggr_synctest_test.go b/lib/streamaggr/streamaggr_synctest_test.go index 6d4f86d17e..1cb016e3d6 100644 --- a/lib/streamaggr/streamaggr_synctest_test.go +++ b/lib/streamaggr/streamaggr_synctest_test.go @@ -17,11 +17,10 @@ func TestAggregatorsSuccess(t *testing.T) { f := func(inputMetrics []string, interval time.Duration, outputMetricsExpected, config, matchIdxsStrExpected string) { t.Helper() + var matchIdxs []uint32 + var tssOutput []prompb.TimeSeries synctest.Test(t, func(t *testing.T) { - var matchIdxs []uint32 - var tssOutput []prompb.TimeSeries var tssOutputLock sync.Mutex - // Initialize Aggregators pushFunc := func(tss []prompb.TimeSeries) { tssOutputLock.Lock() @@ -32,31 +31,31 @@ func TestAggregatorsSuccess(t *testing.T) { if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } + offsetMsecs := time.Now().UnixMilli() for _, metrics := range inputMetrics { // Push the inputMetrics to Aggregators - offsetMsecs := time.Now().UnixMilli() tssInput := prometheus.MustParsePromMetrics(metrics, offsetMsecs) matchIdxs = append(matchIdxs, a.Push(tssInput, nil)...) - time.Sleep(interval) + time.Sleep(interval + time.Millisecond) // shift by 1ms from flush border to avoid flaky tests + offsetMsecs += interval.Milliseconds() } - a.MustStop() - - // Verify matchIdxs equals to matchIdxsExpected - matchIdxsStr := "" - for _, v := range matchIdxs { - matchIdxsStr += strconv.Itoa(int(v)) - } - if matchIdxsStr != matchIdxsStrExpected { - t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected) - } - - // Verify the tssOutput contains the expected metrics - outputMetrics := timeSeriessToString(tssOutput) - if outputMetrics != outputMetricsExpected { - t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected) - } }) + + // Verify matchIdxs equals to matchIdxsExpected + matchIdxsStr := "" + for _, v := range matchIdxs { + matchIdxsStr += strconv.Itoa(int(v)) + } + if matchIdxsStr != matchIdxsStrExpected { + t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected) + } + + // Verify the tssOutput contains the expected metrics + outputMetrics := timeSeriessToString(tssOutput) + if outputMetrics != outputMetricsExpected { + t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected) + } } // Empty config