mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-09 11:54:31 +03:00
Compare commits
4 Commits
issue-1060
...
v1.145.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3147f9c24b | ||
|
|
db8c9badbf | ||
|
|
2dc487d125 | ||
|
|
4cb5fae347 |
@@ -12,7 +12,6 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mdx"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
@@ -104,9 +103,6 @@ var (
|
||||
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
|
||||
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
|
||||
"By default, metadata sending is controlled by the global -enableMetadata flag")
|
||||
|
||||
enableMdx = flagutil.NewArrayBool("remoteWrite.mdx.enable", "Whether to only retain metrics from VictoriaMetrics services before sending them to the corresponding -remoteWrite.url. "+
|
||||
"Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -308,10 +304,6 @@ func initRemoteWriteCtxs(urls []string) {
|
||||
}
|
||||
fs.RegisterPathFsMetrics(*tmpDataPath)
|
||||
|
||||
if slices.Contains(*enableMdx, true) && *shardByURL {
|
||||
logger.Fatalf("-remoteWrite.mdx.enable and -remoteWrite.shardByURL cannot be set to true simultaneously.")
|
||||
}
|
||||
|
||||
if *shardByURL {
|
||||
consistentHashNodes := make([]string, 0, len(urls))
|
||||
for i, url := range urls {
|
||||
@@ -867,7 +859,6 @@ type remoteWriteCtx struct {
|
||||
|
||||
sas atomic.Pointer[streamaggr.Aggregators]
|
||||
deduplicator *streamaggr.Deduplicator
|
||||
mdxFilter *mdx.Filter
|
||||
|
||||
streamAggrKeepInput bool
|
||||
streamAggrDropInput bool
|
||||
@@ -882,7 +873,6 @@ type remoteWriteCtx struct {
|
||||
|
||||
rowsPushedAfterRelabel *metrics.Counter
|
||||
rowsDroppedByRelabel *metrics.Counter
|
||||
rowsPreservedByMdx *metrics.Counter
|
||||
|
||||
pushFailures *metrics.Counter
|
||||
metadataDroppedOnPushFailure *metrics.Counter
|
||||
@@ -969,6 +959,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
for i := range pss {
|
||||
pss[i] = newPendingSeries(fq, &c.useVMProto, sf, rd)
|
||||
}
|
||||
|
||||
rwctx := &remoteWriteCtx{
|
||||
idx: argIdx,
|
||||
fq: fq,
|
||||
@@ -985,15 +976,6 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
}
|
||||
rwctx.initStreamAggrConfig()
|
||||
|
||||
if enableMdx.GetOptionalArg(argIdx) {
|
||||
rwctx.mdxFilter = mdx.NewFilter()
|
||||
rwctx.rowsPreservedByMdx = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_mdx_rows_preserved_total{path=%q,url=%q}`, queuePath, sanitizedURL))
|
||||
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_mdx_tracked_vm_instances{path=%q,url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||
return float64(rwctx.mdxFilter.VmInstancesCount())
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
return rwctx
|
||||
}
|
||||
|
||||
@@ -1007,10 +989,6 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
rwctx.deduplicator.MustStop()
|
||||
rwctx.deduplicator = nil
|
||||
}
|
||||
if rwctx.mdxFilter != nil {
|
||||
rwctx.mdxFilter.MustStop()
|
||||
rwctx.mdxFilter = nil
|
||||
}
|
||||
|
||||
for _, ps := range rwctx.pss {
|
||||
ps.MustStop()
|
||||
@@ -1026,7 +1004,6 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
|
||||
rwctx.rowsPushedAfterRelabel = nil
|
||||
rwctx.rowsDroppedByRelabel = nil
|
||||
rwctx.rowsPreservedByMdx = nil
|
||||
}
|
||||
|
||||
// TryPushTimeSeries sends tss series to the configured remote write endpoint
|
||||
@@ -1035,38 +1012,24 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) bool {
|
||||
var rctx *relabelCtx
|
||||
var v *[]prompb.TimeSeries
|
||||
acquiredTssFromPool := false
|
||||
defer func() {
|
||||
if acquiredTssFromPool {
|
||||
*v = prompb.ResetTimeSeries(tss)
|
||||
tssPool.Put(v)
|
||||
}
|
||||
if rctx != nil {
|
||||
putRelabelCtx(rctx)
|
||||
if rctx == nil {
|
||||
return
|
||||
}
|
||||
*v = prompb.ResetTimeSeries(tss)
|
||||
tssPool.Put(v)
|
||||
putRelabelCtx(rctx)
|
||||
}()
|
||||
|
||||
if rwctx.mdxFilter != nil {
|
||||
acquiredTssFromPool = true
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = rwctx.mdxFilter.Filter(tss, *v)
|
||||
rowsCountAfterMdx := getRowsCount(tss)
|
||||
rwctx.rowsPreservedByMdx.Add(rowsCountAfterMdx)
|
||||
if len(tss) == 0 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Apply relabeling
|
||||
rcs := allRelabelConfigs.Load()
|
||||
pcs := rcs.perURL[rwctx.idx]
|
||||
if pcs.Len() > 0 {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before applying relabeling in order to prevent
|
||||
// from affecting time series for other remoteWrite.url configs.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
|
||||
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
|
||||
rctx = getRelabelCtx()
|
||||
acquiredTssFromPool = true
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
rowsCountBeforeRelabel := getRowsCount(tss)
|
||||
@@ -1086,7 +1049,6 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
if rctx == nil {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before dropping aggregated series
|
||||
acquiredTssFromPool = true
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
}
|
||||
@@ -1096,7 +1058,6 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
if rctx == nil {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before dropping aggregated series
|
||||
acquiredTssFromPool = true
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
}
|
||||
|
||||
1
app/vmselect/vmui/assets/index-BBUnmLOr.css
Normal file
1
app/vmselect/vmui/assets/index-BBUnmLOr.css
Normal file
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -37,11 +37,11 @@
|
||||
<meta property="og:title" content="UI for VictoriaMetrics">
|
||||
<meta property="og:url" content="https://victoriametrics.com/">
|
||||
<meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data">
|
||||
<script type="module" crossorigin src="./assets/index-U3iNn2Tx.js"></script>
|
||||
<script type="module" crossorigin src="./assets/index-CoGukb-x.js"></script>
|
||||
<link rel="modulepreload" crossorigin href="./assets/rolldown-runtime-COnpUsM8.js">
|
||||
<link rel="modulepreload" crossorigin href="./assets/vendor-C8Kwp93_.js">
|
||||
<link rel="stylesheet" crossorigin href="./assets/vendor-CnsZ1jie.css">
|
||||
<link rel="stylesheet" crossorigin href="./assets/index-BL7jEFBa.css">
|
||||
<link rel="stylesheet" crossorigin href="./assets/index-BBUnmLOr.css">
|
||||
</head>
|
||||
<body>
|
||||
<noscript>You need to enable JavaScript to run this app.</noscript>
|
||||
|
||||
@@ -25,10 +25,14 @@ The sandbox cluster installation runs under the constant load generated by
|
||||
See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
|
||||
|
||||
## tip
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): introduce `-remoteWrite.mdx.enable` and `-mdx.instanceEntryTTL` command-line flags to support mdx service in `vmagent`, it allows `vmagent` to send only metrics from VictoriaMetrics services to the corresponding `-remoteWrite.url`. See [monitoring-data-exchange](https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange) and [#10600](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10600) for detail.
|
||||
|
||||
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
|
||||
|
||||
Release candidate
|
||||
|
||||
* SECURITY: upgrade Go builder from Go1.26.3 to Go1.26.4. See [the list of issues addressed in Go1.26.4](https://github.com/golang/go/issues?q=milestone%3AGo1.26.4%20label%3ACherryPickApproved).
|
||||
|
||||
* FEATURE: [enterprise](https://docs.victoriametrics.com/enterprise/) [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add the new metrics `vm_downsampling_partitions_scheduled_rows` and `vm_retention_filters_partitions_scheduled_rows` for measuring background historical data merge completion time. See [#10960](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10960)
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): support `match[]=<label_selector>` query parameters in `/api/v1/rules` and `/api/v1/alerts` APIs to return only the rules that have configured labels satisfying the provided label selectors. See [11020](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11020).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `-opentelemetry.promoteAllResourceAttributes` and `-opentelemetry.promoteScopeMetadata` command-line flags to allow managing label promotion for resource attributes and OTel scope metadata. See [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) docs and [#10931](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10931).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) : introduce `vmagent_remotewrite_kafka_outbuf_latency_seconds` and `vmagent_remotewrite_kafka_rtt_seconds` metrics for [kafka integration](https://docs.victoriametrics.com/victoriametrics/integrations/kafka/). The metrics could help identify throughput bottlenecks. See [#10730](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10730).
|
||||
|
||||
@@ -268,31 +268,6 @@ for the collected samples. Examples:
|
||||
```sh
|
||||
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -streamAggr.dedupInterval=60s
|
||||
```
|
||||
|
||||
### Monitoring Data eXchange
|
||||
|
||||
The MDX (Monitoring Data eXchange) feature aims to send only metrics from the VictoriaMetrics services to the corresponding `-remoteWrite.url`, discarding metrics from non-VictoriaMetrics services.
|
||||
|
||||
To enable MDX, set `-remoteWrite.mdx.enable=true` for the target URL and `-remoteWrite.mdx.enable=false` for other URLs:
|
||||
|
||||
```sh
|
||||
./vmagent \
|
||||
-remoteWrite.url=http://service-to-keep-all-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=false \
|
||||
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=true
|
||||
```
|
||||
When enabling MDX for the `-remoteWrite.url`, `vmagent` will only forward the metrics from the instances that emit `vm_app_version`, which is a metric that all VictoriaMetrics services will emit,
|
||||
or the metrics contain the label specified by `mdx.label`:
|
||||
|
||||
```sh
|
||||
./vmagent \
|
||||
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=true \
|
||||
-mdx.label="service=victoriametrics"
|
||||
```
|
||||
|
||||
The number of preserved rows from non-VictoriaMetrics services is exposed as `vmagent_remotewrite_mdx_rows_preserved_total`.
|
||||
|
||||
### Life of a sample
|
||||
|
||||
@@ -310,20 +285,18 @@ flowchart TB
|
||||
F --> G[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#replication-and-high-availability">replicate</a> to each <b>-remoteWrite.url</b><br/>or <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages">shard</a> if <b>-remoteWrite.shardByURL</b> is set]
|
||||
|
||||
%% Left branch
|
||||
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange/">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
|
||||
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
H2 --> H3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
H3 --> H4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
H4 --> H5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
H5 --> H6[[push to <b>-remoteWrite.url</b>]]
|
||||
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
H2 --> H3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
H3 --> H4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
H4 --> H5[[push to <b>-remoteWrite.url</b>]]
|
||||
|
||||
%% Right branch
|
||||
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
|
||||
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
R2 --> R3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
R3 --> R4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
R4 --> R5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
R5 --> R6[[push to <b>-remoteWrite.url</b>]]
|
||||
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
R2 --> R3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
R3 --> R4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
R4 --> R5[[push to <b>-remoteWrite.url</b>]]
|
||||
```
|
||||
|
||||
Scraping has additional settings that can be applied before samples are pushed to the processing pipeline above:
|
||||
|
||||
4
go.mod
4
go.mod
@@ -25,6 +25,7 @@ require (
|
||||
github.com/googleapis/gax-go/v2 v2.22.0
|
||||
github.com/influxdata/influxdb v1.12.4
|
||||
github.com/klauspost/compress v1.18.5
|
||||
github.com/oklog/ulid/v2 v2.1.1
|
||||
github.com/prometheus/prometheus v0.311.3
|
||||
github.com/urfave/cli/v2 v2.27.7
|
||||
github.com/valyala/fastjson v1.6.10
|
||||
@@ -109,7 +110,6 @@ require (
|
||||
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
|
||||
github.com/oklog/ulid/v2 v2.1.1 // indirect
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.150.0 // indirect
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.150.0 // indirect
|
||||
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.150.0 // indirect
|
||||
@@ -155,7 +155,7 @@ require (
|
||||
go.uber.org/zap v1.27.1 // indirect
|
||||
go.yaml.in/yaml/v2 v2.4.4 // indirect
|
||||
go.yaml.in/yaml/v3 v3.0.4 // indirect
|
||||
golang.org/x/crypto v0.51.0 // indirect
|
||||
golang.org/x/crypto v0.52.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f // indirect
|
||||
golang.org/x/sync v0.20.0 // indirect
|
||||
golang.org/x/term v0.43.0 // indirect
|
||||
|
||||
4
go.sum
4
go.sum
@@ -509,8 +509,8 @@ go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
|
||||
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
|
||||
golang.org/x/crypto v0.52.0 h1:RMs7fP2rXdep0CftQlK8Uf+kibLm7qkCcradZWYz988=
|
||||
golang.org/x/crypto v0.52.0/go.mod h1:1QgfPxDqh0T2M/elOJtp9RvuR95kVjir0e6/BvEmGbc=
|
||||
golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f h1:W3F4c+6OLc6H2lb//N1q4WpJkhzJCK5J6kUi1NTVXfM=
|
||||
golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f/go.mod h1:J1xhfL/vlindoeF/aINzNzt2Bket5bjo9sdOYzOsU80=
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
|
||||
@@ -1,159 +0,0 @@
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
var (
|
||||
vmLabel = flag.String("mdx.label", "", "Optional label in the form 'name=value' to identify metrics from VictoriaMetrics. The metrics contain this label will be kept and sent to the `-remoteWrite.url` that configured with `-remoteWrite.mdx.enable=true`.")
|
||||
)
|
||||
|
||||
// Filter manages the list of VictoriaMetrics instances discovered from previous data flow, and uses it to filter out metrics that are not from VictoriaMetrics instances.
|
||||
type Filter struct {
|
||||
mu sync.RWMutex
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
vmInstance map[string]*atomic.Int64
|
||||
filterByLabel bool
|
||||
filterByCustomLabelName string
|
||||
filterByCustomLabelValue string
|
||||
}
|
||||
|
||||
func NewFilter() *Filter {
|
||||
filter := &Filter{
|
||||
vmInstance: make(map[string]*atomic.Int64),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
if len(*vmLabel) != 0 {
|
||||
n := strings.IndexByte(*vmLabel, '=')
|
||||
if n < 0 {
|
||||
logger.Fatalf("missing '=' in `-mdx.label`. It must contain label in the form `name=value`; got %q", *vmLabel)
|
||||
}
|
||||
filter.filterByCustomLabelName = (*vmLabel)[:n]
|
||||
filter.filterByCustomLabelValue = (*vmLabel)[n+1:]
|
||||
filter.filterByLabel = true
|
||||
}
|
||||
|
||||
filter.wg.Go(filter.cleanStale)
|
||||
return filter
|
||||
}
|
||||
|
||||
func (filter *Filter) VmInstancesCount() int {
|
||||
filter.mu.RLock()
|
||||
defer filter.mu.RUnlock()
|
||||
return len(filter.vmInstance)
|
||||
|
||||
}
|
||||
|
||||
func (filter *Filter) cleanStale() {
|
||||
entryTTL := time.Hour * 1
|
||||
ttlSec := int64(entryTTL.Seconds())
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
filter.mu.Lock()
|
||||
currTs := time.Now().Unix()
|
||||
|
||||
dst := make(map[string]*atomic.Int64, len(filter.vmInstance))
|
||||
for k, v := range filter.vmInstance {
|
||||
if currTs-v.Load() < ttlSec {
|
||||
dst[k] = v
|
||||
}
|
||||
}
|
||||
if len(dst) != len(filter.vmInstance) {
|
||||
filter.vmInstance = dst
|
||||
}
|
||||
filter.mu.Unlock()
|
||||
case <-filter.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (filter *Filter) MustStop() {
|
||||
if filter == nil {
|
||||
return
|
||||
}
|
||||
close(filter.stopCh)
|
||||
filter.wg.Wait()
|
||||
}
|
||||
|
||||
func (filter *Filter) Filter(tss []prompb.TimeSeries, resTss []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
currTs := time.Now().Unix()
|
||||
var identicalKey []byte
|
||||
|
||||
nextTss:
|
||||
for _, ts := range tss {
|
||||
var hasVersionLabel, triedJobInstance bool
|
||||
var job, instance string
|
||||
for _, label := range ts.Labels {
|
||||
if filter.filterByLabel && label.Name == filter.filterByCustomLabelName && label.Value == filter.filterByCustomLabelValue {
|
||||
resTss = append(resTss, ts)
|
||||
continue nextTss
|
||||
}
|
||||
|
||||
if label.Name == "__name__" && label.Value == "vm_app_version" {
|
||||
hasVersionLabel = true
|
||||
}
|
||||
if instance == "" && label.Name == "instance" {
|
||||
if label.Value == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
instance = label.Value
|
||||
}
|
||||
if job == "" && label.Name == "job" {
|
||||
if label.Value == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
job = label.Value
|
||||
}
|
||||
if !triedJobInstance && job != "" && instance != "" {
|
||||
identicalKey = identicalKey[:0]
|
||||
identicalKey = strconv.AppendQuote(identicalKey, job)
|
||||
identicalKey = append(identicalKey, ':')
|
||||
identicalKey = strconv.AppendQuote(identicalKey, instance)
|
||||
filter.mu.RLock()
|
||||
ptr, found := filter.vmInstance[bytesutil.ToUnsafeString(identicalKey)]
|
||||
filter.mu.RUnlock()
|
||||
if found {
|
||||
ptr.Store(currTs)
|
||||
resTss = append(resTss, ts)
|
||||
continue nextTss
|
||||
}
|
||||
triedJobInstance = true
|
||||
}
|
||||
|
||||
if hasVersionLabel && job != "" && instance != "" {
|
||||
identicalKey = identicalKey[:0]
|
||||
identicalKey = strconv.AppendQuote(identicalKey, job)
|
||||
identicalKey = append(identicalKey, ':')
|
||||
identicalKey = strconv.AppendQuote(identicalKey, instance)
|
||||
|
||||
v := &atomic.Int64{}
|
||||
v.Store(currTs)
|
||||
|
||||
filter.mu.Lock()
|
||||
filter.vmInstance[string(identicalKey)] = v
|
||||
filter.mu.Unlock()
|
||||
resTss = append(resTss, ts)
|
||||
continue nextTss
|
||||
}
|
||||
}
|
||||
}
|
||||
return resTss
|
||||
}
|
||||
@@ -1,216 +0,0 @@
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"testing/synctest"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
)
|
||||
|
||||
func timeSeriessToString(tss []prompb.TimeSeries) string {
|
||||
a := make([]string, len(tss))
|
||||
for i, ts := range tss {
|
||||
a[i] = timeSeriesToString(ts)
|
||||
}
|
||||
sort.Strings(a)
|
||||
return strings.Join(a, "")
|
||||
}
|
||||
|
||||
func timeSeriesToString(ts prompb.TimeSeries) string {
|
||||
labelsString := promrelabel.LabelsToString(ts.Labels)
|
||||
|
||||
return fmt.Sprintf("%s\n", labelsString)
|
||||
}
|
||||
|
||||
func TestMdxInstanceFilter(t *testing.T) {
|
||||
|
||||
filter := NewFilter()
|
||||
|
||||
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries, expectedInstanceMap map[string]int64) {
|
||||
t.Helper()
|
||||
output := filter.Filter(input, []prompb.TimeSeries{})
|
||||
if len(output) != len(expectedOutput) {
|
||||
t.Fatalf("unexpected output length; got %d; want %d", len(output), len(expectedOutput))
|
||||
}
|
||||
if timeSeriessToString(output) != timeSeriessToString(expectedOutput) {
|
||||
t.Fatalf("unexpected output; got %s; want %s", timeSeriessToString(output), timeSeriessToString(expectedOutput))
|
||||
}
|
||||
if len(filter.vmInstance) != len(expectedInstanceMap) {
|
||||
t.Fatalf("unexpected instance map length; got %d; want %d", len(filter.vmInstance), len(expectedInstanceMap))
|
||||
}
|
||||
for k := range expectedInstanceMap {
|
||||
if filter.vmInstance[k] == nil {
|
||||
t.Fatalf("missing instance in filter.vmInstance: %q", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
f([]prompb.TimeSeries{{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "instance", Value: "node-exporter1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_request_duration_seconds"},
|
||||
{Name: "instance", Value: "service1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent1:8429"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
}},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent1:8429"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
}, map[string]int64{
|
||||
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
|
||||
fmt.Sprintf("%q:%q", "test", "vmagent1:8429"): 0,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestMdxFilterByLabel(t *testing.T) {
|
||||
*vmLabel = "service=victoriametrics"
|
||||
filter := NewFilter()
|
||||
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries) {
|
||||
t.Helper()
|
||||
output := filter.Filter(input, []prompb.TimeSeries{})
|
||||
if len(output) != len(expectedOutput) {
|
||||
t.Fatalf("unexpected output length; got %d; want %d", len(output), len(expectedOutput))
|
||||
}
|
||||
if timeSeriessToString(output) != timeSeriessToString(expectedOutput) {
|
||||
t.Fatalf("unexpected output; got %s; want %s", timeSeriessToString(output), timeSeriessToString(expectedOutput))
|
||||
}
|
||||
}
|
||||
f([]prompb.TimeSeries{{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "up"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "instance", Value: "node-exporter1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
}},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "up"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
},
|
||||
},
|
||||
})
|
||||
*vmLabel = ""
|
||||
}
|
||||
|
||||
func TestMdxInstanceCleanup(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
filter := NewFilter()
|
||||
|
||||
// init instance list
|
||||
filter.Filter([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "instance", Value: "node-exporter1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_request_duration_seconds"},
|
||||
{Name: "instance", Value: "service1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent1:8429"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
}}, []prompb.TimeSeries{},
|
||||
)
|
||||
f := func(expectedInstanceMap map[string]int64) {
|
||||
t.Helper()
|
||||
if len(filter.vmInstance) != len(expectedInstanceMap) {
|
||||
t.Fatalf("unexpected instance map length; got %d; want %d", len(filter.vmInstance), len(expectedInstanceMap))
|
||||
}
|
||||
for k := range expectedInstanceMap {
|
||||
if filter.vmInstance[k] == nil {
|
||||
t.Fatalf("missing instance in filter.vmInstance: %q", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
f(map[string]int64{
|
||||
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
|
||||
fmt.Sprintf("%q:%q", "test", "vmagent1:8429"): 0,
|
||||
})
|
||||
|
||||
// receive samples from victoria-metrics1:8428 after 9 seconds.
|
||||
time.Sleep(59 * time.Minute)
|
||||
filter.Filter([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
}}, []prompb.TimeSeries{},
|
||||
)
|
||||
|
||||
// no samples from vmagent1:8429 in the last 10 seconds, so it should be removed from the mdx instance list.
|
||||
time.Sleep(2 * time.Minute)
|
||||
f(map[string]int64{
|
||||
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
|
||||
})
|
||||
filter.MustStop()
|
||||
})
|
||||
|
||||
}
|
||||
6
vendor/golang.org/x/crypto/chacha20poly1305/chacha20poly1305_amd64.go
generated
vendored
6
vendor/golang.org/x/crypto/chacha20poly1305/chacha20poly1305_amd64.go
generated
vendored
@@ -20,7 +20,7 @@ func chacha20Poly1305Open(dst []byte, key []uint32, src, ad []byte) bool
|
||||
func chacha20Poly1305Seal(dst []byte, key []uint32, src, ad []byte)
|
||||
|
||||
var (
|
||||
useAVX2 = cpu.X86.HasAVX2 && cpu.X86.HasBMI2
|
||||
useAVX2 = cpu.X86.HasSSSE3 && cpu.X86.HasAVX2 && cpu.X86.HasBMI2
|
||||
)
|
||||
|
||||
// setupState writes a ChaCha20 input matrix to state. See
|
||||
@@ -47,7 +47,7 @@ func setupState(state *[16]uint32, key *[32]byte, nonce []byte) {
|
||||
}
|
||||
|
||||
func (c *chacha20poly1305) seal(dst, nonce, plaintext, additionalData []byte) []byte {
|
||||
if !cpu.X86.HasSSSE3 {
|
||||
if !useAVX2 {
|
||||
return c.sealGeneric(dst, nonce, plaintext, additionalData)
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ func (c *chacha20poly1305) seal(dst, nonce, plaintext, additionalData []byte) []
|
||||
}
|
||||
|
||||
func (c *chacha20poly1305) open(dst, nonce, ciphertext, additionalData []byte) ([]byte, error) {
|
||||
if !cpu.X86.HasSSSE3 {
|
||||
if !useAVX2 {
|
||||
return c.openGeneric(dst, nonce, ciphertext, additionalData)
|
||||
}
|
||||
|
||||
|
||||
5182
vendor/golang.org/x/crypto/chacha20poly1305/chacha20poly1305_amd64.s
generated
vendored
5182
vendor/golang.org/x/crypto/chacha20poly1305/chacha20poly1305_amd64.s
generated
vendored
File diff suppressed because it is too large
Load Diff
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@@ -851,7 +851,7 @@ go.yaml.in/yaml/v2
|
||||
# go.yaml.in/yaml/v3 v3.0.4
|
||||
## explicit; go 1.16
|
||||
go.yaml.in/yaml/v3
|
||||
# golang.org/x/crypto v0.51.0
|
||||
# golang.org/x/crypto v0.52.0
|
||||
## explicit; go 1.25.0
|
||||
golang.org/x/crypto/chacha20
|
||||
golang.org/x/crypto/chacha20poly1305
|
||||
|
||||
Reference in New Issue
Block a user