mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-22 19:26:35 +03:00
Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d631d2c100 | ||
|
|
89431458bf | ||
|
|
d8d0c0ac01 | ||
|
|
c0f5699bad | ||
|
|
277fdd1070 | ||
|
|
d290efb849 | ||
|
|
b26a68641c | ||
|
|
b88cda5c41 | ||
|
|
d2a791bef3 | ||
|
|
99516a5730 | ||
|
|
aecc86c390 | ||
|
|
500b54f5aa | ||
|
|
cc29692e27 | ||
|
|
f018aa33cb | ||
|
|
92b6475fa6 | ||
|
|
bda3546cfd | ||
|
|
2691cdefe3 | ||
|
|
93b8aa5c9d |
@@ -176,7 +176,7 @@ func writeCompactObject(w io.Writer, fields []logstorage.Field) error {
|
||||
_, err := fmt.Fprintf(w, "%s\n", fields[0].Value)
|
||||
return err
|
||||
}
|
||||
if len(fields) == 2 && fields[0].Name == "_time" || fields[1].Name == "_time" {
|
||||
if len(fields) == 2 && (fields[0].Name == "_time" || fields[1].Name == "_time") {
|
||||
// Write _time\tfieldValue as is
|
||||
if fields[0].Name == "_time" {
|
||||
_, err := fmt.Fprintf(w, "%s\t%s\n", fields[0].Value, fields[1].Value)
|
||||
|
||||
@@ -70,6 +70,7 @@ Bumping the limits may significantly improve build speed.
|
||||
* linux/386
|
||||
This step can be run manually with the command `make publish` from the needed git tag.
|
||||
1. Verify that created images are stable and don't introduce regressions on [test environment](https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/blob/master/Release-Guide.md#testing-releases).
|
||||
1. Test new images on [sandbox](https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/blob/master/Release-Guide.md#testing-releases).
|
||||
1. Push the tags `v1.xx.y` and `v1.xx.y-cluster` created at previous steps to public GitHub repository at https://github.com/VictoriaMetrics/VictoriaMetrics.
|
||||
Push the tags `v1.xx.y`, `v1.xx.y-cluster`, `v1.xx.y-enterprise` and `v1.xx.y-enterprise-cluster` to the corresponding
|
||||
branches in private repository.
|
||||
@@ -88,7 +89,6 @@ Bumping the limits may significantly improve build speed.
|
||||
file created at the step `a`.
|
||||
- To run the command `TAG=v1.xx.y make github-create-release github-upload-assets`, so new release is created
|
||||
and all the needed assets are re-uploaded to it.
|
||||
1. Test new images on [sandbox](https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/blob/master/Release-Guide.md#testing-releases).
|
||||
1. Go to <https://github.com/VictoriaMetrics/VictoriaMetrics/releases> and verify that draft release with the name `TAG` has been created
|
||||
and this release contains all the needed binaries and checksums.
|
||||
1. Update the release description with the content of [CHANGELOG](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/CHANGELOG.md) for this release.
|
||||
|
||||
@@ -16,6 +16,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
||||
|
||||
## tip
|
||||
|
||||
* FEATURE: add [`histogram` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#histogram-stats) for calculating [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) over the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
|
||||
## [v1.5.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.5.0-victorialogs)
|
||||
|
||||
Released at 2025-01-13
|
||||
|
||||
@@ -2762,6 +2762,7 @@ See also:
|
||||
- [`uniq` pipe](#uniq-pipe)
|
||||
- [`stats` pipe](#stats-pipe)
|
||||
- [`sort` pipe](#sort-pipe)
|
||||
- [`histogram` stats function](#histogram-stats)
|
||||
|
||||
### uniq pipe
|
||||
|
||||
@@ -3106,6 +3107,7 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe):
|
||||
- [`count_empty`](#count_empty-stats) returns the number logs with empty [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`count_uniq`](#count_uniq-stats) returns the number of unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`count_uniq_hash`](#count_uniq_hash-stats) returns the number of unique hashes for non-empty values at the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`histogram`](#histogram-stats) returns [VictoriaMetrics histogram](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`max`](#max-stats) returns the maximum value over the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`median`](#median-stats) returns the [median](https://en.wikipedia.org/wiki/Median) value over the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`min`](#min-stats) returns the minimum value over the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
@@ -3253,6 +3255,25 @@ See also:
|
||||
- [`uniq_values`](#uniq_values-stats)
|
||||
- [`count`](#count-stats)
|
||||
|
||||
### histogram stats
|
||||
|
||||
`histogram(field)` [stats pipe function](#stats-pipe-functions) returns [VictoriaMetrics histogram buckets](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
|
||||
for the given [`field`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
|
||||
For example, the following query returns histogram buckets for the `response_size` field grouped by `host` field, across logs for the last 5 minutes:
|
||||
|
||||
```logsql
|
||||
_time:5m | stats by (host) histogram(response_size)
|
||||
```
|
||||
|
||||
If the field contains [duration value](#duration-values), then `histogram` normalizes it to nanoseconds. For example, `1.25ms` is normalized to `1_250_000`.
|
||||
|
||||
If the field contains [short numeric value](#short-numeric-values), then `histogram` normalizes it to numeric value without any suffixes. For example, `1KiB` is converted to `1024`.
|
||||
|
||||
See also:
|
||||
|
||||
- [`quantile`](#quantile-stats)
|
||||
|
||||
### max stats
|
||||
|
||||
`max(field1, ..., fieldN)` [stats pipe function](#stats-pipe-functions) returns the maximum value across
|
||||
@@ -3330,6 +3351,7 @@ _time:5m | stats
|
||||
|
||||
See also:
|
||||
|
||||
- [`histogram`](#histogram-stats)
|
||||
- [`min`](#min-stats)
|
||||
- [`max`](#max-stats)
|
||||
- [`median`](#median-stats)
|
||||
|
||||
@@ -155,7 +155,9 @@ How often to completely retrain the models. If not set, value of `infer_every` i
|
||||
<tr>
|
||||
<td>
|
||||
|
||||
<span>
|
||||
`start_from`{{% available_from "v1.18.5" anomaly %}}
|
||||
</span>
|
||||
</td>
|
||||
<td>str, Optional</td>
|
||||
<td>
|
||||
@@ -169,8 +171,9 @@ Specifies when to initiate the first `fit_every` call. Accepts either an ISO 860
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
|
||||
<span>
|
||||
`tz`{{% available_from "v1.18.5" anomaly %}}
|
||||
</span>
|
||||
</td>
|
||||
<td>str, Optional</td>
|
||||
<td>
|
||||
|
||||
@@ -31,8 +31,9 @@ Future updates will introduce additional export methods, offering users more fle
|
||||
`class`
|
||||
</td>
|
||||
<td>
|
||||
|
||||
<span>
|
||||
`writer.vm.VmWriter` or `vm`{{% available_from "v1.13.0" anomaly %}}
|
||||
</span>
|
||||
</td>
|
||||
<td>
|
||||
|
||||
@@ -59,8 +60,9 @@ Datasource URL address
|
||||
`tenant_id`
|
||||
</td>
|
||||
<td>
|
||||
|
||||
<span>
|
||||
`0:0`, `multitenant`{{% available_from "v1.16.2" anomaly %}}
|
||||
</span>
|
||||
</td>
|
||||
<td>
|
||||
|
||||
@@ -247,8 +249,9 @@ Token is passed in the standard format with header: `Authorization: bearer {toke
|
||||
`path_to_file`
|
||||
</td>
|
||||
<td>
|
||||
<span>
|
||||
Path to a file, which contains token, that is passed in the standard format with header: `Authorization: bearer {token}`{{% available_from "v1.15.9" anomaly %}}
|
||||
</td>
|
||||
</span> </td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,11 +1,11 @@
|
||||
---
|
||||
weight: 6
|
||||
weight: 7
|
||||
title: Year 2020
|
||||
menu:
|
||||
docs:
|
||||
identifier: vm-changelog-2020
|
||||
parent: vm-changelog
|
||||
weight: 6
|
||||
weight: 7
|
||||
aliases:
|
||||
- /CHANGELOG_2020.html
|
||||
- /changelog_2020
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
---
|
||||
weight: 5
|
||||
weight: 6
|
||||
title: Year 2021
|
||||
menu:
|
||||
docs:
|
||||
identifier: vm-changelog-2021
|
||||
parent: vm-changelog
|
||||
weight: 5
|
||||
weight: 6
|
||||
aliases:
|
||||
- /CHANGELOG_2021.html
|
||||
- /changelog_2021
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
---
|
||||
weight: 4
|
||||
weight: 5
|
||||
title: Year 2022
|
||||
menu:
|
||||
docs:
|
||||
identifier: vm-changelog-2022
|
||||
parent: vm-changelog
|
||||
weight: 4
|
||||
weight: 5
|
||||
aliases:
|
||||
- /CHANGELOG_2022.html
|
||||
- /changelog_2022
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
---
|
||||
weight: 3
|
||||
weight: 4
|
||||
title: Year 2023
|
||||
menu:
|
||||
docs:
|
||||
identifier: vm-changelog-2023
|
||||
parent: vm-changelog
|
||||
weight: 3
|
||||
weight: 4
|
||||
aliases:
|
||||
- /CHANGELOG_2023.html
|
||||
- /changelog_2023
|
||||
|
||||
1291
docs/changelog/CHANGELOG_2024.md
Normal file
1291
docs/changelog/CHANGELOG_2024.md
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1,6 +1,16 @@
|
||||
## Next release
|
||||
|
||||
- TODO
|
||||
|
||||
## 0.7.1
|
||||
|
||||
**Release date:** 10 Jan 2025
|
||||
|
||||
 
|
||||
|
||||
- updated common dependency 0.0.35 -> 0.0.37
|
||||
- fixed typo useMultitenantMode -> useMultiTenantMode in remotewrite settings
|
||||
- allow passing additional remotewrite setings
|
||||
|
||||
## 0.7.0
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
|
||||
|
||||

|
||||

|
||||

|
||||

|
||||

|
||||
|
||||
@@ -2,6 +2,14 @@
|
||||
|
||||
- TODO
|
||||
|
||||
## 0.33.3
|
||||
|
||||
**Release date:** 13 Jan 2025
|
||||
|
||||
 
|
||||
|
||||
- updates operator to [v0.51.3](https://github.com/VictoriaMetrics/operator/releases/tag/v0.51.3) version
|
||||
|
||||
## 0.33.2
|
||||
|
||||
**Release date:** 06 Jan 2025
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
|
||||
|
||||

|
||||

|
||||

|
||||

|
||||

|
||||
|
||||
@@ -2,6 +2,14 @@
|
||||
|
||||
- TODO
|
||||
|
||||
## 0.40.4
|
||||
|
||||
**Release date:** 13 Jan 2025
|
||||
|
||||
 
|
||||
|
||||
- updates operator to [v0.51.3](https://github.com/VictoriaMetrics/operator/releases/tag/v0.51.3) version
|
||||
|
||||
## 0.40.3
|
||||
|
||||
**Release date:** 06 Jan 2025
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
|
||||
|
||||

|
||||

|
||||

|
||||

|
||||

|
||||
|
||||
7
docs/search/_index.md
Normal file
7
docs/search/_index.md
Normal file
@@ -0,0 +1,7 @@
|
||||
---
|
||||
page: search
|
||||
layout: search
|
||||
draft: false
|
||||
weight: 0
|
||||
search: true
|
||||
---
|
||||
@@ -149,7 +149,7 @@ or [Prometheus recording rules definition format](https://prometheus.io/docs/pro
|
||||
|
||||
There are limitations for the rules files:
|
||||
|
||||
1. All files may contain no more than 100 rules in total. If you need to upload more rules contact us via [support@victoriametrics.com](mailto:support@victoriametrics.com).
|
||||
1. All files may contain no more than 100 rules in total. If you need to upload more rules contact us via [support-cloud@victoriametrics.com](mailto:support-cloud@victoriametrics.com).
|
||||
2. The maximum file size is 20mb.
|
||||
3. The names of the groups in the files should be unique.
|
||||
|
||||
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 24 KiB After Width: | Height: | Size: 52 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 28 KiB After Width: | Height: | Size: 54 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 52 KiB After Width: | Height: | Size: 52 KiB |
@@ -18,7 +18,7 @@ The tier parameters are derived from testing in typical monitoring environments,
|
||||
| **Active Time Series Count** | Per Tier Limits | Number of [active time series](https://docs.victoriametrics.com/faq/#what-is-an-active-time-series) that received at least one data point in the last hour. |
|
||||
| **Read Rate** | Per Tier Limits | Number of datapoints retrieved from the database per second. |
|
||||
| **New Series Over 24 Hours** (churn rate) | `<= Active Time Series Count` | Number of new series created in 24 hours. High [churn rate](https://docs.victoriametrics.com/faq/#what-is-high-churn-rate) leads to higher resource consumption. |
|
||||
| **Concurrent Requests per Token** | `<= 600` | Maximum concurrent requests per access token. It is recommended to create separate tokens for different clients and environments. This can be adjusted via [support](mailto:support@victoriametrics.com). |
|
||||
| **Concurrent Requests per Token** | `<= 600` | Maximum concurrent requests per access token. It is recommended to create separate tokens for different clients and environments. This can be adjusted via [support](mailto:support-cloud@victoriametrics.com). |
|
||||
|
||||
For a detailed explanation of each parameter, visit the guide on [Understanding Your Setup Size](https://docs.victoriametrics.com/guides/understand-your-setup-size.html).
|
||||
|
||||
@@ -26,7 +26,7 @@ For a detailed explanation of each parameter, visit the guide on [Understanding
|
||||
|
||||
| **Flag** | **Default Value** | **Description** |
|
||||
|-----------------------------------|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| **Max Label Value Length** | `<= 1kb` (Default: `4kb`) | Maximum length of label values. Longer values are truncated. Large label values can lead to high RAM consumption. This can be adjusted via [support](mailto:support@victoriametrics.com). |
|
||||
| **Max Label Value Length** | `<= 1kb` (Default: `4kb`) | Maximum length of label values. Longer values are truncated. Large label values can lead to high RAM consumption. This can be adjusted via [support](mailto:support-cloud@victoriametrics.com). |
|
||||
| **Max Labels per Time Series** | `<= 30` | Maximum number of labels per time series. Excess labels are dropped. Higher values can increase [cardinality](https://docs.victoriametrics.com/keyconcepts/#cardinality) and resource usage. This can be configured in [deployment settings](https://docs.victoriametrics.com/victoriametrics-cloud/quickstart/#modifying-an-existing-deployment). |
|
||||
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
@@ -227,6 +228,16 @@ func (br *blockResult) cloneValues(values []string) []string {
|
||||
return br.valuesBuf[valuesBufLen:]
|
||||
}
|
||||
|
||||
func (br *blockResult) addValues(values []string) {
|
||||
valuesBufLen := len(br.valuesBuf)
|
||||
br.valuesBuf = slicesutil.SetLength(br.valuesBuf, valuesBufLen+len(values))
|
||||
valuesBuf := br.valuesBuf[valuesBufLen:]
|
||||
_ = valuesBuf[len(values)-1]
|
||||
for i, v := range values {
|
||||
valuesBuf[i] = br.a.copyString(v)
|
||||
}
|
||||
}
|
||||
|
||||
func (br *blockResult) addValue(v string) {
|
||||
valuesBuf := br.valuesBuf
|
||||
if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] {
|
||||
@@ -490,73 +501,57 @@ func (br *blockResult) newValuesEncodedFromColumnHeader(bs *blockSearch, bm *bit
|
||||
|
||||
switch ch.valueType {
|
||||
case valueTypeString:
|
||||
visitValuesReadonly(bs, ch, bm, br.addValue)
|
||||
visitValuesReadonly(bs, ch, bm, br.addValues)
|
||||
case valueTypeDict:
|
||||
visitValuesReadonly(bs, ch, bm, func(v string) {
|
||||
if len(v) != 1 {
|
||||
logger.Panicf("FATAL: %s: unexpected dict value size for column %q; got %d bytes; want 1 byte", bs.partPath(), ch.name, len(v))
|
||||
visitValuesReadonly(bs, ch, bm, func(values []string) {
|
||||
checkValuesSize(bs, ch, values, 1, "dict")
|
||||
for _, v := range values {
|
||||
dictIdx := v[0]
|
||||
if int(dictIdx) >= len(ch.valuesDict.values) {
|
||||
logger.Panicf("FATAL: %s: too big dict index for column %q: %d; should be smaller than %d", bs.partPath(), ch.name, dictIdx, len(ch.valuesDict.values))
|
||||
}
|
||||
}
|
||||
dictIdx := v[0]
|
||||
if int(dictIdx) >= len(ch.valuesDict.values) {
|
||||
logger.Panicf("FATAL: %s: too big dict index for column %q: %d; should be smaller than %d", bs.partPath(), ch.name, dictIdx, len(ch.valuesDict.values))
|
||||
}
|
||||
br.addValue(v)
|
||||
br.addValues(values)
|
||||
})
|
||||
case valueTypeUint8:
|
||||
visitValuesReadonly(bs, ch, bm, func(v string) {
|
||||
if len(v) != 1 {
|
||||
logger.Panicf("FATAL: %s: unexpected size for uint8 column %q; got %d bytes; want 1 byte", bs.partPath(), ch.name, len(v))
|
||||
}
|
||||
br.addValue(v)
|
||||
visitValuesReadonly(bs, ch, bm, func(values []string) {
|
||||
checkValuesSize(bs, ch, values, 1, "uint8")
|
||||
br.addValues(values)
|
||||
})
|
||||
case valueTypeUint16:
|
||||
visitValuesReadonly(bs, ch, bm, func(v string) {
|
||||
if len(v) != 2 {
|
||||
logger.Panicf("FATAL: %s: unexpected size for uint16 column %q; got %d bytes; want 2 bytes", bs.partPath(), ch.name, len(v))
|
||||
}
|
||||
br.addValue(v)
|
||||
visitValuesReadonly(bs, ch, bm, func(values []string) {
|
||||
checkValuesSize(bs, ch, values, 2, "uint16")
|
||||
br.addValues(values)
|
||||
})
|
||||
case valueTypeUint32:
|
||||
visitValuesReadonly(bs, ch, bm, func(v string) {
|
||||
if len(v) != 4 {
|
||||
logger.Panicf("FATAL: %s: unexpected size for uint32 column %q; got %d bytes; want 4 bytes", bs.partPath(), ch.name, len(v))
|
||||
}
|
||||
br.addValue(v)
|
||||
visitValuesReadonly(bs, ch, bm, func(values []string) {
|
||||
checkValuesSize(bs, ch, values, 4, "uint32")
|
||||
br.addValues(values)
|
||||
})
|
||||
case valueTypeUint64:
|
||||
visitValuesReadonly(bs, ch, bm, func(v string) {
|
||||
if len(v) != 8 {
|
||||
logger.Panicf("FATAL: %s: unexpected size for uint64 column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
|
||||
}
|
||||
br.addValue(v)
|
||||
visitValuesReadonly(bs, ch, bm, func(values []string) {
|
||||
checkValuesSize(bs, ch, values, 8, "uint64")
|
||||
br.addValues(values)
|
||||
})
|
||||
case valueTypeInt64:
|
||||
visitValuesReadonly(bs, ch, bm, func(v string) {
|
||||
if len(v) != 8 {
|
||||
logger.Panicf("FATAL: %s: unexpected size for int64 column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
|
||||
}
|
||||
br.addValue(v)
|
||||
visitValuesReadonly(bs, ch, bm, func(values []string) {
|
||||
checkValuesSize(bs, ch, values, 8, "int64")
|
||||
br.addValues(values)
|
||||
})
|
||||
case valueTypeFloat64:
|
||||
visitValuesReadonly(bs, ch, bm, func(v string) {
|
||||
if len(v) != 8 {
|
||||
logger.Panicf("FATAL: %s: unexpected size for float64 column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
|
||||
}
|
||||
br.addValue(v)
|
||||
visitValuesReadonly(bs, ch, bm, func(values []string) {
|
||||
checkValuesSize(bs, ch, values, 8, "float64")
|
||||
br.addValues(values)
|
||||
})
|
||||
case valueTypeIPv4:
|
||||
visitValuesReadonly(bs, ch, bm, func(v string) {
|
||||
if len(v) != 4 {
|
||||
logger.Panicf("FATAL: %s: unexpected size for ipv4 column %q; got %d bytes; want 4 bytes", bs.partPath(), ch.name, len(v))
|
||||
}
|
||||
br.addValue(v)
|
||||
visitValuesReadonly(bs, ch, bm, func(values []string) {
|
||||
checkValuesSize(bs, ch, values, 4, "ipv4")
|
||||
br.addValues(values)
|
||||
})
|
||||
case valueTypeTimestampISO8601:
|
||||
visitValuesReadonly(bs, ch, bm, func(v string) {
|
||||
if len(v) != 8 {
|
||||
logger.Panicf("FATAL: %s: unexpected size for timestmap column %q; got %d bytes; want 8 bytes", bs.partPath(), ch.name, len(v))
|
||||
}
|
||||
br.addValue(v)
|
||||
visitValuesReadonly(bs, ch, bm, func(values []string) {
|
||||
checkValuesSize(bs, ch, values, 8, "iso8601")
|
||||
br.addValues(values)
|
||||
})
|
||||
default:
|
||||
logger.Panicf("FATAL: %s: unknown valueType=%d for column %q", bs.partPath(), ch.valueType, ch.name)
|
||||
@@ -565,6 +560,14 @@ func (br *blockResult) newValuesEncodedFromColumnHeader(bs *blockSearch, bm *bit
|
||||
return br.valuesBuf[valuesBufLen:]
|
||||
}
|
||||
|
||||
func checkValuesSize(bs *blockSearch, ch *columnHeader, values []string, sizeExpected int, typeStr string) {
|
||||
for _, v := range values {
|
||||
if len(v) != sizeExpected {
|
||||
logger.Panicf("FATAL: %s: unexpected size for %s column %q; got %d bytes; want %d bytes", typeStr, bs.partPath(), ch.name, len(v), sizeExpected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// addColumn adds column for the given ch to br.
|
||||
//
|
||||
// The added column is valid until ch is changed.
|
||||
@@ -2138,17 +2141,46 @@ func getEmptyStrings(rowsLen int) []string {
|
||||
|
||||
var emptyStrings atomic.Pointer[[]string]
|
||||
|
||||
func visitValuesReadonly(bs *blockSearch, ch *columnHeader, bm *bitmap, f func(value string)) {
|
||||
func visitValuesReadonly(bs *blockSearch, ch *columnHeader, bm *bitmap, f func(values []string)) {
|
||||
if bm.isZero() {
|
||||
// Fast path - nothing to visit
|
||||
return
|
||||
}
|
||||
values := bs.getValuesForColumn(ch)
|
||||
if bm.areAllBitsSet() {
|
||||
// Faster path - visit all the values
|
||||
f(values)
|
||||
return
|
||||
}
|
||||
|
||||
// Slower path - visit only the selected values
|
||||
vb := getValuesBuf()
|
||||
bm.forEachSetBitReadonly(func(idx int) {
|
||||
f(values[idx])
|
||||
vb.values = append(vb.values, values[idx])
|
||||
})
|
||||
f(vb.values)
|
||||
putValuesBuf(vb)
|
||||
}
|
||||
|
||||
type valuesBuf struct {
|
||||
values []string
|
||||
}
|
||||
|
||||
func getValuesBuf() *valuesBuf {
|
||||
v := valuesBufPool.Get()
|
||||
if v == nil {
|
||||
return &valuesBuf{}
|
||||
}
|
||||
return v.(*valuesBuf)
|
||||
}
|
||||
|
||||
func putValuesBuf(vb *valuesBuf) {
|
||||
vb.values = vb.values[:0]
|
||||
valuesBufPool.Put(vb)
|
||||
}
|
||||
|
||||
var valuesBufPool sync.Pool
|
||||
|
||||
func getCanonicalColumnName(columnName string) string {
|
||||
if columnName == "" {
|
||||
return "_msg"
|
||||
|
||||
@@ -12,6 +12,7 @@ type chunkedAllocator struct {
|
||||
countEmptyProcessors []statsCountEmptyProcessor
|
||||
countUniqProcessors []statsCountUniqProcessor
|
||||
countUniqHashProcessors []statsCountUniqHashProcessor
|
||||
histogramProcessors []statsHistogramProcessor
|
||||
maxProcessors []statsMaxProcessor
|
||||
medianProcessors []statsMedianProcessor
|
||||
minProcessors []statsMinProcessor
|
||||
@@ -60,6 +61,11 @@ func (a *chunkedAllocator) newStatsCountUniqHashProcessor() (p *statsCountUniqHa
|
||||
return p
|
||||
}
|
||||
|
||||
func (a *chunkedAllocator) newStatsHistogramProcessor() (p *statsHistogramProcessor) {
|
||||
a.histogramProcessors, p = addNewItem(a.histogramProcessors, a)
|
||||
return p
|
||||
}
|
||||
|
||||
func (a *chunkedAllocator) newStatsMaxProcessor() (p *statsMaxProcessor) {
|
||||
a.maxProcessors, p = addNewItem(a.maxProcessors, a)
|
||||
return p
|
||||
|
||||
@@ -1207,6 +1207,10 @@ func TestParseQuerySuccess(t *testing.T) {
|
||||
f(`* | stats sum_len(*) x`, `* | stats sum_len(*) as x`)
|
||||
f(`* | stats sum_len(foo,*,bar) x`, `* | stats sum_len(*) as x`)
|
||||
|
||||
// stats pipe histogram
|
||||
f(`* | stats histogram(foo) bar`, `* | stats histogram(foo) as bar`)
|
||||
f(`* | histogram(foo)`, `* | stats histogram(foo) as "histogram(foo)"`)
|
||||
|
||||
// stats pipe quantile
|
||||
f(`* | stats quantile(0, foo) bar`, `* | stats quantile(0, foo) as bar`)
|
||||
f(`* | stats quantile(1, foo) bar`, `* | stats quantile(1, foo) as bar`)
|
||||
@@ -1742,6 +1746,12 @@ func TestParseQueryFailure(t *testing.T) {
|
||||
// invalid stats sum_len
|
||||
f(`foo | stats sum_len`)
|
||||
|
||||
// invalid stats histogram
|
||||
f(`foo | stats histogram`)
|
||||
f(`foo | stats histogram()`)
|
||||
f(`foo | stats histogram(a, b)`)
|
||||
f(`foo | stats histogram(*)`)
|
||||
|
||||
// invalid stats quantile
|
||||
f(`foo | stats quantile`)
|
||||
f(`foo | stats quantile() foo`)
|
||||
@@ -1977,6 +1987,7 @@ func TestQueryGetNeededColumns(t *testing.T) {
|
||||
f(`* | stats max() q`, `*`, ``)
|
||||
f(`* | stats max(*) q`, `*`, ``)
|
||||
f(`* | stats max(x) q`, `x`, ``)
|
||||
f(`* | stats histogram(foo)`, `foo`, ``)
|
||||
f(`* | stats quantile(0.5) q`, `*`, ``)
|
||||
f(`* | stats quantile(0.5, *) q`, `*`, ``)
|
||||
f(`* | stats quantile(0.5, x) q`, `x`, ``)
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
|
||||
)
|
||||
@@ -279,7 +280,7 @@ func (shard *pipeTopkProcessorShard) addRow(br *blockResult, byColumns []string,
|
||||
b := shard.partitionKey[:0]
|
||||
for _, c := range shard.partitionColumns {
|
||||
v := c.getValueAtRow(br, rowIdx)
|
||||
b = marshalJSONKeyValue(b, c.name, v)
|
||||
b = encoding.MarshalBytes(b, bytesutil.ToUnsafeBytes(v))
|
||||
}
|
||||
shard.partitionKey = b
|
||||
|
||||
@@ -408,8 +409,8 @@ func (ptp *pipeTopkProcessor) flush() error {
|
||||
// Obtain all the partition keys
|
||||
partitionKeysMap := make(map[string]struct{})
|
||||
var partitionKeys []string
|
||||
for _, shard := range shards {
|
||||
for k := range shard.rowsByPartition {
|
||||
for i := range shards {
|
||||
for k := range shards[i].rowsByPartition {
|
||||
if _, ok := partitionKeysMap[k]; !ok {
|
||||
partitionKeysMap[k] = struct{}{}
|
||||
partitionKeys = append(partitionKeys, k)
|
||||
@@ -420,6 +421,9 @@ func (ptp *pipeTopkProcessor) flush() error {
|
||||
|
||||
// Merge sorted results across shards per each partitionKey
|
||||
for _, k := range partitionKeys {
|
||||
if needStop(ptp.stopCh) {
|
||||
return nil
|
||||
}
|
||||
var rss []*pipeTopkRows
|
||||
for _, shard := range shards {
|
||||
rs, ok := shard.rowsByPartition[k]
|
||||
@@ -428,9 +432,6 @@ func (ptp *pipeTopkProcessor) flush() error {
|
||||
}
|
||||
}
|
||||
ptp.mergeAndFlushRows(rss)
|
||||
if needStop(ptp.stopCh) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -874,8 +874,8 @@ func (psp *pipeStatsProcessor) mergeShardsParallel() ([]*pipeStatsGroupMap, erro
|
||||
}
|
||||
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
|
||||
h := xxhash.Sum64(k)
|
||||
shardIdx := h % uint64(len(perCPU))
|
||||
perCPU[shardIdx].u64[n] = psg
|
||||
cpuIdx := h % uint64(len(perCPU))
|
||||
perCPU[cpuIdx].u64[n] = psg
|
||||
}
|
||||
for n, psg := range psm.negative64 {
|
||||
if needStop(psp.stopCh) {
|
||||
@@ -883,16 +883,16 @@ func (psp *pipeStatsProcessor) mergeShardsParallel() ([]*pipeStatsGroupMap, erro
|
||||
}
|
||||
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
|
||||
h := xxhash.Sum64(k)
|
||||
shardIdx := h % uint64(len(perCPU))
|
||||
perCPU[shardIdx].negative64[n] = psg
|
||||
cpuIdx := h % uint64(len(perCPU))
|
||||
perCPU[cpuIdx].negative64[n] = psg
|
||||
}
|
||||
for k, psg := range psm.strings {
|
||||
if needStop(psp.stopCh) {
|
||||
return
|
||||
}
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
|
||||
shardIdx := h % uint64(len(perCPU))
|
||||
perCPU[shardIdx].strings[k] = psg
|
||||
cpuIdx := h % uint64(len(perCPU))
|
||||
perCPU[cpuIdx].strings[k] = psg
|
||||
}
|
||||
|
||||
perShardMaps[idx] = perCPU
|
||||
@@ -903,9 +903,6 @@ func (psp *pipeStatsProcessor) mergeShardsParallel() ([]*pipeStatsGroupMap, erro
|
||||
if needStop(psp.stopCh) {
|
||||
return nil, nil
|
||||
}
|
||||
if n := psp.stateSizeBudget.Load(); n < 0 {
|
||||
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
|
||||
}
|
||||
|
||||
// Merge per-shard entries into perShardMaps[0]
|
||||
for i := 0; i < cpusCount; i++ {
|
||||
@@ -924,9 +921,6 @@ func (psp *pipeStatsProcessor) mergeShardsParallel() ([]*pipeStatsGroupMap, erro
|
||||
if needStop(psp.stopCh) {
|
||||
return nil, nil
|
||||
}
|
||||
if n := psp.stateSizeBudget.Load(); n < 0 {
|
||||
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
|
||||
}
|
||||
|
||||
// Filter out maps without entries
|
||||
psms := perShardMaps[0]
|
||||
@@ -1055,6 +1049,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
|
||||
return nil, fmt.Errorf("cannot parse 'count_uniq_hash' func: %w", err)
|
||||
}
|
||||
return sus, nil
|
||||
case lex.isKeyword("histogram"):
|
||||
shs, err := parseStatsHistogram(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'histogram' func: %w", err)
|
||||
}
|
||||
return shs, nil
|
||||
case lex.isKeyword("max"):
|
||||
sms, err := parseStatsMax(lex)
|
||||
if err != nil {
|
||||
@@ -1144,6 +1144,7 @@ var statsNames = []string{
|
||||
"count_empty",
|
||||
"count_uniq",
|
||||
"count_uniq_hash",
|
||||
"histogram",
|
||||
"max",
|
||||
"median",
|
||||
"min",
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"slices"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
@@ -13,6 +14,7 @@ import (
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
@@ -91,6 +93,7 @@ func (pt *pipeTop) newPipeProcessor(workersCount int, stopCh <-chan struct{}, ca
|
||||
pt: pt,
|
||||
},
|
||||
}
|
||||
shards[i].m.init(&shards[i])
|
||||
}
|
||||
|
||||
ptp := &pipeTopProcessor{
|
||||
@@ -131,11 +134,8 @@ type pipeTopProcessorShardNopad struct {
|
||||
// pt points to the parent pipeTop.
|
||||
pt *pipeTop
|
||||
|
||||
// a reduces memory allocations when counting the number of hits over big number of unique values.
|
||||
a chunkedAllocator
|
||||
|
||||
// m holds per-row hits.
|
||||
m map[string]*uint64
|
||||
// m holds per-value hits.
|
||||
m pipeTopMap
|
||||
|
||||
// keyBuf is a temporary buffer for building keys for m.
|
||||
keyBuf []byte
|
||||
@@ -148,6 +148,134 @@ type pipeTopProcessorShardNopad struct {
|
||||
stateSizeBudget int
|
||||
}
|
||||
|
||||
type pipeTopMap struct {
|
||||
shard *pipeTopProcessorShard
|
||||
|
||||
u64 map[uint64]*uint64
|
||||
negative64 map[uint64]*uint64
|
||||
strings map[string]*uint64
|
||||
|
||||
// a reduces memory allocations when counting the number of hits over big number of unique values.
|
||||
a chunkedAllocator
|
||||
}
|
||||
|
||||
func (ptm *pipeTopMap) reset() {
|
||||
ptm.shard = nil
|
||||
|
||||
ptm.u64 = nil
|
||||
ptm.negative64 = nil
|
||||
ptm.strings = nil
|
||||
}
|
||||
|
||||
func (ptm *pipeTopMap) init(shard *pipeTopProcessorShard) {
|
||||
ptm.shard = shard
|
||||
|
||||
ptm.u64 = make(map[uint64]*uint64)
|
||||
ptm.negative64 = make(map[uint64]*uint64)
|
||||
ptm.strings = make(map[string]*uint64)
|
||||
}
|
||||
|
||||
func (ptm *pipeTopMap) updateStateGeneric(key string, hits uint64) {
|
||||
if n, ok := tryParseUint64(key); ok {
|
||||
ptm.updateStateUint64(n, hits)
|
||||
return
|
||||
}
|
||||
if len(key) > 0 && key[0] == '-' {
|
||||
if n, ok := tryParseInt64(key); ok {
|
||||
ptm.updateStateNegativeInt64(n, hits)
|
||||
return
|
||||
}
|
||||
}
|
||||
ptm.updateStateString(bytesutil.ToUnsafeBytes(key), hits)
|
||||
}
|
||||
|
||||
func (ptm *pipeTopMap) updateStateInt64(n int64, hits uint64) {
|
||||
if n >= 0 {
|
||||
ptm.updateStateUint64(uint64(n), hits)
|
||||
} else {
|
||||
ptm.updateStateNegativeInt64(n, hits)
|
||||
}
|
||||
}
|
||||
|
||||
func (ptm *pipeTopMap) updateStateUint64(n, hits uint64) {
|
||||
pHits := ptm.u64[n]
|
||||
if pHits != nil {
|
||||
*pHits += hits
|
||||
return
|
||||
}
|
||||
|
||||
pHits = ptm.a.newUint64()
|
||||
*pHits += hits
|
||||
ptm.u64[n] = pHits
|
||||
|
||||
ptm.shard.stateSizeBudget -= int(unsafe.Sizeof(*pHits) + unsafe.Sizeof(pHits))
|
||||
}
|
||||
|
||||
func (ptm *pipeTopMap) updateStateNegativeInt64(n int64, hits uint64) {
|
||||
pHits := ptm.negative64[uint64(n)]
|
||||
if pHits != nil {
|
||||
*pHits += hits
|
||||
return
|
||||
}
|
||||
|
||||
pHits = ptm.a.newUint64()
|
||||
*pHits += hits
|
||||
ptm.negative64[uint64(n)] = pHits
|
||||
|
||||
ptm.shard.stateSizeBudget -= int(unsafe.Sizeof(*pHits) + unsafe.Sizeof(pHits))
|
||||
}
|
||||
|
||||
func (ptm *pipeTopMap) updateStateString(key []byte, hits uint64) {
|
||||
pHits := ptm.strings[string(key)]
|
||||
if pHits != nil {
|
||||
*pHits += hits
|
||||
return
|
||||
}
|
||||
|
||||
keyCopy := ptm.a.cloneBytesToString(key)
|
||||
pHits = ptm.a.newUint64()
|
||||
*pHits += hits
|
||||
ptm.strings[keyCopy] = pHits
|
||||
|
||||
ptm.shard.stateSizeBudget -= len(keyCopy) + int(unsafe.Sizeof(keyCopy)+unsafe.Sizeof(*pHits)+unsafe.Sizeof(pHits))
|
||||
}
|
||||
|
||||
func (ptm *pipeTopMap) mergeState(src *pipeTopMap, stopCh <-chan struct{}) {
|
||||
for n, pHitsSrc := range src.u64 {
|
||||
if needStop(stopCh) {
|
||||
return
|
||||
}
|
||||
pHitsDst := ptm.u64[n]
|
||||
if pHitsDst == nil {
|
||||
ptm.u64[n] = pHitsSrc
|
||||
} else {
|
||||
*pHitsDst += *pHitsSrc
|
||||
}
|
||||
}
|
||||
for n, pHitsSrc := range src.negative64 {
|
||||
if needStop(stopCh) {
|
||||
return
|
||||
}
|
||||
pHitsDst := ptm.negative64[n]
|
||||
if pHitsDst == nil {
|
||||
ptm.negative64[n] = pHitsSrc
|
||||
} else {
|
||||
*pHitsDst += *pHitsSrc
|
||||
}
|
||||
}
|
||||
for k, pHitsSrc := range src.strings {
|
||||
if needStop(stopCh) {
|
||||
return
|
||||
}
|
||||
pHitsDst := ptm.strings[k]
|
||||
if pHitsDst == nil {
|
||||
ptm.strings[k] = pHitsSrc
|
||||
} else {
|
||||
*pHitsDst += *pHitsSrc
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// writeBlock writes br to shard.
|
||||
func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
|
||||
byFields := shard.pt.byFields
|
||||
@@ -155,35 +283,21 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
|
||||
// Take into account all the columns in br.
|
||||
keyBuf := shard.keyBuf
|
||||
cs := br.getColumns()
|
||||
for i := 0; i < br.rowsLen; i++ {
|
||||
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
|
||||
keyBuf = keyBuf[:0]
|
||||
for _, c := range cs {
|
||||
v := c.getValueAtRow(br, i)
|
||||
v := c.getValueAtRow(br, rowIdx)
|
||||
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name))
|
||||
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
|
||||
}
|
||||
shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1)
|
||||
shard.m.updateStateString(keyBuf, 1)
|
||||
}
|
||||
shard.keyBuf = keyBuf
|
||||
return
|
||||
}
|
||||
if len(byFields) == 1 {
|
||||
// Fast path for a single field.
|
||||
c := br.getColumnByName(byFields[0])
|
||||
if c.isConst {
|
||||
v := c.valuesEncoded[0]
|
||||
shard.updateState(v, uint64(br.rowsLen))
|
||||
return
|
||||
}
|
||||
if c.valueType == valueTypeDict {
|
||||
c.forEachDictValueWithHits(br, shard.updateState)
|
||||
return
|
||||
}
|
||||
|
||||
values := c.getValues(br)
|
||||
for _, v := range values {
|
||||
shard.updateState(v, 1)
|
||||
}
|
||||
shard.updateStatsSingleColumn(br, byFields[0])
|
||||
return
|
||||
}
|
||||
|
||||
@@ -197,33 +311,62 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
|
||||
shard.columnValues = columnValues
|
||||
|
||||
keyBuf := shard.keyBuf
|
||||
for i := 0; i < br.rowsLen; i++ {
|
||||
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
|
||||
keyBuf = keyBuf[:0]
|
||||
for _, values := range columnValues {
|
||||
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
|
||||
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[rowIdx]))
|
||||
}
|
||||
shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1)
|
||||
shard.m.updateStateString(keyBuf, 1)
|
||||
}
|
||||
shard.keyBuf = keyBuf
|
||||
}
|
||||
|
||||
func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) {
|
||||
m := shard.getM()
|
||||
pHits := m[v]
|
||||
if pHits == nil {
|
||||
vCopy := shard.a.cloneString(v)
|
||||
pHits = shard.a.newUint64()
|
||||
m[vCopy] = pHits
|
||||
shard.stateSizeBudget -= len(vCopy) + int(unsafe.Sizeof(vCopy)+unsafe.Sizeof(hits)+unsafe.Sizeof(pHits))
|
||||
func (shard *pipeTopProcessorShard) updateStatsSingleColumn(br *blockResult, fieldName string) {
|
||||
c := br.getColumnByName(fieldName)
|
||||
if c.isConst {
|
||||
v := c.valuesEncoded[0]
|
||||
shard.m.updateStateGeneric(v, uint64(br.rowsLen))
|
||||
return
|
||||
}
|
||||
*pHits += hits
|
||||
}
|
||||
|
||||
func (shard *pipeTopProcessorShard) getM() map[string]*uint64 {
|
||||
if shard.m == nil {
|
||||
shard.m = make(map[string]*uint64)
|
||||
switch c.valueType {
|
||||
case valueTypeDict:
|
||||
c.forEachDictValueWithHits(br, shard.m.updateStateGeneric)
|
||||
case valueTypeUint8:
|
||||
values := c.getValuesEncoded(br)
|
||||
for _, v := range values {
|
||||
n := unmarshalUint8(v)
|
||||
shard.m.updateStateUint64(uint64(n), 1)
|
||||
}
|
||||
case valueTypeUint16:
|
||||
values := c.getValuesEncoded(br)
|
||||
for _, v := range values {
|
||||
n := unmarshalUint16(v)
|
||||
shard.m.updateStateUint64(uint64(n), 1)
|
||||
}
|
||||
case valueTypeUint32:
|
||||
values := c.getValuesEncoded(br)
|
||||
for _, v := range values {
|
||||
n := unmarshalUint32(v)
|
||||
shard.m.updateStateUint64(uint64(n), 1)
|
||||
}
|
||||
case valueTypeUint64:
|
||||
values := c.getValuesEncoded(br)
|
||||
for _, v := range values {
|
||||
n := unmarshalUint64(v)
|
||||
shard.m.updateStateUint64(n, 1)
|
||||
}
|
||||
case valueTypeInt64:
|
||||
values := c.getValuesEncoded(br)
|
||||
for _, v := range values {
|
||||
n := unmarshalInt64(v)
|
||||
shard.m.updateStateInt64(n, 1)
|
||||
}
|
||||
default:
|
||||
values := c.getValues(br)
|
||||
for _, v := range values {
|
||||
shard.m.updateStateGeneric(v, 1)
|
||||
}
|
||||
}
|
||||
return shard.m
|
||||
}
|
||||
|
||||
func (ptp *pipeTopProcessor) writeBlock(workerID uint, br *blockResult) {
|
||||
@@ -378,101 +521,84 @@ func (ptp *pipeTopProcessor) mergeShardsParallel() ([]*pipeTopEntry, error) {
|
||||
|
||||
shards := ptp.shards
|
||||
shardsLen := len(shards)
|
||||
cpusCount := cgroup.AvailableCPUs()
|
||||
|
||||
if shardsLen == 1 {
|
||||
entries := getTopEntries(shards[0].getM(), limit, ptp.stopCh)
|
||||
ptm := &shards[0].m
|
||||
entries := getTopEntries(ptm, limit, ptp.stopCh)
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
perShardMaps := make([][]map[string]*uint64, shardsLen)
|
||||
perShardMaps := make([][]pipeTopMap, shardsLen)
|
||||
for i := range shards {
|
||||
wg.Add(1)
|
||||
go func(idx int) {
|
||||
defer wg.Done()
|
||||
|
||||
shardMaps := make([]map[string]*uint64, shardsLen)
|
||||
for i := range shardMaps {
|
||||
shardMaps[i] = make(map[string]*uint64)
|
||||
perCPU := make([]pipeTopMap, cpusCount)
|
||||
for i := range perCPU {
|
||||
perCPU[i].init(&shards[idx])
|
||||
}
|
||||
|
||||
n := int64(0)
|
||||
nTotal := int64(0)
|
||||
for k, pHits := range shards[idx].getM() {
|
||||
ptm := &shards[idx].m
|
||||
|
||||
for n, pHits := range ptm.u64 {
|
||||
if needStop(ptp.stopCh) {
|
||||
return
|
||||
}
|
||||
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
|
||||
h := xxhash.Sum64(k)
|
||||
cpuIdx := h % uint64(len(perCPU))
|
||||
perCPU[cpuIdx].u64[n] = pHits
|
||||
}
|
||||
for n, pHits := range ptm.negative64 {
|
||||
if needStop(ptp.stopCh) {
|
||||
return
|
||||
}
|
||||
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
|
||||
h := xxhash.Sum64(k)
|
||||
cpuIdx := h % uint64(len(perCPU))
|
||||
perCPU[cpuIdx].negative64[n] = pHits
|
||||
}
|
||||
for k, pHits := range ptm.strings {
|
||||
if needStop(ptp.stopCh) {
|
||||
return
|
||||
}
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
|
||||
m := shardMaps[h%uint64(len(shardMaps))]
|
||||
n += updatePipeTopMap(m, k, pHits)
|
||||
if n > stateSizeBudgetChunk {
|
||||
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
|
||||
return
|
||||
}
|
||||
nTotal += n
|
||||
n = 0
|
||||
}
|
||||
cpuIdx := h % uint64(len(perCPU))
|
||||
perCPU[cpuIdx].strings[k] = pHits
|
||||
}
|
||||
nTotal += n
|
||||
ptp.stateSizeBudget.Add(-n)
|
||||
|
||||
perShardMaps[idx] = shardMaps
|
||||
|
||||
// Clean the original map and return its state size budget back.
|
||||
shards[idx].m = nil
|
||||
ptp.stateSizeBudget.Add(nTotal)
|
||||
perShardMaps[idx] = perCPU
|
||||
ptm.reset()
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
if needStop(ptp.stopCh) {
|
||||
return nil, nil
|
||||
}
|
||||
if n := ptp.stateSizeBudget.Load(); n < 0 {
|
||||
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
|
||||
}
|
||||
|
||||
// Obtain topN entries per each shard
|
||||
entriess := make([][]*pipeTopEntry, shardsLen)
|
||||
entriess := make([][]*pipeTopEntry, cpusCount)
|
||||
for i := range entriess {
|
||||
wg.Add(1)
|
||||
go func(idx int) {
|
||||
go func(cpuIdx int) {
|
||||
defer wg.Done()
|
||||
|
||||
m := perShardMaps[0][idx]
|
||||
for i := 1; i < len(perShardMaps); i++ {
|
||||
n := int64(0)
|
||||
nTotal := int64(0)
|
||||
for k, pHits := range perShardMaps[i][idx] {
|
||||
if needStop(ptp.stopCh) {
|
||||
return
|
||||
}
|
||||
n += updatePipeTopMap(m, k, pHits)
|
||||
if n > stateSizeBudgetChunk {
|
||||
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
|
||||
return
|
||||
}
|
||||
nTotal += n
|
||||
n = 0
|
||||
}
|
||||
}
|
||||
nTotal += n
|
||||
ptp.stateSizeBudget.Add(-n)
|
||||
|
||||
// Clean the original map and return its state size budget back.
|
||||
perShardMaps[i][idx] = nil
|
||||
ptp.stateSizeBudget.Add(nTotal)
|
||||
ptm := &perShardMaps[0][cpuIdx]
|
||||
for _, perCPU := range perShardMaps[1:] {
|
||||
ptm.mergeState(&perCPU[cpuIdx], ptp.stopCh)
|
||||
perCPU[cpuIdx].reset()
|
||||
}
|
||||
perShardMaps[0][idx] = nil
|
||||
|
||||
entriess[idx] = getTopEntries(m, limit, ptp.stopCh)
|
||||
entriess[cpuIdx] = getTopEntries(ptm, limit, ptp.stopCh)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
if needStop(ptp.stopCh) {
|
||||
return nil, nil
|
||||
}
|
||||
if n := ptp.stateSizeBudget.Load(); n < 0 {
|
||||
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
|
||||
}
|
||||
|
||||
// merge entriess
|
||||
entries := entriess[0]
|
||||
@@ -488,31 +614,57 @@ func (ptp *pipeTopProcessor) mergeShardsParallel() ([]*pipeTopEntry, error) {
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
func getTopEntries(m map[string]*uint64, limit uint64, stopCh <-chan struct{}) []*pipeTopEntry {
|
||||
func getTopEntries(ptm *pipeTopMap, limit uint64, stopCh <-chan struct{}) []*pipeTopEntry {
|
||||
if limit == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var eh topEntriesHeap
|
||||
for k, pHits := range m {
|
||||
var e pipeTopEntry
|
||||
|
||||
pushEntry := func(k string, hits uint64, kCopy bool) {
|
||||
e.k = k
|
||||
e.hits = hits
|
||||
if uint64(len(eh)) < limit {
|
||||
eCopy := e
|
||||
if kCopy {
|
||||
eCopy.k = strings.Clone(eCopy.k)
|
||||
}
|
||||
heap.Push(&eh, &eCopy)
|
||||
return
|
||||
}
|
||||
|
||||
if !eh[0].less(&e) {
|
||||
return
|
||||
}
|
||||
eCopy := e
|
||||
if kCopy {
|
||||
eCopy.k = strings.Clone(eCopy.k)
|
||||
}
|
||||
eh[0] = &eCopy
|
||||
heap.Fix(&eh, 0)
|
||||
}
|
||||
|
||||
var b []byte
|
||||
for n, pHits := range ptm.u64 {
|
||||
if needStop(stopCh) {
|
||||
return nil
|
||||
}
|
||||
|
||||
e := pipeTopEntry{
|
||||
k: k,
|
||||
hits: *pHits,
|
||||
b = marshalUint64String(b[:0], n)
|
||||
pushEntry(bytesutil.ToUnsafeString(b), *pHits, true)
|
||||
}
|
||||
for n, pHits := range ptm.negative64 {
|
||||
if needStop(stopCh) {
|
||||
return nil
|
||||
}
|
||||
if uint64(len(eh)) < limit {
|
||||
eCopy := e
|
||||
heap.Push(&eh, &eCopy)
|
||||
continue
|
||||
}
|
||||
if eh[0].less(&e) {
|
||||
eCopy := e
|
||||
eh[0] = &eCopy
|
||||
heap.Fix(&eh, 0)
|
||||
b = marshalInt64String(b[:0], int64(n))
|
||||
pushEntry(bytesutil.ToUnsafeString(b), *pHits, true)
|
||||
}
|
||||
for k, pHits := range ptm.strings {
|
||||
if needStop(stopCh) {
|
||||
return nil
|
||||
}
|
||||
pushEntry(k, *pHits, false)
|
||||
}
|
||||
|
||||
result := ([]*pipeTopEntry)(eh)
|
||||
@@ -524,17 +676,6 @@ func getTopEntries(m map[string]*uint64, limit uint64, stopCh <-chan struct{}) [
|
||||
return result
|
||||
}
|
||||
|
||||
func updatePipeTopMap(m map[string]*uint64, k string, pHitsSrc *uint64) int64 {
|
||||
pHitsDst := m[k]
|
||||
if pHitsDst != nil {
|
||||
*pHitsDst += *pHitsSrc
|
||||
return 0
|
||||
}
|
||||
|
||||
m[k] = pHitsSrc
|
||||
return int64(unsafe.Sizeof(k) + unsafe.Sizeof(pHitsSrc))
|
||||
}
|
||||
|
||||
type topEntriesHeap []*pipeTopEntry
|
||||
|
||||
func (h *topEntriesHeap) Less(i, j int) bool {
|
||||
@@ -614,7 +755,9 @@ func (wctx *pipeTopWriteContext) writeRow(rowFields []Field) {
|
||||
}
|
||||
|
||||
wctx.rowsCount++
|
||||
if wctx.valuesLen >= 1_000_000 {
|
||||
|
||||
// The 64_000 limit provides the best performance results.
|
||||
if wctx.valuesLen >= 64_000 {
|
||||
wctx.flush()
|
||||
}
|
||||
}
|
||||
@@ -670,37 +813,41 @@ func parsePipeTop(lex *lexer) (pipe, error) {
|
||||
byFields = bfs
|
||||
}
|
||||
|
||||
hitsFieldName := "hits"
|
||||
if lex.isKeyword("hits") {
|
||||
lex.nextToken()
|
||||
if lex.isKeyword("as") {
|
||||
lex.nextToken()
|
||||
}
|
||||
s, err := getCompoundToken(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'hits' name: %w", err)
|
||||
}
|
||||
hitsFieldName = s
|
||||
}
|
||||
for slices.Contains(byFields, hitsFieldName) {
|
||||
hitsFieldName += "s"
|
||||
}
|
||||
|
||||
pt := &pipeTop{
|
||||
byFields: byFields,
|
||||
limit: limit,
|
||||
limitStr: limitStr,
|
||||
hitsFieldName: hitsFieldName,
|
||||
hitsFieldName: "hits",
|
||||
}
|
||||
|
||||
if lex.isKeyword("rank") {
|
||||
rankFieldName, err := parseRankFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse rank field name in [%s]: %w", pt, err)
|
||||
for {
|
||||
switch {
|
||||
case lex.isKeyword("hits"):
|
||||
lex.nextToken()
|
||||
if lex.isKeyword("as") {
|
||||
lex.nextToken()
|
||||
}
|
||||
s, err := getCompoundToken(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'hits' name: %w", err)
|
||||
}
|
||||
pt.hitsFieldName = s
|
||||
case lex.isKeyword("rank"):
|
||||
rankFieldName, err := parseRankFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse rank field name in [%s]: %w", pt, err)
|
||||
}
|
||||
pt.rankFieldName = rankFieldName
|
||||
for slices.Contains(byFields, pt.rankFieldName) {
|
||||
pt.rankFieldName += "s"
|
||||
}
|
||||
default:
|
||||
for slices.Contains(byFields, pt.hitsFieldName) {
|
||||
pt.hitsFieldName += "s"
|
||||
}
|
||||
return pt, nil
|
||||
}
|
||||
pt.rankFieldName = rankFieldName
|
||||
}
|
||||
return pt, nil
|
||||
}
|
||||
|
||||
func parseRankFieldName(lex *lexer) (string, error) {
|
||||
|
||||
189
lib/logstorage/stats_histogram.go
Normal file
189
lib/logstorage/stats_histogram.go
Normal file
@@ -0,0 +1,189 @@
|
||||
package logstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
type statsHistogram struct {
|
||||
fieldName string
|
||||
}
|
||||
|
||||
func (sh *statsHistogram) String() string {
|
||||
return "histogram(" + quoteTokenIfNeeded(sh.fieldName) + ")"
|
||||
}
|
||||
|
||||
func (sh *statsHistogram) updateNeededFields(neededFields fieldsSet) {
|
||||
updateNeededFieldsForStatsFunc(neededFields, []string{sh.fieldName})
|
||||
}
|
||||
|
||||
func (sh *statsHistogram) newStatsProcessor(a *chunkedAllocator) statsProcessor {
|
||||
return a.newStatsHistogramProcessor()
|
||||
}
|
||||
|
||||
type statsHistogramProcessor struct {
|
||||
h metrics.Histogram
|
||||
}
|
||||
|
||||
func (shp *statsHistogramProcessor) updateStatsForAllRows(sf statsFunc, br *blockResult) int {
|
||||
sh := sf.(*statsHistogram)
|
||||
|
||||
c := br.getColumnByName(sh.fieldName)
|
||||
if c.isConst {
|
||||
v := c.valuesEncoded[0]
|
||||
f, ok := tryParseNumber(v)
|
||||
if ok {
|
||||
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
|
||||
shp.h.Update(f)
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
switch c.valueType {
|
||||
case valueTypeUint8:
|
||||
values := c.getValuesEncoded(br)
|
||||
for _, v := range values {
|
||||
n := unmarshalUint8(v)
|
||||
shp.h.Update(float64(n))
|
||||
}
|
||||
case valueTypeUint16:
|
||||
values := c.getValuesEncoded(br)
|
||||
for _, v := range values {
|
||||
n := unmarshalUint16(v)
|
||||
shp.h.Update(float64(n))
|
||||
}
|
||||
case valueTypeUint32:
|
||||
values := c.getValuesEncoded(br)
|
||||
for _, v := range values {
|
||||
n := unmarshalUint32(v)
|
||||
shp.h.Update(float64(n))
|
||||
}
|
||||
case valueTypeUint64:
|
||||
values := c.getValuesEncoded(br)
|
||||
for _, v := range values {
|
||||
n := unmarshalUint64(v)
|
||||
shp.h.Update(float64(n))
|
||||
}
|
||||
case valueTypeInt64:
|
||||
values := c.getValuesEncoded(br)
|
||||
for _, v := range values {
|
||||
n := unmarshalInt64(v)
|
||||
shp.h.Update(float64(n))
|
||||
}
|
||||
case valueTypeFloat64:
|
||||
values := c.getValuesEncoded(br)
|
||||
for _, v := range values {
|
||||
f := unmarshalFloat64(v)
|
||||
shp.h.Update(f)
|
||||
}
|
||||
case valueTypeIPv4:
|
||||
// skip ipv4 values, since they cannot be represented as numbers
|
||||
case valueTypeTimestampISO8601:
|
||||
// skip iso8601 values, since they cannot be represented as numbers
|
||||
default:
|
||||
values := c.getValues(br)
|
||||
for _, v := range values {
|
||||
f, ok := tryParseNumber(v)
|
||||
if ok {
|
||||
shp.h.Update(f)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func (shp *statsHistogramProcessor) updateStatsForRow(sf statsFunc, br *blockResult, rowIdx int) int {
|
||||
sh := sf.(*statsHistogram)
|
||||
|
||||
c := br.getColumnByName(sh.fieldName)
|
||||
if c.isConst {
|
||||
v := c.valuesEncoded[0]
|
||||
f, ok := tryParseNumber(v)
|
||||
if ok {
|
||||
shp.h.Update(f)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
switch c.valueType {
|
||||
case valueTypeUint8:
|
||||
values := c.getValuesEncoded(br)
|
||||
v := values[rowIdx]
|
||||
n := unmarshalUint8(v)
|
||||
shp.h.Update(float64(n))
|
||||
case valueTypeUint16:
|
||||
values := c.getValuesEncoded(br)
|
||||
v := values[rowIdx]
|
||||
n := unmarshalUint16(v)
|
||||
shp.h.Update(float64(n))
|
||||
case valueTypeUint32:
|
||||
values := c.getValuesEncoded(br)
|
||||
v := values[rowIdx]
|
||||
n := unmarshalUint32(v)
|
||||
shp.h.Update(float64(n))
|
||||
case valueTypeUint64:
|
||||
values := c.getValuesEncoded(br)
|
||||
v := values[rowIdx]
|
||||
n := unmarshalUint64(v)
|
||||
shp.h.Update(float64(n))
|
||||
case valueTypeInt64:
|
||||
values := c.getValuesEncoded(br)
|
||||
v := values[rowIdx]
|
||||
n := unmarshalInt64(v)
|
||||
shp.h.Update(float64(n))
|
||||
case valueTypeFloat64:
|
||||
values := c.getValuesEncoded(br)
|
||||
v := values[rowIdx]
|
||||
f := unmarshalFloat64(v)
|
||||
shp.h.Update(f)
|
||||
case valueTypeIPv4:
|
||||
// skip ipv4 values, since they cannot be represented as numbers
|
||||
case valueTypeTimestampISO8601:
|
||||
// skip iso8601 values, since they cannot be represented as numbers
|
||||
default:
|
||||
v := c.getValueAtRow(br, rowIdx)
|
||||
f, ok := tryParseNumber(v)
|
||||
if ok {
|
||||
shp.h.Update(f)
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
func (shp *statsHistogramProcessor) mergeState(_ statsFunc, sfp statsProcessor) {
|
||||
src := sfp.(*statsHistogramProcessor)
|
||||
shp.h.Merge(&src.h)
|
||||
}
|
||||
|
||||
func (shp *statsHistogramProcessor) finalizeStats(_ statsFunc, dst []byte, _ <-chan struct{}) []byte {
|
||||
dst = append(dst, '[')
|
||||
shp.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
|
||||
dst = append(dst, `{"vmrange":"`...)
|
||||
dst = append(dst, vmrange...)
|
||||
dst = append(dst, `","hits":`...)
|
||||
dst = marshalUint64String(dst, count)
|
||||
dst = append(dst, `},`...)
|
||||
})
|
||||
dst = dst[:len(dst)-1]
|
||||
dst = append(dst, ']')
|
||||
return dst
|
||||
}
|
||||
|
||||
func parseStatsHistogram(lex *lexer) (*statsHistogram, error) {
|
||||
fields, err := parseStatsFuncFields(lex, "histogram")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse field name: %w", err)
|
||||
}
|
||||
if len(fields) != 1 {
|
||||
return nil, fmt.Errorf("unexpected number of fields; got %d; want 1", len(fields))
|
||||
}
|
||||
|
||||
sh := &statsHistogram{
|
||||
fieldName: fields[0],
|
||||
}
|
||||
return sh, nil
|
||||
}
|
||||
52
lib/logstorage/stats_histogram_test.go
Normal file
52
lib/logstorage/stats_histogram_test.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package logstorage
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseStatsHistogramSuccess(t *testing.T) {
|
||||
f := func(pipeStr string) {
|
||||
t.Helper()
|
||||
expectParseStatsFuncSuccess(t, pipeStr)
|
||||
}
|
||||
|
||||
f(`histogram(foo)`)
|
||||
}
|
||||
|
||||
func TestParseStatsHistogramFailure(t *testing.T) {
|
||||
f := func(pipeStr string) {
|
||||
t.Helper()
|
||||
expectParseStatsFuncFailure(t, pipeStr)
|
||||
}
|
||||
|
||||
f(`histogram`)
|
||||
f(`histogram(a, b)`)
|
||||
f(`histogram(a) abc`)
|
||||
}
|
||||
|
||||
func TestStatsHistogram(t *testing.T) {
|
||||
f := func(pipeStr string, rows, rowsExpected [][]Field) {
|
||||
t.Helper()
|
||||
expectPipeResults(t, pipeStr, rows, rowsExpected)
|
||||
}
|
||||
|
||||
f("stats histogram(a) as x", [][]Field{
|
||||
{
|
||||
{"_msg", `abc`},
|
||||
{"a", `2`},
|
||||
{"b", `3`},
|
||||
},
|
||||
{
|
||||
{"_msg", `def`},
|
||||
{"a", `1.9`},
|
||||
},
|
||||
{
|
||||
{"a", `3.05`},
|
||||
{"b", `54`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"x", `[{"vmrange":"1.896e+00...2.154e+00","hits":2},{"vmrange":"2.783e+00...3.162e+00","hits":1}]`},
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"math"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
@@ -53,6 +54,8 @@ func ParseStream(r io.Reader, isGzipped bool, processBody func([]byte) ([]byte,
|
||||
return nil
|
||||
}
|
||||
|
||||
var skippedSampleLogger = logger.WithThrottler("otlp_skipped_sample", 5*time.Second)
|
||||
|
||||
func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
|
||||
for _, m := range sc.Metrics {
|
||||
if len(m.Name) == 0 {
|
||||
@@ -68,6 +71,7 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
|
||||
case m.Sum != nil:
|
||||
if m.Sum.AggregationTemporality != pb.AggregationTemporalityCumulative {
|
||||
rowsDroppedUnsupportedSum.Inc()
|
||||
skippedSampleLogger.Warnf("unsupported delta temporality for %q ('sum'): skipping it", metricName)
|
||||
continue
|
||||
}
|
||||
for _, p := range m.Sum.DataPoints {
|
||||
@@ -80,6 +84,7 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
|
||||
case m.Histogram != nil:
|
||||
if m.Histogram.AggregationTemporality != pb.AggregationTemporalityCumulative {
|
||||
rowsDroppedUnsupportedHistogram.Inc()
|
||||
skippedSampleLogger.Warnf("unsupported delta temporality for %q ('histogram'): skipping it", metricName)
|
||||
continue
|
||||
}
|
||||
for _, p := range m.Histogram.DataPoints {
|
||||
@@ -88,6 +93,7 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
|
||||
case m.ExponentialHistogram != nil:
|
||||
if m.ExponentialHistogram.AggregationTemporality != pb.AggregationTemporalityCumulative {
|
||||
rowsDroppedUnsupportedExponentialHistogram.Inc()
|
||||
skippedSampleLogger.Warnf("unsupported delta temporality for %q ('exponential histogram'): skipping it", metricName)
|
||||
continue
|
||||
}
|
||||
for _, p := range m.ExponentialHistogram.DataPoints {
|
||||
@@ -95,7 +101,7 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
|
||||
}
|
||||
default:
|
||||
rowsDroppedUnsupportedMetricType.Inc()
|
||||
logger.Warnf("unsupported type for metric %q", metricName)
|
||||
skippedSampleLogger.Warnf("unsupported type for metric %q", metricName)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -139,7 +145,7 @@ func (wr *writeContext) appendSamplesFromHistogram(metricName string, p *pb.Hist
|
||||
}
|
||||
if len(p.BucketCounts) != len(p.ExplicitBounds)+1 {
|
||||
// fast path, broken data format
|
||||
logger.Warnf("opentelemetry bad histogram format: %q, size of buckets: %d, size of bounds: %d", metricName, len(p.BucketCounts), len(p.ExplicitBounds))
|
||||
skippedSampleLogger.Warnf("opentelemetry bad histogram format: %q, size of buckets: %d, size of bounds: %d", metricName, len(p.BucketCounts), len(p.ExplicitBounds))
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -1822,8 +1822,8 @@ func testStorageVariousDataPatternsConcurrently(t *testing.T, registerOnly bool,
|
||||
func testStorageVariousDataPatterns(t *testing.T, registerOnly bool, op func(s *Storage, mrs []MetricRow), concurrency int, splitBatches bool) {
|
||||
f := func(t *testing.T, sameBatchMetricNames, sameRowMetricNames, sameBatchDates, sameRowDates bool) {
|
||||
batches, wantCounts := testGenerateMetricRowBatches(&batchOptions{
|
||||
numBatches: 4,
|
||||
numRowsPerBatch: 100,
|
||||
numBatches: 3,
|
||||
numRowsPerBatch: 30,
|
||||
registerOnly: registerOnly,
|
||||
sameBatchMetricNames: sameBatchMetricNames,
|
||||
sameRowMetricNames: sameRowMetricNames,
|
||||
|
||||
Reference in New Issue
Block a user