mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
lib/prompb: support prometheus native histogram during ingestion
This commit adds support for Prometheus Native Histogram https://prometheus.io/docs/specs/native_histograms data ingestion via Prometheus RemoteWrite format. It converts Native Histograms into VictoriaMetrics histogram format. fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10743
This commit is contained in:
@@ -103,7 +103,10 @@ func TestClient_run_maxBatchSizeDuringShutdown(t *testing.T) {
|
||||
|
||||
// push time series to the client.
|
||||
for range pushCnt {
|
||||
if err = rwClient.Push(prompb.TimeSeries{}); err != nil {
|
||||
if err = rwClient.Push(prompb.TimeSeries{
|
||||
Labels: []prompb.Label{{Name: "__name__", Value: "m"}},
|
||||
Samples: []prompb.Sample{{Value: 1, Timestamp: 1000}},
|
||||
}); err != nil {
|
||||
t.Fatalf("cannot time series to the client: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): add `__meta_hetzner_robot_datacenter` label for `robot` role in [hetzner_sd_configs](https://docs.victoriametrics.com/victoriametrics/sd_configs/#hetzner_sd_configs). See [#10909](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10909). Thanks to @juliusrickert for contribution.
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `-rule.stripFilePath` to support stripping rule file paths in logs and all API responses, including /metrics. See [#5625](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5625).
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `formatTime` template function for formatting a Unix timestamp using the provided layout. For example, `{{ now | formatTime "2006-01-02T15:04:05Z07:00" }}` returns the current time in RFC3339 format. See issue [#10624](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10624). Thanks to @andriibeee for the contribution.
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add support for [Prometheus native histogram](https://prometheus.io/docs/specs/native_histograms/) during ingestion. See [#10743](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10743).
|
||||
|
||||
## [v1.142.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.142.0)
|
||||
|
||||
|
||||
@@ -68,3 +68,15 @@ since previous versions may have issues with `remote_write`.
|
||||
|
||||
Take a look at [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/),
|
||||
which can be used as faster and less resource-hungry alternative to Prometheus.
|
||||
|
||||
## Native histograms
|
||||
|
||||
Prometheus [native histogram](https://prometheus.io/docs/specs/native_histograms/) is automatically converted
|
||||
to [VictoriaMetrics histogram format](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) with `vmrange` labels during ingestion.
|
||||
|
||||
> After conversion, a native histogram is transformed into classic histograms with `_count`, `_sum`, and `_bucket` series. These series can be queried using standard histogram functions such as `histogram_quantile()`.
|
||||
|
||||
|
||||
## Remote Write 2.0
|
||||
|
||||
[Prometheus Remote-Write 2.0](https://prometheus.io/docs/specs/prw/remote_write_spec_2_0/) is still marked as experimental and is not currently supported by VictoriaMetrics.
|
||||
|
||||
36
lib/prompb/fmt_buffer.go
Normal file
36
lib/prompb/fmt_buffer.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package prompb
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
)
|
||||
|
||||
type fmtBuffer struct {
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func (fb *fmtBuffer) reset() {
|
||||
fb.buf = fb.buf[:0]
|
||||
}
|
||||
|
||||
func (fb *fmtBuffer) formatName(prefix, suffix string) string {
|
||||
if prefix == "" {
|
||||
// There is no prefix, so just return the suffix as is.
|
||||
return suffix
|
||||
}
|
||||
|
||||
n := len(fb.buf)
|
||||
fb.buf = append(fb.buf, prefix...)
|
||||
fb.buf = append(fb.buf, suffix...)
|
||||
|
||||
return bytesutil.ToUnsafeString(fb.buf[n:])
|
||||
}
|
||||
|
||||
func (fb *fmtBuffer) formatVmrange(start, end float64) string {
|
||||
n := len(fb.buf)
|
||||
fb.buf = strconv.AppendFloat(fb.buf, start, 'e', 3, 64)
|
||||
fb.buf = append(fb.buf, "..."...)
|
||||
fb.buf = strconv.AppendFloat(fb.buf, end, 'e', 3, 64)
|
||||
return bytesutil.ToUnsafeString(fb.buf[n:])
|
||||
}
|
||||
@@ -50,6 +50,12 @@ func TestWriteRequestMarshalUnmarshal(t *testing.T) {
|
||||
Value: "node-exporter",
|
||||
},
|
||||
},
|
||||
Samples: []prompb.Sample{
|
||||
{
|
||||
Value: 1,
|
||||
Timestamp: 1000,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
@@ -2,6 +2,7 @@ package prompb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/easyproto"
|
||||
@@ -37,6 +38,7 @@ type WriteRequestUnmarshaler struct {
|
||||
|
||||
labelsPool []Label
|
||||
samplesPool []Sample
|
||||
fb fmtBuffer
|
||||
}
|
||||
|
||||
// Reset resets wru, so it could be re-used.
|
||||
@@ -48,6 +50,8 @@ func (wru *WriteRequestUnmarshaler) Reset() {
|
||||
|
||||
clear(wru.samplesPool)
|
||||
wru.samplesPool = wru.samplesPool[:0]
|
||||
|
||||
wru.fb.reset()
|
||||
}
|
||||
|
||||
// UnmarshalProtobuf parses the given Protobuf-encoded `src` into an internal WriteRequest instance and returns a pointer to it.
|
||||
@@ -85,13 +89,7 @@ func (wru *WriteRequestUnmarshaler) UnmarshalProtobuf(src []byte) (*WriteRequest
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("cannot read timeseries data")
|
||||
}
|
||||
if len(tss) < cap(tss) {
|
||||
tss = tss[:len(tss)+1]
|
||||
} else {
|
||||
tss = append(tss, TimeSeries{})
|
||||
}
|
||||
ts := &tss[len(tss)-1]
|
||||
labelsPool, samplesPool, err = ts.unmarshalProtobuf(data, labelsPool, samplesPool)
|
||||
tss, labelsPool, samplesPool, err = unmarshalTimeSeries(data, tss, labelsPool, samplesPool, &wru.fb)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot unmarshal timeseries: %w", err)
|
||||
}
|
||||
@@ -119,25 +117,31 @@ func (wru *WriteRequestUnmarshaler) UnmarshalProtobuf(src []byte) (*WriteRequest
|
||||
return &wru.wr, nil
|
||||
}
|
||||
|
||||
func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesPool []Sample) ([]Label, []Sample, error) {
|
||||
// unmarshalTimeSeries unmarshals TimeSeries messages, which can specify either samples or native histogram samples, but not both.
|
||||
// See https://github.com/prometheus/prometheus/blob/9a3ac8910b0476d0d73a5c36a54c55baec5829b6/prompb/types.proto#L133
|
||||
func unmarshalTimeSeries(src []byte, tss []TimeSeries, labelsPool []Label, samplesPool []Sample, fb *fmtBuffer) ([]TimeSeries, []Label, []Sample, error) {
|
||||
labelsPoolLen := len(labelsPool)
|
||||
samplesPoolLen := len(samplesPool)
|
||||
|
||||
var histograms [][]byte
|
||||
var fc easyproto.FieldContext
|
||||
var err error
|
||||
|
||||
// message TimeSeries {
|
||||
// repeated Label labels = 1;
|
||||
// repeated Sample samples = 2;
|
||||
// repeated Histogram histograms = 4
|
||||
// }
|
||||
labelsPoolLen := len(labelsPool)
|
||||
samplesPoolLen := len(samplesPool)
|
||||
var fc easyproto.FieldContext
|
||||
for len(src) > 0 {
|
||||
var err error
|
||||
src, err = fc.NextField(src)
|
||||
if err != nil {
|
||||
return labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err)
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read the next field: %w", err)
|
||||
}
|
||||
switch fc.FieldNum {
|
||||
case 1:
|
||||
data, ok := fc.MessageData()
|
||||
if !ok {
|
||||
return labelsPool, samplesPool, fmt.Errorf("cannot read label data")
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read label data")
|
||||
}
|
||||
if len(labelsPool) < cap(labelsPool) {
|
||||
labelsPool = labelsPool[:len(labelsPool)+1]
|
||||
@@ -146,12 +150,12 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesP
|
||||
}
|
||||
label := &labelsPool[len(labelsPool)-1]
|
||||
if err := label.unmarshalProtobuf(data); err != nil {
|
||||
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err)
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot unmarshal label: %w", err)
|
||||
}
|
||||
case 2:
|
||||
data, ok := fc.MessageData()
|
||||
if !ok {
|
||||
return labelsPool, samplesPool, fmt.Errorf("cannot read the sample data")
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read sample data")
|
||||
}
|
||||
if len(samplesPool) < cap(samplesPool) {
|
||||
samplesPool = samplesPool[:len(samplesPool)+1]
|
||||
@@ -160,15 +164,415 @@ func (ts *TimeSeries) unmarshalProtobuf(src []byte, labelsPool []Label, samplesP
|
||||
}
|
||||
sample := &samplesPool[len(samplesPool)-1]
|
||||
if err := sample.unmarshalProtobuf(data); err != nil {
|
||||
return labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err)
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot unmarshal sample: %w", err)
|
||||
}
|
||||
case 4:
|
||||
data, ok := fc.MessageData()
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read native histogram data")
|
||||
}
|
||||
histograms = append(histograms, data)
|
||||
}
|
||||
}
|
||||
|
||||
baseLabels := labelsPool[labelsPoolLen:len(labelsPool):len(labelsPool)]
|
||||
samples := samplesPool[samplesPoolLen:len(samplesPool):len(samplesPool)]
|
||||
|
||||
if len(samples) > 0 && len(histograms) > 0 {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot have both samples and native histograms in the same TimeSeries")
|
||||
}
|
||||
|
||||
// classic series with normal samples
|
||||
if len(samples) > 0 {
|
||||
tss = appendTimeSeries(tss, baseLabels, samples)
|
||||
return tss, labelsPool, samplesPool, nil
|
||||
}
|
||||
|
||||
for _, hdata := range histograms {
|
||||
tss, labelsPool, samplesPool, err = unmarshalHistogram(hdata, tss, labelsPool, samplesPool, baseLabels, fb)
|
||||
if err != nil {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("failed to unmarshal native histogram: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return tss, labelsPool, samplesPool, nil
|
||||
}
|
||||
|
||||
func appendTimeSeries(tss []TimeSeries, labels []Label, samples []Sample) []TimeSeries {
|
||||
if len(tss) < cap(tss) {
|
||||
tss = tss[:len(tss)+1]
|
||||
} else {
|
||||
tss = append(tss, TimeSeries{})
|
||||
}
|
||||
ts := &tss[len(tss)-1]
|
||||
ts.Labels = labels
|
||||
ts.Samples = samples
|
||||
return tss
|
||||
}
|
||||
|
||||
func unmarshalHistogram(src []byte, tss []TimeSeries, labelsPool []Label, samplesPool []Sample, baseLabels []Label, fb *fmtBuffer) ([]TimeSeries, []Label, []Sample, error) {
|
||||
// see https://github.com/prometheus/prometheus/blob/9a3ac8910b0476d0d73a5c36a54c55baec5829b6/prompb/types.proto#L57
|
||||
// message Histogram {
|
||||
// oneof count { // Count of observations in the histogram.
|
||||
// uint64 count_int = 1;
|
||||
// double count_float = 2;
|
||||
// }
|
||||
// double sum = 3; // Sum of observations in the histogram.
|
||||
// sint32 schema = 4;
|
||||
// double zero_threshold = 5; // Breadth of the zero bucket.
|
||||
// oneof zero_count { // Count in zero bucket.
|
||||
// uint64 zero_count_int = 6;
|
||||
// double zero_count_float = 7;
|
||||
// }
|
||||
|
||||
// repeated BucketSpan negative_spans = 8 [(gogoproto.nullable) = false];
|
||||
// repeated sint64 negative_deltas = 9; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
|
||||
// repeated double negative_counts = 10; // Absolute count of each bucket.
|
||||
|
||||
// repeated BucketSpan positive_spans = 11 [(gogoproto.nullable) = false];
|
||||
// repeated sint64 positive_deltas = 12; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
|
||||
// repeated double positive_counts = 13; // Absolute count of each bucket.
|
||||
|
||||
// ResetHint reset_hint = 14;
|
||||
// int64 timestamp = 15;
|
||||
|
||||
// repeated double custom_values = 16;
|
||||
// }
|
||||
nhctx := getNativeHistogramContext()
|
||||
defer putNativeHistogramContext(nhctx)
|
||||
|
||||
var err error
|
||||
var fc easyproto.FieldContext
|
||||
for len(src) > 0 {
|
||||
src, err = fc.NextField(src)
|
||||
if err != nil {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read next field: %w", err)
|
||||
}
|
||||
var ok bool
|
||||
switch fc.FieldNum {
|
||||
case 1:
|
||||
nhctx.countInt, ok = fc.Uint64()
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read count_int")
|
||||
}
|
||||
case 2:
|
||||
nhctx.countFloat, ok = fc.Double()
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read count_float")
|
||||
}
|
||||
nhctx.isCountFloat = true
|
||||
case 3:
|
||||
nhctx.sum, ok = fc.Double()
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read sum")
|
||||
}
|
||||
case 4:
|
||||
nhctx.schema, ok = fc.Sint32()
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read schema")
|
||||
}
|
||||
case 5:
|
||||
nhctx.zeroThreshold, ok = fc.Double()
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read zero_threshold")
|
||||
}
|
||||
case 6:
|
||||
nhctx.zeroCountInt, ok = fc.Uint64()
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read zero_count_int")
|
||||
}
|
||||
case 7:
|
||||
nhctx.zeroCountFloat, ok = fc.Double()
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read zero_count_float")
|
||||
}
|
||||
nhctx.isZeroCountFloat = true
|
||||
case 8:
|
||||
data, ok := fc.MessageData()
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read negative_spans")
|
||||
}
|
||||
nhctx.negativeSpans, err = appendBucketSpan(nhctx.negativeSpans, data)
|
||||
if err != nil {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot decode negative_spans: %w", err)
|
||||
}
|
||||
case 9:
|
||||
nhctx.negativeDeltas, ok = fc.UnpackSint64s(nhctx.negativeDeltas)
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read negative_deltas")
|
||||
}
|
||||
case 10:
|
||||
nhctx.negativeCounts, ok = fc.UnpackDoubles(nhctx.negativeCounts)
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read negative_counts")
|
||||
}
|
||||
case 11:
|
||||
data, ok := fc.MessageData()
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read positive_spans")
|
||||
}
|
||||
nhctx.positiveSpans, err = appendBucketSpan(nhctx.positiveSpans, data)
|
||||
if err != nil {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot decode positive_spans: %w", err)
|
||||
}
|
||||
case 12:
|
||||
nhctx.positiveDeltas, ok = fc.UnpackSint64s(nhctx.positiveDeltas)
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read positive_deltas")
|
||||
}
|
||||
case 13:
|
||||
nhctx.positiveCounts, ok = fc.UnpackDoubles(nhctx.positiveCounts)
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read positive_counts")
|
||||
}
|
||||
// case 14: reset_hint exposes extra reset info for query
|
||||
case 15:
|
||||
nhctx.timestamp, ok = fc.Int64()
|
||||
if !ok {
|
||||
return tss, labelsPool, samplesPool, fmt.Errorf("cannot read timestamp")
|
||||
}
|
||||
// case 16: custom_values — internal OTel→Prom only, skip
|
||||
}
|
||||
}
|
||||
tss, labelsPool, samplesPool = nhctx.appendTimeSeries(tss, baseLabels, labelsPool, samplesPool, fb)
|
||||
|
||||
return tss, labelsPool, samplesPool, nil
|
||||
}
|
||||
|
||||
func appendBucketSpan(spans []bucketSpan, src []byte) ([]bucketSpan, error) {
|
||||
// message BucketSpan {
|
||||
// sint32 offset = 1; // gap to previous span, or index of first bucket for the first span
|
||||
// uint32 length = 2; // number of consecutive buckets in this span
|
||||
// }
|
||||
if len(spans) < cap(spans) {
|
||||
spans = spans[:len(spans)+1]
|
||||
} else {
|
||||
spans = append(spans, bucketSpan{})
|
||||
}
|
||||
span := &spans[len(spans)-1]
|
||||
var err error
|
||||
var fc easyproto.FieldContext
|
||||
for len(src) > 0 {
|
||||
src, err = fc.NextField(src)
|
||||
if err != nil {
|
||||
return spans, fmt.Errorf("cannot read next field: %w", err)
|
||||
}
|
||||
var ok bool
|
||||
switch fc.FieldNum {
|
||||
case 1:
|
||||
span.offset, ok = fc.Sint32()
|
||||
if !ok {
|
||||
return spans, fmt.Errorf("cannot read offset")
|
||||
}
|
||||
case 2:
|
||||
span.length, ok = fc.Uint32()
|
||||
if !ok {
|
||||
return spans, fmt.Errorf("cannot read length")
|
||||
}
|
||||
}
|
||||
}
|
||||
ts.Labels = labelsPool[labelsPoolLen:]
|
||||
ts.Samples = samplesPool[samplesPoolLen:]
|
||||
return labelsPool, samplesPool, nil
|
||||
return spans, nil
|
||||
}
|
||||
|
||||
// appendTimeSeries converts the parsed native histogram into _count, _sum and _bucket
|
||||
// TimeSeries and appends them to tss.
|
||||
// See https://prometheus.io/docs/specs/native_histograms/#data-model
|
||||
func (nhctx *nativeHistogramContext) appendTimeSeries(tss []TimeSeries, baseLabels []Label, labelsPool []Label, samplesPool []Sample, fb *fmtBuffer) ([]TimeSeries, []Label, []Sample) {
|
||||
tsMillis := nhctx.timestamp
|
||||
|
||||
count := float64(nhctx.countInt)
|
||||
if nhctx.isCountFloat {
|
||||
count = nhctx.countFloat
|
||||
}
|
||||
|
||||
var baseName string
|
||||
var nameValueP *string
|
||||
for i := range baseLabels {
|
||||
if baseLabels[i].Name == "__name__" {
|
||||
baseName = baseLabels[i].Value
|
||||
nameValueP = &baseLabels[i].Value
|
||||
break
|
||||
}
|
||||
}
|
||||
// metric have no name, skip it
|
||||
if baseName == "" {
|
||||
return tss, labelsPool, samplesPool
|
||||
}
|
||||
|
||||
*nameValueP = fb.formatName(baseName, "_count")
|
||||
tss, labelsPool, samplesPool = appendHistogramSeries(tss, labelsPool, samplesPool, baseLabels, "", tsMillis, count)
|
||||
*nameValueP = fb.formatName(baseName, "_sum")
|
||||
tss, labelsPool, samplesPool = appendHistogramSeries(tss, labelsPool, samplesPool, baseLabels, "", tsMillis, nhctx.sum)
|
||||
|
||||
*nameValueP = fb.formatName(baseName, "_bucket")
|
||||
zeroCount := float64(nhctx.zeroCountInt)
|
||||
if nhctx.isZeroCountFloat {
|
||||
zeroCount = nhctx.zeroCountFloat
|
||||
}
|
||||
if zeroCount > 0 {
|
||||
vmrange := fb.formatVmrange(-nhctx.zeroThreshold, nhctx.zeroThreshold)
|
||||
tss, labelsPool, samplesPool = appendHistogramSeries(tss, labelsPool, samplesPool, baseLabels, vmrange, tsMillis, zeroCount)
|
||||
}
|
||||
|
||||
ratio := math.Pow(2, -float64(nhctx.schema))
|
||||
base := math.Pow(2, ratio)
|
||||
|
||||
tss, labelsPool, samplesPool = appendSpanBuckets(tss, labelsPool, samplesPool, baseLabels, fb, nhctx.positiveSpans, nhctx.positiveDeltas, nhctx.positiveCounts, base, false, tsMillis)
|
||||
tss, labelsPool, samplesPool = appendSpanBuckets(tss, labelsPool, samplesPool, baseLabels, fb, nhctx.negativeSpans, nhctx.negativeDeltas, nhctx.negativeCounts, base, true, tsMillis)
|
||||
|
||||
return tss, labelsPool, samplesPool
|
||||
}
|
||||
|
||||
// Bucket counts are stored either in deltas or floatCounts.
|
||||
// deltas is used for regular histograms with integer counts, storing cumulative deltas;
|
||||
// floatCounts is used for float histograms, storing absolute counts.
|
||||
func appendSpanBuckets(
|
||||
tss []TimeSeries,
|
||||
labelsPool []Label,
|
||||
samplesPool []Sample,
|
||||
baseLabels []Label,
|
||||
fb *fmtBuffer,
|
||||
spans []bucketSpan,
|
||||
deltas []int64,
|
||||
floatCounts []float64,
|
||||
base float64,
|
||||
negative bool,
|
||||
tsMillis int64,
|
||||
) ([]TimeSeries, []Label, []Sample) {
|
||||
useFloatCounts := len(floatCounts) > 0
|
||||
var bucketIdx int32
|
||||
var deltaIdx, floatIdx int
|
||||
var cumDelta int64
|
||||
|
||||
for _, span := range spans {
|
||||
bucketIdx += span.offset
|
||||
for i := uint32(0); i < span.length; i++ {
|
||||
var bucketCount float64
|
||||
if useFloatCounts {
|
||||
if floatIdx >= len(floatCounts) {
|
||||
return tss, labelsPool, samplesPool
|
||||
}
|
||||
bucketCount = floatCounts[floatIdx]
|
||||
floatIdx++
|
||||
} else {
|
||||
if deltaIdx >= len(deltas) {
|
||||
return tss, labelsPool, samplesPool
|
||||
}
|
||||
cumDelta += deltas[deltaIdx]
|
||||
deltaIdx++
|
||||
bucketCount = float64(cumDelta)
|
||||
}
|
||||
|
||||
if bucketCount > 0 {
|
||||
upper := math.Pow(base, float64(bucketIdx))
|
||||
lower := upper / base
|
||||
if negative {
|
||||
lower, upper = -upper, -lower
|
||||
}
|
||||
vmrange := fb.formatVmrange(lower, upper)
|
||||
tss, labelsPool, samplesPool = appendHistogramSeries(tss, labelsPool, samplesPool, baseLabels, vmrange, tsMillis, bucketCount)
|
||||
}
|
||||
bucketIdx++
|
||||
}
|
||||
}
|
||||
return tss, labelsPool, samplesPool
|
||||
}
|
||||
|
||||
func appendHistogramSeries(tss []TimeSeries, labelsPool []Label, samplesPool []Sample, baseLabels []Label, vmrange string,
|
||||
tsMillis int64, value float64,
|
||||
) ([]TimeSeries, []Label, []Sample) {
|
||||
labelsStart := len(labelsPool)
|
||||
for _, l := range baseLabels {
|
||||
if len(labelsPool) < cap(labelsPool) {
|
||||
labelsPool = labelsPool[:len(labelsPool)+1]
|
||||
} else {
|
||||
labelsPool = append(labelsPool, Label{})
|
||||
}
|
||||
labelsPool[len(labelsPool)-1] = l
|
||||
}
|
||||
if vmrange != "" {
|
||||
if len(labelsPool) < cap(labelsPool) {
|
||||
labelsPool = labelsPool[:len(labelsPool)+1]
|
||||
} else {
|
||||
labelsPool = append(labelsPool, Label{})
|
||||
}
|
||||
l := &labelsPool[len(labelsPool)-1]
|
||||
l.Name = "vmrange"
|
||||
l.Value = vmrange
|
||||
}
|
||||
labels := labelsPool[labelsStart:len(labelsPool):len(labelsPool)]
|
||||
|
||||
if len(samplesPool) < cap(samplesPool) {
|
||||
samplesPool = samplesPool[:len(samplesPool)+1]
|
||||
} else {
|
||||
samplesPool = append(samplesPool, Sample{})
|
||||
}
|
||||
s := &samplesPool[len(samplesPool)-1]
|
||||
s.Value = value
|
||||
s.Timestamp = tsMillis
|
||||
samples := samplesPool[len(samplesPool)-1 : len(samplesPool) : len(samplesPool)]
|
||||
|
||||
return appendTimeSeries(tss, labels, samples), labelsPool, samplesPool
|
||||
}
|
||||
|
||||
type bucketSpan struct {
|
||||
offset int32
|
||||
length uint32
|
||||
}
|
||||
|
||||
type nativeHistogramContext struct {
|
||||
isCountFloat bool
|
||||
countInt uint64
|
||||
countFloat float64
|
||||
sum float64
|
||||
schema int32
|
||||
zeroThreshold float64
|
||||
isZeroCountFloat bool
|
||||
zeroCountInt uint64
|
||||
zeroCountFloat float64
|
||||
timestamp int64
|
||||
negativeSpans []bucketSpan
|
||||
negativeDeltas []int64
|
||||
negativeCounts []float64
|
||||
positiveSpans []bucketSpan
|
||||
positiveDeltas []int64
|
||||
positiveCounts []float64
|
||||
}
|
||||
|
||||
func (nhctx *nativeHistogramContext) reset() {
|
||||
nhctx.isCountFloat = false
|
||||
nhctx.countInt = 0
|
||||
nhctx.countFloat = 0
|
||||
nhctx.sum = 0
|
||||
nhctx.schema = 0
|
||||
nhctx.zeroThreshold = 0
|
||||
nhctx.isZeroCountFloat = false
|
||||
nhctx.zeroCountInt = 0
|
||||
nhctx.zeroCountFloat = 0
|
||||
nhctx.timestamp = 0
|
||||
nhctx.negativeSpans = nhctx.negativeSpans[:0]
|
||||
nhctx.negativeDeltas = nhctx.negativeDeltas[:0]
|
||||
nhctx.negativeCounts = nhctx.negativeCounts[:0]
|
||||
nhctx.positiveSpans = nhctx.positiveSpans[:0]
|
||||
nhctx.positiveDeltas = nhctx.positiveDeltas[:0]
|
||||
nhctx.positiveCounts = nhctx.positiveCounts[:0]
|
||||
}
|
||||
|
||||
func getNativeHistogramContext() *nativeHistogramContext {
|
||||
v := nhctxPool.Get()
|
||||
if v == nil {
|
||||
return &nativeHistogramContext{}
|
||||
}
|
||||
return v.(*nativeHistogramContext)
|
||||
}
|
||||
|
||||
func putNativeHistogramContext(nhctx *nativeHistogramContext) {
|
||||
nhctx.reset()
|
||||
nhctxPool.Put(nhctx)
|
||||
}
|
||||
|
||||
var nhctxPool sync.Pool
|
||||
|
||||
func (lbl *Label) unmarshalProtobuf(src []byte) (err error) {
|
||||
// message Label {
|
||||
// string name = 1;
|
||||
|
||||
348
lib/prompb/write_request_unmarshaler_test.go
Normal file
348
lib/prompb/write_request_unmarshaler_test.go
Normal file
@@ -0,0 +1,348 @@
|
||||
package prompb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"math"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestUnmarshalTimeSeries(t *testing.T) {
|
||||
f := func(src []byte, wantTSS []TimeSeries) {
|
||||
t.Helper()
|
||||
|
||||
var tss []TimeSeries
|
||||
var err error
|
||||
|
||||
tss, _, _, err = unmarshalTimeSeries(src, tss, nil, nil, &fmtBuffer{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
if !reflect.DeepEqual(tss, wantTSS) {
|
||||
t.Fatalf("unexpected result\ngot:\n%v\nwant:\n%v", tss, wantTSS)
|
||||
}
|
||||
}
|
||||
|
||||
// classic time series with samples, no histogram
|
||||
{
|
||||
src := encodeTimeSeries(
|
||||
[]Label{{Name: "__name__", Value: "rpc_latency_seconds"}, {Name: "job", Value: "node-exporter"}},
|
||||
[]Sample{{Value: 1.5, Timestamp: 5000}},
|
||||
nil,
|
||||
)
|
||||
f(src, []TimeSeries{
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds"}, {Name: "job", Value: "node-exporter"}},
|
||||
Samples: []Sample{{Value: 1.5, Timestamp: 5000}},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// basic positive histogram
|
||||
{
|
||||
nativeHistogramC := nativeHistogramContext{
|
||||
countInt: 13,
|
||||
sum: 175.5,
|
||||
schema: 0,
|
||||
zeroThreshold: 0.00001,
|
||||
zeroCountInt: 2,
|
||||
positiveSpans: []bucketSpan{{offset: 0, length: 4}, {offset: 2, length: 1}},
|
||||
positiveDeltas: []int64{2, -1, 2, -1, 1},
|
||||
timestamp: 1000,
|
||||
}
|
||||
histogram := encodeHistogram(nativeHistogramC)
|
||||
src := encodeTimeSeries(
|
||||
[]Label{{Name: "__name__", Value: "rpc_latency_seconds"}, {Name: "job", Value: "node-exporter"}},
|
||||
nil,
|
||||
[][]byte{histogram},
|
||||
)
|
||||
f(src, []TimeSeries{
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_count"}, {Name: "job", Value: "node-exporter"}},
|
||||
Samples: []Sample{{Value: 13, Timestamp: 1000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_sum"}, {Name: "job", Value: "node-exporter"}},
|
||||
Samples: []Sample{{Value: 175.5, Timestamp: 1000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(-0.00001, 0.00001)}},
|
||||
Samples: []Sample{{Value: 2, Timestamp: 1000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(0.5, 1)}},
|
||||
Samples: []Sample{{Value: 2, Timestamp: 1000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(1, 2)}},
|
||||
Samples: []Sample{{Value: 1, Timestamp: 1000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(2, 4)}},
|
||||
Samples: []Sample{{Value: 3, Timestamp: 1000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(4, 8)}},
|
||||
Samples: []Sample{{Value: 2, Timestamp: 1000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "job", Value: "node-exporter"}, {Name: "vmrange", Value: appendVmrangeHelper(32, 64)}},
|
||||
Samples: []Sample{{Value: 3, Timestamp: 1000}},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// basic negative histogram
|
||||
{
|
||||
nativeHistogramC := nativeHistogramContext{
|
||||
countInt: 7,
|
||||
sum: -15.0,
|
||||
schema: 0,
|
||||
timestamp: 2000,
|
||||
negativeSpans: []bucketSpan{{offset: 1, length: 2}},
|
||||
negativeDeltas: []int64{3, 1},
|
||||
}
|
||||
histogram := encodeHistogram(nativeHistogramC)
|
||||
src := encodeTimeSeries(
|
||||
[]Label{{Name: "__name__", Value: "rpc_latency_seconds"}},
|
||||
nil,
|
||||
[][]byte{histogram},
|
||||
)
|
||||
f(src, []TimeSeries{
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_count"}},
|
||||
Samples: []Sample{{Value: 7, Timestamp: 2000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_sum"}},
|
||||
Samples: []Sample{{Value: -15.0, Timestamp: 2000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(-2, -1)}},
|
||||
Samples: []Sample{{Value: 3, Timestamp: 2000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(-4, -2)}},
|
||||
Samples: []Sample{{Value: 4, Timestamp: 2000}},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// float histogram
|
||||
{
|
||||
nativeHistogramC := nativeHistogramContext{
|
||||
countInt: 0,
|
||||
isCountFloat: true,
|
||||
countFloat: 2.5,
|
||||
sum: 1.0,
|
||||
schema: 1,
|
||||
zeroThreshold: 0.00001,
|
||||
isZeroCountFloat: true,
|
||||
zeroCountFloat: 0.5,
|
||||
timestamp: 3000,
|
||||
positiveSpans: []bucketSpan{{offset: 0, length: 2}},
|
||||
positiveCounts: []float64{1.5, 1.0},
|
||||
}
|
||||
histogram := encodeHistogram(nativeHistogramC)
|
||||
src := encodeTimeSeries(
|
||||
[]Label{{Name: "__name__", Value: "rpc_latency_seconds"}},
|
||||
nil,
|
||||
[][]byte{histogram},
|
||||
)
|
||||
f(src, []TimeSeries{
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_count"}},
|
||||
Samples: []Sample{{Value: 2.5, Timestamp: 3000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_sum"}},
|
||||
Samples: []Sample{{Value: 1.0, Timestamp: 3000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(-0.00001, 0.00001)}},
|
||||
Samples: []Sample{{Value: 0.5, Timestamp: 3000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(0.7071, 1)}},
|
||||
Samples: []Sample{{Value: 1.5, Timestamp: 3000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_bucket"}, {Name: "vmrange", Value: appendVmrangeHelper(1, 1.414)}},
|
||||
Samples: []Sample{{Value: 1.0, Timestamp: 3000}},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// count-only histogram: no buckets, just count and sum
|
||||
{
|
||||
histogram := encodeHistogram(nativeHistogramContext{
|
||||
countInt: 10,
|
||||
sum: 42.0,
|
||||
schema: 3,
|
||||
timestamp: 4000,
|
||||
})
|
||||
src := encodeTimeSeries(
|
||||
[]Label{{Name: "__name__", Value: "rpc_latency_seconds"}},
|
||||
nil,
|
||||
[][]byte{histogram},
|
||||
)
|
||||
f(src, []TimeSeries{
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_count"}},
|
||||
Samples: []Sample{{Value: 10, Timestamp: 4000}},
|
||||
},
|
||||
{
|
||||
Labels: []Label{{Name: "__name__", Value: "rpc_latency_seconds_sum"}},
|
||||
Samples: []Sample{{Value: 42.0, Timestamp: 4000}},
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func encodeTimeSeries(labels []Label, samples []Sample, histograms [][]byte) []byte {
|
||||
var dst []byte
|
||||
for _, l := range labels {
|
||||
msg := pbEncodeLabel(l.Name, l.Value)
|
||||
dst = pbAppendBytes(dst, 1, msg)
|
||||
}
|
||||
for _, s := range samples {
|
||||
msg := pbEncodeSample(s.Value, s.Timestamp)
|
||||
dst = pbAppendBytes(dst, 2, msg)
|
||||
}
|
||||
for _, h := range histograms {
|
||||
dst = pbAppendBytes(dst, 4, h)
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func encodeHistogram(h nativeHistogramContext) []byte {
|
||||
var dst []byte
|
||||
|
||||
dst = pbAppendVarint(dst, 1, h.countInt)
|
||||
if h.isCountFloat {
|
||||
dst = pbAppendDouble(dst, 2, h.countFloat)
|
||||
}
|
||||
if h.sum != 0 {
|
||||
dst = pbAppendDouble(dst, 3, h.sum)
|
||||
}
|
||||
if h.schema != 0 {
|
||||
dst = pbAppendSint32(dst, 4, h.schema)
|
||||
}
|
||||
if h.zeroThreshold != 0 {
|
||||
dst = pbAppendDouble(dst, 5, h.zeroThreshold)
|
||||
}
|
||||
dst = pbAppendVarint(dst, 6, h.zeroCountInt)
|
||||
if h.isZeroCountFloat {
|
||||
dst = pbAppendDouble(dst, 7, h.zeroCountFloat)
|
||||
}
|
||||
for _, span := range h.negativeSpans {
|
||||
dst = pbAppendBytes(dst, 8, pbEncodeBucketSpan(span))
|
||||
}
|
||||
if len(h.negativeDeltas) > 0 {
|
||||
dst = pbAppendBytes(dst, 9, pbPackSint64s(h.negativeDeltas))
|
||||
}
|
||||
if len(h.negativeCounts) > 0 {
|
||||
dst = pbAppendBytes(dst, 10, pbPackDoubles(h.negativeCounts))
|
||||
}
|
||||
for _, span := range h.positiveSpans {
|
||||
dst = pbAppendBytes(dst, 11, pbEncodeBucketSpan(span))
|
||||
}
|
||||
if len(h.positiveDeltas) > 0 {
|
||||
dst = pbAppendBytes(dst, 12, pbPackSint64s(h.positiveDeltas))
|
||||
}
|
||||
if len(h.positiveCounts) > 0 {
|
||||
dst = pbAppendBytes(dst, 13, pbPackDoubles(h.positiveCounts))
|
||||
}
|
||||
if h.timestamp != 0 {
|
||||
dst = pbAppendVarint(dst, 15, uint64(h.timestamp))
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func pbEncodeLabel(name, value string) []byte {
|
||||
var dst []byte
|
||||
dst = pbAppendBytes(dst, 1, []byte(name))
|
||||
dst = pbAppendBytes(dst, 2, []byte(value))
|
||||
return dst
|
||||
}
|
||||
|
||||
func pbEncodeSample(value float64, timestamp int64) []byte {
|
||||
var dst []byte
|
||||
if value != 0 {
|
||||
dst = pbAppendDouble(dst, 1, value)
|
||||
}
|
||||
if timestamp != 0 {
|
||||
dst = pbAppendVarint(dst, 2, uint64(timestamp))
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func pbEncodeBucketSpan(span bucketSpan) []byte {
|
||||
var dst []byte
|
||||
if span.offset != 0 {
|
||||
dst = pbAppendSint32(dst, 1, span.offset)
|
||||
}
|
||||
if span.length != 0 {
|
||||
dst = pbAppendVarint(dst, 2, uint64(span.length))
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func pbAppendVarint(dst []byte, field uint32, v uint64) []byte {
|
||||
dst = appendProtoVarint(dst, uint64(field<<3))
|
||||
dst = appendProtoVarint(dst, v)
|
||||
return dst
|
||||
}
|
||||
|
||||
func pbAppendDouble(dst []byte, field uint32, v float64) []byte {
|
||||
dst = appendProtoVarint(dst, uint64(field<<3|1))
|
||||
var buf [8]byte
|
||||
binary.LittleEndian.PutUint64(buf[:], math.Float64bits(v))
|
||||
dst = append(dst, buf[:]...)
|
||||
return dst
|
||||
}
|
||||
|
||||
func pbAppendSint32(dst []byte, field uint32, v int32) []byte {
|
||||
dst = appendProtoVarint(dst, uint64(field<<3))
|
||||
dst = appendProtoVarint(dst, uint64((uint32(v)<<1)^uint32(v>>31)))
|
||||
return dst
|
||||
}
|
||||
|
||||
func pbAppendBytes(dst []byte, field uint32, data []byte) []byte {
|
||||
dst = appendProtoVarint(dst, uint64(field<<3|2))
|
||||
dst = appendProtoVarint(dst, uint64(len(data)))
|
||||
dst = append(dst, data...)
|
||||
return dst
|
||||
}
|
||||
|
||||
func pbPackSint64s(values []int64) []byte {
|
||||
var dst []byte
|
||||
for _, v := range values {
|
||||
dst = appendProtoVarint(dst, uint64((uint64(v)<<1)^uint64(v>>63)))
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func pbPackDoubles(values []float64) []byte {
|
||||
var dst []byte
|
||||
for _, v := range values {
|
||||
var buf [8]byte
|
||||
binary.LittleEndian.PutUint64(buf[:], math.Float64bits(v))
|
||||
dst = append(dst, buf[:]...)
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func appendProtoVarint(dst []byte, v uint64) []byte {
|
||||
for v >= 0x80 {
|
||||
dst = append(dst, byte(v)|0x80)
|
||||
v >>= 7
|
||||
}
|
||||
dst = append(dst, byte(v))
|
||||
return dst
|
||||
}
|
||||
|
||||
func appendVmrangeHelper(lower float64, upper float64) string {
|
||||
var fb fmtBuffer
|
||||
return fb.formatVmrange(lower, upper)
|
||||
}
|
||||
97
lib/prompb/write_request_unmarshaler_timing_test.go
Normal file
97
lib/prompb/write_request_unmarshaler_timing_test.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package prompb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func BenchmarkUnmarshalProtobuf_ClassicSamples(b *testing.B) {
|
||||
var src []byte
|
||||
for i := 0; i < 1000; i++ {
|
||||
ts := encodeTimeSeries(
|
||||
[]Label{
|
||||
{
|
||||
Name: "__name__",
|
||||
Value: "process_cpu_seconds_total",
|
||||
},
|
||||
{
|
||||
Name: "instance",
|
||||
Value: fmt.Sprintf("host-%d:4567", i),
|
||||
},
|
||||
{
|
||||
Name: "job",
|
||||
Value: "node-exporter",
|
||||
},
|
||||
{
|
||||
Name: "pod",
|
||||
Value: "foo-bar-pod-8983423843",
|
||||
},
|
||||
{
|
||||
Name: "cpu",
|
||||
Value: "1",
|
||||
},
|
||||
{
|
||||
Name: "mode",
|
||||
Value: "system",
|
||||
},
|
||||
{
|
||||
Name: "node",
|
||||
Value: "host-123",
|
||||
},
|
||||
{
|
||||
Name: "namespace",
|
||||
Value: "foo-bar-baz",
|
||||
},
|
||||
{
|
||||
Name: "container",
|
||||
Value: fmt.Sprintf("aaa-bb-cc-dd-ee-%d", i),
|
||||
},
|
||||
},
|
||||
[]Sample{{Value: float64(i), Timestamp: int64(i * 1000)}},
|
||||
nil,
|
||||
)
|
||||
src = pbAppendBytes(src, 1, ts)
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
wru := GetWriteRequestUnmarshaler()
|
||||
if _, err := wru.UnmarshalProtobuf(src); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
PutWriteRequestUnmarshaler(wru)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkUnmarshalProtobuf_NativeHistogram(b *testing.B) {
|
||||
var src []byte
|
||||
histogram := encodeHistogram(nativeHistogramContext{
|
||||
countInt: 1072,
|
||||
sum: 1750000.5,
|
||||
schema: 0,
|
||||
zeroThreshold: 0.00001,
|
||||
zeroCountInt: 2,
|
||||
positiveSpans: []bucketSpan{{offset: 0, length: 10}, {offset: 2, length: 10}},
|
||||
positiveDeltas: []int64{2, -1, 2, -1, 1, 1, 1, 100, 1, 1, 2, -1, -80, -1, 1, 1, 100, 1, 1, 1},
|
||||
timestamp: 1000,
|
||||
})
|
||||
for i := 0; i < 100; i++ {
|
||||
ts := encodeTimeSeries(
|
||||
[]Label{{Name: "__name__", Value: "rpc_latency_seconds"}, {Name: "job", Value: "node-exporter"}},
|
||||
nil,
|
||||
[][]byte{histogram},
|
||||
)
|
||||
src = pbAppendBytes(src, 1, ts)
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
wru := GetWriteRequestUnmarshaler()
|
||||
if _, err := wru.UnmarshalProtobuf(src); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
PutWriteRequestUnmarshaler(wru)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user