Merge remote-tracking branch 'opensource/master' into vmagent-drain-in-memory-queue

This commit is contained in:
Max Kotliar
2026-05-12 15:56:04 +03:00
9 changed files with 51 additions and 9 deletions

View File

@@ -57,6 +57,8 @@ jobs:
arch: amd64
- os: openbsd
arch: amd64
- os: netbsd
arch: amd64
- os: windows
arch: amd64
steps:

View File

@@ -699,7 +699,7 @@ func shardAmountRemoteWriteCtx(tssBlock []prompb.TimeSeries, shards [][]prompb.T
}
tmpLabels.Labels = hashLabels
}
h := getLabelsHash(hashLabels)
h := getLabelsHashForShard(hashLabels)
// Get the rwctxIdx through consistent hashing and then map it to the index in shards.
// The rwctxIdx is not always equal to the shardIdx, for example, when some rwctx are not available.
@@ -790,11 +790,28 @@ var (
dailySeriesLimitRowsDropped = metrics.NewCounter(`vmagent_daily_series_limit_rows_dropped_total`)
)
// getLabelsHashForShard returns the xxhash of the given labels for picking a remoteWrite shard.
//
// Unlike getLabelsHash, label names and values are concatenated without the '=' separator.
// This layout is kept for backward compatibility: changing it would re-shard
// all the series across remoteWrite targets.
func getLabelsHashForShard(labels []prompb.Label) uint64 {
	buf := labelsHashBufPool.Get()
	data := buf.B[:0]
	for i := range labels {
		data = append(append(data, labels[i].Name...), labels[i].Value...)
	}
	// The hash must be calculated before the buffer is returned to the pool.
	sum := xxhash.Sum64(data)
	// Store the (possibly grown) slice back, so its capacity is reused by the pool.
	buf.B = data
	labelsHashBufPool.Put(buf)
	return sum
}
func getLabelsHash(labels []prompb.Label) uint64 {
bb := labelsHashBufPool.Get()
b := bb.B[:0]
for _, label := range labels {
b = append(b, label.Name...)
b = append(b, '=')
b = append(b, label.Value...)
}
h := xxhash.Sum64(b)

View File

@@ -25,7 +25,7 @@ func TestGetLabelsHash_Distribution(t *testing.T) {
t.Helper()
// Distribute itemsCount hashes returned by getLabelsHash() across bucketsCount buckets.
itemsCount := 1_000 * bucketsCount
itemsCount := 10_000 * bucketsCount
m := make([]int, bucketsCount)
var labels []prompb.Label
for i := range itemsCount {
@@ -44,10 +44,12 @@ func TestGetLabelsHash_Distribution(t *testing.T) {
}
// Verify that the distribution is even
expectedItemsPerBucket := itemsCount / bucketsCount
expectedItemsPerBucket := float64(itemsCount / bucketsCount)
allowedDeviation := math.Round(float64(expectedItemsPerBucket) * 0.04)
for _, n := range m {
if math.Abs(1-float64(n)/float64(expectedItemsPerBucket)) > 0.04 {
t.Fatalf("unexpected items in the bucket for %d buckets; got %d; want around %d", bucketsCount, n, expectedItemsPerBucket)
if math.Abs(expectedItemsPerBucket-float64(n)) > allowedDeviation {
t.Fatalf("unexpected items in the bucket for %d buckets; got %d; want in range [%.0f, %.0f]",
bucketsCount, n, expectedItemsPerBucket-allowedDeviation, expectedItemsPerBucket+allowedDeviation)
}
}
}

View File

@@ -26,10 +26,12 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-opentelemetry.labelNameUnderscoreSanitization` command-line flag to control whether to enable prepending of `key` to labels starting with `_` when `-opentelemetry.usePrometheusNaming` is enabled. See [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) docs and [#9663](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9663). Thanks to @andriibeee for the contribution.
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): drain the in-memory remote write queue on shutdown within the 5-second grace period before falling back to persisting blocks to disk. See [#9996](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9996).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. Thanks to @alexei38 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): extend delay on aggregation windows flush by the biggest lag among pushed samples. Before, the delay was calculated as 95th percentile across samples, which could underrepresent outliers and reject them from aggregation as "too old". See [#10402](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10402).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix a bug in [cardinality limiters](https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter) where series with different labels, like `{a="bc"}` and `{ab="c"}`, could be incorrectly treated as identical and dropped. See [#10937](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10937).
## [v1.143.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.143.0)

View File

@@ -23,6 +23,8 @@ The following label sanitization options can be enabled:
For example, `process.cpu.time{service.name="foo"}` is converted to `process_cpu_time_seconds_total{service_name="foo"}`.
* `-opentelemetry.convertMetricNamesToPrometheus` - converts **only metric names** according to [OTLP Metric points to Prometheus specification](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.33.0/specification/compatibility/prometheus_and_openmetrics.md#otlp-metric-points-to-prometheus) for metrics ingested via OTLP.
For example, `process.cpu.time{service.name="foo"}` is converted to `process_cpu_time_seconds_total{service.name="foo"}`. See more about this use case [here](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9830).
* `-opentelemetry.labelNameUnderscoreSanitization` - controls whether the `key` prefix is prepended to label names starting with a single `_` when `-opentelemetry.usePrometheusNaming` is enabled (enabled by default). Reserved labels starting with `__` are not modified.
For example, `_mylabel` is converted to `key_mylabel`.
> These flags can be applied on vmagent, vminsert or VictoriaMetrics single-node.

View File

@@ -961,6 +961,7 @@ func getLabelsHash(labels []prompb.Label) uint64 {
for _, label := range labels {
b = append(b, label.Name...)
b = append(b, '=')
b = append(b, label.Value...)
}
h := xxhash.Sum64(b)

View File

@@ -700,7 +700,7 @@ func TestScrapeWorkScrapeInternalStreamConcurrency(t *testing.T) {
StreamParse: true,
ScrapeTimeout: time.Second * 42,
SeriesLimit: 4000,
}, 3, 4015, 2, 50)
}, 3, 4012, 2, 50)
}
func TestScrapeWorkScrapeInternalWithMaxScrapeSize(t *testing.T) {

View File

@@ -16,6 +16,9 @@ var (
"via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/")
convertMetricNamesToPrometheus = flag.Bool("opentelemetry.convertMetricNamesToPrometheus", false, "Whether to convert only metric names into Prometheus-compatible format for the metrics ingested "+
"via OpenTelemetry protocol; see https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/")
labelNameUnderscoreSanitization = flag.Bool("opentelemetry.labelNameUnderscoreSanitization", true, "Whether to enable prepending of 'key' to labels starting with '_' "+
"when -opentelemetry.usePrometheusNaming is enabled. Reserved labels starting with '__' are not modified. "+
"See https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/")
)
// unitMap is obtained from https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_name.go#L19
@@ -80,8 +83,6 @@ func (sctx *sanitizerContext) reset() {
sctx.labelBuf = sctx.labelBuf[:0]
}
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_label.go#L26
//
// The returned string is valid until the next call to sanitizeLabelName.
func (sctx *sanitizerContext) sanitizeLabelName(labelName string) string {
if !*usePrometheusNaming {
@@ -90,6 +91,8 @@ func (sctx *sanitizerContext) sanitizeLabelName(labelName string) string {
return sctx.sanitizePrometheusLabelName(labelName)
}
// sanitizePrometheusLabelName performs conversion and normalization of OpenTelemetry attributes to Prometheus labels.
// It follows the Prometheus guidelines: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/translator/prometheus#labels
func (sctx *sanitizerContext) sanitizePrometheusLabelName(labelName string) string {
if len(labelName) == 0 {
return ""
@@ -97,7 +100,7 @@ func (sctx *sanitizerContext) sanitizePrometheusLabelName(labelName string) stri
labelName = promrelabel.SanitizeLabelName(labelName)
if labelName[0] >= '0' && labelName[0] <= '9' {
return sctx.concatLabel("key_", labelName)
} else if strings.HasPrefix(labelName, "_") && !strings.HasPrefix(labelName, "__") {
} else if *labelNameUnderscoreSanitization && strings.HasPrefix(labelName, "_") && !strings.HasPrefix(labelName, "__") {
return sctx.concatLabel("key", labelName)
}
return labelName

View File

@@ -24,6 +24,19 @@ func TestSanitizePrometheusLabelName(t *testing.T) {
f("1foo", "key_1foo")
f("_foo", "key_foo")
f("__bar", "__bar")
prev := *labelNameUnderscoreSanitization
*labelNameUnderscoreSanitization = false
defer func() {
*labelNameUnderscoreSanitization = prev
}()
f("", "")
f("foo", "foo")
f("foo_bar", "foo_bar")
f("1foo", "key_1foo")
f("_foo", "_foo")
f("__bar", "__bar")
}
func TestSanitizePrometheusMetricName(t *testing.T) {