Compare commits

..

7 Commits

Author SHA1 Message Date
Zhu Jiekun
89414062bf bugfix: allow reloading when init with empty remote write relabeling flags (#10213)
### Describe Your Changes

fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10211

This pull request adds a `flagSet bool` field to the `relabelConfigs` struct
and uses this flagSet value as the result of the `isSet()` function.

The reloading should be available when at least one of the command-line
flags `-remoteWrite.relabelConfig` / `-remoteWrite.urlRelabelConfig` is
set.

### Checklist

The following checks are **mandatory**:

- [x] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [x] My change adheres to [VictoriaMetrics development
goals](https://docs.victoriametrics.com/victoriametrics/goals/).

---------

Co-authored-by: Hui Wang <haley@victoriametrics.com>
Co-authored-by: Max Kotliar <mkotlyar@victoriametrics.com>
2026-01-05 12:53:52 +02:00
JAYICE
67c51b009d document: guide users to use --data-binary in curl when import multi lines influx data (#10198)
### Describe Your Changes

fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10165.

Refer to [curl docs](https://curl.se/docs/manpage.html#--data).

> When --data is told to read from a file like that, carriage returns,
newlines and null bytes are stripped out

If users import multiple lines of data from a file via `/api/v2/write`, they
may follow the example we gave and pass `-d` to curl; the newlines are then
stripped out, which causes a parse error in VictoriaMetrics.

This is not a VictoriaMetrics bug, but it is better to guide users to
use `--data-binary`, just as the
[/api/v1/import](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1import)
example does.

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [ ] My change adheres to [VictoriaMetrics development
goals](https://docs.victoriametrics.com/victoriametrics/goals/).

---------

Co-authored-by: Zhu Jiekun <jiekun@victoriametrics.com>
2026-01-05 10:54:47 +02:00
Artem Fetishev
e8160fc8fb CHANGELOG.md: cut v1.133.0 release
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-01-02 12:17:38 +00:00
Artem Fetishev
e3a4ceaef3 deployment/docker: upgrade base docker image (Alpine) from 3.22.2 to 3.23.2
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-01-02 10:22:41 +00:00
Andrii Chubatiuk
e9cedca8c8 docs: replace old grafana datasource page with links to a new one (#10231)
### Describe Your Changes

fixes https://github.com/VictoriaMetrics/vmdocs/issues/192

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [ ] My change adheres to [VictoriaMetrics development
goals](https://docs.victoriametrics.com/victoriametrics/goals/).
2026-01-01 18:54:52 +02:00
Andrii Chubatiuk
b720e55c13 vmsingle: properly proxy requests to all supported vmalert paths (#10179)
### Describe Your Changes

modify initial request path before sending request to vmalert with a
proper value
sync vmalert proxy implementation with one in cluster branch
fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10178

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [ ] My change adheres to [VictoriaMetrics development
goals](https://docs.victoriametrics.com/victoriametrics/goals/).

---------

Co-authored-by: Hui Wang <haley@victoriametrics.com>
2026-01-01 16:28:55 +02:00
Artem Fetishev
ab1429c896 lib/storage: fix tagFiltersCache stats collection (#10230)
Since the cache may be reset too often, using sizeBytes as an
indicator that this is the first indexDB encountered when collecting tfssCache stats
is unreliable, because it can often be zero for all indexDB instances. Use
the Requests metric instead because it is never reset.

Follow-up for #10204.

---------

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2025-12-31 13:35:54 +01:00
25 changed files with 421 additions and 328 deletions

View File

@@ -9,14 +9,14 @@ import (
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/metrics"
"go.yaml.in/yaml/v3"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"go.yaml.in/yaml/v3"
"github.com/VictoriaMetrics/metrics"
)
var (
@@ -139,6 +139,7 @@ func loadRelabelConfigs() (*relabelConfigs, error) {
remoteWriteRelabelConfigData.Store(&rawCfg)
rcs.global = global
}
if len(*relabelConfigPaths) > len(*remoteWriteURLs) {
return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url args: %d",
len(*relabelConfigPaths), (len(*remoteWriteURLs)))
@@ -176,19 +177,9 @@ type relabelConfigs struct {
perURL []*promrelabel.ParsedConfigs
}
// isSet indicates whether any of the (global or per-URL) command-line flags are set
func (rcs *relabelConfigs) isSet() bool {
if rcs == nil {
return false
}
if rcs.global.Len() > 0 {
return true
}
for _, pc := range rcs.perURL {
if pc.Len() > 0 {
return true
}
}
return false
return *relabelConfigPathGlobal != "" || len(*relabelConfigPaths) > 0
}
// initLabelsGlobal must be called after parsing command-line flags.

View File

@@ -520,7 +520,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
fmt.Fprintf(w, "%s", `{"status":"error","msg":"for accessing vmalert flag '-vmalert.proxyURL' must be configured"}`)
return true
}
proxyVMAlertRequests(w, r)
proxyVMAlertRequests(w, r, path)
return true
}
@@ -558,7 +558,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
case "/api/v1/rules", "/rules":
rulesRequests.Inc()
if len(*vmalertProxyURL) > 0 {
proxyVMAlertRequests(w, r)
proxyVMAlertRequests(w, r, path)
return true
}
// Return dumb placeholder for https://prometheus.io/docs/prometheus/latest/querying/api/#rules
@@ -568,7 +568,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
case "/api/v1/alerts", "/alerts":
alertsRequests.Inc()
if len(*vmalertProxyURL) > 0 {
proxyVMAlertRequests(w, r)
proxyVMAlertRequests(w, r, path)
return true
}
// Return dumb placeholder for https://prometheus.io/docs/prometheus/latest/querying/api/#alerts
@@ -578,7 +578,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
case "/api/v1/notifiers", "/notifiers":
notifiersRequests.Inc()
if len(*vmalertProxyURL) > 0 {
proxyVMAlertRequests(w, r)
proxyVMAlertRequests(w, r, path)
return true
}
w.Header().Set("Content-Type", "application/json")
@@ -725,7 +725,7 @@ var (
metricNamesStatsResetErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/admin/status/metric_names_stats/reset"}`)
)
func proxyVMAlertRequests(w http.ResponseWriter, r *http.Request) {
func proxyVMAlertRequests(w http.ResponseWriter, r *http.Request, path string) {
defer func() {
err := recover()
if err == nil || err == http.ErrAbortHandler {
@@ -736,8 +736,10 @@ func proxyVMAlertRequests(w http.ResponseWriter, r *http.Request) {
// Forward other panics to the caller.
panic(err)
}()
r.Host = vmalertProxyHost
vmalertProxy.ServeHTTP(w, r)
req := r.Clone(r.Context())
req.URL.Path = strings.TrimPrefix(path, "prometheus")
req.Host = vmalertProxyHost
vmalertProxy.ServeHTTP(w, req)
}
var (

View File

@@ -3,9 +3,9 @@
DOCKER_REGISTRIES ?= docker.io quay.io
DOCKER_NAMESPACE ?= victoriametrics
ROOT_IMAGE ?= alpine:3.22.2
ROOT_IMAGE ?= alpine:3.23.2
ROOT_IMAGE_SCRATCH ?= scratch
CERTS_IMAGE := alpine:3.22.2
CERTS_IMAGE := alpine:3.23.2
GO_BUILDER_IMAGE := golang:1.25.5

View File

@@ -26,8 +26,16 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix configuration reloading for `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig` when vmagent is launched with empty files. Previously, if vmagent started with an empty config, subsequent config reloads were ignored. See [#10211](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10211).
## [v1.133.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.133.0)
Released at 2026-01-02
**Update Note 1:** [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): Upgrading to per-partition index requires registering all active time series. Expect slow down of data ingestion and queries during upgrade roll-out. This is a one-time operation. Additionally, for users with retention periods shorter than 1 month the disk usage may increase.
* SECURITY: upgrade base docker image (Alpine) from 3.22.2 to 3.23.2. See [Alpine 3.23.2 release notes](https://www.alpinelinux.org/posts/Alpine-3.23.2-released.html).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): add support for global `sampleLimit` setting. This allows users to efficiently limit the number of samples accepted per scrape target. This also ensures target-level `sample_limit` can correctly override the global setting. See [#10145](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10145). Thanks to @kobylyanskiy for the contribution.
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): expose `vmauth_user_request_backend_requests_total` and `vmauth_unauthorized_user_request_backend_requests_total` [metrics](https://docs.victoriametrics.com/victoriametrics/vmauth/#monitoring), which track the number of requests sent to backends. These counts may exceed `vmauth_user_requests_total` and `vmauth_unauthorized_user_requests_total` when requests are retried across multiple backends. See [#10171](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10171).
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): expose `vmauth_user_request_errors_total` and `vmauth_unauthorized_user_request_errors_total` [metrics](https://docs.victoriametrics.com/victoriametrics/vmauth/#monitoring), which track the number of user request errors. See [#10188](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10188).
@@ -39,7 +47,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): fix `vmauth_user_request_backend_errors_total` and `vmauth_unauthorized_user_request_backend_errors_total` to only reflect backend request errors. Previously, these counters could be overcounted with user request error. See [#10177](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10177).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): rotate `dateMetricIDCache` instead of resetting it. This should make the eviction less aggressive. Since the cache does not have fixed max size anymore the `-storage.cacheSizeIndexDBDateMetricID` flag has been removed. See [#10064](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10053) and PR [#10169](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10169).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): properly add metrics metadata scraped with `promscrape.config` and `selfScrapeInterval`. See [#10175](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10175).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix stats collection for `indexdb/tagFiltersToMetricIDs`, `indexdb/metricID`, and `indexdb/date_metricID` caches. As per PR [#10131](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10131), the stats is collected for most utilized instance only, but if the size of all instances is 0 then the stats won't be collected at all. This may result in max cache size alternating between the actual value and 0. See [#10204](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10204).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix stats collection for `indexdb/tagFiltersToMetricIDs`, `indexdb/metricID`, and `indexdb/date_metricID` caches. As per PR [#10131](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10131), the stats are collected for the most utilized instance only, but if the size of all instances is 0 then the stats won't be collected at all. This may result in max cache size alternating between the actual value and 0. See [#10204](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10204) and [#10230](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10230).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly proxy requests to [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/) when `-vmalert.proxyURL` flag is set. See [#10178](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10178).
## [v1.132.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.132.0)

View File

@@ -51,14 +51,31 @@ Comma-separated list of expected databases can be passed to VictoriaMetrics via
## InfluxDB v2 format
VictoriaMetrics exposes endpoints for the InfluxDB v2 HTTP API at `/influx/api/v2/write` and `/api/v2/write`.
Here's an example writing data with `curl`:
```sh
curl -d 'measurement,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'http://localhost:8428/api/v2/write'
curl --data-binary 'measurement1,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'http://<victoriametrics-addr>:8428/api/v2/write'
```
And to write multiple lines of data at once, prepare a file (e.g., `influx.data`) with your data:
```text
measurement2,tag1=value1,tag2=value2 field1=456,field2=4.56
measurement3,tag1=value1,tag2=value2 field1=789,field2=7.89
```
And execute this command to import the data:
```sh
curl -X POST 'http://<victoriametrics-addr>:8428/api/v2/write' --data-binary @influx.data
```
The `/api/v1/export` endpoint should return the following response:
```json
{"metric":{"__name__":"measurement_field1","tag1":"value1","tag2":"value2"},"values":[123],"timestamps":[1695902762311]}
{"metric":{"__name__":"measurement_field2","tag1":"value1","tag2":"value2"},"values":[1.23],"timestamps":[1695902762311]}
{"metric":{"__name__":"measurement1_field1","tag1":"value1","tag2":"value2"},"values":[123],"timestamps":[1766983684142]}
{"metric":{"__name__":"measurement1_field2","tag1":"value1","tag2":"value2"},"values":[1.23],"timestamps":[1766983684142]}
{"metric":{"__name__":"measurement2_field1","tag1":"value1","tag2":"value2"},"values":[456],"timestamps":[1767012583021]}
{"metric":{"__name__":"measurement2_field2","tag1":"value1","tag2":"value2"},"values":[4.56],"timestamps":[1767012583021]}
{"metric":{"__name__":"measurement3_field1","tag1":"value1","tag2":"value2"},"values":[789],"timestamps":[1767012583021]}
{"metric":{"__name__":"measurement3_field2","tag1":"value1","tag2":"value2"},"values":[7.89],"timestamps":[1767012583021]}
```
## Data transformations
@@ -92,13 +109,13 @@ foo_field2{tag1="value1", tag2="value2"} 40
Example for writing data with [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/)
to local VictoriaMetrics using `curl`:
```sh
curl -d 'measurement,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'http://localhost:8428/write'
curl -d 'measurement,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'http://<victoriametrics-addr>:8428/write'
```
An arbitrary number of lines delimited by '\n' (aka newline char) can be sent in a single request.
After that the data may be read via [/api/v1/export](https://docs.victoriametrics.com/victoriametrics/#how-to-export-data-in-json-line-format) endpoint:
```sh
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"measurement_.*"}'
curl -G 'http://<victoriametrics-addr>:8428/api/v1/export' -d 'match={__name__=~"measurement_.*"}'
```
The `/api/v1/export` endpoint should return the following response:

View File

@@ -9,4 +9,10 @@ aliases:
- /grafana-datasource/
- /grafana-datasource.html
---
{{% content "integrations/grafana/datasource/_index.md" %}}
###### VictoriaMetrics datasource
Moved to [victoriametrics/integrations/grafana](https://docs.victoriametrics.com/victoriametrics/integrations/grafana/#victoriametrics-datasource).
###### Prometheus datasource
Moved to [victoriametrics/integrations/grafana](https://docs.victoriametrics.com/victoriametrics/integrations/grafana/#prometheus-datasource).

View File

@@ -2,73 +2,148 @@ package filestream
import (
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// FileCreatorTask a task for creating the file at the given path and assigning it to *wc.
type FileCreatorTask struct {
path string
// ParallelFileCreator is used for parallel creating of files for the given dstPath.
//
// ParallelFileCreator is needed for speeding up creating many files on high-latency
// storage systems such as NFS or Ceph.
type ParallelFileCreator struct {
tasks []parallelFileCreatorTask
}
type parallelFileCreatorTask struct {
dstPath string
wc *WriteCloser
nocache bool
}
// NewFileCreatorTask creates new task for creating the file at the given path an assigning it to *wc
func NewFileCreatorTask(path string, wc *WriteCloser, nocache bool) *FileCreatorTask {
return &FileCreatorTask{
path: path,
// Add registers a task for creating the file at dstPath and assigning it to *wc.
//
// Tasks are executed in parallel on Run() call.
func (pfc *ParallelFileCreator) Add(dstPath string, wc *WriteCloser, nocache bool) {
pfc.tasks = append(pfc.tasks, parallelFileCreatorTask{
dstPath: dstPath,
wc: wc,
nocache: nocache,
})
}
// Run runs all the registered tasks for creating files in parallel.
func (pfc *ParallelFileCreator) Run() {
var wg sync.WaitGroup
concurrencyCh := fsutil.GetConcurrencyCh()
for _, task := range pfc.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(dstPath string, wc *WriteCloser, nocache bool) {
defer func() {
wg.Done()
<-concurrencyCh
}()
*wc = MustCreate(dstPath, nocache)
}(task.dstPath, task.wc, task.nocache)
}
wg.Wait()
}
// Run executes file creating task
func (t *FileCreatorTask) Run() {
*t.wc = MustCreate(t.path, t.nocache)
// ParallelFileOpener is used for parallel opening of files at the given dstPath.
//
// ParallelFileOpener is needed for speeding up opening many files on high-latency
// storage systems such as NFS or Ceph.
type ParallelFileOpener struct {
tasks []parallelFileOpenerTask
}
// FileOpenerTask a task for opening the file at the given path and assigning it to *rc.
type FileOpenerTask struct {
type parallelFileOpenerTask struct {
path string
rc *ReadCloser
nocache bool
}
// NewFileOpenerTask creates new task for opening the file at the given path an assigning it to *rc
func NewFileOpenerTask(path string, rc *ReadCloser, nocache bool) *FileOpenerTask {
return &FileOpenerTask{
// Add registers a task for opening the file at the given path and assigning it to *rc.
//
// Tasks are executed in parallel on Run() call.
func (pfo *ParallelFileOpener) Add(path string, rc *ReadCloser, nocache bool) {
pfo.tasks = append(pfo.tasks, parallelFileOpenerTask{
path: path,
rc: rc,
nocache: nocache,
})
}
// Run runs all the registered tasks for opening files in parallel.
func (pfo *ParallelFileOpener) Run() {
var wg sync.WaitGroup
concurrencyCh := fsutil.GetConcurrencyCh()
for _, task := range pfo.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(path string, rc *ReadCloser, nocache bool) {
defer func() {
wg.Done()
<-concurrencyCh
}()
*rc = MustOpen(path, nocache)
}(task.path, task.rc, task.nocache)
}
wg.Wait()
}
// Run executes file opening task
func (t *FileOpenerTask) Run() {
*t.rc = MustOpen(t.path, t.nocache)
// ParallelStreamWriter is used for parallel writing of data from io.WriterTo to the given dstPath files.
//
// ParallelStreamWriter is needed for speeding up writing data to many files on high-latency
// storage systems such as NFS or Ceph.
type ParallelStreamWriter struct {
tasks []parallelStreamWriterTask
}
// StreamWriterTask adds a task to execute in parallel - to write the data from src to the path.
type StreamWriterTask struct {
path string
src io.WriterTo
type parallelStreamWriterTask struct {
dstPath string
src io.WriterTo
}
// NewStreamWriterTask creates new task for writing the data from src to the path
func NewStreamWriterTask(path string, src io.WriterTo) *StreamWriterTask {
return &StreamWriterTask{
path: path,
src: src,
// Add adds a task to execute in parallel - to write the data from src to the dstPath.
//
// Tasks are executed in parallel on Run() call.
func (psw *ParallelStreamWriter) Add(dstPath string, src io.WriterTo) {
psw.tasks = append(psw.tasks, parallelStreamWriterTask{
dstPath: dstPath,
src: src,
})
}
// Run executes all the tasks added via Add() call in parallel.
func (psw *ParallelStreamWriter) Run() {
var wg sync.WaitGroup
concurrencyCh := fsutil.GetConcurrencyCh()
for _, task := range psw.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(dstPath string, src io.WriterTo) {
defer func() {
wg.Done()
<-concurrencyCh
}()
f := MustCreate(dstPath, false)
if _, err := src.WriteTo(f); err != nil {
f.MustClose()
// Do not call MustRemovePath(path), so the user could inspect
// the file contents during investigation of the issue.
logger.Panicf("FATAL: cannot write data to %q: %s", dstPath, err)
}
f.MustClose()
}(task.dstPath, task.src)
}
}
func (t *StreamWriterTask) Run() {
f := MustCreate(t.path, false)
if _, err := t.src.WriteTo(f); err != nil {
f.MustClose()
// Do not call MustRemovePath(path), so the user could inspect
// the file contents during investigation of the issue.
logger.Panicf("FATAL: cannot write data to %q: %s", t.path, err)
}
f.MustClose()
wg.Wait()
}

View File

@@ -18,8 +18,8 @@ func getDefaultConcurrency() int {
return n
}
// getConcurrencyCh returns a channel for limiting the concurrency of operations with files.
func getConcurrencyCh() chan struct{} {
// GetConcurrencyCh returns a channel for limiting the concurrency of operations with files.
func GetConcurrencyCh() chan struct{} {
concurrencyChOnce.Do(initConcurrencyCh)
return concurrencyCh
}
@@ -30,39 +30,3 @@ func initConcurrencyCh() {
var concurrencyChOnce sync.Once
var concurrencyCh chan struct{}
type parallelTask interface {
Run()
}
// ParallelExecutor is used for parallel files operations
//
// ParallelExecutor is needed for speeding up files operations on high-latency storage systems such as NFS or Ceph.
type ParallelExecutor struct {
tasks []parallelTask
}
// Add registers a task for parallel file operations
//
// Tasks are executed in parallel on Run() call.
func (pe *ParallelExecutor) Add(task parallelTask) {
pe.tasks = append(pe.tasks, task)
}
func (pe *ParallelExecutor) Run() {
var wg sync.WaitGroup
concurrencyCh := getConcurrencyCh()
for _, task := range pe.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(task parallelTask) {
defer func() {
wg.Done()
<-concurrencyCh
}()
task.Run()
}(task)
}
wg.Wait()
}

View File

@@ -1,27 +1,55 @@
package fs
// ReaderAtOpenerTask task to open ReaderAt files in parallel.
type ReaderAtOpenerTask struct {
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
)
// ParallelReaderAtOpener opens ReaderAt files in parallel.
//
// ParallelReaderAtOpener speeds up opening multiple ReaderAt files on high-latency
// storage systems such as NFS or Ceph.
type ParallelReaderAtOpener struct {
tasks []parallelReaderAtOpenerTask
}
type parallelReaderAtOpenerTask struct {
path string
rc *MustReadAtCloser
fileSize *uint64
}
// NewReaderAtOpenerTask creates new task for writing the data from src to the path
// Add adds a task for opening the file at the given path and storing it to *rc, while storing the file size into *fileSize.
//
// ParallelReaderAtOpener speeds up opening multiple ReaderAt files on high-latency
// storage systems such as NFS or Ceph.
func NewReaderAtOpenerTask(path string, rc *MustReadAtCloser, fileSize *uint64) *ReaderAtOpenerTask {
return &ReaderAtOpenerTask{
// Call Run() for running all the registered tasks in parallel.
func (pro *ParallelReaderAtOpener) Add(path string, rc *MustReadAtCloser, fileSize *uint64) {
pro.tasks = append(pro.tasks, parallelReaderAtOpenerTask{
path: path,
rc: rc,
fileSize: fileSize,
}
})
}
func (t *ReaderAtOpenerTask) Run() {
*t.rc = OpenReaderAt(t.path)
*t.fileSize = MustFileSize(t.path)
// Run executes all the registered tasks in parallel.
func (pro *ParallelReaderAtOpener) Run() {
var wg sync.WaitGroup
concurrencyCh := fsutil.GetConcurrencyCh()
for _, task := range pro.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(path string, rc *MustReadAtCloser, fileSize *uint64) {
defer func() {
wg.Done()
<-concurrencyCh
}()
*rc = MustOpenReaderAt(path)
*fileSize = MustFileSize(path)
}(task.path, task.rc, task.fileSize)
}
wg.Wait()
}
// MustCloser must implement MustClose() function.
@@ -29,24 +57,23 @@ type MustCloser interface {
MustClose()
}
// CloserTask task to close all the MustCloser in parallel.
// MustCloseParallel closes all the cs in parallel.
//
// Parallel closing reduces the time needed to flush the data to the underlying files on close
// on high-latency storage systems such as NFS or Ceph.
type CloserTask struct {
c MustCloser
}
// NewCloserTask creates new task for writing the data from src to the path
//
// NewCloserTask speeds up opening multiple MustCloser files on high-latency
// storage systems such as NFS or Ceph.
func NewCloserTask(c MustCloser) *CloserTask {
return &CloserTask{
c: c,
func MustCloseParallel(cs []MustCloser) {
var wg sync.WaitGroup
concurrencyCh := fsutil.GetConcurrencyCh()
for _, c := range cs {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(c MustCloser) {
defer func() {
wg.Done()
<-concurrencyCh
}()
c.MustClose()
}(c)
}
}
func (t *CloserTask) Run() {
t.c.MustClose()
wg.Wait()
}

View File

@@ -148,10 +148,10 @@ func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) {
}
}
// OpenReaderAt opens ReaderAt for reading from the file located at path.
// MustOpenReaderAt opens ReaderAt for reading from the file located at path.
//
// MustClose must be called on the returned ReaderAt when it is no longer needed.
func OpenReaderAt(path string) *ReaderAt {
func MustOpenReaderAt(path string) *ReaderAt {
var r ReaderAt
r.path = path
return &r

View File

@@ -19,7 +19,7 @@ func testReaderAt(t *testing.T, bufSize int) {
data := make([]byte, fileSize)
MustWriteSync(path, data)
defer MustRemovePath(path)
r := OpenReaderAt(path)
r := MustOpenReaderAt(path)
defer r.MustClose()
buf := make([]byte, bufSize)

View File

@@ -26,7 +26,7 @@ func benchmarkReaderAtMustReadAt(b *testing.B, isMmap bool) {
data := make([]byte, fileSize)
MustWriteSync(path, data)
defer MustRemovePath(path)
r := OpenReaderAt(path)
r := MustOpenReaderAt(path)
defer r.MustClose()
b.ResetTimer()

View File

@@ -10,7 +10,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -161,17 +160,17 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
// Open part files in parallel in order to speed up this process
// on high-latency storage systems such as NFS or Ceph.
var pe fsutil.ParallelExecutor
var pfo filestream.ParallelFileOpener
indexPath := filepath.Join(path, indexFilename)
itemsPath := filepath.Join(path, itemsFilename)
lensPath := filepath.Join(path, lensFilename)
pe.Add(filestream.NewFileOpenerTask(indexPath, &bsr.indexReader, true))
pe.Add(filestream.NewFileOpenerTask(itemsPath, &bsr.itemsReader, true))
pe.Add(filestream.NewFileOpenerTask(lensPath, &bsr.lensReader, true))
pfo.Add(indexPath, &bsr.indexReader, true)
pfo.Add(itemsPath, &bsr.itemsReader, true)
pfo.Add(lensPath, &bsr.lensReader, true)
pe.Run()
pfo.Run()
}
// MustClose closes the bsr.
@@ -181,11 +180,12 @@ func (bsr *blockStreamReader) MustClose() {
if !bsr.isInmemoryBlock {
// Close files in parallel in order to speed up this process on storage systems with high latency
// such as NFS or Ceph.
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(bsr.indexReader))
pe.Add(fs.NewCloserTask(bsr.itemsReader))
pe.Add(fs.NewCloserTask(bsr.lensReader))
pe.Run()
cs := []fs.MustCloser{
bsr.indexReader,
bsr.itemsReader,
bsr.lensReader,
}
fs.MustCloseParallel(cs)
}
bsr.reset()
}

View File

@@ -7,7 +7,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
)
type blockStreamWriter struct {
@@ -86,22 +85,22 @@ func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, co
// Create part files in the directory in parallel in order to speedup the process
// on high-latency storage systems such as NFS or Ceph.
var pe fsutil.ParallelExecutor
var pfc filestream.ParallelFileCreator
indexPath := filepath.Join(path, indexFilename)
itemsPath := filepath.Join(path, itemsFilename)
lensPath := filepath.Join(path, lensFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
pe.Add(filestream.NewFileCreatorTask(indexPath, &bsw.indexWriter, nocache))
pe.Add(filestream.NewFileCreatorTask(itemsPath, &bsw.itemsWriter, nocache))
pe.Add(filestream.NewFileCreatorTask(lensPath, &bsw.lensWriter, nocache))
pfc.Add(indexPath, &bsw.indexWriter, nocache)
pfc.Add(itemsPath, &bsw.itemsWriter, nocache)
pfc.Add(lensPath, &bsw.lensWriter, nocache)
// Always cache metaindex file in OS page cache, since it is immediately
// read after the merge.
pe.Add(filestream.NewFileCreatorTask(metaindexPath, &bsw.metaindexWriter, false))
pfc.Add(metaindexPath, &bsw.metaindexWriter, false)
pe.Run()
pfc.Run()
}
// MustClose closes the bsw.
@@ -117,12 +116,13 @@ func (bsw *blockStreamWriter) MustClose() {
// Close writers in parallel in order to reduce the time needed for closing them
// on high-latency storage systems such as NFS or Ceph.
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(bsw.metaindexWriter))
pe.Add(fs.NewCloserTask(bsw.indexWriter))
pe.Add(fs.NewCloserTask(bsw.itemsWriter))
pe.Add(fs.NewCloserTask(bsw.lensWriter))
pe.Run()
cs := []fs.MustCloser{
bsw.metaindexWriter,
bsw.indexWriter,
bsw.itemsWriter,
bsw.lensWriter,
}
fs.MustCloseParallel(cs)
bsw.reset()
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -43,12 +42,12 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
itemsPath := filepath.Join(path, itemsFilename)
lensPath := filepath.Join(path, lensFilename)
var pe fsutil.ParallelExecutor
pe.Add(filestream.NewStreamWriterTask(metaindexPath, &mp.metaindexData))
pe.Add(filestream.NewStreamWriterTask(indexPath, &mp.indexData))
pe.Add(filestream.NewStreamWriterTask(itemsPath, &mp.itemsData))
pe.Add(filestream.NewStreamWriterTask(lensPath, &mp.lensData))
pe.Run()
var psw filestream.ParallelStreamWriter
psw.Add(metaindexPath, &mp.metaindexData)
psw.Add(indexPath, &mp.indexData)
psw.Add(itemsPath, &mp.itemsData)
psw.Add(lensPath, &mp.lensData)
psw.Run()
mp.ph.MustWriteMetadata(path)

View File

@@ -8,7 +8,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
@@ -97,7 +96,7 @@ func mustOpenFilePart(path string) *part {
// Open part files in parallel in order to speed up this process
// on high-latency storage systems such as NFS or Ceph.
var pe fsutil.ParallelExecutor
var pro fs.ParallelReaderAtOpener
indexPath := filepath.Join(path, indexFilename)
itemsPath := filepath.Join(path, itemsFilename)
@@ -105,17 +104,17 @@ func mustOpenFilePart(path string) *part {
var indexFile fs.MustReadAtCloser
var indexSize uint64
pe.Add(fs.NewReaderAtOpenerTask(indexPath, &indexFile, &indexSize))
pro.Add(indexPath, &indexFile, &indexSize)
var itemsFile fs.MustReadAtCloser
var itemsSize uint64
pe.Add(fs.NewReaderAtOpenerTask(itemsPath, &itemsFile, &itemsSize))
pro.Add(itemsPath, &itemsFile, &itemsSize)
var lensFile fs.MustReadAtCloser
var lensSize uint64
pe.Add(fs.NewReaderAtOpenerTask(lensPath, &lensFile, &lensSize))
pro.Add(lensPath, &lensFile, &lensSize)
pe.Run()
pro.Run()
size := metaindexSize + indexSize + itemsSize + lensSize
return newPart(&ph, path, size, metaindexFile, indexFile, itemsFile, lensFile)
@@ -144,11 +143,12 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
func (p *part) MustClose() {
// Close files in parallel in order to speed up this process on storage systems with high latency
// such as NFS or Ceph.
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(p.indexFile))
pe.Add(fs.NewCloserTask(p.itemsFile))
pe.Add(fs.NewCloserTask(p.lensFile))
pe.Run()
cs := []fs.MustCloser{
p.indexFile,
p.itemsFile,
p.lensFile,
}
fs.MustCloseParallel(cs)
idxbCache.RemoveBlocksForPart(p)
ibCache.RemoveBlocksForPart(p)

View File

@@ -10,7 +10,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -147,17 +146,17 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
// Open part files in parallel in order to speed up this operation
// on high-latency storage systems such as NFS or Ceph.
var pe fsutil.ParallelExecutor
var pfo filestream.ParallelFileOpener
timestampsPath := filepath.Join(path, timestampsFilename)
valuesPath := filepath.Join(path, valuesFilename)
indexPath := filepath.Join(path, indexFilename)
pe.Add(filestream.NewFileOpenerTask(timestampsPath, &bsr.timestampsReader, true))
pe.Add(filestream.NewFileOpenerTask(valuesPath, &bsr.valuesReader, true))
pe.Add(filestream.NewFileOpenerTask(indexPath, &bsr.indexReader, true))
pfo.Add(timestampsPath, &bsr.timestampsReader, true)
pfo.Add(valuesPath, &bsr.valuesReader, true)
pfo.Add(indexPath, &bsr.indexReader, true)
pe.Run()
pfo.Run()
}
// MustClose closes the bsr.
@@ -166,11 +165,12 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
func (bsr *blockStreamReader) MustClose() {
// Close files in parallel in order to speed up this process on storage systems with high latency
// such as NFS or Ceph.
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(bsr.timestampsReader))
pe.Add(fs.NewCloserTask(bsr.valuesReader))
pe.Add(fs.NewCloserTask(bsr.indexReader))
pe.Run()
cs := []fs.MustCloser{
bsr.timestampsReader,
bsr.valuesReader,
bsr.indexReader,
}
fs.MustCloseParallel(cs)
bsr.reset()
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -93,22 +92,22 @@ func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, co
// Create part files in the directory in parallel in order to reduce the duration
// of the operation on high-latency storage systems such as NFS and Ceph.
var pe fsutil.ParallelExecutor
var pfc filestream.ParallelFileCreator
timestampsPath := filepath.Join(path, timestampsFilename)
valuesPath := filepath.Join(path, valuesFilename)
indexPath := filepath.Join(path, indexFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
pe.Add(filestream.NewFileCreatorTask(timestampsPath, &bsw.timestampsWriter, nocache))
pe.Add(filestream.NewFileCreatorTask(valuesPath, &bsw.valuesWriter, nocache))
pe.Add(filestream.NewFileCreatorTask(indexPath, &bsw.indexWriter, nocache))
pfc.Add(timestampsPath, &bsw.timestampsWriter, nocache)
pfc.Add(valuesPath, &bsw.valuesWriter, nocache)
pfc.Add(indexPath, &bsw.indexWriter, nocache)
// Always cache metaindex file in OS page cache, since it is immediately
// read after the merge.
pe.Add(filestream.NewFileCreatorTask(metaindexPath, &bsw.metaindexWriter, false))
pfc.Add(metaindexPath, &bsw.metaindexWriter, false)
pe.Run()
pfc.Run()
}
// MustClose closes the bsw.
@@ -124,12 +123,13 @@ func (bsw *blockStreamWriter) MustClose() {
// Close writers in parallel in order to reduce the time needed for closing them
// on high-latency storage systems such as NFS or Ceph.
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(bsw.timestampsWriter))
pe.Add(fs.NewCloserTask(bsw.valuesWriter))
pe.Add(fs.NewCloserTask(bsw.indexWriter))
pe.Add(fs.NewCloserTask(bsw.metaindexWriter))
pe.Run()
cs := []fs.MustCloser{
bsw.timestampsWriter,
bsw.valuesWriter,
bsw.indexWriter,
bsw.metaindexWriter,
}
fs.MustCloseParallel(cs)
bsw.reset()
}

View File

@@ -245,9 +245,13 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.CompositeFilterSuccessConversions = compositeFilterSuccessConversions.Load()
m.CompositeFilterMissingConversions = compositeFilterMissingConversions.Load()
// Report only once and for an indexDB instance whose tagFiltersCache is
// utilized the most.
if m.TagFiltersToMetricIDsCacheSizeBytes == 0 || db.tagFiltersToMetricIDsCache.SizeBytes() > m.TagFiltersToMetricIDsCacheSizeBytes {
// Report only once: either for the first encountered indexDB instance or
// for the one whose tagFiltersCache is utilized the most.
//
// For tagFiltersCache, use TagFiltersToMetricIDsCacheRequests as the
// indicator that this is the first indexDB instance whose metrics are being
// collected, because this cache may be reset too often.
if m.TagFiltersToMetricIDsCacheRequests == 0 || db.tagFiltersToMetricIDsCache.SizeBytes() > m.TagFiltersToMetricIDsCacheSizeBytes {
m.TagFiltersToMetricIDsCacheSize = uint64(db.tagFiltersToMetricIDsCache.Len())
m.TagFiltersToMetricIDsCacheSizeBytes = db.tagFiltersToMetricIDsCache.SizeBytes()
m.TagFiltersToMetricIDsCacheSizeMaxBytes = db.tagFiltersToMetricIDsCache.SizeMaxBytes()
@@ -256,8 +260,8 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.TagFiltersToMetricIDsCacheResets = db.tagFiltersToMetricIDsCache.Resets()
}
// Report only once and for an indexDB instance whose metricIDCache is
// utilized the most.
// Report only once: either for the first encountered indexDB instance or
// for the one whose metricIDCache is utilized the most.
mcs := db.metricIDCache.Stats()
if m.MetricIDCacheSizeBytes == 0 || mcs.SizeBytes > m.MetricIDCacheSizeBytes {
m.MetricIDCacheSize = mcs.Size
@@ -266,8 +270,8 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.MetricIDCacheRotationsCount = mcs.RotationsCount
}
// Report only once and for an indexDB instance whose dateMetricIDCache is
// utilized the most.
// Report only once: either for the first encountered indexDB instance or
// for the one whose dateMetricIDCache is utilized the most.
dmcs := db.dateMetricIDCache.Stats()
if m.DateMetricIDCacheSizeBytes == 0 || dmcs.SizeBytes > m.DateMetricIDCacheSizeBytes {
m.DateMetricIDCacheSize = dmcs.Size

View File

@@ -8,7 +8,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -45,13 +44,12 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
indexPath := filepath.Join(path, indexFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
var pe fsutil.ParallelExecutor
pe.Add(filestream.NewStreamWriterTask(timestampsPath, &mp.timestampsData))
pe.Add(filestream.NewStreamWriterTask(valuesPath, &mp.valuesData))
pe.Add(filestream.NewStreamWriterTask(indexPath, &mp.indexData))
pe.Add(filestream.NewStreamWriterTask(metaindexPath, &mp.metaindexData))
pe.Run()
var psw filestream.ParallelStreamWriter
psw.Add(timestampsPath, &mp.timestampsData)
psw.Add(valuesPath, &mp.valuesData)
psw.Add(indexPath, &mp.indexData)
psw.Add(metaindexPath, &mp.metaindexData)
psw.Run()
mp.ph.MustWriteMetadata(path)

View File

@@ -8,7 +8,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
@@ -60,7 +59,7 @@ func mustOpenFilePart(path string) *part {
// Open part files in parallel in order to speed up this process
// on high-latency storage systems such as NFS and Ceph.
var pe fsutil.ParallelExecutor
var pro fs.ParallelReaderAtOpener
timestampsPath := filepath.Join(path, timestampsFilename)
valuesPath := filepath.Join(path, valuesFilename)
@@ -68,17 +67,17 @@ func mustOpenFilePart(path string) *part {
var timestampsFile fs.MustReadAtCloser
var timestampsSize uint64
pe.Add(fs.NewReaderAtOpenerTask(timestampsPath, &timestampsFile, &timestampsSize))
pro.Add(timestampsPath, &timestampsFile, &timestampsSize)
var valuesFile fs.MustReadAtCloser
var valuesSize uint64
pe.Add(fs.NewReaderAtOpenerTask(valuesPath, &valuesFile, &valuesSize))
pro.Add(valuesPath, &valuesFile, &valuesSize)
var indexFile fs.MustReadAtCloser
var indexSize uint64
pe.Add(fs.NewReaderAtOpenerTask(indexPath, &indexFile, &indexSize))
pro.Add(indexPath, &indexFile, &indexSize)
pe.Run()
pro.Run()
size := timestampsSize + valuesSize + indexSize + metaindexSize
return newPart(&ph, path, size, metaindexFile, timestampsFile, valuesFile, indexFile)
@@ -119,11 +118,12 @@ func (p *part) String() string {
func (p *part) MustClose() {
// Close files in parallel in order to speed up this process on storage systems with high latency
// such as NFS or Ceph.
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(p.timestampsFile))
pe.Add(fs.NewCloserTask(p.valuesFile))
pe.Add(fs.NewCloserTask(p.indexFile))
pe.Run()
cs := []fs.MustCloser{
p.timestampsFile,
p.valuesFile,
p.indexFile,
}
fs.MustCloseParallel(cs)
ibCache.RemoveBlocksForPart(p)
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
@@ -96,9 +95,10 @@ func (r *bloomValuesReader) totalBytesRead() uint64 {
return r.bloom.bytesRead + r.values.bytesRead
}
func (r *bloomValuesReader) appendCloserTasks(pe *fsutil.ParallelExecutor) {
pe.Add(fs.NewCloserTask(&r.bloom))
pe.Add(fs.NewCloserTask(&r.values))
// appendClosers appends the bloom and values readers of r to dst
// and returns the extended slice.
func (r *bloomValuesReader) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
	return append(dst, &r.bloom, &r.values)
}
type bloomValuesStreamReader struct {
@@ -181,22 +181,23 @@ func (sr *streamReaders) totalBytesRead() uint64 {
func (sr *streamReaders) MustClose() {
// Close files in parallel in order to reduce the time needed for this operation
// on high-latency storage systems such as NFS or Ceph.
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(&sr.columnNamesReader))
pe.Add(fs.NewCloserTask(&sr.columnIdxsReader))
pe.Add(fs.NewCloserTask(&sr.metaindexReader))
pe.Add(fs.NewCloserTask(&sr.indexReader))
pe.Add(fs.NewCloserTask(&sr.columnsHeaderIndexReader))
pe.Add(fs.NewCloserTask(&sr.columnsHeaderReader))
pe.Add(fs.NewCloserTask(&sr.timestampsReader))
sr.messageBloomValuesReader.appendCloserTasks(&pe)
sr.oldBloomValuesReader.appendCloserTasks(&pe)
for i := range sr.bloomValuesShards {
sr.bloomValuesShards[i].appendCloserTasks(&pe)
cs := []fs.MustCloser{
&sr.columnNamesReader,
&sr.columnIdxsReader,
&sr.metaindexReader,
&sr.indexReader,
&sr.columnsHeaderIndexReader,
&sr.columnsHeaderReader,
&sr.timestampsReader,
}
pe.Run()
cs = sr.messageBloomValuesReader.appendClosers(cs)
cs = sr.oldBloomValuesReader.appendClosers(cs)
for i := range sr.bloomValuesShards {
cs = sr.bloomValuesShards[i].appendClosers(cs)
}
fs.MustCloseParallel(cs)
}
func (sr *streamReaders) getBloomValuesReaderForColumnName(name string) *bloomValuesReader {
@@ -354,63 +355,63 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
// Open data readers in parallel in order to reduce the time for this operation
// on high-latency storage systems such as NFS or Ceph.
var pe fsutil.ParallelExecutor
var pfo filestream.ParallelFileOpener
var columnNamesReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 1 {
pe.Add(filestream.NewFileOpenerTask(columnNamesPath, &columnNamesReader, nocache))
pfo.Add(columnNamesPath, &columnNamesReader, nocache)
}
var columnIdxsReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 3 {
pe.Add(filestream.NewFileOpenerTask(columnIdxsPath, &columnIdxsReader, nocache))
pfo.Add(columnIdxsPath, &columnIdxsReader, nocache)
}
var metaindexReader filestream.ReadCloser
pe.Add(filestream.NewFileOpenerTask(metaindexPath, &metaindexReader, nocache))
pfo.Add(metaindexPath, &metaindexReader, nocache)
var indexReader filestream.ReadCloser
pe.Add(filestream.NewFileOpenerTask(indexPath, &indexReader, nocache))
pfo.Add(indexPath, &indexReader, nocache)
var columnsHeaderIndexReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 1 {
pe.Add(filestream.NewFileOpenerTask(columnsHeaderIndexPath, &columnsHeaderIndexReader, nocache))
pfo.Add(columnsHeaderIndexPath, &columnsHeaderIndexReader, nocache)
}
var columnsHeaderReader filestream.ReadCloser
pe.Add(filestream.NewFileOpenerTask(columnsHeaderPath, &columnsHeaderReader, nocache))
pfo.Add(columnsHeaderPath, &columnsHeaderReader, nocache)
var timestampsReader filestream.ReadCloser
pe.Add(filestream.NewFileOpenerTask(timestampsPath, &timestampsReader, nocache))
pfo.Add(timestampsPath, &timestampsReader, nocache)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
var messageBloomValuesReader bloomValuesStreamReader
pe.Add(filestream.NewFileOpenerTask(messageBloomFilterPath, &messageBloomValuesReader.bloom, nocache))
pe.Add(filestream.NewFileOpenerTask(messageValuesPath, &messageBloomValuesReader.values, nocache))
pfo.Add(messageBloomFilterPath, &messageBloomValuesReader.bloom, nocache)
pfo.Add(messageValuesPath, &messageBloomValuesReader.values, nocache)
var oldBloomValuesReader bloomValuesStreamReader
var bloomValuesShards []bloomValuesStreamReader
if bsr.ph.FormatVersion < 1 {
bloomPath := filepath.Join(path, oldBloomFilename)
pe.Add(filestream.NewFileOpenerTask(bloomPath, &oldBloomValuesReader.bloom, nocache))
pfo.Add(bloomPath, &oldBloomValuesReader.bloom, nocache)
valuesPath := filepath.Join(path, oldValuesFilename)
pe.Add(filestream.NewFileOpenerTask(valuesPath, &oldBloomValuesReader.values, nocache))
pfo.Add(valuesPath, &oldBloomValuesReader.values, nocache)
} else {
bloomValuesShards = make([]bloomValuesStreamReader, bsr.ph.BloomValuesShardsCount)
for i := range bloomValuesShards {
shard := &bloomValuesShards[i]
bloomPath := getBloomFilePath(path, uint64(i))
pe.Add(filestream.NewFileOpenerTask(bloomPath, &shard.bloom, nocache))
pfo.Add(bloomPath, &shard.bloom, nocache)
valuesPath := getValuesFilePath(path, uint64(i))
pe.Add(filestream.NewFileOpenerTask(valuesPath, &shard.values, nocache))
pfo.Add(valuesPath, &shard.values, nocache)
}
}
pe.Run()
pfo.Run()
// Initialize streamReaders
bsr.streamReaders.init(bsr.ph.FormatVersion, columnNamesReader, columnIdxsReader, metaindexReader, indexReader,

View File

@@ -7,7 +7,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
@@ -85,9 +84,10 @@ func (w *bloomValuesWriter) totalBytesWritten() uint64 {
return w.bloom.bytesWritten + w.values.bytesWritten
}
func (w *bloomValuesWriter) appendCloserTasks(pe *fsutil.ParallelExecutor) {
pe.Add(fs.NewCloserTask(&w.bloom))
pe.Add(fs.NewCloserTask(&w.values))
// appendClosers appends the bloom and values writers of w to dst
// and returns the extended slice.
func (w *bloomValuesWriter) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
	return append(dst, &w.bloom, &w.values)
}
type bloomValuesStreamWriter struct {
@@ -158,21 +158,22 @@ func (sw *streamWriters) totalBytesWritten() uint64 {
func (sw *streamWriters) MustClose() {
// Flush and close files in parallel in order to reduce the time needed for this operation
// on high-latency storage systems such as NFS or Ceph.
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(&sw.columnNamesWriter))
pe.Add(fs.NewCloserTask(&sw.columnIdxsWriter))
pe.Add(fs.NewCloserTask(&sw.metaindexWriter))
pe.Add(fs.NewCloserTask(&sw.indexWriter))
pe.Add(fs.NewCloserTask(&sw.columnsHeaderIndexWriter))
pe.Add(fs.NewCloserTask(&sw.columnsHeaderWriter))
pe.Add(fs.NewCloserTask(&sw.timestampsWriter))
sw.messageBloomValuesWriter.appendCloserTasks(&pe)
for i := range sw.bloomValuesShards {
sw.bloomValuesShards[i].appendCloserTasks(&pe)
cs := []fs.MustCloser{
&sw.columnNamesWriter,
&sw.columnIdxsWriter,
&sw.metaindexWriter,
&sw.indexWriter,
&sw.columnsHeaderIndexWriter,
&sw.columnsHeaderWriter,
&sw.timestampsWriter,
}
pe.Run()
cs = sw.messageBloomValuesWriter.appendClosers(cs)
for i := range sw.bloomValuesShards {
cs = sw.bloomValuesShards[i].appendClosers(cs)
}
fs.MustCloseParallel(cs)
}
func (sw *streamWriters) getBloomValuesWriterForColumnName(name string) *bloomValuesWriter {
@@ -311,39 +312,39 @@ func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) {
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename)
var pe fsutil.ParallelExecutor
var pfc filestream.ParallelFileCreator
// Always cache columnNames file, since it is re-read immediately after part creation
var columnNamesWriter filestream.WriteCloser
pe.Add(filestream.NewFileCreatorTask(columnNamesPath, &columnNamesWriter, false))
pfc.Add(columnNamesPath, &columnNamesWriter, false)
// Always cache columnIdxs file, since it is re-read immediately after part creation
var columnIdxsWriter filestream.WriteCloser
pe.Add(filestream.NewFileCreatorTask(columnIdxsPath, &columnIdxsWriter, false))
pfc.Add(columnIdxsPath, &columnIdxsWriter, false)
// Always cache metaindex file, since it is re-read immediately after part creation
var metaindexWriter filestream.WriteCloser
pe.Add(filestream.NewFileCreatorTask(metaindexPath, &metaindexWriter, false))
pfc.Add(metaindexPath, &metaindexWriter, false)
var indexWriter filestream.WriteCloser
pe.Add(filestream.NewFileCreatorTask(indexPath, &indexWriter, nocache))
pfc.Add(indexPath, &indexWriter, nocache)
var columnsHeaderIndexWriter filestream.WriteCloser
pe.Add(filestream.NewFileCreatorTask(columnsHeaderIndexPath, &columnsHeaderIndexWriter, nocache))
pfc.Add(columnsHeaderIndexPath, &columnsHeaderIndexWriter, nocache)
var columnsHeaderWriter filestream.WriteCloser
pe.Add(filestream.NewFileCreatorTask(columnsHeaderPath, &columnsHeaderWriter, nocache))
pfc.Add(columnsHeaderPath, &columnsHeaderWriter, nocache)
var timestampsWriter filestream.WriteCloser
pe.Add(filestream.NewFileCreatorTask(timestampsPath, &timestampsWriter, nocache))
pfc.Add(timestampsPath, &timestampsWriter, nocache)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
var messageBloomValuesWriter bloomValuesStreamWriter
pe.Add(filestream.NewFileCreatorTask(messageBloomFilterPath, &messageBloomValuesWriter.bloom, nocache))
pe.Add(filestream.NewFileCreatorTask(messageValuesPath, &messageBloomValuesWriter.values, nocache))
pfc.Add(messageBloomFilterPath, &messageBloomValuesWriter.bloom, nocache)
pfc.Add(messageValuesPath, &messageBloomValuesWriter.values, nocache)
pe.Run()
pfc.Run()
createBloomValuesWriter := func(shardIdx uint64) bloomValuesStreamWriter {
bloomPath := getBloomFilePath(path, shardIdx)

View File

@@ -8,7 +8,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
)
// inmemoryPart is an in-memory part.
@@ -121,26 +120,26 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
var pe fsutil.ParallelExecutor
var psw filestream.ParallelStreamWriter
pe.Add(filestream.NewStreamWriterTask(columnNamesPath, &mp.columnNames))
pe.Add(filestream.NewStreamWriterTask(columnIdxsPath, &mp.columnIdxs))
pe.Add(filestream.NewStreamWriterTask(metaindexPath, &mp.metaindex))
pe.Add(filestream.NewStreamWriterTask(indexPath, &mp.index))
pe.Add(filestream.NewStreamWriterTask(columnsHeaderIndexPath, &mp.columnsHeaderIndex))
pe.Add(filestream.NewStreamWriterTask(columnsHeaderPath, &mp.columnsHeader))
pe.Add(filestream.NewStreamWriterTask(timestampsPath, &mp.timestamps))
psw.Add(columnNamesPath, &mp.columnNames)
psw.Add(columnIdxsPath, &mp.columnIdxs)
psw.Add(metaindexPath, &mp.metaindex)
psw.Add(indexPath, &mp.index)
psw.Add(columnsHeaderIndexPath, &mp.columnsHeaderIndex)
psw.Add(columnsHeaderPath, &mp.columnsHeader)
psw.Add(timestampsPath, &mp.timestamps)
pe.Add(filestream.NewStreamWriterTask(messageBloomFilterPath, &mp.messageBloomValues.bloom))
pe.Add(filestream.NewStreamWriterTask(messageValuesPath, &mp.messageBloomValues.values))
psw.Add(messageBloomFilterPath, &mp.messageBloomValues.bloom)
psw.Add(messageValuesPath, &mp.messageBloomValues.values)
bloomPath := getBloomFilePath(path, 0)
pe.Add(filestream.NewStreamWriterTask(bloomPath, &mp.fieldBloomValues.bloom))
psw.Add(bloomPath, &mp.fieldBloomValues.bloom)
valuesPath := getValuesFilePath(path, 0)
pe.Add(filestream.NewStreamWriterTask(valuesPath, &mp.fieldBloomValues.values))
psw.Add(valuesPath, &mp.fieldBloomValues.values)
pe.Run()
psw.Run()
mp.ph.mustWriteMetadata(path)

View File

@@ -9,7 +9,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -55,9 +54,10 @@ type bloomValuesReaderAt struct {
values fs.MustReadAtCloser
}
func (r *bloomValuesReaderAt) appendCloserTasks(pe *fsutil.ParallelExecutor) {
pe.Add(fs.NewCloserTask(r.bloom))
pe.Add(fs.NewCloserTask(r.values))
// appendClosers appends the bloom and values files of r to dst
// and returns the extended slice.
func (r *bloomValuesReaderAt) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
	return append(dst, r.bloom, r.values)
}
func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part {
@@ -137,36 +137,36 @@ func mustOpenFilePart(pt *partition, path string) *part {
mrs.MustClose()
// Open data files
p.indexFile = fs.OpenReaderAt(indexPath)
p.indexFile = fs.MustOpenReaderAt(indexPath)
if p.ph.FormatVersion >= 1 {
p.columnsHeaderIndexFile = fs.OpenReaderAt(columnsHeaderIndexPath)
p.columnsHeaderIndexFile = fs.MustOpenReaderAt(columnsHeaderIndexPath)
}
p.columnsHeaderFile = fs.OpenReaderAt(columnsHeaderPath)
p.timestampsFile = fs.OpenReaderAt(timestampsPath)
p.columnsHeaderFile = fs.MustOpenReaderAt(columnsHeaderPath)
p.timestampsFile = fs.MustOpenReaderAt(timestampsPath)
// Open files with bloom filters and column values
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
p.messageBloomValues.bloom = fs.OpenReaderAt(messageBloomFilterPath)
p.messageBloomValues.bloom = fs.MustOpenReaderAt(messageBloomFilterPath)
messageValuesPath := filepath.Join(path, messageValuesFilename)
p.messageBloomValues.values = fs.OpenReaderAt(messageValuesPath)
p.messageBloomValues.values = fs.MustOpenReaderAt(messageValuesPath)
if p.ph.FormatVersion < 1 {
bloomPath := filepath.Join(path, oldBloomFilename)
p.oldBloomValues.bloom = fs.OpenReaderAt(bloomPath)
p.oldBloomValues.bloom = fs.MustOpenReaderAt(bloomPath)
valuesPath := filepath.Join(path, oldValuesFilename)
p.oldBloomValues.values = fs.OpenReaderAt(valuesPath)
p.oldBloomValues.values = fs.MustOpenReaderAt(valuesPath)
} else {
p.bloomValuesShards = make([]bloomValuesReaderAt, p.ph.BloomValuesShardsCount)
for i := range p.bloomValuesShards {
shard := &p.bloomValuesShards[i]
bloomPath := getBloomFilePath(path, uint64(i))
shard.bloom = fs.OpenReaderAt(bloomPath)
shard.bloom = fs.MustOpenReaderAt(bloomPath)
valuesPath := getValuesFilePath(path, uint64(i))
shard.values = fs.OpenReaderAt(valuesPath)
shard.values = fs.MustOpenReaderAt(valuesPath)
}
}
@@ -176,25 +176,25 @@ func mustOpenFilePart(pt *partition, path string) *part {
func mustClosePart(p *part) {
// Close files in parallel in order to speed up this operation
// on high-latency storage systems such as NFS and Ceph.
var pe fsutil.ParallelExecutor
var cs []fs.MustCloser
pe.Add(fs.NewCloserTask(p.indexFile))
cs = append(cs, p.indexFile)
if p.ph.FormatVersion >= 1 {
pe.Add(fs.NewCloserTask(p.columnsHeaderIndexFile))
cs = append(cs, p.columnsHeaderIndexFile)
}
pe.Add(fs.NewCloserTask(p.columnsHeaderFile))
pe.Add(fs.NewCloserTask(p.timestampsFile))
p.messageBloomValues.appendCloserTasks(&pe)
cs = append(cs, p.columnsHeaderFile)
cs = append(cs, p.timestampsFile)
cs = p.messageBloomValues.appendClosers(cs)
if p.ph.FormatVersion < 1 {
p.oldBloomValues.appendCloserTasks(&pe)
cs = p.oldBloomValues.appendClosers(cs)
} else {
for i := range p.bloomValuesShards {
p.bloomValuesShards[i].appendCloserTasks(&pe)
cs = p.bloomValuesShards[i].appendClosers(cs)
}
}
pe.Run()
fs.MustCloseParallel(cs)
p.pt = nil
}