mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-07-02 15:15:04 +03:00
Compare commits
99 Commits
docs/file-
...
unauthoriz
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
beeea77690 | ||
|
|
142d7a9505 | ||
|
|
3548858843 | ||
|
|
1c05efb6ed | ||
|
|
2220f4e110 | ||
|
|
658f0b8a06 | ||
|
|
381468a9a2 | ||
|
|
632c42d8e7 | ||
|
|
8e5ca7e9b0 | ||
|
|
119ba0fb5b | ||
|
|
c82127b6d4 | ||
|
|
06bc808ddc | ||
|
|
a6927c46be | ||
|
|
15a4c31e87 | ||
|
|
54f9cd6edd | ||
|
|
2e16874e95 | ||
|
|
81d330f297 | ||
|
|
3278ddd170 | ||
|
|
cc790c2ea1 | ||
|
|
950f38fd6a | ||
|
|
ab9db9152f | ||
|
|
5b89f52c72 | ||
|
|
3608ab5b4c | ||
|
|
5ee1fa70c1 | ||
|
|
1b0e843e8f | ||
|
|
679646a3b3 | ||
|
|
bb3c038e2f | ||
|
|
8f32b6648f | ||
|
|
df5f11623f | ||
|
|
6c8a41f5ed | ||
|
|
e749a6ce8d | ||
|
|
615176ad55 | ||
|
|
3aec167f00 | ||
|
|
6f633e5654 | ||
|
|
50a827256a | ||
|
|
e30e8be1f4 | ||
|
|
24ac567a9f | ||
|
|
dce8193c16 | ||
|
|
e196479fb2 | ||
|
|
1c774564a2 | ||
|
|
e1c554d4a6 | ||
|
|
3419328f1c | ||
|
|
e841e45877 | ||
|
|
d53d8849e7 | ||
|
|
d3641394d9 | ||
|
|
53a8f4bd47 | ||
|
|
2b256952c9 | ||
|
|
12086e75de | ||
|
|
d426575622 | ||
|
|
a76b1ce0e3 | ||
|
|
5f49fb7f31 | ||
|
|
80d1104fca | ||
|
|
ae59c2624c | ||
|
|
4661f69d9f | ||
|
|
4d9901fbf4 | ||
|
|
9356c2111a | ||
|
|
45f0b87150 | ||
|
|
8480f6b43e | ||
|
|
61668f0672 | ||
|
|
d1ebbf573c | ||
|
|
16422b2d14 | ||
|
|
0f1ca87611 | ||
|
|
0dd2b2cee6 | ||
|
|
7caec5fcb4 | ||
|
|
612f8ac8d6 | ||
|
|
6aa31a09d7 | ||
|
|
b6e6a50e29 | ||
|
|
a6d48b6af3 | ||
|
|
dc4cf5631b | ||
|
|
005f133146 | ||
|
|
35fc595e6f | ||
|
|
710c920d60 | ||
|
|
0ceeb14076 | ||
|
|
adc29732f9 | ||
|
|
41ffe23b18 | ||
|
|
6229a8fe7d | ||
|
|
b58c73ac90 | ||
|
|
77efbb2e36 | ||
|
|
e388e41430 | ||
|
|
ed795a8443 | ||
|
|
94e5955b1f | ||
|
|
5b31a047a5 | ||
|
|
17c3fb5656 | ||
|
|
30133ec182 | ||
|
|
8b27a36fb5 | ||
|
|
f33cd8a937 | ||
|
|
615e49c983 | ||
|
|
eb1b4c6df4 | ||
|
|
ca71127158 | ||
|
|
1df805e23b | ||
|
|
dfc459eb38 | ||
|
|
83ef694e9c | ||
|
|
f6830298dc | ||
|
|
f16bcb1355 | ||
|
|
22802101e0 | ||
|
|
00420e16f9 | ||
|
|
6c3c548ddb | ||
|
|
d52de359d5 | ||
|
|
892f4aced2 |
2
.github/workflows/build.yml
vendored
2
.github/workflows/build.yml
vendored
@@ -66,6 +66,8 @@ jobs:
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
with:
|
||||
persist-credentials: false
|
||||
|
||||
- name: Setup Go
|
||||
id: go
|
||||
|
||||
1
.github/workflows/changelog-linter.yml
vendored
1
.github/workflows/changelog-linter.yml
vendored
@@ -17,6 +17,7 @@ jobs:
|
||||
with:
|
||||
# needed for proper diff
|
||||
fetch-depth: 0
|
||||
persist-credentials: false
|
||||
|
||||
- name: 'Validate that changelog changes are under ## tip'
|
||||
run: |
|
||||
|
||||
1
.github/workflows/check-commit-signed.yml
vendored
1
.github/workflows/check-commit-signed.yml
vendored
@@ -15,6 +15,7 @@ jobs:
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
with:
|
||||
fetch-depth: 0 # we need full history for commit verification
|
||||
persist-credentials: false
|
||||
|
||||
- name: Check commit signatures
|
||||
run: |
|
||||
|
||||
2
.github/workflows/check-licenses.yml
vendored
2
.github/workflows/check-licenses.yml
vendored
@@ -18,6 +18,8 @@ jobs:
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
with:
|
||||
persist-credentials: false
|
||||
|
||||
- name: Setup Go
|
||||
id: go
|
||||
|
||||
2
.github/workflows/codeql-analysis-go.yml
vendored
2
.github/workflows/codeql-analysis-go.yml
vendored
@@ -32,6 +32,8 @@ jobs:
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
with:
|
||||
persist-credentials: false
|
||||
|
||||
- name: Set up Go
|
||||
id: go
|
||||
|
||||
2
.github/workflows/docs.yaml
vendored
2
.github/workflows/docs.yaml
vendored
@@ -21,6 +21,7 @@ jobs:
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
with:
|
||||
path: __vm
|
||||
persist-credentials: false
|
||||
|
||||
- name: Checkout private code
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
@@ -28,6 +29,7 @@ jobs:
|
||||
repository: VictoriaMetrics/vmdocs
|
||||
token: ${{ secrets.VM_BOT_GH_TOKEN }}
|
||||
path: __vm-docs
|
||||
persist-credentials: true
|
||||
|
||||
- name: Import GPG key
|
||||
uses: crazy-max/ghaction-import-gpg@2dc316deee8e90f13e1a351ab510b4d5bc0c82cd # v7.0.0
|
||||
|
||||
6
.github/workflows/test.yml
vendored
6
.github/workflows/test.yml
vendored
@@ -35,6 +35,8 @@ jobs:
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
with:
|
||||
persist-credentials: false
|
||||
|
||||
- name: Setup Go
|
||||
id: go
|
||||
@@ -78,6 +80,8 @@ jobs:
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
with:
|
||||
persist-credentials: false
|
||||
|
||||
- name: Setup Go
|
||||
id: go
|
||||
@@ -103,6 +107,8 @@ jobs:
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
with:
|
||||
persist-credentials: false
|
||||
|
||||
- name: Setup Go
|
||||
id: go
|
||||
|
||||
2
.github/workflows/vmui.yml
vendored
2
.github/workflows/vmui.yml
vendored
@@ -33,6 +33,8 @@ jobs:
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
with:
|
||||
persist-credentials: false
|
||||
|
||||
- name: Cache node_modules
|
||||
id: cache
|
||||
|
||||
@@ -1,9 +1,18 @@
|
||||
version: "2"
|
||||
linters:
|
||||
enable:
|
||||
- errorlint
|
||||
settings:
|
||||
errcheck:
|
||||
exclude-functions:
|
||||
- (net/http.ResponseWriter).Write
|
||||
errorlint:
|
||||
errorf: true
|
||||
# Do not enable `comparison` and `asserts`: they produce false positives,
|
||||
# since many call sites intentionally compare sentinel errors directly (e.g. err == io.EOF)
|
||||
# when the producer is documented to return them unwrapped. See https://github.com/VictoriaMetrics/VictoriaLogs/pull/1490
|
||||
comparison: false
|
||||
asserts: false
|
||||
exclusions:
|
||||
generated: lax
|
||||
presets:
|
||||
|
||||
19
Makefile
19
Makefile
@@ -447,7 +447,7 @@ vet:
|
||||
go vet ./app/...
|
||||
go vet ./apptest/...
|
||||
|
||||
check-all: fmt vet golangci-lint govulncheck
|
||||
check-all: fmt vet golangci-lint
|
||||
|
||||
clean-checkers: remove-golangci-lint remove-govulncheck
|
||||
|
||||
@@ -471,8 +471,9 @@ test-full-386:
|
||||
|
||||
apptest:
|
||||
$(MAKE) victoria-metrics-race vmagent-race vmalert-race vmauth-race vmctl-race vmbackup-race vmrestore-race
|
||||
go test ./apptest/... -skip="^Test(Cluster|Legacy).*"
|
||||
go test ./apptest/... -skip="^Test(Cluster|Mixed|Legacy).*"
|
||||
|
||||
# App tests for legacy indexDB
|
||||
apptest-legacy: victoria-metrics-race vmbackup-race vmrestore-race
|
||||
OS=$$(uname | tr '[:upper:]' '[:lower:]'); \
|
||||
ARCH=$$(uname -m | tr '[:upper:]' '[:lower:]' | sed 's/x86_64/amd64/'); \
|
||||
@@ -489,6 +490,20 @@ apptest-legacy: victoria-metrics-race vmbackup-race vmrestore-race
|
||||
VMSTORAGE_V1_132_0_PATH=$${DIR}/vmstorage-prod \
|
||||
go test ./apptest/tests -run="^TestLegacySingle.*"
|
||||
|
||||
# App tests for mixed setups where vmsingle and vmcluster coexist.
|
||||
apptest-mixed: victoria-metrics-race
|
||||
OS=$$(uname | tr '[:upper:]' '[:lower:]'); \
|
||||
ARCH=$$(uname -m | tr '[:upper:]' '[:lower:]' | sed 's/x86_64/amd64/'); \
|
||||
VERSION=v1.145.0; \
|
||||
VMCLUSTER=victoria-metrics-$${OS}-$${ARCH}-$${VERSION}-cluster.tar.gz; \
|
||||
URL=https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/$${VERSION}; \
|
||||
DIR=/tmp/$${VERSION}; \
|
||||
test -d $${DIR} || (mkdir $${DIR} && \
|
||||
curl --output-dir /tmp -LO $${URL}/$${VMCLUSTER} && tar xzf /tmp/$${VMCLUSTER} -C $${DIR} \
|
||||
); \
|
||||
VMSELECT_PATH=$${DIR}/vmselect-prod \
|
||||
go test ./apptest/tests -run="^TestMixed.*"
|
||||
|
||||
benchmark:
|
||||
go test -run=NO_TESTS -bench=. ./lib/...
|
||||
go test -run=NO_TESTS -bench=. ./app/...
|
||||
|
||||
@@ -89,7 +89,7 @@ func main() {
|
||||
}
|
||||
logger.Infof("starting VictoriaMetrics at %q...", listenAddrs)
|
||||
startTime := time.Now()
|
||||
vmstorage.Init(*vmselectMaxConcurrentRequests, promql.ResetRollupResultCacheIfNeeded)
|
||||
vmstorage.Init(*vmselectMaxConcurrentRequests, *vmselectMaxQueueDuration, promql.ResetRollupResultCacheIfNeeded)
|
||||
vmselect.Init(*vmselectMaxConcurrentRequests, *vmselectMaxQueueDuration)
|
||||
vminsertcommon.StartIngestionRateLimiter(*maxIngestionRate)
|
||||
vminsert.Init()
|
||||
|
||||
@@ -51,7 +51,7 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
|
||||
}
|
||||
q := req.URL.Query()
|
||||
precision := q.Get("precision")
|
||||
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
|
||||
// Read db tag from https://docs.influxdata.com/influxdb/v1/api/write/#operation/PostWrite
|
||||
db := q.Get("db")
|
||||
encoding := req.Header.Get("Content-Encoding")
|
||||
isStreamMode := req.Header.Get("Stream-Mode") == "1"
|
||||
|
||||
@@ -35,6 +35,9 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
|
||||
}
|
||||
|
||||
func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, mms []prompb.MetricMetadata, extraLabels []prompb.Label) error {
|
||||
if len(extraLabels) == 0 && !prommetadata.IsEnabled() && at == nil {
|
||||
return insertRowsFast(at, timeseries)
|
||||
}
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
@@ -102,3 +105,17 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, mms []prompb.Met
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return nil
|
||||
}
|
||||
|
||||
func insertRowsFast(at *auth.Token, timeseries []prompb.TimeSeries) error {
|
||||
rowsTotal := 0
|
||||
for i := range timeseries {
|
||||
rowsTotal += len(timeseries[i].Samples)
|
||||
}
|
||||
wr := &prompb.WriteRequest{Timeseries: timeseries}
|
||||
if !remotewrite.TryPush(at, wr) {
|
||||
return remotewrite.ErrQueueFullHTTPRetry
|
||||
}
|
||||
rowsInserted.Add(rowsTotal)
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -187,7 +187,7 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
|
||||
func (c *client) init(argIdx int, sanitizedURL string) {
|
||||
limitReached := metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rate_limit_reached_total{url=%q}`, c.sanitizedURL))
|
||||
if bytesPerSec := rateLimit.GetOptionalArg(argIdx); bytesPerSec > 0 {
|
||||
logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", bytesPerSec, sanitizedURL)
|
||||
@@ -204,11 +204,20 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
|
||||
c.packetsDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_packets_dropped_total{url=%q}`, c.sanitizedURL))
|
||||
c.retriesCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.sanitizedURL))
|
||||
c.sendDuration = metrics.GetOrCreateFloatCounter(fmt.Sprintf(`vmagent_remotewrite_send_duration_seconds_total{url=%q}`, c.sanitizedURL))
|
||||
metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queues{url=%q}`, c.sanitizedURL), func() float64 {
|
||||
return float64(concurrency)
|
||||
})
|
||||
for range concurrency {
|
||||
c.wg.Go(c.runWorker)
|
||||
workers := queues.GetOptionalArg(argIdx)
|
||||
if workers <= 0 {
|
||||
workers = 1
|
||||
}
|
||||
inmemoryWorkers := inmemoryQueues.GetOptionalArg(argIdx)
|
||||
for range inmemoryWorkers {
|
||||
c.wg.Go(func() {
|
||||
c.runWorker(c.fq.MustReadInMemoryBlockBlocking)
|
||||
})
|
||||
}
|
||||
for range workers {
|
||||
c.wg.Go(func() {
|
||||
c.runWorker(c.fq.MustReadBlock)
|
||||
})
|
||||
}
|
||||
logger.Infof("initialized client for -remoteWrite.url=%q", c.sanitizedURL)
|
||||
}
|
||||
@@ -302,12 +311,12 @@ func getAWSAPIConfig(argIdx int) (*awsapi.Config, error) {
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (c *client) runWorker() {
|
||||
func (c *client) runWorker(readBlock func(dst []byte) ([]byte, bool)) {
|
||||
var ok bool
|
||||
var block []byte
|
||||
ch := make(chan bool, 1)
|
||||
for {
|
||||
block, ok = c.fq.MustReadBlock(block[:0])
|
||||
block, ok = readBlock(block[:0])
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -209,13 +209,12 @@ func (wr *writeRequest) tryPushMetadata(mms []prompb.MetricMetadata) bool {
|
||||
func (wr *writeRequest) copyMetadata(dst, src *prompb.MetricMetadata) {
|
||||
// Direct copy for non-string fields, which are safe by value.
|
||||
dst.Type = src.Type
|
||||
dst.Unit = src.Unit
|
||||
|
||||
dst.AccountID = src.AccountID
|
||||
dst.ProjectID = src.ProjectID
|
||||
|
||||
// Pre-allocate memory for all string fields.
|
||||
neededBufLen := len(src.MetricFamilyName) + len(src.Help)
|
||||
neededBufLen := len(src.MetricFamilyName) + len(src.Help) + len(src.Unit)
|
||||
bufLen := len(wr.metadatabuf)
|
||||
wr.metadatabuf = slicesutil.SetLength(wr.metadatabuf, bufLen+neededBufLen)
|
||||
buf := wr.metadatabuf[:bufLen]
|
||||
@@ -230,6 +229,11 @@ func (wr *writeRequest) copyMetadata(dst, src *prompb.MetricMetadata) {
|
||||
buf = append(buf, src.Help...)
|
||||
dst.Help = bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
|
||||
// Copy Unit
|
||||
bufLen = len(buf)
|
||||
buf = append(buf, src.Unit...)
|
||||
dst.Unit = bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
|
||||
wr.metadatabuf = buf
|
||||
}
|
||||
|
||||
|
||||
@@ -12,19 +12,18 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consistenthash"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mdx"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
@@ -66,6 +65,9 @@ var (
|
||||
queues = flagutil.NewArrayInt("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
|
||||
"isn't enough for sending high volume of collected data to remote storage. "+
|
||||
"Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage")
|
||||
inmemoryQueues = flagutil.NewArrayInt("remoteWrite.inmemoryQueues", 0, "The number of additional workers per each -remoteWrite.url, which send only recently ingested data from the in-memory queue, "+
|
||||
"while the file-based queue at -remoteWrite.tmpDataPath is drained by workers configured via -remoteWrite.queues. "+
|
||||
"This reduces delivery lag for fresh samples when the file-based queue contains a backlog accumulated during remote storage outages.")
|
||||
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
|
||||
"It is hidden by default, since it can contain sensitive info such as auth key")
|
||||
maxPendingBytesPerURL = flagutil.NewArrayBytes("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+
|
||||
@@ -103,6 +105,9 @@ var (
|
||||
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
|
||||
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
|
||||
"By default, metadata sending is controlled by the global -enableMetadata flag")
|
||||
|
||||
enableMdx = flagutil.NewArrayBool("remoteWrite.mdx.enable", "Whether to only retain metrics from VictoriaMetrics services before sending them to the corresponding -remoteWrite.url. "+
|
||||
"Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -159,8 +164,8 @@ func InitSecretFlags() {
|
||||
}
|
||||
|
||||
var (
|
||||
shardByURLLabelsMap map[string]struct{}
|
||||
shardByURLIgnoreLabelsMap map[string]struct{}
|
||||
shardByURLLabelsFilter []string
|
||||
shardByURLIgnoreLabelsFilter []string
|
||||
)
|
||||
|
||||
// Init initializes remotewrite.
|
||||
@@ -207,8 +212,8 @@ func Init() {
|
||||
logger.Fatalf("-remoteWrite.shardByURL.labels and -remoteWrite.shardByURL.ignoreLabels cannot be set simultaneously; " +
|
||||
"see https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages")
|
||||
}
|
||||
shardByURLLabelsMap = newMapFromStrings(*shardByURLLabels)
|
||||
shardByURLIgnoreLabelsMap = newMapFromStrings(*shardByURLIgnoreLabels)
|
||||
shardByURLLabelsFilter = slices.Clone(*shardByURLLabels)
|
||||
shardByURLIgnoreLabelsFilter = slices.Clone(*shardByURLIgnoreLabels)
|
||||
|
||||
initLabelsGlobal()
|
||||
|
||||
@@ -304,6 +309,10 @@ func initRemoteWriteCtxs(urls []string) {
|
||||
}
|
||||
fs.RegisterPathFsMetrics(*tmpDataPath)
|
||||
|
||||
if slices.Contains(*enableMdx, true) && *shardByURL {
|
||||
logger.Fatalf("-remoteWrite.mdx.enable and -remoteWrite.shardByURL cannot be set to true simultaneously.")
|
||||
}
|
||||
|
||||
if *shardByURL {
|
||||
consistentHashNodes := make([]string, 0, len(urls))
|
||||
for i, url := range urls {
|
||||
@@ -562,6 +571,14 @@ func tryPushMetadataToRemoteStorages(at *auth.Token, rwctxs []*remoteWriteCtx, m
|
||||
mm.ProjectID = at.ProjectID
|
||||
}
|
||||
}
|
||||
tmp := mms[:0]
|
||||
for _, mm := range mms {
|
||||
if timeserieslimits.IsMetricMetadataExceeding(&mm) {
|
||||
continue
|
||||
}
|
||||
tmp = append(tmp, mm)
|
||||
}
|
||||
mms = tmp
|
||||
// Do not shard metadata even if -remoteWrite.shardByURL is set, just replicate it among rwctxs.
|
||||
// Since metadata is usually small and there is no guarantee that metadata can be sent to
|
||||
// the same remote storage with the corresponding metrics.
|
||||
@@ -695,18 +712,18 @@ func shardAmountRemoteWriteCtx(tssBlock []prompb.TimeSeries, shards [][]prompb.T
|
||||
|
||||
for _, ts := range tssBlock {
|
||||
hashLabels := ts.Labels
|
||||
if len(shardByURLLabelsMap) > 0 {
|
||||
if len(shardByURLLabelsFilter) > 0 {
|
||||
hashLabels = tmpLabels.Labels[:0]
|
||||
for _, label := range ts.Labels {
|
||||
if _, ok := shardByURLLabelsMap[label.Name]; ok {
|
||||
if slices.Contains(shardByURLLabelsFilter, label.Name) {
|
||||
hashLabels = append(hashLabels, label)
|
||||
}
|
||||
}
|
||||
tmpLabels.Labels = hashLabels
|
||||
} else if len(shardByURLIgnoreLabelsMap) > 0 {
|
||||
} else if len(shardByURLIgnoreLabelsFilter) > 0 {
|
||||
hashLabels = tmpLabels.Labels[:0]
|
||||
for _, label := range ts.Labels {
|
||||
if _, ok := shardByURLIgnoreLabelsMap[label.Name]; !ok {
|
||||
if !slices.Contains(shardByURLIgnoreLabelsFilter, label.Name) {
|
||||
hashLabels = append(hashLabels, label)
|
||||
}
|
||||
}
|
||||
@@ -807,34 +824,26 @@ var (
|
||||
// it omits the '=' separator between label name and value for backward compatibility.
|
||||
// Changing it would re-shard all series across remoteWrite targets.
|
||||
func getLabelsHashForShard(labels []prompb.Label) uint64 {
|
||||
bb := labelsHashBufPool.Get()
|
||||
b := bb.B[:0]
|
||||
var d xxhash.Digest
|
||||
d.Reset()
|
||||
for _, label := range labels {
|
||||
b = append(b, label.Name...)
|
||||
b = append(b, label.Value...)
|
||||
_, _ = d.WriteString(label.Name)
|
||||
_, _ = d.WriteString(label.Value)
|
||||
}
|
||||
h := xxhash.Sum64(b)
|
||||
bb.B = b
|
||||
labelsHashBufPool.Put(bb)
|
||||
return h
|
||||
return d.Sum64()
|
||||
}
|
||||
|
||||
func getLabelsHash(labels []prompb.Label) uint64 {
|
||||
bb := labelsHashBufPool.Get()
|
||||
b := bb.B[:0]
|
||||
var d xxhash.Digest
|
||||
d.Reset()
|
||||
for _, label := range labels {
|
||||
b = append(b, label.Name...)
|
||||
b = append(b, '=')
|
||||
b = append(b, label.Value...)
|
||||
_, _ = d.WriteString(label.Name)
|
||||
_, _ = d.WriteString("=")
|
||||
_, _ = d.WriteString(label.Value)
|
||||
}
|
||||
h := xxhash.Sum64(b)
|
||||
bb.B = b
|
||||
labelsHashBufPool.Put(bb)
|
||||
return h
|
||||
return d.Sum64()
|
||||
}
|
||||
|
||||
var labelsHashBufPool bytesutil.ByteBufferPool
|
||||
|
||||
func logSkippedSeries(labels []prompb.Label, flagName string, flagValue int) {
|
||||
select {
|
||||
case <-logSkippedSeriesTicker.C:
|
||||
@@ -859,6 +868,7 @@ type remoteWriteCtx struct {
|
||||
|
||||
sas atomic.Pointer[streamaggr.Aggregators]
|
||||
deduplicator *streamaggr.Deduplicator
|
||||
mdxFilter *mdx.Filter
|
||||
|
||||
streamAggrKeepInput bool
|
||||
streamAggrDropInput bool
|
||||
@@ -873,6 +883,7 @@ type remoteWriteCtx struct {
|
||||
|
||||
rowsPushedAfterRelabel *metrics.Counter
|
||||
rowsDroppedByRelabel *metrics.Counter
|
||||
mdxRowsPreserved *metrics.Counter
|
||||
|
||||
pushFailures *metrics.Counter
|
||||
metadataDroppedOnPushFailure *metrics.Counter
|
||||
@@ -906,7 +917,8 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
}
|
||||
|
||||
isPQDisabled := disableOnDiskQueue.GetOptionalArg(argIdx)
|
||||
queuesSize := queues.GetOptionalArg(argIdx)
|
||||
inmemoryQueueSize := inmemoryQueues.GetOptionalArg(argIdx)
|
||||
queuesSize := queues.GetOptionalArg(argIdx) + inmemoryQueueSize
|
||||
if queuesSize > maxQueues {
|
||||
queuesSize = maxQueues
|
||||
} else if queuesSize <= 0 {
|
||||
@@ -923,7 +935,13 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
if maxInmemoryBlocks < 2 {
|
||||
maxInmemoryBlocks = 2
|
||||
}
|
||||
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled)
|
||||
fqOpts := persistentqueue.OpenFastQueueOpts{
|
||||
MaxInmemoryBlocks: maxInmemoryBlocks,
|
||||
MaxPendingBytes: maxPendingBytes,
|
||||
IsPQDisabled: isPQDisabled,
|
||||
PrioritizeInmemoryData: inmemoryQueueSize > 0,
|
||||
}
|
||||
fq := persistentqueue.MustOpenFastQueueWithOpts(queuePath, sanitizedURL, fqOpts)
|
||||
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||
return float64(fq.GetPendingBytes())
|
||||
})
|
||||
@@ -936,6 +954,9 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
}
|
||||
return 0
|
||||
})
|
||||
metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queues{url=%q}`, sanitizedURL), func() float64 {
|
||||
return float64(queuesSize)
|
||||
})
|
||||
|
||||
var c *client
|
||||
switch remoteWriteURL.Scheme {
|
||||
@@ -944,7 +965,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
default:
|
||||
logger.Fatalf("unsupported scheme: %s for remoteWriteURL: %s, want `http`, `https`", remoteWriteURL.Scheme, sanitizedURL)
|
||||
}
|
||||
c.init(argIdx, queuesSize, sanitizedURL)
|
||||
c.init(argIdx, sanitizedURL)
|
||||
|
||||
// Initialize pss
|
||||
sf := significantFigures.GetOptionalArg(argIdx)
|
||||
@@ -959,7 +980,6 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
for i := range pss {
|
||||
pss[i] = newPendingSeries(fq, &c.useVMProto, sf, rd)
|
||||
}
|
||||
|
||||
rwctx := &remoteWriteCtx{
|
||||
idx: argIdx,
|
||||
fq: fq,
|
||||
@@ -976,6 +996,16 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
}
|
||||
rwctx.initStreamAggrConfig()
|
||||
|
||||
if enableMdx.GetOptionalArg(argIdx) {
|
||||
mdxFilter := mdx.NewFilter()
|
||||
rwctx.mdxFilter = mdxFilter
|
||||
rwctx.mdxRowsPreserved = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_mdx_rows_preserved_total{path=%q,url=%q}`, queuePath, sanitizedURL))
|
||||
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_mdx_tracked_instances{path=%q,url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||
return float64(mdxFilter.VMInstancesCount())
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
return rwctx
|
||||
}
|
||||
|
||||
@@ -989,6 +1019,11 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
rwctx.deduplicator.MustStop()
|
||||
rwctx.deduplicator = nil
|
||||
}
|
||||
if rwctx.mdxFilter != nil {
|
||||
rwctx.mdxFilter.MustStop()
|
||||
rwctx.mdxFilter = nil
|
||||
rwctx.mdxRowsPreserved = nil
|
||||
}
|
||||
|
||||
for _, ps := range rwctx.pss {
|
||||
ps.MustStop()
|
||||
@@ -1004,6 +1039,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
|
||||
rwctx.rowsPushedAfterRelabel = nil
|
||||
rwctx.rowsDroppedByRelabel = nil
|
||||
|
||||
}
|
||||
|
||||
// TryPushTimeSeries sends tss series to the configured remote write endpoint
|
||||
@@ -1011,16 +1047,41 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
// TryPushTimeSeries doesn't modify tss, so tss can be passed concurrently to TryPush across distinct rwctx instances.
|
||||
func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) bool {
|
||||
var rctx *relabelCtx
|
||||
var mctx *mdx.Ctx
|
||||
var v *[]prompb.TimeSeries
|
||||
defer func() {
|
||||
if rctx == nil {
|
||||
return
|
||||
if v != nil {
|
||||
*v = prompb.ResetTimeSeries(tss)
|
||||
tssPool.Put(v)
|
||||
}
|
||||
if rctx != nil {
|
||||
putRelabelCtx(rctx)
|
||||
}
|
||||
if mctx != nil {
|
||||
mdx.PutContext(mctx)
|
||||
}
|
||||
*v = prompb.ResetTimeSeries(tss)
|
||||
tssPool.Put(v)
|
||||
putRelabelCtx(rctx)
|
||||
}()
|
||||
|
||||
// copyTimeSeriesIfNeeded replaces tss with a copy backed by a slice taken
// from tssPool. Only the first call makes the copy; later calls are no-ops
// because v is already set.
copyTimeSeriesIfNeeded := func() {
	if v != nil {
		return
	}
	// Assign to the outer v instead of declaring a new one with `:=`.
	// A shadowed v would leave the outer v nil, so the deferred cleanup
	// would never put the slice back into tssPool and every call would
	// make yet another copy of tss.
	v = tssPool.Get().(*[]prompb.TimeSeries)
	tss = append(*v, tss...)
}
|
||||
|
||||
if rwctx.mdxFilter != nil {
|
||||
mctx = mdx.GetContext()
|
||||
// Make a copy of tss before applying relabeling in order to prevent
|
||||
// from affecting time series for other remoteWrite.mdx configs.
|
||||
copyTimeSeriesIfNeeded()
|
||||
tss = rwctx.mdxFilter.Filter(mctx, tss)
|
||||
if len(tss) == 0 {
|
||||
return true
|
||||
}
|
||||
rowsCount := getRowsCount(tss)
|
||||
rwctx.mdxRowsPreserved.Add(rowsCount)
|
||||
}
|
||||
|
||||
// Apply relabeling
|
||||
rcs := allRelabelConfigs.Load()
|
||||
pcs := rcs.perURL[rwctx.idx]
|
||||
@@ -1030,8 +1091,7 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
// from affecting time series for other remoteWrite.url configs.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
|
||||
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
copyTimeSeriesIfNeeded()
|
||||
rowsCountBeforeRelabel := getRowsCount(tss)
|
||||
tss = rctx.applyRelabeling(tss, pcs)
|
||||
rowsCountAfterRelabel := getRowsCount(tss)
|
||||
@@ -1049,8 +1109,7 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
if rctx == nil {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before dropping aggregated series
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
copyTimeSeriesIfNeeded()
|
||||
}
|
||||
tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput)
|
||||
} else if rwctx.streamAggrDropInput {
|
||||
@@ -1058,8 +1117,7 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
if rctx == nil {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before dropping aggregated series
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
copyTimeSeriesIfNeeded()
|
||||
}
|
||||
tss = dropUnaggregatedSeries(tss, matchIdxs.B)
|
||||
}
|
||||
@@ -1178,15 +1236,6 @@ func getRowsCount(tss []prompb.TimeSeries) int {
|
||||
}
|
||||
return rowsCount
|
||||
}
|
||||
|
||||
func newMapFromStrings(a []string) map[string]struct{} {
|
||||
m := make(map[string]struct{}, len(a))
|
||||
for _, s := range a {
|
||||
m[s] = struct{}{}
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func getMaxHourlySeries() int {
|
||||
limit := *maxHourlySeries
|
||||
if limit == -1 || limit > math.MaxInt32 {
|
||||
|
||||
@@ -282,7 +282,8 @@ func processFlags() {
|
||||
|
||||
func setUp() {
|
||||
const maxConcurrentRequests = 4
|
||||
vmstorage.Init(maxConcurrentRequests, promql.ResetRollupResultCacheIfNeeded)
|
||||
maxQueueDuration := 5 * time.Second
|
||||
vmstorage.Init(maxConcurrentRequests, maxQueueDuration, promql.ResetRollupResultCacheIfNeeded)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
readyCheckFunc := func() bool {
|
||||
|
||||
@@ -145,10 +145,10 @@ func TestRuleValidate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGroupValidate_Failure(t *testing.T) {
|
||||
f := func(group *Group, validateExpressions bool, errStrExpected string) {
|
||||
f := func(data []byte, validateExpressions bool, errStrExpected string) {
|
||||
t.Helper()
|
||||
|
||||
err := group.Validate(nil, validateExpressions)
|
||||
_, err := parse(map[string][]byte{"test.yaml": data}, nil, validateExpressions)
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
@@ -158,275 +158,238 @@ func TestGroupValidate_Failure(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
f(&Group{}, false, "group name must be set")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: ""
|
||||
`), false, "group name must be set")
|
||||
|
||||
f(&Group{
|
||||
Name: "both record and alert are not set",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Expr: "sum(up == 0 ) by (host)",
|
||||
For: promutil.NewDuration(10 * time.Millisecond),
|
||||
},
|
||||
{
|
||||
Expr: "sumSeries(time('foo.bar',10))",
|
||||
},
|
||||
},
|
||||
}, false, "invalid rule")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: both record and alert are not set
|
||||
rules:
|
||||
- expr: "sum(up == 0 ) by (host)"
|
||||
for: 10ms
|
||||
- expr: "sumSeries(time('foo.bar',10))"
|
||||
`), false, "invalid rule")
|
||||
|
||||
f(&Group{
|
||||
Name: "negative interval",
|
||||
Interval: promutil.NewDuration(-1),
|
||||
}, false, "interval shouldn't be lower than 0")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: negative interval
|
||||
interval: -1ms
|
||||
`), false, "interval shouldn't be lower than 0")
|
||||
|
||||
f(&Group{
|
||||
Name: "too big eval_offset",
|
||||
Interval: promutil.NewDuration(time.Minute),
|
||||
EvalOffset: promutil.NewDuration(2 * time.Minute),
|
||||
}, false, "eval_offset should be smaller than interval")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: too big eval_offset
|
||||
interval: 1m
|
||||
eval_offset: 2m
|
||||
`), false, "eval_offset should be smaller than interval")
|
||||
|
||||
f(&Group{
|
||||
Name: "too big negative eval_offset",
|
||||
Interval: promutil.NewDuration(time.Minute),
|
||||
EvalOffset: promutil.NewDuration(-2 * time.Minute),
|
||||
}, false, "eval_offset should be smaller than interval")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: too big negative eval_offset
|
||||
interval: 1m
|
||||
eval_offset: -2m
|
||||
`), false, "eval_offset should be smaller than interval")
|
||||
|
||||
limit := -1
|
||||
f(&Group{
|
||||
Name: "wrong limit",
|
||||
Limit: &limit,
|
||||
}, false, "invalid limit")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: wrong limit
|
||||
limit: -1
|
||||
`), false, "invalid limit")
|
||||
|
||||
f(&Group{
|
||||
Name: "wrong concurrency",
|
||||
Concurrency: -1,
|
||||
}, false, "invalid concurrency")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: wrong concurrency
|
||||
concurrency: -1
|
||||
`), false, "invalid concurrency")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Alert: "alert",
|
||||
Expr: "up == 1",
|
||||
},
|
||||
{
|
||||
Alert: "alert",
|
||||
Expr: "up == 1",
|
||||
},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
`), false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
`), false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{Record: "record", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
{Record: "record", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- record: record
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
- record: record
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
`), false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"description": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{Record: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test thanos",
|
||||
Type: NewRawType("thanos"),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"description": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, true, "unknown datasource type")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test thanos
|
||||
type: thanos
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
description: "{{ value|query }}"
|
||||
`), true, "unknown datasource type")
|
||||
|
||||
// validate expressions
|
||||
f(&Group{
|
||||
Name: "test prometheus expr",
|
||||
Type: NewPrometheusType(),
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "record",
|
||||
Expr: "up | 0",
|
||||
},
|
||||
},
|
||||
}, true, "bad MetricsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test prometheus expr
|
||||
type: prometheus
|
||||
rules:
|
||||
- record: record
|
||||
expr: "up | 0"
|
||||
`), true, "bad MetricsQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test graphite expr",
|
||||
Type: NewGraphiteType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"description": "some-description",
|
||||
}},
|
||||
},
|
||||
}, true, "bad GraphiteQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test graphite expr
|
||||
type: graphite
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
description: some-description
|
||||
`), true, "bad GraphiteQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test vlogs expr",
|
||||
Type: NewVLogsType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "stats count(*) as requests"},
|
||||
},
|
||||
}, true, "bad LogsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test vlogs expr
|
||||
type: vlogs
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: "stats count(*) as requests"
|
||||
`), true, "bad LogsQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test vlogs expr",
|
||||
Type: NewVLogsType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "_time: 1m | stats by (path, _time: 1m) count(*) as requests"},
|
||||
},
|
||||
}, true, "bad LogsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test vlogs expr multipart
|
||||
type: vlogs
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: "_time: 1m | stats by (path, _time: 1m) count(*) as requests"
|
||||
`), true, "bad LogsQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test graphite with prometheus expr",
|
||||
Type: NewGraphiteType(),
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "r1",
|
||||
ID: 1,
|
||||
Expr: "sumSeries(time('foo.bar',10))",
|
||||
For: promutil.NewDuration(10 * time.Millisecond),
|
||||
},
|
||||
{
|
||||
Record: "r2",
|
||||
ID: 2,
|
||||
Expr: "sum(up == 0 ) by (host)",
|
||||
},
|
||||
},
|
||||
}, true, "bad GraphiteQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test graphite with prometheus expr
|
||||
type: graphite
|
||||
rules:
|
||||
- record: r1
|
||||
expr: "sumSeries(time('foo.bar',10))"
|
||||
for: 10ms
|
||||
- record: r2
|
||||
expr: "sum(up == 0 ) by (host)"
|
||||
`), true, "bad GraphiteQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test vlogs with prometheus exp",
|
||||
Type: NewVLogsType(),
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "r1",
|
||||
Expr: "sum(up == 0 ) by (host)",
|
||||
For: promutil.NewDuration(10 * time.Millisecond),
|
||||
},
|
||||
},
|
||||
}, true, "bad LogsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test vlogs with prometheus expr
|
||||
type: vlogs
|
||||
rules:
|
||||
- record: r1
|
||||
expr: "sum(up == 0 ) by (host)"
|
||||
for: 10ms
|
||||
`), true, "bad LogsQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test prometheus with vlogs exp",
|
||||
Type: NewPrometheusType(),
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "r1",
|
||||
Expr: "* | stats by (path) count()",
|
||||
For: promutil.NewDuration(10 * time.Millisecond),
|
||||
},
|
||||
},
|
||||
}, true, "bad MetricsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test prometheus with vlogs expr
|
||||
type: prometheus
|
||||
rules:
|
||||
- record: r1
|
||||
expr: "* | stats by (path) count()"
|
||||
for: 10ms
|
||||
`), true, "bad MetricsQL expr")
|
||||
}
|
||||
|
||||
func TestGroupValidate_Success(t *testing.T) {
|
||||
f := func(group *Group, validateAnnotations, validateExpressions bool) {
|
||||
f := func(data []byte, validateAnnotations, validateExpressions bool) {
|
||||
t.Helper()
|
||||
|
||||
var validateTplFn ValidateTplFn
|
||||
if validateAnnotations {
|
||||
validateTplFn = notifier.ValidateTemplates
|
||||
}
|
||||
err := group.Validate(validateTplFn, validateExpressions)
|
||||
_, err := parse(map[string][]byte{"test.yaml": data}, validateTplFn, validateExpressions)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "record",
|
||||
Expr: "up | 0",
|
||||
},
|
||||
},
|
||||
}, false, false)
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- record: record
|
||||
expr: "up | 0"
|
||||
`), false, false)
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Alert: "alert",
|
||||
Expr: "up == 1",
|
||||
Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
},
|
||||
},
|
||||
},
|
||||
}, false, false)
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
`), false, false)
|
||||
|
||||
// validate annotations
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Alert: "alert",
|
||||
Expr: "up == 1",
|
||||
Labels: map[string]string{
|
||||
"summary": `
|
||||
{{ with printf "node_memory_MemTotal{job='node',instance='%s'}" "localhost" | query }}
|
||||
{{ . | first | value | humanize1024 }}B
|
||||
{{ end }}`,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, true, false)
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "\n{{ with printf \"node_memory_MemTotal{job='node',instance='%s'}\" \"localhost\" | query }}\n {{ . | first | value | humanize1024 }}B\n{{ end }}"
|
||||
`), true, false)
|
||||
|
||||
// validate expressions
|
||||
f(&Group{
|
||||
Name: "test prometheus",
|
||||
Type: NewPrometheusType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"description": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, true)
|
||||
f(&Group{
|
||||
Name: "test victorialogs",
|
||||
Type: NewVLogsType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: " _time: 1m | stats count(*) as requests", Labels: map[string]string{
|
||||
"description": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, true)
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test prometheus
|
||||
type: prometheus
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
description: "{{ value|query }}"
|
||||
`), false, true)
|
||||
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test victorialogs
|
||||
type: vlogs
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: " _time: 1m | stats count(*) as requests"
|
||||
labels:
|
||||
description: "{{ value|query }}"
|
||||
`), false, true)
|
||||
}
|
||||
|
||||
func TestHashRule_NotEqual(t *testing.T) {
|
||||
|
||||
@@ -315,6 +315,11 @@ func configReload(ctx context.Context, m *manager, groupsCfg []config.Group, sig
|
||||
|
||||
parseFn := config.Parse
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
@@ -457,12 +457,10 @@ func TestSetIntervalAsTimeFilter(t *testing.T) {
|
||||
f(`* | count()`, "vlogs", true)
|
||||
f(`error OR _time:5m | count()`, "vlogs", true)
|
||||
f(`(_time: 5m AND error) OR (_time: 5m AND warn) | count()`, "vlogs", true)
|
||||
f(`* | error OR _time:5m | count()`, "vlogs", true)
|
||||
|
||||
f(`_time:5m | count()`, "vlogs", false)
|
||||
f(`_time:2023-04-25T22:45:59Z | count()`, "vlogs", false)
|
||||
f(`error AND _time:5m | count()`, "vlogs", false)
|
||||
f(`* | error AND _time:5m | count()`, "vlogs", false)
|
||||
}
|
||||
|
||||
func TestRecordingRuleExec_Partial(t *testing.T) {
|
||||
|
||||
@@ -118,9 +118,10 @@ type AccessLogFilters struct {
|
||||
}
|
||||
|
||||
func (ui *UserInfo) logRequest(r *http.Request, userName string, statusCode int, duration time.Duration) {
|
||||
if ui.AccessLog == nil {
|
||||
if ui == nil || ui.AccessLog == nil {
|
||||
return
|
||||
}
|
||||
|
||||
filters := ui.AccessLog.Filters
|
||||
if filters != nil && len(filters.SkipStatusCodes) > 0 {
|
||||
if slices.Contains(filters.SkipStatusCodes, statusCode) {
|
||||
@@ -134,6 +135,17 @@ func (ui *UserInfo) logRequest(r *http.Request, userName string, statusCode int,
|
||||
r.Host, requestURI, statusCode, remoteAddr, r.UserAgent(), r.Referer(), duration.Milliseconds(), userName)
|
||||
}
|
||||
|
||||
// haveURLs reports whether ui has at least one URL route configured,
// i.e. whether URLPrefix, URLMaps or DefaultURL is set. It returns false for a nil ui.
|
||||
// It is used for validating unauthorized_user config, since other users
|
||||
// must always have either URLPrefix or URLMaps set.
|
||||
func (ui *UserInfo) haveURLs() bool {
|
||||
// A nil receiver has no routes; handling it here lets callers skip their own nil check.
if ui == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Any single one of the three routing fields counts as a configured route.
return ui.URLPrefix != nil || len(ui.URLMaps) > 0 || ui.DefaultURL != nil
|
||||
}
|
||||
|
||||
// HeadersConf represents config for request and response headers.
|
||||
type HeadersConf struct {
|
||||
RequestHeaders []*Header `yaml:"headers,omitempty"`
|
||||
@@ -840,6 +852,11 @@ func authConfigReloader(sighupCh <-chan os.Signal) {
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
@@ -906,7 +923,8 @@ func reloadAuthConfigData(data []byte) (bool, error) {
|
||||
return false, fmt.Errorf("failed to parse auth config: %w", err)
|
||||
}
|
||||
|
||||
jui, oidcDP, err := parseJWTUsers(ac)
|
||||
oidcDP := &oidcDiscovererPool{}
|
||||
jui, err := parseJWTUsers(ac, oidcDP)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to parse JWT users from auth config: %w", err)
|
||||
}
|
||||
@@ -977,8 +995,11 @@ func parseAuthConfig(data []byte) (*AuthConfig, error) {
|
||||
if err := parseJWTPlaceholdersForUserInfo(ui, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := ui.initURLs(); err != nil {
|
||||
return nil, err
|
||||
|
||||
if ui.URLPrefix != nil || len(ui.URLMaps) > 0 || ui.DefaultURL != nil {
|
||||
if err := ui.initURLs(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
metricLabels, err := ui.getMetricLabels()
|
||||
|
||||
@@ -140,6 +140,18 @@ users:
|
||||
- "ProjectID: {{.MetricsProjectID}}"
|
||||
url_prefix: "http://vminsert:8480/insert/prometheus"
|
||||
|
||||
# JWT-based routing that relies solely on custom claims.
|
||||
# When the `vm_access` claim is missing from the token, the configured default access claim is used instead.
|
||||
# e.g. {"role": "admin"}.
|
||||
- name: jwt-custom-claims
|
||||
jwt:
|
||||
skip_verify: true
|
||||
default_vm_access_claim:
|
||||
metrics_account_id: 1
|
||||
match_claims:
|
||||
role: admin
|
||||
url_prefix: "http://vmselect-admin:8481/select/0/prometheus"
|
||||
|
||||
# Requests without Authorization header are proxied according to `unauthorized_user` section.
|
||||
# Requests are proxied in round-robin fashion between `url_prefix` backends.
|
||||
# The deny_partial_response query arg is added to all the proxied requests.
|
||||
|
||||
@@ -65,6 +65,8 @@ type JWTConfig struct {
|
||||
MatchClaims map[string]string `yaml:"match_claims,omitempty"`
|
||||
parsedMatchClaims []*jwt.Claim
|
||||
|
||||
DefaultVMAccessClaim *jwt.VMAccessClaim `yaml:"default_vm_access_claim,omitempty"`
|
||||
|
||||
// verifierPool is used to verify JWT tokens.
|
||||
// It is initialized from PublicKeys and/or PublicKeyFiles.
|
||||
// In this case, it is initialized once at config reload and never updated until next reload
|
||||
@@ -72,9 +74,8 @@ type JWTConfig struct {
|
||||
verifierPool atomic.Pointer[jwt.VerifierPool]
|
||||
}
|
||||
|
||||
func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
|
||||
func parseJWTUsers(ac *AuthConfig, oidcDP *oidcDiscovererPool) ([]*UserInfo, error) {
|
||||
jui := make([]*UserInfo, 0, len(ac.Users))
|
||||
oidcDP := &oidcDiscovererPool{}
|
||||
|
||||
uniqClaims := make(map[string]*UserInfo)
|
||||
var sortedClaims []string
|
||||
@@ -85,10 +86,10 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
|
||||
}
|
||||
|
||||
if ui.AuthToken != "" || ui.BearerToken != "" || ui.Username != "" || ui.Password != "" {
|
||||
return nil, nil, fmt.Errorf("auth_token, bearer_token, username and password cannot be specified if jwt is set")
|
||||
return nil, fmt.Errorf("auth_token, bearer_token, username and password cannot be specified if jwt is set")
|
||||
}
|
||||
if len(jwtToken.PublicKeys) == 0 && len(jwtToken.PublicKeyFiles) == 0 && !jwtToken.SkipVerify && jwtToken.OIDC == nil {
|
||||
return nil, nil, fmt.Errorf("jwt must contain at least a single public key, public_key_files, oidc or have skip_verify=true")
|
||||
return nil, fmt.Errorf("jwt must contain at least a single public key, public_key_files, oidc or have skip_verify=true")
|
||||
}
|
||||
var claimsString string
|
||||
sortedClaims = sortedClaims[:0]
|
||||
@@ -97,7 +98,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
|
||||
sortedClaims = append(sortedClaims, fmt.Sprintf("%s=%s", ck, cv))
|
||||
pc, err := jwt.NewClaim(ck, cv)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("incorrect match claim, key=%q, value regex=%q: %w", ck, cv, err)
|
||||
return nil, fmt.Errorf("incorrect match claim, key=%q, value regex=%q: %w", ck, cv, err)
|
||||
}
|
||||
parsedClaims = append(parsedClaims, pc)
|
||||
}
|
||||
@@ -106,7 +107,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
|
||||
claimsString = strings.Join(sortedClaims, ",")
|
||||
|
||||
if oldUI, ok := uniqClaims[claimsString]; ok {
|
||||
return nil, nil, fmt.Errorf("duplicate match claims=%q found for name=%q at idx=%d; the previous one is set for name=%q", claimsString, ui.Name, idx, oldUI.Name)
|
||||
return nil, fmt.Errorf("duplicate match claims=%q found for name=%q at idx=%d; the previous one is set for name=%q", claimsString, ui.Name, idx, oldUI.Name)
|
||||
}
|
||||
uniqClaims[claimsString] = &ui
|
||||
if len(jwtToken.PublicKeys) > 0 || len(jwtToken.PublicKeyFiles) > 0 {
|
||||
@@ -115,7 +116,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
|
||||
for i := range jwtToken.PublicKeys {
|
||||
k, err := jwt.ParseKey([]byte(jwtToken.PublicKeys[i]))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
keys = append(keys, k)
|
||||
}
|
||||
@@ -123,52 +124,52 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
|
||||
for _, filePath := range jwtToken.PublicKeyFiles {
|
||||
keyData, err := os.ReadFile(filePath)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot read public key from file %q: %w", filePath, err)
|
||||
return nil, fmt.Errorf("cannot read public key from file %q: %w", filePath, err)
|
||||
}
|
||||
k, err := jwt.ParseKey(keyData)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot parse public key from file %q: %w", filePath, err)
|
||||
return nil, fmt.Errorf("cannot parse public key from file %q: %w", filePath, err)
|
||||
}
|
||||
keys = append(keys, k)
|
||||
}
|
||||
|
||||
vp, err := jwt.NewVerifierPool(keys)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
jwtToken.verifierPool.Store(vp)
|
||||
}
|
||||
if jwtToken.OIDC != nil {
|
||||
if len(jwtToken.PublicKeys) > 0 || len(jwtToken.PublicKeyFiles) > 0 || jwtToken.SkipVerify {
|
||||
return nil, nil, fmt.Errorf("jwt with oidc cannot contain public keys or have skip_verify=true")
|
||||
return nil, fmt.Errorf("jwt with oidc cannot contain public keys or have skip_verify=true")
|
||||
}
|
||||
|
||||
if jwtToken.OIDC.Issuer == "" {
|
||||
return nil, nil, fmt.Errorf("oidc issuer cannot be empty")
|
||||
return nil, fmt.Errorf("oidc issuer cannot be empty")
|
||||
}
|
||||
isserURL, err := url.Parse(jwtToken.OIDC.Issuer)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("oidc issuer %q must be a valid URL", jwtToken.OIDC.Issuer)
|
||||
return nil, fmt.Errorf("oidc issuer %q must be a valid URL", jwtToken.OIDC.Issuer)
|
||||
}
|
||||
if isserURL.Scheme != "https" && isserURL.Scheme != "http" {
|
||||
return nil, nil, fmt.Errorf("oidc issuer %q must have http or https scheme", jwtToken.OIDC.Issuer)
|
||||
return nil, fmt.Errorf("oidc issuer %q must have http or https scheme", jwtToken.OIDC.Issuer)
|
||||
}
|
||||
|
||||
oidcDP.createOrAdd(ui.JWT.OIDC.Issuer, &ui.JWT.verifierPool)
|
||||
}
|
||||
|
||||
if err := parseJWTPlaceholdersForUserInfo(&ui, true); err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := ui.initURLs(); err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metricLabels, err := ui.getMetricLabels()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot parse metric_labels: %w", err)
|
||||
return nil, fmt.Errorf("cannot parse metric_labels: %w", err)
|
||||
}
|
||||
ui.requests = ac.ms.GetOrCreateCounter(`vmauth_user_requests_total` + metricLabels)
|
||||
ui.requestErrors = ac.ms.GetOrCreateCounter(`vmauth_user_request_errors_total` + metricLabels)
|
||||
@@ -187,7 +188,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
|
||||
|
||||
rt, err := newRoundTripper(ui.TLSCAFile, ui.TLSCertFile, ui.TLSKeyFile, ui.TLSServerName, ui.TLSInsecureSkipVerify)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot initialize HTTP RoundTripper: %w", err)
|
||||
return nil, fmt.Errorf("cannot initialize HTTP RoundTripper: %w", err)
|
||||
}
|
||||
ui.rt = rt
|
||||
|
||||
@@ -200,7 +201,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
|
||||
return len(jui[i].JWT.MatchClaims) > len(jui[j].JWT.MatchClaims)
|
||||
})
|
||||
|
||||
return jui, oidcDP, nil
|
||||
return jui, nil
|
||||
}
|
||||
|
||||
var tokenPool sync.Pool
|
||||
@@ -433,7 +434,6 @@ func validateJWTPlaceholdersForURL(up *URLPrefix, isAllowed bool) error {
|
||||
}
|
||||
if strings.Contains(p, placeholderPrefix) {
|
||||
return fmt.Errorf("invalid placeholder found in URL request path: %q, supported values are: %s", bu.Path, strings.Join(allPlaceholders, ", "))
|
||||
|
||||
}
|
||||
}
|
||||
for param, values := range bu.Query() {
|
||||
@@ -488,7 +488,6 @@ func hasAnyPlaceholders(u *url.URL) bool {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -39,16 +39,14 @@ XOtclIk1uhc03oL9nOQ=
|
||||
}
|
||||
return
|
||||
}
|
||||
users, oidcDP, err := parseJWTUsers(ac)
|
||||
oidcDP := &oidcDiscovererPool{}
|
||||
users, err := parseJWTUsers(ac, oidcDP)
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error; got %v", users)
|
||||
}
|
||||
if expErr != err.Error() {
|
||||
t.Fatalf("unexpected error; got\n%q\nwant \n%q", err.Error(), expErr)
|
||||
}
|
||||
if oidcDP != nil {
|
||||
t.Fatalf("expecting nil oidcDP; got %v", oidcDP)
|
||||
}
|
||||
}
|
||||
|
||||
// unauthorized_user cannot be used with jwt
|
||||
@@ -326,7 +324,8 @@ XOtclIk1uhc03oL9nOQ=
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
jui, oidcDP, err := parseJWTUsers(ac)
|
||||
oidcDP := &oidcDiscovererPool{}
|
||||
jui, err := parseJWTUsers(ac, oidcDP)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand/v2"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/textproto"
|
||||
@@ -31,6 +32,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -173,11 +175,12 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
if len(ats) == 0 {
|
||||
// Process requests for unauthorized users
|
||||
ui := authConfig.Load().UnauthorizedUser
|
||||
if ui != nil {
|
||||
if ui.haveURLs() {
|
||||
processUserRequest(w, r, ui, nil)
|
||||
return true
|
||||
}
|
||||
|
||||
ui.logRequest(r, ``, http.StatusUnauthorized, 0)
|
||||
handleMissingAuthorizationError(w)
|
||||
return true
|
||||
}
|
||||
@@ -190,18 +193,25 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
if tkn == nil {
|
||||
logger.Panicf("BUG: unexpected nil jwt token for user %q", ui.name())
|
||||
}
|
||||
if !tkn.HasVMAccessClaim() && ui.JWT.DefaultVMAccessClaim == nil {
|
||||
ui.logRequest(r, ``, http.StatusUnauthorized, 0)
|
||||
http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
||||
return true
|
||||
}
|
||||
defer putToken(tkn)
|
||||
processUserRequest(w, r, ui, tkn)
|
||||
return true
|
||||
}
|
||||
|
||||
uu := authConfig.Load().UnauthorizedUser
|
||||
if uu != nil {
|
||||
if uu.haveURLs() {
|
||||
processUserRequest(w, r, uu, nil)
|
||||
return true
|
||||
}
|
||||
|
||||
invalidAuthTokenRequests.Inc()
|
||||
slowdownUnauthorizedResponse(r)
|
||||
uu.logRequest(r, ``, http.StatusUnauthorized, 0)
|
||||
if *logInvalidAuthTokens {
|
||||
err := fmt.Errorf("cannot authorize request with auth tokens %q", ats)
|
||||
err = &httpserver.ErrorWithStatusCode{
|
||||
@@ -424,8 +434,12 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo, tkn *j
|
||||
}
|
||||
targetURL := bu.url
|
||||
if tkn != nil {
|
||||
vmac := tkn.VMAccess()
|
||||
if !tkn.HasVMAccessClaim() {
|
||||
vmac = ui.JWT.DefaultVMAccessClaim
|
||||
}
|
||||
// for security reasons allow templating only for configured url values and headers
|
||||
targetURL, hc = replaceJWTPlaceholders(bu, hc, tkn.VMAccess())
|
||||
targetURL, hc = replaceJWTPlaceholders(bu, hc, vmac)
|
||||
}
|
||||
if isDefault {
|
||||
// Don't change path and add request_path query param for default route.
|
||||
@@ -881,3 +895,20 @@ func debugInfo(u *url.URL, r *http.Request) string {
|
||||
fmt.Fprint(s, ")")
|
||||
return s.String()
|
||||
}
|
||||
|
||||
// slowdownUnauthorizedResponse blocks for a random delay of 2s plus 0..999ms before an unauthorized response is returned.
// It returns earlier if the context of r is done.
|
||||
// This reduces the effectiveness of brute-force attacks.
|
||||
//
|
||||
// Recommended by OWASP Top10:
|
||||
// https://owasp.org/Top10/2025/A07_2025-Authentication_Failures
|
||||
func slowdownUnauthorizedResponse(r *http.Request) {
|
||||
|
||||
// Fixed 2s base plus a random number of milliseconds in [0, 1000).
d := 2*time.Second + time.Duration(rand.IntN(1000))*time.Millisecond
|
||||
t := timerpool.Get(d)
|
||||
|
||||
// Wait for whichever comes first: the timer firing or the request context being done.
select {
|
||||
case <-t.C:
|
||||
case <-r.Context().Done():
|
||||
}
|
||||
// Hand the timer back to the pool on both paths.
timerpool.Put(t)
|
||||
}
|
||||
|
||||
@@ -739,6 +739,12 @@ users:
|
||||
"vm_access": map[string]any{},
|
||||
}, false)
|
||||
|
||||
// token without vm_access claim, but with a custom claim usable for routing
|
||||
roleToken := genToken(t, map[string]any{
|
||||
"exp": time.Now().Add(10 * time.Minute).Unix(),
|
||||
"role": "admin",
|
||||
}, true)
|
||||
|
||||
fullToken := genToken(t, map[string]any{
|
||||
"exp": time.Now().Add(10 * time.Minute).Unix(),
|
||||
"vm_access": map[string]any{
|
||||
@@ -779,6 +785,26 @@ statusCode=401
|
||||
Unauthorized`
|
||||
f(simpleCfgStr, request, responseExpected)
|
||||
|
||||
// token without vm_access claim is accepted when default_vm_access_claim is configured for the matching user
|
||||
request = httptest.NewRequest(`GET`, "http://some-host.com/abc", nil)
|
||||
request.Header.Set(`Authorization`, `Bearer `+roleToken)
|
||||
responseExpected = `
|
||||
statusCode=200
|
||||
path: /foo/abc
|
||||
query:
|
||||
headers:`
|
||||
f(fmt.Sprintf(`
|
||||
users:
|
||||
- jwt:
|
||||
public_keys:
|
||||
- %q
|
||||
default_vm_access_claim:
|
||||
metrics_account_id: 10
|
||||
metrics_project_id: 10
|
||||
match_claims:
|
||||
role: admin
|
||||
url_prefix: {BACKEND}/foo`, string(publicKeyPEM)), request, responseExpected)
|
||||
|
||||
// expired token
|
||||
request = httptest.NewRequest(`GET`, "http://some-host.com/abc", nil)
|
||||
request.Header.Set(`Authorization`, `Bearer `+expiredToken)
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
See vmctl docs [here](https://docs.victoriametrics.com/victoriametrics/vmctl/).
|
||||
|
||||
vmctl docs can be edited at [docs/vmctl.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/vmctl.md).
|
||||
vmctl docs can be edited at [docs/vmctl.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/vmctl/vmctl.md).
|
||||
|
||||
@@ -131,16 +131,13 @@ func (ac *authContext) initFromBasicAuthConfig(ba *BasicAuthConfig) error {
|
||||
if ba.Username == "" {
|
||||
return fmt.Errorf("missing `username` in `basic_auth` section")
|
||||
}
|
||||
if ba.Password != "" {
|
||||
ac.getAuthHeader = func() string {
|
||||
// See https://en.wikipedia.org/wiki/Basic_access_authentication
|
||||
token := ba.Username + ":" + ba.Password
|
||||
token64 := base64.StdEncoding.EncodeToString([]byte(token))
|
||||
return "Basic " + token64
|
||||
}
|
||||
ac.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
|
||||
return nil
|
||||
ac.getAuthHeader = func() string {
|
||||
// See https://en.wikipedia.org/wiki/Basic_access_authentication
|
||||
token := ba.Username + ":" + ba.Password
|
||||
token64 := base64.StdEncoding.EncodeToString([]byte(token))
|
||||
return "Basic " + token64
|
||||
}
|
||||
ac.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -69,6 +69,8 @@ const (
|
||||
vmAddr = "vm-addr"
|
||||
vmUser = "vm-user"
|
||||
vmPassword = "vm-password"
|
||||
vmHeaders = "vm-headers"
|
||||
vmBearerToken = "vm-bearer-token"
|
||||
vmAccountID = "vm-account-id"
|
||||
vmConcurrency = "vm-concurrency"
|
||||
vmCompress = "vm-compress"
|
||||
@@ -112,6 +114,16 @@ var (
|
||||
Usage: "VictoriaMetrics password for basic auth",
|
||||
EnvVars: []string{"VM_PASSWORD"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: vmHeaders,
|
||||
Usage: "Optional HTTP headers to send with each request to the corresponding destination address. \n" +
|
||||
"For example, --vm-headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding destination address. \n" +
|
||||
"Multiple headers must be delimited by '^^': --vm-headers='header1:value1^^header2:value2'",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: vmBearerToken,
|
||||
Usage: "Optional bearer auth token to use for the corresponding --vm-addr",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: vmAccountID,
|
||||
Usage: "AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). \n" +
|
||||
|
||||
@@ -259,7 +259,7 @@ func (cr *ChunkedResponse) Next() ([]int64, []float64, error) {
|
||||
|
||||
fieldValues, ok := r.values[cr.field]
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("response doesn't contain filed %q", cr.field)
|
||||
return nil, nil, fmt.Errorf("response doesn't contain field %q", cr.field)
|
||||
}
|
||||
values := make([]float64, len(fieldValues))
|
||||
for i, fv := range fieldValues {
|
||||
|
||||
@@ -457,7 +457,7 @@ func main() {
|
||||
auth.WithBearer(c.String(vmNativeDstBearerToken)),
|
||||
auth.WithHeaders(c.String(vmNativeDstHeaders)))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error initialize auth config for destination: %s", dstAddr)
|
||||
return fmt.Errorf("error initialize auth config for destination: %s: %w", dstAddr, err)
|
||||
}
|
||||
|
||||
// create TLS config
|
||||
@@ -563,11 +563,11 @@ func main() {
|
||||
}()
|
||||
|
||||
err = app.Run(os.Args)
|
||||
pushmetrics.StopAndPush()
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
log.Printf("Total time: %v", time.Since(start))
|
||||
pushmetrics.StopAndPush()
|
||||
}
|
||||
|
||||
func initConfigVM(c *cli.Context) (vm.Config, error) {
|
||||
@@ -596,11 +596,18 @@ func initConfigVM(c *cli.Context) (vm.Config, error) {
|
||||
return vm.Config{}, fmt.Errorf("failed to create backoff object: %w", err)
|
||||
}
|
||||
|
||||
authCfg, err := auth.Generate(
|
||||
auth.WithBasicAuth(c.String(vmUser), c.String(vmPassword)),
|
||||
auth.WithBearer(c.String(vmBearerToken)),
|
||||
auth.WithHeaders(c.String(vmHeaders)))
|
||||
if err != nil {
|
||||
return vm.Config{}, fmt.Errorf("error initialize auth config for destination: %s: %w", addr, err)
|
||||
}
|
||||
|
||||
return vm.Config{
|
||||
Addr: addr,
|
||||
Transport: tr,
|
||||
User: c.String(vmUser),
|
||||
Password: c.String(vmPassword),
|
||||
AuthCfg: authCfg,
|
||||
Concurrency: uint8(c.Int(vmConcurrency)),
|
||||
Compress: c.Bool(vmCompress),
|
||||
AccountID: c.String(vmAccountID),
|
||||
|
||||
@@ -74,9 +74,9 @@ func wrapErr(vmErr *vm.ImportError, verbose bool) error {
|
||||
verboseMsg = "(enable `--verbose` output to get more details)"
|
||||
}
|
||||
if vmErr.Err == nil {
|
||||
return fmt.Errorf("%s\n\tLatest delivered batch for timestamps range %d - %d %s\n%s",
|
||||
return fmt.Errorf("%w\n\tLatest delivered batch for timestamps range %d - %d %s\n%s",
|
||||
vmErr.Err, minTS, maxTS, verboseMsg, errTS)
|
||||
}
|
||||
return fmt.Errorf("%s\n\tImporting batch failed for timestamps range %d - %d %s\n%s",
|
||||
return fmt.Errorf("%w\n\tImporting batch failed for timestamps range %d - %d %s\n%s",
|
||||
vmErr.Err, minTS, maxTS, verboseMsg, errTS)
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
|
||||
@@ -27,6 +28,8 @@ type Config struct {
|
||||
// --httpListenAddr value for single node version
|
||||
// --httpListenAddr value of vmselect component for cluster version
|
||||
Addr string
|
||||
|
||||
AuthCfg *auth.Config
|
||||
// Transport allows specifying custom http.Transport
|
||||
Transport *http.Transport
|
||||
// Concurrency defines number of worker
|
||||
@@ -40,10 +43,6 @@ type Config struct {
|
||||
// BatchSize defines how many samples
|
||||
// importer collects before sending the import request
|
||||
BatchSize int
|
||||
// User name for basic auth
|
||||
User string
|
||||
// Password for basic auth
|
||||
Password string
|
||||
// SignificantFigures defines the number of significant figures to leave
|
||||
// in metric values before importing.
|
||||
// Zero value saves all the significant decimal places
|
||||
@@ -65,11 +64,10 @@ type Config struct {
|
||||
// see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-time-series-data
|
||||
type Importer struct {
|
||||
addr string
|
||||
authCfg *auth.Config
|
||||
client *http.Client
|
||||
importPath string
|
||||
compress bool
|
||||
user string
|
||||
password string
|
||||
|
||||
close chan struct{}
|
||||
input chan *TimeSeries
|
||||
@@ -148,8 +146,7 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
|
||||
client: client,
|
||||
importPath: importPath,
|
||||
compress: cfg.Compress,
|
||||
user: cfg.User,
|
||||
password: cfg.Password,
|
||||
authCfg: cfg.AuthCfg,
|
||||
rl: limiter.NewLimiter(cfg.RateLimit),
|
||||
close: make(chan struct{}),
|
||||
input: make(chan *TimeSeries, cfg.Concurrency*4),
|
||||
@@ -304,8 +301,8 @@ func (im *Importer) Ping() error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
|
||||
}
|
||||
if im.user != "" {
|
||||
req.SetBasicAuth(im.user, im.password)
|
||||
if im.authCfg != nil {
|
||||
im.authCfg.SetHeaders(req, true)
|
||||
}
|
||||
resp, err := im.client.Do(req)
|
||||
if err != nil {
|
||||
@@ -334,8 +331,8 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
|
||||
im.importRequestsErrorsTotal.Inc()
|
||||
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
|
||||
}
|
||||
if im.user != "" {
|
||||
req.SetBasicAuth(im.user, im.password)
|
||||
if im.authCfg != nil {
|
||||
im.authCfg.SetHeaders(req, true)
|
||||
}
|
||||
if im.compress {
|
||||
req.Header.Set("Content-Encoding", "gzip")
|
||||
|
||||
@@ -405,7 +405,16 @@ func buildMatchWithFilter(filter string, metricName string) (string, error) {
|
||||
if len(tf.Key) == 0 {
|
||||
continue
|
||||
}
|
||||
a = append(a, tf.String())
|
||||
switch {
|
||||
case tf.IsNegative && tf.IsRegexp:
|
||||
a = append(a, fmt.Sprintf("%s!~%q", tf.Key, tf.Value))
|
||||
case tf.IsNegative:
|
||||
a = append(a, fmt.Sprintf("%s!=%q", tf.Key, tf.Value))
|
||||
case tf.IsRegexp:
|
||||
a = append(a, fmt.Sprintf("%s=~%q", tf.Key, tf.Value))
|
||||
default:
|
||||
a = append(a, fmt.Sprintf("%s=%q", tf.Key, tf.Value))
|
||||
}
|
||||
}
|
||||
a = append(a, nameFilter)
|
||||
filters = append(filters, strings.Join(a, ","))
|
||||
|
||||
@@ -175,13 +175,19 @@ func (ctx *InsertCtx) WriteMetadata(mmpbs []prompb.MetricMetadata) error {
|
||||
}
|
||||
mms := ctx.mms
|
||||
mms = slicesutil.SetLength(mms, len(mmpbs))
|
||||
for idx, mmpb := range mmpbs {
|
||||
mm := &mms[idx]
|
||||
var cnt int
|
||||
for _, mmpb := range mmpbs {
|
||||
if timeserieslimits.IsMetricMetadataExceeding(&mmpb) {
|
||||
continue
|
||||
}
|
||||
mm := &mms[cnt]
|
||||
mm.MetricFamilyName = bytesutil.ToUnsafeBytes(mmpb.MetricFamilyName)
|
||||
mm.Help = bytesutil.ToUnsafeBytes(mmpb.Help)
|
||||
mm.Type = mmpb.Type
|
||||
mm.Unit = bytesutil.ToUnsafeBytes(mmpb.Unit)
|
||||
cnt++
|
||||
}
|
||||
mms = mms[:cnt]
|
||||
ctx.mms = mms
|
||||
|
||||
err := vmstorage.VMInsertAPI.WriteMetadata(mms)
|
||||
@@ -201,14 +207,19 @@ func (ctx *InsertCtx) WritePromMetadata(mmps []prometheus.Metadata) error {
|
||||
}
|
||||
mms := ctx.mms
|
||||
mms = slicesutil.SetLength(mms, len(mmps))
|
||||
for idx, mmpb := range mmps {
|
||||
mm := &mms[idx]
|
||||
var cnt int
|
||||
for _, mmpb := range mmps {
|
||||
mm := &mms[cnt]
|
||||
if timeserieslimits.IsPrometheusMetadataExceeding(&mmpb) {
|
||||
continue
|
||||
}
|
||||
mm.MetricFamilyName = bytesutil.ToUnsafeBytes(mmpb.Metric)
|
||||
mm.Help = bytesutil.ToUnsafeBytes(mmpb.Help)
|
||||
mm.Type = mmpb.Type
|
||||
cnt++
|
||||
}
|
||||
mms = mms[:cnt]
|
||||
ctx.mms = mms
|
||||
|
||||
err := vmstorage.VMInsertAPI.WriteMetadata(mms)
|
||||
if err != nil {
|
||||
return &httpserver.ErrorWithStatusCode{
|
||||
|
||||
@@ -49,7 +49,7 @@ func InsertHandlerForHTTP(req *http.Request) error {
|
||||
}
|
||||
q := req.URL.Query()
|
||||
precision := q.Get("precision")
|
||||
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
|
||||
// Read db tag from https://docs.influxdata.com/influxdb/v1/api/write/#operation/PostWrite
|
||||
db := q.Get("db")
|
||||
encoding := req.Header.Get("Content-Encoding")
|
||||
isStreamMode := req.Header.Get("Stream-Mode") == "1"
|
||||
|
||||
@@ -6,8 +6,6 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
nethttputil "net/http/httputil"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -29,6 +27,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmalertproxy"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -38,7 +37,10 @@ var (
|
||||
resetCacheAuthKey = flagutil.NewPassword("search.resetCacheAuthKey", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call. It could be passed via authKey query arg. It overrides -httpAuth.*")
|
||||
logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging. "+
|
||||
"See also -search.logQueryMemoryUsage")
|
||||
vmalertProxyURL = flag.String("vmalert.proxyURL", "", "Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules")
|
||||
|
||||
vmalertProxyURL = flag.String("vmalert.proxyURL", "", "Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , "+
|
||||
"then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules . "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmalert")
|
||||
)
|
||||
|
||||
var slowQueries = metrics.NewCounter(`vm_slow_queries_total`)
|
||||
@@ -55,8 +57,8 @@ func Init(vmselectMaxConcurrentRequests int, vmselectMaxQueueDuration time.Durat
|
||||
concurrencyLimitCh = make(chan struct{}, maxConcurrentRequests)
|
||||
|
||||
initVMUIConfig()
|
||||
initVMAlertProxy()
|
||||
|
||||
vmalertproxy.Init(*vmalertProxyURL)
|
||||
flagutil.RegisterSecretFlag("vmalert.proxyURL")
|
||||
}
|
||||
|
||||
@@ -514,10 +516,11 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
|
||||
if len(*vmalertProxyURL) == 0 {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintf(w, "%s", `{"status":"error","msg":"for accessing vmalert flag '-vmalert.proxyURL' must be configured"}`)
|
||||
fmt.Fprintf(w, "%s", `{"status":"error","msg":"the '-vmalert.proxyURL' command-line must be configured; `+
|
||||
`see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmalert"}`)
|
||||
return true
|
||||
}
|
||||
proxyVMAlertRequests(w, r, path)
|
||||
vmalertproxy.HandleRequest(w, r, path)
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -555,7 +558,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
|
||||
case "/api/v1/rules", "/rules":
|
||||
rulesRequests.Inc()
|
||||
if len(*vmalertProxyURL) > 0 {
|
||||
proxyVMAlertRequests(w, r, path)
|
||||
vmalertproxy.HandleRequest(w, r, path)
|
||||
return true
|
||||
}
|
||||
// Return dumb placeholder for https://prometheus.io/docs/prometheus/latest/querying/api/#rules
|
||||
@@ -565,7 +568,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
|
||||
case "/api/v1/alerts", "/alerts":
|
||||
alertsRequests.Inc()
|
||||
if len(*vmalertProxyURL) > 0 {
|
||||
proxyVMAlertRequests(w, r, path)
|
||||
vmalertproxy.HandleRequest(w, r, path)
|
||||
return true
|
||||
}
|
||||
// Return dumb placeholder for https://prometheus.io/docs/prometheus/latest/querying/api/#alerts
|
||||
@@ -575,7 +578,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
|
||||
case "/api/v1/notifiers", "/notifiers":
|
||||
notifiersRequests.Inc()
|
||||
if len(*vmalertProxyURL) > 0 {
|
||||
proxyVMAlertRequests(w, r, path)
|
||||
vmalertproxy.HandleRequest(w, r, path)
|
||||
return true
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
@@ -722,48 +725,7 @@ var (
|
||||
metricNamesStatsResetErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/admin/status/metric_names_stats/reset"}`)
|
||||
)
|
||||
|
||||
func proxyVMAlertRequests(w http.ResponseWriter, r *http.Request, path string) {
|
||||
defer func() {
|
||||
err := recover()
|
||||
if err == nil || err == http.ErrAbortHandler {
|
||||
// Suppress http.ErrAbortHandler panic.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1353
|
||||
return
|
||||
}
|
||||
// Forward other panics to the caller.
|
||||
panic(err)
|
||||
}()
|
||||
req := r.Clone(r.Context())
|
||||
req.URL.Path = strings.TrimPrefix(path, "prometheus")
|
||||
req.Host = vmalertProxyHost
|
||||
|
||||
if strings.HasPrefix(r.Header.Get(`User-Agent`), `Grafana`) {
|
||||
// Grafana currently supports only Prometheus-style alerts. If other alert types
|
||||
// (e.g. logs or traces) are returned, it may fail with "Error loading alerts".
|
||||
//
|
||||
// Grafana queries the vmalert API directly, bypassing the VictoriaMetrics datasource,
|
||||
// so query params (such as datasource_type) cannot be enforced on the Grafana side.
|
||||
//
|
||||
// To ensure compatibility, we detect Grafana requests via the User-Agent and enforce
|
||||
// `datasource_type=prometheus`.
|
||||
//
|
||||
// See:
|
||||
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/329#issuecomment-3847585443
|
||||
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/59
|
||||
q := req.URL.Query()
|
||||
q.Set("datasource_type", "prometheus")
|
||||
req.URL.RawQuery = q.Encode()
|
||||
req.RequestURI = ""
|
||||
}
|
||||
|
||||
vmalertProxy.ServeHTTP(w, req)
|
||||
}
|
||||
|
||||
var (
|
||||
vmalertProxyHost string
|
||||
vmalertProxy *nethttputil.ReverseProxy
|
||||
vmuiConfig string
|
||||
)
|
||||
var vmuiConfig string
|
||||
|
||||
func initVMUIConfig() {
|
||||
var cfg struct {
|
||||
@@ -795,16 +757,3 @@ func initVMUIConfig() {
|
||||
}
|
||||
vmuiConfig = string(data)
|
||||
}
|
||||
|
||||
// initVMAlertProxy must be called after flag.Parse(), since it uses command-line flags.
|
||||
func initVMAlertProxy() {
|
||||
if len(*vmalertProxyURL) == 0 {
|
||||
return
|
||||
}
|
||||
proxyURL, err := url.Parse(*vmalertProxyURL)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot parse -vmalert.proxyURL=%q: %s", *vmalertProxyURL, err)
|
||||
}
|
||||
vmalertProxyHost = proxyURL.Host
|
||||
vmalertProxy = nethttputil.NewSingleHostReverseProxy(proxyURL)
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-metric-m
|
||||
currentItem := 0
|
||||
%}
|
||||
{% for _, row := range result %}
|
||||
"{%s string(row.MetricFamilyName) %}": [
|
||||
{%q= string(row.MetricFamilyName) %}: [
|
||||
{
|
||||
"type": {%q= row.Type.String() %},
|
||||
{% if len(row.Unit) > 0 -%}
|
||||
|
||||
@@ -35,12 +35,10 @@ func StreamMetadataResponse(qw422016 *qt422016.Writer, result []*metricsmetadata
|
||||
|
||||
//line app/vmselect/prometheus/metadata_response.qtpl:17
|
||||
for _, row := range result {
|
||||
//line app/vmselect/prometheus/metadata_response.qtpl:17
|
||||
qw422016.N().S(`"`)
|
||||
//line app/vmselect/prometheus/metadata_response.qtpl:18
|
||||
qw422016.E().S(string(row.MetricFamilyName))
|
||||
qw422016.N().Q(string(row.MetricFamilyName))
|
||||
//line app/vmselect/prometheus/metadata_response.qtpl:18
|
||||
qw422016.N().S(`": [{"type":`)
|
||||
qw422016.N().S(`: [{"type":`)
|
||||
//line app/vmselect/prometheus/metadata_response.qtpl:20
|
||||
qw422016.N().Q(row.Type.String())
|
||||
//line app/vmselect/prometheus/metadata_response.qtpl:20
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
@@ -525,6 +526,7 @@ func DeleteHandler(startTime time.Time, r *http.Request) error {
|
||||
if deletedCount > 0 {
|
||||
promql.ResetRollupResultCache()
|
||||
}
|
||||
logger.Infof("/api/v1/admin/tsdb/delete_series has been called for %q. Deleted %d series.", sq.FiltersString(), deletedCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -954,6 +956,7 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
|
||||
start, end, step int64, r *http.Request, ct int64, etfs [][]storage.TagFilter) error {
|
||||
deadline := searchutil.GetDeadlineForQuery(r, startTime)
|
||||
mayCache := !httputil.GetBool(r, "nocache")
|
||||
optimizeRepeatedBinaryOpSubexprs := httputil.GetBool(r, "optimize_repeated_binary_op_subexprs")
|
||||
lookbackDelta, err := getMaxLookback(r)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -975,18 +978,19 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
|
||||
}
|
||||
|
||||
ec := &promql.EvalConfig{
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: 0, // let vmstorage use maxUniqueTimeseries by default
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
MayCache: mayCache,
|
||||
LookbackDelta: lookbackDelta,
|
||||
RoundDigits: getRoundDigits(r),
|
||||
EnforcedTagFilterss: etfs,
|
||||
CacheTagFilters: etfs,
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: 0, // let vmstorage use maxUniqueTimeseries by default
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
MayCache: mayCache,
|
||||
OptimizeRepeatedBinaryOpSubexprs: optimizeRepeatedBinaryOpSubexprs,
|
||||
LookbackDelta: lookbackDelta,
|
||||
RoundDigits: getRoundDigits(r),
|
||||
EnforcedTagFilterss: etfs,
|
||||
CacheTagFilters: etfs,
|
||||
GetRequestURI: func() string {
|
||||
return httpserver.GetRequestURI(r)
|
||||
},
|
||||
|
||||
@@ -132,6 +132,9 @@ type EvalConfig struct {
|
||||
// Whether the response can be cached.
|
||||
MayCache bool
|
||||
|
||||
// Whether repeated cacheable binary op subexpressions can be optimized.
|
||||
OptimizeRepeatedBinaryOpSubexprs bool
|
||||
|
||||
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
|
||||
LookbackDelta int64
|
||||
|
||||
@@ -171,6 +174,7 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig {
|
||||
ec.MaxPointsPerSeries = src.MaxPointsPerSeries
|
||||
ec.Deadline = src.Deadline
|
||||
ec.MayCache = src.MayCache
|
||||
ec.OptimizeRepeatedBinaryOpSubexprs = src.OptimizeRepeatedBinaryOpSubexprs
|
||||
ec.LookbackDelta = src.LookbackDelta
|
||||
ec.RoundDigits = src.RoundDigits
|
||||
ec.EnforcedTagFilterss = src.EnforcedTagFilterss
|
||||
@@ -467,7 +471,8 @@ func isAggrFuncWithoutGrouping(e metricsql.Expr) bool {
|
||||
}
|
||||
|
||||
func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
|
||||
if !canPushdownCommonFilters(be) {
|
||||
canPushdown := canPushdownCommonFilters(be)
|
||||
if !canPushdown && !shouldOptimizeRepeatedBinaryOpSubexprs(ec, exprFirst, exprSecond) {
|
||||
// Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters
|
||||
// from exprFirst to exprSecond.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886
|
||||
@@ -500,6 +505,25 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec
|
||||
}
|
||||
return tssFirst, tssSecond, nil
|
||||
}
|
||||
if !canPushdown {
|
||||
qt = qt.NewChild("execute left and right sides of %q sequentially because repeated cacheable subexpression was found", be.Op)
|
||||
defer qt.Done()
|
||||
|
||||
qtFirst := qt.NewChild("expr1")
|
||||
tssFirst, err := evalExpr(qtFirst, ec, exprFirst)
|
||||
qtFirst.Done()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
qtSecond := qt.NewChild("expr2")
|
||||
tssSecond, err := evalExpr(qtSecond, ec, exprSecond)
|
||||
qtSecond.Done()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return tssFirst, tssSecond, nil
|
||||
}
|
||||
|
||||
// Execute binary operation in the following way:
|
||||
//
|
||||
@@ -544,6 +568,78 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec
|
||||
return tssFirst, tssSecond, nil
|
||||
}
|
||||
|
||||
func shouldOptimizeRepeatedBinaryOpSubexprs(ec *EvalConfig, exprFirst, exprSecond metricsql.Expr) bool {
|
||||
if !ec.OptimizeRepeatedBinaryOpSubexprs {
|
||||
return false
|
||||
}
|
||||
if ec.Start == ec.End {
|
||||
return false
|
||||
}
|
||||
if !ec.mayCache() {
|
||||
return false
|
||||
}
|
||||
|
||||
candidatesFirst := make(map[string]struct{}, 1)
|
||||
var b []byte
|
||||
visitOptimizedAggrs(exprFirst, func(ae *metricsql.AggrFuncExpr) {
|
||||
if hasUnseededVolatileFunc(ae) {
|
||||
return
|
||||
}
|
||||
b = ae.AppendString(b[:0])
|
||||
candidatesFirst[string(b)] = struct{}{}
|
||||
})
|
||||
if len(candidatesFirst) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
repeated := false
|
||||
visitOptimizedAggrs(exprSecond, func(ae *metricsql.AggrFuncExpr) {
|
||||
if repeated {
|
||||
return
|
||||
}
|
||||
b = ae.AppendString(b[:0])
|
||||
_, repeated = candidatesFirst[string(b)]
|
||||
})
|
||||
return repeated
|
||||
}
|
||||
|
||||
func visitOptimizedAggrs(e metricsql.Expr, f func(ae *metricsql.AggrFuncExpr)) {
|
||||
metricsql.VisitAll(e, func(expr metricsql.Expr) {
|
||||
ae, ok := expr.(*metricsql.AggrFuncExpr)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if getIncrementalAggrFuncCallbacks(ae.Name) == nil {
|
||||
return
|
||||
}
|
||||
fe, _ := tryGetArgRollupFuncWithMetricExpr(ae)
|
||||
if fe == nil {
|
||||
return
|
||||
}
|
||||
f(ae)
|
||||
})
|
||||
}
|
||||
|
||||
func hasUnseededVolatileFunc(e metricsql.Expr) bool {
|
||||
found := false
|
||||
metricsql.VisitAll(e, func(expr metricsql.Expr) {
|
||||
if found {
|
||||
return
|
||||
}
|
||||
fe, ok := expr.(*metricsql.FuncExpr)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
switch strings.ToLower(fe.Name) {
|
||||
case "now":
|
||||
found = true
|
||||
case "rand", "rand_normal", "rand_exponential":
|
||||
found = len(fe.Args) == 0
|
||||
}
|
||||
})
|
||||
return found
|
||||
}
|
||||
|
||||
func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
|
||||
if len(tss) == 0 {
|
||||
return nil
|
||||
|
||||
@@ -170,3 +170,87 @@ func TestGetSumInstantValues(t *testing.T) {
|
||||
[]*timeseries{ts("foo", 100, 1)},
|
||||
)
|
||||
}
|
||||
|
||||
func TestShouldOptimizeRepeatedBinaryOpSubexprsGate(t *testing.T) {
|
||||
e, err := metricsql.Parse(`count(count(vm_requests_total) by (action,addr,cluster,endpoint)) by (action,addr,cluster) / count(count(vm_requests_total) by (action,addr,cluster,endpoint))`)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error in metricsql.Parse(): %s", err)
|
||||
}
|
||||
be, ok := e.(*metricsql.BinaryOpExpr)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected expr type; got %T; want *metricsql.BinaryOpExpr", e)
|
||||
}
|
||||
|
||||
f := func(name string, ec *EvalConfig, resultExpected bool) {
|
||||
t.Helper()
|
||||
result := shouldOptimizeRepeatedBinaryOpSubexprs(ec, be.Left, be.Right)
|
||||
if result != resultExpected {
|
||||
t.Fatalf("unexpected result for %q; got %v; want %v", name, result, resultExpected)
|
||||
}
|
||||
}
|
||||
|
||||
f("disabled optimization", &EvalConfig{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Step: 1000,
|
||||
}, false)
|
||||
f("disabled cache", &EvalConfig{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Step: 1000,
|
||||
OptimizeRepeatedBinaryOpSubexprs: true,
|
||||
}, false)
|
||||
f("instant query", &EvalConfig{
|
||||
Start: 1000,
|
||||
End: 1000,
|
||||
Step: 1000,
|
||||
MayCache: true,
|
||||
OptimizeRepeatedBinaryOpSubexprs: true,
|
||||
}, false)
|
||||
f("repeated cacheable aggregate subexpression", &EvalConfig{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Step: 1000,
|
||||
MayCache: true,
|
||||
OptimizeRepeatedBinaryOpSubexprs: true,
|
||||
}, true)
|
||||
f("unaligned range query", &EvalConfig{
|
||||
Start: 1001,
|
||||
End: 2000,
|
||||
Step: 1000,
|
||||
MayCache: true,
|
||||
OptimizeRepeatedBinaryOpSubexprs: true,
|
||||
}, false)
|
||||
}
|
||||
|
||||
func TestShouldOptimizeRepeatedBinaryOpSubexprsExpressions(t *testing.T) {
|
||||
f := func(name, q string, resultExpected bool) {
|
||||
t.Helper()
|
||||
e, err := metricsql.Parse(q)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error in metricsql.Parse(%q) for %q: %s", q, name, err)
|
||||
}
|
||||
be, ok := e.(*metricsql.BinaryOpExpr)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected expr type for %q; got %T; want *metricsql.BinaryOpExpr", name, e)
|
||||
}
|
||||
ec := &EvalConfig{Start: 1000, End: 2000, Step: 1000, MayCache: true, OptimizeRepeatedBinaryOpSubexprs: true}
|
||||
result := shouldOptimizeRepeatedBinaryOpSubexprs(ec, be.Left, be.Right)
|
||||
if result != resultExpected {
|
||||
t.Fatalf("unexpected result for %q; got %v; want %v; query: %q", name, result, resultExpected, q)
|
||||
}
|
||||
}
|
||||
|
||||
f("original issue query", `count(count(vm_requests_total) by (action,addr,cluster,endpoint)) by (action,addr,cluster) / count(count(vm_requests_total) by (action,addr,cluster,endpoint))`, true)
|
||||
f("right side contains repeated count aggregate", `count(foo) by (job) / (count(foo) by (job) + 1)`, true)
|
||||
f("same sum aggregate", `sum(rate(foo[5m])) by (job) / sum(rate(foo[5m])) by (job)`, true)
|
||||
f("same inner rollup but different aggregates", `sum(rate(foo[5m])) by (job) / count(rate(foo[5m])) by (job)`, false)
|
||||
f("different count aggregates", `count(foo) by (job) / count(bar) by (job)`, false)
|
||||
f("bare metric selector", `foo / foo`, false)
|
||||
f("bare rollup function", `rate(a[5m]) / rate(a[5m])`, false)
|
||||
f("now at modifier", `sum(rate(foo[5m] @ now())) by (job) / sum(rate(foo[5m] @ now())) by (job)`, false)
|
||||
f("unseeded rand at modifier", `sum(rate(foo[5m] @ rand())) by (job) / sum(rate(foo[5m] @ rand())) by (job)`, false)
|
||||
f("unseeded rand_normal at modifier", `sum(rate(foo[5m] @ rand_normal())) by (job) / sum(rate(foo[5m] @ rand_normal())) by (job)`, false)
|
||||
f("unseeded rand_exponential at modifier", `sum(rate(foo[5m] @ rand_exponential())) by (job) / sum(rate(foo[5m] @ rand_exponential())) by (job)`, false)
|
||||
f("seeded rand at modifier", `sum(rate(foo[5m] @ rand(1))) by (job) / sum(rate(foo[5m] @ rand(1))) by (job)`, true)
|
||||
}
|
||||
|
||||
@@ -1055,7 +1055,7 @@ func newRollupHoltWinters(args []any) (rollupFunc, error) {
|
||||
return nan
|
||||
}
|
||||
|
||||
// See https://en.wikipedia.org/wiki/Exponential_smoothing#Double_exponential_smoothing .
|
||||
// See https://en.wikipedia.org/wiki/Exponential_smoothing#Double_exponential_smoothing_%28Holt_linear%29 .
|
||||
// TODO: determine whether this shit really works.
|
||||
s0 := rfa.prevValue
|
||||
if math.IsNaN(s0) {
|
||||
|
||||
@@ -2,6 +2,7 @@ package promql
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
@@ -288,6 +289,9 @@ func marshalMetricTagsSorted(dst []byte, mn *storage.MetricName) []byte {
|
||||
}
|
||||
|
||||
func marshalBytesFast(dst []byte, s []byte) []byte {
|
||||
if len(s) > math.MaxUint16 {
|
||||
logger.Panicf("BUG: s len %d cannot exceed %d", len(s), math.MaxUint16)
|
||||
}
|
||||
dst = encoding.MarshalUint16(dst, uint16(len(s)))
|
||||
dst = append(dst, s...)
|
||||
return dst
|
||||
|
||||
@@ -482,7 +482,7 @@ See also [hoeffding_bound_lower](#hoeffding_bound_lower).
|
||||
#### holt_winters
|
||||
|
||||
`holt_winters(series_selector[d], sf, tf)` is a [rollup function](#rollup-functions), which calculates Holt-Winters value
|
||||
(aka [double exponential smoothing](https://en.wikipedia.org/wiki/Exponential_smoothing#Double_exponential_smoothing)) for [raw samples](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#raw-samples)
|
||||
(aka [double exponential smoothing](https://en.wikipedia.org/wiki/Exponential_smoothing#Double_exponential_smoothing_%28Holt_linear%29)) for [raw samples](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#raw-samples)
|
||||
over the given lookbehind window `d` using the given smoothing factor `sf` and the given trend factor `tf`.
|
||||
Both `sf` and `tf` must be in the range `[0...1]`.
|
||||
|
||||
@@ -1154,7 +1154,7 @@ See also [asin](#asin) and [cos](#cos).
|
||||
#### acosh
|
||||
|
||||
`acosh(q)` is a [transform function](#transform-functions), which returns
|
||||
[inverse hyperbolic cosine](https://en.wikipedia.org/wiki/Inverse_hyperbolic_functions#Inverse_hyperbolic_cosine) for every point of every time series returned by `q`.
|
||||
[inverse hyperbolic cosine](https://en.wikipedia.org/wiki/Inverse_hyperbolic_functions#Definitions_in_terms_of_logarithms) for every point of every time series returned by `q`.
|
||||
|
||||
Metric names are stripped from the resulting series. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
|
||||
|
||||
@@ -1176,7 +1176,7 @@ See also [acos](#acos) and [sin](#sin).
|
||||
#### asinh
|
||||
|
||||
`asinh(q)` is a [transform function](#transform-functions), which returns
|
||||
[inverse hyperbolic sine](https://en.wikipedia.org/wiki/Inverse_hyperbolic_functions#Inverse_hyperbolic_sine) for every point of every time series returned by `q`.
|
||||
[inverse hyperbolic sine](https://en.wikipedia.org/wiki/Inverse_hyperbolic_functions#Definitions_in_terms_of_logarithms) for every point of every time series returned by `q`.
|
||||
|
||||
Metric names are stripped from the resulting series. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
|
||||
|
||||
@@ -1198,7 +1198,7 @@ See also [tan](#tan).
|
||||
#### atanh
|
||||
|
||||
`atanh(q)` is a [transform function](#transform-functions), which returns
|
||||
[inverse hyperbolic tangent](https://en.wikipedia.org/wiki/Inverse_hyperbolic_functions#Inverse_hyperbolic_tangent) for every point of every time series returned by `q`.
|
||||
[inverse hyperbolic tangent](https://en.wikipedia.org/wiki/Inverse_hyperbolic_functions#Definitions_in_terms_of_logarithms) for every point of every time series returned by `q`.
|
||||
|
||||
Metric names are stripped from the resulting series. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
|
||||
|
||||
|
||||
File diff suppressed because one or more lines are too long
197
app/vmselect/vmui/assets/index-CusQvJzs.js
Normal file
197
app/vmselect/vmui/assets/index-CusQvJzs.js
Normal file
File diff suppressed because one or more lines are too long
@@ -1 +0,0 @@
|
||||
var e=Object.create,t=Object.defineProperty,n=Object.getOwnPropertyDescriptor,r=Object.getOwnPropertyNames,i=Object.getPrototypeOf,a=Object.prototype.hasOwnProperty,o=(e,t)=>()=>(e&&(t=e(e=0)),t),s=(e,t)=>()=>(t||e((t={exports:{}}).exports,t),t.exports),c=(e,n)=>{let r={};for(var i in e)t(r,i,{get:e[i],enumerable:!0});return n||t(r,Symbol.toStringTag,{value:`Module`}),r},l=(e,i,o,s)=>{if(i&&typeof i==`object`||typeof i==`function`)for(var c=r(i),l=0,u=c.length,d;l<u;l++)d=c[l],!a.call(e,d)&&d!==o&&t(e,d,{get:(e=>i[e]).bind(null,d),enumerable:!(s=n(i,d))||s.enumerable});return e},u=(n,r,a)=>(a=n==null?{}:e(i(n)),l(r||!n||!n.__esModule?t(a,`default`,{value:n,enumerable:!0}):a,n)),d=e=>a.call(e,`module.exports`)?e[`module.exports`]:l(t({},`__esModule`,{value:!0}),e);export{u as a,d as i,o as n,c as r,s as t};
|
||||
1
app/vmselect/vmui/assets/rolldown-runtime-Cyuzqnbw.js
Normal file
1
app/vmselect/vmui/assets/rolldown-runtime-Cyuzqnbw.js
Normal file
@@ -0,0 +1 @@
|
||||
var e=Object.create,t=Object.defineProperty,n=Object.getOwnPropertyDescriptor,r=Object.getOwnPropertyNames,i=Object.getPrototypeOf,a=Object.prototype.hasOwnProperty,o=(e,t)=>()=>(e&&(t=e(e=0)),t),s=(e,t)=>()=>(t||(e((t={exports:{}}).exports,t),e=null),t.exports),c=(e,n)=>{let r={};for(var i in e)t(r,i,{get:e[i],enumerable:!0});return n||t(r,Symbol.toStringTag,{value:`Module`}),r},l=(e,i,o,s)=>{if(i&&typeof i==`object`||typeof i==`function`)for(var c=r(i),l=0,u=c.length,d;l<u;l++)d=c[l],!a.call(e,d)&&d!==o&&t(e,d,{get:(e=>i[e]).bind(null,d),enumerable:!(s=n(i,d))||s.enumerable});return e},u=(n,r,a)=>(a=n==null?{}:e(i(n)),l(r||!n||!n.__esModule?t(a,`default`,{value:n,enumerable:!0}):a,n)),d=e=>a.call(e,`module.exports`)?e[`module.exports`]:l(t({},`__esModule`,{value:!0}),e);export{u as a,d as i,o as n,c as r,s as t};
|
||||
78
app/vmselect/vmui/assets/vendor-B83wxFqK.js
Normal file
78
app/vmselect/vmui/assets/vendor-B83wxFqK.js
Normal file
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -37,9 +37,9 @@
|
||||
<meta property="og:title" content="UI for VictoriaMetrics">
|
||||
<meta property="og:url" content="https://victoriametrics.com/">
|
||||
<meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data">
|
||||
<script type="module" crossorigin src="./assets/index-CoGukb-x.js"></script>
|
||||
<link rel="modulepreload" crossorigin href="./assets/rolldown-runtime-COnpUsM8.js">
|
||||
<link rel="modulepreload" crossorigin href="./assets/vendor-C8Kwp93_.js">
|
||||
<script type="module" crossorigin src="./assets/index-CusQvJzs.js"></script>
|
||||
<link rel="modulepreload" crossorigin href="./assets/rolldown-runtime-Cyuzqnbw.js">
|
||||
<link rel="modulepreload" crossorigin href="./assets/vendor-B83wxFqK.js">
|
||||
<link rel="stylesheet" crossorigin href="./assets/vendor-CnsZ1jie.css">
|
||||
<link rel="stylesheet" crossorigin href="./assets/index-BBUnmLOr.css">
|
||||
</head>
|
||||
|
||||
@@ -30,6 +30,9 @@ var (
|
||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention. See also -retentionFilter")
|
||||
futureRetention = flagutil.NewRetentionDuration("futureRetention", "2d", "Data with timestamps bigger than now+futureRetention is automatically deleted. "+
|
||||
"The minimum futureRetention is 2 days. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention")
|
||||
vmselectAddr = flag.String("vmselectAddr", "", "TCP address to accept connections from vmselect services")
|
||||
vmselectDisableRPCCompression = flag.Bool("rpc.disableCompression", false, "Whether to disable compression of the data sent from vmstorage to vmselect. "+
|
||||
"This reduces CPU usage at the cost of higher network bandwidth usage")
|
||||
snapshotAuthKey = flagutil.NewPassword("snapshotAuthKey", "authKey, which must be passed in query string to /snapshot* pages. It overrides -httpAuth.*")
|
||||
forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*")
|
||||
forceFlushAuthKey = flagutil.NewPassword("forceFlushAuthKey", "authKey, which must be passed in query string to /internal/force_flush pages. It overrides -httpAuth.*")
|
||||
@@ -108,7 +111,7 @@ func DataPath() string {
|
||||
}
|
||||
|
||||
// Init initializes vmstorage.
|
||||
func Init(vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []storage.MetricRow)) {
|
||||
func Init(vmselectMaxConcurrentRequests int, vmselectMaxQueueDuration time.Duration, resetCacheIfNeeded func(mrs []storage.MetricRow)) {
|
||||
storage.SetDedupInterval(*minScrapeInterval)
|
||||
storage.SetDataFlushInterval(*inmemoryDataFlushInterval)
|
||||
storage.LegacySetRetentionTimezoneOffset(*retentionTimezoneOffset)
|
||||
@@ -169,6 +172,21 @@ func Init(vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []stora
|
||||
storageMetrics.RegisterMetricsWriter(vmStorage.writeStorageMetrics)
|
||||
metrics.RegisterSet(storageMetrics)
|
||||
|
||||
if *vmselectAddr != "" {
|
||||
var err error
|
||||
limits := vmselectapi.Limits{
|
||||
MaxConcurrentRequests: vmselectMaxConcurrentRequests,
|
||||
MaxConcurrentRequestsFlagName: "search.maxConcurrentRequests",
|
||||
MaxQueueDuration: vmselectMaxQueueDuration,
|
||||
MaxQueueDurationFlagName: "search.maxQueueDuration",
|
||||
}
|
||||
api := newVMStorageWithTenantID(vmStorage)
|
||||
vmselectSrv, err = vmselectapi.NewServer(*vmselectAddr, api, limits, *vmselectDisableRPCCompression)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot create a server with -vmselectAddr=%s: %s", *vmselectAddr, err)
|
||||
}
|
||||
}
|
||||
|
||||
VMInsertAPI = vmStorage
|
||||
VMSelectAPI = vmStorage
|
||||
GetSearch = vmStorage.GetSearch
|
||||
@@ -191,6 +209,8 @@ var (
|
||||
|
||||
// TODO(@rtm0): Remove this dependency from vmalert-tool unit tests.
|
||||
DebugFlush func()
|
||||
|
||||
vmselectSrv *vmselectapi.Server
|
||||
)
|
||||
|
||||
// Stop stops the vmstorage
|
||||
@@ -201,6 +221,10 @@ func Stop() {
|
||||
|
||||
logger.Infof("gracefully closing the storage at %s", *storageDataPath)
|
||||
startTime := time.Now()
|
||||
|
||||
if vmselectSrv != nil {
|
||||
vmselectSrv.MustStop()
|
||||
}
|
||||
vmStorage.Stop()
|
||||
logger.Infof("successfully closed the storage in %.3f seconds", time.Since(startTime).Seconds())
|
||||
|
||||
|
||||
@@ -164,6 +164,10 @@ func (vms *VMStorage) IsReadOnly() bool {
|
||||
}
|
||||
|
||||
func (vms *VMStorage) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
|
||||
return vms.initSearch(qt, sq, marshalDefault, deadline)
|
||||
}
|
||||
|
||||
func (vms *VMStorage) initSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, marshal marshalFunc, deadline uint64) (vmselectapi.BlockIterator, error) {
|
||||
vms.wg.Add(1)
|
||||
|
||||
tr := sq.GetTimeRange()
|
||||
@@ -178,6 +182,7 @@ func (vms *VMStorage) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery
|
||||
return nil, fmt.Errorf("missing tag filters")
|
||||
}
|
||||
bi := getBlockIterator()
|
||||
bi.marshal = marshal
|
||||
bi.wgDone = vms.wg.Done
|
||||
bi.sr.Init(qt, vms.s, tfss, tr, maxMetrics, deadline)
|
||||
if err := bi.sr.Error(); err != nil {
|
||||
@@ -198,11 +203,19 @@ func (vms *VMStorage) getMaxMetrics(searchQueryLimit int) int {
|
||||
return searchQueryLimit
|
||||
}
|
||||
|
||||
type marshalFunc func(dst []byte, src *storage.MetricBlock) []byte
|
||||
|
||||
// marshalDefault is the default implementation of the MetricBlock marshaling.
|
||||
func marshalDefault(dst []byte, src *storage.MetricBlock) []byte {
|
||||
return src.Marshal(dst)
|
||||
}
|
||||
|
||||
// blockIterator implements vmselectapi.BlockIterator
|
||||
type blockIterator struct {
|
||||
sr storage.Search
|
||||
mb storage.MetricBlock
|
||||
wgDone func()
|
||||
sr storage.Search
|
||||
mb storage.MetricBlock
|
||||
marshal marshalFunc
|
||||
wgDone func()
|
||||
}
|
||||
|
||||
var blockIteratorsPool sync.Pool
|
||||
@@ -231,7 +244,7 @@ func (bi *blockIterator) NextBlock(dst []byte) ([]byte, bool) {
|
||||
mb := bi.mb
|
||||
mb.MetricName = bi.sr.MetricBlockRef.MetricName
|
||||
bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block)
|
||||
dst = mb.Marshal(dst[:0])
|
||||
dst = bi.marshal(dst[:0], &mb)
|
||||
return dst, true
|
||||
}
|
||||
|
||||
|
||||
282
app/vmstorage/vmstorage_with_tenant_id.go
Normal file
282
app/vmstorage/vmstorage_with_tenant_id.go
Normal file
@@ -0,0 +1,282 @@
|
||||
package vmstorage
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
|
||||
)
|
||||
|
||||
var (
|
||||
accountID = flag.Uint64("accountID", 0, "The accountID of the stored data")
|
||||
projectID = flag.Uint64("projectID", 0, "The projectID of the stored data")
|
||||
)
|
||||
|
||||
func newVMStorageWithTenantID(vms *VMStorage) *VMStorageWithTenantID {
|
||||
if *accountID > math.MaxUint32 {
|
||||
logger.Fatalf("-accountID must be in the range [0, %d], got %d", uint32(math.MaxUint32), *accountID)
|
||||
}
|
||||
if *projectID > math.MaxUint32 {
|
||||
logger.Fatalf("-projectID must be in the range [0, %d], got %d", uint32(math.MaxUint32), *projectID)
|
||||
}
|
||||
return &VMStorageWithTenantID{
|
||||
vms: vms,
|
||||
accountID: uint32(*accountID),
|
||||
projectID: uint32(*projectID),
|
||||
}
|
||||
}
|
||||
|
||||
// VMStorageWithTenantID is a thin wrapper around VMStorage type that overrides
|
||||
// its methods to properly serve requests coming from a vmselect (require
|
||||
// tenantID).
|
||||
//
|
||||
// A new instance of this type should be created using
|
||||
// newVMStorageWithTenantID(). The created instance does not require closing.
|
||||
// The instance also does not take ownership of vms and it is the responsibility
|
||||
// of the caller to close vms.
|
||||
type VMStorageWithTenantID struct {
|
||||
vms *VMStorage
|
||||
|
||||
accountID uint32
|
||||
projectID uint32
|
||||
}
|
||||
|
||||
// InitSearch initializes a storage search for a request initiated by a
|
||||
// vmselect.
|
||||
//
|
||||
// The search is initialized only if the search query is either multitenant or
|
||||
// its accountID and projectID match -accountID and -projectID flag values.
|
||||
// Otherwise, the method returns an iterator that will return no data.
|
||||
//
|
||||
// The method also overrides the data format of the data returned by the
|
||||
// iterator by prepending accountID and projectID bytes to the metric name and
|
||||
// the data block (a format used in vmcluster).
|
||||
func (vmst *VMStorageWithTenantID) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
|
||||
if !vmst.hasValidTenantID(sq) {
|
||||
return emptyBI, nil
|
||||
}
|
||||
return vmst.vms.initSearch(qt, sq, vmst.marshalMetricBlock, deadline)
|
||||
}
|
||||
|
||||
var emptyBI = &emptyBlockIterator{}
|
||||
|
||||
// emptyBlockIterator is an implementation of vmselectapi.BlockIterator that
|
||||
// always returns no data.
|
||||
type emptyBlockIterator struct{}
|
||||
|
||||
func (*emptyBlockIterator) MustClose() {}
|
||||
|
||||
func (*emptyBlockIterator) NextBlock(dst []byte) ([]byte, bool) {
|
||||
return dst, false
|
||||
}
|
||||
|
||||
func (*emptyBlockIterator) Error() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// marshalMetricBlock serializes a metric block in the format expected by
|
||||
// vmselect.
|
||||
//
|
||||
// vmselect expects metric names and data blocks to have the tenantID but
|
||||
// vmsingle does not have it. Therefore the tenantID needs to be included to
|
||||
// every metric name and block.
|
||||
func (vmst *VMStorageWithTenantID) marshalMetricBlock(dst []byte, src *storage.MetricBlock) []byte {
|
||||
// Marshal metric name:
|
||||
// 1. Marshal metric name length + accountID length + projectID length (in
|
||||
// bytes).
|
||||
// 2. append accountID and projectID bytes
|
||||
// 3. Finally append metric name bytes
|
||||
dst = encoding.MarshalVarUint64(dst, uint64(len(src.MetricName))+8)
|
||||
dst = encoding.MarshalUint32(dst, vmst.accountID)
|
||||
dst = encoding.MarshalUint32(dst, vmst.projectID)
|
||||
dst = append(dst, src.MetricName...)
|
||||
|
||||
// Marshal data block.
|
||||
dst = encoding.MarshalUint32(dst, vmst.accountID)
|
||||
dst = encoding.MarshalUint32(dst, vmst.projectID)
|
||||
dst = storage.MarshalBlock(dst, &src.Block)
|
||||
|
||||
return dst
|
||||
}
|
||||
|
||||
// SearchMetricNames searches the storage for metric names that match the query.
|
||||
//
|
||||
// If the query is not multitenant and its accountID and projectID do not
|
||||
// match the -accountID and -projectID flag values, the method will return an
|
||||
// empty result.
|
||||
//
|
||||
// Found metric names are prepended with accountID and projectID bytes (a format
|
||||
// used in vmcluster).
|
||||
func (vmst *VMStorageWithTenantID) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
|
||||
if !vmst.hasValidTenantID(sq) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
metricNames, err := vmst.vms.SearchMetricNames(qt, sq, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// vmselect expects metric names to have the tenantID but vmsingle does not
|
||||
// have it. Therefore the tenantID needs to be prepended to every metric
|
||||
// name.
|
||||
dst := make([]byte, 0, 8)
|
||||
dst = encoding.MarshalUint32(dst, vmst.accountID)
|
||||
dst = encoding.MarshalUint32(dst, vmst.projectID)
|
||||
tenantID := string(dst)
|
||||
|
||||
for i, metricName := range metricNames {
|
||||
metricNames[i] = tenantID + metricName
|
||||
}
|
||||
return metricNames, nil
|
||||
}
|
||||
|
||||
// LabelValues searches the storage for label values that match the query and
|
||||
// correspond to a label whose name is `labelName`. The returned result
|
||||
// will contain not more than `maxLabelValues`.
|
||||
//
|
||||
// If the query is not multitenant and its accountID and projectID do not
|
||||
// match the -accountID and -projectID flag values, the method will return an
|
||||
// empty result.
|
||||
func (vmst *VMStorageWithTenantID) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
|
||||
if !vmst.hasValidTenantID(sq) {
|
||||
return nil, nil
|
||||
}
|
||||
return vmst.vms.LabelValues(qt, sq, labelName, maxLabelValues, deadline)
|
||||
}
|
||||
|
||||
// TagValueSuffixes searches the storage for Graphite tag value suffixes. The
|
||||
// returned result will contain not more than `maxSuffixes`.
|
||||
//
|
||||
// If accountID and projectID do not match the -accountID and -projectID flag
|
||||
// values, the method will return an empty result.
|
||||
func (vmst *VMStorageWithTenantID) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error) {
|
||||
if !vmst.isValidTenantID(accountID, projectID) {
|
||||
return nil, nil
|
||||
}
|
||||
return vmst.vms.TagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
|
||||
}
|
||||
|
||||
// LabelNames searches the storage for label names that match the query.
|
||||
// The returned result will contain not more than `maxLabelNames`.
|
||||
//
|
||||
// If the query is not multitenant and its accountID and projectID do not
|
||||
// match the -accountID and -projectID flag values, the method will return an
|
||||
// empty result.
|
||||
func (vmst *VMStorageWithTenantID) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
|
||||
if !vmst.hasValidTenantID(sq) {
|
||||
return nil, nil
|
||||
}
|
||||
return vmst.vms.LabelNames(qt, sq, maxLabelNames, deadline)
|
||||
}
|
||||
|
||||
// SeriesCount returns the total number of metrics stored in the database.
|
||||
//
|
||||
// The method may return inflated numbers. How inflated the count depends
|
||||
// on the churn rate and the retention period. For example, if a metric lasts
|
||||
// for 2 months, it will be counted twice.
|
||||
//
|
||||
// The method also counts the deleted metrics.
|
||||
//
|
||||
// If accountID and projectID do not match the -accountID and -projectID flag
|
||||
// values, the method will return 0.
|
||||
func (vmst *VMStorageWithTenantID) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
|
||||
if !vmst.isValidTenantID(accountID, projectID) {
|
||||
return 0, nil
|
||||
}
|
||||
return vmst.vms.SeriesCount(qt, accountID, projectID, deadline)
|
||||
}
|
||||
|
||||
// Tenants returns just one tenant consisting of the -accountID and -projectID
|
||||
// flag values.
|
||||
func (vmst *VMStorageWithTenantID) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
|
||||
tenantID := fmt.Sprintf("%d:%d", vmst.accountID, vmst.projectID)
|
||||
return []string{tenantID}, nil
|
||||
}
|
||||
|
||||
// TSDBStatus retrieves the status for metrics that match to the search query.
|
||||
//
|
||||
// If the query is not multitenant and its accountID and projectID do not
|
||||
// match the -accountID and -projectID flag values, the method will return empty
|
||||
// status.
|
||||
func (vmst *VMStorageWithTenantID) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
|
||||
if !vmst.hasValidTenantID(sq) {
|
||||
return &storage.TSDBStatus{}, nil
|
||||
}
|
||||
return vmst.vms.TSDBStatus(qt, sq, focusLabel, topN, deadline)
|
||||
}
|
||||
|
||||
// DeleteSeries marks as deleted metrics that match the search query.
|
||||
// The method returns the number of deleted metrics.
|
||||
//
|
||||
// If the query is not multitenant and its accountID and projectID do not
|
||||
// match the -accountID and -projectID flag values, no metrics will be deleted
|
||||
// and the method will return 0.
|
||||
func (vmst *VMStorageWithTenantID) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
|
||||
if !vmst.hasValidTenantID(sq) {
|
||||
return 0, nil
|
||||
}
|
||||
return vmst.vms.DeleteSeries(qt, sq, deadline)
|
||||
}
|
||||
|
||||
// RegisterMetricNames registers metric names in the index, the sample values
|
||||
// and timestamps are ignored.
|
||||
func (vmst *VMStorageWithTenantID) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error {
|
||||
return vmst.vms.RegisterMetricNames(qt, mrs, deadline)
|
||||
}
|
||||
|
||||
// GetMetricNamesUsageStats retrieves the usage stats for metrics whose name
|
||||
// matches the pattern.
|
||||
//
|
||||
// If the tenant token is non-nil and its accountID and projectID do not match
|
||||
// the -accountID and -projectID flag values, the method will return an empty
|
||||
// result.
|
||||
func (vmst *VMStorageWithTenantID) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (metricnamestats.StatsResult, error) {
|
||||
if !vmst.isValidTenantToken(tt) {
|
||||
return metricnamestats.StatsResult{}, nil
|
||||
}
|
||||
return vmst.vms.GetMetricNamesUsageStats(qt, tt, limit, le, matchPattern, deadline)
|
||||
}
|
||||
|
||||
// ResetMetricNamesUsageStats resets the metric name usage stats.
|
||||
func (vmst *VMStorageWithTenantID) ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error {
|
||||
return vmst.vms.ResetMetricNamesUsageStats(qt, deadline)
|
||||
}
|
||||
|
||||
// GetMetadataRecords retrieves the metadata for the metricName.
|
||||
//
|
||||
// If the tenant token is non-nil and its accountID and projectID do not match
|
||||
// the -accountID and -projectID flag values, the method will return an empty
|
||||
// result.
|
||||
func (vmst *VMStorageWithTenantID) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) {
|
||||
if !vmst.isValidTenantToken(tt) {
|
||||
return nil, nil
|
||||
}
|
||||
return vmst.vms.GetMetadataRecords(qt, tt, limit, metricName, deadline)
|
||||
}
|
||||
|
||||
// hasValidTenantID returns true if the search query is either multitenant or
|
||||
// its accountID and projectID match -accountID and -projectID flag values.
|
||||
func (vmst *VMStorageWithTenantID) hasValidTenantID(sq *storage.SearchQuery) bool {
|
||||
return sq.IsMultiTenant || vmst.isValidTenantID(sq.AccountID, sq.ProjectID)
|
||||
}
|
||||
|
||||
// isValidTenantToken returns true if the TenantToken is either nil (no tenant
|
||||
// restriction is applied in this case) or its accountID and projectID match
|
||||
// -accountID and -projectID flag values.
|
||||
func (vmst *VMStorageWithTenantID) isValidTenantToken(tt *storage.TenantToken) bool {
|
||||
return tt == nil || vmst.isValidTenantID(tt.AccountID, tt.ProjectID)
|
||||
}
|
||||
|
||||
// isValidTenantID returns true if the accountID and projectID match -accountID
|
||||
// and -projectID flag values.
|
||||
func (vmst *VMStorageWithTenantID) isValidTenantID(accountID, projectID uint32) bool {
|
||||
return accountID == vmst.accountID && projectID == vmst.projectID
|
||||
}
|
||||
@@ -6,7 +6,7 @@ COPY web/ /build/
|
||||
RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o web-amd64 github.com/VictoriMetrics/vmui/ && \
|
||||
GOOS=windows GOARCH=amd64 CGO_ENABLED=0 go build -o web-windows github.com/VictoriMetrics/vmui/
|
||||
|
||||
FROM alpine:3.23.4
|
||||
FROM alpine:3.24.1
|
||||
USER root
|
||||
|
||||
COPY --from=build-web-stage /build/web-amd64 /app/web
|
||||
|
||||
@@ -91,9 +91,9 @@ The list of MetricsQL features on top of PromQL:
|
||||
Labels from the `on()` list aren't copied.
|
||||
* [Aggregate functions](#aggregate-functions) accept arbitrary number of args.
|
||||
For example, `avg(q1, q2, q3)` would return the average values for every point across time series returned by `q1`, `q2` and `q3`.
|
||||
* [@ modifier](https://prometheus.io/docs/prometheus/latest/querying/basics/#modifier) can be put anywhere in the query.
|
||||
* [@ modifier](https://prometheus.io/docs/prometheus/latest/querying/basics/#-modifier) can be put anywhere in the query.
|
||||
For example, `sum(foo) @ end()` calculates `sum(foo)` at the `end` timestamp of the selected time range `[start ... end]`.
|
||||
* Arbitrary subexpression can be used as [@ modifier](https://prometheus.io/docs/prometheus/latest/querying/basics/#modifier).
|
||||
* Arbitrary subexpression can be used as [@ modifier](https://prometheus.io/docs/prometheus/latest/querying/basics/#-modifier).
|
||||
For example, `foo @ (end() - 1h)` calculates `foo` at the `end - 1 hour` timestamp on the selected time range `[start ... end]`.
|
||||
* [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier), lookbehind window in square brackets
|
||||
and `step` value for [subquery](#subqueries) may refer to the current step aka `$__interval` value from Grafana with `[Ni]` syntax.
|
||||
@@ -482,7 +482,7 @@ See also [hoeffding_bound_lower](#hoeffding_bound_lower).
|
||||
#### holt_winters
|
||||
|
||||
`holt_winters(series_selector[d], sf, tf)` is a [rollup function](#rollup-functions), which calculates Holt-Winters value
|
||||
(aka [double exponential smoothing](https://en.wikipedia.org/wiki/Exponential_smoothing#Double_exponential_smoothing)) for [raw samples](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#raw-samples)
|
||||
(aka [double exponential smoothing](https://en.wikipedia.org/wiki/Exponential_smoothing#Double_exponential_smoothing_%28Holt_linear%29)) for [raw samples](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#raw-samples)
|
||||
over the given lookbehind window `d` using the given smoothing factor `sf` and the given trend factor `tf`.
|
||||
Both `sf` and `tf` must be in the range `[0...1]`.
|
||||
|
||||
@@ -1154,7 +1154,7 @@ See also [asin](#asin) and [cos](#cos).
|
||||
#### acosh
|
||||
|
||||
`acosh(q)` is a [transform function](#transform-functions), which returns
|
||||
[inverse hyperbolic cosine](https://en.wikipedia.org/wiki/Inverse_hyperbolic_functions#Inverse_hyperbolic_cosine) for every point of every time series returned by `q`.
|
||||
[inverse hyperbolic cosine](https://en.wikipedia.org/wiki/Inverse_hyperbolic_functions#Definitions_in_terms_of_logarithms) for every point of every time series returned by `q`.
|
||||
|
||||
Metric names are stripped from the resulting series. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
|
||||
|
||||
@@ -1176,7 +1176,7 @@ See also [acos](#acos) and [sin](#sin).
|
||||
#### asinh
|
||||
|
||||
`asinh(q)` is a [transform function](#transform-functions), which returns
|
||||
[inverse hyperbolic sine](https://en.wikipedia.org/wiki/Inverse_hyperbolic_functions#Inverse_hyperbolic_sine) for every point of every time series returned by `q`.
|
||||
[inverse hyperbolic sine](https://en.wikipedia.org/wiki/Inverse_hyperbolic_functions#Definitions_in_terms_of_logarithms) for every point of every time series returned by `q`.
|
||||
|
||||
Metric names are stripped from the resulting series. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
|
||||
|
||||
@@ -1198,7 +1198,7 @@ See also [tan](#tan).
|
||||
#### atanh
|
||||
|
||||
`atanh(q)` is a [transform function](#transform-functions), which returns
|
||||
[inverse hyperbolic tangent](https://en.wikipedia.org/wiki/Inverse_hyperbolic_functions#Inverse_hyperbolic_tangent) for every point of every time series returned by `q`.
|
||||
[inverse hyperbolic tangent](https://en.wikipedia.org/wiki/Inverse_hyperbolic_functions#Definitions_in_terms_of_logarithms) for every point of every time series returned by `q`.
|
||||
|
||||
Metric names are stripped from the resulting series. Add [keep_metric_names](#keep_metric_names) modifier in order to keep metric names.
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ import uPlot from "uplot";
|
||||
import Button from "../../Main/Button/Button";
|
||||
import { CloseIcon, DragIcon } from "../../Main/Icons";
|
||||
import { SeriesItemStatsFormatted } from "../../../types";
|
||||
import { STATS_ORDER } from "../../../constants/graph";
|
||||
import { STATS_ORDER_TOOLTIP } from "../../../constants/graph";
|
||||
|
||||
export interface ChartTooltipProps {
|
||||
u?: uPlot;
|
||||
@@ -164,7 +164,7 @@ const ChartTooltip: FC<ChartTooltipProps> = ({
|
||||
</div>
|
||||
{statsFormatted && (
|
||||
<table className="vm-chart-tooltip-stats">
|
||||
{STATS_ORDER.map((key, i) => (
|
||||
{STATS_ORDER_TOOLTIP.map((key, i) => (
|
||||
<div
|
||||
className="vm-chart-tooltip-stats-row"
|
||||
key={i}
|
||||
|
||||
@@ -61,7 +61,7 @@ const LegendConfigs: FC<Props> = ({ data, isCompact }) => {
|
||||
label: "Hide Statistics",
|
||||
value: hideStats,
|
||||
onChange: onChangeStats,
|
||||
info: "If enabled, hides the display of min, median, and max values.",
|
||||
info: "If enabled, hides the display of min, median, max, and last values.",
|
||||
}
|
||||
];
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ import "./style.scss";
|
||||
import classNames from "classnames";
|
||||
import { getFreeFields } from "./helpers";
|
||||
import useCopyToClipboard from "../../../../../hooks/useCopyToClipboard";
|
||||
import { STATS_ORDER } from "../../../../../constants/graph";
|
||||
import { STATS_ORDER_LEGEND } from "../../../../../constants/graph";
|
||||
import { useShowStats } from "../hooks/useShowStats";
|
||||
import { useLegendFormat } from "../hooks/useLegendFormat";
|
||||
import { getLabelAlias } from "../../../../../utils/metric";
|
||||
@@ -80,7 +80,7 @@ const LegendItem: FC<LegendItemProps> = ({ legend, onChange, duplicateFields })
|
||||
</div>
|
||||
{!hideStats && showStats && (
|
||||
<div className="vm-legend-item-stats">
|
||||
{STATS_ORDER.map((key, i) => (
|
||||
{STATS_ORDER_LEGEND.map((key, i) => (
|
||||
<div
|
||||
className="vm-legend-item-stats-row"
|
||||
key={i}
|
||||
|
||||
@@ -4,11 +4,11 @@ import "./style.scss";
|
||||
import { LegendItemType } from "../../../../../types";
|
||||
import { MouseEvent } from "react";
|
||||
import classNames from "classnames";
|
||||
import { STATS_ORDER } from "../../../../../constants/graph";
|
||||
import { STATS_ORDER_LEGEND } from "../../../../../constants/graph";
|
||||
import { useShowStats } from "../hooks/useShowStats";
|
||||
import { getValueByPath } from "../../../../../utils/object";
|
||||
|
||||
const statsColumns = STATS_ORDER.map(k => ({
|
||||
const statsColumns = STATS_ORDER_LEGEND.map(k => ({
|
||||
key: `statsFormatted.${k}`,
|
||||
title: k
|
||||
}));
|
||||
|
||||
@@ -183,7 +183,7 @@ const StepConfigurator: FC = () => {
|
||||
<div className="vm-step-control-popper-info">
|
||||
<p>
|
||||
<code>step</code> - the <Hyperlink
|
||||
href="https://prometheus.io/docs/prometheus/latest/querying/basics/#time-durations"
|
||||
href="https://prometheus.io/docs/prometheus/latest/querying/basics/#float-literals-and-time-durations"
|
||||
text="interval"
|
||||
/> between datapoints, which must be returned from the range query.
|
||||
The <code>query</code> is executed
|
||||
|
||||
@@ -26,4 +26,5 @@ export const GRAPH_SIZES: GraphSize[] = [
|
||||
},
|
||||
];
|
||||
|
||||
export const STATS_ORDER: (keyof SeriesItemStatsFormatted)[] = ["min", "median", "max"];
|
||||
export const STATS_ORDER_LEGEND: (keyof SeriesItemStatsFormatted)[] = ["min", "median", "max", "last"];
|
||||
export const STATS_ORDER_TOOLTIP: (keyof SeriesItemStatsFormatted)[] = ["min", "median", "max"];
|
||||
|
||||
@@ -4,6 +4,7 @@ export interface SeriesItemStatsFormatted {
|
||||
min: string,
|
||||
max: string,
|
||||
median: string,
|
||||
last: string,
|
||||
}
|
||||
|
||||
export interface SeriesItem extends Series {
|
||||
|
||||
@@ -16,7 +16,7 @@ const supportedValuesOf = Intl.supportedValuesOf;
|
||||
export const supportedTimezones = supportedValuesOf ? supportedValuesOf("timeZone") as string[] : timezones;
|
||||
|
||||
// The list of supported units could be the following -
|
||||
// https://prometheus.io/docs/prometheus/latest/querying/basics/#time-durations
|
||||
// https://prometheus.io/docs/prometheus/latest/querying/basics/#float-literals-and-time-durations
|
||||
export const supportedDurations = [
|
||||
{ long: "years", short: "y", possible: "year" },
|
||||
{ long: "weeks", short: "w", possible: "week" },
|
||||
|
||||
@@ -53,6 +53,7 @@ const getSeriesStatistics = (d: MetricResult) => {
|
||||
min: formatPrettyNumber(min, min, max),
|
||||
max: formatPrettyNumber(max, min, max),
|
||||
median: formatPrettyNumber(median, min, max),
|
||||
last: formatPrettyNumber(values.at(-1), min, max),
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
@@ -4,7 +4,7 @@ The `apptest` package contains the integration tests for the VictoriaMetrics
|
||||
applications (such as vmstorage, vminsert, and vmselect).
|
||||
|
||||
An integration test aims at verifying the behavior of an application as a whole,
|
||||
as apposed to a unit test that verifies the behavior of a building block of an
|
||||
as opposed to a unit test that verifies the behavior of a building block of an
|
||||
application.
|
||||
|
||||
To achieve that an integration test starts an application in a separate process
|
||||
@@ -19,10 +19,10 @@ work together as a system.
|
||||
The package provides a collection of helpers to start applications and make
|
||||
queries to them:
|
||||
|
||||
- `app.go` - contains the generic code for staring an application and should
|
||||
- `app.go` - contains the generic code for starting an application and should
|
||||
not be used by integration tests directly.
|
||||
- `{vmstorage,vminsert,etc}.go` - build on top of `app.go` and provide the
|
||||
code for staring a specific application.
|
||||
code for starting a specific application.
|
||||
- `client.go` - provides helper functions for sending HTTP requests to
|
||||
applications.
|
||||
|
||||
@@ -36,7 +36,7 @@ the application binary files to be built and put into the `bin` directory. The
|
||||
build rule used for running integration tests, `make apptest`,
|
||||
accounts for that, it builds all application binaries before running the tests.
|
||||
But if you want to run the tests without `make`, i.e. by executing
|
||||
`go test ./app/apptest`, you will need to build the binaries first (for example,
|
||||
`go test ./apptest/tests`, you will need to build the binaries first (for example,
|
||||
by executing `make all`).
|
||||
|
||||
Not all binaries can be built from `master` branch, cluster binaries can be built
|
||||
|
||||
358
apptest/testdata.go
Normal file
358
apptest/testdata.go
Normal file
@@ -0,0 +1,358 @@
|
||||
package apptest
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
)
|
||||
|
||||
// TestData holds the samples to ingest together with the responses that the
// Prometheus querying API endpoints are expected to return once those samples
// have been stored. See GenerateTestData for how the fields are populated.
type TestData struct {
	// Samples holds the lines to ingest: for every metric a "# HELP" line,
	// a "# TYPE" line and the sample line itself.
	Samples []string
	// Step is the distance between timestamps of two consecutive samples.
	Step int64
	// WantSeries holds the expected label set of every generated series.
	WantSeries []map[string]string
	// WantLabels holds the expected label names in sorted order.
	WantLabels []string
	// WantLabelValues holds the expected values of the "label" label.
	WantLabelValues []string
	// WantQueryResults holds one expected result per generated series.
	WantQueryResults []*QueryResult
	// WantMetadata maps a metric name to its expected metadata entries.
	WantMetadata map[string][]MetadataEntry
	// WantMetricNamesStats holds one record per generated metric name. Only
	// MetricName is set by GenerateTestData; callers fill in the rest.
	WantMetricNamesStats []MetricNamesStatsRecord
}
|
||||
|
||||
func GenerateTestData(prefix string, numMetrics, start, end int64) TestData {
|
||||
d := TestData{
|
||||
Samples: []string{},
|
||||
Step: (end - start) / numMetrics,
|
||||
WantSeries: make([]map[string]string, numMetrics),
|
||||
WantLabels: make([]string, numMetrics),
|
||||
WantLabelValues: make([]string, numMetrics),
|
||||
WantQueryResults: make([]*QueryResult, numMetrics),
|
||||
WantMetadata: make(map[string][]MetadataEntry),
|
||||
WantMetricNamesStats: make([]MetricNamesStatsRecord, numMetrics),
|
||||
}
|
||||
for i := range numMetrics {
|
||||
metricName := fmt.Sprintf("%s_%04d", prefix, i)
|
||||
metricHelp := fmt.Sprintf("# HELP %s some help message", metricName)
|
||||
metricType := fmt.Sprintf("# TYPE %s gauge", metricName)
|
||||
labelName := fmt.Sprintf("label_%04d", i)
|
||||
labelValue := fmt.Sprintf("value_%04d", i)
|
||||
value := i
|
||||
timestamp := start + i*d.Step
|
||||
sample := fmt.Sprintf(`%s{%s="value", label="%s"} %d %d`, metricName, labelName, labelValue, value, timestamp)
|
||||
|
||||
d.Samples = append(d.Samples, metricHelp, metricType, sample)
|
||||
d.WantSeries[i] = map[string]string{
|
||||
"__name__": metricName,
|
||||
labelName: "value",
|
||||
"label": labelValue,
|
||||
}
|
||||
d.WantLabels[i] = labelName
|
||||
d.WantLabelValues[i] = labelValue
|
||||
d.WantQueryResults[i] = &QueryResult{
|
||||
Metric: map[string]string{
|
||||
"__name__": metricName,
|
||||
labelName: "value",
|
||||
"label": labelValue,
|
||||
},
|
||||
Samples: []*Sample{{Timestamp: timestamp, Value: float64(value)}},
|
||||
}
|
||||
d.WantMetadata[metricName] = []MetadataEntry{{Help: "some help message", Type: "gauge"}}
|
||||
d.WantMetricNamesStats[i].MetricName = metricName
|
||||
}
|
||||
d.WantLabels = append(d.WantLabels, "__name__", "label")
|
||||
slices.Sort(d.WantLabels)
|
||||
return d
|
||||
}
|
||||
|
||||
// AssertSeries retrieves metric names from the storage and compares the result
|
||||
// with the expected one.
|
||||
func AssertSeries(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end int64, want []map[string]string) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/series response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1Series(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
}).Sort()
|
||||
},
|
||||
Want: &PrometheusAPIV1SeriesResponse{
|
||||
Status: "success",
|
||||
Data: want,
|
||||
},
|
||||
Retries: 1000,
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AssertSeriesCount retrieves series count and compares it with expected one.
|
||||
func AssertSeriesCount(tc *TestCase, app PrometheusQuerier, tenantID string, start, end int64, want uint64) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/series/count response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1SeriesCount(tc.T(), QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
})
|
||||
},
|
||||
Want: &PrometheusAPIV1SeriesCountResponse{
|
||||
Status: "success",
|
||||
Data: []uint64{want},
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AssertLabels retrieves label names from the storage and compares the result
|
||||
// with the expected one.
|
||||
func AssertLabels(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end int64, want []string) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/labels response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
res := app.PrometheusAPIV1Labels(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
})
|
||||
slices.Sort(res.Data)
|
||||
return res
|
||||
},
|
||||
Want: &PrometheusAPIV1LabelsResponse{
|
||||
Status: "success",
|
||||
Data: want,
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AssertLabelValues retrieves values for the label whose name is labelName for
|
||||
// the series whose name mathes metricNameRE, compares the result with the
|
||||
// expected one.
|
||||
func AssertLabelValues(tc *TestCase, app PrometheusQuerier, metricNameRE, labelName, tenantID string, start, end int64, want []string) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/labels/.../values response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
res := app.PrometheusAPIV1LabelValues(tc.T(), labelName, query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
})
|
||||
slices.Sort(res.Data)
|
||||
return res
|
||||
},
|
||||
Want: &PrometheusAPIV1LabelValuesResponse{
|
||||
Status: "success",
|
||||
Data: want,
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AssertQueryResults sends a data query to storage and compares the query
|
||||
// result with the expected one.
|
||||
func AssertQueryResults(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end, step int64, want []*QueryResult) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/query_range response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1QueryRange(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
Step: fmt.Sprintf("%dms", step),
|
||||
MaxLookback: fmt.Sprintf("%dms", step-1),
|
||||
NoCache: "1",
|
||||
})
|
||||
},
|
||||
Want: &PrometheusAPIV1QueryResponse{
|
||||
Status: "success",
|
||||
Data: &QueryData{
|
||||
ResultType: "matrix",
|
||||
Result: want,
|
||||
},
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
func AssertMetadata(tc *TestCase, app PrometheusQuerier, metricName, tenantID string, want map[string][]MetadataEntry) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/metadata response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1Metadata(tc.T(), metricName, 0, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: &PrometheusAPIV1Metadata{
|
||||
Status: "success",
|
||||
Data: want,
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
func AssertMetricNamesStats(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, want []MetricNamesStatsRecord) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/status/metric_names_stats response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1StatusMetricNamesStats(tc.T(), "", "", metricNameRE, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: MetricNamesStatsResponse{
|
||||
Records: want,
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// GraphiteTestData holds the data samples in Graphite Pickle format, distance
// between samples in milliseconds and expected responses for various Graphite
// API endpoints.
//
// NOTE(review): GenerateGraphiteTestData formats every sample as a
// "<name> <value> <timestamp>" text line, which looks like the Graphite
// plaintext protocol rather than the pickle protocol - confirm and adjust the
// wording above if so.
type GraphiteTestData struct {
	// Samples holds one sample line per generated metric.
	Samples []string
	// Step is the distance between timestamps of two consecutive samples.
	Step int64
	// WantMetricsIndex holds the names of all generated metrics.
	WantMetricsIndex []string
	// WantMetricsFind holds one leaf entry per generated metric.
	WantMetricsFind []GraphiteMetric
	// WantMetricsExpand holds the names of all generated metrics.
	WantMetricsExpand []string
	// WantRenderedTargets holds one rendered target per generated metric.
	WantRenderedTargets []GraphiteRenderedTarget
}
|
||||
|
||||
// GenerateGraphiteTestData generates Graphite test data.
|
||||
func GenerateGraphiteTestData(prefix string, numMetrics, start, end int64) GraphiteTestData {
|
||||
d := GraphiteTestData{
|
||||
Samples: make([]string, numMetrics),
|
||||
Step: (end - start) / numMetrics,
|
||||
WantMetricsIndex: make([]string, numMetrics),
|
||||
WantMetricsFind: make([]GraphiteMetric, numMetrics),
|
||||
WantMetricsExpand: make([]string, numMetrics),
|
||||
WantRenderedTargets: make([]GraphiteRenderedTarget, numMetrics),
|
||||
}
|
||||
|
||||
datapoints := make([][2]float64, numMetrics)
|
||||
for i := range numMetrics {
|
||||
timestamp := (start + i*d.Step) / 1000
|
||||
datapoints[i][1] = float64(timestamp)
|
||||
}
|
||||
|
||||
for i := range numMetrics {
|
||||
suffix := fmt.Sprintf("%04d", i)
|
||||
metricName := fmt.Sprintf("%s.%s", prefix, suffix)
|
||||
value := i
|
||||
timestamp := (start + i*d.Step) / 1000
|
||||
sample := fmt.Sprintf(`%s %d %d`, metricName, value, timestamp)
|
||||
|
||||
d.Samples[i] = sample
|
||||
d.WantMetricsIndex[i] = metricName
|
||||
d.WantMetricsFind[i].Id = metricName
|
||||
d.WantMetricsFind[i].Text = suffix
|
||||
d.WantMetricsFind[i].Leaf = 1
|
||||
d.WantMetricsExpand[i] = metricName
|
||||
d.WantRenderedTargets[i].Target = metricName
|
||||
d.WantRenderedTargets[i].Datapoints = slices.Clone(datapoints)
|
||||
d.WantRenderedTargets[i].Datapoints[i][0] = float64(value)
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// AssertGraphiteMetricsIndex retrieves all metrics by sending a request to
|
||||
// /graphite/metrics/index.json and compares the result with the expected one.
|
||||
func AssertGraphiteMetricsIndex(tc *TestCase, app PrometheusQuerier, tenantID string, want []string) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /graphite/metrics/index.json response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.GraphiteMetricsIndex(tc.T(), QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: want,
|
||||
Retries: 30,
|
||||
FailNow: true,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// AssertGraphiteMetricsFind finds metric names by sending a request to
|
||||
// /graphite/metrics/find and compares the result with the expected one.
|
||||
func AssertGraphiteMetricsFind(tc *TestCase, app PrometheusQuerier, query, tenantID string, want []GraphiteMetric) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /graphite/metrics/find response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.GraphiteMetricsFind(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: want,
|
||||
FailNow: true,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// AssertGraphiteMetricsFind expands metric names by sending a request to
|
||||
// /graphite/metrics/expand and compares the result with the expected one.
|
||||
func AssertGraphiteMetricsExpand(tc *TestCase, app PrometheusQuerier, query, tenantID string, want []string) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /graphite/metrics/expand response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.GraphiteMetricsExpand(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: want,
|
||||
FailNow: true,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// AssertGraphiteRender retieves metric raw data by sending a request to
|
||||
// /graphite/render and compares the result with the expected one.
|
||||
func AssertGraphiteRender(tc *TestCase, app PrometheusQuerier, target, tenantID string, from, until, step int64, want []GraphiteRenderedTarget) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /graphite/render response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.GraphiteRender(tc.T(), target, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
From: fmt.Sprintf("%d", from/1000),
|
||||
Until: fmt.Sprintf("%d", until/1000),
|
||||
StorageStep: fmt.Sprintf("%dms", step),
|
||||
})
|
||||
},
|
||||
Want: want,
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package tests
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
@@ -25,7 +26,11 @@ func TestSingleMetricsMetadata(t *testing.T) {
|
||||
if len(resp.Data) != 0 {
|
||||
t.Fatalf("unexpected resp Records: %d, want: %d", len(resp.Data), 0)
|
||||
}
|
||||
|
||||
generateValueExceedLimit := func(prefix string) string {
|
||||
buf := make([]byte, math.MaxUint16+len(prefix))
|
||||
copy(buf, prefix)
|
||||
return string(buf)
|
||||
}
|
||||
const ingestTimestamp = 1707123456700
|
||||
prometheusTextDataSet := []string{
|
||||
`# HELP metric_name_1 some help message`,
|
||||
@@ -40,16 +45,27 @@ func TestSingleMetricsMetadata(t *testing.T) {
|
||||
`# TYPE metric_name_3 gauge`,
|
||||
`metric_name_3{label="baz"} 30`,
|
||||
}
|
||||
prometheusTextDataSet = append(prometheusTextDataSet,
|
||||
`# HELP metric_name_4 `+generateValueExceedLimit("large help"),
|
||||
`# TYPE metric_name_4 gauge`,
|
||||
`metric_name_4{label="baz"} 30`,
|
||||
)
|
||||
|
||||
prometheusRemoteWriteDataSet := prompb.WriteRequest{
|
||||
Timeseries: []prompb.TimeSeries{
|
||||
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_4"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
|
||||
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_5"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
|
||||
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_6"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
|
||||
{Labels: []prompb.Label{{Name: "__name__", Value: `metric_name_7_!@"_suffix`}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
|
||||
},
|
||||
Metadata: []prompb.MetricMetadata{
|
||||
{MetricFamilyName: "metric_name_4", Help: "some help message", Type: prompb.MetricTypeSummary},
|
||||
{MetricFamilyName: "metric_name_5", Help: "some help message", Type: prompb.MetricTypeSummary},
|
||||
{MetricFamilyName: "metric_name_6", Help: "some help message", Type: prompb.MetricTypeStateset},
|
||||
{MetricFamilyName: `metric_name_7_!@"_suffix`, Help: "some help message", Type: prompb.MetricTypeStateset},
|
||||
{MetricFamilyName: "metric_name_8", Help: generateValueExceedLimit("large_help"), Type: prompb.MetricTypeStateset},
|
||||
{MetricFamilyName: "metric_name_9", Help: "some help message", Type: prompb.MetricTypeStateset, Unit: generateValueExceedLimit("large_unit")},
|
||||
{MetricFamilyName: generateValueExceedLimit("metric_name_10"), Help: "some help message", Type: prompb.MetricTypeStateset},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -59,12 +75,13 @@ func TestSingleMetricsMetadata(t *testing.T) {
|
||||
expected := &apptest.PrometheusAPIV1Metadata{
|
||||
Status: "success",
|
||||
Data: map[string][]apptest.MetadataEntry{
|
||||
"metric_name_1": {{Help: "some help message", Type: "gauge"}},
|
||||
"metric_name_2": {{Help: "some help message", Type: "counter"}},
|
||||
"metric_name_3": {{Help: "some help message", Type: "gauge"}},
|
||||
"metric_name_4": {{Help: "some help message", Type: "summary"}},
|
||||
"metric_name_5": {{Help: "some help message", Type: "summary"}},
|
||||
"metric_name_6": {{Help: "some help message", Type: "stateset"}},
|
||||
"metric_name_1": {{Help: "some help message", Type: "gauge"}},
|
||||
"metric_name_2": {{Help: "some help message", Type: "counter"}},
|
||||
"metric_name_3": {{Help: "some help message", Type: "gauge"}},
|
||||
"metric_name_4": {{Help: "some help message", Type: "summary"}},
|
||||
"metric_name_5": {{Help: "some help message", Type: "summary"}},
|
||||
"metric_name_6": {{Help: "some help message", Type: "stateset"}},
|
||||
`metric_name_7_!@"_suffix`: {{Help: "some help message", Type: "stateset"}},
|
||||
},
|
||||
}
|
||||
gotStats := sut.PrometheusAPIV1Metadata(t, "", 0, apptest.QueryOpts{})
|
||||
@@ -134,6 +151,11 @@ func TestClusterMetricsMetadata(t *testing.T) {
|
||||
if len(resp.Data) != 0 {
|
||||
t.Fatalf("unexpected resp Records: %d, want: %d", len(resp.Data), 0)
|
||||
}
|
||||
generateValueExceedLimit := func(prefix string) string {
|
||||
buf := make([]byte, math.MaxUint16+len(prefix))
|
||||
copy(buf, prefix)
|
||||
return string(buf)
|
||||
}
|
||||
|
||||
const ingestTimestamp = 1707123456700
|
||||
prometheusTextDataSet := []string{
|
||||
@@ -149,16 +171,26 @@ func TestClusterMetricsMetadata(t *testing.T) {
|
||||
`# TYPE metric_name_3 gauge`,
|
||||
`metric_name_3{label="baz"} 30`,
|
||||
}
|
||||
prometheusTextDataSet = append(prometheusTextDataSet,
|
||||
`# HELP metric_name_4 `+generateValueExceedLimit("large help"),
|
||||
`# TYPE metric_name_4 gauge`,
|
||||
`metric_name_4{label="baz"} 30`,
|
||||
)
|
||||
prometheusRemoteWriteDataSet := prompb.WriteRequest{
|
||||
Timeseries: []prompb.TimeSeries{
|
||||
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_4"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
|
||||
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_5"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
|
||||
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_6"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
|
||||
{Labels: []prompb.Label{{Name: "__name__", Value: `metric_name_7_!@"_suffix`}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
|
||||
},
|
||||
Metadata: []prompb.MetricMetadata{
|
||||
{MetricFamilyName: "metric_name_4", Help: "some help message", Type: prompb.MetricTypeSummary},
|
||||
{MetricFamilyName: "metric_name_5", Help: "some help message", Type: prompb.MetricTypeSummary},
|
||||
{MetricFamilyName: "metric_name_6", Help: "some help message", Type: prompb.MetricTypeStateset},
|
||||
{MetricFamilyName: `metric_name_7_!@"_suffix`, Help: "some help message", Type: prompb.MetricTypeStateset},
|
||||
{MetricFamilyName: "metric_name_8", Help: generateValueExceedLimit("large_help"), Type: prompb.MetricTypeStateset},
|
||||
{MetricFamilyName: "metric_name_9", Help: "some help message", Type: prompb.MetricTypeStateset, Unit: generateValueExceedLimit("large_unit")},
|
||||
{MetricFamilyName: generateValueExceedLimit("metric_name_10"), Help: "some help message", Type: prompb.MetricTypeStateset},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -171,12 +203,13 @@ func TestClusterMetricsMetadata(t *testing.T) {
|
||||
expected := &apptest.PrometheusAPIV1Metadata{
|
||||
Status: "success",
|
||||
Data: map[string][]apptest.MetadataEntry{
|
||||
"metric_name_1": {{Help: "some help message", Type: "gauge"}},
|
||||
"metric_name_2": {{Help: "some help message", Type: "counter"}},
|
||||
"metric_name_3": {{Help: "some help message", Type: "gauge"}},
|
||||
"metric_name_4": {{Help: "some help message", Type: "summary"}},
|
||||
"metric_name_5": {{Help: "some help message", Type: "summary"}},
|
||||
"metric_name_6": {{Help: "some help message", Type: "stateset"}},
|
||||
"metric_name_1": {{Help: "some help message", Type: "gauge"}},
|
||||
"metric_name_2": {{Help: "some help message", Type: "counter"}},
|
||||
"metric_name_3": {{Help: "some help message", Type: "gauge"}},
|
||||
"metric_name_4": {{Help: "some help message", Type: "summary"}},
|
||||
"metric_name_5": {{Help: "some help message", Type: "summary"}},
|
||||
"metric_name_6": {{Help: "some help message", Type: "stateset"}},
|
||||
`metric_name_7_!@"_suffix`: {{Help: "some help message", Type: "stateset"}},
|
||||
},
|
||||
}
|
||||
gotStats := vmselect.PrometheusAPIV1Metadata(t, "", 0, apptest.QueryOpts{Tenant: tenantID})
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -332,13 +333,11 @@ func TestSingleVMAgentDropOnOverload(t *testing.T) {
|
||||
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
|
||||
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
|
||||
}, apptest.QueryOpts{})
|
||||
|
||||
waitFor(
|
||||
func() bool {
|
||||
return vmagent.RemoteWriteRequests(t, url1) == 1 && vmagent.RemoteWriteRequests(t, url2) == 1
|
||||
},
|
||||
)
|
||||
|
||||
// Send 2 more requests, the first RW endpoint should receive everything, the second should add them to the queue
|
||||
// since worker is busy with the first request.
|
||||
for i := range 2 {
|
||||
@@ -518,10 +517,15 @@ func TestClusterVMAgentForwardMetricsMetadata(t *testing.T) {
|
||||
"-remoteWrite.tmpDataPath=" + tc.Dir() + "/vmagent",
|
||||
fmt.Sprintf(`-remoteWrite.url=http://%s/insert/multitenant/prometheus/api/v1/write`, sut.Vminsert.HTTPAddr()),
|
||||
})
|
||||
|
||||
generateValueExceedLimit := func(prefix string) string {
|
||||
buf := make([]byte, math.MaxUint16+len(prefix))
|
||||
copy(buf, prefix)
|
||||
return string(buf)
|
||||
}
|
||||
prometheusRemoteWriteDataSet := prompb.WriteRequest{
|
||||
Metadata: []prompb.MetricMetadata{
|
||||
{MetricFamilyName: "metric_name_4", Help: "some help message", Type: prompb.MetricTypeSummary, AccountID: 100},
|
||||
{MetricFamilyName: "metric_name_8", Help: generateValueExceedLimit("large_help"), Type: prompb.MetricTypeStateset, AccountID: 100},
|
||||
},
|
||||
}
|
||||
vmagent.PrometheusAPIV1Write(t, prometheusRemoteWriteDataSet, apptest.QueryOpts{Tenant: "multitenant"})
|
||||
@@ -641,3 +645,116 @@ func TestSingleVMAgentMultitenancy(t *testing.T) {
|
||||
t.Fatalf("expected vmagent_tenant_inserted_rows_total to have value 1 for accountID=5, projectID=0")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSingleVMAgentPriorizeRecentData starts vmagent with two remote write
// URLs and in-memory queues only. The first endpoint always accepts the data,
// while the second one responds with 503 until mustRW2ReturnError is cleared.
// The test ingests samples one by one and checks the per-URL request, pending
// in-memory block and dropped sample counters after each step.
//
// NOTE(review): judging by the name, the intent is to verify that vmagent
// prefers recent data over older data once the in-memory queue of a failing
// endpoint overflows - confirm against the -remoteWrite.inmemoryQueues docs.
func TestSingleVMAgentPriorizeRecentData(t *testing.T) {
	tc := apptest.NewTestCase(t)
	defer tc.Stop()

	// The first remote write endpoint accepts every request.
	remoteWriteSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusNoContent)
	}))
	defer remoteWriteSrv.Close()

	// The flag is read by the HTTP handler goroutine and written by the test
	// goroutine, hence the atomic.
	var mustRW2ReturnError atomic.Bool
	mustRW2ReturnError.Store(true)

	// The second remote write endpoint rejects requests while the flag is set.
	remoteWriteSrv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if mustRW2ReturnError.Load() {
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	}))
	defer remoteWriteSrv2.Close()

	vmagent := tc.MustStartDefaultRWVmagent("vmagent", []string{
		fmt.Sprintf(`-remoteWrite.url=%s/api/v1/write`, remoteWriteSrv.URL),
		fmt.Sprintf(`-remoteWrite.url=%s/api/v1/write`, remoteWriteSrv2.URL),
		"-remoteWrite.disableOnDiskQueue=true",
		// use only 1 worker to get a full queue faster
		"-remoteWrite.queues=1",
		"-remoteWrite.flushInterval=1ms",
		"-remoteWrite.inmemoryQueues=1",
		// fastqueue size is roughly memory.Allowed() / len(urls) / *maxRowsPerBlock / 100
		// Use very large maxRowsPerBlock to get fastqueue of minimal length(2).
		// See initRemoteWriteCtxs function in remotewrite.go for details.
		"-remoteWrite.maxRowsPerBlock=1000000000",
		"-remoteWrite.tmpDataPath=" + tc.Dir() + "/vmagent",

		// Delay retry logic to avoid race conditions with waitFor assertions.
		// It improves the test stability on resource-constrained runners.
		"-remoteWrite.retryMinInterval=3s",
		"-remoteWrite.retryMaxTime=3s",
	})

	// waitFor polls at most retries times with period between the attempts,
	// i.e. for about 4 seconds in total.
	const (
		retries = 20
		period  = 200 * time.Millisecond
	)

	// waitFor blocks until f returns true and fails the test if that does not
	// happen within the configured number of retries.
	waitFor := func(f func() bool) {
		t.Helper()
		for range retries {
			if f() {
				return
			}
			time.Sleep(period)
		}
		t.Fatalf("timed out waiting for retry #%d", retries)
	}

	// Real remote write URLs are hidden in metrics
	url1 := "1:secret-url"
	url2 := "2:secret-url"

	// Wait until first request got flushed to remote write server
	vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
		"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
	}, apptest.QueryOpts{})
	waitFor(
		func() bool {
			return vmagent.RemoteWriteRequests(t, url1) == 1 && vmagent.RemoteWriteRequests(t, url2) == 1
		},
	)
	// Wait until second request got flushed to remote write server
	// since there are 2 independent queues (general and in-memory) with minimal capacity of 1
	vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
		"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
	}, apptest.QueryOpts{})
	waitFor(
		func() bool {
			return vmagent.RemoteWriteRequests(t, url1) == 2 && vmagent.RemoteWriteRequests(t, url2) == 2
		},
	)
	// Send 2 more requests, the first RW endpoint should receive everything, the second should add them to the queue
	// since worker is busy with the first request.
	for i := range 2 {
		vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
			"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
		}, apptest.QueryOpts{})

		// The first endpoint has seen 2 requests before the loop, so it is
		// expected to see 3 and then 4; the second endpoint is expected to
		// accumulate 1 and then 2 pending in-memory blocks.
		waitFor(
			func() bool {
				return vmagent.RemoteWriteRequests(t, url1) == 3+i && vmagent.RemoteWritePendingInmemoryBlocks(t, url2) == 1+i
			},
		)
	}

	// Send one more request.
	vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
		"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
	}, apptest.QueryOpts{})

	// The first endpoint must have received all 5 requests, while the second
	// one must have started dropping samples.
	waitFor(
		func() bool {
			return vmagent.RemoteWriteRequests(t, url1) == 5 && vmagent.RemoteWriteSamplesDropped(t, url2) > 0
		},
	)
	// Let the second endpoint accept requests from now on.
	mustRW2ReturnError.Store(false)
	// ensure that inmemory data correctly flushed to the remote write
	waitFor(
		func() bool {
			return vmagent.RemoteWritePendingInmemoryBlocks(t, url2) == 0
		},
	)
}
|
||||
|
||||
216
apptest/tests/vmsingle_vmselect_rpc_test.go
Normal file
216
apptest/tests/vmsingle_vmselect_rpc_test.go
Normal file
@@ -0,0 +1,216 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
)
|
||||
|
||||
// TestMixedPrometheusQueries ingests generated samples into a vmsingle that is
// started with the -accountID and -projectID flags and verifies that:
//
//   - vmsingle itself returns the data;
//   - a vmselect that uses the vmsingle as its -storageNode reports the
//     configured tenant as the only tenant and returns the data for it;
//   - the same vmselect returns nothing for a different tenant;
//   - the same vmselect returns the data via the multitenant endpoint, with
//     the vm_account_id and vm_project_id labels added to the results.
func TestMixedPrometheusQueries(t *testing.T) {
	tc := apptest.NewTestCase(t)
	defer tc.Stop()

	// Tenant 1 is the tenant the vmsingle is configured with, tenant 2 is
	// never written to.
	const (
		accountID1 = 12
		projectID1 = 34
		accountID2 = 56
		projectID2 = 78
		numMetrics = 10
	)
	tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
	tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
	start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
	end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
	data := apptest.GenerateTestData("metric", numMetrics, start, end)
	// Expected responses for the tenant that has no data.
	emptySeries := []map[string]string{}
	emptyLabels := []string{}
	emptyLabelValues := []string{}
	emptyQueryResults := []*apptest.QueryResult{}
	emptyMetadata := map[string][]apptest.MetadataEntry{}
	emptyMetricNamesStats := []apptest.MetricNamesStatsRecord{}

	vmsingle := tc.MustStartVmsingle("vmsingle", []string{
		"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
		"-retentionPeriod=100y",
		fmt.Sprintf("-accountID=%d", accountID1),
		fmt.Sprintf("-projectID=%d", projectID1),
	})
	vmselect := tc.MustStartVmselect("vmselect", []string{
		"-storageNode=" + vmsingle.VmselectAddr(),
	})

	vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data.Samples, apptest.QueryOpts{})
	vmsingle.ForceFlush(t)

	// Ensure vmsingle returns data.
	apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data.WantSeries)
	apptest.AssertSeriesCount(tc, vmsingle, "", start, end, numMetrics)
	apptest.AssertLabels(tc, vmsingle, "metric.*", "", start, end, data.WantLabels)
	apptest.AssertLabelValues(tc, vmsingle, "metric.*", "label", "", start, end, data.WantLabelValues)
	apptest.AssertQueryResults(tc, vmsingle, "metric.*", "", start, end, data.Step, data.WantQueryResults)
	apptest.AssertMetadata(tc, vmsingle, "", "", data.WantMetadata)
	// NOTE(review): presumably every metric name has been counted once by the
	// range query above - confirm which endpoints bump QueryRequestsCount.
	for i := range data.WantMetricNamesStats {
		data.WantMetricNamesStats[i].QueryRequestsCount = 1
	}
	apptest.AssertMetricNamesStats(tc, vmsingle, "", "", data.WantMetricNamesStats)

	// Check that current vmsingle tenant (configured via flags) is tenant1.
	gotAdminTenantsResponse := vmselect.APIV1AdminTenants(t, apptest.QueryOpts{})
	wantAdminTenantsResponse := &apptest.AdminTenantsResponse{
		Status: "success",
		Data:   []string{tenantID1},
	}
	if diff := cmp.Diff(wantAdminTenantsResponse, gotAdminTenantsResponse); diff != "" {
		t.Fatalf("unexpected tenants (-want, +got):\n%s", diff)
	}

	// Ensure vmselect returns data for tenant1.
	apptest.AssertSeries(tc, vmselect, "metric.*", tenantID1, start, end, data.WantSeries)
	apptest.AssertSeriesCount(tc, vmselect, tenantID1, start, end, numMetrics)
	apptest.AssertLabels(tc, vmselect, "metric.*", tenantID1, start, end, data.WantLabels)
	apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", tenantID1, start, end, data.WantLabelValues)
	apptest.AssertQueryResults(tc, vmselect, "metric.*", tenantID1, start, end, data.Step, data.WantQueryResults)
	apptest.AssertMetadata(tc, vmselect, "", tenantID1, data.WantMetadata)
	// The expected counter is one higher than after the vmsingle checks.
	for i := range data.WantMetricNamesStats {
		data.WantMetricNamesStats[i].QueryRequestsCount = 2
	}
	apptest.AssertMetricNamesStats(tc, vmselect, "", tenantID1, data.WantMetricNamesStats)

	// Ensure vmselect does not return any data for tenant2.
	apptest.AssertSeries(tc, vmselect, "metric.*", tenantID2, start, end, emptySeries)
	apptest.AssertSeriesCount(tc, vmselect, tenantID2, start, end, 0)
	apptest.AssertLabels(tc, vmselect, "metric.*", tenantID2, start, end, emptyLabels)
	apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", tenantID2, start, end, emptyLabelValues)
	apptest.AssertQueryResults(tc, vmselect, "metric.*", tenantID2, start, end, data.Step, emptyQueryResults)
	apptest.AssertMetadata(tc, vmselect, "", tenantID2, emptyMetadata)
	apptest.AssertMetricNamesStats(tc, vmselect, "", tenantID2, emptyMetricNamesStats)

	// Ensure vmselect returns data for multitenant.
	// The expectations are modified in place, so from here on data.WantSeries,
	// data.WantLabels and data.WantQueryResults describe multitenant responses
	// only.
	for _, v := range data.WantSeries {
		v["vm_account_id"] = strconv.Itoa(accountID1)
		v["vm_project_id"] = strconv.Itoa(projectID1)
	}
	apptest.AssertSeries(tc, vmselect, "metric.*", "multitenant", start, end, data.WantSeries)
	// Both names sort after the existing "__name__" and "label*" entries, so
	// the slice stays sorted.
	data.WantLabels = append(data.WantLabels, "vm_account_id", "vm_project_id")
	apptest.AssertLabels(tc, vmselect, "metric.*", "multitenant", start, end, data.WantLabels)
	apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", "multitenant", start, end, data.WantLabelValues)
	for _, v := range data.WantQueryResults {
		v.Metric["vm_account_id"] = strconv.Itoa(accountID1)
		v.Metric["vm_project_id"] = strconv.Itoa(projectID1)
	}
	apptest.AssertQueryResults(tc, vmselect, "metric.*", "multitenant", start, end, data.Step, data.WantQueryResults)
	apptest.AssertMetadata(tc, vmselect, "", "multitenant", data.WantMetadata)
	// The expected counter is one higher than after the tenant1 checks; the
	// tenant2 checks in between are expected not to change it.
	for i := range data.WantMetricNamesStats {
		data.WantMetricNamesStats[i].QueryRequestsCount = 3
	}
	apptest.AssertMetricNamesStats(tc, vmselect, "", "multitenant", data.WantMetricNamesStats)
}
|
||||
|
||||
// TestMixedDeleteSeries checks series deletion requested through vmselect when
// vmselect uses a vmsingle (started with -accountID and -projectID flags) as
// its -storageNode.
//
// Two datasets (metric1* and metric2*) are imported into vmsingle. After each
// delete request sent to vmselect the remaining series are checked by querying
// vmsingle directly:
//   - deleting metric1* as tenant1 (the IDs vmsingle was started with) leaves
//     only metric2*;
//   - deleting metric2* as tenant2 leaves metric2* in place;
//   - deleting metric2* with Tenant set to "multitenant" leaves no series.
func TestMixedDeleteSeries(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
const (
|
||||
accountID1 = 12
|
||||
projectID1 = 34
|
||||
accountID2 = 56
|
||||
projectID2 = 78
|
||||
numMetrics = 10
|
||||
)
|
||||
// Tenant IDs are formatted as "<accountID>:<projectID>".
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
|
||||
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
|
||||
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
data1 := apptest.GenerateTestData("metric1", numMetrics, start, end)
|
||||
data2 := apptest.GenerateTestData("metric2", numMetrics, start, end)
|
||||
emptySeries := []map[string]string{}
|
||||
|
||||
// vmsingle is started with the account and project IDs of tenant1.
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
|
||||
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
|
||||
"-retentionPeriod=100y",
|
||||
fmt.Sprintf("-accountID=%d", accountID1),
|
||||
fmt.Sprintf("-projectID=%d", projectID1),
|
||||
})
|
||||
// vmselect gets vmsingle's vmselect address as its only storage node.
vmselect := tc.MustStartVmselect("vmselect", []string{
|
||||
"-storageNode=" + vmsingle.VmselectAddr(),
|
||||
})
|
||||
|
||||
// Ingest both datasets directly into vmsingle, then force a flush before
// querying.
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data1.Samples, apptest.QueryOpts{})
|
||||
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data2.Samples, apptest.QueryOpts{})
|
||||
vmsingle.ForceFlush(t)
|
||||
|
||||
// Before any deletion vmsingle must return the series of both datasets.
wantSeries12 := slices.Concat(data1.WantSeries, data2.WantSeries)
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, wantSeries12)
|
||||
|
||||
// Delete metric1* via vmselect as tenant1: only metric2* series must remain.
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric1.*"}`, apptest.QueryOpts{
|
||||
Tenant: tenantID1,
|
||||
})
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data2.WantSeries)
|
||||
// Delete metric2* via vmselect as tenant2: metric2* series must still be
// returned by vmsingle.
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric2.*"}`, apptest.QueryOpts{
|
||||
Tenant: tenantID2,
|
||||
})
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data2.WantSeries)
|
||||
// Delete metric2* via vmselect with Tenant set to "multitenant": no series
// must remain.
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric2.*"}`, apptest.QueryOpts{
|
||||
Tenant: "multitenant",
|
||||
})
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, emptySeries)
|
||||
}
|
||||
|
||||
// TestMixedGraphiteQueries checks the Graphite read APIs (metrics index, find,
// expand and render) when vmselect uses a vmsingle (started with -accountID and
// -projectID flags) as its -storageNode.
//
// Graphite samples are written directly to vmsingle. The test then asserts
// that the same results are returned by vmsingle itself and by vmselect for
// tenant1 (the IDs vmsingle was started with), and that vmselect returns empty
// results for tenant2.
func TestMixedGraphiteQueries(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
const (
|
||||
accountID1 = 12
|
||||
projectID1 = 34
|
||||
accountID2 = 56
|
||||
projectID2 = 78
|
||||
numMetrics = 10
|
||||
)
|
||||
// Tenant IDs are formatted as "<accountID>:<projectID>".
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
|
||||
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
|
||||
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
data := apptest.GenerateGraphiteTestData("metric", numMetrics, start, end)
|
||||
// Empty expectations used for the tenant that has no data.
emptyMetricsIndex := []string{}
|
||||
emptyMetricsFind := []apptest.GraphiteMetric{}
|
||||
emptyMetricsExpand := []string{}
|
||||
emptyRenderedTargets := []apptest.GraphiteRenderedTarget{}
|
||||
|
||||
// vmsingle is started with the account and project IDs of tenant1.
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
|
||||
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
|
||||
"-retentionPeriod=100y",
|
||||
fmt.Sprintf("-accountID=%d", accountID1),
|
||||
fmt.Sprintf("-projectID=%d", projectID1),
|
||||
})
|
||||
// vmselect gets vmsingle's vmselect address as its only storage node.
vmselect := tc.MustStartVmselect("vmselect", []string{
|
||||
"-storageNode=" + vmsingle.VmselectAddr(),
|
||||
})
|
||||
|
||||
// Write the Graphite samples directly to vmsingle, then force a flush before
// querying.
vmsingle.GraphiteWrite(tc.T(), data.Samples, apptest.QueryOpts{})
|
||||
vmsingle.ForceFlush(t)
|
||||
|
||||
// Ensure vmsingle returns data.
|
||||
apptest.AssertGraphiteMetricsIndex(tc, vmsingle, "", data.WantMetricsIndex)
|
||||
apptest.AssertGraphiteMetricsFind(tc, vmsingle, "metric.*", "", data.WantMetricsFind)
|
||||
apptest.AssertGraphiteMetricsExpand(tc, vmsingle, "metric.*", "", data.WantMetricsExpand)
|
||||
apptest.AssertGraphiteRender(tc, vmsingle, "metric.*", "", start, end, data.Step, data.WantRenderedTargets)
|
||||
|
||||
// Ensure vmselect returns data for tenant1.
|
||||
apptest.AssertGraphiteMetricsIndex(tc, vmselect, tenantID1, data.WantMetricsIndex)
|
||||
apptest.AssertGraphiteMetricsFind(tc, vmselect, "metric.*", tenantID1, data.WantMetricsFind)
|
||||
apptest.AssertGraphiteMetricsExpand(tc, vmselect, "metric.*", tenantID1, data.WantMetricsExpand)
|
||||
apptest.AssertGraphiteRender(tc, vmselect, "metric.*", tenantID1, start, end, data.Step, data.WantRenderedTargets)
|
||||
|
||||
// Ensure vmselect does not return any data for tenant2.
|
||||
apptest.AssertGraphiteMetricsIndex(tc, vmselect, tenantID2, emptyMetricsIndex)
|
||||
apptest.AssertGraphiteMetricsFind(tc, vmselect, "metric.*", tenantID2, emptyMetricsFind)
|
||||
apptest.AssertGraphiteMetricsExpand(tc, vmselect, "metric.*", tenantID2, emptyMetricsExpand)
|
||||
apptest.AssertGraphiteRender(tc, vmselect, "metric.*", tenantID2, start, end, data.Step, emptyRenderedTargets)
|
||||
}
|
||||
@@ -25,12 +25,14 @@ func StartVmsingle(instance string, flags []string, cli *Client, output io.Write
|
||||
"-httpListenAddr": "127.0.0.1:0",
|
||||
"-graphiteListenAddr": "127.0.0.1:0",
|
||||
"-opentsdbListenAddr": "127.0.0.1:0",
|
||||
"-vmselectAddr": "127.0.0.1:0",
|
||||
},
|
||||
extractREs: []*regexp.Regexp{
|
||||
storageDataPathRE,
|
||||
httpListenAddrRE,
|
||||
graphiteListenAddrRE,
|
||||
openTSDBListenAddrRE,
|
||||
vmselectAddrRE,
|
||||
},
|
||||
output: output,
|
||||
})
|
||||
@@ -43,6 +45,7 @@ func StartVmsingle(instance string, flags []string, cli *Client, output io.Write
|
||||
httpListenAddr: stderrExtracts[1],
|
||||
graphiteListenAddr: stderrExtracts[2],
|
||||
openTSDBListenAddr: stderrExtracts[3],
|
||||
vmselectAddr: stderrExtracts[4],
|
||||
}), nil
|
||||
}
|
||||
|
||||
@@ -51,6 +54,7 @@ type vmsingleRuntimeValues struct {
|
||||
httpListenAddr string
|
||||
graphiteListenAddr string
|
||||
openTSDBListenAddr string
|
||||
vmselectAddr string
|
||||
}
|
||||
|
||||
func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
|
||||
@@ -85,6 +89,7 @@ func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
|
||||
},
|
||||
storageDataPath: rt.storageDataPath,
|
||||
httpListenAddr: rt.httpListenAddr,
|
||||
vmselectAddr: rt.vmselectAddr,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,6 +104,7 @@ type Vmsingle struct {
|
||||
|
||||
storageDataPath string
|
||||
httpListenAddr string
|
||||
vmselectAddr string
|
||||
}
|
||||
|
||||
// HTTPAddr returns the address at which the vmsingle process is
|
||||
@@ -107,6 +113,12 @@ func (app *Vmsingle) HTTPAddr() string {
|
||||
return app.httpListenAddr
|
||||
}
|
||||
|
||||
// VmselectAddr returns the address at which the vmsingle process is listening
|
||||
// for vmselect connections.
|
||||
//
// The returned value is the app's stored vmselectAddr field. Tests can pass it
// to vmselect as a -storageNode address.
func (app *Vmsingle) VmselectAddr() string {
|
||||
return app.vmselectAddr
|
||||
}
|
||||
|
||||
// String returns the string representation of the vmsingle app state.
|
||||
func (app *Vmsingle) String() string {
|
||||
return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q}", []any{
|
||||
|
||||
@@ -2083,7 +2083,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSeу major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSee major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
|
||||
@@ -2388,7 +2388,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSeу major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSee major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
|
||||
@@ -2084,7 +2084,7 @@
|
||||
"type": "victoriametrics-metrics-datasource",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSeу major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSee major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
|
||||
@@ -2389,7 +2389,7 @@
|
||||
"type": "victoriametrics-metrics-datasource",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSeу major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSee major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
|
||||
@@ -2165,7 +2165,7 @@
|
||||
"type": "victoriametrics-metrics-datasource",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSeу major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSee major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -6201,7 +6201,7 @@
|
||||
"type": "victoriametrics-metrics-datasource",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
|
||||
"description": "The rate of dropped samples during aggregation. \nStream aggregation will drop samples with NaN values, too old timestamps or samples identified as duplicates during deduplication. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -6282,14 +6282,14 @@
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
|
||||
"expr": "sum(rate({__name__=~\"vm_streamaggr_ignored_samples_total|vm_streamaggr_dedup_dropped_samples_total\", job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
|
||||
"instant": false,
|
||||
"legendFormat": "__auto",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Ignored samples ($instance)",
|
||||
"title": "Dropped samples ($instance)",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
|
||||
@@ -1840,7 +1840,7 @@
|
||||
"type": "victoriametrics-metrics-datasource",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSeу major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSee major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
|
||||
@@ -2164,7 +2164,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSeу major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSee major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -6200,7 +6200,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
|
||||
"description": "The rate of dropped samples during aggregation. \nStream aggregation will drop samples with NaN values, too old timestamps or samples identified as duplicates during deduplication. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
@@ -6281,14 +6281,14 @@
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
|
||||
"expr": "sum(rate({__name__=~\"vm_streamaggr_ignored_samples_total|vm_streamaggr_dedup_dropped_samples_total\", job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
|
||||
"instant": false,
|
||||
"legendFormat": "__auto",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Ignored samples ($instance)",
|
||||
"title": "Dropped samples ($instance)",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
|
||||
@@ -1839,7 +1839,7 @@
|
||||
"type": "prometheus",
|
||||
"uid": "$ds"
|
||||
},
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSeу major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"description": "Shows memory pressure based on [Pressure Stall Information](https://docs.kernel.org/accounting/psi.html).\n\n**Lower is better.**\n\nPressure is measured as amount of time within 1sec time window the process was:\n- waiting: at least one thread was blocked on memory.\n- stalled: every thread was blocked on memory (severe pressure).\n\nElevated memory pressure can slowdown the process performance by utilizing more disk IO. Consider increasing amount of available RAM limit or decreasing the load on the process.\n\nSee major page faults rate panel in Troubleshooting section if this metric continued to be high.",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": {
|
||||
|
||||
@@ -3,9 +3,9 @@
|
||||
DOCKER_REGISTRIES ?= docker.io quay.io
|
||||
DOCKER_NAMESPACE ?= victoriametrics
|
||||
|
||||
ROOT_IMAGE ?= alpine:3.23.4
|
||||
ROOT_IMAGE ?= alpine:3.24.1
|
||||
ROOT_IMAGE_SCRATCH ?= scratch
|
||||
CERTS_IMAGE := alpine:3.23.4
|
||||
CERTS_IMAGE := alpine:3.24.1
|
||||
|
||||
GO_BUILDER_IMAGE := golang:1.26.4
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ services:
|
||||
# It scrapes targets defined in --promscrape.config
|
||||
# And forwards them to --remoteWrite.url
|
||||
vmagent:
|
||||
image: victoriametrics/vmagent:v1.145.0
|
||||
image: victoriametrics/vmagent:v1.146.0
|
||||
depends_on:
|
||||
- "vmauth"
|
||||
ports:
|
||||
@@ -42,14 +42,14 @@ services:
|
||||
# vmstorage shards. Each shard receives 1/N of all metrics sent to vminserts,
|
||||
# where N is number of vmstorages (2 in this case).
|
||||
vmstorage-1:
|
||||
image: victoriametrics/vmstorage:v1.145.0-cluster
|
||||
image: victoriametrics/vmstorage:v1.146.0-cluster
|
||||
volumes:
|
||||
- strgdata-1:/storage
|
||||
command:
|
||||
- "--storageDataPath=/storage"
|
||||
restart: always
|
||||
vmstorage-2:
|
||||
image: victoriametrics/vmstorage:v1.145.0-cluster
|
||||
image: victoriametrics/vmstorage:v1.146.0-cluster
|
||||
volumes:
|
||||
- strgdata-2:/storage
|
||||
command:
|
||||
@@ -59,7 +59,7 @@ services:
|
||||
# vminsert is ingestion frontend. It receives metrics pushed by vmagent,
|
||||
# pre-processes them and distributes across configured vmstorage shards.
|
||||
vminsert-1:
|
||||
image: victoriametrics/vminsert:v1.145.0-cluster
|
||||
image: victoriametrics/vminsert:v1.146.0-cluster
|
||||
depends_on:
|
||||
- "vmstorage-1"
|
||||
- "vmstorage-2"
|
||||
@@ -68,7 +68,7 @@ services:
|
||||
- "--storageNode=vmstorage-2:8400"
|
||||
restart: always
|
||||
vminsert-2:
|
||||
image: victoriametrics/vminsert:v1.145.0-cluster
|
||||
image: victoriametrics/vminsert:v1.146.0-cluster
|
||||
depends_on:
|
||||
- "vmstorage-1"
|
||||
- "vmstorage-2"
|
||||
@@ -80,7 +80,7 @@ services:
|
||||
# vmselect is a query frontend. It serves read queries in MetricsQL or PromQL.
|
||||
# vmselect collects results from configured `--storageNode` shards.
|
||||
vmselect-1:
|
||||
image: victoriametrics/vmselect:v1.145.0-cluster
|
||||
image: victoriametrics/vmselect:v1.146.0-cluster
|
||||
depends_on:
|
||||
- "vmstorage-1"
|
||||
- "vmstorage-2"
|
||||
@@ -90,7 +90,7 @@ services:
|
||||
- "--vmalert.proxyURL=http://vmalert:8880"
|
||||
restart: always
|
||||
vmselect-2:
|
||||
image: victoriametrics/vmselect:v1.145.0-cluster
|
||||
image: victoriametrics/vmselect:v1.146.0-cluster
|
||||
depends_on:
|
||||
- "vmstorage-1"
|
||||
- "vmstorage-2"
|
||||
@@ -105,7 +105,7 @@ services:
|
||||
# read requests from Grafana, vmui, vmalert among vmselects.
|
||||
# It can be used as an authentication proxy.
|
||||
vmauth:
|
||||
image: victoriametrics/vmauth:v1.145.0
|
||||
image: victoriametrics/vmauth:v1.146.0
|
||||
depends_on:
|
||||
- "vmselect-1"
|
||||
- "vmselect-2"
|
||||
@@ -119,7 +119,7 @@ services:
|
||||
|
||||
# vmalert executes alerting and recording rules
|
||||
vmalert:
|
||||
image: victoriametrics/vmalert:v1.145.0
|
||||
image: victoriametrics/vmalert:v1.146.0
|
||||
depends_on:
|
||||
- "vmauth"
|
||||
ports:
|
||||
|
||||
@@ -3,7 +3,7 @@ services:
|
||||
# It scrapes targets defined in --promscrape.config
|
||||
# And forwards them to --remoteWrite.url
|
||||
vmagent:
|
||||
image: victoriametrics/vmagent:v1.145.0
|
||||
image: victoriametrics/vmagent:v1.146.0
|
||||
depends_on:
|
||||
- "victoriametrics"
|
||||
ports:
|
||||
@@ -18,7 +18,7 @@ services:
|
||||
# VictoriaMetrics instance, a single process responsible for
|
||||
# storing metrics and serving read requests.
|
||||
victoriametrics:
|
||||
image: victoriametrics/victoria-metrics:v1.145.0
|
||||
image: victoriametrics/victoria-metrics:v1.146.0
|
||||
ports:
|
||||
- 8428:8428
|
||||
- 8089:8089
|
||||
@@ -59,7 +59,7 @@ services:
|
||||
|
||||
# vmalert executes alerting and recording rules
|
||||
vmalert:
|
||||
image: victoriametrics/vmalert:v1.145.0
|
||||
image: victoriametrics/vmalert:v1.146.0
|
||||
depends_on:
|
||||
- "victoriametrics"
|
||||
- "alertmanager"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
services:
|
||||
vmagent:
|
||||
image: victoriametrics/vmagent:v1.145.0
|
||||
image: victoriametrics/vmagent:v1.146.0
|
||||
depends_on:
|
||||
- "victoriametrics"
|
||||
ports:
|
||||
@@ -14,7 +14,7 @@ services:
|
||||
restart: always
|
||||
|
||||
victoriametrics:
|
||||
image: victoriametrics/victoria-metrics:v1.145.0
|
||||
image: victoriametrics/victoria-metrics:v1.146.0
|
||||
ports:
|
||||
- 8428:8428
|
||||
volumes:
|
||||
@@ -40,7 +40,7 @@ services:
|
||||
restart: always
|
||||
|
||||
vmalert:
|
||||
image: victoriametrics/vmalert:v1.145.0
|
||||
image: victoriametrics/vmalert:v1.146.0
|
||||
depends_on:
|
||||
- "victoriametrics"
|
||||
ports:
|
||||
@@ -59,7 +59,7 @@ services:
|
||||
- '--external.alert.source=explore?orgId=1&left=["now-1h","now","VictoriaMetrics",{"expr": },{"mode":"Metrics"},{"ui":[true,true,true,"none"]}]'
|
||||
restart: always
|
||||
vmanomaly:
|
||||
image: victoriametrics/vmanomaly:v1.29.5
|
||||
image: victoriametrics/vmanomaly:v1.29.7
|
||||
depends_on:
|
||||
- "victoriametrics"
|
||||
ports:
|
||||
|
||||
@@ -32,6 +32,17 @@ docs-image:
|
||||
--platform $(DOCKER_PLATFORM) \
|
||||
vmdocs
|
||||
|
||||
# docs-check-links runs the docs site build and the check-links yarn script
# inside the vmdocs-docker-package image (requires the docs-image target).
# It first removes vmdocs/public, then mounts ./vmdocs at /opt/docs and every
# ./docs/*/ directory at /opt/docs/content/<directory name>, and finally runs
# `yarn install && hugo --minify && yarn run check-links` via /bin/sh.
docs-check-links: docs-image
|
||||
rm -rf vmdocs/public
|
||||
docker run \
|
||||
--rm \
|
||||
--platform $(DOCKER_PLATFORM) \
|
||||
-v ./vmdocs:/opt/docs \
|
||||
$(shell for d in ./docs/*/; do printf ' -v %s:/opt/docs/content/%s' "$${d}" "$$(basename $${d})"; done) \
|
||||
--entrypoint /bin/sh \
|
||||
vmdocs-docker-package \
|
||||
-c "yarn install && hugo --minify && yarn run check-links"
|
||||
|
||||
docs-debug: docs docs-image
|
||||
docker run \
|
||||
--rm \
|
||||
|
||||
@@ -14,6 +14,24 @@ aliases:
|
||||
---
|
||||
Please find the changelog for VictoriaMetrics Anomaly Detection below.
|
||||
|
||||
## v1.29.7
|
||||
Released: 2026-06-25
|
||||
|
||||
- UI: updated [vmanomaly UI](https://docs.victoriametrics.com/anomaly-detection/ui/) from [v1.7.1](https://docs.victoriametrics.com/anomaly-detection/ui/#v171) to [v1.7.2](https://docs.victoriametrics.com/anomaly-detection/ui/#v172), see respective [release notes](https://docs.victoriametrics.com/anomaly-detection/ui/#v172) for details. Notable mentions include `api/v1/server/model` endpoint for accessing production models config and queries from UI, manually or through [AI assistant](https://docs.victoriametrics.com/anomaly-detection/ui/#ai-assistance).
|
||||
|
||||
- IMPROVEMENT: Increased high-cardinality inference scaling by optionally scattering periodic infer jobs to reduce contention on shared resources (e.g. datasource, CPU, RAM) when `settings.n_workers > 1` and `scheduler.infer_every` is smaller than the total time to fetch and process all queries. This is controlled by new `scatter_infer_jobs` boolean argument of [Periodic Scheduler](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#parameters-1) (default: `false`).
|
||||
|
||||
- IMPROVEMENT: Optimized internal batching for reader post-fetch series processing, exposing reader processing queue depth (`vmanomaly_reader_processing_tasks_queued` [metric](https://docs.victoriametrics.com/anomaly-detection/components/monitoring/#reader-behaviour-metrics)), and clarifying inference skip logs after data fetch timeouts. See `series_processing_batch_size` argument of [VmReader](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vm-reader) and [VLogsReader](https://docs.victoriametrics.com/anomaly-detection/components/reader/#victorialogs-reader) for details.
|
||||
|
||||
- IMPROVEMENT: Refined `VmReader` and `VLogsReader` logging after datasource request failures by suppressing the follow-up generic "No data" or "No unseen data" warning for failed fetches. Failed requests now keep the original datasource error while empty successful responses still emit the no-data warning.
|
||||
|
||||
## v1.29.6
|
||||
Released: 2026-06-17
|
||||
|
||||
- BUGFIX: Fixed `VLogsReader` startup and query execution when `tenant_id` is omitted or provided in short account-only form such as `"0"`. Omitted or empty tenant IDs are treated as single-node/no-tenant mode, and account-only tenant IDs are expanded to `accountID:0` before adding VictoriaLogs `AccountID`/`ProjectID` params or VM tenant labels.
|
||||
|
||||
- BUGFIX: Hardened [`OnlineMADModel`](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-mad) anomaly scoring for perfectly constant time series (all values identical). The model now keeps a small deterministic prediction interval when the learned MAD is zero, so values deviating from an unknown constant baseline can produce `anomaly_score > 1` (previously, all anomaly scores were `0`).
|
||||
|
||||
## v1.29.5
|
||||
Released: 2026-06-11
|
||||
|
||||
@@ -265,7 +283,7 @@ Released: 2025-06-13
|
||||
## v1.23.2
|
||||
Released: 2025-06-09
|
||||
|
||||
- IMPROVEMENT: Increased convergence speed for [OnlineZScoreModel](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-z-score), [ZScoreModel](https://docs.victoriametrics.com/anomaly-detection/components/models/#z-score), [MADModel](https://docs.victoriametrics.com/anomaly-detection/components/models/#mad), and [OnlineMADModel](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-mad) models. Now it works better for tight optimization budgets (n_trials < 10, timeout < 1s)
|
||||
- IMPROVEMENT: Increased convergence speed for [OnlineZScoreModel](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-z-score), [ZScoreModel](https://docs.victoriametrics.com/anomaly-detection/components/models/#z-score), [MADModel](https://docs.victoriametrics.com/anomaly-detection/components/models/#mad-median-absolute-deviation), and [OnlineMADModel](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-mad) models. Now it works better for tight optimization budgets (`n_trials` < 10, `timeout` < 1s).
|
||||
|
||||
- BUGFIX: Now mean and variance of [OnlineZScoreModel](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-z-score) with exponential `decay` < 1 [arg](https://docs.victoriametrics.com/anomaly-detection/components/models/#decay) are properly calculated for unbiased predictions.
|
||||
|
||||
@@ -520,7 +538,7 @@ Released: 2024-10-01
|
||||
|
||||
> A bug was discovered in this release that causes the service to crash. Please use the patch [v1.16.1](#v1161) to resolve this issue.
|
||||
|
||||
- FEATURE: Introduced data dumps to a host filesystem for [VmReader](https://docs.victoriametrics.com/anomaly-detection/#vm-reader). Resource-intensive setups (multiple queries returning many metrics, bigger `fit_window` arg) will have RAM consumption reduced during fit calls.
|
||||
- FEATURE: Introduced data dumps to a host filesystem for [VmReader](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vm-reader). Resource-intensive setups (multiple queries returning many metrics, bigger `fit_window` arg) will have RAM consumption reduced during fit calls.
|
||||
- IMPROVEMENT: Added a `groupby` argument for logical grouping in [multivariate models](https://docs.victoriametrics.com/anomaly-detection/components/models/#multivariate-models). When specified, a separate multivariate model is trained for each unique combination of label values in the `groupby` columns. For example, to perform multivariate anomaly detection on metrics at the machine level without cross-entity interference, you can use `groupby: [host]` or `groupby: [instance]`, ensuring one model per entity being trained (e.g., per host). Please find more details [here](https://docs.victoriametrics.com/anomaly-detection/components/models/#group-by).
|
||||
- IMPROVEMENT: Improved performance of [VmReader](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vm-reader) on multicore instances for reading and data processing.
|
||||
- IMPROVEMENT: Introduced new CLI argument aliases to enhance compatibility with [Helm charts](https://github.com/VictoriaMetrics/helm-charts/blob/master/charts/victoria-metrics-anomaly/README.md) (i.e. using secrets) and better align with [VictoriaMetrics flags](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#list-of-command-line-flags):
|
||||
@@ -667,7 +685,7 @@ Released: 2024-02-15
|
||||
|
||||
## v1.9.2
|
||||
Released: 2024-01-29
|
||||
- BUGFIX: now multivariate models (like [`IsolationForestMultivariateModel`](https://docs.victoriametrics.com/anomaly-detection/components/models/#isolation-foresthttpsenwikipediaorgwikiisolation_forest-multivariate)) are properly handled throughout fit/infer phases.
|
||||
- BUGFIX: now multivariate models (like [`IsolationForestMultivariateModel`](https://docs.victoriametrics.com/anomaly-detection/components/models/#isolation-forest-multivariate)) are properly handled throughout fit/infer phases.
|
||||
|
||||
|
||||
## v1.9.1
|
||||
|
||||
@@ -423,7 +423,7 @@ services:
|
||||
# ...
|
||||
vmanomaly:
|
||||
container_name: vmanomaly
|
||||
image: victoriametrics/vmanomaly:v1.29.5
|
||||
image: victoriametrics/vmanomaly:v1.29.7
|
||||
# ...
|
||||
restart: always
|
||||
volumes:
|
||||
@@ -641,7 +641,7 @@ options:
|
||||
Here's an example of using the config splitter to divide configurations based on the `extra_filters` argument from the reader section:
|
||||
|
||||
```sh
|
||||
docker pull victoriametrics/vmanomaly:v1.29.5 && docker image tag victoriametrics/vmanomaly:v1.29.5 vmanomaly
|
||||
docker pull victoriametrics/vmanomaly:v1.29.7 && docker image tag victoriametrics/vmanomaly:v1.29.7 vmanomaly
|
||||
```
|
||||
|
||||
```sh
|
||||
|
||||
@@ -45,7 +45,7 @@ There are 2 types of compatibility to consider when migrating in stateful mode:
|
||||
|
||||
| Group start | Group end | Compatibility | Notes |
|
||||
|---------|--------- |------------|-------|
|
||||
| [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) | [v1.29.5](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1295) | Fully Compatible | - |
|
||||
| [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) | [v1.29.7](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1297) | Fully Compatible | - |
|
||||
| [v1.28.7](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1287) | [v1.29.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1290) | Partially compatible* | Dumped models of class [prophet](https://docs.victoriametrics.com/anomaly-detection/components/models/#prophet) and [seasonal quantile](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-seasonal-quantile) have problems with loading to [v1.29.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1290) due to dropped `pytz` library. **Upgrading directly from v1.28.7 to [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) with a fix is suggested** |
|
||||
| [v1.26.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1262) | [v1.28.7](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1287) | Fully Compatible | [v1.28.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1280) introduced [rolling](https://docs.victoriametrics.com/anomaly-detection/components/models/#rolling-models) model class drop in favor of [online](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-models) models (`rolling_quantile` and `std` models), however, it does not impact compatibility, as artifacts were not produced by default for rolling models. Also, offline `mad` and `zscore` models are redirecting to their respective online counterparts since [v1.28.4](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1284). |
|
||||
| [v1.25.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1253) | [v1.26.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1270) | Partially Compatible* | [v1.25.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1253) introduced `forecast_at` argument for base [univariate](https://docs.victoriametrics.com/anomaly-detection/components/models/#univariate-models) and `Prophet` [models](https://docs.victoriametrics.com/anomaly-detection/components/models/#prophet), however, itself remains backward-reversible from newer states like [v1.26.2](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1262), [v1.27.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1270). (All models except `isolation_forest_multivariate` class will be dropped) |
|
||||
|
||||
@@ -132,7 +132,7 @@ Below are the steps to get `vmanomaly` up and running inside a Docker container:
|
||||
1. Pull Docker image:
|
||||
|
||||
```sh
|
||||
docker pull victoriametrics/vmanomaly:v1.29.5
|
||||
docker pull victoriametrics/vmanomaly:v1.29.7
|
||||
```
|
||||
|
||||
2. Create the license file with your license key.
|
||||
@@ -152,7 +152,7 @@ docker run -it \
|
||||
-v ./license:/license \
|
||||
-v ./config.yaml:/config.yaml \
|
||||
-p 8490:8490 \
|
||||
victoriametrics/vmanomaly:v1.29.5 \
|
||||
victoriametrics/vmanomaly:v1.29.7 \
|
||||
/config.yaml \
|
||||
--licenseFile=/license \
|
||||
--loggerLevel=INFO \
|
||||
@@ -169,7 +169,7 @@ docker run -it \
|
||||
-e VMANOMALY_DATA_DUMPS_DIR=/tmp/vmanomaly/data \
|
||||
-e VMANOMALY_MODEL_DUMPS_DIR=/tmp/vmanomaly/models \
|
||||
-p 8490:8490 \
|
||||
victoriametrics/vmanomaly:v1.29.5 \
|
||||
victoriametrics/vmanomaly:v1.29.7 \
|
||||
/config.yaml \
|
||||
--licenseFile=/license \
|
||||
--loggerLevel=INFO \
|
||||
@@ -182,7 +182,7 @@ services:
|
||||
# ...
|
||||
vmanomaly:
|
||||
container_name: vmanomaly
|
||||
image: victoriametrics/vmanomaly:v1.29.5
|
||||
image: victoriametrics/vmanomaly:v1.29.7
|
||||
# ...
|
||||
restart: always
|
||||
volumes:
|
||||
@@ -267,6 +267,7 @@ schedulers:
|
||||
# https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#periodic-scheduler
|
||||
class: 'periodic'
|
||||
infer_every: '5m'
|
||||
scatter_infer_jobs: true
|
||||
fit_every: '1d'
|
||||
fit_window: '4w'
|
||||
|
||||
@@ -298,6 +299,7 @@ reader:
|
||||
datasource_url: "https://play.victoriametrics.com/" # [YOUR_DATASOURCE_URL]
|
||||
tenant_id: '0:0'
|
||||
sampling_period: "5m"
|
||||
series_processing_batch_size: 8 # number of time series to process together while preparing data for fit or infer stages
|
||||
queries:
|
||||
# define your queries with MetricsQL - https://docs.victoriametrics.com/victoriametrics/metricsql/
|
||||
cpu_user:
|
||||
@@ -413,11 +415,13 @@ For optimal service behavior, consider the following tweaks when configuring `vm
|
||||
- Configure the **inference frequency** in the [scheduler](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/) section of the configuration file.
|
||||
- Ensure that `infer_every` aligns with your **minimum required alerting frequency**.
|
||||
- For example, if receiving **alerts every 15 minutes** is sufficient (when `anomaly_score > 1`), set `infer_every` to match `reader.sampling_period` or override it per query via `reader.queries.query_xxx.step` for an optimal setup.
|
||||
- Set `scheduler.scatter_infer_jobs` {{% available_from "v1.29.7" anomaly %}} [arg](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#parameters-1) to `true` to allow for equal distribution of inference jobs across `infer_every` intervals, which can further enhance parallel processing efficiency and reduce resource contention when `reader.queries` contains a large number of queries.
|
||||
|
||||
**Reader**:
|
||||
- Set up the datasource to read data from in the [reader](https://docs.victoriametrics.com/anomaly-detection/components/reader/) section. Include the tenant ID if using a [cluster version of VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) (the `multitenant` value {{% available_from "v1.16.2" anomaly %}} can also be used here).
|
||||
- Define queries for input data using [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/) under `reader.queries` section. Note, it's possible to override reader-level arguments at query level for increased flexibility, e.g. specifying per-query [timezone](https://docs.victoriametrics.com/anomaly-detection/faq/#handling-timezones) or [sampling period](https://docs.victoriametrics.com/anomaly-detection/components/reader/#config-parameters).
|
||||
- For longer `fit_window` intervals in scheduler, consider splitting queries into smaller time ranges to avoid excessive memory usage, timeouts and hitting server-side constraints, so they can be queried separately and reconstructed on `vmanomaly` side. Please refer to this [example](https://docs.victoriametrics.com/anomaly-detection/faq/#handling-large-queries-in-vmanomaly) for more details.
|
||||
- Set `reader.series_processing_batch_size` {{% available_from "v1.29.7" anomaly %}} [arg](https://docs.victoriametrics.com/anomaly-detection/components/reader/#config-parameters) to a reasonable value (4-16, default is 8) to balance between memory usage and processing speed when preparing data for fit or infer stages.
|
||||
|
||||
> If applicable - consider [`VLogsReader`](https://docs.victoriametrics.com/anomaly-detection/components/reader/#victorialogs-reader) {{% available_from "v1.26.0" anomaly %}} to perform anomaly detection on **log-derived metrics**. This is particularly useful for scenarios where log data needs to be analyzed for unusual patterns or behaviors, such as error rates or request latencies.
|
||||
|
||||
|
||||
@@ -315,7 +315,7 @@ docker run -it --rm \
|
||||
-e VMANOMALY_MCP_SERVER_URL=http://mcp-vmanomaly:8081/mcp \
|
||||
-p 8080:8080 \
|
||||
-p 8490:8490 \
|
||||
victoriametrics/vmanomaly:v1.29.5 \
|
||||
victoriametrics/vmanomaly:v1.29.7 \
|
||||
vmanomaly_config.yaml
|
||||
```
|
||||
|
||||
@@ -553,7 +553,7 @@ preset: ui
|
||||
# other optional server/settings parameters, e.g. port, max_concurrent_tasks, n_workers, logger_levels, etc.
|
||||
```
|
||||
|
||||
using one of the [deployment methods](https://docs.victoriametrics.com/anomaly-detection/quickstart/#how-to-install-and-run-vmanomaly) in a [QuickStart guide](https://docs.victoriametrics.com/anomaly-detection/quickstart/#quickstart), e.g. via Docker.
|
||||
using one of the [deployment methods](https://docs.victoriametrics.com/anomaly-detection/quickstart/#how-to-install-and-run-vmanomaly) in a [QuickStart guide](https://docs.victoriametrics.com/anomaly-detection/quickstart/), e.g. via Docker.
|
||||
|
||||
Retrieve the UI at `http://<vmanomaly-host>:<port>` (e.g. at `http://localhost:8490` if running locally with default port) and start exploring anomaly detection models and their configurations interactively.
|
||||
|
||||
@@ -640,6 +640,21 @@ If the **results** look good and the **model configuration should be deployed in
|
||||
|
||||
## Changelog
|
||||
|
||||
### v1.7.2
|
||||
Released: 2026-06-25
|
||||
|
||||
vmanomaly version: [v1.29.7](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1297)
|
||||
|
||||
- FEATURE: Added controls for selecting server-configured scheduled models (drop-down inside [model wizard](#model-panel)) and browsing scheduled queries from the running vmanomaly instance ("Queries" button, "scheduled queries" tab).
|
||||
|
||||
- IMPROVEMENT: Surfaced datasource fetch failures from ad-hoc VMUI raw queries as query-level errors instead of returning a successful empty result that triggers a generic "No match" warning. Now the user can see the actual error message from the datasource (e.g. "unauthorized", "not found", etc.) and take appropriate action.
|
||||
|
||||
- BUGFIX: Fixed [UI/query-server](#settings-panel) handling of VictoriaMetrics datasource URLs that already include `/select/multitenant/prometheus`. Such URLs are now recognized as cluster datasource URLs, preserving the multitenant path when proxying VMUI requests and allowing `server.use_reader_connection_settings` to reuse [configured reader credentials for authenticated datasources](#authentication).
|
||||
|
||||
- BUGFIX: Fixed [settings](#settings-panel) inputs for server and datasource URLs so editing, deleting, or pasting text is no longer immediately reverted to the previous value before applying changes.
|
||||
|
||||
- BUGFIX: Fixed [model wizard](#model-panel) settings for [`IsolationForestModel`](https://docs.victoriametrics.com/anomaly-detection/components/models/#isolation-forest-multivariate) `contamination`, allowing decimal float values such as `0.1` or `0,1` to be typed or pasted without being collapsed to `0`, while preserving the `"auto"` value.
|
||||
|
||||
### v1.7.1
|
||||
Released: 2026-06-11
|
||||
|
||||
|
||||
@@ -49,6 +49,7 @@ schedulers:
|
||||
periodic_online: # alias
|
||||
class: 'periodic' # scheduler class
|
||||
infer_every: "30s" # how often to produce anomaly scores for new data
|
||||
scatter_infer_jobs: true # distribute infer jobs evenly across the infer interval to reduce synchronized bursts
|
||||
fit_every: "365d" # how often to re-fit the models, for online models used effectively once, then they are updated with new data and won't require re-fit
|
||||
fit_window: "3d" # how much historical data to use for fit stage
|
||||
start_from: "00:00" # start from the specified time, i.e. 00:00 in the given timezone; subsequent fits follow the `fit_every` interval
|
||||
@@ -56,6 +57,7 @@ schedulers:
|
||||
periodic_offline_1w:
|
||||
class: 'periodic'
|
||||
infer_every: "15m"
|
||||
scatter_infer_jobs: true
|
||||
fit_every: "24h"
|
||||
fit_window: "14d"
|
||||
# if no start_from is specified, jobs will start immediately after service starts
|
||||
@@ -135,6 +137,7 @@ server:
|
||||
port: 8490
|
||||
path_prefix: '/vmanomaly' # optional path prefix for all HTTP routes
|
||||
max_concurrent_tasks: 4 # maximum number of concurrent anomaly detection tasks processed by backend
|
||||
use_reader_connection_settings: True # if True, use reader's datasource_url and credentials for UI requests to datasource
|
||||
uvicorn_config: # optional Uvicorn server configuration
|
||||
log_level: 'warning'
|
||||
```
|
||||
|
||||
@@ -1265,7 +1265,7 @@ monitoring:
|
||||
Let's pull the docker image for `vmanomaly`:
|
||||
|
||||
```sh
|
||||
docker pull victoriametrics/vmanomaly:v1.29.5
|
||||
docker pull victoriametrics/vmanomaly:v1.29.7
|
||||
```
|
||||
|
||||
Now we can run the docker container putting as volumes both config and model file:
|
||||
@@ -1279,7 +1279,7 @@ docker run -it \
|
||||
-v $(PWD)/license:/license \
|
||||
-v $(PWD)/custom_model.py:/vmanomaly/model/custom.py \
|
||||
-v $(PWD)/custom.yaml:/config.yaml \
|
||||
victoriametrics/vmanomaly:v1.29.5 /config.yaml \
|
||||
victoriametrics/vmanomaly:v1.29.7 /config.yaml \
|
||||
--licenseFile=/license \
|
||||
--watch
|
||||
```
|
||||
|
||||
@@ -458,6 +458,21 @@ Label names [description](#labelnames)
|
||||
<td>The total number of datapoints received from VictoriaMetrics for the `query_key` query within the specified scheduler `scheduler_alias`, in the `vmanomaly` service running in `preset` mode.</td>
|
||||
<td>
|
||||
|
||||
`url`, `query_key`, `scheduler_alias`, `preset`
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
|
||||
<span style="white-space: nowrap;">`vmanomaly_reader_processing_tasks_queued`</span>
|
||||
</td>
|
||||
<td>
|
||||
|
||||
`Gauge`
|
||||
</td>
|
||||
<td>The total number of queued processing tasks {{% available_from "v1.29.7" anomaly %}} (time series batches of size `series_processing_batch_size`) for the `query_key` query within the specified scheduler `scheduler_alias`, in the `vmanomaly` service running in `preset` mode. If the value stays above 0 continuously, it may lead to skipped infer runs due to resource contention and timeouts.</td>
|
||||
<td>
|
||||
|
||||
`url`, `query_key`, `scheduler_alias`, `preset`
|
||||
</td>
|
||||
</tr>
|
||||
|
||||
@@ -421,7 +421,20 @@ Optional argument{{% available_from "v1.18.1" anomaly %}} allows defining **vali
|
||||
`60s`
|
||||
</td>
|
||||
<td>
|
||||
Optional argument{{% available_from "v1.25.3" anomaly %}} allows specifying a time offset for all queries in `queries`. Defaults to `0s` (0) if not set and can be overridden on a [per-query basis](#per-query-parameters).
|
||||
Optional argument {{% available_from "v1.25.3" anomaly %}}, allows specifying a time offset for all queries in `queries`. Defaults to `0s` (0) if not set and can be overridden on a [per-query basis](#per-query-parameters).
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
|
||||
<span style="white-space: nowrap;">`series_processing_batch_size`</span>
|
||||
</td>
|
||||
<td>
|
||||
|
||||
`8`
|
||||
</td>
|
||||
<td>
|
||||
Optional argument {{% available_from "v1.29.7" anomaly %}}, allows specifying the number of time series to process together while preparing data for fit or infer stages. Defaults to `8`. Suggested values are 4-16 for high-cardinality queries.
|
||||
</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
@@ -450,6 +463,7 @@ reader:
|
||||
sampling_period: '1m'
|
||||
query_from_last_seen_timestamp: True # false by default
|
||||
latency_offset: '1ms'
|
||||
series_processing_batch_size: 8
|
||||
```
|
||||
|
||||
### MetricsQL Playground
|
||||
@@ -879,6 +893,19 @@ If a path to a CA bundle file (like `ca.crt`), it will verify the certificate us
|
||||
(Optional) Password for authentication. If set, it will be used to authenticate the request.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
|
||||
<span style="white-space: nowrap;">`series_processing_batch_size`</span>
|
||||
</td>
|
||||
<td>
|
||||
|
||||
`8`
|
||||
</td>
|
||||
<td>
|
||||
Optional argument {{% available_from "v1.29.7" anomaly %}}, allows specifying the number of time series to process together while preparing data for fit or infer stages. Defaults to `8`. Suggested values are 4-16 for high-cardinality queries.
|
||||
</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
|
||||
@@ -897,6 +924,7 @@ reader:
|
||||
# tenant_id: '0:0' # for cluster version only
|
||||
sampling_period: '1m'
|
||||
max_points_per_query: 10000
|
||||
series_processing_batch_size: 8
|
||||
data_range: [0, 'inf'] # reader-level
|
||||
offset: '0s' # reader-level
|
||||
timeout: '30s'
|
||||
|
||||
@@ -74,40 +74,7 @@ options={`"scheduler.periodic.PeriodicScheduler"`, `"scheduler.oneoff.OneoffSche
|
||||
|
||||
### Parameters
|
||||
|
||||
For periodic scheduler parameters are defined as differences in times, expressed in difference units, e.g. days, hours, minutes, seconds.
|
||||
|
||||
Examples: `"50s"`, `"4m"`, `"3h"`, `"2d"`, `"1w"`.
|
||||
|
||||
<table class="params">
|
||||
<thead>
|
||||
<tr>
|
||||
<th></th>
|
||||
<th>Time granularity</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<tr>
|
||||
<td>s</td>
|
||||
<td>seconds</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>m</td>
|
||||
<td>minutes</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>h</td>
|
||||
<td>hours</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>d</td>
|
||||
<td>days</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>w</td>
|
||||
<td>weeks</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
For the periodic scheduler, parameters are defined as time durations expressed in different units, e.g. days, hours, minutes, seconds. The time granularity is defined by the last character of the string. Examples: `"50s"` (seconds), `"4m"` (minutes), `"3h"` (hours), `"2d"` (days), `"1w"` (weeks).
|
||||
|
||||
<table class="params">
|
||||
<thead>
|
||||
@@ -188,6 +155,21 @@ Specifies when to initiate the first `fit_every` call. Accepts either an ISO 860
|
||||
Defines the local timezone for the `start_from` parameter, if specified. Defaults to `UTC` if no timezone is provided.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
|
||||
<span style="white-space: nowrap;">`scatter_infer_jobs`{{% available_from "v1.29.7" anomaly %}}</span>
|
||||
</td>
|
||||
<td>bool, <span style="white-space: nowrap;">Optional</span></td>
|
||||
<td>
|
||||
|
||||
`true` or `false`
|
||||
</td>
|
||||
<td>
|
||||
|
||||
If `true`, distributes infer jobs and their dependent data-fetch jobs evenly across the infer interval. This reduces synchronized read and inference bursts for high-scale configurations. Defaults to `false`. It is useful when `settings.n_workers > 1`, `reader.queries` cardinality is high, and `scheduler.infer_every` is small.
|
||||
</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
|
||||
@@ -200,6 +182,7 @@ schedulers:
|
||||
# (or class: "scheduler.periodic.PeriodicScheduler" for versions before v1.13.0, without class alias support)
|
||||
fit_window: "14d"
|
||||
infer_every: "1m"
|
||||
scatter_infer_jobs: true # Distribute infer jobs evenly across the infer interval to reduce synchronized bursts.
|
||||
fit_every: "1h"
|
||||
start_from: "20:00" # If launched before 20:00 (local Kyiv time), the first run starts today at 20:00. Otherwise, it starts tomorrow at 20:00.
|
||||
tz: "Europe/Kyiv" # Defaults to 'UTC' if not specified.
|
||||
|
||||
@@ -32,7 +32,7 @@ Server component of VictoriaMetrics Anomaly Detection (`vmanomaly`) is responsib
|
||||
### Example Configuration
|
||||
|
||||
> [!TIP]
|
||||
> If [hot-reloading](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#hot-reloading) is enabled in vmanomaly service, the server will automatically pick up changes made to the configuration file without requiring a restart.
|
||||
> If [hot-reloading](https://docs.victoriametrics.com/anomaly-detection/components/#hot-reload) is enabled in vmanomaly service, the server will automatically pick up changes made to the configuration file without requiring a restart.
|
||||
|
||||
```yaml
|
||||
server:
|
||||
@@ -63,4 +63,4 @@ reader:
|
||||
|
||||
After starting the `vmanomaly` server with the above configuration, UI can be accessed at `<vmanomaly-host>:8490/vmanomaly/vmui/` (e.g. `http://localhost:8490/vmanomaly/vmui/`).
|
||||
|
||||
Rest API endpoints (e.g. `/metrics`) can be accessed at `<vmanomaly-host>:8490/vmanomaly/metrics` (e.g. `http://localhost:8490/vmanomaly/metrics`).
|
||||
REST API endpoints (e.g. `/metrics`) can be accessed at `<vmanomaly-host>:8490/vmanomaly/metrics` (e.g. `http://localhost:8490/vmanomaly/metrics`).
|
||||
|
||||
@@ -10,9 +10,9 @@ sitemap:
|
||||
|
||||
- To use *vmanomaly*, part of the enterprise package, a license key is required. Obtain your key [here](https://victoriametrics.com/products/enterprise/trial/) for this tutorial or for enterprise use.
|
||||
- In the tutorial, we'll be using the following VictoriaMetrics components:
|
||||
- [VictoriaMetrics Single-Node](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) (v1.145.0)
|
||||
- [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/) (v1.145.0)
|
||||
- [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) (v1.145.0)
|
||||
- [VictoriaMetrics Single-Node](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) (v1.146.0)
|
||||
- [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/) (v1.146.0)
|
||||
- [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) (v1.146.0)
|
||||
- [Grafana](https://grafana.com/) (v12.2.0)
|
||||
- [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/)
|
||||
- [Node exporter](https://github.com/prometheus/node_exporter#node-exporter) (v1.9.1) and [Alertmanager](https://prometheus.io/docs/alerting/latest/alertmanager/) (v0.28.1)
|
||||
@@ -323,7 +323,7 @@ Let's wrap it all up together into the `docker-compose.yml` file.
|
||||
services:
|
||||
vmagent:
|
||||
container_name: vmagent
|
||||
image: victoriametrics/vmagent:v1.145.0
|
||||
image: victoriametrics/vmagent:v1.146.0
|
||||
depends_on:
|
||||
- "victoriametrics"
|
||||
ports:
|
||||
@@ -340,7 +340,7 @@ services:
|
||||
|
||||
victoriametrics:
|
||||
container_name: victoriametrics
|
||||
image: victoriametrics/victoria-metrics:v1.145.0
|
||||
image: victoriametrics/victoria-metrics:v1.146.0
|
||||
ports:
|
||||
- 8428:8428
|
||||
volumes:
|
||||
@@ -373,7 +373,7 @@ services:
|
||||
|
||||
vmalert:
|
||||
container_name: vmalert
|
||||
image: victoriametrics/vmalert:v1.145.0
|
||||
image: victoriametrics/vmalert:v1.146.0
|
||||
depends_on:
|
||||
- "victoriametrics"
|
||||
ports:
|
||||
@@ -395,7 +395,7 @@ services:
|
||||
restart: always
|
||||
vmanomaly:
|
||||
container_name: vmanomaly
|
||||
image: victoriametrics/vmanomaly:v1.29.5
|
||||
image: victoriametrics/vmanomaly:v1.29.7
|
||||
depends_on:
|
||||
- "victoriametrics"
|
||||
ports:
|
||||
|
||||
@@ -240,23 +240,23 @@ vmagent will write data into VictoriaMetrics single-node and cluster (with tenan
|
||||
# compose.yaml
|
||||
services:
|
||||
vmsingle:
|
||||
image: victoriametrics/victoria-metrics:v1.145.0
|
||||
image: victoriametrics/victoria-metrics:v1.146.0
|
||||
|
||||
vmstorage:
|
||||
image: victoriametrics/vmstorage:v1.145.0-cluster
|
||||
image: victoriametrics/vmstorage:v1.146.0-cluster
|
||||
|
||||
vminsert:
|
||||
image: victoriametrics/vminsert:v1.145.0-cluster
|
||||
image: victoriametrics/vminsert:v1.146.0-cluster
|
||||
command:
|
||||
- -storageNode=vmstorage:8400
|
||||
|
||||
vmselect:
|
||||
image: victoriametrics/vmselect:v1.145.0-cluster
|
||||
image: victoriametrics/vmselect:v1.146.0-cluster
|
||||
command:
|
||||
- -storageNode=vmstorage:8401
|
||||
|
||||
vmagent:
|
||||
image: victoriametrics/vmagent:v1.145.0
|
||||
image: victoriametrics/vmagent:v1.146.0
|
||||
volumes:
|
||||
- ./scrape.yaml:/etc/vmagent/config.yaml
|
||||
command:
|
||||
@@ -308,7 +308,7 @@ Now add the vmauth service to `compose.yaml`:
|
||||
# compose.yaml
|
||||
services:
|
||||
vmauth:
|
||||
image: docker.io/victoriametrics/vmauth:v1.145.0
|
||||
image: docker.io/victoriametrics/vmauth:v1.146.0
|
||||
ports:
|
||||
- 8427:8427
|
||||
volumes:
|
||||
|
||||
@@ -39,7 +39,7 @@ Each subsequent section of this guide presents an architecture designed to handl
|
||||
### The decision tree
|
||||
|
||||
<p align="center">
|
||||
<img src="decision-tree.webp" alt="Decision Tree" width="80%">
|
||||
<img src="/guides/vm-architectures/decision-tree.webp" alt="Decision Tree" width="80%">
|
||||
</p>
|
||||
|
||||
## Basic
|
||||
@@ -62,7 +62,7 @@ Installation guide reference: [VictoriaMetrics Single](https://docs.victoriametr
|
||||
**Schema:**
|
||||
|
||||
<p align="center">
|
||||
<img src="basic-architecture.webp" alt="Basic Architecture" width="40%">
|
||||
<img src="/guides/vm-architectures/basic-architecture.webp" alt="Basic Architecture" width="40%">
|
||||
</p>
|
||||
|
||||
### Unavailability Scenarios
|
||||
@@ -93,7 +93,7 @@ High availability implementation: [HA VictoriaMetrics Cluster](https://docs.vict
|
||||
**Schema:**
|
||||
|
||||
<p align="center">
|
||||
<img src="single-az-architecture.webp" alt="Single AZ Architecture" width="60%">
|
||||
<img src="/guides/vm-architectures/single-az-architecture.webp" alt="Single AZ Architecture" width="60%">
|
||||
</p>
|
||||
|
||||
### Application vs. Storage Replication
|
||||
@@ -210,7 +210,7 @@ To ensure reliability, vmagent implements the bulkhead pattern: each destination
|
||||
**Schema:**
|
||||
|
||||
<p align="center">
|
||||
<img src="multi-az-architecture.webp" alt="Multi-AZ Architecture" width="65%">
|
||||
<img src="/guides/vm-architectures/multi-az-architecture.webp" alt="Multi-AZ Architecture" width="65%">
|
||||
</p>
|
||||
|
||||
### Unavailability Scenarios
|
||||
@@ -257,7 +257,7 @@ For complete disaster recovery, this entire cell-based architecture is duplicate
|
||||
A global, stateless layer of routing cells (vmagent, vmauth) sits on top. It routes traffic to several logical groups of storage cells. Each storage group contains multiple AZs, and data is replicated or sharded across them. There are several approaches to implementing it.
|
||||
|
||||
<p align="center">
|
||||
<img src="hyperscale-architecture.webp" alt="Hyperscale Architecture" width="85%">
|
||||
<img src="/guides/vm-architectures/hyperscale-architecture.webp" alt="Hyperscale Architecture" width="85%">
|
||||
</p>
|
||||
|
||||
### Choosing Your Read Path Strategy
|
||||
@@ -375,7 +375,7 @@ This multitenancy approach gives us another trade-off in the isolation implement
|
||||
**Schema:**
|
||||
|
||||
<p align="center">
|
||||
<img src="logical-layers-architecture.webp" alt="Logical Layers Architecture" width="80%">
|
||||
<img src="/guides/vm-architectures/logical-layers-architecture.webp" alt="Logical Layers Architecture" width="80%">
|
||||
</p>
|
||||
|
||||
**Path A: Shared resources.** We have a single, shared pool of all cluster components.
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user