Compare commits

18 Commits

Author SHA1 Message Date
Dmytro Kozlov
d631d2c100 cloud: update email images for cloud (#8024)
### Describe Your Changes

Updated email images with new support email

### Checklist

The following checks are **mandatory**:

- [X] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).
2025-01-14 14:24:33 +01:00
Zakhar Bessarab
89431458bf docs/release-guide: move testing on sandbox step before pushing tags (#8026)
### Describe Your Changes

Please provide a brief description of the changes you made. Be as
specific as possible to help others understand the purpose and impact of
your modifications.

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2025-01-14 14:23:32 +01:00
Zakhar Bessarab
d8d0c0ac01 docs/changelog: port LTS changes, update release date (#8027)
### Describe Your Changes

Please provide a brief description of the changes you made. Be as
specific as possible to help others understand the purpose and impact of
your modifications.

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
2025-01-14 14:23:01 +01:00
Roman Khavronenko
c0f5699bad docs: cut new changelog doc for 2024 (#8023)
Cutting a new changelog doc reduces the size of the current year's
changelog and improves navigation for users.

### Describe Your Changes

Please provide a brief description of the changes you made. Be as
specific as possible to help others understand the purpose and impact of
your modifications.

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-01-14 11:31:16 +01:00
Nikolay
277fdd1070 lib/storage: reduce test suite batch size (#8022)
Commit eef6943084 added new test
functions, which check various cases for metricName registration at
data ingestion.
The initial dataset had 4 batches with 100 rows each. It works fine on
machines with 5GB+ memory.
But the i386 architecture supports only 4GB of memory per process.

Due to this limitation, the dataset should be reduced to 3 batches of
30 rows each. It keeps the same
test functionality, but reduces overall memory usage to ~3GB.

Signed-off-by: f41gh7 <nik@victoriametrics.com>
2025-01-14 11:27:50 +01:00
Roman Khavronenko
d290efb849 lib/opentelemetry: throttle log messages during parsing (#8021)
Samples parsing is a hot path. A bad client could easily overwhelm
the receiver with bad or unsupported data. So it is better to throttle such
messages.
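
The throttling idea can be sketched with the standard library alone. This is a minimal illustration, not the code from this commit: the `throttler` type, `newThrottler` and `simulate` are hypothetical names, and the one-message-per-interval policy is an assumption.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// throttler is a hypothetical helper that lets at most one message
// through per interval.
type throttler struct {
	mu       sync.Mutex
	interval time.Duration
	last     time.Time
}

func newThrottler(interval time.Duration) *throttler {
	return &throttler{interval: interval}
}

// allow reports whether a message may be emitted at the given time.
func (t *throttler) allow(now time.Time) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if !t.last.IsZero() && now.Sub(t.last) < t.interval {
		return false
	}
	t.last = now
	return true
}

// simulate feeds n events spaced by step into t and returns how many
// of them were allowed through.
func simulate(t *throttler, n int, step time.Duration) int {
	start := time.Unix(0, 0)
	allowed := 0
	for i := 0; i < n; i++ {
		if t.allow(start.Add(time.Duration(i) * step)) {
			allowed++
		}
	}
	return allowed
}

func main() {
	// 1000 bad samples arriving 1ms apart, throttled to one message per second.
	n := simulate(newThrottler(time.Second), 1000, time.Millisecond)
	fmt.Println("messages logged:", n)
}
```

Passing the timestamp into `allow` keeps the sketch deterministic; a production logger would more likely read the clock itself.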

Follow-up after
b26a68641c

### Describe Your Changes

Please provide a brief description of the changes you made. Be as
specific as possible to help others understand the purpose and impact of
your modifications.

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-01-14 11:03:11 +01:00
chenlujjj
b26a68641c lib/opentelemetry: log the metric name of unsupported metrics (#8018)
To resolve:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8009
Log the name of unsupported metrics.
2025-01-14 10:49:30 +01:00
Aliaksandr Valialkin
b88cda5c41 lib/logstorage: make golangci-lint happy after the commit d2a791bef3 2025-01-13 22:31:33 +01:00
Aliaksandr Valialkin
d2a791bef3 lib/logstorage: add histogram stats function for calculating histogram buckets over numeric fields 2025-01-13 22:30:19 +01:00
Aliaksandr Valialkin
99516a5730 lib/logstorage: top pipe: allow mixing the order of hits and rank suffixes 2025-01-13 22:30:19 +01:00
Aliaksandr Valialkin
aecc86c390 lib/logstorage: do not copy pipeTopkProcessorShard when obtaining partition keys 2025-01-13 22:30:19 +01:00
Aliaksandr Valialkin
500b54f5aa app/vlogscli: typo fix, which could result in incomplete results in compact mode 2025-01-13 22:30:18 +01:00
Aliaksandr Valialkin
cc29692e27 lib/logstorage: track integer field values in integer map for top N (int_field)
This reduces memory usage by up to 2x for the map used for tracking hits.
This also reduces CPU usage for tracking integer fields.
2025-01-13 22:30:18 +01:00
Aliaksandr Valialkin
f018aa33cb lib/logstorage: avoid callback overhead at visitValuesReadonly
Process values in batches instead of passing every value in the callback.
This improves performance of reading the encoded values from storage by up to 50%.
2025-01-13 22:30:17 +01:00
Daria Karavaieva
92b6475fa6 docs/vmanomaly: fix table rendering (#8005)
### Describe Your Changes

- fix table rendering on writer and scheduler pages

### Checklist

The following checks are **mandatory**:

- [x] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).
2025-01-13 14:19:16 +01:00
Andrii Chubatiuk
bda3546cfd docs: added search results page for docs.victoriametrics.com (#8017)
### Describe Your Changes

added search page required for docs site

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).
2025-01-13 14:18:25 +01:00
Artem Navoiev
2691cdefe3 docs: cloud use separate support email for cloud (#8013)
### Describe Your Changes

Please provide a brief description of the changes you made. Be as
specific as possible to help others understand the purpose and impact of
your modifications.

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

Signed-off-by: Artem Navoiev <tenmozes@gmail.com>
2025-01-13 14:17:38 +01:00
Github Actions
93b8aa5c9d Automatic update helm docs from VictoriaMetrics/helm-charts@457951a (#8014)
Automated changes by
[create-pull-request](https://github.com/peter-evans/create-pull-request)
GitHub action

Signed-off-by: Github Actions <133988544+victoriametrics-bot@users.noreply.github.com>
Co-authored-by: f41gh7 <18450869+f41gh7@users.noreply.github.com>
2025-01-13 12:12:48 +01:00
34 changed files with 2113 additions and 1268 deletions

View File

@@ -176,7 +176,7 @@ func writeCompactObject(w io.Writer, fields []logstorage.Field) error {
_, err := fmt.Fprintf(w, "%s\n", fields[0].Value)
return err
}
if len(fields) == 2 && fields[0].Name == "_time" || fields[1].Name == "_time" {
if len(fields) == 2 && (fields[0].Name == "_time" || fields[1].Name == "_time") {
// Write _time\tfieldValue as is
if fields[0].Name == "_time" {
_, err := fmt.Fprintf(w, "%s\t%s\n", fields[0].Value, fields[1].Value)
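
The fix in this hunk comes down to operator precedence: in Go, `&&` binds tighter than `||`, so the old condition ignored the length check whenever the second field was `_time`. A small sketch of the difference, where `field` is a hypothetical stand-in for `logstorage.Field` and both function names are invented for illustration:

```go
package main

import "fmt"

// field is a hypothetical stand-in for logstorage.Field.
type field struct {
	Name  string
	Value string
}

// isTimePairOld mirrors the old condition. It parses as
// (len(fields) == 2 && fields[0] is _time) || fields[1] is _time.
func isTimePairOld(fields []field) bool {
	return len(fields) == 2 && fields[0].Name == "_time" || fields[1].Name == "_time"
}

// isTimePairNew mirrors the new condition with explicit parentheses.
func isTimePairNew(fields []field) bool {
	return len(fields) == 2 && (fields[0].Name == "_time" || fields[1].Name == "_time")
}

func main() {
	// Three fields with _time in the second position.
	row := []field{{"a", "1"}, {"_time", "2025-01-13T00:00:00Z"}, {"b", "2"}}
	fmt.Println(isTimePairOld(row)) // true: the row is mistaken for a two-field pair
	fmt.Println(isTimePairNew(row)) // false
}
```

With the old condition such a row would be written as `_time\tfieldValue` and the remaining fields dropped, which matches the "incomplete results in compact mode" described in the commit message.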

View File

@@ -70,6 +70,7 @@ Bumping the limits may significantly improve build speed.
* linux/386
This step can be run manually with the command `make publish` from the needed git tag.
1. Verify that created images are stable and don't introduce regressions on [test environment](https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/blob/master/Release-Guide.md#testing-releases).
1. Test new images on [sandbox](https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/blob/master/Release-Guide.md#testing-releases).
1. Push the tags `v1.xx.y` and `v1.xx.y-cluster` created at previous steps to public GitHub repository at https://github.com/VictoriaMetrics/VictoriaMetrics.
Push the tags `v1.xx.y`, `v1.xx.y-cluster`, `v1.xx.y-enterprise` and `v1.xx.y-enterprise-cluster` to the corresponding
branches in private repository.
@@ -88,7 +89,6 @@ Bumping the limits may significantly improve build speed.
file created at the step `a`.
- To run the command `TAG=v1.xx.y make github-create-release github-upload-assets`, so new release is created
and all the needed assets are re-uploaded to it.
1. Test new images on [sandbox](https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/blob/master/Release-Guide.md#testing-releases).
1. Go to <https://github.com/VictoriaMetrics/VictoriaMetrics/releases> and verify that draft release with the name `TAG` has been created
and this release contains all the needed binaries and checksums.
1. Update the release description with the content of [CHANGELOG](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/CHANGELOG.md) for this release.

View File

@@ -16,6 +16,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: add [`histogram` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#histogram-stats) for calculating [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) over the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
## [v1.5.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.5.0-victorialogs)
Released at 2025-01-13

View File

@@ -2762,6 +2762,7 @@ See also:
- [`uniq` pipe](#uniq-pipe)
- [`stats` pipe](#stats-pipe)
- [`sort` pipe](#sort-pipe)
- [`histogram` stats function](#histogram-stats)
### uniq pipe
@@ -3106,6 +3107,7 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe):
- [`count_empty`](#count_empty-stats) returns the number of logs with empty [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`count_uniq`](#count_uniq-stats) returns the number of unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`count_uniq_hash`](#count_uniq_hash-stats) returns the number of unique hashes for non-empty values at the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`histogram`](#histogram-stats) returns [VictoriaMetrics histogram](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`max`](#max-stats) returns the maximum value over the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`median`](#median-stats) returns the [median](https://en.wikipedia.org/wiki/Median) value over the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`min`](#min-stats) returns the minimum value over the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -3253,6 +3255,25 @@ See also:
- [`uniq_values`](#uniq_values-stats)
- [`count`](#count-stats)
### histogram stats
`histogram(field)` [stats pipe function](#stats-pipe-functions) returns [VictoriaMetrics histogram buckets](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
for the given [`field`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
For example, the following query returns histogram buckets for the `response_size` field grouped by `host` field, across logs for the last 5 minutes:
```logsql
_time:5m | stats by (host) histogram(response_size)
```
If the field contains [duration value](#duration-values), then `histogram` normalizes it to nanoseconds. For example, `1.25ms` is normalized to `1_250_000`.
If the field contains [short numeric value](#short-numeric-values), then `histogram` normalizes it to numeric value without any suffixes. For example, `1KiB` is converted to `1024`.
See also:
- [`quantile`](#quantile-stats)
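
The normalization described above can be approximated in Go as follows. This is a rough sketch, not the VictoriaLogs parser: `normalize` is a hypothetical name, durations are parsed with the standard `time.ParseDuration`, and only the `KiB` suffix is handled as an example of short numeric values.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// normalize converts a field value into a plain number.
// Durations become nanoseconds, and the KiB suffix is expanded to bytes.
// It is a hypothetical illustration and covers far fewer formats than
// the real implementation.
func normalize(s string) (float64, bool) {
	if d, err := time.ParseDuration(s); err == nil {
		return float64(d.Nanoseconds()), true
	}
	if n, ok := strings.CutSuffix(s, "KiB"); ok {
		f, err := strconv.ParseFloat(n, 64)
		return f * 1024, err == nil
	}
	f, err := strconv.ParseFloat(s, 64)
	return f, err == nil
}

func main() {
	for _, s := range []string{"1.25ms", "1KiB", "42"} {
		v, ok := normalize(s)
		fmt.Printf("%s -> %.0f (ok=%v)\n", s, v, ok)
	}
}
```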
### max stats
`max(field1, ..., fieldN)` [stats pipe function](#stats-pipe-functions) returns the maximum value across
@@ -3330,6 +3351,7 @@ _time:5m | stats
See also:
- [`histogram`](#histogram-stats)
- [`min`](#min-stats)
- [`max`](#max-stats)
- [`median`](#median-stats)

View File

@@ -155,7 +155,9 @@ How often to completely retrain the models. If not set, value of `infer_every` i
<tr>
<td>
<span>
`start_from`{{% available_from "v1.18.5" anomaly %}}
</span>
</td>
<td>str, Optional</td>
<td>
@@ -169,8 +171,9 @@ Specifies when to initiate the first `fit_every` call. Accepts either an ISO 860
</tr>
<tr>
<td>
<span>
`tz`{{% available_from "v1.18.5" anomaly %}}
</span>
</td>
<td>str, Optional</td>
<td>

View File

@@ -31,8 +31,9 @@ Future updates will introduce additional export methods, offering users more fle
`class`
</td>
<td>
<span>
`writer.vm.VmWriter` or `vm`{{% available_from "v1.13.0" anomaly %}}
</span>
</td>
<td>
@@ -59,8 +60,9 @@ Datasource URL address
`tenant_id`
</td>
<td>
<span>
`0:0`, `multitenant`{{% available_from "v1.16.2" anomaly %}}
</span>
</td>
<td>
@@ -247,8 +249,9 @@ Token is passed in the standard format with header: `Authorization: bearer {toke
`path_to_file`
</td>
<td>
<span>
Path to a file, which contains token, that is passed in the standard format with header: `Authorization: bearer {token}`{{% available_from "v1.15.9" anomaly %}}
</td>
</span> </td>
</tr>
</tbody>
</table>

File diff suppressed because it is too large

View File

@@ -1,11 +1,11 @@
---
weight: 6
weight: 7
title: Year 2020
menu:
docs:
identifier: vm-changelog-2020
parent: vm-changelog
weight: 6
weight: 7
aliases:
- /CHANGELOG_2020.html
- /changelog_2020

View File

@@ -1,11 +1,11 @@
---
weight: 5
weight: 6
title: Year 2021
menu:
docs:
identifier: vm-changelog-2021
parent: vm-changelog
weight: 5
weight: 6
aliases:
- /CHANGELOG_2021.html
- /changelog_2021

View File

@@ -1,11 +1,11 @@
---
weight: 4
weight: 5
title: Year 2022
menu:
docs:
identifier: vm-changelog-2022
parent: vm-changelog
weight: 4
weight: 5
aliases:
- /CHANGELOG_2022.html
- /changelog_2022

View File

@@ -1,11 +1,11 @@
---
weight: 3
weight: 4
title: Year 2023
menu:
docs:
identifier: vm-changelog-2023
parent: vm-changelog
weight: 3
weight: 4
aliases:
- /CHANGELOG_2023.html
- /changelog_2023

File diff suppressed because it is too large

View File

@@ -1,6 +1,16 @@
## Next release
- TODO
## 0.7.1
**Release date:** 10 Jan 2025
![Helm: v3](https://img.shields.io/badge/Helm-v3.14%2B-informational?color=informational&logo=helm&link=https%3A%2F%2Fgithub.com%2Fhelm%2Fhelm%2Freleases%2Ftag%2Fv3.14.0) ![AppVersion: v1.108.1](https://img.shields.io/badge/v1.108.1-success?logo=VictoriaMetrics&labelColor=gray&link=https%3A%2F%2Fdocs.victoriametrics.com%2Fchangelog%23v11081)
- updated common dependency 0.0.35 -> 0.0.37
- fixed typo useMultitenantMode -> useMultiTenantMode in remotewrite settings
- allow passing additional remotewrite settings
## 0.7.0

View File

@@ -1,6 +1,6 @@
![Version](https://img.shields.io/badge/0.7.0-gray?logo=Helm&labelColor=gray&link=https%3A%2F%2Fdocs.victoriametrics.com%2Fhelm%2Fvictoria-metrics-distributed%2Fchangelog%2F%23070)
![Version](https://img.shields.io/badge/0.7.1-gray?logo=Helm&labelColor=gray&link=https%3A%2F%2Fdocs.victoriametrics.com%2Fhelm%2Fvictoria-metrics-distributed%2Fchangelog%2F%23071)
![ArtifactHub](https://img.shields.io/badge/ArtifactHub-informational?logoColor=white&color=417598&logo=artifacthub&link=https%3A%2F%2Fartifacthub.io%2Fpackages%2Fhelm%2Fvictoriametrics%2Fvictoria-metrics-distributed)
![License](https://img.shields.io/github/license/VictoriaMetrics/helm-charts?labelColor=green&label=&link=https%3A%2F%2Fgithub.com%2FVictoriaMetrics%2Fhelm-charts%2Fblob%2Fmaster%2FLICENSE)
![Slack](https://img.shields.io/badge/Join-4A154B?logo=slack&link=https%3A%2F%2Fslack.victoriametrics.com)

View File

@@ -2,6 +2,14 @@
- TODO
## 0.33.3
**Release date:** 13 Jan 2025
![Helm: v3](https://img.shields.io/badge/Helm-v3.14%2B-informational?color=informational&logo=helm&link=https%3A%2F%2Fgithub.com%2Fhelm%2Fhelm%2Freleases%2Ftag%2Fv3.14.0) ![AppVersion: v1.108.1](https://img.shields.io/badge/v1.108.1-success?logo=VictoriaMetrics&labelColor=gray&link=https%3A%2F%2Fdocs.victoriametrics.com%2Fchangelog%23v11081)
- updates operator to [v0.51.3](https://github.com/VictoriaMetrics/operator/releases/tag/v0.51.3) version
## 0.33.2
**Release date:** 06 Jan 2025

View File

@@ -1,6 +1,6 @@
![Version](https://img.shields.io/badge/0.33.2-gray?logo=Helm&labelColor=gray&link=https%3A%2F%2Fdocs.victoriametrics.com%2Fhelm%2Fvictoria-metrics-k8s-stack%2Fchangelog%2F%230332)
![Version](https://img.shields.io/badge/0.33.3-gray?logo=Helm&labelColor=gray&link=https%3A%2F%2Fdocs.victoriametrics.com%2Fhelm%2Fvictoria-metrics-k8s-stack%2Fchangelog%2F%230333)
![ArtifactHub](https://img.shields.io/badge/ArtifactHub-informational?logoColor=white&color=417598&logo=artifacthub&link=https%3A%2F%2Fartifacthub.io%2Fpackages%2Fhelm%2Fvictoriametrics%2Fvictoria-metrics-k8s-stack)
![License](https://img.shields.io/github/license/VictoriaMetrics/helm-charts?labelColor=green&label=&link=https%3A%2F%2Fgithub.com%2FVictoriaMetrics%2Fhelm-charts%2Fblob%2Fmaster%2FLICENSE)
![Slack](https://img.shields.io/badge/Join-4A154B?logo=slack&link=https%3A%2F%2Fslack.victoriametrics.com)

View File

@@ -2,6 +2,14 @@
- TODO
## 0.40.4
**Release date:** 13 Jan 2025
![Helm: v3](https://img.shields.io/badge/Helm-v3.14%2B-informational?color=informational&logo=helm&link=https%3A%2F%2Fgithub.com%2Fhelm%2Fhelm%2Freleases%2Ftag%2Fv3.14.0) ![AppVersion: v0.51.3](https://img.shields.io/badge/v0.51.3-success?logo=VictoriaMetrics&labelColor=gray&link=https%3A%2F%2Fdocs.victoriametrics.com%2Foperator%2Fchangelog%23v0513)
- updates operator to [v0.51.3](https://github.com/VictoriaMetrics/operator/releases/tag/v0.51.3) version
## 0.40.3
**Release date:** 06 Jan 2025

View File

@@ -1,6 +1,6 @@
![Version](https://img.shields.io/badge/0.40.3-gray?logo=Helm&labelColor=gray&link=https%3A%2F%2Fdocs.victoriametrics.com%2Fhelm%2Fvictoria-metrics-operator%2Fchangelog%2F%230403)
![Version](https://img.shields.io/badge/0.40.4-gray?logo=Helm&labelColor=gray&link=https%3A%2F%2Fdocs.victoriametrics.com%2Fhelm%2Fvictoria-metrics-operator%2Fchangelog%2F%230404)
![ArtifactHub](https://img.shields.io/badge/ArtifactHub-informational?logoColor=white&color=417598&logo=artifacthub&link=https%3A%2F%2Fartifacthub.io%2Fpackages%2Fhelm%2Fvictoriametrics%2Fvictoria-metrics-operator)
![License](https://img.shields.io/github/license/VictoriaMetrics/helm-charts?labelColor=green&label=&link=https%3A%2F%2Fgithub.com%2FVictoriaMetrics%2Fhelm-charts%2Fblob%2Fmaster%2FLICENSE)
![Slack](https://img.shields.io/badge/Join-4A154B?logo=slack&link=https%3A%2F%2Fslack.victoriametrics.com)

7
docs/search/_index.md Normal file
View File

@@ -0,0 +1,7 @@
---
page: search
layout: search
draft: false
weight: 0
search: true
---

View File

@@ -149,7 +149,7 @@ or [Prometheus recording rules definition format](https://prometheus.io/docs/pro
There are limitations for the rules files:
1. All files may contain no more than 100 rules in total. If you need to upload more rules contact us via [support@victoriametrics.com](mailto:support@victoriametrics.com).
1. All files may contain no more than 100 rules in total. If you need to upload more rules contact us via [support-cloud@victoriametrics.com](mailto:support-cloud@victoriametrics.com).
2. The maximum file size is 20mb.
3. The names of the groups in the files should be unique.

Binary file not shown. Before: 24 KiB. After: 52 KiB.

Binary file not shown. Before: 28 KiB. After: 54 KiB.

Binary file not shown. Before: 52 KiB. After: 52 KiB.

View File

@@ -18,7 +18,7 @@ The tier parameters are derived from testing in typical monitoring environments,
| **Active Time Series Count** | Per Tier Limits | Number of [active time series](https://docs.victoriametrics.com/faq/#what-is-an-active-time-series) that received at least one data point in the last hour. |
| **Read Rate** | Per Tier Limits | Number of datapoints retrieved from the database per second. |
| **New Series Over 24 Hours** (churn rate) | `<= Active Time Series Count` | Number of new series created in 24 hours. High [churn rate](https://docs.victoriametrics.com/faq/#what-is-high-churn-rate) leads to higher resource consumption. |
| **Concurrent Requests per Token** | `<= 600` | Maximum concurrent requests per access token. It is recommended to create separate tokens for different clients and environments. This can be adjusted via [support](mailto:support@victoriametrics.com). |
| **Concurrent Requests per Token** | `<= 600` | Maximum concurrent requests per access token. It is recommended to create separate tokens for different clients and environments. This can be adjusted via [support](mailto:support-cloud@victoriametrics.com). |
For a detailed explanation of each parameter, visit the guide on [Understanding Your Setup Size](https://docs.victoriametrics.com/guides/understand-your-setup-size.html).
@@ -26,7 +26,7 @@ For a detailed explanation of each parameter, visit the guide on [Understanding
| **Flag** | **Default Value** | **Description** |
|-----------------------------------|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Max Label Value Length** | `<= 1kb` (Default: `4kb`) | Maximum length of label values. Longer values are truncated. Large label values can lead to high RAM consumption. This can be adjusted via [support](mailto:support@victoriametrics.com). |
| **Max Label Value Length** | `<= 1kb` (Default: `4kb`) | Maximum length of label values. Longer values are truncated. Large label values can lead to high RAM consumption. This can be adjusted via [support](mailto:support-cloud@victoriametrics.com). |
| **Max Labels per Time Series** | `<= 30` | Maximum number of labels per time series. Excess labels are dropped. Higher values can increase [cardinality](https://docs.victoriametrics.com/keyconcepts/#cardinality) and resource usage. This can be configured in [deployment settings](https://docs.victoriametrics.com/victoriametrics-cloud/quickstart/#modifying-an-existing-deployment). |

View File

@@ -5,6 +5,7 @@ import (
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
@@ -227,6 +228,16 @@ func (br *blockResult) cloneValues(values []string) []string {
return br.valuesBuf[valuesBufLen:]
}
func (br *blockResult) addValues(values []string) {
valuesBufLen := len(br.valuesBuf)
br.valuesBuf = slicesutil.SetLength(br.valuesBuf, valuesBufLen+len(values))
valuesBuf := br.valuesBuf[valuesBufLen:]
_ = valuesBuf[len(values)-1]
for i, v := range values {
valuesBuf[i] = br.a.copyString(v)
}
}
func (br *blockResult) addValue(v string) {
valuesBuf := br.valuesBuf
if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] {
@@ -490,73 +501,57 @@ func (br *blockResult) newValuesEncodedFromColumnHeader(bs *blockSearch, bm *bit
switch ch.valueType {
case valueTypeString:
visitValuesReadonly(bs, ch, bm, br.addValue)
visitValuesReadonly(bs, ch, bm, br.addValues)
case valueTypeDict:
visitValuesReadonly(bs, ch, bm, func(v string) {
if len(v) != 1 {
logger.Panicf("FATAL: %s: unexpected dict value size for column %q; got %d bytes; want 1 byte", bs.partPath(), ch.name, len(v))
visitValuesReadonly(bs, ch, bm, func(values []string) {
checkValuesSize(bs, ch, values, 1, "dict")
for _, v := range values {
dictIdx := v[0]
if int(dictIdx) >= len(ch.valuesDict.values) {
logger.Panicf("FATAL: %s: too big dict index for column %q: %d; should be smaller than %d", bs.partPath(), ch.name, dictIdx, len(ch.valuesDict.values))
}
}
dictIdx := v[0]
if int(dictIdx) >= len(ch.valuesDict.values) {
logger.Panicf("FATAL: %s: too big dict index for column %q: %d; should be smaller than %d", bs.partPath(), ch.name, dictIdx, len(ch.valuesDict.values))
}
br.addValue(v)
br.addValues(values)
})
case valueTypeUint8:
visitValuesReadonly(bs, ch, bm, func(v string) {
if len(v) != 1 {
logger.Panicf("FATAL: %s: unexpected size for uint8 column %q; got %d bytes; want 1 byte", bs.partPath(), ch.name, len(v))
}
br.addValue(v)
visitValuesReadonly(bs, ch, bm, func(values []string) {
checkValuesSize(bs, ch, values, 1, "uint8")
br.addValues(values)
})
case valueTypeUint16:
visitValuesReadonly(bs, ch, bm, func(v string) {
if len(v) != 2 {
logger.Panicf("FATAL: %s: unexpected size for uint16 column %q; got %d bytes; want 2 bytes", bs.partPath(), ch.name, len(v))
}
br.addValue(v)
visitValuesReadonly(bs, ch, bm, func(values []string) {
checkValuesSize(bs, ch, values, 2, "uint16")
br.addValues(values)
})
case valueTypeUint32:
visitValuesReadonly(bs, ch, bm, func(v string) {
if len(v) != 4 {
logger.Panicf("FATAL: %s: unexpected size for uint32 column %q; got %d bytes; want 4 bytes", bs.partPath(), ch.name, len(v))
}
br.addValue(v)
visitValuesReadonly(bs, ch, bm, func(values []string) {
checkValuesSize(bs, ch, values, 4, "uint32")
br.addValues(values)
})
case valueTypeUint64:
visitValuesReadonly(bs, ch, bm, func(v string) {
if len(v) != 8 {
logger.Panicf("FATAL: %s: unexpected size for uint64 column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
}
br.addValue(v)
visitValuesReadonly(bs, ch, bm, func(values []string) {
checkValuesSize(bs, ch, values, 8, "uint64")
br.addValues(values)
})
case valueTypeInt64:
visitValuesReadonly(bs, ch, bm, func(v string) {
if len(v) != 8 {
logger.Panicf("FATAL: %s: unexpected size for int64 column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
}
br.addValue(v)
visitValuesReadonly(bs, ch, bm, func(values []string) {
checkValuesSize(bs, ch, values, 8, "int64")
br.addValues(values)
})
case valueTypeFloat64:
visitValuesReadonly(bs, ch, bm, func(v string) {
if len(v) != 8 {
logger.Panicf("FATAL: %s: unexpected size for float64 column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
}
br.addValue(v)
visitValuesReadonly(bs, ch, bm, func(values []string) {
checkValuesSize(bs, ch, values, 8, "float64")
br.addValues(values)
})
case valueTypeIPv4:
visitValuesReadonly(bs, ch, bm, func(v string) {
if len(v) != 4 {
logger.Panicf("FATAL: %s: unexpected size for ipv4 column %q; got %d bytes; want 4 bytes", bs.partPath(), ch.name, len(v))
}
br.addValue(v)
visitValuesReadonly(bs, ch, bm, func(values []string) {
checkValuesSize(bs, ch, values, 4, "ipv4")
br.addValues(values)
})
case valueTypeTimestampISO8601:
visitValuesReadonly(bs, ch, bm, func(v string) {
if len(v) != 8 {
logger.Panicf("FATAL: %s: unexpected size for timestmap column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
}
br.addValue(v)
visitValuesReadonly(bs, ch, bm, func(values []string) {
checkValuesSize(bs, ch, values, 8, "iso8601")
br.addValues(values)
})
default:
logger.Panicf("FATAL: %s: unknown valueType=%d for column %q", bs.partPath(), ch.valueType, ch.name)
@@ -565,6 +560,14 @@ func (br *blockResult) newValuesEncodedFromColumnHeader(bs *blockSearch, bm *bit
return br.valuesBuf[valuesBufLen:]
}
func checkValuesSize(bs *blockSearch, ch *columnHeader, values []string, sizeExpected int, typeStr string) {
for _, v := range values {
if len(v) != sizeExpected {
logger.Panicf("FATAL: %s: unexpected size for %s column %q; got %d bytes; want %d bytes", bs.partPath(), typeStr, ch.name, len(v), sizeExpected)
}
}
}
// addColumn adds column for the given ch to br.
//
// The added column is valid until ch is changed.
@@ -2138,17 +2141,46 @@ func getEmptyStrings(rowsLen int) []string {
var emptyStrings atomic.Pointer[[]string]
func visitValuesReadonly(bs *blockSearch, ch *columnHeader, bm *bitmap, f func(value string)) {
func visitValuesReadonly(bs *blockSearch, ch *columnHeader, bm *bitmap, f func(values []string)) {
if bm.isZero() {
// Fast path - nothing to visit
return
}
values := bs.getValuesForColumn(ch)
if bm.areAllBitsSet() {
// Faster path - visit all the values
f(values)
return
}
// Slower path - visit only the selected values
vb := getValuesBuf()
bm.forEachSetBitReadonly(func(idx int) {
f(values[idx])
vb.values = append(vb.values, values[idx])
})
f(vb.values)
putValuesBuf(vb)
}
type valuesBuf struct {
values []string
}
func getValuesBuf() *valuesBuf {
v := valuesBufPool.Get()
if v == nil {
return &valuesBuf{}
}
return v.(*valuesBuf)
}
func putValuesBuf(vb *valuesBuf) {
vb.values = vb.values[:0]
valuesBufPool.Put(vb)
}
var valuesBufPool sync.Pool
func getCanonicalColumnName(columnName string) string {
if columnName == "" {
return "_msg"
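
The change to `visitValuesReadonly` swaps a per-value callback for a per-batch callback and reuses the scratch slice through a `sync.Pool`. A simplified sketch of the pattern, where `visitSelected` is a hypothetical name and a `[]bool` plays the role of the bitmap:

```go
package main

import (
	"fmt"
	"sync"
)

// bufPool recycles scratch slices between calls.
var bufPool = sync.Pool{New: func() any { return new([]string) }}

// visitSelected is a hypothetical simplification of visitValuesReadonly.
// It collects the selected values into a pooled buffer and passes the
// whole batch to f in one call. f must not retain the batch after it returns.
func visitSelected(values []string, selected []bool, f func(batch []string)) {
	bp := bufPool.Get().(*[]string)
	batch := (*bp)[:0]
	for i, v := range values {
		if selected[i] {
			batch = append(batch, v)
		}
	}
	if len(batch) > 0 {
		f(batch)
	}
	// Keep the grown backing array for the next caller.
	*bp = batch[:0]
	bufPool.Put(bp)
}

func main() {
	values := []string{"a", "b", "c", "d"}
	selected := []bool{true, false, true, true}
	calls, total := 0, 0
	visitSelected(values, selected, func(batch []string) {
		calls++
		total += len(batch)
	})
	fmt.Println(calls, total) // 1 3
}
```

One function call per batch instead of one per value is where the saving is expected to come from. The price is that callers such as `checkValuesSize` have to loop over the batch themselves.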

View File

@@ -12,6 +12,7 @@ type chunkedAllocator struct {
countEmptyProcessors []statsCountEmptyProcessor
countUniqProcessors []statsCountUniqProcessor
countUniqHashProcessors []statsCountUniqHashProcessor
histogramProcessors []statsHistogramProcessor
maxProcessors []statsMaxProcessor
medianProcessors []statsMedianProcessor
minProcessors []statsMinProcessor
@@ -60,6 +61,11 @@ func (a *chunkedAllocator) newStatsCountUniqHashProcessor() (p *statsCountUniqHa
return p
}
func (a *chunkedAllocator) newStatsHistogramProcessor() (p *statsHistogramProcessor) {
a.histogramProcessors, p = addNewItem(a.histogramProcessors, a)
return p
}
func (a *chunkedAllocator) newStatsMaxProcessor() (p *statsMaxProcessor) {
a.maxProcessors, p = addNewItem(a.maxProcessors, a)
return p

View File

@@ -1207,6 +1207,10 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats sum_len(*) x`, `* | stats sum_len(*) as x`)
f(`* | stats sum_len(foo,*,bar) x`, `* | stats sum_len(*) as x`)
// stats pipe histogram
f(`* | stats histogram(foo) bar`, `* | stats histogram(foo) as bar`)
f(`* | histogram(foo)`, `* | stats histogram(foo) as "histogram(foo)"`)
// stats pipe quantile
f(`* | stats quantile(0, foo) bar`, `* | stats quantile(0, foo) as bar`)
f(`* | stats quantile(1, foo) bar`, `* | stats quantile(1, foo) as bar`)
@@ -1742,6 +1746,12 @@ func TestParseQueryFailure(t *testing.T) {
// invalid stats sum_len
f(`foo | stats sum_len`)
// invalid stats histogram
f(`foo | stats histogram`)
f(`foo | stats histogram()`)
f(`foo | stats histogram(a, b)`)
f(`foo | stats histogram(*)`)
// invalid stats quantile
f(`foo | stats quantile`)
f(`foo | stats quantile() foo`)
@@ -1977,6 +1987,7 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | stats max() q`, `*`, ``)
f(`* | stats max(*) q`, `*`, ``)
f(`* | stats max(x) q`, `x`, ``)
f(`* | stats histogram(foo)`, `foo`, ``)
f(`* | stats quantile(0.5) q`, `*`, ``)
f(`* | stats quantile(0.5, *) q`, `*`, ``)
f(`* | stats quantile(0.5, x) q`, `x`, ``)

View File

@@ -10,6 +10,7 @@ import (
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
)
@@ -279,7 +280,7 @@ func (shard *pipeTopkProcessorShard) addRow(br *blockResult, byColumns []string,
b := shard.partitionKey[:0]
for _, c := range shard.partitionColumns {
v := c.getValueAtRow(br, rowIdx)
b = marshalJSONKeyValue(b, c.name, v)
b = encoding.MarshalBytes(b, bytesutil.ToUnsafeBytes(v))
}
shard.partitionKey = b
@@ -408,8 +409,8 @@ func (ptp *pipeTopkProcessor) flush() error {
// Obtain all the partition keys
partitionKeysMap := make(map[string]struct{})
var partitionKeys []string
for _, shard := range shards {
for k := range shard.rowsByPartition {
for i := range shards {
for k := range shards[i].rowsByPartition {
if _, ok := partitionKeysMap[k]; !ok {
partitionKeysMap[k] = struct{}{}
partitionKeys = append(partitionKeys, k)
@@ -420,6 +421,9 @@ func (ptp *pipeTopkProcessor) flush() error {
// Merge sorted results across shards per each partitionKey
for _, k := range partitionKeys {
if needStop(ptp.stopCh) {
return nil
}
var rss []*pipeTopkRows
for _, shard := range shards {
rs, ok := shard.rowsByPartition[k]
@@ -428,9 +432,6 @@ func (ptp *pipeTopkProcessor) flush() error {
}
}
ptp.mergeAndFlushRows(rss)
if needStop(ptp.stopCh) {
return nil
}
}
return nil
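
Note on the `addRow` hunk above: the partition key is now built with `encoding.MarshalBytes` instead of `marshalJSONKeyValue`. The sketch below shows why a length-prefixed encoding keeps composite keys unambiguous. It assumes `MarshalBytes` behaves like a varint length followed by the raw bytes, which is not confirmed by this diff; `appendLenPrefixed` and `partitionKey` are hypothetical names used only for illustration.

```go
package main

import "encoding/binary"

// appendLenPrefixed appends the uvarint-encoded length of v followed by v itself.
// Hypothetical stand-in for a length-prefixed marshal helper.
func appendLenPrefixed(dst []byte, v string) []byte {
	dst = binary.AppendUvarint(dst, uint64(len(v)))
	return append(dst, v...)
}

// partitionKey builds a composite key from the given column values, reusing dst.
// Because every value carries its own length, ("ab", "c") and ("a", "bc")
// produce different keys even though their concatenations are equal.
func partitionKey(dst []byte, values ...string) []byte {
	dst = dst[:0]
	for _, v := range values {
		dst = appendLenPrefixed(dst, v)
	}
	return dst
}
```

Calling `partitionKey(buf, "ab", "c")` and `partitionKey(buf, "a", "bc")` yields two distinct byte strings, so they can safely be used as map keys for different partitions.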


@@ -874,8 +874,8 @@ func (psp *pipeStatsProcessor) mergeShardsParallel() ([]*pipeStatsGroupMap, erro
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
h := xxhash.Sum64(k)
shardIdx := h % uint64(len(perCPU))
perCPU[shardIdx].u64[n] = psg
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].u64[n] = psg
}
for n, psg := range psm.negative64 {
if needStop(psp.stopCh) {
@@ -883,16 +883,16 @@ func (psp *pipeStatsProcessor) mergeShardsParallel() ([]*pipeStatsGroupMap, erro
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
h := xxhash.Sum64(k)
shardIdx := h % uint64(len(perCPU))
perCPU[shardIdx].negative64[n] = psg
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].negative64[n] = psg
}
for k, psg := range psm.strings {
if needStop(psp.stopCh) {
return
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
shardIdx := h % uint64(len(perCPU))
perCPU[shardIdx].strings[k] = psg
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].strings[k] = psg
}
perShardMaps[idx] = perCPU
@@ -903,9 +903,6 @@ func (psp *pipeStatsProcessor) mergeShardsParallel() ([]*pipeStatsGroupMap, erro
if needStop(psp.stopCh) {
return nil, nil
}
if n := psp.stateSizeBudget.Load(); n < 0 {
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
}
// Merge per-shard entries into perShardMaps[0]
for i := 0; i < cpusCount; i++ {
@@ -924,9 +921,6 @@ func (psp *pipeStatsProcessor) mergeShardsParallel() ([]*pipeStatsGroupMap, erro
if needStop(psp.stopCh) {
return nil, nil
}
if n := psp.stateSizeBudget.Load(); n < 0 {
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
}
// Filter out maps without entries
psms := perShardMaps[0]
@@ -1055,6 +1049,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
return nil, fmt.Errorf("cannot parse 'count_uniq_hash' func: %w", err)
}
return sus, nil
case lex.isKeyword("histogram"):
shs, err := parseStatsHistogram(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'histogram' func: %w", err)
}
return shs, nil
case lex.isKeyword("max"):
sms, err := parseStatsMax(lex)
if err != nil {
@@ -1144,6 +1144,7 @@ var statsNames = []string{
"count_empty",
"count_uniq",
"count_uniq_hash",
"histogram",
"max",
"median",
"min",


@@ -6,6 +6,7 @@ import (
"slices"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"unsafe"
@@ -13,6 +14,7 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
@@ -91,6 +93,7 @@ func (pt *pipeTop) newPipeProcessor(workersCount int, stopCh <-chan struct{}, ca
pt: pt,
},
}
shards[i].m.init(&shards[i])
}
ptp := &pipeTopProcessor{
@@ -131,11 +134,8 @@ type pipeTopProcessorShardNopad struct {
// pt points to the parent pipeTop.
pt *pipeTop
// a reduces memory allocations when counting the number of hits over big number of unique values.
a chunkedAllocator
// m holds per-row hits.
m map[string]*uint64
// m holds per-value hits.
m pipeTopMap
// keyBuf is a temporary buffer for building keys for m.
keyBuf []byte
@@ -148,6 +148,134 @@ type pipeTopProcessorShardNopad struct {
stateSizeBudget int
}
type pipeTopMap struct {
shard *pipeTopProcessorShard
u64 map[uint64]*uint64
negative64 map[uint64]*uint64
strings map[string]*uint64
// a reduces memory allocations when counting the number of hits over big number of unique values.
a chunkedAllocator
}
func (ptm *pipeTopMap) reset() {
ptm.shard = nil
ptm.u64 = nil
ptm.negative64 = nil
ptm.strings = nil
}
func (ptm *pipeTopMap) init(shard *pipeTopProcessorShard) {
ptm.shard = shard
ptm.u64 = make(map[uint64]*uint64)
ptm.negative64 = make(map[uint64]*uint64)
ptm.strings = make(map[string]*uint64)
}
func (ptm *pipeTopMap) updateStateGeneric(key string, hits uint64) {
if n, ok := tryParseUint64(key); ok {
ptm.updateStateUint64(n, hits)
return
}
if len(key) > 0 && key[0] == '-' {
if n, ok := tryParseInt64(key); ok {
ptm.updateStateNegativeInt64(n, hits)
return
}
}
ptm.updateStateString(bytesutil.ToUnsafeBytes(key), hits)
}
func (ptm *pipeTopMap) updateStateInt64(n int64, hits uint64) {
if n >= 0 {
ptm.updateStateUint64(uint64(n), hits)
} else {
ptm.updateStateNegativeInt64(n, hits)
}
}
func (ptm *pipeTopMap) updateStateUint64(n, hits uint64) {
pHits := ptm.u64[n]
if pHits != nil {
*pHits += hits
return
}
pHits = ptm.a.newUint64()
*pHits += hits
ptm.u64[n] = pHits
ptm.shard.stateSizeBudget -= int(unsafe.Sizeof(*pHits) + unsafe.Sizeof(pHits))
}
func (ptm *pipeTopMap) updateStateNegativeInt64(n int64, hits uint64) {
pHits := ptm.negative64[uint64(n)]
if pHits != nil {
*pHits += hits
return
}
pHits = ptm.a.newUint64()
*pHits += hits
ptm.negative64[uint64(n)] = pHits
ptm.shard.stateSizeBudget -= int(unsafe.Sizeof(*pHits) + unsafe.Sizeof(pHits))
}
func (ptm *pipeTopMap) updateStateString(key []byte, hits uint64) {
pHits := ptm.strings[string(key)]
if pHits != nil {
*pHits += hits
return
}
keyCopy := ptm.a.cloneBytesToString(key)
pHits = ptm.a.newUint64()
*pHits += hits
ptm.strings[keyCopy] = pHits
ptm.shard.stateSizeBudget -= len(keyCopy) + int(unsafe.Sizeof(keyCopy)+unsafe.Sizeof(*pHits)+unsafe.Sizeof(pHits))
}
func (ptm *pipeTopMap) mergeState(src *pipeTopMap, stopCh <-chan struct{}) {
for n, pHitsSrc := range src.u64 {
if needStop(stopCh) {
return
}
pHitsDst := ptm.u64[n]
if pHitsDst == nil {
ptm.u64[n] = pHitsSrc
} else {
*pHitsDst += *pHitsSrc
}
}
for n, pHitsSrc := range src.negative64 {
if needStop(stopCh) {
return
}
pHitsDst := ptm.negative64[n]
if pHitsDst == nil {
ptm.negative64[n] = pHitsSrc
} else {
*pHitsDst += *pHitsSrc
}
}
for k, pHitsSrc := range src.strings {
if needStop(stopCh) {
return
}
pHitsDst := ptm.strings[k]
if pHitsDst == nil {
ptm.strings[k] = pHitsSrc
} else {
*pHitsDst += *pHitsSrc
}
}
}
// writeBlock writes br to shard.
func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
byFields := shard.pt.byFields
@@ -155,35 +283,21 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
// Take into account all the columns in br.
keyBuf := shard.keyBuf
cs := br.getColumns()
for i := 0; i < br.rowsLen; i++ {
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
keyBuf = keyBuf[:0]
for _, c := range cs {
v := c.getValueAtRow(br, i)
v := c.getValueAtRow(br, rowIdx)
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name))
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1)
shard.m.updateStateString(keyBuf, 1)
}
shard.keyBuf = keyBuf
return
}
if len(byFields) == 1 {
// Fast path for a single field.
c := br.getColumnByName(byFields[0])
if c.isConst {
v := c.valuesEncoded[0]
shard.updateState(v, uint64(br.rowsLen))
return
}
if c.valueType == valueTypeDict {
c.forEachDictValueWithHits(br, shard.updateState)
return
}
values := c.getValues(br)
for _, v := range values {
shard.updateState(v, 1)
}
shard.updateStatsSingleColumn(br, byFields[0])
return
}
@@ -197,33 +311,62 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
shard.columnValues = columnValues
keyBuf := shard.keyBuf
for i := 0; i < br.rowsLen; i++ {
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
keyBuf = keyBuf[:0]
for _, values := range columnValues {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[rowIdx]))
}
shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1)
shard.m.updateStateString(keyBuf, 1)
}
shard.keyBuf = keyBuf
}
func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) {
m := shard.getM()
pHits := m[v]
if pHits == nil {
vCopy := shard.a.cloneString(v)
pHits = shard.a.newUint64()
m[vCopy] = pHits
shard.stateSizeBudget -= len(vCopy) + int(unsafe.Sizeof(vCopy)+unsafe.Sizeof(hits)+unsafe.Sizeof(pHits))
func (shard *pipeTopProcessorShard) updateStatsSingleColumn(br *blockResult, fieldName string) {
c := br.getColumnByName(fieldName)
if c.isConst {
v := c.valuesEncoded[0]
shard.m.updateStateGeneric(v, uint64(br.rowsLen))
return
}
*pHits += hits
}
func (shard *pipeTopProcessorShard) getM() map[string]*uint64 {
if shard.m == nil {
shard.m = make(map[string]*uint64)
switch c.valueType {
case valueTypeDict:
c.forEachDictValueWithHits(br, shard.m.updateStateGeneric)
case valueTypeUint8:
values := c.getValuesEncoded(br)
for _, v := range values {
n := unmarshalUint8(v)
shard.m.updateStateUint64(uint64(n), 1)
}
case valueTypeUint16:
values := c.getValuesEncoded(br)
for _, v := range values {
n := unmarshalUint16(v)
shard.m.updateStateUint64(uint64(n), 1)
}
case valueTypeUint32:
values := c.getValuesEncoded(br)
for _, v := range values {
n := unmarshalUint32(v)
shard.m.updateStateUint64(uint64(n), 1)
}
case valueTypeUint64:
values := c.getValuesEncoded(br)
for _, v := range values {
n := unmarshalUint64(v)
shard.m.updateStateUint64(n, 1)
}
case valueTypeInt64:
values := c.getValuesEncoded(br)
for _, v := range values {
n := unmarshalInt64(v)
shard.m.updateStateInt64(n, 1)
}
default:
values := c.getValues(br)
for _, v := range values {
shard.m.updateStateGeneric(v, 1)
}
}
return shard.m
}
func (ptp *pipeTopProcessor) writeBlock(workerID uint, br *blockResult) {
@@ -378,101 +521,84 @@ func (ptp *pipeTopProcessor) mergeShardsParallel() ([]*pipeTopEntry, error) {
shards := ptp.shards
shardsLen := len(shards)
cpusCount := cgroup.AvailableCPUs()
if shardsLen == 1 {
entries := getTopEntries(shards[0].getM(), limit, ptp.stopCh)
ptm := &shards[0].m
entries := getTopEntries(ptm, limit, ptp.stopCh)
return entries, nil
}
var wg sync.WaitGroup
perShardMaps := make([][]map[string]*uint64, shardsLen)
perShardMaps := make([][]pipeTopMap, shardsLen)
for i := range shards {
wg.Add(1)
go func(idx int) {
defer wg.Done()
shardMaps := make([]map[string]*uint64, shardsLen)
for i := range shardMaps {
shardMaps[i] = make(map[string]*uint64)
perCPU := make([]pipeTopMap, cpusCount)
for i := range perCPU {
perCPU[i].init(&shards[idx])
}
n := int64(0)
nTotal := int64(0)
for k, pHits := range shards[idx].getM() {
ptm := &shards[idx].m
for n, pHits := range ptm.u64 {
if needStop(ptp.stopCh) {
return
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
h := xxhash.Sum64(k)
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].u64[n] = pHits
}
for n, pHits := range ptm.negative64 {
if needStop(ptp.stopCh) {
return
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
h := xxhash.Sum64(k)
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].negative64[n] = pHits
}
for k, pHits := range ptm.strings {
if needStop(ptp.stopCh) {
return
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
m := shardMaps[h%uint64(len(shardMaps))]
n += updatePipeTopMap(m, k, pHits)
if n > stateSizeBudgetChunk {
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
return
}
nTotal += n
n = 0
}
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].strings[k] = pHits
}
nTotal += n
ptp.stateSizeBudget.Add(-n)
perShardMaps[idx] = shardMaps
// Clean the original map and return its state size budget back.
shards[idx].m = nil
ptp.stateSizeBudget.Add(nTotal)
perShardMaps[idx] = perCPU
ptm.reset()
}(i)
}
wg.Wait()
if needStop(ptp.stopCh) {
return nil, nil
}
if n := ptp.stateSizeBudget.Load(); n < 0 {
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
}
// Obtain topN entries for each per-CPU map
entriess := make([][]*pipeTopEntry, shardsLen)
entriess := make([][]*pipeTopEntry, cpusCount)
for i := range entriess {
wg.Add(1)
go func(idx int) {
go func(cpuIdx int) {
defer wg.Done()
m := perShardMaps[0][idx]
for i := 1; i < len(perShardMaps); i++ {
n := int64(0)
nTotal := int64(0)
for k, pHits := range perShardMaps[i][idx] {
if needStop(ptp.stopCh) {
return
}
n += updatePipeTopMap(m, k, pHits)
if n > stateSizeBudgetChunk {
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
return
}
nTotal += n
n = 0
}
}
nTotal += n
ptp.stateSizeBudget.Add(-n)
// Clean the original map and return its state size budget back.
perShardMaps[i][idx] = nil
ptp.stateSizeBudget.Add(nTotal)
ptm := &perShardMaps[0][cpuIdx]
for _, perCPU := range perShardMaps[1:] {
ptm.mergeState(&perCPU[cpuIdx], ptp.stopCh)
perCPU[cpuIdx].reset()
}
perShardMaps[0][idx] = nil
entriess[idx] = getTopEntries(m, limit, ptp.stopCh)
entriess[cpuIdx] = getTopEntries(ptm, limit, ptp.stopCh)
}(i)
}
wg.Wait()
if needStop(ptp.stopCh) {
return nil, nil
}
if n := ptp.stateSizeBudget.Load(); n < 0 {
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
}
// merge entriess
entries := entriess[0]
@@ -488,31 +614,57 @@ func (ptp *pipeTopProcessor) mergeShardsParallel() ([]*pipeTopEntry, error) {
return entries, nil
}
func getTopEntries(m map[string]*uint64, limit uint64, stopCh <-chan struct{}) []*pipeTopEntry {
func getTopEntries(ptm *pipeTopMap, limit uint64, stopCh <-chan struct{}) []*pipeTopEntry {
if limit == 0 {
return nil
}
var eh topEntriesHeap
for k, pHits := range m {
var e pipeTopEntry
pushEntry := func(k string, hits uint64, kCopy bool) {
e.k = k
e.hits = hits
if uint64(len(eh)) < limit {
eCopy := e
if kCopy {
eCopy.k = strings.Clone(eCopy.k)
}
heap.Push(&eh, &eCopy)
return
}
if !eh[0].less(&e) {
return
}
eCopy := e
if kCopy {
eCopy.k = strings.Clone(eCopy.k)
}
eh[0] = &eCopy
heap.Fix(&eh, 0)
}
var b []byte
for n, pHits := range ptm.u64 {
if needStop(stopCh) {
return nil
}
e := pipeTopEntry{
k: k,
hits: *pHits,
b = marshalUint64String(b[:0], n)
pushEntry(bytesutil.ToUnsafeString(b), *pHits, true)
}
for n, pHits := range ptm.negative64 {
if needStop(stopCh) {
return nil
}
if uint64(len(eh)) < limit {
eCopy := e
heap.Push(&eh, &eCopy)
continue
}
if eh[0].less(&e) {
eCopy := e
eh[0] = &eCopy
heap.Fix(&eh, 0)
b = marshalInt64String(b[:0], int64(n))
pushEntry(bytesutil.ToUnsafeString(b), *pHits, true)
}
for k, pHits := range ptm.strings {
if needStop(stopCh) {
return nil
}
pushEntry(k, *pHits, false)
}
result := ([]*pipeTopEntry)(eh)
@@ -524,17 +676,6 @@ func getTopEntries(m map[string]*uint64, limit uint64, stopCh <-chan struct{}) [
return result
}
func updatePipeTopMap(m map[string]*uint64, k string, pHitsSrc *uint64) int64 {
pHitsDst := m[k]
if pHitsDst != nil {
*pHitsDst += *pHitsSrc
return 0
}
m[k] = pHitsSrc
return int64(unsafe.Sizeof(k) + unsafe.Sizeof(pHitsSrc))
}
type topEntriesHeap []*pipeTopEntry
func (h *topEntriesHeap) Less(i, j int) bool {
@@ -614,7 +755,9 @@ func (wctx *pipeTopWriteContext) writeRow(rowFields []Field) {
}
wctx.rowsCount++
if wctx.valuesLen >= 1_000_000 {
// The 64_000 limit provides the best performance results.
if wctx.valuesLen >= 64_000 {
wctx.flush()
}
}
@@ -670,37 +813,41 @@ func parsePipeTop(lex *lexer) (pipe, error) {
byFields = bfs
}
hitsFieldName := "hits"
if lex.isKeyword("hits") {
lex.nextToken()
if lex.isKeyword("as") {
lex.nextToken()
}
s, err := getCompoundToken(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'hits' name: %w", err)
}
hitsFieldName = s
}
for slices.Contains(byFields, hitsFieldName) {
hitsFieldName += "s"
}
pt := &pipeTop{
byFields: byFields,
limit: limit,
limitStr: limitStr,
hitsFieldName: hitsFieldName,
hitsFieldName: "hits",
}
if lex.isKeyword("rank") {
rankFieldName, err := parseRankFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse rank field name in [%s]: %w", pt, err)
for {
switch {
case lex.isKeyword("hits"):
lex.nextToken()
if lex.isKeyword("as") {
lex.nextToken()
}
s, err := getCompoundToken(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'hits' name: %w", err)
}
pt.hitsFieldName = s
case lex.isKeyword("rank"):
rankFieldName, err := parseRankFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse rank field name in [%s]: %w", pt, err)
}
pt.rankFieldName = rankFieldName
for slices.Contains(byFields, pt.rankFieldName) {
pt.rankFieldName += "s"
}
default:
for slices.Contains(byFields, pt.hitsFieldName) {
pt.hitsFieldName += "s"
}
return pt, nil
}
pt.rankFieldName = rankFieldName
}
return pt, nil
}
func parseRankFieldName(lex *lexer) (string, error) {
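
Note on the `pipeTopMap` hunks above: hits are now tracked in three maps (`u64`, `negative64`, `strings`) so that numeric keys avoid string allocation. A minimal sketch of that routing idea follows, using only the standard library. The `hitsMap` type and its `add` method are hypothetical, and the canonical-form check (`FormatUint(n) == key`) is an assumption added here so that values such as `007` are not merged with `7`; the parsing rules of `tryParseUint64` and `tryParseInt64` in the repository may differ.

```go
package main

import "strconv"

// hitsMap counts hits per key, keeping canonical integers out of the string map.
// Hypothetical illustration type; it stores counters by value for brevity.
type hitsMap struct {
	u64        map[uint64]uint64
	negative64 map[uint64]uint64 // negative int64 values, stored as their uint64 bit pattern
	strings    map[string]uint64
}

func newHitsMap() *hitsMap {
	return &hitsMap{
		u64:        make(map[uint64]uint64),
		negative64: make(map[uint64]uint64),
		strings:    make(map[string]uint64),
	}
}

// add routes key to the cheapest map that can represent it without loss.
func (m *hitsMap) add(key string, hits uint64) {
	// Non-negative integers in canonical decimal form go to the u64 map.
	if n, err := strconv.ParseUint(key, 10, 64); err == nil && strconv.FormatUint(n, 10) == key {
		m.u64[n] += hits
		return
	}
	// Negative integers in canonical decimal form go to the negative64 map.
	if len(key) > 0 && key[0] == '-' {
		if n, err := strconv.ParseInt(key, 10, 64); err == nil && strconv.FormatInt(n, 10) == key {
			m.negative64[uint64(n)] += hits
			return
		}
	}
	// Everything else is kept as a string key.
	m.strings[key] += hits
}
```

With this layout, `add("42", 1)` touches only the `u64` map, `add("-7", 1)` touches only `negative64`, and non-canonical or non-numeric inputs such as `007` or `abc` fall through to `strings`.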


@@ -0,0 +1,189 @@
package logstorage
import (
"fmt"
"github.com/VictoriaMetrics/metrics"
)
type statsHistogram struct {
fieldName string
}
func (sh *statsHistogram) String() string {
return "histogram(" + quoteTokenIfNeeded(sh.fieldName) + ")"
}
func (sh *statsHistogram) updateNeededFields(neededFields fieldsSet) {
updateNeededFieldsForStatsFunc(neededFields, []string{sh.fieldName})
}
func (sh *statsHistogram) newStatsProcessor(a *chunkedAllocator) statsProcessor {
return a.newStatsHistogramProcessor()
}
type statsHistogramProcessor struct {
h metrics.Histogram
}
func (shp *statsHistogramProcessor) updateStatsForAllRows(sf statsFunc, br *blockResult) int {
sh := sf.(*statsHistogram)
c := br.getColumnByName(sh.fieldName)
if c.isConst {
v := c.valuesEncoded[0]
f, ok := tryParseNumber(v)
if ok {
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
shp.h.Update(f)
}
}
return 0
}
switch c.valueType {
case valueTypeUint8:
values := c.getValuesEncoded(br)
for _, v := range values {
n := unmarshalUint8(v)
shp.h.Update(float64(n))
}
case valueTypeUint16:
values := c.getValuesEncoded(br)
for _, v := range values {
n := unmarshalUint16(v)
shp.h.Update(float64(n))
}
case valueTypeUint32:
values := c.getValuesEncoded(br)
for _, v := range values {
n := unmarshalUint32(v)
shp.h.Update(float64(n))
}
case valueTypeUint64:
values := c.getValuesEncoded(br)
for _, v := range values {
n := unmarshalUint64(v)
shp.h.Update(float64(n))
}
case valueTypeInt64:
values := c.getValuesEncoded(br)
for _, v := range values {
n := unmarshalInt64(v)
shp.h.Update(float64(n))
}
case valueTypeFloat64:
values := c.getValuesEncoded(br)
for _, v := range values {
f := unmarshalFloat64(v)
shp.h.Update(f)
}
case valueTypeIPv4:
// skip ipv4 values, since they cannot be represented as numbers
case valueTypeTimestampISO8601:
// skip iso8601 values, since they cannot be represented as numbers
default:
values := c.getValues(br)
for _, v := range values {
f, ok := tryParseNumber(v)
if ok {
shp.h.Update(f)
}
}
}
return 0
}
func (shp *statsHistogramProcessor) updateStatsForRow(sf statsFunc, br *blockResult, rowIdx int) int {
sh := sf.(*statsHistogram)
c := br.getColumnByName(sh.fieldName)
if c.isConst {
v := c.valuesEncoded[0]
f, ok := tryParseNumber(v)
if ok {
shp.h.Update(f)
}
return 0
}
switch c.valueType {
case valueTypeUint8:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalUint8(v)
shp.h.Update(float64(n))
case valueTypeUint16:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalUint16(v)
shp.h.Update(float64(n))
case valueTypeUint32:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalUint32(v)
shp.h.Update(float64(n))
case valueTypeUint64:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalUint64(v)
shp.h.Update(float64(n))
case valueTypeInt64:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalInt64(v)
shp.h.Update(float64(n))
case valueTypeFloat64:
values := c.getValuesEncoded(br)
v := values[rowIdx]
f := unmarshalFloat64(v)
shp.h.Update(f)
case valueTypeIPv4:
// skip ipv4 values, since they cannot be represented as numbers
case valueTypeTimestampISO8601:
// skip iso8601 values, since they cannot be represented as numbers
default:
v := c.getValueAtRow(br, rowIdx)
f, ok := tryParseNumber(v)
if ok {
shp.h.Update(f)
}
}
return 0
}
func (shp *statsHistogramProcessor) mergeState(_ statsFunc, sfp statsProcessor) {
src := sfp.(*statsHistogramProcessor)
shp.h.Merge(&src.h)
}
func (shp *statsHistogramProcessor) finalizeStats(_ statsFunc, dst []byte, _ <-chan struct{}) []byte {
dst = append(dst, '[')
shp.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
dst = append(dst, `{"vmrange":"`...)
dst = append(dst, vmrange...)
dst = append(dst, `","hits":`...)
dst = marshalUint64String(dst, count)
dst = append(dst, `},`...)
})
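// Drop the trailing comma; when there are no non-zero buckets, keep the opening '[' intact.
if dst[len(dst)-1] == ',' {
dst = dst[:len(dst)-1]
}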
dst = append(dst, ']')
return dst
}
func parseStatsHistogram(lex *lexer) (*statsHistogram, error) {
fields, err := parseStatsFuncFields(lex, "histogram")
if err != nil {
return nil, fmt.Errorf("cannot parse field name: %w", err)
}
if len(fields) != 1 {
return nil, fmt.Errorf("unexpected number of fields; got %d; want 1", len(fields))
}
sh := &statsHistogram{
fieldName: fields[0],
}
return sh, nil
}
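
Note on `finalizeStats` above: the result is a JSON array of `{"vmrange":...,"hits":...}` objects, one per non-zero bucket. The sketch below produces the same output shape while writing the separator before every element except the first, so an empty bucket list yields `[]`. The `bucket` type and `appendBucketsJSON` are hypothetical names for illustration, and the code assumes `vmrange` strings never contain characters that need JSON escaping.

```go
package main

import "strconv"

// bucket is a hypothetical stand-in for one non-zero histogram bucket.
type bucket struct {
	vmrange string
	hits    uint64
}

// appendBucketsJSON appends buckets to dst as a JSON array and returns the result.
// vmrange is written verbatim, so it must not contain quotes or backslashes.
func appendBucketsJSON(dst []byte, buckets []bucket) []byte {
	dst = append(dst, '[')
	for i, b := range buckets {
		if i > 0 {
			dst = append(dst, ',')
		}
		dst = append(dst, `{"vmrange":"`...)
		dst = append(dst, b.vmrange...)
		dst = append(dst, `","hits":`...)
		dst = strconv.AppendUint(dst, b.hits, 10)
		dst = append(dst, '}')
	}
	dst = append(dst, ']')
	return dst
}
```

For the two buckets used in `TestStatsHistogram`, this helper returns the same string as the expected `x` field in that test.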


@@ -0,0 +1,52 @@
package logstorage
import (
"testing"
)
func TestParseStatsHistogramSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParseStatsFuncSuccess(t, pipeStr)
}
f(`histogram(foo)`)
}
func TestParseStatsHistogramFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParseStatsFuncFailure(t, pipeStr)
}
f(`histogram`)
f(`histogram(a, b)`)
f(`histogram(a) abc`)
}
func TestStatsHistogram(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
f("stats histogram(a) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1.9`},
},
{
{"a", `3.05`},
{"b", `54`},
},
}, [][]Field{
{
{"x", `[{"vmrange":"1.896e+00...2.154e+00","hits":2},{"vmrange":"2.783e+00...3.162e+00","hits":1}]`},
},
})
}


@@ -6,6 +6,7 @@ import (
"math"
"strconv"
"sync"
"time"
"github.com/VictoriaMetrics/metrics"
@@ -53,6 +54,8 @@ func ParseStream(r io.Reader, isGzipped bool, processBody func([]byte) ([]byte,
return nil
}
var skippedSampleLogger = logger.WithThrottler("otlp_skipped_sample", 5*time.Second)
func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
for _, m := range sc.Metrics {
if len(m.Name) == 0 {
@@ -68,6 +71,7 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
case m.Sum != nil:
if m.Sum.AggregationTemporality != pb.AggregationTemporalityCumulative {
rowsDroppedUnsupportedSum.Inc()
skippedSampleLogger.Warnf("unsupported delta temporality for %q ('sum'): skipping it", metricName)
continue
}
for _, p := range m.Sum.DataPoints {
@@ -80,6 +84,7 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
case m.Histogram != nil:
if m.Histogram.AggregationTemporality != pb.AggregationTemporalityCumulative {
rowsDroppedUnsupportedHistogram.Inc()
skippedSampleLogger.Warnf("unsupported delta temporality for %q ('histogram'): skipping it", metricName)
continue
}
for _, p := range m.Histogram.DataPoints {
@@ -88,6 +93,7 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
case m.ExponentialHistogram != nil:
if m.ExponentialHistogram.AggregationTemporality != pb.AggregationTemporalityCumulative {
rowsDroppedUnsupportedExponentialHistogram.Inc()
skippedSampleLogger.Warnf("unsupported delta temporality for %q ('exponential histogram'): skipping it", metricName)
continue
}
for _, p := range m.ExponentialHistogram.DataPoints {
@@ -95,7 +101,7 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
}
default:
rowsDroppedUnsupportedMetricType.Inc()
logger.Warnf("unsupported type for metric %q", metricName)
skippedSampleLogger.Warnf("unsupported type for metric %q", metricName)
}
}
}
@@ -139,7 +145,7 @@ func (wr *writeContext) appendSamplesFromHistogram(metricName string, p *pb.Hist
}
if len(p.BucketCounts) != len(p.ExplicitBounds)+1 {
// fast path, broken data format
logger.Warnf("opentelemetry bad histogram format: %q, size of buckets: %d, size of bounds: %d", metricName, len(p.BucketCounts), len(p.ExplicitBounds))
skippedSampleLogger.Warnf("opentelemetry bad histogram format: %q, size of buckets: %d, size of bounds: %d", metricName, len(p.BucketCounts), len(p.ExplicitBounds))
return
}
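
Note on the `skippedSampleLogger` hunks above: warnings for skipped samples now go through a logger created with `logger.WithThrottler("otlp_skipped_sample", 5*time.Second)`, which limits how often they are emitted. The general idea can be sketched with the standard library as follows. The `throttler` type is hypothetical and is not the implementation in `lib/logger`, whose internals are not shown in this diff.

```go
package main

import (
	"sync"
	"time"
)

// throttler allows at most one event per interval.
// Hypothetical helper for illustration; safe for concurrent use.
type throttler struct {
	mu       sync.Mutex
	interval time.Duration
	last     time.Time
}

func newThrottler(interval time.Duration) *throttler {
	return &throttler{interval: interval}
}

// allow reports whether the caller may emit a message now.
// The first call always succeeds; later calls succeed only after
// at least interval has passed since the last allowed call.
func (t *throttler) allow() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	now := time.Now()
	if !t.last.IsZero() && now.Sub(t.last) < t.interval {
		return false
	}
	t.last = now
	return true
}
```

A caller would guard each warning with `if th.allow() { log.Printf(...) }`, so a burst of unsupported metrics produces one log line per interval instead of one line per sample.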


@@ -1822,8 +1822,8 @@ func testStorageVariousDataPatternsConcurrently(t *testing.T, registerOnly bool,
func testStorageVariousDataPatterns(t *testing.T, registerOnly bool, op func(s *Storage, mrs []MetricRow), concurrency int, splitBatches bool) {
f := func(t *testing.T, sameBatchMetricNames, sameRowMetricNames, sameBatchDates, sameRowDates bool) {
batches, wantCounts := testGenerateMetricRowBatches(&batchOptions{
numBatches: 4,
numRowsPerBatch: 100,
numBatches: 3,
numRowsPerBatch: 30,
registerOnly: registerOnly,
sameBatchMetricNames: sameBatchMetricNames,
sameRowMetricNames: sameRowMetricNames,