mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-16 15:23:05 +03:00
lib/storage: refactoring - simplify nextDayMetricIDs data structure (#10058)
The data structure used for holding the nextDayMetricIDs is too complex and can be simplified (flattened). Follow up for: #9983 Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
@@ -117,7 +117,7 @@ type Storage struct {
|
||||
// This is needed in order to remove CPU usage spikes at 00:00 UTC
|
||||
// due to creation of per-day inverted index for active time series.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 for details.
|
||||
nextDayMetricIDs atomic.Pointer[byDateMetricIDEntry]
|
||||
nextDayMetricIDs atomic.Pointer[nextDayMetricIDs]
|
||||
|
||||
// Pending MetricID values to be added to currHourMetricIDs.
|
||||
pendingHourEntriesLock sync.Mutex
|
||||
@@ -739,7 +739,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
||||
m.HourMetricIDCacheSizeBytes += hmCurr.m.SizeBytes()
|
||||
m.HourMetricIDCacheSizeBytes += hmPrev.m.SizeBytes()
|
||||
|
||||
nextDayMetricIDs := &s.nextDayMetricIDs.Load().v
|
||||
nextDayMetricIDs := &s.nextDayMetricIDs.Load().metricIDs
|
||||
m.NextDayMetricIDCacheSize += uint64(nextDayMetricIDs.Len())
|
||||
m.NextDayMetricIDCacheSizeBytes += nextDayMetricIDs.SizeBytes()
|
||||
|
||||
@@ -1031,12 +1031,10 @@ func (s *Storage) MustClose() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Storage) mustLoadNextDayMetricIDs(generation, date uint64) *byDateMetricIDEntry {
|
||||
e := &byDateMetricIDEntry{
|
||||
k: generationDateKey{
|
||||
generation: generation,
|
||||
date: date,
|
||||
},
|
||||
func (s *Storage) mustLoadNextDayMetricIDs(generation, date uint64) *nextDayMetricIDs {
|
||||
e := &nextDayMetricIDs{
|
||||
generation: generation,
|
||||
date: date,
|
||||
}
|
||||
path := filepath.Join(s.cachePath, nextDayMetricIDsFilename)
|
||||
if !fs.IsPathExist(path) {
|
||||
@@ -1075,7 +1073,7 @@ func (s *Storage) mustLoadNextDayMetricIDs(generation, date uint64) *byDateMetri
|
||||
logger.Infof("discarding %s because non-empty tail left; len(tail)=%d", path, len(tail))
|
||||
return e
|
||||
}
|
||||
e.v = *m
|
||||
e.metricIDs = *m
|
||||
return e
|
||||
}
|
||||
|
||||
@@ -1118,16 +1116,16 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
|
||||
return hm
|
||||
}
|
||||
|
||||
func (s *Storage) mustSaveNextDayMetricIDs(e *byDateMetricIDEntry) {
|
||||
func (s *Storage) mustSaveNextDayMetricIDs(e *nextDayMetricIDs) {
|
||||
path := filepath.Join(s.cachePath, nextDayMetricIDsFilename)
|
||||
dst := make([]byte, 0, e.v.Len()*8+16)
|
||||
dst := make([]byte, 0, e.metricIDs.Len()*8+16)
|
||||
|
||||
// Marshal header
|
||||
dst = encoding.MarshalUint64(dst, e.k.generation)
|
||||
dst = encoding.MarshalUint64(dst, e.k.date)
|
||||
dst = encoding.MarshalUint64(dst, e.generation)
|
||||
dst = encoding.MarshalUint64(dst, e.date)
|
||||
|
||||
// Marshal e.v
|
||||
dst = marshalUint64Set(dst, &e.v)
|
||||
dst = marshalUint64Set(dst, &e.metricIDs)
|
||||
|
||||
fs.MustWriteSync(path, dst)
|
||||
}
|
||||
@@ -2320,7 +2318,7 @@ func (s *Storage) updatePerDateData(idb *indexDB, rows []rawRow, mrs []*MetricRo
|
||||
)
|
||||
|
||||
hmPrevDate := hmPrev.hour / 24
|
||||
nextDayMetricIDs := &s.nextDayMetricIDs.Load().v
|
||||
nextDayMetricIDs := &s.nextDayMetricIDs.Load().metricIDs
|
||||
ts := fasttime.UnixTimestamp()
|
||||
// Start pre-populating the next per-day inverted index during the last hour of the current day.
|
||||
// pMin linearly increases from 0 to 1 during the last hour of the day.
|
||||
@@ -2661,14 +2659,14 @@ func (dmm *byDateMetricIDMap) getOrCreate(date uint64) *uint64set.Set {
|
||||
return metricIDs
|
||||
}
|
||||
|
||||
type generationDateKey struct {
|
||||
// nextDayMetricIDs is a cache that holds the metricIDs for the next day.
|
||||
// The cache is used for improving the performance of data ingestion during
|
||||
// the last hour of the day when the per-day index is prefilled with the next
|
||||
// day entries (see updatePerDayData()).
|
||||
type nextDayMetricIDs struct {
|
||||
generation uint64
|
||||
date uint64
|
||||
}
|
||||
|
||||
type byDateMetricIDEntry struct {
|
||||
k generationDateKey
|
||||
v uint64set.Set
|
||||
metricIDs uint64set.Set
|
||||
}
|
||||
|
||||
func (s *Storage) updateNextDayMetricIDs(date uint64) {
|
||||
@@ -2678,27 +2676,24 @@ func (s *Storage) updateNextDayMetricIDs(date uint64) {
|
||||
pendingMetricIDs := s.pendingNextDayMetricIDs
|
||||
s.pendingNextDayMetricIDs = &uint64set.Set{}
|
||||
s.pendingNextDayMetricIDsLock.Unlock()
|
||||
if pendingMetricIDs.Len() == 0 && e.k.generation == generation && e.k.date == date {
|
||||
if pendingMetricIDs.Len() == 0 && e.generation == generation && e.date == date {
|
||||
// Fast path: nothing to update.
|
||||
return
|
||||
}
|
||||
|
||||
// Slow path: union pendingMetricIDs with e.v
|
||||
if e.k.generation == generation && e.k.date == date {
|
||||
pendingMetricIDs.Union(&e.v)
|
||||
if e.generation == generation && e.date == date {
|
||||
pendingMetricIDs.Union(&e.metricIDs)
|
||||
} else {
|
||||
// Do not add pendingMetricIDs from the previous day to the current day,
|
||||
// since this may result in missing registration of the metricIDs in the per-day inverted index.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309
|
||||
pendingMetricIDs = &uint64set.Set{}
|
||||
}
|
||||
k := generationDateKey{
|
||||
eNew := &nextDayMetricIDs{
|
||||
generation: generation,
|
||||
date: date,
|
||||
}
|
||||
eNew := &byDateMetricIDEntry{
|
||||
k: k,
|
||||
v: *pendingMetricIDs,
|
||||
metricIDs: *pendingMetricIDs,
|
||||
}
|
||||
s.nextDayMetricIDs.Store(eNew)
|
||||
}
|
||||
|
||||
@@ -598,16 +598,16 @@ func TestStorageAddRows_nextDayIndexPrefill(t *testing.T) {
|
||||
func TestStorageMustLoadNextDayMetricIDs(t *testing.T) {
|
||||
defer testRemoveAll(t)
|
||||
|
||||
assertNextDayMetricIDs := func(t *testing.T, gotNextDayMetricIDs *byDateMetricIDEntry, wantGen, wantDate uint64, wantLen int) {
|
||||
assertNextDayMetricIDs := func(t *testing.T, gotNextDayMetricIDs *nextDayMetricIDs, wantGen, wantDate uint64, wantLen int) {
|
||||
t.Helper()
|
||||
|
||||
if got, want := gotNextDayMetricIDs.k.generation, wantGen; got != want {
|
||||
if got, want := gotNextDayMetricIDs.generation, wantGen; got != want {
|
||||
t.Fatalf("unexpected nextDayMetricIDs idb generation: got %d, want %d", got, want)
|
||||
}
|
||||
if got, want := gotNextDayMetricIDs.k.date, wantDate; got != want {
|
||||
if got, want := gotNextDayMetricIDs.date, wantDate; got != want {
|
||||
t.Fatalf("unexpected nextDayMetricIDs date: got %d, want %d", got, want)
|
||||
}
|
||||
if got, want := gotNextDayMetricIDs.v.Len(), wantLen; got != want {
|
||||
if got, want := gotNextDayMetricIDs.metricIDs.Len(), wantLen; got != want {
|
||||
t.Fatalf("unexpected nextDayMetricIDs count: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user