Compare commits

..

2 Commits

Author SHA1 Message Date
Artem Fetishev
d0c6aa681f vmsingle: rename DataPath variable to storageDataPath (#11018)
This way the variable will match the corresponding name in cluster
branch which will reduce diff between branches.

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-05-26 19:17:05 +02:00
Hui Wang
c3525bf0bc lib/protoparser/opentelemetry: support disable scope and resource attributes label promotions
This commit adds four flags to allow managing label promotion for
resource attributes and OTel scope metadata, while staying compatible
with
[Prometheus](https://prometheus.io/docs/prometheus/latest/configuration/configuration/):
- `-opentelemetry.promoteScopeMetadata` - promote OTel scope metadata
(i.e. name, version, schema URL, and attributes) to metric labels.
- `opentelemetry.promoteAllResourceAttributes` - promote all resource
attributes to labels, except for the ones configured with
`-opentelemetry.ignoreResourceAttributes`.
- `opentelemetry.promoteResourceAttributes` - promote specific list of
resource attributes to labels. It cannot be configured simultaneously
with `opentelemetry.promoteAllResourceAttributes`.
- `opentelemetry.ignoreResourceAttributes` - which resource attributes
to ignore, can only be set when
`opentelemetry.promoteAllResourceAttributes` is true.

`-opentelemetry.promoteScopeMetadata` and
`opentelemetry.promoteAllResourceAttributes` are enabled by default in
order to preserve the current behavior.

fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10931.
2026-05-26 17:47:07 +02:00
18 changed files with 324 additions and 272 deletions

View File

@@ -118,6 +118,7 @@ func main() {
remotewrite.InitSecretFlags()
buildinfo.Init()
logger.Init()
opentelemetry.Init()
timeserieslimits.Init(*maxLabelsPerTimeseries, *maxLabelNameLen, *maxLabelValueLen)
if promscrape.IsDryRun() {

View File

@@ -25,6 +25,11 @@ var (
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="opentelemetry"}`)
)
// Init must be called after flag.Parse and before using the opentelemetry package.
func Init() {
stream.InitDecodeOptions()
}
// InsertHandlerForReader processes metrics from given reader.
func InsertHandlerForReader(at *auth.Token, r io.Reader, encoding string) error {
return stream.ParseStream(r, encoding, nil, func(tss []prompb.TimeSeries, mms []prompb.MetricMetadata) error {

View File

@@ -95,7 +95,6 @@ type groupMetrics struct {
iterationTotal *metrics.Counter
iterationDuration *metrics.Summary
iterationMissed *metrics.Counter
iterationReset *metrics.Counter
iterationInterval *metrics.Gauge
}
@@ -331,7 +330,6 @@ func (g *Group) Init() {
g.metrics.iterationTotal = g.metrics.set.NewCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels))
g.metrics.iterationDuration = g.metrics.set.NewSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels))
g.metrics.iterationMissed = g.metrics.set.NewCounter(fmt.Sprintf(`vmalert_iteration_missed_total{%s}`, labels))
g.metrics.iterationReset = g.metrics.set.NewCounter(fmt.Sprintf(`vmalert_iteration_reset_total{%s}`, labels))
g.metrics.iterationInterval = g.metrics.set.NewGauge(fmt.Sprintf(`vmalert_iteration_interval_seconds{%s}`, labels), func() float64 {
i := g.Interval.Seconds()
return i
@@ -476,16 +474,14 @@ func (g *Group) Start(ctx context.Context, rw remotewrite.RWClient, rr datasourc
if missed < 0 {
// missed can become < 0 due to irregular delays during evaluation
// which can result in time.Since(evalTS) < g.Interval;
// or the system wall clock was changed backward,
// Reset the evalTS to the current time.
// or the system wall clock was changed backward
missed = 0
evalTS = time.Now()
g.metrics.iterationReset.Inc()
} else {
evalTS = evalTS.Add((missed + 1) * g.Interval)
}
if missed > 0 {
g.metrics.iterationMissed.Inc()
}
evalTS = evalTS.Add((missed + 1) * g.Interval)
eval(evalCtx, evalTS)
}

View File

@@ -89,6 +89,7 @@ var staticServer = http.FileServer(http.FS(staticFiles))
func Init() {
relabel.Init()
common.InitStreamAggr()
opentelemetry.Init()
protoparserutil.StartUnmarshalWorkers()
if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler)

View File

@@ -20,6 +20,11 @@ var (
metadataInserted = metrics.NewCounter(`vm_metadata_rows_inserted_total{type="opentelemetry"}`)
)
// Init must be called after flag.Parse and before using the opentelemetry package.
func Init() {
stream.InitDecodeOptions()
}
// InsertHandler processes opentelemetry metrics.
func InsertHandler(req *http.Request) error {
extraLabels, err := protoparserutil.GetExtraLabels(req)

View File

@@ -60,10 +60,10 @@ func getDefaultMaxConcurrentRequests() int {
// Init initializes vmselect
func Init() {
tmpDirPath := *vmstorage.DataPath + "/tmp"
tmpDirPath := vmstorage.DataPath() + "/tmp"
fs.MustRemoveDirContents(tmpDirPath)
netstorage.InitTmpBlocksDir(tmpDirPath)
promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
promql.InitRollupResultCache(vmstorage.DataPath() + "/cache/rollupResult")
prometheus.InitMaxUniqueTimeseries(*maxConcurrentRequests)
concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)

View File

@@ -31,6 +31,7 @@ import (
)
var (
storageDataPath = flag.String("storageDataPath", "victoria-metrics-data", "Path to storage data")
retentionPeriod = flagutil.NewRetentionDuration("retentionPeriod", "1M", "Data with timestamps outside the retentionPeriod is automatically deleted. The minimum retentionPeriod is 24h or 1d. "+
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention. See also -retentionFilter")
futureRetention = flagutil.NewRetentionDuration("futureRetention", "2d", "Data with timestamps bigger than now+futureRetention is automatically deleted. "+
@@ -43,9 +44,6 @@ var (
precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss")
// DataPath is a path to storage data.
DataPath = flag.String("storageDataPath", "victoria-metrics-data", "Path to storage data")
_ = flag.Duration("finalMergeDelay", 0, "Deprecated: this flag does nothing")
_ = flag.Int("bigMergeConcurrency", 0, "Deprecated: this flag does nothing")
_ = flag.Int("smallMergeConcurrency", 0, "Deprecated: this flag does nothing")
@@ -103,6 +101,10 @@ var (
"If set to 0 or a negative value, defaults to 1% of allowed memory.")
)
func DataPath() string {
return *storageDataPath
}
// Init initializes vmstorage.
func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
@@ -132,7 +134,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
if *idbPrefillStart > 23*time.Hour {
logger.Panicf("-storage.idbPrefillStart cannot exceed 23 hours; got %s", idbPrefillStart)
}
logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod)
logger.Infof("opening storage at %q with -retentionPeriod=%s", *storageDataPath, retentionPeriod)
startTime := time.Now()
WG = syncwg.WaitGroup{}
opts := storage.OpenOptions{
@@ -146,7 +148,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
IDBPrefillStart: *idbPrefillStart,
LogNewSeries: *logNewSeries,
}
strg := storage.MustOpenStorage(*DataPath, opts)
strg := storage.MustOpenStorage(*storageDataPath, opts)
Storage = strg
initStaleSnapshotsRemover(strg)
@@ -158,7 +160,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
rowsCount := tm.SmallRowsCount + tm.BigRowsCount
sizeBytes := tm.SmallSizeBytes + tm.BigSizeBytes
logger.Infof("successfully opened storage %q in %.3f seconds; partsCount: %d; blocksCount: %d; rowsCount: %d; sizeBytes: %d",
*DataPath, time.Since(startTime).Seconds(), partsCount, blocksCount, rowsCount, sizeBytes)
*storageDataPath, time.Since(startTime).Seconds(), partsCount, blocksCount, rowsCount, sizeBytes)
// register storage metrics
storageMetrics = metrics.NewSet()
@@ -166,7 +168,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
writeStorageMetrics(w, strg)
})
metrics.RegisterSet(storageMetrics)
fs.RegisterPathFsMetrics(*DataPath)
fs.RegisterPathFsMetrics(*storageDataPath)
}
var storageMetrics *metrics.Set
@@ -311,7 +313,7 @@ func Stop() {
metrics.UnregisterSet(storageMetrics, true)
storageMetrics = nil
logger.Infof("gracefully closing the storage at %s", *DataPath)
logger.Infof("gracefully closing the storage at %s", *storageDataPath)
startTime := time.Now()
WG.WaitAndBlock()
stopStaleSnapshotsRemover()
@@ -515,15 +517,15 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
tm := &m.TableMetrics
idbm := &m.TableMetrics.IndexDBMetrics
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_free_disk_space_bytes{path=%q}`, *DataPath), fs.MustGetFreeSpace(*DataPath))
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_free_disk_space_limit_bytes{path=%q}`, *DataPath), uint64(minFreeDiskSpaceBytes.N))
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_total_disk_space_bytes{path=%q}`, *DataPath), fs.MustGetTotalSpace(*DataPath))
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_free_disk_space_bytes{path=%q}`, *storageDataPath), fs.MustGetFreeSpace(*storageDataPath))
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_free_disk_space_limit_bytes{path=%q}`, *storageDataPath), uint64(minFreeDiskSpaceBytes.N))
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_total_disk_space_bytes{path=%q}`, *storageDataPath), fs.MustGetTotalSpace(*storageDataPath))
isReadOnly := 0
if strg.IsReadOnly() {
isReadOnly = 1
}
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_storage_is_read_only{path=%q}`, *DataPath), uint64(isReadOnly))
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_storage_is_read_only{path=%q}`, *storageDataPath), uint64(isReadOnly))
metrics.WriteGaugeUint64(w, `vm_active_merges{type="storage/inmemory"}`, tm.ActiveInmemoryMerges)
metrics.WriteGaugeUint64(w, `vm_active_merges{type="storage/small"}`, tm.ActiveSmallMerges)

View File

@@ -2804,10 +2804,10 @@
"overrides": []
},
"gridPos": {
"h": 7,
"h": 8,
"w": 12,
"x": 0,
"y": 11
"y": 352
},
"id": 63,
"options": {
@@ -2843,113 +2843,7 @@
],
"title": "Restarts ($job)",
"type": "timeseries"
},
{
"datasource": {
"type": "victoriametrics-metrics-datasource",
"uid": "$ds"
},
"description": "Group iteration reset can be caused by irregular delays during evaluation or by the system wall clock being moved backward.\nIf it is caused by host clock changes, vmalert could generate duplicate results for the group rules, since some evaluations could be repeated.\nCheck the host clock time synchronization configuration if this happens frequently.\n",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "bars",
"fillOpacity": 10,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"showValues": false,
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
},
"unit": "short"
},
"overrides": []
},
"gridPos": {
"h": 7,
"w": 12,
"x": 12,
"y": 11
},
"id": 70,
"options": {
"legend": {
"calcs": [
"mean",
"lastNotNull",
"max"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "multi",
"sort": "none"
}
},
"pluginVersion": "12.2.0",
"targets": [
{
"datasource": {
"type": "victoriametrics-metrics-datasource",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": false,
"expr": "sum(increase(vmalert_iteration_reset_total{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by(job, group, file) > 0",
"interval": "1m",
"legendFormat": "({{job}}) {{group}}({{file}})",
"range": true,
"refId": "A"
}
],
"title": "Group Iteration Reset ($instance)",
"type": "timeseries"
}
}
],
"title": "Troubleshooting",
"type": "row"

View File

@@ -2803,10 +2803,10 @@
"overrides": []
},
"gridPos": {
"h": 7,
"h": 8,
"w": 12,
"x": 0,
"y": 11
"y": 352
},
"id": 63,
"options": {
@@ -2842,113 +2842,7 @@
],
"title": "Restarts ($job)",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"description": "Group iteration reset can be caused by irregular delays during evaluation or by the system wall clock being moved backward.\nIf it is caused by host clock changes, vmalert could generate duplicate results for the group rules, since some evaluations could be repeated.\nCheck the host clock time synchronization configuration if this happens frequently.\n",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "bars",
"fillOpacity": 10,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"showValues": false,
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": 0
},
{
"color": "red",
"value": 80
}
]
},
"unit": "short"
},
"overrides": []
},
"gridPos": {
"h": 7,
"w": 12,
"x": 12,
"y": 11
},
"id": 70,
"options": {
"legend": {
"calcs": [
"mean",
"lastNotNull",
"max"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "multi",
"sort": "none"
}
},
"pluginVersion": "12.2.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": false,
"expr": "sum(increase(vmalert_iteration_reset_total{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by(job, group, file) > 0",
"interval": "1m",
"legendFormat": "({{job}}) {{group}}({{file}})",
"range": true,
"refId": "A"
}
],
"title": "Group Iteration Reset ($instance)",
"type": "timeseries"
}
}
],
"title": "Troubleshooting",
"type": "row"

View File

@@ -64,18 +64,6 @@ groups:
group \"{{ $labels.group }}\". See https://docs.victoriametrics.com/victoriametrics/vmalert/#groups.
If rule expressions are taking longer than expected, please see https://docs.victoriametrics.com/victoriametrics/troubleshooting/#slow-queries."
- alert: GroupIterationReset
expr: increase(vmalert_iteration_reset_total[5m]) > 0
for: 5m
labels:
severity: warning
annotations:
summary: "Evaluation iteration for group {{ $labels.group }} in file {{ $labels.file }} is reset"
description: "Evaluation iteration for group \"{{ $labels.group }}\" in file \"{{ $labels.file }}\" is reset on vmalert instance {{ $labels.instance }}.
This can be caused by irregular delays during evaluation or by the system wall clock being moved backward. If it is caused by host clock changes, vmalert could
generate duplicate results for the group rules since some evaluations could be repeated. Check host clock time synchronization configurations if this happens frequently."
- alert: RemoteWriteErrors
expr: increase(vmalert_remotewrite_errors_total[5m]) > 0
for: 15m
@@ -120,3 +108,4 @@ groups:
summary: "vmalert instance {{ $labels.instance }} is failing to send notifications to Alertmanager"
description: "vmalert instance {{ $labels.instance }} is failing to send alert notifications to \"{{ $labels.addr }}\".
Check vmalert's logs for detailed error message."

View File

@@ -26,8 +26,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): reset the group evaluation timestamp if it exceeds the current host time. Previously, vmalert could use future timestamps for evaluations if the system clock was shifted backward. See [#10985](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10985).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `-opentelemetry.promoteAllResourceAttributes` and `-opentelemetry.promoteScopeMetadata` command-line flags to allow managing label promotion for resource attributes and OTel scope metadata. See [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) docs and [#10931](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10931).
## [v1.144.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.144.0)

View File

@@ -28,9 +28,17 @@ The following label sanitization options can be enabled:
> These flags can be applied on vmagent, vminsert or VictoriaMetrics single-node.
## Instrumentation Scope
By default, VictoriaMetrics promotes [OTel scope metadata](https://opentelemetry.io/docs/specs/otel/common/instrumentation-scope/) to metric labels. This behavior can be disabled via `-opentelemetry.promoteScopeMetadata`.
## Resource Attributes
By default, VictoriaMetrics promotes all [OpenTelemetry resource](https://opentelemetry.io/docs/specs/otel/resource/data-model/) attributes to labels and attaches them to all ingested OTLP metrics.
The following attribute promotion options can be configured:
* `-opentelemetry.promoteAllResourceAttributes` - promotes all resource attributes to labels, except for the ones configured with `-opentelemetry.ignoreResourceAttributes`.
* `-opentelemetry.promoteResourceAttributes` - promotes specific list of resource attributes to labels. It cannot be configured simultaneously with `-opentelemetry.promoteAllResourceAttributes`.
* `-opentelemetry.ignoreResourceAttributes` - controls which resource attributes to ignore, can only be set when `-opentelemetry.promoteAllResourceAttributes` is true.
## Exponential histograms

View File

@@ -217,11 +217,19 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-opentelemetry.convertMetricNamesToPrometheus
Whether to convert only metric names into Prometheus-compatible format for the metrics ingested via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/
-opentelemetry.ignoreResourceAttributes array
Control which resource attributes to ignore, can only be set when 'opentelemetry.promoteAllResourceAttributes' is true.
-opentelemetry.labelNameUnderscoreSanitization
Whether to enable prepending of 'key' to labels starting with '_' when -opentelemetry.usePrometheusNaming is enabled. Reserved labels starting with '__' are not modified. See https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/ (default true)
-opentelemetry.maxRequestSize size
The maximum size in bytes of a single OpenTelemetry request
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-opentelemetry.promoteAllResourceAttributes
Whether to promote all resource attributes to labels, except for the ones configured with 'opentelemetry.ignoreResourceAttributes'.
-opentelemetry.promoteResourceAttributes array
Promote specific list of resource attributes to labels.
-opentelemetry.promoteScopeMetadata
Whether to promote OTel scope metadata (i.e. name, version, schema URL, and attributes) to metric labels.
-opentelemetry.usePrometheusNaming
Whether to convert metric names and labels into Prometheus-compatible format for the metrics ingested via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/
-opentsdbHTTPListenAddr string

View File

@@ -184,11 +184,19 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-opentelemetry.convertMetricNamesToPrometheus
Whether to convert only metric names into Prometheus-compatible format for the metrics ingested via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/
-opentelemetry.ignoreResourceAttributes array
Control which resource attributes to ignore, can only be set when 'opentelemetry.promoteAllResourceAttributes' is true.
-opentelemetry.labelNameUnderscoreSanitization
Whether to enable prepending of 'key' to labels starting with '_' when -opentelemetry.usePrometheusNaming is enabled. Reserved labels starting with '__' are not modified. See https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/ (default true)
-opentelemetry.maxRequestSize size
The maximum size in bytes of a single OpenTelemetry request
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-opentelemetry.promoteAllResourceAttributes
Whether to promote all resource attributes to labels, except for the ones configured with 'opentelemetry.ignoreResourceAttributes'.
-opentelemetry.promoteResourceAttributes array
Promote specific list of resource attributes to labels.
-opentelemetry.promoteScopeMetadata
Whether to promote OTel scope metadata (i.e. name, version, schema URL, and attributes) to metric labels.
-opentelemetry.usePrometheusNaming
Whether to convert metric names and labels into Prometheus-compatible format for the metrics ingested via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/
-opentsdbHTTPListenAddr string

View File

@@ -184,11 +184,19 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-opentelemetry.convertMetricNamesToPrometheus
Whether to convert only metric names into Prometheus-compatible format for the metrics ingested via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/
-opentelemetry.ignoreResourceAttributes array
Control which resource attributes to ignore, can only be set when 'opentelemetry.promoteAllResourceAttributes' is true.
-opentelemetry.labelNameUnderscoreSanitization
Whether to enable prepending of 'key' to labels starting with '_' when -opentelemetry.usePrometheusNaming is enabled. Reserved labels starting with '__' are not modified. See https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/ (default true)
-opentelemetry.maxRequestSize size
The maximum size in bytes of a single OpenTelemetry request
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-opentelemetry.promoteAllResourceAttributes
Whether to promote all resource attributes to labels, except for the ones configured with 'opentelemetry.ignoreResourceAttributes'.
-opentelemetry.promoteResourceAttributes array
Promote specific list of resource attributes to labels.
-opentelemetry.promoteScopeMetadata
Whether to promote OTel scope metadata (i.e. name, version, schema URL, and attributes) to metric labels.
-opentelemetry.usePrometheusNaming
Whether to convert metric names and labels into Prometheus-compatible format for the metrics ingested via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/
-opentsdbHTTPListenAddr string

View File

@@ -14,6 +14,14 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
)
// DecodeMetricsOptions defines options for DecodeMetricsData
type DecodeMetricsOptions struct {
DisableScopeMetadata bool
DisableResourceAttributes bool
// ResourceAttributesList stored a list of resource attributes to ignore or promote based on the value of DisableResourceAttributes.
ResourceAttributesList map[string]struct{}
}
// MetricPusher must push the parsed samples and metric metadata to the underlying storage.
type MetricPusher interface {
// PushSample must store a sample with the given args.
@@ -73,8 +81,8 @@ func (r *MetricsData) marshalProtobuf(mm *easyproto.MessageMarshaler) {
}
}
// DecodeMetricsData decodes metricsData from src and sends the decoded data to mp.
func DecodeMetricsData(src []byte, mp MetricPusher) (err error) {
// DecodeMetricsData decodes metricsData with given options from src and sends the decoded data to mp.
func DecodeMetricsData(src []byte, mp MetricPusher, options DecodeMetricsOptions) (err error) {
// See https://github.com/open-telemetry/opentelemetry-proto/blob/049d4332834935792fd4dbd392ecd31904f99ba2/opentelemetry/proto/metrics/v1/metrics.proto#L56
//
// message MetricsData {
@@ -96,7 +104,7 @@ func DecodeMetricsData(src []byte, mp MetricPusher) (err error) {
if !ok {
return fmt.Errorf("cannot read ResourceMetrics data")
}
if err := dctx.decodeResourceMetrics(data); err != nil {
if err := dctx.decodeResourceMetrics(data, options); err != nil {
return fmt.Errorf("cannot unmarshal ResourceMetrics: %w", err)
}
}
@@ -121,7 +129,7 @@ func (rm *ResourceMetrics) marshalProtobuf(mm *easyproto.MessageMarshaler) {
}
}
func (dctx *decoderContext) decodeResourceMetrics(src []byte) error {
func (dctx *decoderContext) decodeResourceMetrics(src []byte, options DecodeMetricsOptions) error {
// See https://github.com/open-telemetry/opentelemetry-proto/blob/049d4332834935792fd4dbd392ecd31904f99ba2/opentelemetry/proto/metrics/v1/metrics.proto#L66
//
// message ResourceMetrics {
@@ -137,7 +145,7 @@ func (dctx *decoderContext) decodeResourceMetrics(src []byte) error {
return fmt.Errorf("cannot read Resource data: %w", err)
}
if ok {
if err := dctx.decodeResource(resourceData); err != nil {
if err := dctx.decodeResource(resourceData, options.DisableResourceAttributes, options.ResourceAttributesList); err != nil {
return fmt.Errorf("cannot decode Resource: %w", err)
}
}
@@ -157,7 +165,7 @@ func (dctx *decoderContext) decodeResourceMetrics(src []byte) error {
return fmt.Errorf("cannot read ScopeMetrics data")
}
if err := dctx.decodeScopeMetrics(data); err != nil {
if err := dctx.decodeScopeMetrics(data, options.DisableScopeMetadata); err != nil {
return fmt.Errorf("cannot unmarshal ScopeMetrics: %w", err)
}
@@ -180,7 +188,7 @@ func (r *Resource) marshalProtobuf(mm *easyproto.MessageMarshaler) {
}
}
func (dctx *decoderContext) decodeResource(src []byte) (err error) {
func (dctx *decoderContext) decodeResource(src []byte, disableResourceAttributes bool, attributeKeys map[string]struct{}) (err error) {
// See https://github.com/open-telemetry/opentelemetry-proto/blob/049d4332834935792fd4dbd392ecd31904f99ba2/opentelemetry/proto/resource/v1/resource.proto#L28
//
// message Resource {
@@ -199,7 +207,34 @@ func (dctx *decoderContext) decodeResource(src []byte) (err error) {
if !ok {
return fmt.Errorf("cannot read Attributes")
}
if err := decodeKeyValue(data, &dctx.ls, &dctx.fb, ""); err != nil {
keySuffix, ok, err := easyproto.GetString(data, 1)
if err != nil {
return fmt.Errorf("cannot find Key in KeyValue: %w", err)
}
if !ok {
// Key is missing, skip it.
// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/869#issuecomment-3631307996
continue
}
if _, ok := attributeKeys[keySuffix]; ok != disableResourceAttributes {
// Skip the attribute if:
// 1. it is in the list of ignore attributes when disableResourceAttributes is false,
// 2. it isn't in the list of promote attributes when disableResourceAttributes is true.
continue
}
key := dctx.fb.formatSubFieldName("", keySuffix)
// Decode value
value, ok, err := easyproto.GetMessageData(data, 2)
if err != nil {
return fmt.Errorf("cannot find Value in KeyValue: %w", err)
}
if !ok {
// Value is null, skip it.
continue
}
if err := decodeAnyValue(value, &dctx.ls, &dctx.fb, key); err != nil {
return fmt.Errorf("cannot unmarshal Attributes: %w", err)
}
}
@@ -257,7 +292,6 @@ func decodeKeyValue(src []byte, ls *promutil.Labels, fb *fmtBuffer, keyPrefix st
if err := decodeAnyValue(valueData, ls, fb, key); err != nil {
return fmt.Errorf("cannot decode AnyValue: %w", err)
}
return nil
}
@@ -452,7 +486,7 @@ func (sm *ScopeMetrics) marshalProtobuf(mm *easyproto.MessageMarshaler) {
}
}
func (dctx *decoderContext) decodeScopeMetrics(src []byte) error {
func (dctx *decoderContext) decodeScopeMetrics(src []byte, disableScopeMetadata bool) error {
// See https://github.com/open-telemetry/opentelemetry-proto/blob/049d4332834935792fd4dbd392ecd31904f99ba2/opentelemetry/proto/metrics/v1/metrics.proto#L86
//
// message ScopeMetrics {
@@ -460,19 +494,22 @@ func (dctx *decoderContext) decodeScopeMetrics(src []byte) error {
// repeated Metric metrics = 2;
// }
scopeData, ok, err := easyproto.GetMessageData(src, 1)
if err != nil {
return fmt.Errorf("cannot read InstrumentationScope: %w", err)
}
if ok {
if err := dctx.decodeInstrumentationScope(scopeData); err != nil {
return fmt.Errorf("cannot decode InstrumentationScope: %w", err)
if !disableScopeMetadata {
scopeData, ok, err := easyproto.GetMessageData(src, 1)
if err != nil {
return fmt.Errorf("cannot read InstrumentationScope: %w", err)
}
if ok {
if err := dctx.decodeInstrumentationScope(scopeData); err != nil {
return fmt.Errorf("cannot decode InstrumentationScope: %w", err)
}
}
}
dctxSnapshot := dctx.getSnapshot()
var fc easyproto.FieldContext
var err error
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {

View File

@@ -0,0 +1,166 @@
package pb
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
)
type testMetricPusher struct {
samples []testSample
metadata []MetricMetadata
}
type testSample struct {
mm MetricMetadata
suffix string
labels []prompb.Label
ts uint64
value float64
}
func (p *testMetricPusher) PushSample(mm *MetricMetadata, suffix string, ls *promutil.Labels, timestampNsecs uint64, value float64, _ uint32) {
labels := make([]prompb.Label, len(ls.Labels))
copy(labels, ls.Labels)
p.samples = append(p.samples, testSample{
mm: *mm,
suffix: suffix,
labels: labels,
ts: timestampNsecs,
value: value,
})
}
func (p *testMetricPusher) PushMetricMetadata(mm *MetricMetadata) {
p.metadata = append(p.metadata, *mm)
}
func TestDecodeScopeMetrics(t *testing.T) {
scopeName := "my-scope"
scopeVersion := "v1.0"
envVal := "prod"
intVal := int64(1)
md := &MetricsData{
ResourceMetrics: []*ResourceMetrics{
{
Resource: &Resource{
Attributes: []*KeyValue{
{Key: "job", Value: &AnyValue{StringValue: new("vm")}},
{Key: "region", Value: &AnyValue{StringValue: new("us-east-1")}},
},
},
ScopeMetrics: []*ScopeMetrics{
{
Scope: &InstrumentationScope{
Name: &scopeName,
Version: &scopeVersion,
Attributes: []*KeyValue{
{Key: "env", Value: &AnyValue{StringValue: &envVal}},
},
},
Metrics: []*Metric{
{
Name: "my-gauge",
Description: "a test gauge",
Gauge: &Gauge{
DataPoints: []*NumberDataPoint{
{
Attributes: []*KeyValue{{Key: "label1", Value: &AnyValue{StringValue: new("value1")}}},
IntValue: &intVal,
TimeUnixNano: 1000,
},
},
},
},
},
},
},
},
},
}
data := md.MarshalProtobuf(nil)
f := func(options DecodeMetricsOptions, wantLabels map[string]string) {
t.Helper()
mp := &testMetricPusher{}
if err := DecodeMetricsData(data, mp, options); err != nil {
t.Fatalf("DecodeMetricsData error: %v", err)
}
if len(mp.samples) != 1 {
t.Fatalf("expected 1 sample, got %d", len(mp.samples))
}
gotMap := make(map[string]string, len(mp.samples[0].labels))
for _, l := range mp.samples[0].labels {
gotMap[l.Name] = l.Value
}
if !reflect.DeepEqual(gotMap, wantLabels) {
t.Errorf("unexpected labels:\n got: %v\n want: %v", gotMap, wantLabels)
}
}
// PromoteScopeMetadata=true + PromoteAllResourceAttributes=true:
// got all scope labels and resource attrs
f(DecodeMetricsOptions{
DisableScopeMetadata: false,
DisableResourceAttributes: false,
}, map[string]string{
"job": "vm",
"region": "us-east-1",
"scope.name": "my-scope",
"scope.version": "v1.0",
"scope.attributes.env": "prod",
"label1": "value1",
})
// PromoteScopeMetadata=false + PromoteAllResourceAttributes=true:
// got all resource attrs, no scope labels
f(DecodeMetricsOptions{
DisableScopeMetadata: true,
DisableResourceAttributes: false,
}, map[string]string{
"job": "vm",
"region": "us-east-1",
"label1": "value1",
})
// PromoteScopeMetadata=true + PromoteAllResourceAttributes=false + ResourceAttributesList=[region]:
// got only the `region` attr from resource
f(DecodeMetricsOptions{
DisableScopeMetadata: false,
DisableResourceAttributes: true,
ResourceAttributesList: map[string]struct{}{"region": {}},
}, map[string]string{
"region": "us-east-1",
"scope.name": "my-scope",
"scope.version": "v1.0",
"scope.attributes.env": "prod",
"label1": "value1",
})
// PromoteScopeMetadata=true + PromoteAllResourceAttributes=true + ResourceAttributesList=[region]:
// got all resource attrs except `region` (ignore list)
f(DecodeMetricsOptions{
DisableScopeMetadata: false,
DisableResourceAttributes: false,
ResourceAttributesList: map[string]struct{}{"region": {}},
}, map[string]string{
"job": "vm",
"scope.name": "my-scope",
"scope.version": "v1.0",
"scope.attributes.env": "prod",
"label1": "value1",
})
// PromoteScopeMetadata=false + PromoteAllResourceAttributes=false + ResourceAttributesList=[job]:
// got only `job` attr
f(DecodeMetricsOptions{
DisableScopeMetadata: true,
DisableResourceAttributes: true,
ResourceAttributesList: map[string]struct{}{"job": {}},
}, map[string]string{
"job": "vm",
"label1": "value1",
})
}

View File

@@ -1,6 +1,7 @@
package stream
import (
"flag"
"fmt"
"io"
"sync"
@@ -10,13 +11,43 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
)
var maxRequestSize = flagutil.NewBytes("opentelemetry.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single OpenTelemetry request")
var (
maxRequestSize = flagutil.NewBytes("opentelemetry.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single OpenTelemetry request")
promoteScopeMetadata = flag.Bool("opentelemetry.promoteScopeMetadata", true, "Whether to promote OTel scope metadata (i.e. name, version, schema URL, and attributes) to metric labels.")
promoteAllResourceAttributes = flag.Bool("opentelemetry.promoteAllResourceAttributes", true, "Whether to promote all resource attributes to labels, except for the ones configured with 'opentelemetry.ignoreResourceAttributes'.")
promoteResourceAttributes = flagutil.NewArrayString("opentelemetry.promoteResourceAttributes", "Promote specific list of resource attributes to labels.")
ignoreResourceAttributes = flagutil.NewArrayString("opentelemetry.ignoreResourceAttributes", "Control which resource attributes to ignore, can only be set when 'opentelemetry.promoteAllResourceAttributes' is true.")
)
// InitDecodeOptions configures decoding settings for the parser
func InitDecodeOptions() {
if *promoteAllResourceAttributes && len(*promoteResourceAttributes) > 0 {
logger.Fatalf("cannot set both '-opentelemetry.promoteAllResourceAttributes' and '-opentelemetry.promoteResourceAttributes'")
}
if !*promoteAllResourceAttributes && len(*ignoreResourceAttributes) > 0 {
logger.Fatalf("'-opentelemetry.ignoreResourceAttributes' can only be set when '-opentelemetry.promoteAllResourceAttributes' is true.")
}
defaultDecodeMetricsOptions.DisableScopeMetadata = !*promoteScopeMetadata
defaultDecodeMetricsOptions.DisableResourceAttributes = !*promoteAllResourceAttributes
attributes := *ignoreResourceAttributes
if !*promoteAllResourceAttributes {
attributes = *promoteResourceAttributes
}
defaultDecodeMetricsOptions.ResourceAttributesList = make(map[string]struct{}, len(attributes))
for _, a := range attributes {
defaultDecodeMetricsOptions.ResourceAttributesList[a] = struct{}{}
}
}
var defaultDecodeMetricsOptions = pb.DecodeMetricsOptions{}
// ParseStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows.
//
@@ -47,7 +78,7 @@ func parseData(data []byte, callback func(tss []prompb.TimeSeries, mms []prompb.
// the flushFunc will be called multiple time if the request is big, to avoid over allocating memory for such request.
wctx.flushFunc = callback
if err := pb.DecodeMetricsData(data, wctx); err != nil {
if err := pb.DecodeMetricsData(data, wctx, defaultDecodeMetricsOptions); err != nil {
return fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(data), err)
}