lib/storage: reuse nextDayMetricIDs during the first hour of the day (#10704)

At 00:00 UTC the ingested samples start to have timestamps for the new
day (in VictoriaMetrics, the ingested samples are always recent). Even though there was a
next-day prefill of the per-day index during the last hour of the day,
some performance degradation is still possible.

For example, in https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10698
it is manifested as `vminsert-to-vmstorage connection saturation` peaks
right after midnight.

A possible hypothesis for why this is happening: at midnight,
currHourMetricIDs is empty and prevHourMetricIDs cannot be used because
it holds metricIDs for the previous day. So the ingestion logic hits
dateMetricIDsCache which may not have the metricID in its read-only
buffer and therefore should acquire a lock to check its prev read-only
buffer or read-write buffer. Which creates lock contention and therefore
raises ingestion request latency.

A solution to this could be re-using the nextDayMetricIDs during the
first hour of the day. During this time, it is equivalent to
currHourMetricIDs.

---------

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
Signed-off-by: Artem Fetishev <149964189+rtm0@users.noreply.github.com>
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
This commit is contained in:
Artem Fetishev
2026-04-09 16:33:42 +02:00
committed by GitHub
parent dfafd14767
commit 70b0115ea6
5 changed files with 151 additions and 17 deletions

View File

@@ -49,7 +49,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* BUGFIX: All VictoriaMetrics components: Fix an issue where `unsupported` metric metadata type was exposed for summaries and quantiles if a summary wasn't updated within a certain time window. See [metrics#120](https://github.com/VictoriaMetrics/metrics/issues/120) and [metrics#121](https://github.com/VictoriaMetrics/metrics/pull/121).
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): align request body buffering flags - `maxRequestBodySizeToRetry` and `requestBufferSize` to the same `16KB` value. Allow disabling request buffering by setting `requestBufferSize=0`. See [#10675](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10675)
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix `scrape_series_added` metric to update only on successful scrapes, aligning its behavior with Prometheus. See [#10653](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10653).
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): prevent partial responses from second-level vmselect nodes in [multi-level cluster setups](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multi-level-cluster-setup). Ensures response completeness and correctness, and avoids cache pollution in top-level vmselect. See [#10678](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10678)
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): prevent partial responses from second-level vmselect nodes in [multi-level cluster setups](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multi-level-cluster-setup). Ensures response completeness and correctness, and avoids cache pollution in top-level vmselect. See [#10678](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10678).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): Fix storage connection saturation spikes at 00:00 UTC and improve data ingestion when the storage is restarted during the first hour of the day. See [#10698](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10698).
## [v1.139.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.139.0)

View File

@@ -306,8 +306,8 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage {
s.pendingHourEntries = &uint64set.Set{}
// Load nextDayMetricIDs cache after the data table is opened since it
// requires the partition index to operate properly.
date := fasttime.UnixDate()
nextDayMetricIDs := s.mustLoadNextDayMetricIDs(date)
timestamp := fasttime.UnixTimestamp()
nextDayMetricIDs := s.mustLoadNextDayMetricIDs(timestamp)
s.nextDayMetricIDs.Store(nextDayMetricIDs)
s.pendingNextDayMetricIDs = &uint64set.Set{}
@@ -793,12 +793,12 @@ func (s *Storage) nextDayMetricIDsUpdater() {
for {
select {
case <-s.stopCh:
date := fasttime.UnixDate()
s.updateNextDayMetricIDs(date)
timestamp := fasttime.UnixTimestamp()
s.updateNextDayMetricIDs(timestamp)
return
case <-ticker.C:
date := fasttime.UnixDate()
s.updateNextDayMetricIDs(date)
timestamp := fasttime.UnixTimestamp()
s.updateNextDayMetricIDs(timestamp)
}
}
}
@@ -859,7 +859,15 @@ func (s *Storage) MustClose() {
}
}
func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *nextDayMetricIDs {
func (s *Storage) mustLoadNextDayMetricIDs(timestamp uint64) *nextDayMetricIDs {
date := timestamp / (24 * 3600)
if isFirstHourOfDay(timestamp) {
// If this is the first hour of the day, allow loading nextDayMetricIDs
// collected during the next day index prefill during the last hour of
// the day before to speed up data ingestion. See updatePerDateData()
// and updateNextDayMetricIDs().
date--
}
ptw := s.tb.MustGetPartition(int64(date+1) * msecPerDay)
nextDayIDBID := ptw.pt.idb.id
s.tb.PutPartition(ptw)
@@ -1976,6 +1984,11 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
seriesRepopulated++
slowInsertsCount++
}
// TODO(rtm0): Possibly a bug: pending entry is added and if
// 1) it happens to be sync'ed to currHourMetricIDs before
// updatePerDateData() is executed AND
// 2) that metricID hasn't been registered for that day
// the metric will be lost.
addToPendingHourEntries(hour, lTSID.TSID.MetricID)
continue
}
@@ -2230,12 +2243,14 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow, hmPrev, hmC
hmCurrDate := hmCurr.hour / 24
nextDayMetricIDsCache := s.nextDayMetricIDs.Load()
nextDayIDBID := nextDayMetricIDsCache.idbID
nextDayDate := nextDayMetricIDsCache.date + 1
nextDayMetricIDs := &nextDayMetricIDsCache.metricIDs
ts := fasttime.UnixTimestamp()
// Start pre-populating the next per-day inverted index during the last hour of the current day.
// pMin linearly increases from 0 to 1 during the last hour of the day.
pMin := (float64(ts%(3600*24)) / 3600) - 23
currentHour := ts / 3600
firstHourOfDay := isFirstHourOfDay(ts)
type pendingDateMetricID struct {
date uint64
tsid *TSID
@@ -2279,14 +2294,26 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow, hmPrev, hmC
}
}
if firstHourOfDay && date == nextDayDate && nextDayMetricIDs.Has(metricID) {
// Fast path: the metricID has already been added to the per-day
// index during the next day prefill.
//
// At 00:00 UTC, nextDayMetricIDs become equivalent to
// currHourMetricIDs. Use it during the first hour of the day
// since currHourMetricIDs is not populated yet.
continue
}
if date == hmCurrDate && hmCurr.m.Has(metricID) {
// Fast path: the metricID is in the current hour cache.
// This means the metricID has been already added to per-day inverted index.
// This means the metricID has been already added to per-day
// inverted index.
continue
}
if date == hmPrevDate && hmPrev.m.Has(metricID) {
// Fast path: the metricID is already registered for its day on the previous hour.
// Fast path: the metricID is already registered for its day on the
// previous hour.
continue
}
@@ -2400,12 +2427,29 @@ func fastHashUint64(x uint64) uint64 {
// the last hour of the day when the per-day index is prefilled with the next
day entries (see updatePerDateData()).
type nextDayMetricIDs struct {
idbID uint64
date uint64
// idbID is the id of the indexDB that stores the next day (date+1) metrics.
idbID uint64
// date is the date (usually the current date) relatively to which the next
// day is taken. So next day is date+1.
date uint64
// metricIDs is the set of metricIDs for the next day.
metricIDs uint64set.Set
}
func (s *Storage) updateNextDayMetricIDs(date uint64) {
// updateNextDayMetricIDs updates s.nextDayMetricIDs with the metricIDs for the
// date that follows the timestamp date. For example, if timestamp corresponds
// to 2000-01-01, then s.nextDayMetricIDs holds the metricIDs for 2000-01-02.
//
// The s.nextDayMetricIDs.date and timestamp date must match, otherwise
// s.nextDayMetricIDs will be reset. The only exception is the first hour of the
// next day when s.nextDayMetricIDs is neither updated with new metricIDs nor
// reset. During this time s.nextDayMetricIDs is used in place of
// s.currHourMetricIDs to speed up per-day index creation.
// See updatePerDateData().
func (s *Storage) updateNextDayMetricIDs(timestamp uint64) {
date := timestamp / (3600 * 24)
ptw := s.tb.MustGetPartition(int64(date+1) * msecPerDay)
nextDayIDBID := ptw.pt.idb.id
s.tb.PutPartition(ptw)
@@ -2414,6 +2458,20 @@ func (s *Storage) updateNextDayMetricIDs(date uint64) {
pendingMetricIDs := s.pendingNextDayMetricIDs
s.pendingNextDayMetricIDs = &uint64set.Set{}
s.pendingNextDayMetricIDsLock.Unlock()
if e.date+1 == date && isFirstHourOfDay(timestamp) {
// Do not reset nextDayMetricIDs during the first hour of the next day
// to speed up the creation of per-day indexes in updatePerDateData().
//
// updatePerDateData() relies on currHourMetricIDs and
// prevHourMetricIDs contents to decide whether the per-day index
// entries have already been created. At exactly 00:00 UTC (and some
// time after it) currHourMetricIDs is empty and prevHourMetricIDs
// cannot be used because it contains metricIDs for the previous
// day.
return
}
// Not comparing indexDB IDs because different idb ids imply different date.
if pendingMetricIDs.Len() == 0 && e.date == date {
// Fast path: nothing to update.
@@ -2434,7 +2492,8 @@ func (s *Storage) updateNextDayMetricIDs(date uint64) {
pendingMetricIDs.Union(&e.metricIDs)
} else {
// Do not add pendingMetricIDs from the previous day to the current day,
// since this may result in missing registration of the metricIDs in the per-day inverted index.
// since this may result in missing registration of the metricIDs in the
// per-day inverted index.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309
pendingMetricIDs = &uint64set.Set{}
}

View File

@@ -692,10 +692,26 @@ func TestStorageNextDayMetricIDs_loadFromStoreToFile(t *testing.T) {
assertNextDayMetricIDs(t, s, idbID, date, numNextDayMetricIDs)
// Close storage and open it again at the first second of the next day.
// nextDayMetricIDs must be reset.
// nextDayMetricIDs must still be populated because nextDayMetricIDs
// needs to be preserved during the first hour of the day in order to
// speed up data ingestion.
s.MustClose()
sleepUntil(t, 2000, 1, 2, 0, 0, 0, 0)
s = MustOpenStorage(t.Name(), OpenOptions{})
assertNextDayMetricIDs(t, s, idbID, date, numNextDayMetricIDs)
// Close storage and open it again at the last moment of the first hour
// of the next day. nextDayMetricIDs must still be populated.
s.MustClose()
sleepUntil(t, 2000, 1, 2, 0, 59, 59, 999_999_999)
s = MustOpenStorage(t.Name(), OpenOptions{})
assertNextDayMetricIDs(t, s, idbID, date, numNextDayMetricIDs)
// Close storage and open it again at the first second of the second
// hour of the next day. nextDayMetricIDs must be reset.
s.MustClose()
sleepUntil(t, 2000, 1, 2, 1, 0, 0, 0)
s = MustOpenStorage(t.Name(), OpenOptions{})
assertNextDayMetricIDs(t, s, idbID, date+1, 0)
// Close the storage and open it again at the last hour.
@@ -722,6 +738,27 @@ func TestStorageNextDayMetricIDs_loadFromStoreToFile(t *testing.T) {
s = MustOpenStorage(t.Name(), OpenOptions{})
assertNextDayMetricIDs(t, s, idbID, date+2, 0)
// Ingest some data and confirm nextDayMetricIDs is not empty.
mrs = testGenerateMetricRowsWithPrefix(rng, numSeries, "metric", TimeRange{
MinTimestamp: time.Now().Add(-15 * time.Minute).UnixMilli(),
MaxTimestamp: time.Now().UnixMilli(),
})
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
numNextDayMetricIDs = s.pendingNextDayMetricIDs.Len()
time.Sleep(15 * time.Second)
assertNextDayMetricIDs(t, s, idbID, date+2, numNextDayMetricIDs)
// Close storage and open it again at the first hour of the day after
// tomorrow. While it is the first hour of the day, the metricIDs in
// nextDayMetricIDs are not from yesterday but from the day before
// yesterday, and therefore nextDayMetricIDs must not be populated, but
// its date must still be the day before the current date.
s.MustClose()
sleepUntil(t, 2000, 1, 5, 0, 30, 0, 0)
s = MustOpenStorage(t.Name(), OpenOptions{})
assertNextDayMetricIDs(t, s, idbID, date+3, 0)
// Close the storage to conclude the test.
s.MustClose()
})
@@ -776,9 +813,14 @@ func TestStorageNextDayMetricIDs_update(t *testing.T) {
sleepUntil(t, 2000, 1, 1, 23, 59, 59, 999_999_999)
assertNextDayMetricIDs(t, s, idbID, date, numNextDayMetricIDs)
// Advance the time to the beginning of the first hour of the next day and
// Advance the time to the end of the first hour of the next day and
// confirm that nextDayMetricIDs are not reset during this hour.
sleepUntil(t, 2000, 1, 2, 0, 59, 59, 999_999_999)
assertNextDayMetricIDs(t, s, idbID, date, numNextDayMetricIDs)
// Advance the time to the beginning of the second hour of the next day and
// confirm that nextDayMetricIDs is reset.
sleepUntil(t, 2000, 1, 2, 0, 0, 30, 0)
sleepUntil(t, 2000, 1, 2, 1, 0, 30, 0)
assertNextDayMetricIDs(t, s, idbID, date+1, 0)
})
}

View File

@@ -28,6 +28,12 @@ func timestampFromTime(t time.Time) int64 {
return t.UnixNano() / 1e6
}
// Returns true if the timestamp (must be in seconds) is within the first hour
// of the day.
func isFirstHourOfDay(timestamp uint64) bool {
return (timestamp/3600)%24 == 0
}
// TimeRange is time range.
type TimeRange struct {
MinTimestamp int64

View File

@@ -203,3 +203,29 @@ func TestTimeRange_fromPartitionTimestamp(t *testing.T) {
MaxTimestamp: time.Date(2025, 3, 31, 23, 59, 59, 999_000_000, time.UTC).UnixMilli(),
})
}
func TestIsFirstHourOfDay(t *testing.T) {
f := func(tt time.Time, want bool) {
got := isFirstHourOfDay(uint64(tt.Unix()))
if got != want {
t.Fatalf("isFirstHourOfDay(%v) unexpected result: got %t, want %t", tt, got, want)
}
}
firstHourOfDay := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
f(firstHourOfDay, true)
firstHourOfDay = time.Date(2000, 1, 1, 0, 12, 34, 56789, time.UTC)
f(firstHourOfDay, true)
firstHourOfDay = time.Date(2000, 1, 1, 0, 59, 59, 999_999_999, time.UTC)
f(firstHourOfDay, true)
secondHourOfDay := time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC)
f(secondHourOfDay, false)
sixthHourOfDay := time.Date(2000, 1, 1, 5, 0, 0, 0, time.UTC)
f(sixthHourOfDay, false)
lastHourOfDay := time.Date(2000, 1, 1, 23, 59, 59, 999_999_999, time.UTC)
f(lastHourOfDay, false)
}