mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
lib/storage: refactor storage synctests
Exctract repeated code from nextDayMetricIDs synctests into separate funcs to make the code more readable. The change was originally introduced in https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10704 and was extracted into a separate PR to keep the original change simple.
This commit is contained in:
@@ -554,36 +554,72 @@ func TestStorageAddRows_nextDayIndexPrefill(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestStorageMustLoadNextDayMetricIDs(t *testing.T) {
|
||||
defer testRemoveAll(t)
|
||||
|
||||
assertNextDayMetricIDs := func(t *testing.T, gotNextDayMetricIDs *nextDayMetricIDs, wantIDBID, wantDate uint64, wantLen int) {
|
||||
t.Helper()
|
||||
|
||||
if got, want := gotNextDayMetricIDs.idbID, wantIDBID; got != want {
|
||||
t.Fatalf("unexpected nextDayMetricIDs idb id: got %d, want %d", got, want)
|
||||
}
|
||||
if got, want := gotNextDayMetricIDs.date, wantDate; got != want {
|
||||
t.Fatalf("unexpected nextDayMetricIDs date: got %d, want %d", got, want)
|
||||
}
|
||||
if got, want := gotNextDayMetricIDs.metricIDs.Len(), wantLen; got != want {
|
||||
t.Fatalf("unexpected nextDayMetricIDs count: got %d, want %d", got, want)
|
||||
}
|
||||
func assertPendingNextDayMetricIDsEmpty(t *testing.T, s *Storage) {
|
||||
t.Helper()
|
||||
got := s.pendingNextDayMetricIDs.Len()
|
||||
if got != 0 {
|
||||
t.Fatalf("unexpected s.pendingNextDayMetricIDs count: got %d, want 0", got)
|
||||
}
|
||||
}
|
||||
|
||||
func assertPendingNextDayMetricIDsNotEmpty(t *testing.T, s *Storage) {
|
||||
t.Helper()
|
||||
got := s.pendingNextDayMetricIDs.Len()
|
||||
if got == 0 {
|
||||
t.Fatalf("unexpected s.pendingNextDayMetricIDs count: got 0, want > 0")
|
||||
}
|
||||
}
|
||||
|
||||
func assertNextDayMetricIDs(t *testing.T, s *Storage, wantIDBID, wantDate uint64, wantLen int) {
|
||||
t.Helper()
|
||||
|
||||
gotNextDayMetricIDs := s.nextDayMetricIDs.Load()
|
||||
|
||||
if got, want := gotNextDayMetricIDs.idbID, wantIDBID; got != want {
|
||||
t.Fatalf("unexpected nextDayMetricIDs idb id: got %d, want %d", got, want)
|
||||
}
|
||||
if got, want := gotNextDayMetricIDs.date, wantDate; got != want {
|
||||
t.Fatalf("unexpected nextDayMetricIDs date: got %d, want %d", got, want)
|
||||
}
|
||||
if got, want := gotNextDayMetricIDs.metricIDs.Len(), wantLen; got != want {
|
||||
t.Fatalf("unexpected nextDayMetricIDs count: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func sleepUntil(t *testing.T, year int, month time.Month, day, hour, min, sec, nsec int) {
|
||||
t.Helper()
|
||||
future := time.Date(year, month, day, hour, min, sec, nsec, time.UTC)
|
||||
now := time.Now().UTC()
|
||||
d := future.Sub(now)
|
||||
if d <= 0 {
|
||||
t.Fatalf("future time %v is before now time %v", future, now)
|
||||
}
|
||||
time.Sleep(d)
|
||||
}
|
||||
|
||||
// TestStorageNextDayMetricIDs_updatedAsynchronously verifies that the metricIDs
|
||||
// registered in per-day index during the next day prefill do not appear in
|
||||
// Storage.nextDayMetricIDs right away but only after some time (seconds).
|
||||
func TestStorageNextDayMetricIDs_updatedAsynchronously(t *testing.T) {
|
||||
defer testRemoveAll(t)
|
||||
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
// synctest starts at 2000-01-01T00:00:00Z.
|
||||
// Advance time to 23:30 to enable next day prefill.
|
||||
time.Sleep(23*time.Hour + 30*time.Minute) // 2000-01-01T23:30:00Z
|
||||
date := uint64(time.Now().UnixMilli()) / msecPerDay
|
||||
|
||||
// Advance time to the last hour of the day to enable next day index
|
||||
// prefill.
|
||||
sleepUntil(t, 2000, 1, 1, 23, 30, 30, 0)
|
||||
|
||||
const numSeries = 1000
|
||||
s := MustOpenStorage(t.Name(), OpenOptions{})
|
||||
defer s.MustClose()
|
||||
ptw := s.tb.MustGetPartition(time.Now().UnixMilli())
|
||||
idbID := ptw.pt.idb.id
|
||||
s.tb.PutPartition(ptw)
|
||||
|
||||
date := uint64(time.Now().UnixMilli()) / msecPerDay
|
||||
rng := rand.New(rand.NewSource(1))
|
||||
|
||||
// Insert some data.
|
||||
mrs := testGenerateMetricRowsWithPrefix(rng, numSeries, "metric", TimeRange{
|
||||
MinTimestamp: time.Now().Add(-15 * time.Minute).UnixMilli(),
|
||||
MaxTimestamp: time.Now().UnixMilli(),
|
||||
@@ -591,50 +627,159 @@ func TestStorageMustLoadNextDayMetricIDs(t *testing.T) {
|
||||
s.AddRows(mrs, defaultPrecisionBits)
|
||||
s.DebugFlush()
|
||||
|
||||
// The next day metricIDs must appear in pendingNextDayMetricIDs cache.
|
||||
if s.pendingNextDayMetricIDs.Len() == 0 {
|
||||
t.Fatalf("unexpected pendingNextDayMetricIDs count: got 0, want > 0")
|
||||
}
|
||||
// Immediately after data ingestion, the next day metricIDs must appear
|
||||
// in s.pendingNextDayMetricIDs but not in the s.nextDayMetricIDs. The
|
||||
// pending metrics will be moved to it by a background task a few seconds
|
||||
// later.
|
||||
assertPendingNextDayMetricIDsNotEmpty(t, s)
|
||||
assertNextDayMetricIDs(t, s, idbID, date, 0)
|
||||
numNextDayMetricIDs := s.pendingNextDayMetricIDs.Len()
|
||||
// But not in the nextDayMetricIDs cache. The pending metrics will be
|
||||
// moved to it by a bg process a few seconds later.
|
||||
assertNextDayMetricIDs(t, s.nextDayMetricIDs.Load(), idbID, date, 0)
|
||||
|
||||
// Wait for nextDayMetricIDs cache to populate.
|
||||
time.Sleep(15 * time.Second)
|
||||
synctest.Wait()
|
||||
|
||||
// At this point, pending metricIDs must have been moved to
|
||||
// nextDayMetricIDs cache and the pendingNextDayMetricIDs must be empty.
|
||||
if got := s.pendingNextDayMetricIDs.Len(); got != 0 {
|
||||
t.Fatalf("unexpected pendingNextDayMetricIDs count: got %d, want 0", got)
|
||||
}
|
||||
// While the actual cache, must contain the exact number of metricIDs
|
||||
// Wait for nextDayMetricIDs to populate. Pending metricIDs must be
|
||||
// moved to nextDayMetricIDs after which pendingNextDayMetricIDs must be
|
||||
// empty. nextDayMetricIDs must contain the exact number of metricIDs
|
||||
// that once were pending.
|
||||
assertNextDayMetricIDs(t, s.nextDayMetricIDs.Load(), idbID, date, numNextDayMetricIDs)
|
||||
time.Sleep(15 * time.Second)
|
||||
assertPendingNextDayMetricIDsEmpty(t, s)
|
||||
assertNextDayMetricIDs(t, s, idbID, date, numNextDayMetricIDs)
|
||||
})
|
||||
}
|
||||
|
||||
// Close the storage to persist nextDayMetricIDs cache to a file.
|
||||
s.MustClose()
|
||||
// Open the storage again to ensure that the cache is populated
|
||||
// correctly.
|
||||
s = MustOpenStorage(t.Name(), OpenOptions{})
|
||||
if got := s.pendingNextDayMetricIDs.Len(); got != 0 {
|
||||
t.Fatalf("unexpected pendingNextDayMetricIDs count: got %d, want 0", got)
|
||||
}
|
||||
assertNextDayMetricIDs(t, s.nextDayMetricIDs.Load(), idbID, date, numNextDayMetricIDs)
|
||||
s.MustClose()
|
||||
// TestStorageNextDayMetricIDs_loadFromStoreToFile verifies the logic of loading
|
||||
// Storage.nextDayMetricIDs from file during the storage start-up and storing it
|
||||
// back to a file during the storage shutdown.
|
||||
func TestStorageNextDayMetricIDs_loadFromStoreToFile(t *testing.T) {
|
||||
defer testRemoveAll(t)
|
||||
|
||||
// Advance the time by one day and open the storage.
|
||||
// Since the current date and the date in the cache file do not match,
|
||||
// nothing will be loaded into cache.
|
||||
time.Sleep(24 * time.Hour)
|
||||
date = uint64(time.Now().UnixMilli()) / msecPerDay
|
||||
s = MustOpenStorage(t.Name(), OpenOptions{})
|
||||
if got := s.pendingNextDayMetricIDs.Len(); got != 0 {
|
||||
t.Fatalf("unexpected pendingNextDayMetricIDs count: got %d, want 0", got)
|
||||
}
|
||||
assertNextDayMetricIDs(t, s.nextDayMetricIDs.Load(), idbID, date, 0)
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
// synctest starts at 2000-01-01T00:00:00Z.
|
||||
|
||||
// Advance time to the last hour of the day to enable next day index
|
||||
// prefill.
|
||||
sleepUntil(t, 2000, 1, 1, 23, 30, 30, 0)
|
||||
|
||||
const numSeries = 1000
|
||||
s := MustOpenStorage(t.Name(), OpenOptions{})
|
||||
ptw := s.tb.MustGetPartition(time.Now().UnixMilli())
|
||||
idbID := ptw.pt.idb.id
|
||||
s.tb.PutPartition(ptw)
|
||||
date := uint64(time.Now().UnixMilli()) / msecPerDay
|
||||
|
||||
// Insert some data. Next day metricIDs must appear in
|
||||
// Storage.nextDayMetricIDs.
|
||||
rng := rand.New(rand.NewSource(1))
|
||||
mrs := testGenerateMetricRowsWithPrefix(rng, numSeries, "metric", TimeRange{
|
||||
MinTimestamp: time.Now().Add(-15 * time.Minute).UnixMilli(),
|
||||
MaxTimestamp: time.Now().UnixMilli(),
|
||||
})
|
||||
s.AddRows(mrs, defaultPrecisionBits)
|
||||
s.DebugFlush()
|
||||
numNextDayMetricIDs := s.pendingNextDayMetricIDs.Len()
|
||||
time.Sleep(15 * time.Second)
|
||||
assertNextDayMetricIDs(t, s, idbID, date, numNextDayMetricIDs)
|
||||
|
||||
// Close the storage to persist nextDayMetricIDs to a file and then open
|
||||
// it again to ensure that nextDayMetricIDs is populated correctly.
|
||||
s.MustClose()
|
||||
s = MustOpenStorage(t.Name(), OpenOptions{})
|
||||
assertNextDayMetricIDs(t, s, idbID, date, numNextDayMetricIDs)
|
||||
|
||||
// Close storage and open it again at the last moment of the day.
|
||||
// nextDayMetricIDs must still be populated.
|
||||
s.MustClose()
|
||||
sleepUntil(t, 2000, 1, 1, 23, 59, 59, 999_999_999)
|
||||
s = MustOpenStorage(t.Name(), OpenOptions{})
|
||||
assertNextDayMetricIDs(t, s, idbID, date, numNextDayMetricIDs)
|
||||
|
||||
// Close storage and open it again at the first second of the next day.
|
||||
// nextDayMetricIDs must be reset.
|
||||
s.MustClose()
|
||||
sleepUntil(t, 2000, 1, 2, 0, 0, 0, 0)
|
||||
s = MustOpenStorage(t.Name(), OpenOptions{})
|
||||
assertNextDayMetricIDs(t, s, idbID, date+1, 0)
|
||||
|
||||
// Close the storage and open it again at the last hour.
|
||||
// Insert data again to populate nextDayMetricIDs.
|
||||
s.MustClose()
|
||||
sleepUntil(t, 2000, 1, 2, 23, 30, 0, 0)
|
||||
s = MustOpenStorage(t.Name(), OpenOptions{})
|
||||
mrs = testGenerateMetricRowsWithPrefix(rng, numSeries, "metric", TimeRange{
|
||||
MinTimestamp: time.Now().Add(-15 * time.Minute).UnixMilli(),
|
||||
MaxTimestamp: time.Now().UnixMilli(),
|
||||
})
|
||||
s.AddRows(mrs, defaultPrecisionBits)
|
||||
s.DebugFlush()
|
||||
numNextDayMetricIDs = s.pendingNextDayMetricIDs.Len()
|
||||
time.Sleep(15 * time.Second)
|
||||
assertNextDayMetricIDs(t, s, idbID, date+1, numNextDayMetricIDs)
|
||||
|
||||
// Close storage and open it again 24h later.
|
||||
// While it is the last hour of the day, the current date and the date
|
||||
// in nextDayMetricIDs do not match and therefore nextDayMetricIDs must
|
||||
// not be populated.
|
||||
s.MustClose()
|
||||
sleepUntil(t, 2000, 1, 3, 23, 30, 0, 0)
|
||||
s = MustOpenStorage(t.Name(), OpenOptions{})
|
||||
assertNextDayMetricIDs(t, s, idbID, date+2, 0)
|
||||
|
||||
// Close the storage to conclude the test.
|
||||
s.MustClose()
|
||||
})
|
||||
}
|
||||
|
||||
// TestStorageNextDayMetricIDs_loadFromStoreToFile verifies the logic of updating
|
||||
// Storage.nextDayMetricIDs at runtime.
|
||||
func TestStorageNextDayMetricIDs_update(t *testing.T) {
|
||||
defer testRemoveAll(t)
|
||||
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
// synctest starts at 2000-01-01T00:00:00Z.
|
||||
|
||||
// Advance time to just before the last hour of the day.
|
||||
sleepUntil(t, 2000, 1, 1, 22, 59, 59, 999_999_999)
|
||||
|
||||
const numSeries = 1000
|
||||
s := MustOpenStorage(t.Name(), OpenOptions{})
|
||||
defer s.MustClose()
|
||||
ptw := s.tb.MustGetPartition(time.Now().UnixMilli())
|
||||
idbID := ptw.pt.idb.id
|
||||
s.tb.PutPartition(ptw)
|
||||
date := uint64(time.Now().UnixMilli()) / msecPerDay
|
||||
rng := rand.New(rand.NewSource(1))
|
||||
|
||||
// The next day index prefill must not start before the last hour of the
|
||||
// day. Therefore, nextDayMetricIDs must be empty.
|
||||
mrs := testGenerateMetricRowsWithPrefix(rng, numSeries, "metric", TimeRange{
|
||||
MinTimestamp: time.Now().Add(-15 * time.Minute).UnixMilli(),
|
||||
MaxTimestamp: time.Now().UnixMilli(),
|
||||
})
|
||||
s.AddRows(mrs, defaultPrecisionBits)
|
||||
s.DebugFlush()
|
||||
assertNextDayMetricIDs(t, s, idbID, date, 0)
|
||||
|
||||
// Advance time to the middle of the last hour of the day to enable next
|
||||
// day prefill and insert some data. Next day metricIDs must appear in
|
||||
// Storage.nextDayMetricIDs.
|
||||
sleepUntil(t, 2000, 1, 1, 23, 30, 0, 0)
|
||||
mrs = testGenerateMetricRowsWithPrefix(rng, numSeries, "metric", TimeRange{
|
||||
MinTimestamp: time.Now().Add(-15 * time.Minute).UnixMilli(),
|
||||
MaxTimestamp: time.Now().UnixMilli(),
|
||||
})
|
||||
s.AddRows(mrs, defaultPrecisionBits)
|
||||
s.DebugFlush()
|
||||
numNextDayMetricIDs := s.pendingNextDayMetricIDs.Len()
|
||||
time.Sleep(15 * time.Second)
|
||||
assertNextDayMetricIDs(t, s, idbID, date, numNextDayMetricIDs)
|
||||
|
||||
// Advance the time to the end of the last hour of the current day and
|
||||
// confirm that nextDayMetricIDs are not reset during this hour.
|
||||
sleepUntil(t, 2000, 1, 1, 23, 59, 59, 999_999_999)
|
||||
assertNextDayMetricIDs(t, s, idbID, date, numNextDayMetricIDs)
|
||||
|
||||
// Advance the time to the beginning of the first hour of the next day and
|
||||
// confirm that nextDayMetricIDs is reset.
|
||||
sleepUntil(t, 2000, 1, 2, 0, 0, 30, 0)
|
||||
assertNextDayMetricIDs(t, s, idbID, date+1, 0)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user