package apptest import ( "bytes" "encoding/json" "fmt" "io" "net" "net/http" "net/url" "regexp" "strconv" "strings" "testing" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prommetadata" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" otlppb "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" "github.com/golang/snappy" ) // Client is used for interacting with the apps over the network. type Client struct { httpCli *http.Client } // NewClient creates a new client. func NewClient() *Client { return &Client{ httpCli: &http.Client{ Transport: httputil.NewTransport(false, "apptest_client"), }, } } // CloseConnections closes client connections. func (c *Client) CloseConnections() { c.httpCli.CloseIdleConnections() } // Get sends an HTTP GET request, returns // the response body and status code to the caller. func (c *Client) Get(t *testing.T, url string, headers http.Header) (string, int) { t.Helper() return c.do(t, http.MethodGet, url, nil, headers) } // Post sends an HTTP POST request, returns // the response body and status code to the caller. func (c *Client) Post(t *testing.T, url string, data []byte, headers http.Header) (string, int) { t.Helper() return c.do(t, http.MethodPost, url, data, headers) } // PostForm sends an HTTP POST request containing the POST-form data with attached getHeaders, returns // the response body and status code to the caller. func (c *Client) PostForm(t *testing.T, url string, data url.Values, headers http.Header) (string, int) { t.Helper() if headers == nil { headers = make(http.Header) } headers.Set("Content-Type", "application/x-www-form-urlencoded") return c.Post(t, url, []byte(data.Encode()), headers) } // Delete sends an HTTP DELETE request and returns the response body and status code // to the caller. func (c *Client) Delete(t *testing.T, url string) (string, int) { t.Helper() return c.do(t, http.MethodDelete, url, nil, nil) } // do prepares an HTTP request, sends it to the server, receives the response // from the server, returns the response body and status code to the caller. func (c *Client) do(t *testing.T, method, url string, data []byte, headers http.Header) (string, int) { t.Helper() req, err := http.NewRequest(method, url, bytes.NewReader(data)) if err != nil { t.Fatalf("could not create a HTTP request: %v", err) } req.Header = headers res, err := c.httpCli.Do(req) if err != nil { t.Fatalf("could not send HTTP request: %v", err) } body := readAllAndClose(t, res.Body) return body, res.StatusCode } func (c *Client) Write(t *testing.T, address string, data []string) { conn, err := net.Dial("tcp", address) if err != nil { t.Fatalf("cannot dial %s: %s", address, err) } defer func() { _ = conn.Close() }() d := []byte(strings.Join(data, "\n")) n, err := conn.Write(d) if err != nil { t.Fatalf("cannot write %d bytes to %s: %s", len(d), address, err) } if n != len(d) { t.Fatalf("BUG: conn.Write() returned unexpected number of written bytes to %s; got %d; want %d", address, n, len(d)) } } // getClusterPath returns path in cluster's URL format. // Based on QueryOpts, it will either put tenant ID into URL // or will skip it if tenant is set via HTTP headers. func getClusterPath(addr, prefix, suffix string, o QueryOpts) string { if o.Tenant != "" { // QueryOpts.Tenant has priority over headers return tenantViaURL(addr, prefix, o.Tenant, suffix) } h := o.getHeaders() if h.Get("AccountID") != "" || h.Get("ProjectID") != "" { return tenantViaHeaders(addr, prefix, suffix) } // tenant is missing in QueryOpts and in HTTP headers. Falling back to default 0:0 tenant in URL return tenantViaURL(addr, prefix, "0:0", suffix) } // tenantViaURL returns path in cluster's URL format with tenant specified in URL func tenantViaURL(addr, prefix, tenant, suffix string) string { return fmt.Sprintf("http://%s/%s/%s/%s", addr, prefix, tenant, suffix) } // tenantViaHeaders returns path in cluster's URL format where tenant is omitted in URL // Only supported if -enableMultitenancyViaHeaders is specified func tenantViaHeaders(addr, prefix, suffix string) string { return fmt.Sprintf("http://%s/%s/%s", addr, prefix, suffix) } // readAllAndClose reads everything from the response body and then closes it. func readAllAndClose(t *testing.T, responseBody io.ReadCloser) string { t.Helper() defer responseBody.Close() b, err := io.ReadAll(responseBody) if err != nil { t.Fatalf("could not read response body: %d", err) } return string(b) } // metricsClient is used to retrieve the app's metrics. // // This type is expected to be embedded by the apps that serve metrics. type metricsClient struct { metricsCli *Client url string } func newMetricsClient(cli *Client, addr string) *metricsClient { return &metricsClient{ metricsCli: cli, url: fmt.Sprintf("http://%s/metrics", addr), } } // GetIntMetric retrieves the value of a metric served by an app at /metrics URL. // The value is then converted to int. func (c *metricsClient) GetIntMetric(t *testing.T, metricName string) int { t.Helper() return int(c.GetMetric(t, metricName)) } // GetMetric retrieves the value of a metric served by an app at /metrics URL. func (c *metricsClient) GetMetric(t *testing.T, metricName string) float64 { t.Helper() metrics, statusCode := c.metricsCli.Get(t, c.url, nil) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) } for _, metric := range strings.Split(metrics, "\n") { value, found := strings.CutPrefix(metric, metricName) if found { value = strings.Trim(value, " ") res, err := strconv.ParseFloat(value, 64) if err != nil { t.Fatalf("could not parse metric value %s: %v", metric, err) } return res } } t.Fatalf("metric not found: %s", metricName) return 0 } // GetMetricsByPrefix retrieves the values of all metrics that start with given // prefix. func (c *metricsClient) GetMetricsByPrefix(t *testing.T, prefix string) []float64 { t.Helper() values := []float64{} metrics, statusCode := c.metricsCli.Get(t, c.url, nil) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) } for _, metric := range strings.Split(metrics, "\n") { if !strings.HasPrefix(metric, prefix) { continue } parts := strings.Split(metric, " ") if len(parts) < 2 { t.Fatalf("unexpected record format: got %q, want metric name and value separated by a space", metric) } value, err := strconv.ParseFloat(parts[len(parts)-1], 64) if err != nil { t.Fatalf("could not parse metric value %s: %v", metric, err) } values = append(values, value) } return values } func (c *metricsClient) GetMetricsByRegexp(t *testing.T, re *regexp.Regexp) []float64 { t.Helper() values := []float64{} metrics, statusCode := c.metricsCli.Get(t, c.url, nil) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) } for _, metric := range strings.Split(metrics, "\n") { if !re.MatchString(metric) { continue } parts := strings.Split(metric, " ") if len(parts) < 2 { t.Fatalf("unexpected record format: got %q, want metric name and value separated by a space", metric) } value, err := strconv.ParseFloat(parts[len(parts)-1], 64) if err != nil { t.Fatalf("could not parse metric value %s: %v", metric, err) } values = append(values, value) } return values } // rpcRowsSentTotal retrieves the values of all vminsert // `vm_rpc_rows_sent_total` metrics (there will be one for each vmstorage) and // returns their integer sum. func (c *metricsClient) rpcRowsSentTotal(t *testing.T) int { total := 0.0 for _, v := range c.GetMetricsByPrefix(t, "vm_rpc_rows_sent_total") { total += v } return int(total) } type vmselectClient struct { vmselectCli *Client url func(op, path string, opts QueryOpts) string metricNamesStatsResetURL string tenantsURL string } // PrometheusAPIV1Export is a test helper function that performs the export of // raw samples in JSON line format by sending a request to // /prometheus/api/v1/export endpoint. // // See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1export func (c *vmselectClient) PrometheusAPIV1Export(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse { t.Helper() url := c.url("select", "prometheus/api/v1/export", opts) values := opts.asURLValues() values.Add("match[]", query) values.Add("format", "promapi") res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) return NewPrometheusAPIV1QueryResponse(t, res) } // PrometheusAPIV1ExportNative is a test helper function that performs the export of // raw samples in native binary format by sending a request to // /prometheus/api/v1/export/native endpoint. // // See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1exportnative func (c *vmselectClient) PrometheusAPIV1ExportNative(t *testing.T, query string, opts QueryOpts) []byte { t.Helper() url := c.url("select", "prometheus/api/v1/export/native", opts) values := opts.asURLValues() values.Add("match[]", query) values.Add("format", "promapi") res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) return []byte(res) } // PrometheusAPIV1Query is a test helper function that performs PromQL/MetricsQL // instant query by sending a request to /prometheus/api/v1/query endpoint. // // See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query func (c *vmselectClient) PrometheusAPIV1Query(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse { t.Helper() url := c.url("select", "prometheus/api/v1/query", opts) values := opts.asURLValues() values.Add("query", query) res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) return NewPrometheusAPIV1QueryResponse(t, res) } // PrometheusAPIV1QueryRange is a test helper function that performs // PromQL/MetricsQL range query by sending a request to // /prometheus/api/v1/query_range endpoint. // // See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query_range func (c *vmselectClient) PrometheusAPIV1QueryRange(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse { t.Helper() url := c.url("select", "prometheus/api/v1/query_range", opts) values := opts.asURLValues() values.Add("query", query) res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) return NewPrometheusAPIV1QueryResponse(t, res) } // PrometheusAPIV1Series retrieves list of time series that match the query by // sending a request to /prometheus/api/v1/series endpoint. // // See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series func (c *vmselectClient) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1SeriesResponse { t.Helper() url := c.url("select", "prometheus/api/v1/series", opts) values := opts.asURLValues() values.Add("match[]", matchQuery) res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) return NewPrometheusAPIV1SeriesResponse(t, res) } // PrometheusAPIV1SeriesCount retrieves the total number of time series by // sending a request to /prometheus/api/v1/series/count endpoint. // // See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series func (c *vmselectClient) PrometheusAPIV1SeriesCount(t *testing.T, opts QueryOpts) *PrometheusAPIV1SeriesCountResponse { t.Helper() url := c.url("select", "prometheus/api/v1/series/count", opts) values := opts.asURLValues() res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) return NewPrometheusAPIV1SeriesCountResponse(t, res) } // PrometheusAPIV1Labels retrieves the label names for time series that match a // query by sending a request to /prometheus/api/v1/labels endpoint. // // See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labels func (c *vmselectClient) PrometheusAPIV1Labels(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelsResponse { t.Helper() url := c.url("select", "prometheus/api/v1/labels", opts) values := opts.asURLValues() values.Add("match[]", matchQuery) res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) return NewPrometheusAPIV1LabelsResponse(t, res) } // PrometheusAPIV1LabelValues retrieves the labels values for the metrics that // match the query by sending a request to /prometheus/api/v1/label/.../values // endpoint. // // See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labelvalues func (c *vmselectClient) PrometheusAPIV1LabelValues(t *testing.T, labelName, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelValuesResponse { t.Helper() path := fmt.Sprintf("prometheus/api/v1/label/%s/values", labelName) url := c.url("select", path, opts) values := opts.asURLValues() values.Add("match[]", matchQuery) res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) return NewPrometheusAPIV1LabelValuesResponse(t, res) } // PrometheusAPIV1Metadata retrieves metadata for the given metric by sending a // request to /prometheus/api/v1/metadata endpoint. func (c *vmselectClient) PrometheusAPIV1Metadata(t *testing.T, metric string, limit int, opts QueryOpts) *PrometheusAPIV1Metadata { t.Helper() url := c.url("select", "prometheus/api/v1/metadata", opts) values := opts.asURLValues() values.Add("metric", metric) values.Add("limit", strconv.Itoa(limit)) res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers) return NewPrometheusAPIV1Metadata(t, res) } // PrometheusAPIV1AdminTSDBDeleteSeries deletes the series that match the query // by sending a request to /prometheus/api/v1/admin/tsdb/delete_series. // // See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series func (c *vmselectClient) PrometheusAPIV1AdminTSDBDeleteSeries(t *testing.T, matchQuery string, opts QueryOpts) { t.Helper() url := c.url("delete", "prometheus/api/v1/admin/tsdb/delete_series", opts) values := opts.asURLValues() values.Add("match[]", matchQuery) res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) if statusCode != http.StatusNoContent { t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res) } } // PrometheusAPIV1StatusMetricNamesStats sends a query to // /prometheus/api/v1/status/metric_names_stats endpoint and returns the metric // usage stats response for given params. // // See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage func (c *vmselectClient) PrometheusAPIV1StatusMetricNamesStats(t *testing.T, limit, le, matchPattern string, opts QueryOpts) MetricNamesStatsResponse { t.Helper() url := c.url("select", "prometheus/api/v1/status/metric_names_stats", opts) values := opts.asURLValues() values.Add("limit", limit) values.Add("le", le) values.Add("match_pattern", matchPattern) res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) } var resp MetricNamesStatsResponse if err := json.Unmarshal([]byte(res), &resp); err != nil { t.Fatalf("could not unmarshal metric names stats response data:\n%s\n err: %v", res, err) } return resp } // PrometheusAPIV1StatusTSDB retrieves the TSDB status for the time series that // match the query on the given date by sending a request to // /prometheus/api/v1/status/tsdb endpoint. // // See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#tsdb-stats func (c *vmselectClient) PrometheusAPIV1StatusTSDB(t *testing.T, matchQuery string, date string, topN string, opts QueryOpts) TSDBStatusResponse { t.Helper() url := c.url("select", "prometheus/api/v1/status/tsdb", opts) values := opts.asURLValues() addNonEmpty := func(name, value string) { if len(value) == 0 { return } values.Add(name, value) } addNonEmpty("match[]", matchQuery) addNonEmpty("topN", topN) addNonEmpty("date", date) res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) } var status TSDBStatusResponse if err := json.Unmarshal([]byte(res), &status); err != nil { t.Fatalf("could not unmarshal tsdb status response data:\n%s\n err: %v", res, err) } status.Sort() return status } // GraphiteMetricsIndex retrieves the list of all metrics by sending a request // to /graphite/metrics/index.json endpoint. // // See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api func (c *vmselectClient) GraphiteMetricsIndex(t *testing.T, opts QueryOpts) GraphiteMetricsIndexResponse { t.Helper() url := c.url("select", "graphite/metrics/index.json", opts) res, statusCode := c.vmselectCli.Get(t, url, opts.Headers) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) } var index GraphiteMetricsIndexResponse if err := json.Unmarshal([]byte(res), &index); err != nil { t.Fatalf("could not unmarshal metrics index response data:\n%s\n err: %v", res, err) } return index } // GraphiteMetricsFind finds metrics under a given path by sending a request // to /metrics/find endpoint. // // See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api // and https://graphite.readthedocs.io/en/latest/metrics_api.html#metrics-find func (c *vmselectClient) GraphiteMetricsFind(t *testing.T, query string, opts QueryOpts) GraphiteMetricsFindResponse { t.Helper() url := c.url("select", "graphite/metrics/find", opts) values := opts.asURLValues() values.Add("query", query) resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText) } var res GraphiteMetricsFindResponse if err := json.Unmarshal([]byte(resText), &res); err != nil { t.Fatalf("could not unmarshal response data:\n%s\n err: %v", resText, err) } return res } // GraphiteMetricsExpand expands the given query with matching paths by sending // a request to /graphite/metrics/expand endpoint. // // See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api // and https://graphite.readthedocs.io/en/latest/metrics_api.html#metrics-expand func (c *vmselectClient) GraphiteMetricsExpand(t *testing.T, query string, opts QueryOpts) GraphiteMetricsExpandResponse { t.Helper() url := c.url("select", "graphite/metrics/expand", opts) values := opts.asURLValues() values.Add("query", query) resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText) } var res GraphiteMetricsExpandResponse if err := json.Unmarshal([]byte(resText), &res); err != nil { t.Fatalf("could not unmarshal response data:\n%s\n err: %v", resText, err) } return res } // GraphiteRender retrieves the raw metric data by sending a request to // /graphite/render endpoint. // // See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#render-api // and https://graphite-api.readthedocs.io/en/latest/api.html#the-render-api-render func (c *vmselectClient) GraphiteRender(t *testing.T, target string, opts QueryOpts) GraphiteRenderResponse { t.Helper() url := c.url("select", "graphite/render", opts) values := opts.asURLValues() values.Add("format", "json") values.Add("target", target) resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText) } var res GraphiteRenderResponse if err := json.Unmarshal([]byte(resText), &res); err != nil { t.Fatalf("could not unmarshal response data:\n%s\n err: %v", resText, err) } return res } // GraphiteTagsTagSeries is a test helper function that registers Graphite tags // for a single time series by sending a request to /graphite/tags/tagSeries // endpoint. func (c *vmselectClient) GraphiteTagsTagSeries(t *testing.T, record string, opts QueryOpts) { t.Helper() url := c.url("select", "graphite/tags/tagSeries", opts) values := opts.asURLValues() values.Add("path", record) _, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) if got, want := statusCode, http.StatusNotImplemented; got != want { t.Fatalf("unexpected status code: got %d, want %d", got, want) } } // GraphiteTagsTagMultiSeries is a test helper function that registers Graphite // tags for a multiple time series by sending a request to // /graphite/tags/tagSeries endpoint. func (c *vmselectClient) GraphiteTagsTagMultiSeries(t *testing.T, records []string, opts QueryOpts) { t.Helper() url := c.url("select", "graphite/tags/tagMultiSeries", opts) values := opts.asURLValues() for _, rec := range records { values.Add("path", rec) } _, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers) if got, want := statusCode, http.StatusNotImplemented; got != want { t.Fatalf("unexpected status code: got %d, want %d", got, want) } } // PrometheusAPIV1AdminStatusMetricNamesStatsReset resets the metric name usage // stats by sending a request to // /prometheus/api/v1/admin/status/metric_names_stats/reset endpoint // // See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage func (c *vmselectClient) PrometheusAPIV1AdminStatusMetricNamesStatsReset(t *testing.T, opts QueryOpts) { t.Helper() values := opts.asURLValues() res, statusCode := c.vmselectCli.PostForm(t, c.metricNamesStatsResetURL, values, opts.Headers) if statusCode != http.StatusNoContent { t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res) } } // APIV1AdminTenants retrieves the list of tenants by sending a request to // /admin/tenants endpoint. func (c *vmselectClient) APIV1AdminTenants(t *testing.T, opts QueryOpts) *AdminTenantsResponse { t.Helper() res, statusCode := c.vmselectCli.Get(t, c.tenantsURL, opts.Headers) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res) } tenants := &AdminTenantsResponse{} if err := json.Unmarshal([]byte(res), tenants); err != nil { t.Fatalf("could not unmarshal tenants response data:\n%s\n err: %v", res, err) } return tenants } type vminsertClient struct { vminsertCli *Client url func(op, path string, opts QueryOpts) string openTSDBURL func(op, path string, opts QueryOpts) string graphiteListenAddr string sendBlocking func(t *testing.T, numRecordsToSend int, send func()) } // PrometheusAPIV1ImportCSV is a test helper function that inserts a collection // of records in CSV format for the given tenant by sending an HTTP POST // request to prometheus/api/v1/import/csv vminsert endpoint. // // See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format func (c *vminsertClient) PrometheusAPIV1ImportCSV(t *testing.T, records []string, opts QueryOpts) { t.Helper() url := c.url("insert", "prometheus/api/v1/import/csv", opts) uv := opts.asURLValues() uvs := uv.Encode() if len(uvs) > 0 { url += "?" + uvs } data := []byte(strings.Join(records, "\n")) headers := opts.getHeaders() headers.Set("Content-Type", "text/plain") c.sendBlocking(t, len(records), func() { _, statusCode := c.vminsertCli.Post(t, url, data, headers) if statusCode != http.StatusNoContent { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) } }) } // PrometheusAPIV1ImportNative is a test helper function that inserts a collection // of records in Native format for the given tenant by sending an HTTP POST // request to prometheus/api/v1/import/native vminsert endpoint. // // See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format func (c *vminsertClient) PrometheusAPIV1ImportNative(t *testing.T, data []byte, opts QueryOpts) { t.Helper() url := c.url("insert", "prometheus/api/v1/import/native", opts) uv := opts.asURLValues() uvs := uv.Encode() if len(uvs) > 0 { url += "?" + uvs } headers := opts.getHeaders() headers.Set("Content-Type", "text/plain") c.sendBlocking(t, 1, func() { _, statusCode := c.vminsertCli.Post(t, url, data, headers) if statusCode != http.StatusNoContent { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) } }) } // PrometheusAPIV1Write is a test helper function that inserts a // collection of records in Prometheus remote-write format by sending a HTTP // POST request to /prometheus/api/v1/write vminsert endpoint. func (c *vminsertClient) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteRequest, opts QueryOpts) { t.Helper() url := c.url("insert", "prometheus/api/v1/write", opts) data := snappy.Encode(nil, wr.MarshalProtobuf(nil)) recordsCount := len(wr.Timeseries) if prommetadata.IsEnabled() { recordsCount += len(wr.Metadata) } headers := opts.getHeaders() headers.Set("Content-Type", "application/x-protobuf") c.sendBlocking(t, recordsCount, func() { _, statusCode := c.vminsertCli.Post(t, url, data, headers) if statusCode != http.StatusNoContent { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) } }) } // PrometheusAPIV1ImportPrometheus is a test helper function that inserts a // collection of records in Prometheus text exposition format for the given // tenant by sending a HTTP POST request to // /prometheus/api/v1/import/prometheus vminsert endpoint. // // See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1importprometheus func (c *vminsertClient) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts) { t.Helper() url := c.url("insert", "prometheus/api/v1/import/prometheus", opts) uv := opts.asURLValues() uvs := uv.Encode() if len(uvs) > 0 { url += "?" + uvs } data := []byte(strings.Join(records, "\n")) var recordsCount int var metadataRecords int uniqueMetadataMetricNames := make(map[string]struct{}) for _, record := range records { // metric metadata has the following format: //# HELP importprometheus_series //# TYPE importprometheus_series // it results into single metadata record if strings.HasPrefix(record, "# ") { metadataItems := strings.Split(record, " ") if len(metadataItems) < 3 { t.Fatalf("BUG: unexpected metadata format=%q", record) } metricName := metadataItems[2] if _, ok := uniqueMetadataMetricNames[metricName]; ok { continue } uniqueMetadataMetricNames[metricName] = struct{}{} metadataRecords++ continue } recordsCount++ } if prommetadata.IsEnabled() { recordsCount += metadataRecords } headers := opts.getHeaders() headers.Set("Content-Type", "text/plain") c.sendBlocking(t, recordsCount, func() { _, statusCode := c.vminsertCli.Post(t, url, data, headers) if statusCode != http.StatusNoContent { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) } }) } // InfluxWrite is a test helper function that inserts a collection of records in // Influx line format by sending a HTTP POST request to /influx/write endpoint. // // See https://docs.victoriametrics.com/victoriametrics/url-examples/#influxwrite func (c *vminsertClient) InfluxWrite(t *testing.T, records []string, opts QueryOpts) { t.Helper() url := c.url("insert", "influx/write", opts) uv := opts.asURLValues() uvs := uv.Encode() if len(uvs) > 0 { url += "?" + uvs } data := []byte(strings.Join(records, "\n")) headers := opts.getHeaders() headers.Set("Content-Type", "text/plain") c.sendBlocking(t, len(records), func() { t.Helper() _, statusCode := c.vminsertCli.Post(t, url, data, headers) if statusCode != http.StatusNoContent { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) } }) } // OpentelemetryV1Metrics is a test helper function that inserts a // collection of records in Opentelemetry protocol format by sending a HTTP // POST request to /opentelemetry/v1/metrics vminsert endpoint. func (c *vminsertClient) OpentelemetryV1Metrics(t *testing.T, md otlppb.MetricsData, opts QueryOpts) { t.Helper() var recordsCount int for _, rss := range md.ResourceMetrics { for _, sm := range rss.ScopeMetrics { recordsCount += len(sm.Metrics) for _, m := range sm.Metrics { if prommetadata.IsEnabled() { recordsCount += len(m.Metadata) } } } } url := c.url("insert", "opentelemetry/v1/metrics", opts) uv := opts.asURLValues() uvs := uv.Encode() if len(uvs) > 0 { url += "?" + uvs } data := md.MarshalProtobuf(nil) headers := opts.getHeaders() headers.Set("Content-Type", "application/x-protobuf") c.sendBlocking(t, recordsCount, func() { _, statusCode := c.vminsertCli.Post(t, url, data, headers) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) } }) } // OpenTSDBAPIPut is a test helper function that inserts a collection of // records in OpenTSDB format for the given tenant by sending an HTTP POST // request to /opentsdb/api/put vminsert endpoint. // // See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format func (c *vminsertClient) OpenTSDBAPIPut(t *testing.T, records []string, opts QueryOpts) { t.Helper() url := c.openTSDBURL("insert", "opentsdb/api/put", opts) uv := opts.asURLValues() uvs := uv.Encode() if len(uvs) > 0 { url += "?" + uvs } data := []byte("[" + strings.Join(records, ",") + "]") headers := opts.getHeaders() headers.Set("Content-Type", "application/json") c.sendBlocking(t, len(records), func() { _, statusCode := c.vminsertCli.Post(t, url, data, headers) if statusCode != http.StatusNoContent { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent) } }) } // ZabbixConnectorHistory is a test helper function that inserts a // collection of records in zabbixconnector format by sending a HTTP // POST request to /zabbixconnector/api/v1/history vmsingle endpoint. func (c *vminsertClient) ZabbixConnectorHistory(t *testing.T, records []string, opts QueryOpts) { t.Helper() url := c.url("insert", "zabbixconnector/api/v1/history", opts) uv := opts.asURLValues() uvs := uv.Encode() if len(uvs) > 0 { url += "?" + uvs } data := []byte(strings.Join(records, "\n")) headers := opts.getHeaders() headers.Set("Content-Type", "application/json") c.sendBlocking(t, len(records), func() { _, statusCode := c.vminsertCli.Post(t, url, data, headers) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) } }) } // GraphiteWrite is a test helper function that sends a // collection of records to graphiteListenAddr port. // // See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#ingesting func (c *vminsertClient) GraphiteWrite(t *testing.T, records []string, _ QueryOpts) { t.Helper() c.vminsertCli.Write(t, c.graphiteListenAddr, records) } type vmstorageClient struct { vmstorageCli *Client httpListenAddr string } // ForceFlush is a test helper function that forces the flushing of inserted // data, so it becomes available for searching immediately. func (c *vmstorageClient) ForceFlush(t *testing.T) { t.Helper() url := fmt.Sprintf("http://%s/internal/force_flush", c.httpListenAddr) _, statusCode := c.vmstorageCli.Get(t, url, nil) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) } } // ForceMerge is a test helper function that forces the merging of parts. func (c *vmstorageClient) ForceMerge(t *testing.T) { t.Helper() url := fmt.Sprintf("http://%s/internal/force_merge", c.httpListenAddr) _, statusCode := c.vmstorageCli.Get(t, url, nil) if statusCode != http.StatusOK { t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK) } } // SnapshotCreate creates a database snapshot by sending a query to the // /snapshot/create endpoint. // // See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots func (c *vmstorageClient) SnapshotCreate(t *testing.T) *SnapshotCreateResponse { t.Helper() data, statusCode := c.vmstorageCli.Post(t, c.SnapshotCreateURL(), nil, nil) if got, want := statusCode, http.StatusOK; got != want { t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) } var res SnapshotCreateResponse if err := json.Unmarshal([]byte(data), &res); err != nil { t.Fatalf("could not unmarshal snapshot create response: data=%q, err: %v", data, err) } return &res } // SnapshotCreateURL returns the URL for creating snapshots. func (c *vmstorageClient) SnapshotCreateURL() string { return fmt.Sprintf("http://%s/snapshot/create", c.httpListenAddr) } // APIV1AdminTSDBSnapshot creates a database snapshot by sending a query to the // /api/v1/admin/tsdb/snapshot endpoint. // // See https://prometheus.io/docs/prometheus/latest/querying/api/#snapshot. func (c *vmstorageClient) APIV1AdminTSDBSnapshot(t *testing.T) *APIV1AdminTSDBSnapshotResponse { t.Helper() url := fmt.Sprintf("http://%s/api/v1/admin/tsdb/snapshot", c.httpListenAddr) data, statusCode := c.vmstorageCli.Post(t, url, nil, nil) if got, want := statusCode, http.StatusOK; got != want { t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) } var res APIV1AdminTSDBSnapshotResponse if err := json.Unmarshal([]byte(data), &res); err != nil { t.Fatalf("could not unmarshal prometheus snapshot create response: data=%q, err: %v", data, err) } return &res } // SnapshotList lists existing database snapshots by sending a query to the // /snapshot/list endpoint. // // See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots func (c *vmstorageClient) SnapshotList(t *testing.T) *SnapshotListResponse { t.Helper() url := fmt.Sprintf("http://%s/snapshot/list", c.httpListenAddr) data, statusCode := c.vmstorageCli.Get(t, url, nil) if got, want := statusCode, http.StatusOK; got != want { t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) } var res SnapshotListResponse if err := json.Unmarshal([]byte(data), &res); err != nil { t.Fatalf("could not unmarshal snapshot list response: data=%q, err: %v", data, err) } return &res } // SnapshotDelete deletes a snapshot by sending a query to the // /snapshot/delete endpoint. // // See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots func (c *vmstorageClient) SnapshotDelete(t *testing.T, snapshotName string) *SnapshotDeleteResponse { t.Helper() url := fmt.Sprintf("http://%s/snapshot/delete?snapshot=%s", c.httpListenAddr, snapshotName) data, statusCode := c.vmstorageCli.Delete(t, url) wantStatusCodes := map[int]bool{ http.StatusOK: true, http.StatusInternalServerError: true, } if !wantStatusCodes[statusCode] { t.Fatalf("unexpected status code: got %d, want %v, resp text=%q", statusCode, wantStatusCodes, data) } var res SnapshotDeleteResponse if err := json.Unmarshal([]byte(data), &res); err != nil { t.Fatalf("could not unmarshal snapshot delete response: data=%q, err: %v", data, err) } return &res } // SnapshotDeleteAll deletes all snapshots by sending a query to the // /snapshot/delete_all endpoint. // // See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots func (c *vmstorageClient) SnapshotDeleteAll(t *testing.T) *SnapshotDeleteAllResponse { t.Helper() url := fmt.Sprintf("http://%s/snapshot/delete_all", c.httpListenAddr) data, statusCode := c.vmstorageCli.Post(t, url, nil, nil) if got, want := statusCode, http.StatusOK; got != want { t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data) } var res SnapshotDeleteAllResponse if err := json.Unmarshal([]byte(data), &res); err != nil { t.Fatalf("could not unmarshal snapshot delete all response: data=%q, err: %v", data, err) } return &res }