Compare commits

..

3 Commits

Author SHA1 Message Date
f41gh7
517ce21792 address review comments and make linter happy
Signed-off-by: f41gh7 <nik@victoriametrics.com>
2025-04-14 14:10:21 +02:00
f41gh7
f635ce90a9 address review comments 2025-04-14 13:32:00 +02:00
f41gh7
c3bbf12494 lib/metricnamestats: add new matchNames stats query filter
Introduce new query filter option matchNames for metricnamestats query Requests.
It allows to fetch stats for exact metric names. Which is useful for Explore Cardinality page.
It's also allows 3rd party tool to check if application metrics are used
for query requests.

 This commit adds the following changes:
* replaces MetricNamesUsageStats inline func query args with dedicated struct.
* bumps cluster RPC metricNamesUsageStats_v1 to metricNamesUsageStats_v2
  in order to properly encode new RequestQuery params
* adds new methods for MetricNameTracker

Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6145
2025-04-14 13:05:35 +02:00
1596 changed files with 114705 additions and 98742 deletions

View File

@@ -5,10 +5,10 @@ body:
- type: markdown
attributes:
value: |
Before filling a bug report it would be great to [upgrade](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-upgrade)
Before filling a bug report it would be great to [upgrade](https://docs.victoriametrics.com/#how-to-upgrade)
to [the latest available release](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/latest)
and verify whether the bug is reproducible there.
It's also recommended to read the [troubleshooting docs](https://docs.victoriametrics.com/victoriametrics/troubleshooting/) first.
It's also recommended to read the [troubleshooting docs](https://docs.victoriametrics.com/troubleshooting/) first.
- type: textarea
id: describe-the-bug
attributes:
@@ -64,8 +64,8 @@ body:
* [Grafana dashboard for VictoriaMetrics cluster](https://grafana.com/grafana/dashboards/11176)
See how to setup monitoring here:
* [monitoring for single-node VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#monitoring)
* [monitoring for VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#monitoring)
* [monitoring for single-node VictoriaMetrics](https://docs.victoriametrics.com/#monitoring)
* [monitoring for VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/#monitoring)
validations:
required: false
- type: textarea

View File

@@ -24,9 +24,9 @@ body:
label: Troubleshooting docs
description: I am familiar with the following troubleshooting docs
options:
- label: General - https://docs.victoriametrics.com/victoriametrics/troubleshooting/
- label: General - https://docs.victoriametrics.com/troubleshooting/
required: false
- label: vmagent - https://docs.victoriametrics.com/victoriametrics/vmagent/#troubleshooting
- label: vmagent - https://docs.victoriametrics.com/vmagent/#troubleshooting
required: false
- label: vmalert - https://docs.victoriametrics.com/victoriametrics/vmalert/#troubleshooting
- label: vmalert - https://docs.victoriametrics.com/vmalert/#troubleshooting
required: false

View File

@@ -6,4 +6,4 @@ Please provide a brief description of the changes you made. Be as specific as po
The following checks are **mandatory**:
- [ ] My change adheres to [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [ ] My change adheres to [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/).

1
.gitignore vendored
View File

@@ -27,4 +27,3 @@ _site
coverage.txt
cspell.json
*~
deployment/docker/provisioning/plugins/

View File

@@ -1 +1 @@
The document has been moved [here](https://docs.victoriametrics.com/victoriametrics/contributing/).
The document has been moved [here](https://docs.victoriametrics.com/contributing/).

View File

@@ -5,6 +5,7 @@ MAKE_PARALLEL := $(MAKE) -j $(MAKE_CONCURRENCY)
DATEINFO_TAG ?= $(shell date -u +'%Y%m%d-%H%M%S')
BUILDINFO_TAG ?= $(shell echo $$(git describe --long --all | tr '/' '-')$$( \
git diff-index --quiet HEAD -- || echo '-dirty-'$$(git diff-index -u HEAD | openssl sha1 | cut -d' ' -f2 | cut -c 1-8)))
LATEST_TAG ?= cluster-latest
PKG_TAG ?= $(shell git tag -l --points-at HEAD)
ifeq ($(PKG_TAG),)
@@ -107,31 +108,12 @@ package: \
package-vmselect \
package-vmstorage
publish-latest:
PKG_TAG=$(TAG) APP_NAME=victoria-metrics $(MAKE) publish-via-docker-latest && \
PKG_TAG=$(TAG) APP_NAME=vmagent $(MAKE) publish-via-docker-latest && \
PKG_TAG=$(TAG) APP_NAME=vmalert $(MAKE) publish-via-docker-latest && \
PKG_TAG=$(TAG) APP_NAME=vmalert-tool $(MAKE) publish-via-docker-latest && \
PKG_TAG=$(TAG) APP_NAME=vmauth $(MAKE) publish-via-docker-latest && \
PKG_TAG=$(TAG) APP_NAME=vmbackup $(MAKE) publish-via-docker-latest && \
PKG_TAG=$(TAG) APP_NAME=vmrestore $(MAKE) publish-via-docker-latest && \
PKG_TAG=$(TAG) APP_NAME=vmctl $(MAKE) publish-via-docker-latest && \
PKG_TAG=$(TAG)-cluster APP_NAME=vminsert $(MAKE) publish-via-docker-latest && \
PKG_TAG=$(TAG)-cluster APP_NAME=vmselect $(MAKE) publish-via-docker-latest && \
PKG_TAG=$(TAG)-cluster APP_NAME=vmstorage $(MAKE) publish-via-docker-latest && \
PKG_TAG=$(TAG)-enterprise APP_NAME=vmgateway $(MAKE) publish-via-docker-latest
PKG_TAG=$(TAG)-enterprise APP_NAME=vmbackupmanager $(MAKE) publish-via-docker-latest
publish-victoria-logs-latest:
PKG_TAG=$(TAG) APP_NAME=victoria-logs $(MAKE) publish-via-docker-latest
PKG_TAG=$(TAG) APP_NAME=vlogscli $(MAKE) publish-via-docker-latest
publish-release:
rm -rf bin/*
git checkout $(TAG) && $(MAKE) release && $(MAKE) publish && \
git checkout $(TAG)-cluster && $(MAKE) release && $(MAKE) publish && \
git checkout $(TAG)-enterprise && $(MAKE) release && $(MAKE) publish && \
git checkout $(TAG)-enterprise-cluster && $(MAKE) release && $(MAKE) publish
git checkout $(TAG) && $(MAKE) release && LATEST_TAG=stable $(MAKE) publish && \
git checkout $(TAG)-cluster && $(MAKE) release && LATEST_TAG=cluster-stable $(MAKE) publish && \
git checkout $(TAG)-enterprise && $(MAKE) release && LATEST_TAG=enterprise-stable $(MAKE) publish && \
git checkout $(TAG)-enterprise-cluster && $(MAKE) release && LATEST_TAG=enterprise-cluster-stable $(MAKE) publish
release:
$(MAKE_PARALLEL) release-vmcluster
@@ -213,7 +195,7 @@ fmt:
gofmt -l -w -s ./apptest
vet:
GOEXPERIMENT=synctest go vet ./lib/...
go vet ./lib/...
go vet ./app/...
go vet ./apptest/...
@@ -222,35 +204,35 @@ check-all: fmt vet golangci-lint govulncheck
clean-checkers: remove-golangci-lint remove-govulncheck
test:
GOEXPERIMENT=synctest go test ./lib/... ./app/...
go test ./lib/... ./app/...
test-race:
GOEXPERIMENT=synctest go test -race ./lib/... ./app/...
go test -race ./lib/... ./app/...
test-pure:
GOEXPERIMENT=synctest CGO_ENABLED=0 go test ./lib/... ./app/...
CGO_ENABLED=0 go test ./lib/... ./app/...
test-full:
GOEXPERIMENT=synctest go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
test-full-386:
GOEXPERIMENT=synctest GOARCH=386 go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
GOARCH=386 go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
integration-test: all
go test ./apptest/... -skip="^TestSingle.*"
integration-test: victoria-metrics vmagent vmalert vmauth
go test ./apptest/... -skip="^TestCluster.*"
benchmark:
GOEXPERIMENT=synctest go test -bench=. ./lib/...
go test -bench=. ./lib/...
go test -bench=. ./app/...
benchmark-pure:
GOEXPERIMENT=synctest CGO_ENABLED=0 go test -bench=. ./lib/...
CGO_ENABLED=0 go test -bench=. ./lib/...
CGO_ENABLED=0 go test -bench=. ./app/...
vendor-update:
go get -u ./lib/...
go get -u ./app/...
go mod tidy -compat=1.24
go mod tidy -compat=1.23
go mod vendor
app-local:
@@ -273,7 +255,7 @@ install-qtc:
golangci-lint: install-golangci-lint
GOEXPERIMENT=synctest golangci-lint run
golangci-lint run
install-golangci-lint:
which golangci-lint || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v1.64.7

View File

@@ -11,9 +11,9 @@
![Reddit](https://img.shields.io/reddit/subreddit-subscribers/VictoriaMetrics?style=flat&label=Join&labelColor=red&logoColor=white&logo=reddit&link=https%3A%2F%2Fwww.reddit.com%2Fr%2FVictoriaMetrics)
<picture>
<source srcset="docs/victoriametrics/logo_white.webp" media="(prefers-color-scheme: dark)">
<source srcset="docs/victoriametrics/logo.webp" media="(prefers-color-scheme: light)">
<img src="docs/victoriametrics/logo.webp" width="300" alt="VictoriaMetrics logo">
<source srcset="docs/logo_white.webp" media="(prefers-color-scheme: dark)">
<source srcset="docs/logo.webp" media="(prefers-color-scheme: light)">
<img src="docs/logo.webp" width="300" alt="VictoriaMetrics logo">
</picture>
VictoriaMetrics is a fast, cost-saving, and scalable solution for monitoring and managing time series data. It delivers high performance and reliability, making it an ideal choice for businesses of all sizes.
@@ -21,10 +21,10 @@ VictoriaMetrics is a fast, cost-saving, and scalable solution for monitoring and
Here are some resources and information about VictoriaMetrics:
- Documentation: [docs.victoriametrics.com](https://docs.victoriametrics.com)
- Case studies: [Grammarly, Roblox, Wix,...](https://docs.victoriametrics.com/victoriametrics/casestudies/).
- Case studies: [Grammarly, Roblox, Wix,...](https://docs.victoriametrics.com/casestudies/).
- Available: [Binary releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/latest), docker images [Docker Hub](https://hub.docker.com/r/victoriametrics/victoria-metrics/) and [Quay](https://quay.io/repository/victoriametrics/victoria-metrics), [Source code](https://github.com/VictoriaMetrics/VictoriaMetrics)
- Deployment types: [Single-node version](https://docs.victoriametrics.com/), [Cluster version](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/), and [Enterprise version](https://docs.victoriametrics.com/victoriametrics/enterprise/)
- Changelog: [CHANGELOG](https://docs.victoriametrics.com/victoriametrics/changelog/), and [How to upgrade](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-upgrade-victoriametrics)
- Deployment types: [Single-node version](https://docs.victoriametrics.com/), [Cluster version](https://docs.victoriametrics.com/cluster-victoriametrics/), and [Enterprise version](https://docs.victoriametrics.com/enterprise/)
- Changelog: [CHANGELOG](https://docs.victoriametrics.com/changelog/), and [How to upgrade](https://docs.victoriametrics.com/#how-to-upgrade-victoriametrics)
- Community: [Slack](https://slack.victoriametrics.com/), [X (Twitter)](https://x.com/VictoriaMetrics), [LinkedIn](https://www.linkedin.com/company/victoriametrics/), [YouTube](https://www.youtube.com/@VictoriaMetrics)
Yes, we open-source both the single-node VictoriaMetrics and the cluster version.
@@ -35,22 +35,22 @@ VictoriaMetrics is optimized for timeseries data, even when old time series are
* **Long-term storage for Prometheus** or as a drop-in replacement for Prometheus and Graphite in Grafana.
* **Powerful stream aggregation**: Can be used as a StatsD alternative.
* **Ideal for big data**: Works well with large amounts of time series data from APM, Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various [Enterprise workloads](https://docs.victoriametrics.com/victoriametrics/enterprise/).
* **Ideal for big data**: Works well with large amounts of time series data from APM, Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various [Enterprise workloads](https://docs.victoriametrics.com/enterprise/).
* **Query language**: Supports both PromQL and the more performant MetricsQL.
* **Easy to setup**: No dependencies, single [small binary](https://medium.com/@valyala/stripping-dependency-bloat-in-victoriametrics-docker-image-983fb5912b0d), configuration through command-line flags, but the default is also fine-tuned; backup and restore with [instant snapshots](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282).
* **Global query view**: Multiple Prometheus instances or any other data sources may ingest data into VictoriaMetrics and queried via a single query.
* **Various Protocols**: Support metric scraping, ingestion and backfilling in various protocol.
* [Prometheus exporters](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-scrape-prometheus-exporters-such-as-node-exporter), [Prometheus remote write API](https://docs.victoriametrics.com/victoriametrics/integrations/prometheus/), [Prometheus exposition format](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-data-in-prometheus-exposition-format).
* [InfluxDB line protocol](https://docs.victoriametrics.com/victoriametrics/integrations/influxdb/) over HTTP, TCP and UDP.
* [Graphite plaintext protocol](https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#ingesting) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon).
* [OpenTSDB put message](https://docs.victoriametrics.com/victoriametrics/integrations/opentsdb/#sending-data-via-telnet).
* [HTTP OpenTSDB /api/put requests](https://docs.victoriametrics.com/victoriametrics/integrations/opentsdb/#sending-data-via-http).
* [JSON line format](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-data-in-json-line-format).
* [Arbitrary CSV data](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-csv-data).
* [Native binary format](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-data-in-native-format).
* [DataDog agent or DogStatsD](https://docs.victoriametrics.com/victoriametrics/integrations/datadog/).
* [NewRelic infrastructure agent](https://docs.victoriametrics.com/victoriametrics/integrations/newrelic/#sending-data-from-agent).
* [OpenTelemetry metrics format](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#sending-data-via-opentelemetry).
* [Prometheus exporters](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter), [Prometheus remote write API](https://docs.victoriametrics.com/#prometheus-setup), [Prometheus exposition format](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format).
* [InfluxDB line protocol](https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) over HTTP, TCP and UDP.
* [Graphite plaintext protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon).
* [OpenTSDB put message](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol).
* [HTTP OpenTSDB /api/put requests](https://docs.victoriametrics.com/#sending-opentsdb-data-via-http-apiput-requests).
* [JSON line format](https://docs.victoriametrics.com/#how-to-import-data-in-json-line-format).
* [Arbitrary CSV data](https://docs.victoriametrics.com/#how-to-import-csv-data).
* [Native binary format](https://docs.victoriametrics.com/#how-to-import-data-in-native-format).
* [DataDog agent or DogStatsD](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent).
* [NewRelic infrastructure agent](https://docs.victoriametrics.com/#how-to-send-data-from-newrelic-agent).
* [OpenTelemetry metrics format](https://docs.victoriametrics.com/#sending-data-via-opentelemetry).
* **NFS-based storages**: Supports storing data on NFS-based storages such as Amazon EFS, Google Filestore.
* And many other features such as metrics relabeling, cardinality limiter, etc.
@@ -62,9 +62,9 @@ In addition, the Enterprise version includes extra features:
- **Backup automation**: Automates regular backup procedures.
- **Multiple retentions**: Reducing storage costs by specifying different retentions for different datasets.
- **Downsampling**: Reducing storage costs and increasing performance for queries over historical data.
- **Stable releases** with long-term support lines ([LTS](https://docs.victoriametrics.com/victoriametrics/lts-releases/)).
- **Stable releases** with long-term support lines ([LTS](https://docs.victoriametrics.com/lts-releases/)).
- **Comprehensive support**: First-class consulting, feature requests and technical support provided by the core VictoriaMetrics dev team.
- Many other features, which you can read about on [the Enterprise page](https://docs.victoriametrics.com/victoriametrics/enterprise/).
- Many other features, which you can read about on [the Enterprise page](https://docs.victoriametrics.com/enterprise/).
[Contact us](mailto:info@victoriametrics.com) if you need enterprise support for VictoriaMetrics. Or you can request a free trial license [here](https://victoriametrics.com/products/enterprise/trial/), downloaded Enterprise binaries are available at [Github Releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/latest).
@@ -77,7 +77,7 @@ Some good benchmarks VictoriaMetrics achieved:
* **Minimal memory footprint**: handling millions of unique timeseries with [10x less RAM](https://medium.com/@valyala/insert-benchmarks-with-inch-influxdb-vs-victoriametrics-e31a41ae2893) than InfluxDB, up to [7x less RAM](https://valyala.medium.com/prometheus-vs-victoriametrics-benchmark-on-node-exporter-metrics-4ca29c75590f) than Prometheus, Thanos or Cortex.
* **Highly scalable and performance** for [data ingestion](https://medium.com/@valyala/high-cardinality-tsdb-benchmarks-victoriametrics-vs-timescaledb-vs-influxdb-13e6ee64dd6b) and [querying](https://medium.com/@valyala/when-size-matters-benchmarking-victoriametrics-vs-timescale-and-influxdb-6035811952d4), [20x outperforms](https://medium.com/@valyala/insert-benchmarks-with-inch-influxdb-vs-victoriametrics-e31a41ae2893) InfluxDB and TimescaleDB.
* **High data compression**: [70x more data points](https://medium.com/@valyala/when-size-matters-benchmarking-victoriametrics-vs-timescale-and-influxdb-6035811952d4) may be stored into limited storage than TimescaleDB, [7x less storage](https://valyala.medium.com/prometheus-vs-victoriametrics-benchmark-on-node-exporter-metrics-4ca29c75590f) space is required than Prometheus, Thanos or Cortex.
* **Reducing storage costs**: [10x more effective](https://docs.victoriametrics.com/victoriametrics/casestudies/#grammarly) than Graphite according to the Grammarly case study.
* **Reducing storage costs**: [10x more effective](https://docs.victoriametrics.com/casestudies/#grammarly) than Graphite according to the Grammarly case study.
* **A single-node VictoriaMetrics** can replace medium-sized clusters built with competing solutions such as Thanos, M3DB, Cortex, InfluxDB or TimescaleDB. See [VictoriaMetrics vs Thanos](https://medium.com/@valyala/comparing-thanos-to-victoriametrics-cluster-b193bea1683), [Measuring vertical scalability](https://medium.com/@valyala/measuring-vertical-scalability-for-time-series-databases-in-google-cloud-92550d78d8ae), [Remote write storage wars - PromCon 2019](https://promcon.io/2019-munich/talks/remote-write-storage-wars/).
* **Optimized for storage**: [Works well with high-latency IO](https://medium.com/@valyala/high-cardinality-tsdb-benchmarks-victoriametrics-vs-timescaledb-vs-influxdb-13e6ee64dd6b) and low IOPS (HDD and network storage in AWS, Google Cloud, Microsoft Azure, etc.).
@@ -93,7 +93,7 @@ Feel free asking any questions regarding VictoriaMetrics:
* [Telegram-ru](https://t.me/VictoriaMetrics_ru1)
* [Mastodon](https://mastodon.social/@victoriametrics/)
If you like VictoriaMetrics and want to contribute, then please [read these docs](https://docs.victoriametrics.com/victoriametrics/contributing/).
If you like VictoriaMetrics and want to contribute, then please [read these docs](https://docs.victoriametrics.com/contributing/).
## VictoriaMetrics Logo

View File

@@ -6,9 +6,9 @@ The following versions of VictoriaMetrics receive regular security fixes:
| Version | Supported |
|---------|--------------------|
| [latest release](https://docs.victoriametrics.com/victoriametrics/changelog/) | :white_check_mark: |
| v1.102.x [LTS line](https://docs.victoriametrics.com/victoriametrics/lts-releases/) | :white_check_mark: |
| v1.110.x [LTS line](https://docs.victoriametrics.com/victoriametrics/lts-releases/) | :white_check_mark: |
| [latest release](https://docs.victoriametrics.com/changelog/) | :white_check_mark: |
| v1.102.x [LTS line](https://docs.victoriametrics.com/lts-releases/) | :white_check_mark: |
| v1.110.x [LTS line](https://docs.victoriametrics.com/lts-releases/) | :white_check_mark: |
| other releases | :x: |
See [this page](https://victoriametrics.com/security/) for more details.

View File

@@ -46,9 +46,7 @@ func main() {
vlselect.Init()
vlinsert.Init()
go httpserver.Serve(listenAddrs, requestHandler, httpserver.ServeOptions{
UseProxyProtocol: useProxyProtocol,
})
go httpserver.Serve(listenAddrs, useProxyProtocol, requestHandler)
logger.Infof("started VictoriaLogs in %.3f seconds; see https://docs.victoriametrics.com/victorialogs/", time.Since(startTime).Seconds())
pushmetrics.Init()

View File

@@ -95,7 +95,6 @@ func datadogLogsIngestion(w http.ResponseWriter, r *http.Request) bool {
// There is no need in updating v2LogsRequestDuration for request errors,
// since their timings are usually much smaller than the timing for successful request parsing.
v2LogsRequestDuration.UpdateDuration(startTime)
w.WriteHeader(http.StatusAccepted)
fmt.Fprintf(w, `{}`)
return true
}

View File

@@ -102,7 +102,7 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
lmp := cp.NewLogMessageProcessor("elasticsearch_bulk", true)
encoding := r.Header.Get("Content-Encoding")
streamName := fmt.Sprintf("remoteAddr=%s, requestURI=%q", httpserver.GetQuotedRemoteAddr(r), r.RequestURI)
n, err := readBulkRequest(streamName, r.Body, encoding, cp.TimeFields, cp.MsgFields, lmp)
n, err := readBulkRequest(streamName, r.Body, encoding, cp.TimeField, cp.MsgFields, lmp)
lmp.MustClose()
if err != nil {
logger.Warnf("cannot decode log message #%d in /_bulk request: %s, stream fields: %s", n, err, cp.StreamFields)
@@ -131,7 +131,7 @@ var (
bulkRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/elasticsearch/_bulk"}`)
)
func readBulkRequest(streamName string, r io.Reader, encoding string, timeFields, msgFields []string, lmp insertutil.LogMessageProcessor) (int, error) {
func readBulkRequest(streamName string, r io.Reader, encoding string, timeField string, msgFields []string, lmp insertutil.LogMessageProcessor) (int, error) {
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
@@ -147,7 +147,7 @@ func readBulkRequest(streamName string, r io.Reader, encoding string, timeFields
n := 0
for {
ok, err := readBulkLine(lr, timeFields, msgFields, lmp)
ok, err := readBulkLine(lr, timeField, msgFields, lmp)
wcr.DecConcurrency()
if err != nil || !ok {
return n, err
@@ -156,7 +156,7 @@ func readBulkRequest(streamName string, r io.Reader, encoding string, timeFields
}
}
func readBulkLine(lr *insertutil.LineReader, timeFields, msgFields []string, lmp insertutil.LogMessageProcessor) (bool, error) {
func readBulkLine(lr *insertutil.LineReader, timeField string, msgFields []string, lmp insertutil.LogMessageProcessor) (bool, error) {
var line []byte
// Read the command, must be "create" or "index"
@@ -190,7 +190,7 @@ func readBulkLine(lr *insertutil.LineReader, timeFields, msgFields []string, lmp
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
}
ts, err := extractTimestampFromFields(timeFields, p.Fields)
ts, err := extractTimestampFromFields(timeField, p.Fields)
if err != nil {
return false, fmt.Errorf("cannot parse timestamp: %w", err)
}
@@ -204,20 +204,18 @@ func readBulkLine(lr *insertutil.LineReader, timeFields, msgFields []string, lmp
return true, nil
}
func extractTimestampFromFields(timeFields []string, fields []logstorage.Field) (int64, error) {
for _, timeField := range timeFields {
for i := range fields {
f := &fields[i]
if f.Name != timeField {
continue
}
timestamp, err := parseElasticsearchTimestamp(f.Value)
if err != nil {
return 0, err
}
f.Value = ""
return timestamp, nil
func extractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) {
for i := range fields {
f := &fields[i]
if f.Name != timeField {
continue
}
timestamp, err := parseElasticsearchTimestamp(f.Value)
if err != nil {
return 0, err
}
f.Value = ""
return timestamp, nil
}
return 0, nil
}

View File

@@ -19,7 +19,7 @@ func TestReadBulkRequest_Failure(t *testing.T) {
tlp := &insertutil.TestLogMessageProcessor{}
r := bytes.NewBufferString(data)
rows, err := readBulkRequest("test", r, "", []string{"_time"}, []string{"_msg"}, tlp)
rows, err := readBulkRequest("test", r, "", "_time", []string{"_msg"}, tlp)
if err == nil {
t.Fatalf("expecting non-empty error")
}
@@ -40,13 +40,12 @@ func TestReadBulkRequest_Success(t *testing.T) {
f := func(data, encoding, timeField, msgField string, timestampsExpected []int64, resultExpected string) {
t.Helper()
timeFields := []string{"non_existing_foo", timeField, "non_existing_bar"}
msgFields := []string{"non_existing_foo", msgField, "non_exiting_bar"}
tlp := &insertutil.TestLogMessageProcessor{}
// Read the request without compression
r := bytes.NewBufferString(data)
rows, err := readBulkRequest("test", r, "", timeFields, msgFields, tlp)
rows, err := readBulkRequest("test", r, "", timeField, msgFields, tlp)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@@ -63,7 +62,7 @@ func TestReadBulkRequest_Success(t *testing.T) {
data = compressData(data, encoding)
}
r = bytes.NewBufferString(data)
rows, err = readBulkRequest("test", r, encoding, timeFields, msgFields, tlp)
rows, err = readBulkRequest("test", r, encoding, timeField, msgFields, tlp)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@@ -112,7 +111,7 @@ func compressData(s string, encoding string) string {
case "zstd":
zw, _ = zstd.NewWriter(&bb)
case "snappy":
return string(snappy.Encode(nil, []byte(s)))
zw = snappy.NewBufferedWriter(&bb)
case "deflate":
zw = zlib.NewWriter(&bb)
default:

View File

@@ -40,7 +40,7 @@ func benchmarkReadBulkRequest(b *testing.B, encoding string) {
}
dataBytes := bytesutil.ToUnsafeBytes(data)
timeFields := []string{"@timestamp"}
timeField := "@timestamp"
msgFields := []string{"message"}
blp := &insertutil.BenchmarkLogMessageProcessor{}
@@ -50,7 +50,7 @@ func benchmarkReadBulkRequest(b *testing.B, encoding string) {
r := &bytes.Reader{}
for pb.Next() {
r.Reset(dataBytes)
_, err := readBulkRequest("test", r, encoding, timeFields, msgFields, blp)
_, err := readBulkRequest("test", r, encoding, timeField, msgFields, blp)
if err != nil {
panic(fmt.Errorf("unexpected error: %w", err))
}

View File

@@ -28,13 +28,12 @@ var (
//
// See https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters
type CommonParams struct {
TenantID logstorage.TenantID
TimeFields []string
MsgFields []string
StreamFields []string
IgnoreFields []string
DecolorizeFields []string
ExtraFields []logstorage.Field
TenantID logstorage.TenantID
TimeField string
MsgFields []string
StreamFields []string
IgnoreFields []string
ExtraFields []logstorage.Field
Debug bool
DebugRequestURI string
@@ -49,15 +48,14 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
return nil, err
}
timeFields := []string{"_time"}
if tfs := httputil.GetArray(r, "_time_field", "VL-Time-Field"); len(tfs) > 0 {
timeFields = tfs
timeField := "_time"
if tf := httputil.GetRequestValue(r, "_time_field", "VL-Time-Field"); tf != "" {
timeField = tf
}
msgFields := httputil.GetArray(r, "_msg_field", "VL-Msg-Field")
streamFields := httputil.GetArray(r, "_stream_fields", "VL-Stream-Fields")
ignoreFields := httputil.GetArray(r, "ignore_fields", "VL-Ignore-Fields")
decolorizeFields := httputil.GetArray(r, "decolorize_fields", "VL-Decolorize-Fields")
extraFields, err := getExtraFields(r)
if err != nil {
@@ -79,16 +77,15 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
}
cp := &CommonParams{
TenantID: tenantID,
TimeFields: timeFields,
MsgFields: msgFields,
StreamFields: streamFields,
IgnoreFields: ignoreFields,
DecolorizeFields: decolorizeFields,
ExtraFields: extraFields,
Debug: debug,
DebugRequestURI: debugRequestURI,
DebugRemoteAddr: debugRemoteAddr,
TenantID: tenantID,
TimeField: timeField,
MsgFields: msgFields,
StreamFields: streamFields,
IgnoreFields: ignoreFields,
ExtraFields: extraFields,
Debug: debug,
DebugRequestURI: debugRequestURI,
DebugRemoteAddr: debugRemoteAddr,
}
return cp, nil
@@ -115,7 +112,7 @@ func getExtraFields(r *http.Request) ([]logstorage.Field, error) {
}
// GetCommonParamsForSyslog returns common params needed for parsing syslog messages and storing them to the given tenantID.
func GetCommonParamsForSyslog(tenantID logstorage.TenantID, streamFields, ignoreFields, decolorizeFields []string, extraFields []logstorage.Field) *CommonParams {
func GetCommonParamsForSyslog(tenantID logstorage.TenantID, streamFields, ignoreFields []string, extraFields []logstorage.Field) *CommonParams {
// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
if streamFields == nil {
streamFields = []string{
@@ -125,17 +122,14 @@ func GetCommonParamsForSyslog(tenantID logstorage.TenantID, streamFields, ignore
}
}
cp := &CommonParams{
TenantID: tenantID,
TimeFields: []string{
"timestamp",
},
TenantID: tenantID,
TimeField: "timestamp",
MsgFields: []string{
"message",
},
StreamFields: streamFields,
IgnoreFields: ignoreFields,
DecolorizeFields: decolorizeFields,
ExtraFields: extraFields,
StreamFields: streamFields,
IgnoreFields: ignoreFields,
ExtraFields: extraFields,
}
return cp
@@ -282,7 +276,7 @@ func (lmp *logMessageProcessor) MustClose() {
//
// MustClose() must be called on the returned LogMessageProcessor when it is no longer needed.
func (cp *CommonParams) NewLogMessageProcessor(protocolName string, isStreamMode bool) LogMessageProcessor {
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields, cp.DecolorizeFields, cp.ExtraFields, *defaultMsgValue)
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields, cp.ExtraFields, *defaultMsgValue)
rowsIngestedTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_rows_ingested_total{type=%q}", protocolName))
bytesIngestedTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_bytes_ingested_total{type=%q}", protocolName))
lmp := &logMessageProcessor{

View File

@@ -10,29 +10,27 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
// ExtractTimestampFromFields extracts timestamp in nanoseconds from the first field the name from timeFields at fields.
// ExtractTimestampFromFields extracts timestamp in nanoseconds from the field with the name timeField at fields.
//
// The value for the corresponding timeFields is set to empty string after returning from the function,
// The value for the timeField is set to empty string after returning from the function,
// so it could be ignored during data ingestion.
//
// The current timestamp is returned if fields do not contain a field with timeField name or if the timeField value is empty.
func ExtractTimestampFromFields(timeFields []string, fields []logstorage.Field) (int64, error) {
for _, timeField := range timeFields {
for i := range fields {
f := &fields[i]
if f.Name != timeField {
continue
}
nsecs, err := parseTimestamp(f.Value)
if err != nil {
return 0, fmt.Errorf("cannot parse timestamp from field %q: %s", f.Name, err)
}
f.Value = ""
if nsecs == 0 {
nsecs = time.Now().UnixNano()
}
return nsecs, nil
func ExtractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) {
for i := range fields {
f := &fields[i]
if f.Name != timeField {
continue
}
nsecs, err := parseTimestamp(f.Value)
if err != nil {
return 0, fmt.Errorf("cannot parse timestamp from field %q: %s", timeField, err)
}
f.Value = ""
if nsecs == 0 {
nsecs = time.Now().UnixNano()
}
return nsecs, nil
}
return time.Now().UnixNano(), nil
}

View File

@@ -67,7 +67,7 @@ func TestExtractTimestampFromFields_Success(t *testing.T) {
f := func(timeField string, fields []logstorage.Field, nsecsExpected int64) {
t.Helper()
nsecs, err := ExtractTimestampFromFields([]string{timeField}, fields)
nsecs, err := ExtractTimestampFromFields(timeField, fields)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@@ -140,7 +140,7 @@ func TestExtractTimestampFromFields_Error(t *testing.T) {
fields := []logstorage.Field{
{Name: "time", Value: s},
}
nsecs, err := ExtractTimestampFromFields([]string{"time"}, fields)
nsecs, err := ExtractTimestampFromFields("time", fields)
if err == nil {
t.Fatalf("expecting non-nil error")
}

View File

@@ -53,8 +53,8 @@ func getCommonParams(r *http.Request) (*insertutil.CommonParams, error) {
}
cp.TenantID = tenantID
}
if len(cp.TimeFields) == 0 {
cp.TimeFields = []string{*journaldTimeField}
if cp.TimeField != "" {
cp.TimeField = *journaldTimeField
}
if len(cp.StreamFields) == 0 {
cp.StreamFields = *journaldStreamFields
@@ -207,7 +207,7 @@ func parseJournaldRequest(data []byte, lmp insertutil.LogMessageProcessor, cp *i
if !allowedJournaldEntryNameChars.MatchString(name) {
return fmt.Errorf("journald entry name should consist of `A-Z0-9_` characters and must start from non-digit symbol")
}
if slices.Contains(cp.TimeFields, name) {
if name == cp.TimeField {
n, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse Journald timestamp, %w", err)

View File

@@ -11,8 +11,8 @@ func TestPushJournaldOk(t *testing.T) {
t.Helper()
tlp := &insertutil.TestLogMessageProcessor{}
cp := &insertutil.CommonParams{
TimeFields: []string{"__REALTIME_TIMESTAMP"},
MsgFields: []string{"MESSAGE"},
TimeField: "__REALTIME_TIMESTAMP",
MsgFields: []string{"MESSAGE"},
}
if err := parseJournaldRequest([]byte(src), tlp, cp); err != nil {
t.Fatalf("unexpected error: %s", err)
@@ -46,8 +46,8 @@ func TestPushJournald_Failure(t *testing.T) {
t.Helper()
tlp := &insertutil.TestLogMessageProcessor{}
cp := &insertutil.CommonParams{
TimeFields: []string{"__REALTIME_TIMESTAMP"},
MsgFields: []string{"MESSAGE"},
TimeField: "__REALTIME_TIMESTAMP",
MsgFields: []string{"MESSAGE"},
}
if err := parseJournaldRequest([]byte(data), tlp, cp); err == nil {
t.Fatalf("expected non nil error")

View File

@@ -48,49 +48,34 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) {
lmp := cp.NewLogMessageProcessor("jsonline", true)
streamName := fmt.Sprintf("remoteAddr=%s, requestURI=%q", httpserver.GetQuotedRemoteAddr(r), r.RequestURI)
err = processStreamInternal(streamName, reader, cp.TimeFields, cp.MsgFields, lmp)
processStreamInternal(streamName, reader, cp.TimeField, cp.MsgFields, lmp)
lmp.MustClose()
if err != nil {
httpserver.Errorf(w, r, "cannot process jsonline request; error: %s", err)
return
}
requestDuration.UpdateDuration(startTime)
}
func processStreamInternal(streamName string, r io.Reader, timeFields, msgFields []string, lmp insertutil.LogMessageProcessor) error {
func processStreamInternal(streamName string, r io.Reader, timeField string, msgFields []string, lmp insertutil.LogMessageProcessor) {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
lr := insertutil.NewLineReader(streamName, wcr)
n := 0
errors := 0
var lastError error
for {
ok, err := readLine(lr, timeFields, msgFields, lmp)
ok, err := readLine(lr, timeField, msgFields, lmp)
wcr.DecConcurrency()
if err != nil {
lastError = err
errors++
errorsTotal.Inc()
logger.Warnf("jsonline: cannot read line #%d in /jsonline request: %s", n, err)
}
if !ok {
break
return
}
n++
}
errorsTotal.Add(errors)
if errors > 0 && n == errors {
// Return an error if no logs were processed and there were errors
return lastError
}
return nil
}
func readLine(lr *insertutil.LineReader, timeFields, msgFields []string, lmp insertutil.LogMessageProcessor) (bool, error) {
func readLine(lr *insertutil.LineReader, timeField string, msgFields []string, lmp insertutil.LogMessageProcessor) (bool, error) {
var line []byte
for len(line) == 0 {
if !lr.NextLine() {
@@ -104,11 +89,11 @@ func readLine(lr *insertutil.LineReader, timeFields, msgFields []string, lmp ins
defer logstorage.PutJSONParser(p)
if err := p.ParseLogMessage(line); err != nil {
return true, fmt.Errorf("%s; line contents: %q", err, line)
return true, fmt.Errorf("cannot parse json-encoded line: %w; line contents: %q", err, line)
}
ts, err := insertutil.ExtractTimestampFromFields(timeFields, p.Fields)
ts, err := insertutil.ExtractTimestampFromFields(timeField, p.Fields)
if err != nil {
return true, fmt.Errorf("%s; line contents: %q", err, line)
return true, fmt.Errorf("cannot get timestamp from json-encoded line: %w; line contents: %q", err, line)
}
logstorage.RenameField(p.Fields, msgFields, "_msg")
lmp.AddRow(ts, p.Fields, nil)

View File

@@ -2,23 +2,19 @@ package jsonline
import (
"bytes"
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutil"
)
func TestProcessStreamInternalSuccess(t *testing.T) {
func TestProcessStreamInternal(t *testing.T) {
f := func(data, timeField, msgField string, timestampsExpected []int64, resultExpected string) {
t.Helper()
timeFields := []string{timeField}
msgFields := []string{msgField}
tlp := &insertutil.TestLogMessageProcessor{}
r := bytes.NewBufferString(data)
if err := processStreamInternal("test", r, timeFields, msgFields, tlp); err != nil {
t.Fatalf("unexpected error: %s", err)
}
processStreamInternal("test", r, timeField, msgFields, tlp)
if err := tlp.Verify(timestampsExpected, resultExpected); err != nil {
t.Fatal(err)
@@ -48,6 +44,22 @@ func TestProcessStreamInternalSuccess(t *testing.T) {
{"message":"baz"}`
f(data, timeField, msgField, timestampsExpected, resultExpected)
// invalid json
data = "foobar"
timeField = "@timestamp"
msgField = "aaa"
timestampsExpected = nil
resultExpected = ``
f(data, timeField, msgField, timestampsExpected, resultExpected)
// invalid timestamp field
data = `{"time":"foobar"}`
timeField = "time"
msgField = "abc"
timestampsExpected = nil
resultExpected = ``
f(data, timeField, msgField, timestampsExpected, resultExpected)
// invalid lines among valid lines
data = `
dsfodmasd
@@ -65,33 +77,3 @@ asbsdf
{"_msg":"baz"}`
f(data, timeField, msgField, timestampsExpected, resultExpected)
}
func TestProcessStreamInternalFailure(t *testing.T) {
f := func(data string) {
t.Helper()
tlp := &insertutil.TestLogMessageProcessor{}
r := strings.NewReader(data)
if err := processStreamInternal("test", r, []string{"time"}, nil, tlp); err == nil {
t.Fatalf("expected error, got nil")
}
if err := tlp.Verify(nil, ""); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
// invalid json
f("foobar")
f(`foo
bar`)
f(`
foo
`)
// invalid timestamp field
f(`{"time":"foobar"}`)
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/metrics"
)
@@ -52,7 +53,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) {
err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
lmp := cp.NewLogMessageProcessor("opentelelemtry_protobuf", false)
useDefaultStreamFields := len(cp.StreamFields) == 0
err := pushProtobufRequest(data, lmp, cp.MsgFields, useDefaultStreamFields)
err := pushProtobufRequest(data, lmp, useDefaultStreamFields)
lmp.MustClose()
return err
})
@@ -74,7 +75,7 @@ var (
requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
)
func pushProtobufRequest(data []byte, lmp insertutil.LogMessageProcessor, msgFields []string, useDefaultStreamFields bool) error {
func pushProtobufRequest(data []byte, lmp insertutil.LogMessageProcessor, useDefaultStreamFields bool) error {
var req pb.ExportLogsServiceRequest
if err := req.UnmarshalProtobuf(data); err != nil {
errorsTotal.Inc()
@@ -83,31 +84,35 @@ func pushProtobufRequest(data []byte, lmp insertutil.LogMessageProcessor, msgFie
var commonFields []logstorage.Field
for _, rl := range req.ResourceLogs {
commonFields = commonFields[:0]
commonFields = appendKeyValues(commonFields, rl.Resource.Attributes, "")
attributes := rl.Resource.Attributes
commonFields = slicesutil.SetLength(commonFields, len(attributes))
for i, attr := range attributes {
commonFields[i].Name = attr.Key
commonFields[i].Value = attr.Value.FormatString(true)
}
commonFieldsLen := len(commonFields)
for _, sc := range rl.ScopeLogs {
commonFields = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp, msgFields, useDefaultStreamFields)
commonFields = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp, useDefaultStreamFields)
}
}
return nil
}
func pushFieldsFromScopeLogs(sc *pb.ScopeLogs, commonFields []logstorage.Field, lmp insertutil.LogMessageProcessor, msgFields []string, useDefaultStreamFields bool) []logstorage.Field {
func pushFieldsFromScopeLogs(sc *pb.ScopeLogs, commonFields []logstorage.Field, lmp insertutil.LogMessageProcessor, useDefaultStreamFields bool) []logstorage.Field {
fields := commonFields
for _, lr := range sc.LogRecords {
fields = fields[:len(commonFields)]
if lr.Body.KeyValueList != nil {
fields = appendKeyValues(fields, lr.Body.KeyValueList.Values, "")
logstorage.RenameField(fields[len(commonFields):], msgFields, "_msg")
} else {
fields = append(fields, logstorage.Field{
Name: "_msg",
Value: lr.Body.FormatString(true),
})
for _, attr := range lr.Attributes {
fields = append(fields, logstorage.Field{
Name: "_msg",
Value: lr.Body.FormatString(true),
Name: attr.Key,
Value: attr.Value.FormatString(true),
})
}
fields = appendKeyValues(fields, lr.Attributes, "")
if len(lr.TraceID) > 0 {
fields = append(fields, logstorage.Field{
Name: "trace_id",
@@ -133,22 +138,3 @@ func pushFieldsFromScopeLogs(sc *pb.ScopeLogs, commonFields []logstorage.Field,
}
return fields
}
func appendKeyValues(fields []logstorage.Field, kvs []*pb.KeyValue, parentField string) []logstorage.Field {
for _, attr := range kvs {
fieldName := attr.Key
if parentField != "" {
fieldName = parentField + "." + fieldName
}
if attr.Value.KeyValueList != nil {
fields = appendKeyValues(fields, attr.Value.KeyValueList.Values, fieldName)
} else {
fields = append(fields, logstorage.Field{
Name: fieldName,
Value: attr.Value.FormatString(true),
})
}
}
return fields
}

View File

@@ -16,7 +16,7 @@ func TestPushProtoOk(t *testing.T) {
pData := lr.MarshalProtobuf(nil)
tlp := &insertutil.TestLogMessageProcessor{}
if err := pushProtobufRequest(pData, tlp, nil, false); err != nil {
if err := pushProtobufRequest(pData, tlp, false); err != nil {
t.Fatalf("unexpected error: %s", err)
}
@@ -41,26 +41,6 @@ func TestPushProtoOk(t *testing.T) {
`{"_msg":"log-line-message","severity":"Trace"}`,
)
// severities mapping
f([]pb.ResourceLogs{
{
ScopeLogs: []pb.ScopeLogs{
{
LogRecords: []pb.LogRecord{
{Attributes: []*pb.KeyValue{}, TimeUnixNano: 1234, SeverityNumber: 1, Body: pb.AnyValue{StringValue: ptrTo("log-line-message")}},
{Attributes: []*pb.KeyValue{}, TimeUnixNano: 1234, SeverityNumber: 13, Body: pb.AnyValue{StringValue: ptrTo("log-line-message")}},
{Attributes: []*pb.KeyValue{}, TimeUnixNano: 1234, SeverityNumber: 24, Body: pb.AnyValue{StringValue: ptrTo("log-line-message")}},
},
},
},
},
},
[]int64{1234, 1234, 1234},
`{"_msg":"log-line-message","severity":"Trace"}
{"_msg":"log-line-message","severity":"Warn"}
{"_msg":"log-line-message","severity":"Fatal4"}`,
)
// multi-line with resource attributes
f([]pb.ResourceLogs{
{
@@ -80,7 +60,7 @@ func TestPushProtoOk(t *testing.T) {
{
LogRecords: []pb.LogRecord{
{Attributes: []*pb.KeyValue{}, TimeUnixNano: 1234, SeverityNumber: 1, Body: pb.AnyValue{StringValue: ptrTo("log-line-message")}},
{Attributes: []*pb.KeyValue{}, TimeUnixNano: 1235, SeverityNumber: 25, Body: pb.AnyValue{StringValue: ptrTo("log-line-message-msg-2")}},
{Attributes: []*pb.KeyValue{}, TimeUnixNano: 1235, SeverityNumber: 21, Body: pb.AnyValue{StringValue: ptrTo("log-line-message-msg-2")}},
{Attributes: []*pb.KeyValue{}, TimeUnixNano: 1236, SeverityNumber: -1, Body: pb.AnyValue{StringValue: ptrTo("log-line-message-msg-2")}},
},
},
@@ -88,9 +68,9 @@ func TestPushProtoOk(t *testing.T) {
},
},
[]int64{1234, 1235, 1236},
`{"logger":"context","instance_id":"10","node_taints.role":"dev","node_taints.cluster_load_percent":"0.55","_msg":"log-line-message","severity":"Trace"}
{"logger":"context","instance_id":"10","node_taints.role":"dev","node_taints.cluster_load_percent":"0.55","_msg":"log-line-message-msg-2","severity":"Unspecified"}
{"logger":"context","instance_id":"10","node_taints.role":"dev","node_taints.cluster_load_percent":"0.55","_msg":"log-line-message-msg-2","severity":"Unspecified"}`,
`{"logger":"context","instance_id":"10","node_taints":"{\"role\":\"dev\",\"cluster_load_percent\":0.55}","_msg":"log-line-message","severity":"Trace"}
{"logger":"context","instance_id":"10","node_taints":"{\"role\":\"dev\",\"cluster_load_percent\":0.55}","_msg":"log-line-message-msg-2","severity":"Unspecified"}
{"logger":"context","instance_id":"10","node_taints":"{\"role\":\"dev\",\"cluster_load_percent\":0.55}","_msg":"log-line-message-msg-2","severity":"Unspecified"}`,
)
// multi-scope with resource attributes and multi-line
@@ -136,71 +116,14 @@ func TestPushProtoOk(t *testing.T) {
},
},
[]int64{1234, 1235, 2345, 2346, 2347, 2348, 3333},
`{"logger":"context","instance_id":"10","node_taints.role":"dev","node_taints.cluster_load_percent":"0.55","_msg":"log-line-message","severity":"Trace"}
{"logger":"context","instance_id":"10","node_taints.role":"dev","node_taints.cluster_load_percent":"0.55","_msg":"log-line-message-msg-2","severity":"Debug"}
`{"logger":"context","instance_id":"10","node_taints":"{\"role\":\"dev\",\"cluster_load_percent\":0.55}","_msg":"log-line-message","severity":"Trace"}
{"logger":"context","instance_id":"10","node_taints":"{\"role\":\"dev\",\"cluster_load_percent\":0.55}","_msg":"log-line-message-msg-2","severity":"Debug"}
{"_msg":"log-line-resource-scope-1-0-0","severity":"Info2"}
{"_msg":"log-line-resource-scope-1-0-1","severity":"Info2"}
{"_msg":"log-line-resource-scope-1-1-0","severity":"Info4"}
{"_msg":"log-line-resource-scope-1-1-1","trace_id":"1234","span_id":"45","severity":"Info4"}
{"_msg":"log-line-resource-scope-1-1-2","trace_id":"4bf92f3577b34da6a3ce929d0e0e4736","span_id":"00f067aa0ba902b7","severity":"Unspecified"}`,
)
// nested fields
f([]pb.ResourceLogs{
{
ScopeLogs: []pb.ScopeLogs{
{
LogRecords: []pb.LogRecord{
{
TimeUnixNano: 1234,
Body: pb.AnyValue{StringValue: ptrTo("nested fields")},
Attributes: []*pb.KeyValue{
{Key: "error", Value: &pb.AnyValue{KeyValueList: &pb.KeyValueList{Values: []*pb.KeyValue{
{
Key: "type",
Value: &pb.AnyValue{StringValue: ptrTo("document_parsing_exception")},
},
{
Key: "reason",
Value: &pb.AnyValue{StringValue: ptrTo("failed to parse field [_msg] of type [text]")},
},
{
Key: "caused_by",
Value: &pb.AnyValue{KeyValueList: &pb.KeyValueList{Values: []*pb.KeyValue{
{
Key: "type",
Value: &pb.AnyValue{StringValue: ptrTo("x_content_parse_exception")},
},
{
Key: "reason",
Value: &pb.AnyValue{StringValue: ptrTo("unexpected end-of-input in VALUE_STRING")},
},
{
Key: "caused_by",
Value: &pb.AnyValue{KeyValueList: &pb.KeyValueList{Values: []*pb.KeyValue{
{
Key: "type",
Value: &pb.AnyValue{StringValue: ptrTo("json_e_o_f_exception")},
},
{
Key: "reason",
Value: &pb.AnyValue{StringValue: ptrTo("eof")},
},
}}},
},
}}},
},
}}}},
},
},
},
},
},
},
}, []int64{1234},
`{"_msg":"nested fields","error.type":"document_parsing_exception","error.reason":"failed to parse field [_msg] of type [text]",`+
`"error.caused_by.type":"x_content_parse_exception","error.caused_by.reason":"unexpected end-of-input in VALUE_STRING",`+
`"error.caused_by.caused_by.type":"json_e_o_f_exception","error.caused_by.caused_by.reason":"eof","severity":"Unspecified"}`)
}
func ptrTo[T any](s T) *T {

View File

@@ -27,7 +27,7 @@ func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) {
b.RunParallel(func(pb *testing.PB) {
body := getProtobufBody(streams, rows, labels)
for pb.Next() {
if err := pushProtobufRequest(body, blp, nil, false); err != nil {
if err := pushProtobufRequest(body, blp, false); err != nil {
panic(fmt.Errorf("unexpected error: %w", err))
}
}

View File

@@ -45,11 +45,6 @@ var (
ignoreFieldsUDP = flagutil.NewArrayString("syslog.ignoreFields.udp", "Fields to ignore at logs ingested via the corresponding -syslog.listenAddr.udp. "+
`See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#dropping-fields`)
decolorizeFieldsTCP = flagutil.NewArrayString("syslog.decolorizeFields.tcp", "Fields to remove ANSI color codes across logs ingested via the corresponding -syslog.listenAddr.tcp. "+
`See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#decolorizing-fields`)
decolorizeFieldsUDP = flagutil.NewArrayString("syslog.decolorizeFields.udp", "Fields to remove ANSI color codes across logs ingested via the corresponding -syslog.listenAddr.udp. "+
`See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#decolorizing-fields`)
extraFieldsTCP = flagutil.NewArrayString("syslog.extraFields.tcp", "Fields to add to logs ingested via the corresponding -syslog.listenAddr.tcp. "+
`See https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/#adding-extra-fields`)
extraFieldsUDP = flagutil.NewArrayString("syslog.extraFields.udp", "Fields to add to logs ingested via the corresponding -syslog.listenAddr.udp. "+
@@ -193,12 +188,6 @@ func runUDPListener(addr string, argIdx int) {
logger.Fatalf("cannot parse -syslog.ignoreFields.udp=%q for -syslog.listenAddr.udp=%q: %s", ignoreFieldsStr, addr, err)
}
decolorizeFieldsStr := decolorizeFieldsUDP.GetOptionalArg(argIdx)
decolorizeFields, err := parseFieldsList(decolorizeFieldsStr)
if err != nil {
logger.Fatalf("cannot parse -syslog.decolorizeFields.udp=%q for -syslog.listenAddr.udp=%q: %s", decolorizeFieldsStr, addr, err)
}
extraFieldsStr := extraFieldsUDP.GetOptionalArg(argIdx)
extraFields, err := parseExtraFields(extraFieldsStr)
if err != nil {
@@ -207,7 +196,7 @@ func runUDPListener(addr string, argIdx int) {
doneCh := make(chan struct{})
go func() {
serveUDP(ln, tenantID, compressMethod, useLocalTimestamp, streamFields, ignoreFields, decolorizeFields, extraFields)
serveUDP(ln, tenantID, compressMethod, useLocalTimestamp, streamFields, ignoreFields, extraFields)
close(doneCh)
}()
@@ -260,12 +249,6 @@ func runTCPListener(addr string, argIdx int) {
logger.Fatalf("cannot parse -syslog.ignoreFields.tcp=%q for -syslog.listenAddr.tcp=%q: %s", ignoreFieldsStr, addr, err)
}
decolorizeFieldsStr := decolorizeFieldsTCP.GetOptionalArg(argIdx)
decolorizeFields, err := parseFieldsList(decolorizeFieldsStr)
if err != nil {
logger.Fatalf("cannot parse -syslog.decolorizeFields.tcp=%q for -syslog.listenAddr.tcp=%q: %s", decolorizeFieldsStr, addr, err)
}
extraFieldsStr := extraFieldsTCP.GetOptionalArg(argIdx)
extraFields, err := parseExtraFields(extraFieldsStr)
if err != nil {
@@ -274,7 +257,7 @@ func runTCPListener(addr string, argIdx int) {
doneCh := make(chan struct{})
go func() {
serveTCP(ln, tenantID, compressMethod, useLocalTimestamp, streamFields, ignoreFields, decolorizeFields, extraFields)
serveTCP(ln, tenantID, compressMethod, useLocalTimestamp, streamFields, ignoreFields, extraFields)
close(doneCh)
}()
@@ -296,7 +279,7 @@ func checkCompressMethod(compressMethod, addr, protocol string) {
}
}
func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, encoding string, useLocalTimestamp bool, streamFields, ignoreFields, decolorizeFields []string, extraFields []logstorage.Field) {
func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, encoding string, useLocalTimestamp bool, streamFields, ignoreFields []string, extraFields []logstorage.Field) {
gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup
localAddr := ln.LocalAddr()
@@ -304,7 +287,7 @@ func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, encoding string,
wg.Add(1)
go func() {
defer wg.Done()
cp := insertutil.GetCommonParamsForSyslog(tenantID, streamFields, ignoreFields, decolorizeFields, extraFields)
cp := insertutil.GetCommonParamsForSyslog(tenantID, streamFields, ignoreFields, extraFields)
var bb bytesutil.ByteBuffer
bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
for {
@@ -338,7 +321,7 @@ func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, encoding string,
wg.Wait()
}
func serveTCP(ln net.Listener, tenantID logstorage.TenantID, encoding string, useLocalTimestamp bool, streamFields, ignoreFields, decolorizeFields []string, extraFields []logstorage.Field) {
func serveTCP(ln net.Listener, tenantID logstorage.TenantID, encoding string, useLocalTimestamp bool, streamFields, ignoreFields []string, extraFields []logstorage.Field) {
var cm ingestserver.ConnsMap
cm.Init("syslog")
@@ -368,7 +351,7 @@ func serveTCP(ln net.Listener, tenantID logstorage.TenantID, encoding string, us
wg.Add(1)
go func() {
cp := insertutil.GetCommonParamsForSyslog(tenantID, streamFields, ignoreFields, decolorizeFields, extraFields)
cp := insertutil.GetCommonParamsForSyslog(tenantID, streamFields, ignoreFields, extraFields)
if err := processStream("tcp", c, encoding, useLocalTimestamp, cp); err != nil {
logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err)
}
@@ -552,7 +535,7 @@ func processLine(line []byte, currentYear int, timezone *time.Location, useLocal
if useLocalTimestamp {
ts = time.Now().UnixNano()
} else {
nsecs, err := insertutil.ExtractTimestampFromFields(timeFields, p.Fields)
nsecs, err := insertutil.ExtractTimestampFromFields("timestamp", p.Fields)
if err != nil {
return fmt.Errorf("cannot get timestamp from syslog line %q: %w", line, err)
}
@@ -565,7 +548,6 @@ func processLine(line []byte, currentYear int, timezone *time.Location, useLocal
return nil
}
var timeFields = []string{"timestamp"}
var msgFields = []string{"message"}
var (

View File

@@ -17,7 +17,7 @@ func isTerminal() bool {
return isatty.IsTerminal(os.Stdout.Fd()) && isatty.IsTerminal(os.Stderr.Fd())
}
func readWithLess(r io.Reader, disableColors, wrapLongLines bool) error {
func readWithLess(r io.Reader, wrapLongLines bool) error {
if !isTerminal() {
// Just write everything to stdout if no terminal is available.
_, err := io.Copy(os.Stdout, r)
@@ -49,9 +49,6 @@ func readWithLess(r io.Reader, disableColors, wrapLongLines bool) error {
return fmt.Errorf("cannot find 'less' command: %w", err)
}
opts := []string{"less", "-F", "-X"}
if !disableColors {
opts = append(opts, "-R")
}
if !wrapLongLines {
opts = append(opts, "-S")
}

View File

@@ -91,7 +91,6 @@ func runReadlineLoop(rl *readline.Instance, incompleteLine *string) {
}
outputMode := outputModeJSONMultiline
disableColors := true
wrapLongLines := false
s := ""
for {
@@ -101,7 +100,7 @@ func runReadlineLoop(rl *readline.Instance, incompleteLine *string) {
case io.EOF:
if s != "" {
// This is non-interactive query execution.
executeQuery(context.Background(), rl, s, outputMode, disableColors, wrapLongLines)
executeQuery(context.Background(), rl, s, outputMode, wrapLongLines)
}
return
case readline.ErrInterrupt:
@@ -177,24 +176,6 @@ func runReadlineLoop(rl *readline.Instance, incompleteLine *string) {
s = ""
continue
}
if s == `\disable_colors` {
if !disableColors {
disableColors = true
fmt.Fprintf(rl, `disabled colors in compact output mode; enter \enable_colors for enabling it`+"\n")
}
historyLines = pushToHistory(rl, historyLines, s)
s = ""
continue
}
if s == `\enable_colors` {
if disableColors {
disableColors = false
fmt.Fprintf(rl, `enabled colors in compact output mode; type \disable_colors for disabling it`+"\n")
}
historyLines = pushToHistory(rl, historyLines, s)
s = ""
continue
}
if line != "" && !strings.HasSuffix(line, ";") {
// Assume the query is incomplete and allow the user finishing the query on the next line
s += "\n"
@@ -204,7 +185,7 @@ func runReadlineLoop(rl *readline.Instance, incompleteLine *string) {
// Execute the query
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
executeQuery(ctx, rl, s, outputMode, disableColors, wrapLongLines)
executeQuery(ctx, rl, s, outputMode, wrapLongLines)
cancel()
historyLines = pushToHistory(rl, historyLines, s)
@@ -292,15 +273,13 @@ func printCommandsHelp(w io.Writer) {
\c - compact output mode
\logfmt - logfmt output mode
\wrap_long_lines - toggles wrapping long lines
\enable_colors - enable ANSI colors in compact output mode
\disable_colors - disable ANSI colors in compact output mode
\tail <query> - live tail <query> results
See https://docs.victoriametrics.com/victorialogs/querying/vlogscli/ for more details
`)
}
func executeQuery(ctx context.Context, output io.Writer, qStr string, outputMode outputMode, disableColors, wrapLongLines bool) {
func executeQuery(ctx context.Context, output io.Writer, qStr string, outputMode outputMode, wrapLongLines bool) {
if strings.HasPrefix(qStr, `\tail `) {
tailQuery(ctx, output, qStr, outputMode)
return
@@ -314,7 +293,7 @@ func executeQuery(ctx context.Context, output io.Writer, qStr string, outputMode
_ = respBody.Close()
}()
if err := readWithLess(respBody, disableColors, wrapLongLines); err != nil {
if err := readWithLess(respBody, wrapLongLines); err != nil {
fmt.Fprintf(output, "error when reading query response: %s\n", err)
return
}

View File

@@ -25,8 +25,8 @@ var (
addr = flag.String("addr", "stdout", "HTTP address to push the generated logs to; if it is set to stdout, then logs are generated to stdout")
workers = flag.Int("workers", 1, "The number of workers to use to push logs to -addr")
start = newTimeFlag("start", "-1d", "Generated logs start from this time; see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#timestamp-formats")
end = newTimeFlag("end", "0s", "Generated logs end at this time; see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#timestamp-formats")
start = newTimeFlag("start", "-1d", "Generated logs start from this time; see https://docs.victoriametrics.com/#timestamp-formats")
end = newTimeFlag("end", "0s", "Generated logs end at this time; see https://docs.victoriametrics.com/#timestamp-formats")
activeStreams = flag.Int("activeStreams", 100, "The number of active log streams to generate; see https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields")
totalStreams = flag.Int("totalStreams", 0, "The number of total log streams; if -totalStreams > -activeStreams, then some active streams are substituted with new streams "+
"during data generation")
@@ -206,8 +206,6 @@ func generateAndPushLogs(cfg *workerConfig, workerID int) {
if err != nil {
logger.Fatalf("cannot perform request to %q: %s", cfg.url, err)
}
defer resp.Body.Close()
if resp.StatusCode/100 != 2 {
logger.Fatalf("unexpected status code got from %q: %d; want 2xx", cfg.url, err)
}

View File

@@ -133,7 +133,7 @@ func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
}
// Send the remaining data
for _, bb := range bufs.All() {
for _, bb := range bufs.GetSlice() {
if err := sendBuf(bb); err != nil {
return err
}

View File

@@ -55,10 +55,7 @@ func ProcessFacetsRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
}
keepConstFields := httputil.GetBool(r, "keep_const_fields")
// Pipes must be dropped, since it is expected facets are obtained
// from the real logs stored in the database.
q.DropAllPipes()
q.AddFacetsPipe(limit, maxValuesPerField, maxValueLen, keepConstFields)
var mLock sync.Mutex
@@ -159,10 +156,8 @@ func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Requ
fieldsLimit = 0
}
// Pipes must be dropped, since it is expected hits are obtained
// from the real logs stored in the database.
// Prepare the query for hits count.
q.DropAllPipes()
q.AddCountByTimePipe(int64(step), int64(offset), fields)
var mLock sync.Mutex
@@ -295,10 +290,6 @@ func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *htt
return
}
// Pipes must be dropped, since it is expected field names are obtained
// from the real logs stored in the database.
q.DropAllPipes()
// Obtain field names for the given query
fieldNames, err := vlstorage.GetFieldNames(ctx, tenantIDs, q)
if err != nil {
@@ -338,10 +329,6 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht
limit = 0
}
// Pipes must be dropped, since it is expected field values are obtained
// from the real logs stored in the database.
q.DropAllPipes()
// Obtain unique values for the given field
values, err := vlstorage.GetFieldValues(ctx, tenantIDs, q, fieldName, uint64(limit))
if err != nil {
@@ -364,10 +351,6 @@ func ProcessStreamFieldNamesRequest(ctx context.Context, w http.ResponseWriter,
return
}
// Pipes must be dropped, since it is expected stream field names are obtained
// from the real logs stored in the database.
q.DropAllPipes()
// Obtain stream field names for the given query
names, err := vlstorage.GetStreamFieldNames(ctx, tenantIDs, q)
if err != nil {
@@ -406,10 +389,6 @@ func ProcessStreamFieldValuesRequest(ctx context.Context, w http.ResponseWriter,
limit = 0
}
// Pipes must be dropped, since it is expected stream field values are obtained
// from the real logs stored in the database.
q.DropAllPipes()
// Obtain stream field values for the given query and the given fieldName
values, err := vlstorage.GetStreamFieldValues(ctx, tenantIDs, q, fieldName, uint64(limit))
if err != nil {
@@ -441,10 +420,6 @@ func ProcessStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http
limit = 0
}
// Pipes must be dropped, since it is expected stream ids are obtained
// from the real logs stored in the database.
q.DropAllPipes()
// Obtain streamIDs for the given query
streamIDs, err := vlstorage.GetStreamIDs(ctx, tenantIDs, q, uint64(limit))
if err != nil {
@@ -476,10 +451,6 @@ func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.R
limit = 0
}
// Pipes must be dropped, since it is expected stream are obtained
// from the real logs stored in the database.
q.DropAllPipes()
// Obtain streams for the given query
streams, err := vlstorage.GetStreams(ctx, tenantIDs, q, uint64(limit))
if err != nil {
@@ -543,11 +514,6 @@ func ProcessLiveTailRequest(ctx context.Context, w http.ResponseWriter, r *http.
if !ok {
logger.Panicf("BUG: it is expected that http.ResponseWriter (%T) supports http.Flusher interface", w)
}
w.Header().Set("Content-Type", "application/x-ndjson")
w.Header().Set("Access-Control-Allow-Origin", "*")
flusher.Flush()
qOrig := q
for {
q = qOrig.CloneWithTimeFilter(end, start, end)
@@ -580,7 +546,7 @@ var liveTailRequests = metrics.NewCounter(`vl_live_tailing_requests`)
const tailOffsetNsecs = 5e9
type logRow struct {
timestamp int64
timestamp string
fields []logstorage.Field
}
@@ -596,7 +562,7 @@ type tailProcessor struct {
mu sync.Mutex
perStreamRows map[string][]logRow
lastTimestamps map[string]int64
lastTimestamps map[string]string
err error
}
@@ -606,7 +572,7 @@ func newTailProcessor(cancel func()) *tailProcessor {
cancel: cancel,
perStreamRows: make(map[string][]logRow),
lastTimestamps: make(map[string]int64),
lastTimestamps: make(map[string]string),
}
}
@@ -623,7 +589,7 @@ func (tp *tailProcessor) writeBlock(_ uint, db *logstorage.DataBlock) {
}
// Make sure columns contain _time field, since it is needed for proper tail work.
timestamps, ok := db.GetTimestamps(nil)
timestamps, ok := db.GetTimestamps()
if !ok {
tp.err = fmt.Errorf("missing _time field")
tp.cancel()
@@ -921,7 +887,7 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
shard.sw = sw
}
defer func() {
shards := bwShards.All()
shards := bwShards.GetSlice()
for _, shard := range shards {
shard.FlushIgnoreErrors()
}
@@ -1072,7 +1038,9 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID,
}
func getLastNRows(rows []logRow, limit int) []logRow {
sortLogRows(rows)
sort.Slice(rows, func(i, j int) bool {
return rows[i].timestamp < rows[j].timestamp
})
if len(rows) > limit {
rows = rows[len(rows)-limit:]
}
@@ -1097,7 +1065,7 @@ func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.Tenant
clonedColumnNames[i] = strings.Clone(c.Name)
}
timestamps, ok := db.GetTimestamps(nil)
timestamps, ok := db.GetTimestamps()
if !ok {
missingTimeColumn.Store(true)
cancel()
@@ -1189,22 +1157,20 @@ func parseCommonArgs(r *http.Request) (*logstorage.Query, []logstorage.TenantID,
}
// Parse optional extra_filters
for _, extraFiltersStr := range r.Form["extra_filters"] {
extraFilters, err := parseExtraFilters(extraFiltersStr)
if err != nil {
return nil, nil, err
}
q.AddExtraFilters(extraFilters)
extraFiltersStr := r.FormValue("extra_filters")
extraFilters, err := parseExtraFilters(extraFiltersStr)
if err != nil {
return nil, nil, err
}
q.AddExtraFilters(extraFilters)
// Parse optional extra_stream_filters
for _, extraStreamFiltersStr := range r.Form["extra_stream_filters"] {
extraStreamFilters, err := parseExtraStreamFilters(extraStreamFiltersStr)
if err != nil {
return nil, nil, err
}
q.AddExtraFilters(extraStreamFilters)
extraStreamFiltersStr := r.FormValue("extra_stream_filters")
extraStreamFilters, err := parseExtraStreamFilters(extraStreamFiltersStr)
if err != nil {
return nil, nil, err
}
q.AddExtraFilters(extraStreamFilters)
return q, tenantIDs, nil
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -35,10 +35,10 @@
<meta property="og:title" content="UI for VictoriaLogs">
<meta property="og:url" content="https://victoriametrics.com/products/victorialogs/">
<meta property="og:description" content="Explore your log data with VictoriaLogs UI">
<script type="module" crossorigin src="./assets/index-BaRvaPfA.js"></script>
<link rel="modulepreload" crossorigin href="./assets/vendor-D8IJGiEn.js">
<script type="module" crossorigin src="./assets/index-C7CcGBou.js"></script>
<link rel="modulepreload" crossorigin href="./assets/vendor-PQqNLyna.js">
<link rel="stylesheet" crossorigin href="./assets/vendor-D1GxaB_c.css">
<link rel="stylesheet" crossorigin href="./assets/index-C85_NB5q.css">
<link rel="stylesheet" crossorigin href="./assets/index-u4IOGr0E.css">
</head>
<body>
<noscript>You need to enable JavaScript to run this app.</noscript>

View File

@@ -41,10 +41,7 @@ var (
minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which "+
"the storage stops accepting new data")
forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge . It overrides -httpAuth.* . "+
"See https://docs.victoriametrics.com/victorialogs/#forced-merge")
forceFlushAuthKey = flagutil.NewPassword("forceFlushAuthKey", "authKey, which must be passed in query string to /internal/force_flush . It overrides -httpAuth.* . "+
"See https://docs.victoriametrics.com/victorialogs/#forced-flush")
forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*")
storageNodeAddrs = flagutil.NewArrayString("storageNode", "Comma-separated list of TCP addresses for storage nodes to route the ingested logs to and to send select queries to. "+
"If the list is empty, then the ingested logs are stored and queried locally from -storageDataPath")
@@ -206,11 +203,8 @@ func Stop() {
// RequestHandler is a storage request handler.
func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
path := r.URL.Path
switch path {
case "/internal/force_merge":
if path == "/internal/force_merge" {
return processForceMerge(w, r)
case "/internal/force_flush":
return processForceFlush(w, r)
}
return false
}
@@ -238,21 +232,6 @@ func processForceMerge(w http.ResponseWriter, r *http.Request) bool {
return true
}
func processForceFlush(w http.ResponseWriter, r *http.Request) bool {
if localStorage == nil {
// Force merge isn't supported by non-local storage
return false
}
if !httpserver.CheckAuthFlag(w, r, forceFlushAuthKey) {
return true
}
logger.Infof("flushing storage to make pending data available for reading")
localStorage.DebugFlush()
return true
}
// CanWriteData returns non-nil error if it cannot write data to vlstorage
func CanWriteData() error {
if localStorage == nil {

View File

@@ -248,9 +248,6 @@ func (sn *storageNode) executeRequestAt(ctx context.Context, path string, args u
if err != nil {
logger.Panicf("BUG: unexpected error when creating a request: %s", err)
}
if err := sn.ac.SetHeaders(req, true); err != nil {
return nil, fmt.Errorf("cannot set auth headers for %q: %w", reqURL, err)
}
// send the request to the storage node
resp, err := sn.c.Do(req)

View File

@@ -1,3 +1,3 @@
See vmagent docs [here](https://docs.victoriametrics.com/victoriametrics/vmagent/).
See vmagent docs [here](https://docs.victoriametrics.com/vmagent/).
vmagent docs can be edited at [docs/vmagent.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/vmagent.md).
vmagent docs can be edited at [docs/vmagent.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/vmagent.md).

View File

@@ -165,9 +165,7 @@ func main() {
promscrape.Init(remotewrite.PushDropSamplesOnFailure)
go httpserver.Serve(listenAddrs, requestHandler, httpserver.ServeOptions{
UseProxyProtocol: useProxyProtocol,
})
go httpserver.Serve(listenAddrs, useProxyProtocol, requestHandler)
logger.Infof("started vmagent in %.3f seconds", time.Since(startTime).Seconds())
pushmetrics.Init()
@@ -244,7 +242,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
}
w.Header().Add("Content-Type", "text/html; charset=utf-8")
fmt.Fprintf(w, "<h2>vmagent</h2>")
fmt.Fprintf(w, "See docs at <a href='https://docs.victoriametrics.com/victoriametrics/vmagent/'>https://docs.victoriametrics.com/victoriametrics/vmagent/</a></br>")
fmt.Fprintf(w, "See docs at <a href='https://docs.victoriametrics.com/vmagent/'>https://docs.victoriametrics.com/vmagent/</a></br>")
fmt.Fprintf(w, "Useful endpoints:</br>")
httpserver.WriteAPIHelp(w, [][2]string{
{"targets", "status for discovered active targets"},
@@ -754,7 +752,7 @@ func usage() {
const s = `
vmagent collects metrics data via popular data ingestion protocols and routes it to VictoriaMetrics.
See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
See the docs at https://docs.victoriametrics.com/vmagent/ .
`
flagutil.Usage(s)
}

View File

@@ -9,5 +9,4 @@ COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certifica
EXPOSE 8429
ENTRYPOINT ["/vmagent-prod"]
ARG TARGETARCH
ARG BINARY_SUFFIX=non-existing
COPY vmagent-linux-${TARGETARCH}-prod${BINARY_SUFFIX} ./vmagent-prod
COPY vmagent-linux-${TARGETARCH}-prod ./vmagent-prod

View File

@@ -30,9 +30,9 @@ import (
var (
forcePromProto = flagutil.NewArrayBool("remoteWrite.forcePromProto", "Whether to force Prometheus remote write protocol for sending data "+
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol")
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#victoriametrics-remote-write-protocol")
forceVMProto = flagutil.NewArrayBool("remoteWrite.forceVMProto", "Whether to force VictoriaMetrics remote write protocol for sending data "+
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol")
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#victoriametrics-remote-write-protocol")
rateLimit = flagutil.NewArrayInt("remoteWrite.rateLimit", 0, "Optional rate limit in bytes per second for data sent to the corresponding -remoteWrite.url. "+
"By default, the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data "+
@@ -418,7 +418,7 @@ again:
if retryDuration > maxRetryDuration {
retryDuration = maxRetryDuration
}
remoteWriteRetryLogger.Warnf("couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds",
logger.Warnf("couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds",
len(block), c.sanitizedURL, err, retryDuration.Seconds())
t := timerpool.Get(retryDuration)
select {
@@ -458,13 +458,13 @@ again:
} else if statusCode == 415 || statusCode == 400 {
if c.canDowngradeVMProto.Swap(false) {
logger.Infof("received unsupported media type or bad request from remote storage at %q. Downgrading protocol from VictoriaMetrics to Prometheus remote write for all future requests. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol", c.sanitizedURL)
"See https://docs.victoriametrics.com/vmagent/#victoriametrics-remote-write-protocol", c.sanitizedURL)
c.useVMProto.Store(false)
}
if encoding.IsZstd(block) {
logger.Infof("received unsupported media type or bad request from remote storage at %q. Re-packing the block to Prometheus remote write and retrying."+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol", c.sanitizedURL)
"See https://docs.victoriametrics.com/vmagent/#victoriametrics-remote-write-protocol", c.sanitizedURL)
block = mustRepackBlockFromZstdToSnappy(block)
@@ -509,7 +509,6 @@ again:
}
var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second)
var remoteWriteRetryLogger = logger.WithThrottler("remoteWriteRetry", 5*time.Second)
// getRetryDuration returns retry duration.
// retryAfterDuration has the highest priority.

View File

@@ -29,7 +29,7 @@ var (
maxRowsPerBlock = flag.Int("remoteWrite.maxRowsPerBlock", 10000, "The maximum number of samples to send in each block to remote storage. Higher number may improve performance at the cost of the increased memory usage. See also -remoteWrite.maxBlockSize")
vmProtoCompressLevel = flag.Int("remoteWrite.vmProtoCompressLevel", 0, "The compression level for VictoriaMetrics remote write protocol. "+
"Higher values reduce network traffic at the cost of higher CPU usage. Negative values reduce CPU usage at the cost of increased network traffic. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#victoriametrics-remote-write-protocol")
"See https://docs.victoriametrics.com/vmagent/#victoriametrics-remote-write-protocol")
)
type pendingSeries struct {

View File

@@ -7,13 +7,10 @@ import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/metrics"
)
var (
@@ -22,10 +19,10 @@ var (
relabelConfigPathGlobal = flag.String("remoteWrite.relabelConfig", "", "Optional path to file with relabeling configs, which are applied "+
"to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. "+
"The path can point either to local file or to http url. "+
"See https://docs.victoriametrics.com/victoriametrics/relabeling/")
"See https://docs.victoriametrics.com/vmagent/#relabeling")
relabelConfigPaths = flagutil.NewArrayString("remoteWrite.urlRelabelConfig", "Optional path to relabel configs for the corresponding -remoteWrite.url. "+
"See also -remoteWrite.relabelConfig. The path can point either to local file or to http url. "+
"See https://docs.victoriametrics.com/victoriametrics/relabeling/")
"See https://docs.victoriametrics.com/vmagent/#relabeling")
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
@@ -34,59 +31,12 @@ var (
var labelsGlobal []prompbmarshal.Label
var (
relabelConfigReloads *metrics.Counter
relabelConfigReloadErrors *metrics.Counter
relabelConfigSuccess *metrics.Gauge
relabelConfigTimestamp *metrics.Counter
)
func initRelabelMetrics() {
relabelConfigReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`)
relabelConfigReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`)
relabelConfigSuccess = metrics.NewGauge(`vmagent_relabel_config_last_reload_successful`, nil)
relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
}
// CheckRelabelConfigs checks -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig.
func CheckRelabelConfigs() error {
_, err := loadRelabelConfigs()
return err
}
func initRelabelConfigs() {
rcs, err := loadRelabelConfigs()
if err != nil {
logger.Fatalf("cannot initialize relabel configs: %s", err)
}
allRelabelConfigs.Store(rcs)
if rcs.isSet() {
initRelabelMetrics()
relabelConfigSuccess.Set(1)
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
}
}
func reloadRelabelConfigs() {
rcs := allRelabelConfigs.Load()
if !rcs.isSet() {
return
}
relabelConfigReloads.Inc()
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
rcs, err := loadRelabelConfigs()
if err != nil {
relabelConfigReloadErrors.Inc()
relabelConfigSuccess.Set(0)
logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err)
return
}
allRelabelConfigs.Store(rcs)
relabelConfigSuccess.Set(1)
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
logger.Infof("successfully reloaded relabel configs")
}
func loadRelabelConfigs() (*relabelConfigs, error) {
var rcs relabelConfigs
if *relabelConfigPathGlobal != "" {
@@ -120,21 +70,6 @@ type relabelConfigs struct {
perURL []*promrelabel.ParsedConfigs
}
func (rcs *relabelConfigs) isSet() bool {
if rcs == nil {
return false
}
if rcs.global.Len() > 0 {
return true
}
for _, pc := range rcs.perURL {
if pc.Len() > 0 {
return true
}
}
return false
}
// initLabelsGlobal must be called after parsing command-line flags.
func initLabelsGlobal() {
labelsGlobal = nil

View File

@@ -15,7 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consistenthash"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@@ -39,21 +39,22 @@ var (
"Pass multiple -remoteWrite.url options in order to replicate the collected data to multiple remote storage systems. "+
"The data can be sharded among the configured remote storage systems if -remoteWrite.shardByURL flag is set")
enableMultitenantHandlers = flag.Bool("enableMultitenantHandlers", false, "Whether to process incoming data via multitenant insert handlers according to "+
"https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#url-format . By default incoming data is processed via single-node insert handlers "+
"according to https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-time-series-data ."+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#multitenancy for details")
"https://docs.victoriametrics.com/cluster-victoriametrics/#url-format . By default incoming data is processed via single-node insert handlers "+
"according to https://docs.victoriametrics.com/#how-to-import-time-series-data ."+
"See https://docs.victoriametrics.com/vmagent/#multitenancy for details")
shardByURL = flag.Bool("remoteWrite.shardByURL", false, "Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url. "+
"By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages . "+
shardByURL = flag.Bool("remoteWrite.shardByURL", false, "Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . "+
"By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages . "+
"See also -remoteWrite.shardByURLReplicas")
shardByURLReplicas = flag.Int("remoteWrite.shardByURLReplicas", 1, "How many copies of data to make among remote storage systems enumerated via -remoteWrite.url "+
"when -remoteWrite.shardByURL is set. See https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages")
"when -remoteWrite.shardByURL is set. See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages")
shardByURLLabels = flagutil.NewArrayString("remoteWrite.shardByURL.labels", "Optional list of labels, which must be used for sharding outgoing samples "+
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+
"even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.ignoreLabels")
shardByURLIgnoreLabels = flagutil.NewArrayString("remoteWrite.shardByURL.ignoreLabels", "Optional list of labels, which must be ignored when sharding outgoing samples "+
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+
"even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.labels")
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . "+
"See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue")
keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+
@@ -80,30 +81,28 @@ var (
`For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}`+
`Enabled sorting for labels can slow down ingestion performance a bit`)
maxHourlySeries = flag.Int("remoteWrite.maxHourlySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last hour. "+
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter")
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent/#cardinality-limiter")
maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter")
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent/#cardinality-limiter")
maxIngestionRate = flag.Int("maxIngestionRate", 0, "The maximum number of samples vmagent can receive per second. Data ingestion is paused when the limit is exceeded. "+
"By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit")
disableOnDiskQueue = flagutil.NewArrayBool("remoteWrite.disableOnDiskQueue", "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
"when the remote storage system at the corresponding -remoteWrite.url cannot keep up with the data ingestion rate. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload")
"See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload")
dropSamplesOnOverload = flag.Bool("remoteWrite.dropSamplesOnOverload", false, "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples "+
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence")
)
var (
// rwctxsGlobal contains statically populated entries when -remoteWrite.url is specified.
rwctxsGlobal []*remoteWriteCtx
rwctxsGlobalIdx []int
rwctxConsistentHashGlobal *consistenthash.ConsistentHash
rwctxsGlobal []*remoteWriteCtx
// ErrQueueFullHTTPRetry must be returned when TryPush() returns false.
ErrQueueFullHTTPRetry = &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("remote storage systems cannot keep up with the data ingestion rate; retry the request later " +
"or remove -remoteWrite.disableOnDiskQueue from vmagent command-line flags, so it could save pending data to -remoteWrite.tmpDataPath; " +
"see https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence"),
"see https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence"),
StatusCode: http.StatusTooManyRequests,
}
@@ -184,7 +183,7 @@ func Init() {
if len(*shardByURLLabels) > 0 && len(*shardByURLIgnoreLabels) > 0 {
logger.Fatalf("-remoteWrite.shardByURL.labels and -remoteWrite.shardByURL.ignoreLabels cannot be set simultaneously; " +
"see https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages")
"see https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages")
}
shardByURLLabelsMap = newMapFromStrings(*shardByURLLabels)
shardByURLIgnoreLabelsMap = newMapFromStrings(*shardByURLIgnoreLabels)
@@ -196,11 +195,17 @@ func Init() {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
sighupCh := procutil.NewSighupChan()
initRelabelConfigs()
rcs, err := loadRelabelConfigs()
if err != nil {
logger.Fatalf("cannot load relabel configs: %s", err)
}
allRelabelConfigs.Store(rcs)
relabelConfigSuccess.Set(1)
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
initStreamAggrConfigGlobal()
initRemoteWriteCtxs(*remoteWriteURLs)
rwctxsGlobal = newRemoteWriteCtxs(*remoteWriteURLs)
disableOnDiskQueues := []bool(*disableOnDiskQueue)
disableOnDiskQueueAny = slices.Contains(disableOnDiskQueues, true)
@@ -262,7 +267,30 @@ func dropDanglingQueues() {
}
}
func initRemoteWriteCtxs(urls []string) {
func reloadRelabelConfigs() {
relabelConfigReloads.Inc()
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
rcs, err := loadRelabelConfigs()
if err != nil {
relabelConfigReloadErrors.Inc()
relabelConfigSuccess.Set(0)
logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err)
return
}
allRelabelConfigs.Store(rcs)
relabelConfigSuccess.Set(1)
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
logger.Infof("successfully reloaded relabel configs")
}
var (
relabelConfigReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`)
relabelConfigReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`)
relabelConfigSuccess = metrics.NewGauge(`vmagent_relabel_config_last_reload_successful`, nil)
relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
)
func newRemoteWriteCtxs(urls []string) []*remoteWriteCtx {
if len(urls) == 0 {
logger.Panicf("BUG: urls must be non-empty")
}
@@ -278,7 +306,6 @@ func initRemoteWriteCtxs(urls []string) {
maxInmemoryBlocks = 2
}
rwctxs := make([]*remoteWriteCtx, len(urls))
rwctxIdx := make([]int, len(urls))
for i, remoteWriteURLRaw := range urls {
remoteWriteURL, err := url.Parse(remoteWriteURLRaw)
if err != nil {
@@ -289,19 +316,8 @@ func initRemoteWriteCtxs(urls []string) {
sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteURL)
}
rwctxs[i] = newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL)
rwctxIdx[i] = i
}
if *shardByURL {
consistentHashNodes := make([]string, 0, len(urls))
for i, url := range urls {
consistentHashNodes = append(consistentHashNodes, fmt.Sprintf("%d:%s", i+1, url))
}
rwctxConsistentHashGlobal = consistenthash.NewConsistentHash(consistentHashNodes, 0)
}
rwctxsGlobal = rwctxs
rwctxsGlobalIdx = rwctxIdx
return rwctxs
}
var (
@@ -485,10 +501,6 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
return true
}
// getEligibleRemoteWriteCtxs checks whether writes to configured remote storage systems are blocked and
// returns only the unblocked rwctx.
//
// calculateHealthyRwctxIdx will rely on the order of rwctx to be in ascending order.
func getEligibleRemoteWriteCtxs(tss []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) ([]*remoteWriteCtx, bool) {
if !disableOnDiskQueueAny {
return rwctxsGlobal, true
@@ -505,12 +517,6 @@ func getEligibleRemoteWriteCtxs(tss []prompbmarshal.TimeSeries, forceDropSamples
return nil, false
}
rowsCount := getRowsCount(tss)
if *shardByURL {
// Todo: When shardByURL is enabled, the following metrics won't be 100% accurate. Because vmagent don't know
// which rwctx should data be pushed to yet. Let's consider the hashing algorithm fair and will distribute
// data to all rwctxs evenly.
rowsCount = rowsCount / len(rwctxsGlobal)
}
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
}
}
@@ -522,7 +528,6 @@ func pushToRemoteStoragesTrackDropped(tss []prompbmarshal.TimeSeries) {
if len(rwctxs) == 0 {
return
}
if !tryPushBlockToRemoteStorages(rwctxs, tss, true) {
logger.Panicf("BUG: tryPushBlockToRemoteStorages() must return true when forceDropSamplesOnFailure=true")
}
@@ -573,68 +578,7 @@ func tryShardingBlockAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []pr
defer putTSSShards(x)
shards := x.shards
shardAmountRemoteWriteCtx(tssBlock, shards, rwctxs, replicas)
// Push sharded samples to remote storage systems in parallel in order to reduce
// the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup
var anyPushFailed atomic.Bool
for i, rwctx := range rwctxs {
shard := shards[i]
if len(shard) == 0 {
continue
}
wg.Add(1)
go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) {
defer wg.Done()
if !rwctx.TryPush(tss, forceDropSamplesOnFailure) {
anyPushFailed.Store(true)
}
}(rwctx, shard)
}
wg.Wait()
return !anyPushFailed.Load()
}
// calculateHealthyRwctxIdx returns the index of healthyRwctxs in rwctxsGlobal.
// It relies on the order of rwctx in healthyRwctxs, which is appended by getEligibleRemoteWriteCtxs.
func calculateHealthyRwctxIdx(healthyRwctxs []*remoteWriteCtx) ([]int, []int) {
// fast path: all rwctxs are healthy.
if len(healthyRwctxs) == len(rwctxsGlobal) {
return rwctxsGlobalIdx, nil
}
unhealthyIdx := make([]int, 0, len(rwctxsGlobal))
healthyIdx := make([]int, 0, len(rwctxsGlobal))
var i int
for j := range rwctxsGlobal {
if i < len(healthyRwctxs) && rwctxsGlobal[j].idx == healthyRwctxs[i].idx {
healthyIdx = append(healthyIdx, j)
i++
} else {
unhealthyIdx = append(unhealthyIdx, j)
}
}
return healthyIdx, unhealthyIdx
}
// shardAmountRemoteWriteCtx distribute time series to shards by consistent hashing.
func shardAmountRemoteWriteCtx(tssBlock []prompbmarshal.TimeSeries, shards [][]prompbmarshal.TimeSeries, rwctxs []*remoteWriteCtx, replicas int) {
tmpLabels := promutil.GetLabels()
defer promutil.PutLabels(tmpLabels)
healthyIdx, unhealthyIdx := calculateHealthyRwctxIdx(rwctxs)
// shardsIdxMap is a map to find which the shard idx by rwctxs idx.
// rwctxConsistentHashGlobal will tell which the rwctxs idx a time series should be written to.
// And this time series should be appended to the shards by correct shard idx.
shardsIdxMap := make(map[int]int, len(healthyIdx))
for idx, rwctxsIdx := range healthyIdx {
shardsIdxMap[rwctxsIdx] = idx
}
for _, ts := range tssBlock {
hashLabels := ts.Labels
if len(shardByURLLabelsMap) > 0 {
@@ -655,25 +599,41 @@ func shardAmountRemoteWriteCtx(tssBlock []prompbmarshal.TimeSeries, shards [][]p
tmpLabels.Labels = hashLabels
}
h := getLabelsHash(hashLabels)
// Get the rwctxIdx through consistent hashing and then map it to the index in shards.
// The rwctxIdx is not always equal to the shardIdx, for example, when some rwctx are not available.
rwctxIdx := rwctxConsistentHashGlobal.GetNodeIdx(h, unhealthyIdx)
shardIdx := shardsIdxMap[rwctxIdx]
replicated := 0
idx := h % uint64(len(shards))
i := 0
for {
shards[shardIdx] = append(shards[shardIdx], ts)
replicated++
if replicated >= replicas {
shards[idx] = append(shards[idx], ts)
i++
if i >= replicas {
break
}
shardIdx++
if shardIdx >= len(shards) {
shardIdx = 0
idx++
if idx >= uint64(len(shards)) {
idx = 0
}
}
}
promutil.PutLabels(tmpLabels)
// Push sharded samples to remote storage systems in parallel in order to reduce
// the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup
var anyPushFailed atomic.Bool
for i, rwctx := range rwctxs {
shard := shards[i]
if len(shard) == 0 {
continue
}
wg.Add(1)
go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) {
defer wg.Done()
if !rwctx.TryPush(tss, forceDropSamplesOnFailure) {
anyPushFailed.Store(true)
}
}(rwctx, shard)
}
wg.Wait()
return !anyPushFailed.Load()
}
type tssShards struct {

View File

@@ -4,17 +4,14 @@ import (
"fmt"
"math"
"reflect"
"strconv"
"sync/atomic"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consistenthash"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics"
)
@@ -177,173 +174,3 @@ metric{env="dev"} 15
metric{env="bar"} 25
`)
}
func TestShardAmountRemoteWriteCtx(t *testing.T) {
// 1. distribute 100000 series to n nodes.
// 2. remove the last node from healthy list.
// 3. distribute the same 10000 series to (n-1) node again.
// 4. check active time series change rate:
// change rate must < (3/total nodes). e.g. +30% if 10 you have 10 nodes.
f := func(remoteWriteCount int, healthyIdx []int, replicas int) {
t.Helper()
defer func() {
rwctxsGlobal = nil
rwctxsGlobalIdx = nil
rwctxConsistentHashGlobal = nil
}()
rwctxsGlobal = make([]*remoteWriteCtx, remoteWriteCount)
rwctxsGlobalIdx = make([]int, remoteWriteCount)
rwctxs := make([]*remoteWriteCtx, 0, len(healthyIdx))
for i := range remoteWriteCount {
rwCtx := &remoteWriteCtx{
idx: i,
}
rwctxsGlobalIdx[i] = i
if i >= len(healthyIdx) {
rwctxsGlobal[i] = rwCtx
continue
}
hIdx := healthyIdx[i]
if hIdx != i {
rwctxs = append(rwctxs, &remoteWriteCtx{
idx: hIdx,
})
} else {
rwctxs = append(rwctxs, rwCtx)
}
rwctxsGlobal[i] = rwCtx
}
seriesCount := 100000
// build 1000000 series
tssBlock := make([]prompbmarshal.TimeSeries, 0, seriesCount)
for i := 0; i < seriesCount; i++ {
tssBlock = append(tssBlock, prompbmarshal.TimeSeries{
Labels: []prompbmarshal.Label{
{
Name: "label",
Value: strconv.Itoa(i),
},
},
Samples: []prompbmarshal.Sample{
{
Timestamp: 0,
Value: 0,
},
},
})
}
// build consistent hash for x remote write context
// build active time series set
nodes := make([]string, 0, remoteWriteCount)
activeTimeSeriesByNodes := make([]map[string]struct{}, remoteWriteCount)
for i := 0; i < remoteWriteCount; i++ {
nodes = append(nodes, fmt.Sprintf("node%d", i))
activeTimeSeriesByNodes[i] = make(map[string]struct{})
}
rwctxConsistentHashGlobal = consistenthash.NewConsistentHash(nodes, 0)
// create shards
x := getTSSShards(len(rwctxs))
shards := x.shards
// execute
shardAmountRemoteWriteCtx(tssBlock, shards, rwctxs, replicas)
for i, nodeIdx := range healthyIdx {
for _, ts := range shards[i] {
// add it to node[nodeIdx]'s active time series
activeTimeSeriesByNodes[nodeIdx][prompbmarshal.LabelsToString(ts.Labels)] = struct{}{}
}
}
totalActiveTimeSeries := 0
for _, activeTimeSeries := range activeTimeSeriesByNodes {
totalActiveTimeSeries += len(activeTimeSeries)
}
avgActiveTimeSeries1 := totalActiveTimeSeries / remoteWriteCount
putTSSShards(x)
// removed last node
rwctxs = rwctxs[:len(rwctxs)-1]
healthyIdx = healthyIdx[:len(healthyIdx)-1]
x = getTSSShards(len(rwctxs))
shards = x.shards
// execute
shardAmountRemoteWriteCtx(tssBlock, shards, rwctxs, replicas)
for i, nodeIdx := range healthyIdx {
for _, ts := range shards[i] {
// add it to node[nodeIdx]'s active time series
activeTimeSeriesByNodes[nodeIdx][prompbmarshal.LabelsToString(ts.Labels)] = struct{}{}
}
}
totalActiveTimeSeries = 0
for _, activeTimeSeries := range activeTimeSeriesByNodes {
totalActiveTimeSeries += len(activeTimeSeries)
}
avgActiveTimeSeries2 := totalActiveTimeSeries / remoteWriteCount
changed := math.Abs(float64(avgActiveTimeSeries2-avgActiveTimeSeries1) / float64(avgActiveTimeSeries1))
threshold := 3 / float64(remoteWriteCount)
if changed >= threshold {
t.Fatalf("average active time series before: %d, after: %d, changed: %.2f. threshold: %.2f", avgActiveTimeSeries1, avgActiveTimeSeries2, changed, threshold)
}
}
f(5, []int{0, 1, 2, 3, 4}, 1)
f(5, []int{0, 1, 2, 3, 4}, 2)
f(10, []int{0, 1, 2, 3, 4, 5, 6, 7, 9}, 1)
f(10, []int{0, 1, 2, 3, 4, 5, 6, 7, 9}, 3)
}
func TestCalculateHealthyRwctxIdx(t *testing.T) {
f := func(total int, healthyIdx []int, unhealthyIdx []int) {
t.Helper()
healthyMap := make(map[int]bool)
for _, idx := range healthyIdx {
healthyMap[idx] = true
}
rwctxsGlobal = make([]*remoteWriteCtx, total)
rwctxsGlobalIdx = make([]int, total)
rwctxs := make([]*remoteWriteCtx, 0, len(healthyIdx))
for i := range rwctxsGlobal {
rwctx := &remoteWriteCtx{idx: i}
rwctxsGlobal[i] = rwctx
if healthyMap[i] {
rwctxs = append(rwctxs, rwctx)
}
rwctxsGlobalIdx[i] = i
}
gotHealthyIdx, gotUnhealthyIdx := calculateHealthyRwctxIdx(rwctxs)
if !reflect.DeepEqual(healthyIdx, gotHealthyIdx) {
t.Errorf("calculateHealthyRwctxIdx want healthyIdx = %v, got %v", healthyIdx, gotHealthyIdx)
}
if !reflect.DeepEqual(unhealthyIdx, gotUnhealthyIdx) {
t.Errorf("calculateHealthyRwctxIdx want unhealthyIdx = %v, got %v", unhealthyIdx, gotUnhealthyIdx)
}
}
f(5, []int{0, 1, 2, 3, 4}, nil)
f(5, []int{0, 1, 2, 4}, []int{3})
f(5, []int{2, 4}, []int{0, 1, 3})
f(5, []int{0, 2, 4}, []int{1, 3})
f(5, []int{}, []int{0, 1, 2, 3, 4})
f(5, []int{4}, []int{0, 1, 2, 3})
f(1, []int{0}, nil)
f(1, []int{}, []int{0})
}

View File

@@ -16,55 +16,55 @@ import (
var (
// Global config
streamAggrGlobalConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/ . "+
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
streamAggrGlobalKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation "+
"with -streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to remote storages write. See also -streamAggr.dropInput and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/")
"are written to remote storages write. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrGlobalDropInput = flag.Bool("streamAggr.dropInput", false, "Whether to drop all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to remote storages write. See also -streamAggr.keepInput and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/")
"are written to remote storages write. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrGlobalDedupInterval = flag.Duration("streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval on "+
"aggregator before optional aggregation with -streamAggr.config . "+
"See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication")
"See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrGlobalIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the "+
"current aggregation interval for aggregator. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples")
"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrGlobalIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start for "+
"aggregator. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from "+
"clients pushing data into the vmagent. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignore-aggregation-intervals-on-start")
"clients pushing data into the vmagent. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrGlobalDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples for aggregator "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#dropping-unneeded-labels")
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
streamAggrGlobalEnableWindows = flag.Bool("streamAggr.enableWindows", false, "Enables aggregation within fixed windows for all global aggregators. "+
"This allows to get more precise results, but impacts resource usage as it requires twice more memory to store two states. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#aggregation-windows.")
"See https://docs.victoriametrics.com/stream-aggregation/#aggregation-windows.")
// Per URL config
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config for the corresponding -remoteWrite.url. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/ . "+
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
"See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/")
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/")
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+
"with -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication")
"with -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current "+
"aggregation interval for the corresponding -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples")
"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrIgnoreFirstIntervals = flagutil.NewArrayInt("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start "+
"for the corresponding -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. Increase this value if "+
"you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving buffered delayed data from clients pushing data into the vmagent. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignore-aggregation-intervals-on-start")
"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrDropInputLabels = flagutil.NewArrayString("remoteWrite.streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation with -remoteWrite.streamAggr.config and -remoteWrite.streamAggr.dedupInterval at the corresponding -remoteWrite.url. "+
"Multiple labels per remoteWrite.url must be delimited by '^^': -remoteWrite.streamAggr.dropInputLabels='replica^^az,replica'. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#dropping-unneeded-labels")
"See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
streamAggrEnableWindows = flagutil.NewArrayBool("remoteWrite.streamAggr.enableWindows", "Enables aggregation within fixed windows for all remote write's aggregators. "+
"This allows to get more precise results, but impacts resource usage as it requires twice more memory to store two states. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#aggregation-windows.")
"See https://docs.victoriametrics.com/stream-aggregation/#aggregation-windows.")
)
// CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -streamAggr.config.

View File

@@ -1,3 +1,3 @@
See vmalert-tool docs [here](https://docs.victoriametrics.com/victoriametrics/vmalert-tool/).
See vmalert-tool docs [here](https://docs.victoriametrics.com/vmalert-tool.html).
vmalert-tool docs can be edited at [docs/vmalert-tool.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/vmalert-tool.md).
vmalert-tool docs can be edited at [docs/vmalert-tool.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/vmalert-tool.md).

View File

@@ -9,5 +9,4 @@ COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certifica
EXPOSE 8429
ENTRYPOINT ["/vmalert-tool-prod"]
ARG TARGETARCH
ARG BINARY_SUFFIX=non-existing
COPY vmalert-tool-linux-${TARGETARCH}-prod${BINARY_SUFFIX} ./vmalert-tool-prod
COPY vmalert-tool-linux-${TARGETARCH}-prod ./vmalert-tool-prod

View File

@@ -74,7 +74,7 @@ test-vmalert:
go test -v -race -cover ./app/vmalert/notifier
go test -v -race -cover ./app/vmalert/config
go test -v -race -cover ./app/vmalert/remotewrite
go test -v -race -cover ./app/vmalert/vmalertutil
go test -v -race -cover ./app/vmalert/utils
run-vmalert: vmalert
./bin/vmalert -rule=app/vmalert/config/testdata/rules/rules2-good.rules \

View File

@@ -1,3 +1,3 @@
See vmalert docs [here](https://docs.victoriametrics.com/victoriametrics/vmalert/).
See vmalert docs [here](https://docs.victoriametrics.com/vmalert/).
vmalert docs can be edited at [docs/vmalert.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/vmalert.md).
vmalert docs can be edited at [docs/vmalert.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/vmalert.md).

View File

@@ -2,6 +2,7 @@ package config
import (
"bytes"
"crypto/md5"
"flag"
"fmt"
"hash/fnv"
@@ -10,16 +11,15 @@ import (
"sort"
"strings"
"gopkg.in/yaml.v2"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config/log"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/vmalertutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
"gopkg.in/yaml.v2"
)
var (
defaultRuleType = flag.String("rule.defaultRuleType", "prometheus", `Default type for rule expressions, can be overridden via "type" parameter on the group level, see https://docs.victoriametrics.com/victoriametrics/vmalert/#groups. Supported values: "graphite", "prometheus" and "vlogs".`)
defaultRuleType = flag.String("rule.defaultRuleType", "prometheus", `Default type for rule expressions, can be overridden via "type" parameter on the group level, see https://docs.victoriametrics.com/vmalert/#groups. Supported values: "graphite", "prometheus" and "vlogs".`)
)
// Group contains list of Rules grouped into
@@ -50,8 +50,6 @@ type Group struct {
NotifierHeaders []Header `yaml:"notifier_headers,omitempty"`
// EvalAlignment will make the timestamp of group query requests be aligned with interval
EvalAlignment *bool `yaml:"eval_alignment,omitempty"`
// Debug enables debug logs for the group
Debug bool `yaml:"debug,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]any `yaml:",inline"`
}
@@ -69,7 +67,7 @@ func (g *Group) UnmarshalYAML(unmarshal func(any) error) error {
if g.Type.Get() == "" {
g.Type = NewRawType(*defaultRuleType)
}
h := fnv.New64a()
h := md5.New()
h.Write(b)
g.Checksum = fmt.Sprintf("%x", h.Sum(nil))
return nil
@@ -145,7 +143,7 @@ type Rule struct {
KeepFiringFor *promutil.Duration `yaml:"keep_firing_for,omitempty"`
Labels map[string]string `yaml:"labels,omitempty"`
Annotations map[string]string `yaml:"annotations,omitempty"`
Debug *bool `yaml:"debug,omitempty"`
Debug bool `yaml:"debug,omitempty"`
// UpdateEntriesLimit defines max number of rule's state updates stored in memory.
// Overrides `-rule.updateEntriesLimit`.
UpdateEntriesLimit *int `yaml:"update_entries_limit,omitempty"`

View File

@@ -44,7 +44,7 @@ Enterprise version of vmalert supports S3 and GCS paths to rules.
For example: gs://bucket/path/to/rules, s3://bucket/path/to/rules
S3 and GCS paths support only matching by prefix, e.g. s3://bucket/dir/rule_ matches
all files with prefix rule_ in folder dir.
See https://docs.victoriametrics.com/victoriametrics/vmalert/#reading-rules-from-object-storage
See https://docs.victoriametrics.com/vmalert/#reading-rules-from-object-storage
`)
ruleTemplatesPath = flagutil.NewArrayString("rule.templates", `Path or glob pattern to location with go template definitions `+
@@ -71,7 +71,7 @@ absolute path to all .tpl files in root.
externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier. By default, hostname is used as address.")
externalAlertSource = flag.String("external.alert.source", "", `External Alert Source allows to override the Source link for alerts sent to AlertManager `+
`for cases where you want to build a custom link to Grafana, Prometheus or any other service. `+
`Supports templating - see https://docs.victoriametrics.com/victoriametrics/vmalert/#templating . `+
`Supports templating - see https://docs.victoriametrics.com/vmalert/#templating . `+
`For example, link to Grafana: -external.alert.source='explore?orgId=1&left={"datasource":"VictoriaMetrics","queries":[{"expr":{{.Expr|jsonEscape|queryEscape}},"refId":"A"}],"range":{"from":"now-1h","to":"now"}}'. `+
`Link to VMUI: -external.alert.source='vmui/#/?g0.expr={{.Expr|queryEscape}}'. `+
`If empty 'vmalert/alert?group_id={{.GroupID}}&alert_id={{.AlertID}}' is used.`)
@@ -182,9 +182,7 @@ func main() {
listenAddrs = []string{":8880"}
}
rh := &requestHandler{m: manager}
go httpserver.Serve(listenAddrs, rh.handler, httpserver.ServeOptions{
UseProxyProtocol: useProxyProtocol,
})
go httpserver.Serve(listenAddrs, useProxyProtocol, rh.handler)
pushmetrics.Init()
sig := procutil.WaitForSigterm()
@@ -320,7 +318,7 @@ func usage() {
const s = `
vmalert processes alerts and recording rules.
See the docs at https://docs.victoriametrics.com/victoriametrics/vmalert/ .
See the docs at https://docs.victoriametrics.com/vmalert/ .
`
flagutil.Usage(s)
}

View File

@@ -9,5 +9,4 @@ COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certifica
EXPOSE 8880
ENTRYPOINT ["/vmalert-prod"]
ARG TARGETARCH
ARG BINARY_SUFFIX=non-existing
COPY vmalert-linux-${TARGETARCH}-prod${BINARY_SUFFIX} ./vmalert-prod
COPY vmalert-linux-${TARGETARCH}-prod ./vmalert-prod

View File

@@ -21,8 +21,6 @@ type Alert struct {
GroupID uint64
// Name represents Alert name
Name string
// Type defines the datasource type of the Alert
Type string
// Labels is the list of label-value pairs attached to the Alert
Labels map[string]string
// Annotations is the list of annotations generated on Alert evaluation
@@ -80,7 +78,6 @@ func (as AlertState) String() string {
// AlertTplData is used to execute templating
type AlertTplData struct {
Type string
Labels map[string]string
Value float64
Expr string
@@ -92,7 +89,6 @@ type AlertTplData struct {
var tplHeaders = []string{
"{{ $value := .Value }}",
"{{ $type := .Type }}",
"{{ $labels := .Labels }}",
"{{ $expr := .Expr }}",
"{{ $externalLabels := .ExternalLabels }}",
@@ -110,7 +106,6 @@ var tplHeaders = []string{
func (a *Alert) ExecTemplate(q templates.QueryFn, labels, annotations map[string]string) (map[string]string, error) {
tplData := AlertTplData{
Value: a.Value,
Type: a.Type,
Labels: labels,
Expr: a.Expr,
AlertID: a.ID,

View File

@@ -61,7 +61,7 @@ func TestAlertExecTemplate(t *testing.T) {
for k := range tplExpected {
got, exp := tpl[k], tplExpected[k]
if got != exp {
t.Fatalf("unexpected template for key=%q; \ngot %q; \nwant %q", k, got, exp)
t.Fatalf("unexpected template for key=%q; got %q; want %q", k, got, exp)
}
}
}
@@ -200,16 +200,6 @@ func TestAlertExecTemplate(t *testing.T) {
}, map[string]string{
"grafana_url": "vm-grafana.com?from=1660944898&to=1660937698",
})
// Datasource type
f(&Alert{
Type: "vlogs",
Expr: "up",
}, map[string]string{
"grafana_url": `vm-grafana.com/explore?left={"datasource":"{{ if eq .Type "vlogs" }}VictoriaLogs{{ else }}VictoriaMetrics{{ end }}","queries":[{"expr":"{{ .Expr }}"}]}`,
}, map[string]string{
"grafana_url": `vm-grafana.com/explore?left={"datasource":"VictoriaLogs","queries":[{"expr":"up"}]}`,
})
}
func TestAlert_toPromLabels(t *testing.T) {

View File

@@ -1,8 +1,8 @@
package notifier
import (
"crypto/md5"
"fmt"
"hash/fnv"
"net/url"
"os"
"path"
@@ -99,7 +99,7 @@ func (cfg *Config) UnmarshalYAML(unmarshal func(any) error) error {
if err != nil {
return fmt.Errorf("failed to marshal configuration for checksum: %w", err)
}
h := fnv.New64a()
h := md5.New()
h.Write(b)
cfg.Checksum = fmt.Sprintf("%x", h.Sum(nil))
return nil

View File

@@ -146,13 +146,8 @@ func (c *Client) Close() error {
return fmt.Errorf("client is already closed")
}
close(c.input)
start := time.Now()
logger.Infof("shutting down remote write client: flushing remained series")
close(c.doneCh)
c.wg.Wait()
logger.Infof("shutting down remote write client: finished in %v", time.Since(start))
return nil
}
@@ -161,16 +156,21 @@ func (c *Client) run(ctx context.Context) {
wr := &prompbmarshal.WriteRequest{}
shutdown := func() {
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
logger.Infof("shutting down remote write client and flushing remained series")
shutdownFlushCnt := 0
for ts := range c.input {
wr.Timeseries = append(wr.Timeseries, ts)
if len(wr.Timeseries) >= c.maxBatchSize {
shutdownFlushCnt += len(wr.Timeseries)
c.flush(lastCtx, wr)
}
}
// flush the last batch. `flush` will re-check and avoid flushing empty batch.
shutdownFlushCnt += len(wr.Timeseries)
c.flush(lastCtx, wr)
logger.Infof("shutting down remote write client flushed %d series", shutdownFlushCnt)
cancel()
}
c.wg.Add(1)

View File

@@ -24,7 +24,7 @@ var (
"and processing need to wait for previous rule results to be persisted by remote storage before evaluating the next rule."+
"Keep it equal or bigger than -remoteWrite.flushInterval.")
replayMaxDatapoints = flag.Int("replay.maxDatapointsPerQuery", 1e3,
"Max number of data points expected in one request. It affects the max time range for every '/query_range' request during the replay. The higher the value, the less requests will be made during replay.")
"Max number of data points expected in one request. It affects the max time range for every `/query_range` request during the replay. The higher the value, the less requests will be made during replay.")
replayRuleRetryAttempts = flag.Int("replay.ruleRetryAttempts", 5,
"Defines how many retries to make before giving up on rule if request for it returns an error.")
disableProgressBar = flag.Bool("replay.disableProgressBar", false, "Whether to disable rendering progress bars during the replay. "+

View File

@@ -124,10 +124,6 @@ func (arm *alertingRuleMetrics) close() {
// NewAlertingRule creates a new AlertingRule
func NewAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule) *AlertingRule {
debug := group.Debug
if cfg.Debug != nil {
debug = *cfg.Debug
}
ar := &AlertingRule{
Type: group.Type,
RuleID: cfg.ID,
@@ -141,14 +137,14 @@ func NewAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
GroupName: group.Name,
File: group.File,
EvalInterval: group.Interval,
Debug: debug,
Debug: cfg.Debug,
q: qb.BuildWithParams(datasource.QuerierParams{
DataSourceType: group.Type.String(),
ApplyIntervalAsTimeFilter: setIntervalAsTimeFilter(group.Type.String(), cfg.Expr),
EvaluationInterval: group.Interval,
QueryParams: group.Params,
Headers: group.Headers,
Debug: debug,
Debug: cfg.Debug,
}),
alerts: make(map[uint64]*notifier.Alert),
}
@@ -541,7 +537,6 @@ func (ar *AlertingRule) expandTemplates(m datasource.Metric, qFn templates.Query
tplData := notifier.AlertTplData{
Value: m.Values[0],
Type: ar.Type.String(),
Labels: ls.origin,
Expr: ar.Expr,
AlertID: hash(ls.processed),
@@ -602,7 +597,6 @@ func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time, labels, a
return &notifier.Alert{
GroupID: ar.GroupID,
Name: ar.Name,
Type: ar.Type.String(),
Expr: ar.Expr,
For: ar.For,
ActiveAt: start,

View File

@@ -653,7 +653,6 @@ func TestAlertingRuleExecRange(t *testing.T) {
hash(map[string]string{"alertname": "for-pending"}): {
GroupID: fakeGroup.GetID(),
Name: "for-pending",
Type: config.NewPrometheusType().String(),
Labels: map[string]string{"alertname": "for-pending"},
Annotations: map[string]string{"activeAt": "5000"},
State: notifier.StatePending,
@@ -673,7 +672,6 @@ func TestAlertingRuleExecRange(t *testing.T) {
hash(map[string]string{"alertname": "for-firing"}): {
GroupID: fakeGroup.GetID(),
Name: "for-firing",
Type: config.NewPrometheusType().String(),
Labels: map[string]string{"alertname": "for-firing"},
Annotations: map[string]string{"activeAt": "1000"},
State: notifier.StateFiring,
@@ -694,7 +692,6 @@ func TestAlertingRuleExecRange(t *testing.T) {
hash(map[string]string{"alertname": "for-hold-pending"}): {
GroupID: fakeGroup.GetID(),
Name: "for-hold-pending",
Type: config.NewPrometheusType().String(),
Labels: map[string]string{"alertname": "for-hold-pending"},
Annotations: map[string]string{"activeAt": "5000"},
State: notifier.StatePending,
@@ -750,7 +747,6 @@ func TestAlertingRuleExecRange(t *testing.T) {
hash(map[string]string{"alertname": "multi-series"}): {
GroupID: fakeGroup.GetID(),
Name: "multi-series",
Type: config.NewPrometheusType().String(),
Labels: map[string]string{"alertname": "multi-series"},
Annotations: map[string]string{},
State: notifier.StateFiring,
@@ -762,7 +758,6 @@ func TestAlertingRuleExecRange(t *testing.T) {
hash(map[string]string{"alertname": "multi-series", "foo": "bar"}): {
GroupID: fakeGroup.GetID(),
Name: "multi-series",
Type: config.NewPrometheusType().String(),
Labels: map[string]string{"alertname": "multi-series", "foo": "bar"},
Annotations: map[string]string{},
State: notifier.StatePending,

View File

@@ -30,8 +30,8 @@ var (
resendDelay = flag.Duration("rule.resendDelay", 0, "MiniMum amount of time to wait before resending an alert to notifier.")
maxResolveDuration = flag.Duration("rule.maxResolveDuration", 0, "Limits the maxiMum duration for automatic alert expiration, "+
"which by default is 4 times evaluationInterval of the parent group")
evalDelay = flag.Duration("rule.evalDelay", 30*time.Second, "Adjustment of the 'time' parameter for rule evaluation requests to compensate intentional data delay from the datasource. "+
"Normally, should be equal to '-search.latencyOffset' (cmd-line flag configured for VictoriaMetrics single-node or vmselect). "+
evalDelay = flag.Duration("rule.evalDelay", 30*time.Second, "Adjustment of the `time` parameter for rule evaluation requests to compensate intentional data delay from the datasource. "+
"Normally, should be equal to `-search.latencyOffset` (cmd-line flag configured for VictoriaMetrics single-node or vmselect). "+
"This doesn't apply to groups with eval_offset specified.")
disableAlertGroupLabel = flag.Bool("disableAlertgroupLabel", false, "Whether to disable adding group's Name as label to generated alerts and time series.")
remoteReadLookBack = flag.Duration("remoteRead.lookback", time.Hour, "Lookback defines how far to look into past for alerts timeseries. "+
@@ -57,7 +57,6 @@ type Group struct {
// checksum stores the hash of yaml definition for this group.
checksum string
LastEvaluation time.Time
Debug bool
Labels map[string]string
Params url.Values
@@ -119,7 +118,6 @@ func NewGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
Headers: make(map[string]string),
NotifierHeaders: make(map[string]string),
Labels: cfg.Labels,
Debug: cfg.Debug,
evalAlignment: cfg.EvalAlignment,
doneCh: make(chan struct{}),
@@ -273,7 +271,6 @@ func (g *Group) updateWith(newGroup *Group) error {
g.Limit = newGroup.Limit
g.checksum = newGroup.checksum
g.Rules = newRules
g.Debug = newGroup.Debug
return nil
}
@@ -474,7 +471,6 @@ func (g *Group) DeepCopy() *Group {
newG := Group{}
_ = json.Unmarshal(data, &newG)
newG.Rules = g.Rules
newG.id = g.id
return &newG
}

View File

@@ -36,206 +36,144 @@ func TestMain(m *testing.M) {
}
func TestUpdateWith(t *testing.T) {
f := func(oldG, newG config.Group) {
f := func(currentRules, newRules []config.Rule) {
t.Helper()
ns := metrics.NewSet()
g := &Group{
Name: "test",
metrics: &groupMetrics{set: ns},
}
qb := &datasource.FakeQuerier{}
for i := range oldG.Rules {
oldG.Rules[i].ID = config.HashRule(oldG.Rules[i])
}
for i := range newG.Rules {
newG.Rules[i].ID = config.HashRule(newG.Rules[i])
for _, r := range currentRules {
r.ID = config.HashRule(r)
g.Rules = append(g.Rules, g.newRule(qb, r))
}
g := NewGroup(oldG, qb, 0, nil)
g.metrics = &groupMetrics{set: ns}
expect := NewGroup(newG, qb, 0, nil)
ng := &Group{
Name: "test",
}
for _, r := range newRules {
r.ID = config.HashRule(r)
ng.Rules = append(ng.Rules, ng.newRule(qb, r))
}
err := g.updateWith(expect)
err := g.updateWith(ng)
if err != nil {
t.Fatalf("cannot update rule: %s", err)
}
if len(g.Rules) != len(expect.Rules) {
t.Fatalf("expected to have %d rules; got: %d", len(expect.Rules), len(g.Rules))
if len(g.Rules) != len(newRules) {
t.Fatalf("expected to have %d rules; got: %d", len(g.Rules), len(newRules))
}
sort.Slice(g.Rules, func(i, j int) bool {
return g.Rules[i].ID() < g.Rules[j].ID()
})
sort.Slice(expect.Rules, func(i, j int) bool {
return expect.Rules[i].ID() < expect.Rules[j].ID()
sort.Slice(ng.Rules, func(i, j int) bool {
return ng.Rules[i].ID() < ng.Rules[j].ID()
})
for i, r := range g.Rules {
got, want := r, expect.Rules[i]
got, want := r, ng.Rules[i]
if got.ID() != want.ID() {
t.Fatalf("expected to have rule %q; got %q", want, got)
}
if err := CompareRules(t, got, want); err != nil {
t.Fatalf("comparison1 error: %s", err)
t.Fatalf("comparison error: %s", err)
}
}
if g.Debug != expect.Debug {
t.Fatalf("expected to have debug %v; got %v", expect.Debug, g.Debug)
}
}
// new rule
f(config.Group{}, config.Group{
Rules: []config.Rule{
{Alert: "bar"},
}})
f(nil, []config.Rule{
{Alert: "bar"},
})
// update alerting rule
f(config.Group{
Rules: []config.Rule{
{
Alert: "foo",
Expr: "up > 0",
For: promutil.NewDuration(time.Second),
Labels: map[string]string{
"bar": "baz",
},
Annotations: map[string]string{
"summary": "{{ $value|humanize }}",
"description": "{{$labels}}",
},
},
{
Alert: "bar",
Expr: "up > 0",
For: promutil.NewDuration(time.Second),
Labels: map[string]string{
"bar": "baz",
},
},
}}, config.Group{
Rules: []config.Rule{
{
Alert: "foo",
Expr: "up > 10",
For: promutil.NewDuration(time.Second),
Labels: map[string]string{
"baz": "bar",
},
Annotations: map[string]string{
"summary": "none",
},
},
{
Alert: "bar",
Expr: "up > 0",
For: promutil.NewDuration(2 * time.Second),
KeepFiringFor: promutil.NewDuration(time.Minute),
Labels: map[string]string{
"bar": "baz",
},
},
}})
// update recording rule
debug := true
f(config.Group{
Rules: []config.Rule{{
Record: "foo",
Expr: "max(up)",
f([]config.Rule{
{
Alert: "foo",
Expr: "up > 0",
For: promutil.NewDuration(time.Second),
Labels: map[string]string{
"bar": "baz",
},
}}}, config.Group{
Rules: []config.Rule{{
Record: "foo",
Expr: "min(up)",
Debug: &debug,
Annotations: map[string]string{
"summary": "{{ $value|humanize }}",
"description": "{{$labels}}",
},
},
{
Alert: "bar",
Expr: "up > 0",
For: promutil.NewDuration(time.Second),
Labels: map[string]string{
"bar": "baz",
},
},
}, []config.Rule{
{
Alert: "foo",
Expr: "up > 10",
For: promutil.NewDuration(time.Second),
Labels: map[string]string{
"baz": "bar",
},
}}})
// update debug
f(config.Group{
Rules: []config.Rule{
{
Record: "foo",
Expr: "max(up)",
Annotations: map[string]string{
"summary": "none",
},
{
Alert: "foo",
Expr: "up > 0",
Debug: &debug,
For: promutil.NewDuration(time.Second),
},
}}, config.Group{
Rules: []config.Rule{
{
Record: "foo",
Expr: "max(up)",
Debug: &debug,
},
{
Alert: "foo",
Expr: "up > 0",
For: promutil.NewDuration(time.Second),
},
}})
// empty rule
f(config.Group{
Rules: []config.Rule{{Alert: "foo"}, {Record: "bar"}}}, config.Group{})
// multiple rules
f(config.Group{
Rules: []config.Rule{
{Alert: "bar"},
{Alert: "baz"},
{Alert: "foo"},
}}, config.Group{
Rules: []config.Rule{
{Alert: "baz"},
{Record: "foo"},
}})
// replace rule
f(config.Group{
Rules: []config.Rule{{Alert: "foo1"}}}, config.Group{
Rules: []config.Rule{{Alert: "foo2"}}})
// replace multiple rules
f(config.Group{
Rules: []config.Rule{
{Alert: "foo1"},
{Record: "foo2"},
{Alert: "foo3"},
}}, config.Group{
Rules: []config.Rule{
{Alert: "foo3"},
{Alert: "foo4"},
{Record: "foo5"},
}})
f(config.Group{Debug: false}, config.Group{Debug: true})
f(config.Group{
Debug: false,
Rules: []config.Rule{
{Alert: "foo1"},
},
}, config.Group{
Debug: true,
Rules: []config.Rule{
{Alert: "foo1"},
{
Alert: "bar",
Expr: "up > 0",
For: promutil.NewDuration(2 * time.Second),
KeepFiringFor: promutil.NewDuration(time.Minute),
Labels: map[string]string{
"bar": "baz",
},
},
})
f(config.Group{
Debug: false,
Rules: []config.Rule{
{Alert: "foo1"},
// update recording rule
f([]config.Rule{{
Record: "foo",
Expr: "max(up)",
Labels: map[string]string{
"bar": "baz",
},
}, config.Group{
Debug: false,
Rules: []config.Rule{
{Alert: "foo1", Debug: &debug},
}}, []config.Rule{{
Record: "foo",
Expr: "min(up)",
Debug: true,
Labels: map[string]string{
"baz": "bar",
},
}})
// empty rule
f([]config.Rule{{Alert: "foo"}, {Record: "bar"}}, nil)
// multiple rules
f([]config.Rule{
{Alert: "bar"},
{Alert: "baz"},
{Alert: "foo"},
}, []config.Rule{
{Alert: "baz"},
{Record: "foo"},
})
// replace rule
f([]config.Rule{{Alert: "foo1"}}, []config.Rule{{Alert: "foo2"}})
// replace multiple rules
f([]config.Rule{
{Alert: "foo1"},
{Record: "foo2"},
{Alert: "foo3"},
}, []config.Rule{
{Alert: "foo3"},
{Alert: "foo4"},
{Record: "foo5"},
})
}
@@ -338,7 +276,6 @@ func TestGroupStart(t *testing.T) {
summary: "{{ $value }}"
`
)
var groups []config.Group
err := yaml.Unmarshal([]byte(rules), &groups)
if err != nil {
@@ -539,7 +476,6 @@ func TestCloseWithEvalInterruption(t *testing.T) {
summary: "{{ $value }}"
`
)
var groups []config.Group
err := yaml.Unmarshal([]byte(rules), &groups)
if err != nil {

View File

@@ -83,10 +83,6 @@ func (rr *RecordingRule) ID() uint64 {
// NewRecordingRule creates a new RecordingRule
func NewRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule) *RecordingRule {
debug := group.Debug
if cfg.Debug != nil {
debug = *cfg.Debug
}
rr := &RecordingRule{
Type: group.Type,
RuleID: cfg.ID,
@@ -96,14 +92,14 @@ func NewRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rul
GroupID: group.GetID(),
GroupName: group.Name,
File: group.File,
Debug: debug,
Debug: cfg.Debug,
q: qb.BuildWithParams(datasource.QuerierParams{
DataSourceType: group.Type.String(),
ApplyIntervalAsTimeFilter: setIntervalAsTimeFilter(group.Type.String(), cfg.Expr),
EvaluationInterval: group.Interval,
QueryParams: group.Params,
Headers: group.Headers,
Debug: debug,
Debug: cfg.Debug,
}),
}
@@ -258,18 +254,12 @@ func (rr *RecordingRule) toTimeSeries(m datasource.Metric) prompbmarshal.TimeSer
Value: rr.Name,
})
}
// add extra labels configured by user
for k := range rr.Labels {
existingLabel := promrelabel.GetLabelByName(m.Labels, k)
if existingLabel != nil { // there is a conflict between extra and existing label
if existingLabel.Value == rr.Labels[k] {
// extra and existing labels are identical - do nothing
continue
}
// preserve existing label by adding "exported_" prefix
existingLabel.Name = fmt.Sprintf("exported_%s", existingLabel.Name)
prevLabel := promrelabel.GetLabelByName(m.Labels, k)
if prevLabel != nil && prevLabel.Value != rr.Labels[k] {
// Rename the prevLabel to "exported_" + label.Name
prevLabel.Name = fmt.Sprintf("exported_%s", prevLabel.Name)
}
// add extra label
m.Labels = append(m.Labels, prompbmarshal.Label{
Name: k,
Value: rr.Labels[k],
@@ -288,7 +278,6 @@ func (rr *RecordingRule) updateWith(r Rule) error {
rr.Expr = nr.Expr
rr.Labels = nr.Labels
rr.q = nr.q
rr.Debug = nr.Debug
return nil
}

View File

@@ -168,7 +168,6 @@ func TestRecordingRule_Exec(t *testing.T) {
}, [][]datasource.Metric{{
metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "foo"),
metricWithValueAndLabels(t, 1, "__name__", "bar", "job", "bar", "source", "origin"),
metricWithValueAndLabels(t, 1, "__name__", "baz", "job", "baz", "source", "test"),
}}, [][]prompbmarshal.TimeSeries{{
newTimeSeries([]float64{2}, []int64{ts.UnixNano()}, []prompbmarshal.Label{
{
@@ -203,21 +202,6 @@ func TestRecordingRule_Exec(t *testing.T) {
Value: "origin",
},
}),
newTimeSeries([]float64{1}, []int64{ts.UnixNano()},
[]prompbmarshal.Label{
{
Name: "__name__",
Value: "job:foo",
},
{
Name: "job",
Value: "baz",
},
{
Name: "source",
Value: "test",
},
}),
}})
}

View File

@@ -35,7 +35,7 @@ type Rule interface {
registerMetrics(set *metrics.Set)
}
var errDuplicate = errors.New("result contains metrics with the same labelset during evaluation. See https://docs.victoriametrics.com/victoriametrics/vmalert/#series-with-the-same-labelset for details")
var errDuplicate = errors.New("result contains metrics with the same labelset during evaluation. See https://docs.victoriametrics.com/vmalert/#series-with-the-same-labelset for details")
type ruleState struct {
sync.RWMutex

File diff suppressed because one or more lines are too long

View File

@@ -1,101 +0,0 @@
html {
scroll-padding-top: 60px;
}
.nav-link svg {
fill: var(--bs-navbar-color);
}
.nav-link.active svg {
fill: var(--bs-navbar-active-color);
}
.nav-link:not(.active):hover svg {
fill: var(--bs-nav-link-hover-color);
}
.dropdown-menu {
--bs-dropdown-link-active-color: #000;
--bs-dropdown-link-active-bg: #e9e9e9;
& .dropdown-item.active {
font-weight: 700;
}
}
.dropdown-menu {
min-width: 0;
}
#search {
max-width: 400px;
}
body {
padding-top: 4.5rem;
}
.group-items {
cursor: pointer;
padding: 5px;
margin-top: 5px;
position: relative;
}
.btn svg, .dropdown-item svg {
fill: #000;
stroke: #000;
}
.btn:hover svg, .btn.show svg {
fill: #fff;
stroke: #fff;
}
.btn:has(svg) {
height: 38px;
}
.group-items:not(:has(.sub-item:not(.d-none))) {
display: none !important;
}
.group-items:hover {
background-color: #f8f9fa!important;
}
.table {
table-layout: fixed;
}
.table .error-cell {
word-break: break-word;
font-size: 14px;
}
pre {
overflow: scroll;
min-height: 30px;
max-width: 100%;
}
pre::-webkit-scrollbar {
-webkit-appearance: none;
width: 0px;
height: 5px;
}
pre::-webkit-scrollbar-thumb {
border-radius: 5px;
background-color: rgba(0,0,0,.5);
-webkit-box-shadow: 0 0 1px rgba(255,255,255,.5);
}
textarea.curl-area {
width: 100%;
line-height: 1;
font-size: 12px;
border: none;
margin: 0;
padding: 0;
overflow: scroll;
}

View File

@@ -1,37 +0,0 @@
<svg xmlns="http://www.w3.org/2000/svg">
<symbol id="vm" viewBox="0 0 464.61 533.89">
<path d="M192.16 234.72c9 7.67 24.12 13.49 39.3 13.69h1.68c15.18-.2 30.31-6 39.3-13.69 47.43-40.45 184.65-166.24 184.65-166.24C493.93 34.21 391.45.2 233.14.01h-1.68C73.15.2-29.33 34.21 7.46 68.48c.05 0 137.27 125.79 184.7 166.24"/><path d="M272.4 302.83c-9 7.67-24.12 13.5-39.3 13.7h-1.6c-15.18-.2-30.31-6-39.3-13.7-32.81-28-148.56-132.93-192.16-172.7v60.74c0 6.67 2.55 15.52 7.09 19.68 29.64 27.18 143.94 131.8 185.07 166.88 9 7.67 24.12 13.49 39.3 13.69h1.6c15.18-.2 30.31-6 39.3-13.69 41.13-35.08 155.43-139.7 185.07-166.88 4.54-4.16 7.09-13 7.09-19.68v-60.74c-43.6 39.82-159.35 144.72-192.16 172.7"/><path d="M272.4 445.59c-9 7.67-24.12 13.49-39.3 13.69h-1.6c-15.18-.2-30.31-6-39.3-13.69-32.81-28-148.56-132.94-192.16-172.7v60.73c0 6.67 2.55 15.53 7.09 19.69 29.64 27.17 143.94 131.8 185.07 166.87 9 7.67 24.12 13.5 39.3 13.7h1.6c15.18-.2 30.31-6 39.3-13.7 41.13-35.07 155.43-139.7 185.07-166.87 4.54-4.16 7.09-13 7.09-19.69v-60.73c-43.6 39.76-159.35 144.72-192.16 172.7"/>
</symbol>
<symbol id="collapse" viewBox="3 2 42 44">
<path d="M22.6 15.4a1.9 1.9 0 0 0 2.8 0l6-5.9a2.1 2.1 0 0 0 .2-2.7 1.9 1.9 0 0 0-3-.2L26 9.2V4a2 2 0 0 0-4 0v5.2l-2.6-2.6a1.9 1.9 0 0 0-3 .2 2.1 2.1 0 0 0 .2 2.7Zm2.8 17.2a1.9 1.9 0 0 0-2.8 0l-6 5.9a2.1 2.1 0 0 0-.2 2.7 1.9 1.9 0 0 0 3 .2l2.6-2.6V44a2 2 0 0 0 4 0v-5.2l2.6 2.6a1.9 1.9 0 0 0 3-.2 2.1 2.1 0 0 0-.2-2.7ZM6 22h36a2 2 0 0 0 0-4H6a2 2 0 0 0 0 4m36 4H6a2 2 0 0 0 0 4h36a2 2 0 0 0 0-4"/>
</symbol>
<symbol id="expand" viewBox="205.701 131.84 40.006 44.045">
<path d="M224.163 175.27a1.9 1.9 0 0 0 2.8 0l6-5.9a2.1 2.1 0 0 0 .2-2.7 1.9 1.9 0 0 0-3-.2l-2.6 2.6v-5.2c0-1.54-1.667-2.502-3-1.732-.619.357-1 1.017-1 1.732v5.2l-2.6-2.6a1.9 1.9 0 0 0-3 .2 2.1 2.1 0 0 0 .2 2.7zm-16.459-23.297h36c1.54 0 2.502-1.667 1.732-3a2 2 0 0 0-1.732-1h-36c-1.54 0-2.502 1.667-1.732 3 .357.619 1.017 1 1.732 1m36 4h-36c-1.54 0-2.502 1.667-1.732 3 .357.619 1.017 1 1.732 1h36c1.54 0 2.502-1.667 1.732-3a2 2 0 0 0-1.732-1m-16.59-23.517a1.9 1.9 0 0 0-2.8 0l-6 5.9a2.1 2.1 0 0 0-.2 2.7 1.9 1.9 0 0 0 3 .2l2.6-2.6v5.2c0 1.54 1.667 2.502 3 1.732.619-.357 1-1.017 1-1.732v-5.2l2.6 2.6a1.9 1.9 0 0 0 3-.2 2.1 2.1 0 0 0-.2-2.7z"/>
</symbol>
<symbol id="filter" viewBox="-10 -10 320 310">
<path d="M288.953 0h-277c-5.522 0-10 4.478-10 10v49.531c0 5.522 4.478 10 10 10h12.372l91.378 107.397v113.978a10 10 0 0 0 15.547 8.32l49.5-33a10 10 0 0 0 4.453-8.32v-80.978l91.378-107.397h12.372c5.522 0 10-4.478 10-10V10c0-5.522-4.477-10-10-10M167.587 166.77a10 10 0 0 0-2.384 6.48v79.305l-29.5 19.666V173.25a10 10 0 0 0-2.384-6.48L50.585 69.531h199.736zM278.953 49.531h-257V20h257z"/>
</symbol>
<symbol id="search" viewBox="0 0 490.4 490.4">
<path d="m484.1 454.796-110.5-110.6c29.8-36.3 47.6-82.8 47.6-133.4 0-116.3-94.3-210.6-210.6-210.6S0 94.496 0 210.796s94.3 210.6 210.6 210.6c50.8 0 97.4-18 133.8-48l110.5 110.5c12.9 11.8 25 4.2 29.2 0 8.4-8.3 8.4-20.8 0-29.1m-443-244c0-93.6 75.9-169.5 169.5-169.5s169.6 75.9 169.6 169.5-75.9 169.5-169.5 169.5-169.6-75.9-169.6-169.5"/>
</symbol>
<symbol id="all" viewBox="-10 -10 320 310">
<path d="M288.953 0h-277c-5.522 0-10 4.478-10 10v49.531c0 5.522 4.478 10 10 10h12.372l91.378 107.397v113.978a10 10 0 0 0 15.547 8.32l49.5-33a10 10 0 0 0 4.453-8.32v-80.978l91.378-107.397h12.372c5.522 0 10-4.478 10-10V10c0-5.522-4.477-10-10-10M167.587 166.77a10 10 0 0 0-2.384 6.48v79.305l-29.5 19.666V173.25a10 10 0 0 0-2.384-6.48L50.585 69.531h199.736zM278.953 49.531h-257V20h257z"/><path stroke-width="20" stroke="#000" d="m-10-10 310 310"/>
</symbol>
<symbol id="nomatch" viewBox="0 0 116.623 116.623">
<path d="M99.114 89.251H57.783l19.181-41.797h22.149a5.971 5.971 0 1 0 0-11.942H82.444L94.591 9.049a5.97 5.97 0 0 0-2.938-7.917 5.97 5.97 0 0 0-7.918 2.936L69.304 35.513H41.662c-2.289-.458-17.685-4.48-17.685-29.542a5.97 5.97 0 1 0-11.942 0c0 18.536 7.157 29.533 14.93 35.514-7.774 5.981-14.93 16.978-14.93 35.514a5.971 5.971 0 1 0 11.942 0c0-25.007 14.886-29.013 17.677-29.542h22.17L44.643 89.253H17.509a5.972 5.972 0 0 0 0 11.943h21.653l-3.196 6.965a5.972 5.972 0 1 0 10.856 4.981l5.479-11.943h46.812a5.975 5.975 0 0 0 .001-11.948"/>
</symbol>
<symbol id="exclamation" viewBox="0 0 16 16">
<path d="M8 16A8 8 0 1 0 8 0a8 8 0 0 0 0 16zm.93-9.412-1 4.705c-.07.34.029.533.304.533.194 0 .487-.07.686-.246l-.088.416c-.287.346-.92.598-1.465.598-.703 0-1.002-.422-.808-1.319l.738-3.468c.064-.293.006-.399-.287-.47l-.451-.081.082-.381 2.29-.287zM8 5.5a1 1 0 1 1 0-2 1 1 0 0 1 0 2z"/>
</symbol>
<symbol id="unhealthy" viewBox="0 0 14 14">
<circle cx="7" cy="7" r="6.5" stroke="#000" fill="none"/><path stroke-linecap="round" stroke="#000" stroke-width="1.5" d="m5 5 4 4M5 9l4-4"/>
</symbol>
</svg>

Before

Width:  |  Height:  |  Size: 4.7 KiB

File diff suppressed because one or more lines are too long

View File

@@ -1,32 +1,38 @@
function actionAll(isCollapse) {
document.querySelectorAll('.collapse').forEach((collapse) => {
if (isCollapse) {
collapse.classList.remove('show');
} else {
collapse.classList.add('show');
function expandAll() {
$('.group-heading').each(function () {
let style = $(this).attr("style")
// display only elements that are currently visible
if (style === "display: none;") {
return
}
$(this).next().addClass('show')
});
}
function groupFilter(key) {
if (key) {
location.href = `?filter=${key}`;
} else {
window.location = window.location.pathname;
function collapseAll() {
$('.collapse').removeClass('show');
}
function showByID(id) {
if (!id) {
return
}
let parent = $("#" + id).parent();
if (!parent) {
return
}
let target = $("#" + parent.attr("data-bs-target"));
if (target.length > 0) {
target.addClass('show');
}
}
function showBySelector(selector) {
if (!selector) {
return
}
const control = document.querySelector(`${selector} [data-bs-target]`);
if (!control) {
return
}
let target = document.getElementById(control.getAttribute('data-bs-target').slice(1));
if (target) {
target.classList.add('show');
function toggleByID(id) {
if (id) {
let el = $("#" + id);
if (el.length > 0) {
el.click();
}
}
}
@@ -40,16 +46,28 @@ function debounce(func, delay) {
};
}
$('#search').on("keyup", debounce(search, 500));
// search shows or hides groups&rules that satisfy the search phrase.
// case-insensitive, respects GET param `search`.
function search() {
let searchBox = document.getElementById('search');
if (!searchBox) {
return;
}
const searchPhrase = searchBox.value.toLowerCase();
$(".rule").show();
filterRules(searchPhrase);
let groupHeader = $(".group-heading")
let searchPhrase = $("#search").val().toLowerCase()
if (searchPhrase.length === 0) {
groupHeader.show()
setParamURL('search', '')
return
}
$(".rule-table").removeClass('show');
groupHeader.hide()
searchPhrase = searchPhrase.toLowerCase()
filterRuleByName(searchPhrase);
filterRuleByLabels(searchPhrase);
filterGroupsByName(searchPhrase);
setParamURL('search', searchPhrase)
}
@@ -65,51 +83,81 @@ function getParamURL(key) {
return url.searchParams.get(key)
}
function filterRules(searchPhrase) {
document.querySelectorAll('.sub-items').forEach((rules) => {
let found = false;
rules.querySelectorAll('.sub-item').forEach((rule) => {
if (searchPhrase) {
const ruleName = rule.innerText.toLowerCase();
const matches = []
const hasValue = ruleName.indexOf(searchPhrase) >= 0;
rule.querySelectorAll('.label').forEach((label) => {
const text = label.innerText.toLowerCase();
if (text.indexOf(searchPhrase) >= 0) {
matches.push(text);
}
});
if (!matches.length && !hasValue) {
rule.classList.add('d-none');
return;
}
}
rule.classList.remove('d-none');
found = true;
});
if (found && searchPhrase || !searchPhrase) {
rules.classList.remove('d-none');
} else {
rules.classList.add('d-none');
function filterGroupsByName(searchPhrase) {
$(".group-heading").each(function () {
const groupName = $(this).attr('data-group-name').toLowerCase();
const hasValue = groupName.indexOf(searchPhrase) >= 0
if (!hasValue) {
return
}
const target = $(this).attr("data-bs-target");
$(`div[id="${target}"] .rule`).show();
$(this).show();
});
}
function filterRuleByName(searchPhrase) {
$(".rule").each(function () {
const ruleName = $(this).attr("data-rule-name").toLowerCase();
const hasValue = ruleName.indexOf(searchPhrase) >= 0
if (!hasValue) {
$(this).hide();
return
}
const target = $(this).attr('data-bs-target')
$(`#rules-${target}`).addClass('show');
$(`div[data-bs-target='rules-${target}']`).show();
$(this).show();
});
}
function filterRuleByLabels(searchPhrase) {
$(".rule").each(function () {
const matches = $(".label", this).filter(function () {
const label = $(this).text().toLowerCase();
return label.indexOf(searchPhrase) >= 0;
}).length;
if (matches > 0) {
const target = $(this).attr('data-bs-target')
$(`#rules-${target}`).addClass('show');
$(`div[data-bs-target='rules-${target}']`).show();
$(this).show();
}
});
}
document.addEventListener('DOMContentLoaded', () => {
$(document).ready(function () {
$(".group-heading a").click(function (e) {
e.stopPropagation(); // prevent collapse logic on link click
let target = $(this).attr('href');
if (target.length > 0) {
toggleByID(target.substr(1));
}
});
$(".group-heading").click(function (e) {
let target = $(this).attr('data-bs-target');
let el = $("#" + target);
new bootstrap.Collapse(el, {
toggle: true
});
});
// update search element with value from URL, if any
const searchPhrase = getParamURL('search')
const searchBox = document.getElementById('search');
if (searchBox) {
searchBox.addEventListener('keyup', debounce(search, 500));
searchBox.value = searchPhrase;
}
let searchPhrase = getParamURL('search')
$("#search").val(searchPhrase)
// apply filtering by search phrase
search()
showBySelector(window.location.hash);
document.querySelectorAll('[data-bs-toggle="tooltip"]').forEach((tooltip) => {
new bootstrap.Tooltip(tooltip);
});
let hash = window.location.hash.substr(1);
showByID(hash);
});
$(document).ready(function () {
$('[data-bs-toggle="tooltip"]').tooltip();
});

File diff suppressed because one or more lines are too long

View File

@@ -1,10 +1,16 @@
{% import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/vmalertutil"
) %}
{% func Footer(r *http.Request) %}
{%code prefix := vmalertutil.Prefix(r.URL.Path) %}
</main>
<script src="{%s prefix %}static/js/jquery-3.6.0.min.js" type="text/javascript"></script>
<script src="{%s prefix %}static/js/bootstrap.bundle.min.js" type="text/javascript"></script>
<script src="{%s prefix %}static/js/custom.js" type="text/javascript"></script>
</body>
</html>
{% endfunc %}

View File

@@ -7,54 +7,77 @@ package tpl
//line app/vmalert/tpl/footer.qtpl:1
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/vmalertutil"
)
//line app/vmalert/tpl/footer.qtpl:6
//line app/vmalert/tpl/footer.qtpl:8
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmalert/tpl/footer.qtpl:6
//line app/vmalert/tpl/footer.qtpl:8
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmalert/tpl/footer.qtpl:6
//line app/vmalert/tpl/footer.qtpl:8
func StreamFooter(qw422016 *qt422016.Writer, r *http.Request) {
//line app/vmalert/tpl/footer.qtpl:6
//line app/vmalert/tpl/footer.qtpl:8
qw422016.N().S(`
`)
//line app/vmalert/tpl/footer.qtpl:9
prefix := vmalertutil.Prefix(r.URL.Path)
//line app/vmalert/tpl/footer.qtpl:9
qw422016.N().S(`
</main>
<script src="`)
//line app/vmalert/tpl/footer.qtpl:11
qw422016.E().S(prefix)
//line app/vmalert/tpl/footer.qtpl:11
qw422016.N().S(`static/js/jquery-3.6.0.min.js" type="text/javascript"></script>
<script src="`)
//line app/vmalert/tpl/footer.qtpl:12
qw422016.E().S(prefix)
//line app/vmalert/tpl/footer.qtpl:12
qw422016.N().S(`static/js/bootstrap.bundle.min.js" type="text/javascript"></script>
<script src="`)
//line app/vmalert/tpl/footer.qtpl:13
qw422016.E().S(prefix)
//line app/vmalert/tpl/footer.qtpl:13
qw422016.N().S(`static/js/custom.js" type="text/javascript"></script>
</body>
</html>
`)
//line app/vmalert/tpl/footer.qtpl:10
//line app/vmalert/tpl/footer.qtpl:16
}
//line app/vmalert/tpl/footer.qtpl:10
//line app/vmalert/tpl/footer.qtpl:16
func WriteFooter(qq422016 qtio422016.Writer, r *http.Request) {
//line app/vmalert/tpl/footer.qtpl:10
//line app/vmalert/tpl/footer.qtpl:16
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/tpl/footer.qtpl:10
//line app/vmalert/tpl/footer.qtpl:16
StreamFooter(qw422016, r)
//line app/vmalert/tpl/footer.qtpl:10
//line app/vmalert/tpl/footer.qtpl:16
qt422016.ReleaseWriter(qw422016)
//line app/vmalert/tpl/footer.qtpl:10
//line app/vmalert/tpl/footer.qtpl:16
}
//line app/vmalert/tpl/footer.qtpl:10
//line app/vmalert/tpl/footer.qtpl:16
func Footer(r *http.Request) string {
//line app/vmalert/tpl/footer.qtpl:10
//line app/vmalert/tpl/footer.qtpl:16
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/tpl/footer.qtpl:10
//line app/vmalert/tpl/footer.qtpl:16
WriteFooter(qb422016, r)
//line app/vmalert/tpl/footer.qtpl:10
//line app/vmalert/tpl/footer.qtpl:16
qs422016 := string(qb422016.B)
//line app/vmalert/tpl/footer.qtpl:10
//line app/vmalert/tpl/footer.qtpl:16
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/tpl/footer.qtpl:10
//line app/vmalert/tpl/footer.qtpl:16
return qs422016
//line app/vmalert/tpl/footer.qtpl:10
//line app/vmalert/tpl/footer.qtpl:16
}

View File

@@ -1,7 +1,7 @@
{% import (
"net/http"
"net/url"
"path"
"net/url"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/vmalertutil"
) %}
@@ -13,13 +13,65 @@
<head>
<title>vmalert{% if title != "" %} - {%s title %}{% endif %}</title>
<link href="{%s prefix %}static/css/bootstrap.min.css" rel="stylesheet" />
<link href="{%s prefix %}static/css/custom.css" rel="stylesheet" />
<script src="{%s prefix %}static/js/bootstrap.bundle.min.js" type="text/javascript" defer></script>
<script src="{%s prefix %}static/js/custom.js" type="text/javascript" defer></script>
<style>
body{
min-height: 75rem;
padding-top: 4.5rem;
}
.group-heading {
cursor: pointer;
padding: 5px;
margin-top: 5px;
position: relative;
}
.group-heading .anchor {
position:absolute;
top:-60px;
}
.group-heading span {
float: right;
margin-left: 5px;
margin-right: 5px;
}
.group-heading:hover {
background-color: #f8f9fa!important;
}
.table {
table-layout: fixed;
}
.table .error-cell{
word-break: break-word;
font-size: 14px;
}
pre {
overflow: scroll;
min-height: 30px;
max-width: 100%;
}
pre::-webkit-scrollbar {
-webkit-appearance: none;
width: 0px;
height: 5px;
}
pre::-webkit-scrollbar-thumb {
border-radius: 5px;
background-color: rgba(0,0,0,.5);
-webkit-box-shadow: 0 0 1px rgba(255,255,255,.5);
}
textarea.curl-area{
width: 100%;
line-height: 1;
font-size: 12px;
border: none;
margin: 0;
padding: 0;
overflow: scroll;
}
</style>
</head>
<body>
{%= printNavItems(r, title, navItems, userErr) %}
<main class="px-4 py-2">
<main class="px-2">
{%= errorBody(userErr) %}
{% endfunc %}
@@ -27,27 +79,25 @@
{% code
type NavItem struct {
Name string
URL string
Icon string
Url string
}
%}
{% func printNavItems(r *http.Request, current string, items []NavItem, userErr error) %}
{%code prefix := vmalertutil.Prefix(r.URL.Path) %}
<nav class="navbar navbar-expand navbar-dark fixed-top bg-dark">
{%code
prefix := vmalertutil.Prefix(r.URL.Path)
%}
<nav class="navbar navbar-expand-md navbar-dark fixed-top bg-dark">
<div class="container-fluid">
<div class="collapse navbar-collapse" id="navbarCollapse">
<ul class="navbar-nav me-auto mb-0 align-items-center">
<ul class="navbar-nav me-auto mb-2 mb-md-0">
{% for _, item := range items %}
<li class="nav-item">
{% code
u, _ := url.Parse(item.URL)
u, _ := url.Parse(item.Url)
%}
<a class="d-flex gap-2 align-items-center nav-link{% if current == item.Name %} active{% endif %}"
href="{% if u.IsAbs() %}{%s item.URL %}{% else %}{%s path.Join(prefix, item.URL) %}{% endif %}">
{% if item.Icon != "" %}
<svg width="40" height="40"><use href="{%s prefix %}static/icons/icons.svg#{%s item.Icon %}"></svg>
{% endif %}
<a class="nav-link{% if current == item.Name %} active{% endif %}"
href="{% if u.IsAbs() %}{%s item.Url %}{% else %}{%s path.Join(prefix,item.Url) %}{% endif %}">
{%s item.Name %}
</a>
</li>

View File

@@ -55,196 +55,216 @@ func StreamHeader(qw422016 *qt422016.Writer, r *http.Request, navItems []NavItem
qw422016.E().S(prefix)
//line app/vmalert/tpl/header.qtpl:15
qw422016.N().S(`static/css/bootstrap.min.css" rel="stylesheet" />
<link href="`)
//line app/vmalert/tpl/header.qtpl:16
qw422016.E().S(prefix)
//line app/vmalert/tpl/header.qtpl:16
qw422016.N().S(`static/css/custom.css" rel="stylesheet" />
<script src="`)
//line app/vmalert/tpl/header.qtpl:17
qw422016.E().S(prefix)
//line app/vmalert/tpl/header.qtpl:17
qw422016.N().S(`static/js/bootstrap.bundle.min.js" type="text/javascript" defer></script>
<script src="`)
//line app/vmalert/tpl/header.qtpl:18
qw422016.E().S(prefix)
//line app/vmalert/tpl/header.qtpl:18
qw422016.N().S(`static/js/custom.js" type="text/javascript" defer></script>
<style>
body{
min-height: 75rem;
padding-top: 4.5rem;
}
.group-heading {
cursor: pointer;
padding: 5px;
margin-top: 5px;
position: relative;
}
.group-heading .anchor {
position:absolute;
top:-60px;
}
.group-heading span {
float: right;
margin-left: 5px;
margin-right: 5px;
}
.group-heading:hover {
background-color: #f8f9fa!important;
}
.table {
table-layout: fixed;
}
.table .error-cell{
word-break: break-word;
font-size: 14px;
}
pre {
overflow: scroll;
min-height: 30px;
max-width: 100%;
}
pre::-webkit-scrollbar {
-webkit-appearance: none;
width: 0px;
height: 5px;
}
pre::-webkit-scrollbar-thumb {
border-radius: 5px;
background-color: rgba(0,0,0,.5);
-webkit-box-shadow: 0 0 1px rgba(255,255,255,.5);
}
textarea.curl-area{
width: 100%;
line-height: 1;
font-size: 12px;
border: none;
margin: 0;
padding: 0;
overflow: scroll;
}
</style>
</head>
<body>
`)
//line app/vmalert/tpl/header.qtpl:21
//line app/vmalert/tpl/header.qtpl:73
streamprintNavItems(qw422016, r, title, navItems, userErr)
//line app/vmalert/tpl/header.qtpl:21
//line app/vmalert/tpl/header.qtpl:73
qw422016.N().S(`
<main class="px-4 py-2">
<main class="px-2">
`)
//line app/vmalert/tpl/header.qtpl:23
//line app/vmalert/tpl/header.qtpl:75
streamerrorBody(qw422016, userErr)
//line app/vmalert/tpl/header.qtpl:23
//line app/vmalert/tpl/header.qtpl:75
qw422016.N().S(`
`)
//line app/vmalert/tpl/header.qtpl:24
//line app/vmalert/tpl/header.qtpl:76
}
//line app/vmalert/tpl/header.qtpl:24
//line app/vmalert/tpl/header.qtpl:76
func WriteHeader(qq422016 qtio422016.Writer, r *http.Request, navItems []NavItem, title string, userErr error) {
//line app/vmalert/tpl/header.qtpl:24
//line app/vmalert/tpl/header.qtpl:76
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/tpl/header.qtpl:24
//line app/vmalert/tpl/header.qtpl:76
StreamHeader(qw422016, r, navItems, title, userErr)
//line app/vmalert/tpl/header.qtpl:24
//line app/vmalert/tpl/header.qtpl:76
qt422016.ReleaseWriter(qw422016)
//line app/vmalert/tpl/header.qtpl:24
//line app/vmalert/tpl/header.qtpl:76
}
//line app/vmalert/tpl/header.qtpl:24
//line app/vmalert/tpl/header.qtpl:76
func Header(r *http.Request, navItems []NavItem, title string, userErr error) string {
//line app/vmalert/tpl/header.qtpl:24
//line app/vmalert/tpl/header.qtpl:76
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/tpl/header.qtpl:24
//line app/vmalert/tpl/header.qtpl:76
WriteHeader(qb422016, r, navItems, title, userErr)
//line app/vmalert/tpl/header.qtpl:24
//line app/vmalert/tpl/header.qtpl:76
qs422016 := string(qb422016.B)
//line app/vmalert/tpl/header.qtpl:24
//line app/vmalert/tpl/header.qtpl:76
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/tpl/header.qtpl:24
//line app/vmalert/tpl/header.qtpl:76
return qs422016
//line app/vmalert/tpl/header.qtpl:24
//line app/vmalert/tpl/header.qtpl:76
}
//line app/vmalert/tpl/header.qtpl:28
//line app/vmalert/tpl/header.qtpl:80
type NavItem struct {
Name string
URL string
Icon string
Url string
}
//line app/vmalert/tpl/header.qtpl:35
//line app/vmalert/tpl/header.qtpl:86
func streamprintNavItems(qw422016 *qt422016.Writer, r *http.Request, current string, items []NavItem, userErr error) {
//line app/vmalert/tpl/header.qtpl:35
//line app/vmalert/tpl/header.qtpl:86
qw422016.N().S(`
`)
//line app/vmalert/tpl/header.qtpl:36
//line app/vmalert/tpl/header.qtpl:88
prefix := vmalertutil.Prefix(r.URL.Path)
//line app/vmalert/tpl/header.qtpl:36
//line app/vmalert/tpl/header.qtpl:89
qw422016.N().S(`
<nav class="navbar navbar-expand navbar-dark fixed-top bg-dark">
<nav class="navbar navbar-expand-md navbar-dark fixed-top bg-dark">
<div class="container-fluid">
<div class="collapse navbar-collapse" id="navbarCollapse">
<ul class="navbar-nav me-auto mb-0 align-items-center">
<ul class="navbar-nav me-auto mb-2 mb-md-0">
`)
//line app/vmalert/tpl/header.qtpl:41
//line app/vmalert/tpl/header.qtpl:94
for _, item := range items {
//line app/vmalert/tpl/header.qtpl:41
//line app/vmalert/tpl/header.qtpl:94
qw422016.N().S(`
<li class="nav-item">
`)
//line app/vmalert/tpl/header.qtpl:44
u, _ := url.Parse(item.URL)
//line app/vmalert/tpl/header.qtpl:97
u, _ := url.Parse(item.Url)
//line app/vmalert/tpl/header.qtpl:45
//line app/vmalert/tpl/header.qtpl:98
qw422016.N().S(`
<a class="d-flex gap-2 align-items-center nav-link`)
//line app/vmalert/tpl/header.qtpl:46
<a class="nav-link`)
//line app/vmalert/tpl/header.qtpl:99
if current == item.Name {
//line app/vmalert/tpl/header.qtpl:46
//line app/vmalert/tpl/header.qtpl:99
qw422016.N().S(` active`)
//line app/vmalert/tpl/header.qtpl:46
//line app/vmalert/tpl/header.qtpl:99
}
//line app/vmalert/tpl/header.qtpl:46
//line app/vmalert/tpl/header.qtpl:99
qw422016.N().S(`"
href="`)
//line app/vmalert/tpl/header.qtpl:47
//line app/vmalert/tpl/header.qtpl:100
if u.IsAbs() {
//line app/vmalert/tpl/header.qtpl:47
qw422016.E().S(item.URL)
//line app/vmalert/tpl/header.qtpl:47
//line app/vmalert/tpl/header.qtpl:100
qw422016.E().S(item.Url)
//line app/vmalert/tpl/header.qtpl:100
} else {
//line app/vmalert/tpl/header.qtpl:47
qw422016.E().S(path.Join(prefix, item.URL))
//line app/vmalert/tpl/header.qtpl:47
//line app/vmalert/tpl/header.qtpl:100
qw422016.E().S(path.Join(prefix, item.Url))
//line app/vmalert/tpl/header.qtpl:100
}
//line app/vmalert/tpl/header.qtpl:47
//line app/vmalert/tpl/header.qtpl:100
qw422016.N().S(`">
`)
//line app/vmalert/tpl/header.qtpl:48
if item.Icon != "" {
//line app/vmalert/tpl/header.qtpl:48
qw422016.N().S(`
<svg width="40" height="40"><use href="`)
//line app/vmalert/tpl/header.qtpl:49
qw422016.E().S(prefix)
//line app/vmalert/tpl/header.qtpl:49
qw422016.N().S(`static/icons/icons.svg#`)
//line app/vmalert/tpl/header.qtpl:49
qw422016.E().S(item.Icon)
//line app/vmalert/tpl/header.qtpl:49
qw422016.N().S(`"></svg>
`)
//line app/vmalert/tpl/header.qtpl:50
}
//line app/vmalert/tpl/header.qtpl:50
qw422016.N().S(`
`)
//line app/vmalert/tpl/header.qtpl:51
//line app/vmalert/tpl/header.qtpl:101
qw422016.E().S(item.Name)
//line app/vmalert/tpl/header.qtpl:51
//line app/vmalert/tpl/header.qtpl:101
qw422016.N().S(`
</a>
</li>
`)
//line app/vmalert/tpl/header.qtpl:54
//line app/vmalert/tpl/header.qtpl:104
}
//line app/vmalert/tpl/header.qtpl:54
//line app/vmalert/tpl/header.qtpl:104
qw422016.N().S(`
</ul>
</div>
`)
//line app/vmalert/tpl/header.qtpl:57
//line app/vmalert/tpl/header.qtpl:107
streamerrorIcon(qw422016, userErr)
//line app/vmalert/tpl/header.qtpl:57
//line app/vmalert/tpl/header.qtpl:107
qw422016.N().S(`
</nav>
`)
//line app/vmalert/tpl/header.qtpl:59
//line app/vmalert/tpl/header.qtpl:109
}
//line app/vmalert/tpl/header.qtpl:59
//line app/vmalert/tpl/header.qtpl:109
func writeprintNavItems(qq422016 qtio422016.Writer, r *http.Request, current string, items []NavItem, userErr error) {
//line app/vmalert/tpl/header.qtpl:59
//line app/vmalert/tpl/header.qtpl:109
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/tpl/header.qtpl:59
//line app/vmalert/tpl/header.qtpl:109
streamprintNavItems(qw422016, r, current, items, userErr)
//line app/vmalert/tpl/header.qtpl:59
//line app/vmalert/tpl/header.qtpl:109
qt422016.ReleaseWriter(qw422016)
//line app/vmalert/tpl/header.qtpl:59
//line app/vmalert/tpl/header.qtpl:109
}
//line app/vmalert/tpl/header.qtpl:59
//line app/vmalert/tpl/header.qtpl:109
func printNavItems(r *http.Request, current string, items []NavItem, userErr error) string {
//line app/vmalert/tpl/header.qtpl:59
//line app/vmalert/tpl/header.qtpl:109
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/tpl/header.qtpl:59
//line app/vmalert/tpl/header.qtpl:109
writeprintNavItems(qb422016, r, current, items, userErr)
//line app/vmalert/tpl/header.qtpl:59
//line app/vmalert/tpl/header.qtpl:109
qs422016 := string(qb422016.B)
//line app/vmalert/tpl/header.qtpl:59
//line app/vmalert/tpl/header.qtpl:109
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/tpl/header.qtpl:59
//line app/vmalert/tpl/header.qtpl:109
return qs422016
//line app/vmalert/tpl/header.qtpl:59
//line app/vmalert/tpl/header.qtpl:109
}
//line app/vmalert/tpl/header.qtpl:61
//line app/vmalert/tpl/header.qtpl:111
func streamerrorIcon(qw422016 *qt422016.Writer, err error) {
//line app/vmalert/tpl/header.qtpl:61
//line app/vmalert/tpl/header.qtpl:111
qw422016.N().S(`
`)
//line app/vmalert/tpl/header.qtpl:62
//line app/vmalert/tpl/header.qtpl:112
if err != nil {
//line app/vmalert/tpl/header.qtpl:62
//line app/vmalert/tpl/header.qtpl:112
qw422016.N().S(`
<div class="d-flex" data-bs-toggle="tooltip" data-bs-placement="left" title="Configuration file failed to reload! Click to see more details.">
<a type="button" data-bs-toggle="collapse" href="#reload-groups-error">
@@ -256,89 +276,89 @@ func streamerrorIcon(qw422016 *qt422016.Writer, err error) {
</a>
</div>
`)
//line app/vmalert/tpl/header.qtpl:72
//line app/vmalert/tpl/header.qtpl:122
}
//line app/vmalert/tpl/header.qtpl:72
//line app/vmalert/tpl/header.qtpl:122
qw422016.N().S(`
`)
//line app/vmalert/tpl/header.qtpl:73
//line app/vmalert/tpl/header.qtpl:123
}
//line app/vmalert/tpl/header.qtpl:73
//line app/vmalert/tpl/header.qtpl:123
func writeerrorIcon(qq422016 qtio422016.Writer, err error) {
//line app/vmalert/tpl/header.qtpl:73
//line app/vmalert/tpl/header.qtpl:123
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/tpl/header.qtpl:73
//line app/vmalert/tpl/header.qtpl:123
streamerrorIcon(qw422016, err)
//line app/vmalert/tpl/header.qtpl:73
//line app/vmalert/tpl/header.qtpl:123
qt422016.ReleaseWriter(qw422016)
//line app/vmalert/tpl/header.qtpl:73
//line app/vmalert/tpl/header.qtpl:123
}
//line app/vmalert/tpl/header.qtpl:73
//line app/vmalert/tpl/header.qtpl:123
func errorIcon(err error) string {
//line app/vmalert/tpl/header.qtpl:73
//line app/vmalert/tpl/header.qtpl:123
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/tpl/header.qtpl:73
//line app/vmalert/tpl/header.qtpl:123
writeerrorIcon(qb422016, err)
//line app/vmalert/tpl/header.qtpl:73
//line app/vmalert/tpl/header.qtpl:123
qs422016 := string(qb422016.B)
//line app/vmalert/tpl/header.qtpl:73
//line app/vmalert/tpl/header.qtpl:123
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/tpl/header.qtpl:73
//line app/vmalert/tpl/header.qtpl:123
return qs422016
//line app/vmalert/tpl/header.qtpl:73
//line app/vmalert/tpl/header.qtpl:123
}
//line app/vmalert/tpl/header.qtpl:75
//line app/vmalert/tpl/header.qtpl:125
func streamerrorBody(qw422016 *qt422016.Writer, err error) {
//line app/vmalert/tpl/header.qtpl:75
//line app/vmalert/tpl/header.qtpl:125
qw422016.N().S(`
`)
//line app/vmalert/tpl/header.qtpl:76
//line app/vmalert/tpl/header.qtpl:126
if err != nil {
//line app/vmalert/tpl/header.qtpl:76
//line app/vmalert/tpl/header.qtpl:126
qw422016.N().S(`
<div class="collapse mt-2 mb-2" id="reload-groups-error">
<div class="card card-body">
`)
//line app/vmalert/tpl/header.qtpl:79
//line app/vmalert/tpl/header.qtpl:129
qw422016.E().S(err.Error())
//line app/vmalert/tpl/header.qtpl:79
//line app/vmalert/tpl/header.qtpl:129
qw422016.N().S(`
</div>
</div>
`)
//line app/vmalert/tpl/header.qtpl:82
//line app/vmalert/tpl/header.qtpl:132
}
//line app/vmalert/tpl/header.qtpl:82
//line app/vmalert/tpl/header.qtpl:132
qw422016.N().S(`
`)
//line app/vmalert/tpl/header.qtpl:83
//line app/vmalert/tpl/header.qtpl:133
}
//line app/vmalert/tpl/header.qtpl:83
//line app/vmalert/tpl/header.qtpl:133
func writeerrorBody(qq422016 qtio422016.Writer, err error) {
//line app/vmalert/tpl/header.qtpl:83
//line app/vmalert/tpl/header.qtpl:133
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/tpl/header.qtpl:83
//line app/vmalert/tpl/header.qtpl:133
streamerrorBody(qw422016, err)
//line app/vmalert/tpl/header.qtpl:83
//line app/vmalert/tpl/header.qtpl:133
qt422016.ReleaseWriter(qw422016)
//line app/vmalert/tpl/header.qtpl:83
//line app/vmalert/tpl/header.qtpl:133
}
//line app/vmalert/tpl/header.qtpl:83
//line app/vmalert/tpl/header.qtpl:133
func errorBody(err error) string {
//line app/vmalert/tpl/header.qtpl:83
//line app/vmalert/tpl/header.qtpl:133
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/tpl/header.qtpl:83
//line app/vmalert/tpl/header.qtpl:133
writeerrorBody(qb422016, err)
//line app/vmalert/tpl/header.qtpl:83
//line app/vmalert/tpl/header.qtpl:133
qs422016 := string(qb422016.B)
//line app/vmalert/tpl/header.qtpl:83
//line app/vmalert/tpl/header.qtpl:133
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/tpl/header.qtpl:83
//line app/vmalert/tpl/header.qtpl:133
return qs422016
//line app/vmalert/tpl/header.qtpl:83
//line app/vmalert/tpl/header.qtpl:133
}

View File

@@ -11,7 +11,7 @@ const prefix = "/vmalert/"
// Prefix returns "/vmalert/" prefix if it is missing in the path.
func Prefix(path string) string {
pp := strings.TrimRight(httpserver.GetPathPrefix(), "/")
pp := httpserver.GetPathPrefix()
path = strings.TrimLeft(path, pp)
if strings.HasPrefix(path, prefix) {
return pp

View File

@@ -5,7 +5,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"slices"
"sort"
"strconv"
"strings"
@@ -30,17 +30,16 @@ var (
{fmt.Sprintf("api/v1/alert?%s=<int>&%s=<int>", paramGroupID, paramAlertID), "get alert status by group and alert ID"},
}
systemLinks = [][2]string{
{"vmalert/groups", "UI"},
{"flags", "command-line flags"},
{"metrics", "list of application metrics"},
{"-/reload", "reload configuration"},
}
navItems = []tpl.NavItem{
{Name: "vmalert", URL: "../vmalert", Icon: "vm"},
{Name: "Groups", URL: "groups"},
{Name: "Alerts", URL: "alerts"},
{Name: "Notifiers", URL: "notifiers"},
{Name: "Docs", URL: "https://docs.victoriametrics.com/victoriametrics/vmalert/"},
{Name: "vmalert", Url: "../vmalert"},
{Name: "Groups", Url: "groups"},
{Name: "Alerts", Url: "alerts"},
{Name: "Notifiers", Url: "notifiers"},
{Name: "Docs", Url: "https://docs.victoriametrics.com/vmalert/"},
}
)
@@ -89,10 +88,10 @@ func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool {
WriteRuleDetails(w, r, rule)
return true
case "/vmalert/groups":
filter := r.URL.Query().Get("filter")
rf := extractRulesFilter(r, filter)
data := rh.groups(rf)
WriteListGroups(w, r, data, filter)
var data []apiGroup
rf := extractRulesFilter(r)
data = rh.groups(rf)
WriteListGroups(w, r, data)
return true
case "/vmalert/notifiers":
WriteListTargets(w, r, notifier.GetTargets())
@@ -104,10 +103,9 @@ func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool {
// Grafana makes an extra request to `/rules`
// handler in addition to `/api/v1/rules` calls in alerts UI,
var data []apiGroup
filter := r.URL.Query().Get("filter")
rf := extractRulesFilter(r, filter)
rf := extractRulesFilter(r)
data = rh.groups(rf)
WriteListGroups(w, r, data, filter)
WriteListGroups(w, r, data)
return true
case "/vmalert/api/v1/rules", "/api/v1/rules":
@@ -115,8 +113,7 @@ func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool {
var data []byte
var err error
filter := r.URL.Query().Get("filter")
rf := extractRulesFilter(r, filter)
rf := extractRulesFilter(r)
data, err = rh.listGroups(rf)
if err != nil {
@@ -229,11 +226,9 @@ type rulesFilter struct {
ruleNames []string
ruleType string
excludeAlerts bool
onlyUnhealthy bool
onlyNoMatch bool
}
func extractRulesFilter(r *http.Request, filter string) rulesFilter {
func extractRulesFilter(r *http.Request) rulesFilter {
rf := rulesFilter{}
var ruleType string
@@ -251,12 +246,6 @@ func extractRulesFilter(r *http.Request, filter string) rulesFilter {
rf.ruleNames = append([]string{}, r.Form["rule_name[]"]...)
rf.groupNames = append([]string{}, r.Form["rule_group[]"]...)
rf.files = append([]string{}, r.Form["file[]"]...)
switch filter {
case "unhealthy":
rf.onlyUnhealthy = true
case "noMatch":
rf.onlyNoMatch = true
}
return rf
}
@@ -264,12 +253,24 @@ func (rh *requestHandler) groups(rf rulesFilter) []apiGroup {
rh.m.groupsMu.RLock()
defer rh.m.groupsMu.RUnlock()
isInList := func(list []string, needle string) bool {
if len(list) < 1 {
return true
}
for _, i := range list {
if i == needle {
return true
}
}
return false
}
groups := make([]apiGroup, 0)
for _, group := range rh.m.groups {
if len(rf.groupNames) > 0 && !slices.Contains(rf.groupNames, group.Name) {
if !isInList(rf.groupNames, group.Name) {
continue
}
if len(rf.files) > 0 && !slices.Contains(rf.files, group.File) {
if !isInList(rf.files, group.File) {
continue
}
@@ -281,34 +282,24 @@ func (rh *requestHandler) groups(rf rulesFilter) []apiGroup {
if rf.ruleType != "" && rf.ruleType != r.Type {
continue
}
if len(rf.ruleNames) > 0 && !slices.Contains(rf.ruleNames, r.Name) {
if !isInList(rf.ruleNames, r.Name) {
continue
}
if rf.excludeAlerts {
r.Alerts = nil
}
if (r.LastError == "" && rf.onlyUnhealthy) || (!isNoMatch(r) && rf.onlyNoMatch) {
continue
}
if r.LastError != "" {
g.Unhealthy++
} else {
g.Healthy++
}
if isNoMatch(r) {
g.NoMatch++
}
filteredRules = append(filteredRules, r)
}
g.Rules = filteredRules
groups = append(groups, g)
}
// sort list of groups for deterministic output
slices.SortFunc(groups, func(a, b apiGroup) int {
sort.Slice(groups, func(i, j int) bool {
a, b := groups[i], groups[j]
if a.Name != b.Name {
return strings.Compare(a.Name, b.Name)
return a.Name < b.Name
}
return strings.Compare(a.File, b.File)
return a.File < b.File
})
return groups
}
@@ -354,8 +345,8 @@ func (rh *requestHandler) groupAlerts() []groupAlerts {
})
}
}
slices.SortFunc(gAlerts, func(a, b groupAlerts) int {
return strings.Compare(a.Group.Name, b.Group.Name)
sort.Slice(gAlerts, func(i, j int) bool {
return gAlerts[i].Group.Name < gAlerts[j].Group.Name
})
return gAlerts
}
@@ -377,8 +368,8 @@ func (rh *requestHandler) listAlerts() ([]byte, error) {
}
// sort list of alerts for deterministic output
slices.SortFunc(lr.Data.Alerts, func(a, b *apiAlert) int {
return strings.Compare(a.ID, b.ID)
sort.Slice(lr.Data.Alerts, func(i, j int) bool {
return lr.Data.Alerts[i].ID < lr.Data.Alerts[j].ID
})
b, err := json.Marshal(lr)

View File

@@ -10,69 +10,6 @@
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
) %}
{% func Controls(prefix, currentIcon, currentText string, icons, filters map[string]string, search bool) %}
<div class="btn-toolbar mb-3" role="toolbar">
<div class="d-flex gap-2 justify-content-between w-100">
<div class="d-flex gap-2 align-items-center">
<a class="btn btn-outline-dark d-flex align-items-center" role="button" onclick="actionAll(true)">
<span class="d-none d-md-block">Collapse All</span>
<svg class="d-md-none" height="20" width="20">
<use href="{%s prefix %}static/icons/icons.svg#collapse"/>
</svg>
</a>
<a class="btn btn-outline-dark d-flex align-items-center" role="button" onclick="actionAll(false)">
<span class="d-none d-md-block">Expand All</span>
<svg class="d-md-none" width="20" height="20">
<use href="{%s prefix %}static/icons/icons.svg#expand"/>
</svg>
</a>
{% if len(filters) > 0 %}
<span class="d-none d-md-inline-block">Filter by status:</span>
<svg class="d-md-none" width="20" height="20">
<use href="{%s prefix %}static/icons/icons.svg#filter">
</svg>
<div class="dropdown">
<button
class="btn btn-outline-dark dropdown-toggle d-flex justify-content-between align-items-center"
type="button"
data-bs-toggle="dropdown"
aria-expanded="false"
>
<span class="d-none d-md-inline-block">{%s currentText %}</span>
<svg class="d-md-none" width="22" height="22">
<use href="{%s prefix %}static/icons/icons.svg#{%s currentIcon %}"/>
</svg>
</button>
<ul class="dropdown-menu">
{% for key, title := range filters %}
{% if title != currentText %}
<li>
<a class="dropdown-item" onclick="groupFilter('{%s key %}')">
<span class="d-none d-md-inline-block">{%s title %}</span>
<svg class="d-md-none" width="22" height="22">
<use href="{%s prefix %}static/icons/icons.svg#{%s icons[key] %}"/>
</svg>
</a>
</li>
{% endif %}
{% endfor %}
</ul>
</div>
{% endif %}
</div>
{% if search %}
<div class="input-group flex-grow-1 justify-content-end">
<span class="input-group-text">
<svg height="25" width="20">
<use href="{%s prefix %}static/icons/icons.svg#search">
</svg>
</span>
<input id="search" placeholder="Filter by group, rule or labels" type="text" class="form-control"/>
</div>
{% endif %}
</div>
</div>
{% endfunc %}
{% func Welcome(r *http.Request) %}
{%= tpl.Header(r, navItems, "vmalert", getLastConfigError()) %}
@@ -93,124 +30,143 @@
{%= tpl.Footer(r) %}
{% endfunc %}
{% func ListGroups(r *http.Request, groups []apiGroup, filter string) %}
{%code
prefix := vmalertutil.Prefix(r.URL.Path)
filters := map[string]string{
"": "All",
"unhealthy": "Unhealthy",
"noMatch": "No Match",
}
icons := map[string]string{
"": "all",
"unhealthy": "unhealthy",
"noMatch": "nomatch",
}
currentText := filters[filter]
currentIcon := icons[filter]
%}
{% func buttonActive(filter, expValue string) %}
{% if filter != expValue %}
btn-secondary
{% else %}
btn-primary
{% endif %}
{% endfunc %}
{% func ListGroups(r *http.Request, originGroups []apiGroup) %}
{%code prefix := vmalertutil.Prefix(r.URL.Path) %}
{%= tpl.Header(r, navItems, "Groups", getLastConfigError()) %}
{%= Controls(prefix, currentIcon, currentText, icons, filters, true) %}
{%code
filter := r.URL.Query().Get("filter")
rOk := make(map[string]int)
rNotOk := make(map[string]int)
rNoMatch := make(map[string]int)
var groups []apiGroup
for _, g := range originGroups {
var rules []apiRule
for _, r := range g.Rules {
if r.LastError != "" {
rNotOk[g.ID]++
} else {
rOk[g.ID]++
}
if isNoMatch(r) {
rNoMatch[g.ID]++
}
if (filter == "unhealthy" && r.LastError == "") ||
(filter == "noMatch" && !isNoMatch(r)) {
continue
}
rules = append(rules, r)
}
if len(rules) > 0 {
g.Rules = rules
groups = append(groups, g)
}
}
%}
<div class="btn-toolbar mb-3" role="toolbar">
<div>
<a class="btn {%= buttonActive(filter, "") %}" role="button" onclick="window.location = window.location.pathname">All</a>
<a class="btn btn-primary" role="button" onclick="collapseAll()">Collapse All</a>
<a class="btn btn-primary" role="button" onclick="expandAll()">Expand All</a>
<a class="btn {%= buttonActive(filter, "unhealthy") %}" role="button" onclick="location.href='?filter=unhealthy'" title="Show only rules with errors">Unhealthy</a>
<a class="btn {%= buttonActive(filter, "noMatch") %}" role="button" onclick="location.href='?filter=noMatch'" title="Show only rules matching no time series during last evaluation">NoMatch</a>
</div>
<div class="col-md-4 col-lg-5">
<div class="px-3 input-group">
<div class="input-group-prepend">
<span class="input-group-text">
<svg fill="#000000" height="25px" width="20px" version="1.1" id="Capa_1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 490.4 490.4" xml:space="preserve"><g id="SVGRepo_bgCarrier" stroke-width="0"></g><g id="SVGRepo_tracerCarrier" stroke-linecap="round" stroke-linejoin="round"></g><g id="SVGRepo_iconCarrier"> <g> <path d="M484.1,454.796l-110.5-110.6c29.8-36.3,47.6-82.8,47.6-133.4c0-116.3-94.3-210.6-210.6-210.6S0,94.496,0,210.796 s94.3,210.6,210.6,210.6c50.8,0,97.4-18,133.8-48l110.5,110.5c12.9,11.8,25,4.2,29.2,0C492.5,475.596,492.5,463.096,484.1,454.796z M41.1,210.796c0-93.6,75.9-169.5,169.5-169.5s169.6,75.9,169.6,169.5s-75.9,169.5-169.5,169.5S41.1,304.396,41.1,210.796z"></path> </g> </g></svg>
</span>
</div>
<input id="search" placeholder="Filter by group, rule or labels" type="text" class="form-control"/>
</div>
</div>
</div>
{% if len(groups) > 0 %}
{% for _, g := range groups %}
<div id="group-{%s g.ID %}" class="d-flex w-100 border-0 flex-column group-items{% if g.Unhealthy > 0 %} alert-danger{% endif %}">
<span class="d-flex justify-content-between">
<a href="#group-{%s g.ID %}">{%s g.Name %}{% if g.Type != "prometheus" %} ({%s g.Type %}){% endif %} (every {%f.0 g.Interval %}s) #</a>
<span
class="flex-grow-1 d-flex justify-content-end"
role="button"
data-bs-toggle="collapse"
data-bs-target="#sub-{%s g.ID %}"
>
<span class="d-flex gap-2">
{% if g.Unhealthy > 0 %}<span class="badge bg-danger" title="Number of rules with status Error">{%d g.Unhealthy %}</span> {% endif %}
{% if g.NoMatch > 0 %}<span class="badge bg-warning" title="Number of rules with status NoMatch">{%d g.NoMatch %}</span> {% endif %}
<span class="badge bg-success" title="Number of rules with status Ok">{%d g.Healthy %}</span>
</span>
</span>
</span>
<span
class="d-flex flex-column row-gap-2 mb-2"
role="button"
data-bs-toggle="collapse"
data-bs-target="#sub-{%s g.ID %}"
>
<span class="fs-6 text-start w-100 fw-lighter">{%s g.File %}</span>
{% if len(g.Params) > 0 %}
<span class="fs-6 text-start w-100 d-flex justify-content-between fw-lighter">
<span>Extra params</span>
<span class="d-flex align-items-center gap-2">
{% for _, param := range g.Params %}
<span class="badge bg-primary">{%s param %}</span>
{% endfor %}
</span>
</span>
{% endif %}
{% if len(g.Headers) > 0 %}
<span class="fs-6 text-start w-100 d-flex justify-content-between fw-lighter">
<span>Extra headers</span>
<span class="d-flex align-items-center gap-2">
{% for _, header := range g.Headers %}
<span class="badge bg-primary label">{%s header %}</span>
{% endfor %}
</span>
</span>
{% endif %}
</span>
<div class="collapse sub-items" id="sub-{%s g.ID %}">
<table class="table table-striped table-hover table-sm">
<thead>
<tr>
<th scope="col" style="width: 60%">Rule</th>
<th scope="col" style="width: 20%" class="text-center" title="How many samples were produced by the rule">Samples</th>
<th scope="col" style="width: 20%" class="text-center" title="How many seconds ago rule was executed">Updated</th>
</tr>
</thead>
<tbody>
{% for _, r := range g.Rules %}
<tr class="sub-item{% if r.LastError != "" %} alert-danger{% endif %}">
<td>
<div class="row">
<div class="col-12 mb-2">
{% if r.Type == "alerting" %}
{% if r.KeepFiringFor > 0 %}
<b>alert:</b> {%s r.Name %} (for: {%v r.Duration %} seconds, keep_firing_for: {%v r.KeepFiringFor %} seconds)
{% else %}
<b>alert:</b> {%s r.Name %} (for: {%v r.Duration %} seconds)
{% endif %}
{% else %}
<b>record:</b> {%s r.Name %}
{% endif %}
|
{%= seriesFetchedWarn(prefix, r) %}
<span><a target="_blank" href="{%s prefix+r.WebLink() %}">Details</a></span>
</div>
<div class="col-12">
<code><pre>{%s r.Query %}</pre></code>
</div>
<div class="col-12 mb-2">
{% if len(r.Labels) > 0 %} <b>Labels:</b>{% endif %}
{% for k, v := range r.Labels %}
<span class="ms-1 badge bg-primary label">{%s k %}={%s v %}</span>
{% endfor %}
</div>
{% if r.LastError != "" %}
<div class="col-12">
<b>Error:</b>
<div class="error-cell">
{%s r.LastError %}
</div>
</div>
{% endif %}
{% for _, g := range groups %}
<div
class="group-heading{% if rNotOk[g.ID] > 0 %} alert-danger{%endif%}" data-bs-target="rules-{%s g.ID %}" data-group-name="{%s g.Name %}">
<span class="anchor" id="group-{%s g.ID %}"></span>
<a href="#group-{%s g.ID %}">{%s g.Name %}{% if g.Type != "prometheus" %} ({%s g.Type %}){% endif %} (every {%f.0 g.Interval %}s) #</a>
{% if rNotOk[g.ID] > 0 %}<span class="badge bg-danger" title="Number of rules with status Error">{%d rNotOk[g.ID] %}</span> {% endif %}
{% if rNoMatch[g.ID] > 0 %}<span class="badge bg-warning" title="Number of rules with status NoMatch">{%d rNoMatch[g.ID] %}</span> {% endif %}
<span class="badge bg-success" title="Number of rules with status Ok">{%d rOk[g.ID] %}</span>
<p class="fs-6 fw-lighter">{%s g.File %}</p>
{% if len(g.Params) > 0 %}
<div class="fs-6 fw-lighter">Extra params
{% for _, param := range g.Params %}
<span class="float-left badge bg-primary">{%s param %}</span>
{% endfor %}
</div>
{% endif %}
{% if len(g.Headers) > 0 %}
<div class="fs-6 fw-lighter">Extra headers
{% for _, header := range g.Headers %}
<span class="float-left badge bg-primary">{%s header %}</span>
{% endfor %}
</div>
{% endif %}
</div>
<div class="collapse rule-table" id="rules-{%s g.ID %}">
<table class="table table-striped table-hover table-sm">
<thead>
<tr>
<th scope="col" style="width: 60%">Rule</th>
<th scope="col" style="width: 20%" class="text-center" title="How many samples were produced by the rule">Samples</th>
<th scope="col" style="width: 20%" class="text-center" title="How many seconds ago rule was executed">Updated</th>
</tr>
</thead>
<tbody>
{% for _, r := range g.Rules %}
<tr class="rule{% if r.LastError != "" %} alert-danger{% endif %}" data-rule-name="{%s r.Name %}" data-bs-target="{%s g.ID %}">
<td>
<div class="row">
<div class="col-12 mb-2">
{% if r.Type == "alerting" %}
{% if r.KeepFiringFor > 0 %}
<b>alert:</b> {%s r.Name %} (for: {%v r.Duration %} seconds, keep_firing_for: {%v r.KeepFiringFor %} seconds)
{% else %}
<b>alert:</b> {%s r.Name %} (for: {%v r.Duration %} seconds)
{% endif %}
{% else %}
<b>record:</b> {%s r.Name %}
{% endif %}
|
{%= seriesFetchedWarn(r) %}
<span><a target="_blank" href="{%s prefix+r.WebLink() %}">Details</a></span>
</div>
<div class="col-12">
<code><pre>{%s r.Query %}</pre></code>
</div>
<div class="col-12 mb-2">
{% if len(r.Labels) > 0 %} <b>Labels:</b>{% endif %}
{% for k, v := range r.Labels %}
<span class="ms-1 badge bg-primary label">{%s k %}={%s v %}</span>
{% endfor %}
</div>
{% if r.LastError != "" %}
<div class="col-12">
<b>Error:</b>
<div class="error-cell">
{%s r.LastError %}
</div>
</td>
<td class="text-center">{%d r.LastSamples %}</td>
<td class="text-center">{%f.3 time.Since(r.LastEvaluation).Seconds() %}s ago</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
{% endif %}
</div>
</td>
<td class="text-center">{%d r.LastSamples %}</td>
<td class="text-center">{%f.3 time.Since(r.LastEvaluation).Seconds() %}s ago</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endfor %}
{% else %}
@@ -218,164 +174,169 @@
<p>No groups...</p>
</div>
{% endif %}
{%= tpl.Footer(r) %}
{% endfunc %}
{% func ListAlerts(r *http.Request, groupAlerts []groupAlerts) %}
{%code prefix := vmalertutil.Prefix(r.URL.Path) %}
{%= tpl.Header(r, navItems, "Alerts", getLastConfigError()) %}
{%= Controls(prefix, "", "", nil, nil, true) %}
{% if len(groupAlerts) > 0 %}
<div class="btn-toolbar mb-3" role="toolbar">
<div>
<a class="btn btn-primary" role="button" onclick="collapseAll()">Collapse All</a>
<a class="btn btn-primary" role="button" onclick="expandAll()">Expand All</a>
</div>
<div class="col-md-4 col-lg-5">
<div class="px-3 input-group">
<div class="input-group-prepend">
<span class="input-group-text">
<svg fill="#000000" height="25px" width="20px" version="1.1" id="Capa_1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 490.4 490.4" xml:space="preserve"><g id="SVGRepo_bgCarrier" stroke-width="0"></g><g id="SVGRepo_tracerCarrier" stroke-linecap="round" stroke-linejoin="round"></g><g id="SVGRepo_iconCarrier"> <g> <path d="M484.1,454.796l-110.5-110.6c29.8-36.3,47.6-82.8,47.6-133.4c0-116.3-94.3-210.6-210.6-210.6S0,94.496,0,210.796 s94.3,210.6,210.6,210.6c50.8,0,97.4-18,133.8-48l110.5,110.5c12.9,11.8,25,4.2,29.2,0C492.5,475.596,492.5,463.096,484.1,454.796z M41.1,210.796c0-93.6,75.9-169.5,169.5-169.5s169.6,75.9,169.6,169.5s-75.9,169.5-169.5,169.5S41.1,304.396,41.1,210.796z"></path> </g> </g></svg>
</span>
</div>
<input id="search" placeholder="Filter by group, rule or labels" type="text" class="form-control"/>
</div>
</div>
</div>
{% for _, ga := range groupAlerts %}
{%code
g := ga.Group
var keys []string
alertsByRule := make(map[string][]*apiAlert)
for _, alert := range ga.Alerts {
if len(alertsByRule[alert.RuleID]) < 1 {
keys = append(keys, alert.RuleID)
}
alertsByRule[alert.RuleID] = append(alertsByRule[alert.RuleID], alert)
}
sort.Strings(keys)
%}
<div class="d-flex w-100 flex-column group-items alert-danger">
<span id="group-{%s g.ID %}" class="d-flex justify-content-between">
<a href="#group-{%s g.ID %}">{%s g.Name %}{% if g.Type != "prometheus" %} ({%s g.Type %}){% endif %}</a>
<span
class="flex-grow-1 d-flex justify-content-end"
role="button"
data-bs-toggle="collapse"
data-bs-target="#sub-{%s g.ID %}"
>
<span class="badge bg-danger" title="Number of active alerts">{%d len(ga.Alerts) %}</span>
</span>
</span>
<span>
<span
class="fs-6 text-start w-100 fw-lighter"
role="button"
data-bs-toggle="collapse"
data-bs-target="#sub-{%s g.ID %}"
>{%s g.File %}</span>
</span>
<div class="collapse sub-items" id="sub-{%s g.ID %}">
{% for _, ruleID := range keys %}
{%code
defaultAR := alertsByRule[ruleID][0]
var labelKeys []string
for k := range defaultAR.Labels {
labelKeys = append(labelKeys, k)
}
sort.Strings(labelKeys)
%}
<br>
<div class="sub-item">
<b>alert:</b> {%s defaultAR.Name %} ({%d len(alertsByRule[ruleID]) %})
| <span><a target="_blank" href="{%s defaultAR.SourceLink %}">Source</a></span>
<br>
<b>expr:</b><code><pre>{%s defaultAR.Expression %}</pre></code>
<table class="table table-striped table-hover table-sm">
<thead>
<tr>
<th scope="col">Labels</th>
<th scope="col">State</th>
<th scope="col">Active at</th>
<th scope="col">Value</th>
<th scope="col">Link</th>
</tr>
</thead>
<tbody>
{% for _, ar := range alertsByRule[ruleID] %}
<tr>
<td>
{% for _, k := range labelKeys %}
<span class="ms-1 badge bg-primary label">{%s k %}={%s ar.Labels[k] %}</span>
{% endfor %}
</td>
<td>{%= badgeState(ar.State) %}</td>
<td>
{%s ar.ActiveAt.Format("2006-01-02T15:04:05Z07:00") %}
{% if ar.Restored %}{%= badgeRestored() %}{% endif %}
{% if ar.Stabilizing %}{%= badgeStabilizing() %}{% endif %}
</td>
<td>{%s ar.Value %}</td>
<td><a href="{%s prefix+ar.WebLink() %}">Details</a></td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endfor %}
</div>
</div>
{% endfor %}
{% else %}
<div>
<p>No active alerts...</p>
</div>
{% endif %}
{%= tpl.Footer(r) %}
{%code g := ga.Group %}
<div class="group-heading alert-danger" data-bs-target="rules-{%s g.ID %}" data-group-name="{%s g.Name %}">
<span class="anchor" id="group-{%s g.ID %}"></span>
<a href="#group-{%s g.ID %}">{%s g.Name %}{% if g.Type != "prometheus" %} ({%s g.Type %}){% endif %}</a>
<span class="badge bg-danger" title="Number of active alerts">{%d len(ga.Alerts) %}</span>
<br>
<p class="fs-6 fw-lighter">{%s g.File %}</p>
</div>
{%code
var keys []string
alertsByRule := make(map[string][]*apiAlert)
for _, alert := range ga.Alerts {
if len(alertsByRule[alert.RuleID]) < 1 {
keys = append(keys, alert.RuleID)
}
alertsByRule[alert.RuleID] = append(alertsByRule[alert.RuleID], alert)
}
sort.Strings(keys)
%}
<div class="collapse rule-table" id="rules-{%s g.ID %}">
{% for _, ruleID := range keys %}
{%code
defaultAR := alertsByRule[ruleID][0]
var labelKeys []string
for k := range defaultAR.Labels {
labelKeys = append(labelKeys, k)
}
sort.Strings(labelKeys)
%}
<br>
<div class="rule" data-rule-name="{%s defaultAR.Name %}" data-bs-target="{%s g.ID %}">
<b>alert:</b> {%s defaultAR.Name %} ({%d len(alertsByRule[ruleID]) %})
| <span><a target="_blank" href="{%s defaultAR.SourceLink %}">Source</a></span>
<br>
<b>expr:</b><code><pre>{%s defaultAR.Expression %}</pre></code>
<table class="table table-striped table-hover table-sm">
<thead>
<tr>
<th scope="col">Labels</th>
<th scope="col">State</th>
<th scope="col">Active at</th>
<th scope="col">Value</th>
<th scope="col">Link</th>
</tr>
</thead>
<tbody>
{% for _, ar := range alertsByRule[ruleID] %}
<tr>
<td>
{% for _, k := range labelKeys %}
<span class="ms-1 badge bg-primary label">{%s k %}={%s ar.Labels[k] %}</span>
{% endfor %}
</td>
<td>{%= badgeState(ar.State) %}</td>
<td>
{%s ar.ActiveAt.Format("2006-01-02T15:04:05Z07:00") %}
{% if ar.Restored %}{%= badgeRestored() %}{% endif %}
{% if ar.Stabilizing %}{%= badgeStabilizing() %}{% endif %}
</td>
<td>{%s ar.Value %}</td>
<td>
<a href="{%s prefix+ar.WebLink() %}">Details</a>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endfor %}
</div>
{% endfor %}
{% else %}
<div>
<p>No active alerts...</p>
</div>
{% endif %}
{%= tpl.Footer(r) %}
{% endfunc %}
{% func ListTargets(r *http.Request, targets map[notifier.TargetType][]notifier.Target) %}
{%code prefix := vmalertutil.Prefix(r.URL.Path) %}
{%= tpl.Header(r, navItems, "Notifiers", getLastConfigError()) %}
{%= Controls(prefix, "", "", nil, nil, false) %}
{% if len(targets) > 0 %}
{%code
var keys []string
for key := range targets {
keys = append(keys, string(key))
}
sort.Strings(keys)
%}
{% for i := range keys %}
{%code
typeK, ns := keys[i], targets[notifier.TargetType(keys[i])]
<a class="btn btn-primary" role="button" onclick="collapseAll()">Collapse All</a>
<a class="btn btn-primary" role="button" onclick="expandAll()">Expand All</a>
{%code
var keys []string
for key := range targets {
keys = append(keys, string(key))
}
sort.Strings(keys)
%}
{% for i := range keys %}
{%code typeK, ns := keys[i], targets[notifier.TargetType(keys[i])]
count := len(ns)
%}
<div class="d-flex w-100 flex-column group-items">
<span class="d-flex justify-content-between" id="group-{%s typeK %}">
<a href="#group-{%s typeK %}">{%s typeK %} ({%d count %})</a>
<span
class="flex-grow-1"
role="button"
data-bs-toggle="collapse"
data-bs-target="#sub-{%s typeK %}"
></span>
</span>
<div id="sub-{%s typeK %}" class="collapse show sub-items">
<table class="table table-striped table-hover table-sm">
<thead>
<tr class="sub-item">
<th scope="col">Labels</th>
<th scope="col">Address</th>
</tr>
</thead>
<tbody>
{% for _, n := range ns %}
<tr>
<td>
{% for _, l := range n.Labels.GetLabels() %}
<span class="ms-1 badge bg-primary">{%s l.Name %}={%s l.Value %}</span>
{% endfor %}
</td>
<td>{%s n.Notifier.Addr() %}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
{% endfor %}
<div class="group-heading" data-bs-target="notifiers-{%s typeK %}">
<span class="anchor" id="group-{%s typeK %}"></span>
<a href="#group-{%s typeK %}">{%s typeK %} ({%d count %})</a>
</div>
<div class="collapse show" id="notifiers-{%s typeK %}">
<table class="table table-striped table-hover table-sm">
<thead>
<tr>
<th scope="col">Labels</th>
<th scope="col">Address</th>
</tr>
</thead>
<tbody>
{% for _, n := range ns %}
<tr>
<td>
{% for _, l := range n.Labels.GetLabels() %}
<span class="ms-1 badge bg-primary">{%s l.Name %}={%s l.Value %}</span>
{% endfor %}
</td>
<td>{%s n.Notifier.Addr() %}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endfor %}
{% else %}
<div>
<p>No targets...</p>
</div>
{% endif %}
{%= tpl.Footer(r) %}
{% endfunc %}
{% func Alert(r *http.Request, alert *apiAlert) %}
@@ -387,6 +348,7 @@
labelKeys = append(labelKeys, k)
}
sort.Strings(labelKeys)
var annotationKeys []string
for k := range alert.Annotations {
annotationKeys = append(annotationKeys, k)
@@ -649,15 +611,15 @@
<span class="badge bg-warning text-dark" title="This firing state is kept because of `keep_firing_for`">stabilizing</span>
{% endfunc %}
{% func seriesFetchedWarn(prefix string, r apiRule) %}
{% func seriesFetchedWarn(r apiRule) %}
{% if isNoMatch(r) %}
<svg
<svg xmlns="http://www.w3.org/2000/svg"
data-bs-toggle="tooltip"
title="No match! This rule's last evaluation hasn't selected any time series from the datasource.
It might be that either this data is missing in the datasource or there is a typo in rule's expression.
See more in Details."
width="18" height="18" fill="currentColor" class="bi bi-exclamation-triangle-fill flex-shrink-0 me-2" role="img" aria-label="Warning:">
<use href="{%s prefix %}static/icons/icons.svg#exclamation"/>
width="18" height="18" fill="currentColor" class="bi bi-exclamation-triangle-fill flex-shrink-0 me-2" viewBox="0 0 16 16" role="img" aria-label="Warning:">
<path d="M8 16A8 8 0 1 0 8 0a8 8 0 0 0 0 16zm.93-9.412-1 4.705c-.07.34.029.533.304.533.194 0 .487-.07.686-.246l-.088.416c-.287.346-.92.598-1.465.598-.703 0-1.002-.422-.808-1.319l.738-3.468c.064-.293.006-.399-.287-.47l-.451-.081.082-.381 2.29-.287zM8 5.5a1 1 0 1 1 0-2 1 1 0 0 1 0 2z"/>
</svg>
{% endif %}
{% endfunc %}

File diff suppressed because it is too large Load Diff

View File

@@ -98,12 +98,6 @@ type apiGroup struct {
EvalOffset float64 `json:"eval_offset,omitempty"`
// EvalDelay will adjust the `time` parameter of rule evaluation requests to compensate intentional query delay from datasource.
EvalDelay float64 `json:"eval_delay,omitempty"`
// Unhealthy unhealthy rules count
Unhealthy int
// Healthy passing rules count
Healthy int
// NoMatch not matching rules count
NoMatch int
}
// groupAlerts represents a group of alerts for WEB view
@@ -331,7 +325,8 @@ func groupToAPI(g *rule.Group) apiGroup {
g = g.DeepCopy()
ag := apiGroup{
// encode as string to avoid rounding
ID: strconv.FormatUint(g.GetID(), 10),
ID: fmt.Sprintf("%d", g.GetID()),
Name: g.Name,
Type: g.Type.String(),
File: g.File,
@@ -341,7 +336,8 @@ func groupToAPI(g *rule.Group) apiGroup {
Params: urlValuesToStrings(g.Params),
Headers: headersToStrings(g.Headers),
NotifierHeaders: headersToStrings(g.NotifierHeaders),
Labels: g.Labels,
Labels: g.Labels,
}
if g.EvalOffset != nil {
ag.EvalOffset = g.EvalOffset.Seconds()

View File

@@ -55,7 +55,7 @@ func TestRecordingToApi(t *testing.T) {
}
}
func TestURLValuesToStrings(t *testing.T) {
func TestUrlValuesToStrings(t *testing.T) {
mapQueryParams := map[string][]string{
"param1": {"param1"},
"param2": {"anotherparam"},

View File

@@ -1,3 +1,3 @@
See vmauth docs [here](https://docs.victoriametrics.com/victoriametrics/vmauth/).
See vmauth docs [here](https://docs.victoriametrics.com/vmauth/).
vmauth docs can be edited at [docs/vmauth.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/vmauth.md).
vmauth docs can be edited at [docs/vmauth.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/vmauth.md).

View File

@@ -34,15 +34,15 @@ import (
var (
authConfigPath = flag.String("auth.config", "", "Path to auth config. It can point either to local file or to http url. "+
"See https://docs.victoriametrics.com/victoriametrics/vmauth/ for details on the format of this auth config")
"See https://docs.victoriametrics.com/vmauth/ for details on the format of this auth config")
configCheckInterval = flag.Duration("configCheckInterval", 0, "interval for config file re-read. "+
"Zero value disables config re-reading. By default, refreshing is disabled, send SIGHUP for config refresh.")
defaultRetryStatusCodes = flagutil.NewArrayInt("retryStatusCodes", 0, "Comma-separated list of default HTTP response status codes when vmauth re-tries the request on other backends. "+
"See https://docs.victoriametrics.com/victoriametrics/vmauth/#load-balancing for details")
"See https://docs.victoriametrics.com/vmauth/#load-balancing for details")
defaultLoadBalancingPolicy = flag.String("loadBalancingPolicy", "least_loaded", "The default load balancing policy to use for backend urls specified inside url_prefix section. "+
"Supported policies: least_loaded, first_available. See https://docs.victoriametrics.com/victoriametrics/vmauth/#load-balancing")
"Supported policies: least_loaded, first_available. See https://docs.victoriametrics.com/vmauth/#load-balancing")
discoverBackendIPsGlobal = flag.Bool("discoverBackendIPs", false, "Whether to discover backend IPs via periodic DNS queries to hostnames specified in url_prefix. "+
"This may be useful when url_prefix points to a hostname with dynamically scaled instances behind it. See https://docs.victoriametrics.com/victoriametrics/vmauth/#discovering-backend-ips")
"This may be useful when url_prefix points to a hostname with dynamically scaled instances behind it. See https://docs.victoriametrics.com/vmauth/#discovering-backend-ips")
discoverBackendIPsInterval = flag.Duration("discoverBackendIPsInterval", 10*time.Second, "The interval for re-discovering backend IPs if -discoverBackendIPs command-line flag is set. "+
"Too low value may lead to DNS errors")
httpAuthHeader = flagutil.NewArrayString("httpAuthHeader", "HTTP request header to use for obtaining authorization tokens. By default auth tokens are read from Authorization request header")

View File

@@ -57,15 +57,15 @@ var (
maxRequestBodySizeToRetry = flagutil.NewBytes("maxRequestBodySizeToRetry", 16*1024, "The maximum request body size, which can be cached and re-tried at other backends. "+
"Bigger values may require more memory. Zero or negative value disables caching of request body. This may be useful when proxying data ingestion requests")
backendTLSInsecureSkipVerify = flag.Bool("backend.tlsInsecureSkipVerify", false, "Whether to skip TLS verification when connecting to backends over HTTPS. "+
"See https://docs.victoriametrics.com/victoriametrics/vmauth/#backend-tls-setup")
"See https://docs.victoriametrics.com/vmauth/#backend-tls-setup")
backendTLSCAFile = flag.String("backend.TLSCAFile", "", "Optional path to TLS root CA file, which is used for TLS verification when connecting to backends over HTTPS. "+
"See https://docs.victoriametrics.com/victoriametrics/vmauth/#backend-tls-setup")
"See https://docs.victoriametrics.com/vmauth/#backend-tls-setup")
backendTLSCertFile = flag.String("backend.TLSCertFile", "", "Optional path to TLS client certificate file, which must be sent to HTTPS backend. "+
"See https://docs.victoriametrics.com/victoriametrics/vmauth/#backend-tls-setup")
"See https://docs.victoriametrics.com/vmauth/#backend-tls-setup")
backendTLSKeyFile = flag.String("backend.TLSKeyFile", "", "Optional path to TLS client key file, which must be sent to HTTPS backend. "+
"See https://docs.victoriametrics.com/victoriametrics/vmauth/#backend-tls-setup")
"See https://docs.victoriametrics.com/vmauth/#backend-tls-setup")
backendTLSServerName = flag.String("backend.TLSServerName", "", "Optional TLS ServerName, which must be sent to HTTPS backend. "+
"See https://docs.victoriametrics.com/victoriametrics/vmauth/#backend-tls-setup")
"See https://docs.victoriametrics.com/vmauth/#backend-tls-setup")
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmauth. The auth configuration file is validated. The -auth.config flag must be specified.")
removeXFFHTTPHeaderValue = flag.Bool(`removeXFFHTTPHeaderValue`, false, "Whether to remove the X-Forwarded-For HTTP header value from client requests before forwarding them to the backend. "+
"Recommended when vmauth is exposed to the internet.")
@@ -96,23 +96,20 @@ func main() {
logger.Infof("starting vmauth at %q...", listenAddrs)
startTime := time.Now()
initAuthConfig()
disableInternalRoutes := len(*httpInternalListenAddr) > 0
rh := requestHandlerWithInternalRoutes
if disableInternalRoutes {
rh = requestHandler
}
go httpserver.Serve(listenAddrs, rh, httpserver.ServeOptions{
UseProxyProtocol: useProxyProtocol,
// built-in routes will be exposed at *httpInternalListenAddr
serveOpts := httpserver.ServeOptions{
UseProxyProtocol: useProxyProtocol,
DisableBuiltinRoutes: disableInternalRoutes,
})
}
go httpserver.ServeWithOpts(listenAddrs, rh, serveOpts)
if len(*httpInternalListenAddr) > 0 {
go httpserver.Serve(*httpInternalListenAddr, internalRequestHandler, httpserver.ServeOptions{
UseProxyProtocol: useProxyProtocol,
})
go httpserver.Serve(*httpInternalListenAddr, nil, internalRequestHandler)
}
logger.Infof("started vmauth in %.3f seconds", time.Since(startTime).Seconds())
@@ -540,7 +537,7 @@ func usage() {
const s = `
vmauth authenticates and authorizes incoming requests and proxies them to VictoriaMetrics.
See the docs at https://docs.victoriametrics.com/victoriametrics/vmauth/ .
See the docs at https://docs.victoriametrics.com/vmauth/ .
`
flagutil.Usage(s)
}

View File

@@ -9,5 +9,4 @@ COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certifica
EXPOSE 8427
ENTRYPOINT ["/vmauth-prod"]
ARG TARGETARCH
ARG BINARY_SUFFIX=non-existing
COPY vmauth-linux-${TARGETARCH}-prod${BINARY_SUFFIX} ./vmauth-prod
COPY vmauth-linux-${TARGETARCH}-prod ./vmauth-prod

View File

@@ -1,3 +1,3 @@
See vmbackup docs [here](https://docs.victoriametrics.com/victoriametrics/vmbackup/).
See vmbackup docs [here](https://docs.victoriametrics.com/vmbackup/).
vmbackup docs can be edited at [docs/vmbackup.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/vmbackup.md).
vmbackup docs can be edited at [docs/vmbackup.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/vmbackup.md).

View File

@@ -1,7 +1,6 @@
package main
import (
"context"
"flag"
"fmt"
"net/url"
@@ -19,7 +18,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot/snapshotutil"
@@ -28,7 +26,7 @@ import (
var (
httpListenAddr = flag.String("httpListenAddr", ":8420", "TCP address for exporting metrics at /metrics page")
storageDataPath = flag.String("storageDataPath", "victoria-metrics-data", "Path to VictoriaMetrics data. Must match -storageDataPath from VictoriaMetrics or vmstorage")
snapshotName = flag.String("snapshotName", "", "Name for the snapshot to backup. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-work-with-snapshots. There is no need in setting -snapshotName if -snapshot.createURL is set")
snapshotName = flag.String("snapshotName", "", "Name for the snapshot to backup. See https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-work-with-snapshots. There is no need in setting -snapshotName if -snapshot.createURL is set")
snapshotCreateURL = flag.String("snapshot.createURL", "", "VictoriaMetrics create snapshot url. When this is given a snapshot will automatically be created during backup. "+
"Example: http://victoriametrics:8428/snapshot/create . There is no need in setting -snapshotName if -snapshot.createURL is set")
snapshotDeleteURL = flag.String("snapshot.deleteURL", "", "VictoriaMetrics delete snapshot url. Optional. Will be generated from -snapshot.createURL if not provided. "+
@@ -97,17 +95,10 @@ func main() {
}
listenAddrs := []string{*httpListenAddr}
go httpserver.Serve(listenAddrs, nil, httpserver.ServeOptions{})
ctx, cancel := context.WithCancel(context.Background())
go func() {
procutil.WaitForSigterm()
logger.Infof("received stop signal, canceling backup operation")
cancel()
}()
go httpserver.Serve(listenAddrs, nil, nil)
pushmetrics.Init()
err := makeBackup(ctx)
err := makeBackup()
deleteSnapshot()
if err != nil {
logger.Fatalf("cannot create backup: %s", err)
@@ -122,14 +113,14 @@ func main() {
logger.Infof("successfully shut down http server for metrics in %.3f seconds", time.Since(startTime).Seconds())
}
func makeBackup(ctx context.Context) error {
dstFS, err := newDstFS(ctx)
func makeBackup() error {
dstFS, err := newDstFS()
if err != nil {
return err
}
if *snapshotName == "" {
// Make server-side copy from -origin to -dst
originFS, err := newRemoteOriginFS(ctx)
originFS, err := newRemoteOriginFS()
if err != nil {
return err
}
@@ -148,7 +139,7 @@ func makeBackup(ctx context.Context) error {
if err != nil {
return err
}
originFS, err := newOriginFS(ctx)
originFS, err := newOriginFS()
if err != nil {
return err
}
@@ -173,7 +164,7 @@ func usage() {
vmbackup performs backups for VictoriaMetrics data from instant snapshots to gcs, s3, azblob
or local filesystem. Backed up data can be restored with vmrestore.
See the docs at https://docs.victoriametrics.com/victoriametrics/vmbackup/ .
See the docs at https://docs.victoriametrics.com/vmbackup/ .
`
flagutil.Usage(s)
}
@@ -208,8 +199,8 @@ func newSrcFS() (*fslocal.FS, error) {
return fs, nil
}
func newDstFS(ctx context.Context) (common.RemoteFS, error) {
fs, err := actions.NewRemoteFS(ctx, *dst)
func newDstFS() (common.RemoteFS, error) {
fs, err := actions.NewRemoteFS(*dst)
if err != nil {
return nil, fmt.Errorf("cannot parse `-dst`=%q: %w", *dst, err)
}
@@ -248,22 +239,22 @@ func hasFilepathPrefix(path, prefix string) bool {
return true
}
func newOriginFS(ctx context.Context) (common.OriginFS, error) {
func newOriginFS() (common.OriginFS, error) {
if len(*origin) == 0 {
return &fsnil.FS{}, nil
}
fs, err := actions.NewRemoteFS(ctx, *origin)
fs, err := actions.NewRemoteFS(*origin)
if err != nil {
return nil, fmt.Errorf("cannot parse `-origin`=%q: %w", *origin, err)
}
return fs, nil
}
func newRemoteOriginFS(ctx context.Context) (common.RemoteFS, error) {
func newRemoteOriginFS() (common.RemoteFS, error) {
if len(*origin) == 0 {
return nil, fmt.Errorf("-origin cannot be empty when -snapshotName and -snapshot.createURL aren't set")
}
fs, err := actions.NewRemoteFS(ctx, *origin)
fs, err := actions.NewRemoteFS(*origin)
if err != nil {
return nil, fmt.Errorf("cannot parse `-origin`=%q: %w", *origin, err)
}

View File

@@ -8,5 +8,4 @@ FROM $root_image
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
ENTRYPOINT ["/vmbackup-prod"]
ARG TARGETARCH
ARG BINARY_SUFFIX=non-existing
COPY vmbackup-linux-${TARGETARCH}-prod${BINARY_SUFFIX} ./vmbackup-prod
COPY vmbackup-linux-${TARGETARCH}-prod ./vmbackup-prod

View File

@@ -1,3 +1,3 @@
See vmbackupmanager docs [here](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/).
See vmbackupmanager docs [here](https://docs.victoriametrics.com/vmbackupmanager/).
vmbackupmanager docs can be edited at [docs/vmbackupmanager.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/vmbackupmanager.md).
vmbackupmanager docs can be edited at [docs/vmbackupmanager.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/vmbackupmanager.md).

View File

@@ -1,3 +1,3 @@
See vmctl docs [here](https://docs.victoriametrics.com/victoriametrics/vmctl/).
See vmctl docs [here](https://docs.victoriametrics.com/vmctl/).
vmctl docs can be edited at [docs/vmctl.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/vmctl.md).
vmctl docs can be edited at [docs/vmctl.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/vmctl.md).

View File

@@ -466,12 +466,12 @@ var (
},
&cli.StringFlag{
Name: vmNativeFilterTimeStart,
Usage: "The time filter may contain different timestamp formats. See more details here https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#timestamp-formats",
Usage: "The time filter may contain different timestamp formats. See more details here https://docs.victoriametrics.com/single-server-victoriametrics/#timestamp-formats",
Required: true,
},
&cli.StringFlag{
Name: vmNativeFilterTimeEnd,
Usage: "The time filter may contain different timestamp formats. See more details here https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#timestamp-formats",
Usage: "The time filter may contain different timestamp formats. See more details here https://docs.victoriametrics.com/single-server-victoriametrics/#timestamp-formats",
},
&cli.StringFlag{
Name: vmNativeStepInterval,
@@ -493,7 +493,7 @@ var (
Name: vmNativeSrcAddr,
Usage: "VictoriaMetrics address to perform export from. \n" +
" Should be the same as --httpListenAddr value for single-node version or vmselect component." +
" If exporting from cluster version see https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#url-format",
" If exporting from cluster version see https://docs.victoriametrics.com/cluster-victoriametrics/#url-format",
Required: true,
},
&cli.StringFlag{
@@ -542,7 +542,7 @@ var (
Name: vmNativeDstAddr,
Usage: "VictoriaMetrics address to perform import to. \n" +
" Should be the same as --httpListenAddr value for single-node version or vminsert component." +
" If importing into cluster version see https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#url-format",
" If importing into cluster version see https://docs.victoriametrics.com/cluster-victoriametrics/#url-format",
Required: true,
},
&cli.StringFlag{
@@ -617,8 +617,8 @@ var (
},
&cli.BoolFlag{
Name: vmNativeDisableBinaryProtocol,
Usage: "Whether to use https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-export-data-in-json-line-format " +
"instead of https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-export-data-in-native-format API." +
Usage: "Whether to use https://docs.victoriametrics.com/#how-to-export-data-in-json-line-format " +
"instead of https://docs.victoriametrics.com/#how-to-export-data-in-native-format API." +
"Binary export/import API protocol implies less network and resource usage, as it transfers compressed binary data blocks." +
"Non-binary export/import API is less efficient, but supports deduplication if it is configured on vm-native-src-addr side.",
Value: false,

View File

@@ -8,5 +8,4 @@ FROM $root_image
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
ENTRYPOINT ["/vmctl-prod"]
ARG TARGETARCH
ARG BINARY_SUFFIX=non-existing
COPY vmctl-linux-${TARGETARCH}-prod${BINARY_SUFFIX} ./vmctl-prod
COPY vmctl-linux-${TARGETARCH}-prod ./vmctl-prod

View File

@@ -60,7 +60,7 @@ type Config struct {
// Importer performs insertion of timeseries
// via VictoriaMetrics import protocol
// see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-time-series-data
// see https://docs.victoriametrics.com/#how-to-import-time-series-data
type Importer struct {
addr string
client *http.Client
@@ -118,11 +118,11 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
addr := strings.TrimRight(cfg.Addr, "/")
// if single version
// see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-time-series-data
// see https://docs.victoriametrics.com/#how-to-import-time-series-data
importPath := addr + "/api/v1/import"
if cfg.AccountID != "" {
// if cluster version
// see https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#url-format
// see https://docs.victoriametrics.com/cluster-victoriametrics/#url-format
importPath = fmt.Sprintf("%s/insert/%s/prometheus/api/v1/import", addr, cfg.AccountID)
}
importPath, err := AddExtraLabelsToImportPath(importPath, cfg.ExtraLabels)
@@ -299,8 +299,6 @@ func (im *Importer) Ping() error {
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("bad status code: %d", resp.StatusCode)
}

View File

@@ -1,3 +1,3 @@
See vmgateway docs [here](https://docs.victoriametrics.com/victoriametrics/vmgateway/).
See vmgateway docs [here](https://docs.victoriametrics.com/vmgateway/).
vmgateway docs can be edited at [docs/vmgateway.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/vmgateway.md).
vmgateway docs can be edited at [docs/vmgateway.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/vmgateway.md).

View File

@@ -1,6 +1,7 @@
package clusternative
import (
"errors"
"fmt"
"net"
@@ -8,7 +9,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/clusternative/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
@@ -27,15 +27,9 @@ func InsertHandler(c net.Conn) error {
// lower-level vminsert sends only small packets to upper-level vminsert.
bc, err := handshake.VMInsertServer(c, 0)
if err != nil {
if handshake.IsTCPHealthcheck(err) {
if errors.Is(err, handshake.ErrIgnoreHealthcheck) {
return nil
}
if handshake.IsClientNetworkError(err) {
logger.Warnf("cannot complete vminsert handshake due to network error with client %q: %s. "+
"Check vminsert logs for errors", c.RemoteAddr(), err)
return nil
}
return fmt.Errorf("cannot perform vminsert handshake with client %q: %w", c.RemoteAddr(), err)
}
return stream.Parse(bc, func(rows []storage.MetricRow) error {

View File

@@ -3,8 +3,6 @@ package main
import (
"flag"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"io"
"net"
"net/http"
@@ -53,7 +51,7 @@ import (
var (
clusternativeListenAddr = flag.String("clusternativeListenAddr", "", "TCP address to listen for data from other vminsert nodes in multi-level cluster setup. "+
"See https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multi-level-cluster-setup . Usually :8400 should be set to match default vmstorage port for vminsert. Disabled work if empty")
"See https://docs.victoriametrics.com/cluster-victoriametrics/#multi-level-cluster-setup . Usually :8400 should be set to match default vmstorage port for vminsert. Disabled work if empty")
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty. "+
"See also -graphiteListenAddr.useProxyProtocol")
graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+
@@ -79,7 +77,7 @@ var (
"With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing")
storageNodes = flagutil.NewArrayString("storageNode", "Comma-separated addresses of vmstorage nodes; usage: -storageNode=vmstorage-host1,...,vmstorage-hostN . "+
"Enterprise version of VictoriaMetrics supports automatic discovery of vmstorage addresses via DNS SRV records. For example, -storageNode=srv+vmstorage.addrs . "+
"See https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#automatic-vmstorage-discovery")
"See https://docs.victoriametrics.com/cluster-victoriametrics/#automatic-vmstorage-discovery")
maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 40, "The maximum number of labels per time series to be accepted. Series with superfluous labels are ignored. In this case the vm_rows_ignored_total{reason=\"too_many_labels\"} metric at /metrics page is incremented")
maxLabelNameLen = flag.Int("maxLabelNameLen", 256, "The maximum length of label name in the accepted time series. Series with longer label name are ignored. In this case the vm_rows_ignored_total{reason=\"too_long_label_name\"} metric at /metrics page is incremented")
maxLabelValueLen = flag.Int("maxLabelValueLen", 4*1024, "The maximum length of label values in the accepted time series. Series with longer label value are ignored. In this case the vm_rows_ignored_total{reason=\"too_long_label_value\"} metric at /metrics page is incremented")
@@ -153,7 +151,7 @@ func main() {
if len(listenAddrs) == 0 {
listenAddrs = []string{":8480"}
}
go httpserver.Serve(listenAddrs, requestHandler, httpserver.ServeOptions{UseProxyProtocol: useProxyProtocol})
go httpserver.Serve(listenAddrs, useProxyProtocol, requestHandler)
pushmetrics.Init()
sig := procutil.WaitForSigterm()
@@ -206,7 +204,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
}
w.Header().Add("Content-Type", "text/html; charset=utf-8")
fmt.Fprintf(w, `vminsert - a component of VictoriaMetrics cluster<br/>
<a href="https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/">docs</a><br>
<a href="https://docs.victoriametrics.com/cluster-victoriametrics/">docs</a><br>
`)
return true
}
@@ -225,13 +223,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
availableMemory := memory.Allowed() + memory.Remaining()
currentMemory := cgroup.GetMemoryUsage()
if currentMemory*100/int64(availableMemory) > 95 {
httpserver.Errorf(w, r, "out of memory error")
return true
}
if strings.HasPrefix(p.Suffix, "prometheus/api/v1/import/prometheus") {
prometheusimportRequests.Inc()
if err := prometheusimport.InsertHandler(at, r); err != nil {
@@ -467,7 +458,7 @@ func usage() {
const s = `
vminsert accepts data via popular data ingestion protocols and routes it to vmstorage nodes configured via -storageNode.
See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/ .
See the docs at https://docs.victoriametrics.com/cluster-victoriametrics/ .
`
flagutil.Usage(s)
}

View File

@@ -9,5 +9,4 @@ COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certifica
EXPOSE 8480
ENTRYPOINT ["/vminsert-prod"]
ARG TARGETARCH
ARG BINARY_SUFFIX=non-existing
COPY vminsert-linux-${TARGETARCH}-prod${BINARY_SUFFIX} ./vminsert-prod
COPY vminsert-linux-${TARGETARCH}-prod ./vminsert-prod

View File

@@ -62,7 +62,6 @@ func insertRows(at *auth.Token, block *stream.Block, extraLabels []prompbmarshal
if !ctx.TryPrepareLabels(hasRelabeling) {
return nil
}
// use tenant info from data if it's a multi-tenant import.
atLocal := ctx.GetLocalAuthToken(at)
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels)
storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels)

Some files were not shown because too many files have changed in this diff Show More