mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
lib/storage: rename cache methods to match unified format (#10534)
Per @valyala's request, rename storage cache methods to adhere the following format: ``` get[Value]By[Key]FromCache put[Value]By[Key]ToCache ``` Also move `s.metricIDCache` methods from `indexDB` to `Storage` because this cache exists at the `Storage` level. Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
@@ -12,7 +12,6 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/fastcache"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
@@ -358,28 +357,6 @@ func (db *indexDB) putMetricIDsToTagFiltersCache(qt *querytracer.Tracer, metricI
|
||||
qt.Printf("stored %d metricIDs into cache", metricIDs.Len())
|
||||
}
|
||||
|
||||
func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error {
|
||||
// There is no need in checking for deleted metricIDs here, since they
|
||||
// must be checked by the caller.
|
||||
buf := (*[unsafe.Sizeof(*dst)]byte)(unsafe.Pointer(dst))
|
||||
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
|
||||
tmp := db.s.metricIDCache.Get(buf[:0], key[:])
|
||||
if len(tmp) == 0 {
|
||||
// The TSID for the given metricID wasn't found in the cache.
|
||||
return io.EOF
|
||||
}
|
||||
if &tmp[0] != &buf[0] || len(tmp) != len(buf) {
|
||||
return fmt.Errorf("corrupted MetricID->TSID cache: unexpected size for metricID=%d value; got %d bytes; want %d bytes", metricID, len(tmp), len(buf))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *indexDB) putToMetricIDCache(metricID uint64, tsid *TSID) {
|
||||
buf := (*[unsafe.Sizeof(*tsid)]byte)(unsafe.Pointer(tsid))
|
||||
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
|
||||
db.s.metricIDCache.Set(key[:], buf[:])
|
||||
}
|
||||
|
||||
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange) []byte {
|
||||
// Round start and end times to per-day granularity according to per-day inverted index.
|
||||
startDate, endDate := tr.DateRange()
|
||||
@@ -1768,7 +1745,7 @@ func (db *indexDB) SearchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti
|
||||
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
|
||||
// than scanning the mergeset if it contains a lot of metricIDs.
|
||||
tsid := &tsids[i]
|
||||
err = db.getFromMetricIDCache(tsid, metricID)
|
||||
err = db.s.getTSIDByMetricIDFromCache(tsid, metricID)
|
||||
if err == nil {
|
||||
// Fast path - the tsid for metricID is found in cache.
|
||||
i++
|
||||
@@ -1790,7 +1767,7 @@ func (db *indexDB) SearchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti
|
||||
}
|
||||
continue
|
||||
}
|
||||
db.putToMetricIDCache(metricID, tsid)
|
||||
db.s.putTSIDByMetricIDToCache(metricID, tsid)
|
||||
i++
|
||||
}
|
||||
return true
|
||||
@@ -1923,7 +1900,7 @@ func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte, date ui
|
||||
}
|
||||
|
||||
func (is *indexSearch) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, bool) {
|
||||
metricName := is.db.s.getMetricNameFromCache(dst, metricID)
|
||||
metricName := is.db.s.getMetricNameByMetricIDFromCache(dst, metricID)
|
||||
if len(metricName) > len(dst) {
|
||||
return metricName, true
|
||||
}
|
||||
@@ -1932,7 +1909,7 @@ func (is *indexSearch) searchMetricNameWithCache(dst []byte, metricID uint64) ([
|
||||
if ok {
|
||||
// There is no need in verifying whether the given metricID is deleted,
|
||||
// since the filtering must be performed before calling this func.
|
||||
is.db.s.putMetricNameToCache(metricID, dst)
|
||||
is.db.s.putMetricNameByMetricIDToCache(metricID, dst)
|
||||
return dst, true
|
||||
}
|
||||
return dst, false
|
||||
|
||||
@@ -38,7 +38,7 @@ type metricNameSearch struct {
|
||||
func (s *metricNameSearch) search(dst []byte, metricID uint64) ([]byte, bool) {
|
||||
if !s.useSparseCache {
|
||||
n := len(dst)
|
||||
dst = s.storage.getMetricNameFromCache(dst, metricID)
|
||||
dst = s.storage.getMetricNameByMetricIDFromCache(dst, metricID)
|
||||
if len(dst) > n {
|
||||
return dst, true
|
||||
}
|
||||
@@ -52,7 +52,7 @@ func (s *metricNameSearch) search(dst []byte, metricID uint64) ([]byte, bool) {
|
||||
dst, found = idb.searchMetricName(dst, metricID, s.useSparseCache)
|
||||
if found {
|
||||
if !s.useSparseCache {
|
||||
s.storage.putMetricNameToCache(metricID, dst)
|
||||
s.storage.putMetricNameByMetricIDToCache(metricID, dst)
|
||||
}
|
||||
return dst, true
|
||||
}
|
||||
@@ -63,7 +63,7 @@ func (s *metricNameSearch) search(dst []byte, metricID uint64) ([]byte, bool) {
|
||||
dst, found = idb.searchMetricName(dst, metricID, s.useSparseCache)
|
||||
if found {
|
||||
if !s.useSparseCache {
|
||||
s.storage.putMetricNameToCache(metricID, dst)
|
||||
s.storage.putMetricNameByMetricIDToCache(metricID, dst)
|
||||
}
|
||||
return dst, true
|
||||
}
|
||||
@@ -74,7 +74,7 @@ func (s *metricNameSearch) search(dst []byte, metricID uint64) ([]byte, bool) {
|
||||
dst, found = idb.searchMetricName(dst, metricID, s.useSparseCache)
|
||||
if found {
|
||||
if !s.useSparseCache {
|
||||
s.storage.putMetricNameToCache(metricID, dst)
|
||||
s.storage.putMetricNameByMetricIDToCache(metricID, dst)
|
||||
}
|
||||
return dst, true
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package storage
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -1020,14 +1021,36 @@ func (s *Storage) mustSaveCache(c *workingsetcache.Cache, name string) {
|
||||
// saveCacheLock prevents from data races when multiple concurrent goroutines save the same cache.
|
||||
var saveCacheLock sync.Mutex
|
||||
|
||||
func (s *Storage) getMetricNameFromCache(dst []byte, metricID uint64) []byte {
|
||||
func (s *Storage) getTSIDByMetricIDFromCache(dst *TSID, metricID uint64) error {
|
||||
// There is no need in checking for deleted metricIDs here, since they
|
||||
// must be checked by the caller.
|
||||
buf := (*[unsafe.Sizeof(*dst)]byte)(unsafe.Pointer(dst))
|
||||
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
|
||||
tmp := s.metricIDCache.Get(buf[:0], key[:])
|
||||
if len(tmp) == 0 {
|
||||
// The TSID for the given metricID wasn't found in the cache.
|
||||
return io.EOF
|
||||
}
|
||||
if &tmp[0] != &buf[0] || len(tmp) != len(buf) {
|
||||
return fmt.Errorf("corrupted MetricID->TSID cache: unexpected size for metricID=%d value; got %d bytes; want %d bytes", metricID, len(tmp), len(buf))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Storage) putTSIDByMetricIDToCache(metricID uint64, tsid *TSID) {
|
||||
buf := (*[unsafe.Sizeof(*tsid)]byte)(unsafe.Pointer(tsid))
|
||||
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
|
||||
s.metricIDCache.Set(key[:], buf[:])
|
||||
}
|
||||
|
||||
func (s *Storage) getMetricNameByMetricIDFromCache(dst []byte, metricID uint64) []byte {
|
||||
// There is no need in checking for deleted metricIDs here, since they
|
||||
// must be checked by the caller.
|
||||
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
|
||||
return s.metricNameCache.Get(dst, key[:])
|
||||
}
|
||||
|
||||
func (s *Storage) putMetricNameToCache(metricID uint64, metricName []byte) {
|
||||
func (s *Storage) putMetricNameByMetricIDToCache(metricID uint64, metricName []byte) {
|
||||
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
|
||||
s.metricNameCache.Set(key[:], metricName)
|
||||
}
|
||||
@@ -1724,7 +1747,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
|
||||
deletedMetricIDs = idb.getDeletedMetricIDs()
|
||||
}
|
||||
|
||||
if s.getTSIDFromCache(&lTSID, mr.MetricNameRaw) && !deletedMetricIDs.Has(lTSID.TSID.MetricID) {
|
||||
if s.getTSIDByMetricNameFromCache(&lTSID, mr.MetricNameRaw) && !deletedMetricIDs.Has(lTSID.TSID.MetricID) {
|
||||
// Fast path - the TSID for the given mr.MetricNameRaw has been
|
||||
// found in cache and isn't deleted. If the TSID is deleted, we
|
||||
// re-register time series. Eventually, the deleted TSID will be
|
||||
@@ -1775,14 +1798,14 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
|
||||
|
||||
if is.getTSIDByMetricName(&lTSID.TSID, metricNameBuf, date) {
|
||||
// Slower path - the TSID has been found in indexdb.
|
||||
s.storeTSIDToCache(&lTSID, mr.MetricNameRaw)
|
||||
s.putTSIDByMetricNameToCache(&lTSID, mr.MetricNameRaw)
|
||||
continue
|
||||
}
|
||||
|
||||
// Slowest path - there is no TSID in indexdb for the given mr.MetricNameRaw. Create it.
|
||||
generateTSID(&lTSID.TSID, mn)
|
||||
createAllIndexesForMetricName(idb, mn, &lTSID.TSID, date)
|
||||
s.storeTSIDToCache(&lTSID, mr.MetricNameRaw)
|
||||
s.putTSIDByMetricNameToCache(&lTSID, mr.MetricNameRaw)
|
||||
newSeriesCount++
|
||||
}
|
||||
if ptw != nil {
|
||||
@@ -1926,7 +1949,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||
// tsidCache may contain TSIDs that were deleted from some indexDBs but
|
||||
// are still in use in other indexDBs. Thus, also check if a given TSID
|
||||
// was not deleted deom the current indexDB.
|
||||
if s.getTSIDFromCache(&lTSID, mr.MetricNameRaw) && !deletedMetricIDs.Has(lTSID.TSID.MetricID) {
|
||||
if s.getTSIDByMetricNameFromCache(&lTSID, mr.MetricNameRaw) && !deletedMetricIDs.Has(lTSID.TSID.MetricID) {
|
||||
// Fast path - the TSID for the given mr.MetricNameRaw has been found in cache and isn't deleted.
|
||||
|
||||
r.TSID = lTSID.TSID
|
||||
@@ -1981,7 +2004,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||
if is.getTSIDByMetricName(&lTSID.TSID, metricNameBuf, date) {
|
||||
// Slower path - the TSID has been found in indexdb.
|
||||
|
||||
s.storeTSIDToCache(&lTSID, mr.MetricNameRaw)
|
||||
s.putTSIDByMetricNameToCache(&lTSID, mr.MetricNameRaw)
|
||||
|
||||
r.TSID = lTSID.TSID
|
||||
prevTSID = lTSID.TSID
|
||||
@@ -1994,7 +2017,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||
// Slowest path - the TSID for the given mr.MetricNameRaw isn't found in indexdb. Create it.
|
||||
generateTSID(&lTSID.TSID, mn)
|
||||
createAllIndexesForMetricName(idb, mn, &lTSID.TSID, date)
|
||||
s.storeTSIDToCache(&lTSID, mr.MetricNameRaw)
|
||||
s.putTSIDByMetricNameToCache(&lTSID, mr.MetricNameRaw)
|
||||
newSeriesCount++
|
||||
|
||||
r.TSID = lTSID.TSID
|
||||
@@ -2481,13 +2504,13 @@ type legacyTSID struct {
|
||||
_ uint64
|
||||
}
|
||||
|
||||
func (s *Storage) getTSIDFromCache(dst *legacyTSID, metricName []byte) bool {
|
||||
func (s *Storage) getTSIDByMetricNameFromCache(dst *legacyTSID, metricName []byte) bool {
|
||||
buf := (*[unsafe.Sizeof(*dst)]byte)(unsafe.Pointer(dst))[:]
|
||||
buf = s.tsidCache.Get(buf[:0], metricName)
|
||||
return uintptr(len(buf)) == unsafe.Sizeof(*dst)
|
||||
}
|
||||
|
||||
func (s *Storage) storeTSIDToCache(tsid *legacyTSID, metricName []byte) {
|
||||
func (s *Storage) putTSIDByMetricNameToCache(tsid *legacyTSID, metricName []byte) {
|
||||
buf := (*[unsafe.Sizeof(*tsid)]byte)(unsafe.Pointer(tsid))[:]
|
||||
s.tsidCache.Set(metricName, buf)
|
||||
}
|
||||
|
||||
@@ -924,7 +924,7 @@ func TestStorageDeleteSeries_CachesAreUpdatedOrReset(t *testing.T) {
|
||||
assertMetricNameCached := func(metricNameRaw []byte, want bool) {
|
||||
t.Helper()
|
||||
var v legacyTSID
|
||||
if got := s.getTSIDFromCache(&v, metricNameRaw); got != want {
|
||||
if got := s.getTSIDByMetricNameFromCache(&v, metricNameRaw); got != want {
|
||||
t.Errorf("unexpected %q metric name in TSID cache: got %t, want %t", string(metricNameRaw), got, want)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user