mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
016701841a | ||
|
|
3d4c1848dd |
@@ -41,7 +41,7 @@ type dateMetricIDCache struct {
|
||||
|
||||
func newDateMetricIDCache() *dateMetricIDCache {
|
||||
c := dateMetricIDCache{
|
||||
rotationPeriod: timeutil.AddJitterToDuration(1 * time.Hour),
|
||||
rotationPeriod: timeutil.AddJitterToDuration(1 * time.Minute),
|
||||
stopCh: make(chan struct{}),
|
||||
rotationStoppedCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
@@ -2229,11 +2229,8 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow, hmPrev, hmC
|
||||
var ptw *partitionWrapper
|
||||
var idb *indexDB
|
||||
|
||||
hmPrevDate := hmPrev.hour / 24
|
||||
hmCurrDate := hmCurr.hour / 24
|
||||
nextDayMetricIDsCache := s.nextDayMetricIDs.Load()
|
||||
nextDayIDBID := nextDayMetricIDsCache.idbID
|
||||
nextDayMetricIDs := &nextDayMetricIDsCache.metricIDs
|
||||
ts := fasttime.UnixTimestamp()
|
||||
// Start pre-populating the next per-day inverted index during the last hour of the current day.
|
||||
// pMin linearly increases from 0 to 1 during the last hour of the day.
|
||||
@@ -2261,6 +2258,16 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow, hmPrev, hmC
|
||||
prevDate = date
|
||||
prevMetricID = metricID
|
||||
|
||||
// Slower path: check the dateMetricIDCache if the (date, metricID) pair
|
||||
// is already present in indexDB.
|
||||
if ptw == nil || !ptw.pt.HasTimestamp(r.Timestamp) {
|
||||
if ptw != nil {
|
||||
s.tb.PutPartition(ptw)
|
||||
}
|
||||
ptw = s.tb.MustGetPartition(r.Timestamp)
|
||||
idb = ptw.pt.idb
|
||||
}
|
||||
|
||||
if hmCurr.idbID == nextDayIDBID && pMin > 0 && hour == currentHour {
|
||||
// Gradually pre-populate per-day inverted index for the next day during the last hour of the current day.
|
||||
// This should reduce CPU usage spike and slowdown at the beginning of the next day
|
||||
@@ -2272,7 +2279,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow, hmPrev, hmC
|
||||
// handled separately in prefillNextIndexDB.
|
||||
// TODO(@rtm0): See if prefillNextIndexDB() logic can be moved here.
|
||||
p := float64(uint32(fastHashUint64(metricID))) / (1 << 32)
|
||||
if p < pMin && !nextDayMetricIDs.Has(metricID) {
|
||||
if p < pMin && !idb.dateMetricIDCache.Has(date+1, metricID) {
|
||||
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
|
||||
date: date + 1,
|
||||
tsid: &r.TSID,
|
||||
@@ -2282,26 +2289,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow, hmPrev, hmC
|
||||
}
|
||||
}
|
||||
|
||||
if date == hmCurrDate && hmCurr.m.Has(metricID) {
|
||||
// Fast path: the metricID is in the current hour cache.
|
||||
// This means the metricID has been already added to per-day inverted index.
|
||||
continue
|
||||
}
|
||||
|
||||
if date == hmPrevDate && hmPrev.m.Has(metricID) {
|
||||
// Fast path: the metricID is already registered for its day on the previous hour.
|
||||
continue
|
||||
}
|
||||
|
||||
// Slower path: check the dateMetricIDCache if the (date, metricID) pair
|
||||
// is already present in indexDB.
|
||||
if ptw == nil || !ptw.pt.HasTimestamp(r.Timestamp) {
|
||||
if ptw != nil {
|
||||
s.tb.PutPartition(ptw)
|
||||
}
|
||||
ptw = s.tb.MustGetPartition(r.Timestamp)
|
||||
idb = ptw.pt.idb
|
||||
}
|
||||
// TODO(@rtm0): indexDB.dateMetricIDCache should not be used directly
|
||||
// since its purpose is to optimize is.hasDateMetricID(). See if this
|
||||
// function could be changed so that it does not rely on this cache.
|
||||
|
||||
@@ -524,11 +524,23 @@ func TestStorageAddRows_nextDayIndexPrefill(t *testing.T) {
|
||||
t.Fatalf("unexpected metric id count for next day: got %d, want > %d", got45min, got30min)
|
||||
}
|
||||
|
||||
time.Sleep(14 * time.Minute) // 2000-01-01T23:59:00Z
|
||||
mrs5 := testGenerateMetricRowsWithPrefix(rng, numSeries, "metric5", TimeRange{
|
||||
MinTimestamp: time.Now().Add(-15 * time.Minute).UnixMilli(),
|
||||
MaxTimestamp: time.Now().UnixMilli(),
|
||||
})
|
||||
s.AddRows(mrs5, defaultPrecisionBits)
|
||||
s.DebugFlush()
|
||||
got59min := countMetricIDs(t, s, "metric5", nextDay)
|
||||
if got59min < got45min {
|
||||
t.Fatalf("unexpected metric id count for next day: got %d, want > %d", got59min, got45min)
|
||||
}
|
||||
|
||||
// Sleep until the next day
|
||||
// do not close the storage: closing it resets the dateMetricID cache, which results in slow inserts
|
||||
// since dateMetricID cache is not persisted on-disk
|
||||
|
||||
time.Sleep(35 * time.Minute) // 2000-01-02T00:20:00Z
|
||||
time.Sleep(2 * time.Minute) // 2000-01-02T00:01:00Z
|
||||
synctest.Wait()
|
||||
|
||||
// Ingest data for the next day, it must hit dateMetricID cache and
|
||||
@@ -536,12 +548,12 @@ func TestStorageAddRows_nextDayIndexPrefill(t *testing.T) {
|
||||
var m Metrics
|
||||
s.UpdateMetrics(&m)
|
||||
currDaySlowInserts := m.SlowPerDayIndexInserts
|
||||
mrs3NextDay := testGenerateMetricRowsWithPrefix(rng, numSeries, "metric3", TimeRange{
|
||||
MinTimestamp: time.Now().Add(-5 * time.Minute).UnixMilli(),
|
||||
mrs5NextDay := testGenerateMetricRowsWithPrefix(rng, numSeries, "metric5", TimeRange{
|
||||
MinTimestamp: time.Now().Add(-1 * time.Minute).UnixMilli(),
|
||||
MaxTimestamp: time.Now().UnixMilli(),
|
||||
})
|
||||
|
||||
s.AddRows(mrs3NextDay, defaultPrecisionBits)
|
||||
s.AddRows(mrs5NextDay, defaultPrecisionBits)
|
||||
s.DebugFlush()
|
||||
m.Reset()
|
||||
s.UpdateMetrics(&m)
|
||||
|
||||
Reference in New Issue
Block a user