Files
VictoriaMetrics/app/vminsert/vmimport/request_handler.go

83 lines
2.9 KiB
Go
Raw Permalink Normal View History

package vmimport
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
2020-09-26 04:13:10 +03:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
lib/prompb: Merge prompbmarshal logic into prompb The prompb and prompbmarshal packages share exactly the same models and provide marshal and unmarshal capabilities for them. This creates duplication (changes in one model have to be made in the other, as was the case with metadata) and confusion where, for example, you compare same-looking models but Go says they are not the same (because of the type). This commit merges prompbmarshal logic into prompb so the rest of the code is aligned on prompb models. It moves samplesPool and labelsPool to WriteRequestUnmarshaller and makes the WriteRequest struct clean from unmarshal logic. The benchmark shows no significant changes: $benchstat prompbmarshal.bench prompb2.bench goos: darwin goarch: arm64 pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb cpu: Apple M1 Pro │ prompbmarshal.bench │ prompb2.bench │ │ sec/op │ sec/op vs base │ WriteRequestUnmarshalProtobuf-10 189.2µ ± 5% 190.8µ ± 8% ~ (p=0.579 n=10) WriteRequestMarshalProtobuf-10 145.3µ ± 7% 143.6µ ± 2% ~ (p=0.143 n=10) geomean 165.8µ 165.5µ -0.14% │ prompbmarshal.bench │ prompb2.bench │ │ B/s │ B/s vs base │ WriteRequestUnmarshalProtobuf-10 50.42Mi ± 5% 49.99Mi ± 8% ~ (p=0.593 n=10) WriteRequestMarshalProtobuf-10 65.64Mi ± 7% 66.39Mi ± 2% ~ (p=0.143 n=10) geomean 57.53Mi 57.61Mi +0.14% │ prompbmarshal.bench │ prompb2.bench │ │ B/op │ B/op vs base │ WriteRequestUnmarshalProtobuf-10 27.70Ki ± 4% 26.90Ki ± 7% ~ (p=0.190 n=10) WriteRequestMarshalProtobuf-10 3.267Ki ± 12% 3.273Ki ± 12% ~ (p=0.971 n=10) geomean 9.514Ki 9.383Ki -1.38% │ prompbmarshal.bench │ prompb2.bench │ │ allocs/op │ allocs/op vs base │ WriteRequestUnmarshalProtobuf-10 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ WriteRequestMarshalProtobuf-10 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ geomean ² +0.00% ² ¹ all samples are equal ² summaries must be >0 to compute geomean
2025-07-29 14:48:55 +03:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
// Metrics updated by insertRows for data ingested via the vmimport handler.
var (
// rowsInserted is incremented by the number of values seen in each insertRows call.
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="vmimport"}`)
// rowsTenantInserted receives the per-tenant value counts collected by insertRows.
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="vmimport"}`)
// rowsPerInsert is updated once per insertRows call with the number of values seen in that call.
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="vmimport"}`)
)
// InsertHandler processes `/api/v1/import` request.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
func InsertHandler(at *auth.Token, req *http.Request) error {
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}
app/vlinsert: follow-up for 37ed1842abec8109b8d32d42587301aca1c95619 - Properly decode protobuf-encoded Loki request if it has no Content-Encoding header. Protobuf Loki message is snappy-encoded by default, so snappy decoding must be used when Content-Encoding header is missing. - Return back the previous signatures of parseJSONRequest and parseProtobufRequest functions. This eliminates the churn in tests for these functions. This also fixes broken benchmarks BenchmarkParseJSONRequest and BenchmarkParseProtobufRequest, which consume the whole request body on the first iteration and do nothing on subsequent iterations. - Put the CHANGELOG entries into correct places, since they were incorrectly put into already released versions of VictoriaMetrics and VictoriaLogs. - Add support for reading zstd-compressed data ingestion requests into the remaining protocols at VictoriaLogs and VictoriaMetrics. - Remove the `encoding` arg from PutUncompressedReader() - it has enough information about the passed reader arg in order to properly deal with it. - Add ReadUncompressedData to lib/protoparser/common for reading uncompressed data from the reader until EOF. This allows removing repeated code across request-based protocol parsers without streaming mode. - Consistently limit data ingestion request sizes, which can be read by ReadUncompressedData function. Previously this wasn't the case for all the supported protocols. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8416 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8380 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8300
2025-03-14 18:10:37 +01:00
encoding := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, encoding, func(rows []vmimport.Row) error {
return insertRows(at, rows, extraLabels)
})
}
lib/prompb: Merge prompbmarshal logic into prompb The prompb and prompbmarshal packages share exactly the same models and provide marshal and unmarshal capabilities for them. This creates duplication (changes in one model have to be made in the other, as was the case with metadata) and confusion where, for example, you compare same-looking models but Go says they are not the same (because of the type). This commit merges prompbmarshal logic into prompb so the rest of the code is aligned on prompb models. It moves samplesPool and labelsPool to WriteRequestUnmarshaller and makes the WriteRequest struct clean from unmarshal logic. The benchmark shows no significant changes: $benchstat prompbmarshal.bench prompb2.bench goos: darwin goarch: arm64 pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb cpu: Apple M1 Pro │ prompbmarshal.bench │ prompb2.bench │ │ sec/op │ sec/op vs base │ WriteRequestUnmarshalProtobuf-10 189.2µ ± 5% 190.8µ ± 8% ~ (p=0.579 n=10) WriteRequestMarshalProtobuf-10 145.3µ ± 7% 143.6µ ± 2% ~ (p=0.143 n=10) geomean 165.8µ 165.5µ -0.14% │ prompbmarshal.bench │ prompb2.bench │ │ B/s │ B/s vs base │ WriteRequestUnmarshalProtobuf-10 50.42Mi ± 5% 49.99Mi ± 8% ~ (p=0.593 n=10) WriteRequestMarshalProtobuf-10 65.64Mi ± 7% 66.39Mi ± 2% ~ (p=0.143 n=10) geomean 57.53Mi 57.61Mi +0.14% │ prompbmarshal.bench │ prompb2.bench │ │ B/op │ B/op vs base │ WriteRequestUnmarshalProtobuf-10 27.70Ki ± 4% 26.90Ki ± 7% ~ (p=0.190 n=10) WriteRequestMarshalProtobuf-10 3.267Ki ± 12% 3.273Ki ± 12% ~ (p=0.971 n=10) geomean 9.514Ki 9.383Ki -1.38% │ prompbmarshal.bench │ prompb2.bench │ │ allocs/op │ allocs/op vs base │ WriteRequestUnmarshalProtobuf-10 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ WriteRequestMarshalProtobuf-10 0.000 ± 0% 0.000 ± 0% ~ (p=1.000 n=10) ¹ geomean ² +0.00% ² ¹ all samples are equal ² summaries must be >0 to compute geomean
2025-07-29 14:48:55 +03:00
func insertRows(at *auth.Token, rows []vmimport.Row, extraLabels []prompb.Label) error {
ctx := netstorage.GetInsertCtx()
defer netstorage.PutInsertCtx(ctx)
ctx.Reset() // This line is required for initializing ctx internals.
rowsTotal := 0
perTenantRows := make(map[auth.Token]int)
hasRelabeling := relabel.HasRelabeling()
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Values)
ctx.Labels = ctx.Labels[:0]
for j := range r.Tags {
tag := &r.Tags[j]
ctx.AddLabelBytes(tag.Key, tag.Value)
}
for j := range extraLabels {
label := &extraLabels[j]
ctx.AddLabel(label.Name, label.Value)
}
if !ctx.TryPrepareLabels(hasRelabeling) {
continue
}
atLocal := ctx.GetLocalAuthToken(at)
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels)
storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels)
values := r.Values
timestamps := r.Timestamps
2020-09-26 04:13:10 +03:00
if len(timestamps) != len(values) {
logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values))
}
for j, value := range values {
timestamp := timestamps[j]
if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
return err
}
}
perTenantRows[*atLocal] += len(r.Values)
}
rowsInserted.Add(rowsTotal)
rowsTenantInserted.MultiAdd(perTenantRows)
rowsPerInsert.Update(float64(rowsTotal))
return ctx.FlushBufs()
}