mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-12 21:34:04 +03:00
Compare commits
5 Commits
dependabot
...
change-sta
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
017fbbfae2 | ||
|
|
63feb08ef1 | ||
|
|
79765dec07 | ||
|
|
05903c8acd | ||
|
|
a9fae230ae |
@@ -6,45 +6,348 @@ build:
|
||||
sitemap:
|
||||
disable: true
|
||||
---
|
||||
**Objective**
|
||||
|
||||
Setup Victoria Metrics Cluster with support of multiple retention periods within one installation.
|
||||
> [VictoriaMetrics Enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports specifying multiple retentions for distinct sets of time series and tenants. If you are an Enterprise user, [configure multiple retentions directly through retention filters](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#retention-filters) instead of following this guide.
|
||||
|
||||
**Enterprise Solution**
|
||||
This guide explains how to set up multiple retentions using an [open-source VictoriaMetrics Cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/).
|
||||
|
||||
[VictoriaMetrics Enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports specifying multiple retentions
|
||||
for distinct sets of time series and [tenants](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy)
|
||||
via [retention filters](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#retention-filters).
|
||||
## Overview
|
||||
|
||||
**Open Source Solution**
|
||||
VictoriaMetrics retains metrics by default for **1 month**. You can change data retention with the [`-retentionPeriod` command-line flag](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention), but this value applies to **all time series stored** on a given `vmstorage` node and cannot be customized per tenant or per metric in the open source version.
|
||||
|
||||
Community version of VictoriaMetrics supports only one retention period per `vmstorage` node via [-retentionPeriod](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention) command-line flag.
|
||||
The core idea of this guide is to run **separate logic groups of storages** (or even clusters) with individual `-retentionPeriod` settings, while still providing a single unified write and read path via vmagent and vmselect.
|
||||
|
||||
A multi-retention setup can be implemented by dividing a [victoriametrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) into logical groups with different retentions.
|
||||
## Multi-Retention Architecture
|
||||
|
||||
Example:
|
||||
Setup should handle 3 different retention groups 3months, 1year and 3 years.
|
||||
Solution contains 3 groups of vmstorages + vminserts and one group of vmselects. Routing is done by [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/)
|
||||
by [splitting data streams](https://docs.victoriametrics.com/victoriametrics/vmagent/#splitting-data-streams-among-multiple-systems).
|
||||
The [-retentionPeriod](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention) sets how long to keep the metrics.
|
||||
To support multiple retentions with the open source version of VictoriaMetrics cluster, you can split the cluster into several logical groups of storage nodes. Each group is configured with a different `-retentionPeriod` and receives only the data that must follow that retention.
|
||||
|
||||
The diagram below shows a proposed solution
|
||||
Each storage group is connected to a separate vminsert, while a shared vmselect layer queries across all storage groups so that dashboards and alerts continue to see a single unified VictoriaMetrics backend.
|
||||
|
||||

|
||||
|
||||
**Implementation Details**
|
||||
In the example used throughout this guide, the cluster is divided into three groups:
|
||||
|
||||
1. Groups of vminserts A know about only vmstorages A and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
|
||||
1. Groups of vminserts B know about only vmstorages B and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
|
||||
1. Groups of vminserts C know about only vmstorages C and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
|
||||
1. vmselect reads data from all vmstorage nodes via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup)
|
||||
with [deduplication](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#deduplication) setting equal to vmagent's scrape interval or minimum interval between collected samples.
|
||||
1. vmagent routes incoming metrics to the given set of `vminsert` nodes using relabeling rules specified at `-remoteWrite.urlRelabelConfig` [configuration](https://docs.victoriametrics.com/victoriametrics/relabeling/).
|
||||
- Group A: 3-month retention.
|
||||
- Group B: 1-year retention.
|
||||
- Group C: 3-year retention.
|
||||
|
||||
**Multi-Tenant Setup**
|
||||
Metrics are routed to the appropriate vminsert group by splitting data streams in vmagent, so each time series is sent to exactly one retention group instead of being replicated to all groups. See [Deploying vmagent](https://docs.victoriametrics.com/guides/guide-vmcluster-multiple-retention-setup/#step3) for an example of label‑based routing that implements this split. An optional [vmauth](https://docs.victoriametrics.com/guides/guide-vmcluster-multiple-retention-setup/#additional-enhancements) layer can be added on top to restrict access to specific sub‑clusters or tenants while still keeping a unified write and read path.
|
||||
|
||||
Every group of vmstorages can handle one tenant or multiple one. Different groups can have overlapping tenants. As vmselect reads from all vmstorage nodes, the data is aggregated on its level.
|
||||
## Implementing Multi-Retention on Kubernetes
|
||||
|
||||
**Additional Enhancements**
|
||||
In this section, we'll install and configure the components for a multi-retention deployment of the VictoriaMetrics cluster. See [Kubernetes monitoring with VictoriaMetrics Cluster](https://docs.victoriametrics.com/guides/k8s-monitoring-via-vm-cluster/) for details on running VictoriaMetrics in Kubernetes.
|
||||
|
||||
You can set up [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/) for routing data to the given vminsert group depending on the needed retention.
|
||||
Run the following command to add the VictoriaMetrics Helm repository:
|
||||
|
||||
```shell
|
||||
helm repo add vm https://victoriametrics.github.io/helm-charts/
|
||||
helm repo update
|
||||
```
|
||||
|
||||
### Step 1: Deploying storage groups {#step1}
|
||||
|
||||
We'll create three storage groups. Each has a different retention period and disk size. Read [Understand Your Setup Size](https://docs.victoriametrics.com/guides/understand-your-setup-size/) to estimate how much space you will need for each group. The following table is shown as an example:
|
||||
|
||||
|
||||
| Group | Retention Period | Total disk size |
|
||||
|--------------|------------------|-----------------------|
|
||||
| `vmcluster-a` | 3 months (`3M`) | 80 Gi |
|
||||
| `vmcluster-b` | 1 year (`1Y`) | 300 Gi |
|
||||
| `vmcluster-c` | 3 years (`3Y`) | 900 Gi |
|
||||
|
||||
Create a Helm values file for Group A.
|
||||
|
||||
```shell
|
||||
cat <<EOF > vmcluster-a.yaml
|
||||
vmstorage:
|
||||
enabled: true
|
||||
replicaCount: 1
|
||||
persistence:
|
||||
size: 80Gi
|
||||
extraArgs:
|
||||
retentionPeriod: 3M
|
||||
podLabels:
|
||||
retention-group: a
|
||||
|
||||
vminsert:
|
||||
enabled: true
|
||||
podLabels:
|
||||
retention-group: a
|
||||
|
||||
vmselect:
|
||||
enabled: false
|
||||
EOF
|
||||
```
|
||||
|
||||
The values file above creates vminsert and vmstorage services while turning off vmselect, which we'll deploy separately. The `retentionPeriod` flag configures how long data is kept in this group.
|
||||
|
||||
Create the values files for Group B and Group C:
|
||||
|
||||
```shell
|
||||
cat <<EOF > vmcluster-b.yaml
|
||||
vmstorage:
|
||||
enabled: true
|
||||
replicaCount: 1
|
||||
persistence:
|
||||
size: 300Gi
|
||||
extraArgs:
|
||||
retentionPeriod: 1y
|
||||
podLabels:
|
||||
retention-group: b
|
||||
|
||||
vminsert:
|
||||
enabled: true
|
||||
podLabels:
|
||||
retention-group: b
|
||||
|
||||
vmselect:
|
||||
enabled: false
|
||||
EOF
|
||||
|
||||
|
||||
cat <<EOF > vmcluster-c.yaml
|
||||
vmstorage:
|
||||
enabled: true
|
||||
replicaCount: 1
|
||||
persistence:
|
||||
size: 900Gi
|
||||
extraArgs:
|
||||
retentionPeriod: 3y
|
||||
podLabels:
|
||||
retention-group: c
|
||||
|
||||
vminsert:
|
||||
enabled: true
|
||||
podLabels:
|
||||
retention-group: c
|
||||
|
||||
vmselect:
|
||||
enabled: false
|
||||
EOF
|
||||
```
|
||||
|
||||
Deploy the three storage groups with:
|
||||
|
||||
```shell
|
||||
helm upgrade --install vmcluster-a vm/victoria-metrics-cluster -f vmcluster-a.yaml
|
||||
helm upgrade --install vmcluster-b vm/victoria-metrics-cluster -f vmcluster-b.yaml
|
||||
helm upgrade --install vmcluster-c vm/victoria-metrics-cluster -f vmcluster-c.yaml
|
||||
|
||||
# Wait for all storage pods to be ready
|
||||
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-a
|
||||
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-b
|
||||
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-c
|
||||
```
|
||||
|
||||
### Step 2: Deploying vmselect {#step2}
|
||||
|
||||
Next, we'll deploy a vmselect service to route queries to the storage groups.
|
||||
|
||||
Create a Helm values file with:
|
||||
|
||||
```shell
|
||||
cat <<EOF >vmselect.yaml
|
||||
vmstorage:
|
||||
enabled: false
|
||||
|
||||
vminsert:
|
||||
enabled: false
|
||||
|
||||
vmselect:
|
||||
enabled: true
|
||||
replicaCount: 1
|
||||
suppressStorageFQDNsRender: true
|
||||
extraArgs:
|
||||
# Each list item is a single -storageNode flag. In this example, there is
|
||||
# one vmstorage pod per retention group, so each entry contains a single host.
|
||||
# If you run multiple pods per group, list them as comma-separated hosts
|
||||
# in the same -storageNode value.
|
||||
#
|
||||
# The FQDN format is:
|
||||
# <pod>.<svc>.default.svc
|
||||
# where pod = <release>-victoria-metrics-cluster-vmstorage-<N>
|
||||
# and svc = <release>-victoria-metrics-cluster-vmstorage
|
||||
storageNode:
|
||||
- "vmcluster-a-victoria-metrics-cluster-vmstorage-0.vmcluster-a-victoria-metrics-cluster-vmstorage.default.svc:8401"
|
||||
- "vmcluster-b-victoria-metrics-cluster-vmstorage-0.vmcluster-b-victoria-metrics-cluster-vmstorage.default.svc:8401"
|
||||
- "vmcluster-c-victoria-metrics-cluster-vmstorage-0.vmcluster-c-victoria-metrics-cluster-vmstorage.default.svc:8401"
|
||||
EOF
|
||||
```
|
||||
|
||||
Let's break down the file above:
|
||||
|
||||
- Deploys vmselect as a separate Helm release.
|
||||
- Disables vminsert and vmstorage as these services were already deployed in Step 1.
|
||||
- `suppressStorageFQDNsRender: true` turns off automatic FQDN generation for storage nodes. By default, the Helm chart auto-generates `-storageNodes` flags, but since `vmstorage` has been disabled, we need to supply them manually in `extraArgs`.
|
||||
- In `extraArgs.storageNode:` we define the vmstorage endpoints for queries. On querying, vmselect merges results across all the specified vmstorages to provide a unified view of the data.
|
||||
|
||||
Deploy the `vmselect` release with:
|
||||
|
||||
```shell
|
||||
helm upgrade --install vmselect vm/victoria-metrics-cluster -f vmselect.yaml
|
||||
```
|
||||
|
||||
### Step 3: Deploying vmagent {#step3}
|
||||
|
||||
We'll use `vmagent` to route incoming metrics to the correct retention group. For example, we can use a `retention` label for mapping metrics to storage groups in the following way:
|
||||
|
||||
| `retention` label | Storage Group |
|
||||
|-------------------|--------------|
|
||||
| `"3mo"` | `vmcluster-a` |
|
||||
| `"1yr"` | `vmcluster-b` |
|
||||
| `"3yr"` | `vmcluster-c` |
|
||||
|
||||
|
||||
Create the values file for vmagent:
|
||||
|
||||
```shell
|
||||
cat <<EOF >vmagent.yaml
|
||||
service:
|
||||
enabled: true
|
||||
remoteWrite:
|
||||
# Group A: receives metrics with retention="3mo"
|
||||
- url: http://vmcluster-a-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
|
||||
urlRelabelConfig:
|
||||
- if: '{retention="3mo"}'
|
||||
action: keep
|
||||
# Group B: receives metrics with retention="1yr"
|
||||
- url: http://vmcluster-b-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
|
||||
urlRelabelConfig:
|
||||
- if: '{retention="1yr"}'
|
||||
action: keep
|
||||
# Group C: receives metrics with retention="3yr"
|
||||
- url: http://vmcluster-c-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
|
||||
urlRelabelConfig:
|
||||
- if: '{retention="3yr"}'
|
||||
action: keep
|
||||
EOF
|
||||
```
|
||||
|
||||
> Metrics without a matching `retention` label are silently dropped by the `keep` rules. You must ensure that every metric is labeled, or use a different routing configuration.
|
||||
|
||||
Now deploy the vmagent release:
|
||||
|
||||
```shell
|
||||
helm upgrade --install vmagent vm/victoria-metrics-agent -f vmagent.yaml
|
||||
```
|
||||
|
||||
Wait for vmagent to become ready:
|
||||
|
||||
```shell
|
||||
kubectl rollout status deploy/vmagent-victoria-metrics-agent
|
||||
```
|
||||
|
||||
### Step 4: Verification
|
||||
|
||||
We can send test data to verify that the data is flowing to the correct storage group.
|
||||
|
||||
First, port-forward vmagent and vmselect:
|
||||
|
||||
```shell
|
||||
VMAGENT_SVC=$(kubectl get svc -l app.kubernetes.io/instance=vmagent -o jsonpath='{.items[0].metadata.name}')
|
||||
kubectl port-forward "svc/$VMAGENT_SVC" 8429 &
|
||||
|
||||
VMSELECT_SVC=$(kubectl get svc -l app.kubernetes.io/instance=vmselect -o jsonpath='{.items[0].metadata.name}')
|
||||
kubectl port-forward "svc/$VMSELECT_SVC" 8481 &
|
||||
```
|
||||
|
||||
Send test metrics directly to vmagent's HTTP endpoint to exercise all three retention labels:
|
||||
|
||||
```shell
|
||||
POD=$(kubectl get pod -l app.kubernetes.io/instance=vmagent -o jsonpath='{.items[0].metadata.name}')
|
||||
|
||||
for retention in 3mo 1yr 3yr; do
|
||||
kubectl exec "$POD" -- wget -qO- --post-data="test_routing{retention=\"${retention}\"} 1.0" \
|
||||
"http://127.0.0.1:8429/api/v1/import/prometheus"
|
||||
done
|
||||
```
|
||||
|
||||
Query the data back from vmselect (it may take around 30-60 seconds for new data to be available for queries):
|
||||
|
||||
```shell
|
||||
for retention in 3mo 1yr 3yr; do
|
||||
echo "-> retention=${retention}"
|
||||
curl -s "http://localhost:8481/select/0/prometheus/api/v1/query" \
|
||||
--data-urlencode "query=test_routing{retention=\"${retention}\"}"
|
||||
echo
|
||||
done
|
||||
```
|
||||
|
||||
You can also check that vmagent is forwarding data to all three groups:
|
||||
|
||||
```shell
|
||||
curl -s http://localhost:8429/metrics | grep vmagent_remotewrite_blocks_sent_total
|
||||
```
|
||||
|
||||
Each `url="N:secret-url"` corresponds to one `remoteWrite` entry (N=1 for Group A, N=2 for Group B, N=3 for Group C). Non-zero values confirm data is flowing.
|
||||
|
||||
## Alternative Routing by Existing Labels
|
||||
|
||||
The example setup above relies on a synthetic `retention` label to exist in every incoming metric.
|
||||
|
||||
If having a `retention` label in every metric isn't practical, you can, as an alternative, rely on existing labels to map data to the correct storage group.
|
||||
|
||||
The following example configures vmagent to route metrics based on the `environment` and `team` labels:
|
||||
|
||||
```yaml
|
||||
# vmagent.yaml
|
||||
remoteWrite:
|
||||
# send dev and staging data to Group A
|
||||
- url: "http://vmcluster-a-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
|
||||
urlRelabelConfig:
|
||||
- if: {environment=~"dev|staging"}
|
||||
action: keep
|
||||
# send prod data to Group B
|
||||
- url: "http://vmcluster-b-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
|
||||
urlRelabelConfig:
|
||||
- if: {environment=~"prod|production"}
|
||||
action: keep
|
||||
# send data from Infra and SRE teams to Group C
|
||||
- url: "http://vmcluster-c-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
|
||||
urlRelabelConfig:
|
||||
- if: {team=~"infra|sre"}
|
||||
action: keep
|
||||
```
|
||||
|
||||
> Metrics that do not match any of the `keep` rules are dropped in the configuration above.
|
||||
|
||||
## Additional Enhancements
|
||||
|
||||
You can set up [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/) to route data to the specified vminsert group based on the required retention or to restrict which data different users can query.
|
||||
|
||||
The following [`-auth.config`](https://docs.victoriametrics.com/victoriametrics/vmauth/#quick-start) example exposes the same vmselect backend via vmauth with two users using basic auth:
|
||||
|
||||
- `admin`: can query **all** data across all retention groups.
|
||||
- `dev`: can query **only** time series that have `team="dev"` label, enforced via the `extra_label` query argument.
|
||||
|
||||
```yaml
|
||||
users:
|
||||
# User with access to all data across all retention groups
|
||||
- username: "admin"
|
||||
password: "foo"
|
||||
url_map:
|
||||
- src_paths:
|
||||
- "/api/v1/query"
|
||||
- "/api/v1/query_range"
|
||||
- "/api/v1/series"
|
||||
- "/api/v1/labels"
|
||||
- "/api/v1/label/.+/values"
|
||||
# vmselect service that aggregates all vmstorage groups
|
||||
url_prefix: "http://vmselect-victoria-metrics-cluster-vmselect:8481/select/0/prometheus"
|
||||
|
||||
# User restricted to Dev team data only
|
||||
- username: "dev"
|
||||
password: "bar"
|
||||
url_map:
|
||||
- src_paths:
|
||||
- "/api/v1/query"
|
||||
- "/api/v1/query_range"
|
||||
- "/api/v1/series"
|
||||
- "/api/v1/labels"
|
||||
- "/api/v1/label/.+/values"
|
||||
# Same vmselect backend, but enforce label filter at query time
|
||||
# by adding extra_label=team=dev to every proxied request
|
||||
url_prefix: "http://vmselect-victoria-metrics-cluster-vmselect:8481/select/0/prometheus/?extra_label=team=dev"
|
||||
```
|
||||
|
||||
This is useful for restricting access by team, environment, or tenant without changing the underlying storage topology.
|
||||
|
||||
@@ -27,6 +27,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
## tip
|
||||
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): use the aggregation rule interval as the default [staleness_interval](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#staleness) instead of `2*interval`, to reduce spikes when there are gaps between received samples. See [#11102](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11102).
|
||||
|
||||
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
|
||||
|
||||
|
||||
@@ -681,7 +681,6 @@ If counter-specific outputs, such as `total*`, `rate*`, and `increase*`, produce
|
||||
|
||||
The following outputs track the last seen per-series values in order to properly calculate output values:
|
||||
|
||||
- [histogram_bucket](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#histogram_bucket)
|
||||
- [increase](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase)
|
||||
- [increase_prometheus](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase_prometheus)
|
||||
- [rate_avg](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#rate_avg)
|
||||
@@ -703,7 +702,7 @@ These issues can be fixed in the following ways:
|
||||
- By increasing the `interval` option at [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config), so it covers the expected
|
||||
delays in data ingestion pipelines.
|
||||
- By specifying the `staleness_interval` option at [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config), so it covers the expected
|
||||
delays in data ingestion pipelines. By default, the `staleness_interval` is equal to `2 x interval`.
|
||||
delays in data ingestion pipelines. By default, the `staleness_interval` is equal to `interval`.
|
||||
|
||||
## High resource usage
|
||||
|
||||
|
||||
@@ -94,7 +94,7 @@ specified individually per each `-remoteWrite.url`:
|
||||
# - total_prometheus
|
||||
# See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#staleness for more details.
|
||||
#
|
||||
# staleness_interval: 2m
|
||||
# staleness_interval: 1m
|
||||
|
||||
# ignore_first_sample_interval specifies the interval after which the agent begins sending samples.
|
||||
# By default, it is set to the staleness interval, and it helps reduce the initial sample load after an agent restart.
|
||||
@@ -291,9 +291,6 @@ The results of `histogram_bucket` is equal to the following [MetricsQL](https://
|
||||
sum(histogram_over_time(some_histogram_bucket[interval])) by (vmrange)
|
||||
```
|
||||
|
||||
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
|
||||
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#staleness) option.
|
||||
|
||||
See also:
|
||||
- [quantiles](#quantiles)
|
||||
- [avg](#avg)
|
||||
|
||||
@@ -14,16 +14,10 @@ func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample,
|
||||
av.h.Update(sample.value)
|
||||
}
|
||||
|
||||
func (av *histogramBucketAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
|
||||
ac := c.(*histogramBucketAggrConfig)
|
||||
shared := av.shared
|
||||
if ac.useSharedState {
|
||||
shared.Merge(&av.h)
|
||||
av.h.Reset()
|
||||
} else {
|
||||
shared = &av.h
|
||||
}
|
||||
shared.VisitNonZeroBuckets(func(vmrange string, count uint64) {
|
||||
func (av *histogramBucketAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
|
||||
av.shared.Merge(&av.h)
|
||||
av.h.Reset()
|
||||
av.shared.VisitNonZeroBuckets(func(vmrange string, count uint64) {
|
||||
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange)
|
||||
})
|
||||
}
|
||||
@@ -32,26 +26,17 @@ func (av *histogramBucketAggrValue) state() any {
|
||||
return av.shared
|
||||
}
|
||||
|
||||
func newHistogramBucketAggrConfig(useSharedState bool) aggrConfig {
|
||||
return &histogramBucketAggrConfig{
|
||||
useSharedState: useSharedState,
|
||||
}
|
||||
func newHistogramBucketAggrConfig() aggrConfig {
|
||||
return &histogramBucketAggrConfig{}
|
||||
}
|
||||
|
||||
type histogramBucketAggrConfig struct {
|
||||
useSharedState bool
|
||||
}
|
||||
type histogramBucketAggrConfig struct{}
|
||||
|
||||
func (ac *histogramBucketAggrConfig) getValue(s any) aggrValue {
|
||||
var shared *metrics.Histogram
|
||||
if ac.useSharedState {
|
||||
if s == nil {
|
||||
shared = &metrics.Histogram{}
|
||||
} else {
|
||||
shared = s.(*metrics.Histogram)
|
||||
}
|
||||
func (*histogramBucketAggrConfig) getValue(s any) aggrValue {
|
||||
if s == nil {
|
||||
s = &metrics.Histogram{}
|
||||
}
|
||||
return &histogramBucketAggrValue{
|
||||
shared: shared,
|
||||
shared: s.(*metrics.Histogram),
|
||||
}
|
||||
}
|
||||
|
||||
113
lib/streamaggr/increase.go
Normal file
113
lib/streamaggr/increase.go
Normal file
@@ -0,0 +1,113 @@
|
||||
package streamaggr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
type increaseLastValue struct {
|
||||
value float64
|
||||
timestamp int64
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
type increaseAggrConfig struct {
|
||||
keepFirstSample bool
|
||||
|
||||
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
|
||||
// This allows avoiding an initial spike of the output values at startup when new time series
|
||||
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
|
||||
ignoreFirstSampleDeadline uint64
|
||||
counterResetsTotal *metrics.Counter
|
||||
}
|
||||
|
||||
type increaseAggrValue struct {
|
||||
total *float64
|
||||
shared map[string]increaseLastValue
|
||||
}
|
||||
|
||||
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
|
||||
if av.total == nil {
|
||||
av.total = new(float64)
|
||||
}
|
||||
ac := c.(*increaseAggrConfig)
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
|
||||
lv, ok := av.shared[key]
|
||||
// The last value is stale, reset it.
|
||||
if ok && lv.deleteDeadline < int64(currentTime)*1000 {
|
||||
ok = false
|
||||
}
|
||||
if ok {
|
||||
if sample.timestamp < lv.timestamp {
|
||||
// Skip out of order sample
|
||||
return
|
||||
}
|
||||
if sample.value >= lv.value {
|
||||
*av.total += sample.value - lv.value
|
||||
} else {
|
||||
// counter reset
|
||||
*av.total += sample.value
|
||||
ac.counterResetsTotal.Inc()
|
||||
}
|
||||
} else if keepFirstSample {
|
||||
*av.total += sample.value
|
||||
}
|
||||
lv.value = sample.value
|
||||
lv.timestamp = sample.timestamp
|
||||
lv.deleteDeadline = deleteDeadline
|
||||
key = bytesutil.InternString(key)
|
||||
av.shared[key] = lv
|
||||
}
|
||||
|
||||
func (av *increaseAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
|
||||
ac := c.(*increaseAggrConfig)
|
||||
for lk, lv := range av.shared {
|
||||
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
|
||||
delete(av.shared, lk)
|
||||
}
|
||||
}
|
||||
if av.total == nil {
|
||||
return
|
||||
}
|
||||
total := *av.total
|
||||
av.total = nil
|
||||
ctx.appendSeries(key, ac.getSuffix(), total)
|
||||
}
|
||||
|
||||
func (av *increaseAggrValue) state() any {
|
||||
return av.shared
|
||||
}
|
||||
|
||||
func newIncreaseAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
|
||||
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
|
||||
cfg := &increaseAggrConfig{
|
||||
keepFirstSample: keepFirstSample,
|
||||
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
|
||||
}
|
||||
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
||||
return cfg
|
||||
}
|
||||
|
||||
func (*increaseAggrConfig) getValue(s any) aggrValue {
|
||||
var shared map[string]increaseLastValue
|
||||
if s == nil {
|
||||
shared = make(map[string]increaseLastValue)
|
||||
} else {
|
||||
shared = s.(map[string]increaseLastValue)
|
||||
}
|
||||
return &increaseAggrValue{
|
||||
shared: shared,
|
||||
}
|
||||
}
|
||||
|
||||
func (ac *increaseAggrConfig) getSuffix() string {
|
||||
if ac.keepFirstSample {
|
||||
return "increase"
|
||||
}
|
||||
return "increase_prometheus"
|
||||
}
|
||||
@@ -75,6 +75,9 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
|
||||
outputs = av.blue
|
||||
}
|
||||
for idx, o := range outputs {
|
||||
if o == nil {
|
||||
o = av.blue[idx]
|
||||
}
|
||||
o.pushSample(ao.configs[idx], sample, inputKey, deleteDeadline)
|
||||
}
|
||||
av.deleteDeadline = deleteDeadline
|
||||
@@ -112,6 +115,9 @@ func (ao *aggrOutputs) flushState(ctx *flushCtx) {
|
||||
outputs = av.blue
|
||||
}
|
||||
for i, o := range outputs {
|
||||
if o == nil {
|
||||
o = av.blue[i]
|
||||
}
|
||||
o.flush(ao.configs[i], ctx, outputKey, ctx.isLast)
|
||||
}
|
||||
av.mu.Unlock()
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
var rateAggrSharedValuePool sync.Pool
|
||||
@@ -99,6 +100,12 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
|
||||
ac := c.(*rateAggrConfig)
|
||||
var state *rateAggrStateValue
|
||||
sv, ok := av.shared[key]
|
||||
// The last value is stale, reset it.
|
||||
if ok && sv.deleteDeadline < int64(fasttime.UnixTimestamp())*1000 {
|
||||
delete(av.shared, key)
|
||||
putRateAggrSharedValue(sv)
|
||||
ok = false
|
||||
}
|
||||
if ok {
|
||||
state = sv.getState(av.isGreen)
|
||||
if sample.timestamp < state.timestamp {
|
||||
|
||||
@@ -172,12 +172,12 @@ type Config struct {
|
||||
DedupInterval string `yaml:"dedup_interval,omitempty"`
|
||||
|
||||
// Staleness interval is interval after which the series state will be reset if no samples have been sent during it.
|
||||
// The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus and histogram_bucket.
|
||||
// The parameter is only relevant for outputs: total, total_prometheus, increase and increase_prometheus.
|
||||
StalenessInterval string `yaml:"staleness_interval,omitempty"`
|
||||
|
||||
// IgnoreFirstSampleInterval specifies the interval after which the agent begins sending samples.
|
||||
// By default, it is set to the staleness interval, and it helps reduce the initial sample load after an agent restart.
|
||||
// This parameter is relevant only for the following outputs: total, total_prometheus, increase, increase_prometheus, and histogram_bucket.
|
||||
// This parameter is relevant only for the following outputs: total, total_prometheus, increase and increase_prometheus.
|
||||
IgnoreFirstSampleInterval string `yaml:"ignore_first_sample_interval,omitempty"`
|
||||
|
||||
// Outputs is a list of output aggregate functions to produce.
|
||||
@@ -501,8 +501,9 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
return nil, fmt.Errorf("interval=%s must be a multiple of dedup_interval=%s", interval, dedupInterval)
|
||||
}
|
||||
|
||||
// check cfg.StalenessInterval
|
||||
stalenessInterval := interval * 2
|
||||
// set the default staleness interval as the aggregation interval, to be consistent with query lookbehind window in metricsQL,
|
||||
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11102
|
||||
stalenessInterval := interval
|
||||
if cfg.StalenessInterval != "" {
|
||||
stalenessInterval, err = time.ParseDuration(cfg.StalenessInterval)
|
||||
if err != nil {
|
||||
@@ -609,7 +610,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
|
||||
for i, output := range cfg.Outputs {
|
||||
outputMetricLabels := fmt.Sprintf(`output=%q,name=%q,path=%q,url=%q,position="%d"`, output, name, path, alias, aggrID)
|
||||
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
|
||||
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, ignoreFirstSampleInterval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -716,7 +717,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
|
||||
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
|
||||
// check for duplicated output
|
||||
if _, ok := outputsSeen[output]; ok {
|
||||
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
|
||||
@@ -760,11 +761,11 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
|
||||
case "count_series":
|
||||
return newCountSeriesAggrConfig(), nil
|
||||
case "histogram_bucket":
|
||||
return newHistogramBucketAggrConfig(useSharedState), nil
|
||||
return newHistogramBucketAggrConfig(), nil
|
||||
case "increase":
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
|
||||
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
|
||||
case "increase_prometheus":
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
|
||||
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
|
||||
case "last":
|
||||
return newLastAggrConfig(), nil
|
||||
case "max":
|
||||
@@ -782,9 +783,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
|
||||
case "sum_samples":
|
||||
return newSumSamplesAggrConfig(), nil
|
||||
case "total":
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
|
||||
case "total_prometheus":
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
|
||||
case "unique_samples":
|
||||
return newUniqueSamplesAggrConfig(), nil
|
||||
default:
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
//go:build synctest
|
||||
|
||||
package streamaggr
|
||||
|
||||
import (
|
||||
@@ -475,6 +477,78 @@ foo:1m_increase_prometheus{baz="qwe"} 15
|
||||
outputs: [increase_prometheus]
|
||||
`, "11111111")
|
||||
|
||||
// increase, increase_prometheus, total, total_prometheus outputs with different staleness intervals
|
||||
f([]string{`
|
||||
foo 5
|
||||
bar 200
|
||||
`, `
|
||||
foo 10
|
||||
bar 201
|
||||
`, ``, `
|
||||
foo 7
|
||||
bar 205
|
||||
`}, time.Minute, `bar:1m_increase 200
|
||||
bar:1m_increase 1
|
||||
bar:1m_increase 205
|
||||
bar:1m_increase_prometheus 0
|
||||
bar:1m_increase_prometheus 1
|
||||
bar:1m_increase_prometheus 0
|
||||
bar:1m_total 200
|
||||
bar:1m_total 201
|
||||
bar:1m_total 205
|
||||
bar:1m_total_prometheus 0
|
||||
bar:1m_total_prometheus 1
|
||||
bar:1m_total_prometheus 0
|
||||
bar:1m_without_non_existing_label_increase 0
|
||||
bar:1m_without_non_existing_label_increase 1
|
||||
bar:1m_without_non_existing_label_increase 4
|
||||
bar:1m_without_non_existing_label_increase_prometheus 0
|
||||
bar:1m_without_non_existing_label_increase_prometheus 1
|
||||
bar:1m_without_non_existing_label_increase_prometheus 4
|
||||
bar:1m_without_non_existing_label_total 0
|
||||
bar:1m_without_non_existing_label_total 1
|
||||
bar:1m_without_non_existing_label_total 1
|
||||
bar:1m_without_non_existing_label_total 5
|
||||
bar:1m_without_non_existing_label_total_prometheus 0
|
||||
bar:1m_without_non_existing_label_total_prometheus 1
|
||||
bar:1m_without_non_existing_label_total_prometheus 1
|
||||
bar:1m_without_non_existing_label_total_prometheus 5
|
||||
foo:1m_increase 5
|
||||
foo:1m_increase 5
|
||||
foo:1m_increase 7
|
||||
foo:1m_increase_prometheus 0
|
||||
foo:1m_increase_prometheus 5
|
||||
foo:1m_increase_prometheus 0
|
||||
foo:1m_total 5
|
||||
foo:1m_total 10
|
||||
foo:1m_total 7
|
||||
foo:1m_total_prometheus 0
|
||||
foo:1m_total_prometheus 5
|
||||
foo:1m_total_prometheus 0
|
||||
foo:1m_without_non_existing_label_increase 0
|
||||
foo:1m_without_non_existing_label_increase 5
|
||||
foo:1m_without_non_existing_label_increase 7
|
||||
foo:1m_without_non_existing_label_increase_prometheus 0
|
||||
foo:1m_without_non_existing_label_increase_prometheus 5
|
||||
foo:1m_without_non_existing_label_increase_prometheus 7
|
||||
foo:1m_without_non_existing_label_total 0
|
||||
foo:1m_without_non_existing_label_total 5
|
||||
foo:1m_without_non_existing_label_total 5
|
||||
foo:1m_without_non_existing_label_total 12
|
||||
foo:1m_without_non_existing_label_total_prometheus 0
|
||||
foo:1m_without_non_existing_label_total_prometheus 5
|
||||
foo:1m_without_non_existing_label_total_prometheus 5
|
||||
foo:1m_without_non_existing_label_total_prometheus 12
|
||||
`, `
|
||||
- interval: 1m
|
||||
ignore_first_sample_interval: 0s
|
||||
outputs: [increase, increase_prometheus, total, total_prometheus]
|
||||
- interval: 1m
|
||||
staleness_interval: 2m
|
||||
without: [non_existing_label]
|
||||
outputs: [increase, increase_prometheus, total, total_prometheus]
|
||||
`, "111111")
|
||||
|
||||
// multiple aggregate configs
|
||||
f([]string{`
|
||||
foo 1
|
||||
@@ -482,9 +556,7 @@ foo{bar="baz"} 2
|
||||
foo 3.3
|
||||
`, ``, ``, ``, ``}, time.Minute, `foo:1m_count_series 1
|
||||
foo:1m_count_series{bar="baz"} 1
|
||||
foo:1m_sum_samples 0
|
||||
foo:1m_sum_samples 4.3
|
||||
foo:1m_sum_samples{bar="baz"} 0
|
||||
foo:1m_sum_samples{bar="baz"} 2
|
||||
foo:5m_by_bar_sum_samples 4.3
|
||||
foo:5m_by_bar_sum_samples{bar="baz"} 2
|
||||
@@ -688,30 +760,39 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.125
|
||||
outputs: [rate_sum, rate_avg]
|
||||
`, "11111")
|
||||
|
||||
// test rate_sum and rate_avg, when two aggregation intervals are empty
|
||||
// test rate_sum and rate_avg with different staleness intervals
|
||||
f([]string{`
|
||||
foo{abc="123", cde="1"} 1
|
||||
foo{abc="123", cde="1"} 2 1
|
||||
foo{abc="456", cde="1"} 7
|
||||
foo{abc="456", cde="1"} 8 1
|
||||
foo{abc="777", cde="1"} 8
|
||||
foo{abc="777", cde="1"} 9 1
|
||||
`, ``, ``, `
|
||||
foo{abc="123", cde="1"} 19
|
||||
foo{abc="123", cde="1"} 20 1
|
||||
foo{abc="456", cde="1"} 26
|
||||
foo{abc="456", cde="1"} 27 1
|
||||
foo{abc="777", cde="1"} 27
|
||||
foo{abc="777", cde="1"} 28 1
|
||||
foo{abc="456", cde="1"} 3
|
||||
foo{abc="456", cde="1"} 4 1
|
||||
foo{abc="777", cde="1"} 5
|
||||
foo{abc="777", cde="1"} 6 1
|
||||
`, ``, `
|
||||
foo{abc="123", cde="1"} 121
|
||||
foo{abc="123", cde="1"} 122 1
|
||||
foo{abc="456", cde="1"} 123
|
||||
foo{abc="456", cde="1"} 124 1
|
||||
foo{abc="777", cde="1"} 125
|
||||
foo{abc="777", cde="1"} 126 1
|
||||
`}, time.Minute, `foo:1m_by_cde_rate_avg{cde="1"} 1
|
||||
foo:1m_by_cde_rate_avg{cde="1"} 1
|
||||
foo:1m_by_cde_rate_sum{cde="1"} 3
|
||||
foo:1m_by_cde_rate_sum{cde="1"} 3
|
||||
foo:1m_without_abc_rate_avg{cde="1"} 1
|
||||
foo:1m_without_abc_rate_avg{cde="1"} 1
|
||||
foo:1m_without_abc_rate_sum{cde="1"} 3
|
||||
foo:1m_without_abc_rate_sum{cde="1"} 3
|
||||
`, `
|
||||
- interval: 1m
|
||||
by: [cde]
|
||||
outputs: [rate_sum, rate_avg]
|
||||
enable_windows: true
|
||||
- interval: 1m
|
||||
staleness_interval: 2m
|
||||
without: [abc]
|
||||
outputs: [rate_sum, rate_avg]
|
||||
enable_windows: true
|
||||
`, "111111111111")
|
||||
|
||||
// rate_sum and rate_avg with duplicated events
|
||||
|
||||
@@ -252,11 +252,15 @@ func TestAggregatorsEqual(t *testing.T) {
|
||||
}
|
||||
|
||||
func timeSeriessToString(tss []prompb.TimeSeries) string {
|
||||
a := make([]string, len(tss))
|
||||
for i, ts := range tss {
|
||||
sorted := make([]prompb.TimeSeries, len(tss))
|
||||
copy(sorted, tss)
|
||||
sort.SliceStable(sorted, func(i, j int) bool {
|
||||
return promrelabel.LabelsToString(sorted[i].Labels) < promrelabel.LabelsToString(sorted[j].Labels)
|
||||
})
|
||||
a := make([]string, len(sorted))
|
||||
for i, ts := range sorted {
|
||||
a[i] = timeSeriesToString(ts)
|
||||
}
|
||||
sort.Strings(a)
|
||||
return strings.Join(a, "")
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +31,11 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
|
||||
lv, ok := av.shared.lastValues[key]
|
||||
if ok || keepFirstSample {
|
||||
// The last value is stale, reset it.
|
||||
if ok && lv.deleteDeadline < int64(currentTime)*1000 {
|
||||
ok = false
|
||||
}
|
||||
if ok {
|
||||
if sample.timestamp < lv.timestamp {
|
||||
// Skip out of order sample
|
||||
return
|
||||
@@ -43,6 +47,8 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
|
||||
av.total += sample.value
|
||||
ac.counterResetsTotal.Inc()
|
||||
}
|
||||
} else if keepFirstSample {
|
||||
av.total += sample.value
|
||||
}
|
||||
lv.value = sample.value
|
||||
lv.timestamp = sample.timestamp
|
||||
@@ -53,36 +59,30 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
|
||||
|
||||
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
|
||||
ac := c.(*totalAggrConfig)
|
||||
suffix := ac.getSuffix()
|
||||
// check for stale entries
|
||||
total := av.shared.total + av.total
|
||||
av.total = 0
|
||||
lvs := av.shared.lastValues
|
||||
for lk, lv := range lvs {
|
||||
for lk, lv := range av.shared.lastValues {
|
||||
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
|
||||
delete(lvs, lk)
|
||||
delete(av.shared.lastValues, lk)
|
||||
}
|
||||
}
|
||||
if ac.resetTotalOnFlush {
|
||||
av.shared.total = 0
|
||||
} else if math.Abs(total) >= (1 << 53) {
|
||||
if math.Abs(total) >= (1 << 53) {
|
||||
// It is time to reset the entry, since it starts losing float64 precision
|
||||
av.shared.total = 0
|
||||
} else {
|
||||
av.shared.total = total
|
||||
}
|
||||
ctx.appendSeries(key, suffix, total)
|
||||
ctx.appendSeries(key, ac.getSuffix(), total)
|
||||
}
|
||||
|
||||
func (av *totalAggrValue) state() any {
|
||||
return av.shared
|
||||
}
|
||||
|
||||
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
|
||||
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
|
||||
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
|
||||
cfg := &totalAggrConfig{
|
||||
keepFirstSample: keepFirstSample,
|
||||
resetTotalOnFlush: resetTotalOnFlush,
|
||||
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
|
||||
}
|
||||
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
||||
@@ -90,8 +90,6 @@ func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleI
|
||||
}
|
||||
|
||||
type totalAggrConfig struct {
|
||||
resetTotalOnFlush bool
|
||||
|
||||
// Whether to take into account the first sample in new time series when calculating the output value.
|
||||
keepFirstSample bool
|
||||
|
||||
@@ -117,12 +115,6 @@ func (*totalAggrConfig) getValue(s any) aggrValue {
|
||||
}
|
||||
|
||||
func (ac *totalAggrConfig) getSuffix() string {
|
||||
if ac.resetTotalOnFlush {
|
||||
if ac.keepFirstSample {
|
||||
return "increase"
|
||||
}
|
||||
return "increase_prometheus"
|
||||
}
|
||||
if ac.keepFirstSample {
|
||||
return "total"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user