Files
VictoriaMetrics/app/vminsert/native/request_handler.go
Max Kotliar 250e4cd1d3 lib/prompb: Merge prompbmarshal logic into prompb
The prompb and prompbmarshal packages share exactly the same models and
provide marshal and unmarshal capabilities for them. This creates duplication
(changes in one model have to be made in the other, e.g. the case with
metadata) and confusion where, for example, you compare identical-looking
models but Go says they are not the same (because their types differ).

This commit merges the prompbmarshal logic into prompb so the rest of the
code is aligned on the prompb models.

Move samplesPool and labelsPool to WriteRequestUnmarshaller.
Keep the WriteRequest struct free of unmarshal logic.

The benchmark shows no significant changes:

$benchstat prompbmarshal.bench prompb2.bench
goos: darwin
goarch: arm64
pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb
cpu: Apple M1 Pro
│ prompbmarshal.bench │
prompb2.bench            │
│       sec/op        │   sec/op     vs
base               │
WriteRequestUnmarshalProtobuf-10           189.2µ ± 5%   190.8µ ± 8%
~ (p=0.579 n=10)
WriteRequestMarshalProtobuf-10             145.3µ ± 7%   143.6µ ± 2%
~ (p=0.143 n=10)
geomean                                    165.8µ        165.5µ
-0.14%

│ prompbmarshal.bench │
prompb2.bench            │
│         B/s         │     B/s
vs base               │
WriteRequestUnmarshalProtobuf-10          50.42Mi ± 5%   49.99Mi ± 8%
~ (p=0.593 n=10)
WriteRequestMarshalProtobuf-10            65.64Mi ± 7%   66.39Mi ± 2%
~ (p=0.143 n=10)
geomean                                   57.53Mi        57.61Mi
+0.14%

│ prompbmarshal.bench │
prompb2.bench             │
│        B/op         │     B/op
vs base               │
WriteRequestUnmarshalProtobuf-10         27.70Ki ±  4%   26.90Ki ±  7%
~ (p=0.190 n=10)
WriteRequestMarshalProtobuf-10           3.267Ki ± 12%   3.273Ki ± 12%
~ (p=0.971 n=10)
geomean                                  9.514Ki         9.383Ki
-1.38%

│ prompbmarshal.bench │
prompb2.bench            │
│      allocs/op      │ allocs/op   vs
base                 │
WriteRequestUnmarshalProtobuf-10          0.000 ± 0%     0.000 ± 0%
~ (p=1.000 n=10) ¹
WriteRequestMarshalProtobuf-10            0.000 ± 0%     0.000 ± 0%
~ (p=1.000 n=10) ¹
geomean                                              ²
+0.00%                ²
¹ all samples are equal
² summaries must be >0 to compute geomean
2025-07-31 01:37:10 +03:00

88 lines
2.9 KiB
Go

package native
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
// Metrics describing rows received via the native import handler.
var (
	// rowsInserted is incremented by len(block.Values) for every block
	// handled by insertRows, before relabeling and before the actual write.
	rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="native"}`)

	// rowsTenantInserted holds per-tenant row counters. insertRows updates it
	// up front when the request carries an auth token, and otherwise after a
	// successful flush, using the token obtained from the insert context.
	rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="native"}`)

	// rowsPerInsert records the number of rows in each block handled
	// by insertRows.
	rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="native"}`)
)
// InsertHandler processes `/api/v1/import/native` request.
//
// Extra labels are extracted from req once; the request body is then handed
// to stream.Parse together with the value of the Content-Encoding header,
// and every parsed block is passed to insertRows along with those labels.
// An error from extra labels extraction is returned as is, otherwise
// the result of stream.Parse is returned.
func InsertHandler(at *auth.Token, req *http.Request) error {
	extraLabels, err := protoparserutil.GetExtraLabels(req)
	if err != nil {
		return err
	}

	// The callback closes over the token and the extra labels,
	// which stay the same for all the blocks of the request.
	processBlock := func(block *stream.Block) error {
		return insertRows(at, block, extraLabels)
	}
	return stream.Parse(req.Body, req.Header.Get("Content-Encoding"), processBlock)
}
// insertRows writes every (timestamp, value) pair of block via an insert
// context taken from netstorage, using the labels built from
// block.MetricName plus extraLabels.
//
// at may be nil. In that case the per-tenant counter is updated only after
// a successful flush, with the token returned by ctx.GetLocalAuthToken.
// nil is returned without writing anything if ctx.TryPrepareLabels
// reports false.
func insertRows(at *auth.Token, block *stream.Block, extraLabels []prompb.Label) error {
	ctx := netstorage.GetInsertCtx()
	defer netstorage.PutInsertCtx(ctx)

	// Account for the rows up front: relabeling below may drop the block,
	// and such rows still must be reflected in the metrics.
	rowsCount := len(block.Values)
	rowsInserted.Add(rowsCount)
	if at != nil {
		rowsTenantInserted.Get(at).Add(rowsCount)
	}
	rowsPerInsert.Update(float64(rowsCount))

	ctx.Reset() // This line is required for initializing ctx internals.
	relabelingEnabled := relabel.HasRelabeling()

	// Build the label set: the metric group goes first with a nil label name,
	// then the tags of the block, then the extra labels from the request.
	name := &block.MetricName
	ctx.Labels = ctx.Labels[:0]
	ctx.AddLabelBytes(nil, name.MetricGroup)
	for i := range name.Tags {
		ctx.AddLabelBytes(name.Tags[i].Key, name.Tags[i].Value)
	}
	for i := range extraLabels {
		ctx.AddLabel(extraLabels[i].Name, extraLabels[i].Value)
	}
	if prepared := ctx.TryPrepareLabels(relabelingEnabled); !prepared {
		return nil
	}

	// use tenant info from data if it's a multi-tenant import.
	tenant := ctx.GetLocalAuthToken(at)
	ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], tenant.AccountID, tenant.ProjectID, ctx.Labels)
	nodeIdx := ctx.GetStorageNodeIdx(tenant, ctx.Labels)

	timestamps, values := block.Timestamps, block.Values
	if tsLen, valuesLen := len(timestamps), len(values); tsLen != valuesLen {
		logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", tsLen, valuesLen)
	}
	// All the data points share the same marshaled metric name and storage node index.
	for i, ts := range timestamps {
		err := ctx.WriteDataPointExt(nodeIdx, ctx.MetricNameBuf, ts, values[i])
		if err != nil {
			return err
		}
	}

	err := ctx.FlushBufs()
	if err != nil {
		return err
	}
	if at == nil {
		rowsTenantInserted.Get(tenant).Add(rowsCount)
	}
	return nil
}