Compare commits

..

3 Commits

Author SHA1 Message Date
f41gh7
34a52b519d fixes flaky test 2026-06-05 13:44:48 +02:00
f41gh7
f4b0d9a4a6 address review comments 2026-06-05 11:46:13 +02:00
f41gh7
519b671b7d app/vmagent: prioritise recently ingested data to the queue
This commit adds a dedicated set of workers for processing only
in-memory part of vmagent persistentqueue. It should help to mitigate
an issue when stale data at file-based queue prevents from ingestion
recent data.

 There is a downside -in case of remote storage is not reachable,
it's possible to get only a part of data ingested and other part queued
into file-based queue. But it should be acceptable, because there is no
strong guarantees for the data ingestion order.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8833
2026-06-05 10:56:43 +02:00
29 changed files with 4999 additions and 401 deletions

View File

@@ -207,9 +207,13 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queues{url=%q}`, c.sanitizedURL), func() float64 {
return float64(concurrency)
})
for range concurrency {
perWorkersConcurrency := max(concurrency/2, 1)
for range perWorkersConcurrency {
c.wg.Go(c.runWorker)
}
for range perWorkersConcurrency {
c.wg.Go(c.runWorkerForInmemoryQueue)
}
logger.Infof("initialized client for -remoteWrite.url=%q", c.sanitizedURL)
}
@@ -348,6 +352,52 @@ func (c *client) runWorker() {
}
}
func (c *client) runWorkerForInmemoryQueue() {
var ok bool
var block []byte
ch := make(chan bool, 1)
for {
block, ok = c.fq.MustReadInMemoryBlockBlocking(block[:0])
if !ok {
return
}
go func() {
startTime := time.Now()
ch <- c.sendBlock(block)
c.sendDuration.Add(time.Since(startTime).Seconds())
}()
select {
case ok := <-ch:
if ok {
// The block has been sent successfully
continue
}
// Return unsent block to the queue.
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
return
case <-c.stopCh:
// c must be stopped. Wait up to 5 seconds for the in-flight request to complete.
// If it succeeds, drain the remaining in-memory queue before returning.
stopCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
select {
case ok := <-ch:
if !ok {
// Return unsent block to the queue.
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
} else {
c.drainInMemoryQueue(stopCtx, block[:0])
}
case <-stopCtx.Done():
// Return unsent block to the queue.
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
}
return
}
}
}
func (c *client) doRequest(url string, body []byte) (*http.Response, error) {
req, err := c.newRequest(url, body)
if err != nil {

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -37,11 +37,11 @@
<meta property="og:title" content="UI for VictoriaMetrics">
<meta property="og:url" content="https://victoriametrics.com/">
<meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data">
<script type="module" crossorigin src="./assets/index-CoGukb-x.js"></script>
<script type="module" crossorigin src="./assets/index-U3iNn2Tx.js"></script>
<link rel="modulepreload" crossorigin href="./assets/rolldown-runtime-COnpUsM8.js">
<link rel="modulepreload" crossorigin href="./assets/vendor-C8Kwp93_.js">
<link rel="stylesheet" crossorigin href="./assets/vendor-CnsZ1jie.css">
<link rel="stylesheet" crossorigin href="./assets/index-BBUnmLOr.css">
<link rel="stylesheet" crossorigin href="./assets/index-BL7jEFBa.css">
</head>
<body>
<noscript>You need to enable JavaScript to run this app.</noscript>

View File

@@ -296,6 +296,7 @@ func TestSingleVMAgentDropOnOverload(t *testing.T) {
"-remoteWrite.disableOnDiskQueue=true",
// use only 1 worker to get a full queue faster
"-remoteWrite.queues=1",
"-remoteWrite.flushInterval=1ms",
// fastqueue size is roughly memory.Allowed() / len(urls) / *maxRowsPerBlock / 100
// Use very large maxRowsPerBlock to get fastqueue of minimal length(2).
// See initRemoteWriteCtxs function in remotewrite.go for details.
@@ -332,13 +333,21 @@ func TestSingleVMAgentDropOnOverload(t *testing.T) {
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 1 && vmagent.RemoteWriteRequests(t, url2) == 1
},
)
// Wait until second request got flushed to remote write server
// since there are 2 indepent queues (general and in-memory) with minimal capacity of 1
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 2 && vmagent.RemoteWriteRequests(t, url2) == 2
},
)
// Send 2 more requests, the first RW endpoint should receive everything, the second should add them to the queue
// since worker is busy with the first request.
for i := range 2 {
@@ -348,7 +357,7 @@ func TestSingleVMAgentDropOnOverload(t *testing.T) {
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 2+i && vmagent.RemoteWritePendingInmemoryBlocks(t, url2) == 1+i
return vmagent.RemoteWriteRequests(t, url1) == 3+i && vmagent.RemoteWritePendingInmemoryBlocks(t, url2) == 1+i
},
)
}
@@ -360,7 +369,7 @@ func TestSingleVMAgentDropOnOverload(t *testing.T) {
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 4 && vmagent.RemoteWriteSamplesDropped(t, url2) > 0
return vmagent.RemoteWriteRequests(t, url1) == 5 && vmagent.RemoteWriteSamplesDropped(t, url2) > 0
},
)
}

View File

@@ -130,7 +130,7 @@
"calcs": [
"lastNotNull"
],
"fields": "/^version$/",
"fields": "/^short_version$/",
"values": false
},
"showPercentChange": false,
@@ -146,10 +146,11 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
"expr": "vm_app_version{job=~\"$job\",instance=~\"$instance\"}",
"format": "table",
"instant": true,
"interval": "",
"legendFormat": "{{short_version}}",
"range": false,
"refId": "A"
}

View File

@@ -791,7 +791,7 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
"format": "table",
"instant": true,
"range": false,

View File

@@ -789,7 +789,7 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
"format": "table",
"instant": true,
"range": false,

View File

@@ -131,7 +131,7 @@
"calcs": [
"lastNotNull"
],
"fields": "/^version$/",
"fields": "/^short_version$/",
"values": false
},
"showPercentChange": false,
@@ -147,10 +147,11 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
"expr": "vm_app_version{job=~\"$job\",instance=~\"$instance\"}",
"format": "table",
"instant": true,
"interval": "",
"legendFormat": "{{short_version}}",
"range": false,
"refId": "A"
}

View File

@@ -792,7 +792,7 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
"format": "table",
"instant": true,
"range": false,

View File

@@ -790,7 +790,7 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
"format": "table",
"instant": true,
"range": false,

View File

@@ -785,7 +785,7 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
"format": "table",
"instant": true,
"range": false,

View File

@@ -566,7 +566,7 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
"format": "table",
"instant": true,
"range": false,

View File

@@ -492,14 +492,14 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by (job, short_version)",
"format": "table",
"instant": true,
"range": false,
"refId": "A"
}
],
"title": "",
"title": "Version",
"type": "table"
},
{

View File

@@ -784,7 +784,7 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
"format": "table",
"instant": true,
"range": false,

View File

@@ -565,7 +565,7 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
"format": "table",
"instant": true,
"range": false,

View File

@@ -491,14 +491,14 @@
},
"editorMode": "code",
"exemplar": false,
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by (job, short_version)",
"format": "table",
"instant": true,
"range": false,
"refId": "A"
}
],
"title": "",
"title": "Version",
"type": "table"
},
{

View File

@@ -130,7 +130,7 @@ Released: 2025-11-05
## v1.27.0
Released: 2025-10-31
- FEATURE: Added runtime state compatibility guard for [stateful](https://docs.victoriametrics.com/anomaly-detection/components/settings/#state-restoration) deployments. The service now persists normalized versions, evaluates an [upgrade/downgrade compatibility matrix](https://docs.victoriametrics.com/anomaly-detection/migration/#compatibility-matrix), and selectively drops or reuses DB records and on-disk artifacts to keep migrations safe and automatic. Please refer to the [migration page](https://docs.victoriametrics.com/anomaly-detection/migration/) for more details.
- FEATURE: Added runtime state compatibility guard for [stateful](https://docs.victoriametrics.com/anomaly-detection/components/settings/#restore-state) deployments. The service now persists normalized versions, evaluates an [upgrade/downgrade compatibility matrix](https://docs.victoriametrics.com/anomaly-detection/migration/#compatibility-matrix), and selectively drops or reuses DB records and on-disk artifacts to keep migrations safe and automatic. Please refer to the [migration page](https://docs.victoriametrics.com/anomaly-detection/migration/) for more details.
- IMPROVEMENT: Parallelization now honours container cgroup CPU/RAM limits, so `settings.n_workers` in the [settings section](https://docs.victoriametrics.com/anomaly-detection/components/settings/#parallelization), internal routines and the `vmanomaly_available_memory_bytes`/`vmanomaly_cpu_cores_available` [startup metrics](https://docs.victoriametrics.com/anomaly-detection/components/monitoring/#startup-metrics) report or use container resources instead of host totals, keeping the [self-monitoring dashboard](https://docs.victoriametrics.com/anomaly-detection/self-monitoring/#grafana-dashboard) accurate.
@@ -190,7 +190,7 @@ Released: 2025-08-19
## v1.25.2
Released: 2025-07-30
- BUGFIX: Resolved inconsistent state between in-memory models and state database (if [stateful mode](https://docs.victoriametrics.com/anomaly-detection/components/settings/#state-restoration) is enabled). This bug caused `Model instance not found` warnings during inference calls and prevented proper cleanup of stale models from disk. The fix also prevents state updates when operations are terminated mid-execution of scheduled fit/infer jobs.
- BUGFIX: Resolved inconsistent state between in-memory models and state database (if [stateful mode](https://docs.victoriametrics.com/anomaly-detection/components/settings/#stateful-mode) is enabled). This bug caused `Model instance not found` warnings during inference calls and prevented proper cleanup of stale models from disk. The fix also prevents state updates when operations are terminated mid-execution of scheduled fit/infer jobs.
- BUGFIX: Added explicit handling for inference calls on models that were deleted from disk by the time of their usage, but still referenced in the state database, preventing `'NoneType' object has no attribute 'infer'` rows in logs. Now a warning is logged and the inference call is skipped, which is expected behavior for deleted models.
@@ -210,7 +210,7 @@ Released: 2025-07-24
- BUGFIX: Prevented `OneOffScheduler` and `BacktestingScheduler` [schedulers](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/) from receiving no data (when [state restoration](https://docs.victoriametrics.com/anomaly-detection/components/settings/#state-restoration) is enabled). Now a warning is logged and such scheduler types are implicitly used without state restoration, which is expected behavior for these one-time-job schedulers.
- BUGFIX: Now the paths to artifact database (if [stateful mode](https://docs.victoriametrics.com/anomaly-detection/components/settings/#state-restoration) is enabled) are properly resolved to absolute, preventing errors at initialization time (like `sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) unable to open database file`) or warnings (like `SAWarning: fully NULL primary key identity cannot load any object.`).
- BUGFIX: Now the paths to artifact database (if [stateful mode](https://docs.victoriametrics.com/anomaly-detection/components/settings/#stateful-mode) is enabled) are properly resolved to absolute, preventing errors at initialization time (like `sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) unable to open database file`) or warnings (like `SAWarning: fully NULL primary key identity cannot load any object.`).
## v1.25.0
Released: 2025-07-17
@@ -559,7 +559,7 @@ Released: 2024-08-10
- **Lowest anomaly scores** (=0) when the *model's predictions (`yhat`) fall outside the expected range*, signaling uncertain predictions.
- For more details, please refer to the [documentation](https://docs.victoriametrics.com/anomaly-detection/components/reader/#per-query-parameters).
- IMPROVEMENT: Added `latency_offset` argument to the [VmReader](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vm-reader) to override the default `-search.latencyOffset` [flag of VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/#list-of-command-line-flags) (30s). The default value is set to 1ms, which should help in cases where `sampling_period` is low (10-60s) and `sampling_period` equals `infer_every` in the [PeriodicScheduler](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#periodic-scheduler). This prevents users from receiving `service - WARNING - [Scheduler [scheduler_alias]] No data available for inference.` warnings in logs and allows for consecutive `infer` calls without gaps. To restore the backward compatible behavior, set it equal to your `-search.latencyOffset` value in [VmReader](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vm-reader) config section.
- IMPROVEMENT: Added `latency_offset` argument to the [VmReader](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vm-reader) to override the default `-search.latencyOffset` [flag of VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/#list-of-command-line-flags) (30s). The default value is set to 1ms, which should help in cases where `sampling_frequency` is low (10-60s) and `sampling_frequency` equals `infer_every` in the [PeriodicScheduler](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#periodic-scheduler). This prevents users from receiving `service - WARNING - [Scheduler [scheduler_alias]] No data available for inference.` warnings in logs and allows for consecutive `infer` calls without gaps. To restore the backward compatible behavior, set it equal to your `-search.latencyOffset` value in [VmReader](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vm-reader) config section.
- BUGFIX: Ensure the `use_transform` argument of the [`OnlineQuantileModel`](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-seasonal-quantile) functions as intended.
- BUGFIX: Add a docstring for `query_from_last_seen_timestamp` arg of [VmReader](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vm-reader).

View File

@@ -281,7 +281,7 @@ reader:
datasource_url: 'some_url_to_read_data_from'
queries:
query_alias1: 'some_metricsql_query'
sampling_period: '1m' # change to whatever you need in data granularity
sampling_frequency: '1m' # change to whatever you need in data granularity
# other params if needed
# https://docs.victoriametrics.com/anomaly-detection/components/reader/#vm-reader
@@ -294,7 +294,7 @@ writer:
# https://docs.victoriametrics.com/anomaly-detection/components/monitoring/
```
Configuration above will produce N intervals of full length (`fit_window`=14d + `fit_every`=1h) until `to_iso` timestamp is reached to run N consecutive `fit` calls to train models; Then these models will be used to produce `M = [fit_every / sampling_period]` infer datapoints for `fit_every` range at the end of each such interval, imitating M consecutive calls of `infer_every` in `PeriodicScheduler` [config](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#periodic-scheduler). These datapoints then will be written back to VictoriaMetrics TSDB, defined in `writer` [section](https://docs.victoriametrics.com/anomaly-detection/components/writer/#vm-writer) for further visualization (i.e. in VMUI or Grafana)
Configuration above will produce N intervals of full length (`fit_window`=14d + `fit_every`=1h) until `to_iso` timestamp is reached to run N consecutive `fit` calls to train models; Then these models will be used to produce `M = [fit_every / sampling_frequency]` infer datapoints for `fit_every` range at the end of each such interval, imitating M consecutive calls of `infer_every` in `PeriodicScheduler` [config](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#periodic-scheduler). These datapoints then will be written back to VictoriaMetrics TSDB, defined in `writer` [section](https://docs.victoriametrics.com/anomaly-detection/components/writer/#vm-writer) for further visualization (i.e. in VMUI or Grafana)
## Forecasting
@@ -499,7 +499,7 @@ schedulers:
models:
zscore_example:
class: 'zscore_online'
min_n_samples_seen: 120 # i.e. minimal relevant seasonality or (initial) fit_window / sampling_period
min_n_samples_seen: 120 # i.e. minimal relevant seasonality or (initial) fit_window / sampling_frequency
decay: 0.999 # decay factor to control how fast the model adapts to new data, the lower, the faster it adapts
schedulers: ['periodic']
# other model params ...

View File

@@ -41,7 +41,7 @@ settings:
restore_state: True # restore state from previous run, if available
retention: # how long to keep stale models on disk/in memory
ttl: "1d" # time-to-live duration, if the model was not used for inference within this duration, it will be considered stale
check_interval: "1h" # how often to check for stale models and remove them
check_every: "1h" # how often to check for stale models and remove them
# how and when to run the models is defined by schedulers
# https://docs.victoriametrics.com/anomaly-detection/components/scheduler/
@@ -143,11 +143,11 @@ server:
> This feature is better used in conjunction with [stateful service](https://docs.victoriametrics.com/anomaly-detection/components/settings/#state-restoration) to preserve the state of the models and schedulers between restarts and reuse what can be reused, thus avoiding unnecessary re-training of models, re-initialization of schedulers and re-reading of data.
{{% available_from "v1.25.0" anomaly %}} Service supports hot reload of configuration files, which allows for automatic reloading of configurations on config files change filesystem events without the need of explicit service restart. This can be enabled via the `--watch` [CLI argument](https://docs.victoriametrics.com/anomaly-detection/quickstart/#command-line-arguments). `vmanomaly_config_reload_enabled` flag in [self-monitoring metrics](https://docs.victoriametrics.com/anomaly-detection/components/monitoring/#startup-metrics) will be set to 1 (if enabled) or 0 (if disabled).
{{% available_from "v1.25.0" anomaly %}} Service supports hot reload of configuration files, which allows for automatic reloading of configurations on config files change filesystem events without the need of explicit service restart. This can be enabled via the `--watch` [CLI argument](https://docs.victoriametrics.com/anomaly-detection/quickstart/#command-line-arguments). `vmanomaly_hot_reload_enabled` flag in [self-monitoring metrics](https://docs.victoriametrics.com/anomaly-detection/components/monitoring/#startup-metrics) will be set to 1 (if enabled) or 0 (if disabled).
### How it works
It works by watching for file system events, such as modifications, creations, or deletions of `.yml|.yaml` files in the specified directories. When a change is detected, the service will attempt to reload the configuration files, rebuild the [global config](https://docs.victoriametrics.com/anomaly-detection/scaling-vmanomaly/#global-config) and reinitialize the components. If the reload is successful, the `vmanomaly_config_reloads_total` metric will be incremented for `status="success"` label, otherwise it will be incremented with `status="failure"` label and a respective error message on config validation failure(s) will be logged.
It works by watching for file system events, such as modifications, creations, or deletions of `.yml|.yaml` files in the specified directories. When a change is detected, the service will attempt to reload the configuration files, rebuild the [global config](https://docs.victoriametrics.com/anomaly-detection/scaling-vmanomaly/#global-config) and reinitialize the components. If the reload is successful, the `vmanomaly_hot_reload_events_total` metric will be incremented for `status="success"` label, otherwise it will be incremented with `status="failure"` label and a respective error message on config validation failure(s) will be logged.
> If the reload fails, the service will log an error message indicating the reason for the failure, and the **previous configuration will remain active until a successful reload occurs** to preserve the service's stability. This means that if there are errors in the new configuration, the service will continue to operate with the last valid configuration until the issues are resolved.
@@ -219,7 +219,7 @@ reader:
# ... (rest of the config remains unchanged)
```
After saving the changes, hot reload will automatically detect the changes in `config.yaml` and attempt to reload the configuration. As the changes are valid, the service will log a success message and increment the `vmanomaly_config_reloads_total` metric with `status="success"` label:
After saving the changes, hot reload will automatically detect the changes in `config.yaml` and attempt to reload the configuration. As the changes are valid, the service will log a success message and increment the `vmanomaly_hot_reload_events_total` metric with `status="success"` label:
- All the model instances of class `zscore_online`, that were trained on `host_network_receive_errors` can be reused as they are still valid and "fresh" for making inference on new datapoints until the next `fit_every` happens.
- All the model instances of class `zscore_online`, that were trained on `cpu_seconds_total` will be re-trained with the new query expression and frequency, as old model instances are not valid anymore.

View File

@@ -369,7 +369,7 @@ If True, then query will be performed from the last seen timestamp for a given s
`1ms`
</td>
<td>
It allows overriding the default `-search.latencyOffset`{{% available_from "v1.15.1" anomaly %}} [flag of VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/#list-of-command-line-flags) (30s). The default value is set to 1ms, which should help in cases where `sampling_period` is low (10-60s) and `sampling_period` equals `infer_every` in the [PeriodicScheduler](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#periodic-scheduler). This prevents users from receiving `service - WARNING - [Scheduler [scheduler_alias]] No data available for inference.` warnings in logs and allows for consecutive `infer` calls without gaps. To restore the old behavior, set it equal to your `-search.latencyOffset` [flag value](https://docs.victoriametrics.com/victoriametrics/#list-of-command-line-flags).
It allows overriding the default `-search.latencyOffset`{{% available_from "v1.15.1" anomaly %}} [flag of VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/#list-of-command-line-flags) (30s). The default value is set to 1ms, which should help in cases where `sampling_frequency` is low (10-60s) and `sampling_frequency` equals `infer_every` in the [PeriodicScheduler](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#periodic-scheduler). This prevents users from receiving `service - WARNING - [Scheduler [scheduler_alias]] No data available for inference.` warnings in logs and allows for consecutive `infer` calls without gaps. To restore the old behavior, set it equal to your `-search.latencyOffset` [flag value](https://docs.victoriametrics.com/victoriametrics/#list-of-command-line-flags).
</td>
</tr>
<tr>

View File

@@ -26,29 +26,23 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
Release candidate
* SECURITY: upgrade Go builder from Go1.26.3 to Go1.26.4. See [the list of issues addressed in Go1.26.4](https://github.com/golang/go/issues?q=milestone%3AGo1.26.4%20label%3ACherryPickApproved).
* FEATURE: [enterprise](https://docs.victoriametrics.com/enterprise/) [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add the new metrics `vm_downsampling_partitions_scheduled_rows` and `vm_retention_filters_partitions_scheduled_rows` for measuring background historical data merge completion time. See [#10960](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10960)
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): support `match[]=<label_selector>` query parameters in `/api/v1/rules` and `/api/v1/alerts` APIs to return only the rules that have configured labels satisfying the provided label selectors. See [11020](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11020).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): prioritise recently ingested data over historical data stored at file-based [persistent queue](https://docs.victoriametrics.com/victoriametrics/vmagent/#on-disk-persistence). See [#8833](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8833)
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `-opentelemetry.promoteAllResourceAttributes` and `-opentelemetry.promoteScopeMetadata` command-line flags to allow managing label promotion for resource attributes and OTel scope metadata. See [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) docs and [#10931](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10931).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) : introduce `vmagent_remotewrite_kafka_outbuf_latency_seconds` and `vmagent_remotewrite_kafka_rtt_seconds` metrics for [kafka integration](https://docs.victoriametrics.com/victoriametrics/integrations/kafka/). The metrics could help identify throughput bottlenecks. See [#10730](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10730).
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): properly log user information when a missing route error occurs. See [#11052](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11052).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl/): add the ability to migrate data from [Mimir](https://docs.victoriametrics.com/victoriametrics/vmctl/mimir/#) object storage to VictoriaMetrics. See [#7717](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7717).
* FEATURE: [dashboards](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/dashboards): show the full `version` label in the `Version` panel when `short_version` label is empty (e.g. custom builds from feature branch). Previously, the panel could appear empty. See [#11047](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11047).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl/): add the ability to migrate data from Mimir object storage to VictoriaMetrics. See [#7717](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7717).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): fix the `Notifiers` page in web UI appearing blank despite the API returning notifier data correctly. See [#11035](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11035).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): reset the group evaluation timestamp if it exceeds the current host time. Previously, vmalert could use future timestamps for evaluations if the system clock was shifted backward. See [#10985](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10985).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): properly parse [Prometheus Native Histograms](https://prometheus.io/docs/specs/native_histograms/), previously Protobuf parser could produce unexpected `vmrange` labels. See [#11041](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11041).
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): properly calculate number of loaded users to be printed in startup log. Previously, it was only accounting for static users and skipped JWT configuration entries. See [#11050](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11050/).
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): properly calculate number of loaded users to be printed in startup log. Previously, it was only accounting for static users and skipped JWT configuration entries.
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/): `integrate()` no longer extrapolates the last sample's value past the end of the time series. Previously, querying `integrate(metric[1h])` at a timestamp where the series had already ended would keep accruing area as if the last value continued indefinitely, producing values much larger than the true integral. See [#9474](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9474). Thanks to @wtfashwin for contribution.
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): avoid returning HTTP 503 for queries with partial results when a storage group is unavailable and `-search.denyPartialResponse` is disabled. See [#11009](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11009). Thanks to @fxrlv for the contribution.
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): avoid returning HTTP 503 for queries with partial results when a storage group is unavailable and `-search.denyPartialResponse` is disabled.
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly escape `utf-8` label names for [/federate](https://docs.victoriametrics.com/victoriametrics/#federation) API requests. See [#10968](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10968).
* BUGFIX: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): persist the `Disable deduplication` toggle under its own local storage key. Before this fix, the toggle state was lost after reload and could overwrite the `Compact view` table setting. See [#11004](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11004). Thanks to @immanuwell for the contribution.
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix intermittent `write: connection timed out` errors caused by silently dropped TCP connections being reused from the connection pool. See [#10735](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10735#issuecomment-4535832301).
## [v1.144.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.144.0)

4
go.mod
View File

@@ -25,7 +25,6 @@ require (
github.com/googleapis/gax-go/v2 v2.22.0
github.com/influxdata/influxdb v1.12.4
github.com/klauspost/compress v1.18.5
github.com/oklog/ulid/v2 v2.1.1
github.com/prometheus/prometheus v0.311.3
github.com/urfave/cli/v2 v2.27.7
github.com/valyala/fastjson v1.6.10
@@ -110,6 +109,7 @@ require (
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/oklog/ulid/v2 v2.1.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.150.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.150.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.150.0 // indirect
@@ -155,7 +155,7 @@ require (
go.uber.org/zap v1.27.1 // indirect
go.yaml.in/yaml/v2 v2.4.4 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.52.0 // indirect
golang.org/x/crypto v0.51.0 // indirect
golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/term v0.43.0 // indirect

4
go.sum
View File

@@ -509,8 +509,8 @@ go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.52.0 h1:RMs7fP2rXdep0CftQlK8Uf+kibLm7qkCcradZWYz988=
golang.org/x/crypto v0.52.0/go.mod h1:1QgfPxDqh0T2M/elOJtp9RvuR95kVjir0e6/BvEmGbc=
golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f h1:W3F4c+6OLc6H2lb//N1q4WpJkhzJCK5J6kUi1NTVXfM=
golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f/go.mod h1:J1xhfL/vlindoeF/aINzNzt2Bket5bjo9sdOYzOsU80=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=

View File

@@ -194,19 +194,12 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
isPQWriteAllowed := !fq.isPQDisabled || ignoreDisabledPQ
fq.flushInmemoryBlocksToFileIfNeededLocked()
if n := fq.pq.GetPendingBytes(); n > 0 {
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
// So put the block to file-based queue.
if len(fq.ch) > 0 {
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
}
if !isPQWriteAllowed {
return false
}
fq.pq.MustWriteBlock(block)
return true
if !isPQWriteAllowed && fq.pq.GetPendingBytes() > 0 {
// fast path there is pending data at file-based queue,
// it must be drained before in-memory queue could be used.
return false
}
fq.flushInmemoryBlocksToFileIfNeededLocked()
if len(fq.ch) == cap(fq.ch) {
// There is no space left in the in-memory queue. Put the data to file-based queue.
if !isPQWriteAllowed {
@@ -216,7 +209,7 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
fq.pq.MustWriteBlock(block)
return true
}
// Fast path - put the block to in-memory queue.
bb := blockBufPool.Get()
bb.B = append(bb.B[:0], block...)
fq.ch <- bb
@@ -239,16 +232,15 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
return dst, false
}
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
}
if n := fq.pq.GetPendingBytes(); n > 0 {
data, ok := fq.pq.MustReadBlockNonblocking(dst)
if ok {
return data, true
}
dst = data
continue
}
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
}
if fq.stopDeadline > 0 {
return dst, false
@@ -259,6 +251,28 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
}
}
// MustReadInMemoryBlockBlocking reads the next block from the in-memory queue into dst and returns it.
// It returns (dst, true) if a block was available, or (nil, false) if the in-memory queue is empty.
// It blocks until a block is available or the stop deadline is exceeded, in which case it returns (dst, false).
func (fq *FastQueue) MustReadInMemoryBlockBlocking(dst []byte) ([]byte, bool) {
fq.mu.Lock()
defer fq.mu.Unlock()
for {
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
return dst, false
}
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
}
if fq.stopDeadline > 0 {
return dst, false
}
// There are no blocks. Wait for new block.
fq.cond.Wait()
}
}
// MustReadInMemoryBlock reads the next block from the in-memory queue into dst and returns it.
// It returns (dst, true) if a block was available, or (nil, false) if the in-memory queue is empty.
// It does not block waiting for new blocks.
@@ -277,9 +291,6 @@ func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) []byte {
if len(fq.ch) == 0 {
logger.Panicf("BUG: the function must not be called when in-memory queue is empty. Caller should verify the queue len upfront")
}
if n := fq.pq.GetPendingBytes(); n > 0 {
logger.Panicf("BUG: the file-based queue must be empty when the in-memory queue is non-empty; it contains %d pending bytes", n)
}
bb := <-fq.ch
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()

View File

@@ -20,7 +20,7 @@ func chacha20Poly1305Open(dst []byte, key []uint32, src, ad []byte) bool
func chacha20Poly1305Seal(dst []byte, key []uint32, src, ad []byte)
var (
useAVX2 = cpu.X86.HasSSSE3 && cpu.X86.HasAVX2 && cpu.X86.HasBMI2
useAVX2 = cpu.X86.HasAVX2 && cpu.X86.HasBMI2
)
// setupState writes a ChaCha20 input matrix to state. See
@@ -47,7 +47,7 @@ func setupState(state *[16]uint32, key *[32]byte, nonce []byte) {
}
func (c *chacha20poly1305) seal(dst, nonce, plaintext, additionalData []byte) []byte {
if !useAVX2 {
if !cpu.X86.HasSSSE3 {
return c.sealGeneric(dst, nonce, plaintext, additionalData)
}
@@ -66,7 +66,7 @@ func (c *chacha20poly1305) seal(dst, nonce, plaintext, additionalData []byte) []
}
func (c *chacha20poly1305) open(dst, nonce, ciphertext, additionalData []byte) ([]byte, error) {
if !useAVX2 {
if !cpu.X86.HasSSSE3 {
return c.openGeneric(dst, nonce, ciphertext, additionalData)
}

File diff suppressed because it is too large Load Diff

2
vendor/modules.txt vendored
View File

@@ -851,7 +851,7 @@ go.yaml.in/yaml/v2
# go.yaml.in/yaml/v3 v3.0.4
## explicit; go 1.16
go.yaml.in/yaml/v3
# golang.org/x/crypto v0.52.0
# golang.org/x/crypto v0.51.0
## explicit; go 1.25.0
golang.org/x/crypto/chacha20
golang.org/x/crypto/chacha20poly1305