Compare commits

..

3 Commits

Author SHA1 Message Date
Max Kotliar
4a1ceccee4 app/vmagent: fix sharding correctness when disableOnDiskQueue is set (#10947)
When -remoteWrite.shardByURL is enabled, and one of the remote write
targets has -remoteWrite.disableOnDiskQueue set becomes blocked, samples
could be rerouted to other shards (see `getEligibleRemoteWriteCtxs` impl), breaking the sharding guarantee. Fix this by always using `rwctxsGlobal` in sharding mode.

Add a startup check that requires `-remoteWrite.disableOnDiskQueue` to be
configured uniformly across all targets when -remoteWrite.shardByURL` is enabled.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507
PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10947
2026-05-18 14:46:24 +03:00
Max Kotliar
48a3eb0215 dashboards: zoom in the "CPU spent on GC" panel. (#10955)
The "CPU spent on GC" panel often shows sub-1% GC CPU usage, which is
barely visible with the current fixed Y-axis settings. Removing the
fixed scale allows Grafana to auto-adjust the Y axis and makes small
fluctuations easier to observe.

PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10955

Before:
<img width="1499" height="477" alt="Screenshot 2026-05-14 at 18 35 14"
src="https://github.com/user-attachments/assets/8a78bea4-3dcd-4f26-a40e-304acbf68eb1"
/>

After:
<img width="1508" height="501" alt="Screenshot 2026-05-14 at 18 34 51"
src="https://github.com/user-attachments/assets/aefd2fbc-6b1c-42a9-b45d-e44bcd30be48"
/>
2026-05-18 14:39:13 +03:00
Immanuel Tikhonov
200c03416f chore: use %s instead of %w for error formatting in logger calls
Previously, errors in app/vmalert-tool and lib packages used the %w verb
in logger.Errorf calls, which is intended for wrapping errors via fmt.Errorf.
Using %w with the logger package does not wrap the error — instead, it prints
a malformed %!w(...) placeholder rather than the actual error message.

This commit replaces all affected occurrences of %w with %s to correctly
format and display errors.

Related PR: https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10962
2026-05-18 13:13:51 +02:00
24 changed files with 136 additions and 238 deletions

View File

@@ -169,6 +169,18 @@ func Init() {
if len(*remoteWriteURLs) == 0 {
logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set")
}
if *shardByURL && len(*disableOnDiskQueue) > 1 {
disableOnDiskQueues := *disableOnDiskQueue
firstValue := disableOnDiskQueues[0]
for _, v := range disableOnDiskQueues[1:] {
if firstValue != v {
logger.Fatalf("all -remoteWrite.url targets must have the same -remoteWrite.disableOnDiskQueue setting when -remoteWrite.shardByURL is enabled; " +
"either enable or disable -remoteWrite.disableOnDiskQueue for all targets")
}
}
}
if limit := getMaxHourlySeries(); limit > 0 {
hourlySeriesLimiter = bloomfilter.NewLimiter(limit, time.Hour)
_ = metrics.NewGauge(`vmagent_hourly_series_limit_max_series`, func() float64 {
@@ -501,7 +513,9 @@ func tryPush(at *auth.Token, wr *prompb.WriteRequest, forceDropSamplesOnFailure
//
// calculateHealthyRwctxIdx will rely on the order of rwctx to be in ascending order.
func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) ([]*remoteWriteCtx, bool) {
if !disableOnDiskQueueAny {
// When -remoteWrite.shardByURL=true always use all configured remote writes to preserve stable metrics distribution across shards.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507
if !disableOnDiskQueueAny || *shardByURL {
return rwctxsGlobal, true
}
@@ -516,12 +530,6 @@ func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailu
return nil, false
}
rowsCount := getRowsCount(tss)
if *shardByURL {
// Todo: When shardByURL is enabled, the following metrics won't be 100% accurate. Because vmagent don't know
// which rwctx should data be pushed to yet. Let's consider the hashing algorithm fair and will distribute
// data to all rwctxs evenly.
rowsCount = rowsCount / len(rwctxsGlobal)
}
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
}
}

View File

@@ -61,7 +61,7 @@ func UnitTest(files []string, disableGroupLabel bool, externalLabels []string, e
}
eu, err := url.Parse(externalURL)
if err != nil {
logger.Fatalf("failed to parse external URL: %w", err)
logger.Fatalf("failed to parse external URL: %s", err)
}
if err := templates.Load([]string{}, *eu); err != nil {
logger.Fatalf("failed to load template: %v", err)

View File

@@ -105,7 +105,7 @@ func (cw *configWatcher) add(typeK TargetType, interval time.Duration, targetsFn
}
targetMetadata, errors := getTargetMetadata(targetsFn, cw.cfg)
for _, err := range errors {
logger.Errorf("failed to init notifier for %q: %w", typeK, err)
logger.Errorf("failed to init notifier for %q: %s", typeK, err)
}
cw.updateTargets(typeK, targetMetadata, cw.cfg, cw.genFn)
}
@@ -274,7 +274,7 @@ func (cw *configWatcher) updateTargets(key TargetType, targetMts map[string]targ
for addr, metadata := range targetMts {
am, err := NewAlertManager(addr, genFn, cfg.HTTPClientConfig, metadata.alertRelabelConfigs, cfg.Timeout.Duration())
if err != nil {
logger.Errorf("failed to init %s notifier with addr %q: %w", key, addr, err)
logger.Errorf("failed to init %s notifier with addr %q: %s", key, addr, err)
continue
}
updatedTargets = append(updatedTargets, Target{

View File

@@ -1,6 +1,7 @@
package graphite
import (
"flag"
"fmt"
"math"
"net/http"
@@ -14,13 +15,14 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
)
var maxTagValueSuffixes = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
// MetricsFindHandler implements /metrics/find handler.
//
// See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
@@ -220,11 +222,10 @@ func MetricsIndexHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
// metricsFind searches for label values that match the given qHead and qTail.
func metricsFind(tr storage.TimeRange, label, qHead, qTail string, delimiter byte, isExpand bool, deadline searchutil.Deadline) ([]string, error) {
maxSuffixes := limits.MaxTagValueSuffixes(0)
n := strings.IndexAny(qTail, "*{[")
if n < 0 {
query := qHead + qTail
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, maxSuffixes, deadline)
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, *maxTagValueSuffixes, deadline)
if err != nil {
return nil, err
}
@@ -244,7 +245,7 @@ func metricsFind(tr storage.TimeRange, label, qHead, qTail string, delimiter byt
}
if n == len(qTail)-1 && strings.HasSuffix(qTail, "*") {
query := qHead + qTail[:len(qTail)-1]
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, maxSuffixes, deadline)
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, *maxTagValueSuffixes, deadline)
if err != nil {
return nil, err
}

View File

@@ -21,11 +21,11 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/stats"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
@@ -36,6 +36,12 @@ var (
deleteAuthKey = flagutil.NewPassword("deleteAuthKey", "authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries. It could be passed via authKey query arg. It overrides -httpAuth.*")
metricNamesStatsResetAuthKey = flagutil.NewPassword("metricNamesStatsResetAuthKey", "authKey for resetting metric names usage cache via /api/v1/admin/status/metric_names_stats/reset. It overrides -httpAuth.*. "+
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage")
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
"It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+
"See also -search.maxQueueDuration and -search.maxMemoryPerQuery")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests "+
"limit is reached; see also -search.maxQueryDuration")
resetCacheAuthKey = flagutil.NewPassword("search.resetCacheAuthKey", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call. It could be passed via authKey query arg. It overrides -httpAuth.*")
logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging. "+
"See also -search.logQueryMemoryUsage")
@@ -44,14 +50,23 @@ var (
var slowQueries = metrics.NewCounter(`vm_slow_queries_total`)
func getDefaultMaxConcurrentRequests() int {
// A single request can saturate all the CPU cores, so there is no sense
// in allowing higher number of concurrent requests - they will just contend
// for unavailable CPU time.
n := min(cgroup.AvailableCPUs()*2, 16)
return n
}
// Init initializes vmselect
func Init() {
tmpDirPath := *vmstorage.DataPath + "/tmp"
fs.MustRemoveDirContents(tmpDirPath)
netstorage.InitTmpBlocksDir(tmpDirPath)
promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
prometheus.InitMaxUniqueTimeseries(*maxConcurrentRequests)
concurrencyLimitCh = make(chan struct{}, limits.MaxConcurrentRequests())
concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)
initVMUIConfig()
initVMAlertProxy()
}
@@ -74,7 +89,7 @@ var (
return float64(len(concurrencyLimitCh))
})
_ = metrics.NewGauge(`vm_search_max_unique_timeseries`, func() float64 {
return float64(limits.MaxUniqueTimeseries())
return float64(prometheus.GetMaxUniqueTimeSeries())
})
)
@@ -114,12 +129,12 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
default:
// Sleep for a while until giving up. This should resolve short bursts in requests.
concurrencyLimitReached.Inc()
d := min(searchutil.GetMaxQueryDuration(r), limits.MaxQueueDuration())
d := min(searchutil.GetMaxQueryDuration(r), *maxQueueDuration)
t := timerpool.Get(d)
select {
case concurrencyLimitCh <- struct{}{}:
timerpool.Put(t)
qt.Printf("wait in queue because -%s=%d concurrent requests are executed", limits.MaxConcurrentRequestsFlagName(), limits.MaxConcurrentRequests())
qt.Printf("wait in queue because -search.maxConcurrentRequests=%d concurrent requests are executed", *maxConcurrentRequests)
defer func() { <-concurrencyLimitCh }()
case <-r.Context().Done():
timerpool.Put(t)
@@ -132,11 +147,10 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
timerpool.Put(t)
concurrencyLimitTimeout.Inc()
err := &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -%s=%d concurrent requests "+
"are already executed. Possible solutions: to reduce the query load; to add more compute resources to the server; "+
"to increase -%s=%d; to increase -%s",
d.Seconds(), limits.MaxConcurrentRequestsFlagName(), limits.MaxConcurrentRequests(),
limits.MaxQueueDurationFlagName(), limits.MaxQueueDuration(), limits.MaxConcurrentRequestsFlagName()),
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -search.maxConcurrentRequests=%d concurrent requests "+
"are executed. Possible solutions: to reduce query load; to add more compute resources to the server; "+
"to increase -search.maxQueueDuration=%s; to increase -search.maxQueryDuration; to increase -search.maxConcurrentRequests",
d.Seconds(), *maxConcurrentRequests, maxQueueDuration),
StatusCode: http.StatusTooManyRequests,
}
w.Header().Add("Retry-After", "10")

View File

@@ -20,7 +20,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
@@ -28,6 +27,10 @@ import (
)
var (
maxTagKeysPerSearch = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned from /api/v1/labels . "+
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned from /api/v1/label/<label_name>/values . "+
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
maxSamplesPerSeries = flag.Int("search.maxSamplesPerSeries", 30e6, "The maximum number of raw samples a single query can scan per each time series. This option allows limiting memory usage")
maxSamplesPerQuery = flag.Int("search.maxSamplesPerQuery", 1e9, "The maximum number of raw samples a single query can process across all time series. "+
"This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries")
@@ -770,8 +773,9 @@ func LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames i
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
maxLabelNames = limits.MaxLabelNames(maxLabelNames)
if maxLabelNames > *maxTagKeysPerSearch || maxLabelNames <= 0 {
maxLabelNames = *maxTagKeysPerSearch
}
tr := sq.GetTimeRange()
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
if err != nil {
@@ -837,7 +841,9 @@ func LabelValues(qt *querytracer.Tracer, labelName string, sq *storage.SearchQue
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
maxLabelValues = limits.MaxLabelValues(maxLabelValues)
if maxLabelValues > *maxTagValuesPerSearch || maxLabelValues <= 0 {
maxLabelValues = *maxTagValuesPerSearch
}
tr := sq.GetTimeRange()
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
if err != nil {

View File

@@ -28,7 +28,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@@ -49,6 +50,9 @@ var (
"If set to true, the query model becomes closer to InfluxDB data model. If set to true, then -search.maxLookback and -search.maxStalenessInterval are ignored")
maxStepForPointsAdjustment = flag.Duration("search.maxStepForPointsAdjustment", time.Minute, "The maximum step when /api/v1/query_range handler adjusts "+
"points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data")
maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage. "+
"When set to zero, the limit is automatically calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional).")
maxFederateSeries = flag.Int("search.maxFederateSeries", 1e6, "The maximum number of time series, which can be returned from /federate. This option allows limiting memory usage")
maxExportSeries = flag.Int("search.maxExportSeries", 10e6, "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage")
maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 10e6, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage")
@@ -849,7 +853,7 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWr
End: start,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: limits.MaxUniqueTimeseries(),
MaxSeries: GetMaxUniqueTimeSeries(),
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
MayCache: mayCache,
@@ -960,7 +964,7 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
End: end,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: limits.MaxUniqueTimeseries(),
MaxSeries: GetMaxUniqueTimeSeries(),
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
MayCache: mayCache,
@@ -1296,6 +1300,43 @@ func (sw *scalableWriter) flush() error {
return sw.bw.Flush()
}
var (
maxUniqueTimeseriesValueOnce sync.Once
maxUniqueTimeseriesValue int
)
// InitMaxUniqueTimeseries init the max metrics limit calculated by available resources.
// The calculation is split into calculateMaxUniqueTimeSeriesForResource for unit testing.
func InitMaxUniqueTimeseries(maxConcurrentRequests int) {
maxUniqueTimeseriesValueOnce.Do(func() {
maxUniqueTimeseriesValue = *maxUniqueTimeseries
if maxUniqueTimeseriesValue <= 0 {
maxUniqueTimeseriesValue = calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, memory.Remaining())
}
})
}
// calculateMaxUniqueTimeSeriesForResource calculate the max metrics limit calculated by available resources.
func calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, remainingMemory int) int {
if maxConcurrentRequests <= 0 {
// This line should NOT be reached unless the user has set an incorrect `search.maxConcurrentRequests`.
// In such cases, fallback to unlimited.
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
return 2e9
}
// Calculate the max metrics limit for a single request in the worst-case concurrent scenario.
// The approximate size of 1 unique series that could occupy in the vmstorage is 200 bytes.
mts := remainingMemory / 200 / maxConcurrentRequests
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
return mts
}
// GetMaxUniqueTimeSeries returns the max metrics limit calculated by available resources.
func GetMaxUniqueTimeSeries() int {
return maxUniqueTimeseriesValue
}
// copied from https://github.com/prometheus/common/blob/adea6285c1c7447fcb7bfdeb6abfc6eff893e0a7/model/metric.go#L483
// it's not possible to use direct import due to increased binary size
func unescapePrometheusLabelName(name string) string {

View File

@@ -4,6 +4,7 @@ import (
"math"
"net/http"
"reflect"
"runtime"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
@@ -229,3 +230,29 @@ func TestGetLatencyOffsetMillisecondsFailure(t *testing.T) {
}
f("http://localhost?latency_offset=foobar")
}
func TestCalculateMaxMetricsLimitByResource(t *testing.T) {
f := func(maxConcurrentRequest, remainingMemory, expect int) {
t.Helper()
maxMetricsLimit := calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequest, remainingMemory)
if maxMetricsLimit != expect {
t.Fatalf("unexpected max metrics limit: got %d, want %d", maxMetricsLimit, expect)
}
}
// Skip when GOARCH=386
if runtime.GOARCH != "386" {
// 8 CPU & 32 GiB
f(16, int(math.Round(32*1024*1024*1024*0.4)), 4294967)
// 4 CPU & 32 GiB
f(8, int(math.Round(32*1024*1024*1024*0.4)), 8589934)
}
// 2 CPU & 4 GiB
f(4, int(math.Round(4*1024*1024*1024*0.4)), 2147483)
// other edge cases
f(0, int(math.Round(4*1024*1024*1024*0.4)), 2e9)
f(4, 0, 0)
}

View File

@@ -3109,7 +3109,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -3406,7 +3406,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -3110,7 +3110,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -3407,7 +3407,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2946,7 +2946,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2324,7 +2324,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2945,7 +2945,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2323,7 +2323,6 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -37,6 +37,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): hide values passed to `-remoteWrite.headers` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive HTTP headers such as `Authorization` and API keys.
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly establish [mtls](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#mtls-protection) connection between vmstorage and vminsert. Regression was introduced in v1.130.0 release for the enterprise version of vmstorage. See [#10958](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10958)
* BUGFIX: [vmrestore](https://docs.victoriametrics.com/victoriametrics/vmrestore/): fix a bug where specifying `-storageDataPath` with a trailing slash could cause `vmrestore` to panic. See [#10823](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10823). Thanks to @utafrali for the contribution.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): prevent unintentional rerouting of samples to other sharding targets when one of the `-remoteWrite.url` targets with `-remoteWrite.disableOnDiskQueue` becomes blocked. Previously this could break the sharding guarantee by sending samples to wrong targets instead of dropping or retrying them. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): return error on startup if `-remoteWrite.disableOnDiskQueue` is not configured uniformly across all `-remoteWrite.url` targets when `-remoteWrite.shardByURL` is enabled. Either all targets must have it enabled or all must have it disabled. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
## [v1.143.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.143.0)

View File

@@ -1,141 +0,0 @@
package limits
import (
"flag"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
var (
maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be scanned during every query. "+
"This allows protecting against heavy queries, which select unexpectedly high number of series. When set to zero, the limit is automatically "+
"calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional). "+
"See also -search.max* command-line flags at vmselect")
maxLabelNames = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search. "+
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
maxLabelValues = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search. "+
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
maxTagValueSuffixes = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", defaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
"It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+
"See also -search.maxQueueDuration and -search.maxMemoryPerQuery")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the incoming vmselect request waits for execution "+
"when -search.maxConcurrentRequests limit is reached")
)
func defaultMaxConcurrentRequests() int {
// A single request can saturate all the CPU cores, so there is no sense
// in allowing higher number of concurrent requests - they will just contend
// for unavailable CPU time.
n := min(cgroup.AvailableCPUs()*2, 16)
return n
}
// MaxConcurrentRequests returns the maximum number of concurrent requests
// a server can process.
//
// The remaining requests wait for up to MaxQueueDuration for their execution.
func MaxConcurrentRequests() int {
return *maxConcurrentRequests
}
// MaxConcurrentRequestsFlagName returns the name of the flag used for
// configuring max number of concurrent search requests.
func MaxConcurrentRequestsFlagName() string {
return "search.maxConcurrentRequests"
}
// MaxQueueDuration returns the maximum duration to wait if
// MaxConcurrentRequests are executed.
func MaxQueueDuration() time.Duration {
return *maxQueueDuration
}
// MaxQueueDurationFlagName returns the name of the flag used for configuring
// the max time duration during which a search request may remain in queue.
func MaxQueueDurationFlagName() string {
return "search.maxQueueDuration"
}
// MaxMetrics calculates the max number of metric names a single query is
// allowed to return based on the limit from the search query and
// -search.maxUniqueTimeseries flag value.
func MaxMetrics(searchQueryLimit int) int {
if searchQueryLimit <= 0 {
return MaxUniqueTimeseries()
}
// searchQueryLimit cannot exceed `-search.maxUniqueTimeseries`
if *maxUniqueTimeseries != 0 && searchQueryLimit > *maxUniqueTimeseries {
searchQueryLimit = *maxUniqueTimeseries
}
return searchQueryLimit
}
// MaxLabelNames calculates the max number of label names a single query is
// allowed to return based on the limit from the search query and
// -search.maxTagKeys flag value.
func MaxLabelNames(searchQueryLimit int) int {
return calculateLimit(searchQueryLimit, *maxLabelNames)
}
// MaxLabelValues calculates the max number of label values a single query is
// allowed to return based on the limit from the search query and
// -search.maxTagValues flag value.
func MaxLabelValues(searchQueryLimit int) int {
return calculateLimit(searchQueryLimit, *maxLabelValues)
}
// MaxTagValueSuffixes calculates the max number of tag value suffixes a single
// query is allowed to return based on the limit from the search query and
// -search.maxTagValueSuffixesPerSearch flag value.
func MaxTagValueSuffixes(searchQueryLimit int) int {
return calculateLimit(searchQueryLimit, *maxTagValueSuffixes)
}
func calculateLimit(searchQueryLimit, flagValue int) int {
if 0 < searchQueryLimit && searchQueryLimit < flagValue {
return searchQueryLimit
}
return flagValue
}
var (
maxUniqueTimeseriesValue int
maxUniqueTimeseriesValueOnce sync.Once
)
// MaxUniqueTimeseries returns `-search.maxUniqueTimeseries` or the
// auto-calculated value based on available resources.
func MaxUniqueTimeseries() int {
maxUniqueTimeseriesValueOnce.Do(func() {
maxUniqueTimeseriesValue = *maxUniqueTimeseries
if maxUniqueTimeseriesValue <= 0 {
maxUniqueTimeseriesValue = calculateMaxUniqueTimeseries(*maxConcurrentRequests, memory.Remaining())
}
})
return maxUniqueTimeseriesValue
}
// calculateMaxUniqueTimeseries calculates the maxUniqueTimeseries limit based
// on available resources.
func calculateMaxUniqueTimeseries(maxConcurrentRequests, remainingMemory int) int {
if maxConcurrentRequests <= 0 {
// This line should NOT be reached unless the user has set an incorrect
// `-search.maxConcurrentRequests`. In such cases, fallback to
// unlimited.
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
return 2e9
}
// Calculate the maxUniqueTimeseries limit for a single request in the
// worst-case concurrent scenario. The approximate size of 1 unique series
// that could occupy in vmstorage is 200 bytes.
mts := remainingMemory / 200 / maxConcurrentRequests
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. "+
"To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
return mts
}

View File

@@ -1,52 +0,0 @@
package limits
import (
"math"
"runtime"
"testing"
)
func TestCalculateMaxUniqueTimeseries(t *testing.T) {
f := func(maxConcurrentRequests, remainingMemory, want int) {
t.Helper()
got := calculateMaxUniqueTimeseries(maxConcurrentRequests, remainingMemory)
if got != want {
t.Fatalf("unexpected maxUniqueTimeseries: got %d, want %d", got, want)
}
}
// Skip when GOARCH=386
if runtime.GOARCH != "386" {
// 8 CPU & 32 GiB
f(16, int(math.Round(32*1024*1024*1024*0.4)), 4294967)
// 4 CPU & 32 GiB
f(8, int(math.Round(32*1024*1024*1024*0.4)), 8589934)
}
// 2 CPU & 4 GiB
f(4, int(math.Round(4*1024*1024*1024*0.4)), 2147483)
// other edge cases
f(0, int(math.Round(4*1024*1024*1024*0.4)), 2e9)
f(4, 0, 0)
}
func TestMaxMetrics(t *testing.T) {
originalMaxUniqueTimeseries := *maxUniqueTimeseries
defer func() {
*maxUniqueTimeseries = originalMaxUniqueTimeseries
}()
f := func(searchQueryLimit, flagLimit, want int) {
t.Helper()
*maxUniqueTimeseries = flagLimit
got := MaxMetrics(searchQueryLimit)
if got != want {
t.Fatalf("unexpected maxMetrics: got %d, want %d", got, want)
}
}
f(0, 1e6, 1e6)
f(2e6, 0, 2e6)
f(2e6, 1e6, 1e6)
}

View File

@@ -1538,11 +1538,11 @@ func (tb *Table) MustCreateSnapshotAt(dstDir string) {
srcDir := tb.path
srcDir, err = filepath.Abs(srcDir)
if err != nil {
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", srcDir, err)
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", srcDir, err)
}
dstDir, err = filepath.Abs(dstDir)
if err != nil {
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", dstDir, err)
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", dstDir, err)
}
prefix := srcDir + string(filepath.Separator)
if strings.HasPrefix(dstDir, prefix) {

View File

@@ -764,7 +764,7 @@ func filterLabelValues(lvs map[string]struct{}, tf *tagFilter, key string) {
b = marshalTagValue(b, bytesutil.ToUnsafeBytes(lv))
ok, err := tf.match(b)
if err != nil {
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %w", key, lv, tf.String(), err)
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %s", key, lv, tf.String(), err)
}
if !ok {
delete(lvs, lv)

View File

@@ -141,7 +141,7 @@ func (is *indexSearch) legacyContainsTimeRangeSlow(prefixBuf *bytesutil.ByteBuff
ts.Seek(prefixBuf.B)
if !ts.NextItem() {
if err := ts.Error(); err != nil {
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %w", minDate, prefixBuf.B, err)
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %s", minDate, prefixBuf.B, err)
}
return false
}

View File

@@ -106,7 +106,7 @@ func loadFrom(loadPath string, maxSizeBytes uint64) (*Tracker, error) {
}
defer func() {
if err := zr.Close(); err != nil {
logger.Panicf("FATAL: cannot close gzip reader: %w", err)
logger.Panicf("FATAL: cannot close gzip reader: %s", err)
}
}()

View File

@@ -517,7 +517,7 @@ func (tb *table) historicalMergeWatcher() {
logger.Infof("start %s for partition (%s, %s)", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath)
if err := pt.ForceMergeAllParts(tb.stopCh); err != nil {
logger.Errorf("cannot %s for partition (%s, %s): %w", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
logger.Errorf("cannot %s for partition (%s, %s): %s", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
}
logger.Infof("finished %s for partition (%s, %s) in %.3f seconds", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath, time.Since(t).Seconds())