Compare commits

..

1 Commits

Author SHA1 Message Date
Artem Fetishev
263c236d52 extract search flags/limits into a separate lib for reusing
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-05-18 12:59:46 +02:00
44 changed files with 545 additions and 535 deletions

View File

@@ -63,11 +63,11 @@ jobs:
arch: amd64
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
- name: Setup Go
id: go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
uses: actions/setup-go@v6
with:
cache-dependency-path: |
go.sum

View File

@@ -9,7 +9,7 @@ jobs:
tip-lint:
runs-on: 'ubuntu-latest'
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- uses: 'actions/checkout@v6'
with:
# needed for proper diff
fetch-depth: 0

View File

@@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
with:
fetch-depth: 0 # we need full history for commit verification

View File

@@ -15,11 +15,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@master
- name: Setup Go
id: go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
uses: actions/setup-go@v6
with:
go-version-file: 'go.mod'
cache: false
@@ -27,7 +27,7 @@ jobs:
- run: go version
- name: Cache Go artifacts
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
uses: actions/cache@v5
with:
path: |
~/.cache/go-build

View File

@@ -29,18 +29,18 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
- name: Set up Go
id: go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
uses: actions/setup-go@v6
with:
cache: false
go-version-file: 'go.mod'
- run: go version
- name: Cache Go artifacts
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
uses: actions/cache@v5
with:
path: |
~/.cache/go-build
@@ -50,14 +50,14 @@ jobs:
restore-keys: go-artifacts-${{ runner.os }}-codeql-analyze-${{ steps.go.outputs.go-version }}-
- name: Initialize CodeQL
uses: github/codeql-action/init@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
uses: github/codeql-action/init@v4.35.2
with:
languages: go
- name: Autobuild
uses: github/codeql-action/autobuild@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
uses: github/codeql-action/autobuild@v4.35.2
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
uses: github/codeql-action/analyze@v4.35.2
with:
category: 'language:go'

View File

@@ -16,19 +16,19 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
with:
path: __vm
- name: Checkout private code
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
with:
repository: VictoriaMetrics/vmdocs
token: ${{ secrets.VM_BOT_GH_TOKEN }}
path: __vm-docs
- name: Import GPG key
uses: crazy-max/ghaction-import-gpg@2dc316deee8e90f13e1a351ab510b4d5bc0c82cd # v7.0.0
uses: crazy-max/ghaction-import-gpg@v7
id: import-gpg
with:
gpg_private_key: ${{ secrets.VM_BOT_GPG_PRIVATE_KEY }}

View File

@@ -32,11 +32,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
- name: Setup Go
id: go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
uses: actions/setup-go@v6
with:
cache-dependency-path: |
go.sum
@@ -47,7 +47,7 @@ jobs:
- run: go version
- name: Cache golangci-lint
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
uses: actions/cache@v5
with:
path: |
~/.cache/golangci-lint
@@ -72,11 +72,11 @@ jobs:
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
- name: Setup Go
id: go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
uses: actions/setup-go@v6
with:
cache-dependency-path: |
go.sum
@@ -94,11 +94,11 @@ jobs:
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
- name: Setup Go
id: go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
uses: actions/setup-go@v6
with:
cache-dependency-path: |
go.sum

View File

@@ -32,11 +32,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
- name: Cache node_modules
id: cache
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
uses: actions/cache@v5
with:
path: app/vmui/packages/vmui/node_modules
key: vmui-deps-${{ runner.os }}-${{ hashFiles('app/vmui/packages/vmui/package-lock.json', 'app/vmui/Dockerfile-build') }}
@@ -69,7 +69,7 @@ jobs:
VMUI_SKIP_INSTALL: true
- name: Annotate Code Linting Results
uses: ataylorme/eslint-annotate-action@d57a1193d4c59cbfbf3f86c271f42612f9dbd9e9 # 3.0.0
uses: ataylorme/eslint-annotate-action@v3
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
report-json: app/vmui/packages/vmui/vmui-lint-report.json

View File

@@ -2,7 +2,6 @@ package remotewrite
import (
"bytes"
"context"
"errors"
"fmt"
"io"
@@ -311,6 +310,11 @@ func (c *client) runWorker() {
if !ok {
return
}
if len(block) == 0 {
// skip empty data blocks from sending
// see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6241
continue
}
go func() {
startTime := time.Now()
ch <- c.sendBlock(block)
@@ -326,20 +330,15 @@ func (c *client) runWorker() {
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
return
case <-c.stopCh:
// c must be stopped. Wait up to 5 seconds for the in-flight request to complete.
// If it succeeds, drain the remaining in-memory queue before returning.
stopCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
// c must be stopped. Wait for a while in the hope the block will be sent.
graceDuration := 5 * time.Second
select {
case ok := <-ch:
if !ok {
// Return unsent block to the queue.
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
} else {
c.drainInMemoryQueue(stopCtx, block[:0])
}
case <-stopCtx.Done():
case <-time.After(graceDuration):
// Return unsent block to the queue.
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
}
@@ -509,32 +508,6 @@ again:
goto again
}
func (c *client) drainInMemoryQueue(stopCtx context.Context, block []byte) {
var ok bool
for {
select {
case <-stopCtx.Done():
return
default:
}
block, ok = c.fq.MustReadInMemoryBlock(block[:0])
if !ok {
// The in memory queue has already been drained,
// or persisted queue is being used.
// In this case it is guaranteed that fq will be empty
return
}
// at this stage c.stopCh should be closed
// so sendBlock function should not perform retries
if ok := c.sendBlock(block); !ok {
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
return
}
}
}
var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second)
var remoteWriteRetryLogger = logger.WithThrottler("remoteWriteRetry", 5*time.Second)

View File

@@ -53,9 +53,6 @@ func TestInitSecretFlags(t *testing.T) {
if !flagutil.IsSecretFlag("remotewrite.headers") {
t.Fatalf("expecting remoteWrite.headers to be secret")
}
if !flagutil.IsSecretFlag("remotewrite.proxyurl") {
t.Fatalf("expecting remoteWrite.proxyURL to be secret")
}
flagutil.UnregisterAllSecretFlags()
*showRemoteWriteURL = true
@@ -66,9 +63,6 @@ func TestInitSecretFlags(t *testing.T) {
if !flagutil.IsSecretFlag("remotewrite.headers") {
t.Fatalf("expecting remoteWrite.headers to remain secret")
}
if !flagutil.IsSecretFlag("remotewrite.proxyurl") {
t.Fatalf("expecting remoteWrite.proxyURL to remain secret")
}
}
func TestRepackBlockFromZstdToSnappy(t *testing.T) {

View File

@@ -151,8 +151,6 @@ func InitSecretFlags() {
// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
flagutil.RegisterSecretFlag("remoteWrite.url")
}
// remoteWrite.proxyURL can contain authentication codes.
flagutil.RegisterSecretFlag("remoteWrite.proxyURL")
// remoteWrite.headers can contain auth headers such as Authorization and API keys.
flagutil.RegisterSecretFlag("remoteWrite.headers")
}
@@ -171,18 +169,6 @@ func Init() {
if len(*remoteWriteURLs) == 0 {
logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set")
}
if *shardByURL && len(*disableOnDiskQueue) > 1 {
disableOnDiskQueues := *disableOnDiskQueue
firstValue := disableOnDiskQueues[0]
for _, v := range disableOnDiskQueues[1:] {
if firstValue != v {
logger.Fatalf("all -remoteWrite.url targets must have the same -remoteWrite.disableOnDiskQueue setting when -remoteWrite.shardByURL is enabled; " +
"either enable or disable -remoteWrite.disableOnDiskQueue for all targets")
}
}
}
if limit := getMaxHourlySeries(); limit > 0 {
hourlySeriesLimiter = bloomfilter.NewLimiter(limit, time.Hour)
_ = metrics.NewGauge(`vmagent_hourly_series_limit_max_series`, func() float64 {
@@ -515,9 +501,7 @@ func tryPush(at *auth.Token, wr *prompb.WriteRequest, forceDropSamplesOnFailure
//
// calculateHealthyRwctxIdx will rely on the order of rwctx to be in ascending order.
func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) ([]*remoteWriteCtx, bool) {
// When -remoteWrite.shardByURL=true always use all configured remote writes to preserve stable metrics distribution across shards.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507
if !disableOnDiskQueueAny || *shardByURL {
if !disableOnDiskQueueAny {
return rwctxsGlobal, true
}
@@ -532,6 +516,12 @@ func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailu
return nil, false
}
rowsCount := getRowsCount(tss)
if *shardByURL {
// Todo: When shardByURL is enabled, the following metrics won't be 100% accurate. Because vmagent don't know
// which rwctx should data be pushed to yet. Let's consider the hashing algorithm fair and will distribute
// data to all rwctxs evenly.
rowsCount = rowsCount / len(rwctxsGlobal)
}
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
}
}

View File

@@ -61,7 +61,7 @@ func UnitTest(files []string, disableGroupLabel bool, externalLabels []string, e
}
eu, err := url.Parse(externalURL)
if err != nil {
logger.Fatalf("failed to parse external URL: %s", err)
logger.Fatalf("failed to parse external URL: %w", err)
}
if err := templates.Load([]string{}, *eu); err != nil {
logger.Fatalf("failed to load template: %v", err)

View File

@@ -64,7 +64,6 @@ func InitSecretFlags() {
if !*showDatasourceURL {
flagutil.RegisterSecretFlag("datasource.url")
}
flagutil.RegisterSecretFlag("datasource.headers")
}
// ShowDatasourceURL whether to show -datasource.url with sensitive information

View File

@@ -105,7 +105,7 @@ func (cw *configWatcher) add(typeK TargetType, interval time.Duration, targetsFn
}
targetMetadata, errors := getTargetMetadata(targetsFn, cw.cfg)
for _, err := range errors {
logger.Errorf("failed to init notifier for %q: %s", typeK, err)
logger.Errorf("failed to init notifier for %q: %w", typeK, err)
}
cw.updateTargets(typeK, targetMetadata, cw.cfg, cw.genFn)
}
@@ -274,7 +274,7 @@ func (cw *configWatcher) updateTargets(key TargetType, targetMts map[string]targ
for addr, metadata := range targetMts {
am, err := NewAlertManager(addr, genFn, cfg.HTTPClientConfig, metadata.alertRelabelConfigs, cfg.Timeout.Duration())
if err != nil {
logger.Errorf("failed to init %s notifier with addr %q: %s", key, addr, err)
logger.Errorf("failed to init %s notifier with addr %q: %w", key, addr, err)
continue
}
updatedTargets = append(updatedTargets, Target{

View File

@@ -194,7 +194,6 @@ func InitSecretFlags() {
if !*showNotifierURL {
flagutil.RegisterSecretFlag("notifier.url")
}
flagutil.RegisterSecretFlag("notifier.headers")
}
func notifiersFromFlags(gen AlertURLGenerator) ([]Notifier, error) {

View File

@@ -59,7 +59,6 @@ func InitSecretFlags() {
if !*showRemoteReadURL {
flagutil.RegisterSecretFlag("remoteRead.url")
}
flagutil.RegisterSecretFlag("remoteRead.headers")
}
// Init creates a Querier from provided flag values.

View File

@@ -62,7 +62,6 @@ func InitSecretFlags() {
if !*showRemoteWriteURL {
flagutil.RegisterSecretFlag("remoteWrite.url")
}
flagutil.RegisterSecretFlag("remoteWrite.headers")
}
// Init creates Client object from given flags.

View File

@@ -1,7 +1,6 @@
package graphite
import (
"flag"
"fmt"
"math"
"net/http"
@@ -15,14 +14,13 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
)
var maxTagValueSuffixes = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
// MetricsFindHandler implements /metrics/find handler.
//
// See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
@@ -222,10 +220,11 @@ func MetricsIndexHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
// metricsFind searches for label values that match the given qHead and qTail.
func metricsFind(tr storage.TimeRange, label, qHead, qTail string, delimiter byte, isExpand bool, deadline searchutil.Deadline) ([]string, error) {
maxSuffixes := limits.MaxTagValueSuffixes(0)
n := strings.IndexAny(qTail, "*{[")
if n < 0 {
query := qHead + qTail
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, *maxTagValueSuffixes, deadline)
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, maxSuffixes, deadline)
if err != nil {
return nil, err
}
@@ -245,7 +244,7 @@ func metricsFind(tr storage.TimeRange, label, qHead, qTail string, delimiter byt
}
if n == len(qTail)-1 && strings.HasSuffix(qTail, "*") {
query := qHead + qTail[:len(qTail)-1]
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, *maxTagValueSuffixes, deadline)
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, maxSuffixes, deadline)
if err != nil {
return nil, err
}

View File

@@ -21,11 +21,11 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/stats"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
@@ -36,12 +36,6 @@ var (
deleteAuthKey = flagutil.NewPassword("deleteAuthKey", "authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries. It could be passed via authKey query arg. It overrides -httpAuth.*")
metricNamesStatsResetAuthKey = flagutil.NewPassword("metricNamesStatsResetAuthKey", "authKey for resetting metric names usage cache via /api/v1/admin/status/metric_names_stats/reset. It overrides -httpAuth.*. "+
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage")
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
"It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+
"See also -search.maxQueueDuration and -search.maxMemoryPerQuery")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests "+
"limit is reached; see also -search.maxQueryDuration")
resetCacheAuthKey = flagutil.NewPassword("search.resetCacheAuthKey", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call. It could be passed via authKey query arg. It overrides -httpAuth.*")
logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging. "+
"See also -search.logQueryMemoryUsage")
@@ -50,27 +44,16 @@ var (
var slowQueries = metrics.NewCounter(`vm_slow_queries_total`)
func getDefaultMaxConcurrentRequests() int {
// A single request can saturate all the CPU cores, so there is no sense
// in allowing higher number of concurrent requests - they will just contend
// for unavailable CPU time.
n := min(cgroup.AvailableCPUs()*2, 16)
return n
}
// Init initializes vmselect
func Init() {
tmpDirPath := *vmstorage.DataPath + "/tmp"
fs.MustRemoveDirContents(tmpDirPath)
netstorage.InitTmpBlocksDir(tmpDirPath)
promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
prometheus.InitMaxUniqueTimeseries(*maxConcurrentRequests)
concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)
concurrencyLimitCh = make(chan struct{}, limits.MaxConcurrentRequests())
initVMUIConfig()
initVMAlertProxy()
flagutil.RegisterSecretFlag("vmalert.proxyURL")
}
// Stop stops vmselect
@@ -91,7 +74,7 @@ var (
return float64(len(concurrencyLimitCh))
})
_ = metrics.NewGauge(`vm_search_max_unique_timeseries`, func() float64 {
return float64(prometheus.GetMaxUniqueTimeSeries())
return float64(limits.MaxUniqueTimeseries())
})
)
@@ -131,12 +114,12 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
default:
// Sleep for a while until giving up. This should resolve short bursts in requests.
concurrencyLimitReached.Inc()
d := min(searchutil.GetMaxQueryDuration(r), *maxQueueDuration)
d := min(searchutil.GetMaxQueryDuration(r), limits.MaxQueueDuration())
t := timerpool.Get(d)
select {
case concurrencyLimitCh <- struct{}{}:
timerpool.Put(t)
qt.Printf("wait in queue because -search.maxConcurrentRequests=%d concurrent requests are executed", *maxConcurrentRequests)
qt.Printf("wait in queue because -%s=%d concurrent requests are executed", limits.MaxConcurrentRequestsFlagName(), limits.MaxConcurrentRequests())
defer func() { <-concurrencyLimitCh }()
case <-r.Context().Done():
timerpool.Put(t)
@@ -149,10 +132,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
timerpool.Put(t)
concurrencyLimitTimeout.Inc()
err := &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -search.maxConcurrentRequests=%d concurrent requests "+
"are executed. Possible solutions: to reduce query load; to add more compute resources to the server; "+
"to increase -search.maxQueueDuration=%s; to increase -search.maxQueryDuration; to increase -search.maxConcurrentRequests",
d.Seconds(), *maxConcurrentRequests, maxQueueDuration),
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -%s=%d concurrent requests "+
"are already executed. Possible solutions: to reduce the query load; to add more compute resources to the server; "+
"to increase -%s=%d; to increase -%s",
d.Seconds(), limits.MaxConcurrentRequestsFlagName(), limits.MaxConcurrentRequests(),
limits.MaxQueueDurationFlagName(), limits.MaxQueueDuration(), limits.MaxConcurrentRequestsFlagName()),
StatusCode: http.StatusTooManyRequests,
}
w.Header().Add("Retry-After", "10")

View File

@@ -20,6 +20,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
@@ -27,10 +28,6 @@ import (
)
var (
maxTagKeysPerSearch = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned from /api/v1/labels . "+
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned from /api/v1/label/<label_name>/values . "+
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
maxSamplesPerSeries = flag.Int("search.maxSamplesPerSeries", 30e6, "The maximum number of raw samples a single query can scan per each time series. This option allows limiting memory usage")
maxSamplesPerQuery = flag.Int("search.maxSamplesPerQuery", 1e9, "The maximum number of raw samples a single query can process across all time series. "+
"This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries")
@@ -773,9 +770,8 @@ func LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames i
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
if maxLabelNames > *maxTagKeysPerSearch || maxLabelNames <= 0 {
maxLabelNames = *maxTagKeysPerSearch
}
maxLabelNames = limits.MaxLabelNames(maxLabelNames)
tr := sq.GetTimeRange()
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
if err != nil {
@@ -841,9 +837,7 @@ func LabelValues(qt *querytracer.Tracer, labelName string, sq *storage.SearchQue
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
if maxLabelValues > *maxTagValuesPerSearch || maxLabelValues <= 0 {
maxLabelValues = *maxTagValuesPerSearch
}
maxLabelValues = limits.MaxLabelValues(maxLabelValues)
tr := sq.GetTimeRange()
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
if err != nil {

View File

@@ -28,8 +28,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/limits"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@@ -50,9 +49,6 @@ var (
"If set to true, the query model becomes closer to InfluxDB data model. If set to true, then -search.maxLookback and -search.maxStalenessInterval are ignored")
maxStepForPointsAdjustment = flag.Duration("search.maxStepForPointsAdjustment", time.Minute, "The maximum step when /api/v1/query_range handler adjusts "+
"points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data")
maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage. "+
"When set to zero, the limit is automatically calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional).")
maxFederateSeries = flag.Int("search.maxFederateSeries", 1e6, "The maximum number of time series, which can be returned from /federate. This option allows limiting memory usage")
maxExportSeries = flag.Int("search.maxExportSeries", 10e6, "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage")
maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 10e6, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage")
@@ -853,7 +849,7 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWr
End: start,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: GetMaxUniqueTimeSeries(),
MaxSeries: limits.MaxUniqueTimeseries(),
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
MayCache: mayCache,
@@ -964,7 +960,7 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
End: end,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: GetMaxUniqueTimeSeries(),
MaxSeries: limits.MaxUniqueTimeseries(),
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
MayCache: mayCache,
@@ -1300,43 +1296,6 @@ func (sw *scalableWriter) flush() error {
return sw.bw.Flush()
}
var (
maxUniqueTimeseriesValueOnce sync.Once
maxUniqueTimeseriesValue int
)
// InitMaxUniqueTimeseries init the max metrics limit calculated by available resources.
// The calculation is split into calculateMaxUniqueTimeSeriesForResource for unit testing.
func InitMaxUniqueTimeseries(maxConcurrentRequests int) {
maxUniqueTimeseriesValueOnce.Do(func() {
maxUniqueTimeseriesValue = *maxUniqueTimeseries
if maxUniqueTimeseriesValue <= 0 {
maxUniqueTimeseriesValue = calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, memory.Remaining())
}
})
}
// calculateMaxUniqueTimeSeriesForResource calculate the max metrics limit calculated by available resources.
func calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, remainingMemory int) int {
if maxConcurrentRequests <= 0 {
// This line should NOT be reached unless the user has set an incorrect `search.maxConcurrentRequests`.
// In such cases, fallback to unlimited.
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
return 2e9
}
// Calculate the max metrics limit for a single request in the worst-case concurrent scenario.
// The approximate size of 1 unique series that could occupy in the vmstorage is 200 bytes.
mts := remainingMemory / 200 / maxConcurrentRequests
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
return mts
}
// GetMaxUniqueTimeSeries returns the max metrics limit calculated by available resources.
func GetMaxUniqueTimeSeries() int {
return maxUniqueTimeseriesValue
}
// copied from https://github.com/prometheus/common/blob/adea6285c1c7447fcb7bfdeb6abfc6eff893e0a7/model/metric.go#L483
// it's not possible to use direct import due to increased binary size
func unescapePrometheusLabelName(name string) string {

View File

@@ -4,7 +4,6 @@ import (
"math"
"net/http"
"reflect"
"runtime"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
@@ -230,29 +229,3 @@ func TestGetLatencyOffsetMillisecondsFailure(t *testing.T) {
}
f("http://localhost?latency_offset=foobar")
}
func TestCalculateMaxMetricsLimitByResource(t *testing.T) {
f := func(maxConcurrentRequest, remainingMemory, expect int) {
t.Helper()
maxMetricsLimit := calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequest, remainingMemory)
if maxMetricsLimit != expect {
t.Fatalf("unexpected max metrics limit: got %d, want %d", maxMetricsLimit, expect)
}
}
// Skip when GOARCH=386
if runtime.GOARCH != "386" {
// 8 CPU & 32 GiB
f(16, int(math.Round(32*1024*1024*1024*0.4)), 4294967)
// 4 CPU & 32 GiB
f(8, int(math.Round(32*1024*1024*1024*0.4)), 8589934)
}
// 2 CPU & 4 GiB
f(4, int(math.Round(4*1024*1024*1024*0.4)), 2147483)
// other edge cases
f(0, int(math.Round(4*1024*1024*1024*0.4)), 2e9)
f(4, 0, 0)
}

View File

@@ -3109,6 +3109,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -3406,6 +3406,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -3110,6 +3110,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -3407,6 +3407,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2946,6 +2946,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2324,6 +2324,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2945,6 +2945,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2323,6 +2323,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -566,7 +566,7 @@ The following optional command-line flags related to mTLS are supported:
- `-cluster.tlsCAFile` can be set at `vminsert`, `vmselect` and `vmstorage` for verifying peer certificates issued with custom [certificate authority](https://en.wikipedia.org/wiki/Certificate_authority). By default, system-wide certificate authority is used for peer certificate verification.
- `-cluster.tlsCipherSuites` can be set to the list of supported TLS cipher suites at `vmstorage`. See [the list of supported TLS cipher suites](https://pkg.go.dev/crypto/tls#pkg-constants).
When `vmselect` or `vminsert` runs with `-clusternativeListenAddr` command-line option, then it can be configured with `-clusternative.tls*` options similar to `-cluster.tls*` for accepting `mTLS` connections from top-level `vmselect` or `vminsert` nodes in [multi-level cluster setup](#multi-level-cluster-setup).
When `vmselect` runs with `-clusternativeListenAddr` command-line option, then it can be configured with `-clusternative.tls*` options similar to `-cluster.tls*` for accepting `mTLS` connections from top-level `vmselect` nodes in [multi-level cluster setup](#multi-level-cluster-setup).
See [these docs](https://gist.github.com/f41gh7/76ed8e5fb1ebb9737fe746bae9175ee6) on how to set up mTLS in VictoriaMetrics cluster.

View File

@@ -43,8 +43,8 @@ Just download VictoriaMetrics and follow [these instructions](https://docs.victo
See [available integrations](https://docs.victoriametrics.com/victoriametrics/integrations/) with other systems like
[Prometheus](https://docs.victoriametrics.com/victoriametrics/integrations/prometheus/) or [Grafana](https://docs.victoriametrics.com/victoriametrics/integrations/grafana/).
VictoriaMetrics is developed at a fast pace, so it is recommended to periodically check the [CHANGELOG](https://docs.victoriametrics.com/victoriametrics/changelog/)
and perform [regular upgrades](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-upgrade-victoriametrics).
VictoriaMetrics is developed at a fast pace, so it is recommended periodically checking the [CHANGELOG](https://docs.victoriametrics.com/victoriametrics/changelog/)
and performing [regular upgrades](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-upgrade-victoriametrics).
### Starting VictoriaMetrics Single Node or Cluster on VictoriaMetrics Cloud {id="starting-vm-on-cloud"}
@@ -63,7 +63,7 @@ docker run -it --rm -v `pwd`/victoria-metrics-data:/victoria-metrics-data -p 842
victoriametrics/victoria-metrics:v1.143.0 --selfScrapeInterval=5s -storageDataPath=victoria-metrics-data
```
_For Enterprise images, see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#docker-images)._
_For Enterprise images see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#docker-images)._
You should see:
@@ -113,7 +113,7 @@ See more details about [cluster architecture](https://docs.victoriametrics.com/v
### Starting VictoriaMetrics Single Node from a Binary {id="starting-vm-single-from-a-binary"}
1. Download the correct binary for your OS and architecture from [GitHub](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
For Enterprise binaries, see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#binary-releases).
For Enterprise binaries see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#binary-releases).
2. Extract the archive to /usr/local/bin by running:
@@ -164,7 +164,7 @@ END'
Extra [command-line flags](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#list-of-command-line-flags) can be added to `ExecStart` line.
If you want to deploy VictoriaMetrics Single Node as a Windows Service, review the [running as a Windows service docs](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#running-as-windows-service).
If you want to deploy VictoriaMetrics Single Node as a Windows Service review the [running as a Windows service docs](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#running-as-windows-service).
> Please note, `victoriametrics` service is listening on `:8428` for HTTP connections (see `-httpListenAddr` flag).
@@ -174,7 +174,7 @@ If you want to deploy VictoriaMetrics Single Node as a Windows Service, review t
sudo systemctl daemon-reload && sudo systemctl enable --now victoriametrics.service
```
7. Check that the service started successfully:
7. Check that service started successfully:
```sh
sudo systemctl status victoriametrics.service
@@ -187,12 +187,12 @@ by going to `http://<ip_or_hostname>:8428/vmui`.
VictoriaMetrics cluster consists of [3 components](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#architecture-overview).
It is recommended to run these components in the same private network (for [security reasons](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#security)),
but on separate physical nodes for the best performance.
but on the separate physical nodes for the best performance.
On all nodes, you will need to do the following:
On all nodes you will need to do the following:
1. Download the correct binary for your OS and architecture with `-cluster` suffix from [GitHub](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
For Enterprise binaries, see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#binary-releases).
For Enterprise binaries see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#binary-releases).
2. Extract the archive to /usr/local/bin by running:
@@ -254,7 +254,7 @@ for vmstorage can be added to `ExecStart` line.
sudo systemctl daemon-reload && sudo systemctl enable --now vmstorage
```
4. Check that the service started successfully:
4. Check that service started successfully:
```sh
sudo systemctl status vmstorage
@@ -301,14 +301,14 @@ in one flag. See more details in `-storageNode` flag description in [vminsert fl
sudo systemctl daemon-reload && sudo systemctl enable --now vminsert.service
```
3. Check that the service started successfully:
3. Check that service started successfully:
```sh
sudo systemctl status vminsert.service
```
4. After `vminsert` is in `Running` state, confirm the service is healthy by visiting `http://<ip_or_hostname>:8480/-/healthy` link.
It should say "VictoriaMetrics is Healthy."
It should say "VictoriaMetrics is Healthy"
#### Installing vmselect
@@ -344,7 +344,7 @@ END'
```
Replace `<list of vmstorages>` with addresses of previously configured `vmstorage` services.
To specify multiple addresses, you can repeat the flag multiple times or separate addresses with commas
To specify multiple addresses you can repeat the flag multiple times, or separate addresses with commas
in one flag. See more details in `-storageNode` flag description [vminsert flags](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#list-of-command-line-flags-for-vminsert).
> Please note, `vmselect` service is listening on `:8481` for HTTP connections (see `-httpListenAddr` flag).
@@ -362,12 +362,12 @@ sudo systemctl status vmselect.service
```
5. After `vmselect` is in `Running` state, confirm the service is healthy by visiting `http://<ip_or_hostname>:8481/select/0/vmui` link.
It should open the [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui) page.
It should open [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui) page.
## Write data
There are two main models in monitoring for data collection: [push](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#push-model)
and [pull](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#pull-model). Both are used in modern monitoring, and both are
and [pull](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#pull-model). Both are used in modern monitoring and both are
supported by VictoriaMetrics.
See more details on [key concepts of writing data here](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#write-data).
@@ -389,7 +389,7 @@ and [other integrations](https://docs.victoriametrics.com/victoriametrics/integr
## Alerting
To run periodic conditions checks use [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/).
It allows creating a set of conditions using MetricsQL expressions and sending notifications to [Alertmanager](https://prometheus.io/docs/alerting/latest/alertmanager/)
It allows creating set of conditions using MetricsQL expressions and send notifications to [Alertmanager](https://prometheus.io/docs/alerting/latest/alertmanager/)
when such conditions are met.
See [vmalert quick start](https://docs.victoriametrics.com/victoriametrics/vmalert/#quickstart).
@@ -413,7 +413,7 @@ command line tool. It supports the following databases for migration to Victoria
## Productionization
When moving to production with VictoriaMetrics, we recommend following these best practices.
When going to production with VictoriaMetrics we recommend following the recommendations below.
### Monitoring
@@ -429,7 +429,7 @@ Using the [recommended alerting rules](https://github.com/VictoriaMetrics/Victor
will help to identify unwanted issues.
The rule of thumb is to have a separate installation of VictoriaMetrics or any other monitoring system to monitor the
production installation of VictoriaMetrics. This would make monitoring independent and help identify problems with
production installation of VictoriaMetrics. This would make monitoring independent and will help identify problems with
the main monitoring installation.
See more details in the article [VictoriaMetrics Monitoring](https://victoriametrics.com/blog/victoriametrics-monitoring/).
@@ -457,7 +457,7 @@ For backup configuration, please refer to [vmbackup documentation](https://docs.
### Configuring limits
To avoid excessive resource usage or performance degradation, limits must be in place:
To avoid excessive resource usage or performance degradation limits must be in place:
* [Resource usage limits](https://docs.victoriametrics.com/victoriametrics/faq/#how-to-set-a-memory-limit-for-victoriametrics-components);
* [Cardinality limiter](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#cardinality-limiter).

View File

@@ -28,22 +28,15 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: all VictoriaMetrics components: improve logging for the `-memory.allowedBytes` flag to warn about excessively low value (less than 1MB). See issue [#10935](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10935).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `basicAuth.usernameFile` command-line flags for reading basic auth username from a file, similar to the existing `basicAuth.passwordFile`. The file is re-read every second. See [#9436](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9436). Thanks to @kimjune01 for the contribution.
* FEATURE: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `clusternative.tls` `vminsert` configuration flags for [multi-level cluster setups](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multi-level-cluster-setup). See [#10958](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10958).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-opentelemetry.labelNameUnderscoreSanitization` command-line flag to control whether to enable prepending of `key` to labels starting with `_` when `-opentelemetry.usePrometheusNaming` is enabled. See [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) docs and [#9663](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9663). Thanks to @andriibeee for the contribution.
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): improve the [Top Queries](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#top-queries) table UI. Duration columns now display human-readable values (e.g. `1.23s`) instead of raw seconds, memory column shows human-readable sizes (e.g. `1.23 MB`), instant queries are labeled as `instant` instead of empty string, and column headers now show tooltips with descriptions. See [#10790](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10790).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): drain in-memory remote write queue on shutdown within the 5-second grace period before falling back to persisting blocks to disk. See [#9996](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9996)
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. See [#10918](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918). Thanks to @alexei38 for the contribution.
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. Thanks to @alexei38 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): extend delay on aggregation windows flush by the biggest lag among pushed samples. Before, the delay was calculated as 95th percentile across samples, which could underrepresent outliers and reject them from aggregation as "too old". See [#10402](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10402).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix a bug in [cardinality limiters](https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter) where series with different labels, like `{a="bc"}` and `{ab="c"}`, could be incorrectly treated as identical and dropped. See [#10937](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10937).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): hide values passed to `-remoteWrite.headers` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive HTTP headers such as `Authorization` and API keys.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): hide values passed to `-remoteWrite.proxyURL` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive credentials.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): hide values passed to `-remoteWrite.headers`,`remoteRead.headers`, `datasource.headers` and `notifier.headers` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive HTTP headers such as `Authorization` and API keys.
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly establish [mtls](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#mtls-protection) connection between vmstorage and vminsert. Regression was introduced in v1.130.0 release for the enterprise version of vmstorage. See [#10972](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10972).
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly establish [mtls](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#mtls-protection) connection between vmstorage and vminsert. Regression was introduced in v1.130.0 release for the enterprise version of vmstorage. See [#10958](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10958)
* BUGFIX: [vmrestore](https://docs.victoriametrics.com/victoriametrics/vmrestore/): fix a bug where specifying `-storageDataPath` with a trailing slash could cause `vmrestore` to panic. See [#10823](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10823). Thanks to @utafrali for the contribution.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): prevent unintentional rerouting of samples to other sharding targets when one of the `-remoteWrite.url` targets with `-remoteWrite.disableOnDiskQueue` becomes blocked. Previously this could break the sharding guarantee by sending samples to wrong targets instead of dropping or retrying them. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): return error on startup if `-remoteWrite.disableOnDiskQueue` is not configured uniformly across all `-remoteWrite.url` targets when `-remoteWrite.shardByURL` is enabled. Either all targets must have it enabled or all must have it disabled. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): hide values passed to `vmalert.proxyURL` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive HTTP headers such as `Authorization` and API keys.
## [v1.143.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.143.0)

View File

@@ -353,18 +353,6 @@ Example Docker image:
`victoriametrics/victoria-metrics:v1.143.0-enterprise-fips` uses the FIPS-compatible binary and based on `scratch` image.
## What Happens to Licensed Components When a License Expires
When a license expires, all licensed components continue to function normally until a restart occurs.
License checks happen only at startup. If a license expires while the component is running, nothing changes; the component continues to run until the next restart.
This means you don't need to restart components to install a new license. The component automatically picks up the new license the next time it restarts. The exception is when the `-license` flag is used, because the license is supplied at startup and changing it requires restarting VictoriaMetrics with the updated flag value.
If your license has expired and you decide to not renew it, you can switch to the VictoriaMetrics Open Source version without data loss, as both versions share the same data model. In doing so, however, you will lose access to the [VictoriaMetrics Enterprise features](https://docs.victoriametrics.com/victoriametrics/enterprise/#victoriametrics-enterprise-features).
See [updating the license key](https://docs.victoriametrics.com/victoriametrics/enterprise/#updating-the-license-key) for more details.
## Monitoring license expiration
All the Enterprise components expose the following metrics at the `/metrics` page:

File diff suppressed because it is too large Load Diff

141
lib/limits/select.go Normal file
View File

@@ -0,0 +1,141 @@
package limits
import (
"flag"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
var (
maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be scanned during every query. "+
"This allows protecting against heavy queries, which select unexpectedly high number of series. When set to zero, the limit is automatically "+
"calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional). "+
"See also -search.max* command-line flags at vmselect")
maxLabelNames = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search. "+
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
maxLabelValues = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search. "+
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
maxTagValueSuffixes = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", defaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
"It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+
"See also -search.maxQueueDuration and -search.maxMemoryPerQuery")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the incoming vmselect request waits for execution "+
"when -search.maxConcurrentRequests limit is reached")
)
func defaultMaxConcurrentRequests() int {
// A single request can saturate all the CPU cores, so there is no sense
// in allowing higher number of concurrent requests - they will just contend
// for unavailable CPU time.
n := min(cgroup.AvailableCPUs()*2, 16)
return n
}
// MaxConcurrentRequests returns the maximum number of concurrent requests
// a server can process.
//
// The remaining requests wait for up to MaxQueueDuration for their execution.
func MaxConcurrentRequests() int {
return *maxConcurrentRequests
}
// MaxConcurrentRequestsFlagName returns the name of the flag used for
// configuring max number of concurrent search requests.
func MaxConcurrentRequestsFlagName() string {
return "search.maxConcurrentRequests"
}
// MaxQueueDuration returns the maximum duration to wait if
// MaxConcurrentRequests are executed.
func MaxQueueDuration() time.Duration {
return *maxQueueDuration
}
// MaxQueueDurationFlagName returns the name of the flag used for configuring
// the max time duration during which a search request may remain in queue.
func MaxQueueDurationFlagName() string {
return "search.maxQueueDuration"
}
// MaxMetrics calculates the max number of metric names a single query is
// allowed to return based on the limit from the search query and
// -search.maxUniqueTimeseries flag value.
func MaxMetrics(searchQueryLimit int) int {
if searchQueryLimit <= 0 {
return MaxUniqueTimeseries()
}
// searchQueryLimit cannot exceed `-search.maxUniqueTimeseries`
if *maxUniqueTimeseries != 0 && searchQueryLimit > *maxUniqueTimeseries {
searchQueryLimit = *maxUniqueTimeseries
}
return searchQueryLimit
}
// MaxLabelNames calculates the max number of label names a single query is
// allowed to return based on the limit from the search query and
// -search.maxTagKeys flag value.
func MaxLabelNames(searchQueryLimit int) int {
return calculateLimit(searchQueryLimit, *maxLabelNames)
}
// MaxLabelValues calculates the max number of label values a single query is
// allowed to return based on the limit from the search query and
// -search.maxTagValues flag value.
func MaxLabelValues(searchQueryLimit int) int {
return calculateLimit(searchQueryLimit, *maxLabelValues)
}
// MaxTagValueSuffixes calculates the max number of tag value suffixes a single
// query is allowed to return based on the limit from the search query and
// -search.maxTagValueSuffixesPerSearch flag value.
func MaxTagValueSuffixes(searchQueryLimit int) int {
return calculateLimit(searchQueryLimit, *maxTagValueSuffixes)
}
func calculateLimit(searchQueryLimit, flagValue int) int {
if 0 < searchQueryLimit && searchQueryLimit < flagValue {
return searchQueryLimit
}
return flagValue
}
var (
maxUniqueTimeseriesValue int
maxUniqueTimeseriesValueOnce sync.Once
)
// MaxUniqueTimeseries returns `-search.maxUniqueTimeseries` or the
// auto-calculated value based on available resources.
func MaxUniqueTimeseries() int {
maxUniqueTimeseriesValueOnce.Do(func() {
maxUniqueTimeseriesValue = *maxUniqueTimeseries
if maxUniqueTimeseriesValue <= 0 {
maxUniqueTimeseriesValue = calculateMaxUniqueTimeseries(*maxConcurrentRequests, memory.Remaining())
}
})
return maxUniqueTimeseriesValue
}
// calculateMaxUniqueTimeseries calculates the maxUniqueTimeseries limit based
// on available resources.
func calculateMaxUniqueTimeseries(maxConcurrentRequests, remainingMemory int) int {
if maxConcurrentRequests <= 0 {
// This line should NOT be reached unless the user has set an incorrect
// `-search.maxConcurrentRequests`. In such cases, fallback to
// unlimited.
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
return 2e9
}
// Calculate the maxUniqueTimeseries limit for a single request in the
// worst-case concurrent scenario. The approximate size of 1 unique series
// that could occupy in vmstorage is 200 bytes.
mts := remainingMemory / 200 / maxConcurrentRequests
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. "+
"To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
return mts
}

52
lib/limits/select_test.go Normal file
View File

@@ -0,0 +1,52 @@
package limits
import (
"math"
"runtime"
"testing"
)
func TestCalculateMaxUniqueTimeseries(t *testing.T) {
f := func(maxConcurrentRequests, remainingMemory, want int) {
t.Helper()
got := calculateMaxUniqueTimeseries(maxConcurrentRequests, remainingMemory)
if got != want {
t.Fatalf("unexpected maxUniqueTimeseries: got %d, want %d", got, want)
}
}
// Skip when GOARCH=386
if runtime.GOARCH != "386" {
// 8 CPU & 32 GiB
f(16, int(math.Round(32*1024*1024*1024*0.4)), 4294967)
// 4 CPU & 32 GiB
f(8, int(math.Round(32*1024*1024*1024*0.4)), 8589934)
}
// 2 CPU & 4 GiB
f(4, int(math.Round(4*1024*1024*1024*0.4)), 2147483)
// other edge cases
f(0, int(math.Round(4*1024*1024*1024*0.4)), 2e9)
f(4, 0, 0)
}
func TestMaxMetrics(t *testing.T) {
originalMaxUniqueTimeseries := *maxUniqueTimeseries
defer func() {
*maxUniqueTimeseries = originalMaxUniqueTimeseries
}()
f := func(searchQueryLimit, flagLimit, want int) {
t.Helper()
*maxUniqueTimeseries = flagLimit
got := MaxMetrics(searchQueryLimit)
if got != want {
t.Fatalf("unexpected maxMetrics: got %d, want %d", got, want)
}
}
f(0, 1e6, 1e6)
f(2e6, 0, 2e6)
f(2e6, 1e6, 1e6)
}

View File

@@ -1538,11 +1538,11 @@ func (tb *Table) MustCreateSnapshotAt(dstDir string) {
srcDir := tb.path
srcDir, err = filepath.Abs(srcDir)
if err != nil {
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", srcDir, err)
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", srcDir, err)
}
dstDir, err = filepath.Abs(dstDir)
if err != nil {
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", dstDir, err)
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", dstDir, err)
}
prefix := srcDir + string(filepath.Separator)
if strings.HasPrefix(dstDir, prefix) {

View File

@@ -228,9 +228,7 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
return true
}
// MustReadBlock reads the next block from fq into dst and returns it.
// It first reads from the in-memory queue, then checks file-based queue.
// It blocks until a block is available or the stop deadline is exceeded, in which case it returns (dst, false).
// MustReadBlock reads the next block from fq to dst and returns it.
func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
fq.mu.Lock()
defer fq.mu.Unlock()
@@ -240,7 +238,15 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
return dst, false
}
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
if n := fq.pq.GetPendingBytes(); n > 0 {
logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is non-empty; it contains %d pending bytes", n)
}
bb := <-fq.ch
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
dst = append(dst, bb.B...)
blockBufPool.Put(bb)
return dst, true
}
if n := fq.pq.GetPendingBytes(); n > 0 {
data, ok := fq.pq.MustReadBlockNonblocking(dst)
@@ -259,35 +265,6 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
}
}
// MustReadInMemoryBlock reads the next block from the in-memory queue into dst and returns it.
// It returns (dst, true) if a block was available, or (nil, false) if the in-memory queue is empty.
// It does not block waiting for new blocks.
func (fq *FastQueue) MustReadInMemoryBlock(dst []byte) ([]byte, bool) {
fq.mu.Lock()
defer fq.mu.Unlock()
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
}
return nil, false
}
func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) []byte {
if len(fq.ch) == 0 {
logger.Panicf("BUG: the function must not be called when in-memory queue is empty. Caller should verify the queue len upfront")
}
if n := fq.pq.GetPendingBytes(); n > 0 {
logger.Panicf("BUG: the file-based queue must be empty when the in-memory queue is non-empty; it contains %d pending bytes", n)
}
bb := <-fq.ch
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
dst = append(dst, bb.B...)
blockBufPool.Put(bb)
return dst
}
// Dirname returns the directory name for persistent queue.
func (fq *FastQueue) Dirname() string {
return filepath.Base(fq.pq.dir)

View File

@@ -485,14 +485,6 @@ again:
}
goto again
}
// see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6241
if blockLen == 0 {
logger.Errorf("skipping corrupted %q, since zero block size is read from it", q.readerPath)
if err := q.skipBrokenChunkFile(); err != nil {
return dst, err
}
goto again
}
if blockLen > q.maxBlockSize {
logger.Errorf("skipping corrupted %q, since too big block size is read from it: %d bytes; cannot exceed %d bytes", q.readerPath, blockLen, q.maxBlockSize)
if err := q.skipBrokenChunkFile(); err != nil {

View File

@@ -764,7 +764,7 @@ func filterLabelValues(lvs map[string]struct{}, tf *tagFilter, key string) {
b = marshalTagValue(b, bytesutil.ToUnsafeBytes(lv))
ok, err := tf.match(b)
if err != nil {
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %s", key, lv, tf.String(), err)
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %w", key, lv, tf.String(), err)
}
if !ok {
delete(lvs, lv)

View File

@@ -141,7 +141,7 @@ func (is *indexSearch) legacyContainsTimeRangeSlow(prefixBuf *bytesutil.ByteBuff
ts.Seek(prefixBuf.B)
if !ts.NextItem() {
if err := ts.Error(); err != nil {
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %s", minDate, prefixBuf.B, err)
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %w", minDate, prefixBuf.B, err)
}
return false
}

View File

@@ -106,7 +106,7 @@ func loadFrom(loadPath string, maxSizeBytes uint64) (*Tracker, error) {
}
defer func() {
if err := zr.Close(); err != nil {
logger.Panicf("FATAL: cannot close gzip reader: %s", err)
logger.Panicf("FATAL: cannot close gzip reader: %w", err)
}
}()

View File

@@ -517,7 +517,7 @@ func (tb *table) historicalMergeWatcher() {
logger.Infof("start %s for partition (%s, %s)", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath)
if err := pt.ForceMergeAllParts(tb.stopCh); err != nil {
logger.Errorf("cannot %s for partition (%s, %s): %s", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
logger.Errorf("cannot %s for partition (%s, %s): %w", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
}
logger.Infof("finished %s for partition (%s, %s) in %.3f seconds", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath, time.Since(t).Seconds())