Files
VictoriaMetrics/app/vminsert/influx/request_handler.go
Max Kotliar 250e4cd1d3 lib/prompb: Merge prompbmarshal logic into prompb
The prompb and prompbmarshal packages share exactly the same models and provide
marshal and unmarshal capabilities for them. This creates duplication
(changes in one model have to be made in the other, as was the case with metadata) and
confusion where, for example, you compare identical-looking models but Go
says they are not the same (because they have different types).

This commit merges the prompbmarshal logic into prompb so the rest of the
code is aligned on the prompb models.

Moves samplesPool and labelsPool to WriteRequestUnmarshaller.
Makes the WriteRequest struct free of unmarshal logic.

The benchmark shows no significant changes:

$benchstat prompbmarshal.bench prompb2.bench
goos: darwin
goarch: arm64
pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb
cpu: Apple M1 Pro
                                 │ prompbmarshal.bench │           prompb2.bench            │
                                 │       sec/op        │   sec/op     vs base               │
WriteRequestUnmarshalProtobuf-10           189.2µ ± 5%   190.8µ ± 8%       ~ (p=0.579 n=10)
WriteRequestMarshalProtobuf-10             145.3µ ± 7%   143.6µ ± 2%       ~ (p=0.143 n=10)
geomean                                    165.8µ        165.5µ       -0.14%

                                 │ prompbmarshal.bench │            prompb2.bench            │
                                 │         B/s         │     B/s       vs base               │
WriteRequestUnmarshalProtobuf-10          50.42Mi ± 5%   49.99Mi ± 8%       ~ (p=0.593 n=10)
WriteRequestMarshalProtobuf-10            65.64Mi ± 7%   66.39Mi ± 2%       ~ (p=0.143 n=10)
geomean                                   57.53Mi        57.61Mi       +0.14%

                                 │ prompbmarshal.bench │            prompb2.bench             │
                                 │        B/op         │     B/op       vs base               │
WriteRequestUnmarshalProtobuf-10         27.70Ki ±  4%   26.90Ki ±  7%       ~ (p=0.190 n=10)
WriteRequestMarshalProtobuf-10           3.267Ki ± 12%   3.273Ki ± 12%       ~ (p=0.971 n=10)
geomean                                  9.514Ki         9.383Ki        -1.38%

                                 │ prompbmarshal.bench │           prompb2.bench            │
                                 │      allocs/op      │ allocs/op   vs base                │
WriteRequestUnmarshalProtobuf-10          0.000 ± 0%     0.000 ± 0%       ~ (p=1.000 n=10) ¹
WriteRequestMarshalProtobuf-10            0.000 ± 0%     0.000 ± 0%       ~ (p=1.000 n=10) ¹
geomean                                              ²               +0.00%               ²
¹ all samples are equal
² summaries must be >0 to compute geomean
2025-07-31 01:37:10 +03:00

200 lines
7.1 KiB
Go

package influx
import (
"flag"
"io"
"net/http"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits"
"github.com/VictoriaMetrics/metrics"
)
// Command-line flags controlling how InfluxDB line protocol rows are mapped
// to metric names and labels.
var (
	// measurementFieldSeparator joins the measurement and the field key
	// into the resulting metric name.
	measurementFieldSeparator = flag.String("influxMeasurementFieldSeparator", "_", "Separator for '{measurement}{separator}{field_name}' metric name when inserted via InfluxDB line protocol")

	// skipSingleField drops the '{separator}{field_name}' suffix for rows
	// that carry exactly one field.
	skipSingleField = flag.Bool("influxSkipSingleField", false, "Uses '{measurement}' instead of '{measurement}{separator}{field_name}' for metric name if InfluxDB line contains only a single field")

	// skipMeasurement drops the measurement, leaving only the field key
	// as the metric name.
	skipMeasurement = flag.Bool("influxSkipMeasurement", false, "Uses '{field_name}' as a metric name while ignoring '{measurement}' and '-influxMeasurementFieldSeparator'")

	// dbLabel is the label name used to store the db passed to the handlers.
	dbLabel = flag.String("influxDBLabel", "db", "Default label for the DB name sent over '?db={db_name}' query parameter")
)
// Ingestion metrics updated by insertRows once per processed batch.
var (
	// rowsInserted is incremented by the number of fields in every batch.
	rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="influx"}`)

	// rowsTenantInserted is incremented per tenant by the number of data points written for it.
	rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="influx"}`)

	// rowsPerInsert records the number of fields per batch.
	rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="influx"}`)
)
// InsertHandlerForReader processes remote write for influx line protocol.
//
// The data is read from r in stream mode, without a content encoding, a precision
// or a default db, and no extra labels are attached to the ingested series.
//
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(at *auth.Token, r io.Reader) error {
	// Positional arguments of stream.Parse, named for readability.
	const (
		encoding     = ""
		isStreamMode = true
		precision    = ""
		defaultDB    = ""
	)
	insert := func(db string, rows []influx.Row) error {
		return insertRows(at, db, rows, nil)
	}
	return stream.Parse(r, encoding, isStreamMode, precision, defaultDB, insert)
}
// InsertHandlerForHTTP processes remote write for influx line protocol.
//
// Extra labels are taken from the request via protoparserutil.GetExtraLabels.
// The 'precision' and 'db' query args, the Content-Encoding header and the
// 'Stream-Mode: 1' header are forwarded to the stream parser.
//
// See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
	extraLabels, err := protoparserutil.GetExtraLabels(req)
	if err != nil {
		return err
	}
	args := req.URL.Query()
	// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
	queryDB := args.Get("db")
	return stream.Parse(
		req.Body,
		req.Header.Get("Content-Encoding"),
		req.Header.Get("Stream-Mode") == "1",
		args.Get("precision"),
		queryDB,
		func(db string, rows []influx.Row) error {
			return insertRows(at, db, rows, extraLabels)
		},
	)
}
// insertRows converts parsed InfluxDB rows into data points and writes them
// through a pooled netstorage.InsertCtx.
//
// Every field of every row is written as a separate series:
//   - the row tags become labels;
//   - the *dbLabel label is set to db unless the row already has a tag with that name;
//   - extraLabels are appended after them;
//   - the metric name is '{measurement}{separator}{field_key}', adjusted by
//     -influxSkipMeasurement and -influxSkipSingleField.
//
// rowsTotal counts every field, including fields later dropped by series limits.
// The ingestion metrics are updated and the buffered data is flushed before returning.
func insertRows(at *auth.Token, db string, rows []influx.Row, extraLabels []prompb.Label) error {
	ctx := getPushCtx()
	defer putPushCtx(ctx)

	ic := &ctx.Common
	ic.Reset() // This line is required for initializing ic internals.
	rowsTotal := 0
	perTenantRows := make(map[auth.Token]int)
	hasRelabeling := relabel.HasRelabeling()
	hasLimitsEnabled := timeserieslimits.Enabled()
	for i := range rows {
		r := &rows[i]
		rowsTotal += len(r.Fields)
		ic.Labels = ic.Labels[:0]
		hasDBKey := false
		for j := range r.Tags {
			tag := &r.Tags[j]
			if tag.Key == *dbLabel {
				hasDBKey = true
			}
			ic.AddLabel(tag.Key, tag.Value)
		}
		if !hasDBKey {
			ic.AddLabel(*dbLabel, db)
		}
		for j := range extraLabels {
			label := &extraLabels[j]
			ic.AddLabel(label.Name, label.Value)
		}
		ctx.metricGroupBuf = ctx.metricGroupBuf[:0]
		if !*skipMeasurement {
			ctx.metricGroupBuf = append(ctx.metricGroupBuf, r.Measurement...)
		}
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1139
		skipFieldKey := len(r.Measurement) > 0 && len(r.Fields) == 1 && *skipSingleField
		if len(ctx.metricGroupBuf) > 0 && !skipFieldKey {
			ctx.metricGroupBuf = append(ctx.metricGroupBuf, *measurementFieldSeparator...)
		}
		metricGroupPrefixLen := len(ctx.metricGroupBuf)
		if hasRelabeling {
			// Relabeling may rewrite ic.Labels, so keep a pristine copy of the
			// row labels and restore it before processing every field.
			ctx.originLabels = append(ctx.originLabels[:0], ic.Labels...)
			for j := range r.Fields {
				f := &r.Fields[j]
				if !skipFieldKey {
					ctx.metricGroupBuf = append(ctx.metricGroupBuf[:metricGroupPrefixLen], f.Key...)
				}
				metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf)
				ic.Labels = append(ic.Labels[:0], ctx.originLabels...)
				ic.AddLabel("", metricGroup)
				if !ic.TryPrepareLabels(hasRelabeling) {
					// The series has been dropped (presumably by relabeling or limits).
					continue
				}
				atLocal := ic.GetLocalAuthToken(at)
				ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, nil)
				for k := range ic.Labels {
					ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf, &ic.Labels[k])
				}
				storageNodeIdx := ic.GetStorageNodeIdx(atLocal, ic.Labels)
				if err := ic.WriteDataPointExt(storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil {
					return err
				}
				perTenantRows[*atLocal]++
			}
		} else {
			// special case for optimisations below
			// do not call TryPrepareLabels
			// manually apply sort and limits on demand
			//
			// The row labels are sorted, limit-checked and marshaled once; each field
			// then only appends and marshals its own metric-group label.
			ic.SortLabelsIfNeeded()
			if hasLimitsEnabled {
				if timeserieslimits.IsExceeding(ic.Labels) {
					continue
				}
			}
			atLocal := ic.GetLocalAuthToken(at)
			ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ic.Labels)
			metricNameBufLen := len(ic.MetricNameBuf)
			labelsLen := len(ic.Labels)
			for j := range r.Fields {
				f := &r.Fields[j]
				if !skipFieldKey {
					ctx.metricGroupBuf = append(ctx.metricGroupBuf[:metricGroupPrefixLen], f.Key...)
				}
				metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf)
				ic.Labels = ic.Labels[:labelsLen]
				ic.AddLabel("", metricGroup)
				ic.MetricNameBuf = ic.MetricNameBuf[:metricNameBufLen]
				// metricGroup may be empty, e.g. with -influxSkipMeasurement and
				// -influxSkipSingleField for a single-field row.
				// NOTE(review): AddLabel appears to skip empty values (confirm in
				// netstorage.InsertCtx.AddLabel), so do not assume the label was appended.
				// Indexing ic.Labels[len(ic.Labels)-1] unconditionally would re-marshal
				// the last tag, or panic when there are no labels at all.
				if len(ic.Labels) > labelsLen {
					// NOTE(review): only the metric-group label is checked here, so the
					// total label count is not re-checked against the per-series label limit.
					if hasLimitsEnabled && timeserieslimits.IsExceeding(ic.Labels[labelsLen:]) {
						continue
					}
					ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf, &ic.Labels[labelsLen])
				} else if labelsLen == 0 {
					// No labels at all: nothing identifies the series, so skip it,
					// as the relabeling path does for empty label sets.
					continue
				}
				storageNodeIdx := ic.GetStorageNodeIdx(atLocal, ic.Labels)
				if err := ic.WriteDataPointExt(storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil {
					return err
				}
				perTenantRows[*atLocal]++
			}
		}
	}
	rowsInserted.Add(rowsTotal)
	rowsTenantInserted.MultiAdd(perTenantRows)
	rowsPerInsert.Update(float64(rowsTotal))
	return ic.FlushBufs()
}
// pushCtx holds per-call scratch state for insertRows.
// Instances are recycled through pushCtxPool via getPushCtx/putPushCtx.
type pushCtx struct {
	// Common is the insert context used to build labels and metric names
	// and to write data points.
	Common netstorage.InsertCtx
	// metricGroupBuf is a reusable buffer for building the
	// '{measurement}{separator}{field_key}' metric name of each field.
	metricGroupBuf []byte
	// originLabels keeps a copy of the current row labels. When relabeling is
	// enabled, they are restored into Common.Labels before each field is processed.
	originLabels []prompb.Label
}
// reset clears ctx so it can be returned to pushCtxPool.
//
// The whole capacity of originLabels is zeroed, not just its current length.
// insertRows refills originLabels per row via append(ctx.originLabels[:0], ...).
// As a result, a shorter row leaves entries from an earlier, longer row beyond
// len(originLabels). Those stale entries would otherwise keep the strings they
// reference reachable while ctx sits in the pool.
func (ctx *pushCtx) reset() {
	ctx.Common.Reset()
	ctx.metricGroupBuf = ctx.metricGroupBuf[:0]

	originLabels := ctx.originLabels[:cap(ctx.originLabels)]
	for i := range originLabels {
		originLabels[i] = prompb.Label{}
	}
	ctx.originLabels = originLabels[:0]
}
// pushCtxPool recycles pushCtx instances between insertRows calls.
var pushCtxPool sync.Pool

// getPushCtx returns a pushCtx taken from pushCtxPool, or a fresh zero-value
// pushCtx when the pool has nothing to hand out.
func getPushCtx() *pushCtx {
	pooled := pushCtxPool.Get()
	if pooled == nil {
		return &pushCtx{}
	}
	return pooled.(*pushCtx)
}

// putPushCtx resets pc and hands it back to pushCtxPool for reuse.
// pc must not be used by the caller afterwards.
func putPushCtx(pc *pushCtx) {
	pc.reset()
	pushCtxPool.Put(pc)
}