mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
Refactor apptests (#10940)
Fixes #10938. --------- Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
committed by
Artem Fetishev
parent
57924f4167
commit
a52a0ddf2e
@@ -2,6 +2,7 @@ package apptest
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
@@ -13,6 +14,10 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prommetadata"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
otlppb "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
|
||||
"github.com/golang/snappy"
|
||||
)
|
||||
|
||||
// Client is used for interacting with the apps over the network.
|
||||
@@ -147,27 +152,34 @@ func readAllAndClose(t *testing.T, responseBody io.ReadCloser) string {
|
||||
return string(b)
|
||||
}
|
||||
|
||||
// ServesMetrics is used to retrieve the app's metrics.
|
||||
// metricsClient is used to retrieve the app's metrics.
|
||||
//
|
||||
// This type is expected to be embedded by the apps that serve metrics.
|
||||
type ServesMetrics struct {
|
||||
metricsURL string
|
||||
cli *Client
|
||||
type metricsClient struct {
|
||||
metricsCli *Client
|
||||
url string
|
||||
}
|
||||
|
||||
func newMetricsClient(cli *Client, addr string) *metricsClient {
|
||||
return &metricsClient{
|
||||
metricsCli: cli,
|
||||
url: fmt.Sprintf("http://%s/metrics", addr),
|
||||
}
|
||||
}
|
||||
|
||||
// GetIntMetric retrieves the value of a metric served by an app at /metrics URL.
|
||||
// The value is then converted to int.
|
||||
func (app *ServesMetrics) GetIntMetric(t *testing.T, metricName string) int {
|
||||
func (c *metricsClient) GetIntMetric(t *testing.T, metricName string) int {
|
||||
t.Helper()
|
||||
|
||||
return int(app.GetMetric(t, metricName))
|
||||
return int(c.GetMetric(t, metricName))
|
||||
}
|
||||
|
||||
// GetMetric retrieves the value of a metric served by an app at /metrics URL.
|
||||
func (app *ServesMetrics) GetMetric(t *testing.T, metricName string) float64 {
|
||||
func (c *metricsClient) GetMetric(t *testing.T, metricName string) float64 {
|
||||
t.Helper()
|
||||
|
||||
metrics, statusCode := app.cli.Get(t, app.metricsURL, nil)
|
||||
metrics, statusCode := c.metricsCli.Get(t, c.url, nil)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
@@ -188,12 +200,12 @@ func (app *ServesMetrics) GetMetric(t *testing.T, metricName string) float64 {
|
||||
|
||||
// GetMetricsByPrefix retrieves the values of all metrics that start with given
|
||||
// prefix.
|
||||
func (app *ServesMetrics) GetMetricsByPrefix(t *testing.T, prefix string) []float64 {
|
||||
func (c *metricsClient) GetMetricsByPrefix(t *testing.T, prefix string) []float64 {
|
||||
t.Helper()
|
||||
|
||||
values := []float64{}
|
||||
|
||||
metrics, statusCode := app.cli.Get(t, app.metricsURL, nil)
|
||||
metrics, statusCode := c.metricsCli.Get(t, c.url, nil)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
@@ -217,12 +229,12 @@ func (app *ServesMetrics) GetMetricsByPrefix(t *testing.T, prefix string) []floa
|
||||
return values
|
||||
}
|
||||
|
||||
func (app *ServesMetrics) GetMetricsByRegexp(t *testing.T, re *regexp.Regexp) []float64 {
|
||||
func (c *metricsClient) GetMetricsByRegexp(t *testing.T, re *regexp.Regexp) []float64 {
|
||||
t.Helper()
|
||||
|
||||
values := []float64{}
|
||||
|
||||
metrics, statusCode := app.cli.Get(t, app.metricsURL, nil)
|
||||
metrics, statusCode := c.metricsCli.Get(t, c.url, nil)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
@@ -245,3 +257,756 @@ func (app *ServesMetrics) GetMetricsByRegexp(t *testing.T, re *regexp.Regexp) []
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
// rpcRowsSentTotal retrieves the values of all vminsert
|
||||
// `vm_rpc_rows_sent_total` metrics (there will be one for each vmstorage) and
|
||||
// returns their integer sum.
|
||||
func (c *metricsClient) rpcRowsSentTotal(t *testing.T) int {
|
||||
total := 0.0
|
||||
for _, v := range c.GetMetricsByPrefix(t, "vm_rpc_rows_sent_total") {
|
||||
total += v
|
||||
}
|
||||
return int(total)
|
||||
}
|
||||
|
||||
type vmselectClient struct {
|
||||
vmselectCli *Client
|
||||
url func(op, path string, opts QueryOpts) string
|
||||
metricNamesStatsResetURL string
|
||||
tenantsURL string
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Export is a test helper function that performs the export of
|
||||
// raw samples in JSON line format by sending a request to
|
||||
// /prometheus/api/v1/export endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1export
|
||||
func (c *vmselectClient) PrometheusAPIV1Export(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
|
||||
t.Helper()
|
||||
url := c.url("select", "prometheus/api/v1/export", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", query)
|
||||
values.Add("format", "promapi")
|
||||
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ExportNative is a test helper function that performs the export of
|
||||
// raw samples in native binary format by sending a request to
|
||||
// /prometheus/api/v1/export/native endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1exportnative
|
||||
func (c *vmselectClient) PrometheusAPIV1ExportNative(t *testing.T, query string, opts QueryOpts) []byte {
|
||||
t.Helper()
|
||||
url := c.url("select", "prometheus/api/v1/export/native", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", query)
|
||||
values.Add("format", "promapi")
|
||||
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
return []byte(res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Query is a test helper function that performs PromQL/MetricsQL
|
||||
// instant query by sending a request to /prometheus/api/v1/query endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query
|
||||
func (c *vmselectClient) PrometheusAPIV1Query(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
|
||||
t.Helper()
|
||||
url := c.url("select", "prometheus/api/v1/query", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("query", query)
|
||||
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1QueryRange is a test helper function that performs
|
||||
// PromQL/MetricsQL range query by sending a request to
|
||||
// /prometheus/api/v1/query_range endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query_range
|
||||
func (c *vmselectClient) PrometheusAPIV1QueryRange(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
|
||||
t.Helper()
|
||||
url := c.url("select", "prometheus/api/v1/query_range", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("query", query)
|
||||
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Series retrieves list of time series that match the query by
|
||||
// sending a request to /prometheus/api/v1/series endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series
|
||||
func (c *vmselectClient) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1SeriesResponse {
|
||||
t.Helper()
|
||||
url := c.url("select", "prometheus/api/v1/series", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
return NewPrometheusAPIV1SeriesResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1SeriesCount retrieves the total number of time series by
|
||||
// sending a request to /prometheus/api/v1/series/count endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series
|
||||
func (c *vmselectClient) PrometheusAPIV1SeriesCount(t *testing.T, opts QueryOpts) *PrometheusAPIV1SeriesCountResponse {
|
||||
t.Helper()
|
||||
url := c.url("select", "prometheus/api/v1/series/count", opts)
|
||||
values := opts.asURLValues()
|
||||
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
return NewPrometheusAPIV1SeriesCountResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Labels retrieves the label names for time series that match a
|
||||
// query by sending a request to /prometheus/api/v1/labels endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labels
|
||||
func (c *vmselectClient) PrometheusAPIV1Labels(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelsResponse {
|
||||
t.Helper()
|
||||
url := c.url("select", "prometheus/api/v1/labels", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
return NewPrometheusAPIV1LabelsResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1LabelValues retrieves the labels values for the metrics that
|
||||
// match the query by sending a request to /prometheus/api/v1/label/.../values
|
||||
// endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labelvalues
|
||||
func (c *vmselectClient) PrometheusAPIV1LabelValues(t *testing.T, labelName, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelValuesResponse {
|
||||
t.Helper()
|
||||
path := fmt.Sprintf("prometheus/api/v1/label/%s/values", labelName)
|
||||
url := c.url("select", path, opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
return NewPrometheusAPIV1LabelValuesResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Metadata retrieves metadata for the given metric by sending a
|
||||
// request to /prometheus/api/v1/metadata endpoint.
|
||||
func (c *vmselectClient) PrometheusAPIV1Metadata(t *testing.T, metric string, limit int, opts QueryOpts) *PrometheusAPIV1Metadata {
|
||||
t.Helper()
|
||||
url := c.url("select", "prometheus/api/v1/metadata", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("metric", metric)
|
||||
values.Add("limit", strconv.Itoa(limit))
|
||||
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
return NewPrometheusAPIV1Metadata(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1AdminTSDBDeleteSeries deletes the series that match the query
|
||||
// by sending a request to /prometheus/api/v1/admin/tsdb/delete_series.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series
|
||||
func (c *vmselectClient) PrometheusAPIV1AdminTSDBDeleteSeries(t *testing.T, matchQuery string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := c.url("delete", "prometheus/api/v1/admin/tsdb/delete_series", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
|
||||
}
|
||||
}
|
||||
|
||||
// PrometheusAPIV1StatusMetricNamesStats sends a query to
|
||||
// /prometheus/api/v1/status/metric_names_stats endpoint and returns the metric
|
||||
// usage stats response for given params.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage
|
||||
func (c *vmselectClient) PrometheusAPIV1StatusMetricNamesStats(t *testing.T, limit, le, matchPattern string, opts QueryOpts) MetricNamesStatsResponse {
|
||||
t.Helper()
|
||||
url := c.url("select", "prometheus/api/v1/status/metric_names_stats", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("limit", limit)
|
||||
values.Add("le", le)
|
||||
values.Add("match_pattern", matchPattern)
|
||||
res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
|
||||
}
|
||||
var resp MetricNamesStatsResponse
|
||||
if err := json.Unmarshal([]byte(res), &resp); err != nil {
|
||||
t.Fatalf("could not unmarshal metric names stats response data:\n%s\n err: %v", res, err)
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
// PrometheusAPIV1StatusTSDB retrieves the TSDB status for the time series that
|
||||
// match the query on the given date by sending a request to
|
||||
// /prometheus/api/v1/status/tsdb endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#tsdb-stats
|
||||
func (c *vmselectClient) PrometheusAPIV1StatusTSDB(t *testing.T, matchQuery string, date string, topN string, opts QueryOpts) TSDBStatusResponse {
|
||||
t.Helper()
|
||||
url := c.url("select", "prometheus/api/v1/status/tsdb", opts)
|
||||
values := opts.asURLValues()
|
||||
addNonEmpty := func(name, value string) {
|
||||
if len(value) == 0 {
|
||||
return
|
||||
}
|
||||
values.Add(name, value)
|
||||
}
|
||||
addNonEmpty("match[]", matchQuery)
|
||||
addNonEmpty("topN", topN)
|
||||
addNonEmpty("date", date)
|
||||
res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
|
||||
}
|
||||
|
||||
var status TSDBStatusResponse
|
||||
if err := json.Unmarshal([]byte(res), &status); err != nil {
|
||||
t.Fatalf("could not unmarshal tsdb status response data:\n%s\n err: %v", res, err)
|
||||
}
|
||||
status.Sort()
|
||||
return status
|
||||
}
|
||||
|
||||
// GraphiteMetricsIndex retrieves the list of all metrics by sending a request
|
||||
// to /graphite/metrics/index.json endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api
|
||||
func (c *vmselectClient) GraphiteMetricsIndex(t *testing.T, opts QueryOpts) GraphiteMetricsIndexResponse {
|
||||
t.Helper()
|
||||
|
||||
url := c.url("select", "graphite/metrics/index.json", opts)
|
||||
res, statusCode := c.vmselectCli.Get(t, url, opts.Headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
|
||||
}
|
||||
|
||||
var index GraphiteMetricsIndexResponse
|
||||
if err := json.Unmarshal([]byte(res), &index); err != nil {
|
||||
t.Fatalf("could not unmarshal metrics index response data:\n%s\n err: %v", res, err)
|
||||
}
|
||||
return index
|
||||
}
|
||||
|
||||
// GraphiteMetricsFind finds metrics under a given path by sending a request
|
||||
// to /metrics/find endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api
|
||||
// and https://graphite.readthedocs.io/en/latest/metrics_api.html#metrics-find
|
||||
func (c *vmselectClient) GraphiteMetricsFind(t *testing.T, query string, opts QueryOpts) GraphiteMetricsFindResponse {
|
||||
t.Helper()
|
||||
|
||||
url := c.url("select", "graphite/metrics/find", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("query", query)
|
||||
resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText)
|
||||
}
|
||||
|
||||
var res GraphiteMetricsFindResponse
|
||||
if err := json.Unmarshal([]byte(resText), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal response data:\n%s\n err: %v", resText, err)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// GraphiteMetricsExpand expands the given query with matching paths by sending
|
||||
// a request to /graphite/metrics/expand endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api
|
||||
// and https://graphite.readthedocs.io/en/latest/metrics_api.html#metrics-expand
|
||||
func (c *vmselectClient) GraphiteMetricsExpand(t *testing.T, query string, opts QueryOpts) GraphiteMetricsExpandResponse {
|
||||
t.Helper()
|
||||
|
||||
url := c.url("select", "graphite/metrics/expand", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("query", query)
|
||||
resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText)
|
||||
}
|
||||
|
||||
var res GraphiteMetricsExpandResponse
|
||||
if err := json.Unmarshal([]byte(resText), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal response data:\n%s\n err: %v", resText, err)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// GraphiteRender retrieves the raw metric data by sending a request to
|
||||
// /graphite/render endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#render-api
|
||||
// and https://graphite-api.readthedocs.io/en/latest/api.html#the-render-api-render
|
||||
func (c *vmselectClient) GraphiteRender(t *testing.T, target string, opts QueryOpts) GraphiteRenderResponse {
|
||||
t.Helper()
|
||||
|
||||
url := c.url("select", "graphite/render", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("format", "json")
|
||||
values.Add("target", target)
|
||||
resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText)
|
||||
}
|
||||
|
||||
var res GraphiteRenderResponse
|
||||
if err := json.Unmarshal([]byte(resText), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal response data:\n%s\n err: %v", resText, err)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// GraphiteTagsTagSeries is a test helper function that registers Graphite tags
|
||||
// for a single time series by sending a request to /graphite/tags/tagSeries
|
||||
// endpoint.
|
||||
func (c *vmselectClient) GraphiteTagsTagSeries(t *testing.T, record string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := c.url("select", "graphite/tags/tagSeries", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("path", record)
|
||||
_, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
if got, want := statusCode, http.StatusNotImplemented; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// GraphiteTagsTagMultiSeries is a test helper function that registers Graphite
|
||||
// tags for a multiple time series by sending a request to
|
||||
// /graphite/tags/tagSeries endpoint.
|
||||
func (c *vmselectClient) GraphiteTagsTagMultiSeries(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := c.url("select", "graphite/tags/tagMultiSeries", opts)
|
||||
values := opts.asURLValues()
|
||||
for _, rec := range records {
|
||||
values.Add("path", rec)
|
||||
}
|
||||
_, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
|
||||
if got, want := statusCode, http.StatusNotImplemented; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// PrometheusAPIV1AdminStatusMetricNamesStatsReset resets the metric name usage
|
||||
// stats by sending a request to
|
||||
// /prometheus/api/v1/admin/status/metric_names_stats/reset endpoint
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage
|
||||
func (c *vmselectClient) PrometheusAPIV1AdminStatusMetricNamesStatsReset(t *testing.T, opts QueryOpts) {
|
||||
t.Helper()
|
||||
values := opts.asURLValues()
|
||||
res, statusCode := c.vmselectCli.PostForm(t, c.metricNamesStatsResetURL, values, opts.Headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
|
||||
}
|
||||
}
|
||||
|
||||
// APIV1AdminTenants retrieves the list of tenants by sending a request to
|
||||
// /admin/tenants endpoint.
|
||||
func (c *vmselectClient) APIV1AdminTenants(t *testing.T, opts QueryOpts) *AdminTenantsResponse {
|
||||
t.Helper()
|
||||
res, statusCode := c.vmselectCli.Get(t, c.tenantsURL, opts.Headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
|
||||
}
|
||||
|
||||
tenants := &AdminTenantsResponse{}
|
||||
if err := json.Unmarshal([]byte(res), tenants); err != nil {
|
||||
t.Fatalf("could not unmarshal tenants response data:\n%s\n err: %v", res, err)
|
||||
}
|
||||
|
||||
return tenants
|
||||
}
|
||||
|
||||
type vminsertClient struct {
|
||||
vminsertCli *Client
|
||||
url func(op, path string, opts QueryOpts) string
|
||||
openTSDBURL func(op, path string, opts QueryOpts) string
|
||||
graphiteListenAddr string
|
||||
sendBlocking func(t *testing.T, numRecordsToSend int, send func())
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ImportCSV is a test helper function that inserts a collection
|
||||
// of records in CSV format for the given tenant by sending an HTTP POST
|
||||
// request to prometheus/api/v1/import/csv vminsert endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format
|
||||
func (c *vminsertClient) PrometheusAPIV1ImportCSV(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := c.url("insert", "prometheus/api/v1/import/csv", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "text/plain")
|
||||
c.sendBlocking(t, len(records), func() {
|
||||
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ImportNative is a test helper function that inserts a collection
|
||||
// of records in Native format for the given tenant by sending an HTTP POST
|
||||
// request to prometheus/api/v1/import/native vminsert endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format
|
||||
func (c *vminsertClient) PrometheusAPIV1ImportNative(t *testing.T, data []byte, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := c.url("insert", "prometheus/api/v1/import/native", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "text/plain")
|
||||
c.sendBlocking(t, 1, func() {
|
||||
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Write is a test helper function that inserts a
|
||||
// collection of records in Prometheus remote-write format by sending a HTTP
|
||||
// POST request to /prometheus/api/v1/write vminsert endpoint.
|
||||
func (c *vminsertClient) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteRequest, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := c.url("insert", "prometheus/api/v1/write", opts)
|
||||
data := snappy.Encode(nil, wr.MarshalProtobuf(nil))
|
||||
recordsCount := len(wr.Timeseries)
|
||||
if prommetadata.IsEnabled() {
|
||||
recordsCount += len(wr.Metadata)
|
||||
}
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "application/x-protobuf")
|
||||
c.sendBlocking(t, recordsCount, func() {
|
||||
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ImportPrometheus is a test helper function that inserts a
|
||||
// collection of records in Prometheus text exposition format for the given
|
||||
// tenant by sending a HTTP POST request to
|
||||
// /prometheus/api/v1/import/prometheus vminsert endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1importprometheus
|
||||
func (c *vminsertClient) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := c.url("insert", "prometheus/api/v1/import/prometheus", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
var recordsCount int
|
||||
var metadataRecords int
|
||||
uniqueMetadataMetricNames := make(map[string]struct{})
|
||||
for _, record := range records {
|
||||
// metric metadata has the following format:
|
||||
//# HELP importprometheus_series
|
||||
//# TYPE importprometheus_series
|
||||
// it results into single metadata record
|
||||
if strings.HasPrefix(record, "# ") {
|
||||
metadataItems := strings.Split(record, " ")
|
||||
if len(metadataItems) < 3 {
|
||||
t.Fatalf("BUG: unexpected metadata format=%q", record)
|
||||
}
|
||||
metricName := metadataItems[2]
|
||||
if _, ok := uniqueMetadataMetricNames[metricName]; ok {
|
||||
continue
|
||||
}
|
||||
uniqueMetadataMetricNames[metricName] = struct{}{}
|
||||
metadataRecords++
|
||||
continue
|
||||
}
|
||||
recordsCount++
|
||||
}
|
||||
if prommetadata.IsEnabled() {
|
||||
recordsCount += metadataRecords
|
||||
}
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "text/plain")
|
||||
c.sendBlocking(t, recordsCount, func() {
|
||||
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// InfluxWrite is a test helper function that inserts a collection of records in
|
||||
// Influx line format by sending a HTTP POST request to /influx/write endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#influxwrite
|
||||
func (c *vminsertClient) InfluxWrite(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := c.url("insert", "influx/write", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "text/plain")
|
||||
c.sendBlocking(t, len(records), func() {
|
||||
t.Helper()
|
||||
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// OpentelemetryV1Metrics is a test helper function that inserts a
|
||||
// collection of records in Opentelemetry protocol format by sending a HTTP
|
||||
// POST request to /opentelemetry/v1/metrics vminsert endpoint.
|
||||
func (c *vminsertClient) OpentelemetryV1Metrics(t *testing.T, md otlppb.MetricsData, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
var recordsCount int
|
||||
for _, rss := range md.ResourceMetrics {
|
||||
for _, sm := range rss.ScopeMetrics {
|
||||
recordsCount += len(sm.Metrics)
|
||||
for _, m := range sm.Metrics {
|
||||
if prommetadata.IsEnabled() {
|
||||
recordsCount += len(m.Metadata)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
url := c.url("insert", "opentelemetry/v1/metrics", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := md.MarshalProtobuf(nil)
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "application/x-protobuf")
|
||||
c.sendBlocking(t, recordsCount, func() {
|
||||
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// OpenTSDBAPIPut is a test helper function that inserts a collection of
|
||||
// records in OpenTSDB format for the given tenant by sending an HTTP POST
|
||||
// request to /opentsdb/api/put vminsert endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format
|
||||
func (c *vminsertClient) OpenTSDBAPIPut(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := c.openTSDBURL("insert", "opentsdb/api/put", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := []byte("[" + strings.Join(records, ",") + "]")
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "application/json")
|
||||
c.sendBlocking(t, len(records), func() {
|
||||
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// ZabbixConnectorHistory is a test helper function that inserts a
|
||||
// collection of records in zabbixconnector format by sending a HTTP
|
||||
// POST request to /zabbixconnector/api/v1/history vmsingle endpoint.
|
||||
func (c *vminsertClient) ZabbixConnectorHistory(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := c.url("insert", "zabbixconnector/api/v1/history", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "application/json")
|
||||
c.sendBlocking(t, len(records), func() {
|
||||
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// GraphiteWrite is a test helper function that sends a
|
||||
// collection of records to graphiteListenAddr port.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#ingesting
|
||||
func (c *vminsertClient) GraphiteWrite(t *testing.T, records []string, _ QueryOpts) {
|
||||
t.Helper()
|
||||
c.vminsertCli.Write(t, c.graphiteListenAddr, records)
|
||||
}
|
||||
|
||||
type vmstorageClient struct {
|
||||
vmstorageCli *Client
|
||||
httpListenAddr string
|
||||
}
|
||||
|
||||
// ForceFlush is a test helper function that forces the flushing of inserted
|
||||
// data, so it becomes available for searching immediately.
|
||||
func (c *vmstorageClient) ForceFlush(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/internal/force_flush", c.httpListenAddr)
|
||||
_, statusCode := c.vmstorageCli.Get(t, url, nil)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
// ForceMerge is a test helper function that forces the merging of parts.
|
||||
func (c *vmstorageClient) ForceMerge(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/internal/force_merge", c.httpListenAddr)
|
||||
_, statusCode := c.vmstorageCli.Get(t, url, nil)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
// SnapshotCreate creates a database snapshot by sending a query to the
|
||||
// /snapshot/create endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots
|
||||
func (c *vmstorageClient) SnapshotCreate(t *testing.T) *SnapshotCreateResponse {
|
||||
t.Helper()
|
||||
|
||||
data, statusCode := c.vmstorageCli.Post(t, c.SnapshotCreateURL(), nil, nil)
|
||||
if got, want := statusCode, http.StatusOK; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
|
||||
}
|
||||
|
||||
var res SnapshotCreateResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal snapshot create response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
// SnapshotCreateURL returns the URL for creating snapshots.
|
||||
func (c *vmstorageClient) SnapshotCreateURL() string {
|
||||
return fmt.Sprintf("http://%s/snapshot/create", c.httpListenAddr)
|
||||
}
|
||||
|
||||
// APIV1AdminTSDBSnapshot creates a database snapshot by sending a query to the
|
||||
// /api/v1/admin/tsdb/snapshot endpoint.
|
||||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/querying/api/#snapshot.
|
||||
func (c *vmstorageClient) APIV1AdminTSDBSnapshot(t *testing.T) *APIV1AdminTSDBSnapshotResponse {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/api/v1/admin/tsdb/snapshot", c.httpListenAddr)
|
||||
data, statusCode := c.vmstorageCli.Post(t, url, nil, nil)
|
||||
if got, want := statusCode, http.StatusOK; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
|
||||
}
|
||||
|
||||
var res APIV1AdminTSDBSnapshotResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal prometheus snapshot create response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
// SnapshotList lists existing database snapshots by sending a query to the
|
||||
// /snapshot/list endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots
|
||||
func (c *vmstorageClient) SnapshotList(t *testing.T) *SnapshotListResponse {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/snapshot/list", c.httpListenAddr)
|
||||
data, statusCode := c.vmstorageCli.Get(t, url, nil)
|
||||
if got, want := statusCode, http.StatusOK; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
|
||||
}
|
||||
|
||||
var res SnapshotListResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal snapshot list response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
// SnapshotDelete deletes a snapshot by sending a query to the
|
||||
// /snapshot/delete endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots
|
||||
func (c *vmstorageClient) SnapshotDelete(t *testing.T, snapshotName string) *SnapshotDeleteResponse {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/snapshot/delete?snapshot=%s", c.httpListenAddr, snapshotName)
|
||||
data, statusCode := c.vmstorageCli.Delete(t, url)
|
||||
wantStatusCodes := map[int]bool{
|
||||
http.StatusOK: true,
|
||||
http.StatusInternalServerError: true,
|
||||
}
|
||||
if !wantStatusCodes[statusCode] {
|
||||
t.Fatalf("unexpected status code: got %d, want %v, resp text=%q", statusCode, wantStatusCodes, data)
|
||||
}
|
||||
|
||||
var res SnapshotDeleteResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal snapshot delete response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
// SnapshotDeleteAll deletes all snapshots by sending a query to the
|
||||
// /snapshot/delete_all endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots
|
||||
func (c *vmstorageClient) SnapshotDeleteAll(t *testing.T) *SnapshotDeleteAllResponse {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/snapshot/delete_all", c.httpListenAddr)
|
||||
data, statusCode := c.vmstorageCli.Post(t, url, nil, nil)
|
||||
if got, want := statusCode, http.StatusOK; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
|
||||
}
|
||||
|
||||
var res SnapshotDeleteAllResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal snapshot delete all response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
@@ -27,13 +27,16 @@ type PrometheusQuerier interface {
|
||||
PrometheusAPIV1LabelValues(t *testing.T, labelName, query string, opts QueryOpts) *PrometheusAPIV1LabelValuesResponse
|
||||
PrometheusAPIV1ExportNative(t *testing.T, query string, opts QueryOpts) []byte
|
||||
PrometheusAPIV1Metadata(t *testing.T, metric string, limit int, opts QueryOpts) *PrometheusAPIV1Metadata
|
||||
|
||||
APIV1AdminTSDBDeleteSeries(t *testing.T, matchQuery string, opts QueryOpts)
|
||||
PrometheusAPIV1StatusMetricNamesStats(t *testing.T, limit, le, matchPattern string, opts QueryOpts) MetricNamesStatsResponse
|
||||
PrometheusAPIV1AdminTSDBDeleteSeries(t *testing.T, matchQuery string, opts QueryOpts)
|
||||
|
||||
// TODO(@rtm0): Prometheus does not provide this API. Either move it to a
|
||||
// separate interface or rename this interface to allow for multiple querier
|
||||
// types.
|
||||
GraphiteMetricsIndex(t *testing.T, opts QueryOpts) GraphiteMetricsIndexResponse
|
||||
GraphiteMetricsFind(t *testing.T, query string, opts QueryOpts) GraphiteMetricsFindResponse
|
||||
GraphiteMetricsExpand(t *testing.T, query string, opts QueryOpts) GraphiteMetricsExpandResponse
|
||||
GraphiteRender(t *testing.T, target string, opts QueryOpts) GraphiteRenderResponse
|
||||
GraphiteTagsTagSeries(t *testing.T, record string, opts QueryOpts)
|
||||
GraphiteTagsTagMultiSeries(t *testing.T, records []string, opts QueryOpts)
|
||||
}
|
||||
@@ -91,6 +94,9 @@ type QueryOpts struct {
|
||||
Format string
|
||||
NoCache string
|
||||
Headers http.Header
|
||||
From string
|
||||
Until string
|
||||
StorageStep string
|
||||
}
|
||||
|
||||
func (qos *QueryOpts) getHeaders() http.Header {
|
||||
@@ -123,6 +129,9 @@ func (qos *QueryOpts) asURLValues() url.Values {
|
||||
addNonEmpty("latency_offset", qos.LatencyOffset)
|
||||
addNonEmpty("format", qos.Format)
|
||||
addNonEmpty("nocache", qos.NoCache)
|
||||
addNonEmpty("from", qos.From)
|
||||
addNonEmpty("until", qos.Until)
|
||||
addNonEmpty("storage_step", qos.StorageStep)
|
||||
|
||||
return uv
|
||||
}
|
||||
@@ -480,10 +489,6 @@ type TSDBStatusResponse struct {
|
||||
Data TSDBStatusResponseData
|
||||
}
|
||||
|
||||
// GraphiteMetricsIndexResponse is an in-memory representation of the json response
|
||||
// returned by the /graphite/metrics/index.json endpoint.
|
||||
type GraphiteMetricsIndexResponse = []string
|
||||
|
||||
// AdminTenantsResponse is an in-memory representation of the json response
|
||||
// returned by the /api/v1/admin/tenants endpoint.
|
||||
type AdminTenantsResponse struct {
|
||||
@@ -533,3 +538,32 @@ func sortTSDBStatusResponseEntries(entries []TSDBStatusResponseEntry) {
|
||||
return left.Count < right.Count
|
||||
})
|
||||
}
|
||||
|
||||
// GraphiteMetricsIndexResponse is an in-memory representation of the json response
|
||||
// returned by the /graphite/metrics/index.json endpoint.
|
||||
type GraphiteMetricsIndexResponse = []string
|
||||
|
||||
type GraphiteMetric struct {
|
||||
Id string
|
||||
Text string
|
||||
AllowChildren int
|
||||
Expandable int
|
||||
Leaf int
|
||||
}
|
||||
|
||||
// GraphiteMetricsIndexResponse is an in-memory representation of the json response
|
||||
// returned by the /graphite/metrics/find endpoint.
|
||||
type GraphiteMetricsFindResponse = []GraphiteMetric
|
||||
|
||||
// GraphiteMetricsExpandResponse is an in-memory representation of the json response
|
||||
// returned by the /graphite/metrics/expand endpoint.
|
||||
type GraphiteMetricsExpandResponse = []string
|
||||
|
||||
type GraphiteRenderedTarget struct {
|
||||
Target string
|
||||
Datapoints [][2]float64
|
||||
}
|
||||
|
||||
// GraphiteRenderResponse is an in-memory representation of the json response
|
||||
// returned by the /graphite/render endpoint.
|
||||
type GraphiteRenderResponse = []GraphiteRenderedTarget
|
||||
|
||||
@@ -214,7 +214,7 @@ func testLegacyDeleteSeries(tc *at.TestCase, opts testLegacyDeleteSeriesOpts) {
|
||||
newSUT := opts.startNewSUT()
|
||||
assertSearchResults(newSUT, `{__name__=~".*"}`, start1, end1, "1d", want1)
|
||||
|
||||
newSUT.APIV1AdminTSDBDeleteSeries(t, `{__name__=~".*"}`, at.QueryOpts{})
|
||||
newSUT.PrometheusAPIV1AdminTSDBDeleteSeries(t, `{__name__=~".*"}`, at.QueryOpts{})
|
||||
wantNoResults := &want{
|
||||
series: []map[string]string{},
|
||||
queryResults: []*at.QueryResult{},
|
||||
@@ -877,7 +877,7 @@ func testLegacyDowngrade(tc *at.TestCase, opts testLegacyDowngradeOpts) {
|
||||
// Ingest legacy2 records, ensure the queries return only legacy2.
|
||||
legacySUT = opts.startLegacySUT()
|
||||
assertQueries(legacySUT, `{__name__=~".*"}`, wantLegacy1, numMetrics)
|
||||
legacySUT.APIV1AdminTSDBDeleteSeries(t, `{__name__=~".*"}`, at.QueryOpts{})
|
||||
legacySUT.PrometheusAPIV1AdminTSDBDeleteSeries(t, `{__name__=~".*"}`, at.QueryOpts{})
|
||||
assertQueries(legacySUT, `{__name__=~".*"}`, wantEmpty, numMetrics)
|
||||
legacySUT.PrometheusAPIV1ImportPrometheus(t, legacy2Data, at.QueryOpts{})
|
||||
legacySUT.ForceFlush(t)
|
||||
@@ -891,7 +891,7 @@ func testLegacyDowngrade(tc *at.TestCase, opts testLegacyDowngradeOpts) {
|
||||
newSUT = opts.startNewSUT()
|
||||
// series count includes deleted metrics
|
||||
assertQueries(newSUT, `{__name__=~".*"}`, wantLegacy2New1, 3*numMetrics)
|
||||
newSUT.APIV1AdminTSDBDeleteSeries(t, `{__name__=~".*"}`, at.QueryOpts{})
|
||||
newSUT.PrometheusAPIV1AdminTSDBDeleteSeries(t, `{__name__=~".*"}`, at.QueryOpts{})
|
||||
// series count includes deleted metrics
|
||||
assertQueries(newSUT, `{__name__=~".*"}`, wantEmpty, 3*numMetrics)
|
||||
opts.stopNewSUT()
|
||||
|
||||
@@ -48,7 +48,7 @@ func TestSingleMetricNamesStats(t *testing.T) {
|
||||
{MetricName: "metric_name_3"},
|
||||
},
|
||||
}
|
||||
got := sut.APIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{})
|
||||
got := sut.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{})
|
||||
if diff := cmp.Diff(expected, got); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
@@ -63,7 +63,7 @@ func TestSingleMetricNamesStats(t *testing.T) {
|
||||
{MetricName: "metric_name_3", QueryRequestsCount: 1},
|
||||
},
|
||||
}
|
||||
got = sut.APIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{})
|
||||
got = sut.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{})
|
||||
if diff := cmp.Diff(expected, got); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
@@ -90,7 +90,7 @@ func TestSingleMetricNamesStats(t *testing.T) {
|
||||
},
|
||||
}
|
||||
expectedStatsResponse.Sort()
|
||||
gotStatus := sut.APIV1StatusTSDB(t, "", date, "", apptest.QueryOpts{})
|
||||
gotStatus := sut.PrometheusAPIV1StatusTSDB(t, "", date, "", apptest.QueryOpts{})
|
||||
if diff := cmp.Diff(expectedStatsResponse, gotStatus, tsdbMetricNameEntryCmpOpts); diff != "" {
|
||||
t.Errorf("unexpected APIV1StatusTSDB response (-want, +got):\n%s", diff)
|
||||
}
|
||||
@@ -105,7 +105,7 @@ func TestSingleMetricNamesStats(t *testing.T) {
|
||||
{MetricName: "metric_name_3", QueryRequestsCount: 1},
|
||||
},
|
||||
}
|
||||
got = sut.APIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{})
|
||||
got = sut.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{})
|
||||
if diff := cmp.Diff(expected, got); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
@@ -118,17 +118,17 @@ func TestSingleMetricNamesStats(t *testing.T) {
|
||||
{MetricName: "metric_name_3", QueryRequestsCount: 1},
|
||||
},
|
||||
}
|
||||
got = sut.APIV1StatusMetricNamesStats(t, "", "2", "", apptest.QueryOpts{})
|
||||
got = sut.PrometheusAPIV1StatusMetricNamesStats(t, "", "2", "", apptest.QueryOpts{})
|
||||
if diff := cmp.Diff(expected, got); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
|
||||
// reset state and check empty request response
|
||||
sut.APIV1AdminStatusMetricNamesStatsReset(t, apptest.QueryOpts{})
|
||||
sut.PrometheusAPIV1AdminStatusMetricNamesStatsReset(t, apptest.QueryOpts{})
|
||||
expected = apptest.MetricNamesStatsResponse{
|
||||
Records: []apptest.MetricNamesStatsRecord{},
|
||||
}
|
||||
got = sut.APIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{})
|
||||
got = sut.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{})
|
||||
if diff := cmp.Diff(expected, got); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
@@ -158,7 +158,7 @@ func TestClusterMetricNamesStats(t *testing.T) {
|
||||
fmt.Sprintf("-storageNode=%s,%s", vmstorage1.VmselectAddr(), vmstorage2.VmselectAddr()),
|
||||
})
|
||||
// verify empty stats
|
||||
resp := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "0:0"})
|
||||
resp := vmselect.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "0:0"})
|
||||
if len(resp.Records) != 0 {
|
||||
t.Fatalf("unexpected resp Records: %d, want: %d", len(resp.Records), 0)
|
||||
}
|
||||
@@ -198,7 +198,7 @@ func TestClusterMetricNamesStats(t *testing.T) {
|
||||
{MetricName: "metric_name_3"},
|
||||
},
|
||||
}
|
||||
gotStats := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID})
|
||||
gotStats := vmselect.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID})
|
||||
if diff := cmp.Diff(expected, gotStats); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
@@ -216,7 +216,7 @@ func TestClusterMetricNamesStats(t *testing.T) {
|
||||
{MetricName: "metric_name_1", QueryRequestsCount: 3},
|
||||
},
|
||||
}
|
||||
gotStats = vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID})
|
||||
gotStats = vmselect.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: tenantID})
|
||||
if diff := cmp.Diff(expected, gotStats); diff != "" {
|
||||
t.Errorf("unexpected response tenant: %s (-want, +got):\n%s", tenantID, diff)
|
||||
}
|
||||
@@ -243,9 +243,9 @@ func TestClusterMetricNamesStats(t *testing.T) {
|
||||
},
|
||||
}
|
||||
expectedStatsResponse.Sort()
|
||||
gotStatus := vmselect.APIV1StatusTSDB(t, "", date, "", apptest.QueryOpts{Tenant: tenantID})
|
||||
gotStatus := vmselect.PrometheusAPIV1StatusTSDB(t, "", date, "", apptest.QueryOpts{Tenant: tenantID})
|
||||
if diff := cmp.Diff(expectedStatsResponse, gotStatus, tsdbMetricNameEntryCmpOpts); diff != "" {
|
||||
t.Errorf("unexpected APIV1StatusTSDB response tenant: %s (-want, +got):\n%s", tenantID, diff)
|
||||
t.Errorf("unexpected TSDB status for tenant %s (-want, +got):\n%s", tenantID, diff)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -258,14 +258,14 @@ func TestClusterMetricNamesStats(t *testing.T) {
|
||||
{MetricName: "metric_name_1", QueryRequestsCount: 9},
|
||||
},
|
||||
}
|
||||
gotStats := vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"})
|
||||
gotStats := vmselect.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"})
|
||||
if diff := cmp.Diff(expected, gotStats); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
|
||||
// reset cache and check empty state
|
||||
vmselect.MetricNamesStatsReset(t, apptest.QueryOpts{})
|
||||
resp = vmselect.MetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"})
|
||||
vmselect.PrometheusAPIV1AdminStatusMetricNamesStatsReset(t, apptest.QueryOpts{})
|
||||
resp = vmselect.PrometheusAPIV1StatusMetricNamesStats(t, "", "", "", apptest.QueryOpts{Tenant: "multitenant"})
|
||||
if len(resp.Records) != 0 {
|
||||
t.Fatalf("want 0 records, got: %d", len(resp.Records))
|
||||
}
|
||||
|
||||
@@ -219,7 +219,7 @@ func TestClusterMultiTenantSelectViaHeaders(t *testing.T) {
|
||||
tenantID := make(http.Header)
|
||||
tenantID.Set("AccountID", "5")
|
||||
tenantID.Set("ProjectID", "15")
|
||||
vmselect.APIV1AdminTSDBDeleteSeries(t, "foo_bar", apptest.QueryOpts{
|
||||
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(t, "foo_bar", apptest.QueryOpts{
|
||||
Headers: tenantID,
|
||||
})
|
||||
wantSR = apptest.NewPrometheusAPIV1SeriesResponse(t,
|
||||
@@ -244,7 +244,7 @@ func TestClusterMultiTenantSelectViaHeaders(t *testing.T) {
|
||||
}
|
||||
|
||||
// Delete series for multitenant with tenant filter
|
||||
vmselect.APIV1AdminTSDBDeleteSeries(t, `foo_bar{vm_account_id="1"}`, apptest.QueryOpts{
|
||||
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(t, `foo_bar{vm_account_id="1"}`, apptest.QueryOpts{
|
||||
Headers: multitenant,
|
||||
})
|
||||
|
||||
|
||||
@@ -192,7 +192,7 @@ func TestClusterMultiTenantSelect(t *testing.T) {
|
||||
}
|
||||
|
||||
// Delete series from specific tenant
|
||||
vmselect.APIV1AdminTSDBDeleteSeries(t, "foo_bar", apptest.QueryOpts{
|
||||
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(t, "foo_bar", apptest.QueryOpts{
|
||||
Tenant: "5:15",
|
||||
})
|
||||
wantSR = apptest.NewPrometheusAPIV1SeriesResponse(t,
|
||||
@@ -215,7 +215,7 @@ func TestClusterMultiTenantSelect(t *testing.T) {
|
||||
}
|
||||
|
||||
// Delete series for multitenant with tenant filter
|
||||
vmselect.APIV1AdminTSDBDeleteSeries(t, `foo_bar{vm_account_id="1"}`, apptest.QueryOpts{
|
||||
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(t, `foo_bar{vm_account_id="1"}`, apptest.QueryOpts{
|
||||
Tenant: "multitenant",
|
||||
})
|
||||
|
||||
|
||||
@@ -19,9 +19,11 @@ import (
|
||||
// Vmagent holds the state of a vmagent app and provides vmagent-specific functions
|
||||
type Vmagent struct {
|
||||
*app
|
||||
*ServesMetrics
|
||||
*metricsClient
|
||||
|
||||
httpListenAddr string
|
||||
|
||||
cli *Client
|
||||
}
|
||||
|
||||
// StartVmagent starts an instance of vmagent with the given flags. It also
|
||||
@@ -46,12 +48,10 @@ func StartVmagent(instance string, flags []string, cli *Client, promScrapeConfig
|
||||
}
|
||||
|
||||
return &Vmagent{
|
||||
app: app,
|
||||
ServesMetrics: &ServesMetrics{
|
||||
metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]),
|
||||
cli: cli,
|
||||
},
|
||||
app: app,
|
||||
metricsClient: newMetricsClient(cli, stderrExtracts[0]),
|
||||
httpListenAddr: stderrExtracts[0],
|
||||
cli: cli,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package apptest
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"regexp"
|
||||
"syscall"
|
||||
@@ -17,7 +16,7 @@ var httpBuilitinListenAddrRE = regexp.MustCompile(`pprof handlers are exposed at
|
||||
// functions.
|
||||
type Vmauth struct {
|
||||
*app
|
||||
*ServesMetrics
|
||||
*metricsClient
|
||||
|
||||
httpListenAddr string
|
||||
configFilePath string
|
||||
@@ -45,11 +44,8 @@ func StartVmauth(instance string, flags []string, cli *Client, configFilePath st
|
||||
}
|
||||
|
||||
return &Vmauth{
|
||||
app: app,
|
||||
ServesMetrics: &ServesMetrics{
|
||||
metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]),
|
||||
cli: cli,
|
||||
},
|
||||
app: app,
|
||||
metricsClient: newMetricsClient(cli, stderrExtracts[0]),
|
||||
httpListenAddr: stderrExtracts[0],
|
||||
configFilePath: configFilePath,
|
||||
cli: cli,
|
||||
|
||||
@@ -3,31 +3,21 @@ package apptest
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prommetadata"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
otlppb "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
|
||||
)
|
||||
|
||||
// Vminsert holds the state of a vminsert app and provides vminsert-specific
|
||||
// functions.
|
||||
type Vminsert struct {
|
||||
*app
|
||||
*ServesMetrics
|
||||
*metricsClient
|
||||
*vminsertClient
|
||||
|
||||
httpListenAddr string
|
||||
clusternativeListenAddr string
|
||||
graphiteListenAddr string
|
||||
openTSDBListenAddr string
|
||||
|
||||
cli *Client
|
||||
}
|
||||
|
||||
// storageNodes returns the storage node addresses passed to vminsert via
|
||||
@@ -73,17 +63,26 @@ func StartVminsert(instance string, flags []string, cli *Client, output io.Write
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metricsClient := newMetricsClient(cli, stderrExtracts[0])
|
||||
return &Vminsert{
|
||||
app: app,
|
||||
ServesMetrics: &ServesMetrics{
|
||||
metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]),
|
||||
cli: cli,
|
||||
app: app,
|
||||
metricsClient: metricsClient,
|
||||
vminsertClient: &vminsertClient{
|
||||
vminsertCli: cli,
|
||||
url: func(op, path string, opts QueryOpts) string {
|
||||
return getClusterPath(stderrExtracts[0], op, path, opts)
|
||||
},
|
||||
openTSDBURL: func(op, path string, opts QueryOpts) string {
|
||||
return getClusterPath(stderrExtracts[3], op, path, opts)
|
||||
},
|
||||
graphiteListenAddr: stderrExtracts[2],
|
||||
sendBlocking: func(t *testing.T, numRecordsToSend int, send func()) {
|
||||
t.Helper()
|
||||
sendBlocking(t, metricsClient, numRecordsToSend, send)
|
||||
},
|
||||
},
|
||||
httpListenAddr: stderrExtracts[0],
|
||||
clusternativeListenAddr: stderrExtracts[1],
|
||||
graphiteListenAddr: stderrExtracts[2],
|
||||
openTSDBListenAddr: stderrExtracts[3],
|
||||
cli: cli,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -99,247 +98,6 @@ func (app *Vminsert) HTTPAddr() string {
|
||||
return app.httpListenAddr
|
||||
}
|
||||
|
||||
// InfluxWrite is a test helper function that inserts a
|
||||
// collection of records in Influx line format by sending a HTTP
|
||||
// POST request to /influx/write vmsingle endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#influxwrite
|
||||
func (app *Vminsert) InfluxWrite(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := getClusterPath(app.httpListenAddr, "insert", "influx/write", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "text/plain")
|
||||
app.sendBlocking(t, len(records), func() {
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// GraphiteWrite is a test helper function that sends a
|
||||
// collection of records to graphiteListenAddr port.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#ingesting
|
||||
func (app *Vminsert) GraphiteWrite(t *testing.T, records []string, _ QueryOpts) {
|
||||
t.Helper()
|
||||
app.cli.Write(t, app.graphiteListenAddr, records)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ImportCSV is a test helper function that inserts a collection
|
||||
// of records in CSV format for the given tenant by sending an HTTP POST
|
||||
// request to prometheus/api/v1/import/csv vminsert endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format
|
||||
func (app *Vminsert) PrometheusAPIV1ImportCSV(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := getClusterPath(app.httpListenAddr, "insert", "prometheus/api/v1/import/csv", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "text/plain")
|
||||
app.sendBlocking(t, len(records), func() {
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ImportNative is a test helper function that inserts a collection
|
||||
// of records in Native format for the given tenant by sending an HTTP POST
|
||||
// request to prometheus/api/v1/import/native vminsert endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format
|
||||
func (app *Vminsert) PrometheusAPIV1ImportNative(t *testing.T, data []byte, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := getClusterPath(app.httpListenAddr, "insert", "prometheus/api/v1/import/native", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "text/plain")
|
||||
app.sendBlocking(t, 1, func() {
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// OpenTSDBAPIPut is a test helper function that inserts a collection of
|
||||
// records in OpenTSDB format for the given tenant by sending an HTTP POST
|
||||
// request to /opentsdb/api/put vminsert endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/cluster-victoriametrics/#url-format
|
||||
func (app *Vminsert) OpenTSDBAPIPut(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := getClusterPath(app.openTSDBListenAddr, "insert", "opentsdb/api/put", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := []byte("[" + strings.Join(records, ",") + "]")
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "application/json")
|
||||
app.sendBlocking(t, len(records), func() {
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Write is a test helper function that inserts a
|
||||
// collection of records in Prometheus remote-write format by sending a HTTP
|
||||
// POST request to /prometheus/api/v1/write vminsert endpoint.
|
||||
func (app *Vminsert) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteRequest, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := getClusterPath(app.httpListenAddr, "insert", "prometheus/api/v1/write", opts)
|
||||
data := snappy.Encode(nil, wr.MarshalProtobuf(nil))
|
||||
recordsCount := len(wr.Timeseries)
|
||||
if prommetadata.IsEnabled() {
|
||||
recordsCount += len(wr.Metadata)
|
||||
}
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "application/x-protobuf")
|
||||
app.sendBlocking(t, recordsCount, func() {
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ImportPrometheus is a test helper function that inserts a
|
||||
// collection of records in Prometheus text exposition format for the given
|
||||
// tenant by sending a HTTP POST request to
|
||||
// /prometheus/api/v1/import/prometheus vminsert endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1importprometheus
|
||||
func (app *Vminsert) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := getClusterPath(app.httpListenAddr, "insert", "prometheus/api/v1/import/prometheus", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
var recordsCount int
|
||||
var metadataRecords int
|
||||
uniqueMetadataMetricNames := make(map[string]struct{})
|
||||
for _, record := range records {
|
||||
// metric metadata has the following format:
|
||||
//# HELP importprometheus_series
|
||||
//# TYPE importprometheus_series
|
||||
// it results into single metadata record
|
||||
if strings.HasPrefix(record, "# ") {
|
||||
metadataItems := strings.Split(record, " ")
|
||||
if len(metadataItems) < 2 {
|
||||
t.Fatalf("BUG: unexpected metadata format=%q", record)
|
||||
}
|
||||
metricName := metadataItems[2]
|
||||
if _, ok := uniqueMetadataMetricNames[metricName]; ok {
|
||||
continue
|
||||
}
|
||||
uniqueMetadataMetricNames[metricName] = struct{}{}
|
||||
metadataRecords++
|
||||
continue
|
||||
}
|
||||
recordsCount++
|
||||
}
|
||||
if prommetadata.IsEnabled() {
|
||||
recordsCount += metadataRecords
|
||||
}
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "text/plain")
|
||||
app.sendBlocking(t, recordsCount, func() {
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// ZabbixConnectorHistory is a test helper function that inserts a
|
||||
// collection of records in zabbixconnector format by sending a HTTP
|
||||
// POST request to /zabbixconnector/api/v1/history vmsingle endpoint.
|
||||
func (app *Vminsert) ZabbixConnectorHistory(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := getClusterPath(app.httpListenAddr, "insert", "zabbixconnector/api/v1/history", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "application/json")
|
||||
app.sendBlocking(t, len(records), func() {
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// OpentelemetryV1Metrics is a test helper function that inserts a
|
||||
// collection of records in Opentelemetry protocol format by sending a HTTP
|
||||
// POST request to /opentelemetry/v1/metrics vminsert endpoint.
|
||||
func (app *Vminsert) OpentelemetryV1Metrics(t *testing.T, md otlppb.MetricsData, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
var recordsCount int
|
||||
for _, rss := range md.ResourceMetrics {
|
||||
for _, sm := range rss.ScopeMetrics {
|
||||
recordsCount += len(sm.Metrics)
|
||||
for _, m := range sm.Metrics {
|
||||
if prommetadata.IsEnabled() {
|
||||
recordsCount += len(m.Metadata)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
url := getClusterPath(app.httpListenAddr, "insert", "opentelemetry/v1/metrics", opts)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := md.MarshalProtobuf(nil)
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "application/x-protobuf")
|
||||
app.sendBlocking(t, recordsCount, func() {
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// String returns the string representation of the vminsert app state.
|
||||
func (app *Vminsert) String() string {
|
||||
return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr)
|
||||
@@ -355,13 +113,10 @@ func (app *Vminsert) String() string {
|
||||
// Waiting is implemented a retrieving the value of `vm_rpc_rows_sent_total`
|
||||
// metric and checking whether it is equal or greater than the wanted value.
|
||||
// If it is, then the data has been sent to vmstorage.
|
||||
//
|
||||
// Unreliable if the records are inserted concurrently.
|
||||
// TODO(rtm0): Put sending and waiting into a critical section to make reliable?
|
||||
func (app *Vminsert) sendBlocking(t *testing.T, numRecordsToSend int, send func()) {
|
||||
func sendBlocking(t *testing.T, c *metricsClient, numRecordsToSend int, send func()) {
|
||||
t.Helper()
|
||||
|
||||
wantRowsSentCount := app.rpcRowsSentTotal(t) + numRecordsToSend
|
||||
wantRowsSentCount := c.rpcRowsSentTotal(t) + numRecordsToSend
|
||||
|
||||
send()
|
||||
|
||||
@@ -370,7 +125,7 @@ func (app *Vminsert) sendBlocking(t *testing.T, numRecordsToSend int, send func(
|
||||
period = 100 * time.Millisecond
|
||||
)
|
||||
for range retries {
|
||||
d := app.rpcRowsSentTotal(t)
|
||||
d := c.rpcRowsSentTotal(t)
|
||||
if d >= wantRowsSentCount {
|
||||
return
|
||||
}
|
||||
@@ -378,14 +133,3 @@ func (app *Vminsert) sendBlocking(t *testing.T, numRecordsToSend int, send func(
|
||||
}
|
||||
t.Fatalf("timed out while waiting for inserted rows to be sent to vmstorage")
|
||||
}
|
||||
|
||||
// rpcRowsSentTotal retrieves the values of all vminsert
|
||||
// `vm_rpc_rows_sent_total` metrics (there will be one for each vmstorage) and
|
||||
// returns their integer sum.
|
||||
func (app *Vminsert) rpcRowsSentTotal(t *testing.T) int {
|
||||
total := 0.0
|
||||
for _, v := range app.GetMetricsByPrefix(t, "vm_rpc_rows_sent_total") {
|
||||
total += v
|
||||
}
|
||||
return int(total)
|
||||
}
|
||||
|
||||
@@ -1,20 +1,17 @@
|
||||
package apptest
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Vmselect holds the state of a vmselect app and provides vmselect-specific
|
||||
// functions.
|
||||
type Vmselect struct {
|
||||
*app
|
||||
*ServesMetrics
|
||||
*metricsClient
|
||||
*vmselectClient
|
||||
|
||||
httpListenAddr string
|
||||
clusternativeListenAddr string
|
||||
@@ -41,10 +38,15 @@ func StartVmselect(instance string, flags []string, cli *Client, output io.Write
|
||||
}
|
||||
|
||||
return &Vmselect{
|
||||
app: app,
|
||||
ServesMetrics: &ServesMetrics{
|
||||
metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[0]),
|
||||
cli: cli,
|
||||
app: app,
|
||||
metricsClient: newMetricsClient(cli, stderrExtracts[0]),
|
||||
vmselectClient: &vmselectClient{
|
||||
vmselectCli: cli,
|
||||
url: func(op, path string, opts QueryOpts) string {
|
||||
return getClusterPath(stderrExtracts[0], op, path, opts)
|
||||
},
|
||||
metricNamesStatsResetURL: fmt.Sprintf("http://%s/admin/api/v1/admin/status/metric_names_stats/reset", stderrExtracts[0]),
|
||||
tenantsURL: fmt.Sprintf("http://%s/admin/tenants", stderrExtracts[0]),
|
||||
},
|
||||
httpListenAddr: stderrExtracts[0],
|
||||
clusternativeListenAddr: stderrExtracts[1],
|
||||
@@ -64,299 +66,6 @@ func (app *Vmselect) HTTPAddr() string {
|
||||
return app.httpListenAddr
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Export is a test helper function that performs the export of
|
||||
// raw samples in JSON line format by sending a HTTP POST request to
|
||||
// /prometheus/api/v1/export vmselect endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1export
|
||||
func (app *Vmselect) PrometheusAPIV1Export(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
|
||||
t.Helper()
|
||||
|
||||
exportURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/export", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", query)
|
||||
values.Add("format", "promapi")
|
||||
res, _ := app.cli.PostForm(t, exportURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ExportNative is a test helper function that performs the export of
|
||||
// raw samples in native binary format by sending an HTTP POST request to
|
||||
// /prometheus/api/v1/export/native vmselect endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1exportnative
|
||||
func (app *Vmselect) PrometheusAPIV1ExportNative(t *testing.T, query string, opts QueryOpts) []byte {
|
||||
t.Helper()
|
||||
|
||||
exportURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/export/native", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", query)
|
||||
values.Add("format", "promapi")
|
||||
res, _ := app.cli.PostForm(t, exportURL, values, opts.Headers)
|
||||
return []byte(res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Query is a test helper function that performs PromQL/MetricsQL
|
||||
// instant query by sending a HTTP POST request to /prometheus/api/v1/query
|
||||
// vmselect endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query
|
||||
func (app *Vmselect) PrometheusAPIV1Query(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
|
||||
t.Helper()
|
||||
|
||||
queryURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/query", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("query", query)
|
||||
|
||||
res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1QueryRange is a test helper function that performs
|
||||
// PromQL/MetricsQL range query by sending a HTTP POST request to
|
||||
// /prometheus/api/v1/query_range vmselect endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query_range
|
||||
func (app *Vmselect) PrometheusAPIV1QueryRange(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
|
||||
t.Helper()
|
||||
|
||||
queryURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/query_range", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("query", query)
|
||||
|
||||
res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Series sends a query to a /prometheus/api/v1/series endpoint
|
||||
// and returns the list of time series that match the query.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series
|
||||
func (app *Vmselect) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1SeriesResponse {
|
||||
t.Helper()
|
||||
|
||||
seriesURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/series", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
|
||||
res, _ := app.cli.PostForm(t, seriesURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1SeriesResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1SeriesCount sends a query to a /prometheus/api/v1/series/count endpoint
|
||||
// and returns the total number of time series.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series
|
||||
func (app *Vmselect) PrometheusAPIV1SeriesCount(t *testing.T, opts QueryOpts) *PrometheusAPIV1SeriesCountResponse {
|
||||
t.Helper()
|
||||
|
||||
seriesURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/series/count", opts)
|
||||
values := opts.asURLValues()
|
||||
|
||||
res, _ := app.cli.PostForm(t, seriesURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1SeriesCountResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Labels sends a query to a /prometheus/api/v1/labels endpoint
|
||||
// and returns the label names list of time series that match the query.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labels
|
||||
func (app *Vmselect) PrometheusAPIV1Labels(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelsResponse {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
queryURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/labels", opts)
|
||||
|
||||
res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1LabelsResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1LabelValues sends a query to a /prometheus/api/v1/label/.../values endpoint
|
||||
// and returns the label names list of time series that match the query.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labelvalues
|
||||
func (app *Vmselect) PrometheusAPIV1LabelValues(t *testing.T, labelName, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelValuesResponse {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
suffix := fmt.Sprintf("prometheus/api/v1/label/%s/values", labelName)
|
||||
queryURL := getClusterPath(app.httpListenAddr, "select", suffix, opts)
|
||||
|
||||
res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1LabelValuesResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Metadata sends a query to a /prometheus/api/v1/metadata endpoint
|
||||
// and returns the results.
|
||||
func (app *Vmselect) PrometheusAPIV1Metadata(t *testing.T, metric string, limit int, opts QueryOpts) *PrometheusAPIV1Metadata {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
values.Add("metric", metric)
|
||||
values.Add("limit", strconv.Itoa(limit))
|
||||
queryURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/metadata", opts)
|
||||
|
||||
res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1Metadata(t, res)
|
||||
}
|
||||
|
||||
// APIV1AdminTSDBDeleteSeries deletes the series that match the query by sending
|
||||
// a request to /api/v1/admin/tsdb/delete_series.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series
|
||||
func (app *Vmselect) APIV1AdminTSDBDeleteSeries(t *testing.T, matchQuery string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
queryURL := getClusterPath(app.httpListenAddr, "delete", "prometheus/api/v1/admin/tsdb/delete_series", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
|
||||
res, statusCode := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
|
||||
}
|
||||
}
|
||||
|
||||
// MetricNamesStats sends a query to a /select/tenant/prometheus/api/v1/status/metric_names_stats endpoint
|
||||
// and returns the statistics response for given params.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage
|
||||
func (app *Vmselect) MetricNamesStats(t *testing.T, limit, le, matchPattern string, opts QueryOpts) MetricNamesStatsResponse {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
values.Add("limit", limit)
|
||||
values.Add("le", le)
|
||||
values.Add("match_pattern", matchPattern)
|
||||
queryURL := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/status/metric_names_stats", opts)
|
||||
|
||||
res, statusCode := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
|
||||
}
|
||||
var resp MetricNamesStatsResponse
|
||||
if err := json.Unmarshal([]byte(res), &resp); err != nil {
|
||||
t.Fatalf("could not unmarshal series response data:\n%s\n err: %v", res, err)
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
// MetricNamesStatsReset sends a query to a /admin/api/v1/status/metric_names_stats/reset endpoint
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage
|
||||
func (app *Vmselect) MetricNamesStatsReset(t *testing.T, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
queryURL := fmt.Sprintf("http://%s/admin/api/v1/admin/status/metric_names_stats/reset", app.httpListenAddr)
|
||||
|
||||
res, statusCode := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
|
||||
}
|
||||
}
|
||||
|
||||
// APIV1StatusTSDB sends a query to a /prometheus/api/v1/status/tsdb
|
||||
// //
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#tsdb-stats
|
||||
func (app *Vmselect) APIV1StatusTSDB(t *testing.T, matchQuery string, date string, topN string, opts QueryOpts) TSDBStatusResponse {
|
||||
t.Helper()
|
||||
|
||||
url := getClusterPath(app.httpListenAddr, "select", "prometheus/api/v1/status/tsdb", opts)
|
||||
values := opts.asURLValues()
|
||||
addNonEmpty := func(name, value string) {
|
||||
if len(value) == 0 {
|
||||
return
|
||||
}
|
||||
values.Add(name, value)
|
||||
}
|
||||
addNonEmpty("match[]", matchQuery)
|
||||
addNonEmpty("topN", topN)
|
||||
addNonEmpty("date", date)
|
||||
|
||||
res, statusCode := app.cli.PostForm(t, url, values, opts.Headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
|
||||
}
|
||||
|
||||
var status TSDBStatusResponse
|
||||
if err := json.Unmarshal([]byte(res), &status); err != nil {
|
||||
t.Fatalf("could not unmarshal tsdb status response data:\n%s\n err: %v", res, err)
|
||||
}
|
||||
status.Sort()
|
||||
return status
|
||||
}
|
||||
|
||||
// GraphiteMetricsIndex sends a query to a /graphite/metrics/index.json
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api
|
||||
func (app *Vmselect) GraphiteMetricsIndex(t *testing.T, opts QueryOpts) GraphiteMetricsIndexResponse {
|
||||
t.Helper()
|
||||
|
||||
url := getClusterPath(app.httpListenAddr, "select", "graphite/metrics/index.json", opts)
|
||||
res, statusCode := app.cli.Get(t, url, opts.Headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
|
||||
}
|
||||
|
||||
var index GraphiteMetricsIndexResponse
|
||||
if err := json.Unmarshal([]byte(res), &index); err != nil {
|
||||
t.Fatalf("could not unmarshal metrics index response data:\n%s\n err: %v", res, err)
|
||||
}
|
||||
return index
|
||||
}
|
||||
|
||||
// GraphiteTagsTagSeries is a test helper function that registers Graphite tags
|
||||
// for a single time series by sending a HTTP POST request to
|
||||
// /graphite/tags/tagSeries vmsingle endpoint.
|
||||
func (app *Vmselect) GraphiteTagsTagSeries(t *testing.T, record string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := getClusterPath(app.httpListenAddr, "select", "graphite/tags/tagSeries", opts)
|
||||
values := opts.asURLValues()
|
||||
values.Add("path", record)
|
||||
|
||||
_, statusCode := app.cli.PostForm(t, url, values, opts.Headers)
|
||||
if got, want := statusCode, http.StatusNotImplemented; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func (app *Vmselect) GraphiteTagsTagMultiSeries(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := getClusterPath(app.httpListenAddr, "select", "graphite/tags/tagMultiSeries", opts)
|
||||
values := opts.asURLValues()
|
||||
for _, rec := range records {
|
||||
values.Add("path", rec)
|
||||
}
|
||||
|
||||
_, statusCode := app.cli.PostForm(t, url, values, opts.Headers)
|
||||
if got, want := statusCode, http.StatusNotImplemented; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// APIV1AdminTenants sends a query to a /admin/tenants endpoint
|
||||
func (app *Vmselect) APIV1AdminTenants(t *testing.T) *AdminTenantsResponse {
|
||||
t.Helper()
|
||||
|
||||
tenantsURL := fmt.Sprintf("http://%s/admin/tenants", app.httpListenAddr)
|
||||
res, statusCode := app.cli.Get(t, tenantsURL, nil)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
|
||||
}
|
||||
|
||||
var tenants *AdminTenantsResponse
|
||||
if err := json.Unmarshal([]byte(res), tenants); err != nil {
|
||||
t.Fatalf("could not unmarshal tenants response data:\n%s\n err: %v", res, err)
|
||||
}
|
||||
|
||||
return tenants
|
||||
}
|
||||
|
||||
// String returns the string representation of the vmselect app state.
|
||||
func (app *Vmselect) String() string {
|
||||
return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr)
|
||||
|
||||
@@ -1,49 +1,25 @@
|
||||
package apptest
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
otlppb "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
|
||||
)
|
||||
|
||||
// Vmsingle holds the state of a vmsingle app and provides vmsingle-specific
|
||||
// functions.
|
||||
type Vmsingle struct {
|
||||
*app
|
||||
*ServesMetrics
|
||||
*metricsClient
|
||||
*vmstorageClient
|
||||
*vmselectClient
|
||||
*vminsertClient
|
||||
|
||||
storageDataPath string
|
||||
httpListenAddr string
|
||||
|
||||
// vmstorage URLs.
|
||||
forceFlushURL string
|
||||
forceMergeURL string
|
||||
|
||||
// vminsert URLs.
|
||||
influxLineWriteURL string
|
||||
graphiteWriteAddr string
|
||||
openTSDBHTTPURL string
|
||||
prometheusAPIV1ImportPrometheusURL string
|
||||
prometheusAPIV1WriteURL string
|
||||
|
||||
// vmselect URLs.
|
||||
prometheusAPIV1ExportURL string
|
||||
prometheusAPIV1ExportNativeURL string
|
||||
prometheusAPIV1QueryURL string
|
||||
prometheusAPIV1QueryRangeURL string
|
||||
prometheusAPIV1SeriesURL string
|
||||
}
|
||||
|
||||
// StartVmsingleAt starts an instance of vmsingle with the given flags. It also
|
||||
@@ -70,617 +46,39 @@ func StartVmsingleAt(instance, binary string, flags []string, cli *Client, outpu
|
||||
}
|
||||
|
||||
return &Vmsingle{
|
||||
app: app,
|
||||
ServesMetrics: &ServesMetrics{
|
||||
metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[1]),
|
||||
cli: cli,
|
||||
app: app,
|
||||
metricsClient: newMetricsClient(cli, stderrExtracts[1]),
|
||||
vmstorageClient: &vmstorageClient{
|
||||
vmstorageCli: cli,
|
||||
httpListenAddr: stderrExtracts[1],
|
||||
},
|
||||
vmselectClient: &vmselectClient{
|
||||
vmselectCli: cli,
|
||||
url: func(op, path string, opts QueryOpts) string {
|
||||
return fmt.Sprintf("http://%s/%s", stderrExtracts[1], path)
|
||||
},
|
||||
metricNamesStatsResetURL: fmt.Sprintf("http://%s/api/v1/admin/status/metric_names_stats/reset", stderrExtracts[1]),
|
||||
tenantsURL: "vmsingle-does-not-serve-tenants",
|
||||
},
|
||||
vminsertClient: &vminsertClient{
|
||||
vminsertCli: cli,
|
||||
url: func(_, path string, _ QueryOpts) string {
|
||||
return fmt.Sprintf("http://%s/%s", stderrExtracts[1], path)
|
||||
},
|
||||
openTSDBURL: func(_, path string, _ QueryOpts) string {
|
||||
return fmt.Sprintf("http://%s/%s", stderrExtracts[3], path)
|
||||
},
|
||||
graphiteListenAddr: stderrExtracts[2],
|
||||
sendBlocking: func(t *testing.T, _ int, send func()) {
|
||||
t.Helper()
|
||||
send()
|
||||
},
|
||||
},
|
||||
storageDataPath: stderrExtracts[0],
|
||||
httpListenAddr: stderrExtracts[1],
|
||||
|
||||
forceFlushURL: fmt.Sprintf("http://%s/internal/force_flush", stderrExtracts[1]),
|
||||
forceMergeURL: fmt.Sprintf("http://%s/internal/force_merge", stderrExtracts[1]),
|
||||
|
||||
influxLineWriteURL: fmt.Sprintf("http://%s/influx/write", stderrExtracts[1]),
|
||||
graphiteWriteAddr: stderrExtracts[2],
|
||||
openTSDBHTTPURL: fmt.Sprintf("http://%s", stderrExtracts[3]),
|
||||
prometheusAPIV1ImportPrometheusURL: fmt.Sprintf("http://%s/prometheus/api/v1/import/prometheus", stderrExtracts[1]),
|
||||
prometheusAPIV1WriteURL: fmt.Sprintf("http://%s/prometheus/api/v1/write", stderrExtracts[1]),
|
||||
prometheusAPIV1ExportURL: fmt.Sprintf("http://%s/prometheus/api/v1/export", stderrExtracts[1]),
|
||||
prometheusAPIV1ExportNativeURL: fmt.Sprintf("http://%s/prometheus/api/v1/export/native", stderrExtracts[1]),
|
||||
prometheusAPIV1QueryURL: fmt.Sprintf("http://%s/prometheus/api/v1/query", stderrExtracts[1]),
|
||||
prometheusAPIV1QueryRangeURL: fmt.Sprintf("http://%s/prometheus/api/v1/query_range", stderrExtracts[1]),
|
||||
prometheusAPIV1SeriesURL: fmt.Sprintf("http://%s/prometheus/api/v1/series", stderrExtracts[1]),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ForceFlush is a test helper function that forces the flushing of inserted
|
||||
// data, so it becomes available for searching immediately.
|
||||
func (app *Vmsingle) ForceFlush(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
_, statusCode := app.cli.Get(t, app.forceFlushURL, nil)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
// ForceMerge is a test helper function that forces the merging of parts.
|
||||
func (app *Vmsingle) ForceMerge(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
_, statusCode := app.cli.Get(t, app.forceMergeURL, nil)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
// InfluxWrite is a test helper function that inserts a
|
||||
// collection of records in Influx line format by sending a HTTP
|
||||
// POST request to /influx/write vmsingle endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#influxwrite
|
||||
func (app *Vmsingle) InfluxWrite(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
|
||||
url := app.influxLineWriteURL
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "text/plain")
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
}
|
||||
|
||||
// GraphiteWrite is a test helper function that sends a collection of records
|
||||
// to graphiteListenAddr port.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#ingesting
|
||||
func (app *Vmsingle) GraphiteWrite(t *testing.T, records []string, _ QueryOpts) {
|
||||
t.Helper()
|
||||
app.cli.Write(t, app.graphiteWriteAddr, records)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ImportCSV is a test helper function that inserts a collection
|
||||
// of records in CSV format for the given tenant by sending an HTTP POST
|
||||
// request to /api/v1/import/csv vmsingle endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-import-csv-data
|
||||
func (app *Vmsingle) PrometheusAPIV1ImportCSV(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/api/v1/import/csv", app.httpListenAddr)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "text/plain")
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ImportNative is a test helper function that inserts a collection
|
||||
// of records in native format for the given tenant by sending an HTTP POST
|
||||
// request to /api/v1/import/native vmsingle endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-data-in-native-format
|
||||
func (app *Vmsingle) PrometheusAPIV1ImportNative(t *testing.T, data []byte, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/api/v1/import/native", app.httpListenAddr)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "text/plain")
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
}
|
||||
|
||||
// OpenTSDBAPIPut is a test helper function that inserts a collection of
|
||||
// records in OpenTSDB format for the given tenant by sending an HTTP POST
|
||||
// request to /api/put vmsingle endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/integrations/opentsdb/#sending-data-via-http
|
||||
func (app *Vmsingle) OpenTSDBAPIPut(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
// add extra label
|
||||
url := app.openTSDBHTTPURL + "/api/put"
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := []byte("[" + strings.Join(records, ",") + "]")
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "text/plain")
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Write is a test helper function that inserts a
|
||||
// collection of records in Prometheus remote-write format by sending a HTTP
|
||||
// POST request to /prometheus/api/v1/write vmsingle endpoint.
|
||||
func (app *Vmsingle) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteRequest, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
data := snappy.Encode(nil, wr.MarshalProtobuf(nil))
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "application/x-protobuf")
|
||||
_, statusCode := app.cli.Post(t, app.prometheusAPIV1WriteURL, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ImportPrometheus is a test helper function that inserts a
|
||||
// collection of records in Prometheus text exposition format by sending a HTTP
|
||||
// POST request to /prometheus/api/v1/import/prometheus vmsingle endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1importprometheus
|
||||
func (app *Vmsingle) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
// add extra label
|
||||
url := app.prometheusAPIV1ImportPrometheusURL
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "text/plain")
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
|
||||
}
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Export is a test helper function that performs the export of
|
||||
// raw samples in JSON line format by sending a HTTP POST request to
|
||||
// /prometheus/api/v1/export vmsingle endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1export
|
||||
func (app *Vmsingle) PrometheusAPIV1Export(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
|
||||
t.Helper()
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", query)
|
||||
values.Add("format", "promapi")
|
||||
|
||||
res, _ := app.cli.PostForm(t, app.prometheusAPIV1ExportURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ExportNative is a test helper function that performs the export of
|
||||
// raw samples in native binary format by sending an HTTP POST request to
|
||||
// /prometheus/api/v1/export/native vmselect endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1exportnative
|
||||
func (app *Vmsingle) PrometheusAPIV1ExportNative(t *testing.T, query string, opts QueryOpts) []byte {
|
||||
t.Helper()
|
||||
|
||||
t.Helper()
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", query)
|
||||
values.Add("format", "promapi")
|
||||
|
||||
res, _ := app.cli.PostForm(t, app.prometheusAPIV1ExportNativeURL, values, opts.Headers)
|
||||
return []byte(res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Query is a test helper function that performs PromQL/MetricsQL
|
||||
// instant query by sending a HTTP POST request to /prometheus/api/v1/query
|
||||
// vmsingle endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query
|
||||
func (app *Vmsingle) PrometheusAPIV1Query(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
values.Add("query", query)
|
||||
res, _ := app.cli.PostForm(t, app.prometheusAPIV1QueryURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1QueryRange is a test helper function that performs
|
||||
// PromQL/MetricsQL range query by sending a HTTP POST request to
|
||||
// /prometheus/api/v1/query_range vmsingle endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1query_range
|
||||
func (app *Vmsingle) PrometheusAPIV1QueryRange(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
values.Add("query", query)
|
||||
|
||||
res, _ := app.cli.PostForm(t, app.prometheusAPIV1QueryRangeURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Series sends a query to a /prometheus/api/v1/series endpoint
|
||||
// and returns the list of time series that match the query.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series
|
||||
func (app *Vmsingle) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1SeriesResponse {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
|
||||
res, _ := app.cli.PostForm(t, app.prometheusAPIV1SeriesURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1SeriesResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1SeriesCount sends a query to a /prometheus/api/v1/series/count endpoint
|
||||
// and returns the total number of time series.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1series
|
||||
func (app *Vmsingle) PrometheusAPIV1SeriesCount(t *testing.T, opts QueryOpts) *PrometheusAPIV1SeriesCountResponse {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
|
||||
queryURL := fmt.Sprintf("http://%s/prometheus/api/v1/series/count", app.httpListenAddr)
|
||||
res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1SeriesCountResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Labels sends a query to a /prometheus/api/v1/labels endpoint
|
||||
// and returns the label names list of time series that match the query.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labels
|
||||
func (app *Vmsingle) PrometheusAPIV1Labels(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelsResponse {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
|
||||
queryURL := fmt.Sprintf("http://%s/prometheus/api/v1/labels", app.httpListenAddr)
|
||||
res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1LabelsResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1LabelValues sends a query to a /prometheus/api/v1/label/.../values endpoint
|
||||
// and returns the label names list of time series that match the query.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labelvalues
|
||||
func (app *Vmsingle) PrometheusAPIV1LabelValues(t *testing.T, labelName, matchQuery string, opts QueryOpts) *PrometheusAPIV1LabelValuesResponse {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
|
||||
queryURL := fmt.Sprintf("http://%s/prometheus/api/v1/label/%s/values", app.httpListenAddr, labelName)
|
||||
res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1LabelValuesResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Metadata sends a query to a /prometheus/api/v1/metadata endpoint
|
||||
// and returns the results.
|
||||
func (app *Vmsingle) PrometheusAPIV1Metadata(t *testing.T, metric string, limit int, opts QueryOpts) *PrometheusAPIV1Metadata {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
values.Add("metric", metric)
|
||||
values.Add("limit", strconv.Itoa(limit))
|
||||
queryURL := fmt.Sprintf("http://%s/prometheus/api/v1/metadata", app.httpListenAddr)
|
||||
|
||||
res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
return NewPrometheusAPIV1Metadata(t, res)
|
||||
}
|
||||
|
||||
// APIV1AdminTSDBDeleteSeries deletes the series that match the query by sending
|
||||
// a request to /api/v1/admin/tsdb/delete_series.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series
|
||||
func (app *Vmsingle) APIV1AdminTSDBDeleteSeries(t *testing.T, matchQuery string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
queryURL := fmt.Sprintf("http://%s/api/v1/admin/tsdb/delete_series", app.httpListenAddr)
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
|
||||
res, statusCode := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
|
||||
}
|
||||
}
|
||||
|
||||
// GraphiteMetricsIndex sends a query to a /metrics/index.json
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api
|
||||
func (app *Vmsingle) GraphiteMetricsIndex(t *testing.T, _ QueryOpts) GraphiteMetricsIndexResponse {
|
||||
t.Helper()
|
||||
|
||||
seriesURL := fmt.Sprintf("http://%s/metrics/index.json", app.httpListenAddr)
|
||||
res, statusCode := app.cli.Get(t, seriesURL, nil)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
|
||||
}
|
||||
|
||||
var index GraphiteMetricsIndexResponse
|
||||
if err := json.Unmarshal([]byte(res), &index); err != nil {
|
||||
t.Fatalf("could not unmarshal metrics index response data:\n%s\n err: %v", res, err)
|
||||
}
|
||||
return index
|
||||
}
|
||||
|
||||
// GraphiteTagsTagSeries is a test helper function that registers Graphite tags
|
||||
// for a single time series by sending a HTTP POST request to
|
||||
// /graphite/tags/tagSeries vmsingle endpoint.
|
||||
func (app *Vmsingle) GraphiteTagsTagSeries(t *testing.T, record string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/graphite/tags/tagSeries", app.httpListenAddr)
|
||||
values := opts.asURLValues()
|
||||
values.Add("path", record)
|
||||
|
||||
_, statusCode := app.cli.PostForm(t, url, values, opts.Headers)
|
||||
if got, want := statusCode, http.StatusNotImplemented; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func (app *Vmsingle) GraphiteTagsTagMultiSeries(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/graphite/tags/tagMultiSeries", app.httpListenAddr)
|
||||
values := opts.asURLValues()
|
||||
for _, rec := range records {
|
||||
values.Add("path", rec)
|
||||
}
|
||||
|
||||
_, statusCode := app.cli.PostForm(t, url, values, opts.Headers)
|
||||
if got, want := statusCode, http.StatusNotImplemented; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// APIV1StatusMetricNamesStats sends a query to a /api/v1/status/metric_names_stats endpoint
|
||||
// and returns the statistics response for given params.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage
|
||||
func (app *Vmsingle) APIV1StatusMetricNamesStats(t *testing.T, limit, le, matchPattern string, opts QueryOpts) MetricNamesStatsResponse {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
values.Add("limit", limit)
|
||||
values.Add("le", le)
|
||||
values.Add("match_pattern", matchPattern)
|
||||
queryURL := fmt.Sprintf("http://%s/api/v1/status/metric_names_stats", app.httpListenAddr)
|
||||
|
||||
res, statusCode := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
|
||||
}
|
||||
var resp MetricNamesStatsResponse
|
||||
if err := json.Unmarshal([]byte(res), &resp); err != nil {
|
||||
t.Fatalf("could not unmarshal metric names stats response data:\n%s\n err: %v", res, err)
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
// APIV1AdminStatusMetricNamesStatsReset sends a query to a /api/v1/admin/status/metric_names_stats/reset endpoint
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage
|
||||
func (app *Vmsingle) APIV1AdminStatusMetricNamesStatsReset(t *testing.T, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
queryURL := fmt.Sprintf("http://%s/api/v1/admin/status/metric_names_stats/reset", app.httpListenAddr)
|
||||
|
||||
res, statusCode := app.cli.PostForm(t, queryURL, values, opts.Headers)
|
||||
if statusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
|
||||
}
|
||||
}
|
||||
|
||||
// SnapshotCreate creates a database snapshot by sending a query to the
|
||||
// /snapshot/create endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots
|
||||
func (app *Vmsingle) SnapshotCreate(t *testing.T) *SnapshotCreateResponse {
|
||||
t.Helper()
|
||||
|
||||
data, statusCode := app.cli.Post(t, app.SnapshotCreateURL(), nil, nil)
|
||||
if got, want := statusCode, http.StatusOK; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
|
||||
}
|
||||
|
||||
var res SnapshotCreateResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal snapshot create response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
// SnapshotCreateURL returns the URL for creating snapshots.
|
||||
func (app *Vmsingle) SnapshotCreateURL() string {
|
||||
return fmt.Sprintf("http://%s/snapshot/create", app.httpListenAddr)
|
||||
}
|
||||
|
||||
// APIV1AdminTSDBSnapshot creates a database snapshot by sending a query to the
|
||||
// /api/v1/admin/tsdb/snapshot endpoint.
|
||||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/querying/api/#snapshot.
|
||||
func (app *Vmsingle) APIV1AdminTSDBSnapshot(t *testing.T) *APIV1AdminTSDBSnapshotResponse {
|
||||
t.Helper()
|
||||
|
||||
queryURL := fmt.Sprintf("http://%s/api/v1/admin/tsdb/snapshot", app.httpListenAddr)
|
||||
data, statusCode := app.cli.Post(t, queryURL, nil, nil)
|
||||
if got, want := statusCode, http.StatusOK; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
|
||||
}
|
||||
|
||||
var res APIV1AdminTSDBSnapshotResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal prometheus snapshot create response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
// SnapshotList lists existing database snapshots by sending a query to the
|
||||
// /snapshot/list endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots
|
||||
func (app *Vmsingle) SnapshotList(t *testing.T) *SnapshotListResponse {
|
||||
t.Helper()
|
||||
|
||||
queryURL := fmt.Sprintf("http://%s/snapshot/list", app.httpListenAddr)
|
||||
data, statusCode := app.cli.Get(t, queryURL, nil)
|
||||
if got, want := statusCode, http.StatusOK; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
|
||||
}
|
||||
|
||||
var res SnapshotListResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal snapshot list response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
// SnapshotDelete deletes a snapshot by sending a query to the
|
||||
// /snapshot/delete endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots
|
||||
func (app *Vmsingle) SnapshotDelete(t *testing.T, snapshotName string) *SnapshotDeleteResponse {
|
||||
t.Helper()
|
||||
|
||||
queryURL := fmt.Sprintf("http://%s/snapshot/delete?snapshot=%s", app.httpListenAddr, snapshotName)
|
||||
data, statusCode := app.cli.Delete(t, queryURL)
|
||||
wantStatusCodes := map[int]bool{
|
||||
http.StatusOK: true,
|
||||
http.StatusInternalServerError: true,
|
||||
}
|
||||
if !wantStatusCodes[statusCode] {
|
||||
t.Fatalf("unexpected status code: got %d, want %v, resp text=%q", statusCode, wantStatusCodes, data)
|
||||
}
|
||||
|
||||
var res SnapshotDeleteResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal snapshot delete response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
// SnapshotDeleteAll deletes all snapshots by sending a query to the
|
||||
// /snapshot/delete_all endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots
|
||||
func (app *Vmsingle) SnapshotDeleteAll(t *testing.T) *SnapshotDeleteAllResponse {
|
||||
t.Helper()
|
||||
|
||||
queryURL := fmt.Sprintf("http://%s/snapshot/delete_all", app.httpListenAddr)
|
||||
data, statusCode := app.cli.Get(t, queryURL, nil)
|
||||
if got, want := statusCode, http.StatusOK; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
|
||||
}
|
||||
|
||||
var res SnapshotDeleteAllResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal snapshot delete all response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
// APIV1StatusTSDB sends a query to a /prometheus/api/v1/status/tsdb
|
||||
// //
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#tsdb-stats
|
||||
func (app *Vmsingle) APIV1StatusTSDB(t *testing.T, matchQuery string, date string, topN string, opts QueryOpts) TSDBStatusResponse {
|
||||
t.Helper()
|
||||
|
||||
seriesURL := fmt.Sprintf("http://%s/prometheus/api/v1/status/tsdb", app.httpListenAddr)
|
||||
values := opts.asURLValues()
|
||||
addNonEmpty := func(name, value string) {
|
||||
if len(value) == 0 {
|
||||
return
|
||||
}
|
||||
values.Add(name, value)
|
||||
}
|
||||
addNonEmpty("match[]", matchQuery)
|
||||
addNonEmpty("topN", topN)
|
||||
addNonEmpty("date", date)
|
||||
|
||||
res, statusCode := app.cli.PostForm(t, seriesURL, values, opts.Headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
|
||||
}
|
||||
|
||||
var status TSDBStatusResponse
|
||||
if err := json.Unmarshal([]byte(res), &status); err != nil {
|
||||
t.Fatalf("could not unmarshal tsdb status response data:\n%s\n err: %v", res, err)
|
||||
}
|
||||
status.Sort()
|
||||
return status
|
||||
}
|
||||
|
||||
// ZabbixConnectorHistory is a test helper function that inserts a
|
||||
// collection of records in zabbixconnector format by sending a HTTP
|
||||
// POST request to /zabbixconnector/api/v1/history vmsingle endpoint.
|
||||
func (app *Vmsingle) ZabbixConnectorHistory(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/zabbixconnector/api/v1/history", app.httpListenAddr)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "application/json")
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
// OpentelemetryV1Metrics is a test helper function that inserts a
|
||||
// collection of records in Opentelemetry protocol format by sending a HTTP
|
||||
// POST request to /opentelemetry/v1/metrics vmsingle endpoint.
|
||||
func (app *Vmsingle) OpentelemetryV1Metrics(t *testing.T, md otlppb.MetricsData, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
url := fmt.Sprintf("http://%s/opentelemetry/v1/metrics", app.httpListenAddr)
|
||||
uv := opts.asURLValues()
|
||||
uvs := uv.Encode()
|
||||
if len(uvs) > 0 {
|
||||
url += "?" + uvs
|
||||
}
|
||||
data := md.MarshalProtobuf(nil)
|
||||
headers := opts.getHeaders()
|
||||
headers.Set("Content-Type", "application/x-protobuf")
|
||||
_, statusCode := app.cli.Post(t, url, data, headers)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
// HTTPAddr returns the address at which the vminsert process is
|
||||
// listening for incoming HTTP requests.
|
||||
func (app *Vmsingle) HTTPAddr() string {
|
||||
|
||||
@@ -1,13 +1,10 @@
|
||||
package apptest
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"regexp"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -15,7 +12,8 @@ import (
|
||||
// functions.
|
||||
type Vmstorage struct {
|
||||
*app
|
||||
*ServesMetrics
|
||||
*metricsClient
|
||||
*vmstorageClient
|
||||
|
||||
storageDataPath string
|
||||
httpListenAddr string
|
||||
@@ -47,10 +45,11 @@ func StartVmstorageAt(instance, binary string, flags []string, cli *Client, outp
|
||||
}
|
||||
|
||||
return &Vmstorage{
|
||||
app: app,
|
||||
ServesMetrics: &ServesMetrics{
|
||||
metricsURL: fmt.Sprintf("http://%s/metrics", stderrExtracts[1]),
|
||||
cli: cli,
|
||||
app: app,
|
||||
metricsClient: newMetricsClient(cli, stderrExtracts[1]),
|
||||
vmstorageClient: &vmstorageClient{
|
||||
vmstorageCli: cli,
|
||||
httpListenAddr: stderrExtracts[1],
|
||||
},
|
||||
storageDataPath: stderrExtracts[0],
|
||||
httpListenAddr: stderrExtracts[1],
|
||||
@@ -71,121 +70,6 @@ func (app *Vmstorage) VmselectAddr() string {
|
||||
return app.vmselectAddr
|
||||
}
|
||||
|
||||
// ForceFlush is a test helper function that forces the flushing of inserted
|
||||
// data, so it becomes available for searching immediately.
|
||||
func (app *Vmstorage) ForceFlush(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
forceFlushURL := fmt.Sprintf("http://%s/internal/force_flush", app.httpListenAddr)
|
||||
_, statusCode := app.cli.Get(t, forceFlushURL, nil)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
// ForceMerge is a test helper function that forces the merging of parts.
|
||||
func (app *Vmstorage) ForceMerge(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
forceMergeURL := fmt.Sprintf("http://%s/internal/force_merge", app.httpListenAddr)
|
||||
_, statusCode := app.cli.Get(t, forceMergeURL, nil)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
// SnapshotCreate creates a database snapshot by sending a query to the
|
||||
// /snapshot/create endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots
|
||||
func (app *Vmstorage) SnapshotCreate(t *testing.T) *SnapshotCreateResponse {
|
||||
t.Helper()
|
||||
|
||||
data, statusCode := app.cli.Post(t, app.SnapshotCreateURL(), nil, nil)
|
||||
if got, want := statusCode, http.StatusOK; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
|
||||
}
|
||||
|
||||
var res SnapshotCreateResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal snapshot create response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
// SnapshotCreateURL returns the URL for creating snapshots.
|
||||
func (app *Vmstorage) SnapshotCreateURL() string {
|
||||
return fmt.Sprintf("http://%s/snapshot/create", app.httpListenAddr)
|
||||
}
|
||||
|
||||
// SnapshotList lists existing database snapshots by sending a query to the
|
||||
// /snapshot/list endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots
|
||||
func (app *Vmstorage) SnapshotList(t *testing.T) *SnapshotListResponse {
|
||||
t.Helper()
|
||||
|
||||
queryURL := fmt.Sprintf("http://%s/snapshot/list", app.httpListenAddr)
|
||||
data, statusCode := app.cli.Get(t, queryURL, nil)
|
||||
if got, want := statusCode, http.StatusOK; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
|
||||
}
|
||||
|
||||
var res SnapshotListResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal snapshot list response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
// SnapshotDelete deletes a snapshot by sending a query to the
|
||||
// /snapshot/delete endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots
|
||||
func (app *Vmstorage) SnapshotDelete(t *testing.T, snapshotName string) *SnapshotDeleteResponse {
|
||||
t.Helper()
|
||||
|
||||
queryURL := fmt.Sprintf("http://%s/snapshot/delete?snapshot=%s", app.httpListenAddr, snapshotName)
|
||||
data, statusCode := app.cli.Delete(t, queryURL)
|
||||
wantStatusCodes := map[int]bool{
|
||||
http.StatusOK: true,
|
||||
http.StatusInternalServerError: true,
|
||||
}
|
||||
if !wantStatusCodes[statusCode] {
|
||||
t.Fatalf("unexpected status code: got %d, want %v, resp text=%q", statusCode, wantStatusCodes, data)
|
||||
}
|
||||
|
||||
var res SnapshotDeleteResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal snapshot delete response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
// SnapshotDeleteAll deletes all snapshots by sending a query to the
|
||||
// /snapshot/delete_all endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots
|
||||
func (app *Vmstorage) SnapshotDeleteAll(t *testing.T) *SnapshotDeleteAllResponse {
|
||||
t.Helper()
|
||||
|
||||
queryURL := fmt.Sprintf("http://%s/snapshot/delete_all", app.httpListenAddr)
|
||||
data, statusCode := app.cli.Post(t, queryURL, nil, nil)
|
||||
if got, want := statusCode, http.StatusOK; got != want {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
|
||||
}
|
||||
|
||||
var res SnapshotDeleteAllResponse
|
||||
if err := json.Unmarshal([]byte(data), &res); err != nil {
|
||||
t.Fatalf("could not unmarshal snapshot delete all response: data=%q, err: %v", data, err)
|
||||
}
|
||||
|
||||
return &res
|
||||
}
|
||||
|
||||
// String returns the string representation of the vmstorage app state.
|
||||
func (app *Vmstorage) String() string {
|
||||
return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q vminsertAddr: %q vmselectAddr: %q}", []any{
|
||||
|
||||
Reference in New Issue
Block a user