mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-23 19:56:31 +03:00
lib/storage: rotate dateMetricIDCache instead of resetting (#10169)
Currently, `dateMetricIDCache` is reset when it is full and it is never reset is not full but the data it stores is no longer needed. This leads to the following problems: - During regular data ingestion the cache sizeBytes may exceed max allowed size and the cache gets reset which may potentially slow down data ingestion (see #10064) - The cache is per-indexDB. This means that in partition index (#8134) there will be as many instances of this cache as the number of partitions. If someone performs a backfill across all partitions, this will fill all caches and they will never get reset even if no more historical data is ingested. So the solution is to periodically rotate the cache. After first rotation the data is not deleted but moved to `prev` storage. After second rotation `prev` gets deleted. This gives the cache an opportunity to restore the `prev` data if it is still in use. Based on #10167. This PR also removes the introduced recently introduced `-storage.cacheSizeIndexDBDateMetricID` flag (see #10135). This should be safe since it is new and its use case is very niche, i.e. no one would really use it. --------- Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
@@ -75,8 +75,6 @@ var (
|
||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#cache-tuning")
|
||||
cacheSizeIndexDBTagFilters = flagutil.NewBytes("storage.cacheSizeIndexDBTagFilters", 0, "Overrides max size for indexdb/tagFiltersToMetricIDs cache. "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#cache-tuning")
|
||||
cacheSizeIndexDBDateMetricID = flagutil.NewBytes("storage.cacheSizeIndexDBDateMetricID", 0, "Overrides max size for indexdb/date_metricID cache. "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#cache-tuning")
|
||||
|
||||
disablePerDayIndex = flag.Bool("disablePerDayIndex", false, "Disable per-day index and use global index for all searches. "+
|
||||
"This may improve performance and decrease disk space usage for the use cases with fixed set of timeseries scattered across a "+
|
||||
@@ -127,7 +125,6 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
|
||||
storage.SetMetricNamesStatsCacheSize(cacheSizeMetricNamesStats.IntN())
|
||||
storage.SetMetricNameCacheSize(cacheSizeStorageMetricName.IntN())
|
||||
storage.SetMetadataStorageSize(metadataStorageSize.IntN())
|
||||
storage.SetDateMetricIDCacheSize(cacheSizeIndexDBDateMetricID.IntN())
|
||||
mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.IntN())
|
||||
mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.IntN())
|
||||
mergeset.SetDataBlocksSparseCacheSize(cacheSizeIndexDBDataBlocksSparse.IntN())
|
||||
@@ -670,7 +667,6 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
|
||||
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/dataBlocks"}`, idbm.DataBlocksCacheSizeMaxBytes)
|
||||
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/dataBlocksSparse"}`, idbm.DataBlocksSparseCacheSizeMaxBytes)
|
||||
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheSizeMaxBytes)
|
||||
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/date_metricID"}`, idbm.DateMetricIDCacheSizeMaxBytes)
|
||||
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheSizeMaxBytes)
|
||||
|
||||
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="storage/indexBlocks"}`, tm.IndexBlocksCacheRequests)
|
||||
@@ -695,7 +691,6 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
|
||||
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheMisses)
|
||||
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheMisses)
|
||||
|
||||
metrics.WriteCounterUint64(w, `vm_cache_resets_total{type="indexdb/date_metricID"}`, idbm.DateMetricIDCacheResetsCount)
|
||||
metrics.WriteCounterUint64(w, `vm_cache_resets_total{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheResets)
|
||||
|
||||
metrics.WriteCounterUint64(w, `vm_cache_collisions_total{type="storage/tsid"}`, m.TSIDCacheCollisions)
|
||||
@@ -705,6 +700,7 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
|
||||
metrics.WriteCounterUint64(w, `vm_cache_syncs_total{type="indexdb/date_metricID"}`, idbm.DateMetricIDCacheSyncsCount)
|
||||
|
||||
metrics.WriteCounterUint64(w, `vm_cache_rotations_total{type="indexdb/metricID"}`, idbm.MetricIDCacheRotationsCount)
|
||||
metrics.WriteCounterUint64(w, `vm_cache_rotations_total{type="indexdb/date_metricID"}`, idbm.DateMetricIDCacheRotationsCount)
|
||||
|
||||
metrics.WriteCounterUint64(w, `vm_deleted_metrics_total{type="indexdb"}`, m.DeletedMetricsCount)
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): expose `vmauth_user_request_backend_requests_total` and `vmauth_unauthorized_user_request_backend_requests_total` [metrics](https://docs.victoriametrics.com/victoriametrics/vmauth/#monitoring), which track the number of requests sent to backends. These counts may exceed `vmauth_user_requests_total` and `vmauth_unauthorized_user_requests_total` when requests are retried across multiple backends.
|
||||
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): fix `vmauth_user_request_backend_errors_total` and `vmauth_unauthorized_user_request_backend_errors_total` to only reflect backend request errors. Previously, these counters could be overcounted with user request error.
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): rotate `dateMetricIDCache` instead of resetting it. This should make the eviction less agressive. Since the cache does not have fixed max size anymore the `-storage.cacheSizeIndexDBDateMetricID` flag has been removed. See [#10064](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10053) and PR [#10169](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10169).
|
||||
|
||||
## [v1.132.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.132.0)
|
||||
|
||||
|
||||
@@ -4,97 +4,104 @@ import (
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
)
|
||||
|
||||
var maxDateMetricIDCacheSize uint64
|
||||
|
||||
// SetDateMetricIDCacheSize overrides the default size of dateMetricIDCache
|
||||
func SetDateMetricIDCacheSize(size int) {
|
||||
maxDateMetricIDCacheSize = uint64(size)
|
||||
}
|
||||
|
||||
func getDateMetricIDCacheSize() uint64 {
|
||||
if maxDateMetricIDCacheSize <= 0 {
|
||||
return uint64(float64(memory.Allowed()) / 256)
|
||||
}
|
||||
return maxDateMetricIDCacheSize
|
||||
}
|
||||
|
||||
// dateMetricIDCache is fast cache for holding (date, metricID) entries.
|
||||
// dateMetricIDCache stores (date, metricIDs) entries that have been added to
|
||||
// the index. It is used during data ingestion to decide whether a new entry
|
||||
// needs to be added to the per-day index.
|
||||
//
|
||||
// It should be faster than map[date]*uint64set.Set on multicore systems.
|
||||
type dateMetricIDCache struct {
|
||||
// Contains immutable map
|
||||
byDate atomic.Pointer[byDateMetricIDMap]
|
||||
// Contains immutable (date, metricIDs) entries.
|
||||
curr atomic.Pointer[byDateMetricIDMap]
|
||||
|
||||
// Contains mutable map protected by mu
|
||||
byDateMutable *byDateMetricIDMap
|
||||
|
||||
// Contains the number of slow accesses to byDateMutable.
|
||||
// Is used for deciding when to merge byDateMutable to byDate.
|
||||
// Contains immutable (date, metricIDs) entries that used to be current
|
||||
// before cache rotation. It is used to implement periodic cache clean-up.
|
||||
// Protected by mu.
|
||||
prev *byDateMetricIDMap
|
||||
|
||||
// Contains mutable (date metricIDs) entries that either have been added to
|
||||
// the cache recently or migrated from prev. Protected by mu.
|
||||
next *byDateMetricIDMap
|
||||
|
||||
// Contains the number of slow accesses to next. Is used for deciding when
|
||||
// to merge next to curr. Protected by mu.
|
||||
slowHits int
|
||||
|
||||
// Protected by mu.
|
||||
// Contains the number times the next was merged into curr. Protected by mu.
|
||||
syncsCount uint64
|
||||
|
||||
// Protected by mu.
|
||||
resetsCount uint64
|
||||
// Contains the number times the cache has been rotated. Protected by mu.
|
||||
rotationsCount uint64
|
||||
|
||||
mu sync.Mutex
|
||||
|
||||
stopCh chan struct{}
|
||||
rotationStoppedCh chan struct{}
|
||||
}
|
||||
|
||||
func newDateMetricIDCache() *dateMetricIDCache {
|
||||
var dmc dateMetricIDCache
|
||||
dmc.resetLocked()
|
||||
dmc := dateMetricIDCache{
|
||||
prev: newByDateMetricIDMap(),
|
||||
next: newByDateMetricIDMap(),
|
||||
stopCh: make(chan struct{}),
|
||||
rotationStoppedCh: make(chan struct{}),
|
||||
}
|
||||
dmc.curr.Store(newByDateMetricIDMap())
|
||||
go dmc.startRotation()
|
||||
return &dmc
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) resetLocked() {
|
||||
// Do not reset syncsCount and resetsCount
|
||||
dmc.byDate.Store(newByDateMetricIDMap())
|
||||
dmc.byDateMutable = newByDateMetricIDMap()
|
||||
dmc.slowHits = 0
|
||||
|
||||
dmc.resetsCount++
|
||||
func (dmc *dateMetricIDCache) MustStop() {
|
||||
close(dmc.stopCh)
|
||||
<-dmc.rotationStoppedCh
|
||||
}
|
||||
|
||||
type dateMetricIDCacheStats struct {
|
||||
Size uint64
|
||||
SizeBytes uint64
|
||||
SizeMaxBytes uint64
|
||||
ResetsCount uint64
|
||||
SyncsCount uint64
|
||||
Size uint64
|
||||
SizeBytes uint64
|
||||
SyncsCount uint64
|
||||
RotationsCount uint64
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) Stats() dateMetricIDCacheStats {
|
||||
s := dateMetricIDCacheStats{
|
||||
SizeMaxBytes: getDateMetricIDCacheSize(),
|
||||
}
|
||||
|
||||
dmc.mu.Lock()
|
||||
defer dmc.mu.Unlock()
|
||||
|
||||
for _, metricIDs := range dmc.byDate.Load().m {
|
||||
s.Size += uint64(metricIDs.Len())
|
||||
s.SizeBytes += metricIDs.SizeBytes()
|
||||
var s dateMetricIDCacheStats
|
||||
for _, metricIDs := range dmc.curr.Load().m {
|
||||
if metricIDs.Len() > 0 {
|
||||
// empty uint64set.Set still occupies a few bytes. Ignore them.
|
||||
s.Size += uint64(metricIDs.Len())
|
||||
s.SizeBytes += metricIDs.SizeBytes()
|
||||
}
|
||||
}
|
||||
for _, metricIDs := range dmc.byDateMutable.m {
|
||||
s.Size += uint64(metricIDs.Len())
|
||||
s.SizeBytes += metricIDs.SizeBytes()
|
||||
for _, metricIDs := range dmc.prev.m {
|
||||
if metricIDs.Len() > 0 {
|
||||
s.Size += uint64(metricIDs.Len())
|
||||
s.SizeBytes += metricIDs.SizeBytes()
|
||||
}
|
||||
}
|
||||
for _, metricIDs := range dmc.next.m {
|
||||
if metricIDs.Len() > 0 {
|
||||
s.Size += uint64(metricIDs.Len())
|
||||
s.SizeBytes += metricIDs.SizeBytes()
|
||||
}
|
||||
}
|
||||
|
||||
s.ResetsCount = dmc.resetsCount
|
||||
s.SyncsCount = dmc.syncsCount
|
||||
s.RotationsCount = dmc.rotationsCount
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
|
||||
if byDate := dmc.byDate.Load(); byDate.get(date).Has(metricID) {
|
||||
curr := dmc.curr.Load()
|
||||
vCurr := curr.get(date)
|
||||
if vCurr.Has(metricID) {
|
||||
// Fast path. The majority of calls must go here.
|
||||
return true
|
||||
}
|
||||
@@ -109,19 +116,29 @@ func (dmc *dateMetricIDCache) hasSlow(date, metricID uint64) bool {
|
||||
|
||||
// First, check immutable map again because the entry may have been moved to
|
||||
// the immutable map by the time the caller acquires the lock.
|
||||
byDate := dmc.byDate.Load()
|
||||
v := byDate.get(date)
|
||||
if v.Has(metricID) {
|
||||
curr := dmc.curr.Load()
|
||||
vCurr := curr.get(date)
|
||||
if vCurr.Has(metricID) {
|
||||
return true
|
||||
}
|
||||
|
||||
// Then check mutable map.
|
||||
vMutable := dmc.byDateMutable.get(date)
|
||||
ok := vMutable.Has(metricID)
|
||||
// Then check next and prev.
|
||||
vNext := dmc.next.getOrCreate(date)
|
||||
ok := vNext.Has(metricID)
|
||||
if !ok {
|
||||
vPrev := dmc.prev.get(date)
|
||||
ok = vPrev.Has(metricID)
|
||||
if ok {
|
||||
// The metricID is in prev but is still in use. Migrate it to next.
|
||||
vNext.Add(metricID)
|
||||
}
|
||||
}
|
||||
|
||||
if ok {
|
||||
dmc.slowHits++
|
||||
if dmc.slowHits > (v.Len()+vMutable.Len())/2 {
|
||||
// It is cheaper to merge byDateMutable into byDate than to pay inter-cpu sync costs when accessing vMutable.
|
||||
if dmc.slowHits > (vCurr.Len()+vNext.Len())/2 {
|
||||
// It is cheaper to merge next into curr than to pay inter-cpu sync
|
||||
// costs when accessing next.
|
||||
dmc.syncLocked()
|
||||
dmc.slowHits = 0
|
||||
}
|
||||
@@ -131,48 +148,50 @@ func (dmc *dateMetricIDCache) hasSlow(date, metricID uint64) bool {
|
||||
|
||||
func (dmc *dateMetricIDCache) Set(date, metricID uint64) {
|
||||
dmc.mu.Lock()
|
||||
v := dmc.byDateMutable.getOrCreate(date)
|
||||
v := dmc.next.getOrCreate(date)
|
||||
v.Add(metricID)
|
||||
dmc.mu.Unlock()
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) syncLocked() {
|
||||
if len(dmc.byDateMutable.m) == 0 {
|
||||
if len(dmc.next.m) == 0 {
|
||||
// Nothing to sync.
|
||||
return
|
||||
}
|
||||
|
||||
// Merge data from byDate into byDateMutable and then atomically replace byDate with the merged data.
|
||||
byDate := dmc.byDate.Load()
|
||||
byDateMutable := dmc.byDateMutable
|
||||
byDateMutable.hotEntry.Store(nil)
|
||||
// Merge data from curr into next and then atomically replace curr with the
|
||||
// merged data.
|
||||
curr := dmc.curr.Load()
|
||||
next := dmc.next
|
||||
next.hotEntry.Store(nil)
|
||||
|
||||
keepDatesMap := make(map[uint64]struct{}, len(byDateMutable.m))
|
||||
for date, metricIDsMutable := range byDateMutable.m {
|
||||
keepDatesMap := make(map[uint64]struct{}, len(next.m))
|
||||
for date, vNext := range next.m {
|
||||
keepDatesMap[date] = struct{}{}
|
||||
metricIDs := byDate.get(date)
|
||||
if metricIDs == nil {
|
||||
vCurr := curr.get(date)
|
||||
if vCurr == nil {
|
||||
// Nothing to merge
|
||||
continue
|
||||
}
|
||||
metricIDs = metricIDs.Clone()
|
||||
metricIDs.Union(metricIDsMutable)
|
||||
byDateMutable.m[date] = metricIDs
|
||||
vCurr = vCurr.Clone()
|
||||
vCurr.Union(vNext)
|
||||
next.m[date] = vCurr
|
||||
}
|
||||
|
||||
// Copy entries from byDate, which are missing in byDateMutable
|
||||
allDatesMap := make(map[uint64]struct{}, len(byDate.m))
|
||||
for date, metricIDs := range byDate.m {
|
||||
// Copy entries from curr, which are missing in next
|
||||
allDatesMap := make(map[uint64]struct{}, len(curr.m))
|
||||
for date, vCurr := range curr.m {
|
||||
allDatesMap[date] = struct{}{}
|
||||
v := byDateMutable.get(date)
|
||||
if v != nil {
|
||||
vNext := next.get(date)
|
||||
if vNext != nil {
|
||||
continue
|
||||
}
|
||||
byDateMutable.m[date] = metricIDs
|
||||
next.m[date] = vCurr
|
||||
}
|
||||
|
||||
if len(byDateMutable.m) > 2 {
|
||||
// Keep only entries for the last two dates from allDatesMap plus all the entries for byDateMutable.
|
||||
if len(next.m) > 2 {
|
||||
// Keep only entries for the last two dates from allDatesMap plus all
|
||||
// the entries for next.
|
||||
dates := make([]uint64, 0, len(allDatesMap))
|
||||
for date := range allDatesMap {
|
||||
dates = append(dates, date)
|
||||
@@ -186,29 +205,46 @@ func (dmc *dateMetricIDCache) syncLocked() {
|
||||
for _, date := range dates {
|
||||
keepDatesMap[date] = struct{}{}
|
||||
}
|
||||
for date := range byDateMutable.m {
|
||||
for date := range next.m {
|
||||
if _, ok := keepDatesMap[date]; !ok {
|
||||
delete(byDateMutable.m, date)
|
||||
delete(next.m, date)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var sizeBytes uint64
|
||||
for _, v := range dmc.byDateMutable.m {
|
||||
sizeBytes += v.SizeBytes()
|
||||
}
|
||||
|
||||
// Atomically replace byDate with byDateMutable
|
||||
dmc.byDate.Store(dmc.byDateMutable)
|
||||
dmc.byDateMutable = newByDateMetricIDMap()
|
||||
// Atomically replace curr with next.
|
||||
dmc.curr.Store(dmc.next)
|
||||
dmc.next = newByDateMetricIDMap()
|
||||
|
||||
dmc.syncsCount++
|
||||
}
|
||||
|
||||
if sizeBytes > getDateMetricIDCacheSize() {
|
||||
dmc.resetLocked()
|
||||
func (dmc *dateMetricIDCache) startRotation() {
|
||||
d := timeutil.AddJitterToDuration(10 * time.Minute)
|
||||
ticker := time.NewTicker(d)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-dmc.stopCh:
|
||||
close(dmc.rotationStoppedCh)
|
||||
return
|
||||
case <-ticker.C:
|
||||
dmc.rotate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// rotate atomically rotates next, curr, and prev cache parts.
|
||||
func (dmc *dateMetricIDCache) rotate() {
|
||||
dmc.mu.Lock()
|
||||
defer dmc.mu.Unlock()
|
||||
curr := dmc.curr.Load()
|
||||
dmc.prev = curr
|
||||
dmc.curr.Store(dmc.next)
|
||||
dmc.next = newByDateMetricIDMap()
|
||||
dmc.rotationsCount++
|
||||
}
|
||||
|
||||
// dateMetricIDs holds the date and corresponding metricIDs together and is used
|
||||
// for implementing hot entry fast path in byDateMetricIDMap.
|
||||
type dateMetricIDs struct {
|
||||
|
||||
@@ -6,12 +6,12 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
)
|
||||
|
||||
func TestDateMetricIDCacheSerial(t *testing.T) {
|
||||
c := newDateMetricIDCache()
|
||||
defer c.MustStop()
|
||||
if err := testDateMetricIDCache(c, false); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
@@ -19,6 +19,7 @@ func TestDateMetricIDCacheSerial(t *testing.T) {
|
||||
|
||||
func TestDateMetricIDCacheConcurrent(t *testing.T) {
|
||||
c := newDateMetricIDCache()
|
||||
defer c.MustStop()
|
||||
ch := make(chan error, 5)
|
||||
for i := 0; i < 5; i++ {
|
||||
go func() {
|
||||
@@ -63,9 +64,9 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
if i%34323 == 0 {
|
||||
c.mu.Lock()
|
||||
c.resetLocked()
|
||||
c.mu.Unlock()
|
||||
// Two rotations are needed to clear the cache.
|
||||
c.rotate()
|
||||
c.rotate()
|
||||
m = make(map[dmk]bool)
|
||||
}
|
||||
}
|
||||
@@ -87,13 +88,15 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Verify c.Reset
|
||||
// Verify that cache becomes empty after two rotations.
|
||||
if n := c.Stats().Size; !concurrent && n < 123 {
|
||||
return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n)
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.resetLocked()
|
||||
c.mu.Unlock()
|
||||
c.rotate()
|
||||
if n := c.Stats().Size; !concurrent && n < 123 {
|
||||
return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n)
|
||||
}
|
||||
c.rotate()
|
||||
if n := c.Stats().Size; !concurrent && n > 0 {
|
||||
return fmt.Errorf("c.EntriesCount must return 0 after reset; returned %d", n)
|
||||
}
|
||||
@@ -108,6 +111,7 @@ func TestDateMetricIDCacheIsConsistent(_ *testing.T) {
|
||||
numMetrics = 100000
|
||||
)
|
||||
dmc := newDateMetricIDCache()
|
||||
defer dmc.MustStop()
|
||||
var wg sync.WaitGroup
|
||||
for i := range concurrency {
|
||||
wg.Add(1)
|
||||
@@ -124,39 +128,9 @@ func TestDateMetricIDCacheIsConsistent(_ *testing.T) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestDateMetricIDCache_SizeMaxBytes(t *testing.T) {
|
||||
defer SetDateMetricIDCacheSize(0)
|
||||
|
||||
assertSizeMaxBytes := func(dmc *dateMetricIDCache, want uint64) {
|
||||
t.Helper()
|
||||
if got := dmc.Stats().SizeMaxBytes; got != want {
|
||||
t.Fatalf("unexpected sizeMaxBytes: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
defaultSizeMaxBytes := uint64(float64(memory.Allowed()) / 256)
|
||||
var dmc *dateMetricIDCache
|
||||
|
||||
// Default.
|
||||
dmc = newDateMetricIDCache()
|
||||
assertSizeMaxBytes(dmc, defaultSizeMaxBytes)
|
||||
|
||||
// Overriden.
|
||||
SetDateMetricIDCacheSize(1024)
|
||||
dmc = newDateMetricIDCache()
|
||||
assertSizeMaxBytes(dmc, 1024)
|
||||
|
||||
// Overriden at runtime.
|
||||
SetDateMetricIDCacheSize(2048)
|
||||
assertSizeMaxBytes(dmc, 2048)
|
||||
|
||||
// Reset to default at runtime.
|
||||
SetDateMetricIDCacheSize(0)
|
||||
assertSizeMaxBytes(dmc, defaultSizeMaxBytes)
|
||||
}
|
||||
|
||||
func TestDateMetricIDCache_Size(t *testing.T) {
|
||||
dmc := newDateMetricIDCache()
|
||||
defer dmc.MustStop()
|
||||
for i := range 100_000 {
|
||||
date := 12345 + uint64(i%30)
|
||||
metricID := uint64(i)
|
||||
@@ -182,6 +156,7 @@ func TestDateMetricIDCache_Size(t *testing.T) {
|
||||
|
||||
func TestDateMetricIDCache_SizeBytes(t *testing.T) {
|
||||
dmc := newDateMetricIDCache()
|
||||
defer dmc.MustStop()
|
||||
metricIDs := &uint64set.Set{}
|
||||
for i := range 100_000 {
|
||||
date := uint64(123)
|
||||
|
||||
@@ -221,11 +221,10 @@ type IndexDBMetrics struct {
|
||||
MetricIDCacheSyncsCount uint64
|
||||
MetricIDCacheRotationsCount uint64
|
||||
|
||||
DateMetricIDCacheSize uint64
|
||||
DateMetricIDCacheSizeBytes uint64
|
||||
DateMetricIDCacheSizeMaxBytes uint64
|
||||
DateMetricIDCacheSyncsCount uint64
|
||||
DateMetricIDCacheResetsCount uint64
|
||||
DateMetricIDCacheSize uint64
|
||||
DateMetricIDCacheSizeBytes uint64
|
||||
DateMetricIDCacheSyncsCount uint64
|
||||
DateMetricIDCacheRotationsCount uint64
|
||||
|
||||
IndexDBRefCount uint64
|
||||
|
||||
@@ -290,9 +289,8 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
|
||||
if dmcs.SizeBytes > m.DateMetricIDCacheSizeBytes {
|
||||
m.DateMetricIDCacheSize = dmcs.Size
|
||||
m.DateMetricIDCacheSizeBytes = dmcs.SizeBytes
|
||||
m.DateMetricIDCacheSizeMaxBytes = dmcs.SizeMaxBytes
|
||||
m.DateMetricIDCacheSyncsCount = dmcs.SyncsCount
|
||||
m.DateMetricIDCacheResetsCount = dmcs.ResetsCount
|
||||
m.DateMetricIDCacheRotationsCount = dmcs.RotationsCount
|
||||
}
|
||||
|
||||
m.IndexDBRefCount += uint64(db.refCount.Load())
|
||||
@@ -334,10 +332,12 @@ func (db *indexDB) decRef() {
|
||||
db.tagFiltersToMetricIDsCache.MustStop()
|
||||
db.loopsPerDateTagFilterCache.MustStop()
|
||||
db.metricIDCache.MustStop()
|
||||
db.dateMetricIDCache.MustStop()
|
||||
|
||||
db.tagFiltersToMetricIDsCache = nil
|
||||
db.loopsPerDateTagFilterCache = nil
|
||||
db.metricIDCache = nil
|
||||
db.dateMetricIDCache = nil
|
||||
|
||||
if !db.mustDrop.Load() {
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user