Compare commits

..

2 Commits

Author SHA1 Message Date
Haley Wang
7d7d17d192 add changelog 2025-02-10 14:08:32 +08:00
Evgeny Kuzin
0a8b4281e5 fix race using the same list from 2 goroutines 2025-02-07 11:55:45 -05:00
42 changed files with 2177 additions and 1650 deletions

View File

@@ -2,20 +2,19 @@ package insertutils
import (
"fmt"
"math"
"strconv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
// ExtractTimestampRFC3339NanoFromFields extracts RFC3339 timestamp in nanoseconds from the field with the name timeField at fields.
// ExtractTimestampFromFields extracts timestamp in nanoseconds from the field with the name timeField at fields.
//
// The value for the timeField is set to empty string after returning from the function,
// so it could be ignored during data ingestion.
//
// The current timestamp is returned if fields do not contain a field with timeField name or if the timeField value is empty.
func ExtractTimestampRFC3339NanoFromFields(timeField string, fields []logstorage.Field) (int64, error) {
func ExtractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) {
for i := range fields {
f := &fields[i]
if f.Name != timeField {
@@ -48,22 +47,24 @@ func parseTimestamp(s string) (int64, error) {
return nsecs, nil
}
// ParseUnixTimestamp parses s as unix timestamp in either seconds or milliseconds and returns the parsed timestamp in nanoseconds.
// ParseUnixTimestamp parses s as unix timestamp in seconds, milliseconds, microseconds or nanoseconds and returns the parsed timestamp in nanoseconds.
func ParseUnixTimestamp(s string) (int64, error) {
n, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0, fmt.Errorf("cannot parse unix timestamp from %q: %w", s, err)
}
if n < (1<<31) && n >= (-1<<31) {
// The timestamp is in seconds. Convert it to milliseconds
n *= 1e3
// The timestamp is in seconds.
return n * 1e9, nil
}
if n > int64(math.MaxInt64)/1e6 {
return 0, fmt.Errorf("too big timestamp in milliseconds: %d; mustn't exceed %d", n, int64(math.MaxInt64)/1e6)
if n < 1e3*(1<<31) && n >= 1e3*(-1<<31) {
// The timestamp is in milliseconds.
return n * 1e6, nil
}
if n < int64(math.MinInt64)/1e6 {
return 0, fmt.Errorf("too small timestamp in milliseconds: %d; must be bigger than %d", n, int64(math.MinInt64)/1e6)
if n < 1e6*(1<<31) && n >= 1e6*(-1<<31) {
// The timestamp is in microseconds.
return n * 1e3, nil
}
n *= 1e6
// The timestamp is in nanoseconds
return n, nil
}

View File

@@ -6,11 +6,11 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
func TestExtractTimestampRFC3339NanoFromFields_Success(t *testing.T) {
func TestExtractTimestampFromFields_Success(t *testing.T) {
f := func(timeField string, fields []logstorage.Field, nsecsExpected int64) {
t.Helper()
nsecs, err := ExtractTimestampRFC3339NanoFromFields(timeField, fields)
nsecs, err := ExtractTimestampFromFields(timeField, fields)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@@ -51,6 +51,18 @@ func TestExtractTimestampRFC3339NanoFromFields_Success(t *testing.T) {
{Name: "foo", Value: "bar"},
}, 1718773640123456789)
// Unix timestamp in nanoseconds
f("time", []logstorage.Field{
{Name: "foo", Value: "bar"},
{Name: "time", Value: "1718773640123456789"},
}, 1718773640123456789)
// Unix timestamp in microseconds
f("time", []logstorage.Field{
{Name: "foo", Value: "bar"},
{Name: "time", Value: "1718773640123456"},
}, 1718773640123456000)
// Unix timestamp in milliseconds
f("time", []logstorage.Field{
{Name: "foo", Value: "bar"},
@@ -64,14 +76,14 @@ func TestExtractTimestampRFC3339NanoFromFields_Success(t *testing.T) {
}, 1718773640000000000)
}
func TestExtractTimestampRFC3339NanoFromFields_Error(t *testing.T) {
func TestExtractTimestampFromFields_Error(t *testing.T) {
f := func(s string) {
t.Helper()
fields := []logstorage.Field{
{Name: "time", Value: s},
}
nsecs, err := ExtractTimestampRFC3339NanoFromFields("time", fields)
nsecs, err := ExtractTimestampFromFields("time", fields)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@@ -80,6 +92,7 @@ func TestExtractTimestampRFC3339NanoFromFields_Error(t *testing.T) {
}
}
// invalid time
f("foobar")
// incomplete time

View File

@@ -99,7 +99,7 @@ func readLine(lr *insertutils.LineReader, timeField string, msgFields []string,
if err := p.ParseLogMessage(line); err != nil {
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
}
ts, err := insertutils.ExtractTimestampRFC3339NanoFromFields(timeField, p.Fields)
ts, err := insertutils.ExtractTimestampFromFields(timeField, p.Fields)
if err != nil {
return false, fmt.Errorf("cannot get timestamp: %w", err)
}

View File

@@ -560,7 +560,7 @@ func processLine(line []byte, currentYear int, timezone *time.Location, useLocal
if useLocalTimestamp {
ts = time.Now().UnixNano()
} else {
nsecs, err := insertutils.ExtractTimestampRFC3339NanoFromFields("timestamp", p.Fields)
nsecs, err := insertutils.ExtractTimestampFromFields("timestamp", p.Fields)
if err != nil {
return fmt.Errorf("cannot get timestamp from syslog line %q: %w", line, err)
}

View File

@@ -2,7 +2,6 @@ package notifier
import (
"context"
"fmt"
"testing"
"time"
@@ -28,12 +27,10 @@ func TestBlackHoleNotifier_Send(t *testing.T) {
}
func TestBlackHoleNotifier_Close(t *testing.T) {
addr := "blackhole-close"
bh := newBlackHoleNotifier()
bh.addr = addr
if err := bh.Send(context.Background(), []Alert{{
GroupID: 0,
Name: "alert1",
Name: "alert0",
Start: time.Now().UTC(),
End: time.Now().UTC(),
Annotations: map[string]string{"a": "b", "c": "d", "e": "f"},
@@ -44,10 +41,10 @@ func TestBlackHoleNotifier_Close(t *testing.T) {
bh.Close()
defaultMetrics := metricset.GetDefaultSet()
alertMetricName := fmt.Sprintf("vmalert_alerts_sent_total{addr=%q}", addr)
alertMetricName := "vmalert_alerts_sent_total{addr=\"blackhole\"}"
for _, name := range defaultMetrics.ListMetricNames() {
if name == alertMetricName {
t.Fatalf("Metric name should have unregistered. But still present")
t.Fatalf("Metric name should have unregistered.But still present")
}
}
}

View File

@@ -1,56 +1,14 @@
package utils
import (
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
import "github.com/VictoriaMetrics/metrics"
type namedMetric struct {
Name string
}
var usedMetrics map[string]*atomic.Int64
var usedMetricMu sync.Mutex
func trackUsedMetric(name string) {
usedMetricMu.Lock()
defer usedMetricMu.Unlock()
if usedMetrics == nil {
usedMetrics = make(map[string]*atomic.Int64)
}
if _, ok := usedMetrics[name]; !ok {
usedMetrics[name] = &atomic.Int64{}
}
usedMetrics[name].Add(1)
}
// Unregister removes the metric by name from default registry
func (nm namedMetric) Unregister() {
if usedMetrics == nil {
logger.Fatalf("BUG: unregistered metric %q before registering", nm.Name)
}
usedMetricMu.Lock()
counter, ok := usedMetrics[nm.Name]
if !ok {
logger.Fatalf("BUG: unregistered metric %q before registering", nm.Name)
}
current := counter.Add(-1)
usedMetricMu.Unlock()
if current < 0 {
logger.Fatalf("BUG: negative metric counter for %q", nm.Name)
}
if current == 0 {
metrics.UnregisterMetric(nm.Name)
}
metrics.UnregisterMetric(nm.Name)
}
// Gauge is a metrics.Gauge with Name
@@ -61,7 +19,6 @@ type Gauge struct {
// GetOrCreateGauge creates a new Gauge with the given name
func GetOrCreateGauge(name string, f func() float64) *Gauge {
trackUsedMetric(name)
return &Gauge{
namedMetric: namedMetric{Name: name},
Gauge: metrics.GetOrCreateGauge(name, f),
@@ -76,7 +33,6 @@ type Counter struct {
// GetOrCreateCounter creates a new Counter with the given name
func GetOrCreateCounter(name string) *Counter {
trackUsedMetric(name)
return &Counter{
namedMetric: namedMetric{Name: name},
Counter: metrics.GetOrCreateCounter(name),
@@ -91,7 +47,6 @@ type Summary struct {
// GetOrCreateSummary creates a new Summary with the given name
func GetOrCreateSummary(name string) *Summary {
trackUsedMetric(name)
return &Summary{
namedMetric: namedMetric{Name: name},
Summary: metrics.GetOrCreateSummary(name),

View File

@@ -1,52 +0,0 @@
package utils
import (
"testing"
"github.com/VictoriaMetrics/metrics"
)
func isMetricRegistered(name string) bool {
metricNames := metrics.GetDefaultSet().ListMetricNames()
for _, mn := range metricNames {
if mn == name {
return true
}
}
return false
}
func TestMetricIsUnregistered(t *testing.T) {
metricName := "example_runs_total"
c := GetOrCreateCounter(metricName)
if !isMetricRegistered(metricName) {
t.Errorf("Expected metric %s to be present", metricName)
}
c.Unregister()
if isMetricRegistered(metricName) {
t.Errorf("Expected metric %s to be unregistered", metricName)
}
}
func TestMetricIsRemovedIfNoUses(t *testing.T) {
metricName := "example_runs_total"
c := GetOrCreateCounter(metricName)
c2 := GetOrCreateCounter(metricName)
if !isMetricRegistered(metricName) {
t.Errorf("Expected metric %s to be present", metricName)
}
c.Unregister()
// metric should still be registered since c2 is using it
if !isMetricRegistered(metricName) {
t.Errorf("Expected metric %s to be present", metricName)
}
c2.Unregister()
if isMetricRegistered(metricName) {
t.Errorf("Expected metric %s to be unregistered", metricName)
}
}

View File

@@ -8,7 +8,6 @@ import (
"sort"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/metrics"
@@ -1002,9 +1001,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
sr := getStorageSearch()
defer putStorageSearch(sr)
startTime := time.Now()
sr.Init(qt, vmstorage.Storage, tfss, tr, sq.MaxMetrics, deadline.Deadline())
indexSearchDuration.UpdateDuration(startTime)
// Start workers that call f in parallel on available CPU cores.
workCh := make(chan *exportWork, gomaxprocs*8)
@@ -1142,9 +1139,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadlin
defer vmstorage.WG.Done()
sr := getStorageSearch()
startTime := time.Now()
maxSeriesCount := sr.Init(qt, vmstorage.Storage, tfss, tr, sq.MaxMetrics, deadline.Deadline())
indexSearchDuration.UpdateDuration(startTime)
type blockRefs struct {
brs []blockRef
}
@@ -1296,8 +1291,6 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadlin
return &rss, nil
}
var indexSearchDuration = metrics.NewHistogram(`vm_index_search_duration_seconds`)
type blockRef struct {
partRef storage.PartRef
addr tmpBlockAddr

View File

@@ -1,13 +1,13 @@
{
"files": {
"main.css": "./static/css/main.af583aad.css",
"main.js": "./static/js/main.1413b18d.js",
"main.css": "./static/css/main.7fa18e1b.css",
"main.js": "./static/js/main.ba08300f.js",
"static/js/685.f772060c.chunk.js": "./static/js/685.f772060c.chunk.js",
"static/media/MetricsQL.md": "./static/media/MetricsQL.a00044c91d9781cf8557.md",
"index.html": "./index.html"
},
"entrypoints": [
"static/css/main.af583aad.css",
"static/js/main.1413b18d.js"
"static/css/main.7fa18e1b.css",
"static/js/main.ba08300f.js"
]
}

View File

@@ -1 +1 @@
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="./favicon.svg"/><link rel="apple-touch-icon" href="./favicon.svg"/><link rel="mask-icon" href="./favicon.svg" color="#000000"><meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=5"/><meta name="theme-color" content="#000000"/><meta name="description" content="Explore and troubleshoot your VictoriaMetrics data"/><link rel="manifest" href="./manifest.json"/><title>vmui</title><script src="./dashboards/index.js" type="module"></script><meta name="twitter:card" content="summary"><meta name="twitter:title" content="UI for VictoriaMetrics"><meta name="twitter:site" content="@https://victoriametrics.com/"><meta name="twitter:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta name="twitter:image" content="./preview.jpg"><meta property="og:type" content="website"><meta property="og:title" content="UI for VictoriaMetrics"><meta property="og:url" content="https://victoriametrics.com/"><meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data"><script defer="defer" src="./static/js/main.1413b18d.js"></script><link href="./static/css/main.af583aad.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="./favicon.svg"/><link rel="apple-touch-icon" href="./favicon.svg"/><link rel="mask-icon" href="./favicon.svg" color="#000000"><meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=5"/><meta name="theme-color" content="#000000"/><meta name="description" content="Explore and troubleshoot your VictoriaMetrics data"/><link rel="manifest" href="./manifest.json"/><title>vmui</title><script src="./dashboards/index.js" type="module"></script><meta name="twitter:card" content="summary"><meta name="twitter:title" content="UI for VictoriaMetrics"><meta name="twitter:site" content="@https://victoriametrics.com/"><meta name="twitter:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta name="twitter:image" content="./preview.jpg"><meta property="og:type" content="website"><meta property="og:title" content="UI for VictoriaMetrics"><meta property="og:url" content="https://victoriametrics.com/"><meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data"><script defer="defer" src="./static/js/main.ba08300f.js"></script><link href="./static/css/main.7fa18e1b.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,4 +1,4 @@
FROM golang:1.23.5 AS build-web-stage
FROM golang:1.23.6 AS build-web-stage
COPY build /build
WORKDIR /build

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -6,7 +6,7 @@ ROOT_IMAGE ?= alpine:3.21.2
ROOT_IMAGE_SCRATCH ?= scratch
CERTS_IMAGE := alpine:3.21.2
GO_BUILDER_IMAGE := golang:1.23.5-alpine
GO_BUILDER_IMAGE := golang:1.23.6-alpine
BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr :/ __)-1
BASE_IMAGE := local/base:1.1.4-$(shell echo $(ROOT_IMAGE) | tr :/ __)-$(shell echo $(CERTS_IMAGE) | tr :/ __)
DOCKER ?= docker

View File

@@ -152,7 +152,7 @@ services:
# and distributes them according to --config.file.
alertmanager:
container_name: alertmanager
image: prom/alertmanager:v0.27.0
image: prom/alertmanager:v0.28.0
volumes:
- ./alertmanager.yml:/config/alertmanager.yml
command:

View File

@@ -126,7 +126,7 @@ services:
# and distributes them according to --config.file.
alertmanager:
container_name: alertmanager
image: prom/alertmanager:v0.27.0
image: prom/alertmanager:v0.28.0
volumes:
- ./alertmanager.yml:/config/alertmanager.yml
command:

View File

@@ -93,7 +93,7 @@ services:
# and distributes them according to --config.file.
alertmanager:
container_name: alertmanager
image: prom/alertmanager:v0.27.0
image: prom/alertmanager:v0.28.0
volumes:
- ./alertmanager.yml:/config/alertmanager.yml
command:

View File

@@ -89,7 +89,7 @@ services:
- "--licenseFile=/license"
alertmanager:
container_name: alertmanager
image: prom/alertmanager:v0.27.0
image: prom/alertmanager:v0.28.0
volumes:
- ./alertmanager.yml:/config/alertmanager.yml
command:

View File

@@ -18,7 +18,7 @@ services:
- vlogs
generator:
image: golang:1.23.5-alpine
image: golang:1.23.6-alpine
restart: always
working_dir: /go/src/app
volumes:

View File

@@ -2,7 +2,7 @@ version: '3'
services:
generator:
image: golang:1.23.5-alpine
image: golang:1.23.6-alpine
restart: always
working_dir: /go/src/app
volumes:

View File

@@ -58,7 +58,7 @@ services:
- ./vmsingle/promscrape.yml:/promscrape.yml
grafana:
image: grafana/grafana:9.2.7
image: grafana/grafana:11.5.0
depends_on: [vmsingle]
ports:
- 3000:3000

View File

@@ -16,8 +16,12 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): improve performance for [`stats by (...) ...`](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) by up to 30% when it is applied to big number of `by (...)` groups.
* FEATURE: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): improve performance for [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) by up to 30% when it is applied to big number of unique values.
* FEATURE: [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe): improve performance for [`count_uniq`](https://docs.victoriametrics.com/victorialogs/logsql/#count_uniq-stats) and [`count_uniq_hash`](https://docs.victoriametrics.com/victorialogs/logsql/#count_uniq_hash-stats) functions by up to 30% when they are applied to big number of unique values.
* FEATURE: [`block_stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#block_stats-pipe): return the path to the part where every data block is stored. The path to the part is returned in the `part_path` field. This allows investigating the distribution of data blocks among parts.
* FEATURE: reduce VictoriaLogs startup time by multiple times when it opens a large datastore with big [retention](https://docs.victoriametrics.com/victorialogs/#retention).
* FEATURE: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): accept timestamps with microsecond and nanosecond precision at [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field).
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add the `_msg` field to the list of fields for the group view, allowing users to select multiple fields, including `_msg`, for log display.
* BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): properly limit [`concurrency` query option](https://docs.victoriametrics.com/victorialogs/logsql/#query-options) for [`stats`](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe), [`uniq`](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe) and [`top`](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe). This prevents from `runtime error: index out of range` panic. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8201).

View File

@@ -56,7 +56,8 @@ Otherwise the timestamp field must be in one of the following formats:
If timezone information is missing (for example, `2023-06-20 15:32:10`),
then the time is parsed in the local timezone of the host where VictoriaLogs runs.
- Unix timestamp in seconds or in milliseconds. For example, `1686026893` (seconds) or `1686026893735` (milliseconds).
- Unix timestamp in seconds, milliseconds, microseconds or nanoseconds. For example, `1686026893` (seconds), `1686026893735` (milliseconds),
`1686026893735321` (microseconds) or `1686026893735321098` (nanoseconds).
See [these docs](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for details on fields,
which must be present in the ingested log messages.
@@ -111,7 +112,8 @@ Otherwise the timestamp field must be in one of the following formats:
If timezone information is missing (for example, `2023-06-20 15:32:10`),
then the time is parsed in the local timezone of the host where VictoriaLogs runs.
- Unix timestamp in seconds or in milliseconds. For example, `1686026893` (seconds) or `1686026893735` (milliseconds).
- Unix timestamp in seconds, milliseconds, microseconds or nanoseconds. For example, `1686026893` (seconds), `1686026893735` (milliseconds),
`1686026893735321` (microseconds) or `1686026893735321098` (nanoseconds).
See [these docs](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for details on fields,
which must be present in the ingested log messages.

View File

@@ -146,7 +146,8 @@ The timestamp field must be in one of the following formats:
If timezone information is missing (for example, `2023-06-20 15:32:10`),
then the time is parsed in the local timezone of the host where VictoriaLogs runs.
- Unix timestamp in seconds or in milliseconds. For example, `1686026893` (seconds) or `1686026893735` (milliseconds).
- Unix timestamp in seconds, milliseconds, microseconds or nanoseconds. For example, `1686026893` (seconds), `1686026893735` (milliseconds),
`1686026893735321` (microseconds) or `1686026893735321098` (nanoseconds).
For example, the following [log entry](#data-model) contains valid timestamp with millisecond precision in the `_time` field:

View File

@@ -18,6 +18,16 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
## tip
* SECURITY: upgrade Go builder from Go1.23.5 to Go1.23.6. See the list of issues addressed in [Go1.23.6](https://github.com/golang/go/issues?q=milestone%3AGo1.23.6+label%3ACherryPickApproved).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): fix polluted alert messages when multiple Alertmanager instances are configured as notifiers with alert_relabel_configs.. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8040), and thanks to @evkuzin for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8258).
## [v1.111.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.111.0)
Released at 2025-02-07
**Update note 1: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and [vmstorage](https://docs.victoriametrics.com/victoriametrics/) stop exposing `vm_index_search_duration_seconds` histogram metric. This metric records time spent on search operations in the index. It was introduced in [v1.56.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.56.0). However, this metric was used neither in dashboards nor in alerting rules. It also has high cardinality because index search operations latency can differ by 3 orders of magnitude. Hence, dropping it as unused.**
* FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and [vmstorage](https://docs.victoriametrics.com/cluster-victoriametrics/): improve startup times when opening a storage with the [retention](https://docs.victoriametrics.com/#retention) exceeding a few months.
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add the ability to switch the heatmap to a line chart. Now, vmui would suggest to switch to line graph display if heatmap can't be properly rendered. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8057).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): add `-httpInternalListenAddr` cmd-line flag to serve internal HTTP routes `/metrics`, `/flags`, etc. It allows properly route requests to backends with the same service routes as vmauth. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6468) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7345) for details.
@@ -29,7 +39,6 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui) for [VictoriaMetrics enterprise](https://docs.victoriametrics.com/enterprise.html) components: properly display enterprise features when the enterprise version is used.
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and [vmselect](https://docs.victoriametrics.com/cluster-victoriametrics/): fix discrepancies when using `or` binary operator. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7759) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7640) issues for details.
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): properly update number of unique series for [cardinality limiter](https://docs.victoriametrics.com/#cardinality-limiter) on ingestion. Previously, limit could undercount the real number of the ingested unique series.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): do not unregister group metrics if the group is still in use. Previously, this could lead to group metrics being absent even though rules group is still running. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8229) for details.
## [v1.102.12](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.12)

View File

@@ -35,7 +35,7 @@ which means there is no need to define metric names or their labels in advance.
metrics anytime.
Actually, the metric name is also a label with a special name `__name__`.
The `__name__` key could be omitted {{% available_from "#" %}} for simplicity. So the following series are identical:
The `__name__` key could be omitted {{% available_from "v1.111.0" %}} for simplicity. So the following series are identical:
```
requests_total{path="/", code="200"}

View File

@@ -1086,7 +1086,7 @@ It is recommended protecting the following endpoints with authKeys:
* `/metrics` with `-metricsAuthKey` command-line flag, so unauthorized users couldn't access [vmauth metrics](#monitoring).
* `/debug/pprof` with `-pprofAuthKey` command-line flag, so unauthorized users couldn't access [profiling information](#profiling).
As an alternative, it's possible to serve internal API routes at the different listen address with command-line flag `-httpInternalListenAddr=127.0.0.1:8426`. {{% available_from "#" %}}
As an alternative, it's possible to serve internal API routes at the different listen address with command-line flag `-httpInternalListenAddr=127.0.0.1:8426`. {{% available_from "v1.111.0" %}}
`vmauth` also supports the ability to restrict access by IP - see [these docs](#ip-filters). See also [concurrency limiting docs](#concurrency-limiting).

2
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/VictoriaMetrics/VictoriaMetrics
go 1.23.5
go 1.23.6
// This is needed in order to avoid vmbackup and vmrestore binary size increase by 20MB
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8008

View File

@@ -27,7 +27,15 @@ type chunkedAllocator struct {
uniqValuesProcessors []statsUniqValuesProcessor
valuesProcessors []statsValuesProcessor
pipeStatsGroups []pipeStatsGroup
pipeStatsGroups []pipeStatsGroup
pipeStatsGroupMaps []pipeStatsGroupMap
statsProcessors []statsProcessor
statsCountUniqSets []statsCountUniqSet
statsCountUniqHashSets []statsCountUniqHashSet
hitsMaps []hitsMap
u64Buf []uint64
@@ -116,6 +124,26 @@ func (a *chunkedAllocator) newPipeStatsGroup() (p *pipeStatsGroup) {
return addNewItem(&a.pipeStatsGroups, a)
}
func (a *chunkedAllocator) newPipeStatsGroupMaps(itemsLen uint) []pipeStatsGroupMap {
return addNewItems(&a.pipeStatsGroupMaps, itemsLen, a)
}
func (a *chunkedAllocator) newStatsProcessors(itemsLen uint) []statsProcessor {
return addNewItems(&a.statsProcessors, itemsLen, a)
}
func (a *chunkedAllocator) newStatsCountUniqSets(itemsLen uint) []statsCountUniqSet {
return addNewItems(&a.statsCountUniqSets, itemsLen, a)
}
func (a *chunkedAllocator) newStatsCountUniqHashSets(itemsLen uint) []statsCountUniqHashSet {
return addNewItems(&a.statsCountUniqHashSets, itemsLen, a)
}
func (a *chunkedAllocator) newHitsMaps(itemsLen uint) []hitsMap {
return addNewItems(&a.hitsMaps, itemsLen, a)
}
func (a *chunkedAllocator) newUint64() (p *uint64) {
return addNewItem(&a.u64Buf, a)
}
@@ -125,33 +153,32 @@ func (a *chunkedAllocator) cloneBytesToString(b []byte) string {
}
func (a *chunkedAllocator) cloneString(s string) string {
const maxChunkLen = 64 * 1024
if a.stringsBuf != nil && len(a.stringsBuf)+len(s) > maxChunkLen {
a.stringsBuf = nil
}
if a.stringsBuf == nil {
a.stringsBuf = make([]byte, 0, maxChunkLen)
a.bytesAllocated += maxChunkLen
}
sbLen := len(a.stringsBuf)
a.stringsBuf = append(a.stringsBuf, s...)
return bytesutil.ToUnsafeString(a.stringsBuf[sbLen:])
xs := addNewItems(&a.stringsBuf, uint(len(s)), a)
copy(xs, s)
return bytesutil.ToUnsafeString(xs)
}
func addNewItem[T any](dstPtr *[]T, a *chunkedAllocator) *T {
xs := addNewItems(dstPtr, 1, a)
return &xs[0]
}
func addNewItems[T any](dstPtr *[]T, itemsLen uint, a *chunkedAllocator) []T {
dst := *dstPtr
var maxItems = (64 * 1024) / int(unsafe.Sizeof(dst[0]))
if dst != nil && len(dst)+1 > maxItems {
var maxItems = (64 * 1024) / uint(unsafe.Sizeof(dst[0]))
if itemsLen > maxItems {
return make([]T, itemsLen)
}
if dst != nil && uint(len(dst))+itemsLen > maxItems {
dst = nil
}
if dst == nil {
dst = make([]T, 0, maxItems)
a.bytesAllocated += maxItems * int(unsafe.Sizeof(dst[0]))
a.bytesAllocated += int(maxItems * uint(unsafe.Sizeof(dst[0])))
}
var x T
dst = append(dst, x)
item := &dst[len(dst)-1]
dstLen := uint(len(dst))
dst = dst[:dstLen+itemsLen]
xs := dst[dstLen : dstLen+itemsLen : dstLen+itemsLen]
*dstPtr = dst
return item
return xs
}

View File

@@ -9,36 +9,176 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
type hitsMap struct {
type hitsMapAdaptive struct {
stateSizeBudget *int
u64 map[uint64]*uint64
negative64 map[uint64]*uint64
strings map[string]*uint64
// concurrency is the number of parallel workers to use when merging shards.
//
// this field must be updated by the caller before using statsCountUniqProcessor.
concurrency uint
// hm tracks hits until the number of unique values reaches hitsMapAdaptiveMaxLen.
// After that hits are tracked by shards.
hm hitsMap
// shards tracks hits for big number of unique values.
//
// Every shard contains hits for a share of unique values.
shards []hitsMap
// a reduces memory allocations when counting the number of hits over big number of unique values.
a chunkedAllocator
}
// the maximum number of values to track in hitsMapAdaptive.hm before switching to hitsMapAdaptive.shards
//
// Too big value may slow down hitsMapMergeParallel() across big number of CPU cores.
// Too small value may significantly increase RAM usage when hits for big number of unique values are counted.
const hitsMapAdaptiveMaxLen = 4 << 10
func (hma *hitsMapAdaptive) reset() {
*hma = hitsMapAdaptive{}
}
func (hma *hitsMapAdaptive) init(concurrency uint, stateSizeBudget *int) {
hma.reset()
hma.stateSizeBudget = stateSizeBudget
hma.concurrency = concurrency
}
func (hma *hitsMapAdaptive) clear() {
*hma.stateSizeBudget += hma.stateSize()
hma.init(hma.concurrency, hma.stateSizeBudget)
}
func (hma *hitsMapAdaptive) stateSize() int {
n := hma.hm.stateSize()
for i := range hma.shards {
n += hma.shards[i].stateSize()
}
return n
}
func (hma *hitsMapAdaptive) entriesCount() uint64 {
if hma.shards == nil {
return hma.hm.entriesCount()
}
shards := hma.shards
n := uint64(0)
for i := range shards {
n += shards[i].entriesCount()
}
return n
}
func (hma *hitsMapAdaptive) updateStateGeneric(key string, hits uint64) {
if n, ok := tryParseUint64(key); ok {
hma.updateStateUint64(n, hits)
return
}
if len(key) > 0 && key[0] == '-' {
if n, ok := tryParseInt64(key); ok {
hma.updateStateNegativeInt64(n, hits)
return
}
}
hma.updateStateString(bytesutil.ToUnsafeBytes(key), hits)
}
func (hma *hitsMapAdaptive) updateStateInt64(n int64, hits uint64) {
if n >= 0 {
hma.updateStateUint64(uint64(n), hits)
} else {
hma.updateStateNegativeInt64(n, hits)
}
}
func (hma *hitsMapAdaptive) updateStateUint64(n, hits uint64) {
if hma.shards == nil {
stateSize := hma.hm.updateStateUint64(&hma.a, n, hits)
if stateSize > 0 {
*hma.stateSizeBudget -= stateSize
hma.probablyMoveToShards()
}
return
}
hm := hma.getShardByUint64(n)
*hma.stateSizeBudget -= hm.updateStateUint64(&hma.a, n, hits)
}
func (hma *hitsMapAdaptive) updateStateNegativeInt64(n int64, hits uint64) {
if hma.shards == nil {
stateSize := hma.hm.updateStateNegativeInt64(&hma.a, n, hits)
if stateSize > 0 {
*hma.stateSizeBudget -= stateSize
hma.probablyMoveToShards()
}
return
}
hm := hma.getShardByUint64(uint64(n))
*hma.stateSizeBudget -= hm.updateStateNegativeInt64(&hma.a, n, hits)
}
func (hma *hitsMapAdaptive) updateStateString(key []byte, hits uint64) {
if hma.shards == nil {
stateSize := hma.hm.updateStateString(&hma.a, key, hits)
if stateSize > 0 {
*hma.stateSizeBudget -= stateSize
hma.probablyMoveToShards()
}
return
}
hm := hma.getShardByString(key)
*hma.stateSizeBudget -= hm.updateStateString(&hma.a, key, hits)
}
func (hma *hitsMapAdaptive) probablyMoveToShards() {
if hma.hm.entriesCount() < hitsMapAdaptiveMaxLen {
return
}
hma.moveToShards()
}
func (hma *hitsMapAdaptive) moveToShards() {
hma.shards = hma.a.newHitsMaps(hma.concurrency)
for n, pHits := range hma.hm.u64 {
hm := hma.getShardByUint64(n)
hm.setStateUint64(n, pHits)
}
for n, pHits := range hma.hm.negative64 {
hm := hma.getShardByUint64(n)
hm.setStateNegativeInt64(int64(n), pHits)
}
for s, pHits := range hma.hm.strings {
hm := hma.getShardByString(bytesutil.ToUnsafeBytes(s))
hm.setStateString(s, pHits)
}
hma.hm.reset()
}
func (hma *hitsMapAdaptive) getShardByUint64(n uint64) *hitsMap {
h := fastHashUint64(n)
shardIdx := h % uint64(len(hma.shards))
return &hma.shards[shardIdx]
}
func (hma *hitsMapAdaptive) getShardByString(v []byte) *hitsMap {
h := xxhash.Sum64(v)
shardIdx := h % uint64(len(hma.shards))
return &hma.shards[shardIdx]
}
type hitsMap struct {
u64 map[uint64]*uint64
negative64 map[uint64]*uint64
strings map[string]*uint64
}
func (hm *hitsMap) reset() {
hm.stateSizeBudget = nil
hm.u64 = nil
hm.negative64 = nil
hm.strings = nil
}
func (hm *hitsMap) clear() {
*hm.stateSizeBudget += hm.stateSize()
hm.init(hm.stateSizeBudget)
}
func (hm *hitsMap) init(stateSizeBudget *int) {
hm.stateSizeBudget = stateSizeBudget
hm.u64 = make(map[uint64]*uint64)
hm.negative64 = make(map[uint64]*uint64)
hm.strings = make(map[string]*uint64)
*hm = hitsMap{}
}
func (hm *hitsMap) entriesCount() uint64 {
@@ -47,76 +187,89 @@ func (hm *hitsMap) entriesCount() uint64 {
}
func (hm *hitsMap) stateSize() int {
n := 24*(len(hm.u64)+len(hm.negative64)) + 40*len(hm.strings)
for k := range hm.strings {
n += len(k)
size := 0
for n, pHits := range hm.u64 {
size += int(unsafe.Sizeof(n) + unsafe.Sizeof(pHits) + unsafe.Sizeof(*pHits))
}
return n
for n, pHits := range hm.negative64 {
size += int(unsafe.Sizeof(n) + unsafe.Sizeof(pHits) + unsafe.Sizeof(*pHits))
}
for k, pHits := range hm.strings {
size += len(k) + int(unsafe.Sizeof(k)+unsafe.Sizeof(pHits)+unsafe.Sizeof(*pHits))
}
return size
}
func (hm *hitsMap) updateStateGeneric(key string, hits uint64) {
if n, ok := tryParseUint64(key); ok {
hm.updateStateUint64(n, hits)
return
}
if len(key) > 0 && key[0] == '-' {
if n, ok := tryParseInt64(key); ok {
hm.updateStateNegativeInt64(n, hits)
return
}
}
hm.updateStateString(bytesutil.ToUnsafeBytes(key), hits)
}
func (hm *hitsMap) updateStateInt64(n int64, hits uint64) {
if n >= 0 {
hm.updateStateUint64(uint64(n), hits)
} else {
hm.updateStateNegativeInt64(n, hits)
}
}
func (hm *hitsMap) updateStateUint64(n, hits uint64) {
func (hm *hitsMap) updateStateUint64(a *chunkedAllocator, n, hits uint64) int {
pHits := hm.u64[n]
if pHits != nil {
*pHits += hits
return
return 0
}
pHits = hm.a.newUint64()
pHits = a.newUint64()
*pHits = hits
hm.u64[n] = pHits
*hm.stateSizeBudget -= 24
return int(unsafe.Sizeof(*pHits)) + hm.setStateUint64(n, pHits)
}
func (hm *hitsMap) updateStateNegativeInt64(n int64, hits uint64) {
func (hm *hitsMap) setStateUint64(n uint64, pHits *uint64) int {
if hm.u64 == nil {
hm.u64 = map[uint64]*uint64{
n: pHits,
}
return int(unsafe.Sizeof(hm.u64) + unsafe.Sizeof(n) + unsafe.Sizeof(pHits))
}
hm.u64[n] = pHits
return int(unsafe.Sizeof(n) + unsafe.Sizeof(pHits))
}
func (hm *hitsMap) updateStateNegativeInt64(a *chunkedAllocator, n int64, hits uint64) int {
pHits := hm.negative64[uint64(n)]
if pHits != nil {
*pHits += hits
return
return 0
}
pHits = hm.a.newUint64()
pHits = a.newUint64()
*pHits = hits
hm.negative64[uint64(n)] = pHits
*hm.stateSizeBudget -= 24
return int(unsafe.Sizeof(*pHits)) + hm.setStateNegativeInt64(n, pHits)
}
func (hm *hitsMap) updateStateString(key []byte, hits uint64) {
func (hm *hitsMap) setStateNegativeInt64(n int64, pHits *uint64) int {
if hm.negative64 == nil {
hm.negative64 = map[uint64]*uint64{
uint64(n): pHits,
}
return int(unsafe.Sizeof(hm.negative64) + unsafe.Sizeof(uint64(n)) + unsafe.Sizeof(pHits))
}
hm.negative64[uint64(n)] = pHits
return int(unsafe.Sizeof(n) + unsafe.Sizeof(pHits))
}
func (hm *hitsMap) updateStateString(a *chunkedAllocator, key []byte, hits uint64) int {
pHits := hm.strings[string(key)]
if pHits != nil {
*pHits += hits
return
return 0
}
keyCopy := hm.a.cloneBytesToString(key)
pHits = hm.a.newUint64()
keyCopy := a.cloneBytesToString(key)
pHits = a.newUint64()
*pHits = hits
hm.strings[keyCopy] = pHits
return len(keyCopy) + int(unsafe.Sizeof(*pHits)) + hm.setStateString(keyCopy, pHits)
}
*hm.stateSizeBudget -= len(keyCopy) + 40
func (hm *hitsMap) setStateString(v string, pHits *uint64) int {
if hm.strings == nil {
hm.strings = map[string]*uint64{
v: pHits,
}
return int(unsafe.Sizeof(hm.strings) + unsafe.Sizeof(v) + unsafe.Sizeof(pHits))
}
hm.strings[v] = pHits
return int(unsafe.Sizeof(v) + unsafe.Sizeof(pHits))
}
func (hm *hitsMap) mergeState(src *hitsMap, stopCh <-chan struct{}) {
@@ -126,7 +279,7 @@ func (hm *hitsMap) mergeState(src *hitsMap, stopCh <-chan struct{}) {
}
pHitsDst := hm.u64[n]
if pHitsDst == nil {
hm.u64[n] = pHitsSrc
hm.setStateUint64(n, pHitsSrc)
} else {
*pHitsDst += *pHitsSrc
}
@@ -137,7 +290,7 @@ func (hm *hitsMap) mergeState(src *hitsMap, stopCh <-chan struct{}) {
}
pHitsDst := hm.negative64[n]
if pHitsDst == nil {
hm.negative64[n] = pHitsSrc
hm.setStateNegativeInt64(int64(n), pHitsSrc)
} else {
*pHitsDst += *pHitsSrc
}
@@ -148,89 +301,52 @@ func (hm *hitsMap) mergeState(src *hitsMap, stopCh <-chan struct{}) {
}
pHitsDst := hm.strings[k]
if pHitsDst == nil {
hm.strings[k] = pHitsSrc
hm.setStateString(k, pHitsSrc)
} else {
*pHitsDst += *pHitsSrc
}
}
}
// hitsMapMergeParallel merges hms in parallel on the given cpusCount
// hitsMapMergeParallel merges hmas in parallel
//
// The mered disjoint parts of hms are passed to f.
// The merged disjoint parts of hmas are passed to f.
// The function may be interrupted by closing stopCh.
// The caller must check for closed stopCh after returning from the function.
func hitsMapMergeParallel(hms []*hitsMap, cpusCount int, stopCh <-chan struct{}, f func(hm *hitsMap)) {
srcLen := len(hms)
if srcLen < 2 {
// Nothing to merge
if len(hms) == 1 {
f(hms[0])
}
func hitsMapMergeParallel(hmas []*hitsMapAdaptive, stopCh <-chan struct{}, f func(hm *hitsMap)) {
if len(hmas) == 0 {
return
}
var wg sync.WaitGroup
perShardMaps := make([][]hitsMap, srcLen)
for i := range hms {
for i := range hmas {
hma := hmas[i]
if hma.shards != nil {
continue
}
wg.Add(1)
go func(idx int) {
go func() {
defer wg.Done()
stateSizeBudget := 0
perCPU := make([]hitsMap, cpusCount)
for i := range perCPU {
perCPU[i].init(&stateSizeBudget)
}
hm := hms[idx]
for n, pHits := range hm.u64 {
if needStop(stopCh) {
return
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
h := xxhash.Sum64(k)
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].u64[n] = pHits
}
for n, pHits := range hm.negative64 {
if needStop(stopCh) {
return
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
h := xxhash.Sum64(k)
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].negative64[n] = pHits
}
for k, pHits := range hm.strings {
if needStop(stopCh) {
return
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].strings[k] = pHits
}
perShardMaps[idx] = perCPU
hm.reset()
}(i)
hma.moveToShards()
}()
}
wg.Wait()
if needStop(stopCh) {
return
}
// Merge per-shard entries into perShardMaps[0]
cpusCount := len(hmas[0].shards)
for i := 0; i < cpusCount; i++ {
wg.Add(1)
go func(cpuIdx int) {
defer wg.Done()
hm := &perShardMaps[0][cpuIdx]
for _, perCPU := range perShardMaps[1:] {
hm.mergeState(&perCPU[cpuIdx], stopCh)
perCPU[cpuIdx].reset()
hm := &hmas[0].shards[cpuIdx]
for j := range hmas[1:] {
src := &hmas[1+j].shards[cpuIdx]
hm.mergeState(src, stopCh)
src.reset()
}
f(hm)
}(i)

View File

@@ -3,6 +3,7 @@ package logstorage
import (
"fmt"
"sort"
"sync"
"sync/atomic"
"unsafe"
@@ -78,25 +79,25 @@ func (pf *pipeFacets) visitSubqueries(_ func(q *Query)) {
func (pf *pipeFacets) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
maxStateSize := int64(float64(memory.Allowed()) * 0.2)
shards := make([]pipeFacetsProcessorShard, workersCount)
for i := range shards {
shards[i] = pipeFacetsProcessorShard{
pipeFacetsProcessorShardNopad: pipeFacetsProcessorShardNopad{
pf: pf,
},
}
}
pfp := &pipeFacetsProcessor{
pf: pf,
stopCh: stopCh,
cancel: cancel,
ppNext: ppNext,
shards: shards,
maxStateSize: maxStateSize,
}
shards := make([]pipeFacetsProcessorShard, workersCount)
for i := range shards {
shards[i] = pipeFacetsProcessorShard{
pipeFacetsProcessorShardNopad: pipeFacetsProcessorShardNopad{
pfp: pfp,
},
}
}
pfp.shards = shards
pfp.stateSizeBudget.Store(maxStateSize)
return pfp
@@ -122,8 +123,8 @@ type pipeFacetsProcessorShard struct {
}
type pipeFacetsProcessorShardNopad struct {
// pf points to the parent pipeFacets.
pf *pipeFacets
// pfp points to the parent pipeFacetsProcessor.
pfp *pipeFacetsProcessor
// a is used for reducing memory allocations when counting facets over big number of unique fields
a chunkedAllocator
@@ -140,7 +141,7 @@ type pipeFacetsProcessorShardNopad struct {
}
type pipeFacetsFieldHits struct {
m hitsMap
m hitsMapAdaptive
mustIgnore bool
}
@@ -163,7 +164,7 @@ func (shard *pipeFacetsProcessorShard) updateFacetsForColumn(br *blockResult, c
if fhs.mustIgnore {
return
}
if fhs.m.entriesCount() >= shard.pf.maxValuesPerField {
if fhs.m.entriesCount() >= shard.pfp.pf.maxValuesPerField {
// Ignore fields with too many unique values
fhs.enableIgnoreField()
return
@@ -219,7 +220,7 @@ func (shard *pipeFacetsProcessorShard) updateFacetsForColumn(br *blockResult, c
}
func (shard *pipeFacetsProcessorShard) updateStateInt64(fhs *pipeFacetsFieldHits, n int64) {
if maxValueLen := shard.pf.maxValueLen; maxValueLen <= 21 && uint64(int64StringLen(n)) > maxValueLen {
if maxValueLen := shard.pfp.pf.maxValueLen; maxValueLen <= 21 && uint64(int64StringLen(n)) > maxValueLen {
// Ignore fields with too long values, since they are hard to use in faceted search.
fhs.enableIgnoreField()
return
@@ -228,7 +229,7 @@ func (shard *pipeFacetsProcessorShard) updateStateInt64(fhs *pipeFacetsFieldHits
}
func (shard *pipeFacetsProcessorShard) updateStateUint64(fhs *pipeFacetsFieldHits, n uint64) {
if maxValueLen := shard.pf.maxValueLen; maxValueLen <= 20 && uint64(uint64StringLen(n)) > maxValueLen {
if maxValueLen := shard.pfp.pf.maxValueLen; maxValueLen <= 20 && uint64(uint64StringLen(n)) > maxValueLen {
// Ignore fields with too long values, since they are hard to use in faceted search.
fhs.enableIgnoreField()
return
@@ -288,7 +289,7 @@ func (shard *pipeFacetsProcessorShard) updateStateGeneric(fhs *pipeFacetsFieldHi
// So it is better ignoring empty values.
return
}
if uint64(len(v)) > shard.pf.maxValueLen {
if uint64(len(v)) > shard.pfp.pf.maxValueLen {
// Ignore fields with too long values, since they are hard to use in faceted search.
fhs.enableIgnoreField()
return
@@ -303,7 +304,7 @@ func (shard *pipeFacetsProcessorShard) getFieldHits(fieldName string) *pipeFacet
fhs, ok := shard.m[fieldName]
if !ok {
fhs = &pipeFacetsFieldHits{}
fhs.m.init(&shard.stateSizeBudget)
fhs.m.init(uint(len(shard.pfp.shards)), &shard.stateSizeBudget)
fieldNameCopy := shard.a.cloneString(fieldName)
shard.m[fieldNameCopy] = fhs
shard.stateSizeBudget -= len(fieldNameCopy) + int(unsafe.Sizeof(fhs)+unsafe.Sizeof(*fhs))
@@ -341,7 +342,7 @@ func (pfp *pipeFacetsProcessor) flush() error {
}
// merge state across shards
hms := make(map[string]*hitsMap)
hmasByFieldName := make(map[string][]*hitsMapAdaptive)
rowsTotal := uint64(0)
for _, shard := range pfp.shards {
if needStop(pfp.stopCh) {
@@ -351,19 +352,14 @@ func (pfp *pipeFacetsProcessor) flush() error {
if fhs.mustIgnore {
continue
}
hm, ok := hms[fieldName]
if !ok {
hms[fieldName] = &fhs.m
continue
}
hm.mergeState(&fhs.m, pfp.stopCh)
hmasByFieldName[fieldName] = append(hmasByFieldName[fieldName], &fhs.m)
}
rowsTotal += shard.rowsTotal
}
// sort fieldNames
fieldNames := make([]string, 0, len(hms))
for fieldName := range hms {
fieldNames := make([]string, 0, len(hmasByFieldName))
for fieldName := range hmasByFieldName {
fieldNames = append(fieldNames, fieldName)
}
sort.Strings(fieldNames)
@@ -377,31 +373,30 @@ func (pfp *pipeFacetsProcessor) flush() error {
if needStop(pfp.stopCh) {
return nil
}
hm := hms[fieldName]
if hm.entriesCount() > pfp.pf.maxValuesPerField {
hmas := hmasByFieldName[fieldName]
var hms []*hitsMap
var hmsLock sync.Mutex
hitsMapMergeParallel(hmas, pfp.stopCh, func(hm *hitsMap) {
hmsLock.Lock()
hms = append(hms, hm)
hmsLock.Unlock()
})
entriesCount := uint64(0)
for _, hm := range hms {
entriesCount += hm.entriesCount()
}
if entriesCount > pfp.pf.maxValuesPerField {
continue
}
vs := make([]pipeTopEntry, 0, hm.entriesCount())
for n, pHits := range hm.u64 {
vs = append(vs, pipeTopEntry{
k: string(marshalUint64String(nil, n)),
hits: *pHits,
})
vs := make([]pipeTopEntry, 0, entriesCount)
for _, hm := range hms {
vs = appendTopEntryFacets(vs, hm)
}
for n, pHits := range hm.negative64 {
vs = append(vs, pipeTopEntry{
k: string(marshalInt64String(nil, int64(n))),
hits: *pHits,
})
}
for k, pHits := range hm.strings {
vs = append(vs, pipeTopEntry{
k: k,
hits: *pHits,
})
}
if len(vs) == 1 && vs[0].hits == rowsTotal && !pfp.pf.keepConstFields {
if len(vs) == 1 && vs[0].hits == rowsTotal && !wctx.pfp.pf.keepConstFields {
// Skip field with constant value.
continue
}
@@ -412,6 +407,9 @@ func (pfp *pipeFacetsProcessor) flush() error {
vs = vs[:limit]
}
for _, v := range vs {
if needStop(pfp.stopCh) {
return nil
}
wctx.writeRow(fieldName, v.k, v.hits)
}
}
@@ -420,6 +418,28 @@ func (pfp *pipeFacetsProcessor) flush() error {
return nil
}
func appendTopEntryFacets(dst []pipeTopEntry, hm *hitsMap) []pipeTopEntry {
for n, pHits := range hm.u64 {
dst = append(dst, pipeTopEntry{
k: string(marshalUint64String(nil, n)),
hits: *pHits,
})
}
for n, pHits := range hm.negative64 {
dst = append(dst, pipeTopEntry{
k: string(marshalInt64String(nil, int64(n))),
hits: *pHits,
})
}
for k, pHits := range hm.strings {
dst = append(dst, pipeTopEntry{
k: k,
hits: *pHits,
})
}
return dst
}
type pipeFacetsWriteContext struct {
pfp *pipeFacetsProcessor
rcs []resultColumn

View File

@@ -13,7 +13,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// pipeStats processes '| stats ...' queries.
@@ -46,6 +45,8 @@ type statsFunc interface {
updateNeededFields(neededFields fieldsSet)
// newStatsProcessor must create new statsProcessor for calculating stats for the given statsFunc
//
// a must be used for allocating memory inside the returned statsProcessor.
newStatsProcessor(a *chunkedAllocator) statsProcessor
}
@@ -210,26 +211,26 @@ const stateSizeBudgetChunk = 1 << 20
func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
maxStateSize := int64(float64(memory.Allowed()) * 0.4)
shards := make([]pipeStatsProcessorShard, workersCount)
for i := range shards {
shards[i] = pipeStatsProcessorShard{
pipeStatsProcessorShardNopad: pipeStatsProcessorShardNopad{
ps: ps,
},
}
shards[i].init()
}
psp := &pipeStatsProcessor{
ps: ps,
stopCh: stopCh,
cancel: cancel,
ppNext: ppNext,
shards: shards,
maxStateSize: maxStateSize,
}
shards := make([]pipeStatsProcessorShard, workersCount)
for i := range shards {
shards[i] = pipeStatsProcessorShard{
pipeStatsProcessorShardNopad: pipeStatsProcessorShardNopad{
psp: psp,
},
}
shards[i].init()
}
psp.shards = shards
psp.stateSizeBudget.Store(maxStateSize)
return psp
@@ -255,9 +256,19 @@ type pipeStatsProcessorShard struct {
}
type pipeStatsProcessorShardNopad struct {
ps *pipeStats
psp *pipeStatsProcessor
m pipeStatsGroupMap
// groupMap is used for tracking small number of groups until it reaches pipeStatsGroupMapMaxLen.
// After that the groups are tracked by groupMapShards.
groupMap pipeStatsGroupMap
// groupMapShards are used for tracking big number of groups.
//
// Every shard contains a share of unique groups, which are merged in parallel at flush().
groupMapShards []pipeStatsGroupMap
// a is used for reducing memory allocations when calculating stats among big number of different groups.
a chunkedAllocator
// bms and brTmp are used for applying per-func filters.
bms []bitmap
@@ -269,34 +280,26 @@ type pipeStatsProcessorShardNopad struct {
stateSizeBudget int
}
// the maximum number of groups to track in pipeStatsProcessorShard.groupMap before switching to pipeStatsProcessorShard.groupMapShards
//
// Too big value may slow down flush() across big number of CPU cores.
// Too small value may significantly increase RAM usage when stats for big number of groups is calculated.
const pipeStatsGroupMapMaxLen = 4 << 10
type pipeStatsGroupMap struct {
shard *pipeStatsProcessorShard
u64 map[uint64]*pipeStatsGroup
negative64 map[uint64]*pipeStatsGroup
strings map[string]*pipeStatsGroup
// a and sfpsBuf are used for reducing memory allocations when calculating stats among big number of different groups.
a chunkedAllocator
sfpsBuf []statsProcessor
}
func (psm *pipeStatsGroupMap) reset() {
psm.shard = nil
psm.u64 = nil
psm.negative64 = nil
psm.strings = nil
psm.sfpsBuf = nil
*psm = pipeStatsGroupMap{}
}
func (psm *pipeStatsGroupMap) init(shard *pipeStatsProcessorShard) {
psm.shard = shard
psm.u64 = make(map[uint64]*pipeStatsGroup)
psm.negative64 = make(map[uint64]*pipeStatsGroup)
psm.strings = make(map[string]*pipeStatsGroup)
}
func (psm *pipeStatsGroupMap) entriesCount() uint64 {
@@ -304,98 +307,74 @@ func (psm *pipeStatsGroupMap) entriesCount() uint64 {
return uint64(n)
}
func (psm *pipeStatsGroupMap) getPipeStatsGroupGeneric(key string) *pipeStatsGroup {
if n, ok := tryParseUint64(key); ok {
return psm.getPipeStatsGroupUint64(n)
func (psm *pipeStatsGroupMap) getPipeStatsGroupUint64(n uint64) (*pipeStatsGroup, bool) {
if psg := psm.u64[n]; psg != nil {
return psg, false
}
if len(key) > 0 && key[0] == '-' {
if n, ok := tryParseInt64(key); ok {
return psm.getPipeStatsGroupNegativeInt64(n)
psg := psm.shard.newPipeStatsGroup()
psm.setPipeStatsGroupUint64(n, psg)
return psg, true
}
func (psm *pipeStatsGroupMap) setPipeStatsGroupUint64(n uint64, psg *pipeStatsGroup) {
if psm.u64 == nil {
psm.u64 = map[uint64]*pipeStatsGroup{
n: psg,
}
psm.shard.stateSizeBudget -= int(unsafe.Sizeof(psm.u64) + unsafe.Sizeof(n) + unsafe.Sizeof(psg))
} else {
psm.u64[n] = psg
psm.shard.stateSizeBudget -= int(unsafe.Sizeof(n) + unsafe.Sizeof(psg))
}
return psm.getPipeStatsGroupString(bytesutil.ToUnsafeBytes(key))
}
func (psm *pipeStatsGroupMap) getPipeStatsGroupInt64(n int64) *pipeStatsGroup {
if n >= 0 {
return psm.getPipeStatsGroupUint64(uint64(n))
func (psm *pipeStatsGroupMap) getPipeStatsGroupNegativeInt64(n int64) (*pipeStatsGroup, bool) {
if psg := psm.negative64[uint64(n)]; psg != nil {
return psg, false
}
return psm.getPipeStatsGroupNegativeInt64(n)
psg := psm.shard.newPipeStatsGroup()
psm.setPipeStatsGroupNegativeInt64(n, psg)
return psg, true
}
func (psm *pipeStatsGroupMap) getPipeStatsGroupUint64(n uint64) *pipeStatsGroup {
psg := psm.u64[n]
if psg != nil {
return psg
func (psm *pipeStatsGroupMap) setPipeStatsGroupNegativeInt64(n int64, psg *pipeStatsGroup) {
if psm.negative64 == nil {
psm.negative64 = map[uint64]*pipeStatsGroup{
uint64(n): psg,
}
psm.shard.stateSizeBudget -= int(unsafe.Sizeof(psm.negative64) + unsafe.Sizeof(n) + unsafe.Sizeof(psg))
} else {
psm.negative64[uint64(n)] = psg
psm.shard.stateSizeBudget -= int(unsafe.Sizeof(n) + unsafe.Sizeof(psg))
}
psg = psm.newPipeStatsGroup()
psm.u64[n] = psg
psm.shard.stateSizeBudget -= int(unsafe.Sizeof(n) + unsafe.Sizeof(psg))
return psg
}
func (psm *pipeStatsGroupMap) getPipeStatsGroupNegativeInt64(n int64) *pipeStatsGroup {
psg := psm.negative64[uint64(n)]
if psg != nil {
return psg
func (psm *pipeStatsGroupMap) getPipeStatsGroupString(key []byte) (*pipeStatsGroup, bool) {
if psg := psm.strings[string(key)]; psg != nil {
return psg, false
}
psg = psm.newPipeStatsGroup()
psm.negative64[uint64(n)] = psg
psm.shard.stateSizeBudget -= int(unsafe.Sizeof(n) + unsafe.Sizeof(psg))
return psg
psg := psm.shard.newPipeStatsGroup()
keyCopy := psm.shard.a.cloneBytesToString(key)
psm.shard.stateSizeBudget -= len(keyCopy)
psm.setPipeStatsGroupString(keyCopy, psg)
return psg, true
}
func (psm *pipeStatsGroupMap) getPipeStatsGroupString(key []byte) *pipeStatsGroup {
psg := psm.strings[string(key)]
if psg != nil {
return psg
func (psm *pipeStatsGroupMap) setPipeStatsGroupString(v string, psg *pipeStatsGroup) {
if psm.strings == nil {
psm.strings = map[string]*pipeStatsGroup{
v: psg,
}
psm.shard.stateSizeBudget -= int(unsafe.Sizeof(psm.strings) + unsafe.Sizeof(v))
} else {
psm.strings[v] = psg
psm.shard.stateSizeBudget -= int(unsafe.Sizeof(v))
}
psg = psm.newPipeStatsGroup()
keyCopy := psm.a.cloneBytesToString(key)
psm.strings[keyCopy] = psg
psm.shard.stateSizeBudget -= len(keyCopy) + int(unsafe.Sizeof(keyCopy))
return psg
}
func (psm *pipeStatsGroupMap) newPipeStatsGroup() *pipeStatsGroup {
sfps := psm.newStatsProcessors()
for i, f := range psm.shard.ps.funcs {
bytesAllocated := psm.a.bytesAllocated
sfps[i] = f.f.newStatsProcessor(&psm.a)
psm.shard.stateSizeBudget -= psm.a.bytesAllocated - bytesAllocated
}
psg := psm.a.newPipeStatsGroup()
psg.funcs = psm.shard.ps.funcs
psg.sfps = sfps
psm.shard.stateSizeBudget -= int(unsafe.Sizeof(*psg) + unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
return psg
}
func (psm *pipeStatsGroupMap) newStatsProcessors() []statsProcessor {
funcsLen := len(psm.shard.ps.funcs)
if len(psm.sfpsBuf)+funcsLen > cap(psm.sfpsBuf) {
psm.sfpsBuf = nil
}
if psm.sfpsBuf == nil {
psm.sfpsBuf = make([]statsProcessor, 0, pipeStatsProcessorChunkLen)
}
sfpsBufLen := len(psm.sfpsBuf)
psm.sfpsBuf = slicesutil.SetLength(psm.sfpsBuf, sfpsBufLen+funcsLen)
return psm.sfpsBuf[sfpsBufLen:]
}
const pipeStatsProcessorChunkLen = 64 * 1024 / int(unsafe.Sizeof((statsProcessor)(nil)))
func (psm *pipeStatsGroupMap) mergeState(src *pipeStatsGroupMap, stopCh <-chan struct{}) {
for n, psgSrc := range src.u64 {
if needStop(stopCh) {
@@ -403,7 +382,7 @@ func (psm *pipeStatsGroupMap) mergeState(src *pipeStatsGroupMap, stopCh <-chan s
}
psgDst := psm.u64[n]
if psgDst == nil {
psm.u64[n] = psgSrc
psm.setPipeStatsGroupUint64(n, psgSrc)
} else {
psgDst.mergeState(psgSrc)
}
@@ -414,7 +393,7 @@ func (psm *pipeStatsGroupMap) mergeState(src *pipeStatsGroupMap, stopCh <-chan s
}
psgDst := psm.negative64[n]
if psgDst == nil {
psm.negative64[n] = psgSrc
psm.setPipeStatsGroupNegativeInt64(int64(n), psgSrc)
} else {
psgDst.mergeState(psgSrc)
}
@@ -425,22 +404,54 @@ func (psm *pipeStatsGroupMap) mergeState(src *pipeStatsGroupMap, stopCh <-chan s
}
psgDst := psm.strings[k]
if psgDst == nil {
psm.strings[k] = psgSrc
psm.setPipeStatsGroupString(k, psgSrc)
} else {
psgDst.mergeState(psgSrc)
}
}
}
func (shard *pipeStatsProcessorShard) init() {
shard.m.init(shard)
func initStatsConcurrency(sfp statsProcessor, concurrency uint) {
switch t := sfp.(type) {
case *statsCountUniqProcessor:
t.concurrency = concurrency
case *statsCountUniqHashProcessor:
t.concurrency = concurrency
case *statsUniqValuesProcessor:
t.concurrency = concurrency
}
}
funcsLen := len(shard.ps.funcs)
func (shard *pipeStatsProcessorShard) init() {
shard.groupMap.init(shard)
funcsLen := len(shard.psp.ps.funcs)
shard.bms = make([]bitmap, funcsLen)
}
func (shard *pipeStatsProcessorShard) newPipeStatsGroup() *pipeStatsGroup {
bytesAllocated := shard.a.bytesAllocated
funcsLen := len(shard.psp.ps.funcs)
sfps := shard.a.newStatsProcessors(uint(funcsLen))
for i, f := range shard.psp.ps.funcs {
sfp := f.f.newStatsProcessor(&shard.a)
initStatsConcurrency(sfp, uint(len(shard.psp.shards)))
sfps[i] = sfp
}
psg := shard.a.newPipeStatsGroup()
psg.funcs = shard.psp.ps.funcs
psg.sfps = sfps
shard.stateSizeBudget -= shard.a.bytesAllocated - bytesAllocated
return psg
}
func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
byFields := shard.ps.byFields
byFields := shard.psp.ps.byFields
// Update shard.bms by applying per-function filters
shard.applyPerFunctionFilters(br)
@@ -448,7 +459,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
// Process stats for the defined functions
if len(byFields) == 0 {
// Fast path - pass all the rows to a single group with empty key.
psg := shard.m.getPipeStatsGroupString(nil)
psg := shard.getPipeStatsGroupString(nil)
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
return
}
@@ -481,7 +492,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
for _, values := range columnValues {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
}
psg := shard.m.getPipeStatsGroupString(keyBuf)
psg := shard.getPipeStatsGroupString(keyBuf)
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
shard.keyBuf = keyBuf
return
@@ -505,7 +516,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
for _, values := range columnValues {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
}
psg = shard.m.getPipeStatsGroupString(keyBuf)
psg = shard.getPipeStatsGroupString(keyBuf)
}
shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
}
@@ -517,7 +528,7 @@ func (shard *pipeStatsProcessorShard) updateStatsSingleColumn(br *blockResult, b
if c.isConst {
// Fast path for column with a constant value.
v := br.getBucketedValue(c.valuesEncoded[0], bf)
psg := shard.m.getPipeStatsGroupGeneric(v)
psg := shard.getPipeStatsGroupGeneric(v)
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
return
}
@@ -530,7 +541,7 @@ func (shard *pipeStatsProcessorShard) updateStatsSingleColumn(br *blockResult, b
for i, v := range values {
if i <= 0 || values[i-1] != v {
n := unmarshalUint8(v)
psg = shard.m.getPipeStatsGroupUint64(uint64(n))
psg = shard.getPipeStatsGroupUint64(uint64(n))
}
shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
}
@@ -541,7 +552,7 @@ func (shard *pipeStatsProcessorShard) updateStatsSingleColumn(br *blockResult, b
for i, v := range values {
if i <= 0 || values[i-1] != v {
n := unmarshalUint16(v)
psg = shard.m.getPipeStatsGroupUint64(uint64(n))
psg = shard.getPipeStatsGroupUint64(uint64(n))
}
shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
}
@@ -552,7 +563,7 @@ func (shard *pipeStatsProcessorShard) updateStatsSingleColumn(br *blockResult, b
for i, v := range values {
if i <= 0 || values[i-1] != v {
n := unmarshalUint32(v)
psg = shard.m.getPipeStatsGroupUint64(uint64(n))
psg = shard.getPipeStatsGroupUint64(uint64(n))
}
shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
}
@@ -563,7 +574,7 @@ func (shard *pipeStatsProcessorShard) updateStatsSingleColumn(br *blockResult, b
for i, v := range values {
if i <= 0 || values[i-1] != v {
n := unmarshalUint64(v)
psg = shard.m.getPipeStatsGroupUint64(n)
psg = shard.getPipeStatsGroupUint64(n)
}
shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
}
@@ -574,7 +585,7 @@ func (shard *pipeStatsProcessorShard) updateStatsSingleColumn(br *blockResult, b
for i, v := range values {
if i <= 0 || values[i-1] != v {
n := unmarshalInt64(v)
psg = shard.m.getPipeStatsGroupInt64(n)
psg = shard.getPipeStatsGroupInt64(n)
}
shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
}
@@ -587,14 +598,14 @@ func (shard *pipeStatsProcessorShard) updateStatsSingleColumn(br *blockResult, b
values := c.getValuesBucketed(br, bf)
for i := 0; i < br.rowsLen; i++ {
if i <= 0 || values[i-1] != values[i] {
psg = shard.m.getPipeStatsGroupGeneric(values[i])
psg = shard.getPipeStatsGroupGeneric(values[i])
}
shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
}
}
func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(br *blockResult) {
funcs := shard.ps.funcs
funcs := shard.psp.ps.funcs
for i := range funcs {
iff := funcs[i].iff
if iff == nil {
@@ -609,6 +620,111 @@ func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(br *blockResult) {
}
}
func (shard *pipeStatsProcessorShard) getPipeStatsGroupGeneric(v string) *pipeStatsGroup {
if n, ok := tryParseUint64(v); ok {
return shard.getPipeStatsGroupUint64(n)
}
if len(v) > 0 && v[0] == '-' {
if n, ok := tryParseInt64(v); ok {
return shard.getPipeStatsGroupNegativeInt64(n)
}
}
return shard.getPipeStatsGroupString(bytesutil.ToUnsafeBytes(v))
}
func (shard *pipeStatsProcessorShard) getPipeStatsGroupInt64(n int64) *pipeStatsGroup {
if n >= 0 {
return shard.getPipeStatsGroupUint64(uint64(n))
}
return shard.getPipeStatsGroupNegativeInt64(n)
}
func (shard *pipeStatsProcessorShard) getPipeStatsGroupUint64(n uint64) *pipeStatsGroup {
if shard.groupMapShards == nil {
psg, isNew := shard.groupMap.getPipeStatsGroupUint64(n)
if isNew {
shard.probablyMoveGroupMapToShards()
}
return psg
}
psm := shard.getGroupMapShardByUint64(n)
psg, _ := psm.getPipeStatsGroupUint64(n)
return psg
}
func (shard *pipeStatsProcessorShard) getPipeStatsGroupNegativeInt64(n int64) *pipeStatsGroup {
if shard.groupMapShards == nil {
psg, isNew := shard.groupMap.getPipeStatsGroupNegativeInt64(n)
if isNew {
shard.probablyMoveGroupMapToShards()
}
return psg
}
psm := shard.getGroupMapShardByUint64(uint64(n))
psg, _ := psm.getPipeStatsGroupNegativeInt64(n)
return psg
}
func (shard *pipeStatsProcessorShard) getPipeStatsGroupString(v []byte) *pipeStatsGroup {
if shard.groupMapShards == nil {
psg, isNew := shard.groupMap.getPipeStatsGroupString(v)
if isNew {
shard.probablyMoveGroupMapToShards()
}
return psg
}
psm := shard.getGroupMapShardByString(v)
psg, _ := psm.getPipeStatsGroupString(v)
return psg
}
func (shard *pipeStatsProcessorShard) probablyMoveGroupMapToShards() {
if shard.groupMap.entriesCount() < pipeStatsGroupMapMaxLen {
return
}
shard.moveGroupMapToShards()
}
func (shard *pipeStatsProcessorShard) moveGroupMapToShards() {
// set cpusCount to the number of shards, since this is the concurrency limit set by the caller.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8201
cpusCount := uint(len(shard.psp.shards))
bytesAllocatedPrev := shard.a.bytesAllocated
shard.groupMapShards = shard.a.newPipeStatsGroupMaps(cpusCount)
shard.stateSizeBudget -= shard.a.bytesAllocated - bytesAllocatedPrev
for i := range shard.groupMapShards {
shard.groupMapShards[i].init(shard)
}
for n, psg := range shard.groupMap.u64 {
psm := shard.getGroupMapShardByUint64(n)
psm.setPipeStatsGroupUint64(n, psg)
}
for n, psg := range shard.groupMap.negative64 {
psm := shard.getGroupMapShardByUint64(n)
psm.setPipeStatsGroupNegativeInt64(int64(n), psg)
}
for s, psg := range shard.groupMap.strings {
psm := shard.getGroupMapShardByString(bytesutil.ToUnsafeBytes(s))
psm.setPipeStatsGroupString(s, psg)
}
shard.groupMap.reset()
}
func (shard *pipeStatsProcessorShard) getGroupMapShardByString(v []byte) *pipeStatsGroupMap {
h := xxhash.Sum64(v)
shardIdx := h % uint64(len(shard.groupMapShards))
return &shard.groupMapShards[shardIdx]
}
func (shard *pipeStatsProcessorShard) getGroupMapShardByUint64(n uint64) *pipeStatsGroupMap {
h := fastHashUint64(n)
shardIdx := h % uint64(len(shard.groupMapShards))
return &shard.groupMapShards[shardIdx]
}
type pipeStatsGroup struct {
funcs []pipeStatsFunc
sfps []statsProcessor
@@ -677,10 +793,7 @@ func (psp *pipeStatsProcessor) flush() error {
}
// Merge states across shards in parallel
psms, err := psp.mergeShardsParallel()
if err != nil {
return err
}
psms := psp.mergeShardsParallel()
if needStop(psp.stopCh) {
return nil
}
@@ -689,8 +802,8 @@ func (psp *pipeStatsProcessor) flush() error {
// Special case - zero matching rows.
shard := &psp.shards[0]
shard.init()
_ = shard.m.getPipeStatsGroupString(nil)
psms = append(psms, &shard.m)
shard.groupMap.getPipeStatsGroupString(nil)
psms = append(psms, &shard.groupMap)
}
// Write the calculated stats in parallel to the next pipe.
@@ -845,93 +958,48 @@ func (psw *pipeStatsWriter) writeShardData(psm *pipeStatsGroupMap) {
}
}
func (psp *pipeStatsProcessor) mergeShardsParallel() ([]*pipeStatsGroupMap, error) {
func (psp *pipeStatsProcessor) mergeShardsParallel() []*pipeStatsGroupMap {
shards := psp.shards
shardsLen := len(shards)
// set cpusCount to the number of shards, since this is the concurrency limit set by the caller.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8201
cpusCount := len(shards)
if shardsLen == 1 {
var psms []*pipeStatsGroupMap
shard := &shards[0]
if shard.m.entriesCount() > 0 {
psms = append(psms, &shard.m)
}
return psms, nil
}
var wg sync.WaitGroup
perShardMaps := make([][]pipeStatsGroupMap, shardsLen)
for i := range shards {
shard := &shards[i]
if shard.groupMapShards != nil {
continue
}
wg.Add(1)
go func(idx int) {
go func() {
defer wg.Done()
perCPU := make([]pipeStatsGroupMap, cpusCount)
for i := range perCPU {
perCPU[i].init(&shards[idx])
}
psm := &shards[idx].m
for n, psg := range psm.u64 {
if needStop(psp.stopCh) {
return
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
h := xxhash.Sum64(k)
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].u64[n] = psg
}
for n, psg := range psm.negative64 {
if needStop(psp.stopCh) {
return
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
h := xxhash.Sum64(k)
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].negative64[n] = psg
}
for k, psg := range psm.strings {
if needStop(psp.stopCh) {
return
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].strings[k] = psg
}
perShardMaps[idx] = perCPU
psm.reset()
}(i)
shard.moveGroupMapToShards()
}()
}
wg.Wait()
if needStop(psp.stopCh) {
return nil, nil
return nil
}
// Merge per-shard entries into perShardMaps[0]
// set cpusCount to the number of shards, since this is the concurrency limit set by the caller.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8201
cpusCount := len(shards[0].groupMapShards)
for i := 0; i < cpusCount; i++ {
wg.Add(1)
go func(cpuIdx int) {
defer wg.Done()
psm := &perShardMaps[0][cpuIdx]
for _, perCPU := range perShardMaps[1:] {
psm.mergeState(&perCPU[cpuIdx], psp.stopCh)
perCPU[cpuIdx].reset()
psm := &shards[0].groupMapShards[cpuIdx]
for j := range shards[1:] {
src := &shards[1+j].groupMapShards[cpuIdx]
psm.mergeState(src, psp.stopCh)
src.reset()
}
}(i)
}
wg.Wait()
if needStop(psp.stopCh) {
return nil, nil
return nil
}
// Filter out maps without entries
psms := perShardMaps[0]
psms := shards[0].groupMapShards
result := make([]*pipeStatsGroupMap, 0, len(psms))
for i := range psms {
if psms[i].entriesCount() > 0 {
@@ -939,7 +1007,7 @@ func (psp *pipeStatsProcessor) mergeShardsParallel() ([]*pipeStatsGroupMap, erro
}
}
return result, nil
return result
}
func parsePipeStats(lex *lexer, needStatsKeyword bool) (pipe, error) {

View File

@@ -94,7 +94,7 @@ func (pt *pipeTop) newPipeProcessor(workersCount int, stopCh <-chan struct{}, ca
pt: pt,
},
}
shards[i].m.init(&shards[i].stateSizeBudget)
shards[i].m.init(uint(workersCount), &shards[i].stateSizeBudget)
}
ptp := &pipeTopProcessor{
@@ -136,7 +136,7 @@ type pipeTopProcessorShardNopad struct {
pt *pipeTop
// m holds per-value hits.
m hitsMap
m hitsMapAdaptive
// keyBuf is a temporary buffer for building keys for m.
keyBuf []byte
@@ -389,21 +389,17 @@ func (ptp *pipeTopProcessor) mergeShardsParallel() []*pipeTopEntry {
return nil
}
hms := make([]*hitsMap, 0, len(ptp.shards))
hmas := make([]*hitsMapAdaptive, 0, len(ptp.shards))
for i := range ptp.shards {
hm := &ptp.shards[i].m
if hm.entriesCount() > 0 {
hms = append(hms, hm)
hma := &ptp.shards[i].m
if hma.entriesCount() > 0 {
hmas = append(hmas, hma)
}
}
// set cpusCount to the number of shards, since this is the concurrency limit set by the caller.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8201
cpusCount := len(ptp.shards)
var entries []*pipeTopEntry
var entriesLock sync.Mutex
hitsMapMergeParallel(hms, cpusCount, ptp.stopCh, func(hm *hitsMap) {
hitsMapMergeParallel(hmas, ptp.stopCh, func(hm *hitsMap) {
es := getTopEntries(hm, limit, ptp.stopCh)
entriesLock.Lock()
entries = append(entries, es...)

View File

@@ -77,7 +77,7 @@ func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, c
pu: pu,
},
}
shards[i].m.init(&shards[i].stateSizeBudget)
shards[i].m.init(uint(workersCount), &shards[i].stateSizeBudget)
}
pup := &pipeUniqProcessor{
@@ -119,7 +119,7 @@ type pipeUniqProcessorShardNopad struct {
pu *pipeUniq
// m holds per-row hits.
m hitsMap
m hitsMapAdaptive
// keyBuf is a temporary buffer for building keys for m.
keyBuf []byte
@@ -462,21 +462,17 @@ func (pup *pipeUniqProcessor) writeShardData(workerID uint, hm *hitsMap, resetHi
}
func (pup *pipeUniqProcessor) mergeShardsParallel() []*hitsMap {
hms := make([]*hitsMap, 0, len(pup.shards))
hmas := make([]*hitsMapAdaptive, 0, len(pup.shards))
for i := range pup.shards {
hm := &pup.shards[i].m
if hm.entriesCount() > 0 {
hms = append(hms, hm)
hma := &pup.shards[i].m
if hma.entriesCount() > 0 {
hmas = append(hmas, hma)
}
}
// set cpusCount to the number of shards, since this is the concurrency limit set by the caller.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8201
cpusCount := len(pup.shards)
hmsResult := make([]*hitsMap, 0, cpusCount)
var hmsResult []*hitsMap
var hmsLock sync.Mutex
hitsMapMergeParallel(hms, cpusCount, pup.stopCh, func(hm *hitsMap) {
hitsMapMergeParallel(hmas, pup.stopCh, func(hm *hitsMap) {
if hm.entriesCount() > 0 {
hmsLock.Lock()
hmsResult = append(hmsResult, hm)

View File

@@ -9,7 +9,6 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)
@@ -33,21 +32,40 @@ func (su *statsCountUniq) updateNeededFields(neededFields fieldsSet) {
func (su *statsCountUniq) newStatsProcessor(a *chunkedAllocator) statsProcessor {
sup := a.newStatsCountUniqProcessor()
sup.a = a
sup.m.init()
return sup
}
type statsCountUniqProcessor struct {
a *chunkedAllocator
m statsCountUniqSet
ms []*statsCountUniqSet
// concurrency is the number of parallel workers to use when merging shards.
//
// this field must be updated by the caller before using statsCountUniqProcessor.
concurrency uint
// uniqValues is used for tracking small number of unique values until it reaches statsCountUniqValuesMaxLen.
// After that the unique values are tracked by shards.
uniqValues statsCountUniqSet
// shards are used for tracking big number of unique values.
//
// Every shard contains a share of unique values, which are merged in parallel at finalizeStats().
shards []statsCountUniqSet
// shardss is used for collecting shards from other statsCountUniqProcessor instances at mergeState().
shardss [][]statsCountUniqSet
columnValues [][]string
keyBuf []byte
tmpNum int
}
// the maximum number of values to track in statsCountUniqProcessor.uniqValues before switching to statsCountUniqProcessor.shards
//
// Too big value may slow down mergeState() across big number of CPU cores.
// Too small value may significantly increase RAM usage when coun_uniq() is applied individually to big number of groups.
const statsCountUniqValuesMaxLen = 4 << 10
type statsCountUniqSet struct {
timestamps map[uint64]struct{}
u64 map[uint64]struct{}
@@ -56,17 +74,7 @@ type statsCountUniqSet struct {
}
func (sus *statsCountUniqSet) reset() {
sus.timestamps = nil
sus.u64 = nil
sus.negative64 = nil
sus.strings = nil
}
func (sus *statsCountUniqSet) init() {
sus.timestamps = make(map[uint64]struct{})
sus.u64 = make(map[uint64]struct{})
sus.negative64 = make(map[uint64]struct{})
sus.strings = make(map[string]struct{})
*sus = statsCountUniqSet{}
}
func (sus *statsCountUniqSet) entriesCount() uint64 {
@@ -75,66 +83,33 @@ func (sus *statsCountUniqSet) entriesCount() uint64 {
}
func (sus *statsCountUniqSet) updateStateTimestamp(ts int64) int {
_, ok := sus.timestamps[uint64(ts)]
if ok {
return 0
}
sus.timestamps[uint64(ts)] = struct{}{}
return 8
return updateUint64Set(&sus.timestamps, uint64(ts))
}
func (sus *statsCountUniqSet) updateStateUint64(n uint64) int {
_, ok := sus.u64[n]
if ok {
return 0
}
sus.u64[n] = struct{}{}
return 8
}
func (sus *statsCountUniqSet) updateStateInt64(n int64) int {
if n >= 0 {
return sus.updateStateUint64(uint64(n))
}
return sus.updateStateNegativeInt64(n)
return updateUint64Set(&sus.u64, n)
}
func (sus *statsCountUniqSet) updateStateNegativeInt64(n int64) int {
_, ok := sus.negative64[uint64(n)]
if ok {
return 0
}
sus.negative64[uint64(n)] = struct{}{}
return 8
return updateUint64Set(&sus.negative64, uint64(n))
}
func (sus *statsCountUniqSet) updateStateGeneric(a *chunkedAllocator, v string) int {
if n, ok := tryParseUint64(v); ok {
return sus.updateStateUint64(n)
}
if len(v) > 0 && v[0] == '-' {
if n, ok := tryParseInt64(v); ok {
return sus.updateStateNegativeInt64(n)
}
}
return sus.updateStateString(a, v)
}
func (sus *statsCountUniqSet) updateStateString(a *chunkedAllocator, v string) int {
_, ok := sus.strings[v]
if ok {
func (sus *statsCountUniqSet) updateStateString(a *chunkedAllocator, v []byte) int {
if _, ok := sus.strings[string(v)]; ok {
return 0
}
vCopy := a.cloneString(v)
sus.strings[vCopy] = struct{}{}
return int(unsafe.Sizeof(v)) + len(v)
vCopy := a.cloneBytesToString(v)
return setStringSet(&sus.strings, vCopy) + len(vCopy)
}
func (sus *statsCountUniqSet) mergeState(src *statsCountUniqSet, stopCh <-chan struct{}) {
mergeUint64Set(sus.timestamps, src.timestamps, stopCh)
mergeUint64Set(sus.u64, src.u64, stopCh)
mergeUint64Set(sus.negative64, src.negative64, stopCh)
mergeUint64Set(&sus.timestamps, src.timestamps, stopCh)
mergeUint64Set(&sus.u64, src.u64, stopCh)
mergeUint64Set(&sus.negative64, src.negative64, stopCh)
if sus.strings == nil {
sus.strings = make(map[string]struct{})
}
for k := range src.strings {
if needStop(stopCh) {
return
@@ -145,7 +120,46 @@ func (sus *statsCountUniqSet) mergeState(src *statsCountUniqSet, stopCh <-chan s
}
}
func mergeUint64Set(dst map[uint64]struct{}, src map[uint64]struct{}, stopCh <-chan struct{}) {
func updateUint64Set(dstPtr *map[uint64]struct{}, n uint64) int {
dst := *dstPtr
if _, ok := dst[n]; ok {
return 0
}
return setUint64Set(dstPtr, n)
}
func setUint64Set(dstPtr *map[uint64]struct{}, n uint64) int {
dst := *dstPtr
if dst == nil {
dst = map[uint64]struct{}{
n: {},
}
*dstPtr = dst
return int(unsafe.Sizeof(dst) + unsafe.Sizeof(n))
}
dst[n] = struct{}{}
return int(unsafe.Sizeof(n))
}
func setStringSet(dstPtr *map[string]struct{}, v string) int {
dst := *dstPtr
if dst == nil {
dst = map[string]struct{}{
v: {},
}
*dstPtr = dst
return int(unsafe.Sizeof(dst) + unsafe.Sizeof(v))
}
dst[v] = struct{}{}
return int(unsafe.Sizeof(v))
}
func mergeUint64Set(dstPtr *map[uint64]struct{}, src map[uint64]struct{}, stopCh <-chan struct{}) {
dst := *dstPtr
if dst == nil {
dst = make(map[uint64]struct{})
*dstPtr = dst
}
for n := range src {
if needStop(stopCh) {
return
@@ -318,12 +332,12 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRowsSingleColumn(br *blockR
if c.isTime {
// Count unique timestamps
timestamps := br.getTimestamps()
for i, timestamp := range timestamps {
for i := range timestamps {
if i > 0 && timestamps[i-1] == timestamps[i] {
// This timestamp has been already counted.
continue
}
stateSizeIncrease += sup.m.updateStateTimestamp(timestamp)
stateSizeIncrease += sup.updateStateTimestamp(timestamps[i])
}
return stateSizeIncrease
}
@@ -356,7 +370,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRowsSingleColumn(br *blockR
continue
}
n := unmarshalUint8(v)
stateSizeIncrease += sup.m.updateStateUint64(uint64(n))
stateSizeIncrease += sup.updateStateUint64(uint64(n))
}
return stateSizeIncrease
case valueTypeUint16:
@@ -366,7 +380,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRowsSingleColumn(br *blockR
continue
}
n := unmarshalUint16(v)
stateSizeIncrease += sup.m.updateStateUint64(uint64(n))
stateSizeIncrease += sup.updateStateUint64(uint64(n))
}
return stateSizeIncrease
case valueTypeUint32:
@@ -376,7 +390,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRowsSingleColumn(br *blockR
continue
}
n := unmarshalUint32(v)
stateSizeIncrease += sup.m.updateStateUint64(uint64(n))
stateSizeIncrease += sup.updateStateUint64(uint64(n))
}
return stateSizeIncrease
case valueTypeUint64:
@@ -386,7 +400,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRowsSingleColumn(br *blockR
continue
}
n := unmarshalUint64(v)
stateSizeIncrease += sup.m.updateStateUint64(n)
stateSizeIncrease += sup.updateStateUint64(n)
}
return stateSizeIncrease
case valueTypeInt64:
@@ -396,7 +410,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRowsSingleColumn(br *blockR
continue
}
n := unmarshalInt64(v)
stateSizeIncrease += sup.m.updateStateInt64(n)
stateSizeIncrease += sup.updateStateInt64(n)
}
return stateSizeIncrease
default:
@@ -421,8 +435,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRowSingleColumn(br *blockResul
if c.isTime {
// Count unique timestamps
timestamps := br.getTimestamps()
timestamp := timestamps[rowIdx]
return sup.m.updateStateTimestamp(timestamp)
return sup.updateStateTimestamp(timestamps[rowIdx])
}
if c.isConst {
// count unique const values
@@ -449,27 +462,27 @@ func (sup *statsCountUniqProcessor) updateStatsForRowSingleColumn(br *blockResul
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalUint8(v)
return sup.m.updateStateUint64(uint64(n))
return sup.updateStateUint64(uint64(n))
case valueTypeUint16:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalUint16(v)
return sup.m.updateStateUint64(uint64(n))
return sup.updateStateUint64(uint64(n))
case valueTypeUint32:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalUint32(v)
return sup.m.updateStateUint64(uint64(n))
return sup.updateStateUint64(uint64(n))
case valueTypeUint64:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalUint64(v)
return sup.m.updateStateUint64(n)
return sup.updateStateUint64(n)
case valueTypeInt64:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalInt64(v)
return sup.m.updateStateInt64(n)
return sup.updateStateInt64(n)
default:
// Count unique values for the given rowIdx
v := c.getValueAtRow(br, rowIdx)
@@ -488,20 +501,32 @@ func (sup *statsCountUniqProcessor) mergeState(sf statsFunc, sfp statsProcessor)
}
src := sfp.(*statsCountUniqProcessor)
if src.m.entriesCount() > 100_000 {
// Postpone merging too big number of items in parallel
sup.ms = append(sup.ms, &src.m)
return
if sup.shards == nil {
if src.shards == nil {
sup.uniqValues.mergeState(&src.uniqValues, nil)
src.uniqValues.reset()
sup.probablyMoveUniqValuesToShards()
return
}
sup.moveUniqValuesToShards()
}
sup.m.mergeState(&src.m, nil)
if src.shards == nil {
src.moveUniqValuesToShards()
}
sup.shardss = append(sup.shardss, src.shards)
src.shards = nil
}
func (sup *statsCountUniqProcessor) finalizeStats(sf statsFunc, dst []byte, stopCh <-chan struct{}) []byte {
n := sup.m.entriesCount()
if len(sup.ms) > 0 {
sup.ms = append(sup.ms, &sup.m)
n = countUniqParallel(sup.ms, stopCh)
n := sup.entriesCount()
if len(sup.shardss) > 0 {
if sup.shards != nil {
sup.shardss = append(sup.shardss, sup.shards)
sup.shards = nil
}
n = countUniqParallel(sup.shardss, stopCh)
}
su := sf.(*statsCountUniq)
@@ -511,78 +536,22 @@ func (sup *statsCountUniqProcessor) finalizeStats(sf statsFunc, dst []byte, stop
return strconv.AppendUint(dst, n, 10)
}
func countUniqParallel(ms []*statsCountUniqSet, stopCh <-chan struct{}) uint64 {
shardsLen := len(ms)
cpusCount := cgroup.AvailableCPUs()
var wg sync.WaitGroup
msShards := make([][]statsCountUniqSet, shardsLen)
for i := range msShards {
wg.Add(1)
go func(idx int) {
defer wg.Done()
perCPU := make([]statsCountUniqSet, cpusCount)
for i := range perCPU {
perCPU[i].init()
}
sus := ms[idx]
for ts := range sus.timestamps {
if needStop(stopCh) {
return
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&ts)), 8)
h := xxhash.Sum64(k)
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].timestamps[ts] = struct{}{}
}
for n := range sus.u64 {
if needStop(stopCh) {
return
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
h := xxhash.Sum64(k)
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].u64[n] = struct{}{}
}
for n := range sus.negative64 {
if needStop(stopCh) {
return
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
h := xxhash.Sum64(k)
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].negative64[n] = struct{}{}
}
for k := range sus.strings {
if needStop(stopCh) {
return
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].strings[k] = struct{}{}
}
msShards[idx] = perCPU
ms[idx].reset()
}(i)
}
wg.Wait()
func countUniqParallel(shardss [][]statsCountUniqSet, stopCh <-chan struct{}) uint64 {
cpusCount := len(shardss[0])
perCPUCounts := make([]uint64, cpusCount)
var wg sync.WaitGroup
for i := range perCPUCounts {
wg.Add(1)
go func(cpuIdx int) {
defer wg.Done()
sus := &msShards[0][cpuIdx]
for _, perCPU := range msShards[1:] {
sus := &shardss[0][cpuIdx]
for _, perCPU := range shardss[1:] {
sus.mergeState(&perCPU[cpuIdx], stopCh)
perCPU[cpuIdx].reset()
}
perCPUCounts[cpuIdx] = sus.entriesCount()
sus.reset()
}(i)
}
wg.Wait()
@@ -594,12 +563,130 @@ func countUniqParallel(ms []*statsCountUniqSet, stopCh <-chan struct{}) uint64 {
return countTotal
}
func (sup *statsCountUniqProcessor) entriesCount() uint64 {
if sup.shards == nil {
return sup.uniqValues.entriesCount()
}
n := uint64(0)
shards := sup.shards
for i := range shards {
n += shards[i].entriesCount()
}
return n
}
func (sup *statsCountUniqProcessor) updateStateGeneric(v string) int {
return sup.m.updateStateGeneric(sup.a, v)
if n, ok := tryParseUint64(v); ok {
return sup.updateStateUint64(n)
}
if len(v) > 0 && v[0] == '-' {
if n, ok := tryParseInt64(v); ok {
return sup.updateStateNegativeInt64(n)
}
}
return sup.updateStateString(bytesutil.ToUnsafeBytes(v))
}
func (sup *statsCountUniqProcessor) updateStateInt64(n int64) int {
if n >= 0 {
return sup.updateStateUint64(uint64(n))
}
return sup.updateStateNegativeInt64(n)
}
func (sup *statsCountUniqProcessor) updateStateString(v []byte) int {
return sup.m.updateStateString(sup.a, bytesutil.ToUnsafeString(v))
if sup.shards == nil {
stateSizeIncrease := sup.uniqValues.updateStateString(sup.a, v)
if stateSizeIncrease > 0 {
stateSizeIncrease += sup.probablyMoveUniqValuesToShards()
}
return stateSizeIncrease
}
sus := sup.getShardByString(v)
return sus.updateStateString(sup.a, v)
}
func (sup *statsCountUniqProcessor) updateStateTimestamp(ts int64) int {
if sup.shards == nil {
stateSizeIncrease := sup.uniqValues.updateStateTimestamp(ts)
if stateSizeIncrease > 0 {
stateSizeIncrease += sup.probablyMoveUniqValuesToShards()
}
return stateSizeIncrease
}
sus := sup.getShardByUint64(uint64(ts))
return sus.updateStateTimestamp(ts)
}
func (sup *statsCountUniqProcessor) updateStateUint64(n uint64) int {
if sup.shards == nil {
stateSizeIncrease := sup.uniqValues.updateStateUint64(n)
if stateSizeIncrease > 0 {
stateSizeIncrease += sup.probablyMoveUniqValuesToShards()
}
return stateSizeIncrease
}
sus := sup.getShardByUint64(n)
return sus.updateStateUint64(n)
}
func (sup *statsCountUniqProcessor) updateStateNegativeInt64(n int64) int {
if sup.shards == nil {
stateSizeIncrease := sup.uniqValues.updateStateNegativeInt64(n)
if stateSizeIncrease > 0 {
stateSizeIncrease += sup.probablyMoveUniqValuesToShards()
}
return stateSizeIncrease
}
sus := sup.getShardByUint64(uint64(n))
return sus.updateStateNegativeInt64(n)
}
func (sup *statsCountUniqProcessor) probablyMoveUniqValuesToShards() int {
if sup.uniqValues.entriesCount() < statsCountUniqValuesMaxLen {
return 0
}
return sup.moveUniqValuesToShards()
}
func (sup *statsCountUniqProcessor) moveUniqValuesToShards() int {
cpusCount := sup.concurrency
bytesAllocatedPrev := sup.a.bytesAllocated
sup.shards = sup.a.newStatsCountUniqSets(cpusCount)
stateSizeIncrease := sup.a.bytesAllocated - bytesAllocatedPrev
for ts := range sup.uniqValues.timestamps {
sus := sup.getShardByUint64(ts)
setUint64Set(&sus.timestamps, ts)
}
for n := range sup.uniqValues.u64 {
sus := sup.getShardByUint64(n)
setUint64Set(&sus.u64, n)
}
for n := range sup.uniqValues.negative64 {
sus := sup.getShardByUint64(n)
setUint64Set(&sus.negative64, n)
}
for s := range sup.uniqValues.strings {
sus := sup.getShardByString(bytesutil.ToUnsafeBytes(s))
setStringSet(&sus.strings, s)
}
sup.uniqValues.reset()
return stateSizeIncrease
}
func (sup *statsCountUniqProcessor) getShardByString(v []byte) *statsCountUniqSet {
h := xxhash.Sum64(v)
cpuIdx := h % uint64(len(sup.shards))
return &sup.shards[cpuIdx]
}
func (sup *statsCountUniqProcessor) getShardByUint64(n uint64) *statsCountUniqSet {
h := fastHashUint64(n)
cpuIdx := h % uint64(len(sup.shards))
return &sup.shards[cpuIdx]
}
func (sup *statsCountUniqProcessor) limitReached(su *statsCountUniq) bool {
@@ -607,7 +694,7 @@ func (sup *statsCountUniqProcessor) limitReached(su *statsCountUniq) bool {
if limit <= 0 {
return false
}
return sup.m.entriesCount() > limit
return sup.entriesCount() > limit
}
func parseStatsCountUniq(lex *lexer) (*statsCountUniq, error) {

View File

@@ -4,12 +4,10 @@ import (
"fmt"
"strconv"
"sync"
"unsafe"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)
@@ -33,21 +31,40 @@ func (su *statsCountUniqHash) updateNeededFields(neededFields fieldsSet) {
func (su *statsCountUniqHash) newStatsProcessor(a *chunkedAllocator) statsProcessor {
sup := a.newStatsCountUniqHashProcessor()
sup.a = a
sup.m.init()
return sup
}
type statsCountUniqHashProcessor struct {
a *chunkedAllocator
m statsCountUniqHashSet
ms []*statsCountUniqHashSet
// concurrency is the number of parallel workers to use when merging shards.
//
// this field must be updated by the caller before using statsCountUniqHashProcessor.
concurrency uint
// uniqValues is used for tracking small number of unique values until it reaches statsCountUniqHashValuesMaxLen.
// After that the unique values are tracked by shards.
uniqValues statsCountUniqHashSet
// shards are used for tracking big number of unique values.
//
// Every shard contains a share of unique values, which are merged in parallel at finalizeStats().
shards []statsCountUniqHashSet
// shardss is used for collecting shards from other statsCountUniqProcessor instances at mergeState().
shardss [][]statsCountUniqHashSet
columnValues [][]string
keyBuf []byte
tmpNum int
}
// the maximum number of values to track in statsCountUniqHashProcessor.uniqValues before switching to statsCountUniqHashProcessor.shards
//
// Too big value may slow down mergeState() across big number of CPU cores.
// Too small value may significantly increase RAM usage when coun_uniq_hash() is applied individually to big number of groups.
const statsCountUniqHashValuesMaxLen = 4 << 10
type statsCountUniqHashSet struct {
timestamps map[uint64]struct{}
u64 map[uint64]struct{}
@@ -56,17 +73,7 @@ type statsCountUniqHashSet struct {
}
func (sus *statsCountUniqHashSet) reset() {
sus.timestamps = nil
sus.u64 = nil
sus.negative64 = nil
sus.strings = nil
}
func (sus *statsCountUniqHashSet) init() {
sus.timestamps = make(map[uint64]struct{})
sus.u64 = make(map[uint64]struct{})
sus.negative64 = make(map[uint64]struct{})
sus.strings = make(map[uint64]struct{})
*sus = statsCountUniqHashSet{}
}
func (sus *statsCountUniqHashSet) entriesCount() uint64 {
@@ -75,66 +82,26 @@ func (sus *statsCountUniqHashSet) entriesCount() uint64 {
}
func (sus *statsCountUniqHashSet) updateStateTimestamp(ts int64) int {
_, ok := sus.timestamps[uint64(ts)]
if ok {
return 0
}
sus.timestamps[uint64(ts)] = struct{}{}
return 8
return updateUint64Set(&sus.timestamps, uint64(ts))
}
func (sus *statsCountUniqHashSet) updateStateUint64(n uint64) int {
_, ok := sus.u64[n]
if ok {
return 0
}
sus.u64[n] = struct{}{}
return 8
}
func (sus *statsCountUniqHashSet) updateStateInt64(n int64) int {
if n >= 0 {
return sus.updateStateUint64(uint64(n))
}
return sus.updateStateNegativeInt64(n)
return updateUint64Set(&sus.timestamps, n)
}
func (sus *statsCountUniqHashSet) updateStateNegativeInt64(n int64) int {
_, ok := sus.negative64[uint64(n)]
if ok {
return 0
}
sus.negative64[uint64(n)] = struct{}{}
return 8
return updateUint64Set(&sus.negative64, uint64(n))
}
func (sus *statsCountUniqHashSet) updateStateGeneric(v string) int {
if n, ok := tryParseUint64(v); ok {
return sus.updateStateUint64(n)
}
if len(v) > 0 && v[0] == '-' {
if n, ok := tryParseInt64(v); ok {
return sus.updateStateNegativeInt64(n)
}
}
return sus.updateStateString(bytesutil.ToUnsafeBytes(v))
}
func (sus *statsCountUniqHashSet) updateStateString(v []byte) int {
h := xxhash.Sum64(v)
_, ok := sus.strings[h]
if ok {
return 0
}
sus.strings[h] = struct{}{}
return 8
func (sus *statsCountUniqHashSet) updateStateStringHash(h uint64) int {
return updateUint64Set(&sus.strings, h)
}
func (sus *statsCountUniqHashSet) mergeState(src *statsCountUniqHashSet, stopCh <-chan struct{}) {
mergeUint64Set(sus.timestamps, src.timestamps, stopCh)
mergeUint64Set(sus.u64, src.u64, stopCh)
mergeUint64Set(sus.negative64, src.negative64, stopCh)
mergeUint64Set(sus.strings, src.strings, stopCh)
mergeUint64Set(&sus.timestamps, src.timestamps, stopCh)
mergeUint64Set(&sus.u64, src.u64, stopCh)
mergeUint64Set(&sus.negative64, src.negative64, stopCh)
mergeUint64Set(&sus.strings, src.strings, stopCh)
}
func (sup *statsCountUniqHashProcessor) updateStatsForAllRows(sf statsFunc, br *blockResult) int {
@@ -186,7 +153,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForAllRows(sf statsFunc, br *
// Do not count empty values
continue
}
stateSizeIncrease += sup.m.updateStateString(keyBuf)
stateSizeIncrease += sup.updateStateString(keyBuf)
}
sup.keyBuf = keyBuf
return stateSizeIncrease
@@ -233,7 +200,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForAllRows(sf statsFunc, br *
// Do not count empty values
continue
}
stateSizeIncrease += sup.m.updateStateString(keyBuf)
stateSizeIncrease += sup.updateStateString(keyBuf)
}
sup.keyBuf = keyBuf
return stateSizeIncrease
@@ -266,7 +233,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForRow(sf statsFunc, br *bloc
// Do not count empty values
return 0
}
return sup.m.updateStateString(keyBuf)
return sup.updateStateString(keyBuf)
}
if len(fields) == 1 {
// Fast path for a single column.
@@ -290,7 +257,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForRow(sf statsFunc, br *bloc
// Do not count empty values
return 0
}
return sup.m.updateStateString(keyBuf)
return sup.updateStateString(keyBuf)
}
func (sup *statsCountUniqHashProcessor) updateStatsForAllRowsSingleColumn(br *blockResult, columnName string) int {
@@ -299,12 +266,12 @@ func (sup *statsCountUniqHashProcessor) updateStatsForAllRowsSingleColumn(br *bl
if c.isTime {
// Count unique timestamps
timestamps := br.getTimestamps()
for i, timestamp := range timestamps {
for i := range timestamps {
if i > 0 && timestamps[i-1] == timestamps[i] {
// This timestamp has been already counted.
continue
}
stateSizeIncrease += sup.m.updateStateTimestamp(timestamp)
stateSizeIncrease += sup.updateStateTimestamp(timestamps[i])
}
return stateSizeIncrease
}
@@ -315,7 +282,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForAllRowsSingleColumn(br *bl
// Do not count empty values
return 0
}
return sup.m.updateStateGeneric(v)
return sup.updateStateGeneric(v)
}
switch c.valueType {
@@ -327,7 +294,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForAllRowsSingleColumn(br *bl
// Do not count empty values
return
}
sup.tmpNum += sup.m.updateStateGeneric(v)
sup.tmpNum += sup.updateStateGeneric(v)
})
return sup.tmpNum
case valueTypeUint8:
@@ -337,7 +304,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForAllRowsSingleColumn(br *bl
continue
}
n := unmarshalUint8(v)
stateSizeIncrease += sup.m.updateStateUint64(uint64(n))
stateSizeIncrease += sup.updateStateUint64(uint64(n))
}
return stateSizeIncrease
case valueTypeUint16:
@@ -347,7 +314,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForAllRowsSingleColumn(br *bl
continue
}
n := unmarshalUint16(v)
stateSizeIncrease += sup.m.updateStateUint64(uint64(n))
stateSizeIncrease += sup.updateStateUint64(uint64(n))
}
return stateSizeIncrease
case valueTypeUint32:
@@ -357,7 +324,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForAllRowsSingleColumn(br *bl
continue
}
n := unmarshalUint32(v)
stateSizeIncrease += sup.m.updateStateUint64(uint64(n))
stateSizeIncrease += sup.updateStateUint64(uint64(n))
}
return stateSizeIncrease
case valueTypeUint64:
@@ -367,7 +334,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForAllRowsSingleColumn(br *bl
continue
}
n := unmarshalUint64(v)
stateSizeIncrease += sup.m.updateStateUint64(n)
stateSizeIncrease += sup.updateStateUint64(n)
}
return stateSizeIncrease
case valueTypeInt64:
@@ -377,7 +344,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForAllRowsSingleColumn(br *bl
continue
}
n := unmarshalInt64(v)
stateSizeIncrease += sup.m.updateStateInt64(n)
stateSizeIncrease += sup.updateStateInt64(n)
}
return stateSizeIncrease
default:
@@ -392,7 +359,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForAllRowsSingleColumn(br *bl
// This value has been already counted.
continue
}
stateSizeIncrease += sup.m.updateStateGeneric(v)
stateSizeIncrease += sup.updateStateGeneric(v)
}
return stateSizeIncrease
}
@@ -403,8 +370,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForRowSingleColumn(br *blockR
if c.isTime {
// Count unique timestamps
timestamps := br.getTimestamps()
timestamp := timestamps[rowIdx]
return sup.m.updateStateTimestamp(timestamp)
return sup.updateStateTimestamp(timestamps[rowIdx])
}
if c.isConst {
// count unique const values
@@ -413,7 +379,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForRowSingleColumn(br *blockR
// Do not count empty values
return 0
}
return sup.m.updateStateGeneric(v)
return sup.updateStateGeneric(v)
}
switch c.valueType {
@@ -426,32 +392,32 @@ func (sup *statsCountUniqHashProcessor) updateStatsForRowSingleColumn(br *blockR
// Do not count empty values
return 0
}
return sup.m.updateStateGeneric(v)
return sup.updateStateGeneric(v)
case valueTypeUint8:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalUint8(v)
return sup.m.updateStateUint64(uint64(n))
return sup.updateStateUint64(uint64(n))
case valueTypeUint16:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalUint16(v)
return sup.m.updateStateUint64(uint64(n))
return sup.updateStateUint64(uint64(n))
case valueTypeUint32:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalUint32(v)
return sup.m.updateStateUint64(uint64(n))
return sup.updateStateUint64(uint64(n))
case valueTypeUint64:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalUint64(v)
return sup.m.updateStateUint64(n)
return sup.updateStateUint64(n)
case valueTypeInt64:
values := c.getValuesEncoded(br)
v := values[rowIdx]
n := unmarshalInt64(v)
return sup.m.updateStateInt64(n)
return sup.updateStateInt64(n)
default:
// Count unique values for the given rowIdx
v := c.getValueAtRow(br, rowIdx)
@@ -459,7 +425,7 @@ func (sup *statsCountUniqHashProcessor) updateStatsForRowSingleColumn(br *blockR
// Do not count empty values
return 0
}
return sup.m.updateStateGeneric(v)
return sup.updateStateGeneric(v)
}
}
@@ -470,100 +436,57 @@ func (sup *statsCountUniqHashProcessor) mergeState(sf statsFunc, sfp statsProces
}
src := sfp.(*statsCountUniqHashProcessor)
if src.m.entriesCount() > 100_000 {
// Postpone merging too big number of items in parallel
sup.ms = append(sup.ms, &src.m)
return
if sup.shards == nil {
if src.shards == nil {
sup.uniqValues.mergeState(&src.uniqValues, nil)
src.uniqValues.reset()
sup.probablyMoveUniqValuesToShards()
return
}
sup.moveUniqValuesToShards()
}
sup.m.mergeState(&src.m, nil)
if src.shards == nil {
src.moveUniqValuesToShards()
}
sup.shardss = append(sup.shardss, src.shards)
src.shards = nil
}
func (sup *statsCountUniqHashProcessor) finalizeStats(sf statsFunc, dst []byte, stopCh <-chan struct{}) []byte {
su := sf.(*statsCountUniqHash)
n := sup.m.entriesCount()
if len(sup.ms) > 0 {
sup.ms = append(sup.ms, &sup.m)
n = countUniqHashParallel(sup.ms, stopCh)
n := sup.entriesCount()
if len(sup.shardss) > 0 {
if sup.shards != nil {
sup.shardss = append(sup.shardss, sup.shards)
sup.shards = nil
}
n = countUniqHashParallel(sup.shardss, stopCh)
}
su := sf.(*statsCountUniqHash)
if limit := su.limit; limit > 0 && n > limit {
n = limit
}
return strconv.AppendUint(dst, n, 10)
}
func countUniqHashParallel(ms []*statsCountUniqHashSet, stopCh <-chan struct{}) uint64 {
shardsLen := len(ms)
cpusCount := cgroup.AvailableCPUs()
var wg sync.WaitGroup
msShards := make([][]statsCountUniqHashSet, shardsLen)
for i := range msShards {
wg.Add(1)
go func(idx int) {
defer wg.Done()
perCPU := make([]statsCountUniqHashSet, cpusCount)
for i := range perCPU {
perCPU[i].init()
}
sus := ms[idx]
for ts := range sus.timestamps {
if needStop(stopCh) {
return
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&ts)), 8)
h := xxhash.Sum64(k)
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].timestamps[ts] = struct{}{}
}
for n := range sus.u64 {
if needStop(stopCh) {
return
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
h := xxhash.Sum64(k)
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].u64[n] = struct{}{}
}
for n := range sus.negative64 {
if needStop(stopCh) {
return
}
k := unsafe.Slice((*byte)(unsafe.Pointer(&n)), 8)
h := xxhash.Sum64(k)
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].negative64[n] = struct{}{}
}
for h := range sus.strings {
if needStop(stopCh) {
return
}
cpuIdx := h % uint64(len(perCPU))
perCPU[cpuIdx].strings[h] = struct{}{}
}
msShards[idx] = perCPU
ms[idx].reset()
}(i)
}
wg.Wait()
func countUniqHashParallel(shardss [][]statsCountUniqHashSet, stopCh <-chan struct{}) uint64 {
cpusCount := len(shardss[0])
perCPUCounts := make([]uint64, cpusCount)
var wg sync.WaitGroup
for i := range perCPUCounts {
wg.Add(1)
go func(cpuIdx int) {
defer wg.Done()
sus := &msShards[0][cpuIdx]
for _, perCPU := range msShards[1:] {
sus := &shardss[0][cpuIdx]
for _, perCPU := range shardss[1:] {
sus.mergeState(&perCPU[cpuIdx], stopCh)
perCPU[cpuIdx].reset()
}
perCPUCounts[cpuIdx] = sus.entriesCount()
sus.reset()
}(i)
}
wg.Wait()
@@ -575,12 +498,142 @@ func countUniqHashParallel(ms []*statsCountUniqHashSet, stopCh <-chan struct{})
return countTotal
}
func (sup *statsCountUniqHashProcessor) entriesCount() uint64 {
if sup.shards == nil {
return sup.uniqValues.entriesCount()
}
n := uint64(0)
shards := sup.shards
for i := range shards {
n += shards[i].entriesCount()
}
return n
}
func (sup *statsCountUniqHashProcessor) updateStateGeneric(v string) int {
if n, ok := tryParseUint64(v); ok {
return sup.updateStateUint64(n)
}
if len(v) > 0 && v[0] == '-' {
if n, ok := tryParseInt64(v); ok {
return sup.updateStateNegativeInt64(n)
}
}
return sup.updateStateString(bytesutil.ToUnsafeBytes(v))
}
func (sup *statsCountUniqHashProcessor) updateStateInt64(n int64) int {
if n >= 0 {
return sup.updateStateUint64(uint64(n))
}
return sup.updateStateNegativeInt64(n)
}
func (sup *statsCountUniqHashProcessor) updateStateString(v []byte) int {
h := xxhash.Sum64(v)
if sup.shards == nil {
stateSizeIncrease := sup.uniqValues.updateStateStringHash(h)
if stateSizeIncrease > 0 {
stateSizeIncrease += sup.probablyMoveUniqValuesToShards()
}
return stateSizeIncrease
}
return sup.updateStateStringHash(h)
}
func (sup *statsCountUniqHashProcessor) updateStateStringHash(h uint64) int {
sus := sup.getShardByStringHash(h)
return sus.updateStateStringHash(h)
}
func (sup *statsCountUniqHashProcessor) updateStateTimestamp(ts int64) int {
if sup.shards == nil {
stateSizeIncrease := sup.uniqValues.updateStateTimestamp(ts)
if stateSizeIncrease > 0 {
stateSizeIncrease += sup.probablyMoveUniqValuesToShards()
}
return stateSizeIncrease
}
sus := sup.getShardByUint64(uint64(ts))
return sus.updateStateTimestamp(ts)
}
func (sup *statsCountUniqHashProcessor) updateStateUint64(n uint64) int {
if sup.shards == nil {
stateSizeIncrease := sup.uniqValues.updateStateUint64(n)
if stateSizeIncrease > 0 {
stateSizeIncrease += sup.probablyMoveUniqValuesToShards()
}
return stateSizeIncrease
}
sus := sup.getShardByUint64(n)
return sus.updateStateUint64(n)
}
func (sup *statsCountUniqHashProcessor) updateStateNegativeInt64(n int64) int {
if sup.shards == nil {
stateSizeIncrease := sup.uniqValues.updateStateNegativeInt64(n)
if stateSizeIncrease > 0 {
stateSizeIncrease += sup.probablyMoveUniqValuesToShards()
}
return stateSizeIncrease
}
sus := sup.getShardByUint64(uint64(n))
return sus.updateStateNegativeInt64(n)
}
func (sup *statsCountUniqHashProcessor) probablyMoveUniqValuesToShards() int {
if sup.uniqValues.entriesCount() < statsCountUniqHashValuesMaxLen {
return 0
}
return sup.moveUniqValuesToShards()
}
func (sup *statsCountUniqHashProcessor) moveUniqValuesToShards() int {
cpusCount := sup.concurrency
bytesAllocatedPrev := sup.a.bytesAllocated
sup.shards = sup.a.newStatsCountUniqHashSets(cpusCount)
stateSizeIncrease := sup.a.bytesAllocated - bytesAllocatedPrev
for ts := range sup.uniqValues.timestamps {
sus := sup.getShardByUint64(ts)
setUint64Set(&sus.timestamps, ts)
}
for n := range sup.uniqValues.u64 {
sus := sup.getShardByUint64(n)
setUint64Set(&sus.u64, n)
}
for n := range sup.uniqValues.negative64 {
sus := sup.getShardByUint64(n)
setUint64Set(&sus.negative64, n)
}
for h := range sup.uniqValues.strings {
sus := sup.getShardByStringHash(h)
setUint64Set(&sus.strings, h)
}
sup.uniqValues.reset()
return stateSizeIncrease
}
func (sup *statsCountUniqHashProcessor) getShardByStringHash(h uint64) *statsCountUniqHashSet {
cpuIdx := h % uint64(len(sup.shards))
return &sup.shards[cpuIdx]
}
func (sup *statsCountUniqHashProcessor) getShardByUint64(n uint64) *statsCountUniqHashSet {
h := fastHashUint64(n)
cpuIdx := h % uint64(len(sup.shards))
return &sup.shards[cpuIdx]
}
func (sup *statsCountUniqHashProcessor) limitReached(su *statsCountUniqHash) bool {
limit := su.limit
if limit <= 0 {
return false
}
return sup.m.entriesCount() > limit
return sup.entriesCount() > limit
}
func parseStatsCountUniqHash(lex *lexer) (*statsCountUniqHash, error) {
@@ -602,3 +655,10 @@ func parseStatsCountUniqHash(lex *lexer) (*statsCountUniqHash, error) {
}
return su, nil
}
func fastHashUint64(x uint64) uint64 {
x ^= x >> 12 // a
x ^= x << 25 // b
x ^= x >> 27 // c
return x * 2685821657736338717
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/valyala/quicktemplate"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
)
type statsUniqValues struct {
@@ -41,6 +40,11 @@ func (su *statsUniqValues) newStatsProcessor(a *chunkedAllocator) statsProcessor
type statsUniqValuesProcessor struct {
a *chunkedAllocator
// concurrency is the number of parallel workers to use when merging shards.
//
// this field must be updated by the caller before using statsUniqValuesProcessor.
concurrency uint
m map[string]struct{}
ms []map[string]struct{}
}
@@ -162,7 +166,7 @@ func (sup *statsUniqValuesProcessor) finalizeStats(sf statsFunc, dst []byte, sto
var items []string
if len(sup.ms) > 0 {
sup.ms = append(sup.ms, sup.m)
items = mergeSetsParallel(sup.ms, stopCh)
items = mergeSetsParallel(sup.ms, sup.concurrency, stopCh)
} else {
items = setToSortedSlice(sup.m)
}
@@ -174,9 +178,9 @@ func (sup *statsUniqValuesProcessor) finalizeStats(sf statsFunc, dst []byte, sto
return marshalJSONArray(dst, items)
}
func mergeSetsParallel(ms []map[string]struct{}, stopCh <-chan struct{}) []string {
func mergeSetsParallel(ms []map[string]struct{}, concurrency uint, stopCh <-chan struct{}) []string {
shardsLen := len(ms)
cpusCount := cgroup.AvailableCPUs()
cpusCount := concurrency
var wg sync.WaitGroup
msShards := make([][]map[string]struct{}, shardsLen)