mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-28 04:57:20 +03:00
Compare commits
1 Commits
dependabot
...
search-lim
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
263c236d52 |
@@ -1,7 +1,6 @@
|
|||||||
package graphite
|
package graphite
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -15,14 +14,13 @@ import (
|
|||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
"github.com/VictoriaMetrics/metricsql"
|
"github.com/VictoriaMetrics/metricsql"
|
||||||
)
|
)
|
||||||
|
|
||||||
var maxTagValueSuffixes = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
|
|
||||||
|
|
||||||
// MetricsFindHandler implements /metrics/find handler.
|
// MetricsFindHandler implements /metrics/find handler.
|
||||||
//
|
//
|
||||||
// See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
|
// See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
|
||||||
@@ -222,10 +220,11 @@ func MetricsIndexHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
|
|||||||
|
|
||||||
// metricsFind searches for label values that match the given qHead and qTail.
|
// metricsFind searches for label values that match the given qHead and qTail.
|
||||||
func metricsFind(tr storage.TimeRange, label, qHead, qTail string, delimiter byte, isExpand bool, deadline searchutil.Deadline) ([]string, error) {
|
func metricsFind(tr storage.TimeRange, label, qHead, qTail string, delimiter byte, isExpand bool, deadline searchutil.Deadline) ([]string, error) {
|
||||||
|
maxSuffixes := limits.MaxTagValueSuffixes(0)
|
||||||
n := strings.IndexAny(qTail, "*{[")
|
n := strings.IndexAny(qTail, "*{[")
|
||||||
if n < 0 {
|
if n < 0 {
|
||||||
query := qHead + qTail
|
query := qHead + qTail
|
||||||
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, *maxTagValueSuffixes, deadline)
|
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, maxSuffixes, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -245,7 +244,7 @@ func metricsFind(tr storage.TimeRange, label, qHead, qTail string, delimiter byt
|
|||||||
}
|
}
|
||||||
if n == len(qTail)-1 && strings.HasSuffix(qTail, "*") {
|
if n == len(qTail)-1 && strings.HasSuffix(qTail, "*") {
|
||||||
query := qHead + qTail[:len(qTail)-1]
|
query := qHead + qTail[:len(qTail)-1]
|
||||||
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, *maxTagValueSuffixes, deadline)
|
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, maxSuffixes, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,11 +21,11 @@ import (
|
|||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/stats"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/stats"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||||
@@ -36,12 +36,6 @@ var (
|
|||||||
deleteAuthKey = flagutil.NewPassword("deleteAuthKey", "authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries. It could be passed via authKey query arg. It overrides -httpAuth.*")
|
deleteAuthKey = flagutil.NewPassword("deleteAuthKey", "authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries. It could be passed via authKey query arg. It overrides -httpAuth.*")
|
||||||
metricNamesStatsResetAuthKey = flagutil.NewPassword("metricNamesStatsResetAuthKey", "authKey for resetting metric names usage cache via /api/v1/admin/status/metric_names_stats/reset. It overrides -httpAuth.*. "+
|
metricNamesStatsResetAuthKey = flagutil.NewPassword("metricNamesStatsResetAuthKey", "authKey for resetting metric names usage cache via /api/v1/admin/status/metric_names_stats/reset. It overrides -httpAuth.*. "+
|
||||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage")
|
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage")
|
||||||
|
|
||||||
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
|
|
||||||
"It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+
|
|
||||||
"See also -search.maxQueueDuration and -search.maxMemoryPerQuery")
|
|
||||||
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests "+
|
|
||||||
"limit is reached; see also -search.maxQueryDuration")
|
|
||||||
resetCacheAuthKey = flagutil.NewPassword("search.resetCacheAuthKey", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call. It could be passed via authKey query arg. It overrides -httpAuth.*")
|
resetCacheAuthKey = flagutil.NewPassword("search.resetCacheAuthKey", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call. It could be passed via authKey query arg. It overrides -httpAuth.*")
|
||||||
logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging. "+
|
logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging. "+
|
||||||
"See also -search.logQueryMemoryUsage")
|
"See also -search.logQueryMemoryUsage")
|
||||||
@@ -50,23 +44,14 @@ var (
|
|||||||
|
|
||||||
var slowQueries = metrics.NewCounter(`vm_slow_queries_total`)
|
var slowQueries = metrics.NewCounter(`vm_slow_queries_total`)
|
||||||
|
|
||||||
func getDefaultMaxConcurrentRequests() int {
|
|
||||||
// A single request can saturate all the CPU cores, so there is no sense
|
|
||||||
// in allowing higher number of concurrent requests - they will just contend
|
|
||||||
// for unavailable CPU time.
|
|
||||||
n := min(cgroup.AvailableCPUs()*2, 16)
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
// Init initializes vmselect
|
// Init initializes vmselect
|
||||||
func Init() {
|
func Init() {
|
||||||
tmpDirPath := *vmstorage.DataPath + "/tmp"
|
tmpDirPath := *vmstorage.DataPath + "/tmp"
|
||||||
fs.MustRemoveDirContents(tmpDirPath)
|
fs.MustRemoveDirContents(tmpDirPath)
|
||||||
netstorage.InitTmpBlocksDir(tmpDirPath)
|
netstorage.InitTmpBlocksDir(tmpDirPath)
|
||||||
promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
|
promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
|
||||||
prometheus.InitMaxUniqueTimeseries(*maxConcurrentRequests)
|
|
||||||
|
|
||||||
concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)
|
concurrencyLimitCh = make(chan struct{}, limits.MaxConcurrentRequests())
|
||||||
initVMUIConfig()
|
initVMUIConfig()
|
||||||
initVMAlertProxy()
|
initVMAlertProxy()
|
||||||
}
|
}
|
||||||
@@ -89,7 +74,7 @@ var (
|
|||||||
return float64(len(concurrencyLimitCh))
|
return float64(len(concurrencyLimitCh))
|
||||||
})
|
})
|
||||||
_ = metrics.NewGauge(`vm_search_max_unique_timeseries`, func() float64 {
|
_ = metrics.NewGauge(`vm_search_max_unique_timeseries`, func() float64 {
|
||||||
return float64(prometheus.GetMaxUniqueTimeSeries())
|
return float64(limits.MaxUniqueTimeseries())
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -129,12 +114,12 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||||||
default:
|
default:
|
||||||
// Sleep for a while until giving up. This should resolve short bursts in requests.
|
// Sleep for a while until giving up. This should resolve short bursts in requests.
|
||||||
concurrencyLimitReached.Inc()
|
concurrencyLimitReached.Inc()
|
||||||
d := min(searchutil.GetMaxQueryDuration(r), *maxQueueDuration)
|
d := min(searchutil.GetMaxQueryDuration(r), limits.MaxQueueDuration())
|
||||||
t := timerpool.Get(d)
|
t := timerpool.Get(d)
|
||||||
select {
|
select {
|
||||||
case concurrencyLimitCh <- struct{}{}:
|
case concurrencyLimitCh <- struct{}{}:
|
||||||
timerpool.Put(t)
|
timerpool.Put(t)
|
||||||
qt.Printf("wait in queue because -search.maxConcurrentRequests=%d concurrent requests are executed", *maxConcurrentRequests)
|
qt.Printf("wait in queue because -%s=%d concurrent requests are executed", limits.MaxConcurrentRequestsFlagName(), limits.MaxConcurrentRequests())
|
||||||
defer func() { <-concurrencyLimitCh }()
|
defer func() { <-concurrencyLimitCh }()
|
||||||
case <-r.Context().Done():
|
case <-r.Context().Done():
|
||||||
timerpool.Put(t)
|
timerpool.Put(t)
|
||||||
@@ -147,10 +132,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||||||
timerpool.Put(t)
|
timerpool.Put(t)
|
||||||
concurrencyLimitTimeout.Inc()
|
concurrencyLimitTimeout.Inc()
|
||||||
err := &httpserver.ErrorWithStatusCode{
|
err := &httpserver.ErrorWithStatusCode{
|
||||||
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -search.maxConcurrentRequests=%d concurrent requests "+
|
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -%s=%d concurrent requests "+
|
||||||
"are executed. Possible solutions: to reduce query load; to add more compute resources to the server; "+
|
"are already executed. Possible solutions: to reduce the query load; to add more compute resources to the server; "+
|
||||||
"to increase -search.maxQueueDuration=%s; to increase -search.maxQueryDuration; to increase -search.maxConcurrentRequests",
|
"to increase -%s=%d; to increase -%s",
|
||||||
d.Seconds(), *maxConcurrentRequests, maxQueueDuration),
|
d.Seconds(), limits.MaxConcurrentRequestsFlagName(), limits.MaxConcurrentRequests(),
|
||||||
|
limits.MaxQueueDurationFlagName(), limits.MaxQueueDuration(), limits.MaxConcurrentRequestsFlagName()),
|
||||||
StatusCode: http.StatusTooManyRequests,
|
StatusCode: http.StatusTooManyRequests,
|
||||||
}
|
}
|
||||||
w.Header().Add("Retry-After", "10")
|
w.Header().Add("Retry-After", "10")
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ import (
|
|||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
|
||||||
@@ -27,10 +28,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
maxTagKeysPerSearch = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned from /api/v1/labels . "+
|
|
||||||
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
|
|
||||||
maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned from /api/v1/label/<label_name>/values . "+
|
|
||||||
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
|
|
||||||
maxSamplesPerSeries = flag.Int("search.maxSamplesPerSeries", 30e6, "The maximum number of raw samples a single query can scan per each time series. This option allows limiting memory usage")
|
maxSamplesPerSeries = flag.Int("search.maxSamplesPerSeries", 30e6, "The maximum number of raw samples a single query can scan per each time series. This option allows limiting memory usage")
|
||||||
maxSamplesPerQuery = flag.Int("search.maxSamplesPerQuery", 1e9, "The maximum number of raw samples a single query can process across all time series. "+
|
maxSamplesPerQuery = flag.Int("search.maxSamplesPerQuery", 1e9, "The maximum number of raw samples a single query can process across all time series. "+
|
||||||
"This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries")
|
"This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries")
|
||||||
@@ -773,9 +770,8 @@ func LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames i
|
|||||||
if deadline.Exceeded() {
|
if deadline.Exceeded() {
|
||||||
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
||||||
}
|
}
|
||||||
if maxLabelNames > *maxTagKeysPerSearch || maxLabelNames <= 0 {
|
|
||||||
maxLabelNames = *maxTagKeysPerSearch
|
maxLabelNames = limits.MaxLabelNames(maxLabelNames)
|
||||||
}
|
|
||||||
tr := sq.GetTimeRange()
|
tr := sq.GetTimeRange()
|
||||||
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -841,9 +837,7 @@ func LabelValues(qt *querytracer.Tracer, labelName string, sq *storage.SearchQue
|
|||||||
if deadline.Exceeded() {
|
if deadline.Exceeded() {
|
||||||
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
||||||
}
|
}
|
||||||
if maxLabelValues > *maxTagValuesPerSearch || maxLabelValues <= 0 {
|
maxLabelValues = limits.MaxLabelValues(maxLabelValues)
|
||||||
maxLabelValues = *maxTagValuesPerSearch
|
|
||||||
}
|
|
||||||
tr := sq.GetTimeRange()
|
tr := sq.GetTimeRange()
|
||||||
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -28,8 +28,7 @@ import (
|
|||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||||
@@ -50,9 +49,6 @@ var (
|
|||||||
"If set to true, the query model becomes closer to InfluxDB data model. If set to true, then -search.maxLookback and -search.maxStalenessInterval are ignored")
|
"If set to true, the query model becomes closer to InfluxDB data model. If set to true, then -search.maxLookback and -search.maxStalenessInterval are ignored")
|
||||||
maxStepForPointsAdjustment = flag.Duration("search.maxStepForPointsAdjustment", time.Minute, "The maximum step when /api/v1/query_range handler adjusts "+
|
maxStepForPointsAdjustment = flag.Duration("search.maxStepForPointsAdjustment", time.Minute, "The maximum step when /api/v1/query_range handler adjusts "+
|
||||||
"points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data")
|
"points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data")
|
||||||
|
|
||||||
maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage. "+
|
|
||||||
"When set to zero, the limit is automatically calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional).")
|
|
||||||
maxFederateSeries = flag.Int("search.maxFederateSeries", 1e6, "The maximum number of time series, which can be returned from /federate. This option allows limiting memory usage")
|
maxFederateSeries = flag.Int("search.maxFederateSeries", 1e6, "The maximum number of time series, which can be returned from /federate. This option allows limiting memory usage")
|
||||||
maxExportSeries = flag.Int("search.maxExportSeries", 10e6, "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage")
|
maxExportSeries = flag.Int("search.maxExportSeries", 10e6, "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage")
|
||||||
maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 10e6, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage")
|
maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 10e6, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage")
|
||||||
@@ -853,7 +849,7 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWr
|
|||||||
End: start,
|
End: start,
|
||||||
Step: step,
|
Step: step,
|
||||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||||
MaxSeries: GetMaxUniqueTimeSeries(),
|
MaxSeries: limits.MaxUniqueTimeseries(),
|
||||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||||
Deadline: deadline,
|
Deadline: deadline,
|
||||||
MayCache: mayCache,
|
MayCache: mayCache,
|
||||||
@@ -964,7 +960,7 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
|
|||||||
End: end,
|
End: end,
|
||||||
Step: step,
|
Step: step,
|
||||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||||
MaxSeries: GetMaxUniqueTimeSeries(),
|
MaxSeries: limits.MaxUniqueTimeseries(),
|
||||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||||
Deadline: deadline,
|
Deadline: deadline,
|
||||||
MayCache: mayCache,
|
MayCache: mayCache,
|
||||||
@@ -1300,43 +1296,6 @@ func (sw *scalableWriter) flush() error {
|
|||||||
return sw.bw.Flush()
|
return sw.bw.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
|
||||||
maxUniqueTimeseriesValueOnce sync.Once
|
|
||||||
maxUniqueTimeseriesValue int
|
|
||||||
)
|
|
||||||
|
|
||||||
// InitMaxUniqueTimeseries init the max metrics limit calculated by available resources.
|
|
||||||
// The calculation is split into calculateMaxUniqueTimeSeriesForResource for unit testing.
|
|
||||||
func InitMaxUniqueTimeseries(maxConcurrentRequests int) {
|
|
||||||
maxUniqueTimeseriesValueOnce.Do(func() {
|
|
||||||
maxUniqueTimeseriesValue = *maxUniqueTimeseries
|
|
||||||
if maxUniqueTimeseriesValue <= 0 {
|
|
||||||
maxUniqueTimeseriesValue = calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, memory.Remaining())
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// calculateMaxUniqueTimeSeriesForResource calculate the max metrics limit calculated by available resources.
|
|
||||||
func calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, remainingMemory int) int {
|
|
||||||
if maxConcurrentRequests <= 0 {
|
|
||||||
// This line should NOT be reached unless the user has set an incorrect `search.maxConcurrentRequests`.
|
|
||||||
// In such cases, fallback to unlimited.
|
|
||||||
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
|
|
||||||
return 2e9
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate the max metrics limit for a single request in the worst-case concurrent scenario.
|
|
||||||
// The approximate size of 1 unique series that could occupy in the vmstorage is 200 bytes.
|
|
||||||
mts := remainingMemory / 200 / maxConcurrentRequests
|
|
||||||
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
|
|
||||||
return mts
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetMaxUniqueTimeSeries returns the max metrics limit calculated by available resources.
|
|
||||||
func GetMaxUniqueTimeSeries() int {
|
|
||||||
return maxUniqueTimeseriesValue
|
|
||||||
}
|
|
||||||
|
|
||||||
// copied from https://github.com/prometheus/common/blob/adea6285c1c7447fcb7bfdeb6abfc6eff893e0a7/model/metric.go#L483
|
// copied from https://github.com/prometheus/common/blob/adea6285c1c7447fcb7bfdeb6abfc6eff893e0a7/model/metric.go#L483
|
||||||
// it's not possible to use direct import due to increased binary size
|
// it's not possible to use direct import due to increased binary size
|
||||||
func unescapePrometheusLabelName(name string) string {
|
func unescapePrometheusLabelName(name string) string {
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"reflect"
|
"reflect"
|
||||||
"runtime"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
||||||
@@ -230,29 +229,3 @@ func TestGetLatencyOffsetMillisecondsFailure(t *testing.T) {
|
|||||||
}
|
}
|
||||||
f("http://localhost?latency_offset=foobar")
|
f("http://localhost?latency_offset=foobar")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCalculateMaxMetricsLimitByResource(t *testing.T) {
|
|
||||||
f := func(maxConcurrentRequest, remainingMemory, expect int) {
|
|
||||||
t.Helper()
|
|
||||||
maxMetricsLimit := calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequest, remainingMemory)
|
|
||||||
if maxMetricsLimit != expect {
|
|
||||||
t.Fatalf("unexpected max metrics limit: got %d, want %d", maxMetricsLimit, expect)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Skip when GOARCH=386
|
|
||||||
if runtime.GOARCH != "386" {
|
|
||||||
// 8 CPU & 32 GiB
|
|
||||||
f(16, int(math.Round(32*1024*1024*1024*0.4)), 4294967)
|
|
||||||
// 4 CPU & 32 GiB
|
|
||||||
f(8, int(math.Round(32*1024*1024*1024*0.4)), 8589934)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2 CPU & 4 GiB
|
|
||||||
f(4, int(math.Round(4*1024*1024*1024*0.4)), 2147483)
|
|
||||||
|
|
||||||
// other edge cases
|
|
||||||
f(0, int(math.Round(4*1024*1024*1024*0.4)), 2e9)
|
|
||||||
f(4, 0, 0)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|||||||
141
lib/limits/select.go
Normal file
141
lib/limits/select.go
Normal file
@@ -0,0 +1,141 @@
|
|||||||
|
package limits
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be scanned during every query. "+
|
||||||
|
"This allows protecting against heavy queries, which select unexpectedly high number of series. When set to zero, the limit is automatically "+
|
||||||
|
"calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional). "+
|
||||||
|
"See also -search.max* command-line flags at vmselect")
|
||||||
|
maxLabelNames = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search. "+
|
||||||
|
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
|
||||||
|
maxLabelValues = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search. "+
|
||||||
|
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
|
||||||
|
maxTagValueSuffixes = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
|
||||||
|
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", defaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
|
||||||
|
"It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+
|
||||||
|
"See also -search.maxQueueDuration and -search.maxMemoryPerQuery")
|
||||||
|
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the incoming vmselect request waits for execution "+
|
||||||
|
"when -search.maxConcurrentRequests limit is reached")
|
||||||
|
)
|
||||||
|
|
||||||
|
func defaultMaxConcurrentRequests() int {
|
||||||
|
// A single request can saturate all the CPU cores, so there is no sense
|
||||||
|
// in allowing higher number of concurrent requests - they will just contend
|
||||||
|
// for unavailable CPU time.
|
||||||
|
n := min(cgroup.AvailableCPUs()*2, 16)
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxConcurrentRequests returns the maximum number of concurrent requests
|
||||||
|
// a server can process.
|
||||||
|
//
|
||||||
|
// The remaining requests wait for up to MaxQueueDuration for their execution.
|
||||||
|
func MaxConcurrentRequests() int {
|
||||||
|
return *maxConcurrentRequests
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxConcurrentRequestsFlagName returns the name of the flag used for
|
||||||
|
// configuring max number of concurrent search requests.
|
||||||
|
func MaxConcurrentRequestsFlagName() string {
|
||||||
|
return "search.maxConcurrentRequests"
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxQueueDuration returns the maximum duration to wait if
|
||||||
|
// MaxConcurrentRequests are executed.
|
||||||
|
func MaxQueueDuration() time.Duration {
|
||||||
|
return *maxQueueDuration
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxQueueDurationFlagName returns the name of the flag used for configuring
|
||||||
|
// the max time duration during which a search request may remain in queue.
|
||||||
|
func MaxQueueDurationFlagName() string {
|
||||||
|
return "search.maxQueueDuration"
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxMetrics calculates the max number of metric names a single query is
|
||||||
|
// allowed to return based on the limit from the search query and
|
||||||
|
// -search.maxUniqueTimeseries flag value.
|
||||||
|
func MaxMetrics(searchQueryLimit int) int {
|
||||||
|
if searchQueryLimit <= 0 {
|
||||||
|
return MaxUniqueTimeseries()
|
||||||
|
}
|
||||||
|
// searchQueryLimit cannot exceed `-search.maxUniqueTimeseries`
|
||||||
|
if *maxUniqueTimeseries != 0 && searchQueryLimit > *maxUniqueTimeseries {
|
||||||
|
searchQueryLimit = *maxUniqueTimeseries
|
||||||
|
}
|
||||||
|
return searchQueryLimit
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxLabelNames calculates the max number of label names a single query is
|
||||||
|
// allowed to return based on the limit from the search query and
|
||||||
|
// -search.maxTagKeys flag value.
|
||||||
|
func MaxLabelNames(searchQueryLimit int) int {
|
||||||
|
return calculateLimit(searchQueryLimit, *maxLabelNames)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxLabelValues calculates the max number of label values a single query is
|
||||||
|
// allowed to return based on the limit from the search query and
|
||||||
|
// -search.maxTagValues flag value.
|
||||||
|
func MaxLabelValues(searchQueryLimit int) int {
|
||||||
|
return calculateLimit(searchQueryLimit, *maxLabelValues)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxTagValueSuffixes calculates the max number of tag value suffixes a single
|
||||||
|
// query is allowed to return based on the limit from the search query and
|
||||||
|
// -search.maxTagValueSuffixesPerSearch flag value.
|
||||||
|
func MaxTagValueSuffixes(searchQueryLimit int) int {
|
||||||
|
return calculateLimit(searchQueryLimit, *maxTagValueSuffixes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func calculateLimit(searchQueryLimit, flagValue int) int {
|
||||||
|
if 0 < searchQueryLimit && searchQueryLimit < flagValue {
|
||||||
|
return searchQueryLimit
|
||||||
|
}
|
||||||
|
return flagValue
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
maxUniqueTimeseriesValue int
|
||||||
|
maxUniqueTimeseriesValueOnce sync.Once
|
||||||
|
)
|
||||||
|
|
||||||
|
// MaxUniqueTimeseries returns `-search.maxUniqueTimeseries` or the
|
||||||
|
// auto-calculated value based on available resources.
|
||||||
|
func MaxUniqueTimeseries() int {
|
||||||
|
maxUniqueTimeseriesValueOnce.Do(func() {
|
||||||
|
maxUniqueTimeseriesValue = *maxUniqueTimeseries
|
||||||
|
if maxUniqueTimeseriesValue <= 0 {
|
||||||
|
maxUniqueTimeseriesValue = calculateMaxUniqueTimeseries(*maxConcurrentRequests, memory.Remaining())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return maxUniqueTimeseriesValue
|
||||||
|
}
|
||||||
|
|
||||||
|
// calculateMaxUniqueTimeseries calculates the maxUniqueTimeseries limit based
|
||||||
|
// on available resources.
|
||||||
|
func calculateMaxUniqueTimeseries(maxConcurrentRequests, remainingMemory int) int {
|
||||||
|
if maxConcurrentRequests <= 0 {
|
||||||
|
// This line should NOT be reached unless the user has set an incorrect
|
||||||
|
// `-search.maxConcurrentRequests`. In such cases, fallback to
|
||||||
|
// unlimited.
|
||||||
|
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
|
||||||
|
return 2e9
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate the maxUniqueTimeseries limit for a single request in the
|
||||||
|
// worst-case concurrent scenario. The approximate size of 1 unique series
|
||||||
|
// that could occupy in vmstorage is 200 bytes.
|
||||||
|
mts := remainingMemory / 200 / maxConcurrentRequests
|
||||||
|
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. "+
|
||||||
|
"To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
|
||||||
|
return mts
|
||||||
|
}
|
||||||
52
lib/limits/select_test.go
Normal file
52
lib/limits/select_test.go
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
package limits
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"runtime"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCalculateMaxUniqueTimeseries(t *testing.T) {
|
||||||
|
f := func(maxConcurrentRequests, remainingMemory, want int) {
|
||||||
|
t.Helper()
|
||||||
|
got := calculateMaxUniqueTimeseries(maxConcurrentRequests, remainingMemory)
|
||||||
|
if got != want {
|
||||||
|
t.Fatalf("unexpected maxUniqueTimeseries: got %d, want %d", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip when GOARCH=386
|
||||||
|
if runtime.GOARCH != "386" {
|
||||||
|
// 8 CPU & 32 GiB
|
||||||
|
f(16, int(math.Round(32*1024*1024*1024*0.4)), 4294967)
|
||||||
|
// 4 CPU & 32 GiB
|
||||||
|
f(8, int(math.Round(32*1024*1024*1024*0.4)), 8589934)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2 CPU & 4 GiB
|
||||||
|
f(4, int(math.Round(4*1024*1024*1024*0.4)), 2147483)
|
||||||
|
|
||||||
|
// other edge cases
|
||||||
|
f(0, int(math.Round(4*1024*1024*1024*0.4)), 2e9)
|
||||||
|
f(4, 0, 0)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMaxMetrics(t *testing.T) {
|
||||||
|
originalMaxUniqueTimeseries := *maxUniqueTimeseries
|
||||||
|
defer func() {
|
||||||
|
*maxUniqueTimeseries = originalMaxUniqueTimeseries
|
||||||
|
}()
|
||||||
|
f := func(searchQueryLimit, flagLimit, want int) {
|
||||||
|
t.Helper()
|
||||||
|
*maxUniqueTimeseries = flagLimit
|
||||||
|
got := MaxMetrics(searchQueryLimit)
|
||||||
|
if got != want {
|
||||||
|
t.Fatalf("unexpected maxMetrics: got %d, want %d", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f(0, 1e6, 1e6)
|
||||||
|
f(2e6, 0, 2e6)
|
||||||
|
f(2e6, 1e6, 1e6)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user