lib/storage: add tracker for time series metric names statistics

This feature allows tracking query requests by metric name. The tracker
state is stored in-memory and capped at 1/100 of the memory allocated to
the storage. If the cap is exceeded, the tracker rejects adding new items
and instead registers query requests only for already observed metric names.

This feature is disabled by default and is enabled by the new flag
`-storage.trackMetricNamesStats`.

  New API added to the select component:

* /api/v1/status/metric_names_stats - which returns a JSON
object
    with usage statistics.
* /api/v1/admin/status/metric_names_stats/reset - which resets the internal
    state of the tracker and resets the tsid cache.

   New metrics were added for this feature:

  * vm_cache_size_bytes{type="storage/metricNamesStatsTracker"}
  * vm_cache_size{type="storage/metricNamesStatsTracker"}
  * vm_cache_size_max_bytes{type="storage/metricNamesStatsTracker"}

  Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4458

---------

Signed-off-by: f41gh7 <nik@victoriametrics.com>
Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
This commit is contained in:
Nikolay
2025-03-06 22:06:50 +01:00
committed by GitHub
parent 7dfaef9088
commit b85b28d30a
20 changed files with 1990 additions and 13 deletions

View File

@@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/stats"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@@ -29,7 +30,10 @@ import (
)
var (
deleteAuthKey = flagutil.NewPassword("deleteAuthKey", "authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries. It could be passed via authKey query arg. It overrides -httpAuth.*")
deleteAuthKey = flagutil.NewPassword("deleteAuthKey", "authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries. It could be passed via authKey query arg. It overrides -httpAuth.*")
metricNamesStatsResetAuthKey = flagutil.NewPassword("metricNamesStatsResetAuthKey", "authKey for reseting metric names usage cache via /api/v1/admin/status/metric_names_stats/reset. It overrides -httpAuth.*. "+
"See https://docs.victoriametrics.com/#track-ingested-metrics-usage")
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
"It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+
"See also -search.maxQueueDuration and -search.maxMemoryPerQuery")
@@ -178,7 +182,6 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
promql.ResetRollupResultCache()
return true
}
if strings.HasPrefix(path, "/api/v1/label/") {
s := path[len("/api/v1/label/"):]
if strings.HasSuffix(s, "/values") {
@@ -399,6 +402,26 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
}
w.WriteHeader(http.StatusNoContent)
return true
case "/api/v1/status/metric_names_stats":
metricNamesStatsRequests.Inc()
if err := stats.MetricNamesStatsHandler(qt, w, r); err != nil {
metricNamesStatsErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
return true
case "/api/v1/admin/status/metric_names_stats/reset":
metricNamesStatsResetRequests.Inc()
if !httpserver.CheckAuthFlag(w, r, metricNamesStatsResetAuthKey) {
return true
}
if err := stats.ResetMetricNamesStatsHandler(qt); err != nil {
metricNamesStatsResetErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
default:
return false
}
@@ -674,6 +697,12 @@ var (
metadataRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/metadata"}`)
buildInfoRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/buildinfo"}`)
queryExemplarsRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/query_exemplars"}`)
metricNamesStatsRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/status/metric_names_stats"}`)
metricNamesStatsErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/status/metric_names_stats"}`)
metricNamesStatsResetRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/admin/status/metric_names_stats/reset"}`)
metricNamesStatsResetErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/admin/status/metric_names_stats/reset"}`)
)
func proxyVMAlertRequests(w http.ResponseWriter, r *http.Request) {

View File

@@ -1367,3 +1367,18 @@ func applyGraphiteRegexpFilter(filter string, ss []string) ([]string, error) {
//
// See https://github.com/golang/go/blob/704401ffa06c60e059c9e6e4048045b4ff42530a/src/runtime/malloc.go#L11
const maxFastAllocBlockSize = 32 * 1024
// GetMetricNamesStats returns statistic for timeseries metric names usage.
func GetMetricNamesStats(qt *querytracer.Tracer, limit, le int, matchPattern string) (storage.MetricNamesStatsResponse, error) {
	// Open a child trace span so the parameters show up in query traces.
	child := qt.NewChild("get metric names usage statistics with limit: %d, less or equal to: %d, match pattern=%q", limit, le, matchPattern)
	defer child.Done()
	return vmstorage.GetMetricNamesStats(child, limit, le, matchPattern)
}
// ResetMetricNamesStats resets state of metric names usage
func ResetMetricNamesStats(qt *querytracer.Tracer) error {
	// Trace the reset so the operation is visible in query traces.
	child := qt.NewChild("reset metric names usage stats")
	defer child.Done()
	vmstorage.ResetMetricNamesStats(child)
	// The underlying reset cannot fail; the error return exists for interface symmetry.
	return nil
}

View File

@@ -0,0 +1,33 @@
{% import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
) %}
{% stripspace %}
MetricNamesStatsResponse generates response for /api/v1/status/metric_names_stats .
{% func MetricNamesStatsResponse(stats *storage.MetricNamesStatsResponse, qt *querytracer.Tracer) %}
{
"status":"success",
"statsCollectedSince": {%dul= stats.CollectedSinceTs %},
"statsCollectedRecordsTotal": {%dul= stats.TotalRecords %},
"trackerMemoryMaxSizeBytes": {%dul= stats.MaxSizeBytes %},
"trackerCurrentMemoryUsageBytes": {%dul= stats.CurrentSizeBytes %},
"records":
[
{% for i, r := range stats.Records %}
{
"metricName":{%q= r.MetricName %},
"queryRequestsCount":{%dul= r.RequestsCount %},
"lastQueryRequestTimestamp":{%dul= r.LastRequestTs %}
}
{% if i+1 < len(stats.Records) %},{% endif %}
{% endfor %}
]
{% code qt.Done() %}
{% code traceJSON := qt.ToJSON() %}
{% if traceJSON != "" %},"trace":{%s= traceJSON %}{% endif %}
}
{% endfunc %}
{% endstripspace %}

View File

@@ -0,0 +1,117 @@
// Code generated by qtc from "metric_names_usage_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmselect/stats/metric_names_usage_response.qtpl:1
package stats
//line app/vmselect/stats/metric_names_usage_response.qtpl:1
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
// MetricNamesStatsResponse generates response for /api/v1/status/metric_names_stats .
//line app/vmselect/stats/metric_names_usage_response.qtpl:8
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/stats/metric_names_usage_response.qtpl:8
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/stats/metric_names_usage_response.qtpl:8
func StreamMetricNamesStatsResponse(qw422016 *qt422016.Writer, stats *storage.MetricNamesStatsResponse, qt *querytracer.Tracer) {
//line app/vmselect/stats/metric_names_usage_response.qtpl:8
qw422016.N().S(`{"status":"success","statsCollectedSince":`)
//line app/vmselect/stats/metric_names_usage_response.qtpl:11
qw422016.N().DUL(stats.CollectedSinceTs)
//line app/vmselect/stats/metric_names_usage_response.qtpl:11
qw422016.N().S(`,"statsCollectedRecordsTotal":`)
//line app/vmselect/stats/metric_names_usage_response.qtpl:12
qw422016.N().DUL(stats.TotalRecords)
//line app/vmselect/stats/metric_names_usage_response.qtpl:12
qw422016.N().S(`,"trackerMemoryMaxSizeBytes":`)
//line app/vmselect/stats/metric_names_usage_response.qtpl:13
qw422016.N().DUL(stats.MaxSizeBytes)
//line app/vmselect/stats/metric_names_usage_response.qtpl:13
qw422016.N().S(`,"trackerCurrentMemoryUsageBytes":`)
//line app/vmselect/stats/metric_names_usage_response.qtpl:14
qw422016.N().DUL(stats.CurrentSizeBytes)
//line app/vmselect/stats/metric_names_usage_response.qtpl:14
qw422016.N().S(`,"records":[`)
//line app/vmselect/stats/metric_names_usage_response.qtpl:17
for i, r := range stats.Records {
//line app/vmselect/stats/metric_names_usage_response.qtpl:17
qw422016.N().S(`{"metricName":`)
//line app/vmselect/stats/metric_names_usage_response.qtpl:19
qw422016.N().Q(r.MetricName)
//line app/vmselect/stats/metric_names_usage_response.qtpl:19
qw422016.N().S(`,"queryRequestsCount":`)
//line app/vmselect/stats/metric_names_usage_response.qtpl:20
qw422016.N().DUL(r.RequestsCount)
//line app/vmselect/stats/metric_names_usage_response.qtpl:20
qw422016.N().S(`,"lastQueryRequestTimestamp":`)
//line app/vmselect/stats/metric_names_usage_response.qtpl:21
qw422016.N().DUL(r.LastRequestTs)
//line app/vmselect/stats/metric_names_usage_response.qtpl:21
qw422016.N().S(`}`)
//line app/vmselect/stats/metric_names_usage_response.qtpl:23
if i+1 < len(stats.Records) {
//line app/vmselect/stats/metric_names_usage_response.qtpl:23
qw422016.N().S(`,`)
//line app/vmselect/stats/metric_names_usage_response.qtpl:23
}
//line app/vmselect/stats/metric_names_usage_response.qtpl:24
}
//line app/vmselect/stats/metric_names_usage_response.qtpl:24
qw422016.N().S(`]`)
//line app/vmselect/stats/metric_names_usage_response.qtpl:26
qt.Done()
//line app/vmselect/stats/metric_names_usage_response.qtpl:27
traceJSON := qt.ToJSON()
//line app/vmselect/stats/metric_names_usage_response.qtpl:28
if traceJSON != "" {
//line app/vmselect/stats/metric_names_usage_response.qtpl:28
qw422016.N().S(`,"trace":`)
//line app/vmselect/stats/metric_names_usage_response.qtpl:28
qw422016.N().S(traceJSON)
//line app/vmselect/stats/metric_names_usage_response.qtpl:28
}
//line app/vmselect/stats/metric_names_usage_response.qtpl:28
qw422016.N().S(`}`)
//line app/vmselect/stats/metric_names_usage_response.qtpl:31
}
//line app/vmselect/stats/metric_names_usage_response.qtpl:31
func WriteMetricNamesStatsResponse(qq422016 qtio422016.Writer, stats *storage.MetricNamesStatsResponse, qt *querytracer.Tracer) {
//line app/vmselect/stats/metric_names_usage_response.qtpl:31
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/stats/metric_names_usage_response.qtpl:31
StreamMetricNamesStatsResponse(qw422016, stats, qt)
//line app/vmselect/stats/metric_names_usage_response.qtpl:31
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/stats/metric_names_usage_response.qtpl:31
}
//line app/vmselect/stats/metric_names_usage_response.qtpl:31
func MetricNamesStatsResponse(stats *storage.MetricNamesStatsResponse, qt *querytracer.Tracer) string {
//line app/vmselect/stats/metric_names_usage_response.qtpl:31
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/stats/metric_names_usage_response.qtpl:31
WriteMetricNamesStatsResponse(qb422016, stats, qt)
//line app/vmselect/stats/metric_names_usage_response.qtpl:31
qs422016 := string(qb422016.B)
//line app/vmselect/stats/metric_names_usage_response.qtpl:31
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/stats/metric_names_usage_response.qtpl:31
return qs422016
//line app/vmselect/stats/metric_names_usage_response.qtpl:31
}

View File

@@ -0,0 +1,50 @@
package stats
import (
"fmt"
"net/http"
"strconv"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
)
// MetricNamesStatsHandler returns timeseries metric names usage statistics
//
// It parses the optional `limit`, `le` and `match_pattern` query args,
// fetches the stats from netstorage and streams the JSON response to w.
func MetricNamesStatsHandler(qt *querytracer.Tracer, w http.ResponseWriter, r *http.Request) error {
	// Cap the number of returned records; 1000 is the default.
	limit := 1000
	if s := r.FormValue("limit"); s != "" {
		n, err := strconv.Atoi(s)
		if err != nil {
			return fmt.Errorf("cannot parse `limit` arg %q: %w", s, err)
		}
		// Non-positive limits are ignored and the default is kept.
		if n > 0 {
			limit = n
		}
	}
	// A negative `le` disables filtering, so all values are displayed by default.
	le := -1
	if s := r.FormValue("le"); s != "" {
		n, err := strconv.Atoi(s)
		if err != nil {
			return fmt.Errorf("cannot parse `le` arg %q: %w", s, err)
		}
		le = n
	}
	matchPattern := r.FormValue("match_pattern")
	stats, err := netstorage.GetMetricNamesStats(qt, limit, le, matchPattern)
	if err != nil {
		return err
	}
	WriteMetricNamesStatsResponse(w, &stats, qt)
	return nil
}
// ResetMetricNamesStatsHandler resets metric names usage state
//
// It delegates directly to netstorage; the previous
// `if err != nil { return err }; return nil` wrapper was redundant.
func ResetMetricNamesStatsHandler(qt *querytracer.Tracer) error {
	return netstorage.ResetMetricNamesStats(qt)
}

View File

@@ -76,6 +76,11 @@ var (
"This may improve performance and decrease disk space usage for the use cases with fixed set of timeseries scattered across a "+
"big time range (for example, when loading years of historical data). "+
"See https://docs.victoriametrics.com/single-server-victoriametrics/#index-tuning")
trackMetricNamesStats = flag.Bool("storage.trackMetricNamesStats", false, "Whether to track ingest and query requests for timeseries metric names. "+
"This feature allows to track metric names unused at query requests. "+
"See https://docs.victoriametrics.com/#track-ingested-metrics-usage")
cacheSizeMetricNamesStats = flagutil.NewBytes("storage.cacheSizeMetricNamesStats", 0, "Overrides max size for storage/metricNamesStatsTracker cache. "+
"See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning")
)
// CheckTimeRange returns true if the given tr is denied for querying.
@@ -105,6 +110,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N)
storage.SetTSIDCacheSize(cacheSizeStorageTSID.IntN())
storage.SetTagFiltersCacheSize(cacheSizeIndexDBTagFilters.IntN())
storage.SetMetricNamesStatsCacheSize(cacheSizeMetricNamesStats.IntN())
mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.IntN())
mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.IntN())
mergeset.SetDataBlocksSparseCacheSize(cacheSizeIndexDBDataBlocksSparse.IntN())
@@ -115,12 +121,12 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod)
startTime := time.Now()
WG = syncwg.WaitGroup{}
opts := storage.OpenOptions{
Retention: retentionPeriod.Duration(),
MaxHourlySeries: *maxHourlySeries,
MaxDailySeries: *maxDailySeries,
DisablePerDayIndex: *disablePerDayIndex,
Retention: retentionPeriod.Duration(),
MaxHourlySeries: *maxHourlySeries,
MaxDailySeries: *maxDailySeries,
DisablePerDayIndex: *disablePerDayIndex,
TrackMetricNamesStats: *trackMetricNamesStats,
}
strg := storage.MustOpenStorage(*DataPath, opts)
Storage = strg
@@ -193,6 +199,21 @@ func DeleteSeries(qt *querytracer.Tracer, tfss []*storage.TagFilters, maxMetrics
return n, err
}
// GetMetricNamesStats returns metric names usage stats with the given limit and `le` (less-or-equal) predicate
func GetMetricNamesStats(qt *querytracer.Tracer, limit, le int, matchPattern string) (storage.MetricNamesStatsResponse, error) {
	// WG guards the Storage access against concurrent shutdown,
	// matching the pattern used by sibling functions in this file.
	WG.Add(1)
	r := Storage.GetMetricNamesStats(qt, limit, le, matchPattern)
	WG.Done()
	// The error return is reserved for future use; the call itself cannot fail.
	return r, nil
}
// ResetMetricNamesStats resets state for metric names usage tracker
func ResetMetricNamesStats(qt *querytracer.Tracer) {
	// WG guards the Storage access against concurrent shutdown,
	// matching the pattern used by sibling functions in this file.
	WG.Add(1)
	Storage.ResetMetricNamesStats(qt)
	WG.Done()
}
// SearchMetricNames returns metric names for the given tfss on the given tr.
func SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]string, error) {
WG.Add(1)
@@ -657,6 +678,12 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteGaugeUint64(w, `vm_next_retention_seconds`, m.NextRetentionSeconds)
if *trackMetricNamesStats {
metrics.WriteCounterUint64(w, `vm_cache_size_bytes{type="storage/metricNamesStatsTracker"}`, m.MetricNamesUsageTrackerSizeBytes)
metrics.WriteCounterUint64(w, `vm_cache_size{type="storage/metricNamesStatsTracker"}`, m.MetricNamesUsageTrackerSize)
metrics.WriteCounterUint64(w, `vm_cache_size_max_bytes{type="storage/metricNamesStatsTracker"}`, m.MetricNamesUsageTrackerSizeMaxBytes)
}
metrics.WriteGaugeUint64(w, `vm_downsampling_partitions_scheduled`, tm.ScheduledDownsamplingPartitions)
metrics.WriteGaugeUint64(w, `vm_downsampling_partitions_scheduled_size_bytes`, tm.ScheduledDownsamplingPartitionsSize)
}

View File

@@ -258,3 +258,15 @@ func (t *Trace) Contains(s string) int {
}
return times
}
// MetricNamesStatsResponse is an inmemory representation of the
// /api/v1/status/metric_names_stats API response
type MetricNamesStatsResponse struct {
	// Records holds one entry per tracked metric name.
	// Field names are matched case-insensitively by encoding/json when
	// unmarshalling the API response (e.g. "records" in the JSON body).
	Records []MetricNamesStatsRecord
}

// MetricNamesStatsRecord is a record item for MetricNamesStatsResponse
type MetricNamesStatsRecord struct {
	// MetricName is the tracked time series metric name.
	MetricName string
	// QueryRequestsCount is the number of query requests that touched MetricName
	// (maps to "queryRequestsCount" in the JSON response).
	QueryRequestsCount uint64
}

View File

@@ -0,0 +1,202 @@
package tests
import (
"fmt"
"os"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
at "github.com/VictoriaMetrics/VictoriaMetrics/apptest"
)
// TestSingleMetricNamesStats verifies metric names usage tracking on a
// single-node instance: ingestion registers metric names, queries increment
// per-name counters, the `le` filter works, and the reset endpoint clears state.
//
// Note: the original mixed the two aliases (`apptest` and `at`) for the same
// package; this test now uses `at` consistently.
func TestSingleMetricNamesStats(t *testing.T) {
	os.RemoveAll(t.Name())
	tc := at.NewTestCase(t)
	defer tc.Stop()

	sut := tc.MustStartVmsingle("vmsingle", []string{"-storage.trackMetricNamesStats=true", "-retentionPeriod=100y"})

	const ingestDateTime = `2024-02-05T08:57:36.700Z`
	const ingestTimestamp = ` 1707123456700`
	dataSet := []string{
		`metric_name_1{label="foo"} 10`,
		`metric_name_1{label="bar"} 10`,
		`metric_name_2{label="baz"} 20`,
		`metric_name_1{label="baz"} 10`,
		`metric_name_3{label="baz"} 30`,
		`metric_name_3{label="baz"} 30`,
	}
	for idx := range dataSet {
		dataSet[idx] += ingestTimestamp
	}
	sut.PrometheusAPIV1ImportPrometheus(t, dataSet, at.QueryOpts{})
	sut.ForceFlush(t)

	// verify ingest request correctly registered
	expected := at.MetricNamesStatsResponse{
		Records: []at.MetricNamesStatsRecord{
			{MetricName: "metric_name_1"},
			{MetricName: "metric_name_2"},
			{MetricName: "metric_name_3"},
		},
	}
	got := sut.APIV1StatusMetricNamesStats(t, "", "", "", at.QueryOpts{})
	if diff := cmp.Diff(expected, got); diff != "" {
		t.Errorf("unexpected response (-want, +got):\n%s", diff)
	}

	// verify query request correctly registered
	sut.PrometheusAPIV1Query(t, `{__name__!=""}`, at.QueryOpts{Time: ingestDateTime})
	expected = at.MetricNamesStatsResponse{
		Records: []at.MetricNamesStatsRecord{
			{MetricName: "metric_name_1", QueryRequestsCount: 3},
			{MetricName: "metric_name_2", QueryRequestsCount: 1},
			{MetricName: "metric_name_3", QueryRequestsCount: 1},
		},
	}
	got = sut.APIV1StatusMetricNamesStats(t, "", "", "", at.QueryOpts{})
	if diff := cmp.Diff(expected, got); diff != "" {
		t.Errorf("unexpected response (-want, +got):\n%s", diff)
	}

	// perform query request for single metric and check counter increase
	sut.PrometheusAPIV1Query(t, `metric_name_2`, at.QueryOpts{Time: ingestDateTime})
	expected = at.MetricNamesStatsResponse{
		Records: []at.MetricNamesStatsRecord{
			{MetricName: "metric_name_1", QueryRequestsCount: 3},
			{MetricName: "metric_name_2", QueryRequestsCount: 2},
			{MetricName: "metric_name_3", QueryRequestsCount: 1},
		},
	}
	got = sut.APIV1StatusMetricNamesStats(t, "", "", "", at.QueryOpts{})
	if diff := cmp.Diff(expected, got); diff != "" {
		t.Errorf("unexpected response (-want, +got):\n%s", diff)
	}

	// verify le filter
	expected = at.MetricNamesStatsResponse{
		Records: []at.MetricNamesStatsRecord{
			{MetricName: "metric_name_2", QueryRequestsCount: 2},
			{MetricName: "metric_name_3", QueryRequestsCount: 1},
		},
	}
	got = sut.APIV1StatusMetricNamesStats(t, "", "2", "", at.QueryOpts{})
	if diff := cmp.Diff(expected, got); diff != "" {
		t.Errorf("unexpected response (-want, +got):\n%s", diff)
	}

	// reset state and check empty request response
	sut.APIV1AdminStatusMetricNamesStatsReset(t, at.QueryOpts{})
	expected = at.MetricNamesStatsResponse{
		Records: []at.MetricNamesStatsRecord{},
	}
	got = sut.APIV1StatusMetricNamesStats(t, "", "", "", at.QueryOpts{})
	if diff := cmp.Diff(expected, got); diff != "" {
		t.Errorf("unexpected response (-want, +got):\n%s", diff)
	}
}
// TestClusterMetricNamesStats verifies metric names usage tracking in cluster
// mode: per-tenant tracking on two vmstorage nodes, multitenant aggregation
// via vmselect, and state reset.
//
// Note: the original mixed the two aliases (`apptest` and `at`) for the same
// package; this test now uses `apptest` consistently.
func TestClusterMetricNamesStats(t *testing.T) {
	os.RemoveAll(t.Name())
	tc := apptest.NewTestCase(t)
	defer tc.Stop()

	vmstorage1 := tc.MustStartVmstorage("vmstorage-1", []string{
		"-storageDataPath=" + tc.Dir() + "/vmstorage-1",
		"-retentionPeriod=100y",
		"-storage.trackMetricNamesStats",
	})
	vmstorage2 := tc.MustStartVmstorage("vmstorage-2", []string{
		"-storageDataPath=" + tc.Dir() + "/vmstorage-2",
		"-retentionPeriod=100y",
		"-storage.trackMetricNamesStats",
	})
	vminsert := tc.MustStartVminsert("vminsert", []string{
		fmt.Sprintf("-storageNode=%s,%s", vmstorage1.VminsertAddr(), vmstorage2.VminsertAddr()),
	})
	vmselect := tc.MustStartVmselect("vmselect", []string{
		fmt.Sprintf("-storageNode=%s,%s", vmstorage1.VmselectAddr(), vmstorage2.VmselectAddr()),
	})

	// verify empty stats
	resp := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "0:0"})
	if len(resp.Records) != 0 {
		t.Fatalf("unexpected resp Records: %d, want: %d", len(resp.Records), 0)
	}

	const ingestDateTime = `2024-02-05T08:57:36.700Z`
	const ingestTimestamp = ` 1707123456700`
	dataSet := []string{
		`metric_name_1{label="foo"} 10`,
		`metric_name_1{label="bar"} 10`,
		`metric_name_2{label="baz"} 20`,
		`metric_name_1{label="baz"} 10`,
		`metric_name_3{label="baz"} 30`,
		`metric_name_3{label="baz"} 30`,
	}
	for idx := range dataSet {
		dataSet[idx] += ingestTimestamp
	}

	// ingest per tenant data and verify it with search
	tenantIDs := []string{"1:1", "1:15", "15:15"}
	for _, tenantID := range tenantIDs {
		vminsert.PrometheusAPIV1ImportPrometheus(t, dataSet, apptest.QueryOpts{Tenant: tenantID})
		vmstorage1.ForceFlush(t)
		vmstorage2.ForceFlush(t)

		// verify ingest request correctly registered
		expected := apptest.MetricNamesStatsResponse{
			Records: []apptest.MetricNamesStatsRecord{
				{MetricName: "metric_name_1"},
				{MetricName: "metric_name_2"},
				{MetricName: "metric_name_3"},
			},
		}
		gotStats := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID})
		if diff := cmp.Diff(expected, gotStats); diff != "" {
			t.Errorf("unexpected response (-want, +got):\n%s", diff)
		}

		// verify query request registered correctly
		vmselect.PrometheusAPIV1Query(t, `{__name__!=""}`, apptest.QueryOpts{
			Tenant: tenantID, Time: ingestDateTime,
		})
		expected = apptest.MetricNamesStatsResponse{
			Records: []apptest.MetricNamesStatsRecord{
				{MetricName: "metric_name_2", QueryRequestsCount: 1},
				{MetricName: "metric_name_3", QueryRequestsCount: 1},
				{MetricName: "metric_name_1", QueryRequestsCount: 3},
			},
		}
		gotStats = vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID})
		if diff := cmp.Diff(expected, gotStats); diff != "" {
			t.Errorf("unexpected response tenant: %s (-want, +got):\n%s", tenantID, diff)
		}
	}

	// verify multitenant stats
	expected := apptest.MetricNamesStatsResponse{
		Records: []apptest.MetricNamesStatsRecord{
			{MetricName: "metric_name_2", QueryRequestsCount: 3},
			{MetricName: "metric_name_3", QueryRequestsCount: 3},
			{MetricName: "metric_name_1", QueryRequestsCount: 9},
		},
	}
	gotStats := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"})
	if diff := cmp.Diff(expected, gotStats); diff != "" {
		t.Errorf("unexpected response (-want, +got):\n%s", diff)
	}

	// reset cache and check empty state
	vmselect.MetricNamesStatsReset(t, apptest.QueryOpts{})
	resp = vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"})
	if len(resp.Records) != 0 {
		t.Fatalf("want 0 records, got: %d", len(resp.Records))
	}
}

View File

@@ -1,7 +1,9 @@
package apptest
import (
"encoding/json"
"fmt"
"net/http"
"regexp"
"testing"
)
@@ -133,6 +135,45 @@ func (app *Vmselect) DeleteSeries(t *testing.T, matchQuery string, opts QueryOpt
}
}
// MetricNamesStats sends a query to a /select/tenant/prometheus/api/v1/status/metric_names_stats endpoint
// and returns the statistics response for given params.
//
// Empty limit/le/matchPattern values are still added to the form; the server
// treats empty params as "not set".
//
// See https://docs.victoriametrics.com/#track-ingested-metrics-usage
func (app *Vmselect) MetricNamesStats(t *testing.T, limit, le, matchPattern string, opts QueryOpts) MetricNamesStatsResponse {
	t.Helper()

	values := opts.asURLValues()
	values.Add("limit", limit)
	values.Add("le", le)
	values.Add("match_pattern", matchPattern)
	queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/status/metric_names_stats", app.httpListenAddr, opts.getTenant())
	res, statusCode := app.cli.PostForm(t, queryURL, values)
	if statusCode != http.StatusOK {
		t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
	}
	var resp MetricNamesStatsResponse
	if err := json.Unmarshal([]byte(res), &resp); err != nil {
		t.Fatalf("could not unmarshal series response data:\n%s\n err: %v", res, err)
	}
	return resp
}
// MetricNamesStatsReset sends a query to a /admin/api/v1/status/metric_names_stats/reset endpoint
//
// See https://docs.victoriametrics.com/#track-ingested-metrics-usage
func (app *Vmselect) MetricNamesStatsReset(t *testing.T, opts QueryOpts) {
	t.Helper()

	values := opts.asURLValues()
	// NOTE(review): the URL below contains "admin" twice
	// ("/admin/api/v1/admin/status/..."), while the doc comment above and the
	// commit description mention "/admin/api/v1/status/...". Confirm which
	// path the vmselect router actually serves.
	queryURL := fmt.Sprintf("http://%s/admin/api/v1/admin/status/metric_names_stats/reset", app.httpListenAddr)
	res, statusCode := app.cli.PostForm(t, queryURL, values)
	if statusCode != http.StatusNoContent {
		t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
	}
}
// String returns the string representation of the vmselect app state.
func (app *Vmselect) String() string {
return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr)

View File

@@ -1,6 +1,7 @@
package apptest
import (
"encoding/json"
"fmt"
"net/http"
"os"
@@ -188,6 +189,45 @@ func (app *Vmsingle) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts
return NewPrometheusAPIV1SeriesResponse(t, res)
}
// APIV1StatusMetricNamesStats sends a query to a /api/v1/status/metric_names_stats endpoint
// and returns the statistics response for given params.
//
// See https://docs.victoriametrics.com/#track-ingested-metrics-usage
func (app *Vmsingle) APIV1StatusMetricNamesStats(t *testing.T, limit, le, matchPattern string, opts QueryOpts) MetricNamesStatsResponse {
	t.Helper()

	// Empty params are still sent; the server treats them as "not set".
	params := opts.asURLValues()
	params.Add("limit", limit)
	params.Add("le", le)
	params.Add("match_pattern", matchPattern)
	requestURL := fmt.Sprintf("http://%s/api/v1/status/metric_names_stats", app.httpListenAddr)
	body, statusCode := app.cli.PostForm(t, requestURL, params)
	if statusCode != http.StatusOK {
		t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, body)
	}
	var parsed MetricNamesStatsResponse
	if err := json.Unmarshal([]byte(body), &parsed); err != nil {
		t.Fatalf("could not unmarshal metric names stats response data:\n%s\n err: %v", body, err)
	}
	return parsed
}
// APIV1AdminStatusMetricNamesStatsReset sends a query to a /api/v1/admin/status/metric_names_stats/reset endpoint
//
// It fails the test unless the server responds with 204 No Content.
//
// See https://docs.victoriametrics.com/#track-ingested-metrics-usage
func (app *Vmsingle) APIV1AdminStatusMetricNamesStatsReset(t *testing.T, opts QueryOpts) {
	t.Helper()

	values := opts.asURLValues()
	queryURL := fmt.Sprintf("http://%s/api/v1/admin/status/metric_names_stats/reset", app.httpListenAddr)
	res, statusCode := app.cli.PostForm(t, queryURL, values)
	if statusCode != http.StatusNoContent {
		t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
	}
}
// String returns the string representation of the vmsingle app state.
func (app *Vmsingle) String() string {
return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q}", []any{

View File

@@ -1264,6 +1264,11 @@ Below is the output for `/path/to/vminsert -help`:
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
-memory.allowedPercent float
Allowed percent of system memory VictoriaMetrics caches may occupy. See also -memory.allowedBytes. Too low a value may increase cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache which will result in higher disk IO usage (default 60)
-metricNamesStatsResetAuthKey value
AuthKey for reseting metric names usage cache via /api/v1/admin/status/metric_names_stats/reset. It overrides -httpAuth.*
See https://docs.victoriametrics.com/#track-ingested-metrics-usage
Flag value can be read from the given file when using -metricNamesStatsResetAuthKey=file:///abs/path/to/file or -metricNamesStatsResetAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https
url when using -metricNamesStatsResetAuthKey=http://host/path or -metricNamesStatsResetAuthKey=https://host/path
-metrics.exposeMetadata
Whether to expose TYPE and HELP metadata at the /metrics page, which is exposed at -httpListenAddr . The metadata may be needed when the /metrics page is consumed by systems, which require this information. For example, Managed Prometheus in Google Cloud - https://cloud.google.com/stackdriver/docs/managed-prometheus/troubleshooting#missing-metric-type
-metricsAuthKey value
@@ -1936,6 +1941,9 @@ Below is the output for `/path/to/vmstorage -help`:
-storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
-storage.cacheSizeMetricNamesStats size
Overrides max size for storage/metricNamesStatsTracker cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
-storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
@@ -1950,6 +1958,9 @@ Below is the output for `/path/to/vmstorage -help`:
-storage.minFreeDiskSpaceBytes size
The minimum free disk space at -storageDataPath after which the storage stops accepting new data
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 10000000)
-storage.trackMetricNamesStats
Whether to track ingest and query requests for timeseries metric names. This feature allows to track metric names unused at query requests.
See https://docs.victoriametrics.com/#track-ingested-metrics-usage
-storage.vminsertConnsShutdownDuration duration
The time needed for gradual closing of vminsert connections during graceful shutdown. Bigger duration reduces spikes in CPU, RAM and disk IO load on the remaining vmstorage nodes during rolling restart. Smaller duration reduces the time needed to close all the vminsert connections, thus reducing the time for graceful shutdown. See https://docs.victoriametrics.com/cluster-victoriametrics/#improving-re-routing-performance-during-restart (default 25s)
-storageDataPath string

View File

@@ -461,6 +461,58 @@ vmselect requests stats via [/api/v1/status/tsdb](#tsdb-stats) API from each vms
This may lead to inflated values when samples for the same time series are spread across multiple vmstorage nodes
due to [replication](#replication) or [rerouting](https://docs.victoriametrics.com/cluster-victoriametrics/?highlight=re-routes#cluster-availability).
### Track ingested metrics usage
VictoriaMetrics provides the ability to record statistics of fetched [metric names](https://docs.victoriametrics.com/keyconcepts/#structure-of-a-metric) during [querying](https://docs.victoriametrics.com/keyconcepts/#query-data). This feature can be enabled via the flag `--storage.trackMetricNamesStats` (disabled by default) on a single-node VictoriaMetrics or [vmstorage](https://docs.victoriametrics.com/cluster-victoriametrics/#architecture-overview). Querying a metric with non-matching filters doesn't increase the counter for this particular metric name.
For example, querying for `vm_log_messages_total{level!="info"}` won't increment usage counter for `vm_log_messages_total` if there are no `{level="error"}` or `{level="warning"}` series yet.
VictoriaMetrics tracks metric names query statistics for `/api/v1/query`, `/api/v1/query_range`, `/render`, `/federate` and `/api/v1/export` API calls.
To get metric names usage statistics, use the `/prometheus/api/v1/status/metric_names_stats` API endpoint. It accepts the following query parameters:
* `limit` - integer value to limit the number of metric names in response. By default, API returns 1000 records.
* `le` - `less than or equal`, is an integer threshold for filtering metric names by their usage count in queries. For example, with `?le=1` API returns metric names that were queried <=1 times.
* `match_pattern` - a substring pattern to match metric names. For example, `?match_pattern=vm_` will match any metric names with `vm_` pattern, like `vm_http_requests`, `max_vm_memory_available`. It doesn't support regex syntax.
The API endpoint returns the following `JSON` response:
```json
{
"status": "success",
"statsCollectedSince": 1737534094,
"statsCollectedRecordsTotal": 2,
"records": [
{
"metricName": "node_disk_writes_completed_total",
"queryRequestsCount": 50,
"lastRequestTimestamp": 1737534262
},
{
"metricName": "node_network_transmit_errs_total",
"queryRequestsCount": 100,
"lastRequestTimestamp": 1737534262
}
]
}
```
VictoriaMetrics stores tracked metric names in memory and saves the state to disk in the data/cache folder during restarts.
The size of the in-memory state is limited to 1% of the available memory by default.
This limit can be adjusted using the `-storage.cacheSizeMetricNamesStats` flag.
When the maximum state capacity is reached, VictoriaMetrics will stop tracking stats for newly registered time series.
However, read request statistics for already tracked time series will continue to work as expected.
VictoriaMetrics exposes the following metrics for the metric name tracker:
* vm_cache_size_bytes{type="storage/metricNamesStatsTracker"}
* vm_cache_size{type="storage/metricNamesStatsTracker"}
* vm_cache_size_max_bytes{type="storage/metricNamesStatsTracker"}
An alerting rule with query `vm_cache_size_bytes{type="storage/metricNamesStatsTracker"} / vm_cache_size_max_bytes{type="storage/metricNamesStatsTracker"} > 0.9` can be used to notify the user of cache utilization exceeding 90%.
The metric name tracker state can be reset via the API endpoint /api/v1/admin/status/metric_names_stats/reset or
via [cache removal](#cache-removal) procedure.
## How to apply new config to VictoriaMetrics
VictoriaMetrics is configured via command-line flags, so it must be restarted when new command-line flags should be applied:
@@ -3209,6 +3261,11 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-reloadAuthKey value
Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides -httpAuth.*
Flag value can be read from the given file when using -reloadAuthKey=file:///abs/path/to/file or -reloadAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -reloadAuthKey=http://host/path or -reloadAuthKey=https://host/path
-metricNamesStatsResetAuthKey value
AuthKey for resetting metric names usage cache via /api/v1/admin/status/metric_names_stats/reset. It overrides -httpAuth.*
See https://docs.victoriametrics.com/#track-ingested-metrics-usage
Flag value can be read from the given file when using -metricNamesStatsResetAuthKey=file:///abs/path/to/file or -metricNamesStatsResetAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https
url when using -metricNamesStatsResetAuthKey=http://host/path or -metricNamesStatsResetAuthKey=https://host/path
-retentionFilter array
Retention filter in the format 'filter:retention'. For example, '{env="dev"}:3d' configures the retention for time series with env="dev" label to 3 days. See https://docs.victoriametrics.com/#retention-filters for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise/
Supports an array of values separated by comma or specified via multiple flags.
@@ -3357,6 +3414,9 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
-storage.cacheSizeMetricNamesStats size
Overrides max size for storage/metricNamesStatsTracker cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
-storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
@@ -3371,6 +3431,9 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-storage.minFreeDiskSpaceBytes size
The minimum free disk space at -storageDataPath after which the storage stops accepting new data
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 10000000)
-storage.trackMetricNamesStats
Whether to track ingest and query requests for timeseries metric names. This feature allows to track metric names unused at query requests.
See https://docs.victoriametrics.com/#track-ingested-metrics-usage
-storageDataPath string
Path to storage data (default "victoria-metrics-data")
-streamAggr.config string

View File

@@ -34,6 +34,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmalert-tool](https://docs.victoriametrics.com/vmalert-tool/): make the temporary storage path for unittest unique, allowing user to run multiple vmalert-tool processes on a single host simultaneously. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8393).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): disallow using [time buckets stats pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-time-buckets) in VictoriaLogs rule expressions. Such construction produces meaningless results for [stats query API](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats) and may lead to cardinality issues.
* FEATURE: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): make `KeyValueList`, `ArrayValue` [OpenTelemetry protocol for metrics](https://docs.victoriametrics.com/#sending-data-via-opentelemetry) attributes label values compatible with open-telemetry-collector format. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8384).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): add a new flag `--storage.trackMetricNamesStats` and a new HTTP API - `/api/v1/status/metric_names_stats`. It allows to track how frequent ingested [metric names](https://docs.victoriametrics.com/keyconcepts/#structure-of-a-metric) are used during [querying](https://docs.victoriametrics.com/keyconcepts/#query-data). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4458) for details and related [docs](https://docs.victoriametrics.com/#track-ingested-metrics-usage)
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and [vmstorage](https://docs.victoriametrics.com/victoriametrics/): fix the incorrect caching of extMetricsIDs when a query timeout error occurs. This can lead to incorrect query results. Thanks to @changshun-shi for [the bug report issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8345).
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl/): respect time filter when exploring time series for [influxdb mode](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8259) for details.
@@ -83,7 +85,6 @@ Released at 2025-02-21
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): print full error messages for failed queries on the `Explore Cardinality` page. Before, only response status code was printed.
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): move values representing changes relative to the previous day to a separate column for easier sorting on the `Explore Cardinality` page.
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/metricsql/): support auto-format (prettify) for expressions that use quoted metric or label names. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7703) for details.
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/metricsql/): parse `$__interval` and `$__rate_interval` inside square brackets as missing square brackets. For example, `rate(m[$__interval])` is parsed as `rate(m)` instead of `rate(m[1i])`. This enables automatic detection of the lookbehind window for [rollup functions](https://docs.victoriametrics.com/metricsql/#rollup-functions) by VictoriaMetrics, which usually returns the most expected result.
* BUGFIX: all the VictoriaMetrics components: properly override basic authorization for API endpoints protected with `authKey`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7345#issuecomment-2662595807) for details.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): fix polluted alert messages when multiple Alertmanager instances are configured with `alert_relabel_configs`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8040), and thanks to @evkuzin for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8258).

View File

@@ -479,7 +479,7 @@ func isProtectedByAuthFlag(path string) bool {
return strings.HasSuffix(path, "/config") || strings.HasSuffix(path, "/reload") ||
strings.HasSuffix(path, "/resetRollupResultCache") || strings.HasSuffix(path, "/delSeries") || strings.HasSuffix(path, "/delete_series") ||
strings.HasSuffix(path, "/force_merge") || strings.HasSuffix(path, "/force_flush") || strings.HasSuffix(path, "/snapshot") ||
strings.HasPrefix(path, "/snapshot/")
strings.HasPrefix(path, "/snapshot/") || strings.HasSuffix(path, "/admin/status/metric_names_stats/reset")
}
// CheckAuthFlag checks whether the given authKey is set and valid

View File

@@ -0,0 +1,593 @@
package metricnamestats
import (
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
const (
	// metricNameBufSize can hold up to 64 metric name values
	// max size of metric name label value is 256
	// but usual size of metric name is 16-32
	metricNameBufSize = 16 * 1024
	// statItemBufSize is the number of statItem values allocated per batch;
	// see nextRecordLocked
	statItemBufSize = 1024
	// statKey + statItem + approx key-value at map in-memory size
	storeOverhead = 24 + 16 + 24
)
// Tracker implements in-memory tracker for timeseries metric names
// it tracks ingest and query requests for metric names
// and collects statistics
//
// main purpose of this tracker is to provide insights about metrics that have never been queried
type Tracker struct {
	// maxSizeBytes caps the approximate in-memory state size;
	// new metric names are rejected once it is exceeded (see cacheIsFull)
	maxSizeBytes uint64
	// cachePath is the on-disk location used by saveLocked/loadFrom
	cachePath string

	// creationTs is the unix timestamp of the last state (re)initialization
	creationTs atomic.Uint64
	// currentSizeBytes is the approximate in-memory size of store
	currentSizeBytes atomic.Uint64
	// currentItemsCount is the number of tracked metric names
	currentItemsCount atomic.Uint64

	// mu protect fields below
	mu sync.RWMutex

	store map[statKey]*statItem

	// holds batch allocations for statItems at store
	statItemBuf []statItem

	// holds batch allocations for metric names at statKey
	metricNamesBuf []byte

	// helper for tests
	getCurrentTs func() uint64
}
// statKey identifies a tracked metric name within a tenant.
type statKey struct {
	accountID  uint32
	projectID  uint32
	metricName string
}

// statItem holds per-metric-name query counters.
// Fields are atomics, so they can be updated while holding only
// the tracker's read lock.
type statItem struct {
	requestsCount atomic.Uint64
	lastRequestTs atomic.Uint64
}

// recordForStore is the JSON representation of a single tracked entry
// in the on-disk state file written by saveLocked and read by loadFrom.
type recordForStore struct {
	AccountID     uint32
	ProjectID     uint32
	MetricName    string
	RequestsCount uint64
	LastRequestTs uint64
}
// MustLoadFrom inits tracker from the given on-disk path.
//
// It terminates the whole process if the stored state cannot be read.
func MustLoadFrom(loadPath string, maxSizeBytes uint64) *Tracker {
	t, err := loadFrom(loadPath, maxSizeBytes)
	if err != nil {
		logger.Fatalf("unexpected error at tracker state load from path=%q: %s", loadPath, err)
	}
	return t
}
// loadFrom restores tracker state from loadPath.
//
// The on-disk format is a gzipped stream of JSON values: maxSizeBytes,
// the creation timestamp, then one recordForStore per tracked metric name.
// A missing state file is not an error: a fresh empty tracker is returned.
func loadFrom(loadPath string, maxSizeBytes uint64) (*Tracker, error) {
	mt := &Tracker{
		maxSizeBytes: maxSizeBytes,
		cachePath:    loadPath,
		getCurrentTs: fasttime.UnixTimestamp,
	}
	mt.initEmpty()
	f, err := os.Open(loadPath)
	if err != nil && !errors.Is(err, os.ErrNotExist) {
		return nil, fmt.Errorf("cannot access file content: %w", err)
	}
	// fast path: no state was saved yet
	if f == nil {
		return mt, nil
	}
	defer f.Close()
	zr, err := gzip.NewReader(f)
	if err != nil {
		return nil, fmt.Errorf("cannot create new gzip reader: %w", err)
	}
	reader := json.NewDecoder(zr)
	var storedMaxSizeBytes uint64
	if err := reader.Decode(&storedMaxSizeBytes); err != nil {
		if errors.Is(err, io.EOF) {
			// empty state file
			return mt, nil
		}
		return nil, fmt.Errorf("cannot parse maxSizeBytes: %w", err)
	}
	// Drop the stored state if it was collected with a bigger size limit:
	// it may not fit into the current limit.
	if storedMaxSizeBytes > maxSizeBytes {
		logger.Infof("Reseting tracker state due to changed maxSizeBytes from %d to %d.", storedMaxSizeBytes, maxSizeBytes)
		return mt, nil
	}
	var creationTs uint64
	if err := reader.Decode(&creationTs); err != nil {
		return nil, fmt.Errorf("cannot parse creation timestamp: %w", err)
	}
	mt.creationTs.Store(creationTs)
	var cnt uint64
	var size uint64
	var r recordForStore
	for {
		if err := reader.Decode(&r); err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return nil, fmt.Errorf("cannot parse state record: %w", err)
		}
		// during cache load, there is no need to hold lock
		si := mt.nextRecordLocked()
		si.lastRequestTs.Store(r.LastRequestTs)
		si.requestsCount.Store(r.RequestsCount)
		key := statKey{
			projectID:  r.ProjectID,
			accountID:  r.AccountID,
			metricName: mt.cloneMetricNameLocked([]byte(r.MetricName)),
		}
		mt.store[key] = si
		// keep in-memory accounting in sync with RegisterIngestRequest
		size += uint64(len(r.MetricName)) + storeOverhead
		cnt++
	}
	// Close also verifies the gzip checksum of the consumed stream.
	if err := zr.Close(); err != nil {
		return nil, fmt.Errorf("cannot close gzip reader: %w", err)
	}
	mt.currentSizeBytes.Store(size)
	mt.currentItemsCount.Store(cnt)
	logger.Infof("loaded state from disk, records: %d, total size: %d", cnt, size)
	return mt, nil
}
// nextRecordLocked returns a pointer to a fresh zeroed statItem taken from
// the batch-allocated buffer. The caller must hold mt.mu.
func (mt *Tracker) nextRecordLocked() *statItem {
	if len(mt.statItemBuf) == cap(mt.statItemBuf) {
		// the current batch is exhausted: start a new one instead of
		// growing the existing slice — it saves memory and reduces GC pressure
		mt.statItemBuf = make([]statItem, 0, statItemBufSize)
	}
	// extend length by one; the exposed element is always zeroed, since
	// batches are only ever extended, never truncated
	mt.statItemBuf = mt.statItemBuf[:len(mt.statItemBuf)+1]
	return &mt.statItemBuf[len(mt.statItemBuf)-1]
}
// cloneMetricNameLocked uses the same idea as strings.Clone.
// But instead of direct []byte allocation for each cloned string,
// it allocates metricNamesBuf, copies provided metricGroup into it
// and uses string *byte references for it via subslice.
//
// The caller must hold mt.mu, since metricNamesBuf is mutated.
// Returned strings stay valid until the whole state is reset: buffer
// chunks are only ever appended to or replaced wholesale, so the bytes
// backing an already-returned string are never overwritten.
func (mt *Tracker) cloneMetricNameLocked(metricName []byte) string {
	idx := len(mt.metricNamesBuf)
	n := len(metricName) + len(mt.metricNamesBuf)
	if n > cap(mt.metricNamesBuf) {
		// allocate a new slice instead of reallocating the existing one
		// it saves memory and reduces GC pressure
		mt.metricNamesBuf = make([]byte, 0, metricNameBufSize)
		idx = 0
	}
	mt.metricNamesBuf = append(mt.metricNamesBuf, metricName...)
	return bytesutil.ToUnsafeString(mt.metricNamesBuf[idx:])
}
// MustClose closes tracker and saves state on disk.
//
// Calling it on a nil tracker is a no-op. The process is terminated
// with a panic if the state cannot be persisted.
func (mt *Tracker) MustClose() {
	if mt == nil {
		return
	}
	err := mt.saveLocked()
	if err == nil {
		return
	}
	logger.Panicf("cannot save tracker state at path=%q: %s", mt.cachePath, err)
}
// saveLocked stores in-memory state of tracker on disk.
//
// The caller must guarantee exclusive access to mt.store for the
// duration of the call (hold mt.mu or be the sole owner at shutdown).
// The state is written to a temporary file first and atomically renamed
// into place, so a crash mid-save cannot corrupt the previous state.
func (mt *Tracker) saveLocked() error {
	// Create dir if it doesn't exist in the same manner as other caches doing
	dir, fileName := filepath.Split(mt.cachePath)
	if _, err := os.Stat(dir); err != nil {
		if !os.IsNotExist(err) {
			return fmt.Errorf("cannot stat %q: %s", dir, err)
		}
		if err := os.MkdirAll(dir, 0755); err != nil {
			return fmt.Errorf("cannot create dir %q: %s", dir, err)
		}
	}
	// Create the temporary file in the destination dir instead of os.TempDir():
	// os.Rename below fails with EXDEV when source and destination are located
	// on different filesystems (e.g. tmpfs /tmp vs the data volume).
	f, err := os.CreateTemp(dir, fileName)
	if err != nil {
		return fmt.Errorf("cannot open file for state save: %w", err)
	}
	tmpName := f.Name()
	// cleanup removes the half-written temporary file on error paths,
	// so failed saves do not accumulate garbage next to the cache.
	cleanup := func() {
		_ = f.Close()
		_ = os.Remove(tmpName)
	}
	zw := gzip.NewWriter(f)
	writer := json.NewEncoder(zw)
	// maxSizeBytes goes first, so loadFrom can drop the state if the limit shrank
	if err := writer.Encode(mt.maxSizeBytes); err != nil {
		cleanup()
		return fmt.Errorf("cannot save encoded maxSizeBytes: %w", err)
	}
	if err := writer.Encode(mt.creationTs.Load()); err != nil {
		cleanup()
		return fmt.Errorf("cannot save encoded creation timestamp: %w", err)
	}
	var r recordForStore
	for sk, si := range mt.store {
		r.AccountID = sk.accountID
		r.ProjectID = sk.projectID
		r.MetricName = sk.metricName
		r.LastRequestTs = si.lastRequestTs.Load()
		r.RequestsCount = si.requestsCount.Load()
		if err := writer.Encode(r); err != nil {
			cleanup()
			return fmt.Errorf("cannot save encoded state record: %w", err)
		}
	}
	if err := zw.Close(); err != nil {
		cleanup()
		return fmt.Errorf("cannot flush writer state: %w", err)
	}
	// Close the file before renaming it; renaming an open file fails on Windows
	// and closing also surfaces delayed write errors.
	if err := f.Close(); err != nil {
		_ = os.Remove(tmpName)
		return fmt.Errorf("cannot close temporary state file %q: %w", tmpName, err)
	}
	// atomically save result
	if err := os.Rename(tmpName, mt.cachePath); err != nil {
		_ = os.Remove(tmpName)
		return fmt.Errorf("cannot move temporary file %q to %q: %s", tmpName, mt.cachePath, err)
	}
	return nil
}
// TrackerMetrics holds metrics to report
// via vm_cache_size_bytes / vm_cache_size / vm_cache_size_max_bytes.
type TrackerMetrics struct {
	// CurrentSizeBytes is the approximate in-memory state size
	CurrentSizeBytes uint64
	// CurrentItemsCount is the number of tracked metric names
	CurrentItemsCount uint64
	// MaxSizeBytes is the configured state size limit
	MaxSizeBytes uint64
}
// UpdateMetrics writes internal metrics to the provided object.
//
// It is safe to call on a nil tracker: dst is left untouched in that case.
func (mt *Tracker) UpdateMetrics(dst *TrackerMetrics) {
	if mt == nil {
		return
	}
	dst.MaxSizeBytes = mt.maxSizeBytes
	dst.CurrentSizeBytes = mt.currentSizeBytes.Load()
	dst.CurrentItemsCount = mt.currentItemsCount.Load()
}
// IsEmpty reports whether the internal state has no records.
//
// NOTE(review): the previous implementation returned `count > 0`,
// i.e. "has records", contradicting the method name — callers should be
// re-checked against the corrected semantics.
// A nil tracker is considered empty.
func (mt *Tracker) IsEmpty() bool {
	if mt == nil {
		return true
	}
	return mt.currentItemsCount.Load() == 0
}
// Reset cleans stats, saves cache state and executes provided func.
//
// onReset is invoked while mt.mu is held, after the empty state has been
// persisted to disk; it is used to reset dependent caches (e.g. tsid cache).
// Calling Reset on a nil tracker is a no-op.
func (mt *Tracker) Reset(onReset func()) {
	if mt == nil {
		return
	}
	logger.Infof("reseting metric names tracker state")
	mt.mu.Lock()
	defer mt.mu.Unlock()
	mt.initEmpty()
	// persist the freshly emptied state, so a restart doesn't resurrect old stats
	if err := mt.saveLocked(); err != nil {
		logger.Panicf("during Tracker reset cannot save state: %s", err)
	}
	onReset()
}
// initEmpty replaces the tracker state with a fresh empty one.
// The caller must hold mt.mu or otherwise guarantee exclusive access.
func (mt *Tracker) initEmpty() {
	mt.statItemBuf = make([]statItem, 0, statItemBufSize)
	mt.metricNamesBuf = make([]byte, 0, metricNameBufSize)
	mt.store = map[statKey]*statItem{}
	mt.currentItemsCount.Store(0)
	mt.currentSizeBytes.Store(0)
	mt.creationTs.Store(mt.getCurrentTs())
}
// RegisterIngestRequest tracks metric name ingestion.
//
// Once the state size exceeds maxSizeBytes, new metric names are silently
// dropped; query stats for already tracked names keep being collected by
// RegisterQueryRequest. Safe for concurrent use; a nil tracker is a no-op.
func (mt *Tracker) RegisterIngestRequest(accountID, projectID uint32, metricName []byte) {
	if mt == nil {
		return
	}
	if mt.cacheIsFull() {
		return
	}
	// sk references metricName bytes without copying — it is only used
	// for map lookups; the key actually stored gets a tracker-owned clone below
	sk := statKey{
		accountID:  accountID,
		projectID:  projectID,
		metricName: bytesutil.ToUnsafeString(metricName),
	}
	// fast path under the read lock: the name is already tracked
	mt.mu.RLock()
	_, ok := mt.store[sk]
	mt.mu.RUnlock()
	if ok {
		return
	}
	mt.mu.Lock()
	// key could be already ingested concurrently
	_, ok = mt.store[sk]
	if ok {
		mt.mu.Unlock()
		return
	}
	si := mt.nextRecordLocked()
	// clone the metric name into tracker-owned memory before storing the key
	sk.metricName = mt.cloneMetricNameLocked(metricName)
	mt.store[sk] = si
	mt.mu.Unlock()
	mt.currentSizeBytes.Add(uint64(len(metricName)) + storeOverhead)
	mt.currentItemsCount.Add(1)
}
// RegisterQueryRequest tracks metric name at query request.
//
// Names that were never ingested are ignored. Safe for concurrent use;
// a nil tracker is a no-op.
func (mt *Tracker) RegisterQueryRequest(accountID, projectID uint32, metricName []byte) {
	if mt == nil {
		return
	}
	// the key only borrows metricName bytes for the duration of the lookup
	sk := statKey{
		accountID:  accountID,
		projectID:  projectID,
		metricName: bytesutil.ToUnsafeString(metricName),
	}
	mt.mu.RLock()
	si, ok := mt.store[sk]
	mt.mu.RUnlock()
	if !ok {
		return
	}
	// counters are atomics, so the read lock above is sufficient
	si.requestsCount.Add(1)
	si.lastRequestTs.Store(mt.getCurrentTs())
}
// cacheIsFull reports whether the in-memory state has grown past maxSizeBytes.
func (mt *Tracker) cacheIsFull() bool {
	limit := mt.maxSizeBytes
	return mt.currentSizeBytes.Load() > limit
}
// GetStatsForTenant returns stats response for the tracked metrics for given tenant.
//
// le filters by requests count (ignored when negative) and matchPattern
// filters by substring match on the metric name (ignored when empty).
// A nil tracker yields an empty result.
func (mt *Tracker) GetStatsForTenant(accountID, projectID uint32, limit, le int, matchPattern string) StatsResult {
	if mt == nil {
		return StatsResult{}
	}
	matches := func(sk *statKey, si *statItem) bool {
		switch {
		case sk.accountID != accountID || sk.projectID != projectID:
			return false
		case le >= 0 && int(si.requestsCount.Load()) > le:
			return false
		case len(matchPattern) > 0 && !strings.Contains(sk.metricName, matchPattern):
			return false
		}
		return true
	}
	mt.mu.RLock()
	result := mt.getStatsLocked(limit, matches)
	mt.mu.RUnlock()
	result.sort()
	return result
}
// GetStats returns stats response for the tracked metrics
//
// DeduplicateMergeRecords must be called at cluster version on returned result.
// le filters by requests count (ignored when negative) and matchPattern
// filters by substring match on the metric name (ignored when empty).
func (mt *Tracker) GetStats(limit, le int, matchPattern string) StatsResult {
	if mt == nil {
		return StatsResult{}
	}
	matches := func(sk *statKey, si *statItem) bool {
		if le >= 0 && int(si.requestsCount.Load()) > le {
			return false
		}
		if len(matchPattern) > 0 && !strings.Contains(sk.metricName, matchPattern) {
			return false
		}
		return true
	}
	mt.mu.RLock()
	result := mt.getStatsLocked(limit, matches)
	mt.mu.RUnlock()
	result.sort()
	return result
}
// getStatsLocked collects up to limit records matching predicate.
// The caller must hold mt.mu (read or write).
func (mt *Tracker) getStatsLocked(limit int, predicate func(sk *statKey, si *statItem) bool) StatsResult {
	result := StatsResult{
		CollectedSinceTs: mt.creationTs.Load(),
		TotalRecords:     mt.currentItemsCount.Load(),
		MaxSizeBytes:     mt.maxSizeBytes,
		CurrentSizeBytes: mt.currentSizeBytes.Load(),
	}
	for sk, si := range mt.store {
		// map iteration order is random, so the truncated record set is arbitrary
		if len(result.Records) >= limit {
			break
		}
		if !predicate(&sk, si) {
			continue
		}
		result.Records = append(result.Records, StatRecord{
			MetricName:    sk.metricName,
			RequestsCount: si.requestsCount.Load(),
			LastRequestTs: si.lastRequestTs.Load(),
		})
	}
	return result
}
// StatsResult defines stats result for GetStats request
type StatsResult struct {
	// CollectedSinceTs is the unix timestamp when stats collection started
	CollectedSinceTs uint64
	// TotalRecords is the overall number of tracked metric names,
	// regardless of any limit/filter applied to Records
	TotalRecords uint64
	// MaxSizeBytes is the configured in-memory state size limit
	MaxSizeBytes uint64
	// CurrentSizeBytes is the approximate in-memory state size
	CurrentSizeBytes uint64
	// Records holds the (possibly limited and filtered) stat entries
	Records []StatRecord
}

// StatRecord defines stat record for given metric name
type StatRecord struct {
	MetricName string
	// RequestsCount is the number of query requests observed for MetricName
	RequestsCount uint64
	// LastRequestTs is the unix timestamp of the most recent query request
	LastRequestTs uint64
}
// sort orders Records lexicographically by metric name, as required by
// DeduplicateMergeRecords and Merge.
func (sr *StatsResult) sort() {
	recs := sr.Records
	sort.Slice(recs, func(a, b int) bool {
		return recs[a].MetricName < recs[b].MetricName
	})
}
// DeduplicateMergeRecords performs merging duplicate records by metric name
//
// It is usual case for global tenant request at cluster version.
// Records must be sorted by metric name (see sort) before the call.
// Merging is in place: request counters are summed, the freshest
// lastRequestTs wins, and the backing array of Records is reused.
func (sr *StatsResult) DeduplicateMergeRecords() {
	if len(sr.Records) < 2 {
		return
	}
	// tmp shares the backing array with sr.Records; this is safe because
	// the write position never overtakes the read indexes i and j
	tmp := sr.Records[:0]
	// deduplication uses sliding indexes
	//
	// records:
	// [ 0    1    2    3    4    5    6 ]
	//
	// [ mn1, mn2, mn2, mn2, mn3, mn4, mn4 ]
	//
	// 0 1
	// 0 2
	// 2 3
	// 2 4
	// 2 5
	// 5 6
	//
	// result:
	//
	// [0,1,4,5]
	i := 0
	j := 1
	rCurr := sr.Records[i]
	rNext := sr.Records[j]
	for {
		if rCurr.MetricName == rNext.MetricName {
			// same metric name: fold rNext into the accumulated rCurr
			rCurr.RequestsCount += rNext.RequestsCount
			if rCurr.LastRequestTs < rNext.LastRequestTs {
				rCurr.LastRequestTs = rNext.LastRequestTs
			}
			j++
			if j >= len(sr.Records) {
				tmp = append(tmp, rCurr)
				break
			}
		} else {
			// flush the accumulated record and restart from position j
			tmp = append(tmp, rCurr)
			i = j
			rCurr = sr.Records[i]
			j++
			if j >= len(sr.Records) {
				// the final record had no duplicates
				tmp = append(tmp, rNext)
				break
			}
		}
		rNext = sr.Records[j]
	}
	sr.Records = tmp
}
// Sort orders records by requests count ascending (least used first),
// breaking ties by metric name.
func (sr *StatsResult) Sort() {
	recs := sr.Records
	sort.Slice(recs, func(i, j int) bool {
		a, b := &recs[i], &recs[j]
		if a.RequestsCount != b.RequestsCount {
			return a.RequestsCount < b.RequestsCount
		}
		return a.MetricName < b.MetricName
	})
}
// Merge adds records from given src
//
// It expected src to be sorted by metricName; sr.Records must be sorted
// the same way. Records with equal metric names are folded: request
// counters are summed and the freshest lastRequestTs wins.
func (sr *StatsResult) Merge(src *StatsResult) {
	// keep the latest collection start: merged stats only fully cover
	// the period observed by all sources
	if sr.CollectedSinceTs < src.CollectedSinceTs {
		sr.CollectedSinceTs = src.CollectedSinceTs
	}
	sr.TotalRecords += src.TotalRecords
	sr.CurrentSizeBytes += src.CurrentSizeBytes
	sr.MaxSizeBytes += src.MaxSizeBytes
	if len(src.Records) == 0 {
		return
	}
	if len(sr.Records) == 0 {
		sr.Records = append(sr.Records, src.Records...)
		return
	}
	// merge sorted elements into new slice
	// records:
	// [ mn1, mn2, mn3, mn4, mn6 ]
	// [ mn2, mn4, mn5 ]
	// 0
	// 0
	// [ ]
	// 1
	// 0
	// [ mn1 ]
	// 2
	// 1
	// [ mn1, mn2 ]
	// 3
	// 1
	// [ mn1, mn2, mn3 ]
	// 4
	// 2
	// [ mn1, mn2, mn3, mn4 ]
	// 4
	// -
	// [ mn1, mn2, mn3, mn4, mn5 ]
	//
	// [ mn1, mn2, mn3, mn4, mn5, mn6 ]
	i := 0
	j := 0
	// TODO: probably, we can append src records to sr instead of allocating new slice
	// it will require to perform sort on sr and probably will use more CPU, but less memory
	result := make([]StatRecord, 0, len(sr.Records))
	for {
		// one side is exhausted: copy the tail of the other and stop
		if i >= len(sr.Records) {
			result = append(result, src.Records[j:]...)
			break
		}
		if j >= len(src.Records) {
			result = append(result, sr.Records[i:]...)
			break
		}
		left, right := sr.Records[i], src.Records[j]
		switch {
		case left.MetricName == right.MetricName:
			// fold the duplicate pair into a single record
			left.RequestsCount += right.RequestsCount
			if left.LastRequestTs < right.LastRequestTs {
				left.LastRequestTs = right.LastRequestTs
			}
			result = append(result, left)
			i++
			j++
		case left.MetricName < right.MetricName:
			result = append(result, left)
			i++
		case left.MetricName > right.MetricName:
			result = append(result, right)
			j++
		}
	}
	sr.Records = result
}

View File

@@ -0,0 +1,564 @@
package metricnamestats
import (
"path"
"sync"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)
// statsResultCmpOpts ignores fields that depend on wall-clock time and
// memory accounting, so tests compare only the tracked records.
var statsResultCmpOpts = cmpopts.IgnoreFields(StatsResult{}, "CollectedSinceTs", "MaxSizeBytes", "CurrentSizeBytes")
// TestMetricsTracker verifies ingest/query tracking end-to-end:
// per-tenant stats, merged stats across all tenants, and that the state
// survives a save/load round-trip through the on-disk cache.
func TestMetricsTracker(t *testing.T) {
	// testOp is a single tracker operation: o=='i' registers ingestion,
	// o=='r' registers a query request, at timestamp ts.
	type testOp struct {
		aID uint32
		pID uint32
		o   byte
		mg  string
		ts  uint64
	}
	// queryOpts describes the stats request under test;
	// isTenantEmpty switches between GetStats and GetStatsForTenant.
	type queryOpts struct {
		accountID     uint32
		projectID     uint32
		isTenantEmpty bool
		limit         int
		lte           int
		matchPattern  string
	}
	cmpOpts := cmpopts.IgnoreFields(StatsResult{}, "CollectedSinceTs", "MaxSizeBytes", "CurrentSizeBytes")
	cachePath := path.Join(t.TempDir(), t.Name())
	f := func(ops []testOp, qo queryOpts, expected StatsResult) {
		t.Helper()
		expected.sort()
		mt, err := loadFrom(cachePath, 100_000)
		if err != nil {
			t.Fatalf("cannot load state from disk on init: %s", err)
		}
		for _, op := range ops {
			// pin the timestamp the tracker observes for this op
			mt.getCurrentTs = func() uint64 {
				return op.ts
			}
			switch op.o {
			case 'i':
				mt.RegisterIngestRequest(op.aID, op.pID, []byte(op.mg))
			case 'r':
				mt.RegisterQueryRequest(op.aID, op.pID, []byte(op.mg))
			}
		}
		var got StatsResult
		if qo.isTenantEmpty {
			got = mt.GetStats(qo.limit, qo.lte, qo.matchPattern)
			got.sort()
			got.DeduplicateMergeRecords()
		} else {
			got = mt.GetStatsForTenant(qo.accountID, qo.projectID, qo.limit, qo.lte, qo.matchPattern)
			got.sort()
		}
		if !cmp.Equal(expected, got, cmpOpts) {
			t.Fatalf("unexpected GetStatsForTenant result: %s", cmp.Diff(expected, got, cmpOpts))
		}
		// persist the state and verify the same result after reload
		if err := mt.saveLocked(); err != nil {
			t.Fatalf("cannot save in-memory state: %s", err)
		}
		loadedUmt, err := loadFrom(cachePath, 100_000)
		if err != nil {
			t.Fatalf("cannot load restore state from disk: %s", err)
		}
		if qo.isTenantEmpty {
			got = loadedUmt.GetStats(qo.limit, qo.lte, qo.matchPattern)
			got.sort()
			got.DeduplicateMergeRecords()
		} else {
			got = loadedUmt.GetStatsForTenant(qo.accountID, qo.projectID, qo.limit, qo.lte, qo.matchPattern)
			got.sort()
		}
		if !cmp.Equal(expected, got, cmpOpts) {
			t.Fatalf("unexpected GetStatsForTenant result after load state from disk: %s", cmp.Diff(expected, got, cmpOpts))
		}
		// start every f() call from a clean slate (also persists the empty state)
		mt.Reset(func() {})
	}
	dataSet := []testOp{
		{1, 1, 'i', "metric_1", 1},
		{1, 1, 'i', "metric_1", 1},
		{1, 1, 'r', "metric_1", 1},
		{1, 1, 'i', "metric_2", 1},
		{1, 1, 'r', "metric_2", 1},
		{1, 1, 'r', "metric_2", 1},
		{15, 15, 'i', "metric_1", 1},
		{15, 15, 'i', "metric_2", 1},
		{15, 15, 'i', "metric_3", 1},
		{15, 15, 'r', "metric_3", 1},
		{15, 15, 'r', "metric_2", 1},
	}
	qOpts := queryOpts{
		limit: 100,
		lte:   -1,
	}
	// query empty tenant
	expected := StatsResult{
		TotalRecords: 5,
	}
	f(dataSet, qOpts, expected)
	// query single tenant
	qOpts = queryOpts{
		accountID: 1,
		projectID: 1,
		limit:     100,
		lte:       -1,
	}
	expected = StatsResult{
		TotalRecords: 5,
		Records: []StatRecord{
			{"metric_1", 1, 1},
			{"metric_2", 2, 1},
		},
	}
	f(dataSet, qOpts, expected)
	// query all tenants
	qOpts = queryOpts{
		isTenantEmpty: true,
		limit:         100,
		lte:           -1,
	}
	expected = StatsResult{
		TotalRecords: 5,
		Records: []StatRecord{
			{"metric_1", 1, 1},
			{"metric_2", 3, 1},
			{"metric_3", 1, 1},
		},
	}
	f(dataSet, qOpts, expected)
}
// TestMetricsTrackerConcurrent checks that concurrent Register* calls do not
// race (run with -race) and produce deterministic counters: each op set is
// applied once sequentially, then once more from `concurrency` goroutines.
func TestMetricsTrackerConcurrent(t *testing.T) {
	type testOp struct {
		o  byte
		mg string
	}
	const concurrency = 3
	f := func(ops []testOp, predicate int, expected StatsResult) {
		t.Helper()
		// Use path.Join instead of plain concatenation: t.TempDir()+t.Name()
		// produced a path NEXT to the temp dir, so the saved cache file
		// escaped the automatic test cleanup.
		umt, err := loadFrom(path.Join(t.TempDir(), t.Name()), 1024)
		if err != nil {
			t.Fatalf("cannot load: %s", err)
		}
		umt.creationTs.Store(0)
		umt.getCurrentTs = func() uint64 { return 1 }
		// warm-up pass makes expected counters independent of goroutine interleaving
		for _, op := range ops {
			switch op.o {
			case 'i':
				umt.RegisterIngestRequest(0, 0, []byte(op.mg))
			case 'r':
				umt.RegisterQueryRequest(0, 0, []byte(op.mg))
			}
		}
		var wg sync.WaitGroup
		for range concurrency {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for _, op := range ops {
					switch op.o {
					case 'i':
						umt.RegisterIngestRequest(0, 0, []byte(op.mg))
					case 'r':
						umt.RegisterQueryRequest(0, 0, []byte(op.mg))
					}
				}
			}()
		}
		wg.Wait()
		got := umt.GetStats(100, predicate, "")
		got.sort()
		expected.sort()
		if !cmp.Equal(expected.Records, got.Records) {
			t.Fatalf("unexpected unusedMetricNames result: %s", cmp.Diff(expected.Records, got.Records))
		}
	}
	// le=0 must return only never-queried metric names
	f([]testOp{{'i', "metric_1"}, {'r', "metric_2"}, {'r', "metric_1"}, {'i', "metric_3"}},
		0,
		StatsResult{
			Records: []StatRecord{
				{
					MetricName: "metric_3",
				},
			},
		})
	// le=10 returns all tracked names with counters merged across goroutines
	f([]testOp{{'i', "metric_1"}, {'i', "metric_2"}, {'r', "metric_2"}, {'r', "metric_2"}, {'r', "metric_1"}, {'i', "metric_3"}},
		10,
		StatsResult{
			Records: []StatRecord{
				{
					MetricName:    "metric_1",
					RequestsCount: 1 + concurrency,
					LastRequestTs: 1,
				},
				{
					MetricName:    "metric_2",
					RequestsCount: 2 + 2*concurrency,
					LastRequestTs: 1,
				},
				{
					MetricName:    "metric_3",
					LastRequestTs: 0,
				},
			},
		})
}
// TestMetricsTrackerMaxSize verifies that the tracker stops accepting new
// metric names once currentSizeBytes exceeds maxSizeBytes, while query stats
// for already tracked names keep being collected.
func TestMetricsTrackerMaxSize(t *testing.T) {
	type testOp struct {
		o  byte
		mg string
	}
	// The limit below fits exactly two tracked names: each entry costs
	// len(metricName)+storeOverhead bytes.
	// Use path.Join instead of plain concatenation, so the cache file stays
	// inside the auto-removed temp dir instead of being created next to it.
	umt, err := loadFrom(path.Join(t.TempDir(), t.Name()), storeOverhead+10*2)
	if err != nil {
		t.Fatalf("cannot load tracker: %s", err)
	}
	umt.getCurrentTs = func() uint64 { return 1 }
	ops := []testOp{
		{'i', "metric_1"},
		{'r', "metric_2"},
		{'r', "metric_1"},
		{'i', "metric_2"},
		{'i', "metric_3"},
		{'i', "metric_4"},
		{'r', "metric_1"},
		{'r', "metric_2"},
		{'r', "metric_2"},
		{'r', "metric_2"},
	}
	for _, op := range ops {
		switch op.o {
		case 'i':
			umt.RegisterIngestRequest(0, 0, []byte(op.mg))
		case 'r':
			umt.RegisterQueryRequest(0, 0, []byte(op.mg))
		}
	}
	got := umt.GetStats(100, -1, "")
	got.sort()
	// metric_3 and metric_4 must be rejected: the state was already full
	expected := StatsResult{
		Records: []StatRecord{
			{
				MetricName:    "metric_1",
				RequestsCount: 2,
				LastRequestTs: 1,
			},
			{
				MetricName:    "metric_2",
				RequestsCount: 3,
				LastRequestTs: 1,
			},
		},
	}
	if !cmp.Equal(expected.Records, got.Records) {
		t.Fatalf("unexpected unusedMetricNames result: %s", cmp.Diff(expected.Records, got.Records))
	}
}
// TestDeduplicateRecords checks that DeduplicateMergeRecords collapses
// records sharing a metric name into a single record: request counters are
// summed and the latest request timestamp is kept.
func TestDeduplicateRecords(t *testing.T) {
	f := func(result StatsResult, expected StatsResult) {
		t.Helper()
		expected.sort()
		result.sort()
		result.DeduplicateMergeRecords()
		if !cmp.Equal(result, expected, statsResultCmpOpts) {
			t.Fatalf("unexpected deduplicate result: %s", cmp.Diff(result, expected, statsResultCmpOpts))
		}
	}
	cases := []struct {
		in   []StatRecord
		want []StatRecord
	}{
		// single record
		{
			in:   []StatRecord{{"mn1", 10, 1}},
			want: []StatRecord{{"mn1", 10, 1}},
		},
		// no duplicates
		{
			in: []StatRecord{
				{"mn1", 10, 1},
				{"mn2", 12, 2},
				{"mn3", 13, 3},
				{"mn4", 15, 4},
			},
			want: []StatRecord{
				{"mn1", 10, 1},
				{"mn2", 12, 2},
				{"mn3", 13, 3},
				{"mn4", 15, 4},
			},
		},
		// 2 duplicates
		{
			in: []StatRecord{
				{"mn1", 10, 1},
				{"mn1", 10, 1},
			},
			want: []StatRecord{{"mn1", 20, 1}},
		},
		// duplicates on start
		{
			in: []StatRecord{
				{"mn1", 10, 1},
				{"mn1", 10, 1},
				{"mn2", 12, 2},
				{"mn3", 13, 3},
				{"mn4", 15, 4},
			},
			want: []StatRecord{
				{"mn1", 20, 1},
				{"mn2", 12, 2},
				{"mn3", 13, 3},
				{"mn4", 15, 4},
			},
		},
		// duplicates on end
		{
			in: []StatRecord{
				{"mn1", 10, 1},
				{"mn2", 12, 2},
				{"mn3", 13, 3},
				{"mn4", 15, 4},
				{"mn4", 15, 4},
			},
			want: []StatRecord{
				{"mn1", 10, 1},
				{"mn2", 12, 2},
				{"mn3", 13, 3},
				{"mn4", 30, 4},
			},
		},
		// duplicates at start and end
		{
			in: []StatRecord{
				{"mn1", 10, 1},
				{"mn1", 10, 1},
				{"mn2", 12, 2},
				{"mn3", 13, 3},
				{"mn4", 15, 4},
				{"mn4", 15, 4},
			},
			want: []StatRecord{
				{"mn1", 20, 1},
				{"mn2", 12, 2},
				{"mn3", 13, 3},
				{"mn4", 30, 4},
			},
		},
		// duplicates mixed
		{
			in: []StatRecord{
				{"mn1", 10, 1},
				{"mn1", 10, 1},
				{"mn2", 12, 2},
				{"mn3", 10, 2},
				{"mn3", 10, 3},
				{"mn3", 10, 3},
				{"mn4", 15, 4},
				{"mn4", 15, 4},
				{"mn5", 15, 4},
			},
			want: []StatRecord{
				{"mn1", 20, 1},
				{"mn2", 12, 2},
				{"mn3", 30, 3},
				{"mn4", 30, 4},
				{"mn5", 15, 4},
			},
		},
	}
	for _, tc := range cases {
		f(StatsResult{Records: tc.in}, StatsResult{Records: tc.want})
	}
}
// TestStatsResultMerge verifies that StatsResult.Merge combines the records
// of two sorted results: records with the same metric name have their
// request counters summed and the latest request timestamp kept, while
// records unique to either side are preserved as-is.
func TestStatsResultMerge(t *testing.T) {
	f := func(left, right StatsResult, expected StatsResult) {
		t.Helper()
		expected.sort()
		left.sort()
		right.sort()
		left.Merge(&right)
		if !cmp.Equal(left, expected, statsResultCmpOpts) {
			// NOTE: the message previously said "deduplicate" — a copy-paste
			// leftover from TestDeduplicateRecords.
			t.Fatalf("unexpected merge result: %s", cmp.Diff(left, expected, statsResultCmpOpts))
		}
	}
	// empty src
	dst := StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 10, LastRequestTs: 2},
		},
	}
	src := StatsResult{}
	expected := StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 10, LastRequestTs: 2},
		},
	}
	f(dst, src, expected)
	// empty dst
	dst = StatsResult{}
	src = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 10, LastRequestTs: 2},
		},
	}
	expected = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 10, LastRequestTs: 2},
		},
	}
	f(dst, src, expected)
	// all duplicates
	dst = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn2", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn3", RequestsCount: 30, LastRequestTs: 2},
		},
	}
	src = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn2", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn3", RequestsCount: 30, LastRequestTs: 2},
		},
	}
	expected = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn2", RequestsCount: 40, LastRequestTs: 2},
			{MetricName: "mn3", RequestsCount: 60, LastRequestTs: 2},
		},
	}
	f(dst, src, expected)
	// no duplicates
	dst = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn2", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn3", RequestsCount: 30, LastRequestTs: 2},
		},
	}
	src = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn4", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn5", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn6", RequestsCount: 30, LastRequestTs: 2},
		},
	}
	expected = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn2", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn3", RequestsCount: 30, LastRequestTs: 2},
			{MetricName: "mn4", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn5", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn6", RequestsCount: 30, LastRequestTs: 2},
		},
	}
	f(dst, src, expected)
	// mixed
	dst = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn3", RequestsCount: 30, LastRequestTs: 2},
			{MetricName: "mn4", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn5", RequestsCount: 20, LastRequestTs: 2},
		},
	}
	src = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn2", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn5", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn6", RequestsCount: 30, LastRequestTs: 2},
		},
	}
	expected = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn2", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn3", RequestsCount: 30, LastRequestTs: 2},
			{MetricName: "mn4", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn5", RequestsCount: 40, LastRequestTs: 2},
			{MetricName: "mn6", RequestsCount: 30, LastRequestTs: 2},
		},
	}
	f(dst, src, expected)
	// mixed with differing timestamps: the newer LastRequestTs must win.
	dst = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn3", RequestsCount: 30, LastRequestTs: 2},
			{MetricName: "mn4", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn5", RequestsCount: 20, LastRequestTs: 1},
			{MetricName: "mn6", RequestsCount: 30, LastRequestTs: 2},
		},
	}
	src = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn2", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn5", RequestsCount: 20, LastRequestTs: 2},
		},
	}
	expected = StatsResult{
		Records: []StatRecord{
			{MetricName: "mn1", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn2", RequestsCount: 20, LastRequestTs: 2},
			{MetricName: "mn3", RequestsCount: 30, LastRequestTs: 2},
			{MetricName: "mn4", RequestsCount: 10, LastRequestTs: 2},
			{MetricName: "mn5", RequestsCount: 40, LastRequestTs: 2},
			{MetricName: "mn6", RequestsCount: 30, LastRequestTs: 2},
		},
	}
	f(dst, src, expected)
}

View File

@@ -0,0 +1,57 @@
package metricnamestats
import (
"testing"
"github.com/google/go-cmp/cmp"
)
// BenchmarkTracker measures the cost of registering ingest and query
// requests in the metric names usage tracker, and verifies the accumulated
// per-metric counters afterwards.
func BenchmarkTracker(b *testing.B) {
	b.ReportAllocs()
	tracker := MustLoadFrom("testdata/"+b.Name(), 100_000_000)
	// Pin the clock so LastRequestTs values are deterministic.
	tracker.getCurrentTs = func() uint64 {
		return 1
	}
	type op struct {
		kind       byte
		metricName []byte
	}
	ops := []op{
		{'i', []byte("metric_2")},
		{'i', []byte("metric_3")},
		{'i', []byte("metric_3")},
		{'i', []byte("metric_4")},
		{'r', []byte("metric_3")},
		{'r', []byte("metric_3")},
		{'r', []byte("metric_3")},
		{'i', []byte("metric_1")},
		{'r', []byte("metric_1")},
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for _, o := range ops {
			// Only 'i' and 'r' ops exist in the data set above.
			if o.kind == 'i' {
				tracker.RegisterIngestRequest(0, 0, o.metricName)
			} else {
				tracker.RegisterQueryRequest(0, 0, o.metricName)
			}
		}
	}
	b.StopTimer()
	got := tracker.GetStats(100, -1, "")
	got.sort()
	expected := StatsResult{
		TotalRecords: 4,
		Records: []StatRecord{
			{"metric_2", 0, 0},
			{"metric_4", 0, 0},
			{"metric_1", uint64(b.N), 1},
			{"metric_3", 3 * uint64(b.N), 1},
		},
	}
	expected.sort()
	if !cmp.Equal(expected, got, statsResultCmpOpts) {
		b.Fatalf("unexpected result: %s", cmp.Diff(expected, got, statsResultCmpOpts))
	}
}

View File

@@ -118,6 +118,9 @@ type Search struct {
loops int
prevMetricID uint64
// metricGroupBuf holds metricGroup used for metric names tracker
metricGroupBuf []byte
}
func (s *Search) reset() {
@@ -134,6 +137,7 @@ func (s *Search) reset() {
s.needClosing = false
s.loops = 0
s.prevMetricID = 0
s.metricGroupBuf = nil
}
// Init initializes s from the given storage, tfss and tr.
@@ -224,6 +228,18 @@ func (s *Search) NextMetricBlock() bool {
// It should be automatically fixed. See indexDB.searchMetricNameWithCache for details.
continue
}
// for performance reasons parse metricGroup conditionally
if s.idb.s.metricsTracker != nil {
var err error
// MetricName must be sorted and marshalled with MetricName.Marshal()
// it guarantees that first tag is metricGroup
_, s.metricGroupBuf, err = unmarshalTagValue(s.metricGroupBuf[:0], s.MetricBlockRef.MetricName)
if err != nil {
s.err = fmt.Errorf("cannot unmarshal metricGroup from MetricBlockRef.MetricName: %w", err)
return false
}
s.idb.s.metricsTracker.RegisterQueryRequest(0, 0, s.metricGroupBuf)
}
s.prevMetricID = tsid.MetricID
}
s.MetricBlockRef.BlockRef = s.ts.BlockRef

View File

@@ -26,6 +26,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot/snapshotutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
@@ -165,14 +166,17 @@ type Storage struct {
// isReadOnly is set to true when the storage is in read-only mode.
isReadOnly atomic.Bool
metricsTracker *metricnamestats.Tracker
}
// OpenOptions optional args for MustOpenStorage
type OpenOptions struct {
Retention time.Duration
MaxHourlySeries int
MaxDailySeries int
DisablePerDayIndex bool
Retention time.Duration
MaxHourlySeries int
MaxDailySeries int
DisablePerDayIndex bool
TrackMetricNamesStats bool
}
// MustOpenStorage opens storage on the given path with the given retentionMsecs.
@@ -244,6 +248,16 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage {
s.pendingNextDayMetricIDs = &uint64set.Set{}
s.prefetchedMetricIDs = &uint64set.Set{}
if opts.TrackMetricNamesStats {
mnt := metricnamestats.MustLoadFrom(filepath.Join(s.cachePath, "metric_usage_tracker"), uint64(getMetricNamesStatsCacheSize()))
s.metricsTracker = mnt
if mnt.IsEmpty() {
// metric names tracker attempts to track time series during ingestion only at tsid cache miss.
// This avoids decreasing storage performance.
logger.Infof("reseting tsidCache in order to properly track metric names stats usage")
s.tsidCache.Reset()
}
}
// Load metadata
metadataDir := filepath.Join(path, metadataDirname)
@@ -319,6 +333,20 @@ func getTSIDCacheSize() int {
return maxTSIDCacheSize
}
var maxMetricNamesStatsCacheSize int
// SetMetricNamesStatsCacheSize overrides the default size of the
// storage/metricNamesStatsTracker cache.
//
// Values <= 0 keep the default of 1/100 of the memory allowed for the
// process (see getMetricNamesStatsCacheSize).
func SetMetricNamesStatsCacheSize(size int) {
	maxMetricNamesStatsCacheSize = size
}
// getMetricNamesStatsCacheSize returns the size for the metric names stats
// tracker cache: the explicit override set via SetMetricNamesStatsCacheSize
// if positive, otherwise 1/100 of the memory allowed for the process.
func getMetricNamesStatsCacheSize() int {
	if size := maxMetricNamesStatsCacheSize; size > 0 {
		return size
	}
	return memory.Allowed() / 100
}
// getDeletedMetricIDs returns the currently loaded set of deleted metricIDs.
func (s *Storage) getDeletedMetricIDs() *uint64set.Set {
	return s.deletedMetricIDs.Load()
}
@@ -561,6 +589,10 @@ type Metrics struct {
NextRetentionSeconds uint64
MetricNamesUsageTrackerSize uint64
MetricNamesUsageTrackerSizeBytes uint64
MetricNamesUsageTrackerSizeMaxBytes uint64
IndexDBMetrics IndexDBMetrics
TableMetrics TableMetrics
}
@@ -655,6 +687,12 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes())
s.prefetchedMetricIDsLock.Unlock()
var tm metricnamestats.TrackerMetrics
s.metricsTracker.UpdateMetrics(&tm)
m.MetricNamesUsageTrackerSizeBytes = tm.CurrentSizeBytes
m.MetricNamesUsageTrackerSize = tm.CurrentItemsCount
m.MetricNamesUsageTrackerSizeMaxBytes = tm.MaxSizeBytes
d := s.nextRetentionSeconds()
if d < 0 {
d = 0
@@ -904,6 +942,7 @@ func (s *Storage) MustClose() {
nextDayMetricIDs := s.nextDayMetricIDs.Load()
s.mustSaveNextDayMetricIDs(nextDayMetricIDs)
s.metricsTracker.MustClose()
// Release lock file.
fs.MustClose(s.flockF)
s.flockF = nil
@@ -2028,6 +2067,11 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
mn.sortTags()
metricNameBuf = mn.Marshal(metricNameBuf[:0])
// register metric name on tsid cache miss
// it allows to track metric names since last tsid cache reset
// and skip index scan to fill metrics tracker
s.metricsTracker.RegisterIngestRequest(0, 0, mn.MetricGroup)
// Search for TSID for the given mr.MetricNameRaw in the indexdb.
if is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
// Slower path - the TSID has been found in indexdb.
@@ -2087,6 +2131,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
firstWarn = fmt.Errorf("cannot prefill next indexdb: %w", err)
}
}
if err := s.updatePerDateData(rows, dstMrs); err != nil {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot not update per-day index: %w", err)
@@ -2845,3 +2890,16 @@ func (s *Storage) wasMetricIDMissingBefore(metricID uint64) bool {
}
return ct > deleteDeadline
}
// MetricNamesStatsResponse contains metric names usage stats API response
type MetricNamesStatsResponse = metricnamestats.StatsResult

// GetMetricNamesStats returns metric names usage stats with the given limit
// and `le` predicate, optionally filtered by matchPattern.
// NOTE(review): judging by the tests, `le` appears to keep only records with
// requests count <= le, with negative values disabling the filter — confirm
// against Tracker.GetStats. The querytracer argument is currently unused.
func (s *Storage) GetMetricNamesStats(_ *querytracer.Tracer, limit, le int, matchPattern string) MetricNamesStatsResponse {
	return s.metricsTracker.GetStats(limit, le, matchPattern)
}
// ResetMetricNamesStats resets the state of the metric names usage tracker.
//
// The tsid cache reset is passed as a callback so the tracker can also drop
// the cache; metric names are only registered on tsid cache misses, so this
// lets subsequent ingestion repopulate the tracker.
func (s *Storage) ResetMetricNamesStats(_ *querytracer.Tracer) {
	s.metricsTracker.Reset(s.tsidCache.Reset)
}

View File

@@ -2892,3 +2892,51 @@ func testGenerateMetricRowBatches(opts *batchOptions) ([][]MetricRow, *counts) {
}
return batches, &want
}
// TestStorageMetricTracker verifies that metric names stats are populated
// for ingested series and that query requests registered during search
// update the per-metric request counters.
func TestStorageMetricTracker(t *testing.T) {
	defer testRemoveAll(t)

	rng := rand.New(rand.NewSource(1))
	numRows := uint64(1000)
	minTimestamp := time.Now().UnixMilli()
	maxTimestamp := minTimestamp + 1000
	mrs := testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp)

	s := MustOpenStorage(t.Name(), OpenOptions{TrackMetricNamesStats: true})
	defer s.MustClose()
	s.AddRows(mrs, defaultPrecisionBits)
	s.DebugFlush()
	var gotMetrics Metrics
	s.UpdateMetrics(&gotMetrics)

	// All ingested metric names must be tracked with zero query requests.
	stats := s.GetMetricNamesStats(nil, 10_000, 0, "")
	if len(stats.Records) != int(numRows) {
		t.Fatalf("unexpected Stats records count=%d, want %d records", len(stats.Records), numRows)
	}

	// Run a search query matching every ingested metric name.
	tfs := NewTagFilters()
	if err := tfs.Add(nil, []byte("metric_.+"), false, true); err != nil {
		t.Fatalf("unexpected error at tfs add: %s", err)
	}
	tr := TimeRange{
		MinTimestamp: minTimestamp,
		MaxTimestamp: maxTimestamp,
	}
	var sr Search
	sr.Init(nil, s, []*TagFilters{tfs}, tr, 1e5, noDeadline)
	for sr.NextMetricBlock() {
	}
	sr.MustClose()

	// After the search every metric name has at least one query request.
	stats = s.GetMetricNamesStats(nil, 10_000, 0, "")
	if len(stats.Records) != 0 {
		t.Fatalf("unexpected Stats records count=%d; want 0 records", len(stats.Records))
	}
	stats = s.GetMetricNamesStats(nil, 10_000, 1, "")
	if len(stats.Records) != int(numRows) {
		t.Fatalf("unexpected Stats records count=%d, want %d records", len(stats.Records), numRows)
	}
}