mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-18 17:26:31 +03:00
Compare commits
3 Commits
search-lim
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4a1ceccee4 | ||
|
|
48a3eb0215 | ||
|
|
200c03416f |
@@ -169,6 +169,18 @@ func Init() {
|
||||
if len(*remoteWriteURLs) == 0 {
|
||||
logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set")
|
||||
}
|
||||
if *shardByURL && len(*disableOnDiskQueue) > 1 {
|
||||
disableOnDiskQueues := *disableOnDiskQueue
|
||||
|
||||
firstValue := disableOnDiskQueues[0]
|
||||
for _, v := range disableOnDiskQueues[1:] {
|
||||
if firstValue != v {
|
||||
logger.Fatalf("all -remoteWrite.url targets must have the same -remoteWrite.disableOnDiskQueue setting when -remoteWrite.shardByURL is enabled; " +
|
||||
"either enable or disable -remoteWrite.disableOnDiskQueue for all targets")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if limit := getMaxHourlySeries(); limit > 0 {
|
||||
hourlySeriesLimiter = bloomfilter.NewLimiter(limit, time.Hour)
|
||||
_ = metrics.NewGauge(`vmagent_hourly_series_limit_max_series`, func() float64 {
|
||||
@@ -501,7 +513,9 @@ func tryPush(at *auth.Token, wr *prompb.WriteRequest, forceDropSamplesOnFailure
|
||||
//
|
||||
// calculateHealthyRwctxIdx will rely on the order of rwctx to be in ascending order.
|
||||
func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) ([]*remoteWriteCtx, bool) {
|
||||
if !disableOnDiskQueueAny {
|
||||
// When -remoteWrite.shardByURL=true always use all configured remote writes to preserve stable metrics distribution across shards.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507
|
||||
if !disableOnDiskQueueAny || *shardByURL {
|
||||
return rwctxsGlobal, true
|
||||
}
|
||||
|
||||
@@ -516,12 +530,6 @@ func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailu
|
||||
return nil, false
|
||||
}
|
||||
rowsCount := getRowsCount(tss)
|
||||
if *shardByURL {
|
||||
// Todo: When shardByURL is enabled, the following metrics won't be 100% accurate. Because vmagent don't know
|
||||
// which rwctx should data be pushed to yet. Let's consider the hashing algorithm fair and will distribute
|
||||
// data to all rwctxs evenly.
|
||||
rowsCount = rowsCount / len(rwctxsGlobal)
|
||||
}
|
||||
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ func UnitTest(files []string, disableGroupLabel bool, externalLabels []string, e
|
||||
}
|
||||
eu, err := url.Parse(externalURL)
|
||||
if err != nil {
|
||||
logger.Fatalf("failed to parse external URL: %w", err)
|
||||
logger.Fatalf("failed to parse external URL: %s", err)
|
||||
}
|
||||
if err := templates.Load([]string{}, *eu); err != nil {
|
||||
logger.Fatalf("failed to load template: %v", err)
|
||||
|
||||
@@ -105,7 +105,7 @@ func (cw *configWatcher) add(typeK TargetType, interval time.Duration, targetsFn
|
||||
}
|
||||
targetMetadata, errors := getTargetMetadata(targetsFn, cw.cfg)
|
||||
for _, err := range errors {
|
||||
logger.Errorf("failed to init notifier for %q: %w", typeK, err)
|
||||
logger.Errorf("failed to init notifier for %q: %s", typeK, err)
|
||||
}
|
||||
cw.updateTargets(typeK, targetMetadata, cw.cfg, cw.genFn)
|
||||
}
|
||||
@@ -274,7 +274,7 @@ func (cw *configWatcher) updateTargets(key TargetType, targetMts map[string]targ
|
||||
for addr, metadata := range targetMts {
|
||||
am, err := NewAlertManager(addr, genFn, cfg.HTTPClientConfig, metadata.alertRelabelConfigs, cfg.Timeout.Duration())
|
||||
if err != nil {
|
||||
logger.Errorf("failed to init %s notifier with addr %q: %w", key, addr, err)
|
||||
logger.Errorf("failed to init %s notifier with addr %q: %s", key, addr, err)
|
||||
continue
|
||||
}
|
||||
updatedTargets = append(updatedTargets, Target{
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package graphite
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
@@ -14,13 +15,14 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
)
|
||||
|
||||
var maxTagValueSuffixes = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
|
||||
|
||||
// MetricsFindHandler implements /metrics/find handler.
|
||||
//
|
||||
// See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
|
||||
@@ -220,11 +222,10 @@ func MetricsIndexHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
|
||||
|
||||
// metricsFind searches for label values that match the given qHead and qTail.
|
||||
func metricsFind(tr storage.TimeRange, label, qHead, qTail string, delimiter byte, isExpand bool, deadline searchutil.Deadline) ([]string, error) {
|
||||
maxSuffixes := limits.MaxTagValueSuffixes(0)
|
||||
n := strings.IndexAny(qTail, "*{[")
|
||||
if n < 0 {
|
||||
query := qHead + qTail
|
||||
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, maxSuffixes, deadline)
|
||||
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, *maxTagValueSuffixes, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -244,7 +245,7 @@ func metricsFind(tr storage.TimeRange, label, qHead, qTail string, delimiter byt
|
||||
}
|
||||
if n == len(qTail)-1 && strings.HasSuffix(qTail, "*") {
|
||||
query := qHead + qTail[:len(qTail)-1]
|
||||
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, maxSuffixes, deadline)
|
||||
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, *maxTagValueSuffixes, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -21,11 +21,11 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/stats"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
@@ -36,6 +36,12 @@ var (
|
||||
deleteAuthKey = flagutil.NewPassword("deleteAuthKey", "authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries. It could be passed via authKey query arg. It overrides -httpAuth.*")
|
||||
metricNamesStatsResetAuthKey = flagutil.NewPassword("metricNamesStatsResetAuthKey", "authKey for resetting metric names usage cache via /api/v1/admin/status/metric_names_stats/reset. It overrides -httpAuth.*. "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage")
|
||||
|
||||
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
|
||||
"It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+
|
||||
"See also -search.maxQueueDuration and -search.maxMemoryPerQuery")
|
||||
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests "+
|
||||
"limit is reached; see also -search.maxQueryDuration")
|
||||
resetCacheAuthKey = flagutil.NewPassword("search.resetCacheAuthKey", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call. It could be passed via authKey query arg. It overrides -httpAuth.*")
|
||||
logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging. "+
|
||||
"See also -search.logQueryMemoryUsage")
|
||||
@@ -44,14 +50,23 @@ var (
|
||||
|
||||
var slowQueries = metrics.NewCounter(`vm_slow_queries_total`)
|
||||
|
||||
func getDefaultMaxConcurrentRequests() int {
|
||||
// A single request can saturate all the CPU cores, so there is no sense
|
||||
// in allowing higher number of concurrent requests - they will just contend
|
||||
// for unavailable CPU time.
|
||||
n := min(cgroup.AvailableCPUs()*2, 16)
|
||||
return n
|
||||
}
|
||||
|
||||
// Init initializes vmselect
|
||||
func Init() {
|
||||
tmpDirPath := *vmstorage.DataPath + "/tmp"
|
||||
fs.MustRemoveDirContents(tmpDirPath)
|
||||
netstorage.InitTmpBlocksDir(tmpDirPath)
|
||||
promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
|
||||
prometheus.InitMaxUniqueTimeseries(*maxConcurrentRequests)
|
||||
|
||||
concurrencyLimitCh = make(chan struct{}, limits.MaxConcurrentRequests())
|
||||
concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)
|
||||
initVMUIConfig()
|
||||
initVMAlertProxy()
|
||||
}
|
||||
@@ -74,7 +89,7 @@ var (
|
||||
return float64(len(concurrencyLimitCh))
|
||||
})
|
||||
_ = metrics.NewGauge(`vm_search_max_unique_timeseries`, func() float64 {
|
||||
return float64(limits.MaxUniqueTimeseries())
|
||||
return float64(prometheus.GetMaxUniqueTimeSeries())
|
||||
})
|
||||
)
|
||||
|
||||
@@ -114,12 +129,12 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
default:
|
||||
// Sleep for a while until giving up. This should resolve short bursts in requests.
|
||||
concurrencyLimitReached.Inc()
|
||||
d := min(searchutil.GetMaxQueryDuration(r), limits.MaxQueueDuration())
|
||||
d := min(searchutil.GetMaxQueryDuration(r), *maxQueueDuration)
|
||||
t := timerpool.Get(d)
|
||||
select {
|
||||
case concurrencyLimitCh <- struct{}{}:
|
||||
timerpool.Put(t)
|
||||
qt.Printf("wait in queue because -%s=%d concurrent requests are executed", limits.MaxConcurrentRequestsFlagName(), limits.MaxConcurrentRequests())
|
||||
qt.Printf("wait in queue because -search.maxConcurrentRequests=%d concurrent requests are executed", *maxConcurrentRequests)
|
||||
defer func() { <-concurrencyLimitCh }()
|
||||
case <-r.Context().Done():
|
||||
timerpool.Put(t)
|
||||
@@ -132,11 +147,10 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
timerpool.Put(t)
|
||||
concurrencyLimitTimeout.Inc()
|
||||
err := &httpserver.ErrorWithStatusCode{
|
||||
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -%s=%d concurrent requests "+
|
||||
"are already executed. Possible solutions: to reduce the query load; to add more compute resources to the server; "+
|
||||
"to increase -%s=%d; to increase -%s",
|
||||
d.Seconds(), limits.MaxConcurrentRequestsFlagName(), limits.MaxConcurrentRequests(),
|
||||
limits.MaxQueueDurationFlagName(), limits.MaxQueueDuration(), limits.MaxConcurrentRequestsFlagName()),
|
||||
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -search.maxConcurrentRequests=%d concurrent requests "+
|
||||
"are executed. Possible solutions: to reduce query load; to add more compute resources to the server; "+
|
||||
"to increase -search.maxQueueDuration=%s; to increase -search.maxQueryDuration; to increase -search.maxConcurrentRequests",
|
||||
d.Seconds(), *maxConcurrentRequests, maxQueueDuration),
|
||||
StatusCode: http.StatusTooManyRequests,
|
||||
}
|
||||
w.Header().Add("Retry-After", "10")
|
||||
|
||||
@@ -20,7 +20,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
|
||||
@@ -28,6 +27,10 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
maxTagKeysPerSearch = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned from /api/v1/labels . "+
|
||||
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
|
||||
maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned from /api/v1/label/<label_name>/values . "+
|
||||
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
|
||||
maxSamplesPerSeries = flag.Int("search.maxSamplesPerSeries", 30e6, "The maximum number of raw samples a single query can scan per each time series. This option allows limiting memory usage")
|
||||
maxSamplesPerQuery = flag.Int("search.maxSamplesPerQuery", 1e9, "The maximum number of raw samples a single query can process across all time series. "+
|
||||
"This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries")
|
||||
@@ -770,8 +773,9 @@ func LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames i
|
||||
if deadline.Exceeded() {
|
||||
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
||||
}
|
||||
|
||||
maxLabelNames = limits.MaxLabelNames(maxLabelNames)
|
||||
if maxLabelNames > *maxTagKeysPerSearch || maxLabelNames <= 0 {
|
||||
maxLabelNames = *maxTagKeysPerSearch
|
||||
}
|
||||
tr := sq.GetTimeRange()
|
||||
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
if err != nil {
|
||||
@@ -837,7 +841,9 @@ func LabelValues(qt *querytracer.Tracer, labelName string, sq *storage.SearchQue
|
||||
if deadline.Exceeded() {
|
||||
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
||||
}
|
||||
maxLabelValues = limits.MaxLabelValues(maxLabelValues)
|
||||
if maxLabelValues > *maxTagValuesPerSearch || maxLabelValues <= 0 {
|
||||
maxLabelValues = *maxTagValuesPerSearch
|
||||
}
|
||||
tr := sq.GetTimeRange()
|
||||
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
|
||||
if err != nil {
|
||||
|
||||
@@ -28,7 +28,8 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
@@ -49,6 +50,9 @@ var (
|
||||
"If set to true, the query model becomes closer to InfluxDB data model. If set to true, then -search.maxLookback and -search.maxStalenessInterval are ignored")
|
||||
maxStepForPointsAdjustment = flag.Duration("search.maxStepForPointsAdjustment", time.Minute, "The maximum step when /api/v1/query_range handler adjusts "+
|
||||
"points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data")
|
||||
|
||||
maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage. "+
|
||||
"When set to zero, the limit is automatically calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional).")
|
||||
maxFederateSeries = flag.Int("search.maxFederateSeries", 1e6, "The maximum number of time series, which can be returned from /federate. This option allows limiting memory usage")
|
||||
maxExportSeries = flag.Int("search.maxExportSeries", 10e6, "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage")
|
||||
maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 10e6, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage")
|
||||
@@ -849,7 +853,7 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWr
|
||||
End: start,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: limits.MaxUniqueTimeseries(),
|
||||
MaxSeries: GetMaxUniqueTimeSeries(),
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
MayCache: mayCache,
|
||||
@@ -960,7 +964,7 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: limits.MaxUniqueTimeseries(),
|
||||
MaxSeries: GetMaxUniqueTimeSeries(),
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
MayCache: mayCache,
|
||||
@@ -1296,6 +1300,43 @@ func (sw *scalableWriter) flush() error {
|
||||
return sw.bw.Flush()
|
||||
}
|
||||
|
||||
var (
|
||||
maxUniqueTimeseriesValueOnce sync.Once
|
||||
maxUniqueTimeseriesValue int
|
||||
)
|
||||
|
||||
// InitMaxUniqueTimeseries init the max metrics limit calculated by available resources.
|
||||
// The calculation is split into calculateMaxUniqueTimeSeriesForResource for unit testing.
|
||||
func InitMaxUniqueTimeseries(maxConcurrentRequests int) {
|
||||
maxUniqueTimeseriesValueOnce.Do(func() {
|
||||
maxUniqueTimeseriesValue = *maxUniqueTimeseries
|
||||
if maxUniqueTimeseriesValue <= 0 {
|
||||
maxUniqueTimeseriesValue = calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, memory.Remaining())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// calculateMaxUniqueTimeSeriesForResource calculate the max metrics limit calculated by available resources.
|
||||
func calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, remainingMemory int) int {
|
||||
if maxConcurrentRequests <= 0 {
|
||||
// This line should NOT be reached unless the user has set an incorrect `search.maxConcurrentRequests`.
|
||||
// In such cases, fallback to unlimited.
|
||||
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
|
||||
return 2e9
|
||||
}
|
||||
|
||||
// Calculate the max metrics limit for a single request in the worst-case concurrent scenario.
|
||||
// The approximate size of 1 unique series that could occupy in the vmstorage is 200 bytes.
|
||||
mts := remainingMemory / 200 / maxConcurrentRequests
|
||||
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
|
||||
return mts
|
||||
}
|
||||
|
||||
// GetMaxUniqueTimeSeries returns the max metrics limit calculated by available resources.
|
||||
func GetMaxUniqueTimeSeries() int {
|
||||
return maxUniqueTimeseriesValue
|
||||
}
|
||||
|
||||
// copied from https://github.com/prometheus/common/blob/adea6285c1c7447fcb7bfdeb6abfc6eff893e0a7/model/metric.go#L483
|
||||
// it's not possible to use direct import due to increased binary size
|
||||
func unescapePrometheusLabelName(name string) string {
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"math"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
||||
@@ -229,3 +230,29 @@ func TestGetLatencyOffsetMillisecondsFailure(t *testing.T) {
|
||||
}
|
||||
f("http://localhost?latency_offset=foobar")
|
||||
}
|
||||
|
||||
func TestCalculateMaxMetricsLimitByResource(t *testing.T) {
|
||||
f := func(maxConcurrentRequest, remainingMemory, expect int) {
|
||||
t.Helper()
|
||||
maxMetricsLimit := calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequest, remainingMemory)
|
||||
if maxMetricsLimit != expect {
|
||||
t.Fatalf("unexpected max metrics limit: got %d, want %d", maxMetricsLimit, expect)
|
||||
}
|
||||
}
|
||||
|
||||
// Skip when GOARCH=386
|
||||
if runtime.GOARCH != "386" {
|
||||
// 8 CPU & 32 GiB
|
||||
f(16, int(math.Round(32*1024*1024*1024*0.4)), 4294967)
|
||||
// 4 CPU & 32 GiB
|
||||
f(8, int(math.Round(32*1024*1024*1024*0.4)), 8589934)
|
||||
}
|
||||
|
||||
// 2 CPU & 4 GiB
|
||||
f(4, int(math.Round(4*1024*1024*1024*0.4)), 2147483)
|
||||
|
||||
// other edge cases
|
||||
f(0, int(math.Round(4*1024*1024*1024*0.4)), 2e9)
|
||||
f(4, 0, 0)
|
||||
|
||||
}
|
||||
|
||||
@@ -3109,7 +3109,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -3406,7 +3406,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -3110,7 +3110,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -3407,7 +3407,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -2946,7 +2946,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -2324,7 +2324,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -2945,7 +2945,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -2323,7 +2323,6 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -37,6 +37,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): hide values passed to `-remoteWrite.headers` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive HTTP headers such as `Authorization` and API keys.
|
||||
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly establish [mtls](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#mtls-protection) connection between vmstorage and vminsert. Regression was introduced in v1.130.0 release for the enterprise version of vmstorage. See [#10958](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10958)
|
||||
* BUGFIX: [vmrestore](https://docs.victoriametrics.com/victoriametrics/vmrestore/): fix a bug where specifying `-storageDataPath` with a trailing slash could cause `vmrestore` to panic. See [#10823](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10823). Thanks to @utafrali for the contribution.
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): prevent unintentional rerouting of samples to other sharding targets when one of the `-remoteWrite.url` targets with `-remoteWrite.disableOnDiskQueue` becomes blocked. Previously this could break the sharding guarantee by sending samples to wrong targets instead of dropping or retrying them. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): return error on startup if `-remoteWrite.disableOnDiskQueue` is not configured uniformly across all `-remoteWrite.url` targets when `-remoteWrite.shardByURL` is enabled. Either all targets must have it enabled or all must have it disabled. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
|
||||
|
||||
## [v1.143.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.143.0)
|
||||
|
||||
|
||||
@@ -1,141 +0,0 @@
|
||||
package limits
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
)
|
||||
|
||||
var (
|
||||
maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be scanned during every query. "+
|
||||
"This allows protecting against heavy queries, which select unexpectedly high number of series. When set to zero, the limit is automatically "+
|
||||
"calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional). "+
|
||||
"See also -search.max* command-line flags at vmselect")
|
||||
maxLabelNames = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search. "+
|
||||
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
|
||||
maxLabelValues = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search. "+
|
||||
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
|
||||
maxTagValueSuffixes = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
|
||||
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", defaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
|
||||
"It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+
|
||||
"See also -search.maxQueueDuration and -search.maxMemoryPerQuery")
|
||||
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the incoming vmselect request waits for execution "+
|
||||
"when -search.maxConcurrentRequests limit is reached")
|
||||
)
|
||||
|
||||
func defaultMaxConcurrentRequests() int {
|
||||
// A single request can saturate all the CPU cores, so there is no sense
|
||||
// in allowing higher number of concurrent requests - they will just contend
|
||||
// for unavailable CPU time.
|
||||
n := min(cgroup.AvailableCPUs()*2, 16)
|
||||
return n
|
||||
}
|
||||
|
||||
// MaxConcurrentRequests returns the maximum number of concurrent requests
|
||||
// a server can process.
|
||||
//
|
||||
// The remaining requests wait for up to MaxQueueDuration for their execution.
|
||||
func MaxConcurrentRequests() int {
|
||||
return *maxConcurrentRequests
|
||||
}
|
||||
|
||||
// MaxConcurrentRequestsFlagName returns the name of the flag used for
|
||||
// configuring max number of concurrent search requests.
|
||||
func MaxConcurrentRequestsFlagName() string {
|
||||
return "search.maxConcurrentRequests"
|
||||
}
|
||||
|
||||
// MaxQueueDuration returns the maximum duration to wait if
|
||||
// MaxConcurrentRequests are executed.
|
||||
func MaxQueueDuration() time.Duration {
|
||||
return *maxQueueDuration
|
||||
}
|
||||
|
||||
// MaxQueueDurationFlagName returns the name of the flag used for configuring
|
||||
// the max time duration during which a search request may remain in queue.
|
||||
func MaxQueueDurationFlagName() string {
|
||||
return "search.maxQueueDuration"
|
||||
}
|
||||
|
||||
// MaxMetrics calculates the max number of metric names a single query is
|
||||
// allowed to return based on the limit from the search query and
|
||||
// -search.maxUniqueTimeseries flag value.
|
||||
func MaxMetrics(searchQueryLimit int) int {
|
||||
if searchQueryLimit <= 0 {
|
||||
return MaxUniqueTimeseries()
|
||||
}
|
||||
// searchQueryLimit cannot exceed `-search.maxUniqueTimeseries`
|
||||
if *maxUniqueTimeseries != 0 && searchQueryLimit > *maxUniqueTimeseries {
|
||||
searchQueryLimit = *maxUniqueTimeseries
|
||||
}
|
||||
return searchQueryLimit
|
||||
}
|
||||
|
||||
// MaxLabelNames calculates the max number of label names a single query is
|
||||
// allowed to return based on the limit from the search query and
|
||||
// -search.maxTagKeys flag value.
|
||||
func MaxLabelNames(searchQueryLimit int) int {
|
||||
return calculateLimit(searchQueryLimit, *maxLabelNames)
|
||||
}
|
||||
|
||||
// MaxLabelValues calculates the max number of label values a single query is
|
||||
// allowed to return based on the limit from the search query and
|
||||
// -search.maxTagValues flag value.
|
||||
func MaxLabelValues(searchQueryLimit int) int {
|
||||
return calculateLimit(searchQueryLimit, *maxLabelValues)
|
||||
}
|
||||
|
||||
// MaxTagValueSuffixes calculates the max number of tag value suffixes a single
|
||||
// query is allowed to return based on the limit from the search query and
|
||||
// -search.maxTagValueSuffixesPerSearch flag value.
|
||||
func MaxTagValueSuffixes(searchQueryLimit int) int {
|
||||
return calculateLimit(searchQueryLimit, *maxTagValueSuffixes)
|
||||
}
|
||||
|
||||
func calculateLimit(searchQueryLimit, flagValue int) int {
|
||||
if 0 < searchQueryLimit && searchQueryLimit < flagValue {
|
||||
return searchQueryLimit
|
||||
}
|
||||
return flagValue
|
||||
}
|
||||
|
||||
var (
|
||||
maxUniqueTimeseriesValue int
|
||||
maxUniqueTimeseriesValueOnce sync.Once
|
||||
)
|
||||
|
||||
// MaxUniqueTimeseries returns `-search.maxUniqueTimeseries` or the
|
||||
// auto-calculated value based on available resources.
|
||||
func MaxUniqueTimeseries() int {
|
||||
maxUniqueTimeseriesValueOnce.Do(func() {
|
||||
maxUniqueTimeseriesValue = *maxUniqueTimeseries
|
||||
if maxUniqueTimeseriesValue <= 0 {
|
||||
maxUniqueTimeseriesValue = calculateMaxUniqueTimeseries(*maxConcurrentRequests, memory.Remaining())
|
||||
}
|
||||
})
|
||||
return maxUniqueTimeseriesValue
|
||||
}
|
||||
|
||||
// calculateMaxUniqueTimeseries calculates the maxUniqueTimeseries limit based
|
||||
// on available resources.
|
||||
func calculateMaxUniqueTimeseries(maxConcurrentRequests, remainingMemory int) int {
|
||||
if maxConcurrentRequests <= 0 {
|
||||
// This line should NOT be reached unless the user has set an incorrect
|
||||
// `-search.maxConcurrentRequests`. In such cases, fallback to
|
||||
// unlimited.
|
||||
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
|
||||
return 2e9
|
||||
}
|
||||
|
||||
// Calculate the maxUniqueTimeseries limit for a single request in the
|
||||
// worst-case concurrent scenario. The approximate size of 1 unique series
|
||||
// that could occupy in vmstorage is 200 bytes.
|
||||
mts := remainingMemory / 200 / maxConcurrentRequests
|
||||
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. "+
|
||||
"To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
|
||||
return mts
|
||||
}
|
||||
@@ -1,52 +0,0 @@
|
||||
package limits
|
||||
|
||||
import (
|
||||
"math"
|
||||
"runtime"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCalculateMaxUniqueTimeseries(t *testing.T) {
|
||||
f := func(maxConcurrentRequests, remainingMemory, want int) {
|
||||
t.Helper()
|
||||
got := calculateMaxUniqueTimeseries(maxConcurrentRequests, remainingMemory)
|
||||
if got != want {
|
||||
t.Fatalf("unexpected maxUniqueTimeseries: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// Skip when GOARCH=386
|
||||
if runtime.GOARCH != "386" {
|
||||
// 8 CPU & 32 GiB
|
||||
f(16, int(math.Round(32*1024*1024*1024*0.4)), 4294967)
|
||||
// 4 CPU & 32 GiB
|
||||
f(8, int(math.Round(32*1024*1024*1024*0.4)), 8589934)
|
||||
}
|
||||
|
||||
// 2 CPU & 4 GiB
|
||||
f(4, int(math.Round(4*1024*1024*1024*0.4)), 2147483)
|
||||
|
||||
// other edge cases
|
||||
f(0, int(math.Round(4*1024*1024*1024*0.4)), 2e9)
|
||||
f(4, 0, 0)
|
||||
|
||||
}
|
||||
|
||||
func TestMaxMetrics(t *testing.T) {
|
||||
originalMaxUniqueTimeseries := *maxUniqueTimeseries
|
||||
defer func() {
|
||||
*maxUniqueTimeseries = originalMaxUniqueTimeseries
|
||||
}()
|
||||
f := func(searchQueryLimit, flagLimit, want int) {
|
||||
t.Helper()
|
||||
*maxUniqueTimeseries = flagLimit
|
||||
got := MaxMetrics(searchQueryLimit)
|
||||
if got != want {
|
||||
t.Fatalf("unexpected maxMetrics: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
f(0, 1e6, 1e6)
|
||||
f(2e6, 0, 2e6)
|
||||
f(2e6, 1e6, 1e6)
|
||||
}
|
||||
@@ -1538,11 +1538,11 @@ func (tb *Table) MustCreateSnapshotAt(dstDir string) {
|
||||
srcDir := tb.path
|
||||
srcDir, err = filepath.Abs(srcDir)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", srcDir, err)
|
||||
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", srcDir, err)
|
||||
}
|
||||
dstDir, err = filepath.Abs(dstDir)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", dstDir, err)
|
||||
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", dstDir, err)
|
||||
}
|
||||
prefix := srcDir + string(filepath.Separator)
|
||||
if strings.HasPrefix(dstDir, prefix) {
|
||||
|
||||
@@ -764,7 +764,7 @@ func filterLabelValues(lvs map[string]struct{}, tf *tagFilter, key string) {
|
||||
b = marshalTagValue(b, bytesutil.ToUnsafeBytes(lv))
|
||||
ok, err := tf.match(b)
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %w", key, lv, tf.String(), err)
|
||||
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %s", key, lv, tf.String(), err)
|
||||
}
|
||||
if !ok {
|
||||
delete(lvs, lv)
|
||||
|
||||
@@ -141,7 +141,7 @@ func (is *indexSearch) legacyContainsTimeRangeSlow(prefixBuf *bytesutil.ByteBuff
|
||||
ts.Seek(prefixBuf.B)
|
||||
if !ts.NextItem() {
|
||||
if err := ts.Error(); err != nil {
|
||||
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %w", minDate, prefixBuf.B, err)
|
||||
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %s", minDate, prefixBuf.B, err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -106,7 +106,7 @@ func loadFrom(loadPath string, maxSizeBytes uint64) (*Tracker, error) {
|
||||
}
|
||||
defer func() {
|
||||
if err := zr.Close(); err != nil {
|
||||
logger.Panicf("FATAL: cannot close gzip reader: %w", err)
|
||||
logger.Panicf("FATAL: cannot close gzip reader: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -517,7 +517,7 @@ func (tb *table) historicalMergeWatcher() {
|
||||
|
||||
logger.Infof("start %s for partition (%s, %s)", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath)
|
||||
if err := pt.ForceMergeAllParts(tb.stopCh); err != nil {
|
||||
logger.Errorf("cannot %s for partition (%s, %s): %w", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
|
||||
logger.Errorf("cannot %s for partition (%s, %s): %s", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
|
||||
}
|
||||
logger.Infof("finished %s for partition (%s, %s) in %.3f seconds", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath, time.Since(t).Seconds())
|
||||
|
||||
|
||||
Reference in New Issue
Block a user