Files
VictoriaMetrics/app/vminsert/csvimport/request_handler.go

67 lines
2.2 KiB
Go
Raw Permalink Normal View History

package csvimport
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
lib/prompb: Merge prompbmarshal logic into prompb The prompb and prompbmarshal share exactly the same models and provide marshal and unmarshale capabilities for them. This creates duplication (changes in one model has to be made in another, case with metadata) and confusion where for example you compare same looking models but golang says they are not the same (because of the type). This commit merge prompbmarshal logic into prompb so the rest of the code is aligned on prompb models. Moves samplesPool and labelsPool to WriteRequestUnmarshaller. Make WriteRequest struct clean from unmarshal logic. The benchmark shows no significant changes: $benchstat prompbmarshal.bench prompb2.bench goos: darwin goarch: arm64 pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb cpu: Apple M1 Pro │ prompbmarshal.bench │ prompb2.bench │ │ sec/op │ sec/op vs base │ WriteRequestUnmarshalProtobuf-10 189.2µ ± 5% 190.8µ ± 8% ~ (p=0.579 n=10) WriteRequestMarshalProtobuf-10 145.3µ ± 7% 143.6µ ± 2% ~ (p=0.143 n=10) geomean 165.8µ 165.5µ -0.14% │ prompbmarshal.bench │ prompb2.bench │ │ B/s │ B/s vs base │ WriteRequestUnmarshalProtobuf-10 50.42Mi ± 5% 49.99Mi ± 8% ~ (p=0.593 n=10) WriteRequestMarshalProtobuf-10 65.64Mi ± 7% 66.39Mi ± 2% ~ (p=0.143 n=10) geomean 57.53Mi 57.61Mi +0.14% │ prompbmarshal.bench │ prompb2.bench │ │ B/op │ B/op vs base │ WriteRequestUnmarshalProtobuf-10 27.70Ki ± 4% 26.90Ki ± 7% ~ (p=0.190 n=10) WriteRequestMarshalProtobuf-10 3.267Ki ± 12% 3.273Ki ± 12% ~ (p=0.971 n=10) geomean 9.514Ki 9.383Ki -1.38% │ prompbmarshal.bench │ prompb2.bench │ │ allocs/op │ allocs/op vs base │ WriteRequestUnmarshalProtobuf-10 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ WriteRequestMarshalProtobuf-10 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ geomean ² +0.00% ² ¹ all samples are equal ² summaries must be >0 to compute geomean
2025-07-29 14:48:55 +03:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="csvimport"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="csvimport"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="csvimport"}`)
)
// InsertHandler processes /api/v1/import/csv requests.
func InsertHandler(at *auth.Token, req *http.Request) error {
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}
return stream.Parse(req, func(rows []csvimport.Row) error {
return insertRows(at, rows, extraLabels)
})
}
lib/prompb: Merge prompbmarshal logic into prompb The prompb and prompbmarshal share exactly the same models and provide marshal and unmarshale capabilities for them. This creates duplication (changes in one model has to be made in another, case with metadata) and confusion where for example you compare same looking models but golang says they are not the same (because of the type). This commit merge prompbmarshal logic into prompb so the rest of the code is aligned on prompb models. Moves samplesPool and labelsPool to WriteRequestUnmarshaller. Make WriteRequest struct clean from unmarshal logic. The benchmark shows no significant changes: $benchstat prompbmarshal.bench prompb2.bench goos: darwin goarch: arm64 pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb cpu: Apple M1 Pro │ prompbmarshal.bench │ prompb2.bench │ │ sec/op │ sec/op vs base │ WriteRequestUnmarshalProtobuf-10 189.2µ ± 5% 190.8µ ± 8% ~ (p=0.579 n=10) WriteRequestMarshalProtobuf-10 145.3µ ± 7% 143.6µ ± 2% ~ (p=0.143 n=10) geomean 165.8µ 165.5µ -0.14% │ prompbmarshal.bench │ prompb2.bench │ │ B/s │ B/s vs base │ WriteRequestUnmarshalProtobuf-10 50.42Mi ± 5% 49.99Mi ± 8% ~ (p=0.593 n=10) WriteRequestMarshalProtobuf-10 65.64Mi ± 7% 66.39Mi ± 2% ~ (p=0.143 n=10) geomean 57.53Mi 57.61Mi +0.14% │ prompbmarshal.bench │ prompb2.bench │ │ B/op │ B/op vs base │ WriteRequestUnmarshalProtobuf-10 27.70Ki ± 4% 26.90Ki ± 7% ~ (p=0.190 n=10) WriteRequestMarshalProtobuf-10 3.267Ki ± 12% 3.273Ki ± 12% ~ (p=0.971 n=10) geomean 9.514Ki 9.383Ki -1.38% │ prompbmarshal.bench │ prompb2.bench │ │ allocs/op │ allocs/op vs base │ WriteRequestUnmarshalProtobuf-10 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ WriteRequestMarshalProtobuf-10 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ geomean ² +0.00% ² ¹ all samples are equal ² summaries must be >0 to compute geomean
2025-07-29 14:48:55 +03:00
func insertRows(at *auth.Token, rows []csvimport.Row, extraLabels []prompb.Label) error {
ctx := netstorage.GetInsertCtx()
defer netstorage.PutInsertCtx(ctx)
ctx.Reset() // This line is required for initializing ctx internals.
perTenantRows := make(map[auth.Token]int)
hasRelabeling := relabel.HasRelabeling()
for i := range rows {
r := &rows[i]
ctx.Labels = ctx.Labels[:0]
ctx.AddLabel("", r.Metric)
for j := range r.Tags {
tag := &r.Tags[j]
ctx.AddLabel(tag.Key, tag.Value)
}
for j := range extraLabels {
label := &extraLabels[j]
ctx.AddLabel(label.Name, label.Value)
}
if !ctx.TryPrepareLabels(hasRelabeling) {
continue
}
atLocal := ctx.GetLocalAuthToken(at)
if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}
perTenantRows[*atLocal]++
}
rowsInserted.Add(len(rows))
rowsTenantInserted.MultiAdd(perTenantRows)
rowsPerInsert.Update(float64(len(rows)))
return ctx.FlushBufs()
}