Compare commits

..

4 Commits

Author SHA1 Message Date
f41gh7
2e4afd249e make linter happy
Signed-off-by: f41gh7 <nik@victoriametrics.com>
2026-06-16 10:25:35 +02:00
f41gh7
bac8eca299 lib/httputil: add load-balancing http transport
This commit adds http client round-robin load-balancing with DNS and
 SRV discovery. It allows http client to route HTTP requests evenly for each discovered IP
address for DNS record.

 Discovered IP addresses are cached locally for 5 seconds.

 This feature allows remove intermediate vmauth as a load-balancer between
 vmagent and remote storages. Which simplifies components management and
 reduces operational overhead.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2388
2026-06-15 08:59:21 +02:00
Andrii Chubatiuk
05903c8acd chore(lib/streamaggr): move increase and increase_prometheus outputs to a separate struct (#11093)
move increase and increase_prometheus outputs to a separate struct
to simplify the code. 

Before, increase, increase_prometheus, total_prometheus 
were implemented within one aggregator making it harder to use or update it.
2026-06-12 11:47:55 +02:00
Pablo (Tomas) Fernandez
a9fae230ae docs: Update "Multi Retention Setup within VictoriaMetrics Cluster" Guide (#11055)
This PR updates the [Multi Retention Setup within VictoriaMetrics
Cluster](https://docs.victoriametrics.com/guides/guide-vmcluster-multiple-retention-setup/)
guide.

Changes:
- Rewrote the introduction and added an overview
- Added step-by-step example setup for Kubernetes
- Expanded on alternative ways to route traffic

I've tested the configuration on a K3s cluster and it works, but I don't
know if the way I routed traffic into the retention tiers makes sense,
so any feedback is very much appreciated.

---------

Signed-off-by: Pablo (Tomas) Fernandez <46322567+TomFern@users.noreply.github.com>
Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
2026-06-12 11:46:26 +02:00
16 changed files with 935 additions and 92 deletions

View File

@@ -52,14 +52,14 @@ jobs:
restore-keys: go-artifacts-${{ runner.os }}-codeql-analyze-${{ steps.go.outputs.go-version }}-
- name: Initialize CodeQL
uses: github/codeql-action/init@7211b7c8077ea37d8641b6271f6a365a22a5fbfa # v4.36.0
uses: github/codeql-action/init@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
with:
languages: go
- name: Autobuild
uses: github/codeql-action/autobuild@7211b7c8077ea37d8641b6271f6a365a22a5fbfa # v4.36.0
uses: github/codeql-action/autobuild@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@7211b7c8077ea37d8641b6271f6a365a22a5fbfa # v4.36.0
uses: github/codeql-action/analyze@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
with:
category: 'language:go'

View File

@@ -151,17 +151,23 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
}
tr.Proxy = http.ProxyURL(pu)
}
hc := &http.Client{
Transport: authCfg.NewRoundTripper(tr),
Timeout: sendTimeout.GetOptionalArg(argIdx),
}
rwURL, err := url.Parse(remoteWriteURL)
if err != nil {
logger.Fatalf("BUG: cannot parse already parsed -remoteWrite.url=%q: %s", remoteWriteURL, err)
}
hc.Transport, rwURL = httputil.NewLoadBalancerTransport(hc.Transport, rwURL)
retryMaxIntervalFlag := retryMaxTime
if retryMaxInterval.String() != "" {
retryMaxIntervalFlag = retryMaxInterval
}
c := &client{
sanitizedURL: sanitizedURL,
remoteWriteURL: remoteWriteURL,
remoteWriteURL: rwURL.String(),
authCfg: authCfg,
awsCfg: awsCfg,
fq: fq,

View File

@@ -11,6 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/vmalertutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
@@ -94,6 +95,12 @@ func Init(extraParams url.Values) (QuerierBuilder, error) {
tr.MaxIdleConns = tr.MaxIdleConnsPerHost
}
tr.IdleConnTimeout = *idleConnectionTimeout
hc := &http.Client{Transport: tr}
datasourceURL, err := url.Parse(*addr)
if err != nil {
logger.Fatalf("BUG: cannot parse already parsed -datasource.url=%q: %s", *addr, err)
}
hc.Transport, datasourceURL = httputil.NewLoadBalancerTransport(tr, datasourceURL)
if extraParams == nil {
extraParams = url.Values{}
@@ -120,9 +127,9 @@ func Init(extraParams url.Values) (QuerierBuilder, error) {
}
return &Client{
c: &http.Client{Transport: tr},
c: hc,
authCfg: authCfg,
datasourceURL: strings.TrimSuffix(*addr, "/"),
datasourceURL: strings.TrimSuffix(datasourceURL.String(), "/"),
appendTypePrefix: *appendTypePrefix,
queryStep: *queryStep,
extraParams: extraParams,

View File

@@ -4,12 +4,14 @@ import (
"flag"
"fmt"
"net/http"
"net/url"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/vmalertutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
@@ -76,7 +78,13 @@ func Init() (datasource.QuerierBuilder, error) {
return nil, fmt.Errorf("failed to create transport for -remoteRead.url=%q: %w", *addr, err)
}
tr.IdleConnTimeout = *idleConnectionTimeout
c := &http.Client{Transport: tr}
rrURL, err := url.Parse(*addr)
if err != nil {
logger.Fatalf("BUG: cannot parse already parsed -remoteRead.url=%q: %s", *addr, err)
}
c.Transport, rrURL = httputil.NewLoadBalancerTransport(tr, rrURL)
endpointParams, err := flagutil.ParseJSONMap(*oauth2EndpointParams)
if err != nil {
return nil, fmt.Errorf("cannot parse JSON for -remoteRead.oauth2.endpointParams=%s: %w", *oauth2EndpointParams, err)
@@ -89,6 +97,5 @@ func Init() (datasource.QuerierBuilder, error) {
if err != nil {
return nil, fmt.Errorf("failed to configure auth: %w", err)
}
c := &http.Client{Transport: tr}
return datasource.NewPrometheusClient(*addr, authCfg, false, c), nil
return datasource.NewPrometheusClient(rrURL.String(), authCfg, false, c), nil
}

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"path"
"strings"
"sync"
@@ -111,12 +112,18 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
if cfg.Concurrency > 0 {
cc = cfg.Concurrency
}
hc := &http.Client{
Timeout: *sendTimeout,
Transport: cfg.Transport,
}
rwURL, err := url.Parse(cfg.Addr)
if err != nil {
logger.Fatalf("cannot parse already parsed -remoteWrite.url=%q: %s", cfg.Addr, err)
}
hc.Transport, rwURL = httputil.NewLoadBalancerTransport(hc.Transport, rwURL)
c := &Client{
c: &http.Client{
Timeout: *sendTimeout,
Transport: cfg.Transport,
},
addr: strings.TrimSuffix(cfg.Addr, "/"),
c: hc,
addr: strings.TrimSuffix(rwURL.String(), "/"),
authCfg: cfg.AuthCfg,
flushInterval: cfg.FlushInterval,
maxBatchSize: cfg.MaxBatchSize,

View File

@@ -6,45 +6,348 @@ build:
sitemap:
disable: true
---
**Objective**
Setup Victoria Metrics Cluster with support of multiple retention periods within one installation.
> [VictoriaMetrics Enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports specifying multiple retentions for distinct sets of time series and tenants. If you are an Enterprise user, [configure multiple retentions directly through retention filters](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#retention-filters) instead of following this guide.
**Enterprise Solution**
This guide explains how to set up multiple retentions using an [open-source VictoriaMetrics Cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/).
[VictoriaMetrics Enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports specifying multiple retentions
for distinct sets of time series and [tenants](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy)
via [retention filters](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#retention-filters).
## Overview
**Open Source Solution**
VictoriaMetrics retains metrics by default for **1 month**. You can change data retention with the [`-retentionPeriod` command-line flag](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention), but this value applies to **all time series stored** on a given `vmstorage` node and cannot be customized per tenant or per metric in the open source version.
Community version of VictoriaMetrics supports only one retention period per `vmstorage` node via [-retentionPeriod](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention) command-line flag.
The core idea of this guide is to run **separate logic groups of storages** (or even clusters) with individual `-retentionPeriod` settings, while still providing a single unified write and read path via vmagent and vmselect.
A multi-retention setup can be implemented by dividing a [victoriametrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) into logical groups with different retentions.
## Multi-Retention Architecture
Example:
Setup should handle 3 different retention groups 3months, 1year and 3 years.
Solution contains 3 groups of vmstorages + vminserts and one group of vmselects. Routing is done by [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/)
by [splitting data streams](https://docs.victoriametrics.com/victoriametrics/vmagent/#splitting-data-streams-among-multiple-systems).
The [-retentionPeriod](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention) sets how long to keep the metrics.
To support multiple retentions with the open source version of VictoriaMetrics cluster, you can split the cluster into several logical groups of storage nodes. Each group is configured with a different `-retentionPeriod` and receives only the data that must follow that retention.
The diagram below shows a proposed solution
Each storage group is connected to a separate vminsert, while a shared vmselect layer queries across all storage groups so that dashboards and alerts continue to see a single unified VictoriaMetrics backend.
![Setup](setup.webp)
**Implementation Details**
In the example used throughout this guide, the cluster is divided into three groups:
1. Groups of vminserts A know about only vmstorages A and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
1. Groups of vminserts B know about only vmstorages B and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
1. Groups of vminserts C know about only vmstorages C and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
1. vmselect reads data from all vmstorage nodes via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup)
with [deduplication](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#deduplication) setting equal to vmagent's scrape interval or minimum interval between collected samples.
1. vmagent routes incoming metrics to the given set of `vminsert` nodes using relabeling rules specified at `-remoteWrite.urlRelabelConfig` [configuration](https://docs.victoriametrics.com/victoriametrics/relabeling/).
- Group A: 3-month retention.
- Group B: 1-year retention.
- Group C: 3-year retention.
**Multi-Tenant Setup**
Metrics are routed to the appropriate vminsert group by splitting data streams in vmagent, so each time series is sent to exactly one retention group instead of being replicated to all groups. See [Deploying vmagent](https://docs.victoriametrics.com/guides/guide-vmcluster-multiple-retention-setup/#step3) for an example of labelbased routing that implements this split. An optional [vmauth](https://docs.victoriametrics.com/guides/guide-vmcluster-multiple-retention-setup/#additional-enhancements) layer can be added on top to restrict access to specific subclusters or tenants while still keeping a unified write and read path.
Every group of vmstorages can handle one tenant or multiple one. Different groups can have overlapping tenants. As vmselect reads from all vmstorage nodes, the data is aggregated on its level.
## Implementing Multi-Retention on Kubernetes
**Additional Enhancements**
In this section, we'll install and configure the components for a multi-retention deployment of the VictoriaMetrics cluster. See [Kubernetes monitoring with VictoriaMetrics Cluster](https://docs.victoriametrics.com/guides/k8s-monitoring-via-vm-cluster/) for details on running VictoriaMetrics in Kubernetes.
You can set up [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/) for routing data to the given vminsert group depending on the needed retention.
Run the following command to add the VictoriaMetrics Helm repository:
```shell
helm repo add vm https://victoriametrics.github.io/helm-charts/
helm repo update
```
### Step 1: Deploying storage groups {#step1}
We'll create three storage groups. Each has a different retention period and disk size. Read [Understand Your Setup Size](https://docs.victoriametrics.com/guides/understand-your-setup-size/) to estimate how much space you will need for each group. The following table is shown as an example:
| Group | Retention Period | Total disk size |
|--------------|------------------|-----------------------|
| `vmcluster-a` | 3 months (`3M`) | 80 Gi |
| `vmcluster-b` | 1 year (`1Y`) | 300 Gi |
| `vmcluster-c` | 3 years (`3Y`) | 900 Gi |
Create a Helm values file for Group A.
```shell
cat <<EOF > vmcluster-a.yaml
vmstorage:
enabled: true
replicaCount: 1
persistence:
size: 80Gi
extraArgs:
retentionPeriod: 3M
podLabels:
retention-group: a
vminsert:
enabled: true
podLabels:
retention-group: a
vmselect:
enabled: false
EOF
```
The values file above creates vminsert and vmstorage services while turning off vmselect, which we'll deploy separately. The `retentionPeriod` flag configures how long data is kept in this group.
Create the values files for Group B and Group C:
```shell
cat <<EOF > vmcluster-b.yaml
vmstorage:
enabled: true
replicaCount: 1
persistence:
size: 300Gi
extraArgs:
retentionPeriod: 1y
podLabels:
retention-group: b
vminsert:
enabled: true
podLabels:
retention-group: b
vmselect:
enabled: false
EOF
cat <<EOF > vmcluster-c.yaml
vmstorage:
enabled: true
replicaCount: 1
persistence:
size: 900Gi
extraArgs:
retentionPeriod: 3y
podLabels:
retention-group: c
vminsert:
enabled: true
podLabels:
retention-group: c
vmselect:
enabled: false
EOF
```
Deploy the three storage groups with:
```shell
helm upgrade --install vmcluster-a vm/victoria-metrics-cluster -f vmcluster-a.yaml
helm upgrade --install vmcluster-b vm/victoria-metrics-cluster -f vmcluster-b.yaml
helm upgrade --install vmcluster-c vm/victoria-metrics-cluster -f vmcluster-c.yaml
# Wait for all storage pods to be ready
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-a
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-b
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-c
```
### Step 2: Deploying vmselect {#step2}
Next, we'll deploy a vmselect service to route queries to the storage groups.
Create a Helm values file with:
```shell
cat <<EOF >vmselect.yaml
vmstorage:
enabled: false
vminsert:
enabled: false
vmselect:
enabled: true
replicaCount: 1
suppressStorageFQDNsRender: true
extraArgs:
# Each list item is a single -storageNode flag. In this example, there is
# one vmstorage pod per retention group, so each entry contains a single host.
# If you run multiple pods per group, list them as comma-separated hosts
# in the same -storageNode value.
#
# The FQDN format is:
# <pod>.<svc>.default.svc
# where pod = <release>-victoria-metrics-cluster-vmstorage-<N>
# and svc = <release>-victoria-metrics-cluster-vmstorage
storageNode:
- "vmcluster-a-victoria-metrics-cluster-vmstorage-0.vmcluster-a-victoria-metrics-cluster-vmstorage.default.svc:8401"
- "vmcluster-b-victoria-metrics-cluster-vmstorage-0.vmcluster-b-victoria-metrics-cluster-vmstorage.default.svc:8401"
- "vmcluster-c-victoria-metrics-cluster-vmstorage-0.vmcluster-c-victoria-metrics-cluster-vmstorage.default.svc:8401"
EOF
```
Let's break down the file above:
- Deploys vmselect as a separate Helm release.
- Disables vminsert and vmstorage as these services were already deployed in Step 1.
- `suppressStorageFQDNsRender: true` turns off automatic FQDN generation for storage nodes. By default, the Helm chart auto-generates `-storageNodes` flags, but since `vmstorage` has been disabled, we need to supply them manually in `extraArgs`.
- In `extraArgs.storageNode:` we define the vmstorage endpoints for queries. On querying, vmselect merges results across all the specified vmstorages to provide a unified view of the data.
Deploy the `vmselect` release with:
```shell
helm upgrade --install vmselect vm/victoria-metrics-cluster -f vmselect.yaml
```
### Step 3: Deploying vmagent {#step3}
We'll use `vmagent` to route incoming metrics to the correct retention group. For example, we can use a `retention` label for mapping metrics to storage groups in the following way:
| `retention` label | Storage Group |
|-------------------|--------------|
| `"3mo"` | `vmcluster-a` |
| `"1yr"` | `vmcluster-b` |
| `"3yr"` | `vmcluster-c` |
Create the values file for vmagent:
```shell
cat <<EOF >vmagent.yaml
service:
enabled: true
remoteWrite:
# Group A: receives metrics with retention="3mo"
- url: http://vmcluster-a-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
urlRelabelConfig:
- if: '{retention="3mo"}'
action: keep
# Group B: receives metrics with retention="1yr"
- url: http://vmcluster-b-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
urlRelabelConfig:
- if: '{retention="1yr"}'
action: keep
# Group C: receives metrics with retention="3yr"
- url: http://vmcluster-c-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
urlRelabelConfig:
- if: '{retention="3yr"}'
action: keep
EOF
```
> Metrics without a matching `retention` label are silently dropped by the `keep` rules. You must ensure that every metric is labeled, or use a different routing configuration.
Now deploy the vmagent release:
```shell
helm upgrade --install vmagent vm/victoria-metrics-agent -f vmagent.yaml
```
Wait for vmagent to become ready:
```shell
kubectl rollout status deploy/vmagent-victoria-metrics-agent
```
### Step 4: Verification
We can send test data to verify that the data is flowing to the correct storage group.
First, port-forward vmagent and vmselect:
```shell
VMAGENT_SVC=$(kubectl get svc -l app.kubernetes.io/instance=vmagent -o jsonpath='{.items[0].metadata.name}')
kubectl port-forward "svc/$VMAGENT_SVC" 8429 &
VMSELECT_SVC=$(kubectl get svc -l app.kubernetes.io/instance=vmselect -o jsonpath='{.items[0].metadata.name}')
kubectl port-forward "svc/$VMSELECT_SVC" 8481 &
```
Send test metrics directly to vmagent's HTTP endpoint to exercise all three retention labels:
```shell
POD=$(kubectl get pod -l app.kubernetes.io/instance=vmagent -o jsonpath='{.items[0].metadata.name}')
for retention in 3mo 1yr 3yr; do
kubectl exec "$POD" -- wget -qO- --post-data="test_routing{retention=\"${retention}\"} 1.0" \
"http://127.0.0.1:8429/api/v1/import/prometheus"
done
```
Query the data back from vmselect (it may take around 30-60 seconds for new data to be available for queries):
```shell
for retention in 3mo 1yr 3yr; do
echo "-> retention=${retention}"
curl -s "http://localhost:8481/select/0/prometheus/api/v1/query" \
--data-urlencode "query=test_routing{retention=\"${retention}\"}"
echo
done
```
You can also check that vmagent is forwarding data to all three groups:
```shell
curl -s http://localhost:8429/metrics | grep vmagent_remotewrite_blocks_sent_total
```
Each `url="N:secret-url"` corresponds to one `remoteWrite` entry (N=1 for Group A, N=2 for Group B, N=3 for Group C). Non-zero values confirm data is flowing.
## Alternative Routing by Existing Labels
The example setup above relies on a synthetic `retention` label to exist in every incoming metric.
If having a `retention` label in every metric isn't practical, you can, as an alternative, rely on existing labels to map data to the correct storage group.
The following example configures vmagent to route metrics based on the `environment` and `team` labels:
```yaml
# vmagent.yaml
remoteWrite:
# send dev and staging data to Group A
- url: "http://vmcluster-a-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
urlRelabelConfig:
- if: {environment=~"dev|staging"}
action: keep
# send prod data to Group B
- url: "http://vmcluster-b-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
urlRelabelConfig:
- if: {environment=~"prod|production"}
action: keep
# send data from Infra and SRE teams to Group C
- url: "http://vmcluster-c-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
urlRelabelConfig:
- if: {team=~"infra|sre"}
action: keep
```
> Metrics that do not match any of the `keep` rules are dropped in the configuration above.
## Additional Enhancements
You can set up [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/) to route data to the specified vminsert group based on the required retention or to restrict which data different users can query.
The following [`-auth.config`](https://docs.victoriametrics.com/victoriametrics/vmauth/#quick-start) example exposes the same vmselect backend via vmauth with two users using basic auth:
- `admin`: can query **all** data across all retention groups.
- `dev`: can query **only** time series that have `team="dev"` label, enforced via the `extra_label` query argument.
```yaml
users:
# User with access to all data across all retention groups
- username: "admin"
password: "foo"
url_map:
- src_paths:
- "/api/v1/query"
- "/api/v1/query_range"
- "/api/v1/series"
- "/api/v1/labels"
- "/api/v1/label/.+/values"
# vmselect service that aggregates all vmstorage groups
url_prefix: "http://vmselect-victoria-metrics-cluster-vmselect:8481/select/0/prometheus"
# User restricted to Dev team data only
- username: "dev"
password: "bar"
url_map:
- src_paths:
- "/api/v1/query"
- "/api/v1/query_range"
- "/api/v1/series"
- "/api/v1/labels"
- "/api/v1/label/.+/values"
# Same vmselect backend, but enforce label filter at query time
# by adding extra_label=team=dev to every proxied request
url_prefix: "http://vmselect-victoria-metrics-cluster-vmselect:8481/select/0/prometheus/?extra_label=team=dev"
```
This is useful for restricting access by team, environment, or tenant without changing the underlying storage topology.

View File

@@ -27,6 +27,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add client side round-robin load-balancing with `DNS` discovery. See [#2388](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2388) and these [vmagent DNS URLs](https://docs.victoriametrics.com/victoriametrics/vmagent/#dns-urls), [vmalert DNS URLs](https://docs.victoriametrics.com/victoriametrics/vmalert/#dns-urls).
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)

View File

@@ -431,6 +431,43 @@ and `-remoteWrite.streamAggr.config`:
There is also the `-promscrape.configCheckInterval` command-line flag, which can be used to automatically reload configs from the updated `-promscrape.config` file.
## DNS URLs
If `vmagent` encounters URLs with the `dns+` prefix in the hostname (such as `http://dns+some-addr:8428/some/path`), it resolves `some-addr` into IP addresses
via [DNS A records](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1). The port from the original URL is appended to each discovered IP address.
Each discovered IP address is used for round-robin balancing of write requests.
DNS URLs are supported in the following places:
* In `-remoteWrite.url` command-line flag. For example, if `victoria-metrics` [DNS A Record](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1) record contains
`192.168.1.15` IP address, then `-remoteWrite.url=http://dns+victoria-metrics:8428/api/v1/write` is automatically resolved into
`-remoteWrite.url=http://192.168.1.15:8428/api/v1/write`.
DNS URLs are useful when client-side HTTP load balancing is needed. A good example
is a [Kubernetes headless Service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services),
which returns multiple IP addresses for a single hostname.
### DNS URLs and HTTPS
When a `dns+` URL uses the `https` scheme, `vmagent` connects to the discovered
IP addresses directly. This affects [TLS](https://en.wikipedia.org/wiki/Transport_Layer_Security)
in two ways:
* No [SNI](https://en.wikipedia.org/wiki/Server_Name_Indication) is sent in the TLS handshake,
since the connection target is an IP address rather than a hostname.
* The server certificate is verified against the IP address, so the verification fails
unless the certificate contains the corresponding
[IP SAN](https://en.wikipedia.org/wiki/Subject_Alternative_Name) entries.
To use `dns+` URLs with HTTPS, pass the original hostname via the `-remoteWrite.tlsServerName`
command-line flag. It is used both as SNI and as the name the server certificate
is verified against:
```sh
-remoteWrite.url=https://dns+victoria-metrics:8428/api/v1/write
-remoteWrite.tlsServerName=victoria-metrics
```
## SRV URLs
If `vmagent` encounters URLs with `srv+` prefix in hostname (such as `http://srv+some-addr/some/path`), then it resolves `some-addr` [DNS SRV](https://en.wikipedia.org/wiki/SRV_record)
@@ -441,7 +478,7 @@ SRV URLs are supported in the following places:
* In `-remoteWrite.url` command-line flag. For example, if `victoria-metrics` [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) record contains
`victoria-metrics-host:8428` TCP address, then `-remoteWrite.url=http://srv+victoria-metrics/api/v1/write` is automatically resolved into
`-remoteWrite.url=http://victoria-metrics-host:8428/api/v1/write`. If the DNS SRV record is resolved into multiple TCP addresses, then `vmagent`
uses a randomly chosen address for each connection it establishes to the remote storage.
performs per request round-robin load-balancing.
* In scrape target addresses aka `__address__` label. See [these docs](https://docs.victoriametrics.com/victoriametrics/relabeling/#how-to-modify-scrape-urls-in-targets) for details.

View File

@@ -1470,6 +1470,59 @@ alert_relabel_configs:
The configuration file can be [hot-reloaded](#hot-config-reload).
## DNS URLs
If `vmalert` encounters URLs with the `dns+` prefix in the hostname (such as `http://dns+some-addr:8428/some/path`), it resolves `some-addr` into IP addresses
via [DNS A records](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1). The port from the original URL is appended to each discovered IP address.
Each discovered IP address is used for round-robin balancing of write requests.
DNS URLs are supported in the following places:
* In `-remoteWrite.url`, `-remoteRead.url` and `-datasource.url` command-line flags. For example, if `victoria-metrics` [DNS A Record](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1) record contains
`192.168.1.15` IP address, then `-remoteWrite.url=http://dns+victoria-metrics:8428` is automatically resolved into
`-remoteWrite.url=http://192.168.1.15:8428`.
DNS URLs are useful when client-side HTTP load balancing is needed. A good example
is a [Kubernetes headless Service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services),
which returns multiple IP addresses for a single hostname.
### DNS URLs and HTTPS
When a `dns+` URL uses the `https` scheme, `vmalert` connects to the discovered
IP addresses directly. No [SNI](https://en.wikipedia.org/wiki/Server_Name_Indication)
is sent in the TLS handshake, and the server certificate is verified against the IP address,
which fails unless the certificate contains the corresponding
[IP SAN](https://en.wikipedia.org/wiki/Subject_Alternative_Name) entries.
To use `dns+` URLs with HTTPS, pass the original hostname via the corresponding
`tlsServerName` command-line flag - `-datasource.tlsServerName`, `-remoteRead.tlsServerName`
or `-remoteWrite.tlsServerName`. It is used both as SNI and as the name the server
certificate is verified against:
```sh
-datasource.url=https://dns+victoria-metrics:8428
-datasource.tlsServerName=victoria-metrics
```
Alternatively, issue server certificates with IP SAN entries for every backend IP address.
Avoid `tlsInsecureSkipVerify` flags for working around this, since they disable
server certificate verification completely.
## SRV URLs
If `vmalert` encounters URLs with `srv+` prefix in hostname (such as `http://srv+some-addr/some/path`), then it resolves `some-addr` [DNS SRV](https://en.wikipedia.org/wiki/SRV_record)
record into TCP address with hostname and TCP port, and then use the resulting URL when it needs to connect to it.
SRV URLs are supported in the following places:
* In `-remoteWrite.url`, `-remoteRead.url` and `-datasource.url` command-line flags. For example, if `victoria-metrics` [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) record contains
`victoria-metrics-host:8085`, then `-remoteWrite.url=http://srv+victoria-metrics:8428` is automatically resolved into
`-remoteWrite.url=http://victoria-metrics-host:8085`. If the DNS SRV record is resolved into multiple TCP addresses, then `vmalert`
performs per request round-robin load-balancing.
SRV URLs are useful when HTTP services run on different TCP ports or when their TCP ports can change over time (for instance, after a restart).
## Contributing
`vmalert` is mostly designed and built by VictoriaMetrics community.

View File

@@ -0,0 +1,203 @@
package httputil
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"net/http"
"net/netip"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
)
// NewLoadBalancerTransport returns new RoundTripper that performs round-robin HTTP requests loadbalancing
// based on discovered backends for the given url host
// and update url with load-balancing prefix
//
// It returns origin transport and url if load-balancing is not needed for given url
func NewLoadBalancerTransport(origin http.RoundTripper, originURL *url.URL) (http.RoundTripper, *url.URL) {
modifiedURL := *originURL
var discoverFunc func(context.Context, string, string) ([]string, error)
switch {
case strings.HasPrefix(originURL.Host, "dns+"):
modifiedURL.Host = modifiedURL.Host[4:]
discoverFunc = discoverDNSBackends
case strings.HasPrefix(originURL.Host, "srv+"):
modifiedURL.Host = modifiedURL.Host[4:]
discoverFunc = discoverSRVBackends
default:
return origin, originURL
}
host, port, err := net.SplitHostPort(modifiedURL.Host)
if err != nil {
host = originURL.Host
port = "80"
if modifiedURL.Scheme == "https" {
port = "443"
}
}
t := &loadbalancerTransport{
tr: origin,
host: host,
port: port,
discoverFunc: discoverFunc,
}
t.discoverBackendsLocked(context.Background())
return t, &modifiedURL
}
type loadbalancerTransport struct {
tr http.RoundTripper
host string
port string
discoverFunc func(context.Context, string, string) ([]string, error)
// mu protects fields below
mu sync.Mutex
lastDiscoveredAt time.Time
dbs *discoveredBackends
}
type discoveredBackends struct {
backends []string
idx uint64
}
// RoundTrip implements http.RoundTripper interface
func (lb *loadbalancerTransport) RoundTrip(r *http.Request) (*http.Response, error) {
backend := lb.pickBackend(r.Context(), false)
if backend == "" {
return nil, fmt.Errorf("no backends found for hostname=%q", lb.host)
}
r2 := r.Clone(r.Context())
r2.URL.Host = backend
if r2.Host == "" {
r2.Host = r.URL.Host
}
resp, err := lb.tr.RoundTrip(r2)
if err != nil {
var dnsErr *net.DNSError
// perform a single retry for in case of trivial error or dns lookup error
if !netutil.IsTrivialNetworkError(err) && (errors.As(err, &dnsErr) && !dnsErr.IsNotFound) {
return nil, err
}
backend := lb.pickBackend(r.Context(), true)
if backend == "" {
return nil, fmt.Errorf("no backends found for hostname=%q", lb.host)
}
// perform the same check for retry as http.Request.isReplayable does
canRetry := r.Body == nil || r.Body == http.NoBody || r.GetBody != nil
if !canRetry {
return nil, err
}
r2 = r.Clone(r.Context())
if r.GetBody != nil {
body, berr := r.GetBody()
if berr != nil {
return nil, err
}
r2.Body = body
}
if r2.Host == "" {
r2.Host = r.URL.Host
}
r2.URL.Host = backend
resp, err = lb.tr.RoundTrip(r2)
}
return resp, err
}
func (lb *loadbalancerTransport) pickBackend(ctx context.Context, forceDiscovery bool) string {
ct := time.Now()
lb.mu.Lock()
defer lb.mu.Unlock()
if forceDiscovery && !ct.Before(lb.lastDiscoveredAt) {
// prevent concurrent force discovery
lb.lastDiscoveredAt = time.Time{}
}
if lb.dbs == nil || ct.Sub(lb.lastDiscoveredAt) > 5*time.Second {
lb.discoverBackendsLocked(ctx)
}
if lb.dbs == nil || len(lb.dbs.backends) == 0 {
return ""
}
idx := lb.dbs.idx
lb.dbs.idx++
return lb.dbs.backends[idx%uint64(len(lb.dbs.backends))]
}
func (lb *loadbalancerTransport) discoverBackendsLocked(ctx context.Context) {
lb.lastDiscoveredAt = time.Now()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
backends, err := lb.discoverFunc(ctx, lb.host, lb.port)
if err != nil {
logger.Errorf("cannot discover backends: %s", err)
return
}
rand.Shuffle(len(backends), func(i, j int) {
backends[i], backends[j] = backends[j], backends[i]
})
dbs := &discoveredBackends{
backends: backends,
}
lb.dbs = dbs
}
func discoverDNSBackends(ctx context.Context, host, port string) ([]string, error) {
addrs, err := netutil.Resolver.LookupIPAddr(ctx, host)
if err != nil {
return nil, fmt.Errorf("failed to lookupIPAddr for host: %q: %w", host, err)
}
backends := make([]string, 0, len(addrs))
for _, addr := range addrs {
if !netutil.TCP6Enabled() {
ip, ok := netip.AddrFromSlice(addr.IP)
if !ok {
logger.Panicf("BUG: cannot build netip Addr from slice addr: %q", addr.IP.String())
}
if !ip.Unmap().Is4() {
continue
}
}
ip := addr.IP.String()
if len(port) > 0 {
ip = net.JoinHostPort(ip, port)
}
backends = append(backends, ip)
}
return backends, nil
}
func discoverSRVBackends(ctx context.Context, host, port string) ([]string, error) {
_, addrs, err := netutil.Resolver.LookupSRV(ctx, "", "", host)
if err != nil {
return nil, fmt.Errorf("failed to LookupSRV records for host: %q: %w", host, err)
}
backends := make([]string, 0, len(addrs))
for _, addr := range addrs {
hostPort := port
if addr.Port > 0 {
hostPort = strconv.FormatUint(uint64(addr.Port), 10)
}
backend := net.JoinHostPort(addr.Target, hostPort)
backends = append(backends, backend)
}
return backends, nil
}

View File

@@ -0,0 +1,133 @@
package httputil
import (
"context"
"fmt"
"net"
"net/http"
"net/netip"
"net/url"
"sync"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
)
type testRemoteServer struct {
mu sync.Mutex
requestsPerHost map[string]int
totalRequests int
firstError error
}
func (trs *testRemoteServer) RoundTrip(r *http.Request) (*http.Response, error) {
trs.mu.Lock()
if trs.firstError != nil && trs.totalRequests == 0 {
err := trs.firstError
trs.firstError = nil
trs.totalRequests++
trs.mu.Unlock()
return nil, err
}
trs.totalRequests++
if trs.requestsPerHost == nil {
trs.requestsPerHost = make(map[string]int)
}
trs.requestsPerHost[r.URL.Host]++
trs.mu.Unlock()
return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody}, nil
}
type testDNSResolver struct {
ips []net.IPAddr
}
func (tdr *testDNSResolver) LookupSRV(_ context.Context, _, _, name string) (cname string, addrs []*net.SRV, err error) {
return "", nil, fmt.Errorf("unexpected LookupMX call for name=%q", name)
}
func (tdr *testDNSResolver) LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error) {
return tdr.ips, nil
}
func (tdr *testDNSResolver) LookupMX(_ context.Context, name string) ([]*net.MX, error) {
return nil, fmt.Errorf("unexpected LookupMX call for name=%q", name)
}
func TestLoadbalancerTransport(t *testing.T) {
f := func(discoveredIPs []string, trs *testRemoteServer) {
t.Helper()
parsedIPs := make([]net.IPAddr, 0, len(discoveredIPs))
for _, dIP := range discoveredIPs {
pIP, err := netip.ParseAddr(dIP)
if err != nil {
t.Fatalf("cannot parse IP=%q: %s", dIP, err)
}
parsedIPs = append(parsedIPs, net.IPAddr{IP: pIP.AsSlice()})
}
tdr := &testDNSResolver{ips: parsedIPs}
originResolver := netutil.Resolver
defer func() { netutil.Resolver = originResolver }()
netutil.Resolver = tdr
requestURL, err := url.Parse("http://dns+vmsingle.example.com:8429/api/v1/write")
if err != nil {
t.Fatalf("cannot parse url: %s", err)
}
lbt, requestURL := NewLoadBalancerTransport(trs, requestURL)
if len(discoveredIPs) == 0 {
r, err := http.NewRequest(http.MethodGet, requestURL.String(), nil)
if err != nil {
t.Fatalf("cannot create http request: %s", err)
}
_, err = lbt.RoundTrip(r)
if err == nil {
t.Fatalf("expected no backends found error")
}
return
}
expectedRequestsPerHost := 2
for range len(discoveredIPs) * expectedRequestsPerHost {
r, err := http.NewRequest(http.MethodGet, requestURL.String(), nil)
if err != nil {
t.Fatalf("cannot create http request: %s", err)
}
resp, err := lbt.RoundTrip(r)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resp.Body.Close()
}
requestsPerHost := trs.requestsPerHost
for _, dIP := range discoveredIPs {
expectedHostPort := net.JoinHostPort(dIP, "8429")
gotRequestsPerHost, ok := requestsPerHost[expectedHostPort]
if !ok {
t.Fatalf("not found expected backend request for: %q", expectedHostPort)
}
if gotRequestsPerHost != expectedRequestsPerHost {
t.Fatalf("unexpected requests per host: %d:%d (-;+)", expectedRequestsPerHost, gotRequestsPerHost)
}
}
}
trs := testRemoteServer{}
f([]string{"1.1.1.1"}, &trs)
trs = testRemoteServer{}
f([]string{"1.1.1.1", "2.2.2.2", "5.5.5.5"}, &trs)
// retry dns resolve error
trs = testRemoteServer{
firstError: &net.DNSError{Err: "no such host", IsNotFound: true},
}
f([]string{"1.1.1.1", "2.2.2.2", "5.5.5.5"}, &trs)
// empty backends, expecting error
trs = testRemoteServer{}
f([]string{}, &trs)
}

View File

@@ -14,16 +14,10 @@ func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample,
av.h.Update(sample.value)
}
func (av *histogramBucketAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
ac := c.(*histogramBucketAggrConfig)
shared := av.shared
if ac.useSharedState {
shared.Merge(&av.h)
av.h.Reset()
} else {
shared = &av.h
}
shared.VisitNonZeroBuckets(func(vmrange string, count uint64) {
func (av *histogramBucketAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
av.shared.Merge(&av.h)
av.h.Reset()
av.shared.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange)
})
}
@@ -32,26 +26,17 @@ func (av *histogramBucketAggrValue) state() any {
return av.shared
}
func newHistogramBucketAggrConfig(useSharedState bool) aggrConfig {
return &histogramBucketAggrConfig{
useSharedState: useSharedState,
}
func newHistogramBucketAggrConfig() aggrConfig {
return &histogramBucketAggrConfig{}
}
type histogramBucketAggrConfig struct {
useSharedState bool
}
type histogramBucketAggrConfig struct{}
func (ac *histogramBucketAggrConfig) getValue(s any) aggrValue {
var shared *metrics.Histogram
if ac.useSharedState {
if s == nil {
shared = &metrics.Histogram{}
} else {
shared = s.(*metrics.Histogram)
}
func (*histogramBucketAggrConfig) getValue(s any) aggrValue {
if s == nil {
s = &metrics.Histogram{}
}
return &histogramBucketAggrValue{
shared: shared,
shared: s.(*metrics.Histogram),
}
}

109
lib/streamaggr/increase.go Normal file
View File

@@ -0,0 +1,109 @@
package streamaggr
import (
"fmt"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
type increaseLastValue struct {
value float64
timestamp int64
deleteDeadline int64
}
type increaseAggrConfig struct {
keepFirstSample bool
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
// This allows avoiding an initial spike of the output values at startup when new time series
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
ignoreFirstSampleDeadline uint64
counterResetsTotal *metrics.Counter
}
type increaseAggrValue struct {
total *float64
shared map[string]increaseLastValue
}
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
ac := c.(*increaseAggrConfig)
currentTime := fasttime.UnixTimestamp()
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
lv, ok := av.shared[key]
if av.total == nil {
av.total = new(float64)
}
if ok {
if sample.timestamp < lv.timestamp {
// Skip out of order sample
return
}
if sample.value >= lv.value {
*av.total += sample.value - lv.value
} else {
// counter reset
*av.total += sample.value
ac.counterResetsTotal.Inc()
}
} else if keepFirstSample {
*av.total += sample.value
}
lv.value = sample.value
lv.timestamp = sample.timestamp
lv.deleteDeadline = deleteDeadline
key = bytesutil.InternString(key)
av.shared[key] = lv
}
func (av *increaseAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*increaseAggrConfig)
for lk, lv := range av.shared {
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
delete(av.shared, lk)
}
}
if av.total == nil {
return
}
total := *av.total
av.total = nil
ctx.appendSeries(key, ac.getSuffix(), total)
}
func (av *increaseAggrValue) state() any {
return av.shared
}
func newIncreaseAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &increaseAggrConfig{
keepFirstSample: keepFirstSample,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
return cfg
}
func (*increaseAggrConfig) getValue(s any) aggrValue {
var shared map[string]increaseLastValue
if s == nil {
shared = make(map[string]increaseLastValue)
} else {
shared = s.(map[string]increaseLastValue)
}
return &increaseAggrValue{
shared: shared,
}
}
func (ac *increaseAggrConfig) getSuffix() string {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}

View File

@@ -75,6 +75,9 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
outputs = av.blue
}
for idx, o := range outputs {
if o == nil {
o = av.blue[idx]
}
o.pushSample(ao.configs[idx], sample, inputKey, deleteDeadline)
}
av.deleteDeadline = deleteDeadline
@@ -112,6 +115,9 @@ func (ao *aggrOutputs) flushState(ctx *flushCtx) {
outputs = av.blue
}
for i, o := range outputs {
if o == nil {
o = av.blue[i]
}
o.flush(ao.configs[i], ctx, outputKey, ctx.isLast)
}
av.mu.Unlock()

View File

@@ -609,7 +609,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
for i, output := range cfg.Outputs {
outputMetricLabels := fmt.Sprintf(`output=%q,name=%q,path=%q,url=%q,position="%d"`, output, name, path, alias, aggrID)
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, ignoreFirstSampleInterval)
if err != nil {
return nil, err
}
@@ -716,7 +716,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
return a, nil
}
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
// check for duplicated output
if _, ok := outputsSeen[output]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
@@ -760,11 +760,11 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "count_series":
return newCountSeriesAggrConfig(), nil
case "histogram_bucket":
return newHistogramBucketAggrConfig(useSharedState), nil
return newHistogramBucketAggrConfig(), nil
case "increase":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "increase_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
case "last":
return newLastAggrConfig(), nil
case "max":
@@ -782,9 +782,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "sum_samples":
return newSumSamplesAggrConfig(), nil
case "total":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "total_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
case "unique_samples":
return newUniqueSamplesAggrConfig(), nil
default:

View File

@@ -53,36 +53,30 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*totalAggrConfig)
suffix := ac.getSuffix()
// check for stale entries
total := av.shared.total + av.total
av.total = 0
lvs := av.shared.lastValues
for lk, lv := range lvs {
for lk, lv := range av.shared.lastValues {
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
delete(lvs, lk)
delete(av.shared.lastValues, lk)
}
}
if ac.resetTotalOnFlush {
av.shared.total = 0
} else if math.Abs(total) >= (1 << 53) {
if math.Abs(total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
av.shared.total = 0
} else {
av.shared.total = total
}
ctx.appendSeries(key, suffix, total)
ctx.appendSeries(key, ac.getSuffix(), total)
}
func (av *totalAggrValue) state() any {
return av.shared
}
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &totalAggrConfig{
keepFirstSample: keepFirstSample,
resetTotalOnFlush: resetTotalOnFlush,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
@@ -90,8 +84,6 @@ func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleI
}
type totalAggrConfig struct {
resetTotalOnFlush bool
// Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool
@@ -117,12 +109,6 @@ func (*totalAggrConfig) getValue(s any) aggrValue {
}
func (ac *totalAggrConfig) getSuffix() string {
if ac.resetTotalOnFlush {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}
if ac.keepFirstSample {
return "total"
}