mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
lib/streamaggr: update sync tests (#10939)
synctest runs inner closure in a new goroutine, which makes `t.Helper` instruction
useless on `t.Fatalf` checks. So when test fails we observe the log line where `t.Fatalf`
was called, instead of where `f()` was called.
Moving checks out of synctest closure makes `t.Helper` useful again.
--
In the synctest we were waiting for ingest a new batch of samples for aggregation interval.
Because of this, the new batch had 50% chance to be ingested in the previous or current
aggregation interval, depending on whether go run time initiated flush() call or no.
This change waits for additional 1ms for flush to happen. Locally, it stopped producing
flaky tests.
---------
Signed-off-by: hagen1778 <roman@victoriametrics.com>
(cherry picked from commit b30c307bbb)
This commit is contained in:
committed by
hagen1778
parent
001d93cf29
commit
221c88797b
@@ -17,11 +17,10 @@ func TestAggregatorsSuccess(t *testing.T) {
|
||||
f := func(inputMetrics []string, interval time.Duration, outputMetricsExpected, config, matchIdxsStrExpected string) {
|
||||
t.Helper()
|
||||
|
||||
var matchIdxs []uint32
|
||||
var tssOutput []prompb.TimeSeries
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
var matchIdxs []uint32
|
||||
var tssOutput []prompb.TimeSeries
|
||||
var tssOutputLock sync.Mutex
|
||||
|
||||
// Initialize Aggregators
|
||||
pushFunc := func(tss []prompb.TimeSeries) {
|
||||
tssOutputLock.Lock()
|
||||
@@ -32,31 +31,31 @@ func TestAggregatorsSuccess(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("cannot initialize aggregators: %s", err)
|
||||
}
|
||||
offsetMsecs := time.Now().UnixMilli()
|
||||
for _, metrics := range inputMetrics {
|
||||
// Push the inputMetrics to Aggregators
|
||||
offsetMsecs := time.Now().UnixMilli()
|
||||
tssInput := prometheus.MustParsePromMetrics(metrics, offsetMsecs)
|
||||
matchIdxs = append(matchIdxs, a.Push(tssInput, nil)...)
|
||||
time.Sleep(interval)
|
||||
time.Sleep(interval + time.Millisecond) // shift by 1ms from flush border to avoid flaky tests
|
||||
offsetMsecs += interval.Milliseconds()
|
||||
}
|
||||
|
||||
a.MustStop()
|
||||
|
||||
// Verify matchIdxs equals to matchIdxsExpected
|
||||
matchIdxsStr := ""
|
||||
for _, v := range matchIdxs {
|
||||
matchIdxsStr += strconv.Itoa(int(v))
|
||||
}
|
||||
if matchIdxsStr != matchIdxsStrExpected {
|
||||
t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected)
|
||||
}
|
||||
|
||||
// Verify the tssOutput contains the expected metrics
|
||||
outputMetrics := timeSeriessToString(tssOutput)
|
||||
if outputMetrics != outputMetricsExpected {
|
||||
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
|
||||
}
|
||||
})
|
||||
|
||||
// Verify matchIdxs equals to matchIdxsExpected
|
||||
matchIdxsStr := ""
|
||||
for _, v := range matchIdxs {
|
||||
matchIdxsStr += strconv.Itoa(int(v))
|
||||
}
|
||||
if matchIdxsStr != matchIdxsStrExpected {
|
||||
t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected)
|
||||
}
|
||||
|
||||
// Verify the tssOutput contains the expected metrics
|
||||
outputMetrics := timeSeriessToString(tssOutput)
|
||||
if outputMetrics != outputMetricsExpected {
|
||||
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
|
||||
}
|
||||
}
|
||||
|
||||
// Empty config
|
||||
|
||||
Reference in New Issue
Block a user