From f645479b5e0612e7e781aa50a0375c3624b3adfe Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 18 Mar 2025 16:24:48 +0100 Subject: [PATCH] lib/protoparser: rename lib/protoparser/common to lib/protoparser/protoparserutil This improves readability of the code, which uses this package. --- app/vlinsert/datadog/datadog.go | 4 ++-- app/vlinsert/elasticsearch/elasticsearch.go | 6 +++--- app/vlinsert/journald/journald.go | 4 ++-- app/vlinsert/jsonline/jsonline.go | 6 +++--- app/vlinsert/loki/loki_json.go | 4 ++-- app/vlinsert/loki/loki_protobuf.go | 4 ++-- app/vlinsert/opentelemetry/opentelemetry.go | 4 ++-- app/vlinsert/syslog/syslog.go | 6 +++--- app/vmagent/csvimport/request_handler.go | 10 +++++----- app/vmagent/datadogsketches/request_handler.go | 4 ++-- app/vmagent/datadogv1/request_handler.go | 4 ++-- app/vmagent/datadogv2/request_handler.go | 4 ++-- app/vmagent/influx/request_handler.go | 12 ++++++------ app/vmagent/main.go | 8 ++++---- app/vmagent/native/request_handler.go | 4 ++-- app/vmagent/newrelic/request_handler.go | 4 ++-- app/vmagent/opentelemetry/request_handler.go | 4 ++-- app/vmagent/opentsdbhttp/request_handler.go | 10 +++++----- app/vmagent/prometheusimport/request_handler.go | 12 ++++++------ .../prometheusimport/request_handler_test.go | 6 +++--- app/vmagent/promremotewrite/request_handler.go | 4 ++-- app/vmagent/remotewrite/client.go | 4 ++-- app/vmagent/vmimport/request_handler.go | 10 +++++----- app/vmctl/main.go | 4 ++-- .../remote_write_server.go | 6 +++--- app/vminsert/csvimport/request_handler.go | 10 +++++----- app/vminsert/datadogsketches/request_handler.go | 4 ++-- app/vminsert/datadogv1/request_handler.go | 4 ++-- app/vminsert/datadogv2/request_handler.go | 4 ++-- app/vminsert/influx/request_handler.go | 12 ++++++------ app/vminsert/main.go | 14 +++++++------- app/vminsert/native/request_handler.go | 4 ++-- app/vminsert/newrelic/request_handler.go | 4 ++-- app/vminsert/opentelemetry/request_handler.go | 4 ++-- app/vminsert/opentsdbhttp/request_handler.go | 10 +++++----- app/vminsert/prometheusimport/request_handler.go | 12 ++++++------ app/vminsert/promremotewrite/request_handler.go | 4 ++-- app/vminsert/vmimport/request_handler.go | 10 +++++----- lib/promscrape/scrapework_test.go | 14 +++++++------- lib/protoparser/csvimport/stream/streamparser.go | 12 ++++++------ .../datadogsketches/stream/streamparser.go | 4 ++-- lib/protoparser/datadogv1/stream/streamparser.go | 4 ++-- lib/protoparser/datadogv2/stream/streamparser.go | 4 ++-- lib/protoparser/graphite/stream/streamparser.go | 12 ++++++------ lib/protoparser/influx/stream/streamparser.go | 14 +++++++------- lib/protoparser/influx/stream/streamparser_test.go | 6 +++--- lib/protoparser/native/stream/streamparser.go | 10 +++++----- lib/protoparser/newrelic/stream/streamparser.go | 4 ++-- .../opentelemetry/stream/streamparser.go | 4 ++-- lib/protoparser/opentsdb/stream/streamparser.go | 8 ++++---- .../opentsdbhttp/stream/streamparser.go | 4 ++-- lib/protoparser/prometheus/stream/streamparser.go | 12 ++++++------ .../prometheus/stream/streamparser_test.go | 6 +++--- .../{common => protoparserutil}/compress_reader.go | 2 +- .../compress_reader_test.go | 2 +- .../{common => protoparserutil}/extra_labels.go | 2 +- .../extra_labels_test.go | 2 +- .../{common => protoparserutil}/lines_reader.go | 2 +- .../lines_reader_test.go | 2 +- .../{common => protoparserutil}/timestamp.go | 2 +- .../{common => protoparserutil}/unmarshal_work.go | 2 +- .../vmproto_handshake.go | 2 +- lib/protoparser/vmimport/stream/streamparser.go | 12 ++++++------ 63 files changed, 196 insertions(+), 196 deletions(-) rename lib/protoparser/{common => protoparserutil}/compress_reader.go (99%) rename lib/protoparser/{common => protoparserutil}/compress_reader_test.go (99%) rename lib/protoparser/{common => protoparserutil}/extra_labels.go (98%) rename lib/protoparser/{common => protoparserutil}/extra_labels_test.go (99%) rename lib/protoparser/{common => protoparserutil}/lines_reader.go (99%) rename lib/protoparser/{common => protoparserutil}/lines_reader_test.go (99%) rename lib/protoparser/{common => protoparserutil}/timestamp.go (95%) rename lib/protoparser/{common => protoparserutil}/unmarshal_work.go (98%) rename lib/protoparser/{common => protoparserutil}/vmproto_handshake.go (97%) diff --git a/app/vlinsert/datadog/datadog.go b/app/vlinsert/datadog/datadog.go index 30b1b5c8c0..f2e7d827d5 100644 --- a/app/vlinsert/datadog/datadog.go +++ b/app/vlinsert/datadog/datadog.go @@ -16,7 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" ) var ( @@ -80,7 +80,7 @@ func datadogLogsIngestion(w http.ResponseWriter, r *http.Request) bool { } encoding := r.Header.Get("Content-Encoding") - err = common.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error { + err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error { lmp := cp.NewLogMessageProcessor("datadog", false) err := readLogsRequest(ts, data, lmp) lmp.MustClose() diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go index 25fa58be83..619045b1a4 100644 --- a/app/vlinsert/elasticsearch/elasticsearch.go +++ b/app/vlinsert/elasticsearch/elasticsearch.go @@ -17,7 +17,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" ) @@ -134,11 +134,11 @@ var ( func readBulkRequest(streamName string, r io.Reader, encoding string, timeField string, msgFields []string, lmp insertutils.LogMessageProcessor) (int, error) { // See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html - reader, err := common.GetUncompressedReader(r, encoding) + reader, err := protoparserutil.GetUncompressedReader(r, encoding) if err != nil { return 0, fmt.Errorf("cannot decode Elasticsearch protocol data: %w", err) } - defer common.PutUncompressedReader(reader) + defer protoparserutil.PutUncompressedReader(reader) wcr := writeconcurrencylimiter.GetReader(reader) defer writeconcurrencylimiter.PutReader(wcr) diff --git a/app/vlinsert/journald/journald.go b/app/vlinsert/journald/journald.go index b1980f06a6..5f89e25b30 100644 --- a/app/vlinsert/journald/journald.go +++ b/app/vlinsert/journald/journald.go @@ -18,7 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -100,7 +100,7 @@ func handleJournald(r *http.Request, w http.ResponseWriter) { } encoding := r.Header.Get("Content-Encoding") - err = common.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error { + err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error { lmp := cp.NewLogMessageProcessor("journald", false) err := parseJournaldRequest(data, lmp, cp) lmp.MustClose() diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go index 123abc516c..7ddf0b5b57 100644 --- a/app/vlinsert/jsonline/jsonline.go +++ b/app/vlinsert/jsonline/jsonline.go @@ -11,7 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -39,12 +39,12 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) { } encoding := r.Header.Get("Content-Encoding") - reader, err := common.GetUncompressedReader(r.Body, encoding) + reader, err := protoparserutil.GetUncompressedReader(r.Body, encoding) if err != nil { logger.Errorf("cannot decode jsonline request: %s", err) return } - defer common.PutUncompressedReader(reader) + defer protoparserutil.PutUncompressedReader(reader) lmp := cp.NewLogMessageProcessor("jsonline", true) streamName := fmt.Sprintf("remoteAddr=%s, requestURI=%q", httpserver.GetQuotedRemoteAddr(r), r.RequestURI) diff --git a/app/vlinsert/loki/loki_json.go b/app/vlinsert/loki/loki_json.go index 660586c96a..49f5870a5f 100644 --- a/app/vlinsert/loki/loki_json.go +++ b/app/vlinsert/loki/loki_json.go @@ -14,7 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" ) var maxRequestSize = flagutil.NewBytes("loki.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single Loki request") @@ -36,7 +36,7 @@ func handleJSON(r *http.Request, w http.ResponseWriter) { } encoding := r.Header.Get("Content-Encoding") - err = common.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error { + err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error { lmp := cp.cp.NewLogMessageProcessor("loki_json", false) useDefaultStreamFields := len(cp.cp.StreamFields) == 0 err := parseJSONRequest(data, lmp, cp.cp.MsgFields, useDefaultStreamFields, cp.parseMessage) diff --git a/app/vlinsert/loki/loki_protobuf.go b/app/vlinsert/loki/loki_protobuf.go index 2ac99f5017..8f5d1c8040 100644 --- a/app/vlinsert/loki/loki_protobuf.go +++ b/app/vlinsert/loki/loki_protobuf.go @@ -12,7 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -40,7 +40,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) { // See https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs encoding = "snappy" } - err = common.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error { + err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error { lmp := cp.cp.NewLogMessageProcessor("loki_protobuf", false) useDefaultStreamFields := len(cp.cp.StreamFields) == 0 err := parseProtobufRequest(data, lmp, cp.cp.MsgFields, useDefaultStreamFields, cp.parseMessage) diff --git a/app/vlinsert/opentelemetry/opentelemetry.go b/app/vlinsert/opentelemetry/opentelemetry.go index 6872aac7d5..2401b5def1 100644 --- a/app/vlinsert/opentelemetry/opentelemetry.go +++ b/app/vlinsert/opentelemetry/opentelemetry.go @@ -10,8 +10,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/metrics" ) @@ -50,7 +50,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) { } encoding := r.Header.Get("Content-Encoding") - err = common.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error { + err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error { lmp := cp.NewLogMessageProcessor("opentelelemtry_protobuf", false) useDefaultStreamFields := len(cp.StreamFields) == 0 err := pushProtobufRequest(data, lmp, useDefaultStreamFields) diff --git a/app/vlinsert/syslog/syslog.go b/app/vlinsert/syslog/syslog.go index dd67f19d0c..8302f32625 100644 --- a/app/vlinsert/syslog/syslog.go +++ b/app/vlinsert/syslog/syslog.go @@ -25,7 +25,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -380,11 +380,11 @@ func processStream(protocol string, r io.Reader, encoding string, useLocalTimest } func processStreamInternal(r io.Reader, encoding string, useLocalTimestamp bool, lmp insertutils.LogMessageProcessor) error { - reader, err := common.GetUncompressedReader(r, encoding) + reader, err := protoparserutil.GetUncompressedReader(r, encoding) if err != nil { return fmt.Errorf("cannot decode syslog data: %w", err) } - defer common.PutUncompressedReader(reader) + defer protoparserutil.PutUncompressedReader(reader) return processUncompressedStream(reader, useLocalTimestamp, lmp) } diff --git a/app/vmagent/csvimport/request_handler.go b/app/vmagent/csvimport/request_handler.go index 8d5464a06a..595fee37fa 100644 --- a/app/vmagent/csvimport/request_handler.go +++ b/app/vmagent/csvimport/request_handler.go @@ -7,9 +7,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -22,16 +22,16 @@ var ( // InsertHandler processes csv data from req. func InsertHandler(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } - return stream.Parse(req, func(rows []parser.Row) error { + return stream.Parse(req, func(rows []csvimport.Row) error { return insertRows(at, rows, extraLabels) }) } -func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, rows []csvimport.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) diff --git a/app/vmagent/datadogsketches/request_handler.go b/app/vmagent/datadogsketches/request_handler.go index cb7c9cb3dc..796aa5bd70 100644 --- a/app/vmagent/datadogsketches/request_handler.go +++ b/app/vmagent/datadogsketches/request_handler.go @@ -7,10 +7,10 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -23,7 +23,7 @@ var ( // InsertHandlerForHTTP processes remote write for DataDog POST /api/beta/sketches request. func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vmagent/datadogv1/request_handler.go b/app/vmagent/datadogv1/request_handler.go index 722cdffb35..361b557f72 100644 --- a/app/vmagent/datadogv1/request_handler.go +++ b/app/vmagent/datadogv1/request_handler.go @@ -7,10 +7,10 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -23,7 +23,7 @@ var ( // InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request. func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vmagent/datadogv2/request_handler.go b/app/vmagent/datadogv2/request_handler.go index 917df7efcb..e7f41c1dc5 100644 --- a/app/vmagent/datadogv2/request_handler.go +++ b/app/vmagent/datadogv2/request_handler.go @@ -7,10 +7,10 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -25,7 +25,7 @@ var ( // // See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go index 2f99cb8f72..b164440319 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -12,9 +12,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -36,7 +36,7 @@ var ( // // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ func InsertHandlerForReader(at *auth.Token, r io.Reader, encoding string) error { - return stream.Parse(r, encoding, true, "", "", func(db string, rows []parser.Row) error { + return stream.Parse(r, encoding, true, "", "", func(db string, rows []influx.Row) error { return insertRows(at, db, rows, nil) }) } @@ -45,7 +45,7 @@ func InsertHandlerForReader(at *auth.Token, r io.Reader, encoding string) error // // See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } @@ -55,12 +55,12 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { db := q.Get("db") encoding := req.Header.Get("Content-Encoding") isStreamMode := req.Header.Get("Stream-Mode") == "1" - return stream.Parse(req.Body, encoding, isStreamMode, precision, db, func(db string, rows []parser.Row) error { + return stream.Parse(req.Body, encoding, isStreamMode, precision, db, func(db string, rows []influx.Row) error { return insertRows(at, db, rows, extraLabels) }) } -func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, db string, rows []influx.Row, extraLabels []prompbmarshal.Label) error { ctx := getPushCtx() defer putPushCtx(ctx) diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 49b6d3fcbf..f3446a0787 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -42,8 +42,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits" @@ -145,7 +145,7 @@ func main() { startTime := time.Now() remotewrite.StartIngestionRateLimiter() remotewrite.Init() - common.StartUnmarshalWorkers() + protoparserutil.StartUnmarshalWorkers() if len(*influxListenAddr) > 0 { influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, func(r io.Reader) error { return influx.InsertHandlerForReader(nil, r, "") @@ -195,7 +195,7 @@ func main() { if len(*opentsdbHTTPListenAddr) > 0 { opentsdbhttpServer.MustStop() } - common.StopUnmarshalWorkers() + protoparserutil.StopUnmarshalWorkers() remotewrite.Stop() logger.Infof("successfully stopped vmagent in %.3f seconds", time.Since(startTime).Seconds()) @@ -282,7 +282,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } switch path { case "/prometheus/api/v1/write", "/api/v1/write", "/api/v1/push", "/prometheus/api/v1/push": - if common.HandleVMProtoServerHandshake(w, r) { + if protoparserutil.HandleVMProtoServerHandshake(w, r) { return true } prometheusWriteRequests.Inc() diff --git a/app/vmagent/native/request_handler.go b/app/vmagent/native/request_handler.go index 34ada77fb2..6fa14448d7 100644 --- a/app/vmagent/native/request_handler.go +++ b/app/vmagent/native/request_handler.go @@ -9,8 +9,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -25,7 +25,7 @@ var ( // // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6 func InsertHandler(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vmagent/newrelic/request_handler.go b/app/vmagent/newrelic/request_handler.go index c68d108e4c..4efdef9644 100644 --- a/app/vmagent/newrelic/request_handler.go +++ b/app/vmagent/newrelic/request_handler.go @@ -10,9 +10,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" ) @@ -24,7 +24,7 @@ var ( // InsertHandlerForHTTP processes remote write for NewRelic POST /infra/v2/metrics/events/bulk request. func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vmagent/opentelemetry/request_handler.go b/app/vmagent/opentelemetry/request_handler.go index 77c5490596..9af047cf49 100644 --- a/app/vmagent/opentelemetry/request_handler.go +++ b/app/vmagent/opentelemetry/request_handler.go @@ -8,9 +8,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -23,7 +23,7 @@ var ( // InsertHandler processes opentelemetry metrics. func InsertHandler(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vmagent/opentsdbhttp/request_handler.go b/app/vmagent/opentsdbhttp/request_handler.go index d9cb4b7919..3389449bc5 100644 --- a/app/vmagent/opentsdbhttp/request_handler.go +++ b/app/vmagent/opentsdbhttp/request_handler.go @@ -7,9 +7,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -21,16 +21,16 @@ var ( // InsertHandler processes HTTP OpenTSDB put requests. // See http://opentsdb.net/docs/build/html/api_http/put.html func InsertHandler(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } - return stream.Parse(req, func(rows []parser.Row) error { + return stream.Parse(req, func(rows []opentsdbhttp.Row) error { return insertRows(at, rows, extraLabels) }) } -func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, rows []opentsdbhttp.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) diff --git a/app/vmagent/prometheusimport/request_handler.go b/app/vmagent/prometheusimport/request_handler.go index e95beee196..c8f2b462b7 100644 --- a/app/vmagent/prometheusimport/request_handler.go +++ b/app/vmagent/prometheusimport/request_handler.go @@ -8,9 +8,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -23,23 +23,23 @@ var ( // InsertHandler processes `/api/v1/import/prometheus` request. func InsertHandler(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } - defaultTimestamp, err := parserCommon.GetTimestamp(req) + defaultTimestamp, err := protoparserutil.GetTimestamp(req) if err != nil { return err } encoding := req.Header.Get("Content-Encoding") - return stream.Parse(req.Body, defaultTimestamp, encoding, true, func(rows []parser.Row) error { + return stream.Parse(req.Body, defaultTimestamp, encoding, true, func(rows []prometheus.Row) error { return insertRows(at, rows, extraLabels) }, func(s string) { httpserver.LogError(req, s) }) } -func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, rows []prometheus.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) diff --git a/app/vmagent/prometheusimport/request_handler_test.go b/app/vmagent/prometheusimport/request_handler_test.go index 8600a3fd8c..fdb04fad48 100644 --- a/app/vmagent/prometheusimport/request_handler_test.go +++ b/app/vmagent/prometheusimport/request_handler_test.go @@ -12,7 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" ) var ( @@ -44,14 +44,14 @@ func setUp() { log.Fatalf("unable to set %q with value %q, err: %v", remoteWriteFlag, srv.URL, err) } logger.Init() - common.StartUnmarshalWorkers() + protoparserutil.StartUnmarshalWorkers() remotewrite.Init() testOutput = &bytes.Buffer{} logger.SetOutputForTests(testOutput) } func tearDown() { - common.StopUnmarshalWorkers() + protoparserutil.StopUnmarshalWorkers() srv.Close() logger.ResetOutputForTest() tmpDataDir := flag.Lookup("remoteWrite.tmpDataPath").Value.String() diff --git a/app/vmagent/promremotewrite/request_handler.go b/app/vmagent/promremotewrite/request_handler.go index 02f758c5dc..c3e89046c2 100644 --- a/app/vmagent/promremotewrite/request_handler.go +++ b/app/vmagent/promremotewrite/request_handler.go @@ -8,8 +8,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -22,7 +22,7 @@ var ( // InsertHandler processes remote write for prometheus. func InsertHandler(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index b9aa189b6a..f4a4e23767 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -18,7 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" @@ -170,7 +170,7 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste doRequest := func(url string) (*http.Response, error) { return c.doRequest(url, nil) } - useVMProto = common.HandleVMProtoClientHandshake(c.remoteWriteURL, doRequest) + useVMProto = protoparserutil.HandleVMProtoClientHandshake(c.remoteWriteURL, doRequest) if !useVMProto { logger.Infof("the remote storage at %q doesn't support VictoriaMetrics remote write protocol. Switching to Prometheus remote write protocol. "+ "See https://docs.victoriametrics.com/vmagent/#victoriametrics-remote-write-protocol", sanitizedURL) diff --git a/app/vmagent/vmimport/request_handler.go b/app/vmagent/vmimport/request_handler.go index a566e06266..b0940234d7 100644 --- a/app/vmagent/vmimport/request_handler.go +++ b/app/vmagent/vmimport/request_handler.go @@ -9,8 +9,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" @@ -26,17 +26,17 @@ var ( // // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6 func InsertHandler(at *auth.Token, req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } encoding := req.Header.Get("Content-Encoding") - return stream.Parse(req.Body, encoding, func(rows []parser.Row) error { + return stream.Parse(req.Body, encoding, func(rows []vmimport.Row) error { return insertRows(at, rows, extraLabels) }) } -func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(at *auth.Token, rows []vmimport.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) diff --git a/app/vmctl/main.go b/app/vmctl/main.go index fe27235902..4a3765db35 100644 --- a/app/vmctl/main.go +++ b/app/vmctl/main.go @@ -26,8 +26,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" ) func main() { @@ -382,7 +382,7 @@ func main() { }, Before: beforeFn, Action: func(c *cli.Context) error { - common.StartUnmarshalWorkers() + protoparserutil.StartUnmarshalWorkers() blockPath := c.Args().First() encoding := "" if c.Bool("gunzip") { diff --git a/app/vmctl/testdata/servers_integration_test/remote_write_server.go b/app/vmctl/testdata/servers_integration_test/remote_write_server.go index 3cde3b424b..3ddcfb433b 100644 --- a/app/vmctl/testdata/servers_integration_test/remote_write_server.go +++ b/app/vmctl/testdata/servers_integration_test/remote_write_server.go @@ -17,8 +17,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" ) @@ -206,8 +206,8 @@ func (rws *RemoteWriteServer) exportNativeHandler() http.Handler { func (rws *RemoteWriteServer) importNativeHandler(t *testing.T) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - common.StartUnmarshalWorkers() - defer common.StopUnmarshalWorkers() + protoparserutil.StartUnmarshalWorkers() + defer protoparserutil.StopUnmarshalWorkers() var gotTimeSeries []vm.TimeSeries var mx sync.RWMutex diff --git a/app/vminsert/csvimport/request_handler.go b/app/vminsert/csvimport/request_handler.go index deba553f60..67914d5621 100644 --- a/app/vminsert/csvimport/request_handler.go +++ b/app/vminsert/csvimport/request_handler.go @@ -6,9 +6,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -19,16 +19,16 @@ var ( // InsertHandler processes /api/v1/import/csv requests. func InsertHandler(req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } - return stream.Parse(req, func(rows []parser.Row) error { + return stream.Parse(req, func(rows []csvimport.Row) error { return insertRows(rows, extraLabels) }) } -func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(rows []csvimport.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetInsertCtx() defer common.PutInsertCtx(ctx) diff --git a/app/vminsert/datadogsketches/request_handler.go b/app/vminsert/datadogsketches/request_handler.go index 098d3f2e27..d39b0b201c 100644 --- a/app/vminsert/datadogsketches/request_handler.go +++ b/app/vminsert/datadogsketches/request_handler.go @@ -6,10 +6,10 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -20,7 +20,7 @@ var ( // InsertHandlerForHTTP processes remote write for DataDog POST /api/beta/sketches request. func InsertHandlerForHTTP(req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vminsert/datadogv1/request_handler.go b/app/vminsert/datadogv1/request_handler.go index f491746b11..4de7b7328e 100644 --- a/app/vminsert/datadogv1/request_handler.go +++ b/app/vminsert/datadogv1/request_handler.go @@ -6,10 +6,10 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -20,7 +20,7 @@ var ( // InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request. func InsertHandlerForHTTP(req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vminsert/datadogv2/request_handler.go b/app/vminsert/datadogv2/request_handler.go index 4774f1ccd8..5f19c5b006 100644 --- a/app/vminsert/datadogv2/request_handler.go +++ b/app/vminsert/datadogv2/request_handler.go @@ -6,10 +6,10 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -22,7 +22,7 @@ var ( // // See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics func InsertHandlerForHTTP(req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 92946d7000..51faac50d3 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -10,9 +10,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits" "github.com/VictoriaMetrics/metrics" @@ -34,7 +34,7 @@ var ( // // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ func InsertHandlerForReader(r io.Reader) error { - return stream.Parse(r, "", true, "", "", func(db string, rows []parser.Row) error { + return stream.Parse(r, "", true, "", "", func(db string, rows []influx.Row) error { return insertRows(db, rows, nil) }) } @@ -43,7 +43,7 @@ func InsertHandlerForReader(r io.Reader) error { // // See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md func InsertHandlerForHTTP(req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } @@ -53,12 +53,12 @@ func InsertHandlerForHTTP(req *http.Request) error { db := q.Get("db") encoding := req.Header.Get("Content-Encoding") isStreamMode := req.Header.Get("Stream-Mode") == "1" - return stream.Parse(req.Body, encoding, isStreamMode, precision, db, func(db string, rows []parser.Row) error { + return stream.Parse(req.Body, encoding, isStreamMode, precision, db, func(db string, rows []influx.Row) error { return insertRows(db, rows, extraLabels) }) } -func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(db string, rows []influx.Row, extraLabels []prompbmarshal.Label) error { ctx := getPushCtx() defer putPushCtx(ctx) diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 2e3e278dc3..baabf06992 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -10,7 +10,7 @@ import ( "github.com/VictoriaMetrics/metrics" - vminsertCommon "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogsketches" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogv1" @@ -39,8 +39,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits" ) @@ -87,8 +87,8 @@ var staticServer = http.FileServer(http.FS(staticFiles)) // Init initializes vminsert. func Init() { relabel.Init() - vminsertCommon.InitStreamAggr() - common.StartUnmarshalWorkers() + common.InitStreamAggr() + protoparserutil.StartUnmarshalWorkers() if len(*graphiteListenAddr) > 0 { graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler) } @@ -122,8 +122,8 @@ func Stop() { if len(*opentsdbHTTPListenAddr) > 0 { opentsdbhttpServer.MustStop() } - common.StopUnmarshalWorkers() - vminsertCommon.MustStopStreamAggr() + protoparserutil.StopUnmarshalWorkers() + common.MustStopStreamAggr() } // RequestHandler is a handler for Prometheus remote storage write API @@ -165,7 +165,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } switch path { case "/prometheus/api/v1/write", "/api/v1/write", "/api/v1/push", "/prometheus/api/v1/push": - if common.HandleVMProtoServerHandshake(w, r) { + if protoparserutil.HandleVMProtoServerHandshake(w, r) { return true } prometheusWriteRequests.Inc() diff --git a/app/vminsert/native/request_handler.go b/app/vminsert/native/request_handler.go index 54fe28a72a..06beee7987 100644 --- a/app/vminsert/native/request_handler.go +++ b/app/vminsert/native/request_handler.go @@ -8,8 +8,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" ) @@ -21,7 +21,7 @@ var ( // InsertHandler processes `/api/v1/import/native` request. func InsertHandler(req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vminsert/newrelic/request_handler.go b/app/vminsert/newrelic/request_handler.go index 4591eac406..918f46f0a6 100644 --- a/app/vminsert/newrelic/request_handler.go +++ b/app/vminsert/newrelic/request_handler.go @@ -8,9 +8,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" ) var ( @@ -20,7 +20,7 @@ var ( // InsertHandlerForHTTP processes remote write for request to /newrelic/infra/v2/metrics/events/bulk request. func InsertHandlerForHTTP(req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vminsert/opentelemetry/request_handler.go b/app/vminsert/opentelemetry/request_handler.go index c6c4f440e4..f9f4e15989 100644 --- a/app/vminsert/opentelemetry/request_handler.go +++ b/app/vminsert/opentelemetry/request_handler.go @@ -7,9 +7,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -20,7 +20,7 @@ var ( // InsertHandler processes opentelemetry metrics. func InsertHandler(req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vminsert/opentsdbhttp/request_handler.go b/app/vminsert/opentsdbhttp/request_handler.go index 29dd85e7d3..15c67dc5bc 100644 --- a/app/vminsert/opentsdbhttp/request_handler.go +++ b/app/vminsert/opentsdbhttp/request_handler.go @@ -7,9 +7,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -24,11 +24,11 @@ func InsertHandler(req *http.Request) error { path := req.URL.Path switch path { case "/opentsdb/api/put", "/api/put": - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } - return stream.Parse(req, func(rows []parser.Row) error { + return stream.Parse(req, func(rows []opentsdbhttp.Row) error { return insertRows(rows, extraLabels) }) default: @@ -36,7 +36,7 @@ func InsertHandler(req *http.Request) error { } } -func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(rows []opentsdbhttp.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetInsertCtx() defer common.PutInsertCtx(ctx) diff --git a/app/vminsert/prometheusimport/request_handler.go b/app/vminsert/prometheusimport/request_handler.go index 025d9bad9e..b265650d7f 100644 --- a/app/vminsert/prometheusimport/request_handler.go +++ b/app/vminsert/prometheusimport/request_handler.go @@ -7,9 +7,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -20,23 +20,23 @@ var ( // InsertHandler processes `/api/v1/import/prometheus` request. func InsertHandler(req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } - defaultTimestamp, err := parserCommon.GetTimestamp(req) + defaultTimestamp, err := protoparserutil.GetTimestamp(req) if err != nil { return err } encoding := req.Header.Get("Content-Encoding") - return stream.Parse(req.Body, defaultTimestamp, encoding, true, func(rows []parser.Row) error { + return stream.Parse(req.Body, defaultTimestamp, encoding, true, func(rows []prometheus.Row) error { return insertRows(rows, extraLabels) }, func(s string) { httpserver.LogError(req, s) }) } -func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(rows []prometheus.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetInsertCtx() defer common.PutInsertCtx(ctx) diff --git a/app/vminsert/promremotewrite/request_handler.go b/app/vminsert/promremotewrite/request_handler.go index e7b207d38f..7c29658c3c 100644 --- a/app/vminsert/promremotewrite/request_handler.go +++ b/app/vminsert/promremotewrite/request_handler.go @@ -7,8 +7,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -19,7 +19,7 @@ var ( // InsertHandler processes remote write for prometheus. func InsertHandler(req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go index 3a92d88790..8893701036 100644 --- a/app/vminsert/vmimport/request_handler.go +++ b/app/vminsert/vmimport/request_handler.go @@ -8,8 +8,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" @@ -24,17 +24,17 @@ var ( // // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6 func InsertHandler(req *http.Request) error { - extraLabels, err := parserCommon.GetExtraLabels(req) + extraLabels, err := protoparserutil.GetExtraLabels(req) if err != nil { return err } encoding := req.Header.Get("Content-Encoding") - return stream.Parse(req.Body, encoding, func(rows []parser.Row) error { + return stream.Parse(req.Body, encoding, func(rows []vmimport.Row) error { return insertRows(rows, extraLabels) }) } -func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(rows []vmimport.Row, extraLabels []prompbmarshal.Label) error { ctx := getPushCtx() defer putPushCtx(ctx) diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go index 3985b60248..781d00bd7a 100644 --- a/lib/promscrape/scrapework_test.go +++ b/lib/promscrape/scrapework_test.go @@ -11,8 +11,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" ) func TestIsAutoMetric(t *testing.T) { @@ -733,8 +733,8 @@ func TestSendStaleSeries(t *testing.T) { sw.Config = &ScrapeWork{ NoStaleMarkers: false, } - common.StartUnmarshalWorkers() - defer common.StopUnmarshalWorkers() + protoparserutil.StartUnmarshalWorkers() + defer protoparserutil.StopUnmarshalWorkers() var staleMarks int sw.PushData = func(_ *auth.Token, wr *prompbmarshal.WriteRequest) { @@ -762,8 +762,8 @@ func TestSendStaleSeries(t *testing.T) { f(generateScrape(20000), generateScrape(10), 19990) } -func parsePromRow(data string) *parser.Row { - var rows parser.Rows +func parsePromRow(data string) *prometheus.Row { + var rows prometheus.Rows errLogger := func(s string) { panic(fmt.Errorf("unexpected error when unmarshaling Prometheus rows: %s", s)) } @@ -775,7 +775,7 @@ func parsePromRow(data string) *parser.Row { } func parseData(data string) []prompbmarshal.TimeSeries { - return parser.MustParsePromMetrics(data, 0) + return prometheus.MustParsePromMetrics(data, 0) } func expectEqualTimeseries(tss, tssExpected []prompbmarshal.TimeSeries) error { diff --git a/lib/protoparser/csvimport/stream/streamparser.go b/lib/protoparser/csvimport/stream/streamparser.go index 2982d683de..4158818cb3 100644 --- a/lib/protoparser/csvimport/stream/streamparser.go +++ b/lib/protoparser/csvimport/stream/streamparser.go @@ -10,8 +10,8 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -35,11 +35,11 @@ func Parse(req *http.Request, callback func(rows []csvimport.Row) error) error { } encoding := req.Header.Get("Content-Encoding") - reader, err := common.GetUncompressedReader(req.Body, encoding) + reader, err := protoparserutil.GetUncompressedReader(req.Body, encoding) if err != nil { return fmt.Errorf("cannot decode csv data: %w", err) } - defer common.PutUncompressedReader(reader) + defer protoparserutil.PutUncompressedReader(reader) wcr := writeconcurrencylimiter.GetReader(reader) defer writeconcurrencylimiter.PutReader(wcr) @@ -54,7 +54,7 @@ func Parse(req *http.Request, callback func(rows []csvimport.Row) error) error { uw.cds = cds uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) - common.ScheduleUnmarshalWork(uw) + protoparserutil.ScheduleUnmarshalWork(uw) wcr.DecConcurrency() } ctx.wg.Wait() @@ -69,7 +69,7 @@ func (ctx *streamContext) Read() bool { if ctx.err != nil || ctx.hasCallbackError() { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) + ctx.reqBuf, ctx.tailBuf, ctx.err = protoparserutil.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() @@ -165,7 +165,7 @@ func (uw *unmarshalWork) runCallback(rows []csvimport.Row) { ctx.wg.Done() } -// Unmarshal implements common.UnmarshalWork +// Unmarshal implements prototparserutil.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf), uw.cds) rows := uw.rows.Rows diff --git a/lib/protoparser/datadogsketches/stream/streamparser.go b/lib/protoparser/datadogsketches/stream/streamparser.go index 383b6374fd..f95c07ff05 100644 --- a/lib/protoparser/datadogsketches/stream/streamparser.go +++ b/lib/protoparser/datadogsketches/stream/streamparser.go @@ -5,9 +5,9 @@ import ( "io" "sync" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -16,7 +16,7 @@ import ( // callback shouldn't hold series after returning. func Parse(r io.Reader, encoding string, callback func(series []*datadogsketches.Sketch) error) error { readCalls.Inc() - err := common.ReadUncompressedData(r, encoding, datadogutils.MaxInsertRequestSize, func(data []byte) error { + err := protoparserutil.ReadUncompressedData(r, encoding, datadogutils.MaxInsertRequestSize, func(data []byte) error { return parseData(data, callback) }) if err != nil { diff --git a/lib/protoparser/datadogv1/stream/streamparser.go b/lib/protoparser/datadogv1/stream/streamparser.go index c959bbbe06..957d3aa2bc 100644 --- a/lib/protoparser/datadogv1/stream/streamparser.go +++ b/lib/protoparser/datadogv1/stream/streamparser.go @@ -5,9 +5,9 @@ import ( "io" "sync" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv1" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -16,7 +16,7 @@ import ( // callback shouldn't hold series after returning. func Parse(r io.Reader, encoding string, callback func(series []datadogv1.Series) error) error { readCalls.Inc() - err := common.ReadUncompressedData(r, encoding, datadogutils.MaxInsertRequestSize, func(data []byte) error { + err := protoparserutil.ReadUncompressedData(r, encoding, datadogutils.MaxInsertRequestSize, func(data []byte) error { return parseData(data, callback) }) if err != nil { diff --git a/lib/protoparser/datadogv2/stream/streamparser.go b/lib/protoparser/datadogv2/stream/streamparser.go index 6c6abf99cb..beaf9e7cad 100644 --- a/lib/protoparser/datadogv2/stream/streamparser.go +++ b/lib/protoparser/datadogv2/stream/streamparser.go @@ -5,9 +5,9 @@ import ( "io" "sync" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogv2" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -16,7 +16,7 @@ import ( // callback shouldn't hold series after returning. func Parse(r io.Reader, encoding, contentType string, callback func(series []datadogv2.Series) error) error { readCalls.Inc() - err := common.ReadUncompressedData(r, encoding, datadogutils.MaxInsertRequestSize, func(data []byte) error { + err := protoparserutil.ReadUncompressedData(r, encoding, datadogutils.MaxInsertRequestSize, func(data []byte) error { return parseData(data, contentType, callback) }) if err != nil { diff --git a/lib/protoparser/graphite/stream/streamparser.go b/lib/protoparser/graphite/stream/streamparser.go index 3c91cfcc73..aefd000c6b 100644 --- a/lib/protoparser/graphite/stream/streamparser.go +++ b/lib/protoparser/graphite/stream/streamparser.go @@ -10,8 +10,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -27,11 +27,11 @@ var ( // // callback shouldn't hold rows after returning. func Parse(r io.Reader, encoding string, callback func(rows []graphite.Row) error) error { - reader, err := common.GetUncompressedReader(r, encoding) + reader, err := protoparserutil.GetUncompressedReader(r, encoding) if err != nil { return fmt.Errorf("Cannot decode graphite data: %w", err) } - defer common.PutUncompressedReader(reader) + defer protoparserutil.PutUncompressedReader(reader) wcr := writeconcurrencylimiter.GetReader(reader) defer writeconcurrencylimiter.PutReader(wcr) @@ -46,7 +46,7 @@ func Parse(r io.Reader, encoding string, callback func(rows []graphite.Row) erro uw.callback = callback uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) - common.ScheduleUnmarshalWork(uw) + protoparserutil.ScheduleUnmarshalWork(uw) wcr.DecConcurrency() } ctx.wg.Wait() @@ -61,7 +61,7 @@ func (ctx *streamContext) Read() bool { if ctx.err != nil || ctx.hasCallbackError() { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) + ctx.reqBuf, ctx.tailBuf, ctx.err = protoparserutil.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() @@ -155,7 +155,7 @@ func (uw *unmarshalWork) runCallback(rows []graphite.Row) { ctx.wg.Done() } -// Unmarshal implements common.UnmarshalWork +// Unmarshal implements protoparserutil.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf)) rows := uw.rows.Rows diff --git a/lib/protoparser/influx/stream/streamparser.go b/lib/protoparser/influx/stream/streamparser.go index d2313b872d..38994772da 100644 --- a/lib/protoparser/influx/stream/streamparser.go +++ b/lib/protoparser/influx/stream/streamparser.go @@ -10,8 +10,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -41,7 +41,7 @@ func Parse(r io.Reader, encoding string, isStreamMode bool, precision, db string // Process the whole request in one go. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7090 readCalls.Inc() - err := common.ReadUncompressedData(r, encoding, maxRequestSize, func(data []byte) error { + err := protoparserutil.ReadUncompressedData(r, encoding, maxRequestSize, func(data []byte) error { ctx := getBatchContext() defer putBatchContext(ctx) @@ -59,11 +59,11 @@ func Parse(r io.Reader, encoding string, isStreamMode bool, precision, db string } func parseStreamMode(r io.Reader, encoding string, tsMultiplier int64, db string, callback func(db string, rows []influx.Row) error) error { - reader, err := common.GetUncompressedReader(r, encoding) + reader, err := protoparserutil.GetUncompressedReader(r, encoding) if err != nil { return fmt.Errorf("cannot decode influx line protocol data: %w; see https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf", err) } - defer common.PutUncompressedReader(reader) + defer protoparserutil.PutUncompressedReader(reader) wcr := writeconcurrencylimiter.GetReader(reader) defer writeconcurrencylimiter.PutReader(wcr) @@ -79,7 +79,7 @@ func parseStreamMode(r io.Reader, encoding string, tsMultiplier int64, db string uw.tsMultiplier = tsMultiplier uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) - common.ScheduleUnmarshalWork(uw) + protoparserutil.ScheduleUnmarshalWork(uw) wcr.DecConcurrency() } ctx.wg.Wait() @@ -153,7 +153,7 @@ func (ctx *streamContext) Read() bool { if ctx.err != nil || ctx.hasCallbackError() { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineSize.IntN()) + ctx.reqBuf, ctx.tailBuf, ctx.err = protoparserutil.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineSize.IntN()) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() @@ -235,7 +235,7 @@ func (uw *unmarshalWork) runCallback() { ctx.wg.Done() } -// Unmarshal implements common.UnmarshalWork +// Unmarshal implements protoparserutil.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { _ = unmarshal(&uw.rows, uw.reqBuf, uw.tsMultiplier) uw.runCallback() diff --git a/lib/protoparser/influx/stream/streamparser_test.go b/lib/protoparser/influx/stream/streamparser_test.go index 5cd256cc22..33936c84fe 100644 --- a/lib/protoparser/influx/stream/streamparser_test.go +++ b/lib/protoparser/influx/stream/streamparser_test.go @@ -7,8 +7,8 @@ import ( "sync" "testing" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" ) func TestDetectTimestamp(t *testing.T) { @@ -37,8 +37,8 @@ func TestDetectTimestamp(t *testing.T) { } func TestParseStream(t *testing.T) { - common.StartUnmarshalWorkers() - defer common.StopUnmarshalWorkers() + protoparserutil.StartUnmarshalWorkers() + defer protoparserutil.StopUnmarshalWorkers() f := func(data string, rowsExpected []influx.Row, isStreamMode bool, badData bool) { t.Helper() diff --git a/lib/protoparser/native/stream/streamparser.go b/lib/protoparser/native/stream/streamparser.go index 95aa315758..3b300663dc 100644 --- a/lib/protoparser/native/stream/streamparser.go +++ b/lib/protoparser/native/stream/streamparser.go @@ -8,7 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -20,11 +20,11 @@ import ( // // callback shouldn't hold block after returning. func Parse(r io.Reader, contentEncoding string, callback func(block *Block) error) error { - reader, err := common.GetUncompressedReader(r, contentEncoding) + reader, err := protoparserutil.GetUncompressedReader(r, contentEncoding) if err != nil { return fmt.Errorf("cannot decode vmimport data: %w", err) } - defer common.PutUncompressedReader(reader) + defer protoparserutil.PutUncompressedReader(reader) wcr := writeconcurrencylimiter.GetReader(reader) defer writeconcurrencylimiter.PutReader(wcr) @@ -103,7 +103,7 @@ func Parse(r io.Reader, contentEncoding string, callback func(block *Block) erro blocksRead.Inc() ctx.wg.Add(1) - common.ScheduleUnmarshalWork(uw) + protoparserutil.ScheduleUnmarshalWork(uw) wcr.DecConcurrency() } } @@ -154,7 +154,7 @@ func (uw *unmarshalWork) reset() { uw.block.reset() } -// Unmarshal implements common.UnmarshalWork +// Unmarshal implements protoparserutil.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { err := uw.unmarshal() if err != nil { diff --git a/lib/protoparser/newrelic/stream/streamparser.go b/lib/protoparser/newrelic/stream/streamparser.go index d8b998e830..7f90aeaecf 100644 --- a/lib/protoparser/newrelic/stream/streamparser.go +++ b/lib/protoparser/newrelic/stream/streamparser.go @@ -9,8 +9,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/newrelic" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" ) var ( @@ -23,7 +23,7 @@ var ( // callback shouldn't hold rows after returning. func Parse(r io.Reader, encoding string, callback func(rows []newrelic.Row) error) error { readCalls.Inc() - err := common.ReadUncompressedData(r, encoding, maxInsertRequestSize, func(data []byte) error { + err := protoparserutil.ReadUncompressedData(r, encoding, maxInsertRequestSize, func(data []byte) error { return parseData(data, callback) }) if err != nil { diff --git a/lib/protoparser/opentelemetry/stream/streamparser.go b/lib/protoparser/opentelemetry/stream/streamparser.go index dcf2d05fbd..b2f549676a 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser.go +++ b/lib/protoparser/opentelemetry/stream/streamparser.go @@ -15,8 +15,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" ) var maxRequestSize = flagutil.NewBytes("opentelemetry.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single OpenTelemetry request") @@ -27,7 +27,7 @@ var maxRequestSize = flagutil.NewBytes("opentelemetry.maxRequestSize", 64*1024*1 // // optional processBody can be used for pre-processing the read request body from r before parsing it in OpenTelemetry format. func ParseStream(r io.Reader, encoding string, processBody func(data []byte) ([]byte, error), callback func(tss []prompbmarshal.TimeSeries) error) error { - err := common.ReadUncompressedData(r, encoding, maxRequestSize, func(data []byte) error { + err := protoparserutil.ReadUncompressedData(r, encoding, maxRequestSize, func(data []byte) error { if processBody != nil { dataNew, err := processBody(data) if err != nil { diff --git a/lib/protoparser/opentsdb/stream/streamparser.go b/lib/protoparser/opentsdb/stream/streamparser.go index 0a9e2c057f..3a8aec340e 100644 --- a/lib/protoparser/opentsdb/stream/streamparser.go +++ b/lib/protoparser/opentsdb/stream/streamparser.go @@ -10,8 +10,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -39,7 +39,7 @@ func Parse(r io.Reader, callback func(rows []opentsdb.Row) error) error { uw.callback = callback uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) - common.ScheduleUnmarshalWork(uw) + protoparserutil.ScheduleUnmarshalWork(uw) wcr.DecConcurrency() } ctx.wg.Wait() @@ -54,7 +54,7 @@ func (ctx *streamContext) Read() bool { if ctx.err != nil || ctx.hasCallbackError() { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) + ctx.reqBuf, ctx.tailBuf, ctx.err = protoparserutil.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() @@ -148,7 +148,7 @@ func (uw *unmarshalWork) runCallback(rows []opentsdb.Row) { ctx.wg.Done() } -// Unmarshal implements common.UnmarshalWork +// Unmarshal implements protoparserutil.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf)) rows := uw.rows.Rows diff --git a/lib/protoparser/opentsdbhttp/stream/streamparser.go b/lib/protoparser/opentsdbhttp/stream/streamparser.go index f0d23ca7ff..d31bd8ce1e 100644 --- a/lib/protoparser/opentsdbhttp/stream/streamparser.go +++ b/lib/protoparser/opentsdbhttp/stream/streamparser.go @@ -9,8 +9,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" ) @@ -28,7 +28,7 @@ var ( func Parse(req *http.Request, callback func(rows []opentsdbhttp.Row) error) error { readCalls.Inc() encoding := req.Header.Get("Content-Encoding") - err := common.ReadUncompressedData(req.Body, encoding, maxInsertRequestSize, func(data []byte) error { + err := protoparserutil.ReadUncompressedData(req.Body, encoding, maxInsertRequestSize, func(data []byte) error { return parseData(data, callback) }) if err != nil { diff --git a/lib/protoparser/prometheus/stream/streamparser.go b/lib/protoparser/prometheus/stream/streamparser.go index 8ccde8494d..f931c2415c 100644 --- a/lib/protoparser/prometheus/stream/streamparser.go +++ b/lib/protoparser/prometheus/stream/streamparser.go @@ -8,8 +8,8 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -24,11 +24,11 @@ import ( // It is recommended setting limitConcurrency=true if the caller doesn't have concurrency limits set, // like /api/v1/write calls. func Parse(r io.Reader, defaultTimestamp int64, encoding string, limitConcurrency bool, callback func(rows []prometheus.Row) error, errLogger func(string)) error { - reader, err := common.GetUncompressedReader(r, encoding) + reader, err := protoparserutil.GetUncompressedReader(r, encoding) if err != nil { return fmt.Errorf("cannot decode Prometheus text exposition data: %w", err) } - defer common.PutUncompressedReader(reader) + defer protoparserutil.PutUncompressedReader(reader) var wcr *writeconcurrencylimiter.Reader if limitConcurrency { @@ -47,7 +47,7 @@ func Parse(r io.Reader, defaultTimestamp int64, encoding string, limitConcurrenc uw.defaultTimestamp = defaultTimestamp uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) - common.ScheduleUnmarshalWork(uw) + protoparserutil.ScheduleUnmarshalWork(uw) if wcr != nil { wcr.DecConcurrency() } @@ -64,7 +64,7 @@ func (ctx *streamContext) Read() bool { if ctx.err != nil || ctx.hasCallbackError() { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) + ctx.reqBuf, ctx.tailBuf, ctx.err = protoparserutil.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() @@ -162,7 +162,7 @@ func (uw *unmarshalWork) runCallback(rows []prometheus.Row) { ctx.wg.Done() } -// Unmarshal implements common.UnmarshalWork +// Unmarshal implements protoparserutil.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { if uw.errLogger != nil { uw.rows.UnmarshalWithErrLogger(bytesutil.ToUnsafeString(uw.reqBuf), uw.errLogger) diff --git a/lib/protoparser/prometheus/stream/streamparser_test.go b/lib/protoparser/prometheus/stream/streamparser_test.go index 254e3c60b9..cda6091ef1 100644 --- a/lib/protoparser/prometheus/stream/streamparser_test.go +++ b/lib/protoparser/prometheus/stream/streamparser_test.go @@ -9,13 +9,13 @@ import ( "testing" "time" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" ) func TestParse(t *testing.T) { - common.StartUnmarshalWorkers() - defer common.StopUnmarshalWorkers() + protoparserutil.StartUnmarshalWorkers() + defer protoparserutil.StopUnmarshalWorkers() const defaultTimestamp = 123 f := func(s string, rowsExpected []prometheus.Row) { diff --git a/lib/protoparser/common/compress_reader.go b/lib/protoparser/protoparserutil/compress_reader.go similarity index 99% rename from lib/protoparser/common/compress_reader.go rename to lib/protoparser/protoparserutil/compress_reader.go index 3918fe1acd..d0e5ddda16 100644 --- a/lib/protoparser/common/compress_reader.go +++ b/lib/protoparser/protoparserutil/compress_reader.go @@ -1,4 +1,4 @@ -package common +package protoparserutil import ( "fmt" diff --git a/lib/protoparser/common/compress_reader_test.go b/lib/protoparser/protoparserutil/compress_reader_test.go similarity index 99% rename from lib/protoparser/common/compress_reader_test.go rename to lib/protoparser/protoparserutil/compress_reader_test.go index c61c567d60..45e0f8a92c 100644 --- a/lib/protoparser/common/compress_reader_test.go +++ b/lib/protoparser/protoparserutil/compress_reader_test.go @@ -1,4 +1,4 @@ -package common +package protoparserutil import ( "bytes" diff --git a/lib/protoparser/common/extra_labels.go b/lib/protoparser/protoparserutil/extra_labels.go similarity index 98% rename from lib/protoparser/common/extra_labels.go rename to lib/protoparser/protoparserutil/extra_labels.go index b68d94fe5f..e4ee440d43 100644 --- a/lib/protoparser/common/extra_labels.go +++ b/lib/protoparser/protoparserutil/extra_labels.go @@ -1,4 +1,4 @@ -package common +package protoparserutil import ( "encoding/base64" diff --git a/lib/protoparser/common/extra_labels_test.go b/lib/protoparser/protoparserutil/extra_labels_test.go similarity index 99% rename from lib/protoparser/common/extra_labels_test.go rename to lib/protoparser/protoparserutil/extra_labels_test.go index 483fb88654..be6b5f152b 100644 --- a/lib/protoparser/common/extra_labels_test.go +++ b/lib/protoparser/protoparserutil/extra_labels_test.go @@ -1,4 +1,4 @@ -package common +package protoparserutil import ( "fmt" diff --git a/lib/protoparser/common/lines_reader.go b/lib/protoparser/protoparserutil/lines_reader.go similarity index 99% rename from lib/protoparser/common/lines_reader.go rename to lib/protoparser/protoparserutil/lines_reader.go index 04e8e33e14..195236dd25 100644 --- a/lib/protoparser/common/lines_reader.go +++ b/lib/protoparser/protoparserutil/lines_reader.go @@ -1,4 +1,4 @@ -package common +package protoparserutil import ( "bytes" diff --git a/lib/protoparser/common/lines_reader_test.go b/lib/protoparser/protoparserutil/lines_reader_test.go similarity index 99% rename from lib/protoparser/common/lines_reader_test.go rename to lib/protoparser/protoparserutil/lines_reader_test.go index b4bce35143..b649e5aa23 100644 --- a/lib/protoparser/common/lines_reader_test.go +++ b/lib/protoparser/protoparserutil/lines_reader_test.go @@ -1,4 +1,4 @@ -package common +package protoparserutil import ( "bytes" diff --git a/lib/protoparser/common/timestamp.go b/lib/protoparser/protoparserutil/timestamp.go similarity index 95% rename from lib/protoparser/common/timestamp.go rename to lib/protoparser/protoparserutil/timestamp.go index 2e51abb91b..73be46fff6 100644 --- a/lib/protoparser/common/timestamp.go +++ b/lib/protoparser/protoparserutil/timestamp.go @@ -1,4 +1,4 @@ -package common +package protoparserutil import ( "fmt" diff --git a/lib/protoparser/common/unmarshal_work.go b/lib/protoparser/protoparserutil/unmarshal_work.go similarity index 98% rename from lib/protoparser/common/unmarshal_work.go rename to lib/protoparser/protoparserutil/unmarshal_work.go index 47eb730a27..00f138a116 100644 --- a/lib/protoparser/common/unmarshal_work.go +++ b/lib/protoparser/protoparserutil/unmarshal_work.go @@ -1,4 +1,4 @@ -package common +package protoparserutil import ( "sync" diff --git a/lib/protoparser/common/vmproto_handshake.go b/lib/protoparser/protoparserutil/vmproto_handshake.go similarity index 97% rename from lib/protoparser/common/vmproto_handshake.go rename to lib/protoparser/protoparserutil/vmproto_handshake.go index a0cf9f00e5..acd4c923fe 100644 --- a/lib/protoparser/common/vmproto_handshake.go +++ b/lib/protoparser/protoparserutil/vmproto_handshake.go @@ -1,4 +1,4 @@ -package common +package protoparserutil import ( "io" diff --git a/lib/protoparser/vmimport/stream/streamparser.go b/lib/protoparser/vmimport/stream/streamparser.go index 27d5294b7d..3939a776f3 100644 --- a/lib/protoparser/vmimport/stream/streamparser.go +++ b/lib/protoparser/vmimport/stream/streamparser.go @@ -8,7 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -23,11 +23,11 @@ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 10*1024*1024, "The maxim // // callback shouldn't hold rows after returning. func Parse(r io.Reader, encoding string, callback func(rows []vmimport.Row) error) error { - reader, err := common.GetUncompressedReader(r, encoding) + reader, err := protoparserutil.GetUncompressedReader(r, encoding) if err != nil { return fmt.Errorf("cannot decode vmimport data: %w", err) } - defer common.PutUncompressedReader(reader) + defer protoparserutil.PutUncompressedReader(reader) wcr := writeconcurrencylimiter.GetReader(reader) defer writeconcurrencylimiter.PutReader(wcr) @@ -41,7 +41,7 @@ func Parse(r io.Reader, encoding string, callback func(rows []vmimport.Row) erro uw.callback = callback uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) - common.ScheduleUnmarshalWork(uw) + protoparserutil.ScheduleUnmarshalWork(uw) wcr.DecConcurrency() } ctx.wg.Wait() @@ -56,7 +56,7 @@ func (ctx *streamContext) Read() bool { if ctx.err != nil || ctx.hasCallbackError() { return false } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineLen.IntN()) + ctx.reqBuf, ctx.tailBuf, ctx.err = protoparserutil.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineLen.IntN()) if ctx.err != nil { if ctx.err != io.EOF { readErrors.Inc() @@ -150,7 +150,7 @@ func (uw *unmarshalWork) runCallback(rows []vmimport.Row) { ctx.wg.Done() } -// Unmarshal implements common.UnmarshalWork +// Unmarshal implements proroparserutil.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf)) rows := uw.rows.Rows