diff --git a/app/vmalert/remotewrite/client_test.go b/app/vmalert/remotewrite/client_test.go index d8b71aefdb..224910ab29 100644 --- a/app/vmalert/remotewrite/client_test.go +++ b/app/vmalert/remotewrite/client_test.go @@ -103,7 +103,10 @@ func TestClient_run_maxBatchSizeDuringShutdown(t *testing.T) { // push time series to the client. for range pushCnt { - if err = rwClient.Push(prompb.TimeSeries{}); err != nil { + if err = rwClient.Push(prompb.TimeSeries{ + Labels: []prompb.Label{{Name: "__name__", Value: "m"}}, + Samples: []prompb.Sample{{Value: 1, Timestamp: 1000}}, + }); err != nil { t.Fatalf("cannot time series to the client: %s", err) } } diff --git a/docs/victoriametrics/changelog/CHANGELOG.md b/docs/victoriametrics/changelog/CHANGELOG.md index d631a538b9..526c6c6cae 100644 --- a/docs/victoriametrics/changelog/CHANGELOG.md +++ b/docs/victoriametrics/changelog/CHANGELOG.md @@ -35,6 +35,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel * FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): add `__meta_hetzner_robot_datacenter` label for `robot` role in [hetzner_sd_configs](https://docs.victoriametrics.com/victoriametrics/sd_configs/#hetzner_sd_configs). See [#10909](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10909). Thanks to @juliusrickert for contribution. * FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `-rule.stripFilePath` to support stripping rule file paths in logs and all API responses, including /metrics. See [#5625](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5625). * FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `formatTime` template function for formatting a Unix timestamp using the provided layout. For example, `{{ now | formatTime "2006-01-02T15:04:05Z07:00" }}` returns the current time in RFC3339 format. See issue [#10624](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10624). Thanks to @andriibeee for the contribution. +* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add support for [Prometheus native histogram](https://prometheus.io/docs/specs/native_histograms/) during ingestion. See [#10743](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10743). ## [v1.142.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.142.0) diff --git a/docs/victoriametrics/integrations/prometheus.md b/docs/victoriametrics/integrations/prometheus.md index 4bf91a1e1c..ebee6d8b10 100644 --- a/docs/victoriametrics/integrations/prometheus.md +++ b/docs/victoriametrics/integrations/prometheus.md @@ -68,3 +68,15 @@ since previous versions may have issues with `remote_write`. Take a look at [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/), which can be used as faster and less resource-hungry alternative to Prometheus. + +## Native histograms + +Prometheus [native histogram](https://prometheus.io/docs/specs/native_histograms/) is automatically converted +to [VictoriaMetrics histogram format](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) with `vmrange` labels during ingestion. + +> After conversion, a native histogram is transformed into classic histograms with `_count`, `_sum`, and `_bucket` series. These series can be queried using standard histogram functions such as `histogram_quantile()`. + + +## Remote Write 2.0 + +[Prometheus Remote-Write 2.0](https://prometheus.io/docs/specs/prw/remote_write_spec_2_0/) is still marked as experimental and is not currently supported by VictoriaMetrics. diff --git a/lib/prompb/fmt_buffer.go b/lib/prompb/fmt_buffer.go new file mode 100644 index 0000000000..f205f5e8ca --- /dev/null +++ b/lib/prompb/fmt_buffer.go @@ -0,0 +1,36 @@ +package prompb + +import ( + "strconv" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" +) + +type fmtBuffer struct { + buf []byte +} + +func (fb *fmtBuffer) reset() { + fb.buf = fb.buf[:0] +} + +func (fb *fmtBuffer) formatName(prefix, suffix string) string { + if prefix == "" { + // There is no prefix, so just return the suffix as is. + return suffix + } + + n := len(fb.buf) + fb.buf = append(fb.buf, prefix...) + fb.buf = append(fb.buf, suffix...) + + return bytesutil.ToUnsafeString(fb.buf[n:]) +} + +func (fb *fmtBuffer) formatVmrange(start, end float64) string { + n := len(fb.buf) + fb.buf = strconv.AppendFloat(fb.buf, start, 'e', 3, 64) + fb.buf = append(fb.buf, "..."...) + fb.buf = strconv.AppendFloat(fb.buf, end, 'e', 3, 64) + return bytesutil.ToUnsafeString(fb.buf[n:]) +} diff --git a/lib/prompb/prompb_test.go b/lib/prompb/prompb_test.go index e6a9107f0e..0d90b1f505 100644 --- a/lib/prompb/prompb_test.go +++ b/lib/prompb/prompb_test.go @@ -50,6 +50,12 @@ func TestWriteRequestMarshalUnmarshal(t *testing.T) { Value: "node-exporter", }, }, + Samples: []prompb.Sample{ + { + Value: 1, + Timestamp: 1000, + }, + }, }, }, }) diff --git a/lib/prompb/write_request_unmarshaler.go b/lib/prompb/write_request_unmarshaler.go index 501e358fd5..f4d16a28a5 100644 --- a/lib/prompb/write_request_unmarshaler.go +++ b/lib/prompb/write_request_unmarshaler.go @@ -2,6 +2,7 @@ package prompb import ( "fmt" + "math" "sync" "github.com/VictoriaMetrics/easyproto" @@ -37,6 +38,7 @@ type WriteRequestUnmarshaler struct { labelsPool []Label samplesPool []Sample + fb fmtBuffer } // Reset resets wru, so it could be re-used. @@ -48,6 +50,8 @@ func (wru *WriteRequestUnmarshaler) Reset() { clear(wru.samplesPool) wru.samplesPool = wru.samplesPool[:0] + + wru.fb.reset() } // UnmarshalProtobuf parses the given Protobuf-encoded `src` into an internal WriteRequest instance and returns a pointer to it. @@ -85,13 +89,7 @@ func (wru *WriteRequestUnmarshaler) UnmarshalProtobuf(src []byte) (*WriteRequest if !ok { return nil, fmt.Errorf("cannot read timeseries data") } - if len(tss) < cap(tss) { - tss = tss[:len(tss)+1] - } else { - tss = append(tss, TimeSeries{}) - } - ts := &tss[len(tss)-1] - labelsPool, samplesPool, err = ts.unmarshalProtobuf(data, labelsPool, samplesPool) + tss, labelsPool, samplesPool, err = unmarshalTimeSeries(data, tss, labelsPool, samplesPool, &wru.fb) if err != nil { return nil, fmt.Errorf("cannot unmarshal timeseries: %w", err) } @@ -119,25 +117,31 @@ func (wru *WriteRequestUnmarshaler) UnmarshalProtobuf(src []byte) (*WriteRequest return &wru.wr, nil } -func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesPool []Sample) ([]Label, []Sample, error) { +// unmarshalTimeSeries unmarshals TimeSeries messages, which can specify either samples or native histogram samples, but not both. +// See https://github.com/prometheus/prometheus/blob/9a3ac8910b0476d0d73a5c36a54c55baec5829b6/prompb/types.proto#L133 +func unmarshalTimeSeries(src []byte, tss []TimeSeries, labelsPool []Label, samplesPool []Sample, fb *fmtBuffer) ([]TimeSeries, []Label, []Sample, error) { + labelsPoolLen := len(labelsPool) + samplesPoolLen := len(samplesPool) + + var histograms [][]byte + var fc easyproto.FieldContext + var err error + // message TimeSeries { // repeated Label labels = 1; // repeated Sample samples = 2; + // repeated Histogram histograms = 4 // } - labelsPoolLen := len(labelsPool) - samplesPoolLen := len(samplesPool) - var fc easyproto.FieldContext for len(src) > 0 { - var err error src, err = fc.NextField(src) if err != nil { - return labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err) + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err) } switch fc.FieldNum { case 1: data, ok := fc.MessageData() if !ok { - return labelsPool, samplesPool, fmt.Errorf("cannot read label data") + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read label data") } if len(labelsPool) < cap(labelsPool) { labelsPool = labelsPool[:len(labelsPool)+1] @@ -146,12 +150,12 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesP } label := &labelsPool[len(labelsPool)-1] if err := label.unmarshalProtobuf(data); err != nil { - return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err) + return tss, labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err) } case 2: data, ok := fc.MessageData() if !ok { - return labelsPool, samplesPool, fmt.Errorf("cannot read the sample data") + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read sample data") } if len(samplesPool) < cap(samplesPool) { samplesPool = samplesPool[:len(samplesPool)+1] @@ -160,15 +164,415 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesP } sample := &samplesPool[len(samplesPool)-1] if err := sample.unmarshalProtobuf(data); err != nil { - return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err) + return tss, labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err) + } + case 4: + data, ok := fc.MessageData() + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read native histogram data") + } + histograms = append(histograms, data) + } + } + + baseLabels := labelsPool[labelsPoolLen:len(labelsPool):len(labelsPool)] + samples := samplesPool[samplesPoolLen:len(samplesPool):len(samplesPool)] + + if len(samples) > 0 && len(histograms) > 0 { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot have both samples and native histograms in the same TimeSeries") + } + + // classic series with normal samples + if len(samples) > 0 { + tss = appendTimeSeries(tss, baseLabels, samples) + return tss, labelsPool, samplesPool, nil + } + + for _, hdata := range histograms { + tss, labelsPool, samplesPool, err = unmarshalHistogram(hdata, tss, labelsPool, samplesPool, baseLabels, fb) + if err != nil { + return tss, labelsPool, samplesPool, fmt.Errorf("failed to unmarshal native histogram: %w", err) + } + } + + return tss, labelsPool, samplesPool, nil +} + +func appendTimeSeries(tss []TimeSeries, labels []Label, samples []Sample) []TimeSeries { + if len(tss) < cap(tss) { + tss = tss[:len(tss)+1] + } else { + tss = append(tss, TimeSeries{}) + } + ts := &tss[len(tss)-1] + ts.Labels = labels + ts.Samples = samples + return tss +} + +func unmarshalHistogram(src []byte, tss []TimeSeries, labelsPool []Label, samplesPool []Sample, baseLabels []Label, fb *fmtBuffer) ([]TimeSeries, []Label, []Sample, error) { + // see https://github.com/prometheus/prometheus/blob/9a3ac8910b0476d0d73a5c36a54c55baec5829b6/prompb/types.proto#L57 + // message Histogram { + // oneof count { // Count of observations in the histogram. + // uint64 count_int = 1; + // double count_float = 2; + // } + // double sum = 3; // Sum of observations in the histogram. + // sint32 schema = 4; + // double zero_threshold = 5; // Breadth of the zero bucket. + // oneof zero_count { // Count in zero bucket. + // uint64 zero_count_int = 6; + // double zero_count_float = 7; + // } + + // repeated BucketSpan negative_spans = 8 [(gogoproto.nullable) = false]; + // repeated sint64 negative_deltas = 9; // Count delta of each bucket compared to previous one (or to zero for 1st bucket). + // repeated double negative_counts = 10; // Absolute count of each bucket. + + // repeated BucketSpan positive_spans = 11 [(gogoproto.nullable) = false]; + // repeated sint64 positive_deltas = 12; // Count delta of each bucket compared to previous one (or to zero for 1st bucket). + // repeated double positive_counts = 13; // Absolute count of each bucket. + + // ResetHint reset_hint = 14; + // int64 timestamp = 15; + + // repeated double custom_values = 16; + // } + nhctx := getNativeHistogramContext() + defer putNativeHistogramContext(nhctx) + + var err error + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read next field: %w", err) + } + var ok bool + switch fc.FieldNum { + case 1: + nhctx.countInt, ok = fc.Uint64() + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read count_int") + } + case 2: + nhctx.countFloat, ok = fc.Double() + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read count_float") + } + nhctx.isCountFloat = true + case 3: + nhctx.sum, ok = fc.Double() + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read sum") + } + case 4: + nhctx.schema, ok = fc.Sint32() + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read schema") + } + case 5: + nhctx.zeroThreshold, ok = fc.Double() + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read zero_threshold") + } + case 6: + nhctx.zeroCountInt, ok = fc.Uint64() + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read zero_count_int") + } + case 7: + nhctx.zeroCountFloat, ok = fc.Double() + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read zero_count_float") + } + nhctx.isZeroCountFloat = true + case 8: + data, ok := fc.MessageData() + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read negative_spans") + } + nhctx.negativeSpans, err = appendBucketSpan(nhctx.negativeSpans, data) + if err != nil { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot decode negative_spans: %w", err) + } + case 9: + nhctx.negativeDeltas, ok = fc.UnpackSint64s(nhctx.negativeDeltas) + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read negative_deltas") + } + case 10: + nhctx.negativeCounts, ok = fc.UnpackDoubles(nhctx.negativeCounts) + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read negative_counts") + } + case 11: + data, ok := fc.MessageData() + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read positive_spans") + } + nhctx.positiveSpans, err = appendBucketSpan(nhctx.positiveSpans, data) + if err != nil { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot decode positive_spans: %w", err) + } + case 12: + nhctx.positiveDeltas, ok = fc.UnpackSint64s(nhctx.positiveDeltas) + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read positive_deltas") + } + case 13: + nhctx.positiveCounts, ok = fc.UnpackDoubles(nhctx.positiveCounts) + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read positive_counts") + } + // case 14: reset_hint exposes extra reset info for query + case 15: + nhctx.timestamp, ok = fc.Int64() + if !ok { + return tss, labelsPool, samplesPool, fmt.Errorf("cannot read timestamp") + } + // case 16: custom_values — internal OTel→Prom only, skip + } + } + tss, labelsPool, samplesPool = nhctx.appendTimeSeries(tss, baseLabels, labelsPool, samplesPool, fb) + + return tss, labelsPool, samplesPool, nil +} + +func appendBucketSpan(spans []bucketSpan, src []byte) ([]bucketSpan, error) { + // message BucketSpan { + // sint32 offset = 1; // gap to previous span, or index of first bucket for the first span + // uint32 length = 2; // number of consecutive buckets in this span + // } + if len(spans) < cap(spans) { + spans = spans[:len(spans)+1] + } else { + spans = append(spans, bucketSpan{}) + } + span := &spans[len(spans)-1] + var err error + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return spans, fmt.Errorf("cannot read next field: %w", err) + } + var ok bool + switch fc.FieldNum { + case 1: + span.offset, ok = fc.Sint32() + if !ok { + return spans, fmt.Errorf("cannot read offset") + } + case 2: + span.length, ok = fc.Uint32() + if !ok { + return spans, fmt.Errorf("cannot read length") } } } - ts.Labels = labelsPool[labelsPoolLen:] - ts.Samples = samplesPool[samplesPoolLen:] - return labelsPool, samplesPool, nil + return spans, nil } +// appendTimeSeries converts the parsed native histogram into _count, _sum and _bucket +// TimeSeries and appends them to tss. +// See https://prometheus.io/docs/specs/native_histograms/#data-model +func (nhctx *nativeHistogramContext) appendTimeSeries(tss []TimeSeries, baseLabels []Label, labelsPool []Label, samplesPool []Sample, fb *fmtBuffer) ([]TimeSeries, []Label, []Sample) { + tsMillis := nhctx.timestamp + + count := float64(nhctx.countInt) + if nhctx.isCountFloat { + count = nhctx.countFloat + } + + var baseName string + var nameValueP *string + for i := range baseLabels { + if baseLabels[i].Name == "__name__" { + baseName = baseLabels[i].Value + nameValueP = &baseLabels[i].Value + break + } + } + // metric have no name, skip it + if baseName == "" { + return tss, labelsPool, samplesPool + } + + *nameValueP = fb.formatName(baseName, "_count") + tss, labelsPool, samplesPool = appendHistogramSeries(tss, labelsPool, samplesPool, baseLabels, "", tsMillis, count) + *nameValueP = fb.formatName(baseName, "_sum") + tss, labelsPool, samplesPool = appendHistogramSeries(tss, labelsPool, samplesPool, baseLabels, "", tsMillis, nhctx.sum) + + *nameValueP = fb.formatName(baseName, "_bucket") + zeroCount := float64(nhctx.zeroCountInt) + if nhctx.isZeroCountFloat { + zeroCount = nhctx.zeroCountFloat + } + if zeroCount > 0 { + vmrange := fb.formatVmrange(-nhctx.zeroThreshold, nhctx.zeroThreshold) + tss, labelsPool, samplesPool = appendHistogramSeries(tss, labelsPool, samplesPool, baseLabels, vmrange, tsMillis, zeroCount) + } + + ratio := math.Pow(2, -float64(nhctx.schema)) + base := math.Pow(2, ratio) + + tss, labelsPool, samplesPool = appendSpanBuckets(tss, labelsPool, samplesPool, baseLabels, fb, nhctx.positiveSpans, nhctx.positiveDeltas, nhctx.positiveCounts, base, false, tsMillis) + tss, labelsPool, samplesPool = appendSpanBuckets(tss, labelsPool, samplesPool, baseLabels, fb, nhctx.negativeSpans, nhctx.negativeDeltas, nhctx.negativeCounts, base, true, tsMillis) + + return tss, labelsPool, samplesPool +} + +// Bucket counts are stored either in deltas or floatCounts. +// deltas is used for regular histograms with integer counts, storing cumulative deltas; +// floatCounts is used for float histograms, storing absolute counts. +func appendSpanBuckets( + tss []TimeSeries, + labelsPool []Label, + samplesPool []Sample, + baseLabels []Label, + fb *fmtBuffer, + spans []bucketSpan, + deltas []int64, + floatCounts []float64, + base float64, + negative bool, + tsMillis int64, +) ([]TimeSeries, []Label, []Sample) { + useFloatCounts := len(floatCounts) > 0 + var bucketIdx int32 + var deltaIdx, floatIdx int + var cumDelta int64 + + for _, span := range spans { + bucketIdx += span.offset + for i := uint32(0); i < span.length; i++ { + var bucketCount float64 + if useFloatCounts { + if floatIdx >= len(floatCounts) { + return tss, labelsPool, samplesPool + } + bucketCount = floatCounts[floatIdx] + floatIdx++ + } else { + if deltaIdx >= len(deltas) { + return tss, labelsPool, samplesPool + } + cumDelta += deltas[deltaIdx] + deltaIdx++ + bucketCount = float64(cumDelta) + } + + if bucketCount > 0 { + upper := math.Pow(base, float64(bucketIdx)) + lower := upper / base + if negative { + lower, upper = -upper, -lower + } + vmrange := fb.formatVmrange(lower, upper) + tss, labelsPool, samplesPool = appendHistogramSeries(tss, labelsPool, samplesPool, baseLabels, vmrange, tsMillis, bucketCount) + } + bucketIdx++ + } + } + return tss, labelsPool, samplesPool +} + +func appendHistogramSeries(tss []TimeSeries, labelsPool []Label, samplesPool []Sample, baseLabels []Label, vmrange string, + tsMillis int64, value float64, +) ([]TimeSeries, []Label, []Sample) { + labelsStart := len(labelsPool) + for _, l := range baseLabels { + if len(labelsPool) < cap(labelsPool) { + labelsPool = labelsPool[:len(labelsPool)+1] + } else { + labelsPool = append(labelsPool, Label{}) + } + labelsPool[len(labelsPool)-1] = l + } + if vmrange != "" { + if len(labelsPool) < cap(labelsPool) { + labelsPool = labelsPool[:len(labelsPool)+1] + } else { + labelsPool = append(labelsPool, Label{}) + } + l := &labelsPool[len(labelsPool)-1] + l.Name = "vmrange" + l.Value = vmrange + } + labels := labelsPool[labelsStart:len(labelsPool):len(labelsPool)] + + if len(samplesPool) < cap(samplesPool) { + samplesPool = samplesPool[:len(samplesPool)+1] + } else { + samplesPool = append(samplesPool, Sample{}) + } + s := &samplesPool[len(samplesPool)-1] + s.Value = value + s.Timestamp = tsMillis + samples := samplesPool[len(samplesPool)-1 : len(samplesPool) : len(samplesPool)] + + return appendTimeSeries(tss, labels, samples), labelsPool, samplesPool +} + +type bucketSpan struct { + offset int32 + length uint32 +} + +type nativeHistogramContext struct { + isCountFloat bool + countInt uint64 + countFloat float64 + sum float64 + schema int32 + zeroThreshold float64 + isZeroCountFloat bool + zeroCountInt uint64 + zeroCountFloat float64 + timestamp int64 + negativeSpans []bucketSpan + negativeDeltas []int64 + negativeCounts []float64 + positiveSpans []bucketSpan + positiveDeltas []int64 + positiveCounts []float64 +} + +func (nhctx *nativeHistogramContext) reset() { + nhctx.isCountFloat = false + nhctx.countInt = 0 + nhctx.countFloat = 0 + nhctx.sum = 0 + nhctx.schema = 0 + nhctx.zeroThreshold = 0 + nhctx.isZeroCountFloat = false + nhctx.zeroCountInt = 0 + nhctx.zeroCountFloat = 0 + nhctx.timestamp = 0 + nhctx.negativeSpans = nhctx.negativeSpans[:0] + nhctx.negativeDeltas = nhctx.negativeDeltas[:0] + nhctx.negativeCounts = nhctx.negativeCounts[:0] + nhctx.positiveSpans = nhctx.positiveSpans[:0] + nhctx.positiveDeltas = nhctx.positiveDeltas[:0] + nhctx.positiveCounts = nhctx.positiveCounts[:0] +} + +func getNativeHistogramContext() *nativeHistogramContext { + v := nhctxPool.Get() + if v == nil { + return &nativeHistogramContext{} + } + return v.(*nativeHistogramContext) +} + +func putNativeHistogramContext(nhctx *nativeHistogramContext) { + nhctx.reset() + nhctxPool.Put(nhctx) +} + +var nhctxPool sync.Pool + func (lbl *Label) unmarshalProtobuf(src []byte) (err error) { // message Label { // string name = 1; diff --git a/lib/prompb/write_request_unmarshaler_test.go b/lib/prompb/write_request_unmarshaler_test.go new file mode 100644 index 0000000000..33c1248905 --- /dev/null +++ b/lib/prompb/write_request_unmarshaler_test.go @@ -0,0 +1,348 @@ +package prompb + +import ( + "encoding/binary" + "math" + "reflect" + "testing" +) + +func TestUnmarshalTimeSeries(t *testing.T) { + f := func(src []byte, wantTSS []TimeSeries) { + t.Helper() + + var tss []TimeSeries + var err error + + tss, _, _, err = unmarshalTimeSeries(src, tss, nil, nil, &fmtBuffer{}) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !reflect.DeepEqual(tss, wantTSS) { + t.Fatalf("unexpected result\ngot:\n%v\nwant:\n%v", tss, wantTSS) + } + } + + // classic time series with samples, no histogram + { + src := encodeTimeSeries( + []Label{{Name: "__name__", Value: "rpc_latency_seconds"}, {Name: "job", Value: "node-exporter"}}, + []Sample{{Value: 1.5, Timestamp: 5000}}, + nil, + ) + f(src, []TimeSeries{ + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds"}, {Name: "job", Value: "node-exporter"}}, + Samples: []Sample{{Value: 1.5, Timestamp: 5000}}, + }, + }) + } + + // basic positive histogram + { + nativeHistogramC := nativeHistogramContext{ + countInt: 13, + sum: 175.5, + schema: 0, + zeroThreshold: 0.00001, + zeroCountInt: 2, + positiveSpans: []bucketSpan{{offset: 0, length: 4}, {offset: 2, length: 1}}, + positiveDeltas: []int64{2, -1, 2, -1, 1}, + timestamp: 1000, + } + histogram := encodeHistogram(nativeHistogramC) + src := encodeTimeSeries( + []Label{{Name: "__name__", Value: "rpc_latency_seconds"}, {Name: "job", Value: "node-exporter"}}, + nil, + [][]byte{histogram}, + ) + f(src, []TimeSeries{ + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_count"}, {Name: "job", Value: "node-exporter"}}, + Samples: []Sample{{Value: 13, Timestamp: 1000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_sum"}, {Name: "job", Value: "node-exporter"}}, + Samples: []Sample{{Value: 175.5, Timestamp: 1000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(-0.00001, 0.00001)}}, + Samples: []Sample{{Value: 2, Timestamp: 1000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(0.5, 1)}}, + Samples: []Sample{{Value: 2, Timestamp: 1000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(1, 2)}}, + Samples: []Sample{{Value: 1, Timestamp: 1000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(2, 4)}}, + Samples: []Sample{{Value: 3, Timestamp: 1000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(4, 8)}}, + Samples: []Sample{{Value: 2, Timestamp: 1000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(32, 64)}}, + Samples: []Sample{{Value: 3, Timestamp: 1000}}, + }, + }) + } + + // basic negative histogram + { + nativeHistogramC := nativeHistogramContext{ + countInt: 7, + sum: -15.0, + schema: 0, + timestamp: 2000, + negativeSpans: []bucketSpan{{offset: 1, length: 2}}, + negativeDeltas: []int64{3, 1}, + } + histogram := encodeHistogram(nativeHistogramC) + src := encodeTimeSeries( + []Label{{Name: "__name__", Value: "rpc_latency_seconds"}}, + nil, + [][]byte{histogram}, + ) + f(src, []TimeSeries{ + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_count"}}, + Samples: []Sample{{Value: 7, Timestamp: 2000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_sum"}}, + Samples: []Sample{{Value: -15.0, Timestamp: 2000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(-2, -1)}}, + Samples: []Sample{{Value: 3, Timestamp: 2000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(-4, -2)}}, + Samples: []Sample{{Value: 4, Timestamp: 2000}}, + }, + }) + } + + // float histogram + { + nativeHistogramC := nativeHistogramContext{ + countInt: 0, + isCountFloat: true, + countFloat: 2.5, + sum: 1.0, + schema: 1, + zeroThreshold: 0.00001, + isZeroCountFloat: true, + zeroCountFloat: 0.5, + timestamp: 3000, + positiveSpans: []bucketSpan{{offset: 0, length: 2}}, + positiveCounts: []float64{1.5, 1.0}, + } + histogram := encodeHistogram(nativeHistogramC) + src := encodeTimeSeries( + []Label{{Name: "__name__", Value: "rpc_latency_seconds"}}, + nil, + [][]byte{histogram}, + ) + f(src, []TimeSeries{ + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_count"}}, + Samples: []Sample{{Value: 2.5, Timestamp: 3000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_sum"}}, + Samples: []Sample{{Value: 1.0, Timestamp: 3000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(-0.00001, 0.00001)}}, + Samples: []Sample{{Value: 0.5, Timestamp: 3000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(0.7071, 1)}}, + Samples: []Sample{{Value: 1.5, Timestamp: 3000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(1, 1.414)}}, + Samples: []Sample{{Value: 1.0, Timestamp: 3000}}, + }, + }) + } + + // count-only histogram: no buckets, just count and sum + { + histogram := encodeHistogram(nativeHistogramContext{ + countInt: 10, + sum: 42.0, + schema: 3, + timestamp: 4000, + }) + src := encodeTimeSeries( + []Label{{Name: "__name__", Value: "rpc_latency_seconds"}}, + nil, + [][]byte{histogram}, + ) + f(src, []TimeSeries{ + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_count"}}, + Samples: []Sample{{Value: 10, Timestamp: 4000}}, + }, + { + Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_sum"}}, + Samples: []Sample{{Value: 42.0, Timestamp: 4000}}, + }, + }) + } +} + +func encodeTimeSeries(labels []Label, samples []Sample, histograms [][]byte) []byte { + var dst []byte + for _, l := range labels { + msg := pbEncodeLabel(l.Name, l.Value) + dst = pbAppendBytes(dst, 1, msg) + } + for _, s := range samples { + msg := pbEncodeSample(s.Value, s.Timestamp) + dst = pbAppendBytes(dst, 2, msg) + } + for _, h := range histograms { + dst = pbAppendBytes(dst, 4, h) + } + return dst +} + +func encodeHistogram(h nativeHistogramContext) []byte { + var dst []byte + + dst = pbAppendVarint(dst, 1, h.countInt) + if h.isCountFloat { + dst = pbAppendDouble(dst, 2, h.countFloat) + } + if h.sum != 0 { + dst = pbAppendDouble(dst, 3, h.sum) + } + if h.schema != 0 { + dst = pbAppendSint32(dst, 4, h.schema) + } + if h.zeroThreshold != 0 { + dst = pbAppendDouble(dst, 5, h.zeroThreshold) + } + dst = pbAppendVarint(dst, 6, h.zeroCountInt) + if h.isZeroCountFloat { + dst = pbAppendDouble(dst, 7, h.zeroCountFloat) + } + for _, span := range h.negativeSpans { + dst = pbAppendBytes(dst, 8, pbEncodeBucketSpan(span)) + } + if len(h.negativeDeltas) > 0 { + dst = pbAppendBytes(dst, 9, pbPackSint64s(h.negativeDeltas)) + } + if len(h.negativeCounts) > 0 { + dst = pbAppendBytes(dst, 10, pbPackDoubles(h.negativeCounts)) + } + for _, span := range h.positiveSpans { + dst = pbAppendBytes(dst, 11, pbEncodeBucketSpan(span)) + } + if len(h.positiveDeltas) > 0 { + dst = pbAppendBytes(dst, 12, pbPackSint64s(h.positiveDeltas)) + } + if len(h.positiveCounts) > 0 { + dst = pbAppendBytes(dst, 13, pbPackDoubles(h.positiveCounts)) + } + if h.timestamp != 0 { + dst = pbAppendVarint(dst, 15, uint64(h.timestamp)) + } + return dst +} + +func pbEncodeLabel(name, value string) []byte { + var dst []byte + dst = pbAppendBytes(dst, 1, []byte(name)) + dst = pbAppendBytes(dst, 2, []byte(value)) + return dst +} + +func pbEncodeSample(value float64, timestamp int64) []byte { + var dst []byte + if value != 0 { + dst = pbAppendDouble(dst, 1, value) + } + if timestamp != 0 { + dst = pbAppendVarint(dst, 2, uint64(timestamp)) + } + return dst +} + +func pbEncodeBucketSpan(span bucketSpan) []byte { + var dst []byte + if span.offset != 0 { + dst = pbAppendSint32(dst, 1, span.offset) + } + if span.length != 0 { + dst = pbAppendVarint(dst, 2, uint64(span.length)) + } + return dst +} + +func pbAppendVarint(dst []byte, field uint32, v uint64) []byte { + dst = appendProtoVarint(dst, uint64(field<<3)) + dst = appendProtoVarint(dst, v) + return dst +} + +func pbAppendDouble(dst []byte, field uint32, v float64) []byte { + dst = appendProtoVarint(dst, uint64(field<<3|1)) + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], math.Float64bits(v)) + dst = append(dst, buf[:]...) + return dst +} + +func pbAppendSint32(dst []byte, field uint32, v int32) []byte { + dst = appendProtoVarint(dst, uint64(field<<3)) + dst = appendProtoVarint(dst, uint64((uint32(v)<<1)^uint32(v>>31))) + return dst +} + +func pbAppendBytes(dst []byte, field uint32, data []byte) []byte { + dst = appendProtoVarint(dst, uint64(field<<3|2)) + dst = appendProtoVarint(dst, uint64(len(data))) + dst = append(dst, data...) + return dst +} + +func pbPackSint64s(values []int64) []byte { + var dst []byte + for _, v := range values { + dst = appendProtoVarint(dst, uint64((uint64(v)<<1)^uint64(v>>63))) + } + return dst +} + +func pbPackDoubles(values []float64) []byte { + var dst []byte + for _, v := range values { + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], math.Float64bits(v)) + dst = append(dst, buf[:]...) + } + return dst +} + +func appendProtoVarint(dst []byte, v uint64) []byte { + for v >= 0x80 { + dst = append(dst, byte(v)|0x80) + v >>= 7 + } + dst = append(dst, byte(v)) + return dst +} + +func appendVmrangeHelper(lower float64, upper float64) string { + var fb fmtBuffer + return fb.formatVmrange(lower, upper) +} diff --git a/lib/prompb/write_request_unmarshaler_timing_test.go b/lib/prompb/write_request_unmarshaler_timing_test.go new file mode 100644 index 0000000000..3cb4233f33 --- /dev/null +++ b/lib/prompb/write_request_unmarshaler_timing_test.go @@ -0,0 +1,97 @@ +package prompb + +import ( + "fmt" + "testing" +) + +func BenchmarkUnmarshalProtobuf_ClassicSamples(b *testing.B) { + var src []byte + for i := 0; i < 1000; i++ { + ts := encodeTimeSeries( + []Label{ + { + Name: "__name__", + Value: "process_cpu_seconds_total", + }, + { + Name: "instance", + Value: fmt.Sprintf("host-%d:4567", i), + }, + { + Name: "job", + Value: "node-exporter", + }, + { + Name: "pod", + Value: "foo-bar-pod-8983423843", + }, + { + Name: "cpu", + Value: "1", + }, + { + Name: "mode", + Value: "system", + }, + { + Name: "node", + Value: "host-123", + }, + { + Name: "namespace", + Value: "foo-bar-baz", + }, + { + Name: "container", + Value: fmt.Sprintf("aaa-bb-cc-dd-ee-%d", i), + }, + }, + []Sample{{Value: float64(i), Timestamp: int64(i * 1000)}}, + nil, + ) + src = pbAppendBytes(src, 1, ts) + } + + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + wru := GetWriteRequestUnmarshaler() + if _, err := wru.UnmarshalProtobuf(src); err != nil { + b.Fatal(err) + } + PutWriteRequestUnmarshaler(wru) + } +} + +func BenchmarkUnmarshalProtobuf_NativeHistogram(b *testing.B) { + var src []byte + histogram := encodeHistogram(nativeHistogramContext{ + countInt: 1072, + sum: 1750000.5, + schema: 0, + zeroThreshold: 0.00001, + zeroCountInt: 2, + positiveSpans: []bucketSpan{{offset: 0, length: 10}, {offset: 2, length: 10}}, + positiveDeltas: []int64{2, -1, 2, -1, 1, 1, 1, 100, 1, 1, 2, -1, -80, -1, 1, 1, 100, 1, 1, 1}, + timestamp: 1000, + }) + for i := 0; i < 100; i++ { + ts := encodeTimeSeries( + []Label{{Name: "__name__", Value: "rpc_latency_seconds"}, {Name: "job", Value: "node-exporter"}}, + nil, + [][]byte{histogram}, + ) + src = pbAppendBytes(src, 1, ts) + } + + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + wru := GetWriteRequestUnmarshaler() + if _, err := wru.UnmarshalProtobuf(src); err != nil { + b.Fatal(err) + } + PutWriteRequestUnmarshaler(wru) + } +}