Compare commits

...

1 Commits

Author SHA1 Message Date
Andrii Chubatiuk
0234437092 follow-up for #9598 2025-08-26 14:23:49 +03:00
6 changed files with 85 additions and 39 deletions

View File

@@ -3,9 +3,11 @@ package remotewrite
import (
"flag"
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@@ -30,6 +32,8 @@ var (
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
"See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels")
relabelConfigCheckInterval = flag.Duration("relabel.configCheckInterval", 0, "Interval for checking for changes in configurations defined via "+
"-remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig flags. By default, the checking is disabled.")
)
var labelsGlobal []prompb.Label
@@ -67,13 +71,15 @@ func initRelabelConfigs() {
}
}
func reloadRelabelConfigs() {
func reloadRelabelConfigs(logReload bool) {
rcs := allRelabelConfigs.Load()
if !rcs.isSet() {
return
}
relabelConfigReloads.Inc()
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
if logReload {
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
}
rcs, err := loadRelabelConfigs()
if err != nil {
relabelConfigReloadErrors.Inc()
@@ -271,3 +277,26 @@ func fixPromCompatibleNaming(labels []prompb.Label) {
}
}
}
func startRelabelConfigReloader(sighupCh <-chan os.Signal) {
configReloaderWG.Add(1)
go func() {
var tickerCh <-chan time.Time
if *relabelConfigCheckInterval > 0 {
ticker := time.NewTicker(*relabelConfigCheckInterval)
tickerCh = ticker.C
defer ticker.Stop()
}
defer configReloaderWG.Done()
for {
select {
case <-configReloaderStopCh:
return
case <-sighupCh:
reloadRelabelConfigs(true)
case <-tickerCh:
reloadRelabelConfigs(false)
}
}
}()
}

View File

@@ -34,8 +34,6 @@ import (
)
var (
remoteWriteConfigCheckInterval = flag.Duration("remoteWrite.configCheckInterval", 0, "Interval for checking for changes in configurations defined via "+
"-streamAggr.config, -remoteWrite.streamAggr.config, -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig flags. By default, the checking is disabled.")
remoteWriteURLs = flagutil.NewArrayString("remoteWrite.url", "Remote storage URL to write data to. It must support either VictoriaMetrics remote write protocol "+
"or Prometheus remote_write protocol. Example url: http://<victoriametrics-host>:8428/api/v1/write . "+
"Pass multiple -remoteWrite.url options in order to replicate the collected data to multiple remote storage systems. "+
@@ -215,27 +213,8 @@ func Init() {
dropDanglingQueues()
// Start config reloader.
configReloaderWG.Add(1)
go func() {
var tickerCh <-chan time.Time
if *remoteWriteConfigCheckInterval > 0 {
ticker := time.NewTicker(*remoteWriteConfigCheckInterval)
tickerCh = ticker.C
defer ticker.Stop()
}
defer configReloaderWG.Done()
for {
select {
case <-configReloaderStopCh:
return
case <-sighupCh:
case <-tickerCh:
}
reloadRelabelConfigs()
reloadStreamAggrConfigs()
}
}()
startRelabelConfigReloader(sighupCh)
startStreamAggrConfigReloader(sighupCh)
}
func dropDanglingQueues() {

View File

@@ -3,7 +3,9 @@ package remotewrite
import (
"flag"
"fmt"
"os"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@@ -65,6 +67,8 @@ var (
streamAggrEnableWindows = flagutil.NewArrayBool("remoteWrite.streamAggr.enableWindows", "Enables aggregation within fixed windows for all remote write's aggregators. "+
"This allows to get more precise results, but impacts resource usage as it requires twice more memory to store two states. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#aggregation-windows.")
streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking for changes in configurations defined via "+
"-streamAggr.config and -remoteWrite.streamAggr.config flags. By default, the checking is disabled.")
)
// CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -streamAggr.config.
@@ -91,20 +95,22 @@ func CheckStreamAggrConfigs() error {
return nil
}
func reloadStreamAggrConfigs() {
reloadStreamAggrConfigGlobal()
func reloadStreamAggrConfigs(logReload bool) {
reloadStreamAggrConfigGlobal(logReload)
for _, rwctx := range rwctxsGlobal {
rwctx.reloadStreamAggrConfig()
rwctx.reloadStreamAggrConfig(logReload)
}
}
func reloadStreamAggrConfigGlobal() {
func reloadStreamAggrConfigGlobal(logReload bool) {
path := *streamAggrGlobalConfig
if path == "" {
return
}
logger.Infof("reloading stream aggregation configs pointed by -streamAggr.config=%q", path)
if logReload {
logger.Infof("reloading stream aggregation configs pointed by -streamAggr.config=%q", path)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
sasNew, err := newStreamAggrConfigGlobal()
@@ -122,7 +128,9 @@ func reloadStreamAggrConfigGlobal() {
logger.Infof("successfully reloaded -streamAggr.config=%q", path)
} else {
sasNew.MustStop()
logger.Infof("-streamAggr.config=%q wasn't changed since the last reload", path)
if logReload {
logger.Infof("-streamAggr.config=%q wasn't changed since the last reload", path)
}
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
@@ -171,13 +179,15 @@ func (rwctx *remoteWriteCtx) initStreamAggrConfig() {
}
}
func (rwctx *remoteWriteCtx) reloadStreamAggrConfig() {
func (rwctx *remoteWriteCtx) reloadStreamAggrConfig(logReload bool) {
path := streamAggrConfig.GetOptionalArg(rwctx.idx)
if path == "" {
return
}
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
if logReload {
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
sasNew, err := rwctx.newStreamAggrConfig()
@@ -195,7 +205,9 @@ func (rwctx *remoteWriteCtx) reloadStreamAggrConfig() {
logger.Infof("successfully reloaded -remoteWrite.streamAggr.config=%q", path)
} else {
sasNew.MustStop()
logger.Infof("-remoteWrite.streamAggr.config=%q wasn't changed since the last reload", path)
if logReload {
logger.Infof("-remoteWrite.streamAggr.config=%q wasn't changed since the last reload", path)
}
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
@@ -256,3 +268,26 @@ func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamag
}
return sas, nil
}
func startStreamAggrConfigReloader(sighupCh <-chan os.Signal) {
configReloaderWG.Add(1)
go func() {
var tickerCh <-chan time.Time
if *streamAggrConfigCheckInterval > 0 {
ticker := time.NewTicker(*streamAggrConfigCheckInterval)
tickerCh = ticker.C
defer ticker.Stop()
}
defer configReloaderWG.Done()
for {
select {
case <-configReloaderStopCh:
return
case <-sighupCh:
reloadStreamAggrConfigs(true)
case <-tickerCh:
reloadStreamAggrConfigs(false)
}
}
}()
}

View File

@@ -19,7 +19,7 @@ var (
relabelConfig = flag.String("relabelConfig", "", "Optional path to a file with relabeling rules, which are applied to all the ingested metrics. "+
"The path can point either to local file or to http url. "+
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#relabeling for details. The config is reloaded on SIGHUP signal or"+
"at the interval specified by -relabel.configCheckInterval.")
"at the interval specified by -relabelConfigCheckInterval.")
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+

View File

@@ -27,7 +27,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: upgrade Go builder from Go1.24.6 to Go1.25. See [Go1.25 release notes](https://tip.golang.org/doc/go1.25).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add export functionality for Query (Table view) and RawQuery tabs in CSV/JSON format. See [#9332](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9332).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): add `-relabelConfigCheckInterval` and `-streamAggr.configCheckInterval` flags for periodical stream aggregation and relabel configuration reload. When set, `vmsingle` scans the corresponding files for updates and reloads them automatically. See [#9590](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9590).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-remoteWrite.configCheckInterval` for periodic configuration reload. When set, `vmagent` scans the corresponding files for updates and reloads them automatically. See [#9590](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9590).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-streamAggr.configCheckInterval` and `-relabel.configCheckInterval` for periodic configuration reload. When set, `vmagent` scans the corresponding files for updates and reloads them automatically. See [#9590](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9590).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): prevent remote write ingestion stop on push error for [Google Pub/Sub](https://docs.victoriametrics.com/victoriametrics/vmagent/#writing-metrics-to-pubsub) integration.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): properly handle [mTLS authorization and routing](https://docs.victoriametrics.com/victoriametrics/vmauth/#mtls-based-request-routing). Previously it didn't work. See [#29](https://github.com/VictoriaMetrics/VictoriaLogs/issues/29).

View File

@@ -377,7 +377,8 @@ and `-remoteWrite.streamAggr.config`:
There is also:
* `-promscrape.configCheckInterval` command-line flag controls how often VictoriaMetrics checks the `-promscrape.config` file for changes and reloads it automatically.
* `-remoteWrite.configCheckInterval` {{% available_from "#" %}} command-line flag controls how often VictoriaMetrics checks and automatically reloads configs specified via `-remoteWrite.streamAggr.config`, `-streamAggr.config`, `-remoteWrite.relabelConfig` and `remoteWrite.urlRelabelConfig` flags.
* `-streamAggr.configCheckInterval` {{% available_from "#" %}} command-line flag controls how often VictoriaMetrics checks and automatically reloads configs specified via `-remoteWrite.streamAggr.config` and `-streamAggr.config` flags.
* `-relabel.configCheckInterval` {{% available_from "#" %}} command-line flag controls how often VictoriaMetrics checks and automatically reloads configs specified via `-remoteWrite.relabelConfig` and `remoteWrite.urlRelabelConfig` flags.
## SRV urls
@@ -1888,6 +1889,8 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
Optional URL to push metrics exposed at /metrics page. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#push-metrics . By default, metrics exposed at /metrics page aren't pushed to any remote storage
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-relabel.configCheckInterval duration
Interval for checking for changes in configurations defined via -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig flags. By default, the checking is disabled.
-reloadAuthKey value
Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides -httpAuth.*
Flag value can be read from the given file when using -reloadAuthKey=file:///abs/path/to/file or -reloadAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -reloadAuthKey=http://host/path or -reloadAuthKey=https://host/path
@@ -1943,8 +1946,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
Optional path to bearer token file to use for the corresponding -remoteWrite.url. The token is re-read from the file every second
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-remoteWrite.configCheckInterval duration
Interval for checking for changes in configurations defined via -streamAggr.config, -remoteWrite.streamAggr.config, -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig flags. By default, the checking is disabled.
-remoteWrite.disableOnDiskQueue array
Whether to disable storing pending data to -remoteWrite.tmpDataPath when the remote storage system at the corresponding -remoteWrite.url cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload
Supports array of values separated by comma or specified via multiple flags.
@@ -2133,6 +2134,8 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}Enabled sorting for labels can slow down ingestion performance a bit
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/ . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.configCheckInterval duration
Interval for checking for changes in configurations defined via -streamAggr.config and -remoteWrite.streamAggr.config flags. By default, the checking is disabled.
-streamAggr.dedupInterval duration
Input samples are de-duplicated with this interval on aggregator before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication
-streamAggr.dropInput