mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-25 19:47:19 +03:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
44c0c7ed8d | ||
|
|
a3199ac210 | ||
|
|
5ec24116d9 | ||
|
|
062662f44f |
@@ -565,6 +565,14 @@ func tryPushMetadataToRemoteStorages(at *auth.Token, rwctxs []*remoteWriteCtx, m
|
||||
mm.ProjectID = at.ProjectID
|
||||
}
|
||||
}
|
||||
tmp := mms[:0]
|
||||
for _, mm := range mms {
|
||||
if timeserieslimits.IsMetricMetadataExceeding(&mm) {
|
||||
continue
|
||||
}
|
||||
tmp = append(tmp, mm)
|
||||
}
|
||||
mms = tmp
|
||||
// Do not shard metadata even if -remoteWrite.shardByURL is set, just replicate it among rwctxs.
|
||||
// Since metadata is usually small and there is no guarantee that metadata can be sent to
|
||||
// the same remote storage with the corresponding metrics.
|
||||
|
||||
@@ -175,13 +175,19 @@ func (ctx *InsertCtx) WriteMetadata(mmpbs []prompb.MetricMetadata) error {
|
||||
}
|
||||
mms := ctx.mms
|
||||
mms = slicesutil.SetLength(mms, len(mmpbs))
|
||||
for idx, mmpb := range mmpbs {
|
||||
mm := &mms[idx]
|
||||
var cnt int
|
||||
for _, mmpb := range mmpbs {
|
||||
if timeserieslimits.IsMetricMetadataExceeding(&mmpb) {
|
||||
continue
|
||||
}
|
||||
mm := &mms[cnt]
|
||||
mm.MetricFamilyName = bytesutil.ToUnsafeBytes(mmpb.MetricFamilyName)
|
||||
mm.Help = bytesutil.ToUnsafeBytes(mmpb.Help)
|
||||
mm.Type = mmpb.Type
|
||||
mm.Unit = bytesutil.ToUnsafeBytes(mmpb.Unit)
|
||||
cnt++
|
||||
}
|
||||
mms = mms[:cnt]
|
||||
ctx.mms = mms
|
||||
|
||||
err := vmstorage.VMInsertAPI.WriteMetadata(mms)
|
||||
@@ -201,14 +207,19 @@ func (ctx *InsertCtx) WritePromMetadata(mmps []prometheus.Metadata) error {
|
||||
}
|
||||
mms := ctx.mms
|
||||
mms = slicesutil.SetLength(mms, len(mmps))
|
||||
for idx, mmpb := range mmps {
|
||||
mm := &mms[idx]
|
||||
var cnt int
|
||||
for _, mmpb := range mmps {
|
||||
mm := &mms[cnt]
|
||||
if timeserieslimits.IsPrometheusMetadataExceeding(&mmpb) {
|
||||
continue
|
||||
}
|
||||
mm.MetricFamilyName = bytesutil.ToUnsafeBytes(mmpb.Metric)
|
||||
mm.Help = bytesutil.ToUnsafeBytes(mmpb.Help)
|
||||
mm.Type = mmpb.Type
|
||||
cnt++
|
||||
}
|
||||
mms = mms[:cnt]
|
||||
ctx.mms = mms
|
||||
|
||||
err := vmstorage.VMInsertAPI.WriteMetadata(mms)
|
||||
if err != nil {
|
||||
return &httpserver.ErrorWithStatusCode{
|
||||
|
||||
@@ -2,6 +2,7 @@ package tests
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
@@ -25,7 +26,11 @@ func TestSingleMetricsMetadata(t *testing.T) {
|
||||
if len(resp.Data) != 0 {
|
||||
t.Fatalf("unexpected resp Records: %d, want: %d", len(resp.Data), 0)
|
||||
}
|
||||
|
||||
generateValueExceedLimit := func(prefix string) string {
|
||||
buf := make([]byte, math.MaxUint16+len(prefix))
|
||||
copy(buf, prefix)
|
||||
return string(buf)
|
||||
}
|
||||
const ingestTimestamp = 1707123456700
|
||||
prometheusTextDataSet := []string{
|
||||
`# HELP metric_name_1 some help message`,
|
||||
@@ -40,6 +45,12 @@ func TestSingleMetricsMetadata(t *testing.T) {
|
||||
`# TYPE metric_name_3 gauge`,
|
||||
`metric_name_3{label="baz"} 30`,
|
||||
}
|
||||
prometheusTextDataSet = append(prometheusTextDataSet,
|
||||
`# HELP metric_name_4 `+generateValueExceedLimit("large help"),
|
||||
`# TYPE metric_name_4 gauge`,
|
||||
`metric_name_4{label="baz"} 30`,
|
||||
)
|
||||
|
||||
prometheusRemoteWriteDataSet := prompb.WriteRequest{
|
||||
Timeseries: []prompb.TimeSeries{
|
||||
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_4"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
|
||||
@@ -52,6 +63,9 @@ func TestSingleMetricsMetadata(t *testing.T) {
|
||||
{MetricFamilyName: "metric_name_5", Help: "some help message", Type: prompb.MetricTypeSummary},
|
||||
{MetricFamilyName: "metric_name_6", Help: "some help message", Type: prompb.MetricTypeStateset},
|
||||
{MetricFamilyName: `metric_name_7_!@"_suffix`, Help: "some help message", Type: prompb.MetricTypeStateset},
|
||||
{MetricFamilyName: "metric_name_8", Help: generateValueExceedLimit("large_help"), Type: prompb.MetricTypeStateset},
|
||||
{MetricFamilyName: "metric_name_9", Help: "some help message", Type: prompb.MetricTypeStateset, Unit: generateValueExceedLimit("large_unit")},
|
||||
{MetricFamilyName: generateValueExceedLimit("metric_name_10"), Help: "some help message", Type: prompb.MetricTypeStateset},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -137,6 +151,11 @@ func TestClusterMetricsMetadata(t *testing.T) {
|
||||
if len(resp.Data) != 0 {
|
||||
t.Fatalf("unexpected resp Records: %d, want: %d", len(resp.Data), 0)
|
||||
}
|
||||
generateValueExceedLimit := func(prefix string) string {
|
||||
buf := make([]byte, math.MaxUint16+len(prefix))
|
||||
copy(buf, prefix)
|
||||
return string(buf)
|
||||
}
|
||||
|
||||
const ingestTimestamp = 1707123456700
|
||||
prometheusTextDataSet := []string{
|
||||
@@ -152,6 +171,11 @@ func TestClusterMetricsMetadata(t *testing.T) {
|
||||
`# TYPE metric_name_3 gauge`,
|
||||
`metric_name_3{label="baz"} 30`,
|
||||
}
|
||||
prometheusTextDataSet = append(prometheusTextDataSet,
|
||||
`# HELP metric_name_4 `+generateValueExceedLimit("large help"),
|
||||
`# TYPE metric_name_4 gauge`,
|
||||
`metric_name_4{label="baz"} 30`,
|
||||
)
|
||||
prometheusRemoteWriteDataSet := prompb.WriteRequest{
|
||||
Timeseries: []prompb.TimeSeries{
|
||||
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_4"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
|
||||
@@ -164,6 +188,9 @@ func TestClusterMetricsMetadata(t *testing.T) {
|
||||
{MetricFamilyName: "metric_name_5", Help: "some help message", Type: prompb.MetricTypeSummary},
|
||||
{MetricFamilyName: "metric_name_6", Help: "some help message", Type: prompb.MetricTypeStateset},
|
||||
{MetricFamilyName: `metric_name_7_!@"_suffix`, Help: "some help message", Type: prompb.MetricTypeStateset},
|
||||
{MetricFamilyName: "metric_name_8", Help: generateValueExceedLimit("large_help"), Type: prompb.MetricTypeStateset},
|
||||
{MetricFamilyName: "metric_name_9", Help: "some help message", Type: prompb.MetricTypeStateset, Unit: generateValueExceedLimit("large_unit")},
|
||||
{MetricFamilyName: generateValueExceedLimit("metric_name_10"), Help: "some help message", Type: prompb.MetricTypeStateset},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -517,10 +517,15 @@ func TestClusterVMAgentForwardMetricsMetadata(t *testing.T) {
|
||||
"-remoteWrite.tmpDataPath=" + tc.Dir() + "/vmagent",
|
||||
fmt.Sprintf(`-remoteWrite.url=http://%s/insert/multitenant/prometheus/api/v1/write`, sut.Vminsert.HTTPAddr()),
|
||||
})
|
||||
|
||||
generateValueExceedLimit := func(prefix string) string {
|
||||
buf := make([]byte, math.MaxUint16+len(prefix))
|
||||
copy(buf, prefix)
|
||||
return string(buf)
|
||||
}
|
||||
prometheusRemoteWriteDataSet := prompb.WriteRequest{
|
||||
Metadata: []prompb.MetricMetadata{
|
||||
{MetricFamilyName: "metric_name_4", Help: "some help message", Type: prompb.MetricTypeSummary, AccountID: 100},
|
||||
{MetricFamilyName: "metric_name_8", Help: generateValueExceedLimit("large_help"), Type: prompb.MetricTypeStateset, AccountID: 100},
|
||||
},
|
||||
}
|
||||
vmagent.PrometheusAPIV1Write(t, prometheusRemoteWriteDataSet, apptest.QueryOpts{Tenant: "multitenant"})
|
||||
|
||||
@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
## tip
|
||||
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): introduce `65kb` size limit for `metric metadata` fields - `Unit`, `Help` and `MetricFamilyName`. See [#11128](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11128).
|
||||
|
||||
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): propagate cache reset operation to `selectNode` when `/internal/resetRollupResultCache` is called. Previously, the propagation only happened when the `delete_series` API was called. See [#11112](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11112).
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix possible unexpected increases in `rate_avg` and `rate_sum` if an out-of-order sample is ingested after the previous flush. See [#11140](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11140).
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package metricsmetadata
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
@@ -24,7 +25,7 @@ type Row struct {
|
||||
}
|
||||
|
||||
// MarshalTo serializes Row into provided buffer and returns result
|
||||
func (mr *Row) MarshalTo(dst []byte) []byte {
|
||||
func (mr *Row) MarshalTo(dst []byte) ([]byte, error) {
|
||||
dstLen := len(dst)
|
||||
// tenant information (accountID and projectID)
|
||||
dstSize := dstLen + 8
|
||||
@@ -37,10 +38,20 @@ func (mr *Row) MarshalTo(dst []byte) []byte {
|
||||
dst = encoding.MarshalUint32(dst, mr.AccountID)
|
||||
dst = encoding.MarshalUint32(dst, mr.ProjectID)
|
||||
dst = encoding.MarshalUint32(dst, uint32(mr.Type))
|
||||
dst = marshalBytesFast(dst, mr.MetricFamilyName)
|
||||
dst = marshalBytesFast(dst, mr.Help)
|
||||
dst = marshalBytesFast(dst, mr.Unit)
|
||||
return dst
|
||||
var err error
|
||||
dst, err = marshalBytesFast(dst, mr.MetricFamilyName)
|
||||
if err != nil {
|
||||
return dst, fmt.Errorf("cannot marshal MetricFamilyName: %w", err)
|
||||
}
|
||||
dst, err = marshalBytesFast(dst, mr.Help)
|
||||
if err != nil {
|
||||
return dst, fmt.Errorf("cannot marshal Help: %w", err)
|
||||
}
|
||||
dst, err = marshalBytesFast(dst, mr.Unit)
|
||||
if err != nil {
|
||||
return dst, fmt.Errorf("cannot marshal Unit: %w", err)
|
||||
}
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
// Unmarshal parses Row from provided buffer and returns tail buffer
|
||||
@@ -126,8 +137,11 @@ func UnmarshalRows(dst []Row, src []byte, maxRows int) ([]Row, []byte, error) {
|
||||
return dst, src, nil
|
||||
}
|
||||
|
||||
func marshalBytesFast(dst []byte, s []byte) []byte {
|
||||
func marshalBytesFast(dst []byte, s []byte) ([]byte, error) {
|
||||
if len(s) > math.MaxUint16 {
|
||||
return dst, fmt.Errorf("size of s: %d cannot exceed max uint16", len(s))
|
||||
}
|
||||
dst = encoding.MarshalUint16(dst, uint16(len(s)))
|
||||
dst = append(dst, s...)
|
||||
return dst
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package timeserieslimits
|
||||
|
||||
import (
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
@@ -8,6 +9,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -44,6 +46,9 @@ func Init(inputMaxLabelsPerTimeseries, inputMaxLabelNameLen, inputMaxLabelValueL
|
||||
_ = metrics.GetOrCreateGauge(`vm_rows_ignored_total{reason="too_long_label_value"}`, func() float64 {
|
||||
return float64(ignoredSeriesWithTooLongLabelValue.Load())
|
||||
})
|
||||
_ = metrics.GetOrCreateGauge(`vm_rows_ignored_total{reason="too_long_metric_metadata_value"}`, func() float64 {
|
||||
return float64(ignoredMetricsMetadataWithTooLongValue.Load())
|
||||
})
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -61,6 +66,8 @@ var (
|
||||
|
||||
// ignoredSeriesWithTooLongLabelValue is the number of ignored series which contain labels with too long values
|
||||
ignoredSeriesWithTooLongLabelValue atomicutil.Uint64
|
||||
|
||||
ignoredMetricsMetadataWithTooLongValue atomicutil.Uint64
|
||||
)
|
||||
|
||||
func trackIgnoredSeriesWithTooManyLabels(labels []prompb.Label) {
|
||||
@@ -132,3 +139,63 @@ func IsExceeding(labels []prompb.Label) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
func trackIgnoredMetricMetadataWithTooLongValue(fieldName, metricName string, fieldSize int) {
|
||||
ignoredMetricsMetadataWithTooLongValue.Add(1)
|
||||
select {
|
||||
case <-ignoredSeriesWithTooLongLabelValueLogTicker.C:
|
||||
// Do not call logger.WithThrottler() here, since this will result in increased CPU usage
|
||||
logger.Warnf("ignoring metric metadata with metric name %q; field %q value length=%d exceeds %d limit; "+
|
||||
"reduce the size of field at metric metadata source.",
|
||||
metricName, fieldName, fieldSize, metricMetadataMaxFieldValueSize)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// metricMetadataMaxFieldValueSize defines max size of string fields at MetricMetadata
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11128 for details
|
||||
const metricMetadataMaxFieldValueSize = math.MaxUint16
|
||||
|
||||
// IsMetricMetadataExceeding checks if passed metricMetadata exceed size limit
|
||||
// for the following fields:
|
||||
// * Help
|
||||
// * MetricFamilyName
|
||||
// * Unit
|
||||
//
|
||||
// increments metrics and shows warning in logs
|
||||
func IsMetricMetadataExceeding(md *prompb.MetricMetadata) bool {
|
||||
if len(md.Help) > metricMetadataMaxFieldValueSize {
|
||||
trackIgnoredMetricMetadataWithTooLongValue("help", md.MetricFamilyName, len(md.Help))
|
||||
return true
|
||||
}
|
||||
if len(md.MetricFamilyName) > metricMetadataMaxFieldValueSize {
|
||||
trackIgnoredMetricMetadataWithTooLongValue("metricFamilyName", md.MetricFamilyName, len(md.MetricFamilyName))
|
||||
|
||||
return true
|
||||
}
|
||||
if len(md.Unit) > metricMetadataMaxFieldValueSize {
|
||||
trackIgnoredMetricMetadataWithTooLongValue("unit", md.MetricFamilyName, len(md.Unit))
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// IsPrometheusMetadataExceeding checks if passed prometheus.Metadata exceed size limit
|
||||
// for the following fields:
|
||||
// * Help
|
||||
// * Metric
|
||||
//
|
||||
// increments metrics and shows warning in logs
|
||||
func IsPrometheusMetadataExceeding(md *prometheus.Metadata) bool {
|
||||
if len(md.Help) > metricMetadataMaxFieldValueSize {
|
||||
trackIgnoredMetricMetadataWithTooLongValue("help", md.Metric, len(md.Help))
|
||||
return true
|
||||
}
|
||||
if len(md.Metric) > metricMetadataMaxFieldValueSize {
|
||||
trackIgnoredMetricMetadataWithTooLongValue("metric", md.Metric, len(md.Metric))
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user