Compare commits

..

1 Commits

Author SHA1 Message Date
Andrii Chubatiuk
e8a1b3f9bd lib/promutil: added labelcompressor cleanup 2026-05-17 20:30:34 +03:00
43 changed files with 1000 additions and 689 deletions

View File

@@ -63,11 +63,11 @@ jobs:
arch: amd64
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
- name: Setup Go
id: go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
uses: actions/setup-go@v6
with:
cache-dependency-path: |
go.sum

View File

@@ -9,7 +9,7 @@ jobs:
tip-lint:
runs-on: 'ubuntu-latest'
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- uses: 'actions/checkout@v6'
with:
# needed for proper diff
fetch-depth: 0

View File

@@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
with:
fetch-depth: 0 # we need full history for commit verification

View File

@@ -15,11 +15,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@master
- name: Setup Go
id: go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
uses: actions/setup-go@v6
with:
go-version-file: 'go.mod'
cache: false
@@ -27,7 +27,7 @@ jobs:
- run: go version
- name: Cache Go artifacts
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
uses: actions/cache@v5
with:
path: |
~/.cache/go-build

View File

@@ -29,18 +29,18 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
- name: Set up Go
id: go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
uses: actions/setup-go@v6
with:
cache: false
go-version-file: 'go.mod'
- run: go version
- name: Cache Go artifacts
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
uses: actions/cache@v5
with:
path: |
~/.cache/go-build
@@ -50,14 +50,14 @@ jobs:
restore-keys: go-artifacts-${{ runner.os }}-codeql-analyze-${{ steps.go.outputs.go-version }}-
- name: Initialize CodeQL
uses: github/codeql-action/init@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
uses: github/codeql-action/init@v4.35.2
with:
languages: go
- name: Autobuild
uses: github/codeql-action/autobuild@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
uses: github/codeql-action/autobuild@v4.35.2
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
uses: github/codeql-action/analyze@v4.35.2
with:
category: 'language:go'

View File

@@ -16,19 +16,19 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
with:
path: __vm
- name: Checkout private code
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
with:
repository: VictoriaMetrics/vmdocs
token: ${{ secrets.VM_BOT_GH_TOKEN }}
path: __vm-docs
- name: Import GPG key
uses: crazy-max/ghaction-import-gpg@2dc316deee8e90f13e1a351ab510b4d5bc0c82cd # v7.0.0
uses: crazy-max/ghaction-import-gpg@v7
id: import-gpg
with:
gpg_private_key: ${{ secrets.VM_BOT_GPG_PRIVATE_KEY }}

View File

@@ -32,11 +32,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
- name: Setup Go
id: go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
uses: actions/setup-go@v6
with:
cache-dependency-path: |
go.sum
@@ -47,7 +47,7 @@ jobs:
- run: go version
- name: Cache golangci-lint
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
uses: actions/cache@v5
with:
path: |
~/.cache/golangci-lint
@@ -72,11 +72,11 @@ jobs:
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
- name: Setup Go
id: go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
uses: actions/setup-go@v6
with:
cache-dependency-path: |
go.sum
@@ -94,11 +94,11 @@ jobs:
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
- name: Setup Go
id: go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
uses: actions/setup-go@v6
with:
cache-dependency-path: |
go.sum

View File

@@ -32,11 +32,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
uses: actions/checkout@v6
- name: Cache node_modules
id: cache
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
uses: actions/cache@v5
with:
path: app/vmui/packages/vmui/node_modules
key: vmui-deps-${{ runner.os }}-${{ hashFiles('app/vmui/packages/vmui/package-lock.json', 'app/vmui/Dockerfile-build') }}
@@ -69,7 +69,7 @@ jobs:
VMUI_SKIP_INSTALL: true
- name: Annotate Code Linting Results
uses: ataylorme/eslint-annotate-action@d57a1193d4c59cbfbf3f86c271f42612f9dbd9e9 # 3.0.0
uses: ataylorme/eslint-annotate-action@v3
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
report-json: app/vmui/packages/vmui/vmui-lint-report.json

View File

@@ -2,7 +2,6 @@ package remotewrite
import (
"bytes"
"context"
"errors"
"fmt"
"io"
@@ -331,20 +330,15 @@ func (c *client) runWorker() {
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
return
case <-c.stopCh:
// c must be stopped. Wait up to 5 seconds for the in-flight request to complete.
// If it succeeds, drain the remaining in-memory queue before returning.
stopCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
// c must be stopped. Wait for a while in the hope the block will be sent.
graceDuration := 5 * time.Second
select {
case ok := <-ch:
if !ok {
// Return unsent block to the queue.
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
} else {
c.drainInMemoryQueue(stopCtx, block[:0])
}
case <-stopCtx.Done():
case <-time.After(graceDuration):
// Return unsent block to the queue.
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
}
@@ -514,36 +508,6 @@ again:
goto again
}
// drainInMemoryQueue sends the blocks still held in the in-memory part of c.fq.
//
// It keeps reading blocks via c.fq.MustReadInMemoryBlock and sending them via
// c.sendBlock until one of the following happens:
//
//   - stopCtx is done;
//   - MustReadInMemoryBlock reports that there is no block to read;
//   - sendBlock fails, in which case the unsent block is written back to c.fq.
//
// block is reused as the destination buffer for every read.
func (c *client) drainInMemoryQueue(stopCtx context.Context, block []byte) {
	// A non-nil Err() means stopCtx.Done() is already closed.
	for stopCtx.Err() == nil {
		var hasBlock bool
		block, hasBlock = c.fq.MustReadInMemoryBlock(block[:0])
		if !hasBlock {
			// Either the in-memory queue has already been drained
			// or the persistent queue is in use; nothing is left to send from memory.
			return
		}
		if len(block) == 0 {
			// Empty data blocks are not worth sending.
			continue
		}
		// NOTE(review): c.stopCh is expected to be closed at this point,
		// so sendBlock should not retry - confirm against sendBlock.
		if sent := c.sendBlock(block); !sent {
			// Return the unsent block to the queue.
			c.fq.MustWriteBlockIgnoreDisabledPQ(block)
			return
		}
	}
}
var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second)
var remoteWriteRetryLogger = logger.WithThrottler("remoteWriteRetry", 5*time.Second)

View File

@@ -9,7 +9,6 @@ import (
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
)
func TestParseRetryAfterHeader(t *testing.T) {
@@ -37,40 +36,6 @@ func TestParseRetryAfterHeader(t *testing.T) {
f(time.Now().Add(10*time.Second).Format("Mon, 02 Jan 2006 15:04:05 FAKETZ"), 0)
}
// TestInitSecretFlags verifies which remoteWrite flags InitSecretFlags registers
// as secret for both values of the showRemoteWriteURL flag.
func TestInitSecretFlags(t *testing.T) {
	origShowURL := *showRemoteWriteURL
	defer func() {
		// Restore the global state touched by the test.
		*showRemoteWriteURL = origShowURL
		flagutil.UnregisterAllSecretFlags()
	}()

	// reinit clears the registered secret flags and registers them again
	// with showRemoteWriteURL set to showURL.
	reinit := func(showURL bool) {
		flagutil.UnregisterAllSecretFlags()
		*showRemoteWriteURL = showURL
		InitSecretFlags()
	}
	// mustBeSecret fails the test with msg if the flag isn't registered as secret.
	mustBeSecret := func(flagName, msg string) {
		t.Helper()
		if !flagutil.IsSecretFlag(flagName) {
			t.Fatalf("%s", msg)
		}
	}

	reinit(false)
	mustBeSecret("remotewrite.url", "expecting remoteWrite.url to be secret")
	mustBeSecret("remotewrite.headers", "expecting remoteWrite.headers to be secret")
	mustBeSecret("remotewrite.proxyurl", "expecting remoteWrite.proxyURL to be secret")

	reinit(true)
	if flagutil.IsSecretFlag("remotewrite.url") {
		t.Fatalf("remoteWrite.url must remain visible when -remoteWrite.showURL is set")
	}
	mustBeSecret("remotewrite.headers", "expecting remoteWrite.headers to remain secret")
	mustBeSecret("remotewrite.proxyurl", "expecting remoteWrite.proxyURL to remain secret")
}
func TestRepackBlockFromZstdToSnappy(t *testing.T) {
expectedPlainBlock := []byte(`foobar`)

View File

@@ -151,10 +151,6 @@ func InitSecretFlags() {
// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
flagutil.RegisterSecretFlag("remoteWrite.url")
}
// remoteWrite.proxyURL can contain authentication codes.
flagutil.RegisterSecretFlag("remoteWrite.proxyURL")
// remoteWrite.headers can contain auth headers such as Authorization and API keys.
flagutil.RegisterSecretFlag("remoteWrite.headers")
}
var (
@@ -171,18 +167,6 @@ func Init() {
if len(*remoteWriteURLs) == 0 {
logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set")
}
if *shardByURL && len(*disableOnDiskQueue) > 1 {
disableOnDiskQueues := *disableOnDiskQueue
firstValue := disableOnDiskQueues[0]
for _, v := range disableOnDiskQueues[1:] {
if firstValue != v {
logger.Fatalf("all -remoteWrite.url targets must have the same -remoteWrite.disableOnDiskQueue setting when -remoteWrite.shardByURL is enabled; " +
"either enable or disable -remoteWrite.disableOnDiskQueue for all targets")
}
}
}
if limit := getMaxHourlySeries(); limit > 0 {
hourlySeriesLimiter = bloomfilter.NewLimiter(limit, time.Hour)
_ = metrics.NewGauge(`vmagent_hourly_series_limit_max_series`, func() float64 {
@@ -515,9 +499,7 @@ func tryPush(at *auth.Token, wr *prompb.WriteRequest, forceDropSamplesOnFailure
//
// calculateHealthyRwctxIdx will rely on the order of rwctx to be in ascending order.
func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) ([]*remoteWriteCtx, bool) {
// When -remoteWrite.shardByURL=true always use all configured remote writes to preserve stable metrics distribution across shards.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507
if !disableOnDiskQueueAny || *shardByURL {
if !disableOnDiskQueueAny {
return rwctxsGlobal, true
}
@@ -532,6 +514,12 @@ func getEligibleRemoteWriteCtxs(tss []prompb.TimeSeries, forceDropSamplesOnFailu
return nil, false
}
rowsCount := getRowsCount(tss)
if *shardByURL {
// Todo: When shardByURL is enabled, the following metrics won't be 100% accurate. Because vmagent don't know
// which rwctx should data be pushed to yet. Let's consider the hashing algorithm fair and will distribute
// data to all rwctxs evenly.
rowsCount = rowsCount / len(rwctxsGlobal)
}
rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
}
}

View File

@@ -61,7 +61,7 @@ func UnitTest(files []string, disableGroupLabel bool, externalLabels []string, e
}
eu, err := url.Parse(externalURL)
if err != nil {
logger.Fatalf("failed to parse external URL: %s", err)
logger.Fatalf("failed to parse external URL: %w", err)
}
if err := templates.Load([]string{}, *eu); err != nil {
logger.Fatalf("failed to load template: %v", err)

View File

@@ -64,7 +64,6 @@ func InitSecretFlags() {
if !*showDatasourceURL {
flagutil.RegisterSecretFlag("datasource.url")
}
flagutil.RegisterSecretFlag("datasource.headers")
}
// ShowDatasourceURL whether to show -datasource.url with sensitive information

View File

@@ -105,7 +105,7 @@ func (cw *configWatcher) add(typeK TargetType, interval time.Duration, targetsFn
}
targetMetadata, errors := getTargetMetadata(targetsFn, cw.cfg)
for _, err := range errors {
logger.Errorf("failed to init notifier for %q: %s", typeK, err)
logger.Errorf("failed to init notifier for %q: %w", typeK, err)
}
cw.updateTargets(typeK, targetMetadata, cw.cfg, cw.genFn)
}
@@ -274,7 +274,7 @@ func (cw *configWatcher) updateTargets(key TargetType, targetMts map[string]targ
for addr, metadata := range targetMts {
am, err := NewAlertManager(addr, genFn, cfg.HTTPClientConfig, metadata.alertRelabelConfigs, cfg.Timeout.Duration())
if err != nil {
logger.Errorf("failed to init %s notifier with addr %q: %s", key, addr, err)
logger.Errorf("failed to init %s notifier with addr %q: %w", key, addr, err)
continue
}
updatedTargets = append(updatedTargets, Target{

View File

@@ -194,7 +194,6 @@ func InitSecretFlags() {
if !*showNotifierURL {
flagutil.RegisterSecretFlag("notifier.url")
}
flagutil.RegisterSecretFlag("notifier.headers")
}
func notifiersFromFlags(gen AlertURLGenerator) ([]Notifier, error) {

View File

@@ -59,7 +59,6 @@ func InitSecretFlags() {
if !*showRemoteReadURL {
flagutil.RegisterSecretFlag("remoteRead.url")
}
flagutil.RegisterSecretFlag("remoteRead.headers")
}
// Init creates a Querier from provided flag values.

View File

@@ -62,7 +62,6 @@ func InitSecretFlags() {
if !*showRemoteWriteURL {
flagutil.RegisterSecretFlag("remoteWrite.url")
}
flagutil.RegisterSecretFlag("remoteWrite.headers")
}
// Init creates Client object from given flags.

View File

@@ -69,8 +69,6 @@ func Init() {
concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)
initVMUIConfig()
initVMAlertProxy()
flagutil.RegisterSecretFlag("vmalert.proxyURL")
}
// Stop stops vmselect

View File

@@ -3109,6 +3109,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -3406,6 +3406,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -3110,6 +3110,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -3407,6 +3407,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2946,6 +2946,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2324,6 +2324,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2945,6 +2945,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -2323,6 +2323,7 @@
"mode": "off"
}
},
"decimals": 0,
"links": [],
"mappings": [],
"min": 0,

View File

@@ -43,8 +43,8 @@ Just download VictoriaMetrics and follow [these instructions](https://docs.victo
See [available integrations](https://docs.victoriametrics.com/victoriametrics/integrations/) with other systems like
[Prometheus](https://docs.victoriametrics.com/victoriametrics/integrations/prometheus/) or [Grafana](https://docs.victoriametrics.com/victoriametrics/integrations/grafana/).
VictoriaMetrics is developed at a fast pace, so it is recommended to periodically check the [CHANGELOG](https://docs.victoriametrics.com/victoriametrics/changelog/)
and perform [regular upgrades](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-upgrade-victoriametrics).
VictoriaMetrics is developed at a fast pace, so it is recommended to periodically check the [CHANGELOG](https://docs.victoriametrics.com/victoriametrics/changelog/)
and perform [regular upgrades](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-upgrade-victoriametrics).
### Starting VictoriaMetrics Single Node or Cluster on VictoriaMetrics Cloud {id="starting-vm-on-cloud"}
@@ -63,7 +63,7 @@ docker run -it --rm -v `pwd`/victoria-metrics-data:/victoria-metrics-data -p 842
victoriametrics/victoria-metrics:v1.143.0 --selfScrapeInterval=5s -storageDataPath=victoria-metrics-data
```
_For Enterprise images, see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#docker-images)._
_For Enterprise images see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#docker-images)._
You should see:
@@ -113,7 +113,7 @@ See more details about [cluster architecture](https://docs.victoriametrics.com/v
### Starting VictoriaMetrics Single Node from a Binary {id="starting-vm-single-from-a-binary"}
1. Download the correct binary for your OS and architecture from [GitHub](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
For Enterprise binaries, see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#binary-releases).
For Enterprise binaries see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#binary-releases).
2. Extract the archive to /usr/local/bin by running:
@@ -164,7 +164,7 @@ END'
Extra [command-line flags](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#list-of-command-line-flags) can be added to `ExecStart` line.
If you want to deploy VictoriaMetrics Single Node as a Windows Service, review the [running as a Windows service docs](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#running-as-windows-service).
If you want to deploy VictoriaMetrics Single Node as a Windows Service review the [running as a Windows service docs](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#running-as-windows-service).
> Please note, `victoriametrics` service is listening on `:8428` for HTTP connections (see `-httpListenAddr` flag).
@@ -174,7 +174,7 @@ If you want to deploy VictoriaMetrics Single Node as a Windows Service, review t
sudo systemctl daemon-reload && sudo systemctl enable --now victoriametrics.service
```
7. Check that the service started successfully:
7. Check that the service started successfully:
```sh
sudo systemctl status victoriametrics.service
@@ -187,12 +187,12 @@ by going to `http://<ip_or_hostname>:8428/vmui`.
VictoriaMetrics cluster consists of [3 components](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#architecture-overview).
It is recommended to run these components in the same private network (for [security reasons](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#security)),
but on separate physical nodes for the best performance.
but on separate physical nodes for the best performance.
On all nodes, you will need to do the following:
On all nodes you will need to do the following:
1. Download the correct binary for your OS and architecture with `-cluster` suffix from [GitHub](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
For Enterprise binaries, see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#binary-releases).
For Enterprise binaries see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#binary-releases).
2. Extract the archive to /usr/local/bin by running:
@@ -254,7 +254,7 @@ for vmstorage can be added to `ExecStart` line.
sudo systemctl daemon-reload && sudo systemctl enable --now vmstorage
```
4. Check that the service started successfully:
4. Check that the service started successfully:
```sh
sudo systemctl status vmstorage
@@ -301,14 +301,14 @@ in one flag. See more details in `-storageNode` flag description in [vminsert fl
sudo systemctl daemon-reload && sudo systemctl enable --now vminsert.service
```
3. Check that the service started successfully:
3. Check that the service started successfully:
```sh
sudo systemctl status vminsert.service
```
4. After `vminsert` is in `Running` state, confirm the service is healthy by visiting `http://<ip_or_hostname>:8480/-/healthy` link.
It should say "VictoriaMetrics is Healthy."
It should say "VictoriaMetrics is Healthy"
#### Installing vmselect
@@ -344,7 +344,7 @@ END'
```
Replace `<list of vmstorages>` with addresses of previously configured `vmstorage` services.
To specify multiple addresses, you can repeat the flag multiple times or separate addresses with commas
To specify multiple addresses you can repeat the flag multiple times, or separate addresses with commas
in one flag. See more details in `-storageNode` flag description [vminsert flags](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#list-of-command-line-flags-for-vminsert).
> Please note, `vmselect` service is listening on `:8481` for HTTP connections (see `-httpListenAddr` flag).
@@ -362,12 +362,12 @@ sudo systemctl status vmselect.service
```
5. After `vmselect` is in `Running` state, confirm the service is healthy by visiting `http://<ip_or_hostname>:8481/select/0/vmui` link.
It should open the [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui) page.
It should open the [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui) page.
## Write data
There are two main models in monitoring for data collection: [push](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#push-model)
and [pull](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#pull-model). Both are used in modern monitoring, and both are
and [pull](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#pull-model). Both are used in modern monitoring and both are
supported by VictoriaMetrics.
See more details on [key concepts of writing data here](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#write-data).
@@ -389,7 +389,7 @@ and [other integrations](https://docs.victoriametrics.com/victoriametrics/integr
## Alerting
To run periodic conditions checks use [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/).
It allows creating a set of conditions using MetricsQL expressions and sending notifications to [Alertmanager](https://prometheus.io/docs/alerting/latest/alertmanager/)
It allows creating a set of conditions using MetricsQL expressions and sending notifications to [Alertmanager](https://prometheus.io/docs/alerting/latest/alertmanager/)
when such conditions are met.
See [vmalert quick start](https://docs.victoriametrics.com/victoriametrics/vmalert/#quickstart).
@@ -413,7 +413,7 @@ command line tool. It supports the following databases for migration to Victoria
## Productionization
When moving to production with VictoriaMetrics, we recommend following these best practices.
When going to production with VictoriaMetrics we recommend following the recommendations below.
### Monitoring
@@ -429,7 +429,7 @@ Using the [recommended alerting rules](https://github.com/VictoriaMetrics/Victor
will help to identify unwanted issues.
The rule of thumb is to have a separate installation of VictoriaMetrics or any other monitoring system to monitor the
production installation of VictoriaMetrics. This would make monitoring independent and help identify problems with
production installation of VictoriaMetrics. This would make monitoring independent and help identify problems with
the main monitoring installation.
See more details in the article [VictoriaMetrics Monitoring](https://victoriametrics.com/blog/victoriametrics-monitoring/).
@@ -457,7 +457,7 @@ For backup configuration, please refer to [vmbackup documentation](https://docs.
### Configuring limits
To avoid excessive resource usage or performance degradation, limits must be in place:
To avoid excessive resource usage or performance degradation limits must be in place:
* [Resource usage limits](https://docs.victoriametrics.com/victoriametrics/faq/#how-to-set-a-memory-limit-for-victoriametrics-components);
* [Cardinality limiter](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#cardinality-limiter).

View File

@@ -30,19 +30,11 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `basicAuth.usernameFile` command-line flags for reading basic auth username from a file, similar to the existing `basicAuth.passwordFile`. The file is re-read every second. See [#9436](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9436). Thanks to @kimjune01 for the contribution.
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-opentelemetry.labelNameUnderscoreSanitization` command-line flag to control whether to enable prepending of `key` to labels starting with `_` when `-opentelemetry.usePrometheusNaming` is enabled. See [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) docs and [#9663](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9663). Thanks to @andriibeee for the contribution.
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): improve the [Top Queries](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#top-queries) table UI. Duration columns now display human-readable values (e.g. `1.23s`) instead of raw seconds, memory column shows human-readable sizes (e.g. `1.23 MB`), instant queries are labeled as `instant` instead of empty string, and column headers now show tooltips with descriptions. See [#10790](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10790).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): drain in-memory remote write queue on shutdown within the 5-second grace period before falling back to persisting blocks to disk. See [#9996](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9996)
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. See [#10918](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918). Thanks to @alexei38 for the contribution.
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. Thanks to @alexei38 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): extend delay on aggregation windows flush by the biggest lag among pushed samples. Before, the delay was calculated as 95th percentile across samples, which could underrepresent outliers and reject them from aggregation as "too old". See [#10402](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10402).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix a bug in [cardinality limiters](https://docs.victoriametrics.com/victoriametrics/vmagent/#cardinality-limiter) where series with different labels, like `{a="bc"}` and `{ab="c"}`, could be incorrectly treated as identical and dropped. See [#10937](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10937).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): hide values passed to `-remoteWrite.headers` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive HTTP headers such as `Authorization` and API keys.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): hide values passed to `-remoteWrite.proxyURL` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive credentials.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): hide values passed to `-remoteWrite.headers`,`remoteRead.headers`, `datasource.headers` and `notifier.headers` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive HTTP headers such as `Authorization` and API keys.
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly establish [mtls](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#mtls-protection) connection between vmstorage and vminsert. Regression was introduced in v1.130.0 release for the enterprise version of vmstorage. See [#10972](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10972).
* BUGFIX: [vmrestore](https://docs.victoriametrics.com/victoriametrics/vmrestore/): fix a bug where specifying `-storageDataPath` with a trailing slash could cause `vmrestore` to panic. See [#10823](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10823). Thanks to @utafrali for the contribution.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): prevent unintentional rerouting of samples to other sharding targets when one of the `-remoteWrite.url` targets with `-remoteWrite.disableOnDiskQueue` becomes blocked. Previously this could break the sharding guarantee by sending samples to wrong targets instead of dropping or retrying them. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): return error on startup if `-remoteWrite.disableOnDiskQueue` is not configured uniformly across all `-remoteWrite.url` targets when `-remoteWrite.shardByURL` is enabled. Either all targets must have it enabled or all must have it disabled. See [#10507](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10507).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): hide values passed to `vmalert.proxyURL` in startup logs, `/metrics`, and `/flags`, since they can contain sensitive HTTP headers such as `Authorization` and API keys.
## [v1.143.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.143.0)

View File

@@ -353,18 +353,6 @@ Example Docker image:
`victoriametrics/victoria-metrics:v1.143.0-enterprise-fips` uses the FIPS-compatible binary and is based on the `scratch` image.
## What Happens to Licensed Components When a License Expires
When a license expires, all licensed components continue to function normally until a restart occurs.
License checks happen only at startup. If a license expires while the component is running, nothing changes; the component continues to run until the next restart.
This means you don't need to restart components to install a new license. The component automatically picks up the new license the next time it restarts. The exception is when the `-license` flag is used, because the license is supplied at startup and changing it requires restarting VictoriaMetrics with the updated flag value.
If your license has expired and you decide to not renew it, you can switch to the VictoriaMetrics Open Source version without data loss, as both versions share the same data model. In doing so, however, you will lose access to the [VictoriaMetrics Enterprise features](https://docs.victoriametrics.com/victoriametrics/enterprise/#victoriametrics-enterprise-features).
See [updating the license key](https://docs.victoriametrics.com/victoriametrics/enterprise/#updating-the-license-key) for more details.
## Monitoring license expiration
All the Enterprise components expose the following metrics at the `/metrics` page:

File diff suppressed because it is too large Load Diff

View File

@@ -1538,11 +1538,11 @@ func (tb *Table) MustCreateSnapshotAt(dstDir string) {
srcDir := tb.path
srcDir, err = filepath.Abs(srcDir)
if err != nil {
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", srcDir, err)
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", srcDir, err)
}
dstDir, err = filepath.Abs(dstDir)
if err != nil {
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %s", dstDir, err)
logger.Panicf("FATAL: cannot obtain absolute dir for %q: %w", dstDir, err)
}
prefix := srcDir + string(filepath.Separator)
if strings.HasPrefix(dstDir, prefix) {

View File

@@ -228,9 +228,7 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
return true
}
// MustReadBlock reads the next block from fq into dst and returns it.
// It first reads from the in-memory queue, then checks file-based queue.
// It blocks until a block is available or the stop deadline is exceeded, in which case it returns (dst, false).
// MustReadBlock reads the next block from fq to dst and returns it.
func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
fq.mu.Lock()
defer fq.mu.Unlock()
@@ -240,7 +238,15 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
return dst, false
}
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
if n := fq.pq.GetPendingBytes(); n > 0 {
logger.Panicf("BUG: the file-based queue must be empty when the inmemory queue is non-empty; it contains %d pending bytes", n)
}
bb := <-fq.ch
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
dst = append(dst, bb.B...)
blockBufPool.Put(bb)
return dst, true
}
if n := fq.pq.GetPendingBytes(); n > 0 {
data, ok := fq.pq.MustReadBlockNonblocking(dst)
@@ -259,35 +265,6 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
}
}
// MustReadInMemoryBlock pops the next block from the in-memory queue and appends it to dst.
//
// It never waits for new blocks: when the in-memory queue is empty it returns (nil, false)
// immediately. Otherwise it returns the extended dst and true.
func (fq *FastQueue) MustReadInMemoryBlock(dst []byte) ([]byte, bool) {
	fq.mu.Lock()
	defer fq.mu.Unlock()

	if len(fq.ch) == 0 {
		return nil, false
	}
	return fq.mustReadInMemoryBlockLocked(dst), true
}
// mustReadInMemoryBlockLocked takes the next block from the in-memory queue,
// appends its contents to dst and returns the extended dst.
//
// The caller is expected to hold fq.mu and to verify upfront that the in-memory
// queue is non-empty. The function reports a BUG via logger.Panicf when the
// in-memory queue is empty or when the file-based queue still has pending bytes.
func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) []byte {
	if len(fq.ch) == 0 {
		logger.Panicf("BUG: the function must not be called when in-memory queue is empty. Caller should verify the queue len upfront")
	}
	if pending := fq.pq.GetPendingBytes(); pending > 0 {
		logger.Panicf("BUG: the file-based queue must be empty when the in-memory queue is non-empty; it contains %d pending bytes", pending)
	}

	block := <-fq.ch
	dst = append(dst, block.B...)

	// Update the bookkeeping before the buffer goes back to the pool.
	fq.pendingInmemoryBytes -= uint64(len(block.B))
	fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
	blockBufPool.Put(block)
	return dst
}
// Dirname returns the directory name for persistent queue.
func (fq *FastQueue) Dirname() string {
return filepath.Base(fq.pq.dir)

View File

@@ -3,32 +3,173 @@ package promutil
import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
// LabelsCompressor compresses []prompb.Label into short binary strings
// lcShardCount is the number of shards the label-to-index map is split into.
// A shard is selected as xxhash(key) % lcShardCount.
const lcShardCount = 128
// lcShard is one shard of the label-to-index map.
// Padded to one cache line to prevent false sharing between adjacent shards.
type lcShard struct {
// mu guards m.
mu sync.RWMutex
// m maps the composite key name+'\x00'+value to the label index.
// It stays nil until the first entry is inserted into the shard.
m map[string]uint32
// Padding only; presumably sized so that the struct occupies 64 bytes on 64-bit platforms - TODO confirm.
_ [32]byte
}
// LabelsCompressor compresses []prompb.Label into short binary strings.
// Zero value is ready to use. Each Register call must be paired with Unregister.
type LabelsCompressor struct {
labelToIdx sync.Map
idxToLabel labelsMap
// labelToIdxShards stores the label-to-index mapping across lcShardCount shards
// to reduce RWMutex contention in the concurrent compressFast hot path.
labelToIdxShards [lcShardCount]lcShard
nextIdx atomic.Uint64
// generation is incremented after each rotate() call that removes labels.
// Callers can use it to detect when cached compressed keys become stale.
generation atomic.Uint64
totalSizeBytes atomic.Uint64
idxToLabel atomic.Pointer[[]prompb.Label]
// usedBitset tracks which indices were used in the current rotation period.
// Bits are set atomically from compressFast without mu; grown under mu.
usedBitset atomic.Pointer[[]uint64]
totalSizeBytes uint64
mu sync.Mutex
// freeIdxs holds index slots available for reuse.
freeIdxs []uint32
// pendingIdxs holds indices evicted from labelToIdx last rotate, not yet zeroed in idxToLabel.
pendingIdxs map[uint32]struct{}
// prevBitset is the usedBitset snapshot from the previous non-zero rotation.
// Requires absence from both prevBitset and usedBitset to evict a label,
// guarding against partial snapshots when rotate fires mid-compress loop.
// Only accessed in rotate, which runs in a single goroutine.
prevBitset []uint64
// registry holds staleness intervals of active callers; rotation period = max(registry).
registry []time.Duration
// tickerCh signals runRotate to re-evaluate the rotation period.
tickerCh chan struct{}
}
// idleRotationPeriod is the rotation period used when no callers are registered.
// It must be long enough that a sleeping goroutine does not consume measurable resources.
const idleRotationPeriod = time.Hour
// Register records stalenessInterval as belonging to an active caller and wakes up
// the rotation goroutine, so it can re-evaluate the rotation period, which equals
// the biggest registered interval. The goroutine is started lazily by the first call.
//
// Every Register call must be paired with an Unregister call for the same interval.
func (lc *LabelsCompressor) Register(stalenessInterval time.Duration) {
	lc.mu.Lock()
	notifyCh := lc.tickerCh
	if notifyCh == nil {
		// First registration: create the wake-up channel and start the rotation loop.
		notifyCh = make(chan struct{}, 1)
		lc.tickerCh = notifyCh
		go lc.runRotate()
	}
	lc.registry = append(lc.registry, stalenessInterval)
	lc.mu.Unlock()

	// Non-blocking send: a wake-up that is already queued is enough
	// for the rotation loop to pick up the updated registry.
	select {
	case notifyCh <- struct{}{}:
	default:
	}
}
// Unregister drops a single occurrence of stalenessInterval from the registry
// and wakes up the rotation goroutine, if it has been started.
//
// The rotation goroutine isn't stopped when the registry becomes empty;
// it keeps running at idleRotationPeriod.
func (lc *LabelsCompressor) Unregister(stalenessInterval time.Duration) {
	lc.mu.Lock()
	for i := range lc.registry {
		if lc.registry[i] != stalenessInterval {
			continue
		}
		// Remove only the first match, since other callers
		// may have registered the same interval.
		copy(lc.registry[i:], lc.registry[i+1:])
		lc.registry = lc.registry[:len(lc.registry)-1]
		break
	}
	notifyCh := lc.tickerCh
	lc.mu.Unlock()

	if notifyCh == nil {
		// Register has never been called, so there is nobody to wake up.
		return
	}
	// Non-blocking send: a wake-up that is already queued is sufficient.
	select {
	case notifyCh <- struct{}{}:
	default:
	}
}
// maxRegisteredStaleness returns the biggest staleness interval stored in lc.registry.
//
// It returns idleRotationPeriod when the registry is empty or contains
// no positive intervals.
//
// It must be called with lc.mu held.
func (lc *LabelsCompressor) maxRegisteredStaleness() time.Duration {
	// The local is intentionally not named `max`, so the builtin stays usable.
	var longest time.Duration
	for _, d := range lc.registry {
		longest = max(longest, d)
	}
	if longest == 0 {
		return idleRotationPeriod
	}
	return longest
}
// runRotate is the background loop started by Register on its first call.
//
// It calls rotate each time the timer fires and re-arms the timer with
// maxRegisteredStaleness(). A signal on lc.tickerCh makes it re-evaluate
// the period without calling rotate.
//
// The loop has no exit path, so the goroutine runs until the process exits.
func (lc *LabelsCompressor) runRotate() {
lc.mu.Lock()
period := lc.maxRegisteredStaleness()
lc.mu.Unlock()
timer := time.NewTimer(period)
defer timer.Stop()
for {
select {
case <-timer.C:
lc.rotate()
// The registry may have changed while rotate was running; pick up the current period.
lc.mu.Lock()
period = lc.maxRegisteredStaleness()
lc.mu.Unlock()
// The tick has just been received above, so the timer can be re-armed directly.
timer.Reset(period)
case <-lc.tickerCh:
// Register or Unregister signaled that the registry has changed.
lc.mu.Lock()
newPeriod := lc.maxRegisteredStaleness()
lc.mu.Unlock()
// Leave the running timer untouched when the period didn't change,
// so the already elapsed time isn't thrown away.
if newPeriod != period {
period = newPeriod
// Stop the timer and drain a tick that may have been delivered already;
// otherwise the stale tick would trigger rotate right after Reset.
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(period)
}
}
}
}
// SizeBytes returns the size of lc data in bytes
func (lc *LabelsCompressor) SizeBytes() uint64 {
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
lc.mu.Lock()
n := uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes
lc.mu.Unlock()
return n
}
// ItemsCount returns the number of items in lc
func (lc *LabelsCompressor) ItemsCount() uint64 {
return lc.nextIdx.Load()
lc.mu.Lock()
p := lc.idxToLabel.Load()
var n uint64
if p != nil && len(*p) > len(lc.freeIdxs)+len(lc.pendingIdxs) {
n = uint64(len(*p) - len(lc.freeIdxs) - len(lc.pendingIdxs))
}
lc.mu.Unlock()
return n
}
// Compress compresses labels, appends the compressed labels to dst and returns the result.
@@ -42,46 +183,139 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
a := encoding.GetUint64s(len(labels) + 1)
a.A[0] = uint64(len(labels))
lc.compress(a.A[1:], labels)
if lc.compressFast(a.A[1:], labels) {
dst = encoding.MarshalVarUint64s(dst, a.A)
encoding.PutUint64s(a)
return dst
}
lc.mu.Lock()
lc.compressSlow(a.A[1:], labels)
lc.mu.Unlock()
dst = encoding.MarshalVarUint64s(dst, a.A)
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
if len(labels) == 0 {
// compressFast resolves every label to its already registered index.
// It takes only per-shard read locks, never lc.mu, and never registers new labels.
//
// On success it stores the index of labels[i] into dst[i], marks the index as used
// in usedBitset and returns true. It returns false as soon as a label is missing
// from the label-to-index map; dst may be partially filled in this case and
// the caller must fall back to compressSlow.
//
// dst must hold at least len(labels) items.
func (lc *LabelsCompressor) compressFast(dst []uint64, labels []prompb.Label) bool {
	// Load the bitset once; a swap during the loop is detected after it.
	p := lc.usedBitset.Load()
	var bits []uint64
	if p != nil {
		bits = *p
	}
	var keyBuf [256]byte
	for i, label := range labels {
		// Build composite key name+'\x00'+value into the stack buffer.
		totalLen := len(label.Name) + 1 + len(label.Value)
		var key string
		if totalLen <= len(keyBuf) {
			n := copy(keyBuf[:], label.Name)
			keyBuf[n] = '\x00'
			copy(keyBuf[n+1:], label.Value)
			// key presumably aliases keyBuf (ToUnsafeString), which is overwritten
			// on the next iteration, so key must not outlive the lookup below.
			key = bytesutil.ToUnsafeString(keyBuf[:totalLen])
		} else {
			// The key doesn't fit the stack buffer; build it on the heap.
			key = makeLabelKey(label)
		}
		h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
		shard := &lc.labelToIdxShards[h%lcShardCount]
		shard.mu.RLock()
		idx, ok := shard.m[key]
		shard.mu.RUnlock()
		if !ok {
			return false
		}
		dst[i] = uint64(idx)
		// Indices beyond the loaded bitset are not marked here.
		word := idx >> 6
		if int(word) < len(bits) {
			mask := uint64(1) << (idx & 63)
			// Skip the atomic read-modify-write when the bit is already set.
			if atomic.LoadUint64(&bits[word])&mask == 0 {
				atomic.OrUint64(&bits[word], mask)
			}
		}
	}
	// usedBitset was swapped mid-loop; re-mark all indices in the new bitset.
	// Only the first len(labels) items of dst were written above, so the re-marking
	// is restricted to them in case the caller passed a longer dst.
	if lc.usedBitset.Load() != p {
		for _, idx64 := range dst[:len(labels)] {
			lc.markUsed(uint32(idx64))
		}
	}
	return true
}
// compressSlow stores the index of labels[i] into dst[i] for every label,
// registering labels that are missing from the label-to-index map.
//
// It calls newIndex, which requires lc.mu, so it must be called with lc.mu held.
func (lc *LabelsCompressor) compressSlow(dst []uint64, labels []prompb.Label) {
	for i := range labels {
		key := makeLabelKey(labels[i])
		shard := &lc.labelToIdxShards[xxhash.Sum64(bytesutil.ToUnsafeBytes(key))%lcShardCount]

		shard.mu.RLock()
		idx, found := shard.m[key]
		shard.mu.RUnlock()

		if found {
			// The label is already registered; just mark it as used.
			lc.markUsed(idx)
			dst[i] = uint64(idx)
			continue
		}

		// Register the label under a new index. The label is cloned first,
		// so the stored copy doesn't reference the caller's memory.
		idx = lc.newIndex(cloneLabel(labels[i]))
		shard.mu.Lock()
		if shard.m == nil {
			shard.m = make(map[string]uint32)
		}
		shard.m[key] = idx
		shard.mu.Unlock()
		dst[i] = uint64(idx)
	}
}
// markUsed sets the bit for idx in usedBitset. Safe to call without mu.
func (lc *LabelsCompressor) markUsed(idx uint32) {
p := lc.usedBitset.Load()
if p == nil {
return
}
_ = dst[len(labels)-1]
for i, label := range labels {
v, ok := lc.labelToIdx.Load(label)
if !ok {
idx := lc.nextIdx.Add(1)
v = idx
labelCopy := cloneLabel(label)
// Must store idxToLabel entry before labelToIdx,
// so it can be found by possible concurrent goroutines.
//
// We might store duplicated entries for single label with different indexes,
// and it's fine, see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118.
lc.idxToLabel.Store(idx, labelCopy)
vNew, loaded := lc.labelToIdx.LoadOrStore(labelCopy, v)
if loaded {
// This label has been stored by a concurrent goroutine with different index,
// use it for key consistency in aggrState.
v = vNew
}
// Update lc.totalSizeBytes
labelSizeBytes := uint64(len(label.Name) + len(label.Value))
entrySizeBytes := labelSizeBytes + uint64(2*(unsafe.Sizeof(label)+unsafe.Sizeof(&label))+unsafe.Sizeof(v))
lc.totalSizeBytes.Add(entrySizeBytes)
bits := *p
word := idx >> 6
if int(word) < len(bits) {
mask := uint64(1) << (idx & 63)
if atomic.LoadUint64(&bits[word])&mask == 0 {
atomic.OrUint64(&bits[word], mask)
}
dst[i] = v.(uint64)
}
}
// growBitset makes sure usedBitset has a 64-bit word covering idx.
//
// When the current bitset is too short, a bigger copy is allocated and published
// atomically; the existing bitset is never resized in place.
// It must be called with lc.mu held.
func (lc *LabelsCompressor) growBitset(idx uint32) {
	var current []uint64
	if p := lc.usedBitset.Load(); p != nil {
		current = *p
	}
	wordsNeeded := int(idx>>6) + 1
	if len(current) >= wordsNeeded {
		return
	}
	grown := make([]uint64, wordsNeeded)
	copy(grown, current)
	lc.usedBitset.Store(&grown)
}
// makeLabelKey returns the composite key name+'\x00'+value for the given label.
//
// The key is built with a single string concatenation, so only one allocation
// is needed instead of a temporary byte slice plus a string copy.
func makeLabelKey(label prompb.Label) string {
	return label.Name + "\x00" + label.Value
}
// Generation returns the current generation count.
//
// rotate increments it when it removes at least one label from the label-to-index map,
// so a changed value tells callers that compressed keys cached earlier may be stale.
func (lc *LabelsCompressor) Generation() uint64 {
return lc.generation.Load()
}
func cloneLabel(label prompb.Label) prompb.Label {
// pre-allocate memory for label name and value
n := len(label.Name) + len(label.Value)
@@ -98,6 +332,35 @@ func cloneLabel(label prompb.Label) prompb.Label {
}
}
// newIndex picks an index for cloned, publishes an updated idxToLabel slice
// containing it and returns the index.
//
// A slot from freeIdxs is reused when available; otherwise the label is appended
// at the end. The index is marked as used in usedBitset and totalSizeBytes is
// increased by the estimated entry size.
// It must be called with lc.mu held.
func (lc *LabelsCompressor) newIndex(cloned prompb.Label) uint32 {
	var current []prompb.Label
	if p := lc.idxToLabel.Load(); p != nil {
		current = *p
	}

	var idx uint32
	if n := len(lc.freeIdxs); n > 0 {
		// Reuse the most recently freed slot. The slice is copied before the write,
		// so readers holding the previous slice never see a slot changing under them.
		idx = lc.freeIdxs[n-1]
		lc.freeIdxs = lc.freeIdxs[:n-1]
		updated := make([]prompb.Label, len(current))
		copy(updated, current)
		updated[idx] = cloned
		lc.idxToLabel.Store(&updated)
	} else {
		// No free slots; the new label goes to the end.
		idx = uint32(len(current))
		updated := append(current, cloned)
		lc.idxToLabel.Store(&updated)
	}

	// The bitset must cover idx before the bit can be set.
	lc.growBitset(idx)
	lc.markUsed(idx)
	lc.totalSizeBytes += uint64(len(cloned.Name)+len(cloned.Value)) + uint64(unsafe.Sizeof(cloned)) + 8
	return idx
}
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
//
// It is safe calling Decompress from concurrent goroutines.
@@ -124,111 +387,139 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
if len(tail) > 0 {
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
}
dst = lc.decompress(dst, a.A)
p := lc.idxToLabel.Load()
if p == nil {
encoding.PutUint64s(a)
logger.Panicf("BUG: idxToLabel is nil in Decompress")
return dst
}
labels := *p
for _, idx := range a.A {
if int(idx) >= len(labels) {
encoding.PutUint64s(a)
logger.Panicf("BUG: missing label for idx=%d; idxToLabel len=%d", idx, len(labels))
}
dst = append(dst, labels[idx])
}
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
for _, idx := range src {
label, ok := lc.idxToLabel.Load(idx)
if !ok {
logger.Panicf("BUG: missing label for idx=%d", idx)
}
dst = append(dst, label)
// rotate evicts unused labels and recycles their slots.
// Phase 1 removes the label from labelToIdx (preventing new keys from referencing it).
// Phase 2, one rotation later, zeroes idxToLabel and returns the slot to freeIdxs,
// giving in-flight keys a full rotation period to drain via Decompress.
// Must not be called concurrently with itself.
func (lc *LabelsCompressor) rotate() {
// Snapshot and reset usedBitset under mu to prevent races with growBitset/newIndex.
// Also snapshot idxToLabel for the phase-1 scan.
lc.mu.Lock()
var currentBits []uint64
if p := lc.usedBitset.Load(); p != nil {
currentBits = make([]uint64, len(*p))
copy(currentBits, *p)
newBits := make([]uint64, len(*p))
lc.usedBitset.Store(&newBits)
}
return dst
}
// labelsMap maps uint64 key to prompb.Label
//
// uint64 keys must be packed close to 0. Otherwise the labelsMap structure will consume too much memory.
type labelsMap struct {
readOnly atomic.Pointer[[]*prompb.Label]
mutableLock sync.Mutex
mutable map[uint64]*prompb.Label
misses uint64
}
// Store stores label under the given idx.
//
// It is safe calling Store from concurrent goroutines.
func (lm *labelsMap) Store(idx uint64, label prompb.Label) {
lm.mutableLock.Lock()
if lm.mutable == nil {
lm.mutable = make(map[uint64]*prompb.Label)
}
lm.mutable[idx] = &label
lm.mutableLock.Unlock()
}
// Load returns the label for the given idx.
//
// Load returns false if lm doesn't contain label for the given idx.
//
// It is safe calling Load from concurrent goroutines.
//
// The performance of Load() scales linearly with CPU cores.
func (lm *labelsMap) Load(idx uint64) (prompb.Label, bool) {
if pReadOnly := lm.readOnly.Load(); pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
// Fast path - the label for the given idx has been found in lm.readOnly.
return *pLabel, true
// Skip if nothing was used; all-zero snapshots from rapid rotation would
// advance pendingIdxs and reclaim slots before in-flight keys drain.
anyUsed := false
for _, w := range currentBits {
if w != 0 {
anyUsed = true
break
}
}
// Slow path - search in lm.mutable.
return lm.loadSlow(idx)
}
func (lm *labelsMap) loadSlow(idx uint64) (prompb.Label, bool) {
lm.mutableLock.Lock()
// Try loading label from readOnly, since it could be updated while acquiring mutableLock.
pReadOnly := lm.readOnly.Load()
if pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
lm.mutableLock.Unlock()
return *pLabel, true
}
}
// The label for the idx wasn't found in readOnly. Search it in mutable.
lm.misses++
pLabel := lm.mutable[idx]
if pReadOnly == nil || lm.misses > uint64(len(*pReadOnly)) {
lm.moveMutableToReadOnlyLocked(pReadOnly)
lm.misses = 0
}
lm.mutableLock.Unlock()
if pLabel == nil {
return prompb.Label{}, false
}
return *pLabel, true
}
func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
if len(lm.mutable) == 0 {
// Nothing to move
if !anyUsed {
lc.mu.Unlock()
return
}
var labels []*prompb.Label
if pReadOnly != nil {
labels = append(labels, *pReadOnly...)
// Advance prevBitset; labels missing from a partial snapshot are still covered by it.
prevSnap := lc.prevBitset
lc.prevBitset = currentBits
pendingIdxs := lc.pendingIdxs
idxSnap := lc.idxToLabel.Load()
lc.mu.Unlock()
// A label is used if present in either the current or previous period snapshot.
isUsed := func(idx uint32) bool {
word := idx >> 6
inCurrent := int(word) < len(currentBits) && currentBits[word]>>(idx&63)&1 == 1
inPrev := int(word) < len(prevSnap) && prevSnap[word]>>(idx&63)&1 == 1
return inCurrent || inPrev
}
for idx, pLabel := range lm.mutable {
if idx < uint64(len(labels)) {
labels[idx] = pLabel
} else {
for idx > uint64(len(labels)) {
labels = append(labels, nil)
// Phase 2: reclaim slots evicted last rotation if still unused.
var toReclaim []uint32
var nextRelease map[uint32]struct{}
for idx := range pendingIdxs {
if isUsed(idx) {
if nextRelease == nil {
nextRelease = make(map[uint32]struct{})
}
labels = append(labels, pLabel)
nextRelease[idx] = struct{}{}
} else {
toReclaim = append(toReclaim, idx)
}
}
clear(lm.mutable)
lm.readOnly.Store(&labels)
// Phase 1: scan idxToLabel rather than sync.Map.Range to avoid
// cache-line contention with concurrent compressFast lookups.
phase1Evicted := false
if idxSnap != nil {
for i, label := range *idxSnap {
if label == (prompb.Label{}) {
continue // freed slot
}
idx := uint32(i)
if _, inPending := pendingIdxs[idx]; inPending {
continue // already queued for release
}
if isUsed(idx) {
continue
}
key := makeLabelKey(label)
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
shard := &lc.labelToIdxShards[h%lcShardCount]
shard.mu.Lock()
delete(shard.m, key)
shard.mu.Unlock()
phase1Evicted = true
if nextRelease == nil {
nextRelease = make(map[uint32]struct{})
}
nextRelease[idx] = struct{}{}
}
}
if phase1Evicted {
lc.generation.Add(1)
}
lc.mu.Lock()
if len(toReclaim) > 0 {
p := lc.idxToLabel.Load()
var newIdxToLabel []prompb.Label
if p != nil {
newIdxToLabel = *p
}
next := make([]prompb.Label, len(newIdxToLabel))
copy(next, newIdxToLabel)
for _, idx := range toReclaim {
label := next[idx]
entrySize := uint64(len(label.Name)+len(label.Value)) + uint64(unsafe.Sizeof(label)) + 8
if lc.totalSizeBytes >= entrySize {
lc.totalSizeBytes -= entrySize
}
next[idx] = prompb.Label{}
lc.freeIdxs = append(lc.freeIdxs, idx)
}
lc.idxToLabel.Store(&next)
}
lc.pendingIdxs = nextRelease
lc.mu.Unlock()
}

View File

@@ -8,47 +8,96 @@ import (
)
func BenchmarkLabelsCompressorCompress(b *testing.B) {
var lc LabelsCompressor
series := newTestSeries(100, 10)
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var dst []byte
for pb.Next() {
dst = dst[:0]
for _, labels := range series {
dst = lc.Compress(dst, labels)
}
Sink.Add(uint64(len(dst)))
run := func(b *testing.B, withRotate bool) {
var lc LabelsCompressor
for _, labels := range series {
lc.Compress(nil, labels)
}
})
var rotations atomic.Int64
if withRotate {
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
default:
lc.rotate()
rotations.Add(1)
}
}
}()
defer close(done)
}
rotations.Store(0)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var dst []byte
for pb.Next() {
dst = dst[:0]
for _, labels := range series {
dst = lc.Compress(dst, labels)
}
Sink.Add(uint64(len(dst)))
}
})
if withRotate {
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
}
}
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
}
func BenchmarkLabelsCompressorDecompress(b *testing.B) {
var lc LabelsCompressor
series := newTestSeries(100, 10)
datas := make([][]byte, len(series))
var dst []byte
for i, labels := range series {
dstLen := len(dst)
dst = lc.Compress(dst, labels)
datas[i] = dst[dstLen:]
}
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var labels []prompb.Label
for pb.Next() {
for _, data := range datas {
labels = lc.Decompress(labels[:0], data)
}
Sink.Add(uint64(len(labels)))
run := func(b *testing.B, withRotate bool) {
var lc LabelsCompressor
datas := make([][]byte, len(series))
var buf []byte
for i, labels := range series {
bufLen := len(buf)
buf = lc.Compress(buf, labels)
datas[i] = buf[bufLen:]
}
})
var rotations atomic.Int64
if withRotate {
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
default:
lc.rotate()
rotations.Add(1)
}
}
}()
defer close(done)
}
rotations.Store(0)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var labels []prompb.Label
for pb.Next() {
for _, data := range datas {
labels = lc.Decompress(labels[:0], data)
}
Sink.Add(uint64(len(labels)))
}
})
if withRotate {
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
}
}
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
}
var Sink atomic.Uint64

View File

@@ -764,7 +764,7 @@ func filterLabelValues(lvs map[string]struct{}, tf *tagFilter, key string) {
b = marshalTagValue(b, bytesutil.ToUnsafeBytes(lv))
ok, err := tf.match(b)
if err != nil {
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %s", key, lv, tf.String(), err)
logger.Panicf("BUG: cannot match label %q=%q with tagFilter %s: %w", key, lv, tf.String(), err)
}
if !ok {
delete(lvs, lv)

View File

@@ -141,7 +141,7 @@ func (is *indexSearch) legacyContainsTimeRangeSlow(prefixBuf *bytesutil.ByteBuff
ts.Seek(prefixBuf.B)
if !ts.NextItem() {
if err := ts.Error(); err != nil {
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %s", minDate, prefixBuf.B, err)
logger.Panicf("FATAL: error when searching for minDate=%d, prefix %q: %w", minDate, prefixBuf.B, err)
}
return false
}

View File

@@ -106,7 +106,7 @@ func loadFrom(loadPath string, maxSizeBytes uint64) (*Tracker, error) {
}
defer func() {
if err := zr.Close(); err != nil {
logger.Panicf("FATAL: cannot close gzip reader: %s", err)
logger.Panicf("FATAL: cannot close gzip reader: %w", err)
}
}()

View File

@@ -517,7 +517,7 @@ func (tb *table) historicalMergeWatcher() {
logger.Infof("start %s for partition (%s, %s)", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath)
if err := pt.ForceMergeAllParts(tb.stopCh); err != nil {
logger.Errorf("cannot %s for partition (%s, %s): %s", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
logger.Errorf("cannot %s for partition (%s, %s): %w", strings.Join(logErrContext, " and "), pt.bigPartsPath, pt.smallPartsPath, err)
}
logger.Infof("finished %s for partition (%s, %s) in %.3f seconds", strings.Join(logContext, " and "), pt.bigPartsPath, pt.smallPartsPath, time.Since(t).Seconds())

View File

@@ -6,6 +6,7 @@ import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
)
func BenchmarkDedupAggr(b *testing.B) {
@@ -63,14 +64,15 @@ func newBenchSamples(count int) []pushSample {
}
labelsLen := len(labels)
samples := make([]pushSample, count)
var lc promutil.LabelsCompressor
var keyBuf []byte
for i := range samples {
sample := &samples[i]
labels = append(labels[:labelsLen], prompb.Label{
labels := append(labels[:labelsLen:labelsLen], prompb.Label{
Name: "app",
Value: fmt.Sprintf("instance-%d", i),
})
keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:])
keyBuf = lc.Compress(keyBuf[:0], labels)
sample.key = string(keyBuf)
sample.value = float64(i)
}

View File

@@ -77,6 +77,8 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
metrics.RegisterSet(ms)
lc.Register(2 * interval)
d.wg.Go(func() {
d.runFlusher(pushFunc)
})
@@ -86,6 +88,8 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
// MustStop stops d.
func (d *Deduplicator) MustStop() {
lc.Unregister(2 * d.interval)
metrics.UnregisterSet(d.ms, true)
d.ms = nil
@@ -203,7 +207,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
dstSamples := ctx.samples
for _, ps := range samples {
labelsLen := len(labels)
labels = decompressLabels(labels, ps.key)
labels = lc.Decompress(labels, bytesutil.ToUnsafeBytes(ps.key))
dstSamplesLen := len(dstSamples)
dstSamples = append(dstSamples, prompb.Sample{

View File

@@ -17,29 +17,27 @@ type aggrOutputs struct {
outputSamples *metrics.Counter
}
func (ao *aggrOutputs) getInputOutputKey(key string) (string, string) {
func (ao *aggrOutputs) getInputOutputKey(key string) (outputKey, inputKey string) {
src := bytesutil.ToUnsafeBytes(key)
outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
}
src = src[nSize:]
outputKey := src[:outputKeyLen]
if !ao.useInputKey {
return key, bytesutil.ToUnsafeString(outputKey)
outputKey = bytesutil.ToUnsafeString(src[:outputKeyLen])
if !ao.useInputKey || int(outputKeyLen) == len(src) {
return outputKey, outputKey
}
inputKey := src[outputKeyLen:]
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
inputKey = bytesutil.ToUnsafeString(src[outputKeyLen:])
return outputKey, inputKey
}
func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, isGreen bool) {
var inputKey, outputKey string
var sample *pushSample
var outputs []aggrValue
var nv *aggrValues
for i := range samples {
sample = &samples[i]
inputKey, outputKey = ao.getInputOutputKey(sample.key)
sample := &samples[i]
outputKey, inputKey := ao.getInputOutputKey(sample.key)
again:
v, ok := ao.m.Load(outputKey)
@@ -81,7 +79,7 @@ func (ao *aggrOutputs) pushSamples(samples []pushSample, deleteDeadline int64, i
}
av.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to flush
// The entry has been deleted by the concurrent call to flush.
// Try obtaining and updating the entry again.
goto again
}

View File

@@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"math"
"slices"
"sort"
"strconv"
"strings"
@@ -24,6 +23,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
"gopkg.in/yaml.v2"
)
@@ -257,6 +257,10 @@ type Config struct {
type Aggregators struct {
as []*aggregator
// workCh limits the number of concurrent aggregator.Push calls.
// Pre-allocated once to avoid per-Push channel allocation.
workCh chan struct{}
// configData contains marshaled configs.
// It is used in Equal() for comparing Aggregators.
configData []byte
@@ -287,6 +291,7 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
}
ms := metrics.NewSet()
as := make([]*aggregator, len(cfgs))
for i, cfg := range cfgs {
a, err := newAggregator(cfg, filePath, pushFunc, ms, opts, alias, i+1)
@@ -307,6 +312,7 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
metrics.RegisterSet(ms)
return &Aggregators{
as: as,
workCh: make(chan struct{}, cgroup.AvailableCPUs()),
configData: configData,
filePath: filePath,
ms: ms,
@@ -364,19 +370,21 @@ func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []uint32) []uint32
return matchIdxs
}
// use all available CPU cores to copy time-series into aggregators
// See this issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
var wg sync.WaitGroup
concurrencyChan := make(chan struct{}, cgroup.AvailableCPUs())
for _, aggr := range a.as {
concurrencyChan <- struct{}{}
wg.Go(func() {
aggr.Push(tss, matchIdxs)
<-concurrencyChan
})
if len(a.as) == 1 {
a.as[0].Push(tss, matchIdxs)
return matchIdxs
}
// Use all available CPU cores to push time-series into aggregators in parallel.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
var wg sync.WaitGroup
for _, aggr := range a.as {
a.workCh <- struct{}{}
wg.Go(func() {
aggr.Push(tss, matchIdxs)
<-a.workCh
})
}
wg.Wait()
return matchIdxs
@@ -413,8 +421,8 @@ type aggregator struct {
ignoreOldSamples bool
enableWindows bool
by []string
without []string
bySet map[string]struct{}
withoutSet map[string]struct{}
aggregateOnlyByTime bool
// interval is the interval between flushes
@@ -446,6 +454,8 @@ type aggregator struct {
// for `interval: 1m`, `by: [job]`
suffix string
seriesKeyCache seriesKeyCache
wg sync.WaitGroup
stopCh chan struct{}
@@ -645,8 +655,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
enableWindows: enableWindows,
stalenessInterval: stalenessInterval,
by: by,
without: without,
bySet: makeStringSet(by),
withoutSet: makeStringSet(without),
aggregateOnlyByTime: aggregateOnlyByTime,
interval: interval,
@@ -709,6 +719,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
a.cs.Store(cs)
lc.Register(a.stalenessInterval)
a.wg.Go(func() {
a.runFlusher(pushFunc, alignFlushToInterval, skipFlushOnShutdown, ignoreFirstIntervals)
})
@@ -915,6 +927,7 @@ func (a *aggregator) dedupFlush(dedupTime time.Time, cs *currentState) {
//
// If pushFunc is nil, then the aggregator state is just reset.
func (a *aggregator) flush(pushFunc PushFunc, flushTime time.Time, cs *currentState, isLast bool) {
a.seriesKeyCache.reset()
startTime := time.Now()
ao := a.aggrOutputs
@@ -943,6 +956,7 @@ var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
//
// The aggregator stops pushing the aggregated metrics after this call.
func (a *aggregator) MustStop() {
lc.Unregister(a.stalenessInterval)
close(a.stopCh)
a.wg.Wait()
}
@@ -952,7 +966,6 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
ctx := getPushCtx()
defer putPushCtx(ctx)
buf := ctx.buf
labels := &ctx.labels
inputLabels := &ctx.inputLabels
outputLabels := &ctx.outputLabels
@@ -986,19 +999,22 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
}
labels.Sort()
inputLabels.Reset()
outputLabels.Reset()
if !a.aggregateOnlyByTime {
inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.by, a.without)
} else {
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
h := computeLabelsFingerprint(labels.Labels)
gen := lc.Generation()
key, ok := a.seriesKeyCache.get(h, gen)
if !ok {
inputLabels.Reset()
outputLabels.Reset()
if !a.aggregateOnlyByTime {
inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.bySet, a.withoutSet)
} else {
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
}
bufLen := len(ctx.buf)
ctx.compressLabels(inputLabels.Labels, outputLabels.Labels)
key = string(ctx.buf[bufLen:])
a.seriesKeyCache.set(h, gen, key)
}
bufLen := len(buf)
buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels)
// key remains valid only by the end of this function and can't be reused after
// do not intern key because number of unique keys could be too high
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, s := range ts.Samples {
if math.IsNaN(s.Value) {
// Skip NaN values
@@ -1042,8 +1058,6 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
}
a.samplesLag.Update(float64(maxLagMsec) / 1_000)
ctx.buf = buf
pushSamples := a.aggrOutputs.pushSamples
if a.da != nil {
pushSamples = a.da.pushSamples
@@ -1060,18 +1074,13 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
}
}
// compressLabels appends the series key built from outputLabels and inputLabels
// to dst and returns the extended slice.
//
// The appended bytes are laid out as:
//
//	varuint(len(compressed outputLabels)) | compressed outputLabels | compressed inputLabels
func compressLabels(dst []byte, inputLabels, outputLabels []prompb.Label) []byte {
// Output labels are compressed into a pooled scratch buffer first, because their
// compressed length has to be written to dst before the bytes themselves.
bb := bbPool.Get()
bb.B = lc.Compress(bb.B, outputLabels)
dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
dst = append(dst, bb.B...)
bbPool.Put(bb)
// Input labels need no length prefix: they occupy the rest of the key.
dst = lc.Compress(dst, inputLabels)
return dst
}
func decompressLabels(dst []prompb.Label, key string) []prompb.Label {
return lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
// compressLabels appends the series key for the given labels to ctx.buf.
//
// The appended bytes are the varuint-encoded length of the compressed
// outputLabels, the compressed outputLabels themselves and, only when
// inputLabels is non-empty, the compressed inputLabels.
//
// ctx.keybuf is overwritten and used as scratch space for the compressed
// outputLabels, since their length must be known before they are appended.
func (ctx *pushCtx) compressLabels(inputLabels, outputLabels []prompb.Label) {
	outKey := lc.Compress(ctx.keybuf[:0], outputLabels)
	ctx.keybuf = outKey

	dst := encoding.MarshalVarUint64(ctx.buf, uint64(len(outKey)))
	dst = append(dst, outKey...)
	if len(inputLabels) != 0 {
		dst = lc.Compress(dst, inputLabels)
	}
	ctx.buf = dst
}
type pushCtx struct {
@@ -1081,6 +1090,7 @@ type pushCtx struct {
inputLabels promutil.Labels
outputLabels promutil.Labels
buf []byte
keybuf []byte
}
func (ctx *pushCtx) reset() {
@@ -1115,10 +1125,10 @@ func putPushCtx(ctx *pushCtx) {
var pushCtxPool sync.Pool
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, without []string) ([]prompb.Label, []prompb.Label) {
if len(without) > 0 {
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, bySet, withoutSet map[string]struct{}) ([]prompb.Label, []prompb.Label) {
if len(withoutSet) > 0 {
for _, label := range labels {
if slices.Contains(without, label.Name) {
if _, ok := withoutSet[label.Name]; ok {
dstInput = append(dstInput, label)
} else {
dstOutput = append(dstOutput, label)
@@ -1126,7 +1136,7 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, withou
}
} else {
for _, label := range labels {
if !slices.Contains(by, label.Name) {
if _, ok := bySet[label.Name]; !ok {
dstInput = append(dstInput, label)
} else {
dstOutput = append(dstOutput, label)
@@ -1136,6 +1146,17 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, withou
return dstInput, dstOutput
}
// makeStringSet builds a membership set out of keys.
//
// It returns nil when keys is empty, so callers can use len(set) == 0
// to detect the "nothing configured" case. Duplicate keys collapse into one entry.
func makeStringSet(keys []string) map[string]struct{} {
	n := len(keys)
	if n < 1 {
		return nil
	}
	set := make(map[string]struct{}, n)
	for i := 0; i < n; i++ {
		set[keys[i]] = struct{}{}
	}
	return set
}
func getFlushCtx(a *aggregator, ao *aggrOutputs, pushFunc PushFunc, flushTimestamp int64, isLast bool) *flushCtx {
v := flushCtxPool.Get()
if v == nil {
@@ -1235,7 +1256,7 @@ func (ctx *flushCtx) flushSeries() {
func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels = decompressLabels(ctx.labels, key)
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
if !ctx.a.keepMetricNames {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
}
@@ -1257,7 +1278,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, value float64) {
func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, value float64, extraName, extraValue string) {
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels = decompressLabels(ctx.labels, key)
ctx.labels = lc.Decompress(ctx.labels, bytesutil.ToUnsafeBytes(key))
if !ctx.a.keepMetricNames {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
}
@@ -1345,3 +1366,74 @@ func sortAndRemoveDuplicates(a []string) []string {
}
var bbPool bytesutil.ByteBufferPool
// seriesKeyCache maps a 64-bit labels fingerprint to a previously built series key.
//
// Entries are spread over 64 independently locked shards; the shard is picked
// by the top 6 bits of the fingerprint. Entries are keyed by the fingerprint only,
// so two label sets with the same fingerprint share a single entry.
// The zero value is ready for use.
type seriesKeyCache struct {
	shards [64]seriesKeyCacheShard
}

// seriesKeyCacheShard is a single lock-protected part of seriesKeyCache.
type seriesKeyCacheShard struct {
	mu sync.RWMutex
	// m is allocated lazily on the first set call and dropped by reset.
	m map[uint64]seriesCacheEntry
	_ [32]byte // cache-line padding
}

// seriesCacheEntry is a cached key together with the generation it was stored under.
type seriesCacheEntry struct {
	generation uint64
	key        string
}

// shardFor returns the shard responsible for the fingerprint h.
func (c *seriesKeyCache) shardFor(h uint64) *seriesKeyCacheShard {
	return &c.shards[h>>58]
}

// get returns the key stored for h if it was stored under the given generation.
//
// A missing entry and an entry from another generation are both reported as a miss.
func (c *seriesKeyCache) get(h, generation uint64) (string, bool) {
	s := c.shardFor(h)

	s.mu.RLock()
	entry, found := s.m[h]
	s.mu.RUnlock()

	if found && entry.generation == generation {
		return entry.key, true
	}
	return "", false
}

// set stores key for h under the given generation, replacing any previous entry for h.
func (c *seriesKeyCache) set(h, generation uint64, key string) {
	entry := seriesCacheEntry{
		generation: generation,
		key:        key,
	}
	s := c.shardFor(h)

	s.mu.Lock()
	if s.m == nil {
		s.m = map[uint64]seriesCacheEntry{}
	}
	s.m[h] = entry
	s.mu.Unlock()
}

// reset drops all the entries from every shard.
func (c *seriesKeyCache) reset() {
	for i := 0; i < len(c.shards); i++ {
		s := &c.shards[i]
		s.mu.Lock()
		s.m = nil
		s.mu.Unlock()
	}
}
// computeLabelsFingerprint returns the xxhash64 of labels serialized in the given
// order as `name \x00 value \x00` for every label.
//
// The result depends on the order of labels, so callers which need a stable
// fingerprint must pass labels in a canonical order.
//
// The serialized form is built in a fixed 512-byte local array when it fits.
// Otherwise a single buffer of the exact required size is allocated;
// the returned hash is the same in both cases.
func computeLabelsFingerprint(labels []prompb.Label) uint64 {
	var buf [512]byte
	n := 0
	for i, l := range labels {
		need := len(l.Name) + 1 + len(l.Value) + 1
		if n+need > len(buf) {
			// The local array is too small. Calculate the exact size of the remaining
			// labels up front, so the buffer is allocated once instead of being
			// regrown by append when the remaining labels exceed a guessed capacity.
			total := n
			for _, l2 := range labels[i:] {
				total += len(l2.Name) + 1 + len(l2.Value) + 1
			}
			b := make([]byte, n, total)
			copy(b, buf[:n])
			for _, l2 := range labels[i:] {
				b = append(b, l2.Name...)
				b = append(b, '\x00')
				b = append(b, l2.Value...)
				b = append(b, '\x00')
			}
			return xxhash.Sum64(b)
		}
		n += copy(buf[n:], l.Name)
		buf[n] = '\x00'
		n++
		n += copy(buf[n:], l.Value)
		buf[n] = '\x00'
		n++
	}
	return xxhash.Sum64(buf[:n])
}

View File

@@ -45,13 +45,16 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
a := newBenchAggregators([]string{output}, pushFunc)
defer a.MustStop()
// Warm up the LabelsCompressor so benchmark measures steady-state performance.
a.Push(benchSeries, nil)
const loops = 100
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
b.RunParallel(func(pb *testing.PB) {
var matchIdxs []uint32
matchIdxs := make([]uint32, len(benchSeries))
for pb.Next() {
for range loops {
matchIdxs = a.Push(benchSeries, matchIdxs)
@@ -82,8 +85,8 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
func newBenchSeries(seriesCount int) []prompb.TimeSeries {
a := make([]string, 0, seriesCount)
for j := range seriesCount {
s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo_%d",instance="bar",pod="pod-123232312",namespace="kube-foo-bar",node="node-123-3434-443",`+
`some_other_label="foo-bar-baz",environment="prod",label1="value1",label2="value2",label3="value3"} %d`, j, j%100, j*1000)
s := fmt.Sprintf(`http_requests_total{environment="prod",instance="bar",job="foo_%d",label1="value1",label2="value2",label3="value3",`+
`namespace="kube-foo-bar",node="node-123-3434-443",path="/foo/%d",pod="pod-123232312",some_other_label="foo-bar-baz"} %d`, j%100, j, j*1000)
a = append(a, s)
}
metrics := strings.Join(a, "\n")
@@ -101,13 +104,16 @@ func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
a := newPerOutputBenchAggregators(benchOutputs, pushFunc)
defer a.MustStop()
// Warm up the LabelsCompressor so benchmark measures steady-state performance.
a.Push(benchSeries, nil)
const loops = 100
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
b.RunParallel(func(pb *testing.PB) {
var matchIdxs []uint32
matchIdxs := make([]uint32, len(benchSeries))
for pb.Next() {
for range loops {
matchIdxs = a.Push(benchSeries, matchIdxs)
@@ -117,21 +123,17 @@ func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
}
func newPerOutputBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
outputsQuoted := make([]string, len(outputs))
var config string
for i := range outputs {
outputsQuoted[i] = stringsutil.JSONString(outputs[i])
cfg := fmt.Sprintf(`
var b strings.Builder
for _, output := range outputs {
fmt.Fprintf(&b, `
- match: http_requests_total
interval: 24h
by: [job]
outputs: [%s]
`, stringsutil.JSONString(outputs[i]))
config += cfg
`, stringsutil.JSONString(output))
}
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
a, err := LoadFromData([]byte(b.String()), pushFunc, nil, "some_alias")
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
}