mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 16:59:40 +03:00
Compare commits
4 Commits
test/memor
...
vmselect/e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5af6b108e0 | ||
|
|
ac602476dc | ||
|
|
0a1638acd5 | ||
|
|
11bb4ef7a7 |
@@ -63,6 +63,10 @@ var (
|
||||
|
||||
clusternativeListenAddr = flag.String("clusternativeListenAddr", "", "TCP address to listen for requests from other vmselect nodes in multi-level cluster setup. "+
|
||||
"See https://docs.victoriametrics.com/cluster-victoriametrics/#multi-level-cluster-setup . Usually :8401 should be set to match default vmstorage port for vmselect. Disabled work if empty")
|
||||
maxMemoryPerQuery = flagutil.NewBytes("search.maxMemoryPerQuery", 0, "The maximum amounts of memory a single query may consume. "+
|
||||
"Queries requiring more memory are rejected. The total memory limit for concurrently executed queries can be estimated "+
|
||||
"as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests . "+
|
||||
"See also -search.logQueryMemoryUsage")
|
||||
)
|
||||
|
||||
var slowQueries = metrics.NewCounter(`vm_slow_queries_total`)
|
||||
@@ -116,6 +120,10 @@ func main() {
|
||||
netstorage.InitTmpBlocksDir("")
|
||||
promql.InitRollupResultCache("")
|
||||
}
|
||||
|
||||
promql.SetMaxMemoryPerQuery(maxMemoryPerQuery.N)
|
||||
netstorage.SetMaxMemoryUsagePerQuery(maxMemoryPerQuery.N)
|
||||
|
||||
concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)
|
||||
initVMAlertProxy()
|
||||
var vmselectapiServer *vmselectapi.Server
|
||||
|
||||
73
app/vmselect/netstorage/memory_limiter.go
Normal file
73
app/vmselect/netstorage/memory_limiter.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package netstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
maxMemoryUsagePerQuery int64
|
||||
)
|
||||
|
||||
// getMaxMemoryUsagePerQuery returns the maximum memory usage per query.
|
||||
func getMaxMemoryUsagePerQuery() int64 {
|
||||
return maxMemoryUsagePerQuery
|
||||
}
|
||||
|
||||
// SetMaxMemoryUsagePerQuery sets the maximum memory usage per query.
|
||||
func SetMaxMemoryUsagePerQuery(v int64) {
|
||||
maxMemoryUsagePerQuery = v
|
||||
}
|
||||
|
||||
// memoryLimiter tracks and limits memory usage for operations
|
||||
type memoryLimiter struct {
|
||||
maxSize uint64
|
||||
|
||||
mu sync.Mutex
|
||||
usage uint64
|
||||
}
|
||||
|
||||
func newMemoryLimiter() *memoryLimiter {
|
||||
maxSize := uint64(getMaxMemoryUsagePerQuery())
|
||||
return &memoryLimiter{
|
||||
maxSize: maxSize,
|
||||
}
|
||||
}
|
||||
|
||||
func (ml *memoryLimiter) AddBytes(n uint64) error {
|
||||
if ml.maxSize <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ml.mu.Lock()
|
||||
ok := n <= ml.maxSize && ml.maxSize-n >= ml.usage
|
||||
if ok {
|
||||
ml.usage += n
|
||||
}
|
||||
ml.mu.Unlock()
|
||||
if !ok {
|
||||
return &limitExceededErr{
|
||||
err: fmt.Errorf("cannot allocate %d bytes; max allowed memory usage is %d bytes", n, ml.maxSize),
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddStrings accounts for and limits memory usage of string slices
|
||||
func (ml *memoryLimiter) AddStrings(v []string) error {
|
||||
if len(v) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Account for:
|
||||
// - Slice header (24 bytes)
|
||||
// - String headers (16 bytes each)
|
||||
// - String data
|
||||
overhead := uint64(24 + (16 * len(v)))
|
||||
dataSize := uint64(0)
|
||||
for _, s := range v {
|
||||
dataSize += uint64(len(s))
|
||||
}
|
||||
return ml.AddBytes(overhead + dataSize)
|
||||
}
|
||||
@@ -906,10 +906,14 @@ func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
|
||||
return nil, false, err
|
||||
}
|
||||
sns := getStorageNodes()
|
||||
ml := newMemoryLimiter()
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
|
||||
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
|
||||
sn.labelNamesRequests.Inc()
|
||||
labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline)
|
||||
if err == nil {
|
||||
err = ml.AddStrings(labelNames)
|
||||
}
|
||||
if err != nil {
|
||||
sn.labelNamesErrors.Inc()
|
||||
err = fmt.Errorf("cannot get labels from vmstorage %s: %w", sn.connPool.Addr(), err)
|
||||
@@ -1023,7 +1027,7 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str
|
||||
case "vm_project_id":
|
||||
idx = 1
|
||||
default:
|
||||
logger.Panicf("BUG: unexpected labeName=%q", labelName)
|
||||
logger.Fatalf("BUG: unexpected labelName=%q", labelName)
|
||||
}
|
||||
|
||||
labelValues := make([]string, 0, len(tenants))
|
||||
@@ -1049,11 +1053,15 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
ml := newMemoryLimiter()
|
||||
sns := getStorageNodes()
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
|
||||
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
|
||||
sn.labelValuesRequests.Inc()
|
||||
labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline)
|
||||
if err == nil {
|
||||
err = ml.AddStrings(labelValues)
|
||||
}
|
||||
if err != nil {
|
||||
sn.labelValuesErrors.Inc()
|
||||
err = fmt.Errorf("cannot get label values from vmstorage %s: %w", sn.connPool.Addr(), err)
|
||||
@@ -1194,10 +1202,14 @@ func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyP
|
||||
suffixes []string
|
||||
err error
|
||||
}
|
||||
ml := newMemoryLimiter()
|
||||
sns := getStorageNodes()
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
|
||||
sn.tagValueSuffixesRequests.Inc()
|
||||
suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
|
||||
if err == nil {
|
||||
err = ml.AddStrings(suffixes)
|
||||
}
|
||||
if err != nil {
|
||||
sn.tagValueSuffixesErrors.Inc()
|
||||
err = fmt.Errorf("cannot get tag value suffixes for timeRange=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w",
|
||||
@@ -1695,11 +1707,15 @@ func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *sto
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
ml := newMemoryLimiter()
|
||||
sns := getStorageNodes()
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
|
||||
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any {
|
||||
sn.searchMetricNamesRequests.Inc()
|
||||
metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline)
|
||||
if err == nil {
|
||||
err = ml.AddStrings(metricNames)
|
||||
}
|
||||
if sq.IsMultiTenant {
|
||||
// TODO: (@f41gh7) this function could produce duplicate labels
|
||||
// if original metricName already have tenant labels
|
||||
|
||||
@@ -36,10 +36,7 @@ var (
|
||||
"See https://docs.victoriametrics.com/#backfilling . See also -search.resetRollupResultCacheOnStartup")
|
||||
maxPointsSubqueryPerTimeseries = flag.Int("search.maxPointsSubqueryPerTimeseries", 100e3, "The maximum number of points per series, which can be generated by subquery. "+
|
||||
"See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3")
|
||||
maxMemoryPerQuery = flagutil.NewBytes("search.maxMemoryPerQuery", 0, "The maximum amounts of memory a single query may consume. "+
|
||||
"Queries requiring more memory are rejected. The total memory limit for concurrently executed queries can be estimated "+
|
||||
"as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests . "+
|
||||
"See also -search.logQueryMemoryUsage")
|
||||
|
||||
logQueryMemoryUsage = flagutil.NewBytes("search.logQueryMemoryUsage", 0, "Log query and increment vm_memory_intensive_queries_total metric each time "+
|
||||
"the query requires more memory than specified by this flag. "+
|
||||
"This may help detecting and optimizing heavy queries. Query logging is disabled by default. "+
|
||||
@@ -52,6 +49,20 @@ var (
|
||||
"A larger value makes the pushed-down filter more complex but fewer time series will be returned. This flag is useful when selective label contains numerous values, for example `instance`, and storage resources are abundant.")
|
||||
)
|
||||
|
||||
var (
|
||||
maxMemoryPerQuery int64
|
||||
)
|
||||
|
||||
// SetMaxMemoryPerQuery sets the maximum memory allowed per query.
|
||||
func SetMaxMemoryPerQuery(v int64) {
|
||||
maxMemoryPerQuery = v
|
||||
}
|
||||
|
||||
// getMaxMemoryPerQuery returns the maximum memory allowed per query.
|
||||
func getMaxMemoryPerQuery() int64 {
|
||||
return maxMemoryPerQuery
|
||||
}
|
||||
|
||||
// The minimum number of points per timeseries for enabling time rounding.
|
||||
// This improves cache hit ratio for frequently requested queries over
|
||||
// big time ranges.
|
||||
@@ -1793,7 +1804,7 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
|
||||
"the query selects %d time series and generates %d points across all the time series; try reducing the number of selected time series",
|
||||
ec.QuotedRemoteAddr, requestURI, expr.AppendString(nil), rollupMemorySize, maxMemory, timeseriesLen*len(rcs), rollupPoints)
|
||||
}
|
||||
if maxMemory := int64(maxMemoryPerQuery.N); maxMemory > 0 && rollupMemorySize > maxMemory {
|
||||
if maxMemory := int64(getMaxMemoryPerQuery()); maxMemory > 0 && rollupMemorySize > maxMemory {
|
||||
rss.Cancel()
|
||||
err := fmt.Errorf("not enough memory for processing %s, which returns %d data points across %d time series with %d points in each time series "+
|
||||
"according to -search.maxMemoryPerQuery=%d; requested memory: %d bytes; "+
|
||||
|
||||
@@ -8,7 +8,7 @@ as apposed to a unit test that verifies the behavior of a building block of an
|
||||
application.
|
||||
|
||||
To achieve that an integration test starts an application in a separate process
|
||||
and then issues HTTP requets to it and verifies the responses, examines the
|
||||
and then issues HTTP request to it and verifies the responses, examines the
|
||||
metrics the app exposes and/or files it creates, etc.
|
||||
|
||||
Note that an object of testing may be not just a single app, but several apps
|
||||
|
||||
@@ -159,3 +159,11 @@ func (app *ServesMetrics) GetMetricsByPrefix(t *testing.T, prefix string) []floa
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
func getExpectedResponse(opts, def int) int {
|
||||
if opts == 0 {
|
||||
return def
|
||||
}
|
||||
|
||||
return opts
|
||||
}
|
||||
|
||||
@@ -20,6 +20,8 @@ type PrometheusQuerier interface {
|
||||
PrometheusAPIV1Query(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse
|
||||
PrometheusAPIV1QueryRange(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse
|
||||
PrometheusAPIV1Series(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1SeriesResponse
|
||||
PrometheusAPIV1Labels(t *testing.T, opts QueryOpts) *PrometheusAPIV1LabelsResponse
|
||||
PrometheusAPIV1LabelValues(t *testing.T, labelName string, opts QueryOpts) *PrometheusAPIV1LabelValuesResponse
|
||||
}
|
||||
|
||||
// PrometheusWriter contains methods available to Prometheus-like HTTP API for Writing new data
|
||||
@@ -53,6 +55,8 @@ type QueryOpts struct {
|
||||
ExtraFilters []string
|
||||
ExtraLabels []string
|
||||
Trace string
|
||||
|
||||
ExpectedResponseCode int
|
||||
}
|
||||
|
||||
func (qos *QueryOpts) asURLValues() url.Values {
|
||||
@@ -254,3 +258,39 @@ func (t *Trace) Contains(s string) int {
|
||||
}
|
||||
return times
|
||||
}
|
||||
|
||||
// PrometheusAPIV1LabelsResponse represents the response from /api/v1/labels endpoint
|
||||
type PrometheusAPIV1LabelsResponse struct {
|
||||
Status string `json:"status"`
|
||||
Data []string `json:"data"`
|
||||
ErrorType string `json:"errorType,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// NewPrometheusAPIV1LabelsResponse creates a new PrometheusAPIV1LabelsResponse from JSON
|
||||
func NewPrometheusAPIV1LabelsResponse(t *testing.T, s string) *PrometheusAPIV1LabelsResponse {
|
||||
t.Helper()
|
||||
res := &PrometheusAPIV1LabelsResponse{}
|
||||
if err := json.Unmarshal([]byte(s), res); err != nil {
|
||||
t.Fatalf("could not unmarshal labels response data:\n%s\n err: %v", s, err)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// PrometheusAPIV1LabelValuesResponse represents the response from /api/v1/label/{label}/values endpoint
|
||||
type PrometheusAPIV1LabelValuesResponse struct {
|
||||
Status string `json:"status"`
|
||||
Data []string `json:"data"`
|
||||
ErrorType string `json:"errorType,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// NewPrometheusAPIV1LabelValuesResponse creates a new PrometheusAPIV1LabelValuesResponse from JSON
|
||||
func NewPrometheusAPIV1LabelValuesResponse(t *testing.T, s string) *PrometheusAPIV1LabelValuesResponse {
|
||||
t.Helper()
|
||||
res := &PrometheusAPIV1LabelValuesResponse{}
|
||||
if err := json.Unmarshal([]byte(s), res); err != nil {
|
||||
t.Fatalf("could not unmarshal label values response data:\n%s\n err: %v", s, err)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
56
apptest/tests/memory_limits_test.go
Normal file
56
apptest/tests/memory_limits_test.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
||||
)
|
||||
|
||||
// Data used in tests
|
||||
var testDataMemoryLimits = []string{
|
||||
`foo_bar{job="some_value"} 1.00`,
|
||||
`foo_bar{job="other_value"} 2.00`,
|
||||
}
|
||||
|
||||
func TestMaxMemoryUsageExceeded(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
querier := tc.MustStartCluster(&apptest.ClusterOptions{
|
||||
VminsertInstance: "vminsert",
|
||||
VmselectInstance: "vmselect",
|
||||
Vmstorage1Instance: "vmstorage1",
|
||||
Vmstorage2Instance: "vmstorage2",
|
||||
VmselectFlags: []string{"-search.maxMemoryPerQuery=1"},
|
||||
VminsertFlags: []string{"-clusternativeListenAddr=127.0.0.1:0"},
|
||||
})
|
||||
|
||||
querier.PrometheusAPIV1ImportPrometheus(t, testDataMemoryLimits, apptest.QueryOpts{})
|
||||
|
||||
querier.ForceFlush(t)
|
||||
|
||||
// Test labels endpoint
|
||||
labelsResp := querier.PrometheusAPIV1Labels(t, apptest.QueryOpts{
|
||||
ExpectedResponseCode: http.StatusUnprocessableEntity,
|
||||
})
|
||||
if labelsResp.Status != "error" {
|
||||
t.Fatalf("unexpected status for labels; got %q, want %q", labelsResp.Status, "error")
|
||||
}
|
||||
|
||||
// Test label values endpoint
|
||||
labelValuesResp := querier.PrometheusAPIV1LabelValues(t, "job", apptest.QueryOpts{
|
||||
ExpectedResponseCode: http.StatusUnprocessableEntity,
|
||||
})
|
||||
if labelValuesResp.Status != "error" {
|
||||
t.Fatalf("unexpected status for label values; got %q, want %q", labelValuesResp.Status, "error")
|
||||
}
|
||||
|
||||
// Test series endpoint
|
||||
seriesResp := querier.PrometheusAPIV1Series(t, "foo_bar", apptest.QueryOpts{
|
||||
ExpectedResponseCode: http.StatusUnprocessableEntity,
|
||||
})
|
||||
if seriesResp.Status != "error" {
|
||||
t.Fatalf("unexpected status for series; got %q, want %q", seriesResp.Status, "error")
|
||||
}
|
||||
}
|
||||
@@ -66,7 +66,7 @@ func (app *Vmselect) PrometheusAPIV1Export(t *testing.T, query string, opts Quer
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", query)
|
||||
values.Add("format", "promapi")
|
||||
res := app.cli.PostForm(t, exportURL, values, http.StatusOK)
|
||||
res := app.cli.PostForm(t, exportURL, values, getExpectedResponse(opts.ExpectedResponseCode, http.StatusOK))
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ func (app *Vmselect) PrometheusAPIV1Query(t *testing.T, query string, opts Query
|
||||
values := opts.asURLValues()
|
||||
values.Add("query", query)
|
||||
|
||||
res := app.cli.PostForm(t, queryURL, values, http.StatusOK)
|
||||
res := app.cli.PostForm(t, queryURL, values, getExpectedResponse(opts.ExpectedResponseCode, http.StatusOK))
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
@@ -98,7 +98,7 @@ func (app *Vmselect) PrometheusAPIV1QueryRange(t *testing.T, query string, opts
|
||||
values := opts.asURLValues()
|
||||
values.Add("query", query)
|
||||
|
||||
res := app.cli.PostForm(t, queryURL, values, http.StatusOK)
|
||||
res := app.cli.PostForm(t, queryURL, values, getExpectedResponse(opts.ExpectedResponseCode, http.StatusOK))
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
@@ -113,10 +113,36 @@ func (app *Vmselect) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
|
||||
res := app.cli.PostForm(t, seriesURL, values, http.StatusOK)
|
||||
res := app.cli.PostForm(t, seriesURL, values, getExpectedResponse(opts.ExpectedResponseCode, http.StatusOK))
|
||||
return NewPrometheusAPIV1SeriesResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Labels sends a query to a /prometheus/api/v1/labels endpoint
|
||||
// and returns the list of label names.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/url-examples/#apiv1labels
|
||||
func (app *Vmselect) PrometheusAPIV1Labels(t *testing.T, opts QueryOpts) *PrometheusAPIV1LabelsResponse {
|
||||
t.Helper()
|
||||
|
||||
labelsURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/labels", app.httpListenAddr, opts.getTenant())
|
||||
values := opts.asURLValues()
|
||||
res := app.cli.PostForm(t, labelsURL, values, getExpectedResponse(opts.ExpectedResponseCode, http.StatusOK))
|
||||
return NewPrometheusAPIV1LabelsResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1LabelValues sends a query to a /prometheus/api/v1/label/{label}/values endpoint
|
||||
// and returns the list of label values for the specified label name.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/url-examples/#apiv1labelvalues
|
||||
func (app *Vmselect) PrometheusAPIV1LabelValues(t *testing.T, labelName string, opts QueryOpts) *PrometheusAPIV1LabelValuesResponse {
|
||||
t.Helper()
|
||||
|
||||
valuesURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/label/%s/values", app.httpListenAddr, opts.getTenant(), labelName)
|
||||
values := opts.asURLValues()
|
||||
res := app.cli.PostForm(t, valuesURL, values, getExpectedResponse(opts.ExpectedResponseCode, http.StatusOK))
|
||||
return NewPrometheusAPIV1LabelValuesResponse(t, res)
|
||||
}
|
||||
|
||||
// DeleteSeries sends a query to a /prometheus/api/v1/admin/tsdb/delete_series
|
||||
//
|
||||
// See https://docs.victoriametrics.com/url-examples/#apiv1admintsdbdelete_series
|
||||
|
||||
@@ -9,8 +9,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/golang/snappy"
|
||||
|
||||
pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
)
|
||||
|
||||
// Vmsingle holds the state of a vmsingle app and provides vmsingle-specific
|
||||
@@ -31,10 +32,12 @@ type Vmsingle struct {
|
||||
prometheusAPIV1WriteURL string
|
||||
|
||||
// vmselect URLs.
|
||||
prometheusAPIV1ExportURL string
|
||||
prometheusAPIV1QueryURL string
|
||||
prometheusAPIV1QueryRangeURL string
|
||||
prometheusAPIV1SeriesURL string
|
||||
prometheusAPIV1ExportURL string
|
||||
prometheusAPIV1QueryURL string
|
||||
prometheusAPIV1QueryRangeURL string
|
||||
prometheusAPIV1SeriesURL string
|
||||
prometheusAPIV1LabelsURL string
|
||||
prometheusAPIV1LabelValuesURL string
|
||||
}
|
||||
|
||||
// StartVmsingle starts an instance of vmsingle with the given flags. It also
|
||||
@@ -72,6 +75,8 @@ func StartVmsingle(instance string, flags []string, cli *Client) (*Vmsingle, err
|
||||
prometheusAPIV1QueryURL: fmt.Sprintf("http://%s/prometheus/api/v1/query", stderrExtracts[1]),
|
||||
prometheusAPIV1QueryRangeURL: fmt.Sprintf("http://%s/prometheus/api/v1/query_range", stderrExtracts[1]),
|
||||
prometheusAPIV1SeriesURL: fmt.Sprintf("http://%s/prometheus/api/v1/series", stderrExtracts[1]),
|
||||
prometheusAPIV1LabelsURL: fmt.Sprintf("http://%s/prometheus/api/v1/labels", stderrExtracts[1]),
|
||||
prometheusAPIV1LabelValuesURL: fmt.Sprintf("http://%s/prometheus/api/v1/label/{}/values", stderrExtracts[1]),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -98,12 +103,12 @@ func (app *Vmsingle) InfluxWrite(t *testing.T, records []string, _ QueryOpts) {
|
||||
// PrometheusAPIV1Write is a test helper function that inserts a
|
||||
// collection of records in Prometheus remote-write format by sending a HTTP
|
||||
// POST request to /prometheus/api/v1/write vmsingle endpoint.
|
||||
func (app *Vmsingle) PrometheusAPIV1Write(t *testing.T, records []pb.TimeSeries, _ QueryOpts) {
|
||||
func (app *Vmsingle) PrometheusAPIV1Write(t *testing.T, records []pb.TimeSeries, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
wr := pb.WriteRequest{Timeseries: records}
|
||||
data := snappy.Encode(nil, wr.MarshalProtobuf(nil))
|
||||
app.cli.Post(t, app.prometheusAPIV1WriteURL, "application/x-protobuf", data, http.StatusNoContent)
|
||||
app.cli.Post(t, app.prometheusAPIV1WriteURL, "application/x-protobuf", data, getExpectedResponse(opts.ExpectedResponseCode, http.StatusNoContent))
|
||||
}
|
||||
|
||||
// PrometheusAPIV1ImportPrometheus is a test helper function that inserts a
|
||||
@@ -111,11 +116,11 @@ func (app *Vmsingle) PrometheusAPIV1Write(t *testing.T, records []pb.TimeSeries,
|
||||
// POST request to /prometheus/api/v1/import/prometheus vmsingle endpoint.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/url-examples/#apiv1importprometheus
|
||||
func (app *Vmsingle) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, _ QueryOpts) {
|
||||
func (app *Vmsingle) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts) {
|
||||
t.Helper()
|
||||
|
||||
data := []byte(strings.Join(records, "\n"))
|
||||
app.cli.Post(t, app.prometheusAPIV1ImportPrometheusURL, "text/plain", data, http.StatusNoContent)
|
||||
app.cli.Post(t, app.prometheusAPIV1ImportPrometheusURL, "text/plain", data, getExpectedResponse(opts.ExpectedResponseCode, http.StatusNoContent))
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Export is a test helper function that performs the export of
|
||||
@@ -129,7 +134,7 @@ func (app *Vmsingle) PrometheusAPIV1Export(t *testing.T, query string, opts Quer
|
||||
values.Add("match[]", query)
|
||||
values.Add("format", "promapi")
|
||||
|
||||
res := app.cli.PostForm(t, app.prometheusAPIV1ExportURL, values, http.StatusOK)
|
||||
res := app.cli.PostForm(t, app.prometheusAPIV1ExportURL, values, getExpectedResponse(opts.ExpectedResponseCode, http.StatusOK))
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
@@ -143,7 +148,7 @@ func (app *Vmsingle) PrometheusAPIV1Query(t *testing.T, query string, opts Query
|
||||
|
||||
values := opts.asURLValues()
|
||||
values.Add("query", query)
|
||||
res := app.cli.PostForm(t, app.prometheusAPIV1QueryURL, values, http.StatusOK)
|
||||
res := app.cli.PostForm(t, app.prometheusAPIV1QueryURL, values, getExpectedResponse(opts.ExpectedResponseCode, http.StatusOK))
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
@@ -158,10 +163,35 @@ func (app *Vmsingle) PrometheusAPIV1QueryRange(t *testing.T, query string, opts
|
||||
values := opts.asURLValues()
|
||||
values.Add("query", query)
|
||||
|
||||
res := app.cli.PostForm(t, app.prometheusAPIV1QueryRangeURL, values, http.StatusOK)
|
||||
res := app.cli.PostForm(t, app.prometheusAPIV1QueryRangeURL, values, getExpectedResponse(opts.ExpectedResponseCode, http.StatusOK))
|
||||
return NewPrometheusAPIV1QueryResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Labels sends a query to a /prometheus/api/v1/labels endpoint
|
||||
// and returns the list of label names.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/url-examples/#apiv1labels
|
||||
func (app *Vmsingle) PrometheusAPIV1Labels(t *testing.T, opts QueryOpts) *PrometheusAPIV1LabelsResponse {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
res := app.cli.PostForm(t, app.prometheusAPIV1LabelsURL, values, getExpectedResponse(opts.ExpectedResponseCode, http.StatusOK))
|
||||
return NewPrometheusAPIV1LabelsResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1LabelValues sends a query to a /prometheus/api/v1/label/{label}/values endpoint
|
||||
// and returns the list of label values for the specified label name.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/url-examples/#apiv1labellabelvalues
|
||||
func (app *Vmsingle) PrometheusAPIV1LabelValues(t *testing.T, labelName string, opts QueryOpts) *PrometheusAPIV1LabelValuesResponse {
|
||||
t.Helper()
|
||||
|
||||
values := opts.asURLValues()
|
||||
url := strings.Replace(app.prometheusAPIV1LabelValuesURL, "{}", labelName, 1)
|
||||
res := app.cli.PostForm(t, url, values, getExpectedResponse(opts.ExpectedResponseCode, http.StatusOK))
|
||||
return NewPrometheusAPIV1LabelValuesResponse(t, res)
|
||||
}
|
||||
|
||||
// PrometheusAPIV1Series sends a query to a /prometheus/api/v1/series endpoint
|
||||
// and returns the list of time series that match the query.
|
||||
//
|
||||
@@ -172,7 +202,7 @@ func (app *Vmsingle) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts
|
||||
values := opts.asURLValues()
|
||||
values.Add("match[]", matchQuery)
|
||||
|
||||
res := app.cli.PostForm(t, app.prometheusAPIV1SeriesURL, values, http.StatusOK)
|
||||
res := app.cli.PostForm(t, app.prometheusAPIV1SeriesURL, values, getExpectedResponse(opts.ExpectedResponseCode, http.StatusOK))
|
||||
return NewPrometheusAPIV1SeriesResponse(t, res)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user