mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-08 19:33:35 +03:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2ab4cea5e5 | ||
|
|
c050abbbad | ||
|
|
3f1637fae8 | ||
|
|
c56b9ed03b | ||
|
|
3fd32e331a | ||
|
|
119dfd01bb | ||
|
|
86a1cd700b |
@@ -32,6 +32,12 @@ victoria-metrics-arm64:
|
||||
victoria-metrics-arm64-prod:
|
||||
APP_NAME=victoria-metrics APP_SUFFIX='-arm64' DOCKER_OPTS='--env CGO_ENABLED=0 --env GOARCH=arm64' $(MAKE) app-via-docker
|
||||
|
||||
victoria-metrics-ppc64le:
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=ppc64le GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/victoria-metrics-ppc64le ./app/victoria-metrics
|
||||
|
||||
victoria-metrics-ppc64le-prod:
|
||||
APP_NAME=victoria-metrics APP_SUFFIX='-ppc64le' DOCKER_OPTS='--env CGO_ENABLED=0 --env GOARCH=ppc64le' $(MAKE) app-via-docker
|
||||
|
||||
victoria-metrics-386:
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=386 GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/victoria-metrics-386 ./app/victoria-metrics
|
||||
|
||||
|
||||
5
app/vmselect/promql/arch.go
Normal file
5
app/vmselect/promql/arch.go
Normal file
@@ -0,0 +1,5 @@
|
||||
package promql
|
||||
|
||||
import "unsafe"
|
||||
|
||||
const maxByteSliceLen = 1<<(31+9*(unsafe.Sizeof(int(0))/8)) - 1
|
||||
@@ -1,3 +0,0 @@
|
||||
package promql
|
||||
|
||||
const maxByteSliceLen = 1<<31 - 1
|
||||
@@ -1,3 +0,0 @@
|
||||
package promql
|
||||
|
||||
const maxByteSliceLen = 1 << 40
|
||||
@@ -1,3 +0,0 @@
|
||||
package promql
|
||||
|
||||
const maxByteSliceLen = 1<<31 - 1
|
||||
@@ -1,3 +0,0 @@
|
||||
package promql
|
||||
|
||||
const maxByteSliceLen = 1 << 40
|
||||
@@ -986,6 +986,8 @@ func rollupIntegrate(rfa *rollupFuncArg) float64 {
|
||||
timestamp := timestamps[i]
|
||||
dt := float64(timestamp-prevTimestamp) * 1e-3
|
||||
sum += 0.5 * (v + prevValue) * dt
|
||||
prevTimestamp = timestamp
|
||||
prevValue = v
|
||||
}
|
||||
return sum
|
||||
}
|
||||
|
||||
@@ -291,7 +291,7 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) {
|
||||
f("stdvar_over_time", 945.7430555555555)
|
||||
f("first_over_time", 123)
|
||||
f("last_over_time", 34)
|
||||
f("integrate", 61.0275)
|
||||
f("integrate", 5.4705)
|
||||
f("distinct_over_time", 8)
|
||||
f("ideriv", 0)
|
||||
f("decreases_over_time", 5)
|
||||
@@ -810,7 +810,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
|
||||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{nan, 4.6035, 4.3934999999999995, 2.166, 0.34}
|
||||
valuesExpected := []float64{nan, 1.526, 2.2795, 1.325, 0.34}
|
||||
timestampsExpected := []int64{0, 40, 80, 120, 160}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
|
||||
@@ -25,8 +25,6 @@ var (
|
||||
// DataPath is a path to storage data.
|
||||
DataPath = flag.String("storageDataPath", "victoria-metrics-data", "Path to storage data")
|
||||
|
||||
disableRecentHourIndex = flag.Bool("disableRecentHourIndex", false, "Whether to disable inmemory inverted index for recent hour. "+
|
||||
"This may be useful in order to reduce memory usage when working with high number of time series")
|
||||
bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0")
|
||||
smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0")
|
||||
)
|
||||
@@ -45,9 +43,6 @@ func InitWithoutMetrics() {
|
||||
logger.Fatalf("invalid `-precisionBits`: %s", err)
|
||||
}
|
||||
|
||||
if *disableRecentHourIndex {
|
||||
storage.DisableRecentHourIndex()
|
||||
}
|
||||
storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
|
||||
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
|
||||
|
||||
@@ -412,25 +407,6 @@ func registerStorageMetrics() {
|
||||
return float64(idbm().ItemsCount)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_recent_hour_inverted_index_entries`, func() float64 {
|
||||
return float64(m().RecentHourInvertedIndexSize)
|
||||
})
|
||||
metrics.NewGauge(`vm_recent_hour_inverted_index_size_bytes`, func() float64 {
|
||||
return float64(m().RecentHourInvertedIndexSizeBytes)
|
||||
})
|
||||
metrics.NewGauge(`vm_recent_hour_inverted_index_unique_tag_pairs`, func() float64 {
|
||||
return float64(m().RecentHourInvertedIndexUniqueTagPairsSize)
|
||||
})
|
||||
metrics.NewGauge(`vm_recent_hour_inverted_index_pending_metric_ids`, func() float64 {
|
||||
return float64(m().RecentHourInvertedIndexPendingMetricIDsSize)
|
||||
})
|
||||
metrics.NewGauge(`vm_recent_hour_inverted_index_search_calls_total`, func() float64 {
|
||||
return float64(idbm().RecentHourInvertedIndexSearchCalls)
|
||||
})
|
||||
metrics.NewGauge(`vm_recent_hour_inverted_index_search_hits_total`, func() float64 {
|
||||
return float64(idbm().RecentHourInvertedIndexSearchHits)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_date_range_search_calls_total`, func() float64 {
|
||||
return float64(idbm().DateRangeSearchCalls)
|
||||
})
|
||||
@@ -491,6 +467,12 @@ func registerStorageMetrics() {
|
||||
metrics.NewGauge(`vm_cache_size_bytes{type="storage/metricName"}`, func() float64 {
|
||||
return float64(m().MetricNameCacheSizeBytes)
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_size_bytes{type="storage/date_metricID"}`, func() float64 {
|
||||
return float64(m().DateMetricIDCacheSizeBytes)
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_size_bytes{type="storage/hour_metric_ids"}`, func() float64 {
|
||||
return float64(m().HourMetricIDCacheSizeBytes)
|
||||
})
|
||||
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
|
||||
return float64(idbm().TagCacheSizeBytes)
|
||||
})
|
||||
|
||||
@@ -2,7 +2,7 @@ version: '3.5'
|
||||
services:
|
||||
prometheus:
|
||||
container_name: prometheus
|
||||
image: prom/prometheus:v2.12.0
|
||||
image: prom/prometheus:v2.14.0
|
||||
depends_on:
|
||||
- "victoriametrics"
|
||||
ports:
|
||||
|
||||
@@ -95,12 +95,6 @@ type indexDB struct {
|
||||
// The number of successful searches for metric ids by days.
|
||||
dateMetricIDsSearchHits uint64
|
||||
|
||||
// The number of calls for recent hour searches over inverted index.
|
||||
recentHourInvertedIndexSearchCalls uint64
|
||||
|
||||
// The number of hits for recent hour searches over inverted index.
|
||||
recentHourInvertedIndexSearchHits uint64
|
||||
|
||||
// The number of calls for date range searches.
|
||||
dateRangeSearchCalls uint64
|
||||
|
||||
@@ -231,9 +225,6 @@ type IndexDBMetrics struct {
|
||||
DateMetricIDsSearchCalls uint64
|
||||
DateMetricIDsSearchHits uint64
|
||||
|
||||
RecentHourInvertedIndexSearchCalls uint64
|
||||
RecentHourInvertedIndexSearchHits uint64
|
||||
|
||||
DateRangeSearchCalls uint64
|
||||
DateRangeSearchHits uint64
|
||||
|
||||
@@ -275,9 +266,6 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
|
||||
m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls)
|
||||
m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits)
|
||||
|
||||
m.RecentHourInvertedIndexSearchCalls += atomic.LoadUint64(&db.recentHourInvertedIndexSearchCalls)
|
||||
m.RecentHourInvertedIndexSearchHits += atomic.LoadUint64(&db.recentHourInvertedIndexSearchHits)
|
||||
|
||||
m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
|
||||
m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits)
|
||||
|
||||
@@ -1013,8 +1001,9 @@ func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
|
||||
if err := ts.Error(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// The database is empty. Return the current date.
|
||||
return minDate, nil
|
||||
// There are no (date,tag)->metricIDs entries in the database yet.
|
||||
// Return the next date, since the current date may contain unindexed data.
|
||||
return minDate + 1, nil
|
||||
}
|
||||
|
||||
func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
|
||||
@@ -1667,10 +1656,6 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
|
||||
return bytes.Compare(a.prefix, b.prefix) < 0
|
||||
})
|
||||
|
||||
if is.tryUpdatingMetricIDsForRecentHour(metricIDs, tfs, tr) {
|
||||
// Fast path: found metricIDs in the inmemoryInvertedIndex for the last hour.
|
||||
return nil
|
||||
}
|
||||
ok, err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -2193,38 +2178,6 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int)
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (is *indexSearch) tryUpdatingMetricIDsForRecentHour(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange) bool {
|
||||
if disableRecentHourIndex {
|
||||
return false
|
||||
}
|
||||
|
||||
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchCalls, 1)
|
||||
minHour := uint64(tr.MinTimestamp) / msecPerHour
|
||||
maxHour := uint64(tr.MaxTimestamp) / msecPerHour
|
||||
hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull {
|
||||
// The tr fits the current hour.
|
||||
hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.m, tfs)
|
||||
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1)
|
||||
return true
|
||||
}
|
||||
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull {
|
||||
// The tr fits the previous hour.
|
||||
hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.m, tfs)
|
||||
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1)
|
||||
return true
|
||||
}
|
||||
if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull {
|
||||
// The tr spans the previous and the current hours.
|
||||
hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.m, tfs)
|
||||
hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.m, tfs)
|
||||
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (db *indexDB) storeDateMetricID(date, metricID uint64) error {
|
||||
is := db.getIndexSearch()
|
||||
ok, err := is.hasDateMetricID(date, metricID)
|
||||
|
||||
@@ -1418,10 +1418,6 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||
prevMetricIDs.m.Add(tsids[i].MetricID)
|
||||
currMetricIDs.m.Add(tsids[i].MetricID)
|
||||
}
|
||||
prevMetricIDs.iidx = newInmemoryInvertedIndex()
|
||||
prevMetricIDs.iidx.MustUpdate(db, prevMetricIDs.m)
|
||||
currMetricIDs.iidx = newInmemoryInvertedIndex()
|
||||
currMetricIDs.iidx.MustUpdate(db, currMetricIDs.m)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,283 +0,0 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
)
|
||||
|
||||
type inmemoryInvertedIndex struct {
|
||||
mu sync.RWMutex
|
||||
m map[string]*uint64set.Set
|
||||
pendingMetricIDs []uint64
|
||||
}
|
||||
|
||||
func (iidx *inmemoryInvertedIndex) Marshal(dst []byte) []byte {
|
||||
iidx.mu.RLock()
|
||||
defer iidx.mu.RUnlock()
|
||||
|
||||
// Marshal iidx.m
|
||||
var metricIDs []uint64
|
||||
dst = encoding.MarshalUint64(dst, uint64(len(iidx.m)))
|
||||
for k, v := range iidx.m {
|
||||
dst = encoding.MarshalBytes(dst, []byte(k))
|
||||
metricIDs = v.AppendTo(metricIDs[:0])
|
||||
dst = marshalMetricIDs(dst, metricIDs)
|
||||
}
|
||||
|
||||
// Marshal iidx.pendingMetricIDs
|
||||
dst = marshalMetricIDs(dst, iidx.pendingMetricIDs)
|
||||
|
||||
return dst
|
||||
}
|
||||
|
||||
func (iidx *inmemoryInvertedIndex) Unmarshal(src []byte) ([]byte, error) {
|
||||
iidx.mu.Lock()
|
||||
defer iidx.mu.Unlock()
|
||||
|
||||
// Unmarshal iidx.m
|
||||
if len(src) < 8 {
|
||||
return src, fmt.Errorf("cannot read len(iidx.m) from %d bytes; want at least 8 bytes", len(src))
|
||||
}
|
||||
mLen := int(encoding.UnmarshalUint64(src))
|
||||
src = src[8:]
|
||||
m := make(map[string]*uint64set.Set, mLen)
|
||||
var metricIDs []uint64
|
||||
for i := 0; i < mLen; i++ {
|
||||
tail, k, err := encoding.UnmarshalBytes(src)
|
||||
if err != nil {
|
||||
return tail, fmt.Errorf("cannot unmarshal key #%d for iidx.m: %s", i, err)
|
||||
}
|
||||
src = tail
|
||||
tail, metricIDs, err = unmarshalMetricIDs(metricIDs[:0], src)
|
||||
if err != nil {
|
||||
return tail, fmt.Errorf("cannot unmarshal value #%d for iidx.m: %s", i, err)
|
||||
}
|
||||
src = tail
|
||||
var v uint64set.Set
|
||||
for _, metricID := range metricIDs {
|
||||
v.Add(metricID)
|
||||
}
|
||||
m[string(k)] = &v
|
||||
}
|
||||
iidx.m = m
|
||||
|
||||
// Unmarshal iidx.pendingMetricIDs
|
||||
var err error
|
||||
var tail []byte
|
||||
tail, metricIDs, err = unmarshalMetricIDs(metricIDs[:0], src)
|
||||
if err != nil {
|
||||
return tail, fmt.Errorf("cannot unmarshal iidx.pendingMetricIDs: %s", err)
|
||||
}
|
||||
src = tail
|
||||
iidx.pendingMetricIDs = append(iidx.pendingMetricIDs[:0], metricIDs...)
|
||||
|
||||
return src, nil
|
||||
}
|
||||
|
||||
func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte {
|
||||
dst = encoding.MarshalUint64(dst, uint64(len(metricIDs)))
|
||||
for _, metricID := range metricIDs {
|
||||
dst = encoding.MarshalUint64(dst, metricID)
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func unmarshalMetricIDs(dst []uint64, src []byte) ([]byte, []uint64, error) {
|
||||
if len(src) < 8 {
|
||||
return src, dst, fmt.Errorf("cannot unmarshal metricIDs len from %d bytes; want at least 8 bytes", len(src))
|
||||
}
|
||||
metricIDsLen := int(encoding.UnmarshalUint64(src))
|
||||
src = src[8:]
|
||||
if len(src) < 8*metricIDsLen {
|
||||
return src, dst, fmt.Errorf("not enough bytes for unmarshaling %d metricIDs; want %d bytes; got %d bytes", metricIDsLen, 8*metricIDsLen, len(src))
|
||||
}
|
||||
for i := 0; i < metricIDsLen; i++ {
|
||||
metricID := encoding.UnmarshalUint64(src)
|
||||
src = src[8:]
|
||||
dst = append(dst, metricID)
|
||||
}
|
||||
return src, dst, nil
|
||||
}
|
||||
|
||||
func (iidx *inmemoryInvertedIndex) SizeBytes() uint64 {
|
||||
n := uint64(0)
|
||||
iidx.mu.RLock()
|
||||
for k, v := range iidx.m {
|
||||
n += uint64(len(k))
|
||||
n += v.SizeBytes()
|
||||
}
|
||||
n += uint64(len(iidx.pendingMetricIDs)) * 8
|
||||
iidx.mu.RUnlock()
|
||||
return n
|
||||
}
|
||||
|
||||
func (iidx *inmemoryInvertedIndex) GetUniqueTagPairsLen() int {
|
||||
if iidx == nil {
|
||||
return 0
|
||||
}
|
||||
iidx.mu.RLock()
|
||||
n := len(iidx.m)
|
||||
iidx.mu.RUnlock()
|
||||
return n
|
||||
}
|
||||
|
||||
func (iidx *inmemoryInvertedIndex) GetEntriesCount() int {
|
||||
if iidx == nil {
|
||||
return 0
|
||||
}
|
||||
n := 0
|
||||
iidx.mu.RLock()
|
||||
for _, v := range iidx.m {
|
||||
n += v.Len()
|
||||
}
|
||||
iidx.mu.RUnlock()
|
||||
return n
|
||||
}
|
||||
|
||||
func (iidx *inmemoryInvertedIndex) GetPendingMetricIDsLen() int {
|
||||
if iidx == nil {
|
||||
return 0
|
||||
}
|
||||
iidx.mu.RLock()
|
||||
n := len(iidx.pendingMetricIDs)
|
||||
iidx.mu.RUnlock()
|
||||
return n
|
||||
}
|
||||
|
||||
func newInmemoryInvertedIndex() *inmemoryInvertedIndex {
|
||||
return &inmemoryInvertedIndex{
|
||||
m: make(map[string]*uint64set.Set),
|
||||
}
|
||||
}
|
||||
|
||||
func (iidx *inmemoryInvertedIndex) MustUpdate(idb *indexDB, src *uint64set.Set) {
|
||||
metricIDs := src.AppendTo(nil)
|
||||
iidx.mu.Lock()
|
||||
iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricIDs...)
|
||||
if err := iidx.addPendingEntriesLocked(idb); err != nil {
|
||||
logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingMetricIDs: %s", err)
|
||||
}
|
||||
iidx.mu.Unlock()
|
||||
}
|
||||
|
||||
func (iidx *inmemoryInvertedIndex) AddMetricID(idb *indexDB, metricID uint64) {
|
||||
iidx.mu.Lock()
|
||||
iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricID)
|
||||
if err := iidx.addPendingEntriesLocked(idb); err != nil {
|
||||
logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingMetricIDs: %s", err)
|
||||
}
|
||||
iidx.mu.Unlock()
|
||||
}
|
||||
|
||||
func (iidx *inmemoryInvertedIndex) UpdateMetricIDsForTagFilters(metricIDs, allMetricIDs *uint64set.Set, tfs *TagFilters) {
|
||||
if iidx == nil {
|
||||
return
|
||||
}
|
||||
var result *uint64set.Set
|
||||
var tfFirst *tagFilter
|
||||
for i := range tfs.tfs {
|
||||
if tfs.tfs[i].isNegative {
|
||||
continue
|
||||
}
|
||||
tfFirst = &tfs.tfs[i]
|
||||
break
|
||||
}
|
||||
|
||||
iidx.mu.RLock()
|
||||
defer iidx.mu.RUnlock()
|
||||
|
||||
if tfFirst == nil {
|
||||
result = allMetricIDs.Clone()
|
||||
} else {
|
||||
result = iidx.getMetricIDsForTagFilterLocked(tfFirst, tfs.commonPrefix)
|
||||
}
|
||||
for i := range tfs.tfs {
|
||||
tf := &tfs.tfs[i]
|
||||
if tf == tfFirst {
|
||||
continue
|
||||
}
|
||||
m := iidx.getMetricIDsForTagFilterLocked(tf, tfs.commonPrefix)
|
||||
if tf.isNegative {
|
||||
result.Subtract(m)
|
||||
} else {
|
||||
result.Intersect(m)
|
||||
}
|
||||
if result.Len() == 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
metricIDs.Union(result)
|
||||
}
|
||||
|
||||
func (iidx *inmemoryInvertedIndex) getMetricIDsForTagFilterLocked(tf *tagFilter, commonPrefix []byte) *uint64set.Set {
|
||||
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
|
||||
logger.Panicf("BUG: tf.prefix must start with commonPrefix=%q; got %q", commonPrefix, tf.prefix)
|
||||
}
|
||||
prefix := tf.prefix[len(commonPrefix):]
|
||||
var m uint64set.Set
|
||||
kb := kbPool.Get()
|
||||
defer kbPool.Put(kb)
|
||||
for k, v := range iidx.m {
|
||||
if len(k) < len(prefix) || k[:len(prefix)] != string(prefix) {
|
||||
continue
|
||||
}
|
||||
kb.B = append(kb.B[:0], k[len(prefix):]...)
|
||||
ok, err := tf.matchSuffix(kb.B)
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: unexpected error from matchSuffix(%q): %s", kb.B, err)
|
||||
}
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
m.Union(v)
|
||||
}
|
||||
return &m
|
||||
}
|
||||
|
||||
func (iidx *inmemoryInvertedIndex) addPendingEntriesLocked(idb *indexDB) error {
|
||||
metricIDs := iidx.pendingMetricIDs
|
||||
iidx.pendingMetricIDs = iidx.pendingMetricIDs[:0]
|
||||
|
||||
kb := kbPool.Get()
|
||||
defer kbPool.Put(kb)
|
||||
|
||||
mn := GetMetricName()
|
||||
defer PutMetricName(mn)
|
||||
for _, metricID := range metricIDs {
|
||||
var err error
|
||||
kb.B, err = idb.searchMetricName(kb.B[:0], metricID)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricID)
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("cannot find metricName by metricID %d: %s", metricID, err)
|
||||
}
|
||||
if err = mn.Unmarshal(kb.B); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal metricName %q: %s", kb.B, err)
|
||||
}
|
||||
kb.B = marshalTagValue(kb.B[:0], nil)
|
||||
kb.B = marshalTagValue(kb.B, mn.MetricGroup)
|
||||
iidx.addMetricIDLocked(kb.B, metricID)
|
||||
for i := range mn.Tags {
|
||||
kb.B = mn.Tags[i].Marshal(kb.B[:0])
|
||||
iidx.addMetricIDLocked(kb.B, metricID)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (iidx *inmemoryInvertedIndex) addMetricIDLocked(key []byte, metricID uint64) {
|
||||
v := iidx.m[string(key)]
|
||||
if v == nil {
|
||||
v = &uint64set.Set{}
|
||||
iidx.m[string(key)] = v
|
||||
}
|
||||
v.Add(metricID)
|
||||
}
|
||||
@@ -1,44 +0,0 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestInmemoryInvertedIndexMarshalUnmarshal(t *testing.T) {
|
||||
iidx := newInmemoryInvertedIndex()
|
||||
const keysCount = 100
|
||||
const metricIDsCount = 10000
|
||||
for i := 0; i < metricIDsCount; i++ {
|
||||
k := fmt.Sprintf("key %d", i%keysCount)
|
||||
iidx.addMetricIDLocked([]byte(k), uint64(i))
|
||||
}
|
||||
for i := 0; i < 10; i++ {
|
||||
metricID := uint64(i * 43)
|
||||
iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricID)
|
||||
}
|
||||
|
||||
data := iidx.Marshal(nil)
|
||||
|
||||
iidx2 := newInmemoryInvertedIndex()
|
||||
tail, err := iidx2.Unmarshal(data)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot unmarshal iidx: %s", err)
|
||||
}
|
||||
if len(tail) != 0 {
|
||||
t.Fatalf("unexpected tail left after iidx unmarshaling: %d bytes", len(tail))
|
||||
}
|
||||
if len(iidx.m) != len(iidx2.m) {
|
||||
t.Fatalf("unexpected len(iidx2.m); got %d; want %d", len(iidx2.m), len(iidx.m))
|
||||
}
|
||||
if !reflect.DeepEqual(iidx.pendingMetricIDs, iidx2.pendingMetricIDs) {
|
||||
t.Fatalf("unexpected pendingMetricIDs; got\n%d;\nwant\n%d", iidx2.pendingMetricIDs, iidx.pendingMetricIDs)
|
||||
}
|
||||
for k, v := range iidx.m {
|
||||
v2 := iidx2.m[k]
|
||||
if !v.Equal(v2) {
|
||||
t.Fatalf("unexpected set for key %q", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -72,21 +72,15 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
|
||||
|
||||
func TestSearch(t *testing.T) {
|
||||
t.Run("global_inverted_index", func(t *testing.T) {
|
||||
testSearchGeneric(t, false, false)
|
||||
testSearchGeneric(t, false)
|
||||
})
|
||||
t.Run("perday_inverted_index", func(t *testing.T) {
|
||||
testSearchGeneric(t, false, true)
|
||||
})
|
||||
t.Run("recent_hour_global_inverted_index", func(t *testing.T) {
|
||||
testSearchGeneric(t, true, false)
|
||||
})
|
||||
t.Run("recent_hour_perday_inverted_index", func(t *testing.T) {
|
||||
testSearchGeneric(t, true, true)
|
||||
testSearchGeneric(t, true)
|
||||
})
|
||||
}
|
||||
|
||||
func testSearchGeneric(t *testing.T, forceRecentHourInvertedIndex, forcePerDayInvertedIndex bool) {
|
||||
path := fmt.Sprintf("TestSearch_%v_%v", forceRecentHourInvertedIndex, forcePerDayInvertedIndex)
|
||||
func testSearchGeneric(t *testing.T, forcePerDayInvertedIndex bool) {
|
||||
path := fmt.Sprintf("TestSearch_%v", forcePerDayInvertedIndex)
|
||||
st, err := OpenStorage(path, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open storage %q: %s", path, err)
|
||||
@@ -147,10 +141,6 @@ func testSearchGeneric(t *testing.T, forceRecentHourInvertedIndex, forcePerDayIn
|
||||
extDB.startDateForPerDayInvertedIndex = 0
|
||||
})
|
||||
}
|
||||
if forceRecentHourInvertedIndex {
|
||||
hm := st.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hm.isFull = true
|
||||
}
|
||||
|
||||
// Run search.
|
||||
tr := TimeRange{
|
||||
|
||||
@@ -27,17 +27,6 @@ import (
|
||||
|
||||
const maxRetentionMonths = 12 * 100
|
||||
|
||||
var disableRecentHourIndex = false
|
||||
|
||||
// DisableRecentHourIndex disables in-memory inverted index for recent hour.
|
||||
//
|
||||
// This may be useful in order to save RAM for high cardinality data.
|
||||
//
|
||||
// This function must be called before OpenStorage.
|
||||
func DisableRecentHourIndex() {
|
||||
disableRecentHourIndex = true
|
||||
}
|
||||
|
||||
// Storage represents TSDB storage.
|
||||
type Storage struct {
|
||||
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
|
||||
@@ -317,15 +306,12 @@ type Metrics struct {
|
||||
MetricNameCacheCollisions uint64
|
||||
|
||||
DateMetricIDCacheSize uint64
|
||||
DateMetricIDCacheSizeBytes uint64
|
||||
DateMetricIDCacheSyncsCount uint64
|
||||
DateMetricIDCacheResetsCount uint64
|
||||
|
||||
HourMetricIDCacheSize uint64
|
||||
|
||||
RecentHourInvertedIndexSize uint64
|
||||
RecentHourInvertedIndexSizeBytes uint64
|
||||
RecentHourInvertedIndexUniqueTagPairsSize uint64
|
||||
RecentHourInvertedIndexPendingMetricIDsSize uint64
|
||||
HourMetricIDCacheSize uint64
|
||||
HourMetricIDCacheSizeBytes uint64
|
||||
|
||||
IndexDBMetrics IndexDBMetrics
|
||||
TableMetrics TableMetrics
|
||||
@@ -372,6 +358,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
||||
m.MetricNameCacheCollisions += cs.Collisions
|
||||
|
||||
m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount())
|
||||
m.DateMetricIDCacheSizeBytes += uint64(s.dateMetricIDCache.SizeBytes())
|
||||
m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount)
|
||||
m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount)
|
||||
|
||||
@@ -382,18 +369,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
||||
hourMetricIDsLen = hmCurr.m.Len()
|
||||
}
|
||||
m.HourMetricIDCacheSize += uint64(hourMetricIDsLen)
|
||||
|
||||
m.RecentHourInvertedIndexSize += uint64(hmPrev.iidx.GetEntriesCount())
|
||||
m.RecentHourInvertedIndexSize += uint64(hmCurr.iidx.GetEntriesCount())
|
||||
|
||||
m.RecentHourInvertedIndexSizeBytes += hmPrev.iidx.SizeBytes()
|
||||
m.RecentHourInvertedIndexSizeBytes += hmCurr.iidx.SizeBytes()
|
||||
|
||||
m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmPrev.iidx.GetUniqueTagPairsLen())
|
||||
m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmCurr.iidx.GetUniqueTagPairsLen())
|
||||
|
||||
m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmPrev.iidx.GetPendingMetricIDsLen())
|
||||
m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmCurr.iidx.GetPendingMetricIDsLen())
|
||||
m.HourMetricIDCacheSizeBytes += hmCurr.m.SizeBytes()
|
||||
m.HourMetricIDCacheSizeBytes += hmPrev.m.SizeBytes()
|
||||
|
||||
s.idb().UpdateMetrics(&m.IndexDBMetrics)
|
||||
s.tb.UpdateMetrics(&m.TableMetrics)
|
||||
@@ -509,7 +486,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
|
||||
if !fs.IsPathExist(path) {
|
||||
logger.Infof("nothing to load from %q", path)
|
||||
return &hourMetricIDs{
|
||||
iidx: newInmemoryInvertedIndex(),
|
||||
hour: hour,
|
||||
}
|
||||
}
|
||||
@@ -521,7 +497,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
|
||||
if len(src) < 24 {
|
||||
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24)
|
||||
return &hourMetricIDs{
|
||||
iidx: newInmemoryInvertedIndex(),
|
||||
hour: hour,
|
||||
}
|
||||
}
|
||||
@@ -534,7 +509,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
|
||||
if hourLoaded != hour {
|
||||
logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", name, hourLoaded, hour)
|
||||
return &hourMetricIDs{
|
||||
iidx: newInmemoryInvertedIndex(),
|
||||
hour: hour,
|
||||
}
|
||||
}
|
||||
@@ -545,7 +519,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
|
||||
if uint64(len(src)) < 8*hmLen {
|
||||
logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want at least %d bytes", path, len(src), 8*hmLen)
|
||||
return &hourMetricIDs{
|
||||
iidx: newInmemoryInvertedIndex(),
|
||||
hour: hour,
|
||||
}
|
||||
}
|
||||
@@ -556,30 +529,9 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
|
||||
m.Add(metricID)
|
||||
}
|
||||
|
||||
// Unmarshal hm.iidx
|
||||
iidx := newInmemoryInvertedIndex()
|
||||
if !disableRecentHourIndex {
|
||||
tail, err := iidx.Unmarshal(src)
|
||||
if err != nil {
|
||||
logger.Errorf("discarding %s, since it has broken hm.iidx data: %s", path, err)
|
||||
return &hourMetricIDs{
|
||||
iidx: newInmemoryInvertedIndex(),
|
||||
hour: hour,
|
||||
}
|
||||
}
|
||||
if len(tail) > 0 {
|
||||
logger.Errorf("discarding %s, since it contains superflouos %d bytes of data", path, len(tail))
|
||||
return &hourMetricIDs{
|
||||
iidx: newInmemoryInvertedIndex(),
|
||||
hour: hour,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen)
|
||||
return &hourMetricIDs{
|
||||
m: m,
|
||||
iidx: iidx,
|
||||
hour: hourLoaded,
|
||||
isFull: isFull != 0,
|
||||
}
|
||||
@@ -605,11 +557,6 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
|
||||
dst = encoding.MarshalUint64(dst, metricID)
|
||||
}
|
||||
|
||||
if !disableRecentHourIndex {
|
||||
// Marshal hm.iidx
|
||||
dst = hm.iidx.Marshal(dst)
|
||||
}
|
||||
|
||||
if err := ioutil.WriteFile(path, dst, 0644); err != nil {
|
||||
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
|
||||
}
|
||||
@@ -960,9 +907,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
|
||||
s.pendingHourEntriesLock.Lock()
|
||||
s.pendingHourEntries.Add(metricID)
|
||||
s.pendingHourEntriesLock.Unlock()
|
||||
if !disableRecentHourIndex {
|
||||
hm.iidx.AddMetricID(idb, metricID)
|
||||
}
|
||||
}
|
||||
|
||||
// Slower path: check global cache for (date, metricID) entry.
|
||||
@@ -1028,6 +972,15 @@ func (dmc *dateMetricIDCache) EntriesCount() int {
|
||||
return n
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) SizeBytes() uint64 {
|
||||
byDate := dmc.byDate.Load().(*byDateMetricIDMap)
|
||||
n := uint64(0)
|
||||
for _, e := range byDate.m {
|
||||
n += e.v.SizeBytes()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
|
||||
byDate := dmc.byDate.Load().(*byDateMetricIDMap)
|
||||
v := byDate.get(date)
|
||||
@@ -1140,20 +1093,16 @@ func (s *Storage) updateCurrHourMetricIDs() {
|
||||
|
||||
// Slow path: hm.m must be updated with non-empty s.pendingHourEntries.
|
||||
var m *uint64set.Set
|
||||
var iidx *inmemoryInvertedIndex
|
||||
isFull := hm.isFull
|
||||
if hm.hour == hour {
|
||||
m = hm.m.Clone()
|
||||
iidx = hm.iidx
|
||||
} else {
|
||||
m = &uint64set.Set{}
|
||||
iidx = newInmemoryInvertedIndex()
|
||||
isFull = true
|
||||
}
|
||||
m.Union(newMetricIDs)
|
||||
hmNew := &hourMetricIDs{
|
||||
m: m,
|
||||
iidx: iidx,
|
||||
hour: hour,
|
||||
isFull: isFull,
|
||||
}
|
||||
@@ -1165,7 +1114,6 @@ func (s *Storage) updateCurrHourMetricIDs() {
|
||||
|
||||
type hourMetricIDs struct {
|
||||
m *uint64set.Set
|
||||
iidx *inmemoryInvertedIndex
|
||||
hour uint64
|
||||
isFull bool
|
||||
}
|
||||
|
||||
@@ -108,7 +108,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
hmOrig := &hourMetricIDs{
|
||||
m: &uint64set.Set{},
|
||||
iidx: newInmemoryInvertedIndex(),
|
||||
hour: 123,
|
||||
}
|
||||
hmOrig.m.Add(12)
|
||||
@@ -144,7 +143,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
hmOrig := &hourMetricIDs{
|
||||
m: &uint64set.Set{},
|
||||
iidx: newInmemoryInvertedIndex(),
|
||||
hour: hour,
|
||||
}
|
||||
hmOrig.m.Add(12)
|
||||
@@ -189,7 +187,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
hmOrig := &hourMetricIDs{
|
||||
m: &uint64set.Set{},
|
||||
iidx: newInmemoryInvertedIndex(),
|
||||
hour: 123,
|
||||
}
|
||||
hmOrig.m.Add(12)
|
||||
@@ -231,7 +228,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
hmOrig := &hourMetricIDs{
|
||||
m: &uint64set.Set{},
|
||||
iidx: newInmemoryInvertedIndex(),
|
||||
hour: hour,
|
||||
}
|
||||
hmOrig.m.Add(12)
|
||||
|
||||
Reference in New Issue
Block a user