mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-27 05:27:36 +03:00
Compare commits
2 Commits
issue-1098
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0c6aa681f | ||
|
|
c3525bf0bc |
@@ -118,6 +118,7 @@ func main() {
|
||||
remotewrite.InitSecretFlags()
|
||||
buildinfo.Init()
|
||||
logger.Init()
|
||||
opentelemetry.Init()
|
||||
timeserieslimits.Init(*maxLabelsPerTimeseries, *maxLabelNameLen, *maxLabelValueLen)
|
||||
|
||||
if promscrape.IsDryRun() {
|
||||
|
||||
@@ -25,6 +25,11 @@ var (
|
||||
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="opentelemetry"}`)
|
||||
)
|
||||
|
||||
// Init must be called after flag.Parse and before using the opentelemetry package.
|
||||
func Init() {
|
||||
stream.InitDecodeOptions()
|
||||
}
|
||||
|
||||
// InsertHandlerForReader processes metrics from given reader.
|
||||
func InsertHandlerForReader(at *auth.Token, r io.Reader, encoding string) error {
|
||||
return stream.ParseStream(r, encoding, nil, func(tss []prompb.TimeSeries, mms []prompb.MetricMetadata) error {
|
||||
|
||||
@@ -95,7 +95,6 @@ type groupMetrics struct {
|
||||
iterationTotal *metrics.Counter
|
||||
iterationDuration *metrics.Summary
|
||||
iterationMissed *metrics.Counter
|
||||
iterationReset *metrics.Counter
|
||||
iterationInterval *metrics.Gauge
|
||||
}
|
||||
|
||||
@@ -331,7 +330,6 @@ func (g *Group) Init() {
|
||||
g.metrics.iterationTotal = g.metrics.set.NewCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels))
|
||||
g.metrics.iterationDuration = g.metrics.set.NewSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels))
|
||||
g.metrics.iterationMissed = g.metrics.set.NewCounter(fmt.Sprintf(`vmalert_iteration_missed_total{%s}`, labels))
|
||||
g.metrics.iterationReset = g.metrics.set.NewCounter(fmt.Sprintf(`vmalert_iteration_reset_total{%s}`, labels))
|
||||
g.metrics.iterationInterval = g.metrics.set.NewGauge(fmt.Sprintf(`vmalert_iteration_interval_seconds{%s}`, labels), func() float64 {
|
||||
i := g.Interval.Seconds()
|
||||
return i
|
||||
@@ -476,16 +474,14 @@ func (g *Group) Start(ctx context.Context, rw remotewrite.RWClient, rr datasourc
|
||||
if missed < 0 {
|
||||
// missed can become < 0 due to irregular delays during evaluation
|
||||
// which can result in time.Since(evalTS) < g.Interval;
|
||||
// or the system wall clock was changed backward,
|
||||
// Reset the evalTS to the current time.
|
||||
// or the system wall clock was changed backward
|
||||
missed = 0
|
||||
evalTS = time.Now()
|
||||
g.metrics.iterationReset.Inc()
|
||||
} else {
|
||||
evalTS = evalTS.Add((missed + 1) * g.Interval)
|
||||
}
|
||||
if missed > 0 {
|
||||
g.metrics.iterationMissed.Inc()
|
||||
}
|
||||
evalTS = evalTS.Add((missed + 1) * g.Interval)
|
||||
|
||||
eval(evalCtx, evalTS)
|
||||
}
|
||||
|
||||
@@ -89,6 +89,7 @@ var staticServer = http.FileServer(http.FS(staticFiles))
|
||||
func Init() {
|
||||
relabel.Init()
|
||||
common.InitStreamAggr()
|
||||
opentelemetry.Init()
|
||||
protoparserutil.StartUnmarshalWorkers()
|
||||
if len(*graphiteListenAddr) > 0 {
|
||||
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler)
|
||||
|
||||
@@ -20,6 +20,11 @@ var (
|
||||
metadataInserted = metrics.NewCounter(`vm_metadata_rows_inserted_total{type="opentelemetry"}`)
|
||||
)
|
||||
|
||||
// Init must be called after flag.Parse and before using the opentelemetry package.
|
||||
func Init() {
|
||||
stream.InitDecodeOptions()
|
||||
}
|
||||
|
||||
// InsertHandler processes opentelemetry metrics.
|
||||
func InsertHandler(req *http.Request) error {
|
||||
extraLabels, err := protoparserutil.GetExtraLabels(req)
|
||||
|
||||
@@ -60,10 +60,10 @@ func getDefaultMaxConcurrentRequests() int {
|
||||
|
||||
// Init initializes vmselect
|
||||
func Init() {
|
||||
tmpDirPath := *vmstorage.DataPath + "/tmp"
|
||||
tmpDirPath := vmstorage.DataPath() + "/tmp"
|
||||
fs.MustRemoveDirContents(tmpDirPath)
|
||||
netstorage.InitTmpBlocksDir(tmpDirPath)
|
||||
promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
|
||||
promql.InitRollupResultCache(vmstorage.DataPath() + "/cache/rollupResult")
|
||||
prometheus.InitMaxUniqueTimeseries(*maxConcurrentRequests)
|
||||
|
||||
concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
storageDataPath = flag.String("storageDataPath", "victoria-metrics-data", "Path to storage data")
|
||||
retentionPeriod = flagutil.NewRetentionDuration("retentionPeriod", "1M", "Data with timestamps outside the retentionPeriod is automatically deleted. The minimum retentionPeriod is 24h or 1d. "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention. See also -retentionFilter")
|
||||
futureRetention = flagutil.NewRetentionDuration("futureRetention", "2d", "Data with timestamps bigger than now+futureRetention is automatically deleted. "+
|
||||
@@ -43,9 +44,6 @@ var (
|
||||
|
||||
precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss")
|
||||
|
||||
// DataPath is a path to storage data.
|
||||
DataPath = flag.String("storageDataPath", "victoria-metrics-data", "Path to storage data")
|
||||
|
||||
_ = flag.Duration("finalMergeDelay", 0, "Deprecated: this flag does nothing")
|
||||
_ = flag.Int("bigMergeConcurrency", 0, "Deprecated: this flag does nothing")
|
||||
_ = flag.Int("smallMergeConcurrency", 0, "Deprecated: this flag does nothing")
|
||||
@@ -103,6 +101,10 @@ var (
|
||||
"If set to 0 or a negative value, defaults to 1% of allowed memory.")
|
||||
)
|
||||
|
||||
func DataPath() string {
|
||||
return *storageDataPath
|
||||
}
|
||||
|
||||
// Init initializes vmstorage.
|
||||
func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
|
||||
if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
|
||||
@@ -132,7 +134,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
|
||||
if *idbPrefillStart > 23*time.Hour {
|
||||
logger.Panicf("-storage.idbPrefillStart cannot exceed 23 hours; got %s", idbPrefillStart)
|
||||
}
|
||||
logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod)
|
||||
logger.Infof("opening storage at %q with -retentionPeriod=%s", *storageDataPath, retentionPeriod)
|
||||
startTime := time.Now()
|
||||
WG = syncwg.WaitGroup{}
|
||||
opts := storage.OpenOptions{
|
||||
@@ -146,7 +148,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
|
||||
IDBPrefillStart: *idbPrefillStart,
|
||||
LogNewSeries: *logNewSeries,
|
||||
}
|
||||
strg := storage.MustOpenStorage(*DataPath, opts)
|
||||
strg := storage.MustOpenStorage(*storageDataPath, opts)
|
||||
Storage = strg
|
||||
initStaleSnapshotsRemover(strg)
|
||||
|
||||
@@ -158,7 +160,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
|
||||
rowsCount := tm.SmallRowsCount + tm.BigRowsCount
|
||||
sizeBytes := tm.SmallSizeBytes + tm.BigSizeBytes
|
||||
logger.Infof("successfully opened storage %q in %.3f seconds; partsCount: %d; blocksCount: %d; rowsCount: %d; sizeBytes: %d",
|
||||
*DataPath, time.Since(startTime).Seconds(), partsCount, blocksCount, rowsCount, sizeBytes)
|
||||
*storageDataPath, time.Since(startTime).Seconds(), partsCount, blocksCount, rowsCount, sizeBytes)
|
||||
|
||||
// register storage metrics
|
||||
storageMetrics = metrics.NewSet()
|
||||
@@ -166,7 +168,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
|
||||
writeStorageMetrics(w, strg)
|
||||
})
|
||||
metrics.RegisterSet(storageMetrics)
|
||||
fs.RegisterPathFsMetrics(*DataPath)
|
||||
fs.RegisterPathFsMetrics(*storageDataPath)
|
||||
}
|
||||
|
||||
var storageMetrics *metrics.Set
|
||||
@@ -311,7 +313,7 @@ func Stop() {
|
||||
metrics.UnregisterSet(storageMetrics, true)
|
||||
storageMetrics = nil
|
||||
|
||||
logger.Infof("gracefully closing the storage at %s", *DataPath)
|
||||
logger.Infof("gracefully closing the storage at %s", *storageDataPath)
|
||||
startTime := time.Now()
|
||||
WG.WaitAndBlock()
|
||||
stopStaleSnapshotsRemover()
|
||||
@@ -515,15 +517,15 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
|
||||
tm := &m.TableMetrics
|
||||
idbm := &m.TableMetrics.IndexDBMetrics
|
||||
|
||||
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_free_disk_space_bytes{path=%q}`, *DataPath), fs.MustGetFreeSpace(*DataPath))
|
||||
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_free_disk_space_limit_bytes{path=%q}`, *DataPath), uint64(minFreeDiskSpaceBytes.N))
|
||||
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_total_disk_space_bytes{path=%q}`, *DataPath), fs.MustGetTotalSpace(*DataPath))
|
||||
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_free_disk_space_bytes{path=%q}`, *storageDataPath), fs.MustGetFreeSpace(*storageDataPath))
|
||||
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_free_disk_space_limit_bytes{path=%q}`, *storageDataPath), uint64(minFreeDiskSpaceBytes.N))
|
||||
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_total_disk_space_bytes{path=%q}`, *storageDataPath), fs.MustGetTotalSpace(*storageDataPath))
|
||||
|
||||
isReadOnly := 0
|
||||
if strg.IsReadOnly() {
|
||||
isReadOnly = 1
|
||||
}
|
||||
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_storage_is_read_only{path=%q}`, *DataPath), uint64(isReadOnly))
|
||||
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vm_storage_is_read_only{path=%q}`, *storageDataPath), uint64(isReadOnly))
|
||||
|
||||
metrics.WriteGaugeUint64(w, `vm_active_merges{type="storage/inmemory"}`, tm.ActiveInmemoryMerges)
|
||||
metrics.WriteGaugeUint64(w, `vm_active_merges{type="storage/small"}`, tm.ActiveSmallMerges)
|
||||
|
||||
@@ -2804,10 +2804,10 @@
|
||||
"overrides": []
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 7,
|
||||
"h": 8,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 11
|
||||
"y": 352
|
||||
},
|
||||
"id": 63,
|
||||
"options": {
|
||||
@@ -2843,113 +2843,7 @@
|
||||
],
|
||||
"title": "Restarts ($job)",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
"datasource": {
|
||||
"type": "victoriametrics-metrics-datasource",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "Group iteration reset can be caused by irregular delays during evaluation or by the system wall clock being moved backward.\nIf it is caused by host clock changes, vmalert could generate duplicate results for the group rules, since some evaluations could be repeated.\nCheck the host clock time synchronization configuration if this happens frequently.\n",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
"mode": "palette-classic"
|
||||
},
|
||||
"custom": {
|
||||
"axisBorderShow": false,
|
||||
"axisCenteredZero": false,
|
||||
"axisColorMode": "text",
|
||||
"axisLabel": "",
|
||||
"axisPlacement": "auto",
|
||||
"barAlignment": 0,
|
||||
"barWidthFactor": 0.6,
|
||||
"drawStyle": "bars",
|
||||
"fillOpacity": 10,
|
||||
"gradientMode": "none",
|
||||
"hideFrom": {
|
||||
"legend": false,
|
||||
"tooltip": false,
|
||||
"viz": false
|
||||
},
|
||||
"insertNulls": false,
|
||||
"lineInterpolation": "linear",
|
||||
"lineWidth": 1,
|
||||
"pointSize": 5,
|
||||
"scaleDistribution": {
|
||||
"type": "linear"
|
||||
},
|
||||
"showPoints": "never",
|
||||
"showValues": false,
|
||||
"spanNulls": false,
|
||||
"stacking": {
|
||||
"group": "A",
|
||||
"mode": "none"
|
||||
},
|
||||
"thresholdsStyle": {
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"mappings": [],
|
||||
"thresholds": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "green",
|
||||
"value": 0
|
||||
},
|
||||
{
|
||||
"color": "red",
|
||||
"value": 80
|
||||
}
|
||||
]
|
||||
},
|
||||
"unit": "short"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 7,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 11
|
||||
},
|
||||
"id": 70,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
"mean",
|
||||
"lastNotNull",
|
||||
"max"
|
||||
],
|
||||
"displayMode": "table",
|
||||
"placement": "bottom",
|
||||
"showLegend": true
|
||||
},
|
||||
"tooltip": {
|
||||
"hideZeros": false,
|
||||
"mode": "multi",
|
||||
"sort": "none"
|
||||
}
|
||||
},
|
||||
"pluginVersion": "12.2.0",
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "victoriametrics-metrics-datasource",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "sum(increase(vmalert_iteration_reset_total{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by(job, group, file) > 0",
|
||||
"interval": "1m",
|
||||
"legendFormat": "({{job}}) {{group}}({{file}})",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Group Iteration Reset ($instance)",
|
||||
"type": "timeseries"
|
||||
}
|
||||
}
|
||||
],
|
||||
"title": "Troubleshooting",
|
||||
"type": "row"
|
||||
|
||||
@@ -2803,10 +2803,10 @@
|
||||
"overrides": []
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 7,
|
||||
"h": 8,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 11
|
||||
"y": 352
|
||||
},
|
||||
"id": 63,
|
||||
"options": {
|
||||
@@ -2842,113 +2842,7 @@
|
||||
],
|
||||
"title": "Restarts ($job)",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
"datasource": {
|
||||
"type": "prometheus",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "Group iteration reset can be caused by irregular delays during evaluation or by the system wall clock being moved backward.\nIf it is caused by host clock changes, vmalert could generate duplicate results for the group rules, since some evaluations could be repeated.\nCheck the host clock time synchronization configuration if this happens frequently.\n",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
"mode": "palette-classic"
|
||||
},
|
||||
"custom": {
|
||||
"axisBorderShow": false,
|
||||
"axisCenteredZero": false,
|
||||
"axisColorMode": "text",
|
||||
"axisLabel": "",
|
||||
"axisPlacement": "auto",
|
||||
"barAlignment": 0,
|
||||
"barWidthFactor": 0.6,
|
||||
"drawStyle": "bars",
|
||||
"fillOpacity": 10,
|
||||
"gradientMode": "none",
|
||||
"hideFrom": {
|
||||
"legend": false,
|
||||
"tooltip": false,
|
||||
"viz": false
|
||||
},
|
||||
"insertNulls": false,
|
||||
"lineInterpolation": "linear",
|
||||
"lineWidth": 1,
|
||||
"pointSize": 5,
|
||||
"scaleDistribution": {
|
||||
"type": "linear"
|
||||
},
|
||||
"showPoints": "never",
|
||||
"showValues": false,
|
||||
"spanNulls": false,
|
||||
"stacking": {
|
||||
"group": "A",
|
||||
"mode": "none"
|
||||
},
|
||||
"thresholdsStyle": {
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"mappings": [],
|
||||
"thresholds": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "green",
|
||||
"value": 0
|
||||
},
|
||||
{
|
||||
"color": "red",
|
||||
"value": 80
|
||||
}
|
||||
]
|
||||
},
|
||||
"unit": "short"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 7,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 11
|
||||
},
|
||||
"id": 70,
|
||||
"options": {
|
||||
"legend": {
|
||||
"calcs": [
|
||||
"mean",
|
||||
"lastNotNull",
|
||||
"max"
|
||||
],
|
||||
"displayMode": "table",
|
||||
"placement": "bottom",
|
||||
"showLegend": true
|
||||
},
|
||||
"tooltip": {
|
||||
"hideZeros": false,
|
||||
"mode": "multi",
|
||||
"sort": "none"
|
||||
}
|
||||
},
|
||||
"pluginVersion": "12.2.0",
|
||||
"targets": [
|
||||
{
|
||||
"datasource": {
|
||||
"type": "prometheus",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "sum(increase(vmalert_iteration_reset_total{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by(job, group, file) > 0",
|
||||
"interval": "1m",
|
||||
"legendFormat": "({{job}}) {{group}}({{file}})",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Group Iteration Reset ($instance)",
|
||||
"type": "timeseries"
|
||||
}
|
||||
}
|
||||
],
|
||||
"title": "Troubleshooting",
|
||||
"type": "row"
|
||||
|
||||
@@ -64,18 +64,6 @@ groups:
|
||||
group \"{{ $labels.group }}\". See https://docs.victoriametrics.com/victoriametrics/vmalert/#groups.
|
||||
If rule expressions are taking longer than expected, please see https://docs.victoriametrics.com/victoriametrics/troubleshooting/#slow-queries."
|
||||
|
||||
- alert: GroupIterationReset
|
||||
expr: increase(vmalert_iteration_reset_total[5m]) > 0
|
||||
for: 5m
|
||||
labels:
|
||||
severity: warning
|
||||
annotations:
|
||||
summary: "Evaluation iteration for group {{ $labels.group }} in file {{ $labels.file }} is reset"
|
||||
description: "Evaluation iteration for group \"{{ $labels.group }}\" in file \"{{ $labels.file }}\" is reset on vmalert instance {{ $labels.instance }}.
|
||||
This can be caused by irregular delays during evaluation or by the system wall clock being moved backward. If it is caused by host clock changes, vmalert could
|
||||
generate duplicate results for the group rules since some evaluations could be repeated. Check host clock time synchronization configurations if this happens frequently."
|
||||
|
||||
|
||||
- alert: RemoteWriteErrors
|
||||
expr: increase(vmalert_remotewrite_errors_total[5m]) > 0
|
||||
for: 15m
|
||||
@@ -120,3 +108,4 @@ groups:
|
||||
summary: "vmalert instance {{ $labels.instance }} is failing to send notifications to Alertmanager"
|
||||
description: "vmalert instance {{ $labels.instance }} is failing to send alert notifications to \"{{ $labels.addr }}\".
|
||||
Check vmalert's logs for detailed error message."
|
||||
|
||||
|
||||
@@ -26,8 +26,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
## tip
|
||||
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): reset the group evaluation timestamp if it exceeds the current host time. Previously, vmalert could use future timestamps for evaluations if the system clock was shifted backward. See [#10985](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10985).
|
||||
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `-opentelemetry.promoteAllResourceAttributes` and `-opentelemetry.promoteScopeMetadata` command-line flags to allow managing label promotion for resource attributes and OTel scope metadata. See [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) docs and [#10931](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10931).
|
||||
|
||||
## [v1.144.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.144.0)
|
||||
|
||||
|
||||
@@ -28,9 +28,17 @@ The following label sanitization options can be enabled:
|
||||
|
||||
> These flags can be applied on vmagent, vminsert or VictoriaMetrics single-node.
|
||||
|
||||
## Instrumentation Scope
|
||||
|
||||
By default, VictoriaMetrics promotes [OTel scope metadata](https://opentelemetry.io/docs/specs/otel/common/instrumentation-scope/) to metric labels. This behavior can be disabled via `-opentelemetry.promoteScopeMetadata`.
|
||||
|
||||
## Resource Attributes
|
||||
|
||||
By default, VictoriaMetrics promotes all [OpenTelemetry resource](https://opentelemetry.io/docs/specs/otel/resource/data-model/) attributes to labels and attaches them to all ingested OTLP metrics.
|
||||
The following attribute promotion options can be configured:
|
||||
* `-opentelemetry.promoteAllResourceAttributes` - promotes all resource attributes to labels, except for the ones configured with `-opentelemetry.ignoreResourceAttributes`.
|
||||
* `-opentelemetry.promoteResourceAttributes` - promotes specific list of resource attributes to labels. It cannot be configured simultaneously with `-opentelemetry.promoteAllResourceAttributes`.
|
||||
* `-opentelemetry.ignoreResourceAttributes` - controls which resource attributes to ignore, can only be set when `-opentelemetry.promoteAllResourceAttributes` is true.
|
||||
|
||||
## Exponential histograms
|
||||
|
||||
|
||||
@@ -217,11 +217,19 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
|
||||
-opentelemetry.convertMetricNamesToPrometheus
|
||||
Whether to convert only metric names into Prometheus-compatible format for the metrics ingested via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/
|
||||
-opentelemetry.ignoreResourceAttributes array
|
||||
Control which resource attributes to ignore, can only be set when 'opentelemetry.promoteAllResourceAttributes' is true.
|
||||
-opentelemetry.labelNameUnderscoreSanitization
|
||||
Whether to enable prepending of 'key' to labels starting with '_' when -opentelemetry.usePrometheusNaming is enabled. Reserved labels starting with '__' are not modified. See https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/ (default true)
|
||||
-opentelemetry.maxRequestSize size
|
||||
The maximum size in bytes of a single OpenTelemetry request
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
|
||||
-opentelemetry.promoteAllResourceAttributes
|
||||
Whether to promote all resource attributes to labels, except for the ones configured with 'opentelemetry.ignoreResourceAttributes'.
|
||||
-opentelemetry.promoteResourceAttributes array
|
||||
Promote specific list of resource attributes to labels.
|
||||
-opentelemetry.promoteScopeMetadata
|
||||
Whether to promote OTel scope metadata (i.e. name, version, schema URL, and attributes) to metric labels.
|
||||
-opentelemetry.usePrometheusNaming
|
||||
Whether to convert metric names and labels into Prometheus-compatible format for the metrics ingested via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/
|
||||
-opentsdbHTTPListenAddr string
|
||||
|
||||
@@ -184,11 +184,19 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
|
||||
-opentelemetry.convertMetricNamesToPrometheus
|
||||
Whether to convert only metric names into Prometheus-compatible format for the metrics ingested via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/
|
||||
-opentelemetry.ignoreResourceAttributes array
|
||||
Control which resource attributes to ignore, can only be set when 'opentelemetry.promoteAllResourceAttributes' is true.
|
||||
-opentelemetry.labelNameUnderscoreSanitization
|
||||
Whether to enable prepending of 'key' to labels starting with '_' when -opentelemetry.usePrometheusNaming is enabled. Reserved labels starting with '__' are not modified. See https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/ (default true)
|
||||
-opentelemetry.maxRequestSize size
|
||||
The maximum size in bytes of a single OpenTelemetry request
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
|
||||
-opentelemetry.promoteAllResourceAttributes
|
||||
Whether to promote all resource attributes to labels, except for the ones configured with 'opentelemetry.ignoreResourceAttributes'.
|
||||
-opentelemetry.promoteResourceAttributes array
|
||||
Promote specific list of resource attributes to labels.
|
||||
-opentelemetry.promoteScopeMetadata
|
||||
Whether to promote OTel scope metadata (i.e. name, version, schema URL, and attributes) to metric labels.
|
||||
-opentelemetry.usePrometheusNaming
|
||||
Whether to convert metric names and labels into Prometheus-compatible format for the metrics ingested via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/
|
||||
-opentsdbHTTPListenAddr string
|
||||
|
||||
@@ -184,11 +184,19 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
|
||||
-opentelemetry.convertMetricNamesToPrometheus
|
||||
Whether to convert only metric names into Prometheus-compatible format for the metrics ingested via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/
|
||||
-opentelemetry.ignoreResourceAttributes array
|
||||
Control which resource attributes to ignore, can only be set when 'opentelemetry.promoteAllResourceAttributes' is true.
|
||||
-opentelemetry.labelNameUnderscoreSanitization
|
||||
Whether to enable prepending of 'key' to labels starting with '_' when -opentelemetry.usePrometheusNaming is enabled. Reserved labels starting with '__' are not modified. See https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/ (default true)
|
||||
-opentelemetry.maxRequestSize size
|
||||
The maximum size in bytes of a single OpenTelemetry request
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
|
||||
-opentelemetry.promoteAllResourceAttributes
|
||||
Whether to promote all resource attributes to labels, except for the ones configured with 'opentelemetry.ignoreResourceAttributes'.
|
||||
-opentelemetry.promoteResourceAttributes array
|
||||
Promote specific list of resource attributes to labels.
|
||||
-opentelemetry.promoteScopeMetadata
|
||||
Whether to promote OTel scope metadata (i.e. name, version, schema URL, and attributes) to metric labels.
|
||||
-opentelemetry.usePrometheusNaming
|
||||
Whether to convert metric names and labels into Prometheus-compatible format for the metrics ingested via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/
|
||||
-opentsdbHTTPListenAddr string
|
||||
|
||||
@@ -14,6 +14,14 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
|
||||
)
|
||||
|
||||
// DecodeMetricsOptions defines options for DecodeMetricsData
|
||||
type DecodeMetricsOptions struct {
|
||||
DisableScopeMetadata bool
|
||||
DisableResourceAttributes bool
|
||||
// ResourceAttributesList stored a list of resource attributes to ignore or promote based on the value of DisableResourceAttributes.
|
||||
ResourceAttributesList map[string]struct{}
|
||||
}
|
||||
|
||||
// MetricPusher must push the parsed samples and metric metadata to the underlying storage.
|
||||
type MetricPusher interface {
|
||||
// PushSample must store a sample with the given args.
|
||||
@@ -73,8 +81,8 @@ func (r *MetricsData) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||
}
|
||||
}
|
||||
|
||||
// DecodeMetricsData decodes metricsData from src and sends the decoded data to mp.
|
||||
func DecodeMetricsData(src []byte, mp MetricPusher) (err error) {
|
||||
// DecodeMetricsData decodes metricsData with given options from src and sends the decoded data to mp.
|
||||
func DecodeMetricsData(src []byte, mp MetricPusher, options DecodeMetricsOptions) (err error) {
|
||||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/049d4332834935792fd4dbd392ecd31904f99ba2/opentelemetry/proto/metrics/v1/metrics.proto#L56
|
||||
//
|
||||
// message MetricsData {
|
||||
@@ -96,7 +104,7 @@ func DecodeMetricsData(src []byte, mp MetricPusher) (err error) {
|
||||
if !ok {
|
||||
return fmt.Errorf("cannot read ResourceMetrics data")
|
||||
}
|
||||
if err := dctx.decodeResourceMetrics(data); err != nil {
|
||||
if err := dctx.decodeResourceMetrics(data, options); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal ResourceMetrics: %w", err)
|
||||
}
|
||||
}
|
||||
@@ -121,7 +129,7 @@ func (rm *ResourceMetrics) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||
}
|
||||
}
|
||||
|
||||
func (dctx *decoderContext) decodeResourceMetrics(src []byte) error {
|
||||
func (dctx *decoderContext) decodeResourceMetrics(src []byte, options DecodeMetricsOptions) error {
|
||||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/049d4332834935792fd4dbd392ecd31904f99ba2/opentelemetry/proto/metrics/v1/metrics.proto#L66
|
||||
//
|
||||
// message ResourceMetrics {
|
||||
@@ -137,7 +145,7 @@ func (dctx *decoderContext) decodeResourceMetrics(src []byte) error {
|
||||
return fmt.Errorf("cannot read Resource data: %w", err)
|
||||
}
|
||||
if ok {
|
||||
if err := dctx.decodeResource(resourceData); err != nil {
|
||||
if err := dctx.decodeResource(resourceData, options.DisableResourceAttributes, options.ResourceAttributesList); err != nil {
|
||||
return fmt.Errorf("cannot decode Resource: %w", err)
|
||||
}
|
||||
}
|
||||
@@ -157,7 +165,7 @@ func (dctx *decoderContext) decodeResourceMetrics(src []byte) error {
|
||||
return fmt.Errorf("cannot read ScopeMetrics data")
|
||||
}
|
||||
|
||||
if err := dctx.decodeScopeMetrics(data); err != nil {
|
||||
if err := dctx.decodeScopeMetrics(data, options.DisableScopeMetadata); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal ScopeMetrics: %w", err)
|
||||
}
|
||||
|
||||
@@ -180,7 +188,7 @@ func (r *Resource) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||
}
|
||||
}
|
||||
|
||||
func (dctx *decoderContext) decodeResource(src []byte) (err error) {
|
||||
func (dctx *decoderContext) decodeResource(src []byte, disableResourceAttributes bool, attributeKeys map[string]struct{}) (err error) {
|
||||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/049d4332834935792fd4dbd392ecd31904f99ba2/opentelemetry/proto/resource/v1/resource.proto#L28
|
||||
//
|
||||
// message Resource {
|
||||
@@ -199,7 +207,34 @@ func (dctx *decoderContext) decodeResource(src []byte) (err error) {
|
||||
if !ok {
|
||||
return fmt.Errorf("cannot read Attributes")
|
||||
}
|
||||
if err := decodeKeyValue(data, &dctx.ls, &dctx.fb, ""); err != nil {
|
||||
keySuffix, ok, err := easyproto.GetString(data, 1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot find Key in KeyValue: %w", err)
|
||||
}
|
||||
if !ok {
|
||||
// Key is missing, skip it.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/869#issuecomment-3631307996
|
||||
continue
|
||||
}
|
||||
if _, ok := attributeKeys[keySuffix]; ok != disableResourceAttributes {
|
||||
// Skip the attribute if:
|
||||
// 1. it is in the list of ignore attributes when disableResourceAttributes is false,
|
||||
// 2. it isn't in the list of promote attributes when disableResourceAttributes is true.
|
||||
continue
|
||||
}
|
||||
key := dctx.fb.formatSubFieldName("", keySuffix)
|
||||
|
||||
// Decode value
|
||||
value, ok, err := easyproto.GetMessageData(data, 2)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot find Value in KeyValue: %w", err)
|
||||
}
|
||||
if !ok {
|
||||
// Value is null, skip it.
|
||||
continue
|
||||
}
|
||||
|
||||
if err := decodeAnyValue(value, &dctx.ls, &dctx.fb, key); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal Attributes: %w", err)
|
||||
}
|
||||
}
|
||||
@@ -257,7 +292,6 @@ func decodeKeyValue(src []byte, ls *promutil.Labels, fb *fmtBuffer, keyPrefix st
|
||||
if err := decodeAnyValue(valueData, ls, fb, key); err != nil {
|
||||
return fmt.Errorf("cannot decode AnyValue: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -452,7 +486,7 @@ func (sm *ScopeMetrics) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||
}
|
||||
}
|
||||
|
||||
func (dctx *decoderContext) decodeScopeMetrics(src []byte) error {
|
||||
func (dctx *decoderContext) decodeScopeMetrics(src []byte, disableScopeMetadata bool) error {
|
||||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/049d4332834935792fd4dbd392ecd31904f99ba2/opentelemetry/proto/metrics/v1/metrics.proto#L86
|
||||
//
|
||||
// message ScopeMetrics {
|
||||
@@ -460,19 +494,22 @@ func (dctx *decoderContext) decodeScopeMetrics(src []byte) error {
|
||||
// repeated Metric metrics = 2;
|
||||
// }
|
||||
|
||||
scopeData, ok, err := easyproto.GetMessageData(src, 1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read InstrumentationScope: %w", err)
|
||||
}
|
||||
if ok {
|
||||
if err := dctx.decodeInstrumentationScope(scopeData); err != nil {
|
||||
return fmt.Errorf("cannot decode InstrumentationScope: %w", err)
|
||||
if !disableScopeMetadata {
|
||||
scopeData, ok, err := easyproto.GetMessageData(src, 1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read InstrumentationScope: %w", err)
|
||||
}
|
||||
if ok {
|
||||
if err := dctx.decodeInstrumentationScope(scopeData); err != nil {
|
||||
return fmt.Errorf("cannot decode InstrumentationScope: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
dctxSnapshot := dctx.getSnapshot()
|
||||
|
||||
var fc easyproto.FieldContext
|
||||
var err error
|
||||
for len(src) > 0 {
|
||||
src, err = fc.NextField(src)
|
||||
if err != nil {
|
||||
|
||||
166
lib/protoparser/opentelemetry/pb/pb_test.go
Normal file
166
lib/protoparser/opentelemetry/pb/pb_test.go
Normal file
@@ -0,0 +1,166 @@
|
||||
package pb
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
|
||||
)
|
||||
|
||||
type testMetricPusher struct {
|
||||
samples []testSample
|
||||
metadata []MetricMetadata
|
||||
}
|
||||
|
||||
type testSample struct {
|
||||
mm MetricMetadata
|
||||
suffix string
|
||||
labels []prompb.Label
|
||||
ts uint64
|
||||
value float64
|
||||
}
|
||||
|
||||
func (p *testMetricPusher) PushSample(mm *MetricMetadata, suffix string, ls *promutil.Labels, timestampNsecs uint64, value float64, _ uint32) {
|
||||
labels := make([]prompb.Label, len(ls.Labels))
|
||||
copy(labels, ls.Labels)
|
||||
p.samples = append(p.samples, testSample{
|
||||
mm: *mm,
|
||||
suffix: suffix,
|
||||
labels: labels,
|
||||
ts: timestampNsecs,
|
||||
value: value,
|
||||
})
|
||||
}
|
||||
|
||||
func (p *testMetricPusher) PushMetricMetadata(mm *MetricMetadata) {
|
||||
p.metadata = append(p.metadata, *mm)
|
||||
}
|
||||
|
||||
func TestDecodeScopeMetrics(t *testing.T) {
|
||||
scopeName := "my-scope"
|
||||
scopeVersion := "v1.0"
|
||||
envVal := "prod"
|
||||
intVal := int64(1)
|
||||
md := &MetricsData{
|
||||
ResourceMetrics: []*ResourceMetrics{
|
||||
{
|
||||
Resource: &Resource{
|
||||
Attributes: []*KeyValue{
|
||||
{Key: "job", Value: &AnyValue{StringValue: new("vm")}},
|
||||
{Key: "region", Value: &AnyValue{StringValue: new("us-east-1")}},
|
||||
},
|
||||
},
|
||||
ScopeMetrics: []*ScopeMetrics{
|
||||
{
|
||||
Scope: &InstrumentationScope{
|
||||
Name: &scopeName,
|
||||
Version: &scopeVersion,
|
||||
Attributes: []*KeyValue{
|
||||
{Key: "env", Value: &AnyValue{StringValue: &envVal}},
|
||||
},
|
||||
},
|
||||
Metrics: []*Metric{
|
||||
{
|
||||
Name: "my-gauge",
|
||||
Description: "a test gauge",
|
||||
Gauge: &Gauge{
|
||||
DataPoints: []*NumberDataPoint{
|
||||
{
|
||||
Attributes: []*KeyValue{{Key: "label1", Value: &AnyValue{StringValue: new("value1")}}},
|
||||
IntValue: &intVal,
|
||||
TimeUnixNano: 1000,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
data := md.MarshalProtobuf(nil)
|
||||
|
||||
f := func(options DecodeMetricsOptions, wantLabels map[string]string) {
|
||||
t.Helper()
|
||||
mp := &testMetricPusher{}
|
||||
if err := DecodeMetricsData(data, mp, options); err != nil {
|
||||
t.Fatalf("DecodeMetricsData error: %v", err)
|
||||
}
|
||||
if len(mp.samples) != 1 {
|
||||
t.Fatalf("expected 1 sample, got %d", len(mp.samples))
|
||||
}
|
||||
gotMap := make(map[string]string, len(mp.samples[0].labels))
|
||||
for _, l := range mp.samples[0].labels {
|
||||
gotMap[l.Name] = l.Value
|
||||
}
|
||||
if !reflect.DeepEqual(gotMap, wantLabels) {
|
||||
t.Errorf("unexpected labels:\n got: %v\n want: %v", gotMap, wantLabels)
|
||||
}
|
||||
}
|
||||
|
||||
// PromoteScopeMetadata=true + PromoteAllResourceAttributes=true:
|
||||
// got all scope labels and resource attrs
|
||||
f(DecodeMetricsOptions{
|
||||
DisableScopeMetadata: false,
|
||||
DisableResourceAttributes: false,
|
||||
}, map[string]string{
|
||||
"job": "vm",
|
||||
"region": "us-east-1",
|
||||
"scope.name": "my-scope",
|
||||
"scope.version": "v1.0",
|
||||
"scope.attributes.env": "prod",
|
||||
"label1": "value1",
|
||||
})
|
||||
|
||||
// PromoteScopeMetadata=false + PromoteAllResourceAttributes=true:
|
||||
// got all resource attrs, no scope labels
|
||||
f(DecodeMetricsOptions{
|
||||
DisableScopeMetadata: true,
|
||||
DisableResourceAttributes: false,
|
||||
}, map[string]string{
|
||||
"job": "vm",
|
||||
"region": "us-east-1",
|
||||
"label1": "value1",
|
||||
})
|
||||
|
||||
// PromoteScopeMetadata=true + PromoteAllResourceAttributes=false + ResourceAttributesList=[region]:
|
||||
// got only the `region` attr from resource
|
||||
f(DecodeMetricsOptions{
|
||||
DisableScopeMetadata: false,
|
||||
DisableResourceAttributes: true,
|
||||
ResourceAttributesList: map[string]struct{}{"region": {}},
|
||||
}, map[string]string{
|
||||
"region": "us-east-1",
|
||||
"scope.name": "my-scope",
|
||||
"scope.version": "v1.0",
|
||||
"scope.attributes.env": "prod",
|
||||
"label1": "value1",
|
||||
})
|
||||
|
||||
// PromoteScopeMetadata=true + PromoteAllResourceAttributes=true + ResourceAttributesList=[region]:
|
||||
// got all resource attrs except `region` (ignore list)
|
||||
f(DecodeMetricsOptions{
|
||||
DisableScopeMetadata: false,
|
||||
DisableResourceAttributes: false,
|
||||
ResourceAttributesList: map[string]struct{}{"region": {}},
|
||||
}, map[string]string{
|
||||
"job": "vm",
|
||||
"scope.name": "my-scope",
|
||||
"scope.version": "v1.0",
|
||||
"scope.attributes.env": "prod",
|
||||
"label1": "value1",
|
||||
})
|
||||
|
||||
// PromoteScopeMetadata=false + PromoteAllResourceAttributes=false + ResourceAttributesList=[job]:
|
||||
// got only `job` attr
|
||||
f(DecodeMetricsOptions{
|
||||
DisableScopeMetadata: true,
|
||||
DisableResourceAttributes: true,
|
||||
ResourceAttributesList: map[string]struct{}{"job": {}},
|
||||
}, map[string]string{
|
||||
"job": "vm",
|
||||
"label1": "value1",
|
||||
})
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
@@ -10,13 +11,43 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
|
||||
)
|
||||
|
||||
var maxRequestSize = flagutil.NewBytes("opentelemetry.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single OpenTelemetry request")
|
||||
var (
|
||||
maxRequestSize = flagutil.NewBytes("opentelemetry.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single OpenTelemetry request")
|
||||
|
||||
promoteScopeMetadata = flag.Bool("opentelemetry.promoteScopeMetadata", true, "Whether to promote OTel scope metadata (i.e. name, version, schema URL, and attributes) to metric labels.")
|
||||
promoteAllResourceAttributes = flag.Bool("opentelemetry.promoteAllResourceAttributes", true, "Whether to promote all resource attributes to labels, except for the ones configured with 'opentelemetry.ignoreResourceAttributes'.")
|
||||
promoteResourceAttributes = flagutil.NewArrayString("opentelemetry.promoteResourceAttributes", "Promote specific list of resource attributes to labels.")
|
||||
ignoreResourceAttributes = flagutil.NewArrayString("opentelemetry.ignoreResourceAttributes", "Control which resource attributes to ignore, can only be set when 'opentelemetry.promoteAllResourceAttributes' is true.")
|
||||
)
|
||||
|
||||
// InitDecodeOptions configures decoding settings for the parser
|
||||
func InitDecodeOptions() {
|
||||
if *promoteAllResourceAttributes && len(*promoteResourceAttributes) > 0 {
|
||||
logger.Fatalf("cannot set both '-opentelemetry.promoteAllResourceAttributes' and '-opentelemetry.promoteResourceAttributes'")
|
||||
}
|
||||
if !*promoteAllResourceAttributes && len(*ignoreResourceAttributes) > 0 {
|
||||
logger.Fatalf("'-opentelemetry.ignoreResourceAttributes' can only be set when '-opentelemetry.promoteAllResourceAttributes' is true.")
|
||||
}
|
||||
defaultDecodeMetricsOptions.DisableScopeMetadata = !*promoteScopeMetadata
|
||||
defaultDecodeMetricsOptions.DisableResourceAttributes = !*promoteAllResourceAttributes
|
||||
attributes := *ignoreResourceAttributes
|
||||
if !*promoteAllResourceAttributes {
|
||||
attributes = *promoteResourceAttributes
|
||||
}
|
||||
defaultDecodeMetricsOptions.ResourceAttributesList = make(map[string]struct{}, len(attributes))
|
||||
for _, a := range attributes {
|
||||
defaultDecodeMetricsOptions.ResourceAttributesList[a] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
var defaultDecodeMetricsOptions = pb.DecodeMetricsOptions{}
|
||||
|
||||
// ParseStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows.
|
||||
//
|
||||
@@ -47,7 +78,7 @@ func parseData(data []byte, callback func(tss []prompb.TimeSeries, mms []prompb.
|
||||
// the flushFunc will be called multiple time if the request is big, to avoid over allocating memory for such request.
|
||||
wctx.flushFunc = callback
|
||||
|
||||
if err := pb.DecodeMetricsData(data, wctx); err != nil {
|
||||
if err := pb.DecodeMetricsData(data, wctx, defaultDecodeMetricsOptions); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(data), err)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user