lib/protoparser: rename lib/protoparser/common to lib/protoparser/protoparserutil

This improves readability of the code, which uses this package.
This commit is contained in:
Aliaksandr Valialkin
2025-03-18 16:24:48 +01:00
parent 1aa14f3b6d
commit f645479b5e
63 changed files with 196 additions and 196 deletions

View File

@@ -16,7 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
)
var (
@@ -80,7 +80,7 @@ func datadogLogsIngestion(w http.ResponseWriter, r *http.Request) bool {
}
encoding := r.Header.Get("Content-Encoding")
err = common.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
lmp := cp.NewLogMessageProcessor("datadog", false)
err := readLogsRequest(ts, data, lmp)
lmp.MustClose()

View File

@@ -17,7 +17,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
)
@@ -134,11 +134,11 @@ var (
func readBulkRequest(streamName string, r io.Reader, encoding string, timeField string, msgFields []string, lmp insertutils.LogMessageProcessor) (int, error) {
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
reader, err := common.GetUncompressedReader(r, encoding)
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
if err != nil {
return 0, fmt.Errorf("cannot decode Elasticsearch protocol data: %w", err)
}
defer common.PutUncompressedReader(reader)
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)

View File

@@ -18,7 +18,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -100,7 +100,7 @@ func handleJournald(r *http.Request, w http.ResponseWriter) {
}
encoding := r.Header.Get("Content-Encoding")
err = common.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
lmp := cp.NewLogMessageProcessor("journald", false)
err := parseJournaldRequest(data, lmp, cp)
lmp.MustClose()

View File

@@ -11,7 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@@ -39,12 +39,12 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) {
}
encoding := r.Header.Get("Content-Encoding")
reader, err := common.GetUncompressedReader(r.Body, encoding)
reader, err := protoparserutil.GetUncompressedReader(r.Body, encoding)
if err != nil {
logger.Errorf("cannot decode jsonline request: %s", err)
return
}
defer common.PutUncompressedReader(reader)
defer protoparserutil.PutUncompressedReader(reader)
lmp := cp.NewLogMessageProcessor("jsonline", true)
streamName := fmt.Sprintf("remoteAddr=%s, requestURI=%q", httpserver.GetQuotedRemoteAddr(r), r.RequestURI)

View File

@@ -14,7 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
)
var maxRequestSize = flagutil.NewBytes("loki.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single Loki request")
@@ -36,7 +36,7 @@ func handleJSON(r *http.Request, w http.ResponseWriter) {
}
encoding := r.Header.Get("Content-Encoding")
err = common.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
lmp := cp.cp.NewLogMessageProcessor("loki_json", false)
useDefaultStreamFields := len(cp.cp.StreamFields) == 0
err := parseJSONRequest(data, lmp, cp.cp.MsgFields, useDefaultStreamFields, cp.parseMessage)

View File

@@ -12,7 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -40,7 +40,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) {
// See https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs
encoding = "snappy"
}
err = common.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
lmp := cp.cp.NewLogMessageProcessor("loki_protobuf", false)
useDefaultStreamFields := len(cp.cp.StreamFields) == 0
err := parseProtobufRequest(data, lmp, cp.cp.MsgFields, useDefaultStreamFields, cp.parseMessage)

View File

@@ -10,8 +10,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -50,7 +50,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) {
}
encoding := r.Header.Get("Content-Encoding")
err = common.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
lmp := cp.NewLogMessageProcessor("opentelelemtry_protobuf", false)
useDefaultStreamFields := len(cp.StreamFields) == 0
err := pushProtobufRequest(data, lmp, useDefaultStreamFields)

View File

@@ -25,7 +25,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
@@ -380,11 +380,11 @@ func processStream(protocol string, r io.Reader, encoding string, useLocalTimest
}
func processStreamInternal(r io.Reader, encoding string, useLocalTimestamp bool, lmp insertutils.LogMessageProcessor) error {
reader, err := common.GetUncompressedReader(r, encoding)
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
if err != nil {
return fmt.Errorf("cannot decode syslog data: %w", err)
}
defer common.PutUncompressedReader(reader)
defer protoparserutil.PutUncompressedReader(reader)
return processUncompressedStream(reader, useLocalTimestamp, lmp)
}

View File

@@ -7,9 +7,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
@@ -22,16 +22,16 @@ var (
// InsertHandler processes csv data from req.
func InsertHandler(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}
return stream.Parse(req, func(rows []parser.Row) error {
return stream.Parse(req, func(rows []csvimport.Row) error {
return insertRows(at, rows, extraLabels)
})
}
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(at *auth.Token, rows []csvimport.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)

View File

@@ -7,10 +7,10 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
@@ -23,7 +23,7 @@ var (
// InsertHandlerForHTTP processes remote write for DataDog POST /api/beta/sketches request.
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -7,10 +7,10 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
@@ -23,7 +23,7 @@ var (
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -7,10 +7,10 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
@@ -25,7 +25,7 @@ var (
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -12,9 +12,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
@@ -36,7 +36,7 @@ var (
//
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(at *auth.Token, r io.Reader, encoding string) error {
return stream.Parse(r, encoding, true, "", "", func(db string, rows []parser.Row) error {
return stream.Parse(r, encoding, true, "", "", func(db string, rows []influx.Row) error {
return insertRows(at, db, rows, nil)
})
}
@@ -45,7 +45,7 @@ func InsertHandlerForReader(at *auth.Token, r io.Reader, encoding string) error
//
// See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}
@@ -55,12 +55,12 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
db := q.Get("db")
encoding := req.Header.Get("Content-Encoding")
isStreamMode := req.Header.Get("Stream-Mode") == "1"
return stream.Parse(req.Body, encoding, isStreamMode, precision, db, func(db string, rows []parser.Row) error {
return stream.Parse(req.Body, encoding, isStreamMode, precision, db, func(db string, rows []influx.Row) error {
return insertRows(at, db, rows, extraLabels)
})
}
func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(at *auth.Token, db string, rows []influx.Row, extraLabels []prompbmarshal.Label) error {
ctx := getPushCtx()
defer putPushCtx(ctx)

View File

@@ -42,8 +42,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits"
@@ -145,7 +145,7 @@ func main() {
startTime := time.Now()
remotewrite.StartIngestionRateLimiter()
remotewrite.Init()
common.StartUnmarshalWorkers()
protoparserutil.StartUnmarshalWorkers()
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, func(r io.Reader) error {
return influx.InsertHandlerForReader(nil, r, "")
@@ -195,7 +195,7 @@ func main() {
if len(*opentsdbHTTPListenAddr) > 0 {
opentsdbhttpServer.MustStop()
}
common.StopUnmarshalWorkers()
protoparserutil.StopUnmarshalWorkers()
remotewrite.Stop()
logger.Infof("successfully stopped vmagent in %.3f seconds", time.Since(startTime).Seconds())
@@ -282,7 +282,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
}
switch path {
case "/prometheus/api/v1/write", "/api/v1/write", "/api/v1/push", "/prometheus/api/v1/push":
if common.HandleVMProtoServerHandshake(w, r) {
if protoparserutil.HandleVMProtoServerHandshake(w, r) {
return true
}
prometheusWriteRequests.Inc()

View File

@@ -9,8 +9,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
@@ -25,7 +25,7 @@ var (
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
func InsertHandler(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -10,9 +10,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
)
@@ -24,7 +24,7 @@ var (
// InsertHandlerForHTTP processes remote write for NewRelic POST /infra/v2/metrics/events/bulk request.
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -8,9 +8,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
@@ -23,7 +23,7 @@ var (
// InsertHandler processes opentelemetry metrics.
func InsertHandler(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -7,9 +7,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -21,16 +21,16 @@ var (
// InsertHandler processes HTTP OpenTSDB put requests.
// See http://opentsdb.net/docs/build/html/api_http/put.html
func InsertHandler(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}
return stream.Parse(req, func(rows []parser.Row) error {
return stream.Parse(req, func(rows []opentsdbhttp.Row) error {
return insertRows(at, rows, extraLabels)
})
}
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(at *auth.Token, rows []opentsdbhttp.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)

View File

@@ -8,9 +8,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
@@ -23,23 +23,23 @@ var (
// InsertHandler processes `/api/v1/import/prometheus` request.
func InsertHandler(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}
defaultTimestamp, err := parserCommon.GetTimestamp(req)
defaultTimestamp, err := protoparserutil.GetTimestamp(req)
if err != nil {
return err
}
encoding := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, defaultTimestamp, encoding, true, func(rows []parser.Row) error {
return stream.Parse(req.Body, defaultTimestamp, encoding, true, func(rows []prometheus.Row) error {
return insertRows(at, rows, extraLabels)
}, func(s string) {
httpserver.LogError(req, s)
})
}
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(at *auth.Token, rows []prometheus.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)

View File

@@ -12,7 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
)
var (
@@ -44,14 +44,14 @@ func setUp() {
log.Fatalf("unable to set %q with value %q, err: %v", remoteWriteFlag, srv.URL, err)
}
logger.Init()
common.StartUnmarshalWorkers()
protoparserutil.StartUnmarshalWorkers()
remotewrite.Init()
testOutput = &bytes.Buffer{}
logger.SetOutputForTests(testOutput)
}
func tearDown() {
common.StopUnmarshalWorkers()
protoparserutil.StopUnmarshalWorkers()
srv.Close()
logger.ResetOutputForTest()
tmpDataDir := flag.Lookup("remoteWrite.tmpDataPath").Value.String()

View File

@@ -8,8 +8,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
@@ -22,7 +22,7 @@ var (
// InsertHandler processes remote write for prometheus.
func InsertHandler(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -18,7 +18,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
@@ -170,7 +170,7 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
doRequest := func(url string) (*http.Response, error) {
return c.doRequest(url, nil)
}
useVMProto = common.HandleVMProtoClientHandshake(c.remoteWriteURL, doRequest)
useVMProto = protoparserutil.HandleVMProtoClientHandshake(c.remoteWriteURL, doRequest)
if !useVMProto {
logger.Infof("the remote storage at %q doesn't support VictoriaMetrics remote write protocol. Switching to Prometheus remote write protocol. "+
"See https://docs.victoriametrics.com/vmagent/#victoriametrics-remote-write-protocol", sanitizedURL)

View File

@@ -9,8 +9,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
@@ -26,17 +26,17 @@ var (
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
func InsertHandler(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}
encoding := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, encoding, func(rows []parser.Row) error {
return stream.Parse(req.Body, encoding, func(rows []vmimport.Row) error {
return insertRows(at, rows, extraLabels)
})
}
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(at *auth.Token, rows []vmimport.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)

View File

@@ -26,8 +26,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
)
func main() {
@@ -382,7 +382,7 @@ func main() {
},
Before: beforeFn,
Action: func(c *cli.Context) error {
common.StartUnmarshalWorkers()
protoparserutil.StartUnmarshalWorkers()
blockPath := c.Args().First()
encoding := ""
if c.Bool("gunzip") {

View File

@@ -17,8 +17,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
)
@@ -206,8 +206,8 @@ func (rws *RemoteWriteServer) exportNativeHandler() http.Handler {
func (rws *RemoteWriteServer) importNativeHandler(t *testing.T) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
common.StartUnmarshalWorkers()
defer common.StopUnmarshalWorkers()
protoparserutil.StartUnmarshalWorkers()
defer protoparserutil.StopUnmarshalWorkers()
var gotTimeSeries []vm.TimeSeries
var mx sync.RWMutex

View File

@@ -6,9 +6,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -19,16 +19,16 @@ var (
// InsertHandler processes /api/v1/import/csv requests.
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}
return stream.Parse(req, func(rows []parser.Row) error {
return stream.Parse(req, func(rows []csvimport.Row) error {
return insertRows(rows, extraLabels)
})
}
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(rows []csvimport.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)

View File

@@ -6,10 +6,10 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -20,7 +20,7 @@ var (
// InsertHandlerForHTTP processes remote write for DataDog POST /api/beta/sketches request.
func InsertHandlerForHTTP(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -6,10 +6,10 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -20,7 +20,7 @@ var (
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
func InsertHandlerForHTTP(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -6,10 +6,10 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -22,7 +22,7 @@ var (
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
func InsertHandlerForHTTP(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -10,9 +10,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits"
"github.com/VictoriaMetrics/metrics"
@@ -34,7 +34,7 @@ var (
//
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(r io.Reader) error {
return stream.Parse(r, "", true, "", "", func(db string, rows []parser.Row) error {
return stream.Parse(r, "", true, "", "", func(db string, rows []influx.Row) error {
return insertRows(db, rows, nil)
})
}
@@ -43,7 +43,7 @@ func InsertHandlerForReader(r io.Reader) error {
//
// See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md
func InsertHandlerForHTTP(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}
@@ -53,12 +53,12 @@ func InsertHandlerForHTTP(req *http.Request) error {
db := q.Get("db")
encoding := req.Header.Get("Content-Encoding")
isStreamMode := req.Header.Get("Stream-Mode") == "1"
return stream.Parse(req.Body, encoding, isStreamMode, precision, db, func(db string, rows []parser.Row) error {
return stream.Parse(req.Body, encoding, isStreamMode, precision, db, func(db string, rows []influx.Row) error {
return insertRows(db, rows, extraLabels)
})
}
func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(db string, rows []influx.Row, extraLabels []prompbmarshal.Label) error {
ctx := getPushCtx()
defer putPushCtx(ctx)

View File

@@ -10,7 +10,7 @@ import (
"github.com/VictoriaMetrics/metrics"
vminsertCommon "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogsketches"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogv1"
@@ -39,8 +39,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits"
)
@@ -87,8 +87,8 @@ var staticServer = http.FileServer(http.FS(staticFiles))
// Init initializes vminsert.
func Init() {
relabel.Init()
vminsertCommon.InitStreamAggr()
common.StartUnmarshalWorkers()
common.InitStreamAggr()
protoparserutil.StartUnmarshalWorkers()
if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler)
}
@@ -122,8 +122,8 @@ func Stop() {
if len(*opentsdbHTTPListenAddr) > 0 {
opentsdbhttpServer.MustStop()
}
common.StopUnmarshalWorkers()
vminsertCommon.MustStopStreamAggr()
protoparserutil.StopUnmarshalWorkers()
common.MustStopStreamAggr()
}
// RequestHandler is a handler for Prometheus remote storage write API
@@ -165,7 +165,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
}
switch path {
case "/prometheus/api/v1/write", "/api/v1/write", "/api/v1/push", "/prometheus/api/v1/push":
if common.HandleVMProtoServerHandshake(w, r) {
if protoparserutil.HandleVMProtoServerHandshake(w, r) {
return true
}
prometheusWriteRequests.Inc()

View File

@@ -8,8 +8,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
@@ -21,7 +21,7 @@ var (
// InsertHandler processes `/api/v1/import/native` request.
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -8,9 +8,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
)
var (
@@ -20,7 +20,7 @@ var (
// InsertHandlerForHTTP processes remote write for request to /newrelic/infra/v2/metrics/events/bulk request.
func InsertHandlerForHTTP(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -7,9 +7,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -20,7 +20,7 @@ var (
// InsertHandler processes opentelemetry metrics.
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -7,9 +7,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -24,11 +24,11 @@ func InsertHandler(req *http.Request) error {
path := req.URL.Path
switch path {
case "/opentsdb/api/put", "/api/put":
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}
return stream.Parse(req, func(rows []parser.Row) error {
return stream.Parse(req, func(rows []opentsdbhttp.Row) error {
return insertRows(rows, extraLabels)
})
default:
@@ -36,7 +36,7 @@ func InsertHandler(req *http.Request) error {
}
}
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(rows []opentsdbhttp.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)

View File

@@ -7,9 +7,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -20,23 +20,23 @@ var (
// InsertHandler processes `/api/v1/import/prometheus` request.
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}
defaultTimestamp, err := parserCommon.GetTimestamp(req)
defaultTimestamp, err := protoparserutil.GetTimestamp(req)
if err != nil {
return err
}
encoding := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, defaultTimestamp, encoding, true, func(rows []parser.Row) error {
return stream.Parse(req.Body, defaultTimestamp, encoding, true, func(rows []prometheus.Row) error {
return insertRows(rows, extraLabels)
}, func(s string) {
httpserver.LogError(req, s)
})
}
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(rows []prometheus.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)

View File

@@ -7,8 +7,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -19,7 +19,7 @@ var (
// InsertHandler processes remote write for prometheus.
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}

View File

@@ -8,8 +8,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
@@ -24,17 +24,17 @@ var (
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
extraLabels, err := protoparserutil.GetExtraLabels(req)
if err != nil {
return err
}
encoding := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, encoding, func(rows []parser.Row) error {
return stream.Parse(req.Body, encoding, func(rows []vmimport.Row) error {
return insertRows(rows, extraLabels)
})
}
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(rows []vmimport.Row, extraLabels []prompbmarshal.Label) error {
ctx := getPushCtx()
defer putPushCtx(ctx)

View File

@@ -11,8 +11,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
)
func TestIsAutoMetric(t *testing.T) {
@@ -733,8 +733,8 @@ func TestSendStaleSeries(t *testing.T) {
sw.Config = &ScrapeWork{
NoStaleMarkers: false,
}
common.StartUnmarshalWorkers()
defer common.StopUnmarshalWorkers()
protoparserutil.StartUnmarshalWorkers()
defer protoparserutil.StopUnmarshalWorkers()
var staleMarks int
sw.PushData = func(_ *auth.Token, wr *prompbmarshal.WriteRequest) {
@@ -762,8 +762,8 @@ func TestSendStaleSeries(t *testing.T) {
f(generateScrape(20000), generateScrape(10), 19990)
}
func parsePromRow(data string) *parser.Row {
var rows parser.Rows
func parsePromRow(data string) *prometheus.Row {
var rows prometheus.Rows
errLogger := func(s string) {
panic(fmt.Errorf("unexpected error when unmarshaling Prometheus rows: %s", s))
}
@@ -775,7 +775,7 @@ func parsePromRow(data string) *parser.Row {
}
func parseData(data string) []prompbmarshal.TimeSeries {
return parser.MustParsePromMetrics(data, 0)
return prometheus.MustParsePromMetrics(data, 0)
}
func expectEqualTimeseries(tss, tssExpected []prompbmarshal.TimeSeries) error {

View File

@@ -10,8 +10,8 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@@ -35,11 +35,11 @@ func Parse(req *http.Request, callback func(rows []csvimport.Row) error) error {
}
encoding := req.Header.Get("Content-Encoding")
reader, err := common.GetUncompressedReader(req.Body, encoding)
reader, err := protoparserutil.GetUncompressedReader(req.Body, encoding)
if err != nil {
return fmt.Errorf("cannot decode csv data: %w", err)
}
defer common.PutUncompressedReader(reader)
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
@@ -54,7 +54,7 @@ func Parse(req *http.Request, callback func(rows []csvimport.Row) error) error {
uw.cds = cds
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
@@ -69,7 +69,7 @@ func (ctx *streamContext) Read() bool {
if ctx.err != nil || ctx.hasCallbackError() {
return false
}
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
ctx.reqBuf, ctx.tailBuf, ctx.err = protoparserutil.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
if ctx.err != nil {
if ctx.err != io.EOF {
readErrors.Inc()
@@ -165,7 +165,7 @@ func (uw *unmarshalWork) runCallback(rows []csvimport.Row) {
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
// Unmarshal implements prototparserutil.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf), uw.cds)
rows := uw.rows.Rows

View File

@@ -5,9 +5,9 @@ import (
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -16,7 +16,7 @@ import (
// callback shouldn't hold series after returning.
func Parse(r io.Reader, encoding string, callback func(series []*datadogsketches.Sketch) error) error {
readCalls.Inc()
err := common.ReadUncompressedData(r, encoding, datadogutils.MaxInsertRequestSize, func(data []byte) error {
err := protoparserutil.ReadUncompressedData(r, encoding, datadogutils.MaxInsertRequestSize, func(data []byte) error {
return parseData(data, callback)
})
if err != nil {

View File

@@ -5,9 +5,9 @@ import (
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -16,7 +16,7 @@ import (
// callback shouldn't hold series after returning.
func Parse(r io.Reader, encoding string, callback func(series []datadogv1.Series) error) error {
readCalls.Inc()
err := common.ReadUncompressedData(r, encoding, datadogutils.MaxInsertRequestSize, func(data []byte) error {
err := protoparserutil.ReadUncompressedData(r, encoding, datadogutils.MaxInsertRequestSize, func(data []byte) error {
return parseData(data, callback)
})
if err != nil {

View File

@@ -5,9 +5,9 @@ import (
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -16,7 +16,7 @@ import (
// callback shouldn't hold series after returning.
func Parse(r io.Reader, encoding, contentType string, callback func(series []datadogv2.Series) error) error {
readCalls.Inc()
err := common.ReadUncompressedData(r, encoding, datadogutils.MaxInsertRequestSize, func(data []byte) error {
err := protoparserutil.ReadUncompressedData(r, encoding, datadogutils.MaxInsertRequestSize, func(data []byte) error {
return parseData(data, contentType, callback)
})
if err != nil {

View File

@@ -10,8 +10,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@@ -27,11 +27,11 @@ var (
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, encoding string, callback func(rows []graphite.Row) error) error {
reader, err := common.GetUncompressedReader(r, encoding)
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
if err != nil {
return fmt.Errorf("Cannot decode graphite data: %w", err)
}
defer common.PutUncompressedReader(reader)
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
@@ -46,7 +46,7 @@ func Parse(r io.Reader, encoding string, callback func(rows []graphite.Row) erro
uw.callback = callback
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
@@ -61,7 +61,7 @@ func (ctx *streamContext) Read() bool {
if ctx.err != nil || ctx.hasCallbackError() {
return false
}
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
ctx.reqBuf, ctx.tailBuf, ctx.err = protoparserutil.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
if ctx.err != nil {
if ctx.err != io.EOF {
readErrors.Inc()
@@ -155,7 +155,7 @@ func (uw *unmarshalWork) runCallback(rows []graphite.Row) {
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
// Unmarshal implements protoparserutil.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
rows := uw.rows.Rows

View File

@@ -10,8 +10,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@@ -41,7 +41,7 @@ func Parse(r io.Reader, encoding string, isStreamMode bool, precision, db string
// Process the whole request in one go.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7090
readCalls.Inc()
err := common.ReadUncompressedData(r, encoding, maxRequestSize, func(data []byte) error {
err := protoparserutil.ReadUncompressedData(r, encoding, maxRequestSize, func(data []byte) error {
ctx := getBatchContext()
defer putBatchContext(ctx)
@@ -59,11 +59,11 @@ func Parse(r io.Reader, encoding string, isStreamMode bool, precision, db string
}
func parseStreamMode(r io.Reader, encoding string, tsMultiplier int64, db string, callback func(db string, rows []influx.Row) error) error {
reader, err := common.GetUncompressedReader(r, encoding)
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
if err != nil {
return fmt.Errorf("cannot decode influx line protocol data: %w; see https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf", err)
}
defer common.PutUncompressedReader(reader)
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
@@ -79,7 +79,7 @@ func parseStreamMode(r io.Reader, encoding string, tsMultiplier int64, db string
uw.tsMultiplier = tsMultiplier
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
@@ -153,7 +153,7 @@ func (ctx *streamContext) Read() bool {
if ctx.err != nil || ctx.hasCallbackError() {
return false
}
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineSize.IntN())
ctx.reqBuf, ctx.tailBuf, ctx.err = protoparserutil.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineSize.IntN())
if ctx.err != nil {
if ctx.err != io.EOF {
readErrors.Inc()
@@ -235,7 +235,7 @@ func (uw *unmarshalWork) runCallback() {
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
// Unmarshal implements protoparserutil.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
_ = unmarshal(&uw.rows, uw.reqBuf, uw.tsMultiplier)
uw.runCallback()

View File

@@ -7,8 +7,8 @@ import (
"sync"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
)
func TestDetectTimestamp(t *testing.T) {
@@ -37,8 +37,8 @@ func TestDetectTimestamp(t *testing.T) {
}
func TestParseStream(t *testing.T) {
common.StartUnmarshalWorkers()
defer common.StopUnmarshalWorkers()
protoparserutil.StartUnmarshalWorkers()
defer protoparserutil.StopUnmarshalWorkers()
f := func(data string, rowsExpected []influx.Row, isStreamMode bool, badData bool) {
t.Helper()

View File

@@ -8,7 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
@@ -20,11 +20,11 @@ import (
//
// callback shouldn't hold block after returning.
func Parse(r io.Reader, contentEncoding string, callback func(block *Block) error) error {
reader, err := common.GetUncompressedReader(r, contentEncoding)
reader, err := protoparserutil.GetUncompressedReader(r, contentEncoding)
if err != nil {
return fmt.Errorf("cannot decode vmimport data: %w", err)
}
defer common.PutUncompressedReader(reader)
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
@@ -103,7 +103,7 @@ func Parse(r io.Reader, contentEncoding string, callback func(block *Block) erro
blocksRead.Inc()
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
}
@@ -154,7 +154,7 @@ func (uw *unmarshalWork) reset() {
uw.block.reset()
}
// Unmarshal implements common.UnmarshalWork
// Unmarshal implements protoparserutil.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
err := uw.unmarshal()
if err != nil {

View File

@@ -9,8 +9,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
)
var (
@@ -23,7 +23,7 @@ var (
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, encoding string, callback func(rows []newrelic.Row) error) error {
readCalls.Inc()
err := common.ReadUncompressedData(r, encoding, maxInsertRequestSize, func(data []byte) error {
err := protoparserutil.ReadUncompressedData(r, encoding, maxInsertRequestSize, func(data []byte) error {
return parseData(data, callback)
})
if err != nil {

View File

@@ -15,8 +15,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
)
var maxRequestSize = flagutil.NewBytes("opentelemetry.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single OpenTelemetry request")
@@ -27,7 +27,7 @@ var maxRequestSize = flagutil.NewBytes("opentelemetry.maxRequestSize", 64*1024*1
//
// optional processBody can be used for pre-processing the read request body from r before parsing it in OpenTelemetry format.
func ParseStream(r io.Reader, encoding string, processBody func(data []byte) ([]byte, error), callback func(tss []prompbmarshal.TimeSeries) error) error {
err := common.ReadUncompressedData(r, encoding, maxRequestSize, func(data []byte) error {
err := protoparserutil.ReadUncompressedData(r, encoding, maxRequestSize, func(data []byte) error {
if processBody != nil {
dataNew, err := processBody(data)
if err != nil {

View File

@@ -10,8 +10,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@@ -39,7 +39,7 @@ func Parse(r io.Reader, callback func(rows []opentsdb.Row) error) error {
uw.callback = callback
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
@@ -54,7 +54,7 @@ func (ctx *streamContext) Read() bool {
if ctx.err != nil || ctx.hasCallbackError() {
return false
}
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
ctx.reqBuf, ctx.tailBuf, ctx.err = protoparserutil.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
if ctx.err != nil {
if ctx.err != io.EOF {
readErrors.Inc()
@@ -148,7 +148,7 @@ func (uw *unmarshalWork) runCallback(rows []opentsdb.Row) {
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
// Unmarshal implements protoparserutil.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
rows := uw.rows.Rows

View File

@@ -9,8 +9,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -28,7 +28,7 @@ var (
func Parse(req *http.Request, callback func(rows []opentsdbhttp.Row) error) error {
readCalls.Inc()
encoding := req.Header.Get("Content-Encoding")
err := common.ReadUncompressedData(req.Body, encoding, maxInsertRequestSize, func(data []byte) error {
err := protoparserutil.ReadUncompressedData(req.Body, encoding, maxInsertRequestSize, func(data []byte) error {
return parseData(data, callback)
})
if err != nil {

View File

@@ -8,8 +8,8 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@@ -24,11 +24,11 @@ import (
// It is recommended setting limitConcurrency=true if the caller doesn't have concurrency limits set,
// like /api/v1/write calls.
func Parse(r io.Reader, defaultTimestamp int64, encoding string, limitConcurrency bool, callback func(rows []prometheus.Row) error, errLogger func(string)) error {
reader, err := common.GetUncompressedReader(r, encoding)
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
if err != nil {
return fmt.Errorf("cannot decode Prometheus text exposition data: %w", err)
}
defer common.PutUncompressedReader(reader)
defer protoparserutil.PutUncompressedReader(reader)
var wcr *writeconcurrencylimiter.Reader
if limitConcurrency {
@@ -47,7 +47,7 @@ func Parse(r io.Reader, defaultTimestamp int64, encoding string, limitConcurrenc
uw.defaultTimestamp = defaultTimestamp
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
protoparserutil.ScheduleUnmarshalWork(uw)
if wcr != nil {
wcr.DecConcurrency()
}
@@ -64,7 +64,7 @@ func (ctx *streamContext) Read() bool {
if ctx.err != nil || ctx.hasCallbackError() {
return false
}
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
ctx.reqBuf, ctx.tailBuf, ctx.err = protoparserutil.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
if ctx.err != nil {
if ctx.err != io.EOF {
readErrors.Inc()
@@ -162,7 +162,7 @@ func (uw *unmarshalWork) runCallback(rows []prometheus.Row) {
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
// Unmarshal implements protoparserutil.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
if uw.errLogger != nil {
uw.rows.UnmarshalWithErrLogger(bytesutil.ToUnsafeString(uw.reqBuf), uw.errLogger)

View File

@@ -9,13 +9,13 @@ import (
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
)
func TestParse(t *testing.T) {
common.StartUnmarshalWorkers()
defer common.StopUnmarshalWorkers()
protoparserutil.StartUnmarshalWorkers()
defer protoparserutil.StopUnmarshalWorkers()
const defaultTimestamp = 123
f := func(s string, rowsExpected []prometheus.Row) {

View File

@@ -1,4 +1,4 @@
package common
package protoparserutil
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package common
package protoparserutil
import (
"bytes"

View File

@@ -1,4 +1,4 @@
package common
package protoparserutil
import (
"encoding/base64"

View File

@@ -1,4 +1,4 @@
package common
package protoparserutil
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package common
package protoparserutil
import (
"bytes"

View File

@@ -1,4 +1,4 @@
package common
package protoparserutil
import (
"bytes"

View File

@@ -1,4 +1,4 @@
package common
package protoparserutil
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package common
package protoparserutil
import (
"sync"

View File

@@ -1,4 +1,4 @@
package common
package protoparserutil
import (
"io"

View File

@@ -8,7 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
@@ -23,11 +23,11 @@ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 10*1024*1024, "The maxim
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, encoding string, callback func(rows []vmimport.Row) error) error {
reader, err := common.GetUncompressedReader(r, encoding)
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
if err != nil {
return fmt.Errorf("cannot decode vmimport data: %w", err)
}
defer common.PutUncompressedReader(reader)
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
@@ -41,7 +41,7 @@ func Parse(r io.Reader, encoding string, callback func(rows []vmimport.Row) erro
uw.callback = callback
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
@@ -56,7 +56,7 @@ func (ctx *streamContext) Read() bool {
if ctx.err != nil || ctx.hasCallbackError() {
return false
}
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineLen.IntN())
ctx.reqBuf, ctx.tailBuf, ctx.err = protoparserutil.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineLen.IntN())
if ctx.err != nil {
if ctx.err != io.EOF {
readErrors.Inc()
@@ -150,7 +150,7 @@ func (uw *unmarshalWork) runCallback(rows []vmimport.Row) {
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
// Unmarshal implements proroparserutil.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
rows := uw.rows.Rows