Compare commits

...

1 Commits

Author SHA1 Message Date
f41gh7
19ed525a10 wip 2025-04-14 01:29:48 +02:00
14 changed files with 227 additions and 89 deletions

View File

@@ -117,9 +117,9 @@ func (api *vmstorageAPI) ResetMetricNamesUsageStats(qt *querytracer.Tracer, dead
return netstorage.ResetMetricNamesStats(qt, dl)
}
func (api *vmstorageAPI) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, le, limit int, matchPattern string, deadline uint64) (storage.MetricNamesStatsResponse, error) {
func (api *vmstorageAPI) GetMetricNamesUsageStats(qt *querytracer.Tracer, sq storage.MetricNamesStatsQuery, deadline uint64) (storage.MetricNamesStatsResponse, error) {
dl := searchutil.DeadlineFromTimestamp(deadline)
return netstorage.GetMetricNamesStats(qt, tt, le, limit, matchPattern, dl)
return netstorage.GetMetricNamesStats(qt, sq, dl)
}
// blockIterator implements vmselectapi.BlockIterator

View File

@@ -2,6 +2,7 @@ package netstorage
import (
"container/heap"
"encoding/json"
"errors"
"flag"
"fmt"
@@ -3337,14 +3338,14 @@ func metricNameTenantToTags(mn *storage.MetricName) {
}
// GetMetricNamesStats returns metric names usage statistics for the given params
func GetMetricNamesStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline searchutil.Deadline) (storage.MetricNamesStatsResponse, error) {
func GetMetricNamesStats(qt *querytracer.Tracer, sq storage.MetricNamesStatsQuery, deadline searchutil.Deadline) (storage.MetricNamesStatsResponse, error) {
type nodeResult struct {
resp storage.MetricNamesStatsResponse
err error
}
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
resp, err := sn.processGetMetricNamesStats(qt, tt, limit, le, matchPattern, deadline)
resp, err := sn.processGetMetricNamesStats(qt, sq, deadline)
return nodeResult{resp: resp, err: err}
})
var mu sync.Mutex
@@ -3365,45 +3366,33 @@ func GetMetricNamesStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit,
return mnuss, nil
}
func (sn *storageNode) processGetMetricNamesStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline searchutil.Deadline) (storage.MetricNamesStatsResponse, error) {
func (sn *storageNode) processGetMetricNamesStats(qt *querytracer.Tracer, sq storage.MetricNamesStatsQuery, deadline searchutil.Deadline) (storage.MetricNamesStatsResponse, error) {
var result storage.MetricNamesStatsResponse
f := func(bc *handshake.BufferedConn) error {
bcResult, err := processGetMetricNamesUsageStatsOnConn(bc, tt, limit, le, matchPattern)
bcResult, err := processGetMetricNamesUsageStatsOnConn(bc, sq)
if err != nil {
return err
}
result = bcResult
return nil
}
if err := sn.execOnConnWithPossibleRetry(qt, "metricNamesUsageStats_v1", f, deadline); err != nil {
if err := sn.execOnConnWithPossibleRetry(qt, "metricNamesUsageStats_v2", f, deadline); err != nil {
return result, err
}
return result, nil
}
func processGetMetricNamesUsageStatsOnConn(bc *handshake.BufferedConn, tt *storage.TenantToken, limit, le int, matchPattern string) (storage.MetricNamesStatsResponse, error) {
const maxStatsQuerySize = 1024 * 1024
func processGetMetricNamesUsageStatsOnConn(bc *handshake.BufferedConn, sq storage.MetricNamesStatsQuery) (storage.MetricNamesStatsResponse, error) {
var result storage.MetricNamesStatsResponse
hasTenantToken := tt != nil
if err := writeBool(bc, hasTenantToken); err != nil {
return result, fmt.Errorf("cannot write hasTenantToken: %w", err)
data, err := json.Marshal(sq)
if err != nil {
return result, fmt.Errorf("cannot marshal MetricNamesStatsQuery: %w", err)
}
// conditionally write tenant token
if hasTenantToken {
if err := writeUint32(bc, tt.AccountID); err != nil {
return result, fmt.Errorf("cannot write AccountID: %w", err)
}
if err := writeUint32(bc, tt.ProjectID); err != nil {
return result, fmt.Errorf("cannot write ProjectID: %w", err)
}
}
if err := writeLimit(bc, limit); err != nil {
return result, fmt.Errorf("cannot write limit: %w", err)
}
if err := writeInt64(bc, int64(le)); err != nil {
return result, fmt.Errorf("cannot write le: %w", err)
}
if err := writeBytes(bc, []byte(matchPattern)); err != nil {
return result, fmt.Errorf("cannot write matchPattern: %w", err)
if err := writeBytes(bc, data); err != nil {
return result, fmt.Errorf("cannot write MetricNamesStatsQuery: %w", err)
}
if err := bc.Flush(); err != nil {
return result, fmt.Errorf("cannot flush write: %w", err)

View File

@@ -45,7 +45,15 @@ func MetricNamesStatsHandler(startTime time.Time, at *auth.Token, qt *querytrace
ProjectID: at.ProjectID,
}
}
stats, err := netstorage.GetMetricNamesStats(qt, tt, limit, le, matchPattern, deadline)
matchNames := r.Form["match_names"]
sq := storage.MetricNamesStatsQuery{
TenantToken: tt,
Limit: limit,
Le: le,
MatchNames: matchNames,
MatchPattern: matchPattern,
}
stats, err := netstorage.GetMetricNamesStats(qt, sq, deadline)
if err != nil {
return err
}

View File

@@ -195,8 +195,8 @@ func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []stora
return nil
}
func (api *vmstorageAPI) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, _ uint64) (storage.MetricNamesStatsResponse, error) {
return api.s.GetMetricNamesStats(qt, tt, limit, le, matchPattern), nil
func (api *vmstorageAPI) GetMetricNamesUsageStats(qt *querytracer.Tracer, sq storage.MetricNamesStatsQuery, _ uint64) (storage.MetricNamesStatsResponse, error) {
return api.s.GetMetricNamesStats(qt, sq), nil
}
func (api *vmstorageAPI) ResetMetricNamesUsageStats(qt *querytracer.Tracer, _ uint64) error {

View File

@@ -122,7 +122,7 @@ func TestClusterMetricNamesStats(t *testing.T) {
fmt.Sprintf("-storageNode=%s,%s", vmstorage1.VmselectAddr(), vmstorage2.VmselectAddr()),
})
// verify empty stats
resp := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "0:0"})
resp := vmselect.MetricNamesStats(t, "", "", "", nil, apptest.QueryOpts{Tenant: "0:0"})
if len(resp.Records) != 0 {
t.Fatalf("unexpected resp Records: %d, want: %d", len(resp.Records), 0)
}
@@ -155,7 +155,7 @@ func TestClusterMetricNamesStats(t *testing.T) {
{MetricName: "metric_name_3"},
},
}
gotStats := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID})
gotStats := vmselect.MetricNamesStats(t, "", "", "", nil, apptest.QueryOpts{Tenant: tenantID})
if diff := cmp.Diff(expected, gotStats); diff != "" {
t.Errorf("unexpected response (-want, +got):\n%s", diff)
}
@@ -172,7 +172,17 @@ func TestClusterMetricNamesStats(t *testing.T) {
{MetricName: "metric_name_1", QueryRequestsCount: 3},
},
}
gotStats = vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID})
gotStats = vmselect.MetricNamesStats(t, "", "", "", nil, apptest.QueryOpts{Tenant: tenantID})
if diff := cmp.Diff(expected, gotStats); diff != "" {
t.Errorf("unexpected response tenant: %s (-want, +got):\n%s", tenantID, diff)
}
expected = apptest.MetricNamesStatsResponse{
Records: []at.MetricNamesStatsRecord{
{MetricName: "metric_name_1", QueryRequestsCount: 3},
},
}
matchNames := []string{"metric_name_1"}
gotStats = vmselect.MetricNamesStats(t, "", "", "", matchNames, apptest.QueryOpts{Tenant: tenantID})
if diff := cmp.Diff(expected, gotStats); diff != "" {
t.Errorf("unexpected response tenant: %s (-want, +got):\n%s", tenantID, diff)
}
@@ -186,14 +196,27 @@ func TestClusterMetricNamesStats(t *testing.T) {
{MetricName: "metric_name_1", QueryRequestsCount: 9},
},
}
gotStats := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"})
gotStats := vmselect.MetricNamesStats(t, "", "", "", nil, apptest.QueryOpts{Tenant: "multitenant"})
if diff := cmp.Diff(expected, gotStats); diff != "" {
t.Errorf("unexpected response (-want, +got):\n%s", diff)
}
// verify multitenant stats
expected = apptest.MetricNamesStatsResponse{
Records: []at.MetricNamesStatsRecord{
{MetricName: "metric_name_2", QueryRequestsCount: 3},
{MetricName: "metric_name_1", QueryRequestsCount: 9},
},
}
matchNames := []string{"metric_name_2", "metric_name_1"}
gotStats = vmselect.MetricNamesStats(t, "", "", "", matchNames, apptest.QueryOpts{Tenant: "multitenant"})
if diff := cmp.Diff(expected, gotStats); diff != "" {
t.Errorf("unexpected response (-want, +got):\n%s", diff)
}
// reset cache and check empty state
vmselect.MetricNamesStatsReset(t, at.QueryOpts{})
resp = vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"})
resp = vmselect.MetricNamesStats(t, "", "", "", nil, apptest.QueryOpts{Tenant: "multitenant"})
if len(resp.Records) != 0 {
t.Fatalf("want 0 records, got: %d", len(resp.Records))
}

View File

@@ -139,13 +139,16 @@ func (app *Vmselect) DeleteSeries(t *testing.T, matchQuery string, opts QueryOpt
// and returns the statistics response for given params.
//
// See https://docs.victoriametrics.com/#Trackingestedmetricsusage
func (app *Vmselect) MetricNamesStats(t *testing.T, limit, le, matchPattern string, opts QueryOpts) MetricNamesStatsResponse {
func (app *Vmselect) MetricNamesStats(t *testing.T, limit, le, matchPattern string, matchNames []string, opts QueryOpts) MetricNamesStatsResponse {
t.Helper()
values := opts.asURLValues()
values.Add("limit", limit)
values.Add("le", le)
values.Add("match_pattern", matchPattern)
for _, mn := range matchNames {
values.Add("match_names", mn)
}
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/status/metric_names_stats", app.httpListenAddr, opts.getTenant())
res, statusCode := app.cli.PostForm(t, queryURL, values)

View File

@@ -483,6 +483,7 @@ To get metric names usage statistics, use the `/prometheus/api/v1/status/metric_
* `limit` - integer value to limit the number of metric names in response. By default, API returns 1000 records.
* `le` - `less than or equal`, is an integer threshold for filtering metric names by their usage count in queries. For example, with `?le=1` API returns metric names that were queried <=1 times.
* `match_pattern` - a substring pattern to match metric names. For example, `?match_pattern=vm_` will match any metric names with `vm_` pattern, like `vm_http_requests`, `max_vm_memory_available`. It doesn't support regex syntax.
* `match_names` - a list of metric names to query. If this param is defined, `le` and `match_pattern` options will be ignored.
The API endpoint returns the following `JSON` response:

View File

@@ -19,6 +19,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
## tip
* FEATURE: all the VictoriaMetrics components: mask `authKey` value from log messages. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5973) for details.
* FEATURE: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): enhance - `/api/v1/status/metric_names_stats` [API](https://docs.victoriametrics.com/keyconcepts/#structure-of-a-metric) with `match_names` query param. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4458) for details and related [docs](https://docs.victoriametrics.com/#track-ingested-metrics-usage)
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): properly init [enterprise](https://docs.victoriametrics.com/enterprise/) version for `linux/arm` and non-CGO buids. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6019) for details.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): remote write client sets correct content encoding header based on actual body content, rather than relying on configuration. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8650).

View File

@@ -391,6 +391,71 @@ func (mt *Tracker) GetStatsForTenant(accountID, projectID uint32, limit, le int,
return result
}
// GetStatsForNamesMultitenant returns stats for metric names
// ignores accountID and projectID
//
// SingleNode version must use GetStatsForNamesTenant with 0 account nad project IDs
func (mt *Tracker) GetStatsForNamesMultitenant(names []string) StatsResult {
var result StatsResult
result.CollectedSinceTs = mt.creationTs.Load()
result.TotalRecords = mt.currentItemsCount.Load()
result.MaxSizeBytes = mt.maxSizeBytes
result.CurrentSizeBytes = mt.currentSizeBytes.Load()
mt.mu.RLock()
for sk, si := range mt.store {
for _, mn := range names {
if mn == sk.metricName {
result.Records = append(result.Records, StatRecord{
MetricName: sk.metricName,
RequestsCount: si.requestsCount.Load(),
LastRequestTs: si.lastRequestTs.Load(),
})
break
}
}
}
mt.mu.RUnlock()
result.sort()
result.DeduplicateMergeRecords()
return result
}
// GetStatsForNamesTenant returns stats for given accountID and projectID
func (mt *Tracker) GetStatsForNamesTenant(accountID, projectID uint32, names []string) StatsResult {
var result StatsResult
result.CollectedSinceTs = mt.creationTs.Load()
result.TotalRecords = mt.currentItemsCount.Load()
result.MaxSizeBytes = mt.maxSizeBytes
result.CurrentSizeBytes = mt.currentSizeBytes.Load()
mt.mu.RLock()
for _, mn := range names {
sk := statKey{
accountID: accountID,
projectID: projectID,
metricName: mn,
}
si, ok := mt.store[sk]
if !ok {
continue
}
result.Records = append(result.Records, StatRecord{
MetricName: sk.metricName,
RequestsCount: si.requestsCount.Load(),
LastRequestTs: si.lastRequestTs.Load(),
})
}
mt.mu.RUnlock()
result.sort()
return result
}
// GetStats returns stats response for the tracked metrics
//
// DeduplicateMergeRecords must be called at cluster version on returned result.

View File

@@ -562,3 +562,60 @@ func TestStatsResultMerge(t *testing.T) {
f(dst, src, expected)
}
func TestStatsForMetricNames(t *testing.T) {
type testOp struct {
o byte
mg string
}
umt, err := loadFrom(t.TempDir()+t.Name(), storeOverhead+10*2)
if err != nil {
t.Fatalf("cannot load tracker: %s", err)
}
umt.getCurrentTs = func() uint64 { return 1 }
ops := []testOp{
{'i', "metric_1"},
{'r', "metric_2"},
{'r', "metric_1"},
{'i', "metric_2"},
{'i', "metric_3"},
{'i', "metric_4"},
{'r', "metric_1"},
{'r', "metric_2"},
{'r', "metric_2"},
{'r', "metric_2"},
}
for _, op := range ops {
switch op.o {
case 'i':
umt.RegisterIngestRequest(0, 0, []byte(op.mg))
case 'r':
umt.RegisterQueryRequest(0, 0, []byte(op.mg))
}
}
f := func(names []string, expected StatsResult) {
t.Helper()
got := umt.GetStatsForNamesTenant(0, 0, names)
if d := cmp.Diff(expected, got, statsResultCmpOpts); len(d) > 0 {
t.Fatalf("unexpected deduplicate result: %s", d)
}
}
want := StatsResult{
TotalRecords: 2,
Records: []StatRecord{
{MetricName: "metric_1", RequestsCount: 2, LastRequestTs: 1},
},
}
f([]string{"metric_1", "metric"}, want)
want = StatsResult{
TotalRecords: 2,
Records: []StatRecord{
{MetricName: "metric_1", RequestsCount: 2, LastRequestTs: 1},
{MetricName: "metric_2", RequestsCount: 3, LastRequestTs: 1},
},
}
f([]string{"metric_1", "metric_2"}, want)
}

View File

@@ -3124,12 +3124,32 @@ type MetricNamesStatsResponse = metricnamestats.StatsResult
// MetricNamesStatsRecord represents record at MetricNamesStatsResponse
type MetricNamesStatsRecord = metricnamestats.StatRecord
// GetMetricNamesStats returns metric names usage stats with give limit and lte predicate
func (s *Storage) GetMetricNamesStats(_ *querytracer.Tracer, tt *TenantToken, limit, le int, matchPattern string) MetricNamesStatsResponse {
if tt != nil {
return s.metricsTracker.GetStatsForTenant(tt.AccountID, tt.ProjectID, limit, le, matchPattern)
// MetricNamesStatsQuery represents query params for Stats requests
type MetricNamesStatsQuery struct {
TenantToken *TenantToken
Limit int
Le int
MatchPattern string
MatchNames []string
}
// GetMetricNamesStats returns metric names usage stats for given Query params
func (s *Storage) GetMetricNamesStats(_ *querytracer.Tracer, sq MetricNamesStatsQuery) MetricNamesStatsResponse {
if sq.TenantToken != nil {
tt := sq.TenantToken
if len(sq.MatchNames) > 0 {
return s.metricsTracker.GetStatsForNamesTenant(tt.AccountID, tt.ProjectID, sq.MatchNames)
}
return s.metricsTracker.GetStatsForTenant(tt.AccountID, tt.ProjectID, sq.Limit, sq.Le, sq.MatchPattern)
}
res := s.metricsTracker.GetStats(limit, le, matchPattern)
var res MetricNamesStatsResponse
if len(sq.MatchNames) > 0 {
return s.metricsTracker.GetStatsForNamesMultitenant(sq.MatchNames)
}
res = s.metricsTracker.GetStats(sq.Limit, sq.Le, sq.MatchPattern)
res.DeduplicateMergeRecords()
return res
}

View File

@@ -3471,7 +3471,10 @@ func TestStorageMetricTracker(t *testing.T) {
var sr Search
// check stats for metrics with 0 requests count
mus := s.GetMetricNamesStats(nil, nil, 10_000, 0, "")
sq := MetricNamesStatsQuery{
Limit: 10_000,
}
mus := s.GetMetricNamesStats(nil, sq)
if len(mus.Records) != int(numRows) {
t.Fatalf("unexpected Stats records count=%d, want %d records", len(mus.Records), numRows)
}
@@ -3487,11 +3490,12 @@ func TestStorageMetricTracker(t *testing.T) {
}
sr.MustClose()
mus = s.GetMetricNamesStats(nil, nil, 10_000, 0, "")
mus = s.GetMetricNamesStats(nil, sq)
if len(mus.Records) != 0 {
t.Fatalf("unexpected Stats records count=%d; want 0 records", len(mus.Records))
}
mus = s.GetMetricNamesStats(nil, nil, 10_000, 1, "")
sq.Le = 1
mus = s.GetMetricNamesStats(nil, sq)
if len(mus.Records) != int(numRows) {
t.Fatalf("unexpected Stats records count=%d, want %d records", len(mus.Records), numRows)
}

View File

@@ -40,7 +40,7 @@ type API interface {
Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error)
// GetMetricNamesUsageStats returns statistics for metric names
GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (storage.MetricNamesStatsResponse, error)
GetMetricNamesUsageStats(qt *querytracer.Tracer, sq storage.MetricNamesStatsQuery, deadline uint64) (storage.MetricNamesStatsResponse, error)
// ResetMetricNamesUsageStats resets internal state of metric names tracker
ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error

View File

@@ -1,6 +1,7 @@
package vmselectapi
import (
"encoding/json"
"errors"
"fmt"
"io"
@@ -348,18 +349,6 @@ func (ctx *vmselectRequestCtx) readUint64() (uint64, error) {
return n, nil
}
func (ctx *vmselectRequestCtx) readInt64() (int64, error) {
ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 8)
if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil {
if err == io.EOF {
return 0, err
}
return 0, fmt.Errorf("cannot read int64: %w", err)
}
n := encoding.UnmarshalInt64(ctx.sizeBuf)
return n, nil
}
func (ctx *vmselectRequestCtx) readAccountIDProjectID() (uint32, uint32, error) {
accountID, err := ctx.readUint32()
if err != nil {
@@ -586,7 +575,7 @@ func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error {
return s.processRegisterMetricNames(ctx)
case "tenants_v1":
return s.processTenants(ctx)
case "metricNamesUsageStats_v1":
case "metricNamesUsageStats_v2":
return s.processMetricNamesUsageStats(ctx)
case "resetMetricNamesStats_v1":
return s.processResetMetricUsageStats(ctx)
@@ -1076,46 +1065,24 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
return nil
}
const maxMetricNamesStatsQueryJSONSize = 1024 * 1024
func (s *Server) processMetricNamesUsageStats(ctx *vmselectRequestCtx) error {
// Read request.
hasTenant, err := ctx.readBool()
if err != nil {
return fmt.Errorf("cannot read hasTenant: %w", err)
}
var at *storage.TenantToken
if hasTenant {
accountID, err := ctx.readUint32()
if err != nil {
return fmt.Errorf("cannot read accountID: %w", err)
}
projectID, err := ctx.readUint32()
if err != nil {
return fmt.Errorf("cannot read projectID: %w", err)
}
at = &storage.TenantToken{
AccountID: accountID,
ProjectID: projectID,
}
}
limit, err := ctx.readLimit()
if err != nil {
return fmt.Errorf("cannot read limit: %w", err)
}
le, err := ctx.readInt64()
if err != nil {
return fmt.Errorf("cannot read le: %w", err)
}
if err := ctx.readDataBufBytes(256); err != nil {
if err := ctx.readDataBufBytes(maxMetricNamesStatsQueryJSONSize); err != nil {
return fmt.Errorf("cannot read matchPattern: %w", err)
}
matchPattern := string(ctx.dataBuf)
var sq storage.MetricNamesStatsQuery
if err := json.Unmarshal(ctx.dataBuf, &sq); err != nil {
return fmt.Errorf("cannot parse MetricNamesStatsQuery: %w", err)
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
}
defer s.endConcurrentRequest()
result, err := s.api.GetMetricNamesUsageStats(ctx.qt, at, limit, int(le), matchPattern, ctx.deadline)
result, err := s.api.GetMetricNamesUsageStats(ctx.qt, sq, ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}