Compare commits

24 Commits
sso2 ... master

Author SHA1 Message Date
f41gh7
9356c2111a docs: update version to v1.146.0
Signed-off-by: f41gh7 <nik@victoriametrics.com>
2026-06-19 14:18:31 +02:00
f41gh7
45f0b87150 app/vmselect: run make vmui-update
Signed-off-by: f41gh7 <nik@victoriametrics.com>
2026-06-19 14:16:01 +02:00
Nikolay
8480f6b43e app/vmagent: prioritise recently ingested data to the queue
This commit adds a new flag, remoteWrite.inmemoryQueues, which starts a dedicated set of workers that process only the in-memory part of the vmagent persistentqueue. It should help mitigate the
issue where stale data in the file-based queue delays the ingestion of recent data.

There is a downside: if the remote storage is not reachable,
only a part of the data may be ingested while the other part is queued
into the file-based queue. This should be acceptable, because there are no
strong guarantees for the data ingestion order.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8833
2026-06-19 13:47:49 +02:00
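The split between regular and in-memory-only workers can be pictured with a small sketch. This is an illustration under assumptions, not the vmagent implementation: `twoTierQueue`, `readBlock` and `readInMemoryBlock` are hypothetical names and do not correspond to the `persistentqueue` API.

```go
package main

import "fmt"

// twoTierQueue is a hypothetical stand-in for a queue that has a file-based
// backlog plus an in-memory part. It is not the persistentqueue API.
type twoTierQueue struct {
	fileBacklog []string // older blocks that were spilled to disk
	inMemory    []string // recently ingested blocks
}

// readBlock mimics a regular worker: it drains the oldest data first and
// falls back to in-memory blocks only when the backlog is empty.
func (q *twoTierQueue) readBlock() (string, bool) {
	if len(q.fileBacklog) > 0 {
		b := q.fileBacklog[0]
		q.fileBacklog = q.fileBacklog[1:]
		return b, true
	}
	return q.readInMemoryBlock()
}

// readInMemoryBlock mimics a dedicated worker: it never touches the backlog.
func (q *twoTierQueue) readInMemoryBlock() (string, bool) {
	if len(q.inMemory) == 0 {
		return "", false
	}
	b := q.inMemory[0]
	q.inMemory = q.inMemory[1:]
	return b, true
}

func main() {
	q := &twoTierQueue{
		fileBacklog: []string{"old-1", "old-2"},
		inMemory:    []string{"fresh-1"},
	}
	fresh, _ := q.readInMemoryBlock() // dedicated worker skips the backlog
	old, _ := q.readBlock()           // regular worker starts with the backlog
	fmt.Println(fresh, old)
}
```

The sketch also shows the downside named in the commit message: fresh blocks can be delivered while older ones are still waiting in the backlog.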
f41gh7
61668f0672 changelog: sort entries
Signed-off-by: f41gh7 <nik@victoriametrics.com>
2026-06-19 13:23:01 +02:00
f41gh7
d1ebbf573c vendor: update metrics to v1.44.0
fixes https://github.com/VictoriaMetrics/metrics/issues/127
2026-06-19 13:16:31 +02:00
Hui Wang
16422b2d14 lib/streamagrr: add sum_sample_total output function
Add the `sum_sample_total` aggregation function, which sums input delta values
into a cumulative
[counter](https://docs.victoriametrics.com/victoriametrics/keyconcepts/index.html#counter)
and outputs the result at the given `interval`.
`sum_samples_total` makes sense only for aggregating delta values from
clients such as [StatsD
counter](https://github.com/statsd/statsd/blob/master/docs/metric_types.md#counting).

> Note: The aggregator will forget the cumulative counter if it has not
seen input samples for `staleness_interval` (set to `interval` by
default) per output result, so the output counter will start from `0`
the next time it sees the input again. Increase the `staleness_interval`
option if you want to extend the window to tolerate bigger gaps.

Fixes:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11002
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4843
2026-06-19 13:11:36 +02:00
Roman Khavronenko
0f1ca87611 app/vmselect: skip caching empty responses for tenants discovery
Before, an empty tenants response was cached for 5 minutes, making all subsequent
read queries return an empty response, even if data for the tenants was
properly ingested.

With this change, empty responses are not cached.
This change is important for integration tests, where reads and writes
are performed one after another.

Related PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10982/
2026-06-19 10:04:58 +02:00
Alexander Frolov
0dd2b2cee6 app/vmselect: fix tenant filtering with long tenant regexp
When tenant filter is a long regexp, its content can be replaced with
`...`, causing tenants to be matched incorrectly.

`applyFiltersToTenants` converts tag filters using `tagFiltersToString`

c497c8c2e9/app/vmselect/netstorage/tenant_filters.go (L106-L112)

which uses the human-readable representation of `TagFilter`

c497c8c2e9/lib/storage/search.go (L390-L397)

This way I can see results from unexpected tenants. See the test, which
fails with
```
--- FAIL: TestApplyFiltersToTenants (0.00s)
    tenant_filters_test.go:18: unexpected tenants result; got [{100 0} {116 0} {1239 0}]; want [{100 0} {108 0} {116 0}]
```

---------

Related PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11096
2026-06-19 09:55:04 +02:00
Hui Wang
7caec5fcb4 lib/streamaggr: expose vm_streamaggr_dedup_dropped_samples_total to track deduplicated samples
Currently, there is no way to determine how many samples are dropped
during
https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication,
and deduplication can also drop out-of-order samples if configured,
without being captured by
vm_streamaggr_ignored_samples_total{reason="too_old"}.

Related PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11069
2026-06-19 09:49:09 +02:00
Alexander Frolov
612f8ac8d6 app/vmselect: escape metadata MetricFamilyName
Follow-up for:
- https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11115
- https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11120
- https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11128

Under normal circumstances it is not necessary to escape
`MetricFamilyName`.
I have noticed a few cases where vmselect produced a malformed JSON
response due to a corrupted `MetricFamilyName` value.

On the other hand, there are advantages to keeping it as is: there is a
better chance that the issue would be noticed and reported.
2026-06-19 09:37:49 +02:00
Immanuel Tikhonov
6aa31a09d7 app/vmctl: fix typo in missing-field influx error
`vmctl influx` returns `response doesn't contain filed "value"` when the
response misses the requested field column.

Related to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10892
2026-06-19 09:36:25 +02:00
Hui Wang
b6e6a50e29 lib/streamaggr: reduce the default staleness interval
Reduce the default `staleness_interval` from `2*rule_interval` to
`1*rule_interval`, so the lookbehind range in stream aggregation is more
consistent with MetricsQL queries. Also add a stale sample check during
sample push in case flush hasn't cleaned it up in time.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11102
2026-06-19 09:35:22 +02:00
Nikolay
a6d48b6af3 lib/storage: fixes typo at vm_retention_filters_partitions_scheduled_rows
Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11138
2026-06-19 09:09:38 +02:00
Max Kotliar
dc4cf5631b follow-up on b3054bba - remove .codex
Follow-up on
b3054bbadd
2026-06-18 23:36:03 +03:00
Max Kotliar
005f133146 follow-up on 710c920d - remove leftovers
Follow-up on
710c920d60
2026-06-18 18:08:29 +03:00
Max Kotliar
35fc595e6f docs/changelog: chore changelog messages 2026-06-18 13:39:34 +03:00
Max Kotliar
710c920d60 app/vmrestore: disallow restoring parts outside the configured -storageDataPath directory (#1051)
A specially crafted backup source (compromised S3/GCS/Azure bucket) could use `..`
components in object names to write files outside the `-storageDataPath`
directory.

Fix by validating all source parts against the destination directory
before restore begins (restore.go), and adding a defense-in-depth panic
guard in `NewDirectWriteCloser` (fslocal.go).

PR https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/pull/1051

Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
2026-06-18 13:21:12 +03:00
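The kind of check described in this commit (rejecting object names that resolve outside the destination directory) can be sketched with the standard `path/filepath` package. This is a minimal illustration under assumptions, not the code from restore.go or fslocal.go; `isInsideDir` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// isInsideDir reports whether name, joined onto baseDir, still resolves to a
// path inside baseDir. It is a hypothetical helper for illustration only.
func isInsideDir(baseDir, name string) bool {
	// filepath.Join cleans the result, so ".." components are resolved here.
	target := filepath.Join(baseDir, name)
	rel, err := filepath.Rel(baseDir, target)
	if err != nil {
		return false
	}
	// A relative path that starts with ".." escapes baseDir.
	return rel != ".." && !strings.HasPrefix(rel, ".."+string(filepath.Separator))
}

func main() {
	fmt.Println(isInsideDir("/data", "parts/2026_06/a"))
	fmt.Println(isInsideDir("/data", "../etc/passwd"))
	fmt.Println(isInsideDir("/data", "parts/../../etc/passwd"))
}
```

Running such a check over every source part before any file is written matches the "validate before restore begins" approach named above; the sketch does not account for symlinks.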
JAYICE
0ceeb14076 app/vmagent: support sharding targets by label in vmagent cluster mode (#11114)
Previously, targets were sharded among `vmagent` instances by all target labels after relabeling. The commit adds `-promscrape.cluster.shardByLabels` optional flag to shard targets by specified labels.

For example, with `-promscrape.cluster.shardByLabels=service`, the targets with the same `service` label value will be scraped by the same `vmagent` instance, 
which is useful when performing stream aggregation (drop pod label) that requires all metrics with the same `service` label value to be processed on the same `vmagent` instance. 

If none of the specified labels are present in the target labels, then all target labels will be used for sharding.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11044
PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11114
2026-06-18 13:13:06 +03:00
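The sharding rule from this commit message (hash only the configured labels, fall back to all labels when none of them is present) can be sketched as follows. The hash function and the `shardFor` helper are assumptions made for illustration; the actual vmagent hashing may differ.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// shardFor returns the shard index for a target. It hashes only the labels
// listed in shardBy; if none of them is present, it hashes all target labels.
// shards is assumed to be greater than zero.
func shardFor(labels map[string]string, shardBy []string, shards int) int {
	picked := make(map[string]string)
	for _, name := range shardBy {
		if v, ok := labels[name]; ok {
			picked[name] = v
		}
	}
	if len(picked) == 0 {
		picked = labels
	}
	// Sort label names so the hash does not depend on map iteration order.
	names := make([]string, 0, len(picked))
	for name := range picked {
		names = append(names, name)
	}
	sort.Strings(names)

	h := fnv.New64a()
	for _, name := range names {
		h.Write([]byte(name))
		h.Write([]byte{0})
		h.Write([]byte(picked[name]))
		h.Write([]byte{0})
	}
	return int(h.Sum64() % uint64(shards))
}

func main() {
	a := map[string]string{"service": "api", "pod": "api-1"}
	b := map[string]string{"service": "api", "pod": "api-2"}
	// Both targets share the service label, so they land on the same shard.
	fmt.Println(shardFor(a, []string{"service"}, 8) == shardFor(b, []string{"service"}, 8))
}
```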
Zasda Yusuf Mikail
adc29732f9 app/vmctl: push metrics before exiting on error (#11081)
Call `pushmetrics.StopAndPush()` before `vmctl` exits on `app.Run` errors

PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11081

Signed-off-by: Zasda Mikail <zasdaym@gmail.com>
2026-06-18 12:55:25 +03:00
Aliaksandr Valialkin
41ffe23b18 docs/victoriametrics/Articles.md: add https://xata.io/blog/how-we-rebuilt-postgresql-branch-metrics-on-victoriametrics-per-cell 2026-06-18 00:19:21 +02:00
Fred Navruzov
6229a8fe7d docs/vmanomaly: release v1.29.6 (#11132)
Update vmanomaly docs (/anomaly-detection) for patch release v1.29.6

PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11132
2026-06-17 20:09:17 +03:00
Max Kotliar
b58c73ac90 app/vmauth: refactor oidc discovery pool creation (#11123)
Initializing `oidcDP` outside of and before `parseJWTUsers` simplifies its reuse by the SSO implementation https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11122

PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11123
2026-06-17 13:52:39 +03:00
Nikolay
77efbb2e36 lib/fs: retry file deletion on NFS error
This commit adds additional file removal retries.
This is follow-up for
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9842.

According to the stack trace, NFS can also return an error for
`deleteFilePath`.
This retry is safe, since the part has already been removed from `parts.json` and
empty directories are skipped on restart.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11060
2026-06-17 09:16:42 +02:00
Alexander Frolov
e388e41430 app/vmagent: properly copy metadata unit value
Previously, the Unit value in metrics metadata was referenced directly instead of being copied into separate memory:

ed795a8443/app/vmagent/remotewrite/pendingseries.go (L209-L212)

This commit properly copies Unit into a dedicated buffer and prevents memory corruption.

Related PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11120
2026-06-17 09:16:04 +02:00
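The difference between referencing a value and copying it into a dedicated buffer can be shown with plain byte slices. This is a simplified sketch, not the `pendingseries.go` code; `appendCopy` is a hypothetical helper and the example uses `[]byte` instead of unsafe strings.

```go
package main

import "fmt"

// appendCopy appends src to buf and returns the grown buffer together with
// the freshly copied region, which no longer shares memory with src.
func appendCopy(buf, src []byte) ([]byte, []byte) {
	start := len(buf)
	buf = append(buf, src...)
	return buf, buf[start:]
}

func main() {
	scratch := []byte("seconds") // buffer that the caller reuses later

	aliased := scratch // a reference: shares memory with scratch
	_, copied := appendCopy(nil, scratch)

	copy(scratch, "XXXXXXX") // the caller overwrites its buffer

	fmt.Printf("aliased=%s copied=%s\n", aliased, copied)
}
```

The aliased value changes together with the reused buffer, while the copied one keeps its original content.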
71 changed files with 1353 additions and 1213 deletions

0
.codex
View File

View File

@@ -187,7 +187,7 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
return c
}
func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
func (c *client) init(argIdx int, sanitizedURL string) {
limitReached := metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rate_limit_reached_total{url=%q}`, c.sanitizedURL))
if bytesPerSec := rateLimit.GetOptionalArg(argIdx); bytesPerSec > 0 {
logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", bytesPerSec, sanitizedURL)
@@ -204,11 +204,20 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
c.packetsDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_packets_dropped_total{url=%q}`, c.sanitizedURL))
c.retriesCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.sanitizedURL))
c.sendDuration = metrics.GetOrCreateFloatCounter(fmt.Sprintf(`vmagent_remotewrite_send_duration_seconds_total{url=%q}`, c.sanitizedURL))
metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queues{url=%q}`, c.sanitizedURL), func() float64 {
return float64(concurrency)
})
for range concurrency {
c.wg.Go(c.runWorker)
workers := queues.GetOptionalArg(argIdx)
if workers <= 0 {
workers = 1
}
inmemoryWorkers := inmemoryQueues.GetOptionalArg(argIdx)
for range inmemoryWorkers {
c.wg.Go(func() {
c.runWorker(c.fq.MustReadInMemoryBlockBlocking)
})
}
for range workers {
c.wg.Go(func() {
c.runWorker(c.fq.MustReadBlock)
})
}
logger.Infof("initialized client for -remoteWrite.url=%q", c.sanitizedURL)
}
@@ -302,12 +311,12 @@ func getAWSAPIConfig(argIdx int) (*awsapi.Config, error) {
return cfg, nil
}
func (c *client) runWorker() {
func (c *client) runWorker(readBlock func(dst []byte) ([]byte, bool)) {
var ok bool
var block []byte
ch := make(chan bool, 1)
for {
block, ok = c.fq.MustReadBlock(block[:0])
block, ok = readBlock(block[:0])
if !ok {
return
}

View File

@@ -209,13 +209,12 @@ func (wr *writeRequest) tryPushMetadata(mms []prompb.MetricMetadata) bool {
func (wr *writeRequest) copyMetadata(dst, src *prompb.MetricMetadata) {
// Direct copy for non-string fields, which are safe by value.
dst.Type = src.Type
dst.Unit = src.Unit
dst.AccountID = src.AccountID
dst.ProjectID = src.ProjectID
// Pre-allocate memory for all string fields.
neededBufLen := len(src.MetricFamilyName) + len(src.Help)
neededBufLen := len(src.MetricFamilyName) + len(src.Help) + len(src.Unit)
bufLen := len(wr.metadatabuf)
wr.metadatabuf = slicesutil.SetLength(wr.metadatabuf, bufLen+neededBufLen)
buf := wr.metadatabuf[:bufLen]
@@ -230,6 +229,11 @@ func (wr *writeRequest) copyMetadata(dst, src *prompb.MetricMetadata) {
buf = append(buf, src.Help...)
dst.Help = bytesutil.ToUnsafeString(buf[bufLen:])
// Copy Unit
bufLen = len(buf)
buf = append(buf, src.Unit...)
dst.Unit = bytesutil.ToUnsafeString(buf[bufLen:])
wr.metadatabuf = buf
}

View File

@@ -66,6 +66,9 @@ var (
queues = flagutil.NewArrayInt("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
"isn't enough for sending high volume of collected data to remote storage. "+
"Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage")
inmemoryQueues = flagutil.NewArrayInt("remoteWrite.inmemoryQueues", 0, "The number of additional workers per each -remoteWrite.url, which send only recently ingested data from the in-memory queue, "+
"while the file-based queue at -remoteWrite.tmpDataPath is drained by workers configured via -remoteWrite.queues. "+
"This reduces delivery lag for fresh samples when the file-based queue contains a backlog accumulated during remote storage outages.")
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
"It is hidden by default, since it can contain sensitive info such as auth key")
maxPendingBytesPerURL = flagutil.NewArrayBytes("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+
@@ -906,7 +909,8 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
}
isPQDisabled := disableOnDiskQueue.GetOptionalArg(argIdx)
queuesSize := queues.GetOptionalArg(argIdx)
inmemoryQueueSize := inmemoryQueues.GetOptionalArg(argIdx)
queuesSize := queues.GetOptionalArg(argIdx) + inmemoryQueueSize
if queuesSize > maxQueues {
queuesSize = maxQueues
} else if queuesSize <= 0 {
@@ -923,7 +927,13 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
if maxInmemoryBlocks < 2 {
maxInmemoryBlocks = 2
}
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled)
fqOpts := persistentqueue.OpenFastQueueOpts{
MaxInmemoryBlocks: maxInmemoryBlocks,
MaxPendingBytes: maxPendingBytes,
IsPQDisabled: isPQDisabled,
PrioritizeInmemoryData: inmemoryQueueSize > 0,
}
fq := persistentqueue.MustOpenFastQueueWithOpts(queuePath, sanitizedURL, fqOpts)
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(fq.GetPendingBytes())
})
@@ -936,6 +946,9 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
}
return 0
})
metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queues{url=%q}`, sanitizedURL), func() float64 {
return float64(queuesSize)
})
var c *client
switch remoteWriteURL.Scheme {
@@ -944,7 +957,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
default:
logger.Fatalf("unsupported scheme: %s for remoteWriteURL: %s, want `http`, `https`", remoteWriteURL.Scheme, sanitizedURL)
}
c.init(argIdx, queuesSize, sanitizedURL)
c.init(argIdx, sanitizedURL)
// Initialize pss
sf := significantFigures.GetOptionalArg(argIdx)

View File

@@ -58,12 +58,9 @@ var (
type AuthConfig struct {
Users []UserInfo `yaml:"users,omitempty"`
UnauthorizedUser *UserInfo `yaml:"unauthorized_user,omitempty"`
SSO SSOConfig `yaml:"sso,omitempty"`
// ms holds all the metrics for the given AuthConfig
ms *metrics.Set
oidcDP *oidcDiscovererPool
}
// UserInfo is user information read from authConfigPath
@@ -914,25 +911,12 @@ func reloadAuthConfigData(data []byte) (bool, error) {
return false, fmt.Errorf("failed to parse auth config: %w", err)
}
if err := validateSSOConfigs(ac.SSO); err != nil {
return false, fmt.Errorf("invalid SSO config: %w", err)
}
jui, oidcDP, err := parseJWTUsers(ac)
oidcDP := &oidcDiscovererPool{}
jui, err := parseJWTUsers(ac, oidcDP)
if err != nil {
return false, fmt.Errorf("failed to parse JWT users from auth config: %w", err)
}
ac.oidcDP = oidcDP
// Register SSO issuers with the OIDC discoverer pool so their discovery
// runs together with JWT users during startDiscovery below.
for _, cfg := range ac.SSO {
oidcDP.createOrAdd(cfg.OpenIDConnect.Issuer, nil)
}
oidcDP.startDiscovery()
jwtc := &jwtCache{
users: jui,
oidcDP: oidcDP,

View File

@@ -72,9 +72,8 @@ type JWTConfig struct {
verifierPool atomic.Pointer[jwt.VerifierPool]
}
func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
func parseJWTUsers(ac *AuthConfig, oidcDP *oidcDiscovererPool) ([]*UserInfo, error) {
jui := make([]*UserInfo, 0, len(ac.Users))
oidcDP := &oidcDiscovererPool{}
uniqClaims := make(map[string]*UserInfo)
var sortedClaims []string
@@ -85,10 +84,10 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
}
if ui.AuthToken != "" || ui.BearerToken != "" || ui.Username != "" || ui.Password != "" {
return nil, nil, fmt.Errorf("auth_token, bearer_token, username and password cannot be specified if jwt is set")
return nil, fmt.Errorf("auth_token, bearer_token, username and password cannot be specified if jwt is set")
}
if len(jwtToken.PublicKeys) == 0 && len(jwtToken.PublicKeyFiles) == 0 && !jwtToken.SkipVerify && jwtToken.OIDC == nil {
return nil, nil, fmt.Errorf("jwt must contain at least a single public key, public_key_files, oidc or have skip_verify=true")
return nil, fmt.Errorf("jwt must contain at least a single public key, public_key_files, oidc or have skip_verify=true")
}
var claimsString string
sortedClaims = sortedClaims[:0]
@@ -97,7 +96,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
sortedClaims = append(sortedClaims, fmt.Sprintf("%s=%s", ck, cv))
pc, err := jwt.NewClaim(ck, cv)
if err != nil {
return nil, nil, fmt.Errorf("incorrect match claim, key=%q, value regex=%q: %w", ck, cv, err)
return nil, fmt.Errorf("incorrect match claim, key=%q, value regex=%q: %w", ck, cv, err)
}
parsedClaims = append(parsedClaims, pc)
}
@@ -106,7 +105,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
claimsString = strings.Join(sortedClaims, ",")
if oldUI, ok := uniqClaims[claimsString]; ok {
return nil, nil, fmt.Errorf("duplicate match claims=%q found for name=%q at idx=%d; the previous one is set for name=%q", claimsString, ui.Name, idx, oldUI.Name)
return nil, fmt.Errorf("duplicate match claims=%q found for name=%q at idx=%d; the previous one is set for name=%q", claimsString, ui.Name, idx, oldUI.Name)
}
uniqClaims[claimsString] = &ui
if len(jwtToken.PublicKeys) > 0 || len(jwtToken.PublicKeyFiles) > 0 {
@@ -115,7 +114,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
for i := range jwtToken.PublicKeys {
k, err := jwt.ParseKey([]byte(jwtToken.PublicKeys[i]))
if err != nil {
return nil, nil, err
return nil, err
}
keys = append(keys, k)
}
@@ -123,52 +122,52 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
for _, filePath := range jwtToken.PublicKeyFiles {
keyData, err := os.ReadFile(filePath)
if err != nil {
return nil, nil, fmt.Errorf("cannot read public key from file %q: %w", filePath, err)
return nil, fmt.Errorf("cannot read public key from file %q: %w", filePath, err)
}
k, err := jwt.ParseKey(keyData)
if err != nil {
return nil, nil, fmt.Errorf("cannot parse public key from file %q: %w", filePath, err)
return nil, fmt.Errorf("cannot parse public key from file %q: %w", filePath, err)
}
keys = append(keys, k)
}
vp, err := jwt.NewVerifierPool(keys)
if err != nil {
return nil, nil, err
return nil, err
}
jwtToken.verifierPool.Store(vp)
}
if jwtToken.OIDC != nil {
if len(jwtToken.PublicKeys) > 0 || len(jwtToken.PublicKeyFiles) > 0 || jwtToken.SkipVerify {
return nil, nil, fmt.Errorf("jwt with oidc cannot contain public keys or have skip_verify=true")
return nil, fmt.Errorf("jwt with oidc cannot contain public keys or have skip_verify=true")
}
if jwtToken.OIDC.Issuer == "" {
return nil, nil, fmt.Errorf("oidc issuer cannot be empty")
return nil, fmt.Errorf("oidc issuer cannot be empty")
}
isserURL, err := url.Parse(jwtToken.OIDC.Issuer)
if err != nil {
return nil, nil, fmt.Errorf("oidc issuer %q must be a valid URL", jwtToken.OIDC.Issuer)
return nil, fmt.Errorf("oidc issuer %q must be a valid URL", jwtToken.OIDC.Issuer)
}
if isserURL.Scheme != "https" && isserURL.Scheme != "http" {
return nil, nil, fmt.Errorf("oidc issuer %q must have http or https scheme", jwtToken.OIDC.Issuer)
return nil, fmt.Errorf("oidc issuer %q must have http or https scheme", jwtToken.OIDC.Issuer)
}
oidcDP.createOrAdd(ui.JWT.OIDC.Issuer, &ui.JWT.verifierPool)
}
if err := parseJWTPlaceholdersForUserInfo(&ui, true); err != nil {
return nil, nil, err
return nil, err
}
if err := ui.initURLs(); err != nil {
return nil, nil, err
return nil, err
}
metricLabels, err := ui.getMetricLabels()
if err != nil {
return nil, nil, fmt.Errorf("cannot parse metric_labels: %w", err)
return nil, fmt.Errorf("cannot parse metric_labels: %w", err)
}
ui.requests = ac.ms.GetOrCreateCounter(`vmauth_user_requests_total` + metricLabels)
ui.requestErrors = ac.ms.GetOrCreateCounter(`vmauth_user_request_errors_total` + metricLabels)
@@ -187,7 +186,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
rt, err := newRoundTripper(ui.TLSCAFile, ui.TLSCertFile, ui.TLSKeyFile, ui.TLSServerName, ui.TLSInsecureSkipVerify)
if err != nil {
return nil, nil, fmt.Errorf("cannot initialize HTTP RoundTripper: %w", err)
return nil, fmt.Errorf("cannot initialize HTTP RoundTripper: %w", err)
}
ui.rt = rt
@@ -200,7 +199,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
return len(jui[i].JWT.MatchClaims) > len(jui[j].JWT.MatchClaims)
})
return jui, oidcDP, nil
return jui, nil
}
var tokenPool sync.Pool

View File

@@ -39,16 +39,14 @@ XOtclIk1uhc03oL9nOQ=
}
return
}
users, oidcDP, err := parseJWTUsers(ac)
oidcDP := &oidcDiscovererPool{}
users, err := parseJWTUsers(ac, oidcDP)
if err == nil {
t.Fatalf("expecting non-nil error; got %v", users)
}
if expErr != err.Error() {
t.Fatalf("unexpected error; got\n%q\nwant \n%q", err.Error(), expErr)
}
if oidcDP != nil {
t.Fatalf("expecting nil oidcDP; got %v", oidcDP)
}
}
// unauthorized_user cannot be used with jwt
@@ -326,7 +324,8 @@ XOtclIk1uhc03oL9nOQ=
t.Fatalf("unexpected error: %s", err)
}
jui, oidcDP, err := parseJWTUsers(ac)
oidcDP := &oidcDiscovererPool{}
jui, err := parseJWTUsers(ac, oidcDP)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

View File

@@ -6,7 +6,6 @@ import (
"flag"
"fmt"
"io"
"log"
"net"
"net/http"
"net/textproto"
@@ -170,22 +169,7 @@ func requestHandlerWithInternalRoutes(w http.ResponseWriter, r *http.Request) bo
}
func requestHandler(w http.ResponseWriter, r *http.Request) bool {
// Handle SSO callback before any auth checks.
if r.URL.Path == "/_vmauth/sso/callback" {
handleSSOCallback(w, r)
return true
}
ats := getAuthTokensFromRequest(r)
log.Println(51)
// Inject the SSO session cookie as a Bearer token so that the existing
// JWT pipeline can validate it and match it to a configured user.
if tok := ssoAuthTokenFromRequest(r); tok != "" {
log.Println(52, tok)
ats = append(ats, tok)
}
if len(ats) == 0 {
// Process requests for unauthorized users
ui := authConfig.Load().UnauthorizedUser
@@ -193,13 +177,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
processUserRequest(w, r, ui, nil)
return true
}
log.Println(1)
if cfg := ssoConfigForHost(r.Host); cfg != nil {
log.Println(2)
showSSOLoginPage(w, r, cfg)
return true
}
log.Println(3)
handleMissingAuthorizationError(w)
return true
}
@@ -223,11 +201,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
if cfg := ssoConfigForHost(r.Host); cfg != nil {
showSSOLoginPage(w, r, cfg)
return true
}
invalidAuthTokenRequests.Inc()
if *logInvalidAuthTokens {
err := fmt.Errorf("cannot authorize request with auth tokens %q", ats)

View File

@@ -44,19 +44,7 @@ func (dp *oidcDiscovererPool) createOrAdd(issuer string, vp *atomic.Pointer[jwt.
dp.ds[issuer] = ds
}
if vp != nil {
ds.vps = append(ds.vps, vp)
}
}
// openIDConfig returns the most recently discovered openidConfig for the given issuer,
// or nil if the issuer is not registered or discovery has not completed yet.
func (dp *oidcDiscovererPool) openIDConfig(issuer string) *openidConfig {
d := dp.ds[issuer]
if d == nil {
return nil
}
return d.cfg.Load()
ds.vps = append(ds.vps, vp)
}
func (dp *oidcDiscovererPool) startDiscovery() {
@@ -92,7 +80,6 @@ func (dp *oidcDiscovererPool) stopDiscovery() {
type oidcDiscoverer struct {
issuer string
vps []*atomic.Pointer[jwt.VerifierPool]
cfg atomic.Pointer[openidConfig]
}
func (d *oidcDiscoverer) run(ctx context.Context) {
@@ -130,26 +117,21 @@ func (d *oidcDiscoverer) refreshVerifierPools(ctx context.Context) error {
return fmt.Errorf("openid configuration issuer %q does not match expected issuer %q", cfg.Issuer, d.issuer)
}
d.cfg.Store(&cfg)
verifierPool, err := fetchAndParseJWKs(ctx, cfg.JWKsURI)
if err != nil {
return err
}
if len(d.vps) > 0 {
verifierPool, err := fetchAndParseJWKs(ctx, cfg.JWKsURI)
if err != nil {
return err
}
for _, vp := range d.vps {
vp.Store(verifierPool)
}
for _, vp := range d.vps {
vp.Store(verifierPool)
}
return nil
}
// See https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata for details.
type openidConfig struct {
Issuer string `json:"issuer"`
JWKsURI string `json:"jwks_uri"`
AuthorizationEndpoint string `json:"authorization_endpoint"`
TokenEndpoint string `json:"token_endpoint"`
Issuer string `json:"issuer"`
JWKsURI string `json:"jwks_uri"`
}
var oidcHTTPClient = &http.Client{

View File

@@ -1,320 +0,0 @@
package main
import (
"context"
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"html/template"
"io"
"log"
"net/http"
"net/url"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// SSOConfig maps hostname to its SSO configuration.
type SSOConfig map[string]*SSOHostConfig
// SSOHostConfig holds the SSO configuration for a single host.
type SSOHostConfig struct {
OpenIDConnect *OIDCConnectConfig `yaml:"openid_connect"`
}
// OIDCConnectConfig is the OpenID Connect configuration for SSO.
type OIDCConnectConfig struct {
Issuer string `yaml:"issuer"`
ClientID string `yaml:"client_id"`
ClientSecret string `yaml:"client_secret"`
// RedirectURL is optional. Defaults to https://{host}/_vmauth/sso/callback.
RedirectURL string `yaml:"redirect_url,omitempty"`
// Scopes defaults to ["openid"] when not set.
Scopes []string `yaml:"scopes,omitempty"`
// filled from OIDC discovery at init time
authEndpoint string
tokenEndpoint string
}
// validateSSOConfigs checks that all required fields are present in SSO configs.
func validateSSOConfigs(sso SSOConfig) error {
for host, cfg := range sso {
if cfg.OpenIDConnect == nil {
return fmt.Errorf("missing openid_connect config for sso host %q", host)
}
oidc := cfg.OpenIDConnect
if oidc.Issuer == "" {
return fmt.Errorf("missing issuer in openid_connect config for sso host %q", host)
}
if oidc.ClientID == "" {
return fmt.Errorf("missing client_id in openid_connect config for sso host %q", host)
}
if oidc.ClientSecret == "" {
return fmt.Errorf("missing client_secret in openid_connect config for sso host %q", host)
}
}
return nil
}
// ssoConfigForHost returns the SSO host config for the given request host, or nil.
func ssoConfigForHost(host string) *SSOHostConfig {
// Strip port, e.g. "foo.com:8427" -> "foo.com"
if i := strings.LastIndexByte(host, ':'); i >= 0 {
host = host[:i]
}
ac := authConfig.Load()
if ac == nil || ac.SSO == nil {
log.Println(21)
return nil
}
ssoh := ac.SSO[host]
if ssoh == nil || ssoh.OpenIDConnect == nil {
log.Println(22)
return nil
}
oidcCfg := ac.oidcDP.openIDConfig(ssoh.OpenIDConnect.Issuer)
if oidcCfg == nil {
log.Println(24)
return nil
}
ssoh.OpenIDConnect.authEndpoint = oidcCfg.AuthorizationEndpoint
ssoh.OpenIDConnect.tokenEndpoint = oidcCfg.TokenEndpoint
log.Println(25)
return ssoh
}
// ssoStatePayload is the CSRF state payload embedded in the OIDC state parameter.
type ssoStatePayload struct {
Nonce string `json:"n"`
OriginalURL string `json:"u"`
IssuedAt int64 `json:"t"`
}
const (
ssoStateTTL = 10 * time.Minute
ssoCookieName = "_vmauth_sso"
)
// buildSSOState builds a signed, self-contained state value safe to use across
// multiple vmauth instances behind a load balancer.
//
// Format: base64url(JSON(payload)) "." base64url(HMAC-SHA256(clientSecret, payload))
func buildSSOState(originalURL, clientSecret string) (string, error) {
nonce := make([]byte, 16)
if _, err := rand.Read(nonce); err != nil {
return "", fmt.Errorf("cannot generate nonce: %w", err)
}
p := ssoStatePayload{
Nonce: base64.RawURLEncoding.EncodeToString(nonce),
OriginalURL: originalURL,
IssuedAt: time.Now().Unix(),
}
payloadJSON, err := json.Marshal(p)
if err != nil {
return "", err
}
payloadEnc := base64.RawURLEncoding.EncodeToString(payloadJSON)
mac := hmac.New(sha256.New, []byte(clientSecret))
mac.Write([]byte(payloadEnc))
sig := base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
return payloadEnc + "." + sig, nil
}
// verifySSOState verifies the state signature and expiry, returning the original URL.
func verifySSOState(state, clientSecret string) (string, error) {
dot := strings.LastIndexByte(state, '.')
if dot < 0 {
return "", fmt.Errorf("invalid state: missing separator")
}
payloadEnc := state[:dot]
sig := state[dot+1:]
mac := hmac.New(sha256.New, []byte(clientSecret))
mac.Write([]byte(payloadEnc))
expectedSig := base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
if !hmac.Equal([]byte(sig), []byte(expectedSig)) {
return "", fmt.Errorf("invalid state signature")
}
payloadJSON, err := base64.RawURLEncoding.DecodeString(payloadEnc)
if err != nil {
return "", fmt.Errorf("cannot decode state payload: %w", err)
}
var p ssoStatePayload
if err := json.Unmarshal(payloadJSON, &p); err != nil {
return "", fmt.Errorf("cannot unmarshal state payload: %w", err)
}
if time.Since(time.Unix(p.IssuedAt, 0)) > ssoStateTTL {
return "", fmt.Errorf("state expired")
}
return p.OriginalURL, nil
}
var ssoLoginPageTmpl = template.Must(template.New("sso_login").Parse(`<!DOCTYPE html>
<html>
<head><meta charset="utf-8"><title>Login</title></head>
<body>
<p><a href="{{.}}">Login with SSO</a></p>
</body>
</html>`))
// showSSOLoginPage renders a minimal HTML page with a single "Login with SSO"
// button pointing directly to the OIDC provider's authorization endpoint.
func showSSOLoginPage(w http.ResponseWriter, r *http.Request, cfg *SSOHostConfig) {
oidc := cfg.OpenIDConnect
if oidc == nil || oidc.authEndpoint == "" {
http.Error(w, "SSO not properly configured for this host", http.StatusInternalServerError)
return
}
state, err := buildSSOState(r.RequestURI, oidc.ClientSecret)
if err != nil {
logger.Errorf("SSO: cannot build state: %s", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
redirectURL := ssoRedirectURL(r, oidc)
scopes := oidc.Scopes
if len(scopes) == 0 {
scopes = []string{"openid"}
}
params := url.Values{}
params.Set("response_type", "code")
params.Set("client_id", oidc.ClientID)
params.Set("redirect_uri", redirectURL)
params.Set("scope", strings.Join(scopes, " "))
params.Set("state", state)
authURL := oidc.authEndpoint + "?" + params.Encode()
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.WriteHeader(http.StatusOK)
if err := ssoLoginPageTmpl.Execute(w, authURL); err != nil {
logger.Errorf("SSO: cannot render login page: %s", err)
}
}
// handleSSOCallback handles the OIDC authorization code callback at /_vmauth/sso/callback.
func handleSSOCallback(w http.ResponseWriter, r *http.Request) {
cfg := ssoConfigForHost(r.Host)
if cfg == nil || cfg.OpenIDConnect == nil {
http.Error(w, "SSO not configured for this host", http.StatusBadRequest)
return
}
oidc := cfg.OpenIDConnect
q := r.URL.Query()
state := q.Get("state")
if state == "" {
http.Error(w, "missing state parameter", http.StatusBadRequest)
return
}
originalURL, err := verifySSOState(state, oidc.ClientSecret)
if err != nil {
logger.Warnf("SSO callback: invalid state from %s: %s", r.RemoteAddr, err)
http.Error(w, "invalid state", http.StatusBadRequest)
return
}
code := q.Get("code")
if code == "" {
http.Error(w, "missing code parameter", http.StatusBadRequest)
return
}
idToken, err := exchangeCodeForIDToken(r.Context(), oidc, code, ssoRedirectURL(r, oidc))
if err != nil {
logger.Warnf("SSO callback: token exchange failed: %s", err)
http.Error(w, "token exchange failed", http.StatusBadRequest)
return
}
http.SetCookie(w, &http.Cookie{
Name: ssoCookieName,
Value: idToken,
Path: "/",
HttpOnly: true,
Secure: r.TLS != nil,
SameSite: http.SameSiteLaxMode,
})
if originalURL == "" {
originalURL = "/"
}
http.Redirect(w, r, originalURL, http.StatusFound)
}
type tokenResponse struct {
IDToken string `json:"id_token"`
}
// exchangeCodeForIDToken exchanges the OIDC authorization code for an id_token.
func exchangeCodeForIDToken(ctx context.Context, oidc *OIDCConnectConfig, code, redirectURL string) (string, error) {
params := url.Values{}
params.Set("grant_type", "authorization_code")
params.Set("code", code)
params.Set("redirect_uri", redirectURL)
params.Set("client_id", oidc.ClientID)
params.Set("client_secret", oidc.ClientSecret)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, oidc.tokenEndpoint, strings.NewReader(params.Encode()))
if err != nil {
return "", fmt.Errorf("cannot create token request: %w", err)
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := oidcHTTPClient.Do(req)
if err != nil {
return "", fmt.Errorf("token request failed: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("cannot read token response: %w", err)
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("token endpoint returned status %d: %s", resp.StatusCode, body)
}
var tr tokenResponse
if err := json.Unmarshal(body, &tr); err != nil {
return "", fmt.Errorf("cannot unmarshal token response: %w", err)
}
if tr.IDToken == "" {
return "", fmt.Errorf("token response missing id_token")
}
return tr.IDToken, nil
}
// ssoAuthTokenFromRequest extracts the SSO session cookie and returns it as
// a Bearer auth token string compatible with the existing JWT pipeline.
func ssoAuthTokenFromRequest(r *http.Request) string {
c, err := r.Cookie(ssoCookieName)
if err != nil || c.Value == "" {
return ""
}
return "http_auth:Bearer " + c.Value
}
// ssoRedirectURL returns the OIDC redirect URL for the current request.
func ssoRedirectURL(r *http.Request, oidc *OIDCConnectConfig) string {
if oidc.RedirectURL != "" {
return oidc.RedirectURL
}
scheme := "https"
if r.TLS == nil {
scheme = "http"
}
return scheme + "://" + r.Host + "/_vmauth/sso/callback"
}
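The state handling above can be sketched as a signed round trip. This is a minimal, self-contained sketch, not the code from this change: `signState`, `checkState`, `statePayload`, the `.` separator, HMAC-SHA256, the JSON field names and the 600-second TTL are all assumptions made for illustration. Only the verification order (signature, then base64url decoding, then JSON, then expiry) follows the diff.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// statePayload is a hypothetical stand-in for ssoStatePayload.
type statePayload struct {
	OriginalURL string `json:"original_url"`
	IssuedAt    int64  `json:"issued_at"`
}

// stateTTLSeconds is an assumed lifetime; the real ssoStateTTL is not shown in the diff.
const stateTTLSeconds = 600

// sign returns the base64url-encoded HMAC-SHA256 of payloadEnc.
func sign(payloadEnc, secret string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(payloadEnc))
	return base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
}

// signState builds "<payload>.<signature>" for the given URL and issue time.
func signState(originalURL, secret string, nowUnix int64) (string, error) {
	b, err := json.Marshal(statePayload{OriginalURL: originalURL, IssuedAt: nowUnix})
	if err != nil {
		return "", fmt.Errorf("cannot marshal state payload: %w", err)
	}
	enc := base64.RawURLEncoding.EncodeToString(b)
	return enc + "." + sign(enc, secret), nil
}

// checkState verifies the signature first and only then decodes the payload.
func checkState(state, secret string, nowUnix int64) (string, error) {
	enc, sig, ok := strings.Cut(state, ".")
	if !ok {
		return "", errors.New("malformed state")
	}
	if !hmac.Equal([]byte(sig), []byte(sign(enc, secret))) {
		return "", errors.New("invalid state signature")
	}
	b, err := base64.RawURLEncoding.DecodeString(enc)
	if err != nil {
		return "", fmt.Errorf("cannot decode state payload: %w", err)
	}
	var p statePayload
	if err := json.Unmarshal(b, &p); err != nil {
		return "", fmt.Errorf("cannot unmarshal state payload: %w", err)
	}
	if nowUnix-p.IssuedAt > stateTTLSeconds {
		return "", errors.New("state expired")
	}
	return p.OriginalURL, nil
}

func main() {
	state, err := signState("/select/0/vmui/", "client-secret", 1000)
	if err != nil {
		panic(err)
	}
	u, err := checkState(state, "client-secret", 1060)
	fmt.Println(u, err)
}
```

Passing the current time as an argument keeps the expiry check deterministic in tests. Checking the signature before decoding means untrusted bytes never reach the JSON parser.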

@@ -259,7 +259,7 @@ func (cr *ChunkedResponse) Next() ([]int64, []float64, error) {
fieldValues, ok := r.values[cr.field]
if !ok {
return nil, nil, fmt.Errorf("response doesn't contain filed %q", cr.field)
return nil, nil, fmt.Errorf("response doesn't contain field %q", cr.field)
}
values := make([]float64, len(fieldValues))
for i, fv := range fieldValues {

@@ -563,11 +563,11 @@ func main() {
}()
err = app.Run(os.Args)
pushmetrics.StopAndPush()
if err != nil {
log.Fatalln(err)
}
log.Printf("Total time: %v", time.Since(start))
pushmetrics.StopAndPush()
}
func initConfigVM(c *cli.Context) (vm.Config, error) {

@@ -405,7 +405,16 @@ func buildMatchWithFilter(filter string, metricName string) (string, error) {
if len(tf.Key) == 0 {
continue
}
a = append(a, tf.String())
switch {
case tf.IsNegative && tf.IsRegexp:
a = append(a, fmt.Sprintf("%s!~%q", tf.Key, tf.Value))
case tf.IsNegative:
a = append(a, fmt.Sprintf("%s!=%q", tf.Key, tf.Value))
case tf.IsRegexp:
a = append(a, fmt.Sprintf("%s=~%q", tf.Key, tf.Value))
default:
a = append(a, fmt.Sprintf("%s=%q", tf.Key, tf.Value))
}
}
a = append(a, nameFilter)
filters = append(filters, strings.Join(a, ","))
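The switch in the hunk above picks an operator from the two boolean flags and quotes the value with `%q`. The following is a minimal sketch of that formatting in isolation; `tagFilter` and `matcher` are hypothetical names, and the struct only mirrors the four fields the hunk reads.

```go
package main

import "fmt"

// tagFilter is a hypothetical stand-in exposing the fields used in the hunk above.
type tagFilter struct {
	Key        string
	Value      string
	IsNegative bool
	IsRegexp   bool
}

// matcher renders tf as a label matcher; %q quotes and escapes the value.
func matcher(tf tagFilter) string {
	op := "="
	switch {
	case tf.IsNegative && tf.IsRegexp:
		op = "!~"
	case tf.IsNegative:
		op = "!="
	case tf.IsRegexp:
		op = "=~"
	}
	return fmt.Sprintf("%s%s%q", tf.Key, op, tf.Value)
}

func main() {
	fmt.Println(matcher(tagFilter{Key: "job", Value: "node.*", IsRegexp: true})) // job=~"node.*"
	fmt.Println(matcher(tagFilter{Key: "env", Value: `a"b`, IsNegative: true}))  // env!="a\"b"
}
```

Because `%q` escapes embedded double quotes and backslashes, a value cannot terminate the quoted string early.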

@@ -15,7 +15,7 @@ See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-metric-m
currentItem := 0
%}
{% for _, row := range result %}
"{%s string(row.MetricFamilyName) %}": [
{%q= string(row.MetricFamilyName) %}: [
{
"type": {%q= row.Type.String() %},
{% if len(row.Unit) > 0 -%}

@@ -35,12 +35,10 @@ func StreamMetadataResponse(qw422016 *qt422016.Writer, result []*metricsmetadata
//line app/vmselect/prometheus/metadata_response.qtpl:17
for _, row := range result {
//line app/vmselect/prometheus/metadata_response.qtpl:17
qw422016.N().S(`"`)
//line app/vmselect/prometheus/metadata_response.qtpl:18
qw422016.E().S(string(row.MetricFamilyName))
qw422016.N().Q(string(row.MetricFamilyName))
//line app/vmselect/prometheus/metadata_response.qtpl:18
qw422016.N().S(`": [{"type":`)
qw422016.N().S(`: [{"type":`)
//line app/vmselect/prometheus/metadata_response.qtpl:20
qw422016.N().Q(row.Type.String())
//line app/vmselect/prometheus/metadata_response.qtpl:20
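The template change above replaces a hand-quoted, HTML-escaped key with a JSON-quoted one. The sketch below shows why that matters for a metric name containing a double quote. It uses only the standard library: `html.EscapeString` and `json.Marshal` stand in for the template writers as an assumption, and `htmlEscapedKey`, `jsonQuotedKey` and `parsedKey` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"html"
)

// htmlEscapedKey mimics the old shape: manual quotes around an HTML-escaped name.
func htmlEscapedKey(name string) string {
	return `"` + html.EscapeString(name) + `"`
}

// jsonQuotedKey lets the JSON encoder add the quotes and the escapes.
func jsonQuotedKey(name string) string {
	b, err := json.Marshal(name)
	if err != nil {
		panic(err) // not expected for a plain string
	}
	return string(b)
}

// parsedKey wraps key into a one-entry JSON object and returns the key a client would decode.
func parsedKey(key string) (string, error) {
	var m map[string][]struct{}
	if err := json.Unmarshal([]byte("{"+key+": []}"), &m); err != nil {
		return "", err
	}
	for k := range m {
		return k, nil
	}
	return "", fmt.Errorf("no key found")
}

func main() {
	name := `metric_name_7_!@"_suffix`
	old, _ := parsedKey(htmlEscapedKey(name))
	fixed, _ := parsedKey(jsonQuotedKey(name))
	fmt.Println(old == name, fixed == name) // false true
}
```

The HTML-escaped variant still parses as JSON, but the client sees `&#34;` in place of the quote, so the decoded key no longer matches the ingested metric name.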

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

@@ -1 +0,0 @@
var e=Object.create,t=Object.defineProperty,n=Object.getOwnPropertyDescriptor,r=Object.getOwnPropertyNames,i=Object.getPrototypeOf,a=Object.prototype.hasOwnProperty,o=(e,t)=>()=>(e&&(t=e(e=0)),t),s=(e,t)=>()=>(t||e((t={exports:{}}).exports,t),t.exports),c=(e,n)=>{let r={};for(var i in e)t(r,i,{get:e[i],enumerable:!0});return n||t(r,Symbol.toStringTag,{value:`Module`}),r},l=(e,i,o,s)=>{if(i&&typeof i==`object`||typeof i==`function`)for(var c=r(i),l=0,u=c.length,d;l<u;l++)d=c[l],!a.call(e,d)&&d!==o&&t(e,d,{get:(e=>i[e]).bind(null,d),enumerable:!(s=n(i,d))||s.enumerable});return e},u=(n,r,a)=>(a=n==null?{}:e(i(n)),l(r||!n||!n.__esModule?t(a,`default`,{value:n,enumerable:!0}):a,n)),d=e=>a.call(e,`module.exports`)?e[`module.exports`]:l(t({},`__esModule`,{value:!0}),e);export{u as a,d as i,o as n,c as r,s as t};

@@ -0,0 +1 @@
var e=Object.create,t=Object.defineProperty,n=Object.getOwnPropertyDescriptor,r=Object.getOwnPropertyNames,i=Object.getPrototypeOf,a=Object.prototype.hasOwnProperty,o=(e,t)=>()=>(e&&(t=e(e=0)),t),s=(e,t)=>()=>(t||(e((t={exports:{}}).exports,t),e=null),t.exports),c=(e,n)=>{let r={};for(var i in e)t(r,i,{get:e[i],enumerable:!0});return n||t(r,Symbol.toStringTag,{value:`Module`}),r},l=(e,i,o,s)=>{if(i&&typeof i==`object`||typeof i==`function`)for(var c=r(i),l=0,u=c.length,d;l<u;l++)d=c[l],!a.call(e,d)&&d!==o&&t(e,d,{get:(e=>i[e]).bind(null,d),enumerable:!(s=n(i,d))||s.enumerable});return e},u=(n,r,a)=>(a=n==null?{}:e(i(n)),l(r||!n||!n.__esModule?t(a,`default`,{value:n,enumerable:!0}):a,n)),d=e=>a.call(e,`module.exports`)?e[`module.exports`]:l(t({},`__esModule`,{value:!0}),e);export{u as a,d as i,o as n,c as r,s as t};

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

@@ -37,9 +37,9 @@
<meta property="og:title" content="UI for VictoriaMetrics">
<meta property="og:url" content="https://victoriametrics.com/">
<meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data">
<script type="module" crossorigin src="./assets/index-CoGukb-x.js"></script>
<link rel="modulepreload" crossorigin href="./assets/rolldown-runtime-COnpUsM8.js">
<link rel="modulepreload" crossorigin href="./assets/vendor-C8Kwp93_.js">
<script type="module" crossorigin src="./assets/index-CusQvJzs.js"></script>
<link rel="modulepreload" crossorigin href="./assets/rolldown-runtime-Cyuzqnbw.js">
<link rel="modulepreload" crossorigin href="./assets/vendor-B83wxFqK.js">
<link rel="stylesheet" crossorigin href="./assets/vendor-CnsZ1jie.css">
<link rel="stylesheet" crossorigin href="./assets/index-BBUnmLOr.css">
</head>

@@ -45,11 +45,13 @@ func TestSingleMetricsMetadata(t *testing.T) {
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_4"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_5"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_6"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
{Labels: []prompb.Label{{Name: "__name__", Value: `metric_name_7_!@"_suffix`}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
},
Metadata: []prompb.MetricMetadata{
{MetricFamilyName: "metric_name_4", Help: "some help message", Type: prompb.MetricTypeSummary},
{MetricFamilyName: "metric_name_5", Help: "some help message", Type: prompb.MetricTypeSummary},
{MetricFamilyName: "metric_name_6", Help: "some help message", Type: prompb.MetricTypeStateset},
{MetricFamilyName: `metric_name_7_!@"_suffix`, Help: "some help message", Type: prompb.MetricTypeStateset},
},
}
@@ -59,12 +61,13 @@ func TestSingleMetricsMetadata(t *testing.T) {
expected := &apptest.PrometheusAPIV1Metadata{
Status: "success",
Data: map[string][]apptest.MetadataEntry{
"metric_name_1": {{Help: "some help message", Type: "gauge"}},
"metric_name_2": {{Help: "some help message", Type: "counter"}},
"metric_name_3": {{Help: "some help message", Type: "gauge"}},
"metric_name_4": {{Help: "some help message", Type: "summary"}},
"metric_name_5": {{Help: "some help message", Type: "summary"}},
"metric_name_6": {{Help: "some help message", Type: "stateset"}},
"metric_name_1": {{Help: "some help message", Type: "gauge"}},
"metric_name_2": {{Help: "some help message", Type: "counter"}},
"metric_name_3": {{Help: "some help message", Type: "gauge"}},
"metric_name_4": {{Help: "some help message", Type: "summary"}},
"metric_name_5": {{Help: "some help message", Type: "summary"}},
"metric_name_6": {{Help: "some help message", Type: "stateset"}},
`metric_name_7_!@"_suffix`: {{Help: "some help message", Type: "stateset"}},
},
}
gotStats := sut.PrometheusAPIV1Metadata(t, "", 0, apptest.QueryOpts{})
@@ -154,11 +157,13 @@ func TestClusterMetricsMetadata(t *testing.T) {
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_4"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_5"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_6"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
{Labels: []prompb.Label{{Name: "__name__", Value: `metric_name_7_!@"_suffix`}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
},
Metadata: []prompb.MetricMetadata{
{MetricFamilyName: "metric_name_4", Help: "some help message", Type: prompb.MetricTypeSummary},
{MetricFamilyName: "metric_name_5", Help: "some help message", Type: prompb.MetricTypeSummary},
{MetricFamilyName: "metric_name_6", Help: "some help message", Type: prompb.MetricTypeStateset},
{MetricFamilyName: `metric_name_7_!@"_suffix`, Help: "some help message", Type: prompb.MetricTypeStateset},
},
}
@@ -171,12 +176,13 @@ func TestClusterMetricsMetadata(t *testing.T) {
expected := &apptest.PrometheusAPIV1Metadata{
Status: "success",
Data: map[string][]apptest.MetadataEntry{
"metric_name_1": {{Help: "some help message", Type: "gauge"}},
"metric_name_2": {{Help: "some help message", Type: "counter"}},
"metric_name_3": {{Help: "some help message", Type: "gauge"}},
"metric_name_4": {{Help: "some help message", Type: "summary"}},
"metric_name_5": {{Help: "some help message", Type: "summary"}},
"metric_name_6": {{Help: "some help message", Type: "stateset"}},
"metric_name_1": {{Help: "some help message", Type: "gauge"}},
"metric_name_2": {{Help: "some help message", Type: "counter"}},
"metric_name_3": {{Help: "some help message", Type: "gauge"}},
"metric_name_4": {{Help: "some help message", Type: "summary"}},
"metric_name_5": {{Help: "some help message", Type: "summary"}},
"metric_name_6": {{Help: "some help message", Type: "stateset"}},
`metric_name_7_!@"_suffix`: {{Help: "some help message", Type: "stateset"}},
},
}
gotStats := vmselect.PrometheusAPIV1Metadata(t, "", 0, apptest.QueryOpts{Tenant: tenantID})

@@ -8,6 +8,7 @@ import (
"net/http/httptest"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -332,13 +333,11 @@ func TestSingleVMAgentDropOnOverload(t *testing.T) {
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 1 && vmagent.RemoteWriteRequests(t, url2) == 1
},
)
// Send 2 more requests, the first RW endpoint should receive everything, the second should add them to the queue
// since worker is busy with the first request.
for i := range 2 {
@@ -641,3 +640,116 @@ func TestSingleVMAgentMultitenancy(t *testing.T) {
t.Fatalf("expected vmagent_tenant_inserted_rows_total to have value 1 for accountID=5, projectID=0")
}
}
func TestSingleVMAgentPriorizeRecentData(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
remoteWriteSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer remoteWriteSrv.Close()
var mustRW2ReturnError atomic.Bool
mustRW2ReturnError.Store(true)
remoteWriteSrv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if mustRW2ReturnError.Load() {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusNoContent)
}))
defer remoteWriteSrv2.Close()
vmagent := tc.MustStartDefaultRWVmagent("vmagent", []string{
fmt.Sprintf(`-remoteWrite.url=%s/api/v1/write`, remoteWriteSrv.URL),
fmt.Sprintf(`-remoteWrite.url=%s/api/v1/write`, remoteWriteSrv2.URL),
"-remoteWrite.disableOnDiskQueue=true",
// use only 1 worker to get a full queue faster
"-remoteWrite.queues=1",
"-remoteWrite.flushInterval=1ms",
"-remoteWrite.inmemoryQueues=1",
// fastqueue size is roughly memory.Allowed() / len(urls) / *maxRowsPerBlock / 100
// Use very large maxRowsPerBlock to get fastqueue of minimal length(2).
// See initRemoteWriteCtxs function in remotewrite.go for details.
"-remoteWrite.maxRowsPerBlock=1000000000",
"-remoteWrite.tmpDataPath=" + tc.Dir() + "/vmagent",
// Delay retry logic to avoid race conditions with waitFor assertions.
// It improves the test stability on resource-constrained runners.
"-remoteWrite.retryMinInterval=3s",
"-remoteWrite.retryMaxTime=3s",
})
const (
retries = 20
period = 200 * time.Millisecond
)
waitFor := func(f func() bool) {
t.Helper()
for range retries {
if f() {
return
}
time.Sleep(period)
}
t.Fatalf("timed out waiting for retry #%d", retries)
}
// Real remote write URLs are hidden in metrics
url1 := "1:secret-url"
url2 := "2:secret-url"
// Wait until first request got flushed to remote write server
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 1 && vmagent.RemoteWriteRequests(t, url2) == 1
},
)
// Wait until second request got flushed to remote write server
// since there are 2 independent queues (general and in-memory) with minimal capacity of 1
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 2 && vmagent.RemoteWriteRequests(t, url2) == 2
},
)
// Send 2 more requests, the first RW endpoint should receive everything, the second should add them to the queue
// since worker is busy with the first request.
for i := range 2 {
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 3+i && vmagent.RemoteWritePendingInmemoryBlocks(t, url2) == 1+i
},
)
}
// Send one more request.
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 5 && vmagent.RemoteWriteSamplesDropped(t, url2) > 0
},
)
mustRW2ReturnError.Store(false)
// Ensure that in-memory data is correctly flushed to the remote write endpoint.
waitFor(
func() bool {
return vmagent.RemoteWritePendingInmemoryBlocks(t, url2) == 0
},
)
}

@@ -6201,7 +6201,7 @@
"type": "victoriametrics-metrics-datasource",
"uid": "$ds"
},
"description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"description": "The rate of dropped samples during aggregation. \nStream aggregation will drop samples with NaN values, too old timestamps or samples identified as duplicates during deduplication. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"fieldConfig": {
"defaults": {
"color": {
@@ -6282,14 +6282,14 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
"expr": "sum(rate({__name__=~\"vm_streamaggr_ignored_samples_total|vm_streamaggr_dedup_dropped_samples_total\", job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Ignored samples ($instance)",
"title": "Dropped samples ($instance)",
"type": "timeseries"
},
{

@@ -6200,7 +6200,7 @@
"type": "prometheus",
"uid": "$ds"
},
"description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"description": "The rate of dropped samples during aggregation. \nStream aggregation will drop samples with NaN values, too old timestamps or samples identified as duplicates during deduplication. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"fieldConfig": {
"defaults": {
"color": {
@@ -6281,14 +6281,14 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
"expr": "sum(rate({__name__=~\"vm_streamaggr_ignored_samples_total|vm_streamaggr_dedup_dropped_samples_total\", job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Ignored samples ($instance)",
"title": "Dropped samples ($instance)",
"type": "timeseries"
},
{

@@ -59,7 +59,7 @@ services:
- '--external.alert.source=explore?orgId=1&left=["now-1h","now","VictoriaMetrics",{"expr": },{"mode":"Metrics"},{"ui":[true,true,true,"none"]}]'
restart: always
vmanomaly:
image: victoriametrics/vmanomaly:v1.29.5
image: victoriametrics/vmanomaly:v1.29.6
depends_on:
- "victoriametrics"
ports:

@@ -14,6 +14,13 @@ aliases:
---
Please find the changelog for VictoriaMetrics Anomaly Detection below.
## v1.29.6
Released: 2026-06-17
- BUGFIX: Fixed `VLogsReader` startup and query execution when `tenant_id` is omitted or provided in short account-only form such as `"0"`. Omitted or empty tenant IDs are treated as single-node/no-tenant mode, and account-only tenant IDs are expanded to `accountID:0` before adding VictoriaLogs `AccountID`/`ProjectID` params or VM tenant labels.
- BUGFIX: Hardened [`OnlineMADModel`](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-mad) anomaly scoring for perfectly constant time series (all values identical). The model now keeps a small deterministic prediction interval when the learned MAD is zero, so values deviating from an unknown constant baseline can produce `anomaly_score > 1` (previously, all anomaly scores were `0`).
## v1.29.5
Released: 2026-06-11

@@ -423,7 +423,7 @@ services:
# ...
vmanomaly:
container_name: vmanomaly
image: victoriametrics/vmanomaly:v1.29.5
image: victoriametrics/vmanomaly:v1.29.6
# ...
restart: always
volumes:
@@ -641,7 +641,7 @@ options:
Here's an example of using the config splitter to divide configurations based on the `extra_filters` argument from the reader section:
```sh
docker pull victoriametrics/vmanomaly:v1.29.5 && docker image tag victoriametrics/vmanomaly:v1.29.5 vmanomaly
docker pull victoriametrics/vmanomaly:v1.29.6 && docker image tag victoriametrics/vmanomaly:v1.29.6 vmanomaly
```
```sh

@@ -45,7 +45,7 @@ There are 2 types of compatibility to consider when migrating in stateful mode:
| Group start | Group end | Compatibility | Notes |
|---------|--------- |------------|-------|
| [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) | [v1.29.5](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1295) | Fully Compatible | - |
| [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) | [v1.29.6](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1296) | Fully Compatible | - |
| [v1.28.7](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1287) | [v1.29.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1290) | Partially compatible* | Dumped models of class [prophet](https://docs.victoriametrics.com/anomaly-detection/components/models/#prophet) and [seasonal quantile](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-seasonal-quantile) have problems with loading to [v1.29.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1290) due to dropped `pytz` library. **Upgrading directly from v1.28.7 to [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) with a fix is suggested** |
| [v1.26.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1262) | [v1.28.7](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1287) | Fully Compatible | [v1.28.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1280) introduced [rolling](https://docs.victoriametrics.com/anomaly-detection/components/models/#rolling-models) model class drop in favor of [online](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-models) models (`rolling_quantile` and `std` models), however, it does not impact compatibility, as artifacts were not produced by default for rolling models. Also, offline `mad` and `zscore` models are redirecting to their respective online counterparts since [v1.28.4](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1284). |
| [v1.25.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1253) | [v1.26.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1270) | Partially Compatible* | [v1.25.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1253) introduced `forecast_at` argument for base [univariate](https://docs.victoriametrics.com/anomaly-detection/components/models/#univariate-models) and `Prophet` [models](https://docs.victoriametrics.com/anomaly-detection/components/models/#prophet), however, itself remains backward-reversible from newer states like [v1.26.2](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1262), [v1.27.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1270). (All models except `isolation_forest_multivariate` class will be dropped) |

@@ -132,7 +132,7 @@ Below are the steps to get `vmanomaly` up and running inside a Docker container:
1. Pull Docker image:
```sh
docker pull victoriametrics/vmanomaly:v1.29.5
docker pull victoriametrics/vmanomaly:v1.29.6
```
2. Create the license file with your license key.
@@ -152,7 +152,7 @@ docker run -it \
-v ./license:/license \
-v ./config.yaml:/config.yaml \
-p 8490:8490 \
victoriametrics/vmanomaly:v1.29.5 \
victoriametrics/vmanomaly:v1.29.6 \
/config.yaml \
--licenseFile=/license \
--loggerLevel=INFO \
@@ -169,7 +169,7 @@ docker run -it \
-e VMANOMALY_DATA_DUMPS_DIR=/tmp/vmanomaly/data \
-e VMANOMALY_MODEL_DUMPS_DIR=/tmp/vmanomaly/models \
-p 8490:8490 \
victoriametrics/vmanomaly:v1.29.5 \
victoriametrics/vmanomaly:v1.29.6 \
/config.yaml \
--licenseFile=/license \
--loggerLevel=INFO \
@@ -182,7 +182,7 @@ services:
# ...
vmanomaly:
container_name: vmanomaly
image: victoriametrics/vmanomaly:v1.29.5
image: victoriametrics/vmanomaly:v1.29.6
# ...
restart: always
volumes:

@@ -315,7 +315,7 @@ docker run -it --rm \
-e VMANOMALY_MCP_SERVER_URL=http://mcp-vmanomaly:8081/mcp \
-p 8080:8080 \
-p 8490:8490 \
victoriametrics/vmanomaly:v1.29.5 \
victoriametrics/vmanomaly:v1.29.6 \
vmanomaly_config.yaml
```

@@ -1265,7 +1265,7 @@ monitoring:
Let's pull the docker image for `vmanomaly`:
```sh
docker pull victoriametrics/vmanomaly:v1.29.5
docker pull victoriametrics/vmanomaly:v1.29.6
```
Now we can run the Docker container, mounting both the config and the model file as volumes:
@@ -1279,7 +1279,7 @@ docker run -it \
-v $(PWD)/license:/license \
-v $(PWD)/custom_model.py:/vmanomaly/model/custom.py \
-v $(PWD)/custom.yaml:/config.yaml \
victoriametrics/vmanomaly:v1.29.5 /config.yaml \
victoriametrics/vmanomaly:v1.29.6 /config.yaml \
--licenseFile=/license \
--watch
```

@@ -395,7 +395,7 @@ services:
restart: always
vmanomaly:
container_name: vmanomaly
image: victoriametrics/vmanomaly:v1.29.5
image: victoriametrics/vmanomaly:v1.29.6
depends_on:
- "victoriametrics"
ports:

@@ -19,6 +19,7 @@ See also [case studies](https://docs.victoriametrics.com/victoriametrics/casestu
* [Datanami: Why Roblox Picked VictoriaMetrics for Observability Data Overhaul](https://www.hpcwire.com/bigdatawire/2023/05/30/why-roblox-picked-victoriametrics-for-observability-data-overhaul/)
* [Cloudflare: Introducing notifications for HTTP Traffic Anomalies](https://blog.cloudflare.com/introducing-http-traffic-anomalies-notifications/)
* [Grammarly: Better, Faster, Cheaper: How Grammarly Improved Monitoring by Over 10x with VictoriaMetrics](https://www.grammarly.com/blog/engineering/monitoring-with-victoriametrics/)
* [Xata: How we rebuilt PostgreSQL branch metrics on VictoriaMetrics, per cell](https://xata.io/blog/how-we-rebuilt-postgresql-branch-metrics-on-victoriametrics-per-cell)
* [CERN: CMS monitoring R&D: Real-time monitoring and alerts](https://indico.cern.ch/event/877333/contributions/3696707/attachments/1972189/3281133/CMS_mon_RD_for_opInt.pdf)
* [CERN: The CMS monitoring infrastructure and applications](https://arxiv.org/pdf/2007.03630.pdf)
* [Forbes: The (Almost) Infinitely Scalable Open Source Monitoring Dream](https://www.forbes.com/sites/adrianbridgwater/2022/08/16/the-almost-infinitely-scalable-open-source-monitoring-dream/)

@@ -30,11 +30,26 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): expose `vm_streamaggr_dedup_dropped_samples_total` to allow tracking dropped old samples during [deduplication](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): use the aggregation rule interval as the default [staleness_interval](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#staleness) instead of `2*interval`, to reduce spikes when there are gaps between received samples. See [#11102](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11102).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add the `-remoteWrite.inmemoryQueues` command-line flag to prioritize recently ingested data over historical data stored in the file-based [persistent queue](https://docs.victoriametrics.com/victoriametrics/vmagent/#on-disk-persistence-and-data-processing-order). See [#8833](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8833).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-promscrape.cluster.shardByLabels` command-line flag for selecting target labels used for sharding scrape targets among `vmagent` instances in cluster mode. See [#11044](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11044).
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
* BUGFIX: [enterprise](https://docs.victoriametrics.com/enterprise/) [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly expose metric `vm_retention_filters_partitions_scheduled_rows`. See [#11138](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11138).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix potential corruption of remote-write metadata `Unit` values. See [#11120](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11120). Thanks to @fxrlv for the contribution.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/), [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/), [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): fix rare unbounded shutdown delay when config reload takes longer than `-configCheckInterval`. See [#11107](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11107). Thanks to @PleasingFungus for the contribution.
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): do not fail backup list if directory is absent while using `fs://` destination to align with other protocols. See [6c3c548d](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/6c3c548ddb0385b749e731f52276f130e2a4e4a8).
* BUGFIX: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): push metrics to configured `-pushmetrics.url` on shutdown when migration fails. Previously, metrics were not pushed if vmctl exited with an error. See [#11081](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11081). Thanks to @zasdaym for the contribution.
* BUGFIX: [vmrestore](https://docs.victoriametrics.com/victoriametrics/vmrestore/): disallow restoring parts outside the configured `-storageDataPath` directory. See [710c920d](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/710c920d6083327042a309e449fae4383617d817).
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): correctly apply long tenant filters. Previously, such filters could be truncated, causing tenants to be matched incorrectly. See [#11096](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11096). Thanks to @fxrlv for the contribution.
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix corrupted metrics metadata when a response contains multiple rows. See [#11115](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11115). Thanks to @fxrlv for the contribution.
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): don't cache empty responses for tenant IDs discovery during [multitenant queries](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenant-reads). This problem was visible during integration tests when multitenant queries were executed before the first ingestion happened. See [#10982](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10982).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly escape `metricFamilyName` at metrics metadata response. See [#11129](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11129). Thanks to @fxrlv for the contribution.
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): prevent more cases of panic during directory deletion on `NFS`-based mounts. See [#11060](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11060).
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)


@@ -76,7 +76,7 @@ It is better to substitute the slow recording rule with the following [stream ag
outputs: [rate_sum]
```
> Field `interval` should be set to a value at least several times higher than the matched metrics collection interval.
> It is recommended to set the `interval` field to a value at least 2 times the matched metrics collection interval.
This stream aggregation generates `http_request_duration_seconds_bucket:1m_without_instance_rate_sum` output series according to [output metric naming](#output-metric-names).
Then these series can be used in [alerting rules](https://docs.victoriametrics.com/victoriametrics/vmalert/#alerting-rules):
@@ -396,7 +396,7 @@ before sending them to the configured `-remoteWrite.url`. The deduplication can
Labels can be dropped before deduplication is applied. See [these docs](#dropping-unneeded-labels).
Stream aggregation deduplication is applied before aggregation rules, so duplicate samples are dropped before aggregation.
Stream aggregation deduplication is applied before aggregation rules, so duplicate samples are dropped before aggregation. The dropped old samples can be tracked with the `vm_streamaggr_dedup_dropped_samples_total` metric.
# Relabeling
@@ -444,7 +444,9 @@ outside the current [aggregation interval](https://docs.victoriametrics.com/vict
- To enable [aggregation windows](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#aggregation-windows).
The dropped old samples can be tracked with the `vm_streamaggr_ignored_samples_total{reason="too_old"}` metric.
- To enable [deduplication](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication).
The dropped old samples can be tracked with the `vm_streamaggr_ignored_samples_total{reason="too_old"}` and `vm_streamaggr_dedup_dropped_samples_total` metrics.
## Ignore aggregation intervals on start
@@ -642,9 +644,9 @@ See also [why you shouldn't put an aggregator behind a load balancer](https://do
# Troubleshooting
- [Unexpected spikes for `total` or `increase` outputs](#staleness).
- [Unexpected spikes for `total` or `increase` outputs](#data-delay-and-staleness).
- [Excessively large values for `total*`, `increase*`, and `rate*` outputs](#counter-resets).
- [Lower than expected values for `total_prometheus` and `increase_prometheus` outputs](#staleness).
- [Lower than expected values for `total_prometheus` and `increase_prometheus` outputs](#data-delay-and-staleness).
- [High memory usage and CPU usage](#high-resource-usage).
- [Unexpected results in vmagent cluster mode](#cluster-mode).
- [Inaccurate aggregation results for histograms](#aggregation-windows)
@@ -677,11 +679,19 @@ the following settings:
If counter-specific outputs, such as `total*`, `rate*`, and `increase*`, produce values that are significantly higher than anticipated, then check the `vm_streamaggr_counter_resets_total` metric. This metric increments each time when [counter reset event](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#counter) happens and could be caused by duplication or collision of raw samples. If you observe duplication or collision, try solving this problem by either fixing the source of these metrics or by [deduplicating](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication) these samples before aggregation.
## Staleness
## Data delay and staleness {#staleness}
The following outputs track the last seen per-series values in order to properly calculate output values:
Stream aggregation processes input samples in a streaming manner and flushes results once per specified `interval`. Because of this, aggregation results can be heavily affected by data delays (see `vm_streamaggr_samples_lag_seconds_bucket` metric).
In particular:
1. Stream aggregation won't produce results if input samples are delayed for multiple aggregation intervals, causing gaps in the output.
2. Delayed and out-of-order samples can inflate or otherwise skew aggregation results.
Dropping delayed samples can result in missed observations in the results, while keeping delayed samples may inflate the results. It is up to the user to decide what they prefer in the produced results:
1. If you prefer consistency in aggregation results and do not want delayed data to affect the next aggregation window, drop all potentially delayed samples via [ignore_old_samples](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples).
2. If you prefer to have the accumulated changes from delayed data reflected in aggregation windows after the delay, increase `staleness_interval` in the [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config).
This is especially important for outputs that track the last seen per-series values in order to properly calculate output values:
- [histogram_bucket](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#histogram_bucket)
- [increase](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase)
- [increase_prometheus](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase_prometheus)
- [rate_avg](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#rate_avg)
@@ -689,21 +699,19 @@ The following outputs track the last seen per-series values in order to properly
- [total](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#total)
- [total_prometheus](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#total_prometheus)
The last seen per-series value is dropped if no new samples are received for the given time series during two consecutive aggregations
intervals specified in [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config) via `interval` option.
For these outputs, the last seen per-series value is dropped if no new samples are received for the given time series during `staleness_interval`, which by default equals the `interval` option specified in the [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config).
If a new sample for the existing time series is received after that, then it is treated as the first sample for a new time series.
This may lead to the following issues:
This may lead to the following issues when data is delayed:
- Lower than expected results for [total_prometheus](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#total_prometheus) and [increase_prometheus](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase_prometheus) outputs,
since they ignore the first sample in a new time series.
- Unexpected spikes for [total](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#total) and [increase](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase) outputs, since they assume that new time series start from 0.
- [total](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#total) and [increase](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase) may produce unexpected spikes, since they assume that a new time series starts from `0`.
- [total_prometheus](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#total_prometheus) and [increase_prometheus](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase_prometheus) may produce lower than expected results, if you expect to see the accumulated changes reflected after the delay, since they ignore the first sample in a new time series.
These issues can be fixed in the following ways:
These issues can be mitigated in the following ways:
- By increasing the `interval` option at [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config), so it covers the expected
delays in data ingestion pipelines.
- By specifying the `staleness_interval` option at [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config), so it covers the expected
delays in data ingestion pipelines. By default, the `staleness_interval` is equal to `2 x interval`.
delays in data ingestion pipelines. It is recommended to set `interval` to at least 2× the scrape or push interval of the input. Set it to a higher value if the input pipeline is prone to large delays.
- By increasing the `staleness_interval` option in the [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config), so it covers the expected
delays in data ingestion pipelines. By default, the `staleness_interval` is equal to `interval`.
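The two failure modes described above can be illustrated with a minimal Go sketch. It is a simplified model, not the aggregator's actual code: `counterState`, `observe` and `expire` are hypothetical names, and counter resets are ignored. With `ignoreFirst=false` the sketch behaves like `total`/`increase` (a new series is assumed to start from `0`), and with `ignoreFirst=true` it behaves like the `*_prometheus` variants (the first sample of a new series is ignored).

```go
package main

import "fmt"

// counterState is a simplified stand-in for the per-series state kept by
// counter-like outputs. It is NOT the aggregator's real data structure.
type counterState struct {
	last        map[string]float64
	ignoreFirst bool // true models the *_prometheus variants
}

func newCounterState(ignoreFirst bool) *counterState {
	return &counterState{last: map[string]float64{}, ignoreFirst: ignoreFirst}
}

// observe returns the increase contributed by a sample of the given series.
func (s *counterState) observe(series string, v float64) float64 {
	prev, seen := s.last[series]
	s.last[series] = v
	if !seen {
		if s.ignoreFirst {
			// The first sample of a new series is ignored.
			return 0
		}
		// The new series is assumed to start from 0.
		return v
	}
	return v - prev
}

// expire drops the last seen value, as happens once the staleness interval passes.
func (s *counterState) expire(series string) { delete(s.last, series) }

func main() {
	for _, ignoreFirst := range []bool{false, true} {
		s := newCounterState(ignoreFirst)
		s.observe("requests_total", 1000)
		inc := s.observe("requests_total", 1010)
		s.expire("requests_total") // input was delayed for longer than staleness_interval
		afterGap := s.observe("requests_total", 1050)
		fmt.Printf("ignoreFirst=%v increase=%v afterGap=%v\n", ignoreFirst, inc, afterGap)
	}
}
```

In this model the sample that arrives after the gap contributes either the whole counter value (a spike) or nothing (a missed increase of `40`), which is why covering the expected delay with `staleness_interval` matters.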
## High resource usage


@@ -66,6 +66,8 @@ specified individually per each `-remoteWrite.url`:
# interval is the interval for the aggregation.
# The aggregated stats is sent to remote storage once per interval.
# It is recommended to set `interval` to at least 2× the scrape or push interval of the input.
# Set it to a higher value if the input pipeline is prone to large delays.
#
interval: 1m
@@ -94,7 +96,7 @@ specified individually per each `-remoteWrite.url`:
# - total_prometheus
# See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#staleness for more details.
#
# staleness_interval: 2m
# staleness_interval: 1m
# ignore_first_sample_interval specifies the interval after which the agent begins sending samples.
# By default, it is set to the staleness interval, and it helps reduce the initial sample load after an agent restart.
@@ -291,9 +293,6 @@ The results of `histogram_bucket` is equal to the following [MetricsQL](https://
sum(histogram_over_time(some_histogram_bucket[interval])) by (vmrange)
```
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#staleness) option.
See also:
- [quantiles](#quantiles)
- [avg](#avg)
@@ -507,6 +506,19 @@ See also:
- [count_samples](#count_samples)
- [count_series](#count_series)
### `sum_samples_total`
`sum_samples_total` sums input delta values into a cumulative [counter](https://docs.victoriametrics.com/victoriametrics/keyconcepts/index.html#counter) and outputs the result at the given `interval`.
`sum_samples_total` makes sense only for aggregating delta values from clients such as [StatsD counter](https://github.com/statsd/statsd/blob/master/docs/metric_types.md#counting).
The result of `sum_samples_total` is roughly equal to the following [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/) query:
```metricsql
sum(running_sum(some_delta_values))
```
> Note: The aggregator will forget the cumulative counter if it has not seen input samples for `staleness_interval` (set to `interval` by default) per output result, so the output counter will start from `0` the next time it sees the input again. Increase the `staleness_interval` option if you want to extend the window to tolerate bigger gaps.
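The accumulation and the staleness reset described above can be sketched in Go as follows. This is an illustrative model only: `deltaCounter` and its fields are hypothetical names, timestamps are plain Unix seconds, and the exact moment at which the real aggregator forgets its state may differ.

```go
package main

import "fmt"

// deltaCounter accumulates delta values into a cumulative counter.
// It is a hypothetical model, not the aggregator's implementation.
type deltaCounter struct {
	total     float64
	lastSeen  int64 // Unix seconds of the last input sample
	staleness int64 // staleness interval in seconds
	seen      bool
}

// add applies a delta observed at ts and returns the cumulative value.
// The counter restarts from 0 if the gap since the previous sample
// exceeds the staleness interval.
func (c *deltaCounter) add(ts int64, delta float64) float64 {
	if c.seen && ts-c.lastSeen > c.staleness {
		c.total = 0
	}
	c.total += delta
	c.lastSeen = ts
	c.seen = true
	return c.total
}

func main() {
	c := &deltaCounter{staleness: 60}
	fmt.Println(c.add(0, 5))   // first delta
	fmt.Println(c.add(30, 3))  // within the staleness interval: accumulated
	fmt.Println(c.add(200, 2)) // gap exceeds the staleness interval: restarted
}
```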
### total
`total` generates output [counter](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#counter) by summing the input counters over the given `interval`.


@@ -797,6 +797,12 @@ For example, the following commands spread scrape targets among a cluster of two
The `-promscrape.cluster.memberNum` can be set to a StatefulSet pod name when `vmagent` runs in Kubernetes.
The pod name must end with a number in the range `0 ... promscrape.cluster.membersCount-1`. For example, `-promscrape.cluster.memberNum=vmagent-0`.
By default, targets are sharded among `vmagent` instances by all target labels after relabeling.
Use `-promscrape.cluster.shardByLabels` {{% available_from "v1.146.0" %}} to shard targets by specified labels instead.
For example, with `-promscrape.cluster.shardByLabels=service`, the targets with the same `service` label value will be scraped by the same `vmagent` instance,
which is useful when performing stream aggregation that requires all metrics with the same `service` label value to be processed on the same `vmagent` instance.
If none of the specified labels are present in the target labels, then all target labels will be used for sharding.
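The sharding rule described above can be sketched in Go. This is not the `vmagent` implementation: `shardFor` is a hypothetical helper, and the FNV hash and the key encoding are assumptions chosen only to keep the example self-contained. It hashes the selected labels if at least one of them is present, and falls back to all target labels otherwise.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// shardFor returns the index of the cluster member that would scrape a target.
// Only labels listed in shardBy take part in the hash when at least one of
// them is present; otherwise all the target labels are used.
func shardFor(labels map[string]string, shardBy []string, membersCount int) int {
	keys := make([]string, 0, len(labels))
	for _, name := range shardBy {
		if _, ok := labels[name]; ok {
			keys = append(keys, name)
		}
	}
	if len(keys) == 0 {
		for name := range labels {
			keys = append(keys, name)
		}
	}
	// Sort label names so the hash doesn't depend on map iteration order.
	sort.Strings(keys)
	h := fnv.New64a()
	for _, name := range keys {
		h.Write([]byte(name))
		h.Write([]byte{0})
		h.Write([]byte(labels[name]))
		h.Write([]byte{0})
	}
	return int(h.Sum64() % uint64(membersCount))
}

func main() {
	a := map[string]string{"service": "api", "instance": "10.0.0.1:8080"}
	b := map[string]string{"service": "api", "instance": "10.0.0.2:8080"}
	// Both targets share the `service` value, so they land on the same member.
	fmt.Println(shardFor(a, []string{"service"}, 3) == shardFor(b, []string{"service"}, 3))
}
```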
By default, each scrape target is scraped only by a single `vmagent` instance in the cluster. If there is a need for replicating scrape targets among multiple `vmagent` instances,
then `-promscrape.cluster.replicationFactor` command-line flag must be set to the desired number of replicas. For example, the following commands
start a cluster of three `vmagent` instances, where two `vmagent` instances scrape each target:
@@ -928,6 +934,29 @@ vmagent will generate the following persistent queue folders:
2_0AAFDF53E314A72A
```
### On-disk persistence and data processing order
By default, vmagent processes data in FIFO order. If data has been written to the on-disk queue,
it must be flushed to the remote storage before newly ingested data can be forwarded there.
During long outages, vmagent may accumulate large amounts of data in the file-based queue,
which can introduce a significant lag between the moment data is collected by vmagent and the
moment it becomes visible at the remote storage.
This behavior can be changed with the `-remoteWrite.inmemoryQueues` {{% available_from "v1.146.0" %}} command-line flag.
When set to a non-zero value, vmagent starts the given number of additional workers,
which send only recently ingested data from the in-memory queue, while the workers configured via `-remoteWrite.queues` drain the file-based backlog concurrently.
This reduces the delivery lag for fresh samples after remote storage outages or slowdowns. The flag can be set individually per each `-remoteWrite.url`.
Note that these workers are started in addition to the workers configured via `-remoteWrite.queues`, so the total number of concurrent connections to
the remote storage becomes the sum of both flags. Take this into account if the remote storage limits the number of concurrent requests.
This flag has the following possible limitations:
* Samples may arrive at the remote storage out of order, since recent data can be delivered before the older backlogged data.
Do not use this option if the remote storage doesn't accept out-of-order samples.
* Recent data isn't guaranteed to take the fast path: if the in-memory queue is full,
newly ingested data is still written to the file-based queue and is delivered in FIFO order by the generic workers.
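The difference between the default FIFO mode and the prioritized mode can be modelled with a small Go sketch. `toyQueue` is a hypothetical, single-goroutine model in which both queue parts are plain slices. It is not the `persistentqueue` code, and the step that moves in-memory blocks to the file part in FIFO mode is an assumption made to preserve ordering in the model.

```go
package main

import "fmt"

// toyQueue is a hypothetical model of a queue with an in-memory part
// and a file-based part. Both parts are plain slices here.
type toyQueue struct {
	mem        []string
	memCap     int
	file       []string
	prioritize bool
}

// write routes a block to the in-memory or to the file-based part.
func (q *toyQueue) write(block string) {
	if !q.prioritize && len(q.file) > 0 {
		// Default FIFO mode: the backlog must be drained first,
		// so new data is appended behind it.
		q.file = append(q.file, block)
		return
	}
	if len(q.mem) == q.memCap {
		if !q.prioritize {
			// Assumption: keep FIFO order by moving older in-memory blocks first.
			q.file = append(q.file, q.mem...)
			q.mem = q.mem[:0]
		}
		// No room in memory: the block goes to the file-based part.
		q.file = append(q.file, block)
		return
	}
	q.mem = append(q.mem, block)
}

func main() {
	fifo := &toyQueue{memCap: 2, file: []string{"old"}}
	fifo.write("new")
	fmt.Println("fifo:", fifo.mem, fifo.file)

	prio := &toyQueue{memCap: 2, file: []string{"old"}, prioritize: true}
	for _, b := range []string{"new1", "new2", "new3"} {
		prio.write(b)
	}
	fmt.Println("prioritized:", prio.mem, prio.file)
}
```

In the prioritized case the first two blocks bypass the backlog, while the third one falls back to the file-based part because the in-memory part is full, matching the second limitation listed above.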
### Disabling On-disk persistence
There are cases when it is better to disable on-disk persistence for pending data on the `vmagent` side:

2
go.mod

@@ -10,7 +10,7 @@ require (
github.com/VictoriaMetrics/VictoriaLogs v1.121.1-0.20260616132739-c901a1e31cb3
github.com/VictoriaMetrics/easyproto v1.2.0
github.com/VictoriaMetrics/fastcache v1.13.3
github.com/VictoriaMetrics/metrics v1.43.2
github.com/VictoriaMetrics/metrics v1.44.0
github.com/VictoriaMetrics/metricsql v0.87.1
github.com/aws/aws-sdk-go-v2 v1.42.0
github.com/aws/aws-sdk-go-v2/config v1.32.25

2
go.sum

@@ -62,6 +62,8 @@ github.com/VictoriaMetrics/fastcache v1.13.3 h1:rBabE0iIxcqKEMCwUmwHZ9dgEqXerg8F
github.com/VictoriaMetrics/fastcache v1.13.3/go.mod h1:hHXhl4DA2fTL2HTZDJFXWgW0LNjo6B+4aj2Wmng3TjU=
github.com/VictoriaMetrics/metrics v1.43.2 h1:+8pIQEGwchKS5CYFyvv3LKvNXGi7baZ9hmIV4RHqibY=
github.com/VictoriaMetrics/metrics v1.43.2/go.mod h1:xDM82ULLYCYdFRgQ2JBxi8Uf1+8En1So9YUwlGTOqTc=
github.com/VictoriaMetrics/metrics v1.44.0 h1:Fr8yqQSV+ZfYaDD/anqk1E8e9YPgfleSleJmAI0M0Tw=
github.com/VictoriaMetrics/metrics v1.44.0/go.mod h1:xDM82ULLYCYdFRgQ2JBxi8Uf1+8En1So9YUwlGTOqTc=
github.com/VictoriaMetrics/metricsql v0.87.1 h1:GdIblCDgXsrBJcBSDtFT8SLK7P+QHijdQmcr4L/f0Go=
github.com/VictoriaMetrics/metricsql v0.87.1/go.mod h1:d4EisFO6ONP/HIGDYTAtwrejJBBeKGQYiRl095bS4QQ=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=


@@ -91,6 +91,11 @@ func (r *Restore) Run(ctx context.Context) error {
if err != nil {
return fmt.Errorf("cannot list src parts: %w", err)
}
for _, srcPart := range srcParts {
if !srcPart.IsLocalPathInsideDir(r.Dst.Dir) {
return fmt.Errorf("part file %s would be written outside storage directory %s", srcPart.Path, r.Dst.Dir)
}
}
logger.Infof("obtaining list of parts at %s", dst)
dstParts, err := dst.ListParts()
if err != nil {


@@ -120,6 +120,17 @@ func (p *Part) ParseFromRemotePath(remotePath string) bool {
return true
}
// IsLocalPathInsideDir returns true if the part's local path resolves inside dir.
// It resolves ../../ sequences and prevents path traversal outside dir.
func (p *Part) IsLocalPathInsideDir(dir string) bool {
dir = filepath.Clean(dir)
if dir == `/` {
return true
}
return strings.HasPrefix(p.LocalPath(dir), dir+string(filepath.Separator))
}
// MaxPartSize is the maximum size for each part.
//
// The MaxPartSize reduces bandwidth usage during retries on network errors


@@ -0,0 +1,54 @@
package common
import (
"testing"
)
func TestIsLocalPathInsideDir(t *testing.T) {
f := func(dir, path string, expected bool) {
t.Helper()
p := Part{Path: path}
if got := p.IsLocalPathInsideDir(dir); got != expected {
t.Fatalf("IsLocalPathInsideDir(%q, %q): got %v, want %v", dir, path, got, expected)
}
}
// normal path inside dir
f("/data/storage", "parts/segment1/data.bin", true)
// dir with trailing slash is normalized
f("/data/storage/", "parts/segment1/data.bin", true)
// deeply nested path
f("/data/storage", "a/b/c/d/e/file.dat", true)
// traversal that stays inside dir
f("/data/storage", "foo/../bar/file.dat", true)
// root dir allows any path
f("/", "any/path/here", true)
// root dir allows traversal attempts since nothing is outside /
f("/", "../outside/marker.txt", true)
// path with leading slash is treated as relative by filepath.Join and stays inside dir
f("/data/storage", "/outside/marker.txt", true)
// dir with .. components is normalized; path inside resolved dir
f("/data/storage/../foo", "parts/file.dat", true)
// dir with .. components is normalized; traversal outside resolved dir
f("/data/storage/../foo", "../storage/evil.txt", false)
// simple traversal
f("/data/storage", "../outside/marker.txt", false)
// traversal with trailing slash in dir
f("/data/storage/", "../outside/marker.txt", false)
// deep traversal
f("/data/storage", "a/../../outside/marker.txt", false)
// sibling directory whose name shares a prefix with dir
f("/data/storage", "../storagefoo/evil.txt", false)
}
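The containment check exercised by the test cases above can be reproduced in a standalone Go sketch. It assumes Unix-style paths and assumes that the part's local path is obtained with `filepath.Join(dir, path)`. The `isInsideDir` helper is hypothetical and only mirrors the idea, not the exact `Part` API.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// isInsideDir reports whether relPath, joined to dir, stays inside dir.
// filepath.Join cleans the result, so `..` sequences are resolved before
// the prefix comparison.
func isInsideDir(dir, relPath string) bool {
	dir = filepath.Clean(dir)
	if dir == string(filepath.Separator) {
		// Nothing can be outside the root directory.
		return true
	}
	full := filepath.Join(dir, relPath)
	// The trailing separator prevents matching siblings such as /data/storagefoo.
	return strings.HasPrefix(full, dir+string(filepath.Separator))
}

func main() {
	for _, p := range []string{
		"parts/segment1/data.bin",
		"foo/../bar/file.dat",
		"../outside/marker.txt",
		"../storagefoo/evil.txt",
	} {
		fmt.Printf("%-26s inside=%v\n", p, isInsideDir("/data/storage", p))
	}
}
```

Comparing against `dir` plus a separator, rather than against `dir` alone, is what rejects the sibling-directory case.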


@@ -129,6 +129,10 @@ func (fs *FS) NewReadCloser(p common.Part) (io.ReadCloser, error) {
// On platforms with preallocation, writes go to a .tmp file that must be
// finalized with FinalizeFile.
func (fs *FS) NewDirectWriteCloser(p common.Part) (io.WriteCloser, error) {
if !p.IsLocalPathInsideDir(fs.Dir) {
logger.Fatalf("BUG: part file %s would be written outside storage directory %s", p.Path, fs.Dir)
}
path := fs.writePath(p)
if err := fs.mkdirAll(path); err != nil {
return nil, err


@@ -179,6 +179,9 @@ func tryRemoveDir(dirPath string) bool {
// times simultaneously and properly close it, fs caching may still
// confuse NFS client.
if err := os.RemoveAll(dirEntryPath); err != nil {
if os.IsNotExist(err) {
return
}
if !isTemporaryNFSError(err) {
logger.Fatalf("FATAL: cannot remove %q: %s", dirEntryPath, err)
}
@@ -203,8 +206,9 @@ func tryRemoveDir(dirPath string) bool {
deleteFilePath := filepath.Join(dirPath, deleteDirFilename)
// Remove the deleteDirFilename file, since there are no other entries left in the directory.
MustRemovePath(deleteFilePath)
if !tryRemovePath(deleteFilePath) {
return false
}
// Sync the directory after removing the deleteDirFilename file in order to make sure
// all the metadata files are removed at some exotic filesystems such as OSSFS2.
// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/649
@@ -212,7 +216,9 @@ func tryRemoveDir(dirPath string) bool {
MustSyncPath(dirPath)
// Remove the dirPath itself
MustRemovePath(dirPath)
if !tryRemovePath(dirPath) {
return false
}
// Do not sync the parent directory for the dirPath - the caller can do this if needed.
// It is OK if the dirPath will remain undeleted after unclean shutdown - it will be deleted
@@ -221,6 +227,23 @@ func tryRemoveDir(dirPath string) bool {
return true
}
// tryRemovePath removes the given path. It returns true on success or if the path doesn't exist,
// and false if a temporary NFS error occurred. Any other error is fatal.
func tryRemovePath(path string) bool {
if err := os.Remove(path); err != nil {
if os.IsNotExist(err) {
return true
}
if !isTemporaryNFSError(err) {
logger.Fatalf("FATAL: cannot remove %q: %s", path, err)
}
nfsDirRemoveFailedAttempts.Inc()
return false
}
return true
}
var (
dirRemoverWG sync.WaitGroup
nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)


@@ -154,33 +154,29 @@ func (b *body) parse(src string) error {
vaObject := jv.Get("vm_access")
if vaObject == nil {
b.vmAccessClaim = VMAccessClaim{}
//return ErrVMAccessFieldMissing
} else {
// some IDPs encode custom claims as a string
// try parsing as an object and fallback to a string
switch vaObject.Type() {
case fastjson.TypeObject:
if err := b.vmAccessClaim.parseFrom(vaObject); err != nil {
return err
}
case fastjson.TypeString:
b.claimsParser = parserPool.Get()
va, err := b.claimsParser.ParseBytes(vaObject.GetStringBytes())
if err != nil {
return fmt.Errorf("cannot parse `vm_access` string json: %w", err)
}
if err := b.vmAccessClaim.parseFrom(va); err != nil {
return fmt.Errorf("cannot parse `vm_access` values from string json: %w", err)
}
b.vmAccessClaimObject = va
case fastjson.TypeNull:
b.vmAccessClaim = VMAccessClaim{}
//return ErrVMAccessFieldMissing
default:
b.vmAccessClaim = VMAccessClaim{}
//return fmt.Errorf("unexpected type for `vm_access` field; got: %q, want object {}", vaObject.Type())
return ErrVMAccessFieldMissing
}
// some IDPs encode custom claims as a string
// try parsing as an object and fallback to a string
switch vaObject.Type() {
case fastjson.TypeObject:
if err := b.vmAccessClaim.parseFrom(vaObject); err != nil {
return err
}
case fastjson.TypeString:
b.claimsParser = parserPool.Get()
va, err := b.claimsParser.ParseBytes(vaObject.GetStringBytes())
if err != nil {
return fmt.Errorf("cannot parse `vm_access` string json: %w", err)
}
if err := b.vmAccessClaim.parseFrom(va); err != nil {
return fmt.Errorf("cannot parse `vm_access` values from string json: %w", err)
}
b.vmAccessClaimObject = va
case fastjson.TypeNull:
return ErrVMAccessFieldMissing
default:
return fmt.Errorf("unexpected type for `vm_access` field; got: %q, want object {}", vaObject.Type())
}
b.Jti = bytesutil.ToUnsafeString(jv.GetStringBytes("jti"))
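The object-or-string handling of the `vm_access` claim can be sketched with the standard library alone. This is an illustration, not the code from the hunk above: it uses `encoding/json` instead of `fastjson`, `parseVMAccess` and `errClaimMissing` are hypothetical names, and the `tenant_id` field in the sample payloads is only an example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

var errClaimMissing = errors.New("missing `vm_access` claim")

// parseVMAccess extracts the `vm_access` claim from a token payload.
// The claim may be a JSON object or a string holding an encoded JSON object,
// since some identity providers encode custom claims as strings.
func parseVMAccess(payload []byte) (map[string]any, error) {
	var claims map[string]json.RawMessage
	if err := json.Unmarshal(payload, &claims); err != nil {
		return nil, fmt.Errorf("cannot parse token payload: %w", err)
	}
	raw, ok := claims["vm_access"]
	if !ok || len(raw) == 0 || string(raw) == "null" {
		return nil, errClaimMissing
	}
	// If the claim is a JSON string, unwrap it and parse its contents instead.
	var encoded string
	if err := json.Unmarshal(raw, &encoded); err == nil {
		raw = json.RawMessage(encoded)
	}
	var obj map[string]any
	if err := json.Unmarshal(raw, &obj); err != nil {
		return nil, fmt.Errorf("cannot parse `vm_access` claim: %w", err)
	}
	return obj, nil
}

func main() {
	payloads := []string{
		`{"vm_access":{"tenant_id":{}}}`,
		`{"vm_access":"{\"tenant_id\":{}}"}`,
		`{"sub":"user"}`,
		`{"vm_access":42}`,
	}
	for _, p := range payloads {
		obj, err := parseVMAccess([]byte(p))
		fmt.Printf("%s -> %v, err=%v\n", p, obj, err)
	}
}
```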


@@ -26,6 +26,8 @@ type FastQueue struct {
// isPQDisabled is set to true when pq is disabled.
isPQDisabled bool
prioritizeInMemoryData bool
// pq is file-based queue
pq *queue
@@ -39,6 +41,31 @@ type FastQueue struct {
stopDeadline uint64
}
// OpenFastQueueOpts defines options for FastQueue
type OpenFastQueueOpts struct {
// MaxInmemoryBlocks defines amount of blocks to hold in memory before falling back to file-based persistence.
MaxInmemoryBlocks int
// MaxPendingBytes limits file-based size of the queue.
// If MaxPendingBytes is 0, then the queue size is unlimited.
// The oldest data is dropped when the queue
// reaches MaxPendingBytes.
MaxPendingBytes int64
// IsPQDisabled disables the file-based queue.
// If it is set to true, then write requests that exceed in-memory buffer capacity are rejected.
// The in-memory queue part can still be stored on disk during graceful shutdown.
IsPQDisabled bool
// PrioritizeInmemoryData instructs FastQueue to write data into the in-memory queue
// even if the file-based queue is not empty.
// This is useful when data order doesn't matter and getting the most recent data
// as fast as possible is more important.
PrioritizeInmemoryData bool
}
// MustOpenFastQueueWithOpts opens persistent queue at the given path with given opts
func MustOpenFastQueueWithOpts(path, name string, opts OpenFastQueueOpts) *FastQueue {
return mustOpenFastQueue(path, name, opts)
}
// MustOpenFastQueue opens persistent queue at the given path.
//
// It holds up to maxInmemoryBlocks in memory before falling back to file-based persistence.
@@ -49,11 +76,22 @@ type FastQueue struct {
// if isPQDisabled is set to true, then write requests that exceed in-memory buffer capacity are rejected.
// in-memory queue part can be stored on disk during graceful shutdown.
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool) *FastQueue {
opts := OpenFastQueueOpts{
MaxInmemoryBlocks: maxInmemoryBlocks,
MaxPendingBytes: maxPendingBytes,
IsPQDisabled: isPQDisabled,
}
return mustOpenFastQueue(path, name, opts)
}
func mustOpenFastQueue(path, name string, opts OpenFastQueueOpts) *FastQueue {
maxPendingBytes := opts.MaxPendingBytes
isPQDisabled := opts.IsPQDisabled
pq := mustOpen(path, name, maxPendingBytes)
fq := &FastQueue{
pq: pq,
isPQDisabled: isPQDisabled,
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
pq: pq,
isPQDisabled: isPQDisabled,
prioritizeInMemoryData: opts.PrioritizeInmemoryData,
ch: make(chan *bytesutil.ByteBuffer, opts.MaxInmemoryBlocks),
}
fq.cond.L = &fq.mu
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
@@ -81,7 +119,7 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes
if isPQDisabled {
persistenceStatus = "disabled"
}
logger.Infof("opened fast queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes, persistence is %s", path, maxInmemoryBlocks, pendingBytes, persistenceStatus)
logger.Infof("opened fast queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes, persistence is %s", path, opts.MaxInmemoryBlocks, pendingBytes, persistenceStatus)
return fq
}
@@ -97,7 +135,7 @@ func (fq *FastQueue) IsWriteBlocked() bool {
}
fq.mu.Lock()
defer fq.mu.Unlock()
return len(fq.ch) == cap(fq.ch) || fq.pq.GetPendingBytes() > 0
return len(fq.ch) == cap(fq.ch) || (fq.pq.GetPendingBytes() > 0 && !fq.prioritizeInMemoryData)
}
// UnblockAllReaders unblocks all the readers.
@@ -193,19 +231,24 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
defer fq.mu.Unlock()
isPQWriteAllowed := !fq.isPQDisabled || ignoreDisabledPQ
fq.flushInmemoryBlocksToFileIfNeededLocked()
if n := fq.pq.GetPendingBytes(); n > 0 {
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
// So put the block to file-based queue.
if len(fq.ch) > 0 {
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
if !isPQWriteAllowed && fq.pq.GetPendingBytes() > 0 {
// Fast path: there is pending data in the file-based queue,
// and it must be drained before the in-memory queue can be used.
// The file-based queue may be non-empty after vmagent restart
// if vmagent couldn't flush the in-memory queue during shutdown.
return false
}
if !fq.prioritizeInMemoryData {
fq.flushInmemoryBlocksToFileIfNeededLocked()
if n := fq.pq.GetPendingBytes(); n > 0 {
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
// So put the block to file-based queue.
if len(fq.ch) > 0 {
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
}
fq.pq.MustWriteBlock(block)
return true
}
if !isPQWriteAllowed {
return false
}
fq.pq.MustWriteBlock(block)
return true
}
if len(fq.ch) == cap(fq.ch) {
// There is no space left in the in-memory queue. Put the data to file-based queue.
@@ -216,7 +259,7 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
fq.pq.MustWriteBlock(block)
return true
}
// Fast path - put the block to in-memory queue.
bb := blockBufPool.Get()
bb.B = append(bb.B[:0], block...)
fq.ch <- bb
@@ -229,12 +272,41 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
}
// MustReadBlock reads the next block from fq into dst and returns it.
// It first reads from the in-memory queue, then checks file-based queue.
// It first reads from the file-based queue, then checks in-memory queue.
// It blocks until a block is available or the stop deadline is exceeded, in which case it returns (dst, false).
func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
fq.mu.Lock()
defer fq.mu.Unlock()
for {
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
return dst, false
}
if n := fq.pq.GetPendingBytes(); n > 0 {
data, ok := fq.pq.MustReadBlockNonblocking(dst)
if ok {
return data, true
}
dst = data
}
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
}
if fq.stopDeadline > 0 {
return dst, false
}
// There are no blocks. Wait for new block.
fq.pq.ResetIfEmpty()
fq.cond.Wait()
}
}
// MustReadInMemoryBlockBlocking reads the next block from the in-memory queue into dst and returns it.
// It blocks until a block is available or the stop deadline is exceeded, in which case it returns (dst, false).
func (fq *FastQueue) MustReadInMemoryBlockBlocking(dst []byte) ([]byte, bool) {
fq.mu.Lock()
defer fq.mu.Unlock()
for {
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
return dst, false
@@ -242,19 +314,10 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
}
if n := fq.pq.GetPendingBytes(); n > 0 {
data, ok := fq.pq.MustReadBlockNonblocking(dst)
if ok {
return data, true
}
dst = data
continue
}
if fq.stopDeadline > 0 {
return dst, false
}
// There are no blocks. Wait for new block.
fq.pq.ResetIfEmpty()
fq.cond.Wait()
}
}
@@ -277,9 +340,6 @@ func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) []byte {
if len(fq.ch) == 0 {
logger.Panicf("BUG: the function must not be called when in-memory queue is empty. Caller should verify the queue len upfront")
}
if n := fq.pq.GetPendingBytes(); n > 0 {
logger.Panicf("BUG: the file-based queue must be empty when the in-memory queue is non-empty; it contains %d pending bytes", n)
}
bb := <-fq.ch
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()


@@ -364,3 +364,64 @@ func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) {
fq.MustClose()
fs.MustRemoveDir(path)
}
func TestFastQueueWriteReadWithPrioritizeInmemory(t *testing.T) {
path := "fast-queue-write-read-inmemory-disabled-pq-force-write"
fs.MustRemoveDir(path)
capacity := 20
opts := OpenFastQueueOpts{
MaxInmemoryBlocks: capacity,
PrioritizeInmemoryData: true,
}
fq := MustOpenFastQueueWithOpts(path, "foobar", opts)
if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
}
var blocks []string
for i := range capacity {
block := fmt.Sprintf("block %d", i)
if !fq.TryWriteBlock([]byte(block)) {
t.Fatalf("TryWriteBlock must return true in this context")
}
blocks = append(blocks, block)
}
if n := fq.GetInmemoryQueueLen(); n != capacity {
t.Fatalf("unexpected inmemory queue size; got %d; want %d", n, capacity)
}
for i := range capacity {
block := fmt.Sprintf("block %d-%d", i, i)
if !fq.TryWriteBlock([]byte(block)) {
t.Fatalf("TryWriteBlock must return true in this context")
}
blocks = append(blocks, block)
}
// if the capacity is exceeded, the last element is written into the file-based queue
if n := fq.GetInmemoryQueueLen(); n != capacity-1 {
t.Fatalf("unexpected inmemory queue size; got %d; want %d", n, capacity-1)
}
// make sure that recently ingested elements are returned first
for idx := capacity + 1; idx < capacity*2; idx++ {
buf, ok := fq.MustReadInMemoryBlockBlocking(nil)
if !ok {
t.Fatalf("unexpected ok=false")
}
if string(buf) != blocks[idx] {
t.Fatalf("unexpected block read; got %q; want %q: %d", buf, blocks[idx], idx)
}
}
blocks = blocks[:capacity+1]
for _, block := range blocks {
buf, ok := fq.MustReadBlock(nil)
if !ok {
t.Fatalf("unexpected ok=false")
}
if string(buf) != block {
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
}
}
fq.MustClose()
fs.MustRemoveDir(path)
}


@@ -50,6 +50,17 @@ func (ie *IfExpression) Parse(s string) error {
return nil
}
// ParseFromMetricExpr parses the if expression from the given MetricExpr.
func (ie *IfExpression) ParseFromMetricExpr(me *metricsql.MetricExpr) error {
var ieLocal ifExpression
if err := ieLocal.parseFromMetricExpr(me); err != nil {
return err
}
ie.ies = []*ifExpression{&ieLocal}
return nil
}
// UnmarshalJSON unmarshals ie from JSON data.
func (ie *IfExpression) UnmarshalJSON(data []byte) error {
var v any
@@ -182,6 +193,16 @@ func (ie *ifExpression) Parse(s string) error {
return nil
}
func (ie *ifExpression) parseFromMetricExpr(me *metricsql.MetricExpr) error {
lfss, err := metricExprToLabelFilterss(me)
if err != nil {
return fmt.Errorf("cannot parse series selector: %w", err)
}
ie.s = string(me.AppendString(nil))
ie.lfss = lfss
return nil
}
// UnmarshalJSON unmarshals ie from JSON data.
func (ie *ifExpression) UnmarshalJSON(data []byte) error {
var s string


@@ -76,6 +76,9 @@ var (
"Every %d occurrence in the template is substituted with -promscrape.cluster.memberNum at urls to vmagent instances responsible for scraping the given target "+
"at /service-discovery page. For example -promscrape.cluster.memberURLTemplate='http://vmagent-%d:8429/targets'. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more details")
clusterShardByLabels = flagutil.NewArrayString("promscrape.cluster.shardByLabels", "Optional list of target labels, which will be used for sharding targets among cluster members "+
"if -promscrape.cluster.membersCount is greater than 1. If none of the specified labels are found in a target, then all the target labels will be used for sharding. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info")
clusterReplicationFactor = flag.Int("promscrape.cluster.replicationFactor", 1, "The number of members in the cluster, which scrape the same targets. "+
"If the replication factor is greater than 1, then the deduplication must be enabled at remote storage side. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info")
@@ -86,7 +89,10 @@ var (
"Bigger uncompressed responses are rejected. See also max_scrape_size option at https://docs.victoriametrics.com/victoriametrics/sd_configs/#scrape_configs")
)
var clusterMemberID int
var (
clusterMemberID int
clusterShardByLabelsSorted []string
)
func mustInitClusterMemberID() {
s := *clusterMemberNum
@@ -110,6 +116,15 @@ func mustInitClusterMemberID() {
clusterMemberID = n
}
func initClusterShardByLabels() {
if len(*clusterShardByLabels) == 0 {
clusterShardByLabelsSorted = nil
return
}
clusterShardByLabelsSorted = slices.Clone(*clusterShardByLabels)
slices.Sort(clusterShardByLabelsSorted)
}
// Config represents essential parts from Prometheus config defined at https://prometheus.io/docs/prometheus/latest/configuration/configuration/
type Config struct {
Global GlobalConfig `yaml:"global,omitempty"`
@@ -1138,12 +1153,28 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
}
func appendScrapeWorkKey(dst []byte, labels *promutil.Labels) []byte {
for _, label := range labels.GetLabels() {
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
dst = append(dst, label.Name...)
dst = append(dst, '=')
dst = append(dst, label.Value...)
dst = append(dst, ',')
originalDstLen := len(dst)
for _, targetLabelName := range clusterShardByLabelsSorted {
for _, label := range labels.GetLabels() {
if label.Name == targetLabelName {
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
dst = append(dst, label.Name...)
dst = append(dst, '=')
dst = append(dst, label.Value...)
dst = append(dst, ',')
break
}
}
}
// Use all labels to compute the key if `promscrape.cluster.shardByLabels` is not configured
// or if none of the configured labels are present in the target.
if len(dst) == originalDstLen {
for _, label := range labels.GetLabels() {
dst = append(dst, label.Name...)
dst = append(dst, '=')
dst = append(dst, label.Value...)
dst = append(dst, ',')
}
return dst
}
return dst
}


@@ -148,6 +148,77 @@ func TestGetClusterMemberNumsForScrapeWork(t *testing.T) {
f("foo", 3, 2, []int{2, 0})
}
func TestAppendScrapeWorkKeyShardByLabels(t *testing.T) {
f := func(labels map[string]string, shardByLabels []string, expectedKey string) {
t.Helper()
originValue := *clusterShardByLabels
*clusterShardByLabels = shardByLabels
defer func() {
*clusterShardByLabels = originValue
}()
initClusterShardByLabels()
outputKey := string(appendScrapeWorkKey(nil, promutil.NewLabelsFromMap(labels)))
if expectedKey != outputKey {
t.Fatalf("unexpected sharding key:%q for target labels:%v with shardByLabels=%q, expect: %q",
outputKey, labels, shardByLabels, expectedKey)
}
}
// didn't specify -promscrape.cluster.shardByLabels, so all labels will be used for sharding
f(
map[string]string{
"a": "aa",
"b": "bb",
"c": "cc",
"d": "dd"},
[]string{},
"a=aa,b=bb,c=cc,d=dd,",
)
// match all labels in -promscrape.cluster.shardByLabels, so label "a" and "c" will be used for sharding
f(
map[string]string{
"a": "aa",
"b": "bb",
"c": "cc",
"d": "dd"},
[]string{"a", "c"},
"a=aa,c=cc,",
)
// match all labels in -promscrape.cluster.shardByLabels, so label "a" and "c" will be used for sharding even if they're not in order in -promscrape.cluster.shardByLabels.
f(
map[string]string{
"a": "aa",
"b": "bb",
"c": "cc",
"d": "dd"},
[]string{"c", "a"},
"a=aa,c=cc,",
)
// match part of labels in -promscrape.cluster.shardByLabels, label "a" and "c" will be used for sharding
f(
map[string]string{
"a": "aa",
"c": "cc",
"d": "dd"},
[]string{"a", "b", "c"},
"a=aa,c=cc,",
)
// none of labels in -promscrape.cluster.shardByLabels is matched, so all labels will be used for sharding
f(
map[string]string{
"d": "dd",
"e": "ee"},
[]string{"a", "b", "c"},
"d=dd,e=ee,",
)
}
func TestLoadStaticConfigs(t *testing.T) {
scs, err := loadStaticConfigs("testdata/file_sd.json")
if err != nil {


@@ -66,6 +66,7 @@ func CheckConfig() error {
// Scraped data is passed to pushData.
func Init(pushData func(at *auth.Token, wr *prompb.WriteRequest)) {
mustInitClusterMemberID()
initClusterShardByLabels()
globalStopChan = make(chan struct{})
scraperWG.Go(func() {
runScraper(*promscrapeConfigFile, pushData, globalStopChan)


@@ -1,6 +1,7 @@
package streamaggr
import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
@@ -17,9 +18,10 @@ import (
const dedupAggrShardsCount = 128
type dedupAggr struct {
shards []dedupAggrShard
flushDuration *metrics.Histogram
flushTimeouts *metrics.Counter
shards []dedupAggrShard
flushDuration *metrics.Histogram
flushTimeouts *metrics.Counter
droppedSamples *metrics.Counter
}
type dedupAggrShard struct {
@@ -47,10 +49,20 @@ type dedupAggrSample struct {
timestamp int64
}
func newDedupAggr() *dedupAggr {
return &dedupAggr{
shards: make([]dedupAggrShard, dedupAggrShardsCount),
}
func newDedupAggr(ms *metrics.Set, metricLabels string) *dedupAggr {
var d dedupAggr
d.shards = make([]dedupAggrShard, dedupAggrShardsCount)
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
return float64(d.sizeBytes())
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
return float64(d.itemsCount())
})
d.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
d.droppedSamples = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_dropped_samples_total{%s}`, metricLabels))
return &d
}
func (da *dedupAggr) sizeBytes() uint64 {
@@ -87,7 +99,8 @@ func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, isGreen bool) {
if len(shardSamples) == 0 {
continue
}
da.shards[i].pushSamples(shardSamples, isGreen)
deduplicatedSamples := da.shards[i].pushSamples(shardSamples, isGreen)
da.droppedSamples.Add(deduplicatedSamples)
}
putPerShardSamples(pss)
}
@@ -167,8 +180,9 @@ func putPerShardSamples(pss *perShardSamples) {
var perShardSamplesPool sync.Pool
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) int {
var state *dedupAggrState
var deduplicatedSamples int
if isGreen {
state = &das.green
@@ -198,8 +212,10 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
continue
}
s.timestamp, s.value = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
deduplicatedSamples++
}
state.samplesBuf = samplesBuf
return deduplicatedSamples
}
// deduplicateSamples returns deduplicated timestamp and value results.


@@ -7,11 +7,13 @@ import (
"testing"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
)
func TestDedupAggrSerial(t *testing.T) {
da := newDedupAggr()
da := newDedupAggr(metrics.NewSet(), "")
const seriesCount = 100_000
expectedSamplesMap := make(map[string]pushSample)
@@ -59,7 +61,7 @@ func TestDedupAggrSerial(t *testing.T) {
func TestDedupAggrConcurrent(_ *testing.T) {
const concurrency = 5
const seriesCount = 10_000
da := newDedupAggr()
da := newDedupAggr(metrics.NewSet(), "")
var wg sync.WaitGroup
for range concurrency {


@@ -5,6 +5,8 @@ import (
"sync/atomic"
"testing"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
@@ -23,7 +25,7 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
const loops = 2
benchSamples := newBenchSamples(samplesPerPush)
da := newDedupAggr()
da := newDedupAggr(metrics.NewSet(), "")
b.ResetTimer()
b.ReportAllocs()


@@ -44,7 +44,6 @@ type Deduplicator struct {
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Duration, dropLabels []string, alias string) *Deduplicator {
d := &Deduplicator{
da: newDedupAggr(),
dropLabels: dropLabels,
interval: interval,
enableWindows: enableWindows,
@@ -64,16 +63,7 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
ms := d.ms
metricLabels := fmt.Sprintf(`name="dedup",url=%q`, alias)
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
return float64(d.da.sizeBytes())
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
return float64(d.da.itemsCount())
})
d.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
d.da = newDedupAggr(ms, metricLabels)
metrics.RegisterSet(ms)
@@ -120,6 +110,7 @@ func (d *Deduplicator) Push(tss []prompb.TimeSeries) {
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, s := range ts.Samples {
if d.enableWindows && minDeadline > s.Timestamp {
d.da.droppedSamples.Inc()
continue
} else if d.enableWindows && s.Timestamp <= cs.maxDeadline == cs.isGreen {
ctx.green = append(ctx.green, pushSample{


@@ -31,12 +31,16 @@ type increaseAggrValue struct {
}
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
if av.total == nil {
av.total = new(float64)
}
ac := c.(*increaseAggrConfig)
currentTime := fasttime.UnixTimestamp()
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
lv, ok := av.shared[key]
if av.total == nil {
av.total = new(float64)
// The last value is stale, reset it.
if ok && lv.deleteDeadline < int64(currentTime)*1000 {
ok = false
}
if ok {
if sample.timestamp < lv.timestamp {


@@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
var rateAggrSharedValuePool sync.Pool
@@ -99,6 +100,12 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
ac := c.(*rateAggrConfig)
var state *rateAggrStateValue
sv, ok := av.shared[key]
// The last value is stale, reset it.
if ok && sv.deleteDeadline < int64(fasttime.UnixTimestamp())*1000 {
delete(av.shared, key)
putRateAggrSharedValue(sv)
ok = false
}
if ok {
state = sv.getState(av.isGreen)
if sample.timestamp < state.timestamp {


@@ -43,6 +43,7 @@ var supportedOutputs = []string{
"stddev",
"stdvar",
"sum_samples",
"sum_samples_total",
"total",
"total_prometheus",
"unique_samples",
@@ -172,12 +173,12 @@ type Config struct {
DedupInterval string `yaml:"dedup_interval,omitempty"`
// Staleness interval is interval after which the series state will be reset if no samples have been sent during it.
// The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus and histogram_bucket.
// The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus, rate_avg and rate_sum.
StalenessInterval string `yaml:"staleness_interval,omitempty"`
// IgnoreFirstSampleInterval specifies the interval after which the agent begins sending samples.
// By default, it is set to the staleness interval, and it helps reduce the initial sample load after an agent restart.
// This parameter is relevant only for the following outputs: total, total_prometheus, increase, increase_prometheus, and histogram_bucket.
// This parameter is relevant only for the following outputs: total, total_prometheus, increase and increase_prometheus.
IgnoreFirstSampleInterval string `yaml:"ignore_first_sample_interval,omitempty"`
// Outputs is a list of output aggregate functions to produce.
@@ -501,8 +502,9 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
return nil, fmt.Errorf("interval=%s must be a multiple of dedup_interval=%s", interval, dedupInterval)
}
// check cfg.StalenessInterval
stalenessInterval := interval * 2
// set the default staleness interval to the aggregation interval, to be consistent with the query lookbehind window in MetricsQL,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11102
stalenessInterval := interval
if cfg.StalenessInterval != "" {
stalenessInterval, err = time.ParseDuration(cfg.StalenessInterval)
if err != nil {
@@ -668,18 +670,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
if dedupInterval > 0 {
a.da = newDedupAggr()
a.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
a.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
n := a.da.sizeBytes()
return float64(n)
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
n := a.da.itemsCount()
return float64(n)
})
a.da = newDedupAggr(ms, metricLabels)
}
alignFlushToInterval := !opts.NoAlignFlushToInterval
@@ -780,7 +771,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "stdvar":
return newStdvarAggrConfig(), nil
case "sum_samples":
return newSumSamplesAggrConfig(), nil
return newSumSamplesAggrConfig(true), nil
case "sum_samples_total":
return newSumSamplesAggrConfig(false), nil
case "total":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "total_prometheus":


@@ -1,3 +1,5 @@
//go:build synctest
package streamaggr
import (
@@ -475,26 +477,125 @@ foo:1m_increase_prometheus{baz="qwe"} 15
outputs: [increase_prometheus]
`, "11111111")
// multiple aggregate configs
// increase, increase_prometheus, total, total_prometheus outputs with different staleness intervals
f([]string{`
foo 5
bar 200
`, `
foo 10
bar 201
`, ``, `
foo 7
bar 205
`}, time.Minute, `bar:1m_increase 200
bar:1m_increase 1
bar:1m_increase 205
bar:1m_increase_prometheus 0
bar:1m_increase_prometheus 1
bar:1m_increase_prometheus 0
bar:1m_total 200
bar:1m_total 201
bar:1m_total 205
bar:1m_total_prometheus 0
bar:1m_total_prometheus 1
bar:1m_total_prometheus 0
bar:1m_without_non_existing_label_increase 0
bar:1m_without_non_existing_label_increase 1
bar:1m_without_non_existing_label_increase 4
bar:1m_without_non_existing_label_increase_prometheus 0
bar:1m_without_non_existing_label_increase_prometheus 1
bar:1m_without_non_existing_label_increase_prometheus 4
bar:1m_without_non_existing_label_total 0
bar:1m_without_non_existing_label_total 1
bar:1m_without_non_existing_label_total 1
bar:1m_without_non_existing_label_total 5
bar:1m_without_non_existing_label_total_prometheus 0
bar:1m_without_non_existing_label_total_prometheus 1
bar:1m_without_non_existing_label_total_prometheus 1
bar:1m_without_non_existing_label_total_prometheus 5
foo:1m_increase 5
foo:1m_increase 5
foo:1m_increase 7
foo:1m_increase_prometheus 0
foo:1m_increase_prometheus 5
foo:1m_increase_prometheus 0
foo:1m_total 5
foo:1m_total 10
foo:1m_total 7
foo:1m_total_prometheus 0
foo:1m_total_prometheus 5
foo:1m_total_prometheus 0
foo:1m_without_non_existing_label_increase 0
foo:1m_without_non_existing_label_increase 5
foo:1m_without_non_existing_label_increase 7
foo:1m_without_non_existing_label_increase_prometheus 0
foo:1m_without_non_existing_label_increase_prometheus 5
foo:1m_without_non_existing_label_increase_prometheus 7
foo:1m_without_non_existing_label_total 0
foo:1m_without_non_existing_label_total 5
foo:1m_without_non_existing_label_total 5
foo:1m_without_non_existing_label_total 12
foo:1m_without_non_existing_label_total_prometheus 0
foo:1m_without_non_existing_label_total_prometheus 5
foo:1m_without_non_existing_label_total_prometheus 5
foo:1m_without_non_existing_label_total_prometheus 12
`, `
- interval: 1m
ignore_first_sample_interval: 0s
outputs: [increase, increase_prometheus, total, total_prometheus]
- interval: 1m
staleness_interval: 2m
without: [non_existing_label]
outputs: [increase, increase_prometheus, total, total_prometheus]
`, "111111")
// sum_samples and sum_samples_total outputs with different staleness intervals
f([]string{`
foo 1
foo 2 1
foo{bar="baz"} 2
foo 3.3
`, ``, ``, ``, ``}, time.Minute, `foo:1m_count_series 1
foo:1m_count_series{bar="baz"} 1
foo:1m_sum_samples 0
foo:1m_sum_samples 4.3
foo:1m_sum_samples{bar="baz"} 0
`, `
foo 4
`, ``, ``, `
foo 6
`, ``, ``}, time.Minute, `foo:1m_sum_samples 3
foo:1m_sum_samples 4
foo:1m_sum_samples 6
foo:1m_sum_samples_total 3
foo:1m_sum_samples_total 7
foo:1m_sum_samples_total 6
foo:1m_sum_samples_total{bar="baz"} 2
foo:1m_sum_samples{bar="baz"} 2
foo:5m_by_bar_sum_samples 4.3
foo:1m_without_non-existing-label_sum_samples 3
foo:1m_without_non-existing-label_sum_samples 4
foo:1m_without_non-existing-label_sum_samples 0
foo:1m_without_non-existing-label_sum_samples 6
foo:1m_without_non-existing-label_sum_samples 0
foo:1m_without_non-existing-label_sum_samples_total 3
foo:1m_without_non-existing-label_sum_samples_total 7
foo:1m_without_non-existing-label_sum_samples_total 7
foo:1m_without_non-existing-label_sum_samples_total 6
foo:1m_without_non-existing-label_sum_samples_total 6
foo:1m_without_non-existing-label_sum_samples_total{bar="baz"} 2
foo:1m_without_non-existing-label_sum_samples_total{bar="baz"} 2
foo:1m_without_non-existing-label_sum_samples{bar="baz"} 2
foo:1m_without_non-existing-label_sum_samples{bar="baz"} 0
foo:5m_by_bar_sum_samples 13
foo:5m_by_bar_sum_samples_total 13
foo:5m_by_bar_sum_samples_total{bar="baz"} 2
foo:5m_by_bar_sum_samples{bar="baz"} 2
`, `
- interval: 1m
outputs: [count_series, sum_samples]
staleness_interval: 1m
outputs: [ sum_samples, sum_samples_total]
- interval: 1m
staleness_interval: 2m
without: [non-existing-label]
outputs: [ sum_samples, sum_samples_total]
- interval: 5m
by: [bar]
outputs: [sum_samples]
`, "111")
outputs: [sum_samples, sum_samples_total]
`, "11111")
// min and max outputs
f([]string{`
@@ -688,30 +789,39 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.125
outputs: [rate_sum, rate_avg]
`, "11111")
// test rate_sum and rate_avg, when two aggregation intervals are empty
// test rate_sum and rate_avg with different staleness intervals
f([]string{`
foo{abc="123", cde="1"} 1
foo{abc="123", cde="1"} 2 1
foo{abc="456", cde="1"} 7
foo{abc="456", cde="1"} 8 1
foo{abc="777", cde="1"} 8
foo{abc="777", cde="1"} 9 1
`, ``, ``, `
foo{abc="123", cde="1"} 19
foo{abc="123", cde="1"} 20 1
foo{abc="456", cde="1"} 26
foo{abc="456", cde="1"} 27 1
foo{abc="777", cde="1"} 27
foo{abc="777", cde="1"} 28 1
foo{abc="456", cde="1"} 3
foo{abc="456", cde="1"} 4 1
foo{abc="777", cde="1"} 5
foo{abc="777", cde="1"} 6 1
`, ``, `
foo{abc="123", cde="1"} 121
foo{abc="123", cde="1"} 122 1
foo{abc="456", cde="1"} 123
foo{abc="456", cde="1"} 124 1
foo{abc="777", cde="1"} 125
foo{abc="777", cde="1"} 126 1
`}, time.Minute, `foo:1m_by_cde_rate_avg{cde="1"} 1
foo:1m_by_cde_rate_avg{cde="1"} 1
foo:1m_by_cde_rate_sum{cde="1"} 3
foo:1m_by_cde_rate_sum{cde="1"} 3
foo:1m_without_abc_rate_avg{cde="1"} 1
foo:1m_without_abc_rate_avg{cde="1"} 1
foo:1m_without_abc_rate_sum{cde="1"} 3
foo:1m_without_abc_rate_sum{cde="1"} 3
`, `
- interval: 1m
by: [cde]
outputs: [rate_sum, rate_avg]
enable_windows: true
- interval: 1m
staleness_interval: 2m
without: [abc]
outputs: [rate_sum, rate_avg]
enable_windows: true
`, "111111111111")
// rate_sum and rate_avg with duplicated events


@@ -252,11 +252,15 @@ func TestAggregatorsEqual(t *testing.T) {
}
func timeSeriessToString(tss []prompb.TimeSeries) string {
a := make([]string, len(tss))
for i, ts := range tss {
sorted := make([]prompb.TimeSeries, len(tss))
copy(sorted, tss)
sort.SliceStable(sorted, func(i, j int) bool {
return promrelabel.LabelsToString(sorted[i].Labels) < promrelabel.LabelsToString(sorted[j].Labels)
})
a := make([]string, len(sorted))
for i, ts := range sorted {
a[i] = timeSeriesToString(ts)
}
sort.Strings(a)
return strings.Join(a, "")
}


@@ -27,6 +27,7 @@ var benchOutputs = []string{
"stddev",
"stdvar",
"sum_samples",
"sum_samples_total",
"total",
"total_prometheus",
"unique_samples",


@@ -1,27 +1,44 @@
package streamaggr
import (
"math"
)
type sumSamplesAggrValue struct {
sum float64
}
func (av *sumSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if math.Abs(av.sum) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
av.sum = 0
}
av.sum += sample.value
}
func (av *sumSamplesAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
ctx.appendSeries(key, "sum_samples", av.sum)
av.sum = 0
func (av *sumSamplesAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
ac := c.(*sumSamplesAggrConfig)
if ac.resetTotalOnFlush {
ctx.appendSeries(key, "sum_samples", av.sum)
av.sum = 0
return
}
ctx.appendSeries(key, "sum_samples_total", av.sum)
}
func (*sumSamplesAggrValue) state() any {
return nil
}
func newSumSamplesAggrConfig() aggrConfig {
return &sumSamplesAggrConfig{}
func newSumSamplesAggrConfig(resetTotalOnFlush bool) aggrConfig {
return &sumSamplesAggrConfig{
resetTotalOnFlush: resetTotalOnFlush,
}
}
type sumSamplesAggrConfig struct{}
type sumSamplesAggrConfig struct {
resetTotalOnFlush bool
}
func (*sumSamplesAggrConfig) getValue(_ any) aggrValue {
return &sumSamplesAggrValue{}


@@ -31,7 +31,11 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
currentTime := fasttime.UnixTimestamp()
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
lv, ok := av.shared.lastValues[key]
if ok || keepFirstSample {
// The last value is stale, reset it.
if ok && lv.deleteDeadline < int64(currentTime)*1000 {
ok = false
}
if ok {
if sample.timestamp < lv.timestamp {
// Skip out of order sample
return
@@ -43,6 +47,8 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
av.total += sample.value
ac.counterResetsTotal.Inc()
}
} else if keepFirstSample {
av.total += sample.value
}
lv.value = sample.value
lv.timestamp = sample.timestamp

sso.md

@@ -1,208 +0,0 @@
# SSO (OpenID Connect) for vmauth
## Requirements
* SSO should only support OpenID Connect (authorization code flow).
* Keep implementation as simple as possible.
* Avoid external dependencies — use only stdlib `net/http`, `crypto/hmac`, `encoding/json`, etc.
* SSO must be coupled to existing JWT logic: after OIDC callback, the id\_token is stored in a cookie and fed into the existing JWT user-matching pipeline on subsequent requests.
* SSO should be implemented as a standalone feature in `app/vmauth/sso.go`.
* Attempt SSO login only if:
  1. No existing credentials matched (bearer, basic, JWT), AND
  2. The request Host is listed in the `sso:` config section.
* Validate the OIDC callback (`state`, `code`, token exchange, id\_token signature) before setting the session cookie.
* Both cookie-based SSO sessions and all existing credentials (bearer, basic, JWT) work simultaneously — SSO is purely additive.
---
## Config
```yaml
# vmauth.yaml
sso:
  foo.com:
    openid_connect:
      issuer: https://accounts.google.com                 # OIDC discovery base URL
      client_id: <client_id>
      client_secret: <client_secret>
      redirect_url: https://foo.com/_vmauth/sso/callback  # optional; derived from Host if omitted
      scopes: [openid, email, profile]                    # optional; default: [openid]
  bar.com:
    openid_connect:
      issuer: https://login.microsoftonline.com/<tenant>/v2.0
      client_id: <client_id>
      client_secret: <client_secret>
```
---
## Architecture
### New file: `app/vmauth/sso.go`
Responsibilities:
- Config structs (`SSOConfig`, `SSOHostConfig`, `OIDCConnectConfig`).
- OIDC discovery: fetch `{issuer}/.well-known/openid-configuration` → extract `authorization_endpoint`, `token_endpoint`, `jwks_uri`.
- `initiateSSOLogin(w, r, cfg)` — redirect browser to OIDC authorization URL with `state` + `nonce` (CSRF).
- `handleSSOCallback(w, r, cfg)` — validate `state`, exchange `code` for tokens, validate `id_token`, set signed session cookie, redirect to original URL.
- `ssoAuthTokenFromRequest(r)` — extract `id_token` from the session cookie and return it as an auth token string so that the existing JWT pipeline can match it to a configured user.
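The discovery step listed above could be sketched roughly as follows, using only the standard library. The names `oidcMetadata`, `discoveryURL`, `parseOIDCMetadata` and `fetchOIDCMetadata` are hypothetical and are not taken from the vmauth code; the 1 MiB response limit is an arbitrary assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// oidcMetadata holds the three discovery fields named in the design.
// The type name is hypothetical.
type oidcMetadata struct {
	AuthorizationEndpoint string `json:"authorization_endpoint"`
	TokenEndpoint         string `json:"token_endpoint"`
	JWKSURI               string `json:"jwks_uri"`
}

// discoveryURL builds the well-known discovery URL for the given issuer.
func discoveryURL(issuer string) string {
	return strings.TrimSuffix(issuer, "/") + "/.well-known/openid-configuration"
}

// parseOIDCMetadata decodes a discovery document and rejects it
// if any of the required endpoints is missing.
func parseOIDCMetadata(data []byte) (*oidcMetadata, error) {
	var m oidcMetadata
	if err := json.Unmarshal(data, &m); err != nil {
		return nil, fmt.Errorf("cannot parse discovery document: %w", err)
	}
	if m.AuthorizationEndpoint == "" || m.TokenEndpoint == "" || m.JWKSURI == "" {
		return nil, fmt.Errorf("discovery document misses required endpoints")
	}
	return &m, nil
}

// fetchOIDCMetadata downloads and parses the discovery document.
// The response size limit is an assumption made for this sketch.
func fetchOIDCMetadata(c *http.Client, issuer string) (*oidcMetadata, error) {
	resp, err := c.Get(discoveryURL(issuer))
	if err != nil {
		return nil, fmt.Errorf("cannot fetch discovery document: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status code %d", resp.StatusCode)
	}
	data, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
	if err != nil {
		return nil, fmt.Errorf("cannot read discovery document: %w", err)
	}
	return parseOIDCMetadata(data)
}

func main() {
	fmt.Println(discoveryURL("https://accounts.google.com"))
}
```

Splitting URL construction and parsing from the HTTP call keeps the parsing logic testable without network access.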
### Changes to `auth_config.go`
Add `SSO SSOConfig` field to `AuthConfig`:
```go
type AuthConfig struct {
	Users            []UserInfo `yaml:"users"`
	UnauthorizedUser *UserInfo  `yaml:"unauthorized_user"`
	SSO              SSOConfig  `yaml:"sso"` // NEW
}
```
`SSOConfig` is `map[string]*SSOHostConfig` (host → config). Parsed during `parseAuthConfig`; OIDC discovery is triggered at parse time (same pattern as `oidcDiscoverer`).
### Changes to `main.go`
**Callback handler** — registered before the main auth flow:
```
if r.URL.Path == "/_vmauth/sso/callback" {
	handleSSOCallback(w, r, ssoConfigForHost(r.Host))
	return
}
```
**Extended auth token extraction** — after `getAuthTokensFromRequest`:
```
if tok := ssoAuthTokenFromRequest(r); tok != "" {
	ats = append(ats, tok)
}
```
This feeds the cookie's `id_token` into the existing `getJWTUserInfo` call with zero duplication.
**SSO login page** — after all existing auth attempts fail and before returning 401:
```go
if cfg := ssoConfigForHost(r.Host); cfg != nil {
	showSSOLoginPage(w, r, cfg)
	return
}
```
`showSSOLoginPage` computes the full OIDC authorization URL (including `state`, `client_id`, `redirect_uri`, `scope`) upfront and writes a minimal HTML page (200 OK) with a single `<a href="{authorizationURL}">Login with SSO</a>` button. The link points directly to the OIDC provider — no intermediate vmauth hop.
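
A minimal sketch of how the authorization URL and the login page might be produced. `buildAuthorizationURL` and `loginPage` are hypothetical names, not part of vmauth; the query parameters are the ones listed above plus `response_type=code`, which the authorization code flow requires.

```go
package main

import (
	"fmt"
	"html/template"
	"net/url"
	"os"
	"strings"
)

// buildAuthorizationURL appends the authorization code flow parameters
// to the provider's authorization_endpoint.
func buildAuthorizationURL(authorizationEndpoint, clientID, redirectURL, state string, scopes []string) (string, error) {
	u, err := url.Parse(authorizationEndpoint)
	if err != nil {
		return "", fmt.Errorf("cannot parse authorization endpoint: %w", err)
	}
	q := u.Query()
	q.Set("response_type", "code")
	q.Set("client_id", clientID)
	q.Set("redirect_uri", redirectURL)
	q.Set("scope", strings.Join(scopes, " "))
	q.Set("state", state)
	u.RawQuery = q.Encode()
	return u.String(), nil
}

// loginPage is the minimal HTML page with a single link to the provider.
// html/template escapes the URL for the href attribute context.
var loginPage = template.Must(template.New("login").Parse(
	`<!DOCTYPE html>
<html><body><a href="{{.}}">Login with SSO</a></body></html>
`))

func main() {
	// Placeholder values, for illustration only.
	authzURL, err := buildAuthorizationURL(
		"https://provider.example/authorize", "vmauth",
		"https://foo.com/_vmauth/sso/callback", "payload.signature",
		[]string{"openid", "email"},
	)
	if err != nil {
		fmt.Println(err)
		return
	}
	if err := loginPage.Execute(os.Stdout, authzURL); err != nil {
		fmt.Println(err)
	}
}
```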
---
## OIDC Authorization Code Flow
```
Browser                          vmauth                           OIDC Provider
|                                   |                                   |
|-- GET foo.com/app --------------->|                                   |
|                                   |-- no credentials, host in SSO     |
|                                   |   compute full authz URL with     |
|                                   |   state, client_id, scopes        |
|<-- 200 HTML login page -----------|                                   |
|    [Login with SSO] href=         |                                   |
|    https://provider/authorize?...                                     |
|                                                                       |
|-- (user clicks) GET /authorize?client_id=...&state=X ---------------->|
|<-- 302 → foo.com/_vmauth/sso/callback?code=Y&state=X -----------------|
|                                   |                                   |
|-- GET /_vmauth/sso/callback ----->|                                   |
|   ?code=Y&state=X                 |-- validate state (HMAC, age)      |
|                                   |-- POST /token {code} ------------>|
|                                   |<-- {id_token, access_token} ------|
|                                   |-- validate id_token JWT           |
|                                   |   (via OIDC JWKS)                 |
|                                   |-- set _vmauth_sso cookie          |
|<-- 302 → /app (original) ---------|                                   |
|                                   |                                   |
|-- GET foo.com/app (with cookie) ->|                                   |
|                                   |-- extract id_token from cookie    |
|                                   |-- JWT user matching (existing)    |
|                                   |-- proxy to backend                |
|<-- 200 OK ------------------------|                                   |
```
---
## Session Cookie
- Name: `_vmauth_sso`
- Value: raw OIDC `id_token` (already a signed JWT — no extra wrapping needed)
- Flags: `HttpOnly; Secure; SameSite=Lax; Path=/`
- Expiry: derived from `id_token` `exp` claim
No server-side session store is required. The `id_token` is self-contained and validated on every request via the existing JWT machinery (JWKS signature check, `exp` check).
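
The cookie attributes above map onto Go's `http.Cookie` as in the following sketch. `idTokenExpiry`, `newSessionCookie` and `unsignedTestToken` are hypothetical helpers; the `exp` claim is read without signature verification here, on the assumption that the token has already been validated by the callback handler.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"strings"
	"time"
)

// idTokenExpiry extracts the exp claim from a compact JWT.
// It does NOT verify the signature; validation must happen elsewhere.
func idTokenExpiry(idToken string) (time.Time, error) {
	parts := strings.Split(idToken, ".")
	if len(parts) != 3 {
		return time.Time{}, errors.New("id_token is not a compact JWT")
	}
	payload, err := base64.RawURLEncoding.DecodeString(parts[1])
	if err != nil {
		return time.Time{}, fmt.Errorf("cannot decode JWT payload: %w", err)
	}
	var claims struct {
		Exp float64 `json:"exp"` // NumericDate may be non-integer
	}
	if err := json.Unmarshal(payload, &claims); err != nil {
		return time.Time{}, fmt.Errorf("cannot parse JWT claims: %w", err)
	}
	if claims.Exp == 0 {
		return time.Time{}, errors.New("id_token has no exp claim")
	}
	return time.Unix(int64(claims.Exp), 0), nil
}

// newSessionCookie wraps the raw id_token into the _vmauth_sso cookie.
func newSessionCookie(idToken string) (*http.Cookie, error) {
	exp, err := idTokenExpiry(idToken)
	if err != nil {
		return nil, err
	}
	return &http.Cookie{
		Name:     "_vmauth_sso",
		Value:    idToken,
		Path:     "/",
		Expires:  exp,
		HttpOnly: true,
		Secure:   true,
		SameSite: http.SameSiteLaxMode,
	}, nil
}

// unsignedTestToken builds a fake token with only an exp claim, for demos.
func unsignedTestToken(exp int64) string {
	claims := fmt.Sprintf(`{"exp":%d}`, exp)
	return "e30." + base64.RawURLEncoding.EncodeToString([]byte(claims)) + ".sig"
}

func main() {
	c, err := newSessionCookie(unsignedTestToken(time.Now().Add(time.Hour).Unix()))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(c.String())
}
```

One thing to keep in mind with this design: browsers commonly cap a single cookie at roughly 4 KB, so an `id_token` carrying many claims may not fit.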
---
## State / CSRF Protection
State is entirely self-contained in the `state` query parameter — no server-side storage, no per-instance secret. This works correctly when multiple vmauth instances are behind a load balancer.
- On SSO initiation: build `state = base64url(JSON{nonce, originalURL, issuedAt}) + "." + HMAC-SHA256(client_secret, payload)`.
  - `nonce`: 16 random bytes (replay protection).
  - `originalURL`: so the user is redirected back after login.
  - `issuedAt`: Unix timestamp; reject states older than 10 minutes on callback.
  - `client_secret`: already shared across all instances via the config file; no extra coordination needed.
- On callback: split `state` on `.`, re-compute HMAC over the payload using `client_secret`, compare in constant time, check `issuedAt` not expired.
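
The scheme above can be sketched as follows. This is an illustration under stated assumptions: `newState`, `verifyState` and `statePayload` are hypothetical names, the JSON field names are made up, and the HMAC is computed over the base64url-encoded payload string (the design does not say whether the raw JSON or its encoding is signed).

```go
package main

import (
	"crypto/hmac"
	"crypto/rand"
	"crypto/sha256"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"
)

// statePayload mirrors JSON{nonce, originalURL, issuedAt}; field names are assumptions.
type statePayload struct {
	Nonce       string `json:"nonce"`
	OriginalURL string `json:"original_url"`
	IssuedAt    int64  `json:"issued_at"`
}

const stateMaxAgeSeconds = 10 * 60

// sign returns the base64url-encoded HMAC-SHA256 of payload.
func sign(secret []byte, payload string) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write([]byte(payload))
	return base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
}

// newState builds "<base64url(JSON)>.<signature>" with a 16-byte random nonce.
func newState(secret []byte, originalURL string, nowUnix int64) (string, error) {
	nonce := make([]byte, 16)
	if _, err := rand.Read(nonce); err != nil {
		return "", err
	}
	data, err := json.Marshal(statePayload{
		Nonce:       base64.RawURLEncoding.EncodeToString(nonce),
		OriginalURL: originalURL,
		IssuedAt:    nowUnix,
	})
	if err != nil {
		return "", err
	}
	payload := base64.RawURLEncoding.EncodeToString(data)
	return payload + "." + sign(secret, payload), nil
}

// verifyState checks the signature in constant time and then the age.
func verifyState(secret []byte, state string, nowUnix int64) (*statePayload, error) {
	payload, sig, ok := strings.Cut(state, ".")
	if !ok {
		return nil, errors.New("malformed state")
	}
	if !hmac.Equal([]byte(sig), []byte(sign(secret, payload))) {
		return nil, errors.New("state signature mismatch")
	}
	data, err := base64.RawURLEncoding.DecodeString(payload)
	if err != nil {
		return nil, fmt.Errorf("cannot decode state payload: %w", err)
	}
	var p statePayload
	if err := json.Unmarshal(data, &p); err != nil {
		return nil, fmt.Errorf("cannot parse state payload: %w", err)
	}
	if nowUnix-p.IssuedAt > stateMaxAgeSeconds {
		return nil, errors.New("state expired")
	}
	return &p, nil
}

func main() {
	secret := []byte("client-secret") // placeholder
	now := time.Now().Unix()
	state, err := newState(secret, "/app", now)
	if err != nil {
		fmt.Println(err)
		return
	}
	p, err := verifyState(secret, state, now)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("redirect back to:", p.OriginalURL)
}
```

Because nothing is stored server-side, a valid `state` can be replayed within its 10-minute window; the nonce makes each value unique but is not tracked in this sketch.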
---
## OIDC Discovery
Performed once at config load time (same as existing `oidcDiscoverer`):
```
GET {issuer}/.well-known/openid-configuration
→ parse authorization_endpoint, token_endpoint, jwks_uri
```
JWKS is fetched and cached by the existing `oidcDiscovererPool`. The `issuer` value in the SSO config is reused as the JWT `iss` claim for matching with an existing `UserInfo` that has `jwt.oidc.issuer` set — this is how SSO couples to existing users.
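
For illustration, the discovery step might be sketched like this. `providerMetadata`, `discoveryURL`, `parseProviderMetadata` and `fetchProviderMetadata` are hypothetical names and do not describe the existing `oidcDiscoverer`; the exact-match check on `issuer` follows the OpenID Connect Discovery requirement that the returned issuer equals the one used to build the URL.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// providerMetadata holds the discovery document fields used by the SSO flow.
type providerMetadata struct {
	Issuer                string `json:"issuer"`
	AuthorizationEndpoint string `json:"authorization_endpoint"`
	TokenEndpoint         string `json:"token_endpoint"`
	JWKSURI               string `json:"jwks_uri"`
}

// discoveryURL returns {issuer}/.well-known/openid-configuration.
func discoveryURL(issuer string) string {
	return strings.TrimSuffix(issuer, "/") + "/.well-known/openid-configuration"
}

// parseProviderMetadata decodes and sanity-checks a discovery document.
func parseProviderMetadata(data []byte, issuer string) (*providerMetadata, error) {
	var md providerMetadata
	if err := json.Unmarshal(data, &md); err != nil {
		return nil, fmt.Errorf("cannot parse discovery document: %w", err)
	}
	if md.Issuer != issuer {
		return nil, fmt.Errorf("issuer mismatch: got %q, want %q", md.Issuer, issuer)
	}
	if md.AuthorizationEndpoint == "" || md.TokenEndpoint == "" || md.JWKSURI == "" {
		return nil, errors.New("discovery document misses required endpoints")
	}
	return &md, nil
}

// fetchProviderMetadata downloads the discovery document (size-limited).
func fetchProviderMetadata(issuer string) (*providerMetadata, error) {
	resp, err := http.Get(discoveryURL(issuer))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %d from discovery endpoint", resp.StatusCode)
	}
	data, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
	if err != nil {
		return nil, err
	}
	return parseProviderMetadata(data, issuer)
}

func main() {
	// Parse an inline sample instead of calling a real provider.
	sample := []byte(`{
		"issuer": "https://idp.example",
		"authorization_endpoint": "https://idp.example/authorize",
		"token_endpoint": "https://idp.example/token",
		"jwks_uri": "https://idp.example/jwks"
	}`)
	md, err := parseProviderMetadata(sample, "https://idp.example")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(discoveryURL(md.Issuer))
	fmt.Println(md.AuthorizationEndpoint, md.TokenEndpoint, md.JWKSURI)
}
```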
---
## Coupling SSO to Existing Users / JWT
The `id_token` returned by the OIDC provider is a JWT. vmauth stores it in a cookie and, on each request, presents it to the existing `getJWTUserInfo` pipeline. That pipeline already:
- Discovers JWKS from the issuer.
- Verifies the signature.
- Checks the `exp` claim.
- Matches `match_claims` patterns.
So a user in `vmauth.yaml` that would normally accept a JWT from that OIDC issuer will automatically accept SSO-authenticated sessions too — no new user-matching logic is needed.
Example: a JWT user config that SSO sessions will match:
```yaml
users:
- name: sso-users
  jwt:
    oidc:
      issuer: https://accounts.google.com
    match_claims:
      hd: mycompany\.com # only @mycompany.com Google accounts
  url_prefix: http://backend:8428
```
---
## Files Changed
| File | Change |
|---|---|
| `app/vmauth/sso.go` | New file — all SSO logic |
| `app/vmauth/auth_config.go` | Add `SSO SSOConfig` to `AuthConfig`; call OIDC discovery in `parseAuthConfig` |
| `app/vmauth/main.go` | Callback route, cookie token injection, SSO login page fallback |
---
## What is NOT in scope
- OAuth2 implicit / device / client\_credentials flows.
- PKCE (not needed for confidential server-side clients, keeps it simple).
- Logout / RP-initiated logout endpoint.
- Token refresh (user re-authenticates when `id_token` expires).
- Storing sessions in an external store (Redis, DB).

View File

@@ -1,34 +0,0 @@
services:
  keycloak:
    image: docker.io/keycloak/keycloak:26.1
    command:
      - start-dev
      - --import-realm
    ports:
      - "8080:8080"
    environment:
      KC_BOOTSTRAP_ADMIN_USERNAME: admin
      KC_BOOTSTRAP_ADMIN_PASSWORD: admin
      # Fix the frontend URL so the issuer in JWTs is always http://keycloak:8080
      # regardless of how Keycloak is accessed internally vs externally.
      KC_HOSTNAME_URL: http://keycloak:8080
    volumes:
      - ./keycloak-realm.json:/opt/keycloak/data/import/realm.json
  victoria-metrics:
    image: docker.io/victoriametrics/victoria-metrics:v1.145.0
    command:
      - -httpListenAddr=0.0.0.0:8429
    ports:
      - "8429:8429"
  vmauth:
    image: docker.io/victoriametrics/vmauth:heads-master-0-ged795a8443-dirty-5bb2c38b
    ports:
      - "8427:8427"
    volumes:
      - ./vmauth.yaml:/etc/vmauth/config.yaml
    command:
      - -auth.config=/etc/vmauth/config.yaml
      - -httpListenAddr=0.0.0.0:8427
      - -logInvalidAuthTokens=true

View File

@@ -1,35 +0,0 @@
{
"realm": "testrealm",
"enabled": true,
"sslRequired": "none",
"clients": [
{
"clientId": "vmauth",
"enabled": true,
"protocol": "openid-connect",
"publicClient": false,
"standardFlowEnabled": true,
"directAccessGrantsEnabled": false,
"serviceAccountsEnabled": false,
"clientAuthenticatorType": "client-secret",
"secret": "vmauth-client-secret",
"redirectUris": [
"http://localhost:8427/_vmauth/sso/callback"
],
"webOrigins": ["*"]
}
],
"users": [
{
"username": "testuser",
"enabled": true,
"credentials": [
{
"type": "password",
"value": "testpassword",
"temporary": false
}
]
}
]
}

View File

@@ -1,21 +0,0 @@
sso:
  localhost:
    openid_connect:
      # Keycloak is accessed by vmauth via Docker DNS ("keycloak:8080").
      # KC_HOSTNAME_URL is set to this same value so the issuer in JWTs is
      # always http://keycloak:8080 regardless of how Keycloak is accessed.
      issuer: http://keycloak:8080/realms/testrealm
      client_id: vmauth
      client_secret: vmauth-client-secret
      redirect_url: http://localhost:8427/_vmauth/sso/callback
      scopes: [openid, profile, email]
users:
  # Matches id_tokens issued by Keycloak for the testrealm.
  # Any user that logs in via SSO and has a valid token will be proxied
  # to VictoriaMetrics.
  - name: keycloak-users
    jwt:
      oidc:
        issuer: http://keycloak:8080/realms/testrealm
    url_prefix: http://victoria-metrics:8429

View File

@@ -7,6 +7,7 @@ import (
"io/ioutil"
"log"
"os"
"path"
"strconv"
"strings"
"sync/atomic"
@@ -404,16 +405,113 @@ func readPSITotals(cgroupPath, statsName string) (uint64, uint64, error) {
}
func getCgroupV2Path() string {
data, err := ioutil.ReadFile("/proc/self/cgroup")
cgroupData, err := os.ReadFile("/proc/self/cgroup")
if err != nil {
return ""
}
tmp := strings.SplitN(string(data), "::", 2)
if len(tmp) != 2 {
// Read /proc/self/mountinfo with a timeout. Generating the mountinfo contents
// can block in the kernel when a backing filesystem (e.g. a hung NFS or FUSE
// mount) is unresponsive. Since this runs at program init via psiMetricsStart,
// a blocking read would hang startup, so fall back to disabling PSI metrics instead.
mountinfoData, _ := readFileWithTimeout("/proc/self/mountinfo", time.Second)
return getCgroupV2PathInternal(string(cgroupData), mountinfoData)
}
// readFileWithTimeout reads the file at path, returning ("", false) if the read
// doesn't complete within timeout.
//
// A timed-out read leaks the reading goroutine until the read eventually unblocks
// (if ever). This is an acceptable safeguard against a read of a pseudo-file such
// as /proc/self/mountinfo hanging on an unresponsive mount.
func readFileWithTimeout(path string, timeout time.Duration) (string, bool) {
type result struct {
data []byte
err error
}
// The channel is buffered so the goroutine can always send and exit,
// even after this function has returned on timeout.
ch := make(chan result, 1)
go func() {
data, err := os.ReadFile(path)
ch <- result{data: data, err: err}
}()
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case r := <-ch:
if r.err != nil {
return "", false
}
return string(r.data), true
case <-timer.C:
return "", false
}
}
func getCgroupV2PathInternal(cgroupData, mountinfoData string) string {
rel := getCgroupV2RelativePath(cgroupData)
if rel == "" {
// The process doesn't run under cgroup v2.
return ""
}
path := "/sys/fs/cgroup" + strings.TrimSpace(tmp[1])
// Drop trailing slash if it exsits. This prevents from '//' in the constructed paths by the caller.
return strings.TrimSuffix(path, "/")
// Determine the actual cgroup v2 mountpoint instead of assuming /sys/fs/cgroup.
// On systems with a hybrid cgroup hierarchy the unified cgroup v2 is mounted
// at a different location such as /sys/fs/cgroup/unified.
// See https://github.com/VictoriaMetrics/metrics/issues/127
mountpoint := getCgroupV2Mountpoint(mountinfoData)
if mountpoint == "" {
// fallback to assumed path
mountpoint = "/sys/fs/cgroup"
}
cgroupPath := path.Join(mountpoint, rel)
// Drop trailing slash if it exists. This prevents from '//' in the constructed paths by the caller.
return strings.TrimSuffix(cgroupPath, "/")
}
// getCgroupV2RelativePath returns the cgroup v2 path of the process relative to
// the cgroup v2 mountpoint, or an empty string if the process doesn't run under cgroup v2.
//
// The cgroup v2 entry in /proc/self/cgroup has an empty controllers field, e.g. "0::/the/path".
// See https://man7.org/linux/man-pages/man7/cgroups.7.html
func getCgroupV2RelativePath(cgroupData string) string {
for _, line := range strings.Split(cgroupData, "\n") {
// Each line has the form "hierarchy-ID:controller-list:cgroup-path".
// The cgroup v2 line has hierarchy-ID 0 and an empty controller-list, i.e. it starts with "0::".
tmp := strings.SplitN(line, "::", 2)
if len(tmp) == 2 && strings.HasPrefix(line, "0::") {
return strings.TrimSpace(tmp[1])
}
}
return ""
}
// getCgroupV2Mountpoint returns the mountpoint of the cgroup v2 (unified) hierarchy
// parsed from the contents of /proc/self/mountinfo, or an empty string if cgroup v2 isn't mounted.
func getCgroupV2Mountpoint(mountinfoData string) string {
for _, line := range strings.Split(mountinfoData, "\n") {
if !strings.Contains(line, "cgroup2") {
// fast path
continue
}
// mountinfo lines have the form:
// 36 35 98:0 / /sys/fs/cgroup/unified rw,... - cgroup2 cgroup2 rw,...
// The optional fields preceding the filesystem type are terminated by " - ".
// See https://man7.org/linux/man-pages/man5/proc_pid_mountinfo.5.html
tmp := strings.SplitN(line, " - ", 2)
if len(tmp) != 2 {
continue
}
after := strings.Fields(tmp[1])
if len(after) < 1 || after[0] != "cgroup2" {
continue
}
before := strings.Fields(tmp[0])
if len(before) < 5 {
continue
}
// before[4] is the mount point.
return before[4]
}
return ""
}

2
vendor/modules.txt vendored
View File

@@ -143,7 +143,7 @@ github.com/VictoriaMetrics/easyproto
# github.com/VictoriaMetrics/fastcache v1.13.3
## explicit; go 1.24.0
github.com/VictoriaMetrics/fastcache
# github.com/VictoriaMetrics/metrics v1.43.2
# github.com/VictoriaMetrics/metrics v1.44.0
## explicit; go 1.24.0
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.87.1