mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 16:59:40 +03:00
Compare commits
3 Commits
query-debu
...
gh-6145
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
517ce21792 | ||
|
|
f635ce90a9 | ||
|
|
c3bbf12494 |
@@ -117,9 +117,9 @@ func (api *vmstorageAPI) ResetMetricNamesUsageStats(qt *querytracer.Tracer, dead
|
||||
return netstorage.ResetMetricNamesStats(qt, dl)
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, le, limit int, matchPattern string, deadline uint64) (storage.MetricNamesStatsResponse, error) {
|
||||
func (api *vmstorageAPI) GetMetricNamesUsageStats(qt *querytracer.Tracer, statsQuery storage.MetricNamesStatsQuery, deadline uint64) (storage.MetricNamesStatsResponse, error) {
|
||||
dl := searchutil.DeadlineFromTimestamp(deadline)
|
||||
return netstorage.GetMetricNamesStats(qt, tt, le, limit, matchPattern, dl)
|
||||
return netstorage.GetMetricNamesStats(qt, statsQuery, dl)
|
||||
}
|
||||
|
||||
// blockIterator implements vmselectapi.BlockIterator
|
||||
|
||||
@@ -2,6 +2,7 @@ package netstorage
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
@@ -3031,12 +3032,6 @@ func writeUint64(bc *handshake.BufferedConn, n uint64) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func writeInt64(bc *handshake.BufferedConn, n int64) error {
|
||||
sizeBuf := encoding.MarshalInt64(nil, n)
|
||||
_, err := bc.Write(sizeBuf)
|
||||
return err
|
||||
}
|
||||
|
||||
func writeBool(bc *handshake.BufferedConn, b bool) error {
|
||||
var buf [1]byte
|
||||
if b {
|
||||
@@ -3337,14 +3332,14 @@ func metricNameTenantToTags(mn *storage.MetricName) {
|
||||
}
|
||||
|
||||
// GetMetricNamesStats returns metric names usage statistics for the given params
|
||||
func GetMetricNamesStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline searchutil.Deadline) (storage.MetricNamesStatsResponse, error) {
|
||||
func GetMetricNamesStats(qt *querytracer.Tracer, statsQuery storage.MetricNamesStatsQuery, deadline searchutil.Deadline) (storage.MetricNamesStatsResponse, error) {
|
||||
type nodeResult struct {
|
||||
resp storage.MetricNamesStatsResponse
|
||||
err error
|
||||
}
|
||||
sns := getStorageNodes()
|
||||
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
|
||||
resp, err := sn.processGetMetricNamesStats(qt, tt, limit, le, matchPattern, deadline)
|
||||
resp, err := sn.processGetMetricNamesStats(qt, statsQuery, deadline)
|
||||
return nodeResult{resp: resp, err: err}
|
||||
})
|
||||
var mu sync.Mutex
|
||||
@@ -3365,45 +3360,31 @@ func GetMetricNamesStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit,
|
||||
return mnuss, nil
|
||||
}
|
||||
|
||||
func (sn *storageNode) processGetMetricNamesStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline searchutil.Deadline) (storage.MetricNamesStatsResponse, error) {
|
||||
func (sn *storageNode) processGetMetricNamesStats(qt *querytracer.Tracer, statsQuery storage.MetricNamesStatsQuery, deadline searchutil.Deadline) (storage.MetricNamesStatsResponse, error) {
|
||||
var result storage.MetricNamesStatsResponse
|
||||
f := func(bc *handshake.BufferedConn) error {
|
||||
bcResult, err := processGetMetricNamesUsageStatsOnConn(bc, tt, limit, le, matchPattern)
|
||||
bcResult, err := processGetMetricNamesUsageStatsOnConn(bc, statsQuery)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
result = bcResult
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConnWithPossibleRetry(qt, "metricNamesUsageStats_v1", f, deadline); err != nil {
|
||||
if err := sn.execOnConnWithPossibleRetry(qt, "metricNamesUsageStats_v2", f, deadline); err != nil {
|
||||
return result, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func processGetMetricNamesUsageStatsOnConn(bc *handshake.BufferedConn, tt *storage.TenantToken, limit, le int, matchPattern string) (storage.MetricNamesStatsResponse, error) {
|
||||
func processGetMetricNamesUsageStatsOnConn(bc *handshake.BufferedConn, statsQuery storage.MetricNamesStatsQuery) (storage.MetricNamesStatsResponse, error) {
|
||||
var result storage.MetricNamesStatsResponse
|
||||
hasTenantToken := tt != nil
|
||||
if err := writeBool(bc, hasTenantToken); err != nil {
|
||||
return result, fmt.Errorf("cannot write hasTenantToken: %w", err)
|
||||
|
||||
data, err := json.Marshal(statsQuery)
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("cannot marshal MetricNamesStatsQuery: %w", err)
|
||||
}
|
||||
// conditionally write tenant token
|
||||
if hasTenantToken {
|
||||
if err := writeUint32(bc, tt.AccountID); err != nil {
|
||||
return result, fmt.Errorf("cannot write AccountID: %w", err)
|
||||
}
|
||||
if err := writeUint32(bc, tt.ProjectID); err != nil {
|
||||
return result, fmt.Errorf("cannot write ProjectID: %w", err)
|
||||
}
|
||||
}
|
||||
if err := writeLimit(bc, limit); err != nil {
|
||||
return result, fmt.Errorf("cannot write limit: %w", err)
|
||||
}
|
||||
if err := writeInt64(bc, int64(le)); err != nil {
|
||||
return result, fmt.Errorf("cannot write le: %w", err)
|
||||
}
|
||||
if err := writeBytes(bc, []byte(matchPattern)); err != nil {
|
||||
return result, fmt.Errorf("cannot write matchPattern: %w", err)
|
||||
if err := writeBytes(bc, data); err != nil {
|
||||
return result, fmt.Errorf("cannot write MetricNamesStatsQuery: %w", err)
|
||||
}
|
||||
if err := bc.Flush(); err != nil {
|
||||
return result, fmt.Errorf("cannot flush write: %w", err)
|
||||
|
||||
@@ -45,7 +45,18 @@ func MetricNamesStatsHandler(startTime time.Time, at *auth.Token, qt *querytrace
|
||||
ProjectID: at.ProjectID,
|
||||
}
|
||||
}
|
||||
stats, err := netstorage.GetMetricNamesStats(qt, tt, limit, le, matchPattern, deadline)
|
||||
matchNames := r.Form["match_names"]
|
||||
statsQuery := storage.MetricNamesStatsQuery{
|
||||
TenantToken: tt,
|
||||
Limit: limit,
|
||||
Le: le,
|
||||
MatchNames: matchNames,
|
||||
MatchPattern: matchPattern,
|
||||
}
|
||||
if limit > 0 && len(matchNames) > limit {
|
||||
return fmt.Errorf("match_names len=%d cannot exceed limit=%d", len(matchNames), limit)
|
||||
}
|
||||
stats, err := netstorage.GetMetricNamesStats(qt, statsQuery, deadline)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -195,8 +195,8 @@ func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []stora
|
||||
return nil
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, _ uint64) (storage.MetricNamesStatsResponse, error) {
|
||||
return api.s.GetMetricNamesStats(qt, tt, limit, le, matchPattern), nil
|
||||
func (api *vmstorageAPI) GetMetricNamesUsageStats(qt *querytracer.Tracer, statsQuery storage.MetricNamesStatsQuery, _ uint64) (storage.MetricNamesStatsResponse, error) {
|
||||
return api.s.GetMetricNamesStats(qt, statsQuery), nil
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) ResetMetricNamesUsageStats(qt *querytracer.Tracer, _ uint64) error {
|
||||
|
||||
@@ -122,7 +122,7 @@ func TestClusterMetricNamesStats(t *testing.T) {
|
||||
fmt.Sprintf("-storageNode=%s,%s", vmstorage1.VmselectAddr(), vmstorage2.VmselectAddr()),
|
||||
})
|
||||
// verify empty stats
|
||||
resp := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "0:0"})
|
||||
resp := vmselect.MetricNamesStats(t, "", "", "", nil, apptest.QueryOpts{Tenant: "0:0"})
|
||||
if len(resp.Records) != 0 {
|
||||
t.Fatalf("unexpected resp Records: %d, want: %d", len(resp.Records), 0)
|
||||
}
|
||||
@@ -155,7 +155,7 @@ func TestClusterMetricNamesStats(t *testing.T) {
|
||||
{MetricName: "metric_name_3"},
|
||||
},
|
||||
}
|
||||
gotStats := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID})
|
||||
gotStats := vmselect.MetricNamesStats(t, "", "", "", nil, apptest.QueryOpts{Tenant: tenantID})
|
||||
if diff := cmp.Diff(expected, gotStats); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
@@ -172,7 +172,17 @@ func TestClusterMetricNamesStats(t *testing.T) {
|
||||
{MetricName: "metric_name_1", QueryRequestsCount: 3},
|
||||
},
|
||||
}
|
||||
gotStats = vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID})
|
||||
gotStats = vmselect.MetricNamesStats(t, "", "", "", nil, apptest.QueryOpts{Tenant: tenantID})
|
||||
if diff := cmp.Diff(expected, gotStats); diff != "" {
|
||||
t.Errorf("unexpected response tenant: %s (-want, +got):\n%s", tenantID, diff)
|
||||
}
|
||||
expected = apptest.MetricNamesStatsResponse{
|
||||
Records: []at.MetricNamesStatsRecord{
|
||||
{MetricName: "metric_name_1", QueryRequestsCount: 3},
|
||||
},
|
||||
}
|
||||
matchNames := []string{"metric_name_1"}
|
||||
gotStats = vmselect.MetricNamesStats(t, "", "", "", matchNames, apptest.QueryOpts{Tenant: tenantID})
|
||||
if diff := cmp.Diff(expected, gotStats); diff != "" {
|
||||
t.Errorf("unexpected response tenant: %s (-want, +got):\n%s", tenantID, diff)
|
||||
}
|
||||
@@ -186,14 +196,27 @@ func TestClusterMetricNamesStats(t *testing.T) {
|
||||
{MetricName: "metric_name_1", QueryRequestsCount: 9},
|
||||
},
|
||||
}
|
||||
gotStats := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"})
|
||||
gotStats := vmselect.MetricNamesStats(t, "", "", "", nil, apptest.QueryOpts{Tenant: "multitenant"})
|
||||
if diff := cmp.Diff(expected, gotStats); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
|
||||
// verify multitenant stats
|
||||
expected = apptest.MetricNamesStatsResponse{
|
||||
Records: []at.MetricNamesStatsRecord{
|
||||
{MetricName: "metric_name_2", QueryRequestsCount: 3},
|
||||
{MetricName: "metric_name_1", QueryRequestsCount: 9},
|
||||
},
|
||||
}
|
||||
matchNames := []string{"metric_name_2", "metric_name_1"}
|
||||
gotStats = vmselect.MetricNamesStats(t, "", "", "", matchNames, apptest.QueryOpts{Tenant: "multitenant"})
|
||||
if diff := cmp.Diff(expected, gotStats); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
|
||||
// reset cache and check empty state
|
||||
vmselect.MetricNamesStatsReset(t, at.QueryOpts{})
|
||||
resp = vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"})
|
||||
resp = vmselect.MetricNamesStats(t, "", "", "", nil, apptest.QueryOpts{Tenant: "multitenant"})
|
||||
if len(resp.Records) != 0 {
|
||||
t.Fatalf("want 0 records, got: %d", len(resp.Records))
|
||||
}
|
||||
|
||||
@@ -139,13 +139,16 @@ func (app *Vmselect) DeleteSeries(t *testing.T, matchQuery string, opts QueryOpt
|
||||
// and returns the statistics response for given params.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/#Trackingestedmetricsusage
|
||||
func (app *Vmselect) MetricNamesStats(t *testing.T, limit, le, matchPattern string, opts QueryOpts) MetricNamesStatsResponse {
|
||||
func (app *Vmselect) MetricNamesStats(t *testing.T, limit, le, matchPattern string, matchNames []string, opts QueryOpts) MetricNamesStatsResponse {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
values.Add("limit", limit)
|
||||
values.Add("le", le)
|
||||
values.Add("match_pattern", matchPattern)
|
||||
for _, mn := range matchNames {
|
||||
values.Add("match_names", mn)
|
||||
}
|
||||
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/status/metric_names_stats", app.httpListenAddr, opts.getTenant())
|
||||
|
||||
res, statusCode := app.cli.PostForm(t, queryURL, values)
|
||||
|
||||
@@ -483,6 +483,7 @@ To get metric names usage statistics, use the `/prometheus/api/v1/status/metric_
|
||||
* `limit` - integer value to limit the number of metric names in response. By default, API returns 1000 records.
|
||||
* `le` - `less than or equal`, is an integer threshold for filtering metric names by their usage count in queries. For example, with `?le=1` API returns metric names that were queried <=1 times.
|
||||
* `match_pattern` - a substring pattern to match metric names. For example, `?match_pattern=vm_` will match any metric names with `vm_` pattern, like `vm_http_requests`, `max_vm_memory_available`. It doesn't support regex syntax.
|
||||
* `match_names` - a list of metric names to query. If this param is defined, `le` and `match_pattern` options will be ignored.
|
||||
|
||||
The API endpoint returns the following `JSON` response:
|
||||
|
||||
|
||||
@@ -18,9 +18,12 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
||||
|
||||
## tip
|
||||
|
||||
** update note 1: change vmselect to vmstorage cluster RPC method from `metricNamesUsageStats_v1` to `metricNamesUsageStats_v2`
|
||||
|
||||
* FEATURE: all the VictoriaMetrics components: mask `authKey` value from log messages. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5973) for details.
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/), [vmagent](https://docs.victoriametrics.com/vmagent/): add helpful hints to the unexpected EOF error message in the write concurrency limiter. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8704) for details.
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): use [VM remote write protocol](https://docs.victoriametrics.com/vmagent/#victoriametrics-remote-write-protocol) by default with automatic downgrade in runtime to Prometheus protocol when needed. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8462) for details.
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): enhance - `/api/v1/status/metric_names_stats` [API](https://docs.victoriametrics.com/keyconcepts/#structure-of-a-metric) with `match_names` query param. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6145) for details and related [docs](https://docs.victoriametrics.com/#track-ingested-metrics-usage)
|
||||
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): properly init [enterprise](https://docs.victoriametrics.com/enterprise/) version for `linux/arm` and non-CGO buids. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6019) for details.
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): remote write client sets correct content encoding header based on actual body content, rather than relying on configuration. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8650).
|
||||
|
||||
@@ -391,6 +391,71 @@ func (mt *Tracker) GetStatsForTenant(accountID, projectID uint32, limit, le int,
|
||||
return result
|
||||
}
|
||||
|
||||
// GetStatsForNamesMultitenant returns stats for metric names
|
||||
// ignores accountID and projectID
|
||||
//
|
||||
// SingleNode version must use GetStatsForNamesTenant with 0 account and project IDs
|
||||
func (mt *Tracker) GetStatsForNamesMultitenant(names []string) StatsResult {
|
||||
var result StatsResult
|
||||
|
||||
result.CollectedSinceTs = mt.creationTs.Load()
|
||||
result.TotalRecords = mt.currentItemsCount.Load()
|
||||
result.MaxSizeBytes = mt.maxSizeBytes
|
||||
result.CurrentSizeBytes = mt.currentSizeBytes.Load()
|
||||
|
||||
mt.mu.RLock()
|
||||
for sk, si := range mt.store {
|
||||
for _, mn := range names {
|
||||
if mn == sk.metricName {
|
||||
result.Records = append(result.Records, StatRecord{
|
||||
MetricName: sk.metricName,
|
||||
RequestsCount: si.requestsCount.Load(),
|
||||
LastRequestTs: si.lastRequestTs.Load(),
|
||||
})
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
mt.mu.RUnlock()
|
||||
|
||||
result.sort()
|
||||
result.DeduplicateMergeRecords()
|
||||
return result
|
||||
}
|
||||
|
||||
// GetStatsForNamesTenant returns stats for given accountID and projectID
|
||||
func (mt *Tracker) GetStatsForNamesTenant(accountID, projectID uint32, names []string) StatsResult {
|
||||
var result StatsResult
|
||||
|
||||
result.CollectedSinceTs = mt.creationTs.Load()
|
||||
result.TotalRecords = mt.currentItemsCount.Load()
|
||||
result.MaxSizeBytes = mt.maxSizeBytes
|
||||
result.CurrentSizeBytes = mt.currentSizeBytes.Load()
|
||||
|
||||
mt.mu.RLock()
|
||||
for _, mn := range names {
|
||||
sk := statKey{
|
||||
accountID: accountID,
|
||||
projectID: projectID,
|
||||
metricName: mn,
|
||||
}
|
||||
si, ok := mt.store[sk]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
result.Records = append(result.Records, StatRecord{
|
||||
MetricName: sk.metricName,
|
||||
RequestsCount: si.requestsCount.Load(),
|
||||
LastRequestTs: si.lastRequestTs.Load(),
|
||||
})
|
||||
}
|
||||
|
||||
mt.mu.RUnlock()
|
||||
|
||||
result.sort()
|
||||
return result
|
||||
}
|
||||
|
||||
// GetStats returns stats response for the tracked metrics
|
||||
//
|
||||
// DeduplicateMergeRecords must be called at cluster version on returned result.
|
||||
|
||||
@@ -562,3 +562,60 @@ func TestStatsResultMerge(t *testing.T) {
|
||||
f(dst, src, expected)
|
||||
|
||||
}
|
||||
|
||||
func TestStatsForMetricNames(t *testing.T) {
|
||||
type testOp struct {
|
||||
o byte
|
||||
mg string
|
||||
}
|
||||
|
||||
umt, err := loadFrom(t.TempDir()+t.Name(), storeOverhead+10*2)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot load tracker: %s", err)
|
||||
}
|
||||
umt.getCurrentTs = func() uint64 { return 1 }
|
||||
ops := []testOp{
|
||||
{'i', "metric_1"},
|
||||
{'r', "metric_2"},
|
||||
{'r', "metric_1"},
|
||||
{'i', "metric_2"},
|
||||
{'i', "metric_3"},
|
||||
{'i', "metric_4"},
|
||||
{'r', "metric_1"},
|
||||
{'r', "metric_2"},
|
||||
{'r', "metric_2"},
|
||||
{'r', "metric_2"},
|
||||
}
|
||||
for _, op := range ops {
|
||||
switch op.o {
|
||||
case 'i':
|
||||
umt.RegisterIngestRequest(0, 0, []byte(op.mg))
|
||||
case 'r':
|
||||
umt.RegisterQueryRequest(0, 0, []byte(op.mg))
|
||||
}
|
||||
}
|
||||
|
||||
f := func(names []string, expected StatsResult) {
|
||||
t.Helper()
|
||||
got := umt.GetStatsForNamesTenant(0, 0, names)
|
||||
if d := cmp.Diff(expected, got, statsResultCmpOpts); len(d) > 0 {
|
||||
t.Fatalf("unexpected deduplicate result: %s", d)
|
||||
}
|
||||
}
|
||||
want := StatsResult{
|
||||
TotalRecords: 2,
|
||||
Records: []StatRecord{
|
||||
{MetricName: "metric_1", RequestsCount: 2, LastRequestTs: 1},
|
||||
},
|
||||
}
|
||||
f([]string{"metric_1", "metric"}, want)
|
||||
want = StatsResult{
|
||||
TotalRecords: 2,
|
||||
Records: []StatRecord{
|
||||
{MetricName: "metric_1", RequestsCount: 2, LastRequestTs: 1},
|
||||
{MetricName: "metric_2", RequestsCount: 3, LastRequestTs: 1},
|
||||
},
|
||||
}
|
||||
f([]string{"metric_1", "metric_2"}, want)
|
||||
|
||||
}
|
||||
|
||||
@@ -3124,12 +3124,32 @@ type MetricNamesStatsResponse = metricnamestats.StatsResult
|
||||
// MetricNamesStatsRecord represents record at MetricNamesStatsResponse
|
||||
type MetricNamesStatsRecord = metricnamestats.StatRecord
|
||||
|
||||
// GetMetricNamesStats returns metric names usage stats with give limit and lte predicate
|
||||
func (s *Storage) GetMetricNamesStats(_ *querytracer.Tracer, tt *TenantToken, limit, le int, matchPattern string) MetricNamesStatsResponse {
|
||||
if tt != nil {
|
||||
return s.metricsTracker.GetStatsForTenant(tt.AccountID, tt.ProjectID, limit, le, matchPattern)
|
||||
// MetricNamesStatsQuery represents query params for Stats requests
|
||||
type MetricNamesStatsQuery struct {
|
||||
TenantToken *TenantToken
|
||||
Limit int
|
||||
Le int
|
||||
MatchPattern string
|
||||
MatchNames []string
|
||||
}
|
||||
|
||||
// GetMetricNamesStats returns metric names usage stats for given Query params
|
||||
func (s *Storage) GetMetricNamesStats(_ *querytracer.Tracer, statsQuery MetricNamesStatsQuery) MetricNamesStatsResponse {
|
||||
|
||||
if statsQuery.TenantToken != nil {
|
||||
tt := statsQuery.TenantToken
|
||||
if len(statsQuery.MatchNames) > 0 {
|
||||
return s.metricsTracker.GetStatsForNamesTenant(tt.AccountID, tt.ProjectID, statsQuery.MatchNames)
|
||||
}
|
||||
|
||||
return s.metricsTracker.GetStatsForTenant(tt.AccountID, tt.ProjectID, statsQuery.Limit, statsQuery.Le, statsQuery.MatchPattern)
|
||||
}
|
||||
res := s.metricsTracker.GetStats(limit, le, matchPattern)
|
||||
|
||||
var res MetricNamesStatsResponse
|
||||
if len(statsQuery.MatchNames) > 0 {
|
||||
return s.metricsTracker.GetStatsForNamesMultitenant(statsQuery.MatchNames)
|
||||
}
|
||||
res = s.metricsTracker.GetStats(statsQuery.Limit, statsQuery.Le, statsQuery.MatchPattern)
|
||||
res.DeduplicateMergeRecords()
|
||||
return res
|
||||
}
|
||||
|
||||
@@ -3471,7 +3471,10 @@ func TestStorageMetricTracker(t *testing.T) {
|
||||
|
||||
var sr Search
|
||||
// check stats for metrics with 0 requests count
|
||||
mus := s.GetMetricNamesStats(nil, nil, 10_000, 0, "")
|
||||
statsQuery := MetricNamesStatsQuery{
|
||||
Limit: 10_000,
|
||||
}
|
||||
mus := s.GetMetricNamesStats(nil, statsQuery)
|
||||
if len(mus.Records) != int(numRows) {
|
||||
t.Fatalf("unexpected Stats records count=%d, want %d records", len(mus.Records), numRows)
|
||||
}
|
||||
@@ -3487,11 +3490,12 @@ func TestStorageMetricTracker(t *testing.T) {
|
||||
}
|
||||
sr.MustClose()
|
||||
|
||||
mus = s.GetMetricNamesStats(nil, nil, 10_000, 0, "")
|
||||
mus = s.GetMetricNamesStats(nil, statsQuery)
|
||||
if len(mus.Records) != 0 {
|
||||
t.Fatalf("unexpected Stats records count=%d; want 0 records", len(mus.Records))
|
||||
}
|
||||
mus = s.GetMetricNamesStats(nil, nil, 10_000, 1, "")
|
||||
statsQuery.Le = 1
|
||||
mus = s.GetMetricNamesStats(nil, statsQuery)
|
||||
if len(mus.Records) != int(numRows) {
|
||||
t.Fatalf("unexpected Stats records count=%d, want %d records", len(mus.Records), numRows)
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ type API interface {
|
||||
Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error)
|
||||
|
||||
// GetMetricNamesUsageStats returns statistics for metric names
|
||||
GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (storage.MetricNamesStatsResponse, error)
|
||||
GetMetricNamesUsageStats(qt *querytracer.Tracer, sq storage.MetricNamesStatsQuery, deadline uint64) (storage.MetricNamesStatsResponse, error)
|
||||
|
||||
// ResetMetricNamesUsageStats resets internal state of metric names tracker
|
||||
ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package vmselectapi
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -348,18 +349,6 @@ func (ctx *vmselectRequestCtx) readUint64() (uint64, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (ctx *vmselectRequestCtx) readInt64() (int64, error) {
|
||||
ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 8)
|
||||
if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil {
|
||||
if err == io.EOF {
|
||||
return 0, err
|
||||
}
|
||||
return 0, fmt.Errorf("cannot read int64: %w", err)
|
||||
}
|
||||
n := encoding.UnmarshalInt64(ctx.sizeBuf)
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (ctx *vmselectRequestCtx) readAccountIDProjectID() (uint32, uint32, error) {
|
||||
accountID, err := ctx.readUint32()
|
||||
if err != nil {
|
||||
@@ -586,7 +575,7 @@ func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error {
|
||||
return s.processRegisterMetricNames(ctx)
|
||||
case "tenants_v1":
|
||||
return s.processTenants(ctx)
|
||||
case "metricNamesUsageStats_v1":
|
||||
case "metricNamesUsageStats_v2":
|
||||
return s.processMetricNamesUsageStats(ctx)
|
||||
case "resetMetricNamesStats_v1":
|
||||
return s.processResetMetricUsageStats(ctx)
|
||||
@@ -1076,46 +1065,24 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
const maxMetricNamesStatsQueryJSONSize = 1024 * 1024
|
||||
|
||||
func (s *Server) processMetricNamesUsageStats(ctx *vmselectRequestCtx) error {
|
||||
// Read request.
|
||||
hasTenant, err := ctx.readBool()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read hasTenant: %w", err)
|
||||
}
|
||||
var at *storage.TenantToken
|
||||
if hasTenant {
|
||||
accountID, err := ctx.readUint32()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read accountID: %w", err)
|
||||
}
|
||||
projectID, err := ctx.readUint32()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read projectID: %w", err)
|
||||
}
|
||||
at = &storage.TenantToken{
|
||||
AccountID: accountID,
|
||||
ProjectID: projectID,
|
||||
}
|
||||
}
|
||||
limit, err := ctx.readLimit()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read limit: %w", err)
|
||||
}
|
||||
le, err := ctx.readInt64()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read le: %w", err)
|
||||
}
|
||||
if err := ctx.readDataBufBytes(256); err != nil {
|
||||
if err := ctx.readDataBufBytes(maxMetricNamesStatsQueryJSONSize); err != nil {
|
||||
return fmt.Errorf("cannot read matchPattern: %w", err)
|
||||
}
|
||||
matchPattern := string(ctx.dataBuf)
|
||||
var statsQuery storage.MetricNamesStatsQuery
|
||||
if err := json.Unmarshal(ctx.dataBuf, &statsQuery); err != nil {
|
||||
return fmt.Errorf("cannot parse MetricNamesStatsQuery: %w", err)
|
||||
}
|
||||
|
||||
if err := s.beginConcurrentRequest(ctx); err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
defer s.endConcurrentRequest()
|
||||
|
||||
result, err := s.api.GetMetricNamesUsageStats(ctx.qt, at, limit, int(le), matchPattern, ctx.deadline)
|
||||
result, err := s.api.GetMetricNamesUsageStats(ctx.qt, statsQuery, ctx.deadline)
|
||||
if err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user