Compare commits

...

4 Commits

Author SHA1 Message Date
f41gh7
44c0c7ed8d fix typos
Signed-off-by: f41gh7 <nik@victoriametrics.com>
2026-06-25 13:32:03 +02:00
f41gh7
a3199ac210 address review comments 2026-06-24 17:14:58 +02:00
Nikolay
5ec24116d9 Update lib/timeserieslimits/timeseries_limits.go
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
Signed-off-by: Nikolay <nik@victoriametrics.com>
2026-06-24 17:14:58 +02:00
f41gh7
062662f44f lib/storage: introduce limit for metric metadata fields
This commit adds a size limit for metric metadata fields.
Limit is hard-capped by `56kb` ( max uint16), since historically metric
metadata marshalled with those limit at cluster version. And it seems to
be a sane limit, it's unlikely that any valid metadata will exceed it.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11128
2026-06-24 17:14:58 +02:00
7 changed files with 148 additions and 14 deletions

View File

@@ -565,6 +565,14 @@ func tryPushMetadataToRemoteStorages(at *auth.Token, rwctxs []*remoteWriteCtx, m
mm.ProjectID = at.ProjectID
}
}
tmp := mms[:0]
for _, mm := range mms {
if timeserieslimits.IsMetricMetadataExceeding(&mm) {
continue
}
tmp = append(tmp, mm)
}
mms = tmp
// Do not shard metadata even if -remoteWrite.shardByURL is set, just replicate it among rwctxs.
// Since metadata is usually small and there is no guarantee that metadata can be sent to
// the same remote storage with the corresponding metrics.

View File

@@ -175,13 +175,19 @@ func (ctx *InsertCtx) WriteMetadata(mmpbs []prompb.MetricMetadata) error {
}
mms := ctx.mms
mms = slicesutil.SetLength(mms, len(mmpbs))
for idx, mmpb := range mmpbs {
mm := &mms[idx]
var cnt int
for _, mmpb := range mmpbs {
if timeserieslimits.IsMetricMetadataExceeding(&mmpb) {
continue
}
mm := &mms[cnt]
mm.MetricFamilyName = bytesutil.ToUnsafeBytes(mmpb.MetricFamilyName)
mm.Help = bytesutil.ToUnsafeBytes(mmpb.Help)
mm.Type = mmpb.Type
mm.Unit = bytesutil.ToUnsafeBytes(mmpb.Unit)
cnt++
}
mms = mms[:cnt]
ctx.mms = mms
err := vmstorage.VMInsertAPI.WriteMetadata(mms)
@@ -201,14 +207,19 @@ func (ctx *InsertCtx) WritePromMetadata(mmps []prometheus.Metadata) error {
}
mms := ctx.mms
mms = slicesutil.SetLength(mms, len(mmps))
for idx, mmpb := range mmps {
mm := &mms[idx]
var cnt int
for _, mmpb := range mmps {
mm := &mms[cnt]
if timeserieslimits.IsPrometheusMetadataExceeding(&mmpb) {
continue
}
mm.MetricFamilyName = bytesutil.ToUnsafeBytes(mmpb.Metric)
mm.Help = bytesutil.ToUnsafeBytes(mmpb.Help)
mm.Type = mmpb.Type
cnt++
}
mms = mms[:cnt]
ctx.mms = mms
err := vmstorage.VMInsertAPI.WriteMetadata(mms)
if err != nil {
return &httpserver.ErrorWithStatusCode{

View File

@@ -2,6 +2,7 @@ package tests
import (
"fmt"
"math"
"testing"
"github.com/google/go-cmp/cmp"
@@ -25,7 +26,11 @@ func TestSingleMetricsMetadata(t *testing.T) {
if len(resp.Data) != 0 {
t.Fatalf("unexpected resp Records: %d, want: %d", len(resp.Data), 0)
}
generateValueExceedLimit := func(prefix string) string {
buf := make([]byte, math.MaxUint16+len(prefix))
copy(buf, prefix)
return string(buf)
}
const ingestTimestamp = 1707123456700
prometheusTextDataSet := []string{
`# HELP metric_name_1 some help message`,
@@ -40,6 +45,12 @@ func TestSingleMetricsMetadata(t *testing.T) {
`# TYPE metric_name_3 gauge`,
`metric_name_3{label="baz"} 30`,
}
prometheusTextDataSet = append(prometheusTextDataSet,
`# HELP metric_name_4 `+generateValueExceedLimit("large help"),
`# TYPE metric_name_4 gauge`,
`metric_name_4{label="baz"} 30`,
)
prometheusRemoteWriteDataSet := prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_4"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
@@ -52,6 +63,9 @@ func TestSingleMetricsMetadata(t *testing.T) {
{MetricFamilyName: "metric_name_5", Help: "some help message", Type: prompb.MetricTypeSummary},
{MetricFamilyName: "metric_name_6", Help: "some help message", Type: prompb.MetricTypeStateset},
{MetricFamilyName: `metric_name_7_!@"_suffix`, Help: "some help message", Type: prompb.MetricTypeStateset},
{MetricFamilyName: "metric_name_8", Help: generateValueExceedLimit("large_help"), Type: prompb.MetricTypeStateset},
{MetricFamilyName: "metric_name_9", Help: "some help message", Type: prompb.MetricTypeStateset, Unit: generateValueExceedLimit("large_unit")},
{MetricFamilyName: generateValueExceedLimit("metric_name_10"), Help: "some help message", Type: prompb.MetricTypeStateset},
},
}
@@ -137,6 +151,11 @@ func TestClusterMetricsMetadata(t *testing.T) {
if len(resp.Data) != 0 {
t.Fatalf("unexpected resp Records: %d, want: %d", len(resp.Data), 0)
}
generateValueExceedLimit := func(prefix string) string {
buf := make([]byte, math.MaxUint16+len(prefix))
copy(buf, prefix)
return string(buf)
}
const ingestTimestamp = 1707123456700
prometheusTextDataSet := []string{
@@ -152,6 +171,11 @@ func TestClusterMetricsMetadata(t *testing.T) {
`# TYPE metric_name_3 gauge`,
`metric_name_3{label="baz"} 30`,
}
prometheusTextDataSet = append(prometheusTextDataSet,
`# HELP metric_name_4 `+generateValueExceedLimit("large help"),
`# TYPE metric_name_4 gauge`,
`metric_name_4{label="baz"} 30`,
)
prometheusRemoteWriteDataSet := prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_4"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
@@ -164,6 +188,9 @@ func TestClusterMetricsMetadata(t *testing.T) {
{MetricFamilyName: "metric_name_5", Help: "some help message", Type: prompb.MetricTypeSummary},
{MetricFamilyName: "metric_name_6", Help: "some help message", Type: prompb.MetricTypeStateset},
{MetricFamilyName: `metric_name_7_!@"_suffix`, Help: "some help message", Type: prompb.MetricTypeStateset},
{MetricFamilyName: "metric_name_8", Help: generateValueExceedLimit("large_help"), Type: prompb.MetricTypeStateset},
{MetricFamilyName: "metric_name_9", Help: "some help message", Type: prompb.MetricTypeStateset, Unit: generateValueExceedLimit("large_unit")},
{MetricFamilyName: generateValueExceedLimit("metric_name_10"), Help: "some help message", Type: prompb.MetricTypeStateset},
},
}

View File

@@ -517,10 +517,15 @@ func TestClusterVMAgentForwardMetricsMetadata(t *testing.T) {
"-remoteWrite.tmpDataPath=" + tc.Dir() + "/vmagent",
fmt.Sprintf(`-remoteWrite.url=http://%s/insert/multitenant/prometheus/api/v1/write`, sut.Vminsert.HTTPAddr()),
})
generateValueExceedLimit := func(prefix string) string {
buf := make([]byte, math.MaxUint16+len(prefix))
copy(buf, prefix)
return string(buf)
}
prometheusRemoteWriteDataSet := prompb.WriteRequest{
Metadata: []prompb.MetricMetadata{
{MetricFamilyName: "metric_name_4", Help: "some help message", Type: prompb.MetricTypeSummary, AccountID: 100},
{MetricFamilyName: "metric_name_8", Help: generateValueExceedLimit("large_help"), Type: prompb.MetricTypeStateset, AccountID: 100},
},
}
vmagent.PrometheusAPIV1Write(t, prometheusRemoteWriteDataSet, apptest.QueryOpts{Tenant: "multitenant"})

View File

@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): introduce `65kb` size limit for `metric metadata` fields - `Unit`, `Help` and `MetricFamilyName`. See [#11128](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11128).
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): propagate cache reset operation to `selectNode` when `/internal/resetRollupResultCache` is called. Previously, the propagation only happened when the `delete_series` API was called. See [#11112](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11112).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix possible unexpected increases in `rate_avg` and `rate_sum` if an out-of-order sample is ingested after the previous flush. See [#11140](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11140).

View File

@@ -2,6 +2,7 @@ package metricsmetadata
import (
"fmt"
"math"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@@ -24,7 +25,7 @@ type Row struct {
}
// MarshalTo serializes Row into provided buffer and returns result
func (mr *Row) MarshalTo(dst []byte) []byte {
func (mr *Row) MarshalTo(dst []byte) ([]byte, error) {
dstLen := len(dst)
// tenant information (accountID and projectID)
dstSize := dstLen + 8
@@ -37,10 +38,20 @@ func (mr *Row) MarshalTo(dst []byte) []byte {
dst = encoding.MarshalUint32(dst, mr.AccountID)
dst = encoding.MarshalUint32(dst, mr.ProjectID)
dst = encoding.MarshalUint32(dst, uint32(mr.Type))
dst = marshalBytesFast(dst, mr.MetricFamilyName)
dst = marshalBytesFast(dst, mr.Help)
dst = marshalBytesFast(dst, mr.Unit)
return dst
var err error
dst, err = marshalBytesFast(dst, mr.MetricFamilyName)
if err != nil {
return dst, fmt.Errorf("cannot marshal MetricFamilyName: %w", err)
}
dst, err = marshalBytesFast(dst, mr.Help)
if err != nil {
return dst, fmt.Errorf("cannot marshal Help: %w", err)
}
dst, err = marshalBytesFast(dst, mr.Unit)
if err != nil {
return dst, fmt.Errorf("cannot marshal Unit: %w", err)
}
return dst, nil
}
// Unmarshal parses Row from provided buffer and returns tail buffer
@@ -126,8 +137,11 @@ func UnmarshalRows(dst []Row, src []byte, maxRows int) ([]Row, []byte, error) {
return dst, src, nil
}
func marshalBytesFast(dst []byte, s []byte) []byte {
func marshalBytesFast(dst []byte, s []byte) ([]byte, error) {
if len(s) > math.MaxUint16 {
return dst, fmt.Errorf("size of s: %d cannot exceed max uint16", len(s))
}
dst = encoding.MarshalUint16(dst, uint16(len(s)))
dst = append(dst, s...)
return dst
return dst, nil
}

View File

@@ -1,6 +1,7 @@
package timeserieslimits
import (
"math"
"time"
"github.com/VictoriaMetrics/metrics"
@@ -8,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
)
var (
@@ -44,6 +46,9 @@ func Init(inputMaxLabelsPerTimeseries, inputMaxLabelNameLen, inputMaxLabelValueL
_ = metrics.GetOrCreateGauge(`vm_rows_ignored_total{reason="too_long_label_value"}`, func() float64 {
return float64(ignoredSeriesWithTooLongLabelValue.Load())
})
_ = metrics.GetOrCreateGauge(`vm_rows_ignored_total{reason="too_long_metric_metadata_value"}`, func() float64 {
return float64(ignoredMetricsMetadataWithTooLongValue.Load())
})
}
var (
@@ -61,6 +66,8 @@ var (
// ignoredSeriesWithTooLongLabelValue is the number of ignored series which contain labels with too long values
ignoredSeriesWithTooLongLabelValue atomicutil.Uint64
ignoredMetricsMetadataWithTooLongValue atomicutil.Uint64
)
func trackIgnoredSeriesWithTooManyLabels(labels []prompb.Label) {
@@ -132,3 +139,63 @@ func IsExceeding(labels []prompb.Label) bool {
}
return false
}
func trackIgnoredMetricMetadataWithTooLongValue(fieldName, metricName string, fieldSize int) {
ignoredMetricsMetadataWithTooLongValue.Add(1)
select {
case <-ignoredSeriesWithTooLongLabelValueLogTicker.C:
// Do not call logger.WithThrottler() here, since this will result in increased CPU usage
logger.Warnf("ignoring metric metadata with metric name %q; field %q value length=%d exceeds %d limit; "+
"reduce the size of field at metric metadata source.",
metricName, fieldName, fieldSize, metricMetadataMaxFieldValueSize)
default:
}
}
// metricMetadataMaxFieldValueSize defines max size of string fields at MetricMetadata
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11128 for details
const metricMetadataMaxFieldValueSize = math.MaxUint16
// IsMetricMetadataExceeding checks if passed metricMetadata exceed size limit
// for the following fields:
// * Help
// * MetricFamilyName
// * Unit
//
// increments metrics and shows warning in logs
func IsMetricMetadataExceeding(md *prompb.MetricMetadata) bool {
if len(md.Help) > metricMetadataMaxFieldValueSize {
trackIgnoredMetricMetadataWithTooLongValue("help", md.MetricFamilyName, len(md.Help))
return true
}
if len(md.MetricFamilyName) > metricMetadataMaxFieldValueSize {
trackIgnoredMetricMetadataWithTooLongValue("metricFamilyName", md.MetricFamilyName, len(md.MetricFamilyName))
return true
}
if len(md.Unit) > metricMetadataMaxFieldValueSize {
trackIgnoredMetricMetadataWithTooLongValue("unit", md.MetricFamilyName, len(md.Unit))
return true
}
return false
}
// IsPrometheusMetadataExceeding checks if passed prometheus.Metadata exceed size limit
// for the following fields:
// * Help
// * Metric
//
// increments metrics and shows warning in logs
func IsPrometheusMetadataExceeding(md *prometheus.Metadata) bool {
if len(md.Help) > metricMetadataMaxFieldValueSize {
trackIgnoredMetricMetadataWithTooLongValue("help", md.Metric, len(md.Help))
return true
}
if len(md.Metric) > metricMetadataMaxFieldValueSize {
trackIgnoredMetricMetadataWithTooLongValue("metric", md.Metric, len(md.Metric))
return true
}
return false
}