lib/prompb: support prometheus native histogram during ingestion

This commit adds support for ingesting [Prometheus Native Histogram](https://prometheus.io/docs/specs/native_histograms/) data via the Prometheus Remote Write protocol. Native histograms are converted into the VictoriaMetrics histogram format during ingestion.

fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10743
This commit is contained in:
Hui Wang
2026-05-08 01:06:51 +08:00
committed by GitHub
parent a13bfb3aaa
commit 76e0bcdf45
8 changed files with 928 additions and 21 deletions

View File

@@ -103,7 +103,10 @@ func TestClient_run_maxBatchSizeDuringShutdown(t *testing.T) {
// push time series to the client.
for range pushCnt {
if err = rwClient.Push(prompb.TimeSeries{}); err != nil {
if err = rwClient.Push(prompb.TimeSeries{
Labels: []prompb.Label{{Name: "__name__", Value: "m"}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 1000}},
}); err != nil {
t.Fatalf("cannot time series to the client: %s", err)
}
}

View File

@@ -35,6 +35,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): add `__meta_hetzner_robot_datacenter` label for `robot` role in [hetzner_sd_configs](https://docs.victoriametrics.com/victoriametrics/sd_configs/#hetzner_sd_configs). See [#10909](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10909). Thanks to @juliusrickert for contribution.
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `-rule.stripFilePath` to support stripping rule file paths in logs and all API responses, including /metrics. See [#5625](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5625).
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `formatTime` template function for formatting a Unix timestamp using the provided layout. For example, `{{ now | formatTime "2006-01-02T15:04:05Z07:00" }}` returns the current time in RFC3339 format. See issue [#10624](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10624). Thanks to @andriibeee for the contribution.
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add support for [Prometheus native histogram](https://prometheus.io/docs/specs/native_histograms/) during ingestion. See [#10743](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10743).
## [v1.142.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.142.0)

View File

@@ -68,3 +68,15 @@ since previous versions may have issues with `remote_write`.
Take a look at [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/),
which can be used as faster and less resource-hungry alternative to Prometheus.
## Native histograms
Prometheus [native histogram](https://prometheus.io/docs/specs/native_histograms/) is automatically converted
to [VictoriaMetrics histogram format](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) with `vmrange` labels during ingestion.
> After conversion, a native histogram is transformed into classic histograms with `_count`, `_sum`, and `_bucket` series. These series can be queried using standard histogram functions such as `histogram_quantile()`.
## Remote Write 2.0
[Prometheus Remote-Write 2.0](https://prometheus.io/docs/specs/prw/remote_write_spec_2_0/) is still marked as experimental and is not currently supported by VictoriaMetrics.

36
lib/prompb/fmt_buffer.go Normal file
View File

@@ -0,0 +1,36 @@
package prompb
import (
"strconv"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// fmtBuffer is a scratch byte buffer for building metric names and vmrange
// label values without per-call allocations.
//
// Strings returned by its format* methods alias buf, so they remain valid
// only until the next reset() call.
type fmtBuffer struct {
	buf []byte
}
// reset truncates the buffer while keeping its capacity for re-use.
// Strings previously returned by formatName / formatVmrange must not be used afterwards.
func (fb *fmtBuffer) reset() {
	fb.buf = fb.buf[:0]
}
// formatName returns the concatenation prefix+suffix.
//
// The result is backed by fb's internal buffer, so it stays valid only until
// the next fb.reset() call.
func (fb *fmtBuffer) formatName(prefix, suffix string) string {
	// Without a prefix the suffix already is the final name - avoid copying it.
	if prefix == "" {
		return suffix
	}
	start := len(fb.buf)
	b := append(fb.buf, prefix...)
	b = append(b, suffix...)
	fb.buf = b
	return bytesutil.ToUnsafeString(b[start:])
}
// formatVmrange returns the `start...end` vmrange label value, with both
// bounds rendered in scientific notation with 3 fractional digits.
//
// The result is backed by fb's internal buffer, so it stays valid only until
// the next fb.reset() call.
func (fb *fmtBuffer) formatVmrange(start, end float64) string {
	pos := len(fb.buf)
	b := strconv.AppendFloat(fb.buf, start, 'e', 3, 64)
	b = append(b, "..."...)
	b = strconv.AppendFloat(b, end, 'e', 3, 64)
	fb.buf = b
	return bytesutil.ToUnsafeString(b[pos:])
}

View File

@@ -50,6 +50,12 @@ func TestWriteRequestMarshalUnmarshal(t *testing.T) {
Value: "node-exporter",
},
},
Samples: []prompb.Sample{
{
Value: 1,
Timestamp: 1000,
},
},
},
},
})

View File

@@ -2,6 +2,7 @@ package prompb
import (
"fmt"
"math"
"sync"
"github.com/VictoriaMetrics/easyproto"
@@ -37,6 +38,7 @@ type WriteRequestUnmarshaler struct {
labelsPool []Label
samplesPool []Sample
fb fmtBuffer
}
// Reset resets wru, so it could be re-used.
@@ -48,6 +50,8 @@ func (wru *WriteRequestUnmarshaler) Reset() {
clear(wru.samplesPool)
wru.samplesPool = wru.samplesPool[:0]
wru.fb.reset()
}
// UnmarshalProtobuf parses the given Protobuf-encoded `src` into an internal WriteRequest instance and returns a pointer to it.
@@ -85,13 +89,7 @@ func (wru *WriteRequestUnmarshaler) UnmarshalProtobuf(src []byte) (*WriteRequest
if !ok {
return nil, fmt.Errorf("cannot read timeseries data")
}
if len(tss) < cap(tss) {
tss = tss[:len(tss)+1]
} else {
tss = append(tss, TimeSeries{})
}
ts := &tss[len(tss)-1]
labelsPool, samplesPool, err = ts.unmarshalProtobuf(data, labelsPool, samplesPool)
tss, labelsPool, samplesPool, err = unmarshalTimeSeries(data, tss, labelsPool, samplesPool, &wru.fb)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal timeseries: %w", err)
}
@@ -119,25 +117,31 @@ func (wru *WriteRequestUnmarshaler) UnmarshalProtobuf(src []byte) (*WriteRequest
return &wru.wr, nil
}
func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesPool []Sample) ([]Label, []Sample, error) {
// unmarshalTimeSeries unmarshals TimeSeries messages, which can specify either samples or native histogram samples, but not both.
// See https://github.com/prometheus/prometheus/blob/9a3ac8910b0476d0d73a5c36a54c55baec5829b6/prompb/types.proto#L133
func unmarshalTimeSeries(src []byte, tss []TimeSeries, labelsPool []Label, samplesPool []Sample, fb *fmtBuffer) ([]TimeSeries, []Label, []Sample, error) {
labelsPoolLen := len(labelsPool)
samplesPoolLen := len(samplesPool)
var histograms [][]byte
var fc easyproto.FieldContext
var err error
// message TimeSeries {
// repeated Label labels = 1;
// repeated Sample samples = 2;
// repeated Histogram histograms = 4
// }
labelsPoolLen := len(labelsPool)
samplesPoolLen := len(samplesPool)
var fc easyproto.FieldContext
for len(src) > 0 {
var err error
src, err = fc.NextField(src)
if err != nil {
return labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err)
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err)
}
switch fc.FieldNum {
case 1:
data, ok := fc.MessageData()
if !ok {
return labelsPool, samplesPool, fmt.Errorf("cannot read label data")
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read label data")
}
if len(labelsPool) < cap(labelsPool) {
labelsPool = labelsPool[:len(labelsPool)+1]
@@ -146,12 +150,12 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesP
}
label := &labelsPool[len(labelsPool)-1]
if err := label.unmarshalProtobuf(data); err != nil {
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err)
return tss, labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err)
}
case 2:
data, ok := fc.MessageData()
if !ok {
return labelsPool, samplesPool, fmt.Errorf("cannot read the sample data")
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read sample data")
}
if len(samplesPool) < cap(samplesPool) {
samplesPool = samplesPool[:len(samplesPool)+1]
@@ -160,15 +164,415 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesP
}
sample := &samplesPool[len(samplesPool)-1]
if err := sample.unmarshalProtobuf(data); err != nil {
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err)
return tss, labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err)
}
case 4:
data, ok := fc.MessageData()
if !ok {
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read native histogram data")
}
histograms = append(histograms, data)
}
}
baseLabels := labelsPool[labelsPoolLen:len(labelsPool):len(labelsPool)]
samples := samplesPool[samplesPoolLen:len(samplesPool):len(samplesPool)]
if len(samples) > 0 && len(histograms) > 0 {
return tss, labelsPool, samplesPool, fmt.Errorf("cannot have both samples and native histograms in the same TimeSeries")
}
// classic series with normal samples
if len(samples) > 0 {
tss = appendTimeSeries(tss, baseLabels, samples)
return tss, labelsPool, samplesPool, nil
}
for _, hdata := range histograms {
tss, labelsPool, samplesPool, err = unmarshalHistogram(hdata, tss, labelsPool, samplesPool, baseLabels, fb)
if err != nil {
return tss, labelsPool, samplesPool, fmt.Errorf("failed to unmarshal native histogram: %w", err)
}
}
return tss, labelsPool, samplesPool, nil
}
// appendTimeSeries appends a TimeSeries with the given labels and samples to tss,
// re-using spare capacity in tss when available in order to reduce allocations.
func appendTimeSeries(tss []TimeSeries, labels []Label, samples []Sample) []TimeSeries {
	if cap(tss) > len(tss) {
		tss = tss[:len(tss)+1]
	} else {
		tss = append(tss, TimeSeries{})
	}
	last := &tss[len(tss)-1]
	last.Labels = labels
	last.Samples = samples
	return tss
}
// unmarshalHistogram parses a single native histogram Histogram message from src
// and appends the equivalent VictoriaMetrics series (_count, _sum and per-bucket
// _bucket series with vmrange labels) to tss via nhctx.appendTimeSeries.
// baseLabels are the labels of the enclosing TimeSeries.
func unmarshalHistogram(src []byte, tss []TimeSeries, labelsPool []Label, samplesPool []Sample, baseLabels []Label, fb *fmtBuffer) ([]TimeSeries, []Label, []Sample, error) {
	// see https://github.com/prometheus/prometheus/blob/9a3ac8910b0476d0d73a5c36a54c55baec5829b6/prompb/types.proto#L57
	// message Histogram {
	//   oneof count { // Count of observations in the histogram.
	//     uint64 count_int = 1;
	//     double count_float = 2;
	//   }
	//   double sum = 3; // Sum of observations in the histogram.
	//   sint32 schema = 4;
	//   double zero_threshold = 5; // Breadth of the zero bucket.
	//   oneof zero_count { // Count in zero bucket.
	//     uint64 zero_count_int = 6;
	//     double zero_count_float = 7;
	//   }
	//   repeated BucketSpan negative_spans = 8 [(gogoproto.nullable) = false];
	//   repeated sint64 negative_deltas = 9; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
	//   repeated double negative_counts = 10; // Absolute count of each bucket.
	//   repeated BucketSpan positive_spans = 11 [(gogoproto.nullable) = false];
	//   repeated sint64 positive_deltas = 12; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
	//   repeated double positive_counts = 13; // Absolute count of each bucket.
	//   ResetHint reset_hint = 14;
	//   int64 timestamp = 15;
	//   repeated double custom_values = 16;
	// }
	//
	// The decoded fields are accumulated in a pooled nativeHistogramContext.
	nhctx := getNativeHistogramContext()
	defer putNativeHistogramContext(nhctx)
	var err error
	var fc easyproto.FieldContext
	for len(src) > 0 {
		src, err = fc.NextField(src)
		if err != nil {
			return tss, labelsPool, samplesPool, fmt.Errorf("cannot read next field: %w", err)
		}
		var ok bool
		switch fc.FieldNum {
		case 1:
			nhctx.countInt, ok = fc.Uint64()
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read count_int")
			}
		case 2:
			nhctx.countFloat, ok = fc.Double()
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read count_float")
			}
			// Remember that the float variant of the `count` oneof was set.
			nhctx.isCountFloat = true
		case 3:
			nhctx.sum, ok = fc.Double()
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read sum")
			}
		case 4:
			nhctx.schema, ok = fc.Sint32()
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read schema")
			}
		case 5:
			nhctx.zeroThreshold, ok = fc.Double()
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read zero_threshold")
			}
		case 6:
			nhctx.zeroCountInt, ok = fc.Uint64()
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read zero_count_int")
			}
		case 7:
			nhctx.zeroCountFloat, ok = fc.Double()
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read zero_count_float")
			}
			// Remember that the float variant of the `zero_count` oneof was set.
			nhctx.isZeroCountFloat = true
		case 8:
			data, ok := fc.MessageData()
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read negative_spans")
			}
			nhctx.negativeSpans, err = appendBucketSpan(nhctx.negativeSpans, data)
			if err != nil {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot decode negative_spans: %w", err)
			}
		case 9:
			nhctx.negativeDeltas, ok = fc.UnpackSint64s(nhctx.negativeDeltas)
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read negative_deltas")
			}
		case 10:
			nhctx.negativeCounts, ok = fc.UnpackDoubles(nhctx.negativeCounts)
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read negative_counts")
			}
		case 11:
			data, ok := fc.MessageData()
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read positive_spans")
			}
			nhctx.positiveSpans, err = appendBucketSpan(nhctx.positiveSpans, data)
			if err != nil {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot decode positive_spans: %w", err)
			}
		case 12:
			nhctx.positiveDeltas, ok = fc.UnpackSint64s(nhctx.positiveDeltas)
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read positive_deltas")
			}
		case 13:
			nhctx.positiveCounts, ok = fc.UnpackDoubles(nhctx.positiveCounts)
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read positive_counts")
			}
		// case 14: reset_hint exposes extra reset info for query; intentionally skipped here
		case 15:
			nhctx.timestamp, ok = fc.Int64()
			if !ok {
				return tss, labelsPool, samplesPool, fmt.Errorf("cannot read timestamp")
			}
			// case 16: custom_values — internal OTel→Prom only, skip
		}
	}
	// Convert the accumulated histogram fields into _count/_sum/_bucket series.
	tss, labelsPool, samplesPool = nhctx.appendTimeSeries(tss, baseLabels, labelsPool, samplesPool, fb)
	return tss, labelsPool, samplesPool, nil
}
// appendBucketSpan decodes a single BucketSpan message from src and appends it to spans.
//
// message BucketSpan {
//   sint32 offset = 1; // gap to previous span, or index of first bucket for the first span
//   uint32 length = 2; // number of consecutive buckets in this span
// }
//
// Fix: removed stale diff-residue lines (`ts.Labels = ...` / the old 3-value return)
// that were left over from the pre-refactoring unmarshalProtobuf body and made
// this function invalid Go.
func appendBucketSpan(spans []bucketSpan, src []byte) ([]bucketSpan, error) {
	// Re-use a spare slot in spans when possible in order to reduce allocations.
	if len(spans) < cap(spans) {
		spans = spans[:len(spans)+1]
	} else {
		spans = append(spans, bucketSpan{})
	}
	span := &spans[len(spans)-1]
	var err error
	var fc easyproto.FieldContext
	for len(src) > 0 {
		src, err = fc.NextField(src)
		if err != nil {
			return spans, fmt.Errorf("cannot read next field: %w", err)
		}
		var ok bool
		switch fc.FieldNum {
		case 1:
			span.offset, ok = fc.Sint32()
			if !ok {
				return spans, fmt.Errorf("cannot read offset")
			}
		case 2:
			span.length, ok = fc.Uint32()
			if !ok {
				return spans, fmt.Errorf("cannot read length")
			}
		}
	}
	return spans, nil
}
// appendTimeSeries converts the parsed native histogram into _count, _sum and _bucket
// TimeSeries and appends them to tss.
// See https://prometheus.io/docs/specs/native_histograms/#data-model
//
// The returned labelsPool and samplesPool may be grown and must be kept by the caller.
func (nhctx *nativeHistogramContext) appendTimeSeries(tss []TimeSeries, baseLabels []Label, labelsPool []Label, samplesPool []Sample, fb *fmtBuffer) ([]TimeSeries, []Label, []Sample) {
	tsMillis := nhctx.timestamp
	// The observation count comes from the count_int / count_float oneof.
	count := float64(nhctx.countInt)
	if nhctx.isCountFloat {
		count = nhctx.countFloat
	}
	// Locate the __name__ label. nameValueP points at its value so the metric name
	// can be rewritten in place for each generated series (_count, _sum, _bucket);
	// appendHistogramSeries copies the labels, so the in-place mutation is safe.
	var baseName string
	var nameValueP *string
	for i := range baseLabels {
		if baseLabels[i].Name == "__name__" {
			baseName = baseLabels[i].Value
			nameValueP = &baseLabels[i].Value
			break
		}
	}
	// The metric has no name - skip it.
	if baseName == "" {
		return tss, labelsPool, samplesPool
	}
	*nameValueP = fb.formatName(baseName, "_count")
	tss, labelsPool, samplesPool = appendHistogramSeries(tss, labelsPool, samplesPool, baseLabels, "", tsMillis, count)
	*nameValueP = fb.formatName(baseName, "_sum")
	tss, labelsPool, samplesPool = appendHistogramSeries(tss, labelsPool, samplesPool, baseLabels, "", tsMillis, nhctx.sum)
	*nameValueP = fb.formatName(baseName, "_bucket")
	// The zero-bucket count comes from the zero_count_int / zero_count_float oneof.
	zeroCount := float64(nhctx.zeroCountInt)
	if nhctx.isZeroCountFloat {
		zeroCount = nhctx.zeroCountFloat
	}
	// The zero bucket spans [-zeroThreshold ... zeroThreshold].
	if zeroCount > 0 {
		vmrange := fb.formatVmrange(-nhctx.zeroThreshold, nhctx.zeroThreshold)
		tss, labelsPool, samplesPool = appendHistogramSeries(tss, labelsPool, samplesPool, baseLabels, vmrange, tsMillis, zeroCount)
	}
	// Bucket boundaries are consecutive powers of base, where base = 2^(2^-schema).
	ratio := math.Pow(2, -float64(nhctx.schema))
	base := math.Pow(2, ratio)
	tss, labelsPool, samplesPool = appendSpanBuckets(tss, labelsPool, samplesPool, baseLabels, fb, nhctx.positiveSpans, nhctx.positiveDeltas, nhctx.positiveCounts, base, false, tsMillis)
	tss, labelsPool, samplesPool = appendSpanBuckets(tss, labelsPool, samplesPool, baseLabels, fb, nhctx.negativeSpans, nhctx.negativeDeltas, nhctx.negativeCounts, base, true, tsMillis)
	return tss, labelsPool, samplesPool
}
// appendSpanBuckets appends one _bucket TimeSeries (with a vmrange label) per
// non-empty bucket described by spans.
//
// Bucket counts are stored either in deltas or floatCounts.
// deltas is used for regular histograms with integer counts, storing cumulative deltas;
// floatCounts is used for float histograms, storing absolute counts.
//
// base is the bucket growth factor (2^(2^-schema)); when negative is true the
// bucket boundaries are mirrored to the negative axis.
func appendSpanBuckets(
	tss []TimeSeries,
	labelsPool []Label,
	samplesPool []Sample,
	baseLabels []Label,
	fb *fmtBuffer,
	spans []bucketSpan,
	deltas []int64,
	floatCounts []float64,
	base float64,
	negative bool,
	tsMillis int64,
) ([]TimeSeries, []Label, []Sample) {
	useFloatCounts := len(floatCounts) > 0
	// bucketIdx is the absolute bucket index; span offsets are relative gaps.
	var bucketIdx int32
	var deltaIdx, floatIdx int
	// cumDelta accumulates the integer deltas into the absolute bucket count.
	var cumDelta int64
	for _, span := range spans {
		bucketIdx += span.offset
		for i := uint32(0); i < span.length; i++ {
			var bucketCount float64
			if useFloatCounts {
				// Stop early if spans promise more buckets than counts were provided.
				if floatIdx >= len(floatCounts) {
					return tss, labelsPool, samplesPool
				}
				bucketCount = floatCounts[floatIdx]
				floatIdx++
			} else {
				if deltaIdx >= len(deltas) {
					return tss, labelsPool, samplesPool
				}
				cumDelta += deltas[deltaIdx]
				deltaIdx++
				bucketCount = float64(cumDelta)
			}
			// Only emit series for non-empty buckets.
			if bucketCount > 0 {
				// Bucket bucketIdx covers (base^(bucketIdx-1) ... base^bucketIdx].
				upper := math.Pow(base, float64(bucketIdx))
				lower := upper / base
				if negative {
					lower, upper = -upper, -lower
				}
				vmrange := fb.formatVmrange(lower, upper)
				tss, labelsPool, samplesPool = appendHistogramSeries(tss, labelsPool, samplesPool, baseLabels, vmrange, tsMillis, bucketCount)
			}
			bucketIdx++
		}
	}
	return tss, labelsPool, samplesPool
}
// appendHistogramSeries appends a single TimeSeries built from a copy of baseLabels
// (plus an optional vmrange label) and one sample (value, tsMillis) to tss.
//
// Labels and the sample are allocated from labelsPool / samplesPool in order to
// reduce allocations; the (possibly grown) pools are returned to the caller.
func appendHistogramSeries(tss []TimeSeries, labelsPool []Label, samplesPool []Sample, baseLabels []Label, vmrange string,
	tsMillis int64, value float64,
) ([]TimeSeries, []Label, []Sample) {
	labelsStart := len(labelsPool)
	// Copy baseLabels into the pool, re-using spare capacity when possible.
	for _, l := range baseLabels {
		if len(labelsPool) < cap(labelsPool) {
			labelsPool = labelsPool[:len(labelsPool)+1]
		} else {
			labelsPool = append(labelsPool, Label{})
		}
		labelsPool[len(labelsPool)-1] = l
	}
	// Append the vmrange label for _bucket series; _count and _sum pass "".
	if vmrange != "" {
		if len(labelsPool) < cap(labelsPool) {
			labelsPool = labelsPool[:len(labelsPool)+1]
		} else {
			labelsPool = append(labelsPool, Label{})
		}
		l := &labelsPool[len(labelsPool)-1]
		l.Name = "vmrange"
		l.Value = vmrange
	}
	// Three-index slicing caps the sub-slices so later pool appends cannot clobber them.
	labels := labelsPool[labelsStart:len(labelsPool):len(labelsPool)]
	if len(samplesPool) < cap(samplesPool) {
		samplesPool = samplesPool[:len(samplesPool)+1]
	} else {
		samplesPool = append(samplesPool, Sample{})
	}
	s := &samplesPool[len(samplesPool)-1]
	s.Value = value
	s.Timestamp = tsMillis
	samples := samplesPool[len(samplesPool)-1 : len(samplesPool) : len(samplesPool)]
	return appendTimeSeries(tss, labels, samples), labelsPool, samplesPool
}
// bucketSpan mirrors the Prometheus BucketSpan protobuf message:
// offset is the gap to the previous span (or the index of the first bucket
// for the first span); length is the number of consecutive buckets in the span.
type bucketSpan struct {
	offset int32
	length uint32
}
// nativeHistogramContext holds the decoded fields of a single Prometheus native
// histogram Histogram protobuf message. Instances are pooled via nhctxPool;
// see getNativeHistogramContext / putNativeHistogramContext.
type nativeHistogramContext struct {
	isCountFloat     bool    // true when the count_float oneof variant was set
	countInt         uint64  // total observation count (integer histograms)
	countFloat       float64 // total observation count (float histograms)
	sum              float64 // sum of all observations
	schema           int32   // resolution; bucket boundaries grow by 2^(2^-schema)
	zeroThreshold    float64 // breadth of the zero bucket
	isZeroCountFloat bool    // true when the zero_count_float oneof variant was set
	zeroCountInt     uint64  // zero-bucket count (integer histograms)
	zeroCountFloat   float64 // zero-bucket count (float histograms)
	timestamp        int64   // sample timestamp in milliseconds
	negativeSpans    []bucketSpan
	negativeDeltas   []int64 // cumulative bucket-count deltas (integer histograms)
	negativeCounts   []float64 // absolute bucket counts (float histograms)
	positiveSpans    []bucketSpan
	positiveDeltas   []int64
	positiveCounts   []float64
}
// reset zeroes all scalar fields and truncates the slice fields while keeping
// their capacity, so the context can be re-used without re-allocating.
func (nhctx *nativeHistogramContext) reset() {
	*nhctx = nativeHistogramContext{
		negativeSpans:  nhctx.negativeSpans[:0],
		negativeDeltas: nhctx.negativeDeltas[:0],
		negativeCounts: nhctx.negativeCounts[:0],
		positiveSpans:  nhctx.positiveSpans[:0],
		positiveDeltas: nhctx.positiveDeltas[:0],
		positiveCounts: nhctx.positiveCounts[:0],
	}
}
// getNativeHistogramContext obtains a nativeHistogramContext from nhctxPool,
// allocating a fresh one when the pool is empty.
func getNativeHistogramContext() *nativeHistogramContext {
	if v := nhctxPool.Get(); v != nil {
		return v.(*nativeHistogramContext)
	}
	return &nativeHistogramContext{}
}
// putNativeHistogramContext resets nhctx and returns it to nhctxPool for re-use.
// nhctx must not be used after this call.
func putNativeHistogramContext(nhctx *nativeHistogramContext) {
	nhctx.reset()
	nhctxPool.Put(nhctx)
}
// nhctxPool recycles nativeHistogramContext objects across histogram decodes.
var nhctxPool sync.Pool
func (lbl *Label) unmarshalProtobuf(src []byte) (err error) {
// message Label {
// string name = 1;

View File

@@ -0,0 +1,348 @@
package prompb
import (
"encoding/binary"
"math"
"reflect"
"testing"
)
// TestUnmarshalTimeSeries verifies that unmarshalTimeSeries decodes classic
// sample series unchanged and converts native histograms (integer, negative,
// float and count-only variants) into _count/_sum/_bucket series with vmrange labels.
func TestUnmarshalTimeSeries(t *testing.T) {
	// f decodes src and compares the resulting series against wantTSS.
	f := func(src []byte, wantTSS []TimeSeries) {
		t.Helper()
		var tss []TimeSeries
		var err error
		tss, _, _, err = unmarshalTimeSeries(src, tss, nil, nil, &fmtBuffer{})
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}
		if !reflect.DeepEqual(tss, wantTSS) {
			t.Fatalf("unexpected result\ngot:\n%v\nwant:\n%v", tss, wantTSS)
		}
	}
	// classic time series with samples, no histogram
	{
		src := encodeTimeSeries(
			[]Label{{Name: "__name__", Value: "rpc_latency_seconds"}, {Name: "job", Value: "node-exporter"}},
			[]Sample{{Value: 1.5, Timestamp: 5000}},
			nil,
		)
		f(src, []TimeSeries{
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds"}, {Name: "job", Value: "node-exporter"}},
				Samples: []Sample{{Value: 1.5, Timestamp: 5000}},
			},
		})
	}
	// basic positive histogram
	{
		nativeHistogramC := nativeHistogramContext{
			countInt:       13,
			sum:            175.5,
			schema:         0,
			zeroThreshold:  0.00001,
			zeroCountInt:   2,
			positiveSpans:  []bucketSpan{{offset: 0, length: 4}, {offset: 2, length: 1}},
			positiveDeltas: []int64{2, -1, 2, -1, 1},
			timestamp:      1000,
		}
		histogram := encodeHistogram(nativeHistogramC)
		src := encodeTimeSeries(
			[]Label{{Name: "__name__", Value: "rpc_latency_seconds"}, {Name: "job", Value: "node-exporter"}},
			nil,
			[][]byte{histogram},
		)
		f(src, []TimeSeries{
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_count"}, {Name: "job", Value: "node-exporter"}},
				Samples: []Sample{{Value: 13, Timestamp: 1000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_sum"}, {Name: "job", Value: "node-exporter"}},
				Samples: []Sample{{Value: 175.5, Timestamp: 1000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(-0.00001, 0.00001)}},
				Samples: []Sample{{Value: 2, Timestamp: 1000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(0.5, 1)}},
				Samples: []Sample{{Value: 2, Timestamp: 1000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(1, 2)}},
				Samples: []Sample{{Value: 1, Timestamp: 1000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(2, 4)}},
				Samples: []Sample{{Value: 3, Timestamp: 1000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(4, 8)}},
				Samples: []Sample{{Value: 2, Timestamp: 1000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(32, 64)}},
				Samples: []Sample{{Value: 3, Timestamp: 1000}},
			},
		})
	}
	// basic negative histogram
	{
		nativeHistogramC := nativeHistogramContext{
			countInt:       7,
			sum:            -15.0,
			schema:         0,
			timestamp:      2000,
			negativeSpans:  []bucketSpan{{offset: 1, length: 2}},
			negativeDeltas: []int64{3, 1},
		}
		histogram := encodeHistogram(nativeHistogramC)
		src := encodeTimeSeries(
			[]Label{{Name: "__name__", Value: "rpc_latency_seconds"}},
			nil,
			[][]byte{histogram},
		)
		f(src, []TimeSeries{
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_count"}},
				Samples: []Sample{{Value: 7, Timestamp: 2000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_sum"}},
				Samples: []Sample{{Value: -15.0, Timestamp: 2000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(-2, -1)}},
				Samples: []Sample{{Value: 3, Timestamp: 2000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(-4, -2)}},
				Samples: []Sample{{Value: 4, Timestamp: 2000}},
			},
		})
	}
	// float histogram
	{
		nativeHistogramC := nativeHistogramContext{
			countInt:         0,
			isCountFloat:     true,
			countFloat:       2.5,
			sum:              1.0,
			schema:           1,
			zeroThreshold:    0.00001,
			isZeroCountFloat: true,
			zeroCountFloat:   0.5,
			timestamp:        3000,
			positiveSpans:    []bucketSpan{{offset: 0, length: 2}},
			positiveCounts:   []float64{1.5, 1.0},
		}
		histogram := encodeHistogram(nativeHistogramC)
		src := encodeTimeSeries(
			[]Label{{Name: "__name__", Value: "rpc_latency_seconds"}},
			nil,
			[][]byte{histogram},
		)
		f(src, []TimeSeries{
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_count"}},
				Samples: []Sample{{Value: 2.5, Timestamp: 3000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_sum"}},
				Samples: []Sample{{Value: 1.0, Timestamp: 3000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(-0.00001, 0.00001)}},
				Samples: []Sample{{Value: 0.5, Timestamp: 3000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(0.7071, 1)}},
				Samples: []Sample{{Value: 1.5, Timestamp: 3000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(1, 1.414)}},
				Samples: []Sample{{Value: 1.0, Timestamp: 3000}},
			},
		})
	}
	// count-only histogram: no buckets, just count and sum
	{
		histogram := encodeHistogram(nativeHistogramContext{
			countInt:  10,
			sum:       42.0,
			schema:    3,
			timestamp: 4000,
		})
		src := encodeTimeSeries(
			[]Label{{Name: "__name__", Value: "rpc_latency_seconds"}},
			nil,
			[][]byte{histogram},
		)
		f(src, []TimeSeries{
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_count"}},
				Samples: []Sample{{Value: 10, Timestamp: 4000}},
			},
			{
				Labels:  []Label{{Name: "__name__", Value: "rpc_latency_seconds_sum"}},
				Samples: []Sample{{Value: 42.0, Timestamp: 4000}},
			},
		})
	}
}
// encodeTimeSeries builds a protobuf-encoded TimeSeries message from the given
// labels, samples and pre-encoded Histogram messages (fields 1, 2 and 4).
func encodeTimeSeries(labels []Label, samples []Sample, histograms [][]byte) []byte {
	var out []byte
	for i := range labels {
		out = pbAppendBytes(out, 1, pbEncodeLabel(labels[i].Name, labels[i].Value))
	}
	for i := range samples {
		out = pbAppendBytes(out, 2, pbEncodeSample(samples[i].Value, samples[i].Timestamp))
	}
	for _, msg := range histograms {
		out = pbAppendBytes(out, 4, msg)
	}
	return out
}
// encodeHistogram builds a protobuf-encoded Prometheus Histogram message from h,
// mirroring the field numbers decoded by unmarshalHistogram.
// Zero-valued optional fields are omitted, matching protobuf default-value encoding;
// count_int (field 1) and zero_count_int (field 6) are always written.
func encodeHistogram(h nativeHistogramContext) []byte {
	var dst []byte
	dst = pbAppendVarint(dst, 1, h.countInt)
	if h.isCountFloat {
		dst = pbAppendDouble(dst, 2, h.countFloat)
	}
	if h.sum != 0 {
		dst = pbAppendDouble(dst, 3, h.sum)
	}
	if h.schema != 0 {
		dst = pbAppendSint32(dst, 4, h.schema)
	}
	if h.zeroThreshold != 0 {
		dst = pbAppendDouble(dst, 5, h.zeroThreshold)
	}
	dst = pbAppendVarint(dst, 6, h.zeroCountInt)
	if h.isZeroCountFloat {
		dst = pbAppendDouble(dst, 7, h.zeroCountFloat)
	}
	for _, span := range h.negativeSpans {
		dst = pbAppendBytes(dst, 8, pbEncodeBucketSpan(span))
	}
	if len(h.negativeDeltas) > 0 {
		dst = pbAppendBytes(dst, 9, pbPackSint64s(h.negativeDeltas))
	}
	if len(h.negativeCounts) > 0 {
		dst = pbAppendBytes(dst, 10, pbPackDoubles(h.negativeCounts))
	}
	for _, span := range h.positiveSpans {
		dst = pbAppendBytes(dst, 11, pbEncodeBucketSpan(span))
	}
	if len(h.positiveDeltas) > 0 {
		dst = pbAppendBytes(dst, 12, pbPackSint64s(h.positiveDeltas))
	}
	if len(h.positiveCounts) > 0 {
		dst = pbAppendBytes(dst, 13, pbPackDoubles(h.positiveCounts))
	}
	if h.timestamp != 0 {
		dst = pbAppendVarint(dst, 15, uint64(h.timestamp))
	}
	return dst
}
// pbEncodeLabel builds a protobuf-encoded Label message (name = field 1, value = field 2).
func pbEncodeLabel(name, value string) []byte {
	msg := pbAppendBytes(nil, 1, []byte(name))
	return pbAppendBytes(msg, 2, []byte(value))
}
// pbEncodeSample builds a protobuf-encoded Sample message (value = field 1,
// timestamp = field 2). Zero values are omitted, as protobuf defaults require.
func pbEncodeSample(value float64, timestamp int64) []byte {
	var msg []byte
	if value != 0 {
		msg = pbAppendDouble(msg, 1, value)
	}
	if timestamp != 0 {
		msg = pbAppendVarint(msg, 2, uint64(timestamp))
	}
	return msg
}
// pbEncodeBucketSpan builds a protobuf-encoded BucketSpan message
// (offset = field 1 as sint32, length = field 2 as varint). Zero values are omitted.
func pbEncodeBucketSpan(span bucketSpan) []byte {
	var msg []byte
	if span.offset != 0 {
		msg = pbAppendSint32(msg, 1, span.offset)
	}
	if span.length != 0 {
		msg = pbAppendVarint(msg, 2, uint64(span.length))
	}
	return msg
}
// pbAppendVarint appends a varint field (wire type 0) with the given field number to dst.
func pbAppendVarint(dst []byte, field uint32, v uint64) []byte {
	// tag = field<<3 | wiretype(0)
	dst = appendProtoVarint(dst, uint64(field<<3))
	return appendProtoVarint(dst, v)
}
// pbAppendDouble appends a double field (wire type 1, fixed64 little-endian)
// with the given field number to dst.
func pbAppendDouble(dst []byte, field uint32, v float64) []byte {
	// tag = field<<3 | wiretype(1)
	dst = appendProtoVarint(dst, uint64(field<<3|1))
	// Append the IEEE-754 bits directly instead of round-tripping through a temporary buffer.
	return binary.LittleEndian.AppendUint64(dst, math.Float64bits(v))
}
// pbAppendSint32 appends a sint32 field (wire type 0, zigzag-encoded) with the
// given field number to dst.
func pbAppendSint32(dst []byte, field uint32, v int32) []byte {
	dst = appendProtoVarint(dst, uint64(field<<3))
	zigzag := (uint32(v) << 1) ^ uint32(v>>31)
	return appendProtoVarint(dst, uint64(zigzag))
}
// pbAppendBytes appends a length-delimited field (wire type 2) with the given
// field number and payload to dst.
func pbAppendBytes(dst []byte, field uint32, data []byte) []byte {
	// tag = field<<3 | wiretype(2), followed by the payload length and the payload.
	dst = appendProtoVarint(dst, uint64(field<<3|2))
	dst = appendProtoVarint(dst, uint64(len(data)))
	return append(dst, data...)
}
// pbPackSint64s returns the packed (zigzag varint) encoding of values, suitable
// as the payload of a repeated sint64 field.
func pbPackSint64s(values []int64) []byte {
	var packed []byte
	for _, v := range values {
		zigzag := (uint64(v) << 1) ^ uint64(v>>63)
		packed = appendProtoVarint(packed, zigzag)
	}
	return packed
}
// pbPackDoubles returns the packed (fixed64 little-endian) encoding of values,
// suitable as the payload of a repeated double field.
func pbPackDoubles(values []float64) []byte {
	var dst []byte
	for _, v := range values {
		// Append the IEEE-754 bits directly instead of copying through a temporary buffer.
		dst = binary.LittleEndian.AppendUint64(dst, math.Float64bits(v))
	}
	return dst
}
// appendProtoVarint appends the protobuf base-128 varint encoding of v to dst:
// 7 payload bits per byte, least-significant group first, high bit set on all
// bytes except the last.
func appendProtoVarint(dst []byte, v uint64) []byte {
	for {
		b := byte(v & 0x7f)
		v >>= 7
		if v == 0 {
			return append(dst, b)
		}
		dst = append(dst, b|0x80)
	}
}
// appendVmrangeHelper formats a `lower...upper` vmrange label value via a fresh
// fmtBuffer, so test expectations match fmtBuffer.formatVmrange exactly.
func appendVmrangeHelper(lower float64, upper float64) string {
	var fb fmtBuffer
	return fb.formatVmrange(lower, upper)
}

View File

@@ -0,0 +1,97 @@
package prompb
import (
"fmt"
"testing"
)
// BenchmarkUnmarshalProtobuf_ClassicSamples measures WriteRequest decoding for a
// request with 1000 classic (sample-only) series carrying typical label sets.
func BenchmarkUnmarshalProtobuf_ClassicSamples(b *testing.B) {
	// Build the protobuf-encoded WriteRequest once, outside the timed loop.
	var src []byte
	for i := 0; i < 1000; i++ {
		ts := encodeTimeSeries(
			[]Label{
				{
					Name:  "__name__",
					Value: "process_cpu_seconds_total",
				},
				{
					Name:  "instance",
					Value: fmt.Sprintf("host-%d:4567", i),
				},
				{
					Name:  "job",
					Value: "node-exporter",
				},
				{
					Name:  "pod",
					Value: "foo-bar-pod-8983423843",
				},
				{
					Name:  "cpu",
					Value: "1",
				},
				{
					Name:  "mode",
					Value: "system",
				},
				{
					Name:  "node",
					Value: "host-123",
				},
				{
					Name:  "namespace",
					Value: "foo-bar-baz",
				},
				{
					Name:  "container",
					Value: fmt.Sprintf("aaa-bb-cc-dd-ee-%d", i),
				},
			},
			[]Sample{{Value: float64(i), Timestamp: int64(i * 1000)}},
			nil,
		)
		src = pbAppendBytes(src, 1, ts)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for b.Loop() {
		wru := GetWriteRequestUnmarshaler()
		if _, err := wru.UnmarshalProtobuf(src); err != nil {
			b.Fatal(err)
		}
		PutWriteRequestUnmarshaler(wru)
	}
}
// BenchmarkUnmarshalProtobuf_NativeHistogram measures WriteRequest decoding for a
// request with 100 series each carrying a 20-bucket native histogram, which exercises
// the histogram -> vmrange series conversion path.
func BenchmarkUnmarshalProtobuf_NativeHistogram(b *testing.B) {
	// Build the protobuf-encoded WriteRequest once, outside the timed loop.
	var src []byte
	histogram := encodeHistogram(nativeHistogramContext{
		countInt:       1072,
		sum:            1750000.5,
		schema:         0,
		zeroThreshold:  0.00001,
		zeroCountInt:   2,
		positiveSpans:  []bucketSpan{{offset: 0, length: 10}, {offset: 2, length: 10}},
		positiveDeltas: []int64{2, -1, 2, -1, 1, 1, 1, 100, 1, 1, 2, -1, -80, -1, 1, 1, 100, 1, 1, 1},
		timestamp:      1000,
	})
	for i := 0; i < 100; i++ {
		ts := encodeTimeSeries(
			[]Label{{Name: "__name__", Value: "rpc_latency_seconds"}, {Name: "job", Value: "node-exporter"}},
			nil,
			[][]byte{histogram},
		)
		src = pbAppendBytes(src, 1, ts)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for b.Loop() {
		wru := GetWriteRequestUnmarshaler()
		if _, err := wru.UnmarshalProtobuf(src); err != nil {
			b.Fatal(err)
		}
		PutWriteRequestUnmarshaler(wru)
	}
}