mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-08 03:14:09 +03:00
Compare commits
1 Commits
fs-paralle
...
follow-up-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0234437092 |
@@ -3,9 +3,11 @@ package remotewrite
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
@@ -30,6 +32,8 @@ var (
|
||||
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
|
||||
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
|
||||
"See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels")
|
||||
relabelConfigCheckInterval = flag.Duration("relabel.configCheckInterval", 0, "Interval for checking for changes in configurations defined via "+
|
||||
"-remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig flags. By default, the checking is disabled.")
|
||||
)
|
||||
|
||||
var labelsGlobal []prompb.Label
|
||||
@@ -67,13 +71,15 @@ func initRelabelConfigs() {
|
||||
}
|
||||
}
|
||||
|
||||
func reloadRelabelConfigs() {
|
||||
func reloadRelabelConfigs(logReload bool) {
|
||||
rcs := allRelabelConfigs.Load()
|
||||
if !rcs.isSet() {
|
||||
return
|
||||
}
|
||||
relabelConfigReloads.Inc()
|
||||
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
|
||||
if logReload {
|
||||
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
|
||||
}
|
||||
rcs, err := loadRelabelConfigs()
|
||||
if err != nil {
|
||||
relabelConfigReloadErrors.Inc()
|
||||
@@ -271,3 +277,26 @@ func fixPromCompatibleNaming(labels []prompb.Label) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func startRelabelConfigReloader(sighupCh <-chan os.Signal) {
|
||||
configReloaderWG.Add(1)
|
||||
go func() {
|
||||
var tickerCh <-chan time.Time
|
||||
if *relabelConfigCheckInterval > 0 {
|
||||
ticker := time.NewTicker(*relabelConfigCheckInterval)
|
||||
tickerCh = ticker.C
|
||||
defer ticker.Stop()
|
||||
}
|
||||
defer configReloaderWG.Done()
|
||||
for {
|
||||
select {
|
||||
case <-configReloaderStopCh:
|
||||
return
|
||||
case <-sighupCh:
|
||||
reloadRelabelConfigs(true)
|
||||
case <-tickerCh:
|
||||
reloadRelabelConfigs(false)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -34,8 +34,6 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
remoteWriteConfigCheckInterval = flag.Duration("remoteWrite.configCheckInterval", 0, "Interval for checking for changes in configurations defined via "+
|
||||
"-streamAggr.config, -remoteWrite.streamAggr.config, -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig flags. By default, the checking is disabled.")
|
||||
remoteWriteURLs = flagutil.NewArrayString("remoteWrite.url", "Remote storage URL to write data to. It must support either VictoriaMetrics remote write protocol "+
|
||||
"or Prometheus remote_write protocol. Example url: http://<victoriametrics-host>:8428/api/v1/write . "+
|
||||
"Pass multiple -remoteWrite.url options in order to replicate the collected data to multiple remote storage systems. "+
|
||||
@@ -215,27 +213,8 @@ func Init() {
|
||||
|
||||
dropDanglingQueues()
|
||||
|
||||
// Start config reloader.
|
||||
configReloaderWG.Add(1)
|
||||
go func() {
|
||||
var tickerCh <-chan time.Time
|
||||
if *remoteWriteConfigCheckInterval > 0 {
|
||||
ticker := time.NewTicker(*remoteWriteConfigCheckInterval)
|
||||
tickerCh = ticker.C
|
||||
defer ticker.Stop()
|
||||
}
|
||||
defer configReloaderWG.Done()
|
||||
for {
|
||||
select {
|
||||
case <-configReloaderStopCh:
|
||||
return
|
||||
case <-sighupCh:
|
||||
case <-tickerCh:
|
||||
}
|
||||
reloadRelabelConfigs()
|
||||
reloadStreamAggrConfigs()
|
||||
}
|
||||
}()
|
||||
startRelabelConfigReloader(sighupCh)
|
||||
startStreamAggrConfigReloader(sighupCh)
|
||||
}
|
||||
|
||||
func dropDanglingQueues() {
|
||||
|
||||
@@ -3,7 +3,9 @@ package remotewrite
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
@@ -65,6 +67,8 @@ var (
|
||||
streamAggrEnableWindows = flagutil.NewArrayBool("remoteWrite.streamAggr.enableWindows", "Enables aggregation within fixed windows for all remote write's aggregators. "+
|
||||
"This allows to get more precise results, but impacts resource usage as it requires twice more memory to store two states. "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#aggregation-windows.")
|
||||
streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking for changes in configurations defined via "+
|
||||
"-streamAggr.config and -remoteWrite.streamAggr.config flags. By default, the checking is disabled.")
|
||||
)
|
||||
|
||||
// CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -streamAggr.config.
|
||||
@@ -91,20 +95,22 @@ func CheckStreamAggrConfigs() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func reloadStreamAggrConfigs() {
|
||||
reloadStreamAggrConfigGlobal()
|
||||
func reloadStreamAggrConfigs(logReload bool) {
|
||||
reloadStreamAggrConfigGlobal(logReload)
|
||||
for _, rwctx := range rwctxsGlobal {
|
||||
rwctx.reloadStreamAggrConfig()
|
||||
rwctx.reloadStreamAggrConfig(logReload)
|
||||
}
|
||||
}
|
||||
|
||||
func reloadStreamAggrConfigGlobal() {
|
||||
func reloadStreamAggrConfigGlobal(logReload bool) {
|
||||
path := *streamAggrGlobalConfig
|
||||
if path == "" {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Infof("reloading stream aggregation configs pointed by -streamAggr.config=%q", path)
|
||||
if logReload {
|
||||
logger.Infof("reloading stream aggregation configs pointed by -streamAggr.config=%q", path)
|
||||
}
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
|
||||
|
||||
sasNew, err := newStreamAggrConfigGlobal()
|
||||
@@ -122,7 +128,9 @@ func reloadStreamAggrConfigGlobal() {
|
||||
logger.Infof("successfully reloaded -streamAggr.config=%q", path)
|
||||
} else {
|
||||
sasNew.MustStop()
|
||||
logger.Infof("-streamAggr.config=%q wasn't changed since the last reload", path)
|
||||
if logReload {
|
||||
logger.Infof("-streamAggr.config=%q wasn't changed since the last reload", path)
|
||||
}
|
||||
}
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
|
||||
@@ -171,13 +179,15 @@ func (rwctx *remoteWriteCtx) initStreamAggrConfig() {
|
||||
}
|
||||
}
|
||||
|
||||
func (rwctx *remoteWriteCtx) reloadStreamAggrConfig() {
|
||||
func (rwctx *remoteWriteCtx) reloadStreamAggrConfig(logReload bool) {
|
||||
path := streamAggrConfig.GetOptionalArg(rwctx.idx)
|
||||
if path == "" {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
|
||||
if logReload {
|
||||
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
|
||||
}
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
|
||||
|
||||
sasNew, err := rwctx.newStreamAggrConfig()
|
||||
@@ -195,7 +205,9 @@ func (rwctx *remoteWriteCtx) reloadStreamAggrConfig() {
|
||||
logger.Infof("successfully reloaded -remoteWrite.streamAggr.config=%q", path)
|
||||
} else {
|
||||
sasNew.MustStop()
|
||||
logger.Infof("-remoteWrite.streamAggr.config=%q wasn't changed since the last reload", path)
|
||||
if logReload {
|
||||
logger.Infof("-remoteWrite.streamAggr.config=%q wasn't changed since the last reload", path)
|
||||
}
|
||||
}
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
|
||||
@@ -256,3 +268,26 @@ func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamag
|
||||
}
|
||||
return sas, nil
|
||||
}
|
||||
|
||||
func startStreamAggrConfigReloader(sighupCh <-chan os.Signal) {
|
||||
configReloaderWG.Add(1)
|
||||
go func() {
|
||||
var tickerCh <-chan time.Time
|
||||
if *streamAggrConfigCheckInterval > 0 {
|
||||
ticker := time.NewTicker(*streamAggrConfigCheckInterval)
|
||||
tickerCh = ticker.C
|
||||
defer ticker.Stop()
|
||||
}
|
||||
defer configReloaderWG.Done()
|
||||
for {
|
||||
select {
|
||||
case <-configReloaderStopCh:
|
||||
return
|
||||
case <-sighupCh:
|
||||
reloadStreamAggrConfigs(true)
|
||||
case <-tickerCh:
|
||||
reloadStreamAggrConfigs(false)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ var (
|
||||
relabelConfig = flag.String("relabelConfig", "", "Optional path to a file with relabeling rules, which are applied to all the ingested metrics. "+
|
||||
"The path can point either to local file or to http url. "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#relabeling for details. The config is reloaded on SIGHUP signal or"+
|
||||
"at the interval specified by -relabel.configCheckInterval.")
|
||||
"at the interval specified by -relabelConfigCheckInterval.")
|
||||
|
||||
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
|
||||
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
|
||||
|
||||
@@ -27,7 +27,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: upgrade Go builder from Go1.24.6 to Go1.25. See [Go1.25 release notes](https://tip.golang.org/doc/go1.25).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add export functionality for Query (Table view) and RawQuery tabs in CSV/JSON format. See [#9332](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9332).
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): add `-relabelConfigCheckInterval` and `-streamAggr.configCheckInterval` flags for periodical stream aggregation and relabel configuration reload. When set, `vmsingle` scans the corresponding files for updates and reloads them automatically. See [#9590](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9590).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-remoteWrite.configCheckInterval` for periodic configuration reload. When set, `vmagent` scans the corresponding files for updates and reloads them automatically. See [#9590](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9590).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-streamAggr.configCheckInterval` and `-relabel.configCheckInterval` for periodic configuration reload. When set, `vmagent` scans the corresponding files for updates and reloads them automatically. See [#9590](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9590).
|
||||
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): prevent remote write ingestion stop on push error for [Google Pub/Sub](https://docs.victoriametrics.com/victoriametrics/vmagent/#writing-metrics-to-pubsub) integration.
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): properly handle [mTLS authorization and routing](https://docs.victoriametrics.com/victoriametrics/vmauth/#mtls-based-request-routing). Previously it didn't work. See [#29](https://github.com/VictoriaMetrics/VictoriaLogs/issues/29).
|
||||
|
||||
@@ -377,7 +377,8 @@ and `-remoteWrite.streamAggr.config`:
|
||||
There is also:
|
||||
|
||||
* `-promscrape.configCheckInterval` command-line flag controls how often VictoriaMetrics checks the `-promscrape.config` file for changes and reloads it automatically.
|
||||
* `-remoteWrite.configCheckInterval` {{% available_from "#" %}} command-line flag controls how often VictoriaMetrics checks and automatically reloads configs specified via `-remoteWrite.streamAggr.config`, `-streamAggr.config`, `-remoteWrite.relabelConfig` and `remoteWrite.urlRelabelConfig` flags.
|
||||
* `-streamAggr.configCheckInterval` {{% available_from "#" %}} command-line flag controls how often VictoriaMetrics checks and automatically reloads configs specified via `-remoteWrite.streamAggr.config` and `-streamAggr.config` flags.
|
||||
* `-relabel.configCheckInterval` {{% available_from "#" %}} command-line flag controls how often VictoriaMetrics checks and automatically reloads configs specified via `-remoteWrite.relabelConfig` and `remoteWrite.urlRelabelConfig` flags.
|
||||
|
||||
## SRV urls
|
||||
|
||||
@@ -1888,6 +1889,8 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
|
||||
Optional URL to push metrics exposed at /metrics page. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#push-metrics . By default, metrics exposed at /metrics page aren't pushed to any remote storage
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-relabel.configCheckInterval duration
|
||||
Interval for checking for changes in configurations defined via -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig flags. By default, the checking is disabled.
|
||||
-reloadAuthKey value
|
||||
Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides -httpAuth.*
|
||||
Flag value can be read from the given file when using -reloadAuthKey=file:///abs/path/to/file or -reloadAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -reloadAuthKey=http://host/path or -reloadAuthKey=https://host/path
|
||||
@@ -1943,8 +1946,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
|
||||
Optional path to bearer token file to use for the corresponding -remoteWrite.url. The token is re-read from the file every second
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-remoteWrite.configCheckInterval duration
|
||||
Interval for checking for changes in configurations defined via -streamAggr.config, -remoteWrite.streamAggr.config, -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig flags. By default, the checking is disabled.
|
||||
-remoteWrite.disableOnDiskQueue array
|
||||
Whether to disable storing pending data to -remoteWrite.tmpDataPath when the remote storage system at the corresponding -remoteWrite.url cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload
|
||||
Supports array of values separated by comma or specified via multiple flags.
|
||||
@@ -2133,6 +2134,8 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
|
||||
Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}Enabled sorting for labels can slow down ingestion performance a bit
|
||||
-streamAggr.config string
|
||||
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/ . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
|
||||
-streamAggr.configCheckInterval duration
|
||||
Interval for checking for changes in configurations defined via -streamAggr.config and -remoteWrite.streamAggr.config flags. By default, the checking is disabled.
|
||||
-streamAggr.dedupInterval duration
|
||||
Input samples are de-duplicated with this interval on aggregator before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication
|
||||
-streamAggr.dropInput
|
||||
|
||||
Reference in New Issue
Block a user