mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
app/vmagent: properly attach tenant information to metadata (#10865)
Previously, vmagent ignored tenant ID information obtained from `__tenant_id__` label for metrics metadata. It made it impossible to route metrics metadata to the `/multitenant` endpoints. This commit adds tenant ID to the metrics metadata. It also fixes VMagent multitenant ingestion endpoints. Previously, the tenant info defined there was not properly set to metadata. Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10828 PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10865 --------- Signed-off-by: Nikolay <nik@victoriametrics.com> Signed-off-by: f41gh7 <nik@victoriametrics.com> Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com> Co-authored-by: Max Kotliar <mkotlyar@victoriametrics.com>
This commit is contained in:
@@ -77,16 +77,6 @@ func insertRows(at *auth.Token, tss []prompb.TimeSeries, mms []prompb.MetricMeta
|
||||
|
||||
var metadataTotal int
|
||||
if prommetadata.IsEnabled() {
|
||||
var accountID, projectID uint32
|
||||
if at != nil {
|
||||
accountID = at.AccountID
|
||||
projectID = at.ProjectID
|
||||
for i := range mms {
|
||||
mm := &mms[i]
|
||||
mm.AccountID = accountID
|
||||
mm.ProjectID = projectID
|
||||
}
|
||||
}
|
||||
ctx.WriteRequest.Metadata = mms
|
||||
metadataTotal = len(mms)
|
||||
}
|
||||
|
||||
@@ -75,11 +75,6 @@ func insertRows(at *auth.Token, rows []prometheus.Row, mms []prometheus.Metadata
|
||||
Samples: samples[len(samples)-1:],
|
||||
})
|
||||
}
|
||||
var accountID, projectID uint32
|
||||
if at != nil {
|
||||
accountID = at.AccountID
|
||||
projectID = at.ProjectID
|
||||
}
|
||||
for i := range mms {
|
||||
mm := &mms[i]
|
||||
mmsDst = append(mmsDst, prompb.MetricMetadata{
|
||||
@@ -88,8 +83,6 @@ func insertRows(at *auth.Token, rows []prometheus.Row, mms []prometheus.Metadata
|
||||
Type: mm.Type,
|
||||
// there is no unit in Prometheus exposition formats
|
||||
|
||||
AccountID: accountID,
|
||||
ProjectID: projectID,
|
||||
})
|
||||
}
|
||||
ctx.WriteRequest.Timeseries = tssDst
|
||||
|
||||
@@ -72,11 +72,6 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, mms []prompb.Met
|
||||
|
||||
var metadataTotal int
|
||||
if prommetadata.IsEnabled() {
|
||||
var accountID, projectID uint32
|
||||
if at != nil {
|
||||
accountID = at.AccountID
|
||||
projectID = at.ProjectID
|
||||
}
|
||||
for i := range mms {
|
||||
mm := &mms[i]
|
||||
mmsDst = append(mmsDst, prompb.MetricMetadata{
|
||||
@@ -85,8 +80,8 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, mms []prompb.Met
|
||||
Type: mm.Type,
|
||||
Unit: mm.Unit,
|
||||
|
||||
AccountID: accountID,
|
||||
ProjectID: projectID,
|
||||
AccountID: mm.AccountID,
|
||||
ProjectID: mm.ProjectID,
|
||||
})
|
||||
}
|
||||
ctx.WriteRequest.Metadata = mmsDst
|
||||
|
||||
@@ -211,6 +211,9 @@ func (wr *writeRequest) copyMetadata(dst, src *prompb.MetricMetadata) {
|
||||
dst.Type = src.Type
|
||||
dst.Unit = src.Unit
|
||||
|
||||
dst.AccountID = src.AccountID
|
||||
dst.ProjectID = src.ProjectID
|
||||
|
||||
// Pre-allocate memory for all string fields.
|
||||
neededBufLen := len(src.MetricFamilyName) + len(src.Help)
|
||||
bufLen := len(wr.metadatabuf)
|
||||
|
||||
@@ -398,7 +398,7 @@ func tryPush(at *auth.Token, wr *prompb.WriteRequest, forceDropSamplesOnFailure
|
||||
|
||||
// Push metadata separately from time series, since it doesn't need sharding,
|
||||
// relabeling, stream aggregation, deduplication, etc.
|
||||
if !tryPushMetadataToRemoteStorages(rwctxs, mms, forceDropSamplesOnFailure) {
|
||||
if !tryPushMetadataToRemoteStorages(at, rwctxs, mms, forceDropSamplesOnFailure) {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -536,11 +536,18 @@ func pushTimeSeriesToRemoteStoragesTrackDropped(tss []prompb.TimeSeries) {
|
||||
}
|
||||
}
|
||||
|
||||
func tryPushMetadataToRemoteStorages(rwctxs []*remoteWriteCtx, mms []prompb.MetricMetadata, forceDropSamplesOnFailure bool) bool {
|
||||
func tryPushMetadataToRemoteStorages(at *auth.Token, rwctxs []*remoteWriteCtx, mms []prompb.MetricMetadata, forceDropSamplesOnFailure bool) bool {
|
||||
if len(mms) == 0 {
|
||||
// Nothing to push
|
||||
return true
|
||||
}
|
||||
if at != nil {
|
||||
for idx := range mms {
|
||||
mm := &mms[idx]
|
||||
mm.AccountID = at.AccountID
|
||||
mm.ProjectID = at.ProjectID
|
||||
}
|
||||
}
|
||||
// Do not shard metadata even if -remoteWrite.shardByURL is set, just replicate it among rwctxs.
|
||||
// Since metadata is usually small and there is no guarantee that metadata can be sent to
|
||||
// the same remote storage with the corresponding metrics.
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
// TestSingleVMAgentReloadConfigs verifies that vmagent reload new configurations on SIGHUP signal
|
||||
@@ -512,3 +513,74 @@ func TestSingleVMAgentCardinalityLimiter(t *testing.T) {
|
||||
t.Fatalf("unexpected vmagent_daily_series_limit_rows_dropped_total value: %d", v)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClusterVMAgentForwardMetricsMetadata(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
sut := tc.MustStartDefaultCluster()
|
||||
|
||||
vmagent := tc.MustStartVmagent("vmagent", []string{
|
||||
`-remoteWrite.flushInterval=50ms`,
|
||||
`-remoteWrite.forcePromProto=true`,
|
||||
`-enableMultitenantHandlers=true`,
|
||||
"-remoteWrite.tmpDataPath=" + tc.Dir() + "/vmagent",
|
||||
fmt.Sprintf(`-remoteWrite.url=http://%s/insert/multitenant/prometheus/api/v1/write`, sut.Vminsert.HTTPAddr()),
|
||||
}, ``)
|
||||
|
||||
prometheusRemoteWriteDataSet := prompb.WriteRequest{
|
||||
Metadata: []prompb.MetricMetadata{
|
||||
{MetricFamilyName: "metric_name_4", Help: "some help message", Type: prompb.MetricTypeSummary, AccountID: 100},
|
||||
},
|
||||
}
|
||||
vmagent.PrometheusAPIV1Write(t, prometheusRemoteWriteDataSet, apptest.QueryOpts{Tenant: "multitenant"})
|
||||
|
||||
tc.Assert(&apptest.AssertOptions{
|
||||
Msg: "unexpected /api/v1/metadata response",
|
||||
Got: func() any {
|
||||
return sut.PrometheusAPIV1Metadata(t, ``, -1, apptest.QueryOpts{Tenant: "100:0"})
|
||||
},
|
||||
Want: &apptest.PrometheusAPIV1Metadata{
|
||||
Status: "success",
|
||||
Data: map[string][]apptest.MetadataEntry{
|
||||
"metric_name_4": {{Help: "some help message", Type: "summary"}},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
prometheusRemoteWriteDataSet = prompb.WriteRequest{
|
||||
Metadata: []prompb.MetricMetadata{
|
||||
{MetricFamilyName: "metric_name_6", Help: "some help message", Type: prompb.MetricTypeSummary, AccountID: 100},
|
||||
},
|
||||
}
|
||||
// enforce tenant from request uri /insert/tenant_id/prometheus/api/v1/write
|
||||
vmagent.PrometheusAPIV1Write(t, prometheusRemoteWriteDataSet, apptest.QueryOpts{Tenant: "500:500"})
|
||||
|
||||
tc.Assert(&apptest.AssertOptions{
|
||||
Msg: "unexpected /api/v1/metadata response",
|
||||
Got: func() any {
|
||||
return sut.PrometheusAPIV1Metadata(t, ``, -1, apptest.QueryOpts{Tenant: "500:500"})
|
||||
},
|
||||
Want: &apptest.PrometheusAPIV1Metadata{
|
||||
Status: "success",
|
||||
Data: map[string][]apptest.MetadataEntry{
|
||||
"metric_name_6": {{Help: "some help message", Type: "summary"}},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
tc.Assert(&apptest.AssertOptions{
|
||||
Msg: "unexpected /api/v1/metadata response",
|
||||
Got: func() any {
|
||||
return sut.PrometheusAPIV1Metadata(t, ``, -1, apptest.QueryOpts{Tenant: "multitenant"})
|
||||
},
|
||||
Want: &apptest.PrometheusAPIV1Metadata{
|
||||
Status: "success",
|
||||
Data: map[string][]apptest.MetadataEntry{
|
||||
"metric_name_4": {{Help: "some help message", Type: "summary"}},
|
||||
"metric_name_6": {{Help: "some help message", Type: "summary"}},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
@@ -10,6 +10,10 @@ import (
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prommetadata"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/golang/snappy"
|
||||
)
|
||||
|
||||
// Vmagent holds the state of a vmagent app and provides vmagent-specific functions
|
||||
@@ -158,6 +162,31 @@ func (app *Vmagent) ReloadRelabelConfigs(t *testing.T) {
|
||||
t.Fatalf("relabel configs were not reloaded after SIGHUP signal; previous total: %f, current total: %f", prevTotal, currTotal)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Write is a test helper function that inserts a
|
||||
// collection of records in Prometheus remote-write format by sending a HTTP
|
||||
// POST request to /prometheus/api/v1/write vmagent endpoint.
|
||||
func (app *Vmagent) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteRequest, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/prometheus/api/v1/write", app.httpListenAddr)
|
||||
if opts.Tenant != "" {
|
||||
url = fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/write", app.httpListenAddr, opts.Tenant)
|
||||
}
|
||||
data := snappy.Encode(nil, wr.MarshalProtobuf(nil))
|
||||
recordsCount := len(wr.Timeseries)
|
||||
if prommetadata.IsEnabled() {
|
||||
recordsCount += len(wr.Metadata)
|
||||
}
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "application/x-protobuf")
|
||||
app.sendBlocking(t, recordsCount, func() {
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// HTTPAddr returns the address at which the vmagent process is listening
|
||||
// for http connections.
|
||||
func (app *Vmagent) HTTPAddr() string {
|
||||
@@ -176,16 +205,22 @@ func (app *Vmagent) HTTPAddr() string {
|
||||
// If it is, then the data has been sent to vmstorage.
|
||||
//
|
||||
// Unreliable if the records are inserted concurrently.
|
||||
func (app *Vmagent) sendBlocking(t *testing.T, numRecordsToSend int, send func()) {
|
||||
func (app *Vmagent) sendBlocking(t *testing.T, _ int, send func()) {
|
||||
t.Helper()
|
||||
|
||||
currRowsSentCount := app.remoteWriteRequestsTotal(t)
|
||||
|
||||
send()
|
||||
|
||||
const (
|
||||
retries = 20
|
||||
period = 100 * time.Millisecond
|
||||
)
|
||||
wantRowsSentCount := app.remoteWriteRequestsTotal(t) + numRecordsToSend
|
||||
// TODO: properly account wantRowsSentCount
|
||||
// currently vmagent doesn't expose per time-series write information
|
||||
// so we can only account number of blocks sent via remote write protocol
|
||||
// it should be suitable for tests purpose
|
||||
wantRowsSentCount := currRowsSentCount + 1
|
||||
for range retries {
|
||||
if app.remoteWriteRequestsTotal(t) >= wantRowsSentCount {
|
||||
return
|
||||
|
||||
@@ -36,6 +36,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix increased memory usage after upgrade to [v1.140.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.140.0) by properly accounting for internal buffer count when calculating per-storage buffer size. See [#10725](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10725#issuecomment-4282256709).
|
||||
* BUGFIX: all VictoriaMetrics components: properly parse IPv6 source address when accepting connections with proxy protocol v2 enabled. See [#10839](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10839). Thanks to @andriibeee for the contribution.
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): properly attach [tenant](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy) from `__tenant_id__` label to the scraped metadata. See [#10828](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10828).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): keep [tenant](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy) ingested into vmagent via [prometheus remotewrite](https://docs.victoriametrics.com/victoriametrics/integrations/prometheus/) endpoint. See [#10828](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10828).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): `-maxScrapeSize` is now correctly applied when reading response bodies, including non-OK scrape error responses. See [#10804](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10804).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix `ec2_sd_configs` returning 401 `AuthFailure` from AWS when credentials are obtained via IRSA, instance role or `AWS_CONTAINER_CREDENTIALS_*` env vars. The regression was introduced in [v1.140.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.140.0). See [#10815](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10815).
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): fix leak of backend TCP connections, file descriptors and goroutines when the client cancels the request after the backend response has been received. See [#10833](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10833). Thanks to @andriibeee for the contribution.
|
||||
|
||||
@@ -501,9 +501,24 @@ scrape_configs:
|
||||
target_label: vm_account_id
|
||||
```
|
||||
|
||||
In addition, vmagent could obtain tenant identifier from `__tenant_id__` label at target discovery phase.
|
||||
It implicitly converts `__tenant_id__` label into `vm_account_id` and `vm_project_id` labels and attaches
|
||||
it to the scraped metrics and metrics metadata.
|
||||
For example, the following relabeling rule instructs sending metrics to `10:5` [tenant](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy)
|
||||
defined in the `prometheus.io/tenant_id: 10:5` annotation of Kubernetes pod deployment:
|
||||
|
||||
```yaml
|
||||
scrape_configs:
|
||||
- kubernetes_sd_configs:
|
||||
- role: pod
|
||||
relabel_configs:
|
||||
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_tenant_id]
|
||||
target_label: __tenant_id__
|
||||
```
|
||||
|
||||
`vmagent` can accept data via the same multitenant endpoints (`/insert/<accountID>/<suffix>`) as `vminsert` at [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/)
|
||||
does according to [these docs](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#url-format) if `-enableMultitenantHandlers` command-line flag is set.
|
||||
In this case, vmagent automatically converts tenant identifiers from the URL to `vm_account_id` and `vm_project_id` labels.
|
||||
In this case, vmagent automatically converts tenant identifiers from the URL to `vm_account_id` and `vm_project_id` labels and sets tenant info in metadata.
|
||||
These tenant labels are added before applying [relabeling](https://docs.victoriametrics.com/victoriametrics/relabeling/) specified via `-remoteWrite.relabelConfig`
|
||||
and `-remoteWrite.urlRelabelConfig` command-line flags. Metrics with `vm_account_id` and `vm_project_id` labels can be routed to the corresponding tenants
|
||||
when specifying `-remoteWrite.url` to [multitenant url at VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-labels).
|
||||
@@ -665,10 +680,14 @@ e.g. it sets `scrape_series_added` metric to zero. See [these docs](#automatical
|
||||
`vmagent` accepts{{% available_from "v1.137.0" %}} metric metadata exposed by scrape targets in [Prometheus exposition format](https://github.com/prometheus/docs/blob/main/docs/instrumenting/exposition_formats.md), received via [Prometheus remote write v1](https://prometheus.io/docs/specs/prw/remote_write_spec/) or [OpenTelemetry protocol](https://github.com/open-telemetry/opentelemetry-proto/blob/v1.7.0/opentelemetry/proto/metrics/v1/metrics.proto) by default. Set `-enableMetadata=false` to disable metadata processing{{% available_from "v1.125.1" %}}.
|
||||
During processing, metadata won't be dropped or modified by [relabeling](https://docs.victoriametrics.com/victoriametrics/relabeling/) or [streaming aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/).
|
||||
|
||||
When `-enableMultitenantHandlers` is enabled, vmagent adds tenant info to metadata received via the [multitenant endpoints](https://docs.victoriametrics.com/victoriametrics/vmagent/#multitenancy) (`/insert/<accountID>/<suffix>`). However, if `vm_account_id` or `vm_project_id` labels are added directly to metrics before reaching vmagent, and vmagent writes to the [vminsert multitenant endpoints](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-labels), the tenant info won't be attached and the metadata will be stored under the default tenant of VictoriaMetrics cluster.
|
||||
When `-enableMultitenantHandlers` is enabled, vmagent adds tenant info to metadata specified in [multitenant endpoint](https://docs.victoriametrics.com/victoriametrics/vmagent/#multitenancy) (`/insert/<accountID>/<suffix>`).
|
||||
However, if the `/insert/multitenant/<suffix>` endpoint is used, vmagent preserves the tenant information provided in the metadata by the sender. If the sender does not specify tenant information, the default tenant `0:0` is used.
|
||||
|
||||
> Metadata requires extra memory, disk space, and network traffic.
|
||||
|
||||
Use `-remoteWrite.disableMetadata`{{% available_from "v1.140.0" %}} to fully disable sending metadata from vmagent.
|
||||
This reduces network traffic and resource usage when metadata is not required.
|
||||
|
||||
## Stream parsing mode
|
||||
|
||||
By default, `vmagent` parses the full response from the scrape target, applies [relabeling](https://docs.victoriametrics.com/victoriametrics/relabeling/)
|
||||
|
||||
Reference in New Issue
Block a user