Compare commits

..

1 Commits

Author SHA1 Message Date
dependabot[bot]
03f37b97fc build(deps): bump github/codeql-action from 4.35.3 to 4.36.0
Bumps [github/codeql-action](https://github.com/github/codeql-action) from 4.35.3 to 4.36.0.
- [Release notes](https://github.com/github/codeql-action/releases)
- [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md)
- [Commits](e46ed2cbd0...7211b7c807)

---
updated-dependencies:
- dependency-name: github/codeql-action
  dependency-version: 4.36.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-06-12 04:14:40 +00:00
15 changed files with 124 additions and 517 deletions

View File

@@ -52,14 +52,14 @@ jobs:
restore-keys: go-artifacts-${{ runner.os }}-codeql-analyze-${{ steps.go.outputs.go-version }}-
- name: Initialize CodeQL
uses: github/codeql-action/init@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
uses: github/codeql-action/init@7211b7c8077ea37d8641b6271f6a365a22a5fbfa # v4.36.0
with:
languages: go
- name: Autobuild
uses: github/codeql-action/autobuild@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
uses: github/codeql-action/autobuild@7211b7c8077ea37d8641b6271f6a365a22a5fbfa # v4.36.0
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
uses: github/codeql-action/analyze@7211b7c8077ea37d8641b6271f6a365a22a5fbfa # v4.36.0
with:
category: 'language:go'

View File

@@ -6201,7 +6201,7 @@
"type": "victoriametrics-metrics-datasource",
"uid": "$ds"
},
"description": "The rate of dropped samples during aggregation. \nStream aggregation will drop samples with NaN values, too old timestamps or samples identified as duplicates during deduplication. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"fieldConfig": {
"defaults": {
"color": {
@@ -6282,14 +6282,14 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(rate({__name__=~\"vm_streamaggr_ignored_samples_total|vm_streamaggr_dedup_dropped_samples_total\", job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod)",
"expr": "sum(rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Dropped samples ($instance)",
"title": "Ignored samples ($instance)",
"type": "timeseries"
},
{

View File

@@ -6200,7 +6200,7 @@
"type": "prometheus",
"uid": "$ds"
},
"description": "The rate of dropped samples during aggregation. \nStream aggregation will drop samples with NaN values, too old timestamps or samples identified as duplicates during deduplication. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"fieldConfig": {
"defaults": {
"color": {
@@ -6281,14 +6281,14 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(rate({__name__=~\"vm_streamaggr_ignored_samples_total|vm_streamaggr_dedup_dropped_samples_total\", job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod)",
"expr": "sum(rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Dropped samples ($instance)",
"title": "Ignored samples ($instance)",
"type": "timeseries"
},
{

View File

@@ -6,348 +6,45 @@ build:
sitemap:
disable: true
---
**Objective**
> [VictoriaMetrics Enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports specifying multiple retentions for distinct sets of time series and tenants. If you are an Enterprise user, [configure multiple retentions directly through retention filters](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#retention-filters) instead of following this guide.
Set up a VictoriaMetrics cluster that supports multiple retention periods within one installation.
This guide explains how to set up multiple retentions using an [open-source VictoriaMetrics Cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/).
**Enterprise Solution**
## Overview
[VictoriaMetrics Enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports specifying multiple retentions
for distinct sets of time series and [tenants](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy)
via [retention filters](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#retention-filters).
VictoriaMetrics retains metrics by default for **1 month**. You can change data retention with the [`-retentionPeriod` command-line flag](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention), but this value applies to **all time series stored** on a given `vmstorage` node and cannot be customized per tenant or per metric in the open source version.
**Open Source Solution**
The core idea of this guide is to run **separate logical groups of storage nodes** (or even clusters) with individual `-retentionPeriod` settings, while still providing a single unified write and read path via vmagent and vmselect.
Community version of VictoriaMetrics supports only one retention period per `vmstorage` node via [-retentionPeriod](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention) command-line flag.
## Multi-Retention Architecture
A multi-retention setup can be implemented by dividing a [victoriametrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) into logical groups with different retentions.
To support multiple retentions with the open source version of VictoriaMetrics cluster, you can split the cluster into several logical groups of storage nodes. Each group is configured with a different `-retentionPeriod` and receives only the data that must follow that retention.
Example:
The setup should handle three retention groups: 3 months, 1 year, and 3 years.
Solution contains 3 groups of vmstorages + vminserts and one group of vmselects. Routing is done by [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/)
by [splitting data streams](https://docs.victoriametrics.com/victoriametrics/vmagent/#splitting-data-streams-among-multiple-systems).
The [-retentionPeriod](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention) sets how long to keep the metrics.
Each storage group is connected to a separate vminsert, while a shared vmselect layer queries across all storage groups so that dashboards and alerts continue to see a single unified VictoriaMetrics backend.
The diagram below shows a proposed solution
![Setup](setup.webp)
In the example used throughout this guide, the cluster is divided into three groups:
**Implementation Details**
- Group A: 3-month retention.
- Group B: 1-year retention.
- Group C: 3-year retention.
1. Groups of vminserts A know about only vmstorages A and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
1. Groups of vminserts B know about only vmstorages B and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
1. Groups of vminserts C know about only vmstorages C and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
1. vmselect reads data from all vmstorage nodes via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup)
with [deduplication](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#deduplication) setting equal to vmagent's scrape interval or minimum interval between collected samples.
1. vmagent routes incoming metrics to the given set of `vminsert` nodes using relabeling rules specified at `-remoteWrite.urlRelabelConfig` [configuration](https://docs.victoriametrics.com/victoriametrics/relabeling/).
Metrics are routed to the appropriate vminsert group by splitting data streams in vmagent, so each time series is sent to exactly one retention group instead of being replicated to all groups. See [Deploying vmagent](https://docs.victoriametrics.com/guides/guide-vmcluster-multiple-retention-setup/#step3) for an example of label-based routing that implements this split. An optional [vmauth](https://docs.victoriametrics.com/guides/guide-vmcluster-multiple-retention-setup/#additional-enhancements) layer can be added on top to restrict access to specific subclusters or tenants while still keeping a unified write and read path.
**Multi-Tenant Setup**
## Implementing Multi-Retention on Kubernetes
Each group of vmstorage nodes can serve one or multiple tenants. Different groups can have overlapping tenants. Since vmselect reads from all vmstorage nodes, the data is merged at the vmselect level.
In this section, we'll install and configure the components for a multi-retention deployment of the VictoriaMetrics cluster. See [Kubernetes monitoring with VictoriaMetrics Cluster](https://docs.victoriametrics.com/guides/k8s-monitoring-via-vm-cluster/) for details on running VictoriaMetrics in Kubernetes.
**Additional Enhancements**
Run the following command to add the VictoriaMetrics Helm repository:
```shell
helm repo add vm https://victoriametrics.github.io/helm-charts/
helm repo update
```
### Step 1: Deploying storage groups {#step1}
We'll create three storage groups. Each has a different retention period and disk size. Read [Understand Your Setup Size](https://docs.victoriametrics.com/guides/understand-your-setup-size/) to estimate how much space you will need for each group. The following table is shown as an example:
| Group | Retention Period | Total disk size |
|--------------|------------------|-----------------------|
| `vmcluster-a` | 3 months (`3M`) | 80 Gi |
| `vmcluster-b` | 1 year (`1Y`) | 300 Gi |
| `vmcluster-c` | 3 years (`3Y`) | 900 Gi |
Create a Helm values file for Group A.
```shell
cat <<EOF > vmcluster-a.yaml
vmstorage:
enabled: true
replicaCount: 1
persistence:
size: 80Gi
extraArgs:
retentionPeriod: 3M
podLabels:
retention-group: a
vminsert:
enabled: true
podLabels:
retention-group: a
vmselect:
enabled: false
EOF
```
The values file above creates vminsert and vmstorage services while turning off vmselect, which we'll deploy separately. The `retentionPeriod` flag configures how long data is kept in this group.
Create the values files for Group B and Group C:
```shell
cat <<EOF > vmcluster-b.yaml
vmstorage:
enabled: true
replicaCount: 1
persistence:
size: 300Gi
extraArgs:
retentionPeriod: 1y
podLabels:
retention-group: b
vminsert:
enabled: true
podLabels:
retention-group: b
vmselect:
enabled: false
EOF
cat <<EOF > vmcluster-c.yaml
vmstorage:
enabled: true
replicaCount: 1
persistence:
size: 900Gi
extraArgs:
retentionPeriod: 3y
podLabels:
retention-group: c
vminsert:
enabled: true
podLabels:
retention-group: c
vmselect:
enabled: false
EOF
```
Deploy the three storage groups with:
```shell
helm upgrade --install vmcluster-a vm/victoria-metrics-cluster -f vmcluster-a.yaml
helm upgrade --install vmcluster-b vm/victoria-metrics-cluster -f vmcluster-b.yaml
helm upgrade --install vmcluster-c vm/victoria-metrics-cluster -f vmcluster-c.yaml
# Wait for all storage pods to be ready
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-a
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-b
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-c
```
### Step 2: Deploying vmselect {#step2}
Next, we'll deploy a vmselect service to route queries to the storage groups.
Create a Helm values file with:
```shell
cat <<EOF >vmselect.yaml
vmstorage:
enabled: false
vminsert:
enabled: false
vmselect:
enabled: true
replicaCount: 1
suppressStorageFQDNsRender: true
extraArgs:
# Each list item is a single -storageNode flag. In this example, there is
# one vmstorage pod per retention group, so each entry contains a single host.
# If you run multiple pods per group, list them as comma-separated hosts
# in the same -storageNode value.
#
# The FQDN format is:
# <pod>.<svc>.default.svc
# where pod = <release>-victoria-metrics-cluster-vmstorage-<N>
# and svc = <release>-victoria-metrics-cluster-vmstorage
storageNode:
- "vmcluster-a-victoria-metrics-cluster-vmstorage-0.vmcluster-a-victoria-metrics-cluster-vmstorage.default.svc:8401"
- "vmcluster-b-victoria-metrics-cluster-vmstorage-0.vmcluster-b-victoria-metrics-cluster-vmstorage.default.svc:8401"
- "vmcluster-c-victoria-metrics-cluster-vmstorage-0.vmcluster-c-victoria-metrics-cluster-vmstorage.default.svc:8401"
EOF
```
Let's break down the file above:
- Deploys vmselect as a separate Helm release.
- Disables vminsert and vmstorage as these services were already deployed in Step 1.
- `suppressStorageFQDNsRender: true` turns off automatic FQDN generation for storage nodes. By default, the Helm chart auto-generates `-storageNode` flags, but since `vmstorage` has been disabled, we need to supply them manually in `extraArgs`.
- In `extraArgs.storageNode:` we define the vmstorage endpoints for queries. On querying, vmselect merges results across all the specified vmstorages to provide a unified view of the data.
Deploy the `vmselect` release with:
```shell
helm upgrade --install vmselect vm/victoria-metrics-cluster -f vmselect.yaml
```
### Step 3: Deploying vmagent {#step3}
We'll use `vmagent` to route incoming metrics to the correct retention group. For example, we can use a `retention` label for mapping metrics to storage groups in the following way:
| `retention` label | Storage Group |
|-------------------|--------------|
| `"3mo"` | `vmcluster-a` |
| `"1yr"` | `vmcluster-b` |
| `"3yr"` | `vmcluster-c` |
Create the values file for vmagent:
```shell
cat <<EOF >vmagent.yaml
service:
enabled: true
remoteWrite:
# Group A: receives metrics with retention="3mo"
- url: http://vmcluster-a-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
urlRelabelConfig:
- if: '{retention="3mo"}'
action: keep
# Group B: receives metrics with retention="1yr"
- url: http://vmcluster-b-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
urlRelabelConfig:
- if: '{retention="1yr"}'
action: keep
# Group C: receives metrics with retention="3yr"
- url: http://vmcluster-c-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
urlRelabelConfig:
- if: '{retention="3yr"}'
action: keep
EOF
```
> Metrics without a matching `retention` label are silently dropped by the `keep` rules. You must ensure that every metric is labeled, or use a different routing configuration.
Now deploy the vmagent release:
```shell
helm upgrade --install vmagent vm/victoria-metrics-agent -f vmagent.yaml
```
Wait for vmagent to become ready:
```shell
kubectl rollout status deploy/vmagent-victoria-metrics-agent
```
### Step 4: Verification
We can send test data to verify that the data is flowing to the correct storage group.
First, port-forward vmagent and vmselect:
```shell
VMAGENT_SVC=$(kubectl get svc -l app.kubernetes.io/instance=vmagent -o jsonpath='{.items[0].metadata.name}')
kubectl port-forward "svc/$VMAGENT_SVC" 8429 &
VMSELECT_SVC=$(kubectl get svc -l app.kubernetes.io/instance=vmselect -o jsonpath='{.items[0].metadata.name}')
kubectl port-forward "svc/$VMSELECT_SVC" 8481 &
```
Send test metrics directly to vmagent's HTTP endpoint to exercise all three retention labels:
```shell
POD=$(kubectl get pod -l app.kubernetes.io/instance=vmagent -o jsonpath='{.items[0].metadata.name}')
for retention in 3mo 1yr 3yr; do
kubectl exec "$POD" -- wget -qO- --post-data="test_routing{retention=\"${retention}\"} 1.0" \
"http://127.0.0.1:8429/api/v1/import/prometheus"
done
```
Query the data back from vmselect (it may take around 30-60 seconds for new data to be available for queries):
```shell
for retention in 3mo 1yr 3yr; do
echo "-> retention=${retention}"
curl -s "http://localhost:8481/select/0/prometheus/api/v1/query" \
--data-urlencode "query=test_routing{retention=\"${retention}\"}"
echo
done
```
You can also check that vmagent is forwarding data to all three groups:
```shell
curl -s http://localhost:8429/metrics | grep vmagent_remotewrite_blocks_sent_total
```
Each `url="N:secret-url"` corresponds to one `remoteWrite` entry (N=1 for Group A, N=2 for Group B, N=3 for Group C). Non-zero values confirm data is flowing.
## Alternative Routing by Existing Labels
The example setup above relies on a synthetic `retention` label being present on every incoming metric.
If having a `retention` label in every metric isn't practical, you can, as an alternative, rely on existing labels to map data to the correct storage group.
The following example configures vmagent to route metrics based on the `environment` and `team` labels:
```yaml
# vmagent.yaml
remoteWrite:
# send dev and staging data to Group A
- url: "http://vmcluster-a-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
urlRelabelConfig:
- if: '{environment=~"dev|staging"}'
action: keep
# send prod data to Group B
- url: "http://vmcluster-b-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
urlRelabelConfig:
- if: '{environment=~"prod|production"}'
action: keep
# send data from Infra and SRE teams to Group C
- url: "http://vmcluster-c-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
urlRelabelConfig:
- if: '{team=~"infra|sre"}'
action: keep
```
> Metrics that do not match any of the `keep` rules are dropped in the configuration above.
## Additional Enhancements
You can set up [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/) to route data to the specified vminsert group based on the required retention or to restrict which data different users can query.
The following [`-auth.config`](https://docs.victoriametrics.com/victoriametrics/vmauth/#quick-start) example exposes the same vmselect backend via vmauth with two users using basic auth:
- `admin`: can query **all** data across all retention groups.
- `dev`: can query **only** time series that have `team="dev"` label, enforced via the `extra_label` query argument.
```yaml
users:
# User with access to all data across all retention groups
- username: "admin"
password: "foo"
url_map:
- src_paths:
- "/api/v1/query"
- "/api/v1/query_range"
- "/api/v1/series"
- "/api/v1/labels"
- "/api/v1/label/.+/values"
# vmselect service that aggregates all vmstorage groups
url_prefix: "http://vmselect-victoria-metrics-cluster-vmselect:8481/select/0/prometheus"
# User restricted to Dev team data only
- username: "dev"
password: "bar"
url_map:
- src_paths:
- "/api/v1/query"
- "/api/v1/query_range"
- "/api/v1/series"
- "/api/v1/labels"
- "/api/v1/label/.+/values"
# Same vmselect backend, but enforce label filter at query time
# by adding extra_label=team=dev to every proxied request
url_prefix: "http://vmselect-victoria-metrics-cluster-vmselect:8481/select/0/prometheus/?extra_label=team=dev"
```
This is useful for restricting access by team, environment, or tenant without changing the underlying storage topology.
You can set up [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/) for routing data to the given vminsert group depending on the needed retention.

View File

@@ -26,8 +26,6 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): expose `vm_streamaggr_dedup_dropped_samples_total` to allow tracking dropped old samples during [deduplication](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)

View File

@@ -396,7 +396,7 @@ before sending them to the configured `-remoteWrite.url`. The deduplication can
Labels can be dropped before deduplication is applied. See [these docs](#dropping-unneeded-labels).
Stream aggregation deduplication is applied before aggregation rules, so duplicate samples are dropped before aggregation. The dropped old samples can be tracked with the `vm_streamaggr_dedup_dropped_samples_total` metric.
Stream aggregation deduplication is applied before aggregation rules, so duplicate samples are dropped before aggregation.
# Relabeling
@@ -444,9 +444,7 @@ outside the current [aggregation interval](https://docs.victoriametrics.com/vict
- To enable [aggregation windows](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#aggregation-windows).
- To enable [deduplication](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication).
The dropped old samples can be tracked with the `vm_streamaggr_ignored_samples_total{reason="too_old"}` and `vm_streamaggr_dedup_dropped_samples_total` metrics.
The dropped old samples can be tracked with the `vm_streamaggr_ignored_samples_total{reason="too_old"}` metric.
## Ignore aggregation intervals on start

View File

@@ -1,7 +1,6 @@
package streamaggr
import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
@@ -18,10 +17,9 @@ import (
const dedupAggrShardsCount = 128
type dedupAggr struct {
shards []dedupAggrShard
flushDuration *metrics.Histogram
flushTimeouts *metrics.Counter
droppedSamples *metrics.Counter
shards []dedupAggrShard
flushDuration *metrics.Histogram
flushTimeouts *metrics.Counter
}
type dedupAggrShard struct {
@@ -49,20 +47,10 @@ type dedupAggrSample struct {
timestamp int64
}
func newDedupAggr(ms *metrics.Set, metricLabels string) *dedupAggr {
var d dedupAggr
d.shards = make([]dedupAggrShard, dedupAggrShardsCount)
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
return float64(d.sizeBytes())
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
return float64(d.itemsCount())
})
d.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
d.droppedSamples = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_dropped_samples_total{%s}`, metricLabels))
return &d
func newDedupAggr() *dedupAggr {
return &dedupAggr{
shards: make([]dedupAggrShard, dedupAggrShardsCount),
}
}
func (da *dedupAggr) sizeBytes() uint64 {
@@ -99,8 +87,7 @@ func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, isGreen bool) {
if len(shardSamples) == 0 {
continue
}
deduplicatedSamples := da.shards[i].pushSamples(shardSamples, isGreen)
da.droppedSamples.Add(deduplicatedSamples)
da.shards[i].pushSamples(shardSamples, isGreen)
}
putPerShardSamples(pss)
}
@@ -180,9 +167,8 @@ func putPerShardSamples(pss *perShardSamples) {
var perShardSamplesPool sync.Pool
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) int {
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
var state *dedupAggrState
var deduplicatedSamples int
if isGreen {
state = &das.green
@@ -212,10 +198,8 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) int {
continue
}
s.timestamp, s.value = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
deduplicatedSamples++
}
state.samplesBuf = samplesBuf
return deduplicatedSamples
}
// deduplicateSamples returns deduplicated timestamp and value results.

View File

@@ -7,13 +7,11 @@ import (
"testing"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
)
func TestDedupAggrSerial(t *testing.T) {
da := newDedupAggr(metrics.NewSet(), "")
da := newDedupAggr()
const seriesCount = 100_000
expectedSamplesMap := make(map[string]pushSample)
@@ -61,7 +59,7 @@ func TestDedupAggrSerial(t *testing.T) {
func TestDedupAggrConcurrent(_ *testing.T) {
const concurrency = 5
const seriesCount = 10_000
da := newDedupAggr(metrics.NewSet(), "")
da := newDedupAggr()
var wg sync.WaitGroup
for range concurrency {

View File

@@ -5,8 +5,6 @@ import (
"sync/atomic"
"testing"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
@@ -25,7 +23,7 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
const loops = 2
benchSamples := newBenchSamples(samplesPerPush)
da := newDedupAggr(metrics.NewSet(), "")
da := newDedupAggr()
b.ResetTimer()
b.ReportAllocs()

View File

@@ -44,6 +44,7 @@ type Deduplicator struct {
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Duration, dropLabels []string, alias string) *Deduplicator {
d := &Deduplicator{
da: newDedupAggr(),
dropLabels: dropLabels,
interval: interval,
enableWindows: enableWindows,
@@ -63,7 +64,16 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
ms := d.ms
metricLabels := fmt.Sprintf(`name="dedup",url=%q`, alias)
d.da = newDedupAggr(ms, metricLabels)
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
return float64(d.da.sizeBytes())
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
return float64(d.da.itemsCount())
})
d.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
metrics.RegisterSet(ms)
@@ -110,7 +120,6 @@ func (d *Deduplicator) Push(tss []prompb.TimeSeries) {
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, s := range ts.Samples {
if d.enableWindows && minDeadline > s.Timestamp {
d.da.droppedSamples.Inc()
continue
} else if d.enableWindows && s.Timestamp <= cs.maxDeadline == cs.isGreen {
ctx.green = append(ctx.green, pushSample{

View File

@@ -14,10 +14,16 @@ func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample,
av.h.Update(sample.value)
}
func (av *histogramBucketAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
av.shared.Merge(&av.h)
av.h.Reset()
av.shared.VisitNonZeroBuckets(func(vmrange string, count uint64) {
func (av *histogramBucketAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
ac := c.(*histogramBucketAggrConfig)
shared := av.shared
if ac.useSharedState {
shared.Merge(&av.h)
av.h.Reset()
} else {
shared = &av.h
}
shared.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange)
})
}
@@ -26,17 +32,26 @@ func (av *histogramBucketAggrValue) state() any {
return av.shared
}
func newHistogramBucketAggrConfig() aggrConfig {
return &histogramBucketAggrConfig{}
func newHistogramBucketAggrConfig(useSharedState bool) aggrConfig {
return &histogramBucketAggrConfig{
useSharedState: useSharedState,
}
}
type histogramBucketAggrConfig struct{}
type histogramBucketAggrConfig struct {
useSharedState bool
}
func (*histogramBucketAggrConfig) getValue(s any) aggrValue {
if s == nil {
s = &metrics.Histogram{}
func (ac *histogramBucketAggrConfig) getValue(s any) aggrValue {
var shared *metrics.Histogram
if ac.useSharedState {
if s == nil {
shared = &metrics.Histogram{}
} else {
shared = s.(*metrics.Histogram)
}
}
return &histogramBucketAggrValue{
shared: s.(*metrics.Histogram),
shared: shared,
}
}

View File

@@ -1,109 +0,0 @@
package streamaggr
import (
"fmt"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// increaseLastValue is the most recent sample remembered for a single input
// series, together with the deadline after which the entry may be evicted.
type increaseLastValue struct {
	// value is the last observed sample value.
	value float64

	// timestamp is the timestamp of the last observed sample;
	// deleteDeadline is compared against the flush timestamp to decide
	// when the entry is removed.
	timestamp, deleteDeadline int64
}
// increaseAggrConfig carries the settings consulted by increaseAggrValue
// when samples are pushed and flushed.
type increaseAggrConfig struct {
	// keepFirstSample controls whether the first sample of a previously
	// unseen series is added to the total. It also selects the output suffix.
	keepFirstSample bool

	// ignoreFirstSampleDeadline is a unix timestamp in seconds. Until it is
	// reached, the first sample of every new series is ignored even when
	// keepFirstSample is set. This avoids an initial spike of output values
	// at startup, when new time series cannot be told apart from series
	// that already existed.
	ignoreFirstSampleDeadline uint64

	// counterResetsTotal is incremented each time a sample value is lower
	// than the previously remembered value for the same series.
	counterResetsTotal *metrics.Counter
}
// increaseAggrValue accumulates the increase for one output series.
type increaseAggrValue struct {
	// total is the increase accumulated since the previous flush.
	// It stays nil until a sample is pushed, so flush emits nothing
	// when no samples arrived.
	total *float64

	// shared maps an input series key to the last sample seen for it.
	shared map[string]increaseLastValue
}
// pushSample folds sample into the running total for the input series
// identified by key and remembers it as the latest sample of that series.
//
// Samples older than the remembered one are skipped without touching the
// stored state. deleteDeadline is stored alongside the sample and later
// consulted by flush.
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
	cfg := c.(*increaseAggrConfig)
	now := fasttime.UnixTimestamp()
	prev, seen := av.shared[key]

	// Allocate the accumulator lazily: a nil total tells flush that
	// nothing was pushed since the previous flush.
	if av.total == nil {
		av.total = new(float64)
	}

	switch {
	case seen && sample.timestamp < prev.timestamp:
		// Out-of-order sample: leave the remembered state untouched.
		return
	case seen && sample.value >= prev.value:
		*av.total += sample.value - prev.value
	case seen:
		// The value went down, so treat it as a counter reset.
		*av.total += sample.value
		cfg.counterResetsTotal.Inc()
	case cfg.keepFirstSample && now >= cfg.ignoreFirstSampleDeadline:
		// First sample of a new series, counted only after the startup
		// grace period has elapsed.
		*av.total += sample.value
	}

	av.shared[bytesutil.InternString(key)] = increaseLastValue{
		value:          sample.value,
		timestamp:      sample.timestamp,
		deleteDeadline: deleteDeadline,
	}
}
// flush evicts stale per-series entries and, if any sample was pushed since
// the previous flush, emits the accumulated total under key with the
// configured suffix and resets the accumulator.
//
// When isLast is true every remembered entry is removed.
func (av *increaseAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
	cfg := c.(*increaseAggrConfig)

	for inputKey, last := range av.shared {
		expired := ctx.flushTimestamp > last.deleteDeadline
		if expired || isLast {
			delete(av.shared, inputKey)
		}
	}

	if pending := av.total; pending != nil {
		av.total = nil
		ctx.appendSeries(key, cfg.getSuffix(), *pending)
	}
}
// state exposes the map of last seen samples per input series.
// NOTE(review): presumably this is the value later handed to getValue; confirm against the caller.
func (av *increaseAggrValue) state() any {
	var shared any = av.shared
	return shared
}
// newIncreaseAggrConfig builds the config for an increase-style output.
//
// The first-sample grace period ends ignoreFirstSampleIntervalSecs seconds
// after the current unix timestamp. The counter-resets metric is registered
// in ms with the given metricLabels.
func newIncreaseAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
	return &increaseAggrConfig{
		keepFirstSample:           keepFirstSample,
		ignoreFirstSampleDeadline: fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs,
		counterResetsTotal:        ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels)),
	}
}
// getValue returns a fresh increaseAggrValue. When s is non-nil it must be a
// map[string]increaseLastValue, which is reused as the per-series state;
// otherwise a new empty map is allocated.
func (*increaseAggrConfig) getValue(s any) aggrValue {
	av := &increaseAggrValue{}
	if s != nil {
		av.shared = s.(map[string]increaseLastValue)
	} else {
		av.shared = make(map[string]increaseLastValue)
	}
	return av
}
// getSuffix returns the output name suffix: "increase" when the first sample
// of a series is kept, "increase_prometheus" otherwise.
func (ac *increaseAggrConfig) getSuffix() string {
	suffix := "increase_prometheus"
	if ac.keepFirstSample {
		suffix = "increase"
	}
	return suffix
}

View File

@@ -75,9 +75,6 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
outputs = av.blue
}
for idx, o := range outputs {
if o == nil {
o = av.blue[idx]
}
o.pushSample(ao.configs[idx], sample, inputKey, deleteDeadline)
}
av.deleteDeadline = deleteDeadline
@@ -115,9 +112,6 @@ func (ao *aggrOutputs) flushState(ctx *flushCtx) {
outputs = av.blue
}
for i, o := range outputs {
if o == nil {
o = av.blue[i]
}
o.flush(ao.configs[i], ctx, outputKey, ctx.isLast)
}
av.mu.Unlock()

View File

@@ -609,7 +609,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
for i, output := range cfg.Outputs {
outputMetricLabels := fmt.Sprintf(`output=%q,name=%q,path=%q,url=%q,position="%d"`, output, name, path, alias, aggrID)
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, ignoreFirstSampleInterval)
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
if err != nil {
return nil, err
}
@@ -668,7 +668,18 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
if dedupInterval > 0 {
a.da = newDedupAggr(ms, metricLabels)
a.da = newDedupAggr()
a.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
a.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
n := a.da.sizeBytes()
return float64(n)
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
n := a.da.itemsCount()
return float64(n)
})
}
alignFlushToInterval := !opts.NoAlignFlushToInterval
@@ -705,7 +716,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
return a, nil
}
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
// check for duplicated output
if _, ok := outputsSeen[output]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
@@ -749,11 +760,11 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "count_series":
return newCountSeriesAggrConfig(), nil
case "histogram_bucket":
return newHistogramBucketAggrConfig(), nil
return newHistogramBucketAggrConfig(useSharedState), nil
case "increase":
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
case "increase_prometheus":
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
case "last":
return newLastAggrConfig(), nil
case "max":
@@ -771,9 +782,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "sum_samples":
return newSumSamplesAggrConfig(), nil
case "total":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
case "total_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
case "unique_samples":
return newUniqueSamplesAggrConfig(), nil
default:

View File

@@ -53,30 +53,36 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*totalAggrConfig)
suffix := ac.getSuffix()
// check for stale entries
total := av.shared.total + av.total
av.total = 0
for lk, lv := range av.shared.lastValues {
lvs := av.shared.lastValues
for lk, lv := range lvs {
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
delete(av.shared.lastValues, lk)
delete(lvs, lk)
}
}
if math.Abs(total) >= (1 << 53) {
if ac.resetTotalOnFlush {
av.shared.total = 0
} else if math.Abs(total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
av.shared.total = 0
} else {
av.shared.total = total
}
ctx.appendSeries(key, ac.getSuffix(), total)
ctx.appendSeries(key, suffix, total)
}
// state returns the shared state held by av as is, without copying it.
func (av *totalAggrValue) state() any {
	shared := av.shared
	return shared
}
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &totalAggrConfig{
keepFirstSample: keepFirstSample,
resetTotalOnFlush: resetTotalOnFlush,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
@@ -84,6 +90,8 @@ func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleI
}
type totalAggrConfig struct {
resetTotalOnFlush bool
// Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool
@@ -109,6 +117,12 @@ func (*totalAggrConfig) getValue(s any) aggrValue {
}
func (ac *totalAggrConfig) getSuffix() string {
if ac.resetTotalOnFlush {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}
if ac.keepFirstSample {
return "total"
}