Compare commits

..

6 Commits

Author SHA1 Message Date
Pablo Fernandez
60de3bad8e Build on a more expanded reference for using credentials
See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11073
2026-06-12 18:15:30 +01:00
Andrii Chubatiuk
05903c8acd chore(lib/streamaggr): move increase and increase_prometheus outputs to a separate struct (#11093)
move increase and increase_prometheus outputs to a separate struct
to simplify the code. 

Before, increase, increase_prometheus, total_prometheus 
were implemented within one aggregator making it harder to use or update it.
2026-06-12 11:47:55 +02:00
Pablo (Tomas) Fernandez
a9fae230ae docs: Update "Multi Retention Setup within VictoriaMetrics Cluster" Guide (#11055)
This PR updates the [Multi Retention Setup within VictoriaMetrics
Cluster](https://docs.victoriametrics.com/guides/guide-vmcluster-multiple-retention-setup/)
guide.

Changes:
- Rewrote the introduction and added an overview
- Added step-by-step example setup for Kubernetes
- Expanded on alternative ways to route traffic

I've tested the configuration on a K3s cluster and it works, but I don't
know if the way I routed traffic into the retention tiers makes sense,
so any feedback is very much appreciated.

---------

Signed-off-by: Pablo (Tomas) Fernandez <46322567+TomFern@users.noreply.github.com>
Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
2026-06-12 11:46:26 +02:00
Fred Navruzov
19fac13418 docs/vmanomaly: release v1.29.5 (#11097)
### Description

- Documentation updates to reflect v1.29.5 vmanomaly release changes
- Documentation/internal logic alignment in argument descriptions (such
as `decay` arg for models)
2026-06-11 22:49:30 +03:00
Artem Fetishev
3a6054f8a2 vmsingle: refactor vmstorage type (#11076)
Move vmstorage_single_node.go code into vmstorage.go. This way the diff
between master and cluster branches is easier to view.

This change is for single node only. The change must not be
cherry-picked to cluster branches.

Follow-up for 89db66573b (#11046)

---------

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-11 16:19:24 +02:00
Artem Fetishev
6653f6a5e7 apptest: rename client fields to generic cli (#11088)
Initially, the fields were given a different name because I mistakenly
believed that the fields with the same name will clash when the
corresponding types are embedded in some other type (for example,
vmselectClient and vminsertClient are embedded into vmsingle).

But it appears not to be the case. Thus, changing the field name to
something more generic.

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-10 17:01:44 +02:00
32 changed files with 833 additions and 473 deletions

View File

@@ -131,13 +131,16 @@ func (ac *authContext) initFromBasicAuthConfig(ba *BasicAuthConfig) error {
if ba.Username == "" {
return fmt.Errorf("missing `username` in `basic_auth` section")
}
ac.getAuthHeader = func() string {
// See https://en.wikipedia.org/wiki/Basic_access_authentication
token := ba.Username + ":" + ba.Password
token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64
if ba.Password != "" {
ac.getAuthHeader = func() string {
// See https://en.wikipedia.org/wiki/Basic_access_authentication
token := ba.Username + ":" + ba.Password
token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64
}
ac.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
return nil
}
ac.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
return nil
}

View File

@@ -69,8 +69,6 @@ const (
vmAddr = "vm-addr"
vmUser = "vm-user"
vmPassword = "vm-password"
vmHeaders = "vm-headers"
vmBearerToken = "vm-bearer-token"
vmAccountID = "vm-account-id"
vmConcurrency = "vm-concurrency"
vmCompress = "vm-compress"
@@ -114,16 +112,6 @@ var (
Usage: "VictoriaMetrics password for basic auth",
EnvVars: []string{"VM_PASSWORD"},
},
&cli.StringFlag{
Name: vmHeaders,
Usage: "Optional HTTP headers to send with each request to the corresponding destination address. \n" +
"For example, --vm-headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding destination address. \n" +
"Multiple headers must be delimited by '^^': --vm-headers='header1:value1^^header2:value2'",
},
&cli.StringFlag{
Name: vmBearerToken,
Usage: "Optional bearer auth token to use for the corresponding --vm-addr",
},
&cli.StringFlag{
Name: vmAccountID,
Usage: "AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). \n" +

View File

@@ -457,7 +457,7 @@ func main() {
auth.WithBearer(c.String(vmNativeDstBearerToken)),
auth.WithHeaders(c.String(vmNativeDstHeaders)))
if err != nil {
return fmt.Errorf("error initialize auth config for destination: %s: %s", dstAddr, err)
return fmt.Errorf("error initialize auth config for destination: %s", dstAddr)
}
// create TLS config
@@ -596,18 +596,11 @@ func initConfigVM(c *cli.Context) (vm.Config, error) {
return vm.Config{}, fmt.Errorf("failed to create backoff object: %w", err)
}
authCfg, err := auth.Generate(
auth.WithBasicAuth(c.String(vmUser), c.String(vmPassword)),
auth.WithBearer(c.String(vmBearerToken)),
auth.WithHeaders(c.String(vmHeaders)))
if err != nil {
return vm.Config{}, fmt.Errorf("error initialize auth config for destination: %s: %s", addr, err)
}
return vm.Config{
Addr: addr,
Transport: tr,
AuthCfg: authCfg,
User: c.String(vmUser),
Password: c.String(vmPassword),
Concurrency: uint8(c.Int(vmConcurrency)),
Compress: c.Bool(vmCompress),
AccountID: c.String(vmAccountID),

View File

@@ -12,7 +12,6 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
@@ -28,8 +27,6 @@ type Config struct {
// --httpListenAddr value for single node version
// --httpListenAddr value of vmselect component for cluster version
Addr string
AuthCfg *auth.Config
// Transport allows specifying custom http.Transport
Transport *http.Transport
// Concurrency defines number of worker
@@ -43,6 +40,10 @@ type Config struct {
// BatchSize defines how many samples
// importer collects before sending the import request
BatchSize int
// User name for basic auth
User string
// Password for basic auth
Password string
// SignificantFigures defines the number of significant figures to leave
// in metric values before importing.
// Zero value saves all the significant decimal places
@@ -64,10 +65,11 @@ type Config struct {
// see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-time-series-data
type Importer struct {
addr string
authCfg *auth.Config
client *http.Client
importPath string
compress bool
user string
password string
close chan struct{}
input chan *TimeSeries
@@ -146,7 +148,8 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
client: client,
importPath: importPath,
compress: cfg.Compress,
authCfg: cfg.AuthCfg,
user: cfg.User,
password: cfg.Password,
rl: limiter.NewLimiter(cfg.RateLimit),
close: make(chan struct{}),
input: make(chan *TimeSeries, cfg.Concurrency*4),
@@ -301,8 +304,8 @@ func (im *Importer) Ping() error {
if err != nil {
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
}
if im.authCfg != nil {
im.authCfg.SetHeaders(req, true)
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
}
resp, err := im.client.Do(req)
if err != nil {
@@ -331,8 +334,8 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
im.importRequestsErrorsTotal.Inc()
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
}
if im.authCfg != nil {
im.authCfg.SetHeaders(req, true)
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
}
if im.compress {
req.Header.Set("Content-Encoding", "gzip")

View File

@@ -10,8 +10,6 @@ import (
"strings"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@@ -23,6 +21,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vminsertapi"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
"github.com/VictoriaMetrics/metrics"
)
var (
@@ -153,7 +152,7 @@ func Init(vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []stora
LogNewSeries: *logNewSeries,
}
strg := storage.MustOpenStorage(*storageDataPath, opts)
vmStorage = newVMStorageSingleNode(strg, vmselectMaxConcurrentRequests, resetCacheIfNeeded)
vmStorage = newVMStorage(strg, vmselectMaxConcurrentRequests, resetCacheIfNeeded)
var m storage.Metrics
strg.UpdateMetrics(&m)
@@ -175,15 +174,15 @@ func Init(vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []stora
GetSearch = vmStorage.GetSearch
PutSearch = vmStorage.PutSearch
RequestHandler = vmStorage.requestHandler
DebugFlush = vmStorage.vms.s.DebugFlush
DebugFlush = vmStorage.s.DebugFlush
}
var storageMetrics *metrics.Set
var (
// vmStorageSingleNode is an instance of vmstorage used by vminsert and
// vmStorage is an instance of vmstorage used by vminsert and
// vmselect for writing and reading data.
vmStorage *VMStorageSingleNode
vmStorage *VMStorage
VMInsertAPI vminsertapi.API
VMSelectAPI vmselectapi.API
GetSearch func(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (*storage.Search, int, error)
@@ -209,15 +208,12 @@ func Stop() {
logger.Infof("the vmstorage has been stopped")
}
func (vmssn *VMStorageSingleNode) requestHandler(w http.ResponseWriter, r *http.Request) bool {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.requestHandler(w, r)
}
// requestHandler is a storage request handler.
// TODO(@rtm0): Move to a separate file, request_handler.go
func (vms *VMStorage) requestHandler(w http.ResponseWriter, r *http.Request) bool {
vms.wg.Add(1)
defer vms.wg.Done()
path := r.URL.Path
if path == "/internal/force_merge" {
if !httpserver.CheckAuthFlag(w, r, forceMergeAuthKey) {
@@ -361,15 +357,11 @@ var (
snapshotsDeleteAllErrorsTotal = metrics.NewCounter(`vm_http_request_errors_total{path="/snapshot/delete_all"}`)
)
// TODO(@rtm0): Move to metrics.go.
func (vmssn *VMStorageSingleNode) writeStorageMetrics(w io.Writer) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
vmssn.vms.writeStorageMetrics(w)
}
// TODO(@rtm0): Move to metrics.go.
func (vms *VMStorage) writeStorageMetrics(w io.Writer) {
vms.wg.Add(1)
defer vms.wg.Done()
strg := vms.s
var m storage.Metrics
strg.UpdateMetrics(&m)

View File

@@ -1,6 +1,7 @@
package vmstorage
import (
"errors"
"flag"
"fmt"
"sync"
@@ -14,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
@@ -34,7 +36,7 @@ var (
// newVMStorage creates a new instance of of VMStorage.
//
// The created VMStorage instance takes ownership of s.
func newVMStorage(s *storage.Storage, vmselectMaxConcurrentRequests int) *VMStorage {
func newVMStorage(s *storage.Storage, vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []storage.MetricRow)) *VMStorage {
if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
logger.Fatalf("invalid -precisionBits=%d: %s", *precisionBits, err)
}
@@ -49,6 +51,8 @@ func newVMStorage(s *storage.Storage, vmselectMaxConcurrentRequests int) *VMStor
maxUniqueTimeseries: *maxUniqueTimeseries,
maxUniqueTimeSeriesCalculated: maxUniqueTimeseriesCalculated,
staleSnapshotsRemoverCh: make(chan struct{}),
wg: syncwg.WaitGroup{},
resetCacheIfNeeded: resetCacheIfNeeded,
}
vms.initStaleSnapshotsRemover()
return vms
@@ -78,6 +82,17 @@ type VMStorage struct {
maxUniqueTimeSeriesCalculated int
staleSnapshotsRemoverCh chan struct{}
staleSnapshotsRemoverWG sync.WaitGroup
// wg is used to wrap every storage call into wg.Add(1) ... wg.Done()
// for proper graceful shutdown when Stop is called.
//
// Use syncwg instead of sync, since Add is called from concurrent
// goroutines.
wg syncwg.WaitGroup
// resetCacheIfNeeded is a callback for automatic resetting of response
// cache if needed.
resetCacheIfNeeded func(mrs []storage.MetricRow)
}
func (vms *VMStorage) initStaleSnapshotsRemover() {
@@ -103,6 +118,7 @@ func (vms *VMStorage) initStaleSnapshotsRemover() {
func (vms *VMStorage) Stop() {
close(vms.staleSnapshotsRemoverCh)
vms.staleSnapshotsRemoverWG.Wait()
vms.wg.WaitAndBlock()
vms.s.MustClose()
}
@@ -111,6 +127,14 @@ func (vms *VMStorage) Stop() {
// The caller should limit the number of concurrent calls to WriteRows() in
// order to limit memory usage.
func (vms *VMStorage) WriteRows(rows []storage.MetricRow) error {
vms.wg.Add(1)
defer vms.wg.Done()
if vms.s.IsReadOnly() {
return errReadOnly
}
vms.resetCacheIfNeeded(rows)
vms.s.AddRows(rows, uint8(*precisionBits))
return nil
}
@@ -120,26 +144,41 @@ func (vms *VMStorage) WriteRows(rows []storage.MetricRow) error {
// The caller should limit the number of concurrent calls to WriteMetadata() in
// order to limit memory usage.
func (vms *VMStorage) WriteMetadata(rows []metricsmetadata.Row) error {
vms.wg.Add(1)
defer vms.wg.Done()
if vms.s.IsReadOnly() {
return errReadOnly
}
vms.s.AddMetadataRows(rows)
return nil
}
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
// IsReadOnly returns true is the storage is in read-only mode.
func (vms *VMStorage) IsReadOnly() bool {
vms.wg.Add(1)
defer vms.wg.Done()
return vms.s.IsReadOnly()
}
func (vms *VMStorage) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
vms.wg.Add(1)
tr := sq.GetTimeRange()
maxMetrics := vms.getMaxMetrics(sq.MaxMetrics)
tfss, err := vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
vms.wg.Done()
return nil, err
}
if len(tfss) == 0 {
vms.wg.Done()
return nil, fmt.Errorf("missing tag filters")
}
bi := getBlockIterator()
bi.wgDone = vms.wg.Done
bi.sr.Init(qt, vms.s, tfss, tr, maxMetrics, deadline)
if err := bi.sr.Error(); err != nil {
bi.MustClose()
@@ -161,8 +200,9 @@ func (vms *VMStorage) getMaxMetrics(searchQueryLimit int) int {
// blockIterator implements vmselectapi.BlockIterator
type blockIterator struct {
sr storage.Search
mb storage.MetricBlock
sr storage.Search
mb storage.MetricBlock
wgDone func()
}
var blockIteratorsPool sync.Pool
@@ -171,6 +211,8 @@ func (bi *blockIterator) MustClose() {
bi.sr.MustClose()
bi.mb.MetricName = nil
bi.mb.Block.Reset()
bi.wgDone()
bi.wgDone = nil
blockIteratorsPool.Put(bi)
}
@@ -197,8 +239,63 @@ func (bi *blockIterator) Error() error {
return bi.sr.Error()
}
// GetSearch sets up an instance of storage search and returns it to the caller
// along with the max series count that the search can return.
//
// This method is not part of the vmselectapi.API and must only be used by
// vmsingle HTTP handlers.
//
// Callers of this method must call PutSearch() once the search instance is not
// needed anymore.
func (vms *VMStorage) GetSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (*storage.Search, int, error) {
vms.wg.Add(1)
tr := sq.GetTimeRange()
maxMetrics := vms.getMaxMetrics(sq.MaxMetrics)
tfss, err := vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
vms.wg.Done()
return nil, 0, err
}
sr := getSearch()
maxSeriesCount := sr.Init(qt, vms.s, tfss, tr, sq.MaxMetrics, deadline)
return sr, maxSeriesCount, nil
}
// PutSearch resets the search once it is not needed anymore and puts it aside
// for future reuse.
//
// This method is not part of the vmselectapi.API and must only be used by
// vmsingle HTTP handlers.
//
// The method must only be used on search instances that have been created with
// GetSearch().
func (vms *VMStorage) PutSearch(sr *storage.Search) {
putSearch(sr)
vms.wg.Done()
}
func getSearch() *storage.Search {
v := ssPool.Get()
if v == nil {
return &storage.Search{}
}
return v.(*storage.Search)
}
func putSearch(sr *storage.Search) {
sr.MustClose()
ssPool.Put(sr)
}
var ssPool sync.Pool
// SearchMetricNames returns metric names for the given tfss on the given tr.
func (vms *VMStorage) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
vms.wg.Add(1)
defer vms.wg.Done()
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
@@ -219,6 +316,9 @@ func (vms *VMStorage) SearchMetricNames(qt *querytracer.Tracer, sq *storage.Sear
// SearchLabelValues searches for label values for the given labelName, tfss and
// tr.
func (vms *VMStorage) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
vms.wg.Add(1)
defer vms.wg.Done()
tr := sq.GetTimeRange()
if maxLabelValues <= 0 || maxLabelValues > *maxTagValues {
maxLabelValues = *maxTagValues
@@ -244,6 +344,9 @@ func (vms *VMStorage) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuer
// similar APIs.
func (vms *VMStorage) TagValueSuffixes(qt *querytracer.Tracer, _, _ uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
maxSuffixes int, deadline uint64) ([]string, error) {
vms.wg.Add(1)
defer vms.wg.Done()
if maxSuffixes <= 0 || maxSuffixes > *maxTagValueSuffixesPerSearch {
maxSuffixes = *maxTagValueSuffixesPerSearch
}
@@ -260,6 +363,9 @@ func (vms *VMStorage) TagValueSuffixes(qt *querytracer.Tracer, _, _ uint32, tr s
// SearchLabelNames searches for tag keys matching the given tfss on tr.
func (vms *VMStorage) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
vms.wg.Add(1)
defer vms.wg.Done()
tr := sq.GetTimeRange()
if maxLabelNames <= 0 || maxLabelNames > *maxTagKeys {
maxLabelNames = *maxTagKeys
@@ -278,6 +384,8 @@ func (vms *VMStorage) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery
}
func (vms *VMStorage) SeriesCount(_ *querytracer.Tracer, _, _ uint32, deadline uint64) (uint64, error) {
vms.wg.Add(1)
defer vms.wg.Done()
return vms.s.GetSeriesCount(deadline)
}
@@ -287,6 +395,9 @@ func (vms *VMStorage) Tenants(_ *querytracer.Tracer, _ storage.TimeRange, _ uint
// GetTSDBStatus returns TSDB status for given filters on the given date.
func (vms *VMStorage) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
vms.wg.Add(1)
defer vms.wg.Done()
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
@@ -306,6 +417,9 @@ func (vms *VMStorage) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery
//
// Returns the number of deleted series.
func (vms *VMStorage) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
vms.wg.Add(1)
defer vms.wg.Done()
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
@@ -324,17 +438,26 @@ func (vms *VMStorage) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQue
}
func (vms *VMStorage) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error {
vms.wg.Add(1)
defer vms.wg.Done()
vms.s.RegisterMetricNames(qt, mrs)
return nil
}
// GetMetricNamesUsageStats returns metric name usage stats.
func (vms *VMStorage) GetMetricNamesUsageStats(qt *querytracer.Tracer, _ *storage.TenantToken, limit, le int, matchPattern string, _ uint64) (metricnamestats.StatsResult, error) {
vms.wg.Add(1)
defer vms.wg.Done()
return vms.s.GetMetricNamesStats(qt, limit, le, matchPattern), nil
}
// ResetMetricNamesStats resets state for metric names usage tracker
func (vms *VMStorage) ResetMetricNamesUsageStats(qt *querytracer.Tracer, _ uint64) error {
vms.wg.Add(1)
defer vms.wg.Done()
vms.s.ResetMetricNamesStats(qt)
return nil
}
@@ -371,6 +494,8 @@ func (vms *VMStorage) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery,
}
func (vms *VMStorage) GetMetadataRecords(qt *querytracer.Tracer, _ *storage.TenantToken, limit int, metricName string, _ uint64) ([]*metricsmetadata.Row, error) {
vms.wg.Add(1)
defer vms.wg.Done()
return vms.s.GetMetadataRows(qt, limit, metricName), nil
}

View File

@@ -1,213 +0,0 @@
package vmstorage
import (
"errors"
"fmt"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
// newVMStorageSingleNode creates a new instance of of VMStorage for vmsingle.
func newVMStorageSingleNode(s *storage.Storage, maxConcurrentRequests int, resetCacheIfNeeded func(mrs []storage.MetricRow)) *VMStorageSingleNode {
vms := newVMStorage(s, maxConcurrentRequests)
return &VMStorageSingleNode{
vms: vms,
wg: syncwg.WaitGroup{},
resetCacheIfNeeded: resetCacheIfNeeded,
}
}
type VMStorageSingleNode struct {
vms *VMStorage
// wg is used to wrap every storage call into wg.Add(1) ... wg.Done()
// for proper graceful shutdown when Stop is called.
//
// Use syncwg instead of sync, since Add is called from concurrent
// goroutines.
wg syncwg.WaitGroup
// resetCacheIfNeeded is a callback for automatic resetting of response
// cache if needed.
resetCacheIfNeeded func(mrs []storage.MetricRow)
}
func (vmssn *VMStorageSingleNode) Stop() {
vmssn.wg.WaitAndBlock()
vmssn.vms.Stop()
}
// WriteRows writes metric rows to the storage.
//
// Returns an error if the storage is in read-only mode.
//
// The caller should limit the number of concurrent calls to WriteRows() in
// order to limit memory usage.
func (vmssn *VMStorageSingleNode) WriteRows(rows []storage.MetricRow) error {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
if vmssn.vms.IsReadOnly() {
return errReadOnly
}
vmssn.resetCacheIfNeeded(rows)
return vmssn.vms.WriteRows(rows)
}
// WriteMetadata writes metrics metadata to storage.
//
// Returns an error if the storage is in read-only mode.
//
// The caller should limit the number of concurrent calls to WriteMetadata() in
// order to limit memory usage.
func (vmssn *VMStorageSingleNode) WriteMetadata(rows []metricsmetadata.Row) error {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
if vmssn.vms.IsReadOnly() {
return errReadOnly
}
return vmssn.vms.WriteMetadata(rows)
}
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
func (vmssn *VMStorageSingleNode) IsReadOnly() bool {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.IsReadOnly()
}
func (vmssn *VMStorageSingleNode) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
return nil, fmt.Errorf("not implemented in vmsingle")
}
// GetSearch sets up an instance of storage search and returns it to the caller
// along with the max series count that the search can return.
//
// This method is not part of the vmselectapi.API and must only be used by
// vmsingle HTTP handlers.
//
// Callers of this method must call PutSearch() once the search instance is not
// needed anymore.
func (vmssn *VMStorageSingleNode) GetSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (*storage.Search, int, error) {
vmssn.wg.Add(1)
tr := sq.GetTimeRange()
maxMetrics := vmssn.vms.getMaxMetrics(sq.MaxMetrics)
tfss, err := vmssn.vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
vmssn.wg.Done()
return nil, 0, err
}
sr := getSearch()
maxSeriesCount := sr.Init(qt, vmssn.vms.s, tfss, tr, sq.MaxMetrics, deadline)
return sr, maxSeriesCount, nil
}
// PutSearch resets the search once it is not needed anymore and puts it aside
// for future reuse.
//
// This method is not part of the vmselectapi.API and must only be used by
// vmsingle HTTP handlers.
//
// The method must only be used on search instances that have been created with
// GetSearch().
func (vmssn *VMStorageSingleNode) PutSearch(sr *storage.Search) {
putSearch(sr)
vmssn.wg.Done()
}
func getSearch() *storage.Search {
v := ssPool.Get()
if v == nil {
return &storage.Search{}
}
return v.(*storage.Search)
}
func putSearch(sr *storage.Search) {
sr.MustClose()
ssPool.Put(sr)
}
var ssPool sync.Pool
func (vmssn *VMStorageSingleNode) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.SearchMetricNames(qt, sq, deadline)
}
func (vmssn *VMStorageSingleNode) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.LabelValues(qt, sq, labelName, maxLabelValues, deadline)
}
func (vmssn *VMStorageSingleNode) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.TagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
}
func (vmssn *VMStorageSingleNode) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.LabelNames(qt, sq, maxLabelNames, deadline)
}
func (vmssn *VMStorageSingleNode) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.SeriesCount(qt, accountID, projectID, deadline)
}
func (vmssn *VMStorageSingleNode) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.Tenants(qt, tr, deadline)
}
func (vmssn *VMStorageSingleNode) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.TSDBStatus(qt, sq, focusLabel, topN, deadline)
}
func (vmssn *VMStorageSingleNode) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.DeleteSeries(qt, sq, deadline)
}
func (vmssn *VMStorageSingleNode) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.RegisterMetricNames(qt, mrs, deadline)
}
func (vmssn *VMStorageSingleNode) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (metricnamestats.StatsResult, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.GetMetricNamesUsageStats(qt, tt, limit, le, matchPattern, deadline)
}
func (vmssn *VMStorageSingleNode) ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.ResetMetricNamesUsageStats(qt, deadline)
}
func (vmssn *VMStorageSingleNode) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.GetMetadataRecords(qt, tt, limit, metricName, deadline)
}

View File

@@ -48,7 +48,7 @@ func TestGetMaxMetrics(t *testing.T) {
t.Helper()
*maxUniqueTimeseries = storageMaxUniqueTimeseries
s := storage.MustOpenStorage(t.Name(), storage.OpenOptions{})
vms := newVMStorage(s, maxConcurrentRequests)
vms := newVMStorage(s, maxConcurrentRequests, func(mrs []storage.MetricRow) {})
defer vms.Stop()
maxMetrics := vms.getMaxMetrics(searchQueryLimit)
if maxMetrics != expect {

View File

@@ -156,14 +156,14 @@ func readAllAndClose(t *testing.T, responseBody io.ReadCloser) string {
//
// This type is expected to be embedded by the apps that serve metrics.
type metricsClient struct {
metricsCli *Client
url string
cli *Client
url string
}
func newMetricsClient(cli *Client, addr string) *metricsClient {
return &metricsClient{
metricsCli: cli,
url: fmt.Sprintf("http://%s/metrics", addr),
cli: cli,
url: fmt.Sprintf("http://%s/metrics", addr),
}
}
@@ -179,7 +179,7 @@ func (c *metricsClient) GetIntMetric(t *testing.T, metricName string) int {
func (c *metricsClient) GetMetric(t *testing.T, metricName string) float64 {
t.Helper()
metrics, statusCode := c.metricsCli.Get(t, c.url, nil)
metrics, statusCode := c.cli.Get(t, c.url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -205,7 +205,7 @@ func (c *metricsClient) GetMetricsByPrefix(t *testing.T, prefix string) []float6
values := []float64{}
metrics, statusCode := c.metricsCli.Get(t, c.url, nil)
metrics, statusCode := c.cli.Get(t, c.url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -234,7 +234,7 @@ func (c *metricsClient) GetMetricsByRegexp(t *testing.T, re *regexp.Regexp) []fl
values := []float64{}
metrics, statusCode := c.metricsCli.Get(t, c.url, nil)
metrics, statusCode := c.cli.Get(t, c.url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -270,7 +270,7 @@ func (c *metricsClient) rpcRowsSentTotal(t *testing.T) int {
}
type vmselectClient struct {
vmselectCli *Client
cli *Client
url func(op, path string, opts QueryOpts) string
metricNamesStatsResetURL string
tenantsURL string
@@ -287,7 +287,7 @@ func (c *vmselectClient) PrometheusAPIV1Export(t *testing.T, query string, opts
values := opts.asURLValues()
values.Add("match[]", query)
values.Add("format", "promapi")
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1QueryResponse(t, res)
}
@@ -302,7 +302,7 @@ func (c *vmselectClient) PrometheusAPIV1ExportNative(t *testing.T, query string,
values := opts.asURLValues()
values.Add("match[]", query)
values.Add("format", "promapi")
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return []byte(res)
}
@@ -315,7 +315,7 @@ func (c *vmselectClient) PrometheusAPIV1Query(t *testing.T, query string, opts Q
url := c.url("select", "prometheus/api/v1/query", opts)
values := opts.asURLValues()
values.Add("query", query)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1QueryResponse(t, res)
}
@@ -329,7 +329,7 @@ func (c *vmselectClient) PrometheusAPIV1QueryRange(t *testing.T, query string, o
url := c.url("select", "prometheus/api/v1/query_range", opts)
values := opts.asURLValues()
values.Add("query", query)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1QueryResponse(t, res)
}
@@ -342,7 +342,7 @@ func (c *vmselectClient) PrometheusAPIV1Series(t *testing.T, matchQuery string,
url := c.url("select", "prometheus/api/v1/series", opts)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1SeriesResponse(t, res)
}
@@ -354,7 +354,7 @@ func (c *vmselectClient) PrometheusAPIV1SeriesCount(t *testing.T, opts QueryOpts
t.Helper()
url := c.url("select", "prometheus/api/v1/series/count", opts)
values := opts.asURLValues()
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1SeriesCountResponse(t, res)
}
@@ -367,7 +367,7 @@ func (c *vmselectClient) PrometheusAPIV1Labels(t *testing.T, matchQuery string,
url := c.url("select", "prometheus/api/v1/labels", opts)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1LabelsResponse(t, res)
}
@@ -382,7 +382,7 @@ func (c *vmselectClient) PrometheusAPIV1LabelValues(t *testing.T, labelName, mat
url := c.url("select", path, opts)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1LabelValuesResponse(t, res)
}
@@ -394,7 +394,7 @@ func (c *vmselectClient) PrometheusAPIV1Metadata(t *testing.T, metric string, li
values := opts.asURLValues()
values.Add("metric", metric)
values.Add("limit", strconv.Itoa(limit))
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1Metadata(t, res)
}
@@ -408,7 +408,7 @@ func (c *vmselectClient) PrometheusAPIV1AdminTSDBDeleteSeries(t *testing.T, matc
url := c.url("delete", "prometheus/api/v1/admin/tsdb/delete_series", opts)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
}
@@ -426,7 +426,7 @@ func (c *vmselectClient) PrometheusAPIV1StatusMetricNamesStats(t *testing.T, lim
values.Add("limit", limit)
values.Add("le", le)
values.Add("match_pattern", matchPattern)
res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
@@ -455,7 +455,7 @@ func (c *vmselectClient) PrometheusAPIV1StatusTSDB(t *testing.T, matchQuery stri
addNonEmpty("match[]", matchQuery)
addNonEmpty("topN", topN)
addNonEmpty("date", date)
res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
@@ -476,7 +476,7 @@ func (c *vmselectClient) GraphiteMetricsIndex(t *testing.T, opts QueryOpts) Grap
t.Helper()
url := c.url("select", "graphite/metrics/index.json", opts)
res, statusCode := c.vmselectCli.Get(t, url, opts.Headers)
res, statusCode := c.cli.Get(t, url, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
@@ -499,7 +499,7 @@ func (c *vmselectClient) GraphiteMetricsFind(t *testing.T, query string, opts Qu
url := c.url("select", "graphite/metrics/find", opts)
values := opts.asURLValues()
values.Add("query", query)
resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
resText, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText)
}
@@ -522,7 +522,7 @@ func (c *vmselectClient) GraphiteMetricsExpand(t *testing.T, query string, opts
url := c.url("select", "graphite/metrics/expand", opts)
values := opts.asURLValues()
values.Add("query", query)
resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
resText, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText)
}
@@ -546,7 +546,7 @@ func (c *vmselectClient) GraphiteRender(t *testing.T, target string, opts QueryO
values := opts.asURLValues()
values.Add("format", "json")
values.Add("target", target)
resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
resText, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText)
}
@@ -567,7 +567,7 @@ func (c *vmselectClient) GraphiteTagsTagSeries(t *testing.T, record string, opts
url := c.url("select", "graphite/tags/tagSeries", opts)
values := opts.asURLValues()
values.Add("path", record)
_, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
_, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if got, want := statusCode, http.StatusNotImplemented; got != want {
t.Fatalf("unexpected status code: got %d, want %d", got, want)
}
@@ -584,7 +584,7 @@ func (c *vmselectClient) GraphiteTagsTagMultiSeries(t *testing.T, records []stri
for _, rec := range records {
values.Add("path", rec)
}
_, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
_, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if got, want := statusCode, http.StatusNotImplemented; got != want {
t.Fatalf("unexpected status code: got %d, want %d", got, want)
}
@@ -598,7 +598,7 @@ func (c *vmselectClient) GraphiteTagsTagMultiSeries(t *testing.T, records []stri
func (c *vmselectClient) PrometheusAPIV1AdminStatusMetricNamesStatsReset(t *testing.T, opts QueryOpts) {
t.Helper()
values := opts.asURLValues()
res, statusCode := c.vmselectCli.PostForm(t, c.metricNamesStatsResetURL, values, opts.Headers)
res, statusCode := c.cli.PostForm(t, c.metricNamesStatsResetURL, values, opts.Headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
}
@@ -608,7 +608,7 @@ func (c *vmselectClient) PrometheusAPIV1AdminStatusMetricNamesStatsReset(t *test
// /admin/tenants endpoint.
func (c *vmselectClient) APIV1AdminTenants(t *testing.T, opts QueryOpts) *AdminTenantsResponse {
t.Helper()
res, statusCode := c.vmselectCli.Get(t, c.tenantsURL, opts.Headers)
res, statusCode := c.cli.Get(t, c.tenantsURL, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
@@ -622,7 +622,7 @@ func (c *vmselectClient) APIV1AdminTenants(t *testing.T, opts QueryOpts) *AdminT
}
type vminsertClient struct {
vminsertCli *Client
cli *Client
url func(op, path string, opts QueryOpts) string
openTSDBURL func(op, path string, opts QueryOpts) string
graphiteListenAddr string
@@ -647,7 +647,7 @@ func (c *vminsertClient) PrometheusAPIV1ImportCSV(t *testing.T, records []string
headers := opts.getHeaders()
headers.Set("Content-Type", "text/plain")
c.sendBlocking(t, len(records), func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -671,7 +671,7 @@ func (c *vminsertClient) PrometheusAPIV1ImportNative(t *testing.T, data []byte,
headers := opts.getHeaders()
headers.Set("Content-Type", "text/plain")
c.sendBlocking(t, 1, func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -693,7 +693,7 @@ func (c *vminsertClient) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteReque
headers := opts.getHeaders()
headers.Set("Content-Type", "application/x-protobuf")
c.sendBlocking(t, recordsCount, func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -745,7 +745,7 @@ func (c *vminsertClient) PrometheusAPIV1ImportPrometheus(t *testing.T, records [
headers := opts.getHeaders()
headers.Set("Content-Type", "text/plain")
c.sendBlocking(t, recordsCount, func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -771,7 +771,7 @@ func (c *vminsertClient) InfluxWrite(t *testing.T, records []string, opts QueryO
headers.Set("Content-Type", "text/plain")
c.sendBlocking(t, len(records), func() {
t.Helper()
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -805,7 +805,7 @@ func (c *vminsertClient) OpentelemetryV1Metrics(t *testing.T, md otlppb.MetricsD
headers := opts.getHeaders()
headers.Set("Content-Type", "application/x-protobuf")
c.sendBlocking(t, recordsCount, func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -830,7 +830,7 @@ func (c *vminsertClient) OpenTSDBAPIPut(t *testing.T, records []string, opts Que
headers := opts.getHeaders()
headers.Set("Content-Type", "application/json")
c.sendBlocking(t, len(records), func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -853,7 +853,7 @@ func (c *vminsertClient) ZabbixConnectorHistory(t *testing.T, records []string,
headers := opts.getHeaders()
headers.Set("Content-Type", "application/json")
c.sendBlocking(t, len(records), func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -867,11 +867,11 @@ func (c *vminsertClient) ZabbixConnectorHistory(t *testing.T, records []string,
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#ingesting
func (c *vminsertClient) GraphiteWrite(t *testing.T, records []string, _ QueryOpts) {
t.Helper()
c.vminsertCli.Write(t, c.graphiteListenAddr, records)
c.cli.Write(t, c.graphiteListenAddr, records)
}
type vmstorageClient struct {
vmstorageCli *Client
cli *Client
httpListenAddr string
}
@@ -881,7 +881,7 @@ func (c *vmstorageClient) ForceFlush(t *testing.T) {
t.Helper()
url := fmt.Sprintf("http://%s/internal/force_flush", c.httpListenAddr)
_, statusCode := c.vmstorageCli.Get(t, url, nil)
_, statusCode := c.cli.Get(t, url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -892,7 +892,7 @@ func (c *vmstorageClient) ForceMerge(t *testing.T) {
t.Helper()
url := fmt.Sprintf("http://%s/internal/force_merge", c.httpListenAddr)
_, statusCode := c.vmstorageCli.Get(t, url, nil)
_, statusCode := c.cli.Get(t, url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -905,7 +905,7 @@ func (c *vmstorageClient) ForceMerge(t *testing.T) {
func (c *vmstorageClient) SnapshotCreate(t *testing.T) *SnapshotCreateResponse {
t.Helper()
data, statusCode := c.vmstorageCli.Post(t, c.SnapshotCreateURL(), nil, nil)
data, statusCode := c.cli.Post(t, c.SnapshotCreateURL(), nil, nil)
if got, want := statusCode, http.StatusOK; got != want {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
}
@@ -931,7 +931,7 @@ func (c *vmstorageClient) APIV1AdminTSDBSnapshot(t *testing.T) *APIV1AdminTSDBSn
t.Helper()
url := fmt.Sprintf("http://%s/api/v1/admin/tsdb/snapshot", c.httpListenAddr)
data, statusCode := c.vmstorageCli.Post(t, url, nil, nil)
data, statusCode := c.cli.Post(t, url, nil, nil)
if got, want := statusCode, http.StatusOK; got != want {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
}
@@ -952,7 +952,7 @@ func (c *vmstorageClient) SnapshotList(t *testing.T) *SnapshotListResponse {
t.Helper()
url := fmt.Sprintf("http://%s/snapshot/list", c.httpListenAddr)
data, statusCode := c.vmstorageCli.Get(t, url, nil)
data, statusCode := c.cli.Get(t, url, nil)
if got, want := statusCode, http.StatusOK; got != want {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
}
@@ -973,7 +973,7 @@ func (c *vmstorageClient) SnapshotDelete(t *testing.T, snapshotName string) *Sna
t.Helper()
url := fmt.Sprintf("http://%s/snapshot/delete?snapshot=%s", c.httpListenAddr, snapshotName)
data, statusCode := c.vmstorageCli.Delete(t, url)
data, statusCode := c.cli.Delete(t, url)
wantStatusCodes := map[int]bool{
http.StatusOK: true,
http.StatusInternalServerError: true,
@@ -998,7 +998,7 @@ func (c *vmstorageClient) SnapshotDeleteAll(t *testing.T) *SnapshotDeleteAllResp
t.Helper()
url := fmt.Sprintf("http://%s/snapshot/delete_all", c.httpListenAddr)
data, statusCode := c.vmstorageCli.Post(t, url, nil, nil)
data, statusCode := c.cli.Post(t, url, nil, nil)
if got, want := statusCode, http.StatusOK; got != want {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
}

View File

@@ -77,7 +77,7 @@ type vminsertRuntimeValues struct {
func newVminsert(app *app, cli *Client, rt vminsertRuntimeValues) *Vminsert {
metricsClient := newMetricsClient(cli, rt.httpListenAddr)
vminsertClient := &vminsertClient{
vminsertCli: cli,
cli: cli,
url: func(op, path string, opts QueryOpts) string {
return getClusterPath(rt.httpListenAddr, op, path, opts)
},

View File

@@ -48,7 +48,7 @@ func newVmselect(app *app, cli *Client, rt vmselectRuntimeValues) *Vmselect {
app: app,
metricsClient: newMetricsClient(cli, rt.httpListenAddr),
vmselectClient: &vmselectClient{
vmselectCli: cli,
cli: cli,
url: func(op, path string, opts QueryOpts) string {
return getClusterPath(rt.httpListenAddr, op, path, opts)
},

View File

@@ -58,11 +58,11 @@ func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
app: app,
metricsClient: newMetricsClient(cli, rt.httpListenAddr),
vmstorageClient: &vmstorageClient{
vmstorageCli: cli,
cli: cli,
httpListenAddr: rt.httpListenAddr,
},
vmselectClient: &vmselectClient{
vmselectCli: cli,
cli: cli,
url: func(op, path string, opts QueryOpts) string {
return fmt.Sprintf("http://%s/%s", rt.httpListenAddr, path)
},
@@ -70,7 +70,7 @@ func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
tenantsURL: "vmsingle-does-not-serve-tenants",
},
vminsertClient: &vminsertClient{
vminsertCli: cli,
cli: cli,
url: func(_, path string, _ QueryOpts) string {
return fmt.Sprintf("http://%s/%s", rt.httpListenAddr, path)
},

View File

@@ -63,7 +63,7 @@ func newVmstorage(app *app, cli *Client, rt vmstorageRuntimeValues) *Vmstorage {
app: app,
metricsClient: newMetricsClient(cli, rt.httpListenAddr),
vmstorageClient: &vmstorageClient{
vmstorageCli: cli,
cli: cli,
httpListenAddr: rt.httpListenAddr,
},
storageDataPath: rt.storageDataPath,

View File

@@ -59,7 +59,7 @@ services:
- '--external.alert.source=explore?orgId=1&left=["now-1h","now","VictoriaMetrics",{"expr": },{"mode":"Metrics"},{"ui":[true,true,true,"none"]}]'
restart: always
vmanomaly:
image: victoriametrics/vmanomaly:v1.29.4
image: victoriametrics/vmanomaly:v1.29.5
depends_on:
- "victoriametrics"
ports:

View File

@@ -43,7 +43,7 @@
"content": "If you don't observe any data initially, please wait a few minutes for it to appear. \n\nUpon the first running the guide (if there is not enough node_exporter monitoring data collected in your system), you may notice a significant number of false positive anomalies found. The predictions will become more accurate with at least two weeks' (full `fit_window`) worth of data provided to vmanomaly.\n\nEach row displays information for a distinct mode. The query used for anomaly detection is `sum(rate(node_cpu_seconds_total[5m])) by (mode, instance, job)`.\n",
"mode": "markdown"
},
"pluginVersion": "10.2.1",
"pluginVersion": "12.2.0",
"title": "Overview",
"type": "text"
},

View File

@@ -14,6 +14,21 @@ aliases:
---
Please find the changelog for VictoriaMetrics Anomaly Detection below.
## v1.29.5
Released: 2026-06-11
- UI: Updated [vmanomaly UI](https://docs.victoriametrics.com/anomaly-detection/ui/) from [v1.7.0](https://docs.victoriametrics.com/anomaly-detection/ui/#v170) to [v1.7.1](https://docs.victoriametrics.com/anomaly-detection/ui/#v171), see respective [release notes](https://docs.victoriametrics.com/anomaly-detection/ui/#v171) for details.
- IMPROVEMENT: Redesigned [hot reload](https://docs.victoriametrics.com/anomaly-detection/components/#hot-reload) config change detection to content-based polling with configurable `-configCheckInterval`, improving reliability for Kubernetes ConfigMap symlink rotations and other filesystems where event delivery can be inconsistent.
- IMPROVEMENT: Refined config validation errors for broken or invalid config sections, so startup and reload failures point to the affected section more clearly (e.g. YAML indentation typos).
- IMPROVEMENT: Tightened config validation for [`PeriodicScheduler`](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#periodic-scheduler) `infer_every` and [`IsolationForestModel`](https://docs.victoriametrics.com/anomaly-detection/components/models/#isolation-forest-multivariate) `contamination`, including clearer handling of missing scheduler intervals, numeric contamination strings, and invalid non-finite values.
- BUGFIX: Fixed a multiprocessing startup issue with `settings.n_workers > 1` that could leave scheduled data fetch or successive inference jobs stuck and repeatedly skipped by internal scheduler.
- BUGFIX: Bounded [`VmReader`](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vm-reader) and [`VLogsReader`](https://docs.victoriametrics.com/anomaly-detection/components/reader/#victorialogs-reader) data fetch and post-fetch processing waits so stalled datasource reads or multiprocessing dataframe creation no longer keep [`PeriodicScheduler`](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#periodic-scheduler) `data_fetch` jobs running indefinitely. Previously, such stuck jobs could keep internal scheduler's `max_instances=1` slot per (scheduler, query) pair occupied, causing future data fetch, fit, or infer runs to be skipped until vmanomaly was restarted. The config validator now also warns when the configured reader timeout budget can exceed the connected scheduler interval.
## v1.29.4
Released: 2026-05-15

View File

@@ -423,7 +423,7 @@ services:
# ...
vmanomaly:
container_name: vmanomaly
image: victoriametrics/vmanomaly:v1.29.4
image: victoriametrics/vmanomaly:v1.29.5
# ...
restart: always
volumes:
@@ -641,7 +641,7 @@ options:
Heres an example of using the config splitter to divide configurations based on the `extra_filters` argument from the reader section:
```sh
docker pull victoriametrics/vmanomaly:v1.29.4 && docker image tag victoriametrics/vmanomaly:v1.29.4 vmanomaly
docker pull victoriametrics/vmanomaly:v1.29.5 && docker image tag victoriametrics/vmanomaly:v1.29.5 vmanomaly
```
```sh

View File

@@ -45,8 +45,7 @@ There are 2 types of compatibility to consider when migrating in stateful mode:
| Group start | Group end | Compatibility | Notes |
|---------|--------- |------------|-------|
| [v1.29.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1293) | Latest* | Fully Compatible | Just a placeholder for new releases |
| [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) | [v1.29.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1293) | Fully Compatible | - |
| [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) | [v1.29.5](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1295) | Fully Compatible | - |
| [v1.28.7](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1287) | [v1.29.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1290) | Partially compatible* | Dumped models of class [prophet](https://docs.victoriametrics.com/anomaly-detection/components/models/#prophet) and [seasonal quantile](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-seasonal-quantile) have problems with loading to [v1.29.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1290) due to dropped `pytz` library. **Upgrading directly from v1.28.7 to [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) with a fix is suggested** |
| [v1.26.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1262) | [v1.28.7](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1287) | Fully Compatible | [v1.28.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1280) introduced [rolling](https://docs.victoriametrics.com/anomaly-detection/components/models/#rolling-models) model class drop in favor of [online](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-models) models (`rolling_quantile` and `std` models), however, it does not impact compatibility, as artifacts were not produced by default for rolling models. Also, offline `mad` and `zscore` models are redirecting to their respective online counterparts since [v1.28.4](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1284). |
| [v1.25.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1253) | [v1.26.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1270) | Partially Compatible* | [v1.25.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1253) introduced `forecast_at` argument for base [univariate](https://docs.victoriametrics.com/anomaly-detection/components/models/#univariate-models) and `Prophet` [models](https://docs.victoriametrics.com/anomaly-detection/components/models/#prophet), however, itself remains backward-reversible from newer states like [v1.26.2](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1262), [v1.27.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1270). (All models except `isolation_forest_multivariate` class will be dropped) |

View File

@@ -37,29 +37,39 @@ The `vmanomaly` service supports a set of command-line arguments to configure it
> Single-dashed command-line argument {{% available_from "v1.23.3" anomaly %}} format can be used, e.g. `-license.forceOffline` in addition to `--license.forceOffline`. This aligns better with other VictoriaMetrics ecosystem components. Mixing the two styles is also supported, e.g. `-license.forceOffline --loggerLevel INFO`.
```shellhelp
usage: vmanomaly.py [-h] [--license STRING | --licenseFile PATH] [--license.forceOffline] [--loggerLevel {DEBUG,WARNING,FATAL,ERROR,INFO}] [--watch] [--dryRun] [--outputSpec PATH] config [config ...]
usage: vmanomaly.py [-h] [--license STRING | --licenseFile PATH] [--license.forceOffline] [--loggerLevel {DEBUG,INFO,WARNING,ERROR,FATAL}] [--watch] [-configCheckInterval DURATION] [--dryRun] [--outputSpec PATH] config [config ...]
VictoriaMetrics Anomaly Detection Service
positional arguments:
config YAML config file(s) or directories containing YAML files. Multiple files will recursively merge each other values so multiple configs can be combined. If a directory is provided,
all `.yaml` files inside will be merged, without recursion. Default: vmanomaly.yaml is expected in the current directory.
config YAML config file(s) or directories containing YAML files. Multiple files will recursively merge each other
values so multiple configs can be combined. If a directory is provided, all `.yaml` files inside will be
merged, without recursion. Default: vmanomaly.yaml is expected in the current directory.
options:
-h Show this help message and exit
--license STRING License key for VictoriaMetrics Enterprise. See https://victoriametrics.com/products/enterprise/trial/ to obtain a trial license.
--licenseFile PATH Path to file with license key for VictoriaMetrics Enterprise. See https://victoriametrics.com/products/enterprise/trial/ to obtain a trial license.
--license STRING License key for VictoriaMetrics Enterprise. See https://victoriametrics.com/products/enterprise/trial/ to
obtain a trial license.
--licenseFile PATH Path to file with license key for VictoriaMetrics Enterprise. See
https://victoriametrics.com/products/enterprise/trial/ to obtain a trial license.
--license.forceOffline
Whether to force offline verification for VictoriaMetrics Enterprise license key, which has been passed either via -license or via -licenseFile command-line flag. The issued
license key must support offline verification feature. Contact info@victoriametrics.com if you need offline license verification.
--loggerLevel {DEBUG,WARNING,FATAL,ERROR,INFO}
Minimum level to log. Possible values: DEBUG, INFO, WARNING, ERROR, FATAL.
--watch Watch config files for changes and trigger hot reloads. Watches the specified config file or directory for modifications, deletions, or additions. Upon detecting changes,
triggers config reload. If new config validation fails, continues with previous valid config and state.
--dryRun Validate only: parse + merge all YAML(s) and run schema checks, then exit. Does not require a license to run. Does not expose metrics, or launch vmanomaly service(s).
Whether to force offline verification for VictoriaMetrics Enterprise license key, which has been passed either
via -license or via -licenseFile command-line flag. The issued license key must support offline verification
feature. Contact info@victoriametrics.com if you need offline license verification.
--loggerLevel {DEBUG,INFO,WARNING,ERROR,FATAL}
Minimum level to log. Possible values: {'DEBUG', 'INFO', 'WARNING', 'ERROR', 'FATAL'}.
--watch Watch config files for changes and trigger hot reloads. Watches the specified config file or directory for
modifications, deletions, or additions. Upon detecting changes, triggers config reload. If new config
validation fails, continues with previous valid config and state.
-configCheckInterval DURATION
Interval for checking watched config files for content changes. Default: 30s.
--dryRun Validate only: parse + merge all YAML(s) and run schema checks, then exit. Does not require a license to run.
Does not expose metrics, or launch vmanomaly service(s).
--outputSpec PATH Target location of .yaml output spec.
```
{{% available_from "v1.29.5" anomaly %}} When `--watch` is enabled, config changes are detected by fixed-interval content polling instead of filesystem event delivery. The polling frequency is controlled by `-configCheckInterval` (default: `30s`). The same option can also be passed as `--configCheckInterval`, `--config.check.interval`, `--config-check-interval`, `--config_check_interval`, or in key-value form such as `configCheckInterval=30s`.
You can specify these options when running `vmanomaly` to fine-tune logging levels or handle licensing configurations, as per your requirements.
### Licensing
@@ -122,7 +132,7 @@ Below are the steps to get `vmanomaly` up and running inside a Docker container:
1. Pull Docker image:
```sh
docker pull victoriametrics/vmanomaly:v1.29.4
docker pull victoriametrics/vmanomaly:v1.29.5
```
2. Create the license file with your license key.
@@ -142,7 +152,7 @@ docker run -it \
-v ./license:/license \
-v ./config.yaml:/config.yaml \
-p 8490:8490 \
victoriametrics/vmanomaly:v1.29.4 \
victoriametrics/vmanomaly:v1.29.5 \
/config.yaml \
--licenseFile=/license \
--loggerLevel=INFO \
@@ -159,7 +169,7 @@ docker run -it \
-e VMANOMALY_DATA_DUMPS_DIR=/tmp/vmanomaly/data \
-e VMANOMALY_MODEL_DUMPS_DIR=/tmp/vmanomaly/models \
-p 8490:8490 \
victoriametrics/vmanomaly:v1.29.4 \
victoriametrics/vmanomaly:v1.29.5 \
/config.yaml \
--licenseFile=/license \
--loggerLevel=INFO \
@@ -172,7 +182,7 @@ services:
# ...
vmanomaly:
container_name: vmanomaly
image: victoriametrics/vmanomaly:v1.29.4
image: victoriametrics/vmanomaly:v1.29.5
# ...
restart: always
volumes:

View File

@@ -315,7 +315,7 @@ docker run -it --rm \
-e VMANOMALY_MCP_SERVER_URL=http://mcp-vmanomaly:8081/mcp \
-p 8080:8080 \
-p 8490:8490 \
victoriametrics/vmanomaly:v1.29.4 \
victoriametrics/vmanomaly:v1.29.5 \
vmanomaly_config.yaml
```
@@ -640,6 +640,17 @@ If the **results** look good and the **model configuration should be deployed in
## Changelog
### v1.7.1
Released: 2026-06-11
vmanomaly version: [v1.29.5](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1295)
- FEATURE: Added bulk Apply/Decline actions for [Copilot](#ai-assistance) chat suggestions.
- BUGFIX: Fixed modal windows closing when the mouse is released outside the window during text selection.
- BUGFIX: Fixed tooltip hover behavior so tooltips do not disappear while the cursor moves into the hover content.
### v1.7.0
Released: 2026-05-15

View File

@@ -143,11 +143,14 @@ server:
> This feature is better used in conjunction with [stateful service](https://docs.victoriametrics.com/anomaly-detection/components/settings/#state-restoration) to preserve the state of the models and schedulers between restarts and reuse what can be reused, thus avoiding unnecessary re-training of models, re-initialization of schedulers and re-reading of data.
{{% available_from "v1.25.0" anomaly %}} Service supports hot reload of configuration files, which allows for automatic reloading of configurations on config files change filesystem events without the need of explicit service restart. This can be enabled via the `--watch` [CLI argument](https://docs.victoriametrics.com/anomaly-detection/quickstart/#command-line-arguments). `vmanomaly_config_reload_enabled` flag in [self-monitoring metrics](https://docs.victoriametrics.com/anomaly-detection/components/monitoring/#startup-metrics) will be set to 1 (if enabled) or 0 (if disabled).
{{% available_from "v1.25.0" anomaly %}} Service supports hot reload of configuration files, which allows for automatic reloading of configurations on config files change without the need of explicit service restart. This can be enabled via the `--watch` [CLI argument](https://docs.victoriametrics.com/anomaly-detection/quickstart/#command-line-arguments). `vmanomaly_config_reload_enabled` flag in [self-monitoring metrics](https://docs.victoriametrics.com/anomaly-detection/components/monitoring/#startup-metrics) will be set to 1 (if enabled) or 0 (if disabled).
> [!NOTE]
> {{% deprecated_from "v1.29.5" anomaly %}} File system event-based hot reload has been deprecated in favor of content-based polling with configurable `-configCheckInterval` due to reliability issues with Kubernetes ConfigMap symlink rotations and other filesystems where event delivery can be inconsistent. If you were using file system event-based hot reload, please switch to content-based polling by enabling `--watch` flag and configuring `-configCheckInterval` as needed.
### How it works
It works by watching for file system events, such as modifications, creations, or deletions of `.yml|.yaml` files in the specified directories. When a change is detected, the service will attempt to reload the configuration files, rebuild the [global config](https://docs.victoriametrics.com/anomaly-detection/scaling-vmanomaly/#global-configuration) and reinitialize the components. If the reload is successful, the `vmanomaly_config_reloads_total` metric will be incremented for `status="success"` label, otherwise it will be incremented with `status="failure"` label and a respective error message on config validation failure(s) will be logged.
It works by checking watched `.yml|.yaml` file contents in the specified files or directories on the configured interval `-configCheckInterval` (default is `30s`) {{% available_from "v1.29.5" anomaly %}}. When a content change is detected, the service will attempt to reload the configuration files after the existing debounce window, rebuild the [global config](https://docs.victoriametrics.com/anomaly-detection/scaling-vmanomaly/#global-configuration) and reinitialize the components. If the reload is successful, the `vmanomaly_config_reloads_total` metric will be incremented for `status="success"` label, otherwise it will be incremented with `status="failure"` label and a respective error message on config validation failure(s) will be logged.
> If the reload fails, the service will log an error message indicating the reason for the failure, and the **previous configuration will remain active until a successful reload occurs** to preserve the service's stability. This means that if there are errors in the new configuration, the service will continue to operate with the last valid configuration until the issues are resolved.

View File

@@ -449,9 +449,9 @@ models:
> The `decay` argument works only in combination with [online models](#online-models) like [`ZScoreOnlineModel`](#online-z-score) or [`OnlineQuantileModel`](#online-seasonal-quantile).
The `decay` {{% available_from "v1.23.0" anomaly %}} argument is used to control the (exponential) **decay factor** for online models, which determines how quickly the model adapts to new data. It is a float value between `0.0` and `1.0`, where:
- `1.0` means no decay (the model treats all data equally, without giving more weight to recent data). This is the default value for backward compatibility.
- Less than `1.0` means that the model will give more weight to recent data, effectively "forgetting" older data over time.
The `decay` {{% available_from "v1.23.0" anomaly %}} argument is used to control the (exponential) **decay factor** for online models, which determines how quickly the model adapts to new data. It is a positive float value from `(0.0, 1.0]` interval, where:
- Value `1.0` means no decay (the model treats all data points equally, without giving more weight to recent ones). This is the default value for backward compatibility.
- Values less than `1.0` mean that the model will give more weight to recent data, effectively "forgetting" older data over time.
Roughly speaking, for the recent N datapoints model processes `decay` = `d` means that these datapoints will contribute to the model as [1 - d^X] percent of total importance, for example decay of
- `0.99` means that 100 recent datapoints will contribute as [1 - 0.99^100] = 63.23% of total importance
@@ -998,7 +998,7 @@ Here we use Isolation Forest implementation from `scikit-learn` [library](https:
* `class` (string) - model class name `"model.isolation_forest.IsolationForestMultivariateModel"` (or `isolation_forest_multivariate` with class alias support {{% available_from "v1.13.0" anomaly %}})
* `contamination` (float or string, optional) - The amount of contamination of the data set, i.e. the proportion of outliers in the data set. Used when fitting to define the threshold on the scores of the samples. Default value - "auto". Should be either `"auto"` or be in the range (0.0, 0.5].
* `contamination` (float or string, optional) - The amount of contamination of the data set, i.e. the proportion of outliers in the data set. Used when fitting to define the threshold on the scores of the samples. Default value - "auto". Should be either `"auto"` or be in the range (0.0, 0.5]. {{% available_from "v1.29.5" anomaly %}} Numeric strings, such as `"0.01"`, are accepted, while invalid non-finite values, such as `nan`, `inf`, and `-inf`, are rejected during config validation.
* `seasonal_features` (list of string) - List of seasonality to encode through [cyclical encoding](https://towardsdatascience.com/cyclical-features-encoding-its-about-time-ce23581845ca), i.e. `dow` (day of week). **Introduced in [1.12.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1120)**.
- Empty by default for backward compatibility.
@@ -1265,7 +1265,7 @@ monitoring:
Let's pull the docker image for `vmanomaly`:
```sh
docker pull victoriametrics/vmanomaly:v1.29.4
docker pull victoriametrics/vmanomaly:v1.29.5
```
Now we can run the docker container putting as volumes both config and model file:
@@ -1279,7 +1279,7 @@ docker run -it \
-v $(PWD)/license:/license \
-v $(PWD)/custom_model.py:/vmanomaly/model/custom.py \
-v $(PWD)/custom.yaml:/config.yaml \
victoriametrics/vmanomaly:v1.29.4 /config.yaml \
victoriametrics/vmanomaly:v1.29.5 /config.yaml \
--licenseFile=/license
--watch
```

View File

@@ -10,12 +10,12 @@ sitemap:
- To use *vmanomaly*, part of the enterprise package, a license key is required. Obtain your key [here](https://victoriametrics.com/products/enterprise/trial/) for this tutorial or for enterprise use.
- In the tutorial, we'll be using the following VictoriaMetrics components:
- [VictoriaMetrics Single-Node](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) (v1.137.0)
- [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/) (v1.137.0)
- [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) (v1.137.0)
- [Grafana](https://grafana.com/) (v.10.2.1)
- [VictoriaMetrics Single-Node](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) (v1.145.0)
- [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/) (v1.145.0)
- [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) (v1.145.0)
- [Grafana](https://grafana.com/) (v12.2.0)
- [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/)
- [Node exporter](https://github.com/prometheus/node_exporter#node-exporter) (v1.7.0) and [Alertmanager](https://prometheus.io/docs/alerting/latest/alertmanager/) (v0.27.0)
- [Node exporter](https://github.com/prometheus/node_exporter#node-exporter) (v1.9.1) and [Alertmanager](https://prometheus.io/docs/alerting/latest/alertmanager/) (v0.28.1)
![typical setup diagram](guide-vmanomaly-vmalert_overview.webp)
@@ -323,7 +323,7 @@ Let's wrap it all up together into the `docker-compose.yml` file.
services:
vmagent:
container_name: vmagent
image: victoriametrics/vmagent:v1.137.0
image: victoriametrics/vmagent:v1.145.0
depends_on:
- "victoriametrics"
ports:
@@ -340,7 +340,7 @@ services:
victoriametrics:
container_name: victoriametrics
image: victoriametrics/victoria-metrics:v1.137.0
image: victoriametrics/victoria-metrics:v1.145.0
ports:
- 8428:8428
volumes:
@@ -356,7 +356,7 @@ services:
grafana:
container_name: grafana
image: grafana/grafana-oss:10.2.1
image: grafana/grafana:12.2.0
depends_on:
- "victoriametrics"
ports:
@@ -373,7 +373,7 @@ services:
vmalert:
container_name: vmalert
image: victoriametrics/vmalert:v1.137.0
image: victoriametrics/vmalert:v1.145.0
depends_on:
- "victoriametrics"
ports:
@@ -395,7 +395,7 @@ services:
restart: always
vmanomaly:
container_name: vmanomaly
image: victoriametrics/vmanomaly:v1.29.4
image: victoriametrics/vmanomaly:v1.29.5
depends_on:
- "victoriametrics"
ports:
@@ -412,7 +412,7 @@ services:
- "--licenseFile=/license"
alertmanager:
container_name: alertmanager
image: prom/alertmanager:v0.27.0
image: prom/alertmanager:v0.28.1
volumes:
- ./alertmanager.yml:/config/alertmanager.yml
command:
@@ -424,7 +424,7 @@ services:
restart: always
node-exporter:
image: quay.io/prometheus/node-exporter:v1.7.0
image: quay.io/prometheus/node-exporter:v1.9.1
container_name: node-exporter
ports:
- 9100:9100

View File

@@ -6,45 +6,348 @@ build:
sitemap:
disable: true
---
**Objective**
Setup Victoria Metrics Cluster with support of multiple retention periods within one installation.
> [VictoriaMetrics Enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports specifying multiple retentions for distinct sets of time series and tenants. If you are an Enterprise user, [configure multiple retentions directly through retention filters](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#retention-filters) instead of following this guide.
**Enterprise Solution**
This guide explains how to set up multiple retentions using an [open-source VictoriaMetrics Cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/).
[VictoriaMetrics Enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports specifying multiple retentions
for distinct sets of time series and [tenants](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy)
via [retention filters](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#retention-filters).
## Overview
**Open Source Solution**
VictoriaMetrics retains metrics by default for **1 month**. You can change data retention with the [`-retentionPeriod` command-line flag](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention), but this value applies to **all time series stored** on a given `vmstorage` node and cannot be customized per tenant or per metric in the open source version.
Community version of VictoriaMetrics supports only one retention period per `vmstorage` node via [-retentionPeriod](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention) command-line flag.
The core idea of this guide is to run **separate logic groups of storages** (or even clusters) with individual `-retentionPeriod` settings, while still providing a single unified write and read path via vmagent and vmselect.
A multi-retention setup can be implemented by dividing a [victoriametrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) into logical groups with different retentions.
## Multi-Retention Architecture
Example:
Setup should handle 3 different retention groups 3months, 1year and 3 years.
Solution contains 3 groups of vmstorages + vminserts and one group of vmselects. Routing is done by [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/)
by [splitting data streams](https://docs.victoriametrics.com/victoriametrics/vmagent/#splitting-data-streams-among-multiple-systems).
The [-retentionPeriod](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention) sets how long to keep the metrics.
To support multiple retentions with the open source version of VictoriaMetrics cluster, you can split the cluster into several logical groups of storage nodes. Each group is configured with a different `-retentionPeriod` and receives only the data that must follow that retention.
The diagram below shows a proposed solution
Each storage group is connected to a separate vminsert, while a shared vmselect layer queries across all storage groups so that dashboards and alerts continue to see a single unified VictoriaMetrics backend.
![Setup](setup.webp)
**Implementation Details**
In the example used throughout this guide, the cluster is divided into three groups:
1. Groups of vminserts A know about only vmstorages A and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
1. Groups of vminserts B know about only vmstorages B and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
1. Groups of vminserts C know about only vmstorages C and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
1. vmselect reads data from all vmstorage nodes via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup)
with [deduplication](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#deduplication) setting equal to vmagent's scrape interval or minimum interval between collected samples.
1. vmagent routes incoming metrics to the given set of `vminsert` nodes using relabeling rules specified at `-remoteWrite.urlRelabelConfig` [configuration](https://docs.victoriametrics.com/victoriametrics/relabeling/).
- Group A: 3-month retention.
- Group B: 1-year retention.
- Group C: 3-year retention.
**Multi-Tenant Setup**
Metrics are routed to the appropriate vminsert group by splitting data streams in vmagent, so each time series is sent to exactly one retention group instead of being replicated to all groups. See [Deploying vmagent](https://docs.victoriametrics.com/guides/guide-vmcluster-multiple-retention-setup/#step3) for an example of labelbased routing that implements this split. An optional [vmauth](https://docs.victoriametrics.com/guides/guide-vmcluster-multiple-retention-setup/#additional-enhancements) layer can be added on top to restrict access to specific subclusters or tenants while still keeping a unified write and read path.
Every group of vmstorages can handle one tenant or multiple one. Different groups can have overlapping tenants. As vmselect reads from all vmstorage nodes, the data is aggregated on its level.
## Implementing Multi-Retention on Kubernetes
**Additional Enhancements**
In this section, we'll install and configure the components for a multi-retention deployment of the VictoriaMetrics cluster. See [Kubernetes monitoring with VictoriaMetrics Cluster](https://docs.victoriametrics.com/guides/k8s-monitoring-via-vm-cluster/) for details on running VictoriaMetrics in Kubernetes.
You can set up [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/) for routing data to the given vminsert group depending on the needed retention.
Run the following command to add the VictoriaMetrics Helm repository:
```shell
helm repo add vm https://victoriametrics.github.io/helm-charts/
helm repo update
```
### Step 1: Deploying storage groups {#step1}
We'll create three storage groups. Each has a different retention period and disk size. Read [Understand Your Setup Size](https://docs.victoriametrics.com/guides/understand-your-setup-size/) to estimate how much space you will need for each group. The following table is shown as an example:
| Group | Retention Period | Total disk size |
|--------------|------------------|-----------------------|
| `vmcluster-a` | 3 months (`3M`) | 80 Gi |
| `vmcluster-b` | 1 year (`1Y`) | 300 Gi |
| `vmcluster-c` | 3 years (`3Y`) | 900 Gi |
Create a Helm values file for Group A.
```shell
cat <<EOF > vmcluster-a.yaml
vmstorage:
enabled: true
replicaCount: 1
persistence:
size: 80Gi
extraArgs:
retentionPeriod: 3M
podLabels:
retention-group: a
vminsert:
enabled: true
podLabels:
retention-group: a
vmselect:
enabled: false
EOF
```
The values file above creates vminsert and vmstorage services while turning off vmselect, which we'll deploy separately. The `retentionPeriod` flag configures how long data is kept in this group.
Create the values files for Group B and Group C:
```shell
cat <<EOF > vmcluster-b.yaml
vmstorage:
enabled: true
replicaCount: 1
persistence:
size: 300Gi
extraArgs:
retentionPeriod: 1y
podLabels:
retention-group: b
vminsert:
enabled: true
podLabels:
retention-group: b
vmselect:
enabled: false
EOF
cat <<EOF > vmcluster-c.yaml
vmstorage:
enabled: true
replicaCount: 1
persistence:
size: 900Gi
extraArgs:
retentionPeriod: 3y
podLabels:
retention-group: c
vminsert:
enabled: true
podLabels:
retention-group: c
vmselect:
enabled: false
EOF
```
Deploy the three storage groups with:
```shell
helm upgrade --install vmcluster-a vm/victoria-metrics-cluster -f vmcluster-a.yaml
helm upgrade --install vmcluster-b vm/victoria-metrics-cluster -f vmcluster-b.yaml
helm upgrade --install vmcluster-c vm/victoria-metrics-cluster -f vmcluster-c.yaml
# Wait for all storage pods to be ready
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-a
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-b
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-c
```
### Step 2: Deploying vmselect {#step2}
Next, we'll deploy a vmselect service to route queries to the storage groups.
Create a Helm values file with:
```shell
cat <<EOF >vmselect.yaml
vmstorage:
enabled: false
vminsert:
enabled: false
vmselect:
enabled: true
replicaCount: 1
suppressStorageFQDNsRender: true
extraArgs:
# Each list item is a single -storageNode flag. In this example, there is
# one vmstorage pod per retention group, so each entry contains a single host.
# If you run multiple pods per group, list them as comma-separated hosts
# in the same -storageNode value.
#
# The FQDN format is:
# <pod>.<svc>.default.svc
# where pod = <release>-victoria-metrics-cluster-vmstorage-<N>
# and svc = <release>-victoria-metrics-cluster-vmstorage
storageNode:
- "vmcluster-a-victoria-metrics-cluster-vmstorage-0.vmcluster-a-victoria-metrics-cluster-vmstorage.default.svc:8401"
- "vmcluster-b-victoria-metrics-cluster-vmstorage-0.vmcluster-b-victoria-metrics-cluster-vmstorage.default.svc:8401"
- "vmcluster-c-victoria-metrics-cluster-vmstorage-0.vmcluster-c-victoria-metrics-cluster-vmstorage.default.svc:8401"
EOF
```
Let's break down the file above:
- Deploys vmselect as a separate Helm release.
- Disables vminsert and vmstorage as these services were already deployed in Step 1.
- `suppressStorageFQDNsRender: true` turns off automatic FQDN generation for storage nodes. By default, the Helm chart auto-generates `-storageNodes` flags, but since `vmstorage` has been disabled, we need to supply them manually in `extraArgs`.
- In `extraArgs.storageNode:` we define the vmstorage endpoints for queries. On querying, vmselect merges results across all the specified vmstorages to provide a unified view of the data.
Deploy the `vmselect` release with:
```shell
helm upgrade --install vmselect vm/victoria-metrics-cluster -f vmselect.yaml
```
### Step 3: Deploying vmagent {#step3}
We'll use `vmagent` to route incoming metrics to the correct retention group. For example, we can use a `retention` label for mapping metrics to storage groups in the following way:
| `retention` label | Storage Group |
|-------------------|--------------|
| `"3mo"` | `vmcluster-a` |
| `"1yr"` | `vmcluster-b` |
| `"3yr"` | `vmcluster-c` |
Create the values file for vmagent:
```shell
cat <<EOF >vmagent.yaml
service:
enabled: true
remoteWrite:
# Group A: receives metrics with retention="3mo"
- url: http://vmcluster-a-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
urlRelabelConfig:
- if: '{retention="3mo"}'
action: keep
# Group B: receives metrics with retention="1yr"
- url: http://vmcluster-b-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
urlRelabelConfig:
- if: '{retention="1yr"}'
action: keep
# Group C: receives metrics with retention="3yr"
- url: http://vmcluster-c-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
urlRelabelConfig:
- if: '{retention="3yr"}'
action: keep
EOF
```
> Metrics without a matching `retention` label are silently dropped by the `keep` rules. You must ensure that every metric is labeled, or use a different routing configuration.
Now deploy the vmagent release:
```shell
helm upgrade --install vmagent vm/victoria-metrics-agent -f vmagent.yaml
```
Wait for vmagent to become ready:
```shell
kubectl rollout status deploy/vmagent-victoria-metrics-agent
```
### Step 4: Verification
We can send test data to verify that the data is flowing to the correct storage group.
First, port-forward vmagent and vmselect:
```shell
VMAGENT_SVC=$(kubectl get svc -l app.kubernetes.io/instance=vmagent -o jsonpath='{.items[0].metadata.name}')
kubectl port-forward "svc/$VMAGENT_SVC" 8429 &
VMSELECT_SVC=$(kubectl get svc -l app.kubernetes.io/instance=vmselect -o jsonpath='{.items[0].metadata.name}')
kubectl port-forward "svc/$VMSELECT_SVC" 8481 &
```
Send test metrics directly to vmagent's HTTP endpoint to exercise all three retention labels:
```shell
POD=$(kubectl get pod -l app.kubernetes.io/instance=vmagent -o jsonpath='{.items[0].metadata.name}')
for retention in 3mo 1yr 3yr; do
kubectl exec "$POD" -- wget -qO- --post-data="test_routing{retention=\"${retention}\"} 1.0" \
"http://127.0.0.1:8429/api/v1/import/prometheus"
done
```
Query the data back from vmselect (it may take around 30-60 seconds for new data to be available for queries):
```shell
for retention in 3mo 1yr 3yr; do
echo "-> retention=${retention}"
curl -s "http://localhost:8481/select/0/prometheus/api/v1/query" \
--data-urlencode "query=test_routing{retention=\"${retention}\"}"
echo
done
```
You can also check that vmagent is forwarding data to all three groups:
```shell
curl -s http://localhost:8429/metrics | grep vmagent_remotewrite_blocks_sent_total
```
Each `url="N:secret-url"` corresponds to one `remoteWrite` entry (N=1 for Group A, N=2 for Group B, N=3 for Group C). Non-zero values confirm data is flowing.
## Alternative Routing by Existing Labels
The example setup above relies on a synthetic `retention` label to exist in every incoming metric.
If having a `retention` label in every metric isn't practical, you can, as an alternative, rely on existing labels to map data to the correct storage group.
The following example configures vmagent to route metrics based on the `environment` and `team` labels:
```yaml
# vmagent.yaml
remoteWrite:
# send dev and staging data to Group A
- url: "http://vmcluster-a-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
urlRelabelConfig:
- if: {environment=~"dev|staging"}
action: keep
# send prod data to Group B
- url: "http://vmcluster-b-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
urlRelabelConfig:
- if: {environment=~"prod|production"}
action: keep
# send data from Infra and SRE teams to Group C
- url: "http://vmcluster-c-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
urlRelabelConfig:
- if: {team=~"infra|sre"}
action: keep
```
> Metrics that do not match any of the `keep` rules are dropped in the configuration above.
## Additional Enhancements
You can set up [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/) to route data to the specified vminsert group based on the required retention or to restrict which data different users can query.
The following [`-auth.config`](https://docs.victoriametrics.com/victoriametrics/vmauth/#quick-start) example exposes the same vmselect backend via vmauth with two users using basic auth:
- `admin`: can query **all** data across all retention groups.
- `dev`: can query **only** time series that have `team="dev"` label, enforced via the `extra_label` query argument.
```yaml
users:
# User with access to all data across all retention groups
- username: "admin"
password: "foo"
url_map:
- src_paths:
- "/api/v1/query"
- "/api/v1/query_range"
- "/api/v1/series"
- "/api/v1/labels"
- "/api/v1/label/.+/values"
# vmselect service that aggregates all vmstorage groups
url_prefix: "http://vmselect-victoria-metrics-cluster-vmselect:8481/select/0/prometheus"
# User restricted to Dev team data only
- username: "dev"
password: "bar"
url_map:
- src_paths:
- "/api/v1/query"
- "/api/v1/query_range"
- "/api/v1/series"
- "/api/v1/labels"
- "/api/v1/label/.+/values"
# Same vmselect backend, but enforce label filter at query time
# by adding extra_label=team=dev to every proxied request
url_prefix: "http://vmselect-victoria-metrics-cluster-vmselect:8481/select/0/prometheus/?extra_label=team=dev"
```
This is useful for restricting access by team, environment, or tenant without changing the underlying storage topology.

View File

@@ -26,8 +26,6 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)

View File

@@ -546,7 +546,7 @@ tags at [Docker Hub](https://hub.docker.com/r/victoriametrics/vmalert/tags) and
## Reading rules from object storage
[Enterprise version](https://docs.victoriametrics.com/victoriametrics/enterprise/) of `vmalert` may read alerting and recording rules
The [Enterprise version](https://docs.victoriametrics.com/victoriametrics/enterprise/) of `vmalert` may read alerting and recording rules
from object storage:
* `./bin/vmalert -rule=s3://bucket/dir/alert.rules` would read rules from the given path at S3 bucket
@@ -563,6 +563,8 @@ The following [command-line flags](#flags) can be used for fine-tuning access to
* `-s3.customEndpoint` - custom S3 endpoint for use with S3-compatible storages (e.g. MinIO). S3 is used if not set.
* `-s3.forcePathStyle` - prefixing endpoint with bucket name when set false, true by default.
See [providing credentials as a file](https://docs.victoriametrics.com/victoriametrics/vmbackup/#providing-credentials-as-a-file) for details on how to create and use credentials to access S3-compatible buckets and Google Cloud Storage.
## Topology examples
The following sections are showing how `vmalert` may be used and configured

View File

@@ -204,38 +204,80 @@ See [this article](https://medium.com/@valyala/speeding-up-backups-for-big-time-
### Providing credentials as a file
Obtaining credentials from a file.
`vmbackup`, `vmbackupmanager`, and [`vmalert`](https://docs.victoriametrics.com/victoriametrics/vmalert/) can load credentials from a file via the `-credsFilePath` flag to access remote S3-compatible buckets and Google Cloud Storage.
Add flag `-credsFilePath=/etc/credentials` with the following content:
To use a credential file, add the flag:
* for S3 (AWS, MinIO or other S3 compatible storages):
```sh
-credsFilePath=/etc/credentials
```
```sh
[default]
aws_access_key_id=theaccesskey
aws_secret_access_key=thesecretaccesskeyvalue
```
The argument should point to a file with one of the formats below, depending on the storage provider.
* for GCP cloud storage:
#### S3 (AWS and S3-compatible)
```json
{
"type": "service_account",
"project_id": "project-id",
"private_key_id": "key-id",
"private_key": "-----BEGIN PRIVATE KEY-----\nprivate-key\n-----END PRIVATE KEY-----\n",
"client_email": "service-account-email",
"client_id": "client-id",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://accounts.google.com/o/oauth2/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/service-account-email"
}
```
1. In AWS, [create an IAM user](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_users_create.html) or role with permissions to read and write the target bucket.
2. [Create an access key](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html) for that IAM identity and copy the **Access key** and **Secret access key** values
3. On the machine running `vmbackup`, create a credentials file with the following content and point `-credsFilePath` to it:
```ini
[default]
aws_access_key_id=YOUR_AWS_ACCESS_KEY
aws_secret_access_key=YOUR_AWS_SECRET_ACCESS_KEY
```
This format matches the standard shared AWS credentials file used by the [AWS CLI](https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-files.html) and [AWS SDKs](https://docs.aws.amazon.com/sdkref/latest/guide/file-format.html).
For S3-compatible backends such as [MinIO](https://www.min.io/) or [Ceph](https://ceph.io/), create access keys in the respective
system and use the same file format and set a custom endpoint with `-customS3Endpoint`.
For example:
```sh
vmbackup \
-storageDataPath=/data \
-snapshot.createURL=http://localhost:8428/snapshot/create \
-dst=s3://victoriametrics-backup/backup01 \
-customS3Endpoint=http://minio.example.local:9000 \
-credsFilePath=/etc/credentials
```
#### Google Cloud Storage (GCS)
To create an IAM user and download the credential file, follow these steps:
1. Open the Google Cloud Console and go to **IAM & Admin → Service Accounts**.
2. Click **Create service account**.
3. Enter a service account name.
4. Assign the role the account needs to access Google Cloud Storage. See [IAM permissions for JSON methods](https://docs.cloud.google.com/storage/docs/access-control/iam-json) for more details.
5. Open the service account, go to **Keys**, then click **Add key → Create new key**.
6. Choose **JSON** as the key type
7. Save the downloaded JSON file on the machine running `vmbackup` and point `-credsFilePath` to it. The file contents look similar to:
```json
{
"type": "service_account",
"project_id": "project-id",
"private_key_id": "key-id",
"private_key": "-----BEGIN PRIVATE KEY-----\nprivate-key\n-----END PRIVATE KEY-----\n",
"client_email": "service-account-email",
"client_id": "client-id",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://accounts.google.com/o/oauth2/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/service-account-email"
}
```
This JSON is the standard service account key format defined by [Google Cloud IAM](https://developers.google.com/workspace/guides/create-credentials) and is used by Google client libraries and tools.
#### Azure Blob Storage
Azure Blob Storage uses environment variables rather than `-credsFilePath` in `vmbackup`. See [providing credentials via env variables](https://docs.victoriametrics.com/victoriametrics/vmbackup/#providing-credentials-via-env-variables) for details.
### Providing credentials via env variables
Obtaining credentials from env variables.
Obtaining credentials from environment variables.
* For AWS S3 compatible storages set env variable `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`.
Also you can set env variable `AWS_SHARED_CREDENTIALS_FILE` with path to credentials file.

View File

@@ -14,16 +14,10 @@ func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample,
av.h.Update(sample.value)
}
func (av *histogramBucketAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
ac := c.(*histogramBucketAggrConfig)
shared := av.shared
if ac.useSharedState {
shared.Merge(&av.h)
av.h.Reset()
} else {
shared = &av.h
}
shared.VisitNonZeroBuckets(func(vmrange string, count uint64) {
func (av *histogramBucketAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
av.shared.Merge(&av.h)
av.h.Reset()
av.shared.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange)
})
}
@@ -32,26 +26,17 @@ func (av *histogramBucketAggrValue) state() any {
return av.shared
}
func newHistogramBucketAggrConfig(useSharedState bool) aggrConfig {
return &histogramBucketAggrConfig{
useSharedState: useSharedState,
}
func newHistogramBucketAggrConfig() aggrConfig {
return &histogramBucketAggrConfig{}
}
type histogramBucketAggrConfig struct {
useSharedState bool
}
type histogramBucketAggrConfig struct{}
func (ac *histogramBucketAggrConfig) getValue(s any) aggrValue {
var shared *metrics.Histogram
if ac.useSharedState {
if s == nil {
shared = &metrics.Histogram{}
} else {
shared = s.(*metrics.Histogram)
}
func (*histogramBucketAggrConfig) getValue(s any) aggrValue {
if s == nil {
s = &metrics.Histogram{}
}
return &histogramBucketAggrValue{
shared: shared,
shared: s.(*metrics.Histogram),
}
}

109
lib/streamaggr/increase.go Normal file
View File

@@ -0,0 +1,109 @@
package streamaggr
import (
"fmt"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
type increaseLastValue struct {
value float64
timestamp int64
deleteDeadline int64
}
type increaseAggrConfig struct {
keepFirstSample bool
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
// This allows avoiding an initial spike of the output values at startup when new time series
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
ignoreFirstSampleDeadline uint64
counterResetsTotal *metrics.Counter
}
type increaseAggrValue struct {
total *float64
shared map[string]increaseLastValue
}
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
ac := c.(*increaseAggrConfig)
currentTime := fasttime.UnixTimestamp()
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
lv, ok := av.shared[key]
if av.total == nil {
av.total = new(float64)
}
if ok {
if sample.timestamp < lv.timestamp {
// Skip out of order sample
return
}
if sample.value >= lv.value {
*av.total += sample.value - lv.value
} else {
// counter reset
*av.total += sample.value
ac.counterResetsTotal.Inc()
}
} else if keepFirstSample {
*av.total += sample.value
}
lv.value = sample.value
lv.timestamp = sample.timestamp
lv.deleteDeadline = deleteDeadline
key = bytesutil.InternString(key)
av.shared[key] = lv
}
func (av *increaseAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*increaseAggrConfig)
for lk, lv := range av.shared {
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
delete(av.shared, lk)
}
}
if av.total == nil {
return
}
total := *av.total
av.total = nil
ctx.appendSeries(key, ac.getSuffix(), total)
}
func (av *increaseAggrValue) state() any {
return av.shared
}
func newIncreaseAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &increaseAggrConfig{
keepFirstSample: keepFirstSample,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
return cfg
}
func (*increaseAggrConfig) getValue(s any) aggrValue {
var shared map[string]increaseLastValue
if s == nil {
shared = make(map[string]increaseLastValue)
} else {
shared = s.(map[string]increaseLastValue)
}
return &increaseAggrValue{
shared: shared,
}
}
func (ac *increaseAggrConfig) getSuffix() string {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}

View File

@@ -75,6 +75,9 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
outputs = av.blue
}
for idx, o := range outputs {
if o == nil {
o = av.blue[idx]
}
o.pushSample(ao.configs[idx], sample, inputKey, deleteDeadline)
}
av.deleteDeadline = deleteDeadline
@@ -112,6 +115,9 @@ func (ao *aggrOutputs) flushState(ctx *flushCtx) {
outputs = av.blue
}
for i, o := range outputs {
if o == nil {
o = av.blue[i]
}
o.flush(ao.configs[i], ctx, outputKey, ctx.isLast)
}
av.mu.Unlock()

View File

@@ -609,7 +609,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
for i, output := range cfg.Outputs {
outputMetricLabels := fmt.Sprintf(`output=%q,name=%q,path=%q,url=%q,position="%d"`, output, name, path, alias, aggrID)
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, ignoreFirstSampleInterval)
if err != nil {
return nil, err
}
@@ -716,7 +716,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
return a, nil
}
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
// check for duplicated output
if _, ok := outputsSeen[output]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
@@ -760,11 +760,11 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "count_series":
return newCountSeriesAggrConfig(), nil
case "histogram_bucket":
return newHistogramBucketAggrConfig(useSharedState), nil
return newHistogramBucketAggrConfig(), nil
case "increase":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "increase_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
case "last":
return newLastAggrConfig(), nil
case "max":
@@ -782,9 +782,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "sum_samples":
return newSumSamplesAggrConfig(), nil
case "total":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "total_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
case "unique_samples":
return newUniqueSamplesAggrConfig(), nil
default:

View File

@@ -53,36 +53,30 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*totalAggrConfig)
suffix := ac.getSuffix()
// check for stale entries
total := av.shared.total + av.total
av.total = 0
lvs := av.shared.lastValues
for lk, lv := range lvs {
for lk, lv := range av.shared.lastValues {
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
delete(lvs, lk)
delete(av.shared.lastValues, lk)
}
}
if ac.resetTotalOnFlush {
av.shared.total = 0
} else if math.Abs(total) >= (1 << 53) {
if math.Abs(total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
av.shared.total = 0
} else {
av.shared.total = total
}
ctx.appendSeries(key, suffix, total)
ctx.appendSeries(key, ac.getSuffix(), total)
}
func (av *totalAggrValue) state() any {
return av.shared
}
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &totalAggrConfig{
keepFirstSample: keepFirstSample,
resetTotalOnFlush: resetTotalOnFlush,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
@@ -90,8 +84,6 @@ func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleI
}
type totalAggrConfig struct {
resetTotalOnFlush bool
// Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool
@@ -117,12 +109,6 @@ func (*totalAggrConfig) getValue(s any) aggrValue {
}
func (ac *totalAggrConfig) getSuffix() string {
if ac.resetTotalOnFlush {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}
if ac.keepFirstSample {
return "total"
}