mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-06 18:42:00 +03:00
The prompb and prompbmarshal packages share exactly the same models and provide
marshal and unmarshal capabilities for them. This creates duplication
(changes in one model have to be made in the other, as was the case with metadata) and
confusion: for example, you compare identical-looking models, but Go
says they are not the same (because their types differ).
This commit merges the prompbmarshal logic into prompb so the rest of the
code is aligned on the prompb models.
It also moves samplesPool and labelsPool to WriteRequestUnmarshaller,
which keeps the WriteRequest struct free of unmarshal logic.
The benchmark shows no significant changes:
$benchstat prompbmarshal.bench prompb2.bench
goos: darwin
goarch: arm64
pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb
cpu: Apple M1 Pro
│ prompbmarshal.bench │ prompb2.bench │
│ sec/op │ sec/op vs base │
WriteRequestUnmarshalProtobuf-10 189.2µ ± 5% 190.8µ ± 8% ~ (p=0.579 n=10)
WriteRequestMarshalProtobuf-10 145.3µ ± 7% 143.6µ ± 2% ~ (p=0.143 n=10)
geomean 165.8µ 165.5µ -0.14%
│ prompbmarshal.bench │ prompb2.bench │
│ B/s │ B/s vs base │
WriteRequestUnmarshalProtobuf-10 50.42Mi ± 5% 49.99Mi ± 8% ~ (p=0.593 n=10)
WriteRequestMarshalProtobuf-10 65.64Mi ± 7% 66.39Mi ± 2% ~ (p=0.143 n=10)
geomean 57.53Mi 57.61Mi +0.14%
│ prompbmarshal.bench │ prompb2.bench │
│ B/op │ B/op vs base │
WriteRequestUnmarshalProtobuf-10 27.70Ki ± 4% 26.90Ki ± 7% ~ (p=0.190 n=10)
WriteRequestMarshalProtobuf-10 3.267Ki ± 12% 3.273Ki ± 12% ~ (p=0.971 n=10)
geomean 9.514Ki 9.383Ki -1.38%
│ prompbmarshal.bench │ prompb2.bench │
│ allocs/op │ allocs/op vs base │
WriteRequestUnmarshalProtobuf-10 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹
WriteRequestMarshalProtobuf-10 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹
geomean ² +0.00% ²
¹ all samples are equal
² summaries must be >0 to compute geomean
101 lines
2.8 KiB
Go
101 lines
2.8 KiB
Go
package native
|
|
|
|
import (
|
|
"net/http"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
var (
|
|
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="native"}`)
|
|
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="native"}`)
|
|
)
|
|
|
|
// InsertHandler processes `/api/v1/import/native` request.
|
|
func InsertHandler(req *http.Request) error {
|
|
extraLabels, err := protoparserutil.GetExtraLabels(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
encoding := req.Header.Get("Content-Encoding")
|
|
return stream.Parse(req.Body, encoding, func(block *stream.Block) error {
|
|
return insertRows(block, extraLabels)
|
|
})
|
|
}
|
|
|
|
func insertRows(block *stream.Block, extraLabels []prompb.Label) error {
|
|
ctx := getPushCtx()
|
|
defer putPushCtx(ctx)
|
|
|
|
// Update rowsInserted and rowsPerInsert before actual inserting,
|
|
// since relabeling can prevent from inserting the rows.
|
|
rowsLen := len(block.Values)
|
|
rowsInserted.Add(rowsLen)
|
|
rowsPerInsert.Update(float64(rowsLen))
|
|
|
|
ic := &ctx.Common
|
|
ic.Reset(rowsLen)
|
|
hasRelabeling := relabel.HasRelabeling()
|
|
mn := &block.MetricName
|
|
ic.Labels = ic.Labels[:0]
|
|
ic.AddLabelBytes(nil, mn.MetricGroup)
|
|
for j := range mn.Tags {
|
|
tag := &mn.Tags[j]
|
|
ic.AddLabelBytes(tag.Key, tag.Value)
|
|
}
|
|
for j := range extraLabels {
|
|
label := &extraLabels[j]
|
|
ic.AddLabel(label.Name, label.Value)
|
|
}
|
|
if !ic.TryPrepareLabels(hasRelabeling) {
|
|
return nil
|
|
}
|
|
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
|
|
values := block.Values
|
|
timestamps := block.Timestamps
|
|
if len(timestamps) != len(values) {
|
|
logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values))
|
|
}
|
|
for j, value := range values {
|
|
timestamp := timestamps[j]
|
|
// TODO: @f41gh7 looks like it's better to use WriteDataPointExt
|
|
// since metricName never changes inside insertRows call
|
|
if err := ic.WriteDataPoint(ctx.metricNameBuf, ic.Labels, timestamp, value); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return ic.FlushBufs()
|
|
}
|
|
|
|
type pushCtx struct {
|
|
Common common.InsertCtx
|
|
metricNameBuf []byte
|
|
}
|
|
|
|
func (ctx *pushCtx) reset() {
|
|
ctx.Common.Reset(0)
|
|
ctx.metricNameBuf = ctx.metricNameBuf[:0]
|
|
}
|
|
|
|
func getPushCtx() *pushCtx {
|
|
if v := pushCtxPool.Get(); v != nil {
|
|
return v.(*pushCtx)
|
|
}
|
|
return &pushCtx{}
|
|
}
|
|
|
|
func putPushCtx(ctx *pushCtx) {
|
|
ctx.reset()
|
|
pushCtxPool.Put(ctx)
|
|
}
|
|
|
|
// pushCtxPool holds the pushCtx objects stored by putPushCtx so that
// getPushCtx can reuse them. No New function is set, so Get returns nil
// when the pool is empty.
var pushCtxPool sync.Pool
|