From 01b36ddd1918d6cdab96ae675a09b4080dccd006 Mon Sep 17 00:00:00 2001 From: Artem Fetishev <149964189+rtm0@users.noreply.github.com> Date: Tue, 12 May 2026 16:24:01 +0200 Subject: [PATCH] Refactor apptests (#10940) Fixes #10938. --------- Signed-off-by: Artem Fetishev --- apptest/client.go | 789 +++++++++++++++++- apptest/model.go | 46 +- apptest/tests/legacy_indexdb_test.go | 6 +- apptest/tests/metric_names_stats_test.go | 30 +- .../tests/multitenancy_via_headers_test.go | 4 +- apptest/tests/multitenant_test.go | 4 +- apptest/vmagent.go | 12 +- apptest/vmauth.go | 10 +- apptest/vminsert.go | 298 +------ apptest/vmselect.go | 313 +------ apptest/vmsingle.go | 664 +-------------- apptest/vmstorage.go | 130 +-- 12 files changed, 918 insertions(+), 1388 deletions(-) diff --git a/apptest/client.go b/apptest/client.go index 83bee7735d..075977f923 100644 --- a/apptest/client.go +++ b/apptest/client.go @@ -2,6 +2,7 @@ package apptest import ( "bytes" + "encoding/json" "fmt" "io" "net" @@ -13,6 +14,10 @@ import ( "testing" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prommetadata" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + otlppb "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" + "github.com/golang/snappy" ) // Client is used for interacting with the apps over the network. @@ -147,27 +152,34 @@ func readAllAndClose(t *testing.T, responseBody io.ReadCloser) string { return string(b) } -// ServesMetrics is used to retrieve the app's metrics. +// metricsClient is used to retrieve the app's metrics. // // This type is expected to be embedded by the apps that serve metrics. -type ServesMetrics struct { - metricsURL string - cli *Client +type metricsClient struct { + metricsCli *Client + url string +} + +func newMetricsClient(cli *Client, addr string) *metricsClient { + return &metricsClient{ + metricsCli: cli, + url: fmt.Sprintf("http://%s/metrics", addr), + } } // GetIntMetric retrieves the value of a metric served by an app at /metrics URL. // The value is then converted to int. -func (app *ServesMetrics) GetIntMetric(t *testing.T, metricName string) int { +func (c *metricsClient) GetIntMetric(t *testing.T, metricName string) int { t.Helper() - return int(app.GetMetric(t, metricName)) + return int(c.GetMetric(t, metricName)) } // GetMetric retrieves the value of a metric served by an app at /metrics URL. -func (app *ServesMetrics) GetMetric(t *testing.T, metricName string) float64 { +func (c *metricsClient) GetMetric(t *testing.T, metricName string) float64 { t.Helper() - metrics, statusCode := app.cli.Get(t, app.metricsURL, nil) + metrics, statusCode := c.metricsCli.Get(t, c.url, nil) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) } @@ -188,12 +200,12 @@ func (app *ServesMetrics) GetMetric(t *testing.T, metricName string) float64 { // GetMetricsByPrefix retrieves the values of all metrics that start with given // prefix. -func (app *ServesMetrics) GetMetricsByPrefix(t *testing.T, prefix string) []float64 { +func (c *metricsClient) GetMetricsByPrefix(t *testing.T, prefix string) []float64 { t.Helper() values := []float64{} - metrics, statusCode := app.cli.Get(t, app.metricsURL, nil) + metrics, statusCode := c.metricsCli.Get(t, c.url, nil) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) } @@ -217,12 +229,12 @@ func (app *ServesMetrics) GetMetricsByPrefix(t *testing.T, prefix string) []floa return values } -func (app *ServesMetrics) GetMetricsByRegexp(t *testing.T, re *regexp.Regexp) []float64 { +func (c *metricsClient) GetMetricsByRegexp(t *testing.T, re *regexp.Regexp) []float64 { t.Helper() values := []float64{} - metrics, statusCode := app.cli.Get(t, app.metricsURL, nil) + metrics, statusCode := c.metricsCli.Get(t, c.url, nil) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) } @@ -245,3 +257,756 @@ func (app *ServesMetrics) GetMetricsByRegexp(t *testing.T, re *regexp.Regexp) [] } return values } + +// rpcRowsSentTotal retrieves the values of all vminsert +// `vm_rpc_rows_sent_total` metrics (there will be one for each vmstorage) and +// returns their integer sum. +func (c *metricsClient) rpcRowsSentTotal(t *testing.T) int { + total := 0.0 + for _, v := range c.GetMetricsByPrefix(t, "vm_rpc_rows_sent_total") { + total += v + } + return int(total) +} + +type vmselectClient struct { + vmselectCli *Client + url func(op, path string, opts QueryOpts) string + metricNamesStatsResetURL string + tenantsURL string +} + +// PrometheusAPIV1Export is a test helper function that performs the export of +// raw samples in JSON line format by sending a request to +// /prometheus/api/v1/export endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1export +func (c *vmselectClient) PrometheusAPIV1Export(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse { + t.Helper() + url := c.url("select", "prometheus/api/v1/export", opts) + values := opts.asURLValues() + values.Add("match[]", query) + values.Add("format", "promapi") + res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) + return NewPrometheusAPIV1QueryResponse(t, res) +} + +// PrometheusAPIV1ExportNative is a test helper function that performs the export of +// raw samples in native binary format by sending a request to +// /prometheus/api/v1/export/native endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1exportnative +func (c *vmselectClient) PrometheusAPIV1ExportNative(t *testing.T, query string, opts QueryOpts) []byte { + t.Helper() + url := c.url("select", "prometheus/api/v1/export/native", opts) + values := opts.asURLValues() + values.Add("match[]", query) + values.Add("format", "promapi") + res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) + return []byte(res) +} + +// PrometheusAPIV1Query is a test helper function that performs PromQL/MetricsQL +// instant query by sending a request to /prometheus/api/v1/query endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query +func (c *vmselectClient) PrometheusAPIV1Query(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse { + t.Helper() + url := c.url("select", "prometheus/api/v1/query", opts) + values := opts.asURLValues() + values.Add("query", query) + res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) + return NewPrometheusAPIV1QueryResponse(t, res) +} + +// PrometheusAPIV1QueryRange is a test helper function that performs +// PromQL/MetricsQL range query by sending a request to +// /prometheus/api/v1/query_range endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query_range +func (c *vmselectClient) PrometheusAPIV1QueryRange(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse { + t.Helper() + url := c.url("select", "prometheus/api/v1/query_range", opts) + values := opts.asURLValues() + values.Add("query", query) + res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) + return NewPrometheusAPIV1QueryResponse(t, res) +} + +// PrometheusAPIV1Series retrieves list of time series that match the query by +// sending a request to /prometheus/api/v1/series endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series +func (c *vmselectClient) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1SeriesResponse { + t.Helper() + url := c.url("select", "prometheus/api/v1/series", opts) + values := opts.asURLValues() + values.Add("match[]", matchQuery) + res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) + return NewPrometheusAPIV1SeriesResponse(t, res) +} + +// PrometheusAPIV1SeriesCount retrieves the total number of time series by +// sending a request to /prometheus/api/v1/series/count endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series +func (c *vmselectClient) PrometheusAPIV1SeriesCount(t *testing.T, opts QueryOpts) *PrometheusAPIV1SeriesCountResponse { + t.Helper() + url := c.url("select", "prometheus/api/v1/series/count", opts) + values := opts.asURLValues() + res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) + return NewPrometheusAPIV1SeriesCountResponse(t, res) +} + +// PrometheusAPIV1Labels retrieves the label names for time series that match a +// query by sending a request to /prometheus/api/v1/labels endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labels +func (c *vmselectClient) PrometheusAPIV1Labels(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelsResponse { + t.Helper() + url := c.url("select", "prometheus/api/v1/labels", opts) + values := opts.asURLValues() + values.Add("match[]", matchQuery) + res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) + return NewPrometheusAPIV1LabelsResponse(t, res) +} + +// PrometheusAPIV1LabelValues retrieves the labels values for the metrics that +// match the query by sending a request to /prometheus/api/v1/label/.../values +// endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labelvalues +func (c *vmselectClient) PrometheusAPIV1LabelValues(t *testing.T, labelName, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelValuesResponse { + t.Helper() + path := fmt.Sprintf("prometheus/api/v1/label/%s/values", labelName) + url := c.url("select", path, opts) + values := opts.asURLValues() + values.Add("match[]", matchQuery) + res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) + return NewPrometheusAPIV1LabelValuesResponse(t, res) +} + +// PrometheusAPIV1Metadata retrieves metadata for the given metric by sending a +// request to /prometheus/api/v1/metadata endpoint. +func (c *vmselectClient) PrometheusAPIV1Metadata(t *testing.T, metric string, limit int, opts QueryOpts) *PrometheusAPIV1Metadata { + t.Helper() + url := c.url("select", "prometheus/api/v1/metadata", opts) + values := opts.asURLValues() + values.Add("metric", metric) + values.Add("limit", strconv.Itoa(limit)) + res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) + return NewPrometheusAPIV1Metadata(t, res) +} + +// PrometheusAPIV1AdminTSDBDeleteSeries deletes the series that match the query +// by sending a request to /prometheus/api/v1/admin/tsdb/delete_series. +// +// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series +func (c *vmselectClient) PrometheusAPIV1AdminTSDBDeleteSeries(t *testing.T, matchQuery string, opts QueryOpts) { + t.Helper() + + url := c.url("delete", "prometheus/api/v1/admin/tsdb/delete_series", opts) + values := opts.asURLValues() + values.Add("match[]", matchQuery) + res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) + if statusCode != http.StatusNoContent { + t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res) + } +} + +// PrometheusAPIV1StatusMetricNamesStats sends a query to +// /prometheus/api/v1/status/metric_names_stats endpoint and returns the metric +// usage stats response for given params. +// +// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage +func (c *vmselectClient) PrometheusAPIV1StatusMetricNamesStats(t *testing.T, limit, le, matchPattern string, opts QueryOpts) MetricNamesStatsResponse { + t.Helper() + url := c.url("select", "prometheus/api/v1/status/metric_names_stats", opts) + values := opts.asURLValues() + values.Add("limit", limit) + values.Add("le", le) + values.Add("match_pattern", matchPattern) + res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) + if statusCode != http.StatusOK { + t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) + } + var resp MetricNamesStatsResponse + if err := json.Unmarshal([]byte(res), &resp); err != nil { + t.Fatalf("could not unmarshal metric names stats response data:\n%s\n err: %v", res, err) + } + return resp +} + +// PrometheusAPIV1StatusTSDB retrieves the TSDB status for the time series that +// match the query on the given date by sending a request to +// /prometheus/api/v1/status/tsdb endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#tsdb-stats +func (c *vmselectClient) PrometheusAPIV1StatusTSDB(t *testing.T, matchQuery string, date string, topN string, opts QueryOpts) TSDBStatusResponse { + t.Helper() + url := c.url("select", "prometheus/api/v1/status/tsdb", opts) + values := opts.asURLValues() + addNonEmpty := func(name, value string) { + if len(value) == 0 { + return + } + values.Add(name, value) + } + addNonEmpty("match[]", matchQuery) + addNonEmpty("topN", topN) + addNonEmpty("date", date) + res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) + if statusCode != http.StatusOK { + t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) + } + + var status TSDBStatusResponse + if err := json.Unmarshal([]byte(res), &status); err != nil { + t.Fatalf("could not unmarshal tsdb status response data:\n%s\n err: %v", res, err) + } + status.Sort() + return status +} + +// GraphiteMetricsIndex retrieves the list of all metrics by sending a request +// to /graphite/metrics/index.json endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api +func (c *vmselectClient) GraphiteMetricsIndex(t *testing.T, opts QueryOpts) GraphiteMetricsIndexResponse { + t.Helper() + + url := c.url("select", "graphite/metrics/index.json", opts) + res, statusCode := c.vmselectCli.Get(t, url, opts.Headers) + if statusCode != http.StatusOK { + t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) + } + + var index GraphiteMetricsIndexResponse + if err := json.Unmarshal([]byte(res), &index); err != nil { + t.Fatalf("could not unmarshal metrics index response data:\n%s\n err: %v", res, err) + } + return index +} + +// GraphiteMetricsFind finds metrics under a given path by sending a request +// to /metrics/find endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api +// and https://graphite.readthedocs.io/en/latest/metrics_api.html#metrics-find +func (c *vmselectClient) GraphiteMetricsFind(t *testing.T, query string, opts QueryOpts) GraphiteMetricsFindResponse { + t.Helper() + + url := c.url("select", "graphite/metrics/find", opts) + values := opts.asURLValues() + values.Add("query", query) + resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) + if statusCode != http.StatusOK { + t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText) + } + + var res GraphiteMetricsFindResponse + if err := json.Unmarshal([]byte(resText), &res); err != nil { + t.Fatalf("could not unmarshal response data:\n%s\n err: %v", resText, err) + } + return res +} + +// GraphiteMetricsExpand expands the given query with matching paths by sending +// a request to /graphite/metrics/expand endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api +// and https://graphite.readthedocs.io/en/latest/metrics_api.html#metrics-expand +func (c *vmselectClient) GraphiteMetricsExpand(t *testing.T, query string, opts QueryOpts) GraphiteMetricsExpandResponse { + t.Helper() + + url := c.url("select", "graphite/metrics/expand", opts) + values := opts.asURLValues() + values.Add("query", query) + resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) + if statusCode != http.StatusOK { + t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText) + } + + var res GraphiteMetricsExpandResponse + if err := json.Unmarshal([]byte(resText), &res); err != nil { + t.Fatalf("could not unmarshal response data:\n%s\n err: %v", resText, err) + } + return res +} + +// GraphiteRender retrieves the raw metric data by sending a request to +// /graphite/render endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#render-api +// and https://graphite-api.readthedocs.io/en/latest/api.html#the-render-api-render +func (c *vmselectClient) GraphiteRender(t *testing.T, target string, opts QueryOpts) GraphiteRenderResponse { + t.Helper() + + url := c.url("select", "graphite/render", opts) + values := opts.asURLValues() + values.Add("format", "json") + values.Add("target", target) + resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) + if statusCode != http.StatusOK { + t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText) + } + + var res GraphiteRenderResponse + if err := json.Unmarshal([]byte(resText), &res); err != nil { + t.Fatalf("could not unmarshal response data:\n%s\n err: %v", resText, err) + } + return res +} + +// GraphiteTagsTagSeries is a test helper function that registers Graphite tags +// for a single time series by sending a request to /graphite/tags/tagSeries +// endpoint. +func (c *vmselectClient) GraphiteTagsTagSeries(t *testing.T, record string, opts QueryOpts) { + t.Helper() + + url := c.url("select", "graphite/tags/tagSeries", opts) + values := opts.asURLValues() + values.Add("path", record) + _, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) + if got, want := statusCode, http.StatusNotImplemented; got != want { + t.Fatalf("unexpected status code: got %d, want %d", got, want) + } +} + +// GraphiteTagsTagMultiSeries is a test helper function that registers Graphite +// tags for a multiple time series by sending a request to +// /graphite/tags/tagSeries endpoint. +func (c *vmselectClient) GraphiteTagsTagMultiSeries(t *testing.T, records []string, opts QueryOpts) { + t.Helper() + + url := c.url("select", "graphite/tags/tagMultiSeries", opts) + values := opts.asURLValues() + for _, rec := range records { + values.Add("path", rec) + } + _, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) + if got, want := statusCode, http.StatusNotImplemented; got != want { + t.Fatalf("unexpected status code: got %d, want %d", got, want) + } +} + +// PrometheusAPIV1AdminStatusMetricNamesStatsReset resets the metric name usage +// stats by sending a request to +// /prometheus/api/v1/admin/status/metric_names_stats/reset endpoint +// +// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage +func (c *vmselectClient) PrometheusAPIV1AdminStatusMetricNamesStatsReset(t *testing.T, opts QueryOpts) { + t.Helper() + values := opts.asURLValues() + res, statusCode := c.vmselectCli.PostForm(t, c.metricNamesStatsResetURL, values, opts.Headers) + if statusCode != http.StatusNoContent { + t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res) + } +} + +// APIV1AdminTenants retrieves the list of tenants by sending a request to +// /admin/tenants endpoint. +func (c *vmselectClient) APIV1AdminTenants(t *testing.T, opts QueryOpts) *AdminTenantsResponse { + t.Helper() + res, statusCode := c.vmselectCli.Get(t, c.tenantsURL, opts.Headers) + if statusCode != http.StatusOK { + t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) + } + + tenants := &AdminTenantsResponse{} + if err := json.Unmarshal([]byte(res), tenants); err != nil { + t.Fatalf("could not unmarshal tenants response data:\n%s\n err: %v", res, err) + } + + return tenants +} + +type vminsertClient struct { + vminsertCli *Client + url func(op, path string, opts QueryOpts) string + openTSDBURL func(op, path string, opts QueryOpts) string + graphiteListenAddr string + sendBlocking func(t *testing.T, numRecordsToSend int, send func()) +} + +// PrometheusAPIV1ImportCSV is a test helper function that inserts a collection +// of records in CSV format for the given tenant by sending an HTTP POST +// request to prometheus/api/v1/import/csv vminsert endpoint. +// +// See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format +func (c *vminsertClient) PrometheusAPIV1ImportCSV(t *testing.T, records []string, opts QueryOpts) { + t.Helper() + + url := c.url("insert", "prometheus/api/v1/import/csv", opts) + uv := opts.asURLValues() + uvs := uv.Encode() + if len(uvs) > 0 { + url += "?" + uvs + } + data := []byte(strings.Join(records, "\n")) + headers := opts.getHeaders() + headers.Set("Content-Type", "text/plain") + c.sendBlocking(t, len(records), func() { + _, statusCode := c.vminsertCli.Post(t, url, data, headers) + if statusCode != http.StatusNoContent { + t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) + } + }) +} + +// PrometheusAPIV1ImportNative is a test helper function that inserts a collection +// of records in Native format for the given tenant by sending an HTTP POST +// request to prometheus/api/v1/import/native vminsert endpoint. +// +// See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format +func (c *vminsertClient) PrometheusAPIV1ImportNative(t *testing.T, data []byte, opts QueryOpts) { + t.Helper() + + url := c.url("insert", "prometheus/api/v1/import/native", opts) + uv := opts.asURLValues() + uvs := uv.Encode() + if len(uvs) > 0 { + url += "?" + uvs + } + headers := opts.getHeaders() + headers.Set("Content-Type", "text/plain") + c.sendBlocking(t, 1, func() { + _, statusCode := c.vminsertCli.Post(t, url, data, headers) + if statusCode != http.StatusNoContent { + t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) + } + }) +} + +// PrometheusAPIV1Write is a test helper function that inserts a +// collection of records in Prometheus remote-write format by sending a HTTP +// POST request to /prometheus/api/v1/write vminsert endpoint. +func (c *vminsertClient) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteRequest, opts QueryOpts) { + t.Helper() + + url := c.url("insert", "prometheus/api/v1/write", opts) + data := snappy.Encode(nil, wr.MarshalProtobuf(nil)) + recordsCount := len(wr.Timeseries) + if prommetadata.IsEnabled() { + recordsCount += len(wr.Metadata) + } + headers := opts.getHeaders() + headers.Set("Content-Type", "application/x-protobuf") + c.sendBlocking(t, recordsCount, func() { + _, statusCode := c.vminsertCli.Post(t, url, data, headers) + if statusCode != http.StatusNoContent { + t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) + } + }) +} + +// PrometheusAPIV1ImportPrometheus is a test helper function that inserts a +// collection of records in Prometheus text exposition format for the given +// tenant by sending a HTTP POST request to +// /prometheus/api/v1/import/prometheus vminsert endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1importprometheus +func (c *vminsertClient) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts) { + t.Helper() + + url := c.url("insert", "prometheus/api/v1/import/prometheus", opts) + uv := opts.asURLValues() + uvs := uv.Encode() + if len(uvs) > 0 { + url += "?" + uvs + } + data := []byte(strings.Join(records, "\n")) + var recordsCount int + var metadataRecords int + uniqueMetadataMetricNames := make(map[string]struct{}) + for _, record := range records { + // metric metadata has the following format: + //# HELP importprometheus_series + //# TYPE importprometheus_series + // it results into single metadata record + if strings.HasPrefix(record, "# ") { + metadataItems := strings.Split(record, " ") + if len(metadataItems) < 3 { + t.Fatalf("BUG: unexpected metadata format=%q", record) + } + metricName := metadataItems[2] + if _, ok := uniqueMetadataMetricNames[metricName]; ok { + continue + } + uniqueMetadataMetricNames[metricName] = struct{}{} + metadataRecords++ + continue + } + recordsCount++ + } + if prommetadata.IsEnabled() { + recordsCount += metadataRecords + } + headers := opts.getHeaders() + headers.Set("Content-Type", "text/plain") + c.sendBlocking(t, recordsCount, func() { + _, statusCode := c.vminsertCli.Post(t, url, data, headers) + if statusCode != http.StatusNoContent { + t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) + } + }) +} + +// InfluxWrite is a test helper function that inserts a collection of records in +// Influx line format by sending a HTTP POST request to /influx/write endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/url-examples/#influxwrite +func (c *vminsertClient) InfluxWrite(t *testing.T, records []string, opts QueryOpts) { + t.Helper() + + url := c.url("insert", "influx/write", opts) + uv := opts.asURLValues() + uvs := uv.Encode() + if len(uvs) > 0 { + url += "?" + uvs + } + + data := []byte(strings.Join(records, "\n")) + headers := opts.getHeaders() + headers.Set("Content-Type", "text/plain") + c.sendBlocking(t, len(records), func() { + t.Helper() + _, statusCode := c.vminsertCli.Post(t, url, data, headers) + if statusCode != http.StatusNoContent { + t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) + } + }) +} + +// OpentelemetryV1Metrics is a test helper function that inserts a +// collection of records in Opentelemetry protocol format by sending a HTTP +// POST request to /opentelemetry/v1/metrics vminsert endpoint. +func (c *vminsertClient) OpentelemetryV1Metrics(t *testing.T, md otlppb.MetricsData, opts QueryOpts) { + t.Helper() + + var recordsCount int + for _, rss := range md.ResourceMetrics { + for _, sm := range rss.ScopeMetrics { + recordsCount += len(sm.Metrics) + for _, m := range sm.Metrics { + if prommetadata.IsEnabled() { + recordsCount += len(m.Metadata) + } + } + } + } + url := c.url("insert", "opentelemetry/v1/metrics", opts) + uv := opts.asURLValues() + uvs := uv.Encode() + if len(uvs) > 0 { + url += "?" + uvs + } + data := md.MarshalProtobuf(nil) + headers := opts.getHeaders() + headers.Set("Content-Type", "application/x-protobuf") + c.sendBlocking(t, recordsCount, func() { + _, statusCode := c.vminsertCli.Post(t, url, data, headers) + if statusCode != http.StatusOK { + t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) + } + }) +} + +// OpenTSDBAPIPut is a test helper function that inserts a collection of +// records in OpenTSDB format for the given tenant by sending an HTTP POST +// request to /opentsdb/api/put vminsert endpoint. +// +// See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format +func (c *vminsertClient) OpenTSDBAPIPut(t *testing.T, records []string, opts QueryOpts) { + t.Helper() + + url := c.openTSDBURL("insert", "opentsdb/api/put", opts) + uv := opts.asURLValues() + uvs := uv.Encode() + if len(uvs) > 0 { + url += "?" + uvs + } + data := []byte("[" + strings.Join(records, ",") + "]") + headers := opts.getHeaders() + headers.Set("Content-Type", "application/json") + c.sendBlocking(t, len(records), func() { + _, statusCode := c.vminsertCli.Post(t, url, data, headers) + if statusCode != http.StatusNoContent { + t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) + } + }) +} + +// ZabbixConnectorHistory is a test helper function that inserts a +// collection of records in zabbixconnector format by sending a HTTP +// POST request to /zabbixconnector/api/v1/history vmsingle endpoint. +func (c *vminsertClient) ZabbixConnectorHistory(t *testing.T, records []string, opts QueryOpts) { + t.Helper() + + url := c.url("insert", "zabbixconnector/api/v1/history", opts) + uv := opts.asURLValues() + uvs := uv.Encode() + if len(uvs) > 0 { + url += "?" + uvs + } + data := []byte(strings.Join(records, "\n")) + headers := opts.getHeaders() + headers.Set("Content-Type", "application/json") + c.sendBlocking(t, len(records), func() { + _, statusCode := c.vminsertCli.Post(t, url, data, headers) + if statusCode != http.StatusOK { + t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) + } + }) + +} + +// GraphiteWrite is a test helper function that sends a +// collection of records to graphiteListenAddr port. +// +// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#ingesting +func (c *vminsertClient) GraphiteWrite(t *testing.T, records []string, _ QueryOpts) { + t.Helper() + c.vminsertCli.Write(t, c.graphiteListenAddr, records) +} + +type vmstorageClient struct { + vmstorageCli *Client + httpListenAddr string +} + +// ForceFlush is a test helper function that forces the flushing of inserted +// data, so it becomes available for searching immediately. +func (c *vmstorageClient) ForceFlush(t *testing.T) { + t.Helper() + + url := fmt.Sprintf("http://%s/internal/force_flush", c.httpListenAddr) + _, statusCode := c.vmstorageCli.Get(t, url, nil) + if statusCode != http.StatusOK { + t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) + } +} + +// ForceMerge is a test helper function that forces the merging of parts. +func (c *vmstorageClient) ForceMerge(t *testing.T) { + t.Helper() + + url := fmt.Sprintf("http://%s/internal/force_merge", c.httpListenAddr) + _, statusCode := c.vmstorageCli.Get(t, url, nil) + if statusCode != http.StatusOK { + t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) + } +} + +// SnapshotCreate creates a database snapshot by sending a query to the +// /snapshot/create endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots +func (c *vmstorageClient) SnapshotCreate(t *testing.T) *SnapshotCreateResponse { + t.Helper() + + data, statusCode := c.vmstorageCli.Post(t, c.SnapshotCreateURL(), nil, nil) + if got, want := statusCode, http.StatusOK; got != want { + t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) + } + + var res SnapshotCreateResponse + if err := json.Unmarshal([]byte(data), &res); err != nil { + t.Fatalf("could not unmarshal snapshot create response: data=%q, err: %v", data, err) + } + + return &res +} + +// SnapshotCreateURL returns the URL for creating snapshots. +func (c *vmstorageClient) SnapshotCreateURL() string { + return fmt.Sprintf("http://%s/snapshot/create", c.httpListenAddr) +} + +// APIV1AdminTSDBSnapshot creates a database snapshot by sending a query to the +// /api/v1/admin/tsdb/snapshot endpoint. +// +// See https://prometheus.io/docs/prometheus/latest/querying/api/#snapshot. +func (c *vmstorageClient) APIV1AdminTSDBSnapshot(t *testing.T) *APIV1AdminTSDBSnapshotResponse { + t.Helper() + + url := fmt.Sprintf("http://%s/api/v1/admin/tsdb/snapshot", c.httpListenAddr) + data, statusCode := c.vmstorageCli.Post(t, url, nil, nil) + if got, want := statusCode, http.StatusOK; got != want { + t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) + } + + var res APIV1AdminTSDBSnapshotResponse + if err := json.Unmarshal([]byte(data), &res); err != nil { + t.Fatalf("could not unmarshal prometheus snapshot create response: data=%q, err: %v", data, err) + } + + return &res +} + +// SnapshotList lists existing database snapshots by sending a query to the +// /snapshot/list endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots +func (c *vmstorageClient) SnapshotList(t *testing.T) *SnapshotListResponse { + t.Helper() + + url := fmt.Sprintf("http://%s/snapshot/list", c.httpListenAddr) + data, statusCode := c.vmstorageCli.Get(t, url, nil) + if got, want := statusCode, http.StatusOK; got != want { + t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) + } + + var res SnapshotListResponse + if err := json.Unmarshal([]byte(data), &res); err != nil { + t.Fatalf("could not unmarshal snapshot list response: data=%q, err: %v", data, err) + } + + return &res +} + +// SnapshotDelete deletes a snapshot by sending a query to the +// /snapshot/delete endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots +func (c *vmstorageClient) SnapshotDelete(t *testing.T, snapshotName string) *SnapshotDeleteResponse { + t.Helper() + + url := fmt.Sprintf("http://%s/snapshot/delete?snapshot=%s", c.httpListenAddr, snapshotName) + data, statusCode := c.vmstorageCli.Delete(t, url) + wantStatusCodes := map[int]bool{ + http.StatusOK: true, + http.StatusInternalServerError: true, + } + if !wantStatusCodes[statusCode] { + t.Fatalf("unexpected status code: got %d, want %v, resp text=%q", statusCode, wantStatusCodes, data) + } + + var res SnapshotDeleteResponse + if err := json.Unmarshal([]byte(data), &res); err != nil { + t.Fatalf("could not unmarshal snapshot delete response: data=%q, err: %v", data, err) + } + + return &res +} + +// SnapshotDeleteAll deletes all snapshots by sending a query to the +// /snapshot/delete_all endpoint. +// +// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots +func (c *vmstorageClient) SnapshotDeleteAll(t *testing.T) *SnapshotDeleteAllResponse { + t.Helper() + + url := fmt.Sprintf("http://%s/snapshot/delete_all", c.httpListenAddr) + data, statusCode := c.vmstorageCli.Post(t, url, nil, nil) + if got, want := statusCode, http.StatusOK; got != want { + t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) + } + + var res SnapshotDeleteAllResponse + if err := json.Unmarshal([]byte(data), &res); err != nil { + t.Fatalf("could not unmarshal snapshot delete all response: data=%q, err: %v", data, err) + } + + return &res +} diff --git a/apptest/model.go b/apptest/model.go index 50f47baf01..ea5c37872a 100644 --- a/apptest/model.go +++ b/apptest/model.go @@ -27,13 +27,16 @@ type PrometheusQuerier interface { PrometheusAPIV1LabelValues(t *testing.T, labelName, query string, opts QueryOpts) *PrometheusAPIV1LabelValuesResponse PrometheusAPIV1ExportNative(t *testing.T, query string, opts QueryOpts) []byte PrometheusAPIV1Metadata(t *testing.T, metric string, limit int, opts QueryOpts) *PrometheusAPIV1Metadata - - APIV1AdminTSDBDeleteSeries(t *testing.T, matchQuery string, opts QueryOpts) + PrometheusAPIV1StatusMetricNamesStats(t *testing.T, limit, le, matchPattern string, opts QueryOpts) MetricNamesStatsResponse + PrometheusAPIV1AdminTSDBDeleteSeries(t *testing.T, matchQuery string, opts QueryOpts) // TODO(@rtm0): Prometheus does not provide this API. Either move it to a // separate interface or rename this interface to allow for multiple querier // types. GraphiteMetricsIndex(t *testing.T, opts QueryOpts) GraphiteMetricsIndexResponse + GraphiteMetricsFind(t *testing.T, query string, opts QueryOpts) GraphiteMetricsFindResponse + GraphiteMetricsExpand(t *testing.T, query string, opts QueryOpts) GraphiteMetricsExpandResponse + GraphiteRender(t *testing.T, target string, opts QueryOpts) GraphiteRenderResponse GraphiteTagsTagSeries(t *testing.T, record string, opts QueryOpts) GraphiteTagsTagMultiSeries(t *testing.T, records []string, opts QueryOpts) } @@ -91,6 +94,9 @@ type QueryOpts struct { Format string NoCache string Headers http.Header + From string + Until string + StorageStep string } func (qos *QueryOpts) getHeaders() http.Header { @@ -123,6 +129,9 @@ func (qos *QueryOpts) asURLValues() url.Values { addNonEmpty("latency_offset", qos.LatencyOffset) addNonEmpty("format", qos.Format) addNonEmpty("nocache", qos.NoCache) + addNonEmpty("from", qos.From) + addNonEmpty("until", qos.Until) + addNonEmpty("storage_step", qos.StorageStep) return uv } @@ -480,10 +489,6 @@ type TSDBStatusResponse struct { Data TSDBStatusResponseData } -// GraphiteMetricsIndexResponse is an in-memory representation of the json response -// returned by the /graphite/metrics/index.json endpoint. -type GraphiteMetricsIndexResponse = []string - // AdminTenantsResponse is an in-memory representation of the json response // returned by the /api/v1/admin/tenants endpoint. type AdminTenantsResponse struct { @@ -533,3 +538,32 @@ func sortTSDBStatusResponseEntries(entries []TSDBStatusResponseEntry) { return left.Count < right.Count }) } + +// GraphiteMetricsIndexResponse is an in-memory representation of the json response +// returned by the /graphite/metrics/index.json endpoint. +type GraphiteMetricsIndexResponse = []string + +type GraphiteMetric struct { + Id string + Text string + AllowChildren int + Expandable int + Leaf int +} + +// GraphiteMetricsIndexResponse is an in-memory representation of the json response +// returned by the /graphite/metrics/find endpoint. +type GraphiteMetricsFindResponse = []GraphiteMetric + +// GraphiteMetricsExpandResponse is an in-memory representation of the json response +// returned by the /graphite/metrics/expand endpoint. +type GraphiteMetricsExpandResponse = []string + +type GraphiteRenderedTarget struct { + Target string + Datapoints [][2]float64 +} + +// GraphiteRenderResponse is an in-memory representation of the json response +// returned by the /graphite/render endpoint. +type GraphiteRenderResponse = []GraphiteRenderedTarget diff --git a/apptest/tests/legacy_indexdb_test.go b/apptest/tests/legacy_indexdb_test.go index f0d5ce5abb..01a74c38b0 100644 --- a/apptest/tests/legacy_indexdb_test.go +++ b/apptest/tests/legacy_indexdb_test.go @@ -214,7 +214,7 @@ func testLegacyDeleteSeries(tc *at.TestCase, opts testLegacyDeleteSeriesOpts) { newSUT := opts.startNewSUT() assertSearchResults(newSUT, `{__name__=~".*"}`, start1, end1, "1d", want1) - newSUT.APIV1AdminTSDBDeleteSeries(t, `{__name__=~".*"}`, at.QueryOpts{}) + newSUT.PrometheusAPIV1AdminTSDBDeleteSeries(t, `{__name__=~".*"}`, at.QueryOpts{}) wantNoResults := &want{ series: []map[string]string{}, queryResults: []*at.QueryResult{}, @@ -877,7 +877,7 @@ func testLegacyDowngrade(tc *at.TestCase, opts testLegacyDowngradeOpts) { // Ingest legacy2 records, ensure the queries return only legacy2. legacySUT = opts.startLegacySUT() assertQueries(legacySUT, `{__name__=~".*"}`, wantLegacy1, numMetrics) - legacySUT.APIV1AdminTSDBDeleteSeries(t, `{__name__=~".*"}`, at.QueryOpts{}) + legacySUT.PrometheusAPIV1AdminTSDBDeleteSeries(t, `{__name__=~".*"}`, at.QueryOpts{}) assertQueries(legacySUT, `{__name__=~".*"}`, wantEmpty, numMetrics) legacySUT.PrometheusAPIV1ImportPrometheus(t, legacy2Data, at.QueryOpts{}) legacySUT.ForceFlush(t) @@ -891,7 +891,7 @@ func testLegacyDowngrade(tc *at.TestCase, opts testLegacyDowngradeOpts) { newSUT = opts.startNewSUT() // series count includes deleted metrics assertQueries(newSUT, `{__name__=~".*"}`, wantLegacy2New1, 3*numMetrics) - newSUT.APIV1AdminTSDBDeleteSeries(t, `{__name__=~".*"}`, at.QueryOpts{}) + newSUT.PrometheusAPIV1AdminTSDBDeleteSeries(t, `{__name__=~".*"}`, at.QueryOpts{}) // series count includes deleted metrics assertQueries(newSUT, `{__name__=~".*"}`, wantEmpty, 3*numMetrics) opts.stopNewSUT() diff --git a/apptest/tests/metric_names_stats_test.go b/apptest/tests/metric_names_stats_test.go index 01b54629ae..0b281ddab6 100644 --- a/apptest/tests/metric_names_stats_test.go +++ b/apptest/tests/metric_names_stats_test.go @@ -48,7 +48,7 @@ func TestSingleMetricNamesStats(t *testing.T) { {MetricName: "metric_name_3"}, }, } - got := sut.APIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{}) + got := sut.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{}) if diff := cmp.Diff(expected, got); diff != "" { t.Errorf("unexpected response (-want, +got):\n%s", diff) } @@ -63,7 +63,7 @@ func TestSingleMetricNamesStats(t *testing.T) { {MetricName: "metric_name_3", QueryRequestsCount: 1}, }, } - got = sut.APIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{}) + got = sut.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{}) if diff := cmp.Diff(expected, got); diff != "" { t.Errorf("unexpected response (-want, +got):\n%s", diff) } @@ -90,7 +90,7 @@ func TestSingleMetricNamesStats(t *testing.T) { }, } expectedStatsResponse.Sort() - gotStatus := sut.APIV1StatusTSDB(t, "", date, "", apptest.QueryOpts{}) + gotStatus := sut.PrometheusAPIV1StatusTSDB(t, "", date, "", apptest.QueryOpts{}) if diff := cmp.Diff(expectedStatsResponse, gotStatus, tsdbMetricNameEntryCmpOpts); diff != "" { t.Errorf("unexpected APIV1StatusTSDB response (-want, +got):\n%s", diff) } @@ -105,7 +105,7 @@ func TestSingleMetricNamesStats(t *testing.T) { {MetricName: "metric_name_3", QueryRequestsCount: 1}, }, } - got = sut.APIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{}) + got = sut.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{}) if diff := cmp.Diff(expected, got); diff != "" { t.Errorf("unexpected response (-want, +got):\n%s", diff) } @@ -118,17 +118,17 @@ func TestSingleMetricNamesStats(t *testing.T) { {MetricName: "metric_name_3", QueryRequestsCount: 1}, }, } - got = sut.APIV1StatusMetricNamesStats(t, "", "2", "", apptest.QueryOpts{}) + got = sut.PrometheusAPIV1StatusMetricNamesStats(t, "", "2", "", apptest.QueryOpts{}) if diff := cmp.Diff(expected, got); diff != "" { t.Errorf("unexpected response (-want, +got):\n%s", diff) } // reset state and check empty request response - sut.APIV1AdminStatusMetricNamesStatsReset(t, apptest.QueryOpts{}) + sut.PrometheusAPIV1AdminStatusMetricNamesStatsReset(t, apptest.QueryOpts{}) expected = apptest.MetricNamesStatsResponse{ Records: []apptest.MetricNamesStatsRecord{}, } - got = sut.APIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{}) + got = sut.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{}) if diff := cmp.Diff(expected, got); diff != "" { t.Errorf("unexpected response (-want, +got):\n%s", diff) } @@ -158,7 +158,7 @@ func TestClusterMetricNamesStats(t *testing.T) { fmt.Sprintf("-storageNode=%s,%s", vmstorage1.VmselectAddr(), vmstorage2.VmselectAddr()), }) // verify empty stats - resp := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "0:0"}) + resp := vmselect.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "0:0"}) if len(resp.Records) != 0 { t.Fatalf("unexpected resp Records: %d, want: %d", len(resp.Records), 0) } @@ -198,7 +198,7 @@ func TestClusterMetricNamesStats(t *testing.T) { {MetricName: "metric_name_3"}, }, } - gotStats := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID}) + gotStats := vmselect.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID}) if diff := cmp.Diff(expected, gotStats); diff != "" { t.Errorf("unexpected response (-want, +got):\n%s", diff) } @@ -216,7 +216,7 @@ func TestClusterMetricNamesStats(t *testing.T) { {MetricName: "metric_name_1", QueryRequestsCount: 3}, }, } - gotStats = vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID}) + gotStats = vmselect.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID}) if diff := cmp.Diff(expected, gotStats); diff != "" { t.Errorf("unexpected response tenant: %s (-want, +got):\n%s", tenantID, diff) } @@ -243,9 +243,9 @@ func TestClusterMetricNamesStats(t *testing.T) { }, } expectedStatsResponse.Sort() - gotStatus := vmselect.APIV1StatusTSDB(t, "", date, "", apptest.QueryOpts{Tenant: tenantID}) + gotStatus := vmselect.PrometheusAPIV1StatusTSDB(t, "", date, "", apptest.QueryOpts{Tenant: tenantID}) if diff := cmp.Diff(expectedStatsResponse, gotStatus, tsdbMetricNameEntryCmpOpts); diff != "" { - t.Errorf("unexpected APIV1StatusTSDB response tenant: %s (-want, +got):\n%s", tenantID, diff) + t.Errorf("unexpected TSDB status for tenant %s (-want, +got):\n%s", tenantID, diff) } } @@ -258,14 +258,14 @@ func TestClusterMetricNamesStats(t *testing.T) { {MetricName: "metric_name_1", QueryRequestsCount: 9}, }, } - gotStats := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"}) + gotStats := vmselect.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"}) if diff := cmp.Diff(expected, gotStats); diff != "" { t.Errorf("unexpected response (-want, +got):\n%s", diff) } // reset cache and check empty state - vmselect.MetricNamesStatsReset(t, apptest.QueryOpts{}) - resp = vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"}) + vmselect.PrometheusAPIV1AdminStatusMetricNamesStatsReset(t, apptest.QueryOpts{}) + resp = vmselect.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"}) if len(resp.Records) != 0 { t.Fatalf("want 0 records, got: %d", len(resp.Records)) } diff --git a/apptest/tests/multitenancy_via_headers_test.go b/apptest/tests/multitenancy_via_headers_test.go index 1adac332f0..d8f4c0d36e 100644 --- a/apptest/tests/multitenancy_via_headers_test.go +++ b/apptest/tests/multitenancy_via_headers_test.go @@ -219,7 +219,7 @@ func TestClusterMultiTenantSelectViaHeaders(t *testing.T) { tenantID := make(http.Header) tenantID.Set("AccountID", "5") tenantID.Set("ProjectID", "15") - vmselect.APIV1AdminTSDBDeleteSeries(t, "foo_bar", apptest.QueryOpts{ + vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(t, "foo_bar", apptest.QueryOpts{ Headers: tenantID, }) wantSR = apptest.NewPrometheusAPIV1SeriesResponse(t, @@ -244,7 +244,7 @@ func TestClusterMultiTenantSelectViaHeaders(t *testing.T) { } // Delete series for multitenant with tenant filter - vmselect.APIV1AdminTSDBDeleteSeries(t, `foo_bar{vm_account_id="1"}`, apptest.QueryOpts{ + vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(t, `foo_bar{vm_account_id="1"}`, apptest.QueryOpts{ Headers: multitenant, }) diff --git a/apptest/tests/multitenant_test.go b/apptest/tests/multitenant_test.go index 17a78c5f8f..9fd2b2b77e 100644 --- a/apptest/tests/multitenant_test.go +++ b/apptest/tests/multitenant_test.go @@ -192,7 +192,7 @@ func TestClusterMultiTenantSelect(t *testing.T) { } // Delete series from specific tenant - vmselect.APIV1AdminTSDBDeleteSeries(t, "foo_bar", apptest.QueryOpts{ + vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(t, "foo_bar", apptest.QueryOpts{ Tenant: "5:15", }) wantSR = apptest.NewPrometheusAPIV1SeriesResponse(t, @@ -215,7 +215,7 @@ func TestClusterMultiTenantSelect(t *testing.T) { } // Delete series for multitenant with tenant filter - vmselect.APIV1AdminTSDBDeleteSeries(t, `foo_bar{vm_account_id="1"}`, apptest.QueryOpts{ + vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(t, `foo_bar{vm_account_id="1"}`, apptest.QueryOpts{ Tenant: "multitenant", }) diff --git a/apptest/vmagent.go b/apptest/vmagent.go index 55d441b99e..55547a4d83 100644 --- a/apptest/vmagent.go +++ b/apptest/vmagent.go @@ -19,9 +19,11 @@ import ( // Vmagent holds the state of a vmagent app and provides vmagent-specific functions type Vmagent struct { *app - *ServesMetrics + *metricsClient httpListenAddr string + + cli *Client } // StartVmagent starts an instance of vmagent with the given flags. It also @@ -46,12 +48,10 @@ func StartVmagent(instance string, flags []string, cli *Client, promScrapeConfig } return &Vmagent{ - app: app, - ServesMetrics: &ServesMetrics{ - metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]), - cli: cli, - }, + app: app, + metricsClient: newMetricsClient(cli, stderrExtracts[0]), httpListenAddr: stderrExtracts[0], + cli: cli, }, nil } diff --git a/apptest/vmauth.go b/apptest/vmauth.go index 8e3e1950b5..c3dcdd97be 100644 --- a/apptest/vmauth.go +++ b/apptest/vmauth.go @@ -1,7 +1,6 @@ package apptest import ( - "fmt" "io" "regexp" "syscall" @@ -17,7 +16,7 @@ var httpBuilitinListenAddrRE = regexp.MustCompile(`pprof handlers are exposed at // functions. type Vmauth struct { *app - *ServesMetrics + *metricsClient httpListenAddr string configFilePath string @@ -45,11 +44,8 @@ func StartVmauth(instance string, flags []string, cli *Client, configFilePath st } return &Vmauth{ - app: app, - ServesMetrics: &ServesMetrics{ - metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]), - cli: cli, - }, + app: app, + metricsClient: newMetricsClient(cli, stderrExtracts[0]), httpListenAddr: stderrExtracts[0], configFilePath: configFilePath, cli: cli, diff --git a/apptest/vminsert.go b/apptest/vminsert.go index 54af4e4352..ea09a7101c 100644 --- a/apptest/vminsert.go +++ b/apptest/vminsert.go @@ -3,31 +3,21 @@ package apptest import ( "fmt" "io" - "net/http" "regexp" "strings" "testing" "time" - - "github.com/golang/snappy" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prommetadata" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" - otlppb "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" ) // Vminsert holds the state of a vminsert app and provides vminsert-specific // functions. type Vminsert struct { *app - *ServesMetrics + *metricsClient + *vminsertClient httpListenAddr string clusternativeListenAddr string - graphiteListenAddr string - openTSDBListenAddr string - - cli *Client } // storageNodes returns the storage node addresses passed to vminsert via @@ -73,17 +63,26 @@ func StartVminsert(instance string, flags []string, cli *Client, output io.Write return nil, err } + metricsClient := newMetricsClient(cli, stderrExtracts[0]) return &Vminsert{ - app: app, - ServesMetrics: &ServesMetrics{ - metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]), - cli: cli, + app: app, + metricsClient: metricsClient, + vminsertClient: &vminsertClient{ + vminsertCli: cli, + url: func(op, path string, opts QueryOpts) string { + return getClusterPath(stderrExtracts[0], op, path, opts) + }, + openTSDBURL: func(op, path string, opts QueryOpts) string { + return getClusterPath(stderrExtracts[3], op, path, opts) + }, + graphiteListenAddr: stderrExtracts[2], + sendBlocking: func(t *testing.T, numRecordsToSend int, send func()) { + t.Helper() + sendBlocking(t, metricsClient, numRecordsToSend, send) + }, }, httpListenAddr: stderrExtracts[0], clusternativeListenAddr: stderrExtracts[1], - graphiteListenAddr: stderrExtracts[2], - openTSDBListenAddr: stderrExtracts[3], - cli: cli, }, nil } @@ -99,247 +98,6 @@ func (app *Vminsert) HTTPAddr() string { return app.httpListenAddr } -// InfluxWrite is a test helper function that inserts a -// collection of records in Influx line format by sending a HTTP -// POST request to /influx/write vmsingle endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#influxwrite -func (app *Vminsert) InfluxWrite(t *testing.T, records []string, opts QueryOpts) { - t.Helper() - - url := getClusterPath(app.httpListenAddr, "insert", "influx/write", opts) - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - - data := []byte(strings.Join(records, "\n")) - headers := opts.getHeaders() - headers.Set("Content-Type", "text/plain") - app.sendBlocking(t, len(records), func() { - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) - } - }) -} - -// GraphiteWrite is a test helper function that sends a -// collection of records to graphiteListenAddr port. -// -// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#ingesting -func (app *Vminsert) GraphiteWrite(t *testing.T, records []string, _ QueryOpts) { - t.Helper() - app.cli.Write(t, app.graphiteListenAddr, records) -} - -// PrometheusAPIV1ImportCSV is a test helper function that inserts a collection -// of records in CSV format for the given tenant by sending an HTTP POST -// request to prometheus/api/v1/import/csv vminsert endpoint. -// -// See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format -func (app *Vminsert) PrometheusAPIV1ImportCSV(t *testing.T, records []string, opts QueryOpts) { - t.Helper() - - url := getClusterPath(app.httpListenAddr, "insert", "prometheus/api/v1/import/csv", opts) - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - data := []byte(strings.Join(records, "\n")) - headers := opts.getHeaders() - headers.Set("Content-Type", "text/plain") - app.sendBlocking(t, len(records), func() { - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) - } - }) -} - -// PrometheusAPIV1ImportNative is a test helper function that inserts a collection -// of records in Native format for the given tenant by sending an HTTP POST -// request to prometheus/api/v1/import/native vminsert endpoint. -// -// See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format -func (app *Vminsert) PrometheusAPIV1ImportNative(t *testing.T, data []byte, opts QueryOpts) { - t.Helper() - - url := getClusterPath(app.httpListenAddr, "insert", "prometheus/api/v1/import/native", opts) - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - headers := opts.getHeaders() - headers.Set("Content-Type", "text/plain") - app.sendBlocking(t, 1, func() { - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) - } - }) -} - -// OpenTSDBAPIPut is a test helper function that inserts a collection of -// records in OpenTSDB format for the given tenant by sending an HTTP POST -// request to /opentsdb/api/put vminsert endpoint. -// -// See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format -func (app *Vminsert) OpenTSDBAPIPut(t *testing.T, records []string, opts QueryOpts) { - t.Helper() - - url := getClusterPath(app.openTSDBListenAddr, "insert", "opentsdb/api/put", opts) - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - data := []byte("[" + strings.Join(records, ",") + "]") - headers := opts.getHeaders() - headers.Set("Content-Type", "application/json") - app.sendBlocking(t, len(records), func() { - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) - } - }) -} - -// PrometheusAPIV1Write is a test helper function that inserts a -// collection of records in Prometheus remote-write format by sending a HTTP -// POST request to /prometheus/api/v1/write vminsert endpoint. -func (app *Vminsert) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteRequest, opts QueryOpts) { - t.Helper() - - url := getClusterPath(app.httpListenAddr, "insert", "prometheus/api/v1/write", opts) - data := snappy.Encode(nil, wr.MarshalProtobuf(nil)) - recordsCount := len(wr.Timeseries) - if prommetadata.IsEnabled() { - recordsCount += len(wr.Metadata) - } - headers := opts.getHeaders() - headers.Set("Content-Type", "application/x-protobuf") - app.sendBlocking(t, recordsCount, func() { - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) - } - }) -} - -// PrometheusAPIV1ImportPrometheus is a test helper function that inserts a -// collection of records in Prometheus text exposition format for the given -// tenant by sending a HTTP POST request to -// /prometheus/api/v1/import/prometheus vminsert endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1importprometheus -func (app *Vminsert) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts) { - t.Helper() - - url := getClusterPath(app.httpListenAddr, "insert", "prometheus/api/v1/import/prometheus", opts) - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - data := []byte(strings.Join(records, "\n")) - var recordsCount int - var metadataRecords int - uniqueMetadataMetricNames := make(map[string]struct{}) - for _, record := range records { - // metric metadata has the following format: - //# HELP importprometheus_series - //# TYPE importprometheus_series - // it results into single metadata record - if strings.HasPrefix(record, "# ") { - metadataItems := strings.Split(record, " ") - if len(metadataItems) < 2 { - t.Fatalf("BUG: unexpected metadata format=%q", record) - } - metricName := metadataItems[2] - if _, ok := uniqueMetadataMetricNames[metricName]; ok { - continue - } - uniqueMetadataMetricNames[metricName] = struct{}{} - metadataRecords++ - continue - } - recordsCount++ - } - if prommetadata.IsEnabled() { - recordsCount += metadataRecords - } - headers := opts.getHeaders() - headers.Set("Content-Type", "text/plain") - app.sendBlocking(t, recordsCount, func() { - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) - } - }) -} - -// ZabbixConnectorHistory is a test helper function that inserts a -// collection of records in zabbixconnector format by sending a HTTP -// POST request to /zabbixconnector/api/v1/history vmsingle endpoint. -func (app *Vminsert) ZabbixConnectorHistory(t *testing.T, records []string, opts QueryOpts) { - t.Helper() - - url := getClusterPath(app.httpListenAddr, "insert", "zabbixconnector/api/v1/history", opts) - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - data := []byte(strings.Join(records, "\n")) - headers := opts.getHeaders() - headers.Set("Content-Type", "application/json") - app.sendBlocking(t, len(records), func() { - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) - } - }) - -} - -// OpentelemetryV1Metrics is a test helper function that inserts a -// collection of records in Opentelemetry protocol format by sending a HTTP -// POST request to /opentelemetry/v1/metrics vminsert endpoint. -func (app *Vminsert) OpentelemetryV1Metrics(t *testing.T, md otlppb.MetricsData, opts QueryOpts) { - t.Helper() - - var recordsCount int - for _, rss := range md.ResourceMetrics { - for _, sm := range rss.ScopeMetrics { - recordsCount += len(sm.Metrics) - for _, m := range sm.Metrics { - if prommetadata.IsEnabled() { - recordsCount += len(m.Metadata) - } - } - } - } - url := getClusterPath(app.httpListenAddr, "insert", "opentelemetry/v1/metrics", opts) - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - data := md.MarshalProtobuf(nil) - headers := opts.getHeaders() - headers.Set("Content-Type", "application/x-protobuf") - app.sendBlocking(t, recordsCount, func() { - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) - } - }) -} - // String returns the string representation of the vminsert app state. func (app *Vminsert) String() string { return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr) @@ -355,13 +113,10 @@ func (app *Vminsert) String() string { // Waiting is implemented a retrieving the value of `vm_rpc_rows_sent_total` // metric and checking whether it is equal or greater than the wanted value. // If it is, then the data has been sent to vmstorage. -// -// Unreliable if the records are inserted concurrently. -// TODO(rtm0): Put sending and waiting into a critical section to make reliable? -func (app *Vminsert) sendBlocking(t *testing.T, numRecordsToSend int, send func()) { +func sendBlocking(t *testing.T, c *metricsClient, numRecordsToSend int, send func()) { t.Helper() - wantRowsSentCount := app.rpcRowsSentTotal(t) + numRecordsToSend + wantRowsSentCount := c.rpcRowsSentTotal(t) + numRecordsToSend send() @@ -370,7 +125,7 @@ func (app *Vminsert) sendBlocking(t *testing.T, numRecordsToSend int, send func( period = 100 * time.Millisecond ) for range retries { - d := app.rpcRowsSentTotal(t) + d := c.rpcRowsSentTotal(t) if d >= wantRowsSentCount { return } @@ -378,14 +133,3 @@ func (app *Vminsert) sendBlocking(t *testing.T, numRecordsToSend int, send func( } t.Fatalf("timed out while waiting for inserted rows to be sent to vmstorage") } - -// rpcRowsSentTotal retrieves the values of all vminsert -// `vm_rpc_rows_sent_total` metrics (there will be one for each vmstorage) and -// returns their integer sum. -func (app *Vminsert) rpcRowsSentTotal(t *testing.T) int { - total := 0.0 - for _, v := range app.GetMetricsByPrefix(t, "vm_rpc_rows_sent_total") { - total += v - } - return int(total) -} diff --git a/apptest/vmselect.go b/apptest/vmselect.go index cf15292e8d..9054345fce 100644 --- a/apptest/vmselect.go +++ b/apptest/vmselect.go @@ -1,20 +1,17 @@ package apptest import ( - "encoding/json" "fmt" "io" - "net/http" "regexp" - "strconv" - "testing" ) // Vmselect holds the state of a vmselect app and provides vmselect-specific // functions. type Vmselect struct { *app - *ServesMetrics + *metricsClient + *vmselectClient httpListenAddr string clusternativeListenAddr string @@ -41,10 +38,15 @@ func StartVmselect(instance string, flags []string, cli *Client, output io.Write } return &Vmselect{ - app: app, - ServesMetrics: &ServesMetrics{ - metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]), - cli: cli, + app: app, + metricsClient: newMetricsClient(cli, stderrExtracts[0]), + vmselectClient: &vmselectClient{ + vmselectCli: cli, + url: func(op, path string, opts QueryOpts) string { + return getClusterPath(stderrExtracts[0], op, path, opts) + }, + metricNamesStatsResetURL: fmt.Sprintf("http://%s/admin/api/v1/admin/status/metric_names_stats/reset", stderrExtracts[0]), + tenantsURL: fmt.Sprintf("http://%s/admin/tenants", stderrExtracts[0]), }, httpListenAddr: stderrExtracts[0], clusternativeListenAddr: stderrExtracts[1], @@ -64,299 +66,6 @@ func (app *Vmselect) HTTPAddr() string { return app.httpListenAddr } -// PrometheusAPIV1Export is a test helper function that performs the export of -// raw samples in JSON line format by sending a HTTP POST request to -// /prometheus/api/v1/export vmselect endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1export -func (app *Vmselect) PrometheusAPIV1Export(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse { - t.Helper() - - exportURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/export", opts) - values := opts.asURLValues() - values.Add("match[]", query) - values.Add("format", "promapi") - res, _ := app.cli.PostForm(t, exportURL, values, opts.Headers) - return NewPrometheusAPIV1QueryResponse(t, res) -} - -// PrometheusAPIV1ExportNative is a test helper function that performs the export of -// raw samples in native binary format by sending an HTTP POST request to -// /prometheus/api/v1/export/native vmselect endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1exportnative -func (app *Vmselect) PrometheusAPIV1ExportNative(t *testing.T, query string, opts QueryOpts) []byte { - t.Helper() - - exportURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/export/native", opts) - values := opts.asURLValues() - values.Add("match[]", query) - values.Add("format", "promapi") - res, _ := app.cli.PostForm(t, exportURL, values, opts.Headers) - return []byte(res) -} - -// PrometheusAPIV1Query is a test helper function that performs PromQL/MetricsQL -// instant query by sending a HTTP POST request to /prometheus/api/v1/query -// vmselect endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query -func (app *Vmselect) PrometheusAPIV1Query(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse { - t.Helper() - - queryURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/query", opts) - values := opts.asURLValues() - values.Add("query", query) - - res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers) - return NewPrometheusAPIV1QueryResponse(t, res) -} - -// PrometheusAPIV1QueryRange is a test helper function that performs -// PromQL/MetricsQL range query by sending a HTTP POST request to -// /prometheus/api/v1/query_range vmselect endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query_range -func (app *Vmselect) PrometheusAPIV1QueryRange(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse { - t.Helper() - - queryURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/query_range", opts) - values := opts.asURLValues() - values.Add("query", query) - - res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers) - return NewPrometheusAPIV1QueryResponse(t, res) -} - -// PrometheusAPIV1Series sends a query to a /prometheus/api/v1/series endpoint -// and returns the list of time series that match the query. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series -func (app *Vmselect) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1SeriesResponse { - t.Helper() - - seriesURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/series", opts) - values := opts.asURLValues() - values.Add("match[]", matchQuery) - - res, _ := app.cli.PostForm(t, seriesURL, values, opts.Headers) - return NewPrometheusAPIV1SeriesResponse(t, res) -} - -// PrometheusAPIV1SeriesCount sends a query to a /prometheus/api/v1/series/count endpoint -// and returns the total number of time series. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series -func (app *Vmselect) PrometheusAPIV1SeriesCount(t *testing.T, opts QueryOpts) *PrometheusAPIV1SeriesCountResponse { - t.Helper() - - seriesURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/series/count", opts) - values := opts.asURLValues() - - res, _ := app.cli.PostForm(t, seriesURL, values, opts.Headers) - return NewPrometheusAPIV1SeriesCountResponse(t, res) -} - -// PrometheusAPIV1Labels sends a query to a /prometheus/api/v1/labels endpoint -// and returns the label names list of time series that match the query. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labels -func (app *Vmselect) PrometheusAPIV1Labels(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelsResponse { - t.Helper() - - values := opts.asURLValues() - values.Add("match[]", matchQuery) - queryURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/labels", opts) - - res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers) - return NewPrometheusAPIV1LabelsResponse(t, res) -} - -// PrometheusAPIV1LabelValues sends a query to a /prometheus/api/v1/label/.../values endpoint -// and returns the label names list of time series that match the query. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labelvalues -func (app *Vmselect) PrometheusAPIV1LabelValues(t *testing.T, labelName, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelValuesResponse { - t.Helper() - - values := opts.asURLValues() - values.Add("match[]", matchQuery) - suffix := fmt.Sprintf("prometheus/api/v1/label/%s/values", labelName) - queryURL := getClusterPath(app.httpListenAddr, "select", suffix, opts) - - res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers) - return NewPrometheusAPIV1LabelValuesResponse(t, res) -} - -// PrometheusAPIV1Metadata sends a query to a /prometheus/api/v1/metadata endpoint -// and returns the results. -func (app *Vmselect) PrometheusAPIV1Metadata(t *testing.T, metric string, limit int, opts QueryOpts) *PrometheusAPIV1Metadata { - t.Helper() - - values := opts.asURLValues() - values.Add("metric", metric) - values.Add("limit", strconv.Itoa(limit)) - queryURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/metadata", opts) - - res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers) - return NewPrometheusAPIV1Metadata(t, res) -} - -// APIV1AdminTSDBDeleteSeries deletes the series that match the query by sending -// a request to /api/v1/admin/tsdb/delete_series. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series -func (app *Vmselect) APIV1AdminTSDBDeleteSeries(t *testing.T, matchQuery string, opts QueryOpts) { - t.Helper() - - queryURL := getClusterPath(app.httpListenAddr, "delete", "prometheus/api/v1/admin/tsdb/delete_series", opts) - values := opts.asURLValues() - values.Add("match[]", matchQuery) - - res, statusCode := app.cli.PostForm(t, queryURL, values, opts.Headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res) - } -} - -// MetricNamesStats sends a query to a /select/tenant/prometheus/api/v1/status/metric_names_stats endpoint -// and returns the statistics response for given params. -// -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage -func (app *Vmselect) MetricNamesStats(t *testing.T, limit, le, matchPattern string, opts QueryOpts) MetricNamesStatsResponse { - t.Helper() - - values := opts.asURLValues() - values.Add("limit", limit) - values.Add("le", le) - values.Add("match_pattern", matchPattern) - queryURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/status/metric_names_stats", opts) - - res, statusCode := app.cli.PostForm(t, queryURL, values, opts.Headers) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) - } - var resp MetricNamesStatsResponse - if err := json.Unmarshal([]byte(res), &resp); err != nil { - t.Fatalf("could not unmarshal series response data:\n%s\n err: %v", res, err) - } - return resp -} - -// MetricNamesStatsReset sends a query to a /admin/api/v1/status/metric_names_stats/reset endpoint -// -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage -func (app *Vmselect) MetricNamesStatsReset(t *testing.T, opts QueryOpts) { - t.Helper() - - values := opts.asURLValues() - queryURL := fmt.Sprintf("http://%s/admin/api/v1/admin/status/metric_names_stats/reset", app.httpListenAddr) - - res, statusCode := app.cli.PostForm(t, queryURL, values, opts.Headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res) - } -} - -// APIV1StatusTSDB sends a query to a /prometheus/api/v1/status/tsdb -// // -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#tsdb-stats -func (app *Vmselect) APIV1StatusTSDB(t *testing.T, matchQuery string, date string, topN string, opts QueryOpts) TSDBStatusResponse { - t.Helper() - - url := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/status/tsdb", opts) - values := opts.asURLValues() - addNonEmpty := func(name, value string) { - if len(value) == 0 { - return - } - values.Add(name, value) - } - addNonEmpty("match[]", matchQuery) - addNonEmpty("topN", topN) - addNonEmpty("date", date) - - res, statusCode := app.cli.PostForm(t, url, values, opts.Headers) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) - } - - var status TSDBStatusResponse - if err := json.Unmarshal([]byte(res), &status); err != nil { - t.Fatalf("could not unmarshal tsdb status response data:\n%s\n err: %v", res, err) - } - status.Sort() - return status -} - -// GraphiteMetricsIndex sends a query to a /graphite/metrics/index.json -// -// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api -func (app *Vmselect) GraphiteMetricsIndex(t *testing.T, opts QueryOpts) GraphiteMetricsIndexResponse { - t.Helper() - - url := getClusterPath(app.httpListenAddr, "select", "graphite/metrics/index.json", opts) - res, statusCode := app.cli.Get(t, url, opts.Headers) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) - } - - var index GraphiteMetricsIndexResponse - if err := json.Unmarshal([]byte(res), &index); err != nil { - t.Fatalf("could not unmarshal metrics index response data:\n%s\n err: %v", res, err) - } - return index -} - -// GraphiteTagsTagSeries is a test helper function that registers Graphite tags -// for a single time series by sending a HTTP POST request to -// /graphite/tags/tagSeries vmsingle endpoint. -func (app *Vmselect) GraphiteTagsTagSeries(t *testing.T, record string, opts QueryOpts) { - t.Helper() - - url := getClusterPath(app.httpListenAddr, "select", "graphite/tags/tagSeries", opts) - values := opts.asURLValues() - values.Add("path", record) - - _, statusCode := app.cli.PostForm(t, url, values, opts.Headers) - if got, want := statusCode, http.StatusNotImplemented; got != want { - t.Fatalf("unexpected status code: got %d, want %d", got, want) - } -} - -func (app *Vmselect) GraphiteTagsTagMultiSeries(t *testing.T, records []string, opts QueryOpts) { - t.Helper() - - url := getClusterPath(app.httpListenAddr, "select", "graphite/tags/tagMultiSeries", opts) - values := opts.asURLValues() - for _, rec := range records { - values.Add("path", rec) - } - - _, statusCode := app.cli.PostForm(t, url, values, opts.Headers) - if got, want := statusCode, http.StatusNotImplemented; got != want { - t.Fatalf("unexpected status code: got %d, want %d", got, want) - } -} - -// APIV1AdminTenants sends a query to a /admin/tenants endpoint -func (app *Vmselect) APIV1AdminTenants(t *testing.T) *AdminTenantsResponse { - t.Helper() - - tenantsURL := fmt.Sprintf("http://%s/admin/tenants", app.httpListenAddr) - res, statusCode := app.cli.Get(t, tenantsURL, nil) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) - } - - var tenants *AdminTenantsResponse - if err := json.Unmarshal([]byte(res), tenants); err != nil { - t.Fatalf("could not unmarshal tenants response data:\n%s\n err: %v", res, err) - } - - return tenants -} - // String returns the string representation of the vmselect app state. func (app *Vmselect) String() string { return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr) diff --git a/apptest/vmsingle.go b/apptest/vmsingle.go index b123c60de5..53c41aad18 100644 --- a/apptest/vmsingle.go +++ b/apptest/vmsingle.go @@ -1,49 +1,25 @@ package apptest import ( - "encoding/json" "fmt" "io" - "net/http" "os" "regexp" - "strconv" - "strings" "testing" "time" - - "github.com/golang/snappy" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" - otlppb "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" ) // Vmsingle holds the state of a vmsingle app and provides vmsingle-specific // functions. type Vmsingle struct { *app - *ServesMetrics + *metricsClient + *vmstorageClient + *vmselectClient + *vminsertClient storageDataPath string httpListenAddr string - - // vmstorage URLs. - forceFlushURL string - forceMergeURL string - - // vminsert URLs. - influxLineWriteURL string - graphiteWriteAddr string - openTSDBHTTPURL string - prometheusAPIV1ImportPrometheusURL string - prometheusAPIV1WriteURL string - - // vmselect URLs. - prometheusAPIV1ExportURL string - prometheusAPIV1ExportNativeURL string - prometheusAPIV1QueryURL string - prometheusAPIV1QueryRangeURL string - prometheusAPIV1SeriesURL string } // StartVmsingleAt starts an instance of vmsingle with the given flags. It also @@ -70,617 +46,39 @@ func StartVmsingleAt(instance, binary string, flags []string, cli *Client, outpu } return &Vmsingle{ - app: app, - ServesMetrics: &ServesMetrics{ - metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[1]), - cli: cli, + app: app, + metricsClient: newMetricsClient(cli, stderrExtracts[1]), + vmstorageClient: &vmstorageClient{ + vmstorageCli: cli, + httpListenAddr: stderrExtracts[1], + }, + vmselectClient: &vmselectClient{ + vmselectCli: cli, + url: func(op, path string, opts QueryOpts) string { + return fmt.Sprintf("http://%s/%s", stderrExtracts[1], path) + }, + metricNamesStatsResetURL: fmt.Sprintf("http://%s/api/v1/admin/status/metric_names_stats/reset", stderrExtracts[1]), + tenantsURL: "vmsingle-does-not-serve-tenants", + }, + vminsertClient: &vminsertClient{ + vminsertCli: cli, + url: func(_, path string, _ QueryOpts) string { + return fmt.Sprintf("http://%s/%s", stderrExtracts[1], path) + }, + openTSDBURL: func(_, path string, _ QueryOpts) string { + return fmt.Sprintf("http://%s/%s", stderrExtracts[3], path) + }, + graphiteListenAddr: stderrExtracts[2], + sendBlocking: func(t *testing.T, _ int, send func()) { + t.Helper() + send() + }, }, storageDataPath: stderrExtracts[0], httpListenAddr: stderrExtracts[1], - - forceFlushURL: fmt.Sprintf("http://%s/internal/force_flush", stderrExtracts[1]), - forceMergeURL: fmt.Sprintf("http://%s/internal/force_merge", stderrExtracts[1]), - - influxLineWriteURL: fmt.Sprintf("http://%s/influx/write", stderrExtracts[1]), - graphiteWriteAddr: stderrExtracts[2], - openTSDBHTTPURL: fmt.Sprintf("http://%s", stderrExtracts[3]), - prometheusAPIV1ImportPrometheusURL: fmt.Sprintf("http://%s/prometheus/api/v1/import/prometheus", stderrExtracts[1]), - prometheusAPIV1WriteURL: fmt.Sprintf("http://%s/prometheus/api/v1/write", stderrExtracts[1]), - prometheusAPIV1ExportURL: fmt.Sprintf("http://%s/prometheus/api/v1/export", stderrExtracts[1]), - prometheusAPIV1ExportNativeURL: fmt.Sprintf("http://%s/prometheus/api/v1/export/native", stderrExtracts[1]), - prometheusAPIV1QueryURL: fmt.Sprintf("http://%s/prometheus/api/v1/query", stderrExtracts[1]), - prometheusAPIV1QueryRangeURL: fmt.Sprintf("http://%s/prometheus/api/v1/query_range", stderrExtracts[1]), - prometheusAPIV1SeriesURL: fmt.Sprintf("http://%s/prometheus/api/v1/series", stderrExtracts[1]), }, nil } -// ForceFlush is a test helper function that forces the flushing of inserted -// data, so it becomes available for searching immediately. -func (app *Vmsingle) ForceFlush(t *testing.T) { - t.Helper() - - _, statusCode := app.cli.Get(t, app.forceFlushURL, nil) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) - } -} - -// ForceMerge is a test helper function that forces the merging of parts. -func (app *Vmsingle) ForceMerge(t *testing.T) { - t.Helper() - - _, statusCode := app.cli.Get(t, app.forceMergeURL, nil) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) - } -} - -// InfluxWrite is a test helper function that inserts a -// collection of records in Influx line format by sending a HTTP -// POST request to /influx/write vmsingle endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#influxwrite -func (app *Vmsingle) InfluxWrite(t *testing.T, records []string, opts QueryOpts) { - t.Helper() - - data := []byte(strings.Join(records, "\n")) - - url := app.influxLineWriteURL - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - headers := opts.getHeaders() - headers.Set("Content-Type", "text/plain") - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) - } -} - -// GraphiteWrite is a test helper function that sends a collection of records -// to graphiteListenAddr port. -// -// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#ingesting -func (app *Vmsingle) GraphiteWrite(t *testing.T, records []string, _ QueryOpts) { - t.Helper() - app.cli.Write(t, app.graphiteWriteAddr, records) -} - -// PrometheusAPIV1ImportCSV is a test helper function that inserts a collection -// of records in CSV format for the given tenant by sending an HTTP POST -// request to /api/v1/import/csv vmsingle endpoint. -// -// See https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-import-csv-data -func (app *Vmsingle) PrometheusAPIV1ImportCSV(t *testing.T, records []string, opts QueryOpts) { - t.Helper() - - url := fmt.Sprintf("http://%s/api/v1/import/csv", app.httpListenAddr) - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - data := []byte(strings.Join(records, "\n")) - headers := opts.getHeaders() - headers.Set("Content-Type", "text/plain") - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) - } -} - -// PrometheusAPIV1ImportNative is a test helper function that inserts a collection -// of records in native format for the given tenant by sending an HTTP POST -// request to /api/v1/import/native vmsingle endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-data-in-native-format -func (app *Vmsingle) PrometheusAPIV1ImportNative(t *testing.T, data []byte, opts QueryOpts) { - t.Helper() - - url := fmt.Sprintf("http://%s/api/v1/import/native", app.httpListenAddr) - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - headers := opts.getHeaders() - headers.Set("Content-Type", "text/plain") - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) - } -} - -// OpenTSDBAPIPut is a test helper function that inserts a collection of -// records in OpenTSDB format for the given tenant by sending an HTTP POST -// request to /api/put vmsingle endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/integrations/opentsdb/#sending-data-via-http -func (app *Vmsingle) OpenTSDBAPIPut(t *testing.T, records []string, opts QueryOpts) { - t.Helper() - - // add extra label - url := app.openTSDBHTTPURL + "/api/put" - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - data := []byte("[" + strings.Join(records, ",") + "]") - headers := opts.getHeaders() - headers.Set("Content-Type", "text/plain") - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) - } -} - -// PrometheusAPIV1Write is a test helper function that inserts a -// collection of records in Prometheus remote-write format by sending a HTTP -// POST request to /prometheus/api/v1/write vmsingle endpoint. -func (app *Vmsingle) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteRequest, opts QueryOpts) { - t.Helper() - - data := snappy.Encode(nil, wr.MarshalProtobuf(nil)) - headers := opts.getHeaders() - headers.Set("Content-Type", "application/x-protobuf") - _, statusCode := app.cli.Post(t, app.prometheusAPIV1WriteURL, data, headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) - } -} - -// PrometheusAPIV1ImportPrometheus is a test helper function that inserts a -// collection of records in Prometheus text exposition format by sending a HTTP -// POST request to /prometheus/api/v1/import/prometheus vmsingle endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1importprometheus -func (app *Vmsingle) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts) { - t.Helper() - - // add extra label - url := app.prometheusAPIV1ImportPrometheusURL - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - headers := opts.getHeaders() - headers.Set("Content-Type", "text/plain") - data := []byte(strings.Join(records, "\n")) - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) - } -} - -// PrometheusAPIV1Export is a test helper function that performs the export of -// raw samples in JSON line format by sending a HTTP POST request to -// /prometheus/api/v1/export vmsingle endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1export -func (app *Vmsingle) PrometheusAPIV1Export(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse { - t.Helper() - values := opts.asURLValues() - values.Add("match[]", query) - values.Add("format", "promapi") - - res, _ := app.cli.PostForm(t, app.prometheusAPIV1ExportURL, values, opts.Headers) - return NewPrometheusAPIV1QueryResponse(t, res) -} - -// PrometheusAPIV1ExportNative is a test helper function that performs the export of -// raw samples in native binary format by sending an HTTP POST request to -// /prometheus/api/v1/export/native vmselect endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1exportnative -func (app *Vmsingle) PrometheusAPIV1ExportNative(t *testing.T, query string, opts QueryOpts) []byte { - t.Helper() - - t.Helper() - values := opts.asURLValues() - values.Add("match[]", query) - values.Add("format", "promapi") - - res, _ := app.cli.PostForm(t, app.prometheusAPIV1ExportNativeURL, values, opts.Headers) - return []byte(res) -} - -// PrometheusAPIV1Query is a test helper function that performs PromQL/MetricsQL -// instant query by sending a HTTP POST request to /prometheus/api/v1/query -// vmsingle endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query -func (app *Vmsingle) PrometheusAPIV1Query(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse { - t.Helper() - - values := opts.asURLValues() - values.Add("query", query) - res, _ := app.cli.PostForm(t, app.prometheusAPIV1QueryURL, values, opts.Headers) - return NewPrometheusAPIV1QueryResponse(t, res) -} - -// PrometheusAPIV1QueryRange is a test helper function that performs -// PromQL/MetricsQL range query by sending a HTTP POST request to -// /prometheus/api/v1/query_range vmsingle endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query_range -func (app *Vmsingle) PrometheusAPIV1QueryRange(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse { - t.Helper() - - values := opts.asURLValues() - values.Add("query", query) - - res, _ := app.cli.PostForm(t, app.prometheusAPIV1QueryRangeURL, values, opts.Headers) - return NewPrometheusAPIV1QueryResponse(t, res) -} - -// PrometheusAPIV1Series sends a query to a /prometheus/api/v1/series endpoint -// and returns the list of time series that match the query. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series -func (app *Vmsingle) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1SeriesResponse { - t.Helper() - - values := opts.asURLValues() - values.Add("match[]", matchQuery) - - res, _ := app.cli.PostForm(t, app.prometheusAPIV1SeriesURL, values, opts.Headers) - return NewPrometheusAPIV1SeriesResponse(t, res) -} - -// PrometheusAPIV1SeriesCount sends a query to a /prometheus/api/v1/series/count endpoint -// and returns the total number of time series. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series -func (app *Vmsingle) PrometheusAPIV1SeriesCount(t *testing.T, opts QueryOpts) *PrometheusAPIV1SeriesCountResponse { - t.Helper() - - values := opts.asURLValues() - - queryURL := fmt.Sprintf("http://%s/prometheus/api/v1/series/count", app.httpListenAddr) - res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers) - return NewPrometheusAPIV1SeriesCountResponse(t, res) -} - -// PrometheusAPIV1Labels sends a query to a /prometheus/api/v1/labels endpoint -// and returns the label names list of time series that match the query. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labels -func (app *Vmsingle) PrometheusAPIV1Labels(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelsResponse { - t.Helper() - - values := opts.asURLValues() - values.Add("match[]", matchQuery) - - queryURL := fmt.Sprintf("http://%s/prometheus/api/v1/labels", app.httpListenAddr) - res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers) - return NewPrometheusAPIV1LabelsResponse(t, res) -} - -// PrometheusAPIV1LabelValues sends a query to a /prometheus/api/v1/label/.../values endpoint -// and returns the label names list of time series that match the query. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labelvalues -func (app *Vmsingle) PrometheusAPIV1LabelValues(t *testing.T, labelName, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelValuesResponse { - t.Helper() - - values := opts.asURLValues() - values.Add("match[]", matchQuery) - - queryURL := fmt.Sprintf("http://%s/prometheus/api/v1/label/%s/values", app.httpListenAddr, labelName) - res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers) - return NewPrometheusAPIV1LabelValuesResponse(t, res) -} - -// PrometheusAPIV1Metadata sends a query to a /prometheus/api/v1/metadata endpoint -// and returns the results. -func (app *Vmsingle) PrometheusAPIV1Metadata(t *testing.T, metric string, limit int, opts QueryOpts) *PrometheusAPIV1Metadata { - t.Helper() - - values := opts.asURLValues() - values.Add("metric", metric) - values.Add("limit", strconv.Itoa(limit)) - queryURL := fmt.Sprintf("http://%s/prometheus/api/v1/metadata", app.httpListenAddr) - - res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers) - return NewPrometheusAPIV1Metadata(t, res) -} - -// APIV1AdminTSDBDeleteSeries deletes the series that match the query by sending -// a request to /api/v1/admin/tsdb/delete_series. -// -// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series -func (app *Vmsingle) APIV1AdminTSDBDeleteSeries(t *testing.T, matchQuery string, opts QueryOpts) { - t.Helper() - - queryURL := fmt.Sprintf("http://%s/api/v1/admin/tsdb/delete_series", app.httpListenAddr) - values := opts.asURLValues() - values.Add("match[]", matchQuery) - - res, statusCode := app.cli.PostForm(t, queryURL, values, opts.Headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res) - } -} - -// GraphiteMetricsIndex sends a query to a /metrics/index.json -// -// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api -func (app *Vmsingle) GraphiteMetricsIndex(t *testing.T, _ QueryOpts) GraphiteMetricsIndexResponse { - t.Helper() - - seriesURL := fmt.Sprintf("http://%s/metrics/index.json", app.httpListenAddr) - res, statusCode := app.cli.Get(t, seriesURL, nil) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) - } - - var index GraphiteMetricsIndexResponse - if err := json.Unmarshal([]byte(res), &index); err != nil { - t.Fatalf("could not unmarshal metrics index response data:\n%s\n err: %v", res, err) - } - return index -} - -// GraphiteTagsTagSeries is a test helper function that registers Graphite tags -// for a single time series by sending a HTTP POST request to -// /graphite/tags/tagSeries vmsingle endpoint. -func (app *Vmsingle) GraphiteTagsTagSeries(t *testing.T, record string, opts QueryOpts) { - t.Helper() - - url := fmt.Sprintf("http://%s/graphite/tags/tagSeries", app.httpListenAddr) - values := opts.asURLValues() - values.Add("path", record) - - _, statusCode := app.cli.PostForm(t, url, values, opts.Headers) - if got, want := statusCode, http.StatusNotImplemented; got != want { - t.Fatalf("unexpected status code: got %d, want %d", got, want) - } -} - -func (app *Vmsingle) GraphiteTagsTagMultiSeries(t *testing.T, records []string, opts QueryOpts) { - t.Helper() - - url := fmt.Sprintf("http://%s/graphite/tags/tagMultiSeries", app.httpListenAddr) - values := opts.asURLValues() - for _, rec := range records { - values.Add("path", rec) - } - - _, statusCode := app.cli.PostForm(t, url, values, opts.Headers) - if got, want := statusCode, http.StatusNotImplemented; got != want { - t.Fatalf("unexpected status code: got %d, want %d", got, want) - } -} - -// APIV1StatusMetricNamesStats sends a query to a /api/v1/status/metric_names_stats endpoint -// and returns the statistics response for given params. -// -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage -func (app *Vmsingle) APIV1StatusMetricNamesStats(t *testing.T, limit, le, matchPattern string, opts QueryOpts) MetricNamesStatsResponse { - t.Helper() - - values := opts.asURLValues() - values.Add("limit", limit) - values.Add("le", le) - values.Add("match_pattern", matchPattern) - queryURL := fmt.Sprintf("http://%s/api/v1/status/metric_names_stats", app.httpListenAddr) - - res, statusCode := app.cli.PostForm(t, queryURL, values, opts.Headers) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) - } - var resp MetricNamesStatsResponse - if err := json.Unmarshal([]byte(res), &resp); err != nil { - t.Fatalf("could not unmarshal metric names stats response data:\n%s\n err: %v", res, err) - } - return resp -} - -// APIV1AdminStatusMetricNamesStatsReset sends a query to a /api/v1/admin/status/metric_names_stats/reset endpoint -// -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage -func (app *Vmsingle) APIV1AdminStatusMetricNamesStatsReset(t *testing.T, opts QueryOpts) { - t.Helper() - - values := opts.asURLValues() - queryURL := fmt.Sprintf("http://%s/api/v1/admin/status/metric_names_stats/reset", app.httpListenAddr) - - res, statusCode := app.cli.PostForm(t, queryURL, values, opts.Headers) - if statusCode != http.StatusNoContent { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res) - } -} - -// SnapshotCreate creates a database snapshot by sending a query to the -// /snapshot/create endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots -func (app *Vmsingle) SnapshotCreate(t *testing.T) *SnapshotCreateResponse { - t.Helper() - - data, statusCode := app.cli.Post(t, app.SnapshotCreateURL(), nil, nil) - if got, want := statusCode, http.StatusOK; got != want { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) - } - - var res SnapshotCreateResponse - if err := json.Unmarshal([]byte(data), &res); err != nil { - t.Fatalf("could not unmarshal snapshot create response: data=%q, err: %v", data, err) - } - - return &res -} - -// SnapshotCreateURL returns the URL for creating snapshots. -func (app *Vmsingle) SnapshotCreateURL() string { - return fmt.Sprintf("http://%s/snapshot/create", app.httpListenAddr) -} - -// APIV1AdminTSDBSnapshot creates a database snapshot by sending a query to the -// /api/v1/admin/tsdb/snapshot endpoint. -// -// See https://prometheus.io/docs/prometheus/latest/querying/api/#snapshot. -func (app *Vmsingle) APIV1AdminTSDBSnapshot(t *testing.T) *APIV1AdminTSDBSnapshotResponse { - t.Helper() - - queryURL := fmt.Sprintf("http://%s/api/v1/admin/tsdb/snapshot", app.httpListenAddr) - data, statusCode := app.cli.Post(t, queryURL, nil, nil) - if got, want := statusCode, http.StatusOK; got != want { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) - } - - var res APIV1AdminTSDBSnapshotResponse - if err := json.Unmarshal([]byte(data), &res); err != nil { - t.Fatalf("could not unmarshal prometheus snapshot create response: data=%q, err: %v", data, err) - } - - return &res -} - -// SnapshotList lists existing database snapshots by sending a query to the -// /snapshot/list endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots -func (app *Vmsingle) SnapshotList(t *testing.T) *SnapshotListResponse { - t.Helper() - - queryURL := fmt.Sprintf("http://%s/snapshot/list", app.httpListenAddr) - data, statusCode := app.cli.Get(t, queryURL, nil) - if got, want := statusCode, http.StatusOK; got != want { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) - } - - var res SnapshotListResponse - if err := json.Unmarshal([]byte(data), &res); err != nil { - t.Fatalf("could not unmarshal snapshot list response: data=%q, err: %v", data, err) - } - - return &res -} - -// SnapshotDelete deletes a snapshot by sending a query to the -// /snapshot/delete endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots -func (app *Vmsingle) SnapshotDelete(t *testing.T, snapshotName string) *SnapshotDeleteResponse { - t.Helper() - - queryURL := fmt.Sprintf("http://%s/snapshot/delete?snapshot=%s", app.httpListenAddr, snapshotName) - data, statusCode := app.cli.Delete(t, queryURL) - wantStatusCodes := map[int]bool{ - http.StatusOK: true, - http.StatusInternalServerError: true, - } - if !wantStatusCodes[statusCode] { - t.Fatalf("unexpected status code: got %d, want %v, resp text=%q", statusCode, wantStatusCodes, data) - } - - var res SnapshotDeleteResponse - if err := json.Unmarshal([]byte(data), &res); err != nil { - t.Fatalf("could not unmarshal snapshot delete response: data=%q, err: %v", data, err) - } - - return &res -} - -// SnapshotDeleteAll deletes all snapshots by sending a query to the -// /snapshot/delete_all endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots -func (app *Vmsingle) SnapshotDeleteAll(t *testing.T) *SnapshotDeleteAllResponse { - t.Helper() - - queryURL := fmt.Sprintf("http://%s/snapshot/delete_all", app.httpListenAddr) - data, statusCode := app.cli.Get(t, queryURL, nil) - if got, want := statusCode, http.StatusOK; got != want { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) - } - - var res SnapshotDeleteAllResponse - if err := json.Unmarshal([]byte(data), &res); err != nil { - t.Fatalf("could not unmarshal snapshot delete all response: data=%q, err: %v", data, err) - } - - return &res -} - -// APIV1StatusTSDB sends a query to a /prometheus/api/v1/status/tsdb -// // -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#tsdb-stats -func (app *Vmsingle) APIV1StatusTSDB(t *testing.T, matchQuery string, date string, topN string, opts QueryOpts) TSDBStatusResponse { - t.Helper() - - seriesURL := fmt.Sprintf("http://%s/prometheus/api/v1/status/tsdb", app.httpListenAddr) - values := opts.asURLValues() - addNonEmpty := func(name, value string) { - if len(value) == 0 { - return - } - values.Add(name, value) - } - addNonEmpty("match[]", matchQuery) - addNonEmpty("topN", topN) - addNonEmpty("date", date) - - res, statusCode := app.cli.PostForm(t, seriesURL, values, opts.Headers) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) - } - - var status TSDBStatusResponse - if err := json.Unmarshal([]byte(res), &status); err != nil { - t.Fatalf("could not unmarshal tsdb status response data:\n%s\n err: %v", res, err) - } - status.Sort() - return status -} - -// ZabbixConnectorHistory is a test helper function that inserts a -// collection of records in zabbixconnector format by sending a HTTP -// POST request to /zabbixconnector/api/v1/history vmsingle endpoint. -func (app *Vmsingle) ZabbixConnectorHistory(t *testing.T, records []string, opts QueryOpts) { - t.Helper() - - url := fmt.Sprintf("http://%s/zabbixconnector/api/v1/history", app.httpListenAddr) - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - data := []byte(strings.Join(records, "\n")) - headers := opts.getHeaders() - headers.Set("Content-Type", "application/json") - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) - } -} - -// OpentelemetryV1Metrics is a test helper function that inserts a -// collection of records in Opentelemetry protocol format by sending a HTTP -// POST request to /opentelemetry/v1/metrics vmsingle endpoint. -func (app *Vmsingle) OpentelemetryV1Metrics(t *testing.T, md otlppb.MetricsData, opts QueryOpts) { - t.Helper() - - url := fmt.Sprintf("http://%s/opentelemetry/v1/metrics", app.httpListenAddr) - uv := opts.asURLValues() - uvs := uv.Encode() - if len(uvs) > 0 { - url += "?" + uvs - } - data := md.MarshalProtobuf(nil) - headers := opts.getHeaders() - headers.Set("Content-Type", "application/x-protobuf") - _, statusCode := app.cli.Post(t, url, data, headers) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) - } -} - // HTTPAddr returns the address at which the vminsert process is // listening for incoming HTTP requests. func (app *Vmsingle) HTTPAddr() string { diff --git a/apptest/vmstorage.go b/apptest/vmstorage.go index dc68a3ed4e..604728e4cb 100644 --- a/apptest/vmstorage.go +++ b/apptest/vmstorage.go @@ -1,13 +1,10 @@ package apptest import ( - "encoding/json" "fmt" "io" - "net/http" "os" "regexp" - "testing" "time" ) @@ -15,7 +12,8 @@ import ( // functions. type Vmstorage struct { *app - *ServesMetrics + *metricsClient + *vmstorageClient storageDataPath string httpListenAddr string @@ -47,10 +45,11 @@ func StartVmstorageAt(instance, binary string, flags []string, cli *Client, outp } return &Vmstorage{ - app: app, - ServesMetrics: &ServesMetrics{ - metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[1]), - cli: cli, + app: app, + metricsClient: newMetricsClient(cli, stderrExtracts[1]), + vmstorageClient: &vmstorageClient{ + vmstorageCli: cli, + httpListenAddr: stderrExtracts[1], }, storageDataPath: stderrExtracts[0], httpListenAddr: stderrExtracts[1], @@ -71,121 +70,6 @@ func (app *Vmstorage) VmselectAddr() string { return app.vmselectAddr } -// ForceFlush is a test helper function that forces the flushing of inserted -// data, so it becomes available for searching immediately. -func (app *Vmstorage) ForceFlush(t *testing.T) { - t.Helper() - - forceFlushURL := fmt.Sprintf("http://%s/internal/force_flush", app.httpListenAddr) - _, statusCode := app.cli.Get(t, forceFlushURL, nil) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) - } -} - -// ForceMerge is a test helper function that forces the merging of parts. -func (app *Vmstorage) ForceMerge(t *testing.T) { - t.Helper() - - forceMergeURL := fmt.Sprintf("http://%s/internal/force_merge", app.httpListenAddr) - _, statusCode := app.cli.Get(t, forceMergeURL, nil) - if statusCode != http.StatusOK { - t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) - } -} - -// SnapshotCreate creates a database snapshot by sending a query to the -// /snapshot/create endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots -func (app *Vmstorage) SnapshotCreate(t *testing.T) *SnapshotCreateResponse { - t.Helper() - - data, statusCode := app.cli.Post(t, app.SnapshotCreateURL(), nil, nil) - if got, want := statusCode, http.StatusOK; got != want { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) - } - - var res SnapshotCreateResponse - if err := json.Unmarshal([]byte(data), &res); err != nil { - t.Fatalf("could not unmarshal snapshot create response: data=%q, err: %v", data, err) - } - - return &res -} - -// SnapshotCreateURL returns the URL for creating snapshots. -func (app *Vmstorage) SnapshotCreateURL() string { - return fmt.Sprintf("http://%s/snapshot/create", app.httpListenAddr) -} - -// SnapshotList lists existing database snapshots by sending a query to the -// /snapshot/list endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots -func (app *Vmstorage) SnapshotList(t *testing.T) *SnapshotListResponse { - t.Helper() - - queryURL := fmt.Sprintf("http://%s/snapshot/list", app.httpListenAddr) - data, statusCode := app.cli.Get(t, queryURL, nil) - if got, want := statusCode, http.StatusOK; got != want { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) - } - - var res SnapshotListResponse - if err := json.Unmarshal([]byte(data), &res); err != nil { - t.Fatalf("could not unmarshal snapshot list response: data=%q, err: %v", data, err) - } - - return &res -} - -// SnapshotDelete deletes a snapshot by sending a query to the -// /snapshot/delete endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots -func (app *Vmstorage) SnapshotDelete(t *testing.T, snapshotName string) *SnapshotDeleteResponse { - t.Helper() - - queryURL := fmt.Sprintf("http://%s/snapshot/delete?snapshot=%s", app.httpListenAddr, snapshotName) - data, statusCode := app.cli.Delete(t, queryURL) - wantStatusCodes := map[int]bool{ - http.StatusOK: true, - http.StatusInternalServerError: true, - } - if !wantStatusCodes[statusCode] { - t.Fatalf("unexpected status code: got %d, want %v, resp text=%q", statusCode, wantStatusCodes, data) - } - - var res SnapshotDeleteResponse - if err := json.Unmarshal([]byte(data), &res); err != nil { - t.Fatalf("could not unmarshal snapshot delete response: data=%q, err: %v", data, err) - } - - return &res -} - -// SnapshotDeleteAll deletes all snapshots by sending a query to the -// /snapshot/delete_all endpoint. -// -// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots -func (app *Vmstorage) SnapshotDeleteAll(t *testing.T) *SnapshotDeleteAllResponse { - t.Helper() - - queryURL := fmt.Sprintf("http://%s/snapshot/delete_all", app.httpListenAddr) - data, statusCode := app.cli.Post(t, queryURL, nil, nil) - if got, want := statusCode, http.StatusOK; got != want { - t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) - } - - var res SnapshotDeleteAllResponse - if err := json.Unmarshal([]byte(data), &res); err != nil { - t.Fatalf("could not unmarshal snapshot delete all response: data=%q, err: %v", data, err) - } - - return &res -} - // String returns the string representation of the vmstorage app state. func (app *Vmstorage) String() string { return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q vminsertAddr: %q vmselectAddr: %q}", []any{