mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-19 17:56:32 +03:00
Compare commits
1 Commits
master
...
labelscomp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e8a1b3f9bd |
4
.github/workflows/build.yml
vendored
4
.github/workflows/build.yml
vendored
@@ -63,11 +63,11 @@ jobs:
|
||||
arch: amd64
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
uses: actions/checkout@v6
|
||||
|
||||
- name: Setup Go
|
||||
id: go
|
||||
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
|
||||
uses: actions/setup-go@v6
|
||||
with:
|
||||
cache-dependency-path: |
|
||||
go.sum
|
||||
|
||||
2
.github/workflows/changelog-linter.yml
vendored
2
.github/workflows/changelog-linter.yml
vendored
@@ -9,7 +9,7 @@ jobs:
|
||||
tip-lint:
|
||||
runs-on: 'ubuntu-latest'
|
||||
steps:
|
||||
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
- uses: 'actions/checkout@v6'
|
||||
with:
|
||||
# needed for proper diff
|
||||
fetch-depth: 0
|
||||
|
||||
2
.github/workflows/check-commit-signed.yml
vendored
2
.github/workflows/check-commit-signed.yml
vendored
@@ -8,7 +8,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0 # we need full history for commit verification
|
||||
|
||||
|
||||
6
.github/workflows/check-licenses.yml
vendored
6
.github/workflows/check-licenses.yml
vendored
@@ -15,11 +15,11 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
uses: actions/checkout@master
|
||||
|
||||
- name: Setup Go
|
||||
id: go
|
||||
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
|
||||
uses: actions/setup-go@v6
|
||||
with:
|
||||
go-version-file: 'go.mod'
|
||||
cache: false
|
||||
@@ -27,7 +27,7 @@ jobs:
|
||||
- run: go version
|
||||
|
||||
- name: Cache Go artifacts
|
||||
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
|
||||
uses: actions/cache@v5
|
||||
with:
|
||||
path: |
|
||||
~/.cache/go-build
|
||||
|
||||
12
.github/workflows/codeql-analysis-go.yml
vendored
12
.github/workflows/codeql-analysis-go.yml
vendored
@@ -29,18 +29,18 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
uses: actions/checkout@v6
|
||||
|
||||
- name: Set up Go
|
||||
id: go
|
||||
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
|
||||
uses: actions/setup-go@v6
|
||||
with:
|
||||
cache: false
|
||||
go-version-file: 'go.mod'
|
||||
- run: go version
|
||||
|
||||
- name: Cache Go artifacts
|
||||
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
|
||||
uses: actions/cache@v5
|
||||
with:
|
||||
path: |
|
||||
~/.cache/go-build
|
||||
@@ -50,14 +50,14 @@ jobs:
|
||||
restore-keys: go-artifacts-${{ runner.os }}-codeql-analyze-${{ steps.go.outputs.go-version }}-
|
||||
|
||||
- name: Initialize CodeQL
|
||||
uses: github/codeql-action/init@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
|
||||
uses: github/codeql-action/init@v4.35.2
|
||||
with:
|
||||
languages: go
|
||||
|
||||
- name: Autobuild
|
||||
uses: github/codeql-action/autobuild@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
|
||||
uses: github/codeql-action/autobuild@v4.35.2
|
||||
|
||||
- name: Perform CodeQL Analysis
|
||||
uses: github/codeql-action/analyze@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
|
||||
uses: github/codeql-action/analyze@v4.35.2
|
||||
with:
|
||||
category: 'language:go'
|
||||
|
||||
6
.github/workflows/docs.yaml
vendored
6
.github/workflows/docs.yaml
vendored
@@ -16,19 +16,19 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
uses: actions/checkout@v6
|
||||
with:
|
||||
path: __vm
|
||||
|
||||
- name: Checkout private code
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
uses: actions/checkout@v6
|
||||
with:
|
||||
repository: VictoriaMetrics/vmdocs
|
||||
token: ${{ secrets.VM_BOT_GH_TOKEN }}
|
||||
path: __vm-docs
|
||||
|
||||
- name: Import GPG key
|
||||
uses: crazy-max/ghaction-import-gpg@2dc316deee8e90f13e1a351ab510b4d5bc0c82cd # v7.0.0
|
||||
uses: crazy-max/ghaction-import-gpg@v7
|
||||
id: import-gpg
|
||||
with:
|
||||
gpg_private_key: ${{ secrets.VM_BOT_GPG_PRIVATE_KEY }}
|
||||
|
||||
14
.github/workflows/test.yml
vendored
14
.github/workflows/test.yml
vendored
@@ -32,11 +32,11 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
uses: actions/checkout@v6
|
||||
|
||||
- name: Setup Go
|
||||
id: go
|
||||
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
|
||||
uses: actions/setup-go@v6
|
||||
with:
|
||||
cache-dependency-path: |
|
||||
go.sum
|
||||
@@ -47,7 +47,7 @@ jobs:
|
||||
- run: go version
|
||||
|
||||
- name: Cache golangci-lint
|
||||
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
|
||||
uses: actions/cache@v5
|
||||
with:
|
||||
path: |
|
||||
~/.cache/golangci-lint
|
||||
@@ -72,11 +72,11 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
uses: actions/checkout@v6
|
||||
|
||||
- name: Setup Go
|
||||
id: go
|
||||
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
|
||||
uses: actions/setup-go@v6
|
||||
with:
|
||||
cache-dependency-path: |
|
||||
go.sum
|
||||
@@ -94,11 +94,11 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
uses: actions/checkout@v6
|
||||
|
||||
- name: Setup Go
|
||||
id: go
|
||||
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
|
||||
uses: actions/setup-go@v6
|
||||
with:
|
||||
cache-dependency-path: |
|
||||
go.sum
|
||||
|
||||
6
.github/workflows/vmui.yml
vendored
6
.github/workflows/vmui.yml
vendored
@@ -32,11 +32,11 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
uses: actions/checkout@v6
|
||||
|
||||
- name: Cache node_modules
|
||||
id: cache
|
||||
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
|
||||
uses: actions/cache@v5
|
||||
with:
|
||||
path: app/vmui/packages/vmui/node_modules
|
||||
key: vmui-deps-${{ runner.os }}-${{ hashFiles('app/vmui/packages/vmui/package-lock.json', 'app/vmui/Dockerfile-build') }}
|
||||
@@ -69,7 +69,7 @@ jobs:
|
||||
VMUI_SKIP_INSTALL: true
|
||||
|
||||
- name: Annotate Code Linting Results
|
||||
uses: ataylorme/eslint-annotate-action@d57a1193d4c59cbfbf3f86c271f42612f9dbd9e9 # 3.0.0
|
||||
uses: ataylorme/eslint-annotate-action@v3
|
||||
with:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
report-json: app/vmui/packages/vmui/vmui-lint-report.json
|
||||
|
||||
@@ -2,7 +2,6 @@ package remotewrite
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -331,20 +330,15 @@ func (c *client) runWorker() {
|
||||
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
|
||||
return
|
||||
case <-c.stopCh:
|
||||
// c must be stopped. Wait up to 5 seconds for the in-flight request to complete.
|
||||
// If it succeeds, drain the remaining in-memory queue before returning.
|
||||
stopCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer cancel()
|
||||
|
||||
// c must be stopped. Wait for a while in the hope the block will be sent.
|
||||
graceDuration := 5 * time.Second
|
||||
select {
|
||||
case ok := <-ch:
|
||||
if !ok {
|
||||
// Return unsent block to the queue.
|
||||
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
|
||||
} else {
|
||||
c.drainInMemoryQueue(stopCtx, block[:0])
|
||||
}
|
||||
case <-stopCtx.Done():
|
||||
case <-time.After(graceDuration):
|
||||
// Return unsent block to the queue.
|
||||
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
|
||||
}
|
||||
@@ -514,36 +508,6 @@ again:
|
||||
goto again
|
||||
}
|
||||
|
||||
func (c *client) drainInMemoryQueue(stopCtx context.Context, block []byte) {
|
||||
var ok bool
|
||||
for {
|
||||
select {
|
||||
case <-stopCtx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
block, ok = c.fq.MustReadInMemoryBlock(block[:0])
|
||||
if !ok {
|
||||
// The in memory queue has already been drained,
|
||||
// or persisted queue is being used.
|
||||
// In this case it is guaranteed that fq will be empty
|
||||
return
|
||||
}
|
||||
if len(block) == 0 {
|
||||
// skip empty data blocks from sending
|
||||
continue
|
||||
}
|
||||
|
||||
// at this stage c.stopCh should be closed
|
||||
// so sendBlock function should not perform retries
|
||||
if ok := c.sendBlock(block); !ok {
|
||||
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second)
|
||||
var remoteWriteRetryLogger = logger.WithThrottler("remoteWriteRetry", 5*time.Second)
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"github.com/golang/snappy"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
)
|
||||
|
||||
func TestParseRetryAfterHeader(t *testing.T) {
|
||||
@@ -37,40 +36,6 @@ func TestParseRetryAfterHeader(t *testing.T) {
|
||||
f(time.Now().Add(10*time.Second).Format("Mon, 02 Jan 2006 15:04:05 FAKETZ"), 0)
|
||||
}
|
||||
|
||||
func TestInitSecretFlags(t *testing.T) {
|
||||
showRemoteWriteURLOrig := *showRemoteWriteURL
|
||||
defer func() {
|
||||
*showRemoteWriteURL = showRemoteWriteURLOrig
|
||||
flagutil.UnregisterAllSecretFlags()
|
||||
}()
|
||||
|
||||
flagutil.UnregisterAllSecretFlags()
|
||||
*showRemoteWriteURL = false
|
||||
InitSecretFlags()
|
||||
if !flagutil.IsSecretFlag("remotewrite.url") {
|
||||
t.Fatalf("expecting remoteWrite.url to be secret")
|
||||
}
|
||||
if !flagutil.IsSecretFlag("remotewrite.headers") {
|
||||
t.Fatalf("expecting remoteWrite.headers to be secret")
|
||||
}
|
||||
if !flagutil.IsSecretFlag("remotewrite.proxyurl") {
|
||||
t.Fatalf("expecting remoteWrite.proxyURL to be secret")
|
||||
}
|
||||
|
||||
flagutil.UnregisterAllSecretFlags()
|
||||
*showRemoteWriteURL = true
|
||||
InitSecretFlags()
|
||||
if flagutil.IsSecretFlag("remotewrite.url") {
|
||||
t.Fatalf("remoteWrite.url must remain visible when -remoteWrite.showURL is set")
|
||||
}
|
||||
if !flagutil.IsSecretFlag("remotewrite.headers") {
|
||||
t.Fatalf("expecting remoteWrite.headers to remain secret")
|
||||
}
|
||||
if !flagutil.IsSecretFlag("remotewrite.proxyurl") {
|
||||
t.Fatalf("expecting remoteWrite.proxyURL to remain secret")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRepackBlockFromZstdToSnappy(t *testing.T) {
|
||||
expectedPlainBlock := []byte(`foobar`)
|
||||
|
||||
|
||||
@@ -151,10 +151,6 @@ func InitSecretFlags() {
|
||||
// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
|
||||
flagutil.RegisterSecretFlag("remoteWrite.url")
|
||||
}
|
||||
// remoteWrite.proxyURL can contain authentication codes.
|
||||
flagutil.RegisterSecretFlag("remoteWrite.proxyURL")
|
||||
// remoteWrite.headers can contain auth headers such as Authorization and API keys.
|
||||
flagutil.RegisterSecretFlag("remoteWrite.headers")
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -171,18 +167,6 @@ func Init() {
|
||||
if len(*remoteWriteURLs) == 0 {
|
||||
logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set")
|
||||
}
|
||||
if *shardByURL && len(*disableOnDiskQueue) > 1 {
|
||||
disableOnDiskQueues := *disableOnDiskQueue
|
||||
|
||||
firstValue := disableOnDiskQueues[0]
|
||||
for _, v := range disableOnDiskQueues[1:] {
|
||||
if firstValue != v {
|
||||
logger.Fatalf("all -remoteWrite.url targets must have the same -remoteWrite.disableOnDiskQueue setting when -remoteWrite.shardByURL is enabled; " +
|
||||
"either enable or disable -remoteWrite.disableOnDiskQueue for all targets")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if limit := getMaxHourlySeries(); limit > 0 {
|
||||
hourlySeriesLimiter = bloomfilter.NewLimiter(limit, time.Hour)
|
||||
_ = metrics.NewGauge(`vmagent_hourly_series_limit_max_series`, func() float64 {
|
||||
@@ -515,9 +499,7 @@ func tryPush(at *auth.Token, wr *prompb.WriteRequest, forceDropSamplesOnFailure
|
||||
//
|
||||
// calculateHealthyRwctxIdx will rely on the order of rwctx to be in ascending order.
|
||||
func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) ([]*remoteWriteCtx, bool) {
|
||||
// When -remoteWrite.shardByURL=true always use all configured remote writes to preserve stable metrics distribution across shards.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507
|
||||
if !disableOnDiskQueueAny || *shardByURL {
|
||||
if !disableOnDiskQueueAny {
|
||||
return rwctxsGlobal, true
|
||||
}
|
||||
|
||||
@@ -532,6 +514,12 @@ func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailu
|
||||
return nil, false
|
||||
}
|
||||
rowsCount := getRowsCount(tss)
|
||||
if *shardByURL {
|
||||
// Todo: When shardByURL is enabled, the following metrics won't be 100% accurate. Because vmagent don't know
|
||||
// which rwctx should data be pushed to yet. Let's consider the hashing algorithm fair and will distribute
|
||||
// data to all rwctxs evenly.
|
||||
rowsCount = rowsCount / len(rwctxsGlobal)
|
||||
}
|
||||
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ func UnitTest(files []string, disableGroupLabel bool, externalLabels []string, e
|
||||
}
|
||||
eu, err := url.Parse(externalURL)
|
||||
if err != nil {
|
||||
logger.Fatalf("failed to parse external URL: %s", err)
|
||||
logger.Fatalf("failed to parse external URL: %w", err)
|
||||
}
|
||||
if err := templates.Load([]string{}, *eu); err != nil {
|
||||
logger.Fatalf("failed to load template: %v", err)
|
||||
|
||||
@@ -64,7 +64,6 @@ func InitSecretFlags() {
|
||||
if !*showDatasourceURL {
|
||||
flagutil.RegisterSecretFlag("datasource.url")
|
||||
}
|
||||
flagutil.RegisterSecretFlag("datasource.headers")
|
||||
}
|
||||
|
||||
// ShowDatasourceURL whether to show -datasource.url with sensitive information
|
||||
|
||||
@@ -105,7 +105,7 @@ func (cw *configWatcher) add(typeK TargetType, interval time.Duration, targetsFn
|
||||
}
|
||||
targetMetadata, errors := getTargetMetadata(targetsFn, cw.cfg)
|
||||
for _, err := range errors {
|
||||
logger.Errorf("failed to init notifier for %q: %s", typeK, err)
|
||||
logger.Errorf("failed to init notifier for %q: %w", typeK, err)
|
||||
}
|
||||
cw.updateTargets(typeK, targetMetadata, cw.cfg, cw.genFn)
|
||||
}
|
||||
@@ -274,7 +274,7 @@ func (cw *configWatcher) updateTargets(key TargetType, targetMts map[string]targ
|
||||
for addr, metadata := range targetMts {
|
||||
am, err := NewAlertManager(addr, genFn, cfg.HTTPClientConfig, metadata.alertRelabelConfigs, cfg.Timeout.Duration())
|
||||
if err != nil {
|
||||
logger.Errorf("failed to init %s notifier with addr %q: %s", key, addr, err)
|
||||
logger.Errorf("failed to init %s notifier with addr %q: %w", key, addr, err)
|
||||
continue
|
||||
}
|
||||
updatedTargets = append(updatedTargets, Target{
|
||||
|
||||
@@ -194,7 +194,6 @@ func InitSecretFlags() {
|
||||
if !*showNotifierURL {
|
||||
flagutil.RegisterSecretFlag("notifier.url")
|
||||
}
|
||||
flagutil.RegisterSecretFlag("notifier.headers")
|
||||
}
|
||||
|
||||
func notifiersFromFlags(gen AlertURLGenerator) ([]Notifier, error) {
|
||||
|
||||
@@ -59,7 +59,6 @@ func InitSecretFlags() {
|
||||
if !*showRemoteReadURL {
|
||||
flagutil.RegisterSecretFlag("remoteRead.url")
|
||||
}
|
||||
flagutil.RegisterSecretFlag("remoteRead.headers")
|
||||
}
|
||||
|
||||
// Init creates a Querier from provided flag values.
|
||||
|
||||
@@ -62,7 +62,6 @@ func InitSecretFlags() {
|
||||
if !*showRemoteWriteURL {
|
||||
flagutil.RegisterSecretFlag("remoteWrite.url")
|
||||
}
|
||||
flagutil.RegisterSecretFlag("remoteWrite.headers")
|
||||
}
|
||||
|
||||
// Init creates Client object from given flags.
|
||||
|
||||
@@ -69,8 +69,6 @@ func Init() {
|
||||
concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)
|
||||
initVMUIConfig()
|
||||
initVMAlertProxy()
|
||||
|
||||
flagutil.RegisterSecretFlag("vmalert.proxyURL")
|
||||
}
|
||||
|
||||
// Stop stops vmselect
|
||||
|
||||
@@ -3109,6 +3109,7 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -3406,6 +3406,7 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -3110,6 +3110,7 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -3407,6 +3407,7 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -2946,6 +2946,7 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -2324,6 +2324,7 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -2945,6 +2945,7 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -2323,6 +2323,7 @@
|
||||
"mode": "off"
|
||||
}
|
||||
},
|
||||
"decimals": 0,
|
||||
"links": [],
|
||||
"mappings": [],
|
||||
"min": 0,
|
||||
|
||||
@@ -43,8 +43,8 @@ Just download VictoriaMetrics and follow [these instructions](https://docs.victo
|
||||
See [available integrations](https://docs.victoriametrics.com/victoriametrics/integrations/) with other systems like
|
||||
[Prometheus](https://docs.victoriametrics.com/victoriametrics/integrations/prometheus/) or [Grafana](https://docs.victoriametrics.com/victoriametrics/integrations/grafana/).
|
||||
|
||||
VictoriaMetrics is developed at a fast pace, so it is recommended to periodically check the [CHANGELOG](https://docs.victoriametrics.com/victoriametrics/changelog/)
|
||||
and perform [regular upgrades](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-upgrade-victoriametrics).
|
||||
VictoriaMetrics is developed at a fast pace, so it is recommended periodically checking the [CHANGELOG](https://docs.victoriametrics.com/victoriametrics/changelog/)
|
||||
and performing [regular upgrades](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-upgrade-victoriametrics).
|
||||
|
||||
### Starting VictoriaMetrics Single Node or Cluster on VictoriaMetrics Cloud {id="starting-vm-on-cloud"}
|
||||
|
||||
@@ -63,7 +63,7 @@ docker run -it --rm -v `pwd`/victoria-metrics-data:/victoria-metrics-data -p 842
|
||||
victoriametrics/victoria-metrics:v1.143.0 --selfScrapeInterval=5s -storageDataPath=victoria-metrics-data
|
||||
```
|
||||
|
||||
_For Enterprise images, see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#docker-images)._
|
||||
_For Enterprise images see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#docker-images)._
|
||||
|
||||
You should see:
|
||||
|
||||
@@ -113,7 +113,7 @@ See more details about [cluster architecture](https://docs.victoriametrics.com/v
|
||||
### Starting VictoriaMetrics Single Node from a Binary {id="starting-vm-single-from-a-binary"}
|
||||
|
||||
1. Download the correct binary for your OS and architecture from [GitHub](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
|
||||
For Enterprise binaries, see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#binary-releases).
|
||||
For Enterprise binaries see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#binary-releases).
|
||||
|
||||
2. Extract the archive to /usr/local/bin by running:
|
||||
|
||||
@@ -164,7 +164,7 @@ END'
|
||||
|
||||
Extra [command-line flags](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#list-of-command-line-flags) can be added to `ExecStart` line.
|
||||
|
||||
If you want to deploy VictoriaMetrics Single Node as a Windows Service, review the [running as a Windows service docs](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#running-as-windows-service).
|
||||
If you want to deploy VictoriaMetrics Single Node as a Windows Service review the [running as a Windows service docs](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#running-as-windows-service).
|
||||
|
||||
> Please note, `victoriametrics` service is listening on `:8428` for HTTP connections (see `-httpListenAddr` flag).
|
||||
|
||||
@@ -174,7 +174,7 @@ If you want to deploy VictoriaMetrics Single Node as a Windows Service, review t
|
||||
sudo systemctl daemon-reload && sudo systemctl enable --now victoriametrics.service
|
||||
```
|
||||
|
||||
7. Check that the service started successfully:
|
||||
7. Check that service started successfully:
|
||||
|
||||
```sh
|
||||
sudo systemctl status victoriametrics.service
|
||||
@@ -187,12 +187,12 @@ by going to `http://<ip_or_hostname>:8428/vmui`.
|
||||
|
||||
VictoriaMetrics cluster consists of [3 components](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#architecture-overview).
|
||||
It is recommended to run these components in the same private network (for [security reasons](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#security)),
|
||||
but on separate physical nodes for the best performance.
|
||||
but on the separate physical nodes for the best performance.
|
||||
|
||||
On all nodes, you will need to do the following:
|
||||
On all nodes you will need to do the following:
|
||||
|
||||
1. Download the correct binary for your OS and architecture with `-cluster` suffix from [GitHub](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
|
||||
For Enterprise binaries, see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#binary-releases).
|
||||
For Enterprise binaries see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#binary-releases).
|
||||
|
||||
2. Extract the archive to /usr/local/bin by running:
|
||||
|
||||
@@ -254,7 +254,7 @@ for vmstorage can be added to `ExecStart` line.
|
||||
sudo systemctl daemon-reload && sudo systemctl enable --now vmstorage
|
||||
```
|
||||
|
||||
4. Check that the service started successfully:
|
||||
4. Check that service started successfully:
|
||||
|
||||
```sh
|
||||
sudo systemctl status vmstorage
|
||||
@@ -301,14 +301,14 @@ in one flag. See more details in `-storageNode` flag description in [vminsert fl
|
||||
sudo systemctl daemon-reload && sudo systemctl enable --now vminsert.service
|
||||
```
|
||||
|
||||
3. Check that the service started successfully:
|
||||
3. Check that service started successfully:
|
||||
|
||||
```sh
|
||||
sudo systemctl status vminsert.service
|
||||
```
|
||||
|
||||
4. After `vminsert` is in `Running` state, confirm the service is healthy by visiting `http://<ip_or_hostname>:8480/-/healthy` link.
|
||||
It should say "VictoriaMetrics is Healthy."
|
||||
It should say "VictoriaMetrics is Healthy"
|
||||
|
||||
#### Installing vmselect
|
||||
|
||||
@@ -344,7 +344,7 @@ END'
|
||||
```
|
||||
|
||||
Replace `<list of vmstorages>` with addresses of previously configured `vmstorage` services.
|
||||
To specify multiple addresses, you can repeat the flag multiple times or separate addresses with commas
|
||||
To specify multiple addresses you can repeat the flag multiple times, or separate addresses with commas
|
||||
in one flag. See more details in `-storageNode` flag description [vminsert flags](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#list-of-command-line-flags-for-vminsert).
|
||||
|
||||
> Please note, `vmselect` service is listening on `:8481` for HTTP connections (see `-httpListenAddr` flag).
|
||||
@@ -362,12 +362,12 @@ sudo systemctl status vmselect.service
|
||||
```
|
||||
|
||||
5. After `vmselect` is in `Running` state, confirm the service is healthy by visiting `http://<ip_or_hostname>:8481/select/0/vmui` link.
|
||||
It should open the [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui) page.
|
||||
It should open [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui) page.
|
||||
|
||||
## Write data
|
||||
|
||||
There are two main models in monitoring for data collection: [push](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#push-model)
|
||||
and [pull](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#pull-model). Both are used in modern monitoring, and both are
|
||||
and [pull](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#pull-model). Both are used in modern monitoring and both are
|
||||
supported by VictoriaMetrics.
|
||||
|
||||
See more details on [key concepts of writing data here](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#write-data).
|
||||
@@ -389,7 +389,7 @@ and [other integrations](https://docs.victoriametrics.com/victoriametrics/integr
|
||||
## Alerting
|
||||
|
||||
To run periodic conditions checks use [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/).
|
||||
It allows creating a set of conditions using MetricsQL expressions and sending notifications to [Alertmanager](https://prometheus.io/docs/alerting/latest/alertmanager/)
|
||||
It allows creating set of conditions using MetricsQL expressions and send notifications to [Alertmanager](https://prometheus.io/docs/alerting/latest/alertmanager/)
|
||||
when such conditions are met.
|
||||
|
||||
See [vmalert quick start](https://docs.victoriametrics.com/victoriametrics/vmalert/#quickstart).
|
||||
@@ -413,7 +413,7 @@ command line tool. It supports the following databases for migration to Victoria
|
||||
|
||||
## Productionization
|
||||
|
||||
When moving to production with VictoriaMetrics, we recommend following these best practices.
|
||||
When going to production with VictoriaMetrics we recommend following the recommendations below.
|
||||
|
||||
### Monitoring
|
||||
|
||||
@@ -429,7 +429,7 @@ Using the [recommended alerting rules](https://github.com/VictoriaMetrics/Victor
|
||||
will help to identify unwanted issues.
|
||||
|
||||
The rule of thumb is to have a separate installation of VictoriaMetrics or any other monitoring system to monitor the
|
||||
production installation of VictoriaMetrics. This would make monitoring independent and help identify problems with
|
||||
production installation of VictoriaMetrics. This would make monitoring independent and will help identify problems with
|
||||
the main monitoring installation.
|
||||
|
||||
See more details in the article [VictoriaMetrics Monitoring](https://victoriametrics.com/blog/victoriametrics-monitoring/).
|
||||
@@ -457,7 +457,7 @@ For backup configuration, please refer to [vmbackup documentation](https://docs.
|
||||
|
||||
### Configuring limits
|
||||
|
||||
To avoid excessive resource usage or performance degradation, limits must be in place:
|
||||
To avoid excessive resource usage or performance degradation limits must be in place:
|
||||
|
||||
* [Resource usage limits](https://docs.victoriametrics.com/victoriametrics/faq/#how-to-set-a-memory-limit-for-victoriametrics-components);
|
||||
* [Cardinality limiter](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#cardinality-limiter).
|
||||
|
||||
@@ -30,19 +30,11 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `basicAuth.usernameFile` command-line flags for reading basic auth username from a file, similar to the existing `basicAuth.passwordFile`. The file is re-read every second. See [#9436](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9436). Thanks to @kimjune01 for the contribution.
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-opentelemetry.labelNameUnderscoreSanitization` command-line flag to control whether to enable prepending of `key` to labels starting with `_` when `-opentelemetry.usePrometheusNaming` is enabled. See [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) docs and [#9663](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9663). Thanks to @andriibeee for the contribution.
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): improve the [Top Queries](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#top-queries) table UI. Duration columns now display human-readable values (e.g. `1.23s`) instead of raw seconds, memory column shows human-readable sizes (e.g. `1.23 MB`), instant queries are labeled as `instant` instead of empty string, and column headers now show tooltips with descriptions. See [#10790](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10790).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): drain in-memory remote write queue on shutdown within the 5-second grace period before falling back to persisting blocks to disk. See [#9996](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9996)
|
||||
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. See [#10918](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918). Thanks to @alexei38 for the contribution.
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. Thanks to @alexei38 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918).
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): extend delay on aggregation windows flush by the biggest lag among pushed samples. Before, the delay was calculated as 95th percentile across samples, which could underrepresent outliers and reject them from aggregation as "too old". See [#10402](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10402).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix a bug in [cardinality limiters](https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter) where series with different labels, like `{a="bc"}` and `{ab="c"}`, could be incorrectly treated as identical and dropped. See [#10937](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10937).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): hide values passed to `-remoteWrite.headers` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive HTTP headers such as `Authorization` and API keys.
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): hide values passed to `-remoteWrite.proxyURL` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive credentials.
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): hide values passed to `-remoteWrite.headers`,`remoteRead.headers`, `datasource.headers` and `notifier.headers` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive HTTP headers such as `Authorization` and API keys.
|
||||
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly establish [mtls](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#mtls-protection) connection between vmstorage and vminsert. Regression was introduced in v1.130.0 release for the enterprise version of vmstorage. See [#10972](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10972).
|
||||
* BUGFIX: [vmrestore](https://docs.victoriametrics.com/victoriametrics/vmrestore/): fix a bug where specifying `-storageDataPath` with a trailing slash could cause `vmrestore` to panic. See [#10823](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10823). Thanks to @utafrali for the contribution.
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): prevent unintentional rerouting of samples to other sharding targets when one of the `-remoteWrite.url` targets with `-remoteWrite.disableOnDiskQueue` becomes blocked. Previously this could break the sharding guarantee by sending samples to wrong targets instead of dropping or retrying them. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): return error on startup if `-remoteWrite.disableOnDiskQueue` is not configured uniformly across all `-remoteWrite.url` targets when `-remoteWrite.shardByURL` is enabled. Either all targets must have it enabled or all must have it disabled. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): hide values passed to `vmalert.proxyURL` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive HTTP headers such as `Authorization` and API keys.
|
||||
|
||||
## [v1.143.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.143.0)
|
||||
|
||||
|
||||
@@ -353,18 +353,6 @@ Example Docker image:
|
||||
|
||||
`victoriametrics/victoria-metrics:v1.143.0-enterprise-fips` – uses the FIPS-compatible binary and based on `scratch` image.
|
||||
|
||||
## What Happens to Licensed Components When a License Expires
|
||||
|
||||
When a license expires, all licensed components continue to function normally until a restart occurs.
|
||||
|
||||
License checks happen only at startup. If a license expires while the component is running, nothing changes; the component continues to run until the next restart.
|
||||
|
||||
This means you don't need to restart components to install a new license. The component automatically picks up the new license the next time it restarts. The exception is when the `-license` flag is used, because the license is supplied at startup and changing it requires restarting VictoriaMetrics with the updated flag value.
|
||||
|
||||
If your license has expired and you decide to not renew it, you can switch to the VictoriaMetrics Open Source version without data loss, as both versions share the same data model. In doing so, however, you will lose access to the [VictoriaMetrics Enterprise features](https://docs.victoriametrics.com/victoriametrics/enterprise/#victoriametrics-enterprise-features).
|
||||
|
||||
See [updating the license key](https://docs.victoriametrics.com/victoriametrics/enterprise/#updating-the-license-key) for more details.
|
||||
|
||||
## Monitoring license expiration
|
||||
|
||||
All the Enterprise components expose the following metrics at the `/metrics` page:
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1538,11 +1538,11 @@ func (tb *Table) MustCreateSnapshotAt(dstDir string) {
|
||||
srcDir := tb.path
|
||||
srcDir, err = filepath.Abs(srcDir)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", srcDir, err)
|
||||
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", srcDir, err)
|
||||
}
|
||||
dstDir, err = filepath.Abs(dstDir)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", dstDir, err)
|
||||
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", dstDir, err)
|
||||
}
|
||||
prefix := srcDir + string(filepath.Separator)
|
||||
if strings.HasPrefix(dstDir, prefix) {
|
||||
|
||||
@@ -228,9 +228,7 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// MustReadBlock reads the next block from fq into dst and returns it.
|
||||
// It first reads from the in-memory queue, then checks file-based queue.
|
||||
// It blocks until a block is available or the stop deadline is exceeded, in which case it returns (dst, false).
|
||||
// MustReadBlock reads the next block from fq to dst and returns it.
|
||||
func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
||||
fq.mu.Lock()
|
||||
defer fq.mu.Unlock()
|
||||
@@ -240,7 +238,15 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
||||
return dst, false
|
||||
}
|
||||
if len(fq.ch) > 0 {
|
||||
return fq.mustReadInMemoryBlockLocked(dst), true
|
||||
if n := fq.pq.GetPendingBytes(); n > 0 {
|
||||
logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is non-empty; it contains %d pending bytes", n)
|
||||
}
|
||||
bb := <-fq.ch
|
||||
fq.pendingInmemoryBytes -= uint64(len(bb.B))
|
||||
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
||||
dst = append(dst, bb.B...)
|
||||
blockBufPool.Put(bb)
|
||||
return dst, true
|
||||
}
|
||||
if n := fq.pq.GetPendingBytes(); n > 0 {
|
||||
data, ok := fq.pq.MustReadBlockNonblocking(dst)
|
||||
@@ -259,35 +265,6 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// MustReadInMemoryBlock reads the next block from the in-memory queue into dst and returns it.
|
||||
// It returns (dst, true) if a block was available, or (nil, false) if the in-memory queue is empty.
|
||||
// It does not block waiting for new blocks.
|
||||
func (fq *FastQueue) MustReadInMemoryBlock(dst []byte) ([]byte, bool) {
|
||||
fq.mu.Lock()
|
||||
defer fq.mu.Unlock()
|
||||
|
||||
if len(fq.ch) > 0 {
|
||||
return fq.mustReadInMemoryBlockLocked(dst), true
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) []byte {
|
||||
if len(fq.ch) == 0 {
|
||||
logger.Panicf("BUG: the function must not be called when in-memory queue is empty. Caller should verify the queue len upfront")
|
||||
}
|
||||
if n := fq.pq.GetPendingBytes(); n > 0 {
|
||||
logger.Panicf("BUG: the file-based queue must be empty when the in-memory queue is non-empty; it contains %d pending bytes", n)
|
||||
}
|
||||
bb := <-fq.ch
|
||||
fq.pendingInmemoryBytes -= uint64(len(bb.B))
|
||||
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
||||
dst = append(dst, bb.B...)
|
||||
blockBufPool.Put(bb)
|
||||
return dst
|
||||
}
|
||||
|
||||
// Dirname returns the directory name for persistent queue.
|
||||
func (fq *FastQueue) Dirname() string {
|
||||
return filepath.Base(fq.pq.dir)
|
||||
|
||||
@@ -3,32 +3,173 @@ package promutil
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
// LabelsCompressor compresses []prompb.Label into short binary strings
|
||||
const lcShardCount = 128
|
||||
|
||||
// lcShard is one shard of the label-to-index map.
|
||||
// Padded to one cache line to prevent false sharing between adjacent shards.
|
||||
type lcShard struct {
|
||||
mu sync.RWMutex
|
||||
m map[string]uint32
|
||||
_ [32]byte
|
||||
}
|
||||
|
||||
// LabelsCompressor compresses []prompb.Label into short binary strings.
|
||||
// Zero value is ready to use. Each Register call must be paired with Unregister.
|
||||
type LabelsCompressor struct {
|
||||
labelToIdx sync.Map
|
||||
idxToLabel labelsMap
|
||||
// labelToIdxShards stores the label-to-index mapping across lcShardCount shards
|
||||
// to reduce RWMutex contention in the concurrent compressFast hot path.
|
||||
labelToIdxShards [lcShardCount]lcShard
|
||||
|
||||
nextIdx atomic.Uint64
|
||||
// generation is incremented after each rotate() call that removes labels.
|
||||
// Callers can use it to detect when cached compressed keys become stale.
|
||||
generation atomic.Uint64
|
||||
|
||||
totalSizeBytes atomic.Uint64
|
||||
idxToLabel atomic.Pointer[[]prompb.Label]
|
||||
|
||||
// usedBitset tracks which indices were used in the current rotation period.
|
||||
// Bits are set atomically from compressFast without mu; grown under mu.
|
||||
usedBitset atomic.Pointer[[]uint64]
|
||||
|
||||
totalSizeBytes uint64
|
||||
mu sync.Mutex
|
||||
|
||||
// freeIdxs holds index slots available for reuse.
|
||||
freeIdxs []uint32
|
||||
|
||||
// pendingIdxs holds indices evicted from labelToIdx last rotate, not yet zeroed in idxToLabel.
|
||||
pendingIdxs map[uint32]struct{}
|
||||
|
||||
// prevBitset is the usedBitset snapshot from the previous non-zero rotation.
|
||||
// Requires absence from both prevBitset and usedBitset to evict a label,
|
||||
// guarding against partial snapshots when rotate fires mid-compress loop.
|
||||
// Only accessed in rotate, which runs in a single goroutine.
|
||||
prevBitset []uint64
|
||||
|
||||
// registry holds staleness intervals of active callers; rotation period = max(registry).
|
||||
registry []time.Duration
|
||||
|
||||
// tickerCh signals runRotate to re-evaluate the rotation period.
|
||||
tickerCh chan struct{}
|
||||
}
|
||||
|
||||
// idleRotationPeriod is the rotation period used when no callers are registered.
|
||||
// It must be long enough that a sleeping goroutine does not consume measurable resources.
|
||||
const idleRotationPeriod = time.Hour
|
||||
|
||||
// Register adds stalenessInterval to the registry and starts the rotation goroutine on first call.
|
||||
// Rotation period equals max(registry). Must be paired with Unregister.
|
||||
func (lc *LabelsCompressor) Register(stalenessInterval time.Duration) {
|
||||
lc.mu.Lock()
|
||||
if lc.tickerCh == nil {
|
||||
lc.tickerCh = make(chan struct{}, 1)
|
||||
go lc.runRotate()
|
||||
}
|
||||
lc.registry = append(lc.registry, stalenessInterval)
|
||||
tickerCh := lc.tickerCh
|
||||
lc.mu.Unlock()
|
||||
select {
|
||||
case tickerCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Unregister removes stalenessInterval from the registry.
|
||||
// The rotation goroutine keeps running at idleRotationPeriod when the registry is empty.
|
||||
func (lc *LabelsCompressor) Unregister(stalenessInterval time.Duration) {
|
||||
lc.mu.Lock()
|
||||
for i, d := range lc.registry {
|
||||
if d == stalenessInterval {
|
||||
lc.registry = append(lc.registry[:i], lc.registry[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
tickerCh := lc.tickerCh
|
||||
lc.mu.Unlock()
|
||||
if tickerCh != nil {
|
||||
select {
|
||||
case tickerCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) maxRegisteredStaleness() time.Duration {
|
||||
// must be called with lc.mu held
|
||||
var max time.Duration
|
||||
for _, d := range lc.registry {
|
||||
if d > max {
|
||||
max = d
|
||||
}
|
||||
}
|
||||
if max == 0 {
|
||||
return idleRotationPeriod
|
||||
}
|
||||
return max
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) runRotate() {
|
||||
lc.mu.Lock()
|
||||
period := lc.maxRegisteredStaleness()
|
||||
lc.mu.Unlock()
|
||||
|
||||
timer := time.NewTimer(period)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
lc.rotate()
|
||||
lc.mu.Lock()
|
||||
period = lc.maxRegisteredStaleness()
|
||||
lc.mu.Unlock()
|
||||
timer.Reset(period)
|
||||
case <-lc.tickerCh:
|
||||
lc.mu.Lock()
|
||||
newPeriod := lc.maxRegisteredStaleness()
|
||||
lc.mu.Unlock()
|
||||
if newPeriod != period {
|
||||
period = newPeriod
|
||||
if !timer.Stop() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
timer.Reset(period)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SizeBytes returns the size of lc data in bytes
|
||||
func (lc *LabelsCompressor) SizeBytes() uint64 {
|
||||
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
|
||||
lc.mu.Lock()
|
||||
n := uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes
|
||||
lc.mu.Unlock()
|
||||
return n
|
||||
}
|
||||
|
||||
// ItemsCount returns the number of items in lc
|
||||
func (lc *LabelsCompressor) ItemsCount() uint64 {
|
||||
return lc.nextIdx.Load()
|
||||
lc.mu.Lock()
|
||||
p := lc.idxToLabel.Load()
|
||||
var n uint64
|
||||
if p != nil && len(*p) > len(lc.freeIdxs)+len(lc.pendingIdxs) {
|
||||
n = uint64(len(*p) - len(lc.freeIdxs) - len(lc.pendingIdxs))
|
||||
}
|
||||
lc.mu.Unlock()
|
||||
return n
|
||||
}
|
||||
|
||||
// Compress compresses labels, appends the compressed labels to dst and returns the result.
|
||||
@@ -42,46 +183,139 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
|
||||
|
||||
a := encoding.GetUint64s(len(labels) + 1)
|
||||
a.A[0] = uint64(len(labels))
|
||||
lc.compress(a.A[1:], labels)
|
||||
|
||||
if lc.compressFast(a.A[1:], labels) {
|
||||
dst = encoding.MarshalVarUint64s(dst, a.A)
|
||||
encoding.PutUint64s(a)
|
||||
return dst
|
||||
}
|
||||
|
||||
lc.mu.Lock()
|
||||
lc.compressSlow(a.A[1:], labels)
|
||||
lc.mu.Unlock()
|
||||
|
||||
dst = encoding.MarshalVarUint64s(dst, a.A)
|
||||
encoding.PutUint64s(a)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
|
||||
if len(labels) == 0 {
|
||||
func (lc *LabelsCompressor) compressFast(dst []uint64, labels []prompb.Label) bool {
|
||||
p := lc.usedBitset.Load()
|
||||
var bits []uint64
|
||||
if p != nil {
|
||||
bits = *p
|
||||
}
|
||||
var keyBuf [256]byte
|
||||
for i, label := range labels {
|
||||
// Build composite key name+'\x00'+value into the stack buffer.
|
||||
totalLen := len(label.Name) + 1 + len(label.Value)
|
||||
var key string
|
||||
if totalLen <= len(keyBuf) {
|
||||
n := copy(keyBuf[:], label.Name)
|
||||
keyBuf[n] = '\x00'
|
||||
copy(keyBuf[n+1:], label.Value)
|
||||
key = bytesutil.ToUnsafeString(keyBuf[:totalLen])
|
||||
} else {
|
||||
key = makeLabelKey(label)
|
||||
}
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
||||
shard := &lc.labelToIdxShards[h%lcShardCount]
|
||||
shard.mu.RLock()
|
||||
idx, ok := shard.m[key]
|
||||
shard.mu.RUnlock()
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
dst[i] = uint64(idx)
|
||||
word := idx >> 6
|
||||
if int(word) < len(bits) {
|
||||
mask := uint64(1) << (idx & 63)
|
||||
if atomic.LoadUint64(&bits[word])&mask == 0 {
|
||||
atomic.OrUint64(&bits[word], mask)
|
||||
}
|
||||
}
|
||||
}
|
||||
// usedBitset was swapped mid-loop; re-mark all indices in the new bitset.
|
||||
if lc.usedBitset.Load() != p {
|
||||
for _, idx64 := range dst {
|
||||
lc.markUsed(uint32(idx64))
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) compressSlow(dst []uint64, labels []prompb.Label) {
|
||||
for i, label := range labels {
|
||||
key := makeLabelKey(label)
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
||||
shard := &lc.labelToIdxShards[h%lcShardCount]
|
||||
|
||||
shard.mu.RLock()
|
||||
idx, ok := shard.m[key]
|
||||
shard.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
cloned := cloneLabel(label)
|
||||
idx = lc.newIndex(cloned)
|
||||
shard.mu.Lock()
|
||||
if shard.m == nil {
|
||||
shard.m = make(map[string]uint32)
|
||||
}
|
||||
shard.m[key] = idx
|
||||
shard.mu.Unlock()
|
||||
} else {
|
||||
lc.markUsed(idx)
|
||||
}
|
||||
dst[i] = uint64(idx)
|
||||
}
|
||||
}
|
||||
|
||||
// markUsed sets the bit for idx in usedBitset. Safe to call without mu.
|
||||
func (lc *LabelsCompressor) markUsed(idx uint32) {
|
||||
p := lc.usedBitset.Load()
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
_ = dst[len(labels)-1]
|
||||
for i, label := range labels {
|
||||
v, ok := lc.labelToIdx.Load(label)
|
||||
if !ok {
|
||||
idx := lc.nextIdx.Add(1)
|
||||
v = idx
|
||||
labelCopy := cloneLabel(label)
|
||||
|
||||
// Must store idxToLabel entry before labelToIdx,
|
||||
// so it can be found by possible concurrent goroutines.
|
||||
//
|
||||
// We might store duplicated entries for single label with different indexes,
|
||||
// and it's fine, see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118.
|
||||
lc.idxToLabel.Store(idx, labelCopy)
|
||||
vNew, loaded := lc.labelToIdx.LoadOrStore(labelCopy, v)
|
||||
if loaded {
|
||||
// This label has been stored by a concurrent goroutine with different index,
|
||||
// use it for key consistency in aggrState.
|
||||
v = vNew
|
||||
}
|
||||
|
||||
// Update lc.totalSizeBytes
|
||||
labelSizeBytes := uint64(len(label.Name) + len(label.Value))
|
||||
entrySizeBytes := labelSizeBytes + uint64(2*(unsafe.Sizeof(label)+unsafe.Sizeof(&label))+unsafe.Sizeof(v))
|
||||
lc.totalSizeBytes.Add(entrySizeBytes)
|
||||
bits := *p
|
||||
word := idx >> 6
|
||||
if int(word) < len(bits) {
|
||||
mask := uint64(1) << (idx & 63)
|
||||
if atomic.LoadUint64(&bits[word])&mask == 0 {
|
||||
atomic.OrUint64(&bits[word], mask)
|
||||
}
|
||||
dst[i] = v.(uint64)
|
||||
}
|
||||
}
|
||||
|
||||
// growBitset extends usedBitset to cover idx. Must be called with lc.mu held.
|
||||
func (lc *LabelsCompressor) growBitset(idx uint32) {
|
||||
needed := int(idx>>6) + 1
|
||||
p := lc.usedBitset.Load()
|
||||
var bits []uint64
|
||||
if p != nil {
|
||||
bits = *p
|
||||
}
|
||||
if needed <= len(bits) {
|
||||
return
|
||||
}
|
||||
newBits := make([]uint64, needed)
|
||||
copy(newBits, bits)
|
||||
lc.usedBitset.Store(&newBits)
|
||||
}
|
||||
|
||||
func makeLabelKey(label prompb.Label) string {
|
||||
buf := make([]byte, 0, len(label.Name)+1+len(label.Value))
|
||||
buf = append(buf, label.Name...)
|
||||
buf = append(buf, '\x00')
|
||||
buf = append(buf, label.Value...)
|
||||
return string(buf)
|
||||
}
|
||||
|
||||
// Generation returns the current generation count.
|
||||
// It increments whenever labels are evicted, invalidating any cached compressed keys.
|
||||
func (lc *LabelsCompressor) Generation() uint64 {
|
||||
return lc.generation.Load()
|
||||
}
|
||||
|
||||
func cloneLabel(label prompb.Label) prompb.Label {
|
||||
// pre-allocate memory for label name and value
|
||||
n := len(label.Name) + len(label.Value)
|
||||
@@ -98,6 +332,35 @@ func cloneLabel(label prompb.Label) prompb.Label {
|
||||
}
|
||||
}
|
||||
|
||||
// newIndex assigns a new index for cloned and updates idxToLabel atomically.
|
||||
// Must be called with lc.mu held.
|
||||
func (lc *LabelsCompressor) newIndex(cloned prompb.Label) uint32 {
|
||||
var idx uint32
|
||||
p := lc.idxToLabel.Load()
|
||||
var newIdxToLabel []prompb.Label
|
||||
if p != nil {
|
||||
newIdxToLabel = *p
|
||||
}
|
||||
|
||||
if len(lc.freeIdxs) > 0 {
|
||||
idx = lc.freeIdxs[len(lc.freeIdxs)-1]
|
||||
lc.freeIdxs = lc.freeIdxs[:len(lc.freeIdxs)-1]
|
||||
next := make([]prompb.Label, len(newIdxToLabel))
|
||||
copy(next, newIdxToLabel)
|
||||
next[idx] = cloned
|
||||
lc.idxToLabel.Store(&next)
|
||||
} else {
|
||||
idx = uint32(len(newIdxToLabel))
|
||||
next := append(newIdxToLabel, cloned)
|
||||
lc.idxToLabel.Store(&next)
|
||||
}
|
||||
|
||||
lc.growBitset(idx)
|
||||
lc.markUsed(idx)
|
||||
lc.totalSizeBytes += uint64(len(cloned.Name)+len(cloned.Value)) + uint64(unsafe.Sizeof(cloned)) + 8
|
||||
return idx
|
||||
}
|
||||
|
||||
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
|
||||
//
|
||||
// It is safe calling Decompress from concurrent goroutines.
|
||||
@@ -124,111 +387,139 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
|
||||
if len(tail) > 0 {
|
||||
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
|
||||
}
|
||||
dst = lc.decompress(dst, a.A)
|
||||
|
||||
p := lc.idxToLabel.Load()
|
||||
if p == nil {
|
||||
encoding.PutUint64s(a)
|
||||
logger.Panicf("BUG: idxToLabel is nil in Decompress")
|
||||
return dst
|
||||
}
|
||||
labels := *p
|
||||
for _, idx := range a.A {
|
||||
if int(idx) >= len(labels) {
|
||||
encoding.PutUint64s(a)
|
||||
logger.Panicf("BUG: missing label for idx=%d; idxToLabel len=%d", idx, len(labels))
|
||||
}
|
||||
dst = append(dst, labels[idx])
|
||||
}
|
||||
|
||||
encoding.PutUint64s(a)
|
||||
return dst
|
||||
}
|
||||
|
||||
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
|
||||
for _, idx := range src {
|
||||
label, ok := lc.idxToLabel.Load(idx)
|
||||
if !ok {
|
||||
logger.Panicf("BUG: missing label for idx=%d", idx)
|
||||
}
|
||||
dst = append(dst, label)
|
||||
// rotate evicts unused labels and recycles their slots.
|
||||
// Phase 1 removes the label from labelToIdx (preventing new keys from referencing it).
|
||||
// Phase 2, one rotation later, zeroes idxToLabel and returns the slot to freeIdxs,
|
||||
// giving in-flight keys a full rotation period to drain via Decompress.
|
||||
// Must not be called concurrently with itself.
|
||||
func (lc *LabelsCompressor) rotate() {
|
||||
// Snapshot and reset usedBitset under mu to prevent races with growBitset/newIndex.
|
||||
// Also snapshot idxToLabel for the phase-1 scan.
|
||||
lc.mu.Lock()
|
||||
var currentBits []uint64
|
||||
if p := lc.usedBitset.Load(); p != nil {
|
||||
currentBits = make([]uint64, len(*p))
|
||||
copy(currentBits, *p)
|
||||
newBits := make([]uint64, len(*p))
|
||||
lc.usedBitset.Store(&newBits)
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
// labelsMap maps uint64 key to prompb.Label
|
||||
//
|
||||
// uint64 keys must be packed close to 0. Otherwise the labelsMap structure will consume too much memory.
|
||||
type labelsMap struct {
|
||||
readOnly atomic.Pointer[[]*prompb.Label]
|
||||
|
||||
mutableLock sync.Mutex
|
||||
mutable map[uint64]*prompb.Label
|
||||
misses uint64
|
||||
}
|
||||
|
||||
// Store stores label under the given idx.
|
||||
//
|
||||
// It is safe calling Store from concurrent goroutines.
|
||||
func (lm *labelsMap) Store(idx uint64, label prompb.Label) {
|
||||
lm.mutableLock.Lock()
|
||||
if lm.mutable == nil {
|
||||
lm.mutable = make(map[uint64]*prompb.Label)
|
||||
}
|
||||
lm.mutable[idx] = &label
|
||||
lm.mutableLock.Unlock()
|
||||
}
|
||||
|
||||
// Load returns the label for the given idx.
|
||||
//
|
||||
// Load returns false if lm doesn't contain label for the given idx.
|
||||
//
|
||||
// It is safe calling Load from concurrent goroutines.
|
||||
//
|
||||
// The performance of Load() scales linearly with CPU cores.
|
||||
func (lm *labelsMap) Load(idx uint64) (prompb.Label, bool) {
|
||||
if pReadOnly := lm.readOnly.Load(); pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
|
||||
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
|
||||
// Fast path - the label for the given idx has been found in lm.readOnly.
|
||||
return *pLabel, true
|
||||
// Skip if nothing was used; all-zero snapshots from rapid rotation would
|
||||
// advance pendingIdxs and reclaim slots before in-flight keys drain.
|
||||
anyUsed := false
|
||||
for _, w := range currentBits {
|
||||
if w != 0 {
|
||||
anyUsed = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Slow path - search in lm.mutable.
|
||||
return lm.loadSlow(idx)
|
||||
}
|
||||
|
||||
func (lm *labelsMap) loadSlow(idx uint64) (prompb.Label, bool) {
|
||||
lm.mutableLock.Lock()
|
||||
|
||||
// Try loading label from readOnly, since it could be updated while acquiring mutableLock.
|
||||
pReadOnly := lm.readOnly.Load()
|
||||
if pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
|
||||
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
|
||||
lm.mutableLock.Unlock()
|
||||
return *pLabel, true
|
||||
}
|
||||
}
|
||||
|
||||
// The label for the idx wasn't found in readOnly. Search it in mutable.
|
||||
lm.misses++
|
||||
pLabel := lm.mutable[idx]
|
||||
if pReadOnly == nil || lm.misses > uint64(len(*pReadOnly)) {
|
||||
lm.moveMutableToReadOnlyLocked(pReadOnly)
|
||||
lm.misses = 0
|
||||
}
|
||||
lm.mutableLock.Unlock()
|
||||
|
||||
if pLabel == nil {
|
||||
return prompb.Label{}, false
|
||||
}
|
||||
return *pLabel, true
|
||||
}
|
||||
|
||||
func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
|
||||
if len(lm.mutable) == 0 {
|
||||
// Nothing to move
|
||||
if !anyUsed {
|
||||
lc.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
var labels []*prompb.Label
|
||||
if pReadOnly != nil {
|
||||
labels = append(labels, *pReadOnly...)
|
||||
// Advance prevBitset; labels missing from a partial snapshot are still covered by it.
|
||||
prevSnap := lc.prevBitset
|
||||
lc.prevBitset = currentBits
|
||||
pendingIdxs := lc.pendingIdxs
|
||||
idxSnap := lc.idxToLabel.Load()
|
||||
lc.mu.Unlock()
|
||||
|
||||
// A label is used if present in either the current or previous period snapshot.
|
||||
isUsed := func(idx uint32) bool {
|
||||
word := idx >> 6
|
||||
inCurrent := int(word) < len(currentBits) && currentBits[word]>>(idx&63)&1 == 1
|
||||
inPrev := int(word) < len(prevSnap) && prevSnap[word]>>(idx&63)&1 == 1
|
||||
return inCurrent || inPrev
|
||||
}
|
||||
for idx, pLabel := range lm.mutable {
|
||||
if idx < uint64(len(labels)) {
|
||||
labels[idx] = pLabel
|
||||
} else {
|
||||
for idx > uint64(len(labels)) {
|
||||
labels = append(labels, nil)
|
||||
|
||||
// Phase 2: reclaim slots evicted last rotation if still unused.
|
||||
var toReclaim []uint32
|
||||
var nextRelease map[uint32]struct{}
|
||||
for idx := range pendingIdxs {
|
||||
if isUsed(idx) {
|
||||
if nextRelease == nil {
|
||||
nextRelease = make(map[uint32]struct{})
|
||||
}
|
||||
labels = append(labels, pLabel)
|
||||
nextRelease[idx] = struct{}{}
|
||||
} else {
|
||||
toReclaim = append(toReclaim, idx)
|
||||
}
|
||||
}
|
||||
clear(lm.mutable)
|
||||
lm.readOnly.Store(&labels)
|
||||
|
||||
// Phase 1: scan idxToLabel rather than sync.Map.Range to avoid
|
||||
// cache-line contention with concurrent compressFast lookups.
|
||||
phase1Evicted := false
|
||||
if idxSnap != nil {
|
||||
for i, label := range *idxSnap {
|
||||
if label == (prompb.Label{}) {
|
||||
continue // freed slot
|
||||
}
|
||||
idx := uint32(i)
|
||||
if _, inPending := pendingIdxs[idx]; inPending {
|
||||
continue // already queued for release
|
||||
}
|
||||
if isUsed(idx) {
|
||||
continue
|
||||
}
|
||||
key := makeLabelKey(label)
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
||||
shard := &lc.labelToIdxShards[h%lcShardCount]
|
||||
shard.mu.Lock()
|
||||
delete(shard.m, key)
|
||||
shard.mu.Unlock()
|
||||
phase1Evicted = true
|
||||
if nextRelease == nil {
|
||||
nextRelease = make(map[uint32]struct{})
|
||||
}
|
||||
nextRelease[idx] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
if phase1Evicted {
|
||||
lc.generation.Add(1)
|
||||
}
|
||||
|
||||
lc.mu.Lock()
|
||||
if len(toReclaim) > 0 {
|
||||
p := lc.idxToLabel.Load()
|
||||
var newIdxToLabel []prompb.Label
|
||||
if p != nil {
|
||||
newIdxToLabel = *p
|
||||
}
|
||||
next := make([]prompb.Label, len(newIdxToLabel))
|
||||
copy(next, newIdxToLabel)
|
||||
for _, idx := range toReclaim {
|
||||
label := next[idx]
|
||||
entrySize := uint64(len(label.Name)+len(label.Value)) + uint64(unsafe.Sizeof(label)) + 8
|
||||
if lc.totalSizeBytes >= entrySize {
|
||||
lc.totalSizeBytes -= entrySize
|
||||
}
|
||||
next[idx] = prompb.Label{}
|
||||
lc.freeIdxs = append(lc.freeIdxs, idx)
|
||||
}
|
||||
lc.idxToLabel.Store(&next)
|
||||
}
|
||||
lc.pendingIdxs = nextRelease
|
||||
lc.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -8,47 +8,96 @@ import (
|
||||
)
|
||||
|
||||
func BenchmarkLabelsCompressorCompress(b *testing.B) {
|
||||
var lc LabelsCompressor
|
||||
series := newTestSeries(100, 10)
|
||||
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(series)))
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var dst []byte
|
||||
for pb.Next() {
|
||||
dst = dst[:0]
|
||||
for _, labels := range series {
|
||||
dst = lc.Compress(dst, labels)
|
||||
}
|
||||
Sink.Add(uint64(len(dst)))
|
||||
run := func(b *testing.B, withRotate bool) {
|
||||
var lc LabelsCompressor
|
||||
for _, labels := range series {
|
||||
lc.Compress(nil, labels)
|
||||
}
|
||||
})
|
||||
var rotations atomic.Int64
|
||||
if withRotate {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
lc.rotate()
|
||||
rotations.Add(1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer close(done)
|
||||
}
|
||||
rotations.Store(0)
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(series)))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var dst []byte
|
||||
for pb.Next() {
|
||||
dst = dst[:0]
|
||||
for _, labels := range series {
|
||||
dst = lc.Compress(dst, labels)
|
||||
}
|
||||
Sink.Add(uint64(len(dst)))
|
||||
}
|
||||
})
|
||||
if withRotate {
|
||||
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
|
||||
}
|
||||
}
|
||||
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
|
||||
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
|
||||
}
|
||||
|
||||
func BenchmarkLabelsCompressorDecompress(b *testing.B) {
|
||||
var lc LabelsCompressor
|
||||
series := newTestSeries(100, 10)
|
||||
datas := make([][]byte, len(series))
|
||||
var dst []byte
|
||||
for i, labels := range series {
|
||||
dstLen := len(dst)
|
||||
dst = lc.Compress(dst, labels)
|
||||
datas[i] = dst[dstLen:]
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(series)))
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var labels []prompb.Label
|
||||
for pb.Next() {
|
||||
for _, data := range datas {
|
||||
labels = lc.Decompress(labels[:0], data)
|
||||
}
|
||||
Sink.Add(uint64(len(labels)))
|
||||
run := func(b *testing.B, withRotate bool) {
|
||||
var lc LabelsCompressor
|
||||
datas := make([][]byte, len(series))
|
||||
var buf []byte
|
||||
for i, labels := range series {
|
||||
bufLen := len(buf)
|
||||
buf = lc.Compress(buf, labels)
|
||||
datas[i] = buf[bufLen:]
|
||||
}
|
||||
})
|
||||
var rotations atomic.Int64
|
||||
if withRotate {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
lc.rotate()
|
||||
rotations.Add(1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer close(done)
|
||||
}
|
||||
rotations.Store(0)
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(series)))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var labels []prompb.Label
|
||||
for pb.Next() {
|
||||
for _, data := range datas {
|
||||
labels = lc.Decompress(labels[:0], data)
|
||||
}
|
||||
Sink.Add(uint64(len(labels)))
|
||||
}
|
||||
})
|
||||
if withRotate {
|
||||
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
|
||||
}
|
||||
}
|
||||
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
|
||||
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
|
||||
}
|
||||
|
||||
var Sink atomic.Uint64
|
||||
|
||||
@@ -764,7 +764,7 @@ func filterLabelValues(lvs map[string]struct{}, tf *tagFilter, key string) {
|
||||
b = marshalTagValue(b, bytesutil.ToUnsafeBytes(lv))
|
||||
ok, err := tf.match(b)
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %s", key, lv, tf.String(), err)
|
||||
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %w", key, lv, tf.String(), err)
|
||||
}
|
||||
if !ok {
|
||||
delete(lvs, lv)
|
||||
|
||||
@@ -141,7 +141,7 @@ func (is *indexSearch) legacyContainsTimeRangeSlow(prefixBuf *bytesutil.ByteBuff
|
||||
ts.Seek(prefixBuf.B)
|
||||
if !ts.NextItem() {
|
||||
if err := ts.Error(); err != nil {
|
||||
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %s", minDate, prefixBuf.B, err)
|
||||
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %w", minDate, prefixBuf.B, err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -106,7 +106,7 @@ func loadFrom(loadPath string, maxSizeBytes uint64) (*Tracker, error) {
|
||||
}
|
||||
defer func() {
|
||||
if err := zr.Close(); err != nil {
|
||||
logger.Panicf("FATAL: cannot close gzip reader: %s", err)
|
||||
logger.Panicf("FATAL: cannot close gzip reader: %w", err)
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -517,7 +517,7 @@ func (tb *table) historicalMergeWatcher() {
|
||||
|
||||
logger.Infof("start %s for partition (%s, %s)", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath)
|
||||
if err := pt.ForceMergeAllParts(tb.stopCh); err != nil {
|
||||
logger.Errorf("cannot %s for partition (%s, %s): %s", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
|
||||
logger.Errorf("cannot %s for partition (%s, %s): %w", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
|
||||
}
|
||||
logger.Infof("finished %s for partition (%s, %s) in %.3f seconds", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath, time.Since(t).Seconds())
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
|
||||
)
|
||||
|
||||
func BenchmarkDedupAggr(b *testing.B) {
|
||||
@@ -63,14 +64,15 @@ func newBenchSamples(count int) []pushSample {
|
||||
}
|
||||
labelsLen := len(labels)
|
||||
samples := make([]pushSample, count)
|
||||
var lc promutil.LabelsCompressor
|
||||
var keyBuf []byte
|
||||
for i := range samples {
|
||||
sample := &samples[i]
|
||||
labels = append(labels[:labelsLen], prompb.Label{
|
||||
labels := append(labels[:labelsLen:labelsLen], prompb.Label{
|
||||
Name: "app",
|
||||
Value: fmt.Sprintf("instance-%d", i),
|
||||
})
|
||||
keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:])
|
||||
keyBuf = lc.Compress(keyBuf[:0], labels)
|
||||
sample.key = string(keyBuf)
|
||||
sample.value = float64(i)
|
||||
}
|
||||
|
||||
@@ -77,6 +77,8 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
|
||||
|
||||
metrics.RegisterSet(ms)
|
||||
|
||||
lc.Register(2 * interval)
|
||||
|
||||
d.wg.Go(func() {
|
||||
d.runFlusher(pushFunc)
|
||||
})
|
||||
@@ -86,6 +88,8 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
|
||||
|
||||
// MustStop stops d.
|
||||
func (d *Deduplicator) MustStop() {
|
||||
lc.Unregister(2 * d.interval)
|
||||
|
||||
metrics.UnregisterSet(d.ms, true)
|
||||
d.ms = nil
|
||||
|
||||
@@ -203,7 +207,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
|
||||
dstSamples := ctx.samples
|
||||
for _, ps := range samples {
|
||||
labelsLen := len(labels)
|
||||
labels = decompressLabels(labels, ps.key)
|
||||
labels = lc.Decompress(labels, bytesutil.ToUnsafeBytes(ps.key))
|
||||
|
||||
dstSamplesLen := len(dstSamples)
|
||||
dstSamples = append(dstSamples, prompb.Sample{
|
||||
|
||||
@@ -17,29 +17,27 @@ type aggrOutputs struct {
|
||||
outputSamples *metrics.Counter
|
||||
}
|
||||
|
||||
func (ao *aggrOutputs) getInputOutputKey(key string) (string, string) {
|
||||
func (ao *aggrOutputs) getInputOutputKey(key string) (outputKey, inputKey string) {
|
||||
src := bytesutil.ToUnsafeBytes(key)
|
||||
outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
|
||||
if nSize <= 0 {
|
||||
logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
|
||||
}
|
||||
src = src[nSize:]
|
||||
outputKey := src[:outputKeyLen]
|
||||
if !ao.useInputKey {
|
||||
return key, bytesutil.ToUnsafeString(outputKey)
|
||||
outputKey = bytesutil.ToUnsafeString(src[:outputKeyLen])
|
||||
if !ao.useInputKey || int(outputKeyLen) == len(src) {
|
||||
return outputKey, outputKey
|
||||
}
|
||||
inputKey := src[outputKeyLen:]
|
||||
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
|
||||
inputKey = bytesutil.ToUnsafeString(src[outputKeyLen:])
|
||||
return outputKey, inputKey
|
||||
}
|
||||
|
||||
func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, isGreen bool) {
|
||||
var inputKey, outputKey string
|
||||
var sample *pushSample
|
||||
var outputs []aggrValue
|
||||
var nv *aggrValues
|
||||
for i := range samples {
|
||||
sample = &samples[i]
|
||||
inputKey, outputKey = ao.getInputOutputKey(sample.key)
|
||||
sample := &samples[i]
|
||||
outputKey, inputKey := ao.getInputOutputKey(sample.key)
|
||||
|
||||
again:
|
||||
v, ok := ao.m.Load(outputKey)
|
||||
@@ -81,7 +79,7 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
|
||||
}
|
||||
av.mu.Unlock()
|
||||
if deleted {
|
||||
// The entry has been deleted by the concurrent call to flush
|
||||
// The entry has been deleted by the concurrent call to flush.
|
||||
// Try obtaining and updating the entry again.
|
||||
goto again
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"slices"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -24,6 +23,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
@@ -257,6 +257,10 @@ type Config struct {
|
||||
type Aggregators struct {
|
||||
as []*aggregator
|
||||
|
||||
// workCh limits the number of concurrent aggregator.Push calls.
|
||||
// Pre-allocated once to avoid per-Push channel allocation.
|
||||
workCh chan struct{}
|
||||
|
||||
// configData contains marshaled configs.
|
||||
// It is used in Equal() for comparing Aggregators.
|
||||
configData []byte
|
||||
@@ -287,6 +291,7 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
|
||||
}
|
||||
|
||||
ms := metrics.NewSet()
|
||||
|
||||
as := make([]*aggregator, len(cfgs))
|
||||
for i, cfg := range cfgs {
|
||||
a, err := newAggregator(cfg, filePath, pushFunc, ms, opts, alias, i+1)
|
||||
@@ -307,6 +312,7 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
|
||||
metrics.RegisterSet(ms)
|
||||
return &Aggregators{
|
||||
as: as,
|
||||
workCh: make(chan struct{}, cgroup.AvailableCPUs()),
|
||||
configData: configData,
|
||||
filePath: filePath,
|
||||
ms: ms,
|
||||
@@ -364,19 +370,21 @@ func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []uint32) []uint32
|
||||
return matchIdxs
|
||||
}
|
||||
|
||||
// use all available CPU cores to copy time-series into aggregators
|
||||
// See this issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
|
||||
var wg sync.WaitGroup
|
||||
concurrencyChan := make(chan struct{}, cgroup.AvailableCPUs())
|
||||
|
||||
for _, aggr := range a.as {
|
||||
concurrencyChan <- struct{}{}
|
||||
wg.Go(func() {
|
||||
aggr.Push(tss, matchIdxs)
|
||||
<-concurrencyChan
|
||||
})
|
||||
if len(a.as) == 1 {
|
||||
a.as[0].Push(tss, matchIdxs)
|
||||
return matchIdxs
|
||||
}
|
||||
|
||||
// Use all available CPU cores to push time-series into aggregators in parallel.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
|
||||
var wg sync.WaitGroup
|
||||
for _, aggr := range a.as {
|
||||
a.workCh <- struct{}{}
|
||||
wg.Go(func() {
|
||||
aggr.Push(tss, matchIdxs)
|
||||
<-a.workCh
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
return matchIdxs
|
||||
@@ -413,8 +421,8 @@ type aggregator struct {
|
||||
ignoreOldSamples bool
|
||||
enableWindows bool
|
||||
|
||||
by []string
|
||||
without []string
|
||||
bySet map[string]struct{}
|
||||
withoutSet map[string]struct{}
|
||||
aggregateOnlyByTime bool
|
||||
|
||||
// interval is the interval between flushes
|
||||
@@ -446,6 +454,8 @@ type aggregator struct {
|
||||
// for `interval: 1m`, `by: [job]`
|
||||
suffix string
|
||||
|
||||
seriesKeyCache seriesKeyCache
|
||||
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
|
||||
@@ -645,8 +655,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
enableWindows: enableWindows,
|
||||
stalenessInterval: stalenessInterval,
|
||||
|
||||
by: by,
|
||||
without: without,
|
||||
bySet: makeStringSet(by),
|
||||
withoutSet: makeStringSet(without),
|
||||
aggregateOnlyByTime: aggregateOnlyByTime,
|
||||
|
||||
interval: interval,
|
||||
@@ -709,6 +719,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
}
|
||||
a.cs.Store(cs)
|
||||
|
||||
lc.Register(a.stalenessInterval)
|
||||
|
||||
a.wg.Go(func() {
|
||||
a.runFlusher(pushFunc, alignFlushToInterval, skipFlushOnShutdown, ignoreFirstIntervals)
|
||||
})
|
||||
@@ -915,6 +927,7 @@ func (a *aggregator) dedupFlush(dedupTime time.Time, cs *currentState) {
|
||||
//
|
||||
// If pushFunc is nil, then the aggregator state is just reset.
|
||||
func (a *aggregator) flush(pushFunc PushFunc, flushTime time.Time, cs *currentState, isLast bool) {
|
||||
a.seriesKeyCache.reset()
|
||||
startTime := time.Now()
|
||||
ao := a.aggrOutputs
|
||||
|
||||
@@ -943,6 +956,7 @@ var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
|
||||
//
|
||||
// The aggregator stops pushing the aggregated metrics after this call.
|
||||
func (a *aggregator) MustStop() {
|
||||
lc.Unregister(a.stalenessInterval)
|
||||
close(a.stopCh)
|
||||
a.wg.Wait()
|
||||
}
|
||||
@@ -952,7 +966,6 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
ctx := getPushCtx()
|
||||
defer putPushCtx(ctx)
|
||||
|
||||
buf := ctx.buf
|
||||
labels := &ctx.labels
|
||||
inputLabels := &ctx.inputLabels
|
||||
outputLabels := &ctx.outputLabels
|
||||
@@ -986,19 +999,22 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
}
|
||||
labels.Sort()
|
||||
|
||||
inputLabels.Reset()
|
||||
outputLabels.Reset()
|
||||
if !a.aggregateOnlyByTime {
|
||||
inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.by, a.without)
|
||||
} else {
|
||||
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
|
||||
h := computeLabelsFingerprint(labels.Labels)
|
||||
gen := lc.Generation()
|
||||
key, ok := a.seriesKeyCache.get(h, gen)
|
||||
if !ok {
|
||||
inputLabels.Reset()
|
||||
outputLabels.Reset()
|
||||
if !a.aggregateOnlyByTime {
|
||||
inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.bySet, a.withoutSet)
|
||||
} else {
|
||||
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
|
||||
}
|
||||
bufLen := len(ctx.buf)
|
||||
ctx.compressLabels(inputLabels.Labels, outputLabels.Labels)
|
||||
key = string(ctx.buf[bufLen:])
|
||||
a.seriesKeyCache.set(h, gen, key)
|
||||
}
|
||||
|
||||
bufLen := len(buf)
|
||||
buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels)
|
||||
// key remains valid only by the end of this function and can't be reused after
|
||||
// do not intern key because number of unique keys could be too high
|
||||
key := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
for _, s := range ts.Samples {
|
||||
if math.IsNaN(s.Value) {
|
||||
// Skip NaN values
|
||||
@@ -1042,8 +1058,6 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
}
|
||||
a.samplesLag.Update(float64(maxLagMsec) / 1_000)
|
||||
|
||||
ctx.buf = buf
|
||||
|
||||
pushSamples := a.aggrOutputs.pushSamples
|
||||
if a.da != nil {
|
||||
pushSamples = a.da.pushSamples
|
||||
@@ -1060,18 +1074,13 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
}
|
||||
}
|
||||
|
||||
func compressLabels(dst []byte, inputLabels, outputLabels []prompb.Label) []byte {
|
||||
bb := bbPool.Get()
|
||||
bb.B = lc.Compress(bb.B, outputLabels)
|
||||
dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
|
||||
dst = append(dst, bb.B...)
|
||||
bbPool.Put(bb)
|
||||
dst = lc.Compress(dst, inputLabels)
|
||||
return dst
|
||||
}
|
||||
|
||||
func decompressLabels(dst []prompb.Label, key string) []prompb.Label {
|
||||
return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
|
||||
func (ctx *pushCtx) compressLabels(inputLabels, outputLabels []prompb.Label) {
|
||||
ctx.keybuf = lc.Compress(ctx.keybuf[:0], outputLabels)
|
||||
ctx.buf = encoding.MarshalVarUint64(ctx.buf, uint64(len(ctx.keybuf)))
|
||||
ctx.buf = append(ctx.buf, ctx.keybuf...)
|
||||
if len(inputLabels) > 0 {
|
||||
ctx.buf = lc.Compress(ctx.buf, inputLabels)
|
||||
}
|
||||
}
|
||||
|
||||
type pushCtx struct {
|
||||
@@ -1081,6 +1090,7 @@ type pushCtx struct {
|
||||
inputLabels promutil.Labels
|
||||
outputLabels promutil.Labels
|
||||
buf []byte
|
||||
keybuf []byte
|
||||
}
|
||||
|
||||
func (ctx *pushCtx) reset() {
|
||||
@@ -1115,10 +1125,10 @@ func putPushCtx(ctx *pushCtx) {
|
||||
|
||||
var pushCtxPool sync.Pool
|
||||
|
||||
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, without []string) ([]prompb.Label, []prompb.Label) {
|
||||
if len(without) > 0 {
|
||||
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, bySet, withoutSet map[string]struct{}) ([]prompb.Label, []prompb.Label) {
|
||||
if len(withoutSet) > 0 {
|
||||
for _, label := range labels {
|
||||
if slices.Contains(without, label.Name) {
|
||||
if _, ok := withoutSet[label.Name]; ok {
|
||||
dstInput = append(dstInput, label)
|
||||
} else {
|
||||
dstOutput = append(dstOutput, label)
|
||||
@@ -1126,7 +1136,7 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, withou
|
||||
}
|
||||
} else {
|
||||
for _, label := range labels {
|
||||
if !slices.Contains(by, label.Name) {
|
||||
if _, ok := bySet[label.Name]; !ok {
|
||||
dstInput = append(dstInput, label)
|
||||
} else {
|
||||
dstOutput = append(dstOutput, label)
|
||||
@@ -1136,6 +1146,17 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, withou
|
||||
return dstInput, dstOutput
|
||||
}
|
||||
|
||||
func makeStringSet(keys []string) map[string]struct{} {
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
m := make(map[string]struct{}, len(keys))
|
||||
for _, k := range keys {
|
||||
m[k] = struct{}{}
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func getFlushCtx(a *aggregator, ao *aggrOutputs, pushFunc PushFunc, flushTimestamp int64, isLast bool) *flushCtx {
|
||||
v := flushCtxPool.Get()
|
||||
if v == nil {
|
||||
@@ -1235,7 +1256,7 @@ func (ctx *flushCtx) flushSeries() {
|
||||
func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
|
||||
labelsLen := len(ctx.labels)
|
||||
samplesLen := len(ctx.samples)
|
||||
ctx.labels = decompressLabels(ctx.labels, key)
|
||||
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
|
||||
if !ctx.a.keepMetricNames {
|
||||
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
|
||||
}
|
||||
@@ -1257,7 +1278,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
|
||||
func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, value float64, extraName, extraValue string) {
|
||||
labelsLen := len(ctx.labels)
|
||||
samplesLen := len(ctx.samples)
|
||||
ctx.labels = decompressLabels(ctx.labels, key)
|
||||
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
|
||||
if !ctx.a.keepMetricNames {
|
||||
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
|
||||
}
|
||||
@@ -1345,3 +1366,74 @@ func sortAndRemoveDuplicates(a []string) []string {
|
||||
}
|
||||
|
||||
var bbPool bytesutil.ByteBufferPool
|
||||
|
||||
type seriesKeyCache struct {
|
||||
shards [64]seriesKeyCacheShard
|
||||
}
|
||||
|
||||
type seriesKeyCacheShard struct {
|
||||
mu sync.RWMutex
|
||||
m map[uint64]seriesCacheEntry
|
||||
_ [32]byte // cache-line padding
|
||||
}
|
||||
|
||||
type seriesCacheEntry struct {
|
||||
generation uint64
|
||||
key string
|
||||
}
|
||||
|
||||
func (c *seriesKeyCache) get(h, generation uint64) (string, bool) {
|
||||
shard := &c.shards[h>>58]
|
||||
shard.mu.RLock()
|
||||
e, ok := shard.m[h]
|
||||
shard.mu.RUnlock()
|
||||
if !ok || e.generation != generation {
|
||||
return "", false
|
||||
}
|
||||
return e.key, true
|
||||
}
|
||||
|
||||
func (c *seriesKeyCache) set(h, generation uint64, key string) {
|
||||
shard := &c.shards[h>>58]
|
||||
shard.mu.Lock()
|
||||
if shard.m == nil {
|
||||
shard.m = make(map[uint64]seriesCacheEntry)
|
||||
}
|
||||
shard.m[h] = seriesCacheEntry{generation: generation, key: key}
|
||||
shard.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *seriesKeyCache) reset() {
|
||||
for i := range c.shards {
|
||||
shard := &c.shards[i]
|
||||
shard.mu.Lock()
|
||||
shard.m = nil
|
||||
shard.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func computeLabelsFingerprint(labels []prompb.Label) uint64 {
|
||||
var buf [512]byte
|
||||
n := 0
|
||||
for i, l := range labels {
|
||||
need := len(l.Name) + 1 + len(l.Value) + 1
|
||||
if n+need > len(buf) {
|
||||
b := make([]byte, n, n+need*2)
|
||||
copy(b, buf[:n])
|
||||
for _, l2 := range labels[i:] {
|
||||
b = append(b, l2.Name...)
|
||||
b = append(b, '\x00')
|
||||
b = append(b, l2.Value...)
|
||||
b = append(b, '\x00')
|
||||
}
|
||||
return xxhash.Sum64(b)
|
||||
}
|
||||
n += copy(buf[n:], l.Name)
|
||||
buf[n] = '\x00'
|
||||
n++
|
||||
n += copy(buf[n:], l.Value)
|
||||
buf[n] = '\x00'
|
||||
n++
|
||||
}
|
||||
return xxhash.Sum64(buf[:n])
|
||||
}
|
||||
|
||||
@@ -45,13 +45,16 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
|
||||
a := newBenchAggregators([]string{output}, pushFunc)
|
||||
defer a.MustStop()
|
||||
|
||||
// Warm up the LabelsCompressor so benchmark measures steady-state performance.
|
||||
a.Push(benchSeries, nil)
|
||||
|
||||
const loops = 100
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(benchSeries) * loops))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var matchIdxs []uint32
|
||||
matchIdxs := make([]uint32, len(benchSeries))
|
||||
for pb.Next() {
|
||||
for range loops {
|
||||
matchIdxs = a.Push(benchSeries, matchIdxs)
|
||||
@@ -82,8 +85,8 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
|
||||
func newBenchSeries(seriesCount int) []prompb.TimeSeries {
|
||||
a := make([]string, 0, seriesCount)
|
||||
for j := range seriesCount {
|
||||
s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo_%d",instance="bar",pod="pod-123232312",namespace="kube-foo-bar",node="node-123-3434-443",`+
|
||||
`some_other_label="foo-bar-baz",environment="prod",label1="value1",label2="value2",label3="value3"} %d`, j, j%100, j*1000)
|
||||
s := fmt.Sprintf(`http_requests_total{environment="prod",instance="bar",job="foo_%d",label1="value1",label2="value2",label3="value3",`+
|
||||
`namespace="kube-foo-bar",node="node-123-3434-443",path="/foo/%d",pod="pod-123232312",some_other_label="foo-bar-baz"} %d`, j%100, j, j*1000)
|
||||
a = append(a, s)
|
||||
}
|
||||
metrics := strings.Join(a, "\n")
|
||||
@@ -101,13 +104,16 @@ func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
|
||||
a := newPerOutputBenchAggregators(benchOutputs, pushFunc)
|
||||
defer a.MustStop()
|
||||
|
||||
// Warm up the LabelsCompressor so benchmark measures steady-state performance.
|
||||
a.Push(benchSeries, nil)
|
||||
|
||||
const loops = 100
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(benchSeries) * loops))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var matchIdxs []uint32
|
||||
matchIdxs := make([]uint32, len(benchSeries))
|
||||
for pb.Next() {
|
||||
for range loops {
|
||||
matchIdxs = a.Push(benchSeries, matchIdxs)
|
||||
@@ -117,21 +123,17 @@ func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
|
||||
}
|
||||
|
||||
func newPerOutputBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
|
||||
outputsQuoted := make([]string, len(outputs))
|
||||
var config string
|
||||
for i := range outputs {
|
||||
outputsQuoted[i] = stringsutil.JSONString(outputs[i])
|
||||
cfg := fmt.Sprintf(`
|
||||
var b strings.Builder
|
||||
for _, output := range outputs {
|
||||
fmt.Fprintf(&b, `
|
||||
- match: http_requests_total
|
||||
interval: 24h
|
||||
by: [job]
|
||||
outputs: [%s]
|
||||
`, stringsutil.JSONString(outputs[i]))
|
||||
config += cfg
|
||||
|
||||
`, stringsutil.JSONString(output))
|
||||
}
|
||||
|
||||
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
|
||||
a, err := LoadFromData([]byte(b.String()), pushFunc, nil, "some_alias")
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user