Compare commits

...

3 Commits

Author SHA1 Message Date
hagen1778
291bf16bbf app/vmselect: log calls to /api/v1/admin/tsdb/delete_series
The log message will display when deletion API was called, how many series
it deleted and what params were used. This should help identifying events
of metrics deletion.

Example:
```
2026-06-12T13:02:28.006Z        info    VictoriaMetrics/app/vmselect/prometheus/prometheus.go:529       /api/v1/admin/tsdb/delete_series has been called for "[{__name__=\"vm_http_request_errors_total\"}]". Deleted 0 series.
```

Signed-off-by: hagen1778 <roman@victoriametrics.com>
2026-06-12 15:03:46 +02:00
Andrii Chubatiuk
05903c8acd chore(lib/streamaggr): move increase and increase_prometheus outputs to a separate struct (#11093)
move increase and increase_prometheus outputs to a separate struct
to simplify the code. 

Before, increase, increase_prometheus, total_prometheus 
were implemented within one aggregator making it harder to use or update it.
2026-06-12 11:47:55 +02:00
Pablo (Tomas) Fernandez
a9fae230ae docs: Update "Multi Retention Setup within VictoriaMetrics Cluster" Guide (#11055)
This PR updates the [Multi Retention Setup within VictoriaMetrics
Cluster](https://docs.victoriametrics.com/guides/guide-vmcluster-multiple-retention-setup/)
guide.

Changes:
- Rewrote the introduction and added an overview
- Added step-by-step example setup for Kubernetes
- Expanded on alternative ways to route traffic

I've tested the configuration on a K3s cluster and it works, but I don't
know if the way I routed traffic into the retention tiers makes sense,
so any feedback is very much appreciated.

---------

Signed-off-by: Pablo (Tomas) Fernandez <46322567+TomFern@users.noreply.github.com>
Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
2026-06-12 11:46:26 +02:00
9 changed files with 482 additions and 82 deletions

View File

@@ -28,6 +28,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@@ -525,6 +526,7 @@ func DeleteHandler(startTime time.Time, r *http.Request) error {
if deletedCount > 0 {
promql.ResetRollupResultCache()
}
logger.Infof("/api/v1/admin/tsdb/delete_series has been called for %q. Deleted %d series.", sq.FiltersString(), deletedCount)
return nil
}

View File

@@ -6,45 +6,348 @@ build:
sitemap:
disable: true
---
**Objective**
Setup Victoria Metrics Cluster with support of multiple retention periods within one installation.
> [VictoriaMetrics Enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports specifying multiple retentions for distinct sets of time series and tenants. If you are an Enterprise user, [configure multiple retentions directly through retention filters](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#retention-filters) instead of following this guide.
**Enterprise Solution**
This guide explains how to set up multiple retentions using an [open-source VictoriaMetrics Cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/).
[VictoriaMetrics Enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports specifying multiple retentions
for distinct sets of time series and [tenants](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy)
via [retention filters](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#retention-filters).
## Overview
**Open Source Solution**
VictoriaMetrics retains metrics by default for **1 month**. You can change data retention with the [`-retentionPeriod` command-line flag](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention), but this value applies to **all time series stored** on a given `vmstorage` node and cannot be customized per tenant or per metric in the open source version.
Community version of VictoriaMetrics supports only one retention period per `vmstorage` node via [-retentionPeriod](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention) command-line flag.
The core idea of this guide is to run **separate logic groups of storages** (or even clusters) with individual `-retentionPeriod` settings, while still providing a single unified write and read path via vmagent and vmselect.
A multi-retention setup can be implemented by dividing a [victoriametrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) into logical groups with different retentions.
## Multi-Retention Architecture
Example:
Setup should handle 3 different retention groups 3months, 1year and 3 years.
Solution contains 3 groups of vmstorages + vminserts and one group of vmselects. Routing is done by [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/)
by [splitting data streams](https://docs.victoriametrics.com/victoriametrics/vmagent/#splitting-data-streams-among-multiple-systems).
The [-retentionPeriod](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention) sets how long to keep the metrics.
To support multiple retentions with the open source version of VictoriaMetrics cluster, you can split the cluster into several logical groups of storage nodes. Each group is configured with a different `-retentionPeriod` and receives only the data that must follow that retention.
The diagram below shows a proposed solution
Each storage group is connected to a separate vminsert, while a shared vmselect layer queries across all storage groups so that dashboards and alerts continue to see a single unified VictoriaMetrics backend.
![Setup](setup.webp)
**Implementation Details**
In the example used throughout this guide, the cluster is divided into three groups:
1. Groups of vminserts A know about only vmstorages A and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
1. Groups of vminserts B know about only vmstorages B and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
1. Groups of vminserts C know about only vmstorages C and this is explicitly specified via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup).
1. vmselect reads data from all vmstorage nodes via `-storageNode` [configuration](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#cluster-setup)
with [deduplication](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#deduplication) setting equal to vmagent's scrape interval or minimum interval between collected samples.
1. vmagent routes incoming metrics to the given set of `vminsert` nodes using relabeling rules specified at `-remoteWrite.urlRelabelConfig` [configuration](https://docs.victoriametrics.com/victoriametrics/relabeling/).
- Group A: 3-month retention.
- Group B: 1-year retention.
- Group C: 3-year retention.
**Multi-Tenant Setup**
Metrics are routed to the appropriate vminsert group by splitting data streams in vmagent, so each time series is sent to exactly one retention group instead of being replicated to all groups. See [Deploying vmagent](https://docs.victoriametrics.com/guides/guide-vmcluster-multiple-retention-setup/#step3) for an example of labelbased routing that implements this split. An optional [vmauth](https://docs.victoriametrics.com/guides/guide-vmcluster-multiple-retention-setup/#additional-enhancements) layer can be added on top to restrict access to specific subclusters or tenants while still keeping a unified write and read path.
Every group of vmstorages can handle one tenant or multiple one. Different groups can have overlapping tenants. As vmselect reads from all vmstorage nodes, the data is aggregated on its level.
## Implementing Multi-Retention on Kubernetes
**Additional Enhancements**
In this section, we'll install and configure the components for a multi-retention deployment of the VictoriaMetrics cluster. See [Kubernetes monitoring with VictoriaMetrics Cluster](https://docs.victoriametrics.com/guides/k8s-monitoring-via-vm-cluster/) for details on running VictoriaMetrics in Kubernetes.
You can set up [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/) for routing data to the given vminsert group depending on the needed retention.
Run the following command to add the VictoriaMetrics Helm repository:
```shell
helm repo add vm https://victoriametrics.github.io/helm-charts/
helm repo update
```
### Step 1: Deploying storage groups {#step1}
We'll create three storage groups. Each has a different retention period and disk size. Read [Understand Your Setup Size](https://docs.victoriametrics.com/guides/understand-your-setup-size/) to estimate how much space you will need for each group. The following table is shown as an example:
| Group | Retention Period | Total disk size |
|--------------|------------------|-----------------------|
| `vmcluster-a` | 3 months (`3M`) | 80 Gi |
| `vmcluster-b` | 1 year (`1Y`) | 300 Gi |
| `vmcluster-c` | 3 years (`3Y`) | 900 Gi |
Create a Helm values file for Group A.
```shell
cat <<EOF > vmcluster-a.yaml
vmstorage:
enabled: true
replicaCount: 1
persistence:
size: 80Gi
extraArgs:
retentionPeriod: 3M
podLabels:
retention-group: a
vminsert:
enabled: true
podLabels:
retention-group: a
vmselect:
enabled: false
EOF
```
The values file above creates vminsert and vmstorage services while turning off vmselect, which we'll deploy separately. The `retentionPeriod` flag configures how long data is kept in this group.
Create the values files for Group B and Group C:
```shell
cat <<EOF > vmcluster-b.yaml
vmstorage:
enabled: true
replicaCount: 1
persistence:
size: 300Gi
extraArgs:
retentionPeriod: 1y
podLabels:
retention-group: b
vminsert:
enabled: true
podLabels:
retention-group: b
vmselect:
enabled: false
EOF
cat <<EOF > vmcluster-c.yaml
vmstorage:
enabled: true
replicaCount: 1
persistence:
size: 900Gi
extraArgs:
retentionPeriod: 3y
podLabels:
retention-group: c
vminsert:
enabled: true
podLabels:
retention-group: c
vmselect:
enabled: false
EOF
```
Deploy the three storage groups with:
```shell
helm upgrade --install vmcluster-a vm/victoria-metrics-cluster -f vmcluster-a.yaml
helm upgrade --install vmcluster-b vm/victoria-metrics-cluster -f vmcluster-b.yaml
helm upgrade --install vmcluster-c vm/victoria-metrics-cluster -f vmcluster-c.yaml
# Wait for all storage pods to be ready
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-a
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-b
kubectl rollout status statefulset -l app.kubernetes.io/instance=vmcluster-c
```
### Step 2: Deploying vmselect {#step2}
Next, we'll deploy a vmselect service to route queries to the storage groups.
Create a Helm values file with:
```shell
cat <<EOF >vmselect.yaml
vmstorage:
enabled: false
vminsert:
enabled: false
vmselect:
enabled: true
replicaCount: 1
suppressStorageFQDNsRender: true
extraArgs:
# Each list item is a single -storageNode flag. In this example, there is
# one vmstorage pod per retention group, so each entry contains a single host.
# If you run multiple pods per group, list them as comma-separated hosts
# in the same -storageNode value.
#
# The FQDN format is:
# <pod>.<svc>.default.svc
# where pod = <release>-victoria-metrics-cluster-vmstorage-<N>
# and svc = <release>-victoria-metrics-cluster-vmstorage
storageNode:
- "vmcluster-a-victoria-metrics-cluster-vmstorage-0.vmcluster-a-victoria-metrics-cluster-vmstorage.default.svc:8401"
- "vmcluster-b-victoria-metrics-cluster-vmstorage-0.vmcluster-b-victoria-metrics-cluster-vmstorage.default.svc:8401"
- "vmcluster-c-victoria-metrics-cluster-vmstorage-0.vmcluster-c-victoria-metrics-cluster-vmstorage.default.svc:8401"
EOF
```
Let's break down the file above:
- Deploys vmselect as a separate Helm release.
- Disables vminsert and vmstorage as these services were already deployed in Step 1.
- `suppressStorageFQDNsRender: true` turns off automatic FQDN generation for storage nodes. By default, the Helm chart auto-generates `-storageNodes` flags, but since `vmstorage` has been disabled, we need to supply them manually in `extraArgs`.
- In `extraArgs.storageNode:` we define the vmstorage endpoints for queries. On querying, vmselect merges results across all the specified vmstorages to provide a unified view of the data.
Deploy the `vmselect` release with:
```shell
helm upgrade --install vmselect vm/victoria-metrics-cluster -f vmselect.yaml
```
### Step 3: Deploying vmagent {#step3}
We'll use `vmagent` to route incoming metrics to the correct retention group. For example, we can use a `retention` label for mapping metrics to storage groups in the following way:
| `retention` label | Storage Group |
|-------------------|--------------|
| `"3mo"` | `vmcluster-a` |
| `"1yr"` | `vmcluster-b` |
| `"3yr"` | `vmcluster-c` |
Create the values file for vmagent:
```shell
cat <<EOF >vmagent.yaml
service:
enabled: true
remoteWrite:
# Group A: receives metrics with retention="3mo"
- url: http://vmcluster-a-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
urlRelabelConfig:
- if: '{retention="3mo"}'
action: keep
# Group B: receives metrics with retention="1yr"
- url: http://vmcluster-b-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
urlRelabelConfig:
- if: '{retention="1yr"}'
action: keep
# Group C: receives metrics with retention="3yr"
- url: http://vmcluster-c-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write
urlRelabelConfig:
- if: '{retention="3yr"}'
action: keep
EOF
```
> Metrics without a matching `retention` label are silently dropped by the `keep` rules. You must ensure that every metric is labeled, or use a different routing configuration.
Now deploy the vmagent release:
```shell
helm upgrade --install vmagent vm/victoria-metrics-agent -f vmagent.yaml
```
Wait for vmagent to become ready:
```shell
kubectl rollout status deploy/vmagent-victoria-metrics-agent
```
### Step 4: Verification
We can send test data to verify that the data is flowing to the correct storage group.
First, port-forward vmagent and vmselect:
```shell
VMAGENT_SVC=$(kubectl get svc -l app.kubernetes.io/instance=vmagent -o jsonpath='{.items[0].metadata.name}')
kubectl port-forward "svc/$VMAGENT_SVC" 8429 &
VMSELECT_SVC=$(kubectl get svc -l app.kubernetes.io/instance=vmselect -o jsonpath='{.items[0].metadata.name}')
kubectl port-forward "svc/$VMSELECT_SVC" 8481 &
```
Send test metrics directly to vmagent's HTTP endpoint to exercise all three retention labels:
```shell
POD=$(kubectl get pod -l app.kubernetes.io/instance=vmagent -o jsonpath='{.items[0].metadata.name}')
for retention in 3mo 1yr 3yr; do
kubectl exec "$POD" -- wget -qO- --post-data="test_routing{retention=\"${retention}\"} 1.0" \
"http://127.0.0.1:8429/api/v1/import/prometheus"
done
```
Query the data back from vmselect (it may take around 30-60 seconds for new data to be available for queries):
```shell
for retention in 3mo 1yr 3yr; do
echo "-> retention=${retention}"
curl -s "http://localhost:8481/select/0/prometheus/api/v1/query" \
--data-urlencode "query=test_routing{retention=\"${retention}\"}"
echo
done
```
You can also check that vmagent is forwarding data to all three groups:
```shell
curl -s http://localhost:8429/metrics | grep vmagent_remotewrite_blocks_sent_total
```
Each `url="N:secret-url"` corresponds to one `remoteWrite` entry (N=1 for Group A, N=2 for Group B, N=3 for Group C). Non-zero values confirm data is flowing.
## Alternative Routing by Existing Labels
The example setup above relies on a synthetic `retention` label to exist in every incoming metric.
If having a `retention` label in every metric isn't practical, you can, as an alternative, rely on existing labels to map data to the correct storage group.
The following example configures vmagent to route metrics based on the `environment` and `team` labels:
```yaml
# vmagent.yaml
remoteWrite:
# send dev and staging data to Group A
- url: "http://vmcluster-a-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
urlRelabelConfig:
- if: {environment=~"dev|staging"}
action: keep
# send prod data to Group B
- url: "http://vmcluster-b-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
urlRelabelConfig:
- if: {environment=~"prod|production"}
action: keep
# send data from Infra and SRE teams to Group C
- url: "http://vmcluster-c-victoria-metrics-cluster-vminsert:8480/insert/0/prometheus/api/v1/write"
urlRelabelConfig:
- if: {team=~"infra|sre"}
action: keep
```
> Metrics that do not match any of the `keep` rules are dropped in the configuration above.
## Additional Enhancements
You can set up [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/) to route data to the specified vminsert group based on the required retention or to restrict which data different users can query.
The following [`-auth.config`](https://docs.victoriametrics.com/victoriametrics/vmauth/#quick-start) example exposes the same vmselect backend via vmauth with two users using basic auth:
- `admin`: can query **all** data across all retention groups.
- `dev`: can query **only** time series that have `team="dev"` label, enforced via the `extra_label` query argument.
```yaml
users:
# User with access to all data across all retention groups
- username: "admin"
password: "foo"
url_map:
- src_paths:
- "/api/v1/query"
- "/api/v1/query_range"
- "/api/v1/series"
- "/api/v1/labels"
- "/api/v1/label/.+/values"
# vmselect service that aggregates all vmstorage groups
url_prefix: "http://vmselect-victoria-metrics-cluster-vmselect:8481/select/0/prometheus"
# User restricted to Dev team data only
- username: "dev"
password: "bar"
url_map:
- src_paths:
- "/api/v1/query"
- "/api/v1/query_range"
- "/api/v1/series"
- "/api/v1/labels"
- "/api/v1/label/.+/values"
# Same vmselect backend, but enforce label filter at query time
# by adding extra_label=team=dev to every proxied request
url_prefix: "http://vmselect-victoria-metrics-cluster-vmselect:8481/select/0/prometheus/?extra_label=team=dev"
```
This is useful for restricting access by team, environment, or tenant without changing the underlying storage topology.

View File

@@ -26,6 +26,9 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database.
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)

View File

@@ -468,15 +468,21 @@ func (tf *TagFilter) Unmarshal(src []byte) ([]byte, error) {
return src, nil
}
// String returns string representation of the search query.
// String returns string representation of the search query: tag filters and time range.
func (sq *SearchQuery) String() string {
start := TimestampToHumanReadableFormat(sq.MinTimestamp)
end := TimestampToHumanReadableFormat(sq.MaxTimestamp)
a := sq.FiltersString()
return fmt.Sprintf("filters=%s, timeRange=[%s..%s]", a, start, end)
}
// FiltersString returns string representation of the tag filters.
func (sq *SearchQuery) FiltersString() []string {
a := make([]string, len(sq.TagFilterss))
for i, tfs := range sq.TagFilterss {
a[i] = tagFiltersToString(tfs)
}
start := TimestampToHumanReadableFormat(sq.MinTimestamp)
end := TimestampToHumanReadableFormat(sq.MaxTimestamp)
return fmt.Sprintf("filters=%s, timeRange=[%s..%s]", a, start, end)
return a
}
func tagFiltersToString(tfs []TagFilter) string {

View File

@@ -14,16 +14,10 @@ func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample,
av.h.Update(sample.value)
}
func (av *histogramBucketAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
ac := c.(*histogramBucketAggrConfig)
shared := av.shared
if ac.useSharedState {
shared.Merge(&av.h)
av.h.Reset()
} else {
shared = &av.h
}
shared.VisitNonZeroBuckets(func(vmrange string, count uint64) {
func (av *histogramBucketAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
av.shared.Merge(&av.h)
av.h.Reset()
av.shared.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", float64(count), "vmrange", vmrange)
})
}
@@ -32,26 +26,17 @@ func (av *histogramBucketAggrValue) state() any {
return av.shared
}
func newHistogramBucketAggrConfig(useSharedState bool) aggrConfig {
return &histogramBucketAggrConfig{
useSharedState: useSharedState,
}
func newHistogramBucketAggrConfig() aggrConfig {
return &histogramBucketAggrConfig{}
}
type histogramBucketAggrConfig struct {
useSharedState bool
}
type histogramBucketAggrConfig struct{}
func (ac *histogramBucketAggrConfig) getValue(s any) aggrValue {
var shared *metrics.Histogram
if ac.useSharedState {
if s == nil {
shared = &metrics.Histogram{}
} else {
shared = s.(*metrics.Histogram)
}
func (*histogramBucketAggrConfig) getValue(s any) aggrValue {
if s == nil {
s = &metrics.Histogram{}
}
return &histogramBucketAggrValue{
shared: shared,
shared: s.(*metrics.Histogram),
}
}

109
lib/streamaggr/increase.go Normal file
View File

@@ -0,0 +1,109 @@
package streamaggr
import (
"fmt"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
type increaseLastValue struct {
value float64
timestamp int64
deleteDeadline int64
}
type increaseAggrConfig struct {
keepFirstSample bool
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
// This allows avoiding an initial spike of the output values at startup when new time series
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
ignoreFirstSampleDeadline uint64
counterResetsTotal *metrics.Counter
}
type increaseAggrValue struct {
total *float64
shared map[string]increaseLastValue
}
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
ac := c.(*increaseAggrConfig)
currentTime := fasttime.UnixTimestamp()
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
lv, ok := av.shared[key]
if av.total == nil {
av.total = new(float64)
}
if ok {
if sample.timestamp < lv.timestamp {
// Skip out of order sample
return
}
if sample.value >= lv.value {
*av.total += sample.value - lv.value
} else {
// counter reset
*av.total += sample.value
ac.counterResetsTotal.Inc()
}
} else if keepFirstSample {
*av.total += sample.value
}
lv.value = sample.value
lv.timestamp = sample.timestamp
lv.deleteDeadline = deleteDeadline
key = bytesutil.InternString(key)
av.shared[key] = lv
}
func (av *increaseAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*increaseAggrConfig)
for lk, lv := range av.shared {
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
delete(av.shared, lk)
}
}
if av.total == nil {
return
}
total := *av.total
av.total = nil
ctx.appendSeries(key, ac.getSuffix(), total)
}
func (av *increaseAggrValue) state() any {
return av.shared
}
func newIncreaseAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &increaseAggrConfig{
keepFirstSample: keepFirstSample,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
return cfg
}
func (*increaseAggrConfig) getValue(s any) aggrValue {
var shared map[string]increaseLastValue
if s == nil {
shared = make(map[string]increaseLastValue)
} else {
shared = s.(map[string]increaseLastValue)
}
return &increaseAggrValue{
shared: shared,
}
}
func (ac *increaseAggrConfig) getSuffix() string {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}

View File

@@ -75,6 +75,9 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
outputs = av.blue
}
for idx, o := range outputs {
if o == nil {
o = av.blue[idx]
}
o.pushSample(ao.configs[idx], sample, inputKey, deleteDeadline)
}
av.deleteDeadline = deleteDeadline
@@ -112,6 +115,9 @@ func (ao *aggrOutputs) flushState(ctx *flushCtx) {
outputs = av.blue
}
for i, o := range outputs {
if o == nil {
o = av.blue[i]
}
o.flush(ao.configs[i], ctx, outputKey, ctx.isLast)
}
av.mu.Unlock()

View File

@@ -609,7 +609,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
for i, output := range cfg.Outputs {
outputMetricLabels := fmt.Sprintf(`output=%q,name=%q,path=%q,url=%q,position="%d"`, output, name, path, alias, aggrID)
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, ignoreFirstSampleInterval)
if err != nil {
return nil, err
}
@@ -716,7 +716,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
return a, nil
}
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
// check for duplicated output
if _, ok := outputsSeen[output]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
@@ -760,11 +760,11 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "count_series":
return newCountSeriesAggrConfig(), nil
case "histogram_bucket":
return newHistogramBucketAggrConfig(useSharedState), nil
return newHistogramBucketAggrConfig(), nil
case "increase":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "increase_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
case "last":
return newLastAggrConfig(), nil
case "max":
@@ -782,9 +782,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "sum_samples":
return newSumSamplesAggrConfig(), nil
case "total":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "total_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
case "unique_samples":
return newUniqueSamplesAggrConfig(), nil
default:

View File

@@ -53,36 +53,30 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*totalAggrConfig)
suffix := ac.getSuffix()
// check for stale entries
total := av.shared.total + av.total
av.total = 0
lvs := av.shared.lastValues
for lk, lv := range lvs {
for lk, lv := range av.shared.lastValues {
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
delete(lvs, lk)
delete(av.shared.lastValues, lk)
}
}
if ac.resetTotalOnFlush {
av.shared.total = 0
} else if math.Abs(total) >= (1 << 53) {
if math.Abs(total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
av.shared.total = 0
} else {
av.shared.total = total
}
ctx.appendSeries(key, suffix, total)
ctx.appendSeries(key, ac.getSuffix(), total)
}
func (av *totalAggrValue) state() any {
return av.shared
}
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &totalAggrConfig{
keepFirstSample: keepFirstSample,
resetTotalOnFlush: resetTotalOnFlush,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
@@ -90,8 +84,6 @@ func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleI
}
type totalAggrConfig struct {
resetTotalOnFlush bool
// Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool
@@ -117,12 +109,6 @@ func (*totalAggrConfig) getValue(s any) aggrValue {
}
func (ac *totalAggrConfig) getSuffix() string {
if ac.resetTotalOnFlush {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}
if ac.keepFirstSample {
return "total"
}