Compare commits

...

11 Commits

Author SHA1 Message Date
Jayice
821f392f02 address review comments 2026-05-27 14:36:09 +08:00
Jayice
a991ea9b91 address review comments 2026-05-27 14:22:06 +08:00
Jayice
71cfe5a061 fix lint 2026-05-26 17:15:18 +08:00
Jayice
c2d9e0f5f2 introduce mdx.remoteWrite.* configs 2026-05-26 15:55:16 +08:00
Jayice
919049f9e2 push slice back to pool 2026-05-25 17:34:41 +08:00
Jayice
24efe47c6a add unit test & address review comments 2026-05-25 17:08:23 +08:00
Jayice
f8d99d9289 polish documentation 2026-04-27 18:02:41 +08:00
Jayice
333a015be5 update CHANGELOG.md 2026-04-27 15:15:59 +08:00
JAYICE
b6196524ba Merge branch 'master' into issue-10600
Signed-off-by: JAYICE <1185430411@qq.com>
2026-04-27 15:13:02 +08:00
Jayice
e9f1bb911c add documentation 2026-04-27 15:10:17 +08:00
Jayice
12b79143dc implement mdx for remote write 2026-04-21 15:48:46 +08:00
8 changed files with 826 additions and 106 deletions

View File

@@ -58,6 +58,62 @@ var (
"For example, -remoteWrite.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteWrite.url. "+
"Multiple headers must be delimited by '^^': -remoteWrite.headers='header1:value1^^header2:value2'")
mdxForcePromProto = flagutil.NewArrayBool("mdx.remoteWrite.forcePromProto", "Whether to force Prometheus remote write protocol for sending data "+
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol")
mdxForceVMProto = flagutil.NewArrayBool("remoteWrite.forceVMProto", "Whether to force VictoriaMetrics remote write protocol for sending data "+
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol")
mdxRateLimit = flagutil.NewArrayInt("mdx.remoteWrite.rateLimit", 0, "Optional rate limit in bytes per second for data sent to the corresponding -remoteWrite.url. "+
"By default, the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data "+
"is sent after temporary unavailability of the remote storage. See also -maxIngestionRate")
mdxSendTimeout = flagutil.NewArrayDuration("mdx.remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to the corresponding -remoteWrite.url")
mdxRetryMinInterval = flagutil.NewArrayDuration("mdx.remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts to send a block of data to the corresponding -remoteWrite.url. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxInterval")
// deprecated in the future. use -remoteWrite.retryMaxInterval instead
mdxRetryMaxTime = flagutil.NewArrayDuration("mdx.remoteWrite.retryMaxTime", time.Minute, "The max time spent on retry attempts to send a block of data to the corresponding -remoteWrite.url. This flag is deprecated, use -remoteWrite.retryMaxInterval instead")
mdxRetryMaxInterval = flagutil.NewArrayDuration("mdx.remoteWrite.retryMaxInterval", time.Minute, "The maximum delay between retry attempts to send a block of data to the corresponding -remoteWrite.url. The delay doubles with each retry until this maximum is reached, after which it remains constant. See also -remoteWrite.retryMinInterval")
mdxProxyURL = flagutil.NewArrayString("mdx.remoteWrite.proxyURL", "Optional proxy URL for writing data to the corresponding -remoteWrite.url. "+
"Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234")
mdxTlsHandshakeTimeout = flagutil.NewArrayDuration("mdx.remoteWrite.tlsHandshakeTimeout", 20*time.Second, "The timeout for establishing tls connections to the corresponding -remoteWrite.url")
mdxTlsInsecureSkipVerify = flagutil.NewArrayBool("mdx.remoteWrite.tlsInsecureSkipVerify", "Whether to skip tls verification when connecting to the corresponding -remoteWrite.url")
mdxTlsCertFile = flagutil.NewArrayString("mdx.remoteWrite.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting "+
"to the corresponding -remoteWrite.url")
mdxTlsKeyFile = flagutil.NewArrayString("mdx.remoteWrite.tlsKeyFile", "Optional path to client-side TLS certificate key to use when connecting to the corresponding -remoteWrite.url")
mdxTlsCAFile = flagutil.NewArrayString("mdx.remoteWrite.tlsCAFile", "Optional path to TLS CA file to use for verifying connections to the corresponding -remoteWrite.url. "+
"By default, system CA is used")
mdxTlsServerName = flagutil.NewArrayString("mdx.remoteWrite.tlsServerName", "Optional TLS server name to use for connections to the corresponding -remoteWrite.url. "+
"By default, the server name from -remoteWrite.url is used")
mdxHeaders = flagutil.NewArrayString("mdx.remoteWrite.headers", "Optional HTTP headers to send with each request to the corresponding -remoteWrite.url. "+
"For example, -remoteWrite.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteWrite.url. "+
"Multiple headers must be delimited by '^^': -remoteWrite.headers='header1:value1^^header2:value2'")
mdxBasicAuthUsername = flagutil.NewArrayString("mdx.remoteWrite.basicAuth.username", "Optional basic auth username to use for the corresponding -remoteWrite.url")
mdxBasicAuthPassword = flagutil.NewArrayString("mdx.remoteWrite.basicAuth.password", "Optional basic auth password to use for the corresponding -remoteWrite.url")
mdxBasicAuthPasswordFile = flagutil.NewArrayString("mdx.remoteWrite.basicAuth.passwordFile", "Optional path to basic auth password to use for the corresponding -remoteWrite.url. "+
"The file is re-read every second")
mdxBearerToken = flagutil.NewArrayString("mdx.remoteWrite.bearerToken", "Optional bearer auth token to use for the corresponding -remoteWrite.url")
mdxBearerTokenFile = flagutil.NewArrayString("mdx.remoteWrite.bearerTokenFile", "Optional path to bearer token file to use for the corresponding -remoteWrite.url. "+
"The token is re-read from the file every second")
mdxOauth2ClientID = flagutil.NewArrayString("mdx.remoteWrite.oauth2.clientID", "Optional OAuth2 clientID to use for the corresponding -remoteWrite.url")
mdxOauth2ClientSecret = flagutil.NewArrayString("mdx.remoteWrite.oauth2.clientSecret", "Optional OAuth2 clientSecret to use for the corresponding -remoteWrite.url")
mdxOauth2ClientSecretFile = flagutil.NewArrayString("mdx.remoteWrite.oauth2.clientSecretFile", "Optional OAuth2 clientSecretFile to use for the corresponding -remoteWrite.url")
mdxOauth2EndpointParams = flagutil.NewArrayString("mdx.remoteWrite.oauth2.endpointParams", "Optional OAuth2 endpoint parameters to use for the corresponding -remoteWrite.url . "+
`The endpoint parameters must be set in JSON format: {"param1":"value1",...,"paramN":"valueN"}`)
mdxOauth2TokenURL = flagutil.NewArrayString("mdx.remoteWrite.oauth2.tokenUrl", "Optional OAuth2 tokenURL to use for the corresponding -remoteWrite.url")
mdxOauth2Scopes = flagutil.NewArrayString("mdx.remoteWrite.oauth2.scopes", "Optional OAuth2 scopes to use for the corresponding -remoteWrite.url. Scopes must be delimited by ';'")
mdxAwsUseSigv4 = flagutil.NewArrayBool("mdx.remoteWrite.aws.useSigv4", "Enables SigV4 request signing for the corresponding -remoteWrite.url. "+
"It is expected that other -remoteWrite.aws.* command-line flags are set if sigv4 request signing is enabled")
mdxAwsEC2Endpoint = flagutil.NewArrayString("mdx.remoteWrite.aws.ec2Endpoint", "Optional AWS EC2 API endpoint to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set")
mdxAwsSTSEndpoint = flagutil.NewArrayString("mdx.remoteWrite.aws.stsEndpoint", "Optional AWS STS API endpoint to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set")
mdxAwsRegion = flagutil.NewArrayString("mdx.remoteWrite.aws.region", "Optional AWS region to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set")
mdxAwsRoleARN = flagutil.NewArrayString("mdx.remoteWrite.aws.roleARN", "Optional AWS roleARN to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set")
mdxAwsAccessKey = flagutil.NewArrayString("mdx.remoteWrite.aws.accessKey", "Optional AWS AccessKey to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set")
mdxAwsService = flagutil.NewArrayString("mdx.remoteWrite.aws.service", "Optional AWS Service to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set. "+
"Defaults to \"aps\"")
mdxAwsSecretKey = flagutil.NewArrayString("mdx.remoteWrite.aws.secretKey", "Optional AWS SecretKey to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set")
basicAuthUsername = flagutil.NewArrayString("remoteWrite.basicAuth.username", "Optional basic auth username to use for the corresponding -remoteWrite.url")
basicAuthPassword = flagutil.NewArrayString("remoteWrite.basicAuth.password", "Optional basic auth password to use for the corresponding -remoteWrite.url")
basicAuthPasswordFile = flagutil.NewArrayString("remoteWrite.basicAuth.passwordFile", "Optional path to basic auth password to use for the corresponding -remoteWrite.url. "+
@@ -120,24 +176,24 @@ type client struct {
stopCh chan struct{}
}
func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int) *client {
authCfg, err := getAuthConfig(argIdx)
func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqueue.FastQueue, concurrency int, flags *Flags) *client {
authCfg, err := getAuthConfig(argIdx, flags)
if err != nil {
logger.Fatalf("cannot initialize auth config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
}
awsCfg, err := getAWSAPIConfig(argIdx)
awsCfg, err := getAWSAPIConfig(argIdx, flags)
if err != nil {
logger.Fatalf("cannot initialize AWS Config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
}
tr := httputil.NewTransport(false, "vmagent_remotewrite")
tr.TLSHandshakeTimeout = tlsHandshakeTimeout.GetOptionalArg(argIdx)
tr.TLSHandshakeTimeout = flags.tlsHandshakeTimeout.GetOptionalArg(argIdx)
tr.MaxConnsPerHost = 2 * concurrency
tr.MaxIdleConnsPerHost = 2 * concurrency
tr.IdleConnTimeout = time.Minute
tr.WriteBufferSize = 64 * 1024
pURL := proxyURL.GetOptionalArg(argIdx)
pURL := flags.proxyURL.GetOptionalArg(argIdx)
if len(pURL) > 0 {
if !strings.Contains(pURL, "://") {
logger.Fatalf("cannot parse -remoteWrite.proxyURL=%q: it must start with `http://`, `https://` or `socks5://`", pURL)
@@ -150,11 +206,11 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
}
hc := &http.Client{
Transport: authCfg.NewRoundTripper(tr),
Timeout: sendTimeout.GetOptionalArg(argIdx),
Timeout: flags.sendTimeout.GetOptionalArg(argIdx),
}
retryMaxIntervalFlag := retryMaxTime
retryMaxIntervalFlag := flags.retryMaxTime
if retryMaxInterval.String() != "" {
retryMaxIntervalFlag = retryMaxInterval
retryMaxIntervalFlag = flags.retryMaxInterval
}
c := &client{
sanitizedURL: sanitizedURL,
@@ -163,14 +219,14 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
awsCfg: awsCfg,
fq: fq,
hc: hc,
retryMinInterval: retryMinInterval.GetOptionalArg(argIdx),
retryMinInterval: flags.retryMinInterval.GetOptionalArg(argIdx),
retryMaxInterval: retryMaxIntervalFlag.GetOptionalArg(argIdx),
stopCh: make(chan struct{}),
}
c.sendBlock = c.sendBlockHTTP
useVMProto := forceVMProto.GetOptionalArg(argIdx)
usePromProto := forcePromProto.GetOptionalArg(argIdx)
useVMProto := flags.forceVMProto.GetOptionalArg(argIdx)
usePromProto := flags.forcePromProto.GetOptionalArg(argIdx)
if useVMProto && usePromProto {
logger.Fatalf("-remoteWrite.useVMProto and -remoteWrite.usePromProto cannot be set simultaneously for -remoteWrite.url=%s", sanitizedURL)
}
@@ -184,16 +240,16 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
return c
}
func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
func (c *client) init(argIdx, concurrency int, sanitizedURL string, flags *Flags) {
limitReached := metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rate_limit_reached_total{url=%q}`, c.sanitizedURL))
if bytesPerSec := rateLimit.GetOptionalArg(argIdx); bytesPerSec > 0 {
if bytesPerSec := flags.rateLimit.GetOptionalArg(argIdx); bytesPerSec > 0 {
logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", bytesPerSec, sanitizedURL)
c.rl = ratelimiter.New(int64(bytesPerSec), limitReached, c.stopCh)
}
c.bytesSent = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_bytes_sent_total{url=%q}`, c.sanitizedURL))
c.blocksSent = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_blocks_sent_total{url=%q}`, c.sanitizedURL))
c.rateLimit = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_rate_limit{url=%q}`, c.sanitizedURL), func() float64 {
return float64(rateLimit.GetOptionalArg(argIdx))
return float64(flags.rateLimit.GetOptionalArg(argIdx))
})
c.requestDuration = metrics.GetOrCreateHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.sanitizedURL))
c.requestsOKCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="2XX"}`, c.sanitizedURL))
@@ -216,15 +272,15 @@ func (c *client) MustStop() {
logger.Infof("stopped client for -remoteWrite.url=%q", c.sanitizedURL)
}
func getAuthConfig(argIdx int) (*promauth.Config, error) {
headersValue := headers.GetOptionalArg(argIdx)
func getAuthConfig(argIdx int, flags *Flags) (*promauth.Config, error) {
headersValue := flags.headers.GetOptionalArg(argIdx)
var hdrs []string
if headersValue != "" {
hdrs = strings.Split(headersValue, "^^")
}
username := basicAuthUsername.GetOptionalArg(argIdx)
password := basicAuthPassword.GetOptionalArg(argIdx)
passwordFile := basicAuthPasswordFile.GetOptionalArg(argIdx)
username := flags.basicAuthUsername.GetOptionalArg(argIdx)
password := flags.basicAuthPassword.GetOptionalArg(argIdx)
passwordFile := flags.basicAuthPasswordFile.GetOptionalArg(argIdx)
var basicAuthCfg *promauth.BasicAuthConfig
if username != "" || password != "" || passwordFile != "" {
basicAuthCfg = &promauth.BasicAuthConfig{
@@ -234,34 +290,34 @@ func getAuthConfig(argIdx int) (*promauth.Config, error) {
}
}
token := bearerToken.GetOptionalArg(argIdx)
tokenFile := bearerTokenFile.GetOptionalArg(argIdx)
token := flags.bearerToken.GetOptionalArg(argIdx)
tokenFile := flags.bearerTokenFile.GetOptionalArg(argIdx)
var oauth2Cfg *promauth.OAuth2Config
clientSecret := oauth2ClientSecret.GetOptionalArg(argIdx)
clientSecretFile := oauth2ClientSecretFile.GetOptionalArg(argIdx)
clientSecret := flags.oauth2ClientSecret.GetOptionalArg(argIdx)
clientSecretFile := flags.oauth2ClientSecretFile.GetOptionalArg(argIdx)
if clientSecretFile != "" || clientSecret != "" {
endpointParamsJSON := oauth2EndpointParams.GetOptionalArg(argIdx)
endpointParamsJSON := flags.oauth2EndpointParams.GetOptionalArg(argIdx)
endpointParams, err := flagutil.ParseJSONMap(endpointParamsJSON)
if err != nil {
return nil, fmt.Errorf("cannot parse JSON for -remoteWrite.oauth2.endpointParams=%s: %w", endpointParamsJSON, err)
}
oauth2Cfg = &promauth.OAuth2Config{
ClientID: oauth2ClientID.GetOptionalArg(argIdx),
ClientID: flags.oauth2ClientID.GetOptionalArg(argIdx),
ClientSecret: promauth.NewSecret(clientSecret),
ClientSecretFile: clientSecretFile,
EndpointParams: endpointParams,
TokenURL: oauth2TokenURL.GetOptionalArg(argIdx),
Scopes: strings.Split(oauth2Scopes.GetOptionalArg(argIdx), ";"),
TokenURL: flags.oauth2TokenURL.GetOptionalArg(argIdx),
Scopes: strings.Split(flags.oauth2Scopes.GetOptionalArg(argIdx), ";"),
}
}
tlsCfg := &promauth.TLSConfig{
CAFile: tlsCAFile.GetOptionalArg(argIdx),
CertFile: tlsCertFile.GetOptionalArg(argIdx),
KeyFile: tlsKeyFile.GetOptionalArg(argIdx),
ServerName: tlsServerName.GetOptionalArg(argIdx),
InsecureSkipVerify: tlsInsecureSkipVerify.GetOptionalArg(argIdx),
CAFile: flags.tlsCAFile.GetOptionalArg(argIdx),
CertFile: flags.tlsCertFile.GetOptionalArg(argIdx),
KeyFile: flags.tlsKeyFile.GetOptionalArg(argIdx),
ServerName: flags.tlsServerName.GetOptionalArg(argIdx),
InsecureSkipVerify: flags.tlsInsecureSkipVerify.GetOptionalArg(argIdx),
}
opts := &promauth.Options{
@@ -279,17 +335,17 @@ func getAuthConfig(argIdx int) (*promauth.Config, error) {
return authCfg, nil
}
func getAWSAPIConfig(argIdx int) (*awsapi.Config, error) {
if !awsUseSigv4.GetOptionalArg(argIdx) {
func getAWSAPIConfig(argIdx int, flags *Flags) (*awsapi.Config, error) {
if !flags.awsUseSigv4.GetOptionalArg(argIdx) {
return nil, nil
}
ec2Endpoint := awsEC2Endpoint.GetOptionalArg(argIdx)
stsEndpoint := awsSTSEndpoint.GetOptionalArg(argIdx)
region := awsRegion.GetOptionalArg(argIdx)
roleARN := awsRoleARN.GetOptionalArg(argIdx)
accessKey := awsAccessKey.GetOptionalArg(argIdx)
secretKey := awsSecretKey.GetOptionalArg(argIdx)
service := awsService.GetOptionalArg(argIdx)
ec2Endpoint := flags.awsEC2Endpoint.GetOptionalArg(argIdx)
stsEndpoint := flags.awsSTSEndpoint.GetOptionalArg(argIdx)
region := flags.awsRegion.GetOptionalArg(argIdx)
roleARN := flags.awsRoleARN.GetOptionalArg(argIdx)
accessKey := flags.awsAccessKey.GetOptionalArg(argIdx)
secretKey := flags.awsSecretKey.GetOptionalArg(argIdx)
service := flags.awsService.GetOptionalArg(argIdx)
cfg, err := awsapi.NewConfig(ec2Endpoint, stsEndpoint, region, roleARN, accessKey, secretKey, service, "")
if err != nil {
return nil, err

View File

@@ -20,7 +20,8 @@ import (
)
var (
unparsedLabelsGlobal = flagutil.NewArrayString("remoteWrite.label", "Optional label in the form 'name=value' to add to all the metrics before sending them to all -remoteWrite.url.")
unparsedLabels = flagutil.NewArrayString("remoteWrite.label", "Optional label in the form 'name=value' to add to all the metrics before sending them to all -remoteWrite.url.")
mdxUnparsedLabels = flagutil.NewArrayString("mdx.remoteWrite.label", "Optional label in the form 'name=value' to add to all the metrics before sending them to all -remoteWrite.url.")
relabelConfigPathGlobal = flag.String("remoteWrite.relabelConfig", "", "Optional path to file with relabeling configs, which are applied "+
"to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. "+
"The path can point either to local file or to http url. "+
@@ -28,6 +29,9 @@ var (
relabelConfigPaths = flagutil.NewArrayString("remoteWrite.urlRelabelConfig", "Optional path to relabel configs for the corresponding -remoteWrite.url. "+
"See also -remoteWrite.relabelConfig. The path can point either to local file or to http url. "+
"See https://docs.victoriametrics.com/victoriametrics/relabeling/")
mdxRelabelConfigPaths = flagutil.NewArrayString("mdx.remoteWrite.urlRelabelConfig", "Optional path to relabel configs for the corresponding -remoteWrite.url. "+
"See also -remoteWrite.relabelConfig. The path can point either to local file or to http url. "+
"See https://docs.victoriametrics.com/victoriametrics/relabeling/")
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
@@ -141,11 +145,15 @@ func loadRelabelConfigs() (*relabelConfigs, error) {
if len(*relabelConfigPaths) > len(*remoteWriteURLs) {
return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url args: %d",
len(*relabelConfigPaths), (len(*remoteWriteURLs)))
len(*relabelConfigPaths), len(*remoteWriteURLs))
}
if len(*mdxRelabelConfigPaths) > len(*mdxRemoteWriteURLs) {
return nil, fmt.Errorf("too many -mdx.remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -mdx.remoteWrite.url args: %d",
len(*relabelConfigPaths), len(*mdxRemoteWriteURLs))
}
var urlRelabelCfgs []any
rcs.perURL = make([]*promrelabel.ParsedConfigs, len(*remoteWriteURLs))
rcs.perURL = make([]*promrelabel.ParsedConfigs, len(*remoteWriteURLs)+len(*mdxRemoteWriteURLs))
for i, path := range *relabelConfigPaths {
if len(path) == 0 {
urlRelabelCfgs = append(urlRelabelCfgs, nil)
@@ -167,6 +175,27 @@ func loadRelabelConfigs() (*relabelConfigs, error) {
urlRelabelCfgs = append(urlRelabelCfgs, nil)
}
}
for i, path := range *mdxRelabelConfigPaths {
if len(path) == 0 {
urlRelabelCfgs = append(urlRelabelCfgs, nil)
continue
}
prc, rawCfg, err := promrelabel.LoadRelabelConfigs(path)
if err != nil {
return nil, fmt.Errorf("cannot load relabel configs from -mdx.remoteWrite.urlRelabelConfig=%q: %w", path, err)
}
rcs.perURL[len(*remoteWriteURLs)+i] = prc
var parsedCfg any
_ = yaml.Unmarshal(rawCfg, &parsedCfg)
urlRelabelCfgs = append(urlRelabelCfgs, parsedCfg)
}
if len(*mdxRemoteWriteURLs) > len(*mdxRelabelConfigPaths) {
// fill the urlRelabelCfgs with empty relabel configs if not set
for i := len(*mdxRelabelConfigPaths); i < len(*mdxRemoteWriteURLs); i++ {
urlRelabelCfgs = append(urlRelabelCfgs, nil)
}
}
remoteWriteURLRelabelConfigData.Store(&urlRelabelCfgs)
return &rcs, nil
}
@@ -178,13 +207,16 @@ type relabelConfigs struct {
// isSet indicates whether (global or per-URL) command-line flags is set
func (rcs *relabelConfigs) isSet() bool {
return *relabelConfigPathGlobal != "" || len(*relabelConfigPaths) > 0
return *relabelConfigPathGlobal != "" || len(*relabelConfigPaths) > 0 || len(*mdxRelabelConfigPaths) > 0
}
// initLabelsGlobal must be called after parsing command-line flags.
func initLabelsGlobal() {
labelsGlobal = nil
for _, s := range *unparsedLabelsGlobal {
unparsedLabelsGlobal := make([]string, len(*unparsedLabels)+len(*mdxUnparsedLabels))
copy(unparsedLabelsGlobal, *unparsedLabels)
copy(unparsedLabelsGlobal[len(*unparsedLabels):], *mdxUnparsedLabels)
for _, s := range unparsedLabelsGlobal {
if len(s) == 0 {
continue
}

View File

@@ -12,6 +12,7 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mdx"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/metrics"
@@ -102,6 +103,193 @@ var (
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
"By default, metadata sending is controlled by the global -enableMetadata flag")
mdxRemoteWriteURLs = flagutil.NewArrayString("mdx.remoteWrite.url", "Remote storage URL to write data to. It must support either VictoriaMetrics remote write protocol "+
"or Prometheus remote_write protocol. Example url: http://<victoriametrics-host>:8428/api/v1/write . "+
"Pass multiple -remoteWrite.url options in order to replicate the collected data to multiple remote storage systems. "+
"The data can be sharded among the configured remote storage systems if -remoteWrite.shardByURL flag is set")
mdxQueues = flagutil.NewArrayInt("mdx.remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
"isn't enough for sending high volume of collected data to remote storage. "+
"Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage")
mdxSignificantFigures = flagutil.NewArrayInt("mdx.remoteWrite.significantFigures", 0, "The number of significant figures to leave in metric values before writing them "+
"to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. "+
"This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits")
mdxRoundDigits = flagutil.NewArrayInt("mdx.remoteWrite.roundDigits", 100, "Round metric values to this number of decimal digits after the point before "+
"writing them to remote storage. "+
"Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. "+
"By default, digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. "+
"This option may be used for improving data compression for the stored metrics")
mdxDisableOnDiskQueue = flagutil.NewArrayBool("mdx.remoteWrite.disableOnDiskQueue", "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
"when the remote storage system at the corresponding -remoteWrite.url cannot keep up with the data ingestion rate. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload")
mdxMaxPendingBytesPerURL = flagutil.NewArrayBytes("mdx.remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+
"for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. "+
"Buffered data is stored in ~500MB chunks. It is recommended to set the value for this flag to a multiple of the block size 500MB. "+
"Disk usage is unlimited if the value is set to 0")
mdxDisableMetadataPerURL = flagutil.NewArrayBool("mdx.remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
"By default, metadata sending is controlled by the global -enableMetadata flag")
)
// Flags contains all the array flags for remoteWrite and mdx.remoteWrite
// They have similar functions and flags, but the flags are indexed differently. This structure is used to organize them.
type Flags struct {
enableMdx bool
remoteWriteURLs *flagutil.ArrayString
maxPendingBytesPerURL *flagutil.ArrayBytes
queues *flagutil.ArrayInt
significantFigures *flagutil.ArrayInt
roundDigits *flagutil.ArrayInt
disableOnDiskQueue *flagutil.ArrayBool
disableMetadataPerURL *flagutil.ArrayBool
streamAggrConfig *flagutil.ArrayString
streamAggrDropInput *flagutil.ArrayBool
streamAggrKeepInput *flagutil.ArrayBool
streamAggrDedupInterval *flagutil.ArrayDuration
streamAggrIgnoreOldSamples *flagutil.ArrayBool
streamAggrIgnoreFirstIntervals *flagutil.ArrayInt
streamAggrDropInputLabels *flagutil.ArrayString
streamAggrEnableWindows *flagutil.ArrayBool
forcePromProto *flagutil.ArrayBool
forceVMProto *flagutil.ArrayBool
rateLimit *flagutil.ArrayInt
sendTimeout *flagutil.ArrayDuration
retryMaxTime *flagutil.ArrayDuration
retryMinInterval *flagutil.ArrayDuration
retryMaxInterval *flagutil.ArrayDuration
proxyURL *flagutil.ArrayString
tlsHandshakeTimeout *flagutil.ArrayDuration
tlsInsecureSkipVerify *flagutil.ArrayBool
tlsCertFile *flagutil.ArrayString
tlsKeyFile *flagutil.ArrayString
tlsCAFile *flagutil.ArrayString
tlsServerName *flagutil.ArrayString
headers *flagutil.ArrayString
basicAuthUsername *flagutil.ArrayString
basicAuthPassword *flagutil.ArrayString
basicAuthPasswordFile *flagutil.ArrayString
bearerToken *flagutil.ArrayString
bearerTokenFile *flagutil.ArrayString
oauth2ClientID *flagutil.ArrayString
oauth2ClientSecret *flagutil.ArrayString
oauth2ClientSecretFile *flagutil.ArrayString
oauth2EndpointParams *flagutil.ArrayString
oauth2TokenURL *flagutil.ArrayString
oauth2Scopes *flagutil.ArrayString
awsUseSigv4 *flagutil.ArrayBool
awsEC2Endpoint *flagutil.ArrayString
awsSTSEndpoint *flagutil.ArrayString
awsRegion *flagutil.ArrayString
awsRoleARN *flagutil.ArrayString
awsAccessKey *flagutil.ArrayString
awsService *flagutil.ArrayString
awsSecretKey *flagutil.ArrayString
}
var (
remoteWriteFlags = &Flags{
enableMdx: false,
remoteWriteURLs: remoteWriteURLs,
queues: queues,
significantFigures: significantFigures,
roundDigits: roundDigits,
disableMetadataPerURL: disableMetadataPerURL,
disableOnDiskQueue: disableOnDiskQueue,
maxPendingBytesPerURL: maxPendingBytesPerURL,
streamAggrConfig: streamAggrConfig,
streamAggrDropInput: streamAggrDropInput,
streamAggrKeepInput: streamAggrKeepInput,
streamAggrDedupInterval: streamAggrDedupInterval,
streamAggrIgnoreOldSamples: streamAggrIgnoreOldSamples,
streamAggrIgnoreFirstIntervals: streamAggrIgnoreFirstIntervals,
streamAggrDropInputLabels: streamAggrDropInputLabels,
streamAggrEnableWindows: streamAggrEnableWindows,
forcePromProto: forcePromProto,
forceVMProto: forceVMProto,
rateLimit: rateLimit,
sendTimeout: sendTimeout,
retryMinInterval: retryMinInterval,
retryMaxInterval: retryMaxInterval,
proxyURL: proxyURL,
tlsHandshakeTimeout: tlsHandshakeTimeout,
tlsInsecureSkipVerify: tlsInsecureSkipVerify,
tlsCertFile: tlsCertFile,
tlsKeyFile: tlsKeyFile,
tlsCAFile: tlsCAFile,
tlsServerName: tlsServerName,
headers: headers,
retryMaxTime: retryMaxTime,
basicAuthPassword: basicAuthPassword,
basicAuthPasswordFile: basicAuthPasswordFile,
basicAuthUsername: basicAuthUsername,
bearerToken: bearerToken,
bearerTokenFile: bearerTokenFile,
oauth2ClientID: oauth2ClientID,
oauth2ClientSecret: oauth2ClientSecret,
oauth2ClientSecretFile: oauth2ClientSecretFile,
oauth2EndpointParams: oauth2EndpointParams,
oauth2TokenURL: oauth2TokenURL,
oauth2Scopes: oauth2Scopes,
awsUseSigv4: awsUseSigv4,
awsEC2Endpoint: awsEC2Endpoint,
awsSTSEndpoint: awsSTSEndpoint,
awsRegion: awsRegion,
awsRoleARN: awsRoleARN,
awsAccessKey: awsAccessKey,
awsService: awsService,
awsSecretKey: awsSecretKey,
}
mdxRemoteWriteFlags = &Flags{
enableMdx: true,
remoteWriteURLs: mdxRemoteWriteURLs,
queues: mdxQueues,
significantFigures: mdxSignificantFigures,
roundDigits: mdxRoundDigits,
disableMetadataPerURL: mdxDisableMetadataPerURL,
disableOnDiskQueue: mdxDisableOnDiskQueue,
maxPendingBytesPerURL: mdxMaxPendingBytesPerURL,
streamAggrConfig: mdxStreamAggrConfig,
streamAggrDropInput: mdxStreamAggrDropInput,
streamAggrKeepInput: mdxStreamAggrKeepInput,
streamAggrDedupInterval: mdxStreamAggrDedupInterval,
streamAggrIgnoreOldSamples: mdxStreamAggrIgnoreOldSamples,
streamAggrIgnoreFirstIntervals: mdxStreamAggrIgnoreFirstIntervals,
streamAggrDropInputLabels: mdxStreamAggrDropInputLabels,
streamAggrEnableWindows: mdxStreamAggrEnableWindows,
forcePromProto: mdxForcePromProto,
forceVMProto: mdxForceVMProto,
rateLimit: mdxRateLimit,
sendTimeout: mdxSendTimeout,
retryMinInterval: mdxRetryMinInterval,
retryMaxInterval: mdxRetryMaxInterval,
proxyURL: mdxProxyURL,
tlsHandshakeTimeout: mdxTlsHandshakeTimeout,
tlsInsecureSkipVerify: mdxTlsInsecureSkipVerify,
tlsCertFile: mdxTlsCertFile,
tlsKeyFile: mdxTlsKeyFile,
tlsCAFile: mdxTlsCAFile,
tlsServerName: mdxTlsServerName,
headers: mdxHeaders,
retryMaxTime: mdxRetryMaxTime,
basicAuthPassword: mdxBasicAuthPassword,
basicAuthPasswordFile: mdxBasicAuthPasswordFile,
basicAuthUsername: mdxBasicAuthUsername,
bearerToken: mdxBearerToken,
bearerTokenFile: mdxBearerTokenFile,
oauth2ClientID: mdxOauth2ClientID,
oauth2ClientSecret: mdxOauth2ClientSecret,
oauth2ClientSecretFile: mdxOauth2ClientSecretFile,
oauth2EndpointParams: mdxOauth2EndpointParams,
oauth2TokenURL: mdxOauth2TokenURL,
oauth2Scopes: mdxOauth2Scopes,
awsUseSigv4: mdxAwsUseSigv4,
awsEC2Endpoint: mdxAwsEC2Endpoint,
awsSTSEndpoint: mdxAwsSTSEndpoint,
awsRegion: mdxAwsRegion,
awsRoleARN: mdxAwsRoleARN,
awsAccessKey: mdxAwsAccessKey,
awsService: mdxAwsService,
awsSecretKey: mdxAwsSecretKey,
}
)
var (
@@ -148,8 +336,9 @@ const persistentQueueDirname = "persistent-queue"
// InitSecretFlags must be called after flag.Parse and before any logging.
func InitSecretFlags() {
if !*showRemoteWriteURL {
// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
// remoteWrite.url and mdx.remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
flagutil.RegisterSecretFlag("remoteWrite.url")
flagutil.RegisterSecretFlag("mdx.remoteWrite.url")
}
}
@@ -164,8 +353,8 @@ var (
//
// Stop must be called for graceful shutdown.
func Init() {
if len(*remoteWriteURLs) == 0 {
logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set")
if len(*remoteWriteURLs) == 0 && len(*mdxRemoteWriteURLs) == 0 {
logger.Fatalf("at least one `-remoteWrite.url` or `-mdx.remoteWrite.url` command-line flag must be set")
}
if limit := getMaxHourlySeries(); limit > 0 {
hourlySeriesLimiter = bloomfilter.NewLimiter(limit, time.Hour)
@@ -204,7 +393,7 @@ func Init() {
initStreamAggrConfigGlobal()
initRemoteWriteCtxs(*remoteWriteURLs)
initRemoteWriteCtxs(*remoteWriteURLs, *mdxRemoteWriteURLs)
disableOnDiskQueues := []bool(*disableOnDiskQueue)
disableOnDiskQueueAny = slices.Contains(disableOnDiskQueues, true)
@@ -213,7 +402,7 @@ func Init() {
// In this case it is impossible to prevent from sending many duplicates of samples passed to TryPush() to all the configured -remoteWrite.url
// if these samples couldn't be sent to the -remoteWrite.url with the disabled persistent queue. So it is better sending samples
// to the remaining -remoteWrite.url and dropping them on the blocked queue.
dropSamplesOnFailureGlobal = *dropSamplesOnOverload || disableOnDiskQueueAny && len(*remoteWriteURLs) > 1
dropSamplesOnFailureGlobal = *dropSamplesOnOverload || disableOnDiskQueueAny && len(*remoteWriteURLs)+len(*mdxRemoteWriteURLs) > 1
dropDanglingQueues()
@@ -264,12 +453,15 @@ func dropDanglingQueues() {
}
}
func initRemoteWriteCtxs(urls []string) {
if len(urls) == 0 {
func initRemoteWriteCtxs(urls []string, mdxUrls []string) {
if len(urls) == 0 && len(mdxUrls) == 0 {
logger.Panicf("BUG: urls must be non-empty")
}
rwctxs := make([]*remoteWriteCtx, len(urls))
rwctxIdx := make([]int, len(urls))
if len(mdxUrls) != 0 && *shardByURL {
logger.Panicf("-remoteWrite.shardByURL should be false when -mdx.remoteWrite.url is set.")
}
rwctxs := make([]*remoteWriteCtx, len(urls)+len(mdxUrls))
rwctxIdx := make([]int, len(urls)+len(mdxUrls))
if retryMaxTime.String() != "" {
logger.Warnf("-remoteWrite.retryMaxTime is deprecated; use -remoteWrite.retryMaxInterval instead")
}
@@ -286,14 +478,32 @@ func initRemoteWriteCtxs(urls []string) {
rwctxIdx[i] = i
}
normalRWLens := len(urls)
for i, remoteWriteURLRaw := range mdxUrls {
remoteWriteURL, err := url.Parse(remoteWriteURLRaw)
if err != nil {
logger.Fatalf("invalid -mdx.remoteWrite.url=%q: %s", remoteWriteURL, err)
}
sanitizedURL := fmt.Sprintf("%d:secret-url", normalRWLens+i+1)
if *showRemoteWriteURL {
sanitizedURL = fmt.Sprintf("%d:%s", normalRWLens+i+1, remoteWriteURL)
}
rwctxs[normalRWLens+i] = newRemoteWriteCtx(normalRWLens+i, remoteWriteURL, sanitizedURL)
rwctxIdx[normalRWLens+i] = normalRWLens + i
}
if *shardByURL {
consistentHashNodes := make([]string, 0, len(urls))
consistentHashNodes := make([]string, 0, normalRWLens)
for i, url := range urls {
consistentHashNodes = append(consistentHashNodes, fmt.Sprintf("%d:%s", i+1, url))
}
rwctxConsistentHashGlobal = consistenthash.NewConsistentHash(consistentHashNodes, 0)
}
if len(mdxUrls) != 0 {
mdx.InitGlobalFilter()
}
rwctxsGlobal = rwctxs
rwctxsGlobalIdx = rwctxIdx
}
@@ -356,6 +566,7 @@ func Stop() {
if sl := dailySeriesLimiter; sl != nil {
sl.MustStop()
}
mdx.GlobalFilter.MustStop()
}
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url
@@ -837,22 +1048,26 @@ type remoteWriteCtx struct {
// otherwise by the global -enableMetadata flag.
enableMetadata bool
enableMdx bool
pss []*pendingSeries
pssNextIdx atomic.Uint64
rowsPushedAfterRelabel *metrics.Counter
rowsDroppedByRelabel *metrics.Counter
rowsDroppedByMdx *metrics.Counter
pushFailures *metrics.Counter
metadataDroppedOnPushFailure *metrics.Counter
rowsDroppedOnPushFailure *metrics.Counter
flags *Flags
}
// isMetadataEnabledForURL returns true if metadata should be sent to the remote storage at argIdx.
// It checks the per-URL -remoteWrite.disableMetadata flag first.
// If not set, it falls back to the global -enableMetadata flag.
func isMetadataEnabledForURL(argIdx int) bool {
if disableMetadataPerURL.GetOptionalArg(argIdx) {
func isMetadataEnabledForURL(flags *Flags, argIdx int) bool {
if flags.disableMetadataPerURL.GetOptionalArg(argIdx) {
// Metadata is explicitly disabled for this URL
return false
}
@@ -861,28 +1076,36 @@ func isMetadataEnabledForURL(argIdx int) bool {
}
func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string) *remoteWriteCtx {
flags := remoteWriteFlags
flagPrefix := "remoteWrite"
if argIdx >= len(*remoteWriteURLs) {
// mdx remote write url
argIdx -= len(*remoteWriteURLs)
flags = mdxRemoteWriteFlags
flagPrefix = "mdx.remoteWrite"
}
// strip query params, otherwise changing params resets pq
pqURL := *remoteWriteURL
pqURL.RawQuery = ""
pqURL.Fragment = ""
h := xxhash.Sum64([]byte(pqURL.String()))
queuePath := filepath.Join(*tmpDataPath, persistentQueueDirname, fmt.Sprintf("%d_%016X", argIdx+1, h))
maxPendingBytes := maxPendingBytesPerURL.GetOptionalArg(argIdx)
maxPendingBytes := flags.maxPendingBytesPerURL.GetOptionalArg(argIdx)
if maxPendingBytes != 0 && maxPendingBytes < persistentqueue.DefaultChunkFileSize {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4195
logger.Warnf("rounding the -remoteWrite.maxDiskUsagePerURL=%d to the minimum supported value: %d", maxPendingBytes, persistentqueue.DefaultChunkFileSize)
logger.Warnf("rounding the -%s.maxDiskUsagePerURL=%d to the minimum supported value: %d", flagPrefix, maxPendingBytes, persistentqueue.DefaultChunkFileSize)
maxPendingBytes = persistentqueue.DefaultChunkFileSize
}
isPQDisabled := disableOnDiskQueue.GetOptionalArg(argIdx)
queuesSize := queues.GetOptionalArg(argIdx)
isPQDisabled := flags.disableOnDiskQueue.GetOptionalArg(argIdx)
queuesSize := flags.queues.GetOptionalArg(argIdx)
if queuesSize > maxQueues {
queuesSize = maxQueues
} else if queuesSize <= 0 {
queuesSize = 1
}
maxInmemoryBlocks := memory.Allowed() / len(*remoteWriteURLs) / *maxRowsPerBlock / 100
maxInmemoryBlocks := memory.Allowed() / (len(*remoteWriteURLs) + len(*mdxRemoteWriteURLs)) / *maxRowsPerBlock / 100
if maxInmemoryBlocks/queuesSize > 100 {
// There is no much sense in keeping higher number of blocks in memory,
// since this means that the producer outperforms consumer and the queue
@@ -909,15 +1132,15 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
var c *client
switch remoteWriteURL.Scheme {
case "http", "https":
c = newHTTPClient(argIdx, remoteWriteURL.String(), sanitizedURL, fq, queuesSize)
c = newHTTPClient(argIdx, remoteWriteURL.String(), sanitizedURL, fq, queuesSize, flags)
default:
logger.Fatalf("unsupported scheme: %s for remoteWriteURL: %s, want `http`, `https`", remoteWriteURL.Scheme, sanitizedURL)
logger.Fatalf("unsupported scheme: %s for %sURL: %s, want `http`, `https`", flagPrefix, remoteWriteURL.Scheme, sanitizedURL)
}
c.init(argIdx, queuesSize, sanitizedURL)
c.init(argIdx, queuesSize, sanitizedURL, flags)
// Initialize pss
sf := significantFigures.GetOptionalArg(argIdx)
rd := roundDigits.GetOptionalArg(argIdx)
sf := flags.significantFigures.GetOptionalArg(argIdx)
rd := flags.roundDigits.GetOptionalArg(argIdx)
pssLen := queuesSize
if n := cgroup.AvailableCPUs(); pssLen > n {
// There is no sense in running more than availableCPUs concurrent pendingSeries,
@@ -928,16 +1151,22 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
for i := range pss {
pss[i] = newPendingSeries(fq, &c.useVMProto, sf, rd)
}
idx := argIdx
if flags.enableMdx {
idx += len(*remoteWriteURLs)
}
rwctx := &remoteWriteCtx{
idx: argIdx,
idx: idx,
fq: fq,
c: c,
pss: pss,
enableMetadata: isMetadataEnabledForURL(argIdx),
enableMetadata: isMetadataEnabledForURL(flags, argIdx),
enableMdx: flags.enableMdx,
flags: flags,
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
rowsDroppedByMdx: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_mdx_rows_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
pushFailures: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_push_failures_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
metadataDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_metadata_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
@@ -973,6 +1202,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.rowsPushedAfterRelabel = nil
rwctx.rowsDroppedByRelabel = nil
rwctx.rowsDroppedByMdx = nil
}
// TryPushTimeSeries sends tss series to the configured remote write endpoint
@@ -990,17 +1220,30 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
putRelabelCtx(rctx)
}()
if rwctx.enableMdx && mdx.GlobalFilter != nil {
// Make a copy of tss
rctx = getRelabelCtx()
rowsCountBeforeMdx := getRowsCount(tss)
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = mdx.GlobalFilter.Filter(tss, *v)
rowsCountAfterMdx := getRowsCount(tss)
rwctx.rowsDroppedByMdx.Add(rowsCountBeforeMdx - rowsCountAfterMdx)
}
// Apply relabeling
rcs := allRelabelConfigs.Load()
pcs := rcs.perURL[rwctx.idx]
if pcs.Len() > 0 {
rctx = getRelabelCtx()
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
if rctx == nil {
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
rctx = getRelabelCtx()
// Make a copy of tss before dropping aggregated series
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
}
rowsCountBeforeRelabel := getRowsCount(tss)
tss = rctx.applyRelabeling(tss, pcs)
rowsCountAfterRelabel := getRowsCount(tss)

View File

@@ -65,6 +65,32 @@ var (
streamAggrEnableWindows = flagutil.NewArrayBool("remoteWrite.streamAggr.enableWindows", "Enables aggregation within fixed windows for all remote write's aggregators. "+
"This allows to get more precise results, but impacts resource usage as it requires twice more memory to store two states. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#aggregation-windows.")
mdxStreamAggrConfig = flagutil.NewArrayString("mdx.remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config for the corresponding -remoteWrite.url. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/ . "+
"See also -remoteWrite.streamAggr.keepInput, -mdx.remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval")
mdxStreamAggrDropInput = flagutil.NewArrayBool("mdx.remoteWrite.streamAggr.dropInput", "Whether to drop input samples that not matching any rule in "+
"the corresponding -remoteWrite.streamAggr.config. By default, only matched raw samples are dropped, while unmatched samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/")
mdxStreamAggrKeepInput = flagutil.NewArrayBool("mdx.remoteWrite.streamAggr.keepInput", "Whether to keep input samples that match any rule in "+
"the corresponding -remoteWrite.streamAggr.config. By default, matched raw samples are aggregated and dropped, while unmatched samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/")
mdxStreamAggrDedupInterval = flagutil.NewArrayDuration("mdx.remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+
"with -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication")
mdxStreamAggrIgnoreOldSamples = flagutil.NewArrayBool("mdx.remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current "+
"aggregation interval for the corresponding -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples")
mdxStreamAggrIgnoreFirstIntervals = flagutil.NewArrayInt("mdx.remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start "+
"for the corresponding -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. Increase this value if "+
"you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving buffered delayed data from clients pushing data into the vmagent. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignore-aggregation-intervals-on-start")
mdxStreamAggrDropInputLabels = flagutil.NewArrayString("mdx.remoteWrite.streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation with -remoteWrite.streamAggr.config and -remoteWrite.streamAggr.dedupInterval at the corresponding -remoteWrite.url. "+
"Multiple labels per remoteWrite.url must be delimited by '^^': -mdx.remoteWrite.streamAggr.dropInputLabels='replica^^az,replica'. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#dropping-unneeded-labels")
mdxStreamAggrEnableWindows = flagutil.NewArrayBool("mdx.remoteWrite.streamAggr.enableWindows", "Enables aggregation within fixed windows for all remote write's aggregators. "+
"This allows to get more precise results, but impacts resource usage as it requires twice more memory to store two states. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#aggregation-windows.")
)
// CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -streamAggr.config.
@@ -79,10 +105,20 @@ func CheckStreamAggrConfigs() error {
if len(*streamAggrConfig) > len(*remoteWriteURLs) {
return fmt.Errorf("too many -remoteWrite.streamAggr.config args: %d; it mustn't exceed the number of -remoteWrite.url args: %d", len(*streamAggrConfig), len(*remoteWriteURLs))
}
if len(*mdxStreamAggrConfig) > len(*mdxRemoteWriteURLs) {
return fmt.Errorf("too many -mdx.remoteWrite.streamAggr.config args: %d; it mustn't exceed the number of -mdx.remoteWrite.url args: %d", len(*streamAggrConfig), len(*remoteWriteURLs))
}
pushNoop := func(_ []prompb.TimeSeries) {}
for idx := range *streamAggrConfig {
sas, err := newStreamAggrConfigPerURL(idx, pushNoop)
sas, err := newStreamAggrConfigPerURL(idx, remoteWriteFlags, pushNoop)
if err != nil {
return err
}
sas.MustStop()
}
for idx := range *mdxStreamAggrConfig {
sas, err := newStreamAggrConfigPerURL(idx, mdxRemoteWriteFlags, pushNoop)
if err != nil {
return err
}
@@ -147,6 +183,9 @@ func initStreamAggrConfigGlobal() {
func (rwctx *remoteWriteCtx) initStreamAggrConfig() {
idx := rwctx.idx
if rwctx.enableMdx {
idx = idx - len(*remoteWriteURLs)
}
sas, err := rwctx.newStreamAggrConfig()
if err != nil {
@@ -155,24 +194,28 @@ func (rwctx *remoteWriteCtx) initStreamAggrConfig() {
if sas != nil {
filePath := sas.FilePath()
rwctx.sas.Store(sas)
rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(idx)
rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(idx)
rwctx.streamAggrKeepInput = rwctx.flags.streamAggrKeepInput.GetOptionalArg(idx)
rwctx.streamAggrDropInput = rwctx.flags.streamAggrDropInput.GetOptionalArg(idx)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp())
}
dedupInterval := streamAggrDedupInterval.GetOptionalArg(idx)
dedupInterval := rwctx.flags.streamAggrDedupInterval.GetOptionalArg(idx)
if dedupInterval > 0 {
alias := fmt.Sprintf("dedup-%d", idx+1)
var dropLabels []string
if streamAggrDropInputLabels.GetOptionalArg(idx) != "" {
dropLabels = strings.Split(streamAggrDropInputLabels.GetOptionalArg(idx), "^^")
if rwctx.flags.streamAggrDropInputLabels.GetOptionalArg(idx) != "" {
dropLabels = strings.Split(rwctx.flags.streamAggrDropInputLabels.GetOptionalArg(idx), "^^")
}
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, *streamAggrGlobalEnableWindows, dedupInterval, dropLabels, alias)
}
}
func (rwctx *remoteWriteCtx) reloadStreamAggrConfig() {
path := streamAggrConfig.GetOptionalArg(rwctx.idx)
idx := rwctx.idx
if rwctx.enableMdx {
idx = idx - len(*remoteWriteURLs)
}
path := rwctx.flags.streamAggrConfig.GetOptionalArg(idx)
if path == "" {
return
}
@@ -224,30 +267,34 @@ func newStreamAggrConfigGlobal() (*streamaggr.Aggregators, error) {
}
func (rwctx *remoteWriteCtx) newStreamAggrConfig() (*streamaggr.Aggregators, error) {
return newStreamAggrConfigPerURL(rwctx.idx, rwctx.pushInternalTrackDropped)
idx := rwctx.idx
if rwctx.flags.enableMdx {
idx = idx - len(*remoteWriteURLs)
}
return newStreamAggrConfigPerURL(idx, rwctx.flags, rwctx.pushInternalTrackDropped)
}
func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamaggr.Aggregators, error) {
path := streamAggrConfig.GetOptionalArg(idx)
func newStreamAggrConfigPerURL(idx int, flags *Flags, pushFunc streamaggr.PushFunc) (*streamaggr.Aggregators, error) {
path := flags.streamAggrConfig.GetOptionalArg(idx)
if path == "" {
return nil, nil
}
alias := fmt.Sprintf("%d:secret-url", idx+1)
if *showRemoteWriteURL {
alias = fmt.Sprintf("%d:%s", idx+1, remoteWriteURLs.GetOptionalArg(idx))
alias = fmt.Sprintf("%d:%s", idx+1, flags.remoteWriteURLs.GetOptionalArg(idx))
}
var dropLabels []string
if streamAggrDropInputLabels.GetOptionalArg(idx) != "" {
dropLabels = strings.Split(streamAggrDropInputLabels.GetOptionalArg(idx), "^^")
if flags.streamAggrDropInputLabels.GetOptionalArg(idx) != "" {
dropLabels = strings.Split(flags.streamAggrDropInputLabels.GetOptionalArg(idx), "^^")
}
opts := &streamaggr.Options{
DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx),
DedupInterval: flags.streamAggrDedupInterval.GetOptionalArg(idx),
DropInputLabels: dropLabels,
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx),
IgnoreFirstIntervals: streamAggrIgnoreFirstIntervals.GetOptionalArg(idx),
KeepInput: streamAggrKeepInput.GetOptionalArg(idx),
EnableWindows: streamAggrEnableWindows.GetOptionalArg(idx),
IgnoreOldSamples: flags.streamAggrIgnoreOldSamples.GetOptionalArg(idx),
IgnoreFirstIntervals: flags.streamAggrIgnoreFirstIntervals.GetOptionalArg(idx),
KeepInput: flags.streamAggrKeepInput.GetOptionalArg(idx),
EnableWindows: flags.streamAggrEnableWindows.GetOptionalArg(idx),
}
sas, err := streamaggr.LoadFromFile(path, pushFunc, opts, alias)

View File

@@ -25,6 +25,7 @@ The sandbox cluster installation runs under the constant load generated by
See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): introduce `-remoteWrite.mdx.enable` and `-mdx.instanceEntryTTL` command-line flags to support mdx service in `vmagent`, it allows `vmagent` to send only metrics from VictoriaMetrics services to the corresponding `-remoteWrite.url`. See [monitoring-data-exchange](https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange) and [#10600](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10600) for detail.
## [v1.141.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.141.0)

View File

@@ -268,6 +268,29 @@ for the collected samples. Examples:
```sh
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -streamAggr.dedupInterval=60s
```
### Monitoring Data eXchange
The MDX (Monitoring Data eXchange) is a monitoring of monitoring metrics collection and sharing service.
It aims to send only metrics from the VictoriaMetrics services to the corresponding `-remoteWrite.url`, discarding metrics from non-VictoriaMetrics services.
When enabling MDX for the `-remoteWrite.url`, `vmagent` will only forward metrics from the instances that emit `vm_app_version`, which is a metric that all VictoriaMetrics services will emit.
The number of dropped rows from non-VictoriaMetrics services is exposed as `vmagent_remotewrite_mdx_rows_dropped_total`.
`vmagent` will maintain the information for the discovered VictoriaMetrics instances and expose the number of these instances via `vmagent_mdx_tracked_vm_instances` metric.
To prevent permanently offline VictoriaMetrics instances from continuously consuming `vmagent`'s memory, you also need to set `-mdx.instanceEntryTTL` to indicate that if no metrics are received from a VictoriaMetrics instance for the configured period,
then the information of this instance should be cleaned up from the memory. The value should be several times the scrape interval to prevent instances from being mistakenly cleaned up due to occasional network latency.
To enable MDX, set `-remoteWrite.mdx.enable` for the target URL and `-mdx.instanceEntryTTL`:
```sh
./vmagent \
-remoteWrite.url=http://service-to-keep-all-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=false \
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=true \
-mdx.instanceEntryTTL=60s
```
### Life of a sample
@@ -285,18 +308,20 @@ flowchart TB
F --> G[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#replication-and-high-availability">replicate</a> to each <b>-remoteWrite.url</b><br/>or <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages">shard</a> if <b>-remoteWrite.shardByURL</b> is set]
%% Left branch
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
H2 --> H3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
H3 --> H4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
H4 --> H5[[push to <b>-remoteWrite.url</b>]]
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange/">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
H2 --> H3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
H3 --> H4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
H4 --> H5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
H5 --> H6[[push to <b>-remoteWrite.url</b>]]
%% Right branch
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
R2 --> R3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
R3 --> R4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
R4 --> R5[[push to <b>-remoteWrite.url</b>]]
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
R2 --> R3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
R3 --> R4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
R4 --> R5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
R5 --> R6[[push to <b>-remoteWrite.url</b>]]
```
Scraping has additional settings that can be applied before samples are pushed to the processing pipeline above:

135
lib/mdx/filter.go Normal file
View File

@@ -0,0 +1,135 @@
package mdx
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/metrics"
)
var (
mdxInstanceEntryTTL = flagutil.NewExtendedDuration("mdx.instanceEntryTTL", "0", "After not receiving metrics for the VictoriaMetrics instance for the configured time, remove this instance from the MDX instance list."+
"It should be several times the scrape interval for VictoriaMetrics instances. The cleanup mechanism helps release memory after a VictoriaMetrics instance is permanently taken offline, preventing the MDX instance list from growing indefinitely."+
"It must be explicitly set when -remoteWrite.mdx.enable is set and requires explicit unit suffixes (s, m, h, d, w, y). Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
)
// Filter manages the list of VictoriaMetrics instances discovered from previous data flow, and uses it to filter out metrics that are not from VictoriaMetrics instances.
type Filter struct {
mu sync.RWMutex
wg sync.WaitGroup
stopCh chan struct{}
vmInstance map[string]*atomic.Int64
mdxTrackedVmInstances *metrics.Gauge
}
var GlobalFilter *Filter
func InitGlobalFilter() {
if mdxInstanceEntryTTL.Milliseconds() == 0 {
logger.Warnf("MDX instance entry cleanup mechanism will be disabled without explicilty setting -mdx.instanceEntryTTL.")
return
}
GlobalFilter = &Filter{
vmInstance: make(map[string]*atomic.Int64),
stopCh: make(chan struct{}),
}
GlobalFilter.mdxTrackedVmInstances = metrics.NewGauge("vmagent_mdx_tracked_vm_instances", func() float64 {
GlobalFilter.mu.RLock()
n := len(GlobalFilter.vmInstance)
GlobalFilter.mu.RUnlock()
return float64(n)
})
GlobalFilter.wg.Go(GlobalFilter.cleanStale)
}
func (filter *Filter) cleanStale() {
ttlSec := int64(mdxInstanceEntryTTL.Duration().Seconds())
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
filter.mu.Lock()
currTs := time.Now().Unix()
dst := make(map[string]*atomic.Int64, len(filter.vmInstance))
for k, v := range filter.vmInstance {
if currTs-v.Load() < ttlSec {
dst[k] = v
}
}
if len(dst) != len(filter.vmInstance) {
filter.vmInstance = dst
}
filter.mu.Unlock()
case <-filter.stopCh:
return
}
}
}
func (filter *Filter) MustStop() {
if filter == nil {
return
}
close(filter.stopCh)
filter.wg.Wait()
}
func (filter *Filter) Filter(tss []prompb.TimeSeries, resTss []prompb.TimeSeries) []prompb.TimeSeries {
for _, ts := range tss {
isVmInstance := false
var instance string
var job string
for _, label := range ts.Labels {
if label.Name == "__name__" && label.Value == "vm_app_version" {
isVmInstance = true
}
if label.Name == "instance" {
instance = label.Value
}
if label.Name == "job" {
job = label.Value
}
}
if len(job) == 0 || len(instance) == 0 {
continue
}
identicalKey := fmt.Sprintf("%q:%q", job, instance)
currTs := time.Now().Unix()
//fast path
filter.mu.RLock()
ptr, ok := filter.vmInstance[identicalKey]
filter.mu.RUnlock()
if ok {
ptr.Store(currTs)
resTss = append(resTss, ts)
continue
}
if !isVmInstance {
continue
}
// slow path
resTss = append(resTss, ts)
filter.mu.Lock()
if ptr, ok = filter.vmInstance[identicalKey]; ok {
ptr.Store(currTs)
} else {
v := atomic.Int64{}
v.Store(currTs)
filter.vmInstance[identicalKey] = &v
}
filter.mu.Unlock()
}
return resTss
}

181
lib/mdx/filter_test.go Normal file
View File

@@ -0,0 +1,181 @@
package mdx
import (
"fmt"
"sort"
"strings"
"sync/atomic"
"testing"
"testing/synctest"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
)
func timeSeriessToString(tss []prompb.TimeSeries) string {
a := make([]string, len(tss))
for i, ts := range tss {
a[i] = timeSeriesToString(ts)
}
sort.Strings(a)
return strings.Join(a, "")
}
func timeSeriesToString(ts prompb.TimeSeries) string {
labelsString := promrelabel.LabelsToString(ts.Labels)
return fmt.Sprintf("%s\n", labelsString)
}
func TestMdxFilter(t *testing.T) {
filter := Filter{
vmInstance: make(map[string]*atomic.Int64),
}
_ = mdxInstanceEntryTTL.Set("120s")
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries, expectedInstanceMap map[string]int64) {
t.Helper()
output := filter.Filter(input, []prompb.TimeSeries{})
if len(output) != len(expectedOutput) {
t.Fatalf("unexpected output length; got %d; want %d", len(output), len(expectedOutput))
}
if timeSeriessToString(output) != timeSeriessToString(expectedOutput) {
t.Fatalf("unexpected output; got %s; want %s", timeSeriessToString(output), timeSeriessToString(expectedOutput))
}
logger.Infof(timeSeriessToString(output))
if len(filter.vmInstance) != len(expectedInstanceMap) {
t.Fatalf("unexpected instance map length; got %d; want %d", len(filter.vmInstance), len(expectedInstanceMap))
}
for k, _ := range expectedInstanceMap {
if filter.vmInstance[k] == nil {
t.Fatalf("missing instance in filter.vmInstance: %q", k)
}
}
}
f([]prompb.TimeSeries{{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "instance", Value: "node-exporter1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds"},
{Name: "instance", Value: "service1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent1:8429"},
{Name: "job", Value: "test"},
},
}},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent1:8429"},
{Name: "job", Value: "test"},
},
},
}, map[string]int64{
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
fmt.Sprintf("%q:%q", "test", "vmagent1:8429"): 0,
})
}
func TestMdxInstanceCleanup(t *testing.T) {
t.Helper()
synctest.Test(t, func(t *testing.T) {
_ = mdxInstanceEntryTTL.Set("10s")
InitGlobalFilter()
// init instance list
GlobalFilter.Filter([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "instance", Value: "node-exporter1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds"},
{Name: "instance", Value: "service1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent1:8429"},
{Name: "job", Value: "test"},
},
}}, []prompb.TimeSeries{},
)
f := func(expectedInstanceMap map[string]int64) {
t.Helper()
if len(GlobalFilter.vmInstance) != len(expectedInstanceMap) {
t.Fatalf("unexpected instance map length; got %d; want %d", len(GlobalFilter.vmInstance), len(expectedInstanceMap))
}
for k, _ := range expectedInstanceMap {
if GlobalFilter.vmInstance[k] == nil {
t.Fatalf("missing instance in filter.vmInstance: %q", k)
}
}
}
f(map[string]int64{
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
fmt.Sprintf("%q:%q", "test", "vmagent1:8429"): 0,
})
// receive samples from victoria-metrics1:8428 after 9 seconds.
time.Sleep(9 * time.Second)
GlobalFilter.Filter([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
}}, []prompb.TimeSeries{},
)
// no samples from vmagent1:8429 in the last 10 seconds, so it should be removed from the mdx instance list.
time.Sleep(9 * time.Second)
f(map[string]int64{
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
})
GlobalFilter.MustStop()
})
}