lib/{fs,filestream}: use single ParallelExecutor for fs and filestream tasks

This commit is contained in:
Andrii Chubatiuk
2025-11-05 15:38:08 +02:00
parent 74b03c93a6
commit 305f1c91f8
18 changed files with 287 additions and 351 deletions

View File

@@ -2,148 +2,73 @@ package filestream
import (
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// ParallelFileCreator is used for parallel creating of files for the given dstPath.
//
// ParallelFileCreator is needed for speeding up creating many files on high-latency
// storage systems such as NFS or Ceph.
type ParallelFileCreator struct {
tasks []parallelFileCreatorTask
}
type parallelFileCreatorTask struct {
dstPath string
// FileCreatorTask a task for creating the file at the given path and assigning it to *wc.
type FileCreatorTask struct {
path string
wc *WriteCloser
nocache bool
}
// Add registers a task for creating the file at dstPath and assigning it to *wc.
//
// Tasks are executed in parallel on Run() call.
func (pfc *ParallelFileCreator) Add(dstPath string, wc *WriteCloser, nocache bool) {
pfc.tasks = append(pfc.tasks, parallelFileCreatorTask{
dstPath: dstPath,
// NewFileCreatorTask creates new task for creating the file at the given path an assigning it to *wc
func NewFileCreatorTask(path string, wc *WriteCloser, nocache bool) *FileCreatorTask {
return &FileCreatorTask{
path: path,
wc: wc,
nocache: nocache,
})
}
// Run runs all the registered tasks for creating files in parallel.
func (pfc *ParallelFileCreator) Run() {
var wg sync.WaitGroup
concurrencyCh := fsutil.GetConcurrencyCh()
for _, task := range pfc.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(dstPath string, wc *WriteCloser, nocache bool) {
defer func() {
wg.Done()
<-concurrencyCh
}()
*wc = MustCreate(dstPath, nocache)
}(task.dstPath, task.wc, task.nocache)
}
wg.Wait()
}
// ParallelFileOpener is used for parallel opening of files at the given dstPath.
//
// ParallelFileOpener is needed for speeding up opening many files on high-latency
// storage systems such as NFS or Ceph.
type ParallelFileOpener struct {
tasks []parallelFileOpenerTask
// Run executes file creating task
func (t *FileCreatorTask) Run() {
*t.wc = MustCreate(t.path, t.nocache)
}
type parallelFileOpenerTask struct {
// FileOpenerTask a task for opening the file at the given path and assigning it to *rc.
type FileOpenerTask struct {
path string
rc *ReadCloser
nocache bool
}
// Add registers a task for opening the file ath the given path and assigning it to *rc.
//
// Tasks are executed in parallel on Run() call.
func (pfo *ParallelFileOpener) Add(path string, rc *ReadCloser, nocache bool) {
pfo.tasks = append(pfo.tasks, parallelFileOpenerTask{
// NewFileOpenerTask creates new task for opening the file at the given path an assigning it to *rc
func NewFileOpenerTask(path string, rc *ReadCloser, nocache bool) *FileOpenerTask {
return &FileOpenerTask{
path: path,
rc: rc,
nocache: nocache,
})
}
// Run runs all the registered tasks for opening files in parallel.
func (pfo *ParallelFileOpener) Run() {
var wg sync.WaitGroup
concurrencyCh := fsutil.GetConcurrencyCh()
for _, task := range pfo.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(path string, rc *ReadCloser, nocache bool) {
defer func() {
wg.Done()
<-concurrencyCh
}()
*rc = MustOpen(path, nocache)
}(task.path, task.rc, task.nocache)
}
wg.Wait()
}
// ParallelStreamWriter is used for parallel writing of data from io.WriterTo to the given dstPath files.
//
// ParallelStreamWriter is needed for speeding up writing data to many files on high-latency
// storage systems such as NFS or Ceph.
type ParallelStreamWriter struct {
tasks []parallelStreamWriterTask
// Run executes file opening task
func (t *FileOpenerTask) Run() {
*t.rc = MustOpen(t.path, t.nocache)
}
type parallelStreamWriterTask struct {
dstPath string
src io.WriterTo
// StreamWriterTask adds a task to execute in parallel - to write the data from src to the path.
type StreamWriterTask struct {
path string
src io.WriterTo
}
// Add adds a task to execute in parallel - to write the data from src to the dstPath.
//
// Tasks are executed in parallel on Run() call.
func (psw *ParallelStreamWriter) Add(dstPath string, src io.WriterTo) {
psw.tasks = append(psw.tasks, parallelStreamWriterTask{
dstPath: dstPath,
src: src,
})
}
// Run executes all the tasks added via Add() call in parallel.
func (psw *ParallelStreamWriter) Run() {
var wg sync.WaitGroup
concurrencyCh := fsutil.GetConcurrencyCh()
for _, task := range psw.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(dstPath string, src io.WriterTo) {
defer func() {
wg.Done()
<-concurrencyCh
}()
f := MustCreate(dstPath, false)
if _, err := src.WriteTo(f); err != nil {
f.MustClose()
// Do not call MustRemovePath(path), so the user could inspect
// the file contents during investigation of the issue.
logger.Panicf("FATAL: cannot write data to %q: %s", dstPath, err)
}
f.MustClose()
}(task.dstPath, task.src)
// NewStreamWriterTask creates new task for writing the data from src to the path
func NewStreamWriterTask(path string, src io.WriterTo) *StreamWriterTask {
return &StreamWriterTask{
path: path,
src: src,
}
wg.Wait()
}
func (t *StreamWriterTask) Run() {
f := MustCreate(t.path, false)
if _, err := t.src.WriteTo(f); err != nil {
f.MustClose()
// Do not call MustRemovePath(path), so the user could inspect
// the file contents during investigation of the issue.
logger.Panicf("FATAL: cannot write data to %q: %s", t.path, err)
}
f.MustClose()
}

View File

@@ -18,8 +18,8 @@ func getDefaultConcurrency() int {
return n
}
// GetConcurrencyCh returns a channel for limiting the concurrency of operations with files.
func GetConcurrencyCh() chan struct{} {
// getConcurrencyCh returns a channel for limiting the concurrency of operations with files.
func getConcurrencyCh() chan struct{} {
concurrencyChOnce.Do(initConcurrencyCh)
return concurrencyCh
}
@@ -30,3 +30,39 @@ func initConcurrencyCh() {
var concurrencyChOnce sync.Once
var concurrencyCh chan struct{}
type parallelTask interface {
Run()
}
// ParallelExecutor is used for parallel files operations
//
// ParallelExecutor is needed for speeding up files operations on high-latency storage systems such as NFS or Ceph.
type ParallelExecutor struct {
tasks []parallelTask
}
// Add registers a task for parallel file operations
//
// Tasks are executed in parallel on Run() call.
func (pe *ParallelExecutor) Add(task parallelTask) {
pe.tasks = append(pe.tasks, task)
}
func (pe *ParallelExecutor) Run() {
var wg sync.WaitGroup
concurrencyCh := getConcurrencyCh()
for _, task := range pe.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(task parallelTask) {
defer func() {
wg.Done()
<-concurrencyCh
}()
task.Run()
}(task)
}
wg.Wait()
}

View File

@@ -1,55 +1,27 @@
package fs
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
)
// ParallelReaderAtOpener opens ReaderAt files in parallel.
//
// ParallelReaderAtOpener speeds up opening multiple ReaderAt files on high-latency
// storage systems such as NFS or Ceph.
type ParallelReaderAtOpener struct {
tasks []parallelReaderAtOpenerTask
}
type parallelReaderAtOpenerTask struct {
// ReaderAtOpenerTask task to open ReaderAt files in parallel.
type ReaderAtOpenerTask struct {
path string
rc *MustReadAtCloser
fileSize *uint64
}
// Add adds a task for opening the file at the given path and storing it to *r, while storing the file size into *fileSize.
// NewReaderAtOpenerTask creates new task for writing the data from src to the path
//
// Call Run() for running all the registered tasks in parallel.
func (pro *ParallelReaderAtOpener) Add(path string, rc *MustReadAtCloser, fileSize *uint64) {
pro.tasks = append(pro.tasks, parallelReaderAtOpenerTask{
// ParallelReaderAtOpener speeds up opening multiple ReaderAt files on high-latency
// storage systems such as NFS or Ceph.
func NewReaderAtOpenerTask(path string, rc *MustReadAtCloser, fileSize *uint64) *ReaderAtOpenerTask {
return &ReaderAtOpenerTask{
path: path,
rc: rc,
fileSize: fileSize,
})
}
}
// Run executes all the registered tasks in parallel.
func (pro *ParallelReaderAtOpener) Run() {
var wg sync.WaitGroup
concurrencyCh := fsutil.GetConcurrencyCh()
for _, task := range pro.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(path string, rc *MustReadAtCloser, fileSize *uint64) {
defer func() {
wg.Done()
<-concurrencyCh
}()
*rc = MustOpenReaderAt(path)
*fileSize = MustFileSize(path)
}(task.path, task.rc, task.fileSize)
}
wg.Wait()
func (t *ReaderAtOpenerTask) Run() {
*t.rc = OpenReaderAt(t.path)
*t.fileSize = MustFileSize(t.path)
}
// MustCloser must implement MustClose() function.
@@ -57,23 +29,24 @@ type MustCloser interface {
MustClose()
}
// MustCloseParallel closes all the cs in parallel.
// CloserTask task to close all the MustCloser in parallel.
//
// Parallel closing reduces the time needed to flush the data to the underlying files on close
// on high-latency storage systems such as NFS or Ceph.
func MustCloseParallel(cs []MustCloser) {
var wg sync.WaitGroup
concurrencyCh := fsutil.GetConcurrencyCh()
for _, c := range cs {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(c MustCloser) {
defer func() {
wg.Done()
<-concurrencyCh
}()
c.MustClose()
}(c)
}
wg.Wait()
type CloserTask struct {
c MustCloser
}
// NewCloserTask creates new task for writing the data from src to the path
//
// NewCloserTask speeds up opening multiple MustCloser files on high-latency
// storage systems such as NFS or Ceph.
func NewCloserTask(c MustCloser) *CloserTask {
return &CloserTask{
c: c,
}
}
func (t *CloserTask) Run() {
t.c.MustClose()
}

View File

@@ -148,10 +148,10 @@ func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) {
}
}
// MustOpenReaderAt opens ReaderAt for reading from the file located at path.
// OpenReaderAt opens ReaderAt for reading from the file located at path.
//
// MustClose must be called on the returned ReaderAt when it is no longer needed.
func MustOpenReaderAt(path string) *ReaderAt {
func OpenReaderAt(path string) *ReaderAt {
var r ReaderAt
r.path = path
return &r

View File

@@ -19,7 +19,7 @@ func testReaderAt(t *testing.T, bufSize int) {
data := make([]byte, fileSize)
MustWriteSync(path, data)
defer MustRemovePath(path)
r := MustOpenReaderAt(path)
r := OpenReaderAt(path)
defer r.MustClose()
buf := make([]byte, bufSize)

View File

@@ -26,7 +26,7 @@ func benchmarkReaderAtMustReadAt(b *testing.B, isMmap bool) {
data := make([]byte, fileSize)
MustWriteSync(path, data)
defer MustRemovePath(path)
r := MustOpenReaderAt(path)
r := OpenReaderAt(path)
defer r.MustClose()
b.ResetTimer()

View File

@@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -160,17 +161,17 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
// Open part files in parallel in order to speed up this process
// on high-latency storage systems such as NFS or Ceph.
var pfo filestream.ParallelFileOpener
var pe fsutil.ParallelExecutor
indexPath := filepath.Join(path, indexFilename)
itemsPath := filepath.Join(path, itemsFilename)
lensPath := filepath.Join(path, lensFilename)
pfo.Add(indexPath, &bsr.indexReader, true)
pfo.Add(itemsPath, &bsr.itemsReader, true)
pfo.Add(lensPath, &bsr.lensReader, true)
pe.Add(filestream.NewFileOpenerTask(indexPath, &bsr.indexReader, true))
pe.Add(filestream.NewFileOpenerTask(itemsPath, &bsr.itemsReader, true))
pe.Add(filestream.NewFileOpenerTask(lensPath, &bsr.lensReader, true))
pfo.Run()
pe.Run()
}
// MustClose closes the bsr.
@@ -180,12 +181,11 @@ func (bsr *blockStreamReader) MustClose() {
if !bsr.isInmemoryBlock {
// Close files in parallel in order to speed up this process on storage systems with high latency
// such as NFS or Ceph.
cs := []fs.MustCloser{
bsr.indexReader,
bsr.itemsReader,
bsr.lensReader,
}
fs.MustCloseParallel(cs)
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(bsr.indexReader))
pe.Add(fs.NewCloserTask(bsr.itemsReader))
pe.Add(fs.NewCloserTask(bsr.lensReader))
pe.Run()
}
bsr.reset()
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
)
type blockStreamWriter struct {
@@ -85,22 +86,22 @@ func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, co
// Create part files in the directory in parallel in order to speedup the process
// on high-latency storage systems such as NFS or Ceph.
var pfc filestream.ParallelFileCreator
var pe fsutil.ParallelExecutor
indexPath := filepath.Join(path, indexFilename)
itemsPath := filepath.Join(path, itemsFilename)
lensPath := filepath.Join(path, lensFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
pfc.Add(indexPath, &bsw.indexWriter, nocache)
pfc.Add(itemsPath, &bsw.itemsWriter, nocache)
pfc.Add(lensPath, &bsw.lensWriter, nocache)
pe.Add(filestream.NewFileCreatorTask(indexPath, &bsw.indexWriter, nocache))
pe.Add(filestream.NewFileCreatorTask(itemsPath, &bsw.itemsWriter, nocache))
pe.Add(filestream.NewFileCreatorTask(lensPath, &bsw.lensWriter, nocache))
// Always cache metaindex file in OS page cache, since it is immediately
// read after the merge.
pfc.Add(metaindexPath, &bsw.metaindexWriter, false)
pe.Add(filestream.NewFileCreatorTask(metaindexPath, &bsw.metaindexWriter, false))
pfc.Run()
pe.Run()
}
// MustClose closes the bsw.
@@ -116,13 +117,12 @@ func (bsw *blockStreamWriter) MustClose() {
// Close writers in parallel in order to reduce the time needed for closing them
// on high-latency storage systems such as NFS or Ceph.
cs := []fs.MustCloser{
bsw.metaindexWriter,
bsw.indexWriter,
bsw.itemsWriter,
bsw.lensWriter,
}
fs.MustCloseParallel(cs)
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(bsw.metaindexWriter))
pe.Add(fs.NewCloserTask(bsw.indexWriter))
pe.Add(fs.NewCloserTask(bsw.itemsWriter))
pe.Add(fs.NewCloserTask(bsw.lensWriter))
pe.Run()
bsw.reset()
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -42,12 +43,12 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
itemsPath := filepath.Join(path, itemsFilename)
lensPath := filepath.Join(path, lensFilename)
var psw filestream.ParallelStreamWriter
psw.Add(metaindexPath, &mp.metaindexData)
psw.Add(indexPath, &mp.indexData)
psw.Add(itemsPath, &mp.itemsData)
psw.Add(lensPath, &mp.lensData)
psw.Run()
var pe fsutil.ParallelExecutor
pe.Add(filestream.NewStreamWriterTask(metaindexPath, &mp.metaindexData))
pe.Add(filestream.NewStreamWriterTask(indexPath, &mp.indexData))
pe.Add(filestream.NewStreamWriterTask(itemsPath, &mp.itemsData))
pe.Add(filestream.NewStreamWriterTask(lensPath, &mp.lensData))
pe.Run()
mp.ph.MustWriteMetadata(path)

View File

@@ -8,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
@@ -96,7 +97,7 @@ func mustOpenFilePart(path string) *part {
// Open part files in parallel in order to speed up this process
// on high-latency storage systems such as NFS or Ceph.
var pro fs.ParallelReaderAtOpener
var pe fsutil.ParallelExecutor
indexPath := filepath.Join(path, indexFilename)
itemsPath := filepath.Join(path, itemsFilename)
@@ -104,17 +105,17 @@ func mustOpenFilePart(path string) *part {
var indexFile fs.MustReadAtCloser
var indexSize uint64
pro.Add(indexPath, &indexFile, &indexSize)
pe.Add(fs.NewReaderAtOpenerTask(indexPath, &indexFile, &indexSize))
var itemsFile fs.MustReadAtCloser
var itemsSize uint64
pro.Add(itemsPath, &itemsFile, &itemsSize)
pe.Add(fs.NewReaderAtOpenerTask(itemsPath, &itemsFile, &itemsSize))
var lensFile fs.MustReadAtCloser
var lensSize uint64
pro.Add(lensPath, &lensFile, &lensSize)
pe.Add(fs.NewReaderAtOpenerTask(lensPath, &lensFile, &lensSize))
pro.Run()
pe.Run()
size := metaindexSize + indexSize + itemsSize + lensSize
return newPart(&ph, path, size, metaindexFile, indexFile, itemsFile, lensFile)
@@ -143,12 +144,11 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
func (p *part) MustClose() {
// Close files in parallel in order to speed up this process on storage systems with high latency
// such as NFS or Ceph.
cs := []fs.MustCloser{
p.indexFile,
p.itemsFile,
p.lensFile,
}
fs.MustCloseParallel(cs)
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(p.indexFile))
pe.Add(fs.NewCloserTask(p.itemsFile))
pe.Add(fs.NewCloserTask(p.lensFile))
pe.Run()
idxbCache.RemoveBlocksForPart(p)
ibCache.RemoveBlocksForPart(p)

View File

@@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -146,17 +147,17 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
// Open part files in parallel in order to speed up this operation
// on high-latency storage systems such as NFS or Ceph.
var pfo filestream.ParallelFileOpener
var pe fsutil.ParallelExecutor
timestampsPath := filepath.Join(path, timestampsFilename)
valuesPath := filepath.Join(path, valuesFilename)
indexPath := filepath.Join(path, indexFilename)
pfo.Add(timestampsPath, &bsr.timestampsReader, true)
pfo.Add(valuesPath, &bsr.valuesReader, true)
pfo.Add(indexPath, &bsr.indexReader, true)
pe.Add(filestream.NewFileOpenerTask(timestampsPath, &bsr.timestampsReader, true))
pe.Add(filestream.NewFileOpenerTask(valuesPath, &bsr.valuesReader, true))
pe.Add(filestream.NewFileOpenerTask(indexPath, &bsr.indexReader, true))
pfo.Run()
pe.Run()
}
// MustClose closes the bsr.
@@ -165,12 +166,11 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
func (bsr *blockStreamReader) MustClose() {
// Close files in parallel in order to speed up this process on storage systems with high latency
// such as NFS or Ceph.
cs := []fs.MustCloser{
bsr.timestampsReader,
bsr.valuesReader,
bsr.indexReader,
}
fs.MustCloseParallel(cs)
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(bsr.timestampsReader))
pe.Add(fs.NewCloserTask(bsr.valuesReader))
pe.Add(fs.NewCloserTask(bsr.indexReader))
pe.Run()
bsr.reset()
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -92,22 +93,22 @@ func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, co
// Create part files in the directory in parallel in order to reduce the duration
// of the operation on high-latency storage systems such as NFS and Ceph.
var pfc filestream.ParallelFileCreator
var pe fsutil.ParallelExecutor
timestampsPath := filepath.Join(path, timestampsFilename)
valuesPath := filepath.Join(path, valuesFilename)
indexPath := filepath.Join(path, indexFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
pfc.Add(timestampsPath, &bsw.timestampsWriter, nocache)
pfc.Add(valuesPath, &bsw.valuesWriter, nocache)
pfc.Add(indexPath, &bsw.indexWriter, nocache)
pe.Add(filestream.NewFileCreatorTask(timestampsPath, &bsw.timestampsWriter, nocache))
pe.Add(filestream.NewFileCreatorTask(valuesPath, &bsw.valuesWriter, nocache))
pe.Add(filestream.NewFileCreatorTask(indexPath, &bsw.indexWriter, nocache))
// Always cache metaindex file in OS page cache, since it is immediately
// read after the merge.
pfc.Add(metaindexPath, &bsw.metaindexWriter, false)
pe.Add(filestream.NewFileCreatorTask(metaindexPath, &bsw.metaindexWriter, false))
pfc.Run()
pe.Run()
}
// MustClose closes the bsw.
@@ -123,13 +124,12 @@ func (bsw *blockStreamWriter) MustClose() {
// Close writers in parallel in order to reduce the time needed for closing them
// on high-latency storage systems such as NFS or Ceph.
cs := []fs.MustCloser{
bsw.timestampsWriter,
bsw.valuesWriter,
bsw.indexWriter,
bsw.metaindexWriter,
}
fs.MustCloseParallel(cs)
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(bsw.timestampsWriter))
pe.Add(fs.NewCloserTask(bsw.valuesWriter))
pe.Add(fs.NewCloserTask(bsw.indexWriter))
pe.Add(fs.NewCloserTask(bsw.metaindexWriter))
pe.Run()
bsw.reset()
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -44,12 +45,13 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
indexPath := filepath.Join(path, indexFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
var psw filestream.ParallelStreamWriter
psw.Add(timestampsPath, &mp.timestampsData)
psw.Add(valuesPath, &mp.valuesData)
psw.Add(indexPath, &mp.indexData)
psw.Add(metaindexPath, &mp.metaindexData)
psw.Run()
var pe fsutil.ParallelExecutor
pe.Add(filestream.NewStreamWriterTask(timestampsPath, &mp.timestampsData))
pe.Add(filestream.NewStreamWriterTask(valuesPath, &mp.valuesData))
pe.Add(filestream.NewStreamWriterTask(indexPath, &mp.indexData))
pe.Add(filestream.NewStreamWriterTask(metaindexPath, &mp.metaindexData))
pe.Run()
mp.ph.MustWriteMetadata(path)

View File

@@ -8,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
@@ -59,7 +60,7 @@ func mustOpenFilePart(path string) *part {
// Open part files in parallel in order to speed up this process
// on high-latency storage systems such as NFS and Ceph.
var pro fs.ParallelReaderAtOpener
var pe fsutil.ParallelExecutor
timestampsPath := filepath.Join(path, timestampsFilename)
valuesPath := filepath.Join(path, valuesFilename)
@@ -67,17 +68,17 @@ func mustOpenFilePart(path string) *part {
var timestampsFile fs.MustReadAtCloser
var timestampsSize uint64
pro.Add(timestampsPath, &timestampsFile, &timestampsSize)
pe.Add(fs.NewReaderAtOpenerTask(timestampsPath, &timestampsFile, &timestampsSize))
var valuesFile fs.MustReadAtCloser
var valuesSize uint64
pro.Add(valuesPath, &valuesFile, &valuesSize)
pe.Add(fs.NewReaderAtOpenerTask(valuesPath, &valuesFile, &valuesSize))
var indexFile fs.MustReadAtCloser
var indexSize uint64
pro.Add(indexPath, &indexFile, &indexSize)
pe.Add(fs.NewReaderAtOpenerTask(indexPath, &indexFile, &indexSize))
pro.Run()
pe.Run()
size := timestampsSize + valuesSize + indexSize + metaindexSize
return newPart(&ph, path, size, metaindexFile, timestampsFile, valuesFile, indexFile)
@@ -118,12 +119,11 @@ func (p *part) String() string {
func (p *part) MustClose() {
// Close files in parallel in order to speed up this process on storage systems with high latency
// such as NFS or Ceph.
cs := []fs.MustCloser{
p.timestampsFile,
p.valuesFile,
p.indexFile,
}
fs.MustCloseParallel(cs)
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(p.timestampsFile))
pe.Add(fs.NewCloserTask(p.valuesFile))
pe.Add(fs.NewCloserTask(p.indexFile))
pe.Run()
ibCache.RemoveBlocksForPart(p)
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
@@ -95,10 +96,9 @@ func (r *bloomValuesReader) totalBytesRead() uint64 {
return r.bloom.bytesRead + r.values.bytesRead
}
func (r *bloomValuesReader) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
dst = append(dst, &r.bloom)
dst = append(dst, &r.values)
return dst
func (r *bloomValuesReader) appendCloserTasks(pe *fsutil.ParallelExecutor) {
pe.Add(fs.NewCloserTask(&r.bloom))
pe.Add(fs.NewCloserTask(&r.values))
}
type bloomValuesStreamReader struct {
@@ -181,23 +181,22 @@ func (sr *streamReaders) totalBytesRead() uint64 {
func (sr *streamReaders) MustClose() {
// Close files in parallel in order to reduce the time needed for this operation
// on high-latency storage systems such as NFS or Ceph.
cs := []fs.MustCloser{
&sr.columnNamesReader,
&sr.columnIdxsReader,
&sr.metaindexReader,
&sr.indexReader,
&sr.columnsHeaderIndexReader,
&sr.columnsHeaderReader,
&sr.timestampsReader,
}
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(&sr.columnNamesReader))
pe.Add(fs.NewCloserTask(&sr.columnIdxsReader))
pe.Add(fs.NewCloserTask(&sr.metaindexReader))
pe.Add(fs.NewCloserTask(&sr.indexReader))
pe.Add(fs.NewCloserTask(&sr.columnsHeaderIndexReader))
pe.Add(fs.NewCloserTask(&sr.columnsHeaderReader))
pe.Add(fs.NewCloserTask(&sr.timestampsReader))
cs = sr.messageBloomValuesReader.appendClosers(cs)
cs = sr.oldBloomValuesReader.appendClosers(cs)
sr.messageBloomValuesReader.appendCloserTasks(&pe)
sr.oldBloomValuesReader.appendCloserTasks(&pe)
for i := range sr.bloomValuesShards {
cs = sr.bloomValuesShards[i].appendClosers(cs)
sr.bloomValuesShards[i].appendCloserTasks(&pe)
}
fs.MustCloseParallel(cs)
pe.Run()
}
func (sr *streamReaders) getBloomValuesReaderForColumnName(name string) *bloomValuesReader {
@@ -355,63 +354,63 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
// Open data readers in parallel in order to reduce the time for this operation
// on high-latency storage systems such as NFS or Ceph.
var pfo filestream.ParallelFileOpener
var pe fsutil.ParallelExecutor
var columnNamesReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 1 {
pfo.Add(columnNamesPath, &columnNamesReader, nocache)
pe.Add(filestream.NewFileOpenerTask(columnNamesPath, &columnNamesReader, nocache))
}
var columnIdxsReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 3 {
pfo.Add(columnIdxsPath, &columnIdxsReader, nocache)
pe.Add(filestream.NewFileOpenerTask(columnIdxsPath, &columnIdxsReader, nocache))
}
var metaindexReader filestream.ReadCloser
pfo.Add(metaindexPath, &metaindexReader, nocache)
pe.Add(filestream.NewFileOpenerTask(metaindexPath, &metaindexReader, nocache))
var indexReader filestream.ReadCloser
pfo.Add(indexPath, &indexReader, nocache)
pe.Add(filestream.NewFileOpenerTask(indexPath, &indexReader, nocache))
var columnsHeaderIndexReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 1 {
pfo.Add(columnsHeaderIndexPath, &columnsHeaderIndexReader, nocache)
pe.Add(filestream.NewFileOpenerTask(columnsHeaderIndexPath, &columnsHeaderIndexReader, nocache))
}
var columnsHeaderReader filestream.ReadCloser
pfo.Add(columnsHeaderPath, &columnsHeaderReader, nocache)
pe.Add(filestream.NewFileOpenerTask(columnsHeaderPath, &columnsHeaderReader, nocache))
var timestampsReader filestream.ReadCloser
pfo.Add(timestampsPath, &timestampsReader, nocache)
pe.Add(filestream.NewFileOpenerTask(timestampsPath, &timestampsReader, nocache))
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
var messageBloomValuesReader bloomValuesStreamReader
pfo.Add(messageBloomFilterPath, &messageBloomValuesReader.bloom, nocache)
pfo.Add(messageValuesPath, &messageBloomValuesReader.values, nocache)
pe.Add(filestream.NewFileOpenerTask(messageBloomFilterPath, &messageBloomValuesReader.bloom, nocache))
pe.Add(filestream.NewFileOpenerTask(messageValuesPath, &messageBloomValuesReader.values, nocache))
var oldBloomValuesReader bloomValuesStreamReader
var bloomValuesShards []bloomValuesStreamReader
if bsr.ph.FormatVersion < 1 {
bloomPath := filepath.Join(path, oldBloomFilename)
pfo.Add(bloomPath, &oldBloomValuesReader.bloom, nocache)
pe.Add(filestream.NewFileOpenerTask(bloomPath, &oldBloomValuesReader.bloom, nocache))
valuesPath := filepath.Join(path, oldValuesFilename)
pfo.Add(valuesPath, &oldBloomValuesReader.values, nocache)
pe.Add(filestream.NewFileOpenerTask(valuesPath, &oldBloomValuesReader.values, nocache))
} else {
bloomValuesShards = make([]bloomValuesStreamReader, bsr.ph.BloomValuesShardsCount)
for i := range bloomValuesShards {
shard := &bloomValuesShards[i]
bloomPath := getBloomFilePath(path, uint64(i))
pfo.Add(bloomPath, &shard.bloom, nocache)
pe.Add(filestream.NewFileOpenerTask(bloomPath, &shard.bloom, nocache))
valuesPath := getValuesFilePath(path, uint64(i))
pfo.Add(valuesPath, &shard.values, nocache)
pe.Add(filestream.NewFileOpenerTask(valuesPath, &shard.values, nocache))
}
}
pfo.Run()
pe.Run()
// Initialize streamReaders
bsr.streamReaders.init(bsr.ph.FormatVersion, columnNamesReader, columnIdxsReader, metaindexReader, indexReader,

View File

@@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
@@ -84,10 +85,9 @@ func (w *bloomValuesWriter) totalBytesWritten() uint64 {
return w.bloom.bytesWritten + w.values.bytesWritten
}
func (w *bloomValuesWriter) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
dst = append(dst, &w.bloom)
dst = append(dst, &w.values)
return dst
func (w *bloomValuesWriter) appendCloserTasks(pe *fsutil.ParallelExecutor) {
pe.Add(fs.NewCloserTask(&w.bloom))
pe.Add(fs.NewCloserTask(&w.values))
}
type bloomValuesStreamWriter struct {
@@ -158,22 +158,21 @@ func (sw *streamWriters) totalBytesWritten() uint64 {
func (sw *streamWriters) MustClose() {
// Flush and close files in parallel in order to reduce the time needed for this operation
// on high-latency storage systems such as NFS or Ceph.
cs := []fs.MustCloser{
&sw.columnNamesWriter,
&sw.columnIdxsWriter,
&sw.metaindexWriter,
&sw.indexWriter,
&sw.columnsHeaderIndexWriter,
&sw.columnsHeaderWriter,
&sw.timestampsWriter,
}
var pe fsutil.ParallelExecutor
pe.Add(fs.NewCloserTask(&sw.columnNamesWriter))
pe.Add(fs.NewCloserTask(&sw.columnIdxsWriter))
pe.Add(fs.NewCloserTask(&sw.metaindexWriter))
pe.Add(fs.NewCloserTask(&sw.indexWriter))
pe.Add(fs.NewCloserTask(&sw.columnsHeaderIndexWriter))
pe.Add(fs.NewCloserTask(&sw.columnsHeaderWriter))
pe.Add(fs.NewCloserTask(&sw.timestampsWriter))
cs = sw.messageBloomValuesWriter.appendClosers(cs)
sw.messageBloomValuesWriter.appendCloserTasks(&pe)
for i := range sw.bloomValuesShards {
cs = sw.bloomValuesShards[i].appendClosers(cs)
sw.bloomValuesShards[i].appendCloserTasks(&pe)
}
fs.MustCloseParallel(cs)
pe.Run()
}
func (sw *streamWriters) getBloomValuesWriterForColumnName(name string) *bloomValuesWriter {
@@ -312,39 +311,39 @@ func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) {
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename)
var pfc filestream.ParallelFileCreator
var pe fsutil.ParallelExecutor
// Always cache columnNames file, since it is re-read immediately after part creation
var columnNamesWriter filestream.WriteCloser
pfc.Add(columnNamesPath, &columnNamesWriter, false)
pe.Add(filestream.NewFileCreatorTask(columnNamesPath, &columnNamesWriter, false))
// Always cache columnIdxs file, since it is re-read immediately after part creation
var columnIdxsWriter filestream.WriteCloser
pfc.Add(columnIdxsPath, &columnIdxsWriter, false)
pe.Add(filestream.NewFileCreatorTask(columnIdxsPath, &columnIdxsWriter, false))
// Always cache metaindex file, since it is re-read immediately after part creation
var metaindexWriter filestream.WriteCloser
pfc.Add(metaindexPath, &metaindexWriter, false)
pe.Add(filestream.NewFileCreatorTask(metaindexPath, &metaindexWriter, false))
var indexWriter filestream.WriteCloser
pfc.Add(indexPath, &indexWriter, nocache)
pe.Add(filestream.NewFileCreatorTask(indexPath, &indexWriter, nocache))
var columnsHeaderIndexWriter filestream.WriteCloser
pfc.Add(columnsHeaderIndexPath, &columnsHeaderIndexWriter, nocache)
pe.Add(filestream.NewFileCreatorTask(columnsHeaderIndexPath, &columnsHeaderIndexWriter, nocache))
var columnsHeaderWriter filestream.WriteCloser
pfc.Add(columnsHeaderPath, &columnsHeaderWriter, nocache)
pe.Add(filestream.NewFileCreatorTask(columnsHeaderPath, &columnsHeaderWriter, nocache))
var timestampsWriter filestream.WriteCloser
pfc.Add(timestampsPath, &timestampsWriter, nocache)
pe.Add(filestream.NewFileCreatorTask(timestampsPath, &timestampsWriter, nocache))
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
var messageBloomValuesWriter bloomValuesStreamWriter
pfc.Add(messageBloomFilterPath, &messageBloomValuesWriter.bloom, nocache)
pfc.Add(messageValuesPath, &messageBloomValuesWriter.values, nocache)
pe.Add(filestream.NewFileCreatorTask(messageBloomFilterPath, &messageBloomValuesWriter.bloom, nocache))
pe.Add(filestream.NewFileCreatorTask(messageValuesPath, &messageBloomValuesWriter.values, nocache))
pfc.Run()
pe.Run()
createBloomValuesWriter := func(shardIdx uint64) bloomValuesStreamWriter {
bloomPath := getBloomFilePath(path, shardIdx)

View File

@@ -8,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
)
// inmemoryPart is an in-memory part.
@@ -120,26 +121,26 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
var psw filestream.ParallelStreamWriter
var pe fsutil.ParallelExecutor
psw.Add(columnNamesPath, &mp.columnNames)
psw.Add(columnIdxsPath, &mp.columnIdxs)
psw.Add(metaindexPath, &mp.metaindex)
psw.Add(indexPath, &mp.index)
psw.Add(columnsHeaderIndexPath, &mp.columnsHeaderIndex)
psw.Add(columnsHeaderPath, &mp.columnsHeader)
psw.Add(timestampsPath, &mp.timestamps)
pe.Add(filestream.NewStreamWriterTask(columnNamesPath, &mp.columnNames))
pe.Add(filestream.NewStreamWriterTask(columnIdxsPath, &mp.columnIdxs))
pe.Add(filestream.NewStreamWriterTask(metaindexPath, &mp.metaindex))
pe.Add(filestream.NewStreamWriterTask(indexPath, &mp.index))
pe.Add(filestream.NewStreamWriterTask(columnsHeaderIndexPath, &mp.columnsHeaderIndex))
pe.Add(filestream.NewStreamWriterTask(columnsHeaderPath, &mp.columnsHeader))
pe.Add(filestream.NewStreamWriterTask(timestampsPath, &mp.timestamps))
psw.Add(messageBloomFilterPath, &mp.messageBloomValues.bloom)
psw.Add(messageValuesPath, &mp.messageBloomValues.values)
pe.Add(filestream.NewStreamWriterTask(messageBloomFilterPath, &mp.messageBloomValues.bloom))
pe.Add(filestream.NewStreamWriterTask(messageValuesPath, &mp.messageBloomValues.values))
bloomPath := getBloomFilePath(path, 0)
psw.Add(bloomPath, &mp.fieldBloomValues.bloom)
pe.Add(filestream.NewStreamWriterTask(bloomPath, &mp.fieldBloomValues.bloom))
valuesPath := getValuesFilePath(path, 0)
psw.Add(valuesPath, &mp.fieldBloomValues.values)
pe.Add(filestream.NewStreamWriterTask(valuesPath, &mp.fieldBloomValues.values))
psw.Run()
pe.Run()
mp.ph.mustWriteMetadata(path)

View File

@@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -54,10 +55,9 @@ type bloomValuesReaderAt struct {
values fs.MustReadAtCloser
}
func (r *bloomValuesReaderAt) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
dst = append(dst, r.bloom)
dst = append(dst, r.values)
return dst
func (r *bloomValuesReaderAt) appendCloserTasks(pe *fsutil.ParallelExecutor) {
pe.Add(fs.NewCloserTask(r.bloom))
pe.Add(fs.NewCloserTask(r.values))
}
func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part {
@@ -137,36 +137,36 @@ func mustOpenFilePart(pt *partition, path string) *part {
mrs.MustClose()
// Open data files
p.indexFile = fs.MustOpenReaderAt(indexPath)
p.indexFile = fs.OpenReaderAt(indexPath)
if p.ph.FormatVersion >= 1 {
p.columnsHeaderIndexFile = fs.MustOpenReaderAt(columnsHeaderIndexPath)
p.columnsHeaderIndexFile = fs.OpenReaderAt(columnsHeaderIndexPath)
}
p.columnsHeaderFile = fs.MustOpenReaderAt(columnsHeaderPath)
p.timestampsFile = fs.MustOpenReaderAt(timestampsPath)
p.columnsHeaderFile = fs.OpenReaderAt(columnsHeaderPath)
p.timestampsFile = fs.OpenReaderAt(timestampsPath)
// Open files with bloom filters and column values
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
p.messageBloomValues.bloom = fs.MustOpenReaderAt(messageBloomFilterPath)
p.messageBloomValues.bloom = fs.OpenReaderAt(messageBloomFilterPath)
messageValuesPath := filepath.Join(path, messageValuesFilename)
p.messageBloomValues.values = fs.MustOpenReaderAt(messageValuesPath)
p.messageBloomValues.values = fs.OpenReaderAt(messageValuesPath)
if p.ph.FormatVersion < 1 {
bloomPath := filepath.Join(path, oldBloomFilename)
p.oldBloomValues.bloom = fs.MustOpenReaderAt(bloomPath)
p.oldBloomValues.bloom = fs.OpenReaderAt(bloomPath)
valuesPath := filepath.Join(path, oldValuesFilename)
p.oldBloomValues.values = fs.MustOpenReaderAt(valuesPath)
p.oldBloomValues.values = fs.OpenReaderAt(valuesPath)
} else {
p.bloomValuesShards = make([]bloomValuesReaderAt, p.ph.BloomValuesShardsCount)
for i := range p.bloomValuesShards {
shard := &p.bloomValuesShards[i]
bloomPath := getBloomFilePath(path, uint64(i))
shard.bloom = fs.MustOpenReaderAt(bloomPath)
shard.bloom = fs.OpenReaderAt(bloomPath)
valuesPath := getValuesFilePath(path, uint64(i))
shard.values = fs.MustOpenReaderAt(valuesPath)
shard.values = fs.OpenReaderAt(valuesPath)
}
}
@@ -176,25 +176,25 @@ func mustOpenFilePart(pt *partition, path string) *part {
func mustClosePart(p *part) {
// Close files in parallel in order to speed up this operation
// on high-latency storage systems such as NFS and Ceph.
var cs []fs.MustCloser
var pe fsutil.ParallelExecutor
cs = append(cs, p.indexFile)
pe.Add(fs.NewCloserTask(p.indexFile))
if p.ph.FormatVersion >= 1 {
cs = append(cs, p.columnsHeaderIndexFile)
pe.Add(fs.NewCloserTask(p.columnsHeaderIndexFile))
}
cs = append(cs, p.columnsHeaderFile)
cs = append(cs, p.timestampsFile)
cs = p.messageBloomValues.appendClosers(cs)
pe.Add(fs.NewCloserTask(p.columnsHeaderFile))
pe.Add(fs.NewCloserTask(p.timestampsFile))
p.messageBloomValues.appendCloserTasks(&pe)
if p.ph.FormatVersion < 1 {
cs = p.oldBloomValues.appendClosers(cs)
p.oldBloomValues.appendCloserTasks(&pe)
} else {
for i := range p.bloomValuesShards {
cs = p.bloomValuesShards[i].appendClosers(cs)
p.bloomValuesShards[i].appendCloserTasks(&pe)
}
}
fs.MustCloseParallel(cs)
pe.Run()
p.pt = nil
}