lib/{mergeset,storage}: open files inside parts in parallel

This should reduce the time needed for opening the parts on high-latency storage systems such as NFS or Ceph.

Updates https://github.com/VictoriaMetrics/VictoriaLogs/issues/517
This commit is contained in:
Aliaksandr Valialkin
2025-07-28 13:41:00 +02:00
parent bbda00fec5
commit 65251c8fe2
10 changed files with 340 additions and 169 deletions

148
lib/filestream/parallel.go Normal file
View File

@@ -0,0 +1,148 @@
package filestream
import (
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// ParallelFileCreator creates many files concurrently.
//
// ParallelFileCreator is needed for speeding up creating many files on high-latency
// storage systems such as NFS or Ceph.
type ParallelFileCreator struct {
	tasks []parallelFileCreatorTask
}

// parallelFileCreatorTask describes a single "create the file at dstPath" task.
type parallelFileCreatorTask struct {
	dstPath string
	wc      *WriteCloser
	nocache bool
}

// Add registers a task for creating the file at dstPath and assigning it to *wc.
//
// Tasks are executed in parallel on Run() call.
func (pfc *ParallelFileCreator) Add(dstPath string, wc *WriteCloser, nocache bool) {
	t := parallelFileCreatorTask{
		dstPath: dstPath,
		wc:      wc,
		nocache: nocache,
	}
	pfc.tasks = append(pfc.tasks, t)
}
// Run runs all the registered tasks for creating files in parallel.
//
// Run blocks until every registered file has been created and assigned to its *WriteCloser.
func (pfc *ParallelFileCreator) Run() {
	var wg sync.WaitGroup
	for i := range pfc.tasks {
		t := &pfc.tasks[i]
		// Acquire a shared concurrency slot before spawning the worker goroutine.
		concurrencyCh <- struct{}{}
		wg.Add(1)
		go func(dstPath string, wc *WriteCloser, nocache bool) {
			defer func() {
				wg.Done()
				// Release the concurrency slot.
				<-concurrencyCh
			}()
			*wc = MustCreate(dstPath, nocache)
		}(t.dstPath, t.wc, t.nocache)
	}
	wg.Wait()
}
// ParallelFileOpener opens many files concurrently.
//
// ParallelFileOpener is needed for speeding up opening many files on high-latency
// storage systems such as NFS or Ceph.
type ParallelFileOpener struct {
	tasks []parallelFileOpenerTask
}

// parallelFileOpenerTask describes a single "open the file at path" task.
type parallelFileOpenerTask struct {
	path    string
	rc      *ReadCloser
	nocache bool
}

// Add registers a task for opening the file at the given path and assigning it to *rc.
//
// Tasks are executed in parallel on Run() call.
func (pfo *ParallelFileOpener) Add(path string, rc *ReadCloser, nocache bool) {
	t := parallelFileOpenerTask{
		path:    path,
		rc:      rc,
		nocache: nocache,
	}
	pfo.tasks = append(pfo.tasks, t)
}
// Run runs all the registered tasks for opening files in parallel.
//
// Run blocks until every registered file has been opened and assigned to its *ReadCloser.
func (pfo *ParallelFileOpener) Run() {
	var wg sync.WaitGroup
	for i := range pfo.tasks {
		t := &pfo.tasks[i]
		// Acquire a shared concurrency slot before spawning the worker goroutine.
		concurrencyCh <- struct{}{}
		wg.Add(1)
		go func(path string, rc *ReadCloser, nocache bool) {
			defer func() {
				wg.Done()
				// Release the concurrency slot.
				<-concurrencyCh
			}()
			*rc = MustOpen(path, nocache)
		}(t.path, t.rc, t.nocache)
	}
	wg.Wait()
}
// ParallelStreamWriter writes data from io.WriterTo sources to the given dstPath files in parallel.
//
// ParallelStreamWriter is needed for speeding up writing data to many files on high-latency
// storage systems such as NFS or Ceph.
type ParallelStreamWriter struct {
	tasks []parallelStreamWriterTask
}

// parallelStreamWriterTask describes a single "write src to dstPath" task.
type parallelStreamWriterTask struct {
	dstPath string
	src     io.WriterTo
}

// Add adds a task to execute in parallel - to write the data from src to the dstPath.
//
// Tasks are executed in parallel on Run() call.
func (psw *ParallelStreamWriter) Add(dstPath string, src io.WriterTo) {
	t := parallelStreamWriterTask{
		dstPath: dstPath,
		src:     src,
	}
	psw.tasks = append(psw.tasks, t)
}
// Run executes all the tasks added via Add() call in parallel.
//
// Run blocks until every registered write task is finished.
// It panics via logger.Panicf if any write fails.
func (psw *ParallelStreamWriter) Run() {
	var wg sync.WaitGroup
	for i := range psw.tasks {
		t := &psw.tasks[i]
		// Acquire a shared concurrency slot before spawning the worker goroutine.
		concurrencyCh <- struct{}{}
		wg.Add(1)
		go func(dstPath string, src io.WriterTo) {
			defer func() {
				wg.Done()
				// Release the concurrency slot.
				<-concurrencyCh
			}()
			f := MustCreate(dstPath, false)
			if _, err := src.WriteTo(f); err != nil {
				f.MustClose()
				// Do not call MustRemovePath(path), so the user could inspect
				// the file contents during investigation of the issue.
				logger.Panicf("FATAL: cannot write data to %q: %s", dstPath, err)
			}
			f.MustClose()
		}(t.dstPath, t.src)
	}
	wg.Wait()
}
// concurrencyCh limits the concurrency of parallel operations performed by ParallelFileCreator, ParallelFileOpener and ParallelStreamWriter
//
// The limit is package-wide: every in-flight operation holds one of the 256 slots
// until it completes, so concurrent Run() calls share the same budget.
var concurrencyCh = make(chan struct{}, 256)

View File

@@ -1,55 +0,0 @@
package filestream
import (
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// ParallelStreamWriter is used for parallel writing of data from io.WriterTo to the given dstPath files.
type ParallelStreamWriter struct {
// tasks holds the write tasks registered via Add() until Run() executes them.
tasks []parallelStreamWriterTask
}
// parallelStreamWriterTask describes a single "write src to dstPath" task.
type parallelStreamWriterTask struct {
dstPath string
src io.WriterTo
}
// Add adds a task to execute in parallel - to write the data from src to the dstPath.
//
// Tasks are executed in parallel on Run() call.
func (psw *ParallelStreamWriter) Add(dstPath string, src io.WriterTo) {
psw.tasks = append(psw.tasks, parallelStreamWriterTask{
dstPath: dstPath,
src: src,
})
}
// Run executes all the tasks added via Add() call in parallel.
//
// Run blocks until all the registered write tasks are complete.
// It panics via logger.Panicf if any write fails.
func (psw *ParallelStreamWriter) Run() {
var wg sync.WaitGroup
// Cap the number of concurrently running tasks at 32 so an arbitrarily large
// task list does not spawn an unbounded number of active goroutines.
concurrencyCh := make(chan struct{}, min(32, len(psw.tasks)))
for _, task := range psw.tasks {
// Acquire a concurrency slot before spawning the worker goroutine.
concurrencyCh <- struct{}{}
wg.Add(1)
go func(dstPath string, src io.WriterTo) {
defer func() {
wg.Done()
// Release the concurrency slot.
<-concurrencyCh
}()
f := MustCreate(dstPath, false)
if _, err := src.WriteTo(f); err != nil {
f.MustClose()
// Do not call MustRemovePath(path), so the user could inspect
// the file contents during investigation of the issue.
logger.Panicf("FATAL: cannot write data to %q: %s", dstPath, err)
}
f.MustClose()
}(task.dstPath, task.src)
}
wg.Wait()
}

View File

@@ -317,30 +317,3 @@ type freeSpaceEntry struct {
func IsDirOrSymlink(de os.DirEntry) bool {
return de.IsDir() || (de.Type()&os.ModeSymlink == os.ModeSymlink)
}
// MustCloser must implement MustClose() function.
type MustCloser interface {
MustClose()
}
// MustCloseParallel closes all the cs in parallel.
//
// Parallel closing reduces the time needed to flush the data to the underlying files on close
// on high-latency storage systems such as NFS or Ceph.
func MustCloseParallel(cs []MustCloser) {
var wg sync.WaitGroup
// Cap the number of concurrently running MustClose() calls at 32.
concurrencyCh := make(chan struct{}, min(32, len(cs)))
for _, c := range cs {
// Acquire a concurrency slot before spawning the worker goroutine.
concurrencyCh <- struct{}{}
wg.Add(1)
go func(c MustCloser) {
defer func() {
wg.Done()
// Release the concurrency slot.
<-concurrencyCh
}()
c.MustClose()
}(c)
}
wg.Wait()
}

78
lib/fs/parallel.go Normal file
View File

@@ -0,0 +1,78 @@
package fs
import (
"sync"
)
// ParallelReaderAtOpener opens ReaderAt files in parallel.
//
// ParallelReaderAtOpener speeds up opening multiple ReaderAt files on high-latency
// storage systems such as NFS or Ceph.
type ParallelReaderAtOpener struct {
	tasks []parallelReaderAtOpenerTask
}

// parallelReaderAtOpenerTask describes a single "open the file at path" task.
type parallelReaderAtOpenerTask struct {
	path     string
	rc       *MustReadAtCloser
	fileSize *uint64
}

// Add registers a task for opening the file at the given path and storing it to *rc,
// while storing the file size into *fileSize.
//
// Call Run() for running all the registered tasks in parallel.
func (pro *ParallelReaderAtOpener) Add(path string, rc *MustReadAtCloser, fileSize *uint64) {
	t := parallelReaderAtOpenerTask{
		path:     path,
		rc:       rc,
		fileSize: fileSize,
	}
	pro.tasks = append(pro.tasks, t)
}
// Run executes all the registered tasks in parallel.
//
// Run blocks until every file has been opened and its size stored.
func (pro *ParallelReaderAtOpener) Run() {
	var wg sync.WaitGroup
	for i := range pro.tasks {
		t := &pro.tasks[i]
		// Acquire a shared concurrency slot before spawning the worker goroutine.
		concurrencyCh <- struct{}{}
		wg.Add(1)
		go func(path string, rc *MustReadAtCloser, fileSize *uint64) {
			defer func() {
				wg.Done()
				// Release the concurrency slot.
				<-concurrencyCh
			}()
			*rc = MustOpenReaderAt(path)
			*fileSize = MustFileSize(path)
		}(t.path, t.rc, t.fileSize)
	}
	wg.Wait()
}
// MustCloser must implement MustClose() function.
type MustCloser interface {
	MustClose()
}

// MustCloseParallel closes all the cs in parallel.
//
// Parallel closing reduces the time needed to flush the data to the underlying files on close
// on high-latency storage systems such as NFS or Ceph.
func MustCloseParallel(cs []MustCloser) {
	var wg sync.WaitGroup
	for i := range cs {
		c := cs[i]
		// Acquire a shared concurrency slot before spawning the worker goroutine.
		concurrencyCh <- struct{}{}
		wg.Add(1)
		go func() {
			defer func() {
				wg.Done()
				// Release the concurrency slot.
				<-concurrencyCh
			}()
			c.MustClose()
		}()
	}
	wg.Wait()
}

// concurrencyCh limits the concurrency of parallel operations performed by ParallelReaderAtOpener and MustCloseParallel
var concurrencyCh = make(chan struct{}, 256)

View File

@@ -148,26 +148,29 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
metaindexPath := filepath.Join(path, metaindexFilename)
metaindexFile := filestream.MustOpen(metaindexPath, true)
var err error
bsr.mrs, err = unmarshalMetaindexRows(bsr.mrs[:0], metaindexFile)
mrs, err := unmarshalMetaindexRows(bsr.mrs[:0], metaindexFile)
metaindexFile.MustClose()
if err != nil {
logger.Panicf("FATAL: cannot unmarshal metaindex rows from file %q: %s", metaindexPath, err)
}
indexPath := filepath.Join(path, indexFilename)
indexFile := filestream.MustOpen(indexPath, true)
itemsPath := filepath.Join(path, itemsFilename)
itemsFile := filestream.MustOpen(itemsPath, true)
lensPath := filepath.Join(path, lensFilename)
lensFile := filestream.MustOpen(lensPath, true)
bsr.mrs = mrs
bsr.path = path
bsr.indexReader = indexFile
bsr.itemsReader = itemsFile
bsr.lensReader = lensFile
// Open part files in parallel in order to speed up this process
// on high-latency storage systems such as NFS or Ceph.
var pfo filestream.ParallelFileOpener
indexPath := filepath.Join(path, indexFilename)
itemsPath := filepath.Join(path, itemsFilename)
lensPath := filepath.Join(path, lensFilename)
pfo.Add(indexPath, &bsr.indexReader, true)
pfo.Add(itemsPath, &bsr.itemsReader, true)
pfo.Add(lensPath, &bsr.lensReader, true)
pfo.Run()
}
// MustClose closes the bsr.
@@ -176,7 +179,7 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
func (bsr *blockStreamReader) MustClose() {
if !bsr.isInmemoryBlock {
// Close files in parallel in order to speed up this process on storage systems with high latency
// such as NFS or Cepth.
// such as NFS or Ceph.
cs := []fs.MustCloser{
bsr.indexReader,
bsr.itemsReader,

View File

@@ -74,34 +74,33 @@ func (bsw *blockStreamWriter) MustInitFromInmemoryPart(mp *inmemoryPart, compres
//
// The bsw doesn't pollute OS page cache if nocache is set.
func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, compressLevel int) {
bsw.reset()
bsw.compressLevel = compressLevel
path = filepath.Clean(path)
// Create the directory
fs.MustMkdirFailIfExist(path)
// Create part files in the directory.
// Create part files in the directory in parallel in order to speed up the process
// on high-latency storage systems such as NFS or Ceph.
var pfc filestream.ParallelFileCreator
indexPath := filepath.Join(path, indexFilename)
itemsPath := filepath.Join(path, itemsFilename)
lensPath := filepath.Join(path, lensFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
pfc.Add(indexPath, &bsw.indexWriter, nocache)
pfc.Add(itemsPath, &bsw.itemsWriter, nocache)
pfc.Add(lensPath, &bsw.lensWriter, nocache)
// Always cache metaindex file in OS page cache, since it is immediately
// read after the merge.
metaindexPath := filepath.Join(path, metaindexFilename)
metaindexFile := filestream.MustCreate(metaindexPath, false)
pfc.Add(metaindexPath, &bsw.metaindexWriter, false)
indexPath := filepath.Join(path, indexFilename)
indexFile := filestream.MustCreate(indexPath, nocache)
itemsPath := filepath.Join(path, itemsFilename)
itemsFile := filestream.MustCreate(itemsPath, nocache)
lensPath := filepath.Join(path, lensFilename)
lensFile := filestream.MustCreate(lensPath, nocache)
bsw.reset()
bsw.compressLevel = compressLevel
bsw.metaindexWriter = metaindexFile
bsw.indexWriter = indexFile
bsw.itemsWriter = itemsFile
bsw.lensWriter = lensFile
pfc.Run()
}
// MustClose closes the bsw.
@@ -116,7 +115,7 @@ func (bsw *blockStreamWriter) MustClose() {
fs.MustWriteData(bsw.metaindexWriter, bsw.packedMetaindexBuf)
// Close writers in parallel in order to reduce the time needed for closing them
// on high-latency storage systems such as NFS or Cepth.
// on high-latency storage systems such as NFS or Ceph.
cs := []fs.MustCloser{
bsw.metaindexWriter,
bsw.indexWriter,

View File

@@ -93,17 +93,28 @@ func mustOpenFilePart(path string) *part {
metaindexFile := filestream.MustOpen(metaindexPath, true)
metaindexSize := fs.MustFileSize(metaindexPath)
// Open part files in parallel in order to speed up this process
// on high-latency storage systems such as NFS or Ceph.
var pro fs.ParallelReaderAtOpener
indexPath := filepath.Join(path, indexFilename)
indexFile := fs.MustOpenReaderAt(indexPath)
indexSize := fs.MustFileSize(indexPath)
itemsPath := filepath.Join(path, itemsFilename)
itemsFile := fs.MustOpenReaderAt(itemsPath)
itemsSize := fs.MustFileSize(itemsPath)
lensPath := filepath.Join(path, lensFilename)
lensFile := fs.MustOpenReaderAt(lensPath)
lensSize := fs.MustFileSize(lensPath)
var indexFile fs.MustReadAtCloser
var indexSize uint64
pro.Add(indexPath, &indexFile, &indexSize)
var itemsFile fs.MustReadAtCloser
var itemsSize uint64
pro.Add(itemsPath, &itemsFile, &itemsSize)
var lensFile fs.MustReadAtCloser
var lensSize uint64
pro.Add(lensPath, &lensFile, &lensSize)
pro.Run()
size := metaindexSize + indexSize + itemsSize + lensSize
return newPart(&ph, path, size, metaindexFile, indexFile, itemsFile, lensFile)
@@ -131,7 +142,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
func (p *part) MustClose() {
// Close files in parallel in order to speed up this process on storage systems with high latency
// such as NFS or Cepth.
// such as NFS or Ceph.
cs := []fs.MustCloser{
p.indexFile,
p.itemsFile,

View File

@@ -132,15 +132,6 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
bsr.ph.MustReadMetadata(path)
timestampsPath := filepath.Join(path, timestampsFilename)
timestampsFile := filestream.MustOpen(timestampsPath, true)
valuesPath := filepath.Join(path, valuesFilename)
valuesFile := filestream.MustOpen(valuesPath, true)
indexPath := filepath.Join(path, indexFilename)
indexFile := filestream.MustOpen(indexPath, true)
metaindexPath := filepath.Join(path, metaindexFilename)
metaindexFile := filestream.MustOpen(metaindexPath, true)
mrs, err := unmarshalMetaindexRows(bsr.mrs[:0], metaindexFile)
@@ -148,12 +139,24 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
if err != nil {
logger.Panicf("FATAL: cannot unmarshal metaindex rows from file part %q: %s", metaindexPath, err)
}
bsr.mrs = mrs
bsr.path = path
bsr.timestampsReader = timestampsFile
bsr.valuesReader = valuesFile
bsr.indexReader = indexFile
bsr.mrs = mrs
// Open part files in parallel in order to speed up this operation
// on high-latency storage systems such as NFS or Ceph.
var pfo filestream.ParallelFileOpener
timestampsPath := filepath.Join(path, timestampsFilename)
valuesPath := filepath.Join(path, valuesFilename)
indexPath := filepath.Join(path, indexFilename)
pfo.Add(timestampsPath, &bsr.timestampsReader, true)
pfo.Add(valuesPath, &bsr.valuesReader, true)
pfo.Add(indexPath, &bsr.indexReader, true)
pfo.Run()
}
// MustClose closes the bsr.
@@ -161,7 +164,7 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
// It closes *Reader files passed to Init.
func (bsr *blockStreamReader) MustClose() {
// Close files in parallel in order to speed up this process on storage systems with high latency
// such as NFS or Cepth.
// such as NFS or Ceph.
cs := []fs.MustCloser{
bsr.timestampsReader,
bsr.valuesReader,

View File

@@ -81,33 +81,33 @@ func (bsw *blockStreamWriter) MustInitFromInmemoryPart(mp *inmemoryPart, compres
//
// The bsw doesn't pollute OS page cache if nocache is set.
func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, compressLevel int) {
bsw.reset()
bsw.compressLevel = compressLevel
path = filepath.Clean(path)
// Create the directory
fs.MustMkdirFailIfExist(path)
// Create part files in the directory.
// Create part files in the directory in parallel in order to reduce the duration
// of the operation on high-latency storage systems such as NFS and Ceph.
var pfc filestream.ParallelFileCreator
timestampsPath := filepath.Join(path, timestampsFilename)
timestampsFile := filestream.MustCreate(timestampsPath, nocache)
valuesPath := filepath.Join(path, valuesFilename)
valuesFile := filestream.MustCreate(valuesPath, nocache)
indexPath := filepath.Join(path, indexFilename)
indexFile := filestream.MustCreate(indexPath, nocache)
metaindexPath := filepath.Join(path, metaindexFilename)
pfc.Add(timestampsPath, &bsw.timestampsWriter, nocache)
pfc.Add(valuesPath, &bsw.valuesWriter, nocache)
pfc.Add(indexPath, &bsw.indexWriter, nocache)
// Always cache metaindex file in OS page cache, since it is immediately
// read after the merge.
metaindexPath := filepath.Join(path, metaindexFilename)
metaindexFile := filestream.MustCreate(metaindexPath, false)
pfc.Add(metaindexPath, &bsw.metaindexWriter, false)
bsw.reset()
bsw.compressLevel = compressLevel
bsw.timestampsWriter = timestampsFile
bsw.valuesWriter = valuesFile
bsw.indexWriter = indexFile
bsw.metaindexWriter = metaindexFile
pfc.Run()
}
// MustClose closes the bsw.
@@ -122,7 +122,7 @@ func (bsw *blockStreamWriter) MustClose() {
fs.MustWriteData(bsw.metaindexWriter, bsw.compressedMetaindexData)
// Close writers in parallel in order to reduce the time needed for closing them
// on high-latency storage systems such as NFS or Cepth.
// on high-latency storage systems such as NFS or Ceph.
cs := []fs.MustCloser{
bsw.timestampsWriter,
bsw.valuesWriter,

View File

@@ -52,22 +52,33 @@ func mustOpenFilePart(path string) *part {
var ph partHeader
ph.MustReadMetadata(path)
timestampsPath := filepath.Join(path, timestampsFilename)
timestampsFile := fs.MustOpenReaderAt(timestampsPath)
timestampsSize := fs.MustFileSize(timestampsPath)
valuesPath := filepath.Join(path, valuesFilename)
valuesFile := fs.MustOpenReaderAt(valuesPath)
valuesSize := fs.MustFileSize(valuesPath)
indexPath := filepath.Join(path, indexFilename)
indexFile := fs.MustOpenReaderAt(indexPath)
indexSize := fs.MustFileSize(indexPath)
metaindexPath := filepath.Join(path, metaindexFilename)
metaindexFile := filestream.MustOpen(metaindexPath, true)
metaindexSize := fs.MustFileSize(metaindexPath)
// Open part files in parallel in order to speed up this process
// on high-latency storage systems such as NFS and Ceph.
var pro fs.ParallelReaderAtOpener
timestampsPath := filepath.Join(path, timestampsFilename)
valuesPath := filepath.Join(path, valuesFilename)
indexPath := filepath.Join(path, indexFilename)
var timestampsFile fs.MustReadAtCloser
var timestampsSize uint64
pro.Add(timestampsPath, &timestampsFile, &timestampsSize)
var valuesFile fs.MustReadAtCloser
var valuesSize uint64
pro.Add(valuesPath, &valuesFile, &valuesSize)
var indexFile fs.MustReadAtCloser
var indexSize uint64
pro.Add(indexPath, &indexFile, &indexSize)
pro.Run()
size := timestampsSize + valuesSize + indexSize + metaindexSize
return newPart(&ph, path, size, metaindexFile, timestampsFile, valuesFile, indexFile)
}
@@ -106,7 +117,7 @@ func (p *part) String() string {
// MustClose closes all the part files.
func (p *part) MustClose() {
// Close files in parallel in order to speed up this process on storage systems with high latency
// such as NFS or Cepth.
// such as NFS or Ceph.
cs := []fs.MustCloser{
p.timestampsFile,
p.valuesFile,