Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2026-05-17 16:59:40 +03:00

Compare commits: vmagent-dr...fs-paralle (1 commit)

Commit 305f1c91f8
@@ -2,148 +2,73 @@ package filestream
 import (
 	"io"
 	"sync"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
 
-// ParallelFileCreator is used for parallel creating of files for the given dstPath.
-//
-// ParallelFileCreator is needed for speeding up creating many files on high-latency
-// storage systems such as NFS or Ceph.
-type ParallelFileCreator struct {
-	tasks []parallelFileCreatorTask
-}
-
-type parallelFileCreatorTask struct {
-	dstPath string
+// FileCreatorTask is a task for creating the file at the given path and assigning it to *wc.
+type FileCreatorTask struct {
+	path    string
 	wc      *WriteCloser
 	nocache bool
 }
 
-// Add registers a task for creating the file at dstPath and assigning it to *wc.
-//
-// Tasks are executed in parallel on Run() call.
-func (pfc *ParallelFileCreator) Add(dstPath string, wc *WriteCloser, nocache bool) {
-	pfc.tasks = append(pfc.tasks, parallelFileCreatorTask{
-		dstPath: dstPath,
+// NewFileCreatorTask creates a new task for creating the file at the given path and assigning it to *wc.
+func NewFileCreatorTask(path string, wc *WriteCloser, nocache bool) *FileCreatorTask {
+	return &FileCreatorTask{
+		path:    path,
 		wc:      wc,
 		nocache: nocache,
-	})
+	}
 }
 
-// Run runs all the registered tasks for creating files in parallel.
-func (pfc *ParallelFileCreator) Run() {
-	var wg sync.WaitGroup
-	concurrencyCh := fsutil.GetConcurrencyCh()
-	for _, task := range pfc.tasks {
-		concurrencyCh <- struct{}{}
-		wg.Add(1)
-
-		go func(dstPath string, wc *WriteCloser, nocache bool) {
-			defer func() {
-				wg.Done()
-				<-concurrencyCh
-			}()
-
-			*wc = MustCreate(dstPath, nocache)
-		}(task.dstPath, task.wc, task.nocache)
-	}
-	wg.Wait()
-}
-
-// ParallelFileOpener is used for parallel opening of files at the given dstPath.
-//
-// ParallelFileOpener is needed for speeding up opening many files on high-latency
-// storage systems such as NFS or Ceph.
-type ParallelFileOpener struct {
-	tasks []parallelFileOpenerTask
-}
-
-type parallelFileOpenerTask struct {
+// Run executes the file creating task.
+func (t *FileCreatorTask) Run() {
+	*t.wc = MustCreate(t.path, t.nocache)
+}
+
+// FileOpenerTask is a task for opening the file at the given path and assigning it to *rc.
+type FileOpenerTask struct {
 	path    string
 	rc      *ReadCloser
 	nocache bool
 }
 
-// Add registers a task for opening the file at the given path and assigning it to *rc.
-//
-// Tasks are executed in parallel on Run() call.
-func (pfo *ParallelFileOpener) Add(path string, rc *ReadCloser, nocache bool) {
-	pfo.tasks = append(pfo.tasks, parallelFileOpenerTask{
+// NewFileOpenerTask creates a new task for opening the file at the given path and assigning it to *rc.
+func NewFileOpenerTask(path string, rc *ReadCloser, nocache bool) *FileOpenerTask {
+	return &FileOpenerTask{
 		path:    path,
 		rc:      rc,
 		nocache: nocache,
-	})
+	}
 }
 
-// Run runs all the registered tasks for opening files in parallel.
-func (pfo *ParallelFileOpener) Run() {
-	var wg sync.WaitGroup
-	concurrencyCh := fsutil.GetConcurrencyCh()
-	for _, task := range pfo.tasks {
-		concurrencyCh <- struct{}{}
-		wg.Add(1)
-
-		go func(path string, rc *ReadCloser, nocache bool) {
-			defer func() {
-				wg.Done()
-				<-concurrencyCh
-			}()
-
-			*rc = MustOpen(path, nocache)
-		}(task.path, task.rc, task.nocache)
-	}
-	wg.Wait()
-}
-
-// ParallelStreamWriter is used for parallel writing of data from io.WriterTo to the given dstPath files.
-//
-// ParallelStreamWriter is needed for speeding up writing data to many files on high-latency
-// storage systems such as NFS or Ceph.
-type ParallelStreamWriter struct {
-	tasks []parallelStreamWriterTask
-}
-
-type parallelStreamWriterTask struct {
-	dstPath string
-	src     io.WriterTo
-}
-
-// Add adds a task to execute in parallel - to write the data from src to the dstPath.
-//
-// Tasks are executed in parallel on Run() call.
-func (psw *ParallelStreamWriter) Add(dstPath string, src io.WriterTo) {
-	psw.tasks = append(psw.tasks, parallelStreamWriterTask{
-		dstPath: dstPath,
-		src:     src,
-	})
-}
-
-// Run executes all the tasks added via Add() call in parallel.
-func (psw *ParallelStreamWriter) Run() {
-	var wg sync.WaitGroup
-	concurrencyCh := fsutil.GetConcurrencyCh()
-	for _, task := range psw.tasks {
-		concurrencyCh <- struct{}{}
-		wg.Add(1)
-
-		go func(dstPath string, src io.WriterTo) {
-			defer func() {
-				wg.Done()
-				<-concurrencyCh
-			}()
-
-			f := MustCreate(dstPath, false)
-			if _, err := src.WriteTo(f); err != nil {
-				f.MustClose()
-				// Do not call MustRemovePath(path), so the user could inspect
-				// the file contents during investigation of the issue.
-				logger.Panicf("FATAL: cannot write data to %q: %s", dstPath, err)
-			}
-			f.MustClose()
-		}(task.dstPath, task.src)
-	}
-	wg.Wait()
-}
+// Run executes the file opening task.
+func (t *FileOpenerTask) Run() {
+	*t.rc = MustOpen(t.path, t.nocache)
+}
+
+// StreamWriterTask is a task to execute in parallel - to write the data from src to the path.
+type StreamWriterTask struct {
+	path string
+	src  io.WriterTo
+}
+
+// NewStreamWriterTask creates a new task for writing the data from src to the path.
+func NewStreamWriterTask(path string, src io.WriterTo) *StreamWriterTask {
+	return &StreamWriterTask{
+		path: path,
+		src:  src,
+	}
+}
+
+func (t *StreamWriterTask) Run() {
+	f := MustCreate(t.path, false)
+	if _, err := t.src.WriteTo(f); err != nil {
+		f.MustClose()
+		// Do not call MustRemovePath(path), so the user could inspect
+		// the file contents during investigation of the issue.
+		logger.Panicf("FATAL: cannot write data to %q: %s", t.path, err)
+	}
+	f.MustClose()
+}
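The hunk above collapses three near-identical runner types (ParallelFileCreator, ParallelFileOpener, ParallelStreamWriter) into small task values that capture their arguments at construction time and do the blocking work inside a Run() method. A minimal self-contained sketch of that command pattern follows; the names task and createFileTask are illustrative, not part of the diff:

package main

import "fmt"

// task mirrors the unexported parallelTask interface introduced in fsutil:
// anything with a Run() method can be scheduled by a single executor.
type task interface {
	Run()
}

// createFileTask mimics filestream.FileCreatorTask: it stores its arguments
// when constructed and performs the blocking work only inside Run().
type createFileTask struct {
	path string
}

func (t *createFileTask) Run() {
	fmt.Println("creating", t.path) // stand-in for MustCreate(t.path, ...)
}

func main() {
	tasks := []task{
		&createFileTask{path: "index.bin"},
		&createFileTask{path: "items.bin"},
	}
	for _, t := range tasks {
		t.Run() // a real executor would run these concurrently
	}
}

With the work packaged this way, one generic executor can replace every specialized runner, which is exactly what the next hunk introduces.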
@@ -18,8 +18,8 @@ func getDefaultConcurrency() int {
 	return n
 }
 
-// GetConcurrencyCh returns a channel for limiting the concurrency of operations with files.
-func GetConcurrencyCh() chan struct{} {
+// getConcurrencyCh returns a channel for limiting the concurrency of operations with files.
+func getConcurrencyCh() chan struct{} {
 	concurrencyChOnce.Do(initConcurrencyCh)
 	return concurrencyCh
 }
@@ -30,3 +30,39 @@ func initConcurrencyCh() {
 
 var concurrencyChOnce sync.Once
 var concurrencyCh chan struct{}
+
+type parallelTask interface {
+	Run()
+}
+
+// ParallelExecutor is used for parallel file operations.
+//
+// ParallelExecutor is needed for speeding up file operations on high-latency storage systems such as NFS or Ceph.
+type ParallelExecutor struct {
+	tasks []parallelTask
+}
+
+// Add registers a task for parallel file operations.
+//
+// Tasks are executed in parallel on Run() call.
+func (pe *ParallelExecutor) Add(task parallelTask) {
+	pe.tasks = append(pe.tasks, task)
+}
+
+func (pe *ParallelExecutor) Run() {
+	var wg sync.WaitGroup
+	concurrencyCh := getConcurrencyCh()
+	for _, task := range pe.tasks {
+		concurrencyCh <- struct{}{}
+		wg.Add(1)
+
+		go func(task parallelTask) {
+			defer func() {
+				wg.Done()
+				<-concurrencyCh
+			}()
+			task.Run()
+		}(task)
+	}
+	wg.Wait()
+}
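The executor bounds concurrency with a buffered channel used as a counting semaphore: each goroutine must put a token into concurrencyCh before it starts and takes the token back when it finishes, so at most cap(concurrencyCh) file operations are in flight at once. A standalone sketch of the same pattern, with an illustrative capacity of 4 and a print standing in for the real work:

package main

import (
	"fmt"
	"sync"
)

func main() {
	// Buffered channel as a counting semaphore, like fsutil's concurrencyCh.
	concurrencyCh := make(chan struct{}, 4)

	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		concurrencyCh <- struct{}{} // blocks while 4 tasks are already running
		wg.Add(1)
		go func(n int) {
			defer func() {
				wg.Done()
				<-concurrencyCh // release the slot
			}()
			fmt.Println("task", n) // stand-in for task.Run()
		}(i)
	}
	wg.Wait() // all registered tasks have finished
}

Capping the fan-out keeps many small file operations from overwhelming the storage backend while still overlapping their latency, which is the whole point on NFS or Ceph.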
@@ -1,55 +1,27 @@
 package fs
 
 import (
 	"sync"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 )
 
-// ParallelReaderAtOpener opens ReaderAt files in parallel.
-//
-// ParallelReaderAtOpener speeds up opening multiple ReaderAt files on high-latency
-// storage systems such as NFS or Ceph.
-type ParallelReaderAtOpener struct {
-	tasks []parallelReaderAtOpenerTask
-}
-
-type parallelReaderAtOpenerTask struct {
+// ReaderAtOpenerTask is a task to open ReaderAt files in parallel.
+type ReaderAtOpenerTask struct {
 	path     string
 	rc       *MustReadAtCloser
 	fileSize *uint64
 }
 
-// Add adds a task for opening the file at the given path and storing it to *r, while storing the file size into *fileSize.
-//
-// Call Run() for running all the registered tasks in parallel.
-func (pro *ParallelReaderAtOpener) Add(path string, rc *MustReadAtCloser, fileSize *uint64) {
-	pro.tasks = append(pro.tasks, parallelReaderAtOpenerTask{
+// NewReaderAtOpenerTask creates a new task for opening the file at the given path and assigning it to *rc, while storing the file size into *fileSize.
+//
+// Opening in parallel speeds up opening multiple ReaderAt files on high-latency
+// storage systems such as NFS or Ceph.
+func NewReaderAtOpenerTask(path string, rc *MustReadAtCloser, fileSize *uint64) *ReaderAtOpenerTask {
+	return &ReaderAtOpenerTask{
 		path:     path,
 		rc:       rc,
 		fileSize: fileSize,
-	})
+	}
 }
 
-// Run executes all the registered tasks in parallel.
-func (pro *ParallelReaderAtOpener) Run() {
-	var wg sync.WaitGroup
-	concurrencyCh := fsutil.GetConcurrencyCh()
-	for _, task := range pro.tasks {
-		concurrencyCh <- struct{}{}
-		wg.Add(1)
-
-		go func(path string, rc *MustReadAtCloser, fileSize *uint64) {
-			defer func() {
-				wg.Done()
-				<-concurrencyCh
-			}()
-
-			*rc = MustOpenReaderAt(path)
-			*fileSize = MustFileSize(path)
-		}(task.path, task.rc, task.fileSize)
-	}
-	wg.Wait()
-}
+func (t *ReaderAtOpenerTask) Run() {
+	*t.rc = OpenReaderAt(t.path)
+	*t.fileSize = MustFileSize(t.path)
+}
 
 // MustCloser must implement MustClose() function.
@@ -57,23 +29,24 @@ type MustCloser interface {
 	MustClose()
 }
 
-// MustCloseParallel closes all the cs in parallel.
+// CloserTask is a task to close a MustCloser in parallel.
 //
 // Parallel closing reduces the time needed to flush the data to the underlying files on close
 // on high-latency storage systems such as NFS or Ceph.
-func MustCloseParallel(cs []MustCloser) {
-	var wg sync.WaitGroup
-	concurrencyCh := fsutil.GetConcurrencyCh()
-	for _, c := range cs {
-		concurrencyCh <- struct{}{}
-		wg.Add(1)
-		go func(c MustCloser) {
-			defer func() {
-				wg.Done()
-				<-concurrencyCh
-			}()
-			c.MustClose()
-		}(c)
-	}
-	wg.Wait()
+type CloserTask struct {
+	c MustCloser
 }
+
+// NewCloserTask creates a new task for closing c.
+//
+// Parallel closing speeds up closing multiple MustCloser files on high-latency
+// storage systems such as NFS or Ceph.
+func NewCloserTask(c MustCloser) *CloserTask {
+	return &CloserTask{
+		c: c,
+	}
+}
+
+func (t *CloserTask) Run() {
+	t.c.MustClose()
+}
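The call sites later in this diff use CloserTask to flush and close many part files with a single Run() call. A hedged sketch of that usage, assuming this commit's fs and fsutil packages are importable; the helper name closeAll is illustrative:

package example

import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
)

// closeAll closes all the given files in parallel, mirroring the call sites
// later in this diff. cs may hold any values implementing fs.MustCloser.
func closeAll(cs []fs.MustCloser) {
	var pe fsutil.ParallelExecutor
	for _, c := range cs {
		pe.Add(fs.NewCloserTask(c))
	}
	pe.Run() // blocks until every file has been flushed and closed
}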
@@ -148,10 +148,10 @@ func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) {
 	}
 }
 
-// MustOpenReaderAt opens ReaderAt for reading from the file located at path.
+// OpenReaderAt opens ReaderAt for reading from the file located at path.
 //
 // MustClose must be called on the returned ReaderAt when it is no longer needed.
-func MustOpenReaderAt(path string) *ReaderAt {
+func OpenReaderAt(path string) *ReaderAt {
 	var r ReaderAt
 	r.path = path
 	return &r
@@ -19,7 +19,7 @@ func testReaderAt(t *testing.T, bufSize int) {
 	data := make([]byte, fileSize)
 	MustWriteSync(path, data)
 	defer MustRemovePath(path)
-	r := MustOpenReaderAt(path)
+	r := OpenReaderAt(path)
 	defer r.MustClose()
 
 	buf := make([]byte, bufSize)
@@ -26,7 +26,7 @@ func benchmarkReaderAtMustReadAt(b *testing.B, isMmap bool) {
 	data := make([]byte, fileSize)
 	MustWriteSync(path, data)
 	defer MustRemovePath(path)
-	r := MustOpenReaderAt(path)
+	r := OpenReaderAt(path)
 	defer r.MustClose()
 
 	b.ResetTimer()
@@ -10,6 +10,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
@@ -160,17 +161,17 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
 	// Open part files in parallel in order to speed up this process
 	// on high-latency storage systems such as NFS or Ceph.
 
-	var pfo filestream.ParallelFileOpener
+	var pe fsutil.ParallelExecutor
 
 	indexPath := filepath.Join(path, indexFilename)
 	itemsPath := filepath.Join(path, itemsFilename)
 	lensPath := filepath.Join(path, lensFilename)
 
-	pfo.Add(indexPath, &bsr.indexReader, true)
-	pfo.Add(itemsPath, &bsr.itemsReader, true)
-	pfo.Add(lensPath, &bsr.lensReader, true)
+	pe.Add(filestream.NewFileOpenerTask(indexPath, &bsr.indexReader, true))
+	pe.Add(filestream.NewFileOpenerTask(itemsPath, &bsr.itemsReader, true))
+	pe.Add(filestream.NewFileOpenerTask(lensPath, &bsr.lensReader, true))
 
-	pfo.Run()
+	pe.Run()
 }
 
 // MustClose closes the bsr.
@@ -180,12 +181,11 @@ func (bsr *blockStreamReader) MustClose() {
 	if !bsr.isInmemoryBlock {
 		// Close files in parallel in order to speed up this process on storage systems with high latency
 		// such as NFS or Ceph.
-		cs := []fs.MustCloser{
-			bsr.indexReader,
-			bsr.itemsReader,
-			bsr.lensReader,
-		}
-		fs.MustCloseParallel(cs)
+		var pe fsutil.ParallelExecutor
+		pe.Add(fs.NewCloserTask(bsr.indexReader))
+		pe.Add(fs.NewCloserTask(bsr.itemsReader))
+		pe.Add(fs.NewCloserTask(bsr.lensReader))
+		pe.Run()
 	}
 	bsr.reset()
 }
@@ -7,6 +7,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 )
 
 type blockStreamWriter struct {
@@ -85,22 +86,22 @@ func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, co
 	// Create part files in the directory in parallel in order to speed up the process
 	// on high-latency storage systems such as NFS or Ceph.
 
-	var pfc filestream.ParallelFileCreator
+	var pe fsutil.ParallelExecutor
 
 	indexPath := filepath.Join(path, indexFilename)
 	itemsPath := filepath.Join(path, itemsFilename)
 	lensPath := filepath.Join(path, lensFilename)
 	metaindexPath := filepath.Join(path, metaindexFilename)
 
-	pfc.Add(indexPath, &bsw.indexWriter, nocache)
-	pfc.Add(itemsPath, &bsw.itemsWriter, nocache)
-	pfc.Add(lensPath, &bsw.lensWriter, nocache)
+	pe.Add(filestream.NewFileCreatorTask(indexPath, &bsw.indexWriter, nocache))
+	pe.Add(filestream.NewFileCreatorTask(itemsPath, &bsw.itemsWriter, nocache))
+	pe.Add(filestream.NewFileCreatorTask(lensPath, &bsw.lensWriter, nocache))
 
 	// Always cache metaindex file in OS page cache, since it is immediately
 	// read after the merge.
-	pfc.Add(metaindexPath, &bsw.metaindexWriter, false)
+	pe.Add(filestream.NewFileCreatorTask(metaindexPath, &bsw.metaindexWriter, false))
 
-	pfc.Run()
+	pe.Run()
 }
 
 // MustClose closes the bsw.
@@ -116,13 +117,12 @@ func (bsw *blockStreamWriter) MustClose() {
 
 	// Close writers in parallel in order to reduce the time needed for closing them
 	// on high-latency storage systems such as NFS or Ceph.
-	cs := []fs.MustCloser{
-		bsw.metaindexWriter,
-		bsw.indexWriter,
-		bsw.itemsWriter,
-		bsw.lensWriter,
-	}
-	fs.MustCloseParallel(cs)
+	var pe fsutil.ParallelExecutor
+	pe.Add(fs.NewCloserTask(bsw.metaindexWriter))
+	pe.Add(fs.NewCloserTask(bsw.indexWriter))
+	pe.Add(fs.NewCloserTask(bsw.itemsWriter))
+	pe.Add(fs.NewCloserTask(bsw.lensWriter))
+	pe.Run()
 
 	bsw.reset()
 }
@@ -8,6 +8,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
@@ -42,12 +43,12 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
 	itemsPath := filepath.Join(path, itemsFilename)
 	lensPath := filepath.Join(path, lensFilename)
 
-	var psw filestream.ParallelStreamWriter
-	psw.Add(metaindexPath, &mp.metaindexData)
-	psw.Add(indexPath, &mp.indexData)
-	psw.Add(itemsPath, &mp.itemsData)
-	psw.Add(lensPath, &mp.lensData)
-	psw.Run()
+	var pe fsutil.ParallelExecutor
+	pe.Add(filestream.NewStreamWriterTask(metaindexPath, &mp.metaindexData))
+	pe.Add(filestream.NewStreamWriterTask(indexPath, &mp.indexData))
+	pe.Add(filestream.NewStreamWriterTask(itemsPath, &mp.itemsData))
+	pe.Add(filestream.NewStreamWriterTask(lensPath, &mp.lensData))
+	pe.Run()
 
 	mp.ph.MustWriteMetadata(path)
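NewStreamWriterTask accepts any io.WriterTo as its source, which is what lets the in-memory part data above be flushed to separate files in parallel. A minimal illustration using bytes.Buffer, which implements io.WriterTo; the function name and paths are illustrative:

package example

import (
	"bytes"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
)

// storeBuffers writes two in-memory buffers to disk in parallel,
// the same way inmemoryPart.MustStoreToDisk stores its data above.
func storeBuffers(indexData, metaindexData *bytes.Buffer) {
	var pe fsutil.ParallelExecutor
	pe.Add(filestream.NewStreamWriterTask("/tmp/part/index.bin", indexData))
	pe.Add(filestream.NewStreamWriterTask("/tmp/part/metaindex.bin", metaindexData))
	pe.Run() // panics via logger.Panicf if any write fails (Must* semantics)
}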
@@ -8,6 +8,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
 )
@@ -96,7 +97,7 @@ func mustOpenFilePart(path string) *part {
 	// Open part files in parallel in order to speed up this process
 	// on high-latency storage systems such as NFS or Ceph.
 
-	var pro fs.ParallelReaderAtOpener
+	var pe fsutil.ParallelExecutor
 
 	indexPath := filepath.Join(path, indexFilename)
 	itemsPath := filepath.Join(path, itemsFilename)
@@ -104,17 +105,17 @@ func mustOpenFilePart(path string) *part {
 
 	var indexFile fs.MustReadAtCloser
 	var indexSize uint64
-	pro.Add(indexPath, &indexFile, &indexSize)
+	pe.Add(fs.NewReaderAtOpenerTask(indexPath, &indexFile, &indexSize))
 
 	var itemsFile fs.MustReadAtCloser
 	var itemsSize uint64
-	pro.Add(itemsPath, &itemsFile, &itemsSize)
+	pe.Add(fs.NewReaderAtOpenerTask(itemsPath, &itemsFile, &itemsSize))
 
 	var lensFile fs.MustReadAtCloser
 	var lensSize uint64
-	pro.Add(lensPath, &lensFile, &lensSize)
+	pe.Add(fs.NewReaderAtOpenerTask(lensPath, &lensFile, &lensSize))
 
-	pro.Run()
+	pe.Run()
 
 	size := metaindexSize + indexSize + itemsSize + lensSize
 	return newPart(&ph, path, size, metaindexFile, indexFile, itemsFile, lensFile)
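Each ReaderAtOpenerTask delivers its results through pointers: the caller declares the destination variables up front, registers the tasks, and reads the values only after Run() returns, exactly as mustOpenFilePart does above. A condensed sketch of that out-parameter flow; openWithSize and the single-path shape are illustrative:

package example

import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
)

// openWithSize opens a file for random-access reads and returns its size.
// The reader and size are written through the pointers handed to the task,
// so they are valid only after pe.Run() has completed.
func openWithSize(path string) (fs.MustReadAtCloser, uint64) {
	var pe fsutil.ParallelExecutor

	var f fs.MustReadAtCloser
	var size uint64
	pe.Add(fs.NewReaderAtOpenerTask(path, &f, &size))

	pe.Run() // blocks until every registered task has finished
	return f, size
}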
@@ -143,12 +144,11 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
 func (p *part) MustClose() {
 	// Close files in parallel in order to speed up this process on storage systems with high latency
 	// such as NFS or Ceph.
-	cs := []fs.MustCloser{
-		p.indexFile,
-		p.itemsFile,
-		p.lensFile,
-	}
-	fs.MustCloseParallel(cs)
+	var pe fsutil.ParallelExecutor
+	pe.Add(fs.NewCloserTask(p.indexFile))
+	pe.Add(fs.NewCloserTask(p.itemsFile))
+	pe.Add(fs.NewCloserTask(p.lensFile))
+	pe.Run()
 
 	idxbCache.RemoveBlocksForPart(p)
 	ibCache.RemoveBlocksForPart(p)
@@ -10,6 +10,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
@@ -146,17 +147,17 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
 	// Open part files in parallel in order to speed up this operation
 	// on high-latency storage systems such as NFS or Ceph.
 
-	var pfo filestream.ParallelFileOpener
+	var pe fsutil.ParallelExecutor
 
 	timestampsPath := filepath.Join(path, timestampsFilename)
 	valuesPath := filepath.Join(path, valuesFilename)
 	indexPath := filepath.Join(path, indexFilename)
 
-	pfo.Add(timestampsPath, &bsr.timestampsReader, true)
-	pfo.Add(valuesPath, &bsr.valuesReader, true)
-	pfo.Add(indexPath, &bsr.indexReader, true)
+	pe.Add(filestream.NewFileOpenerTask(timestampsPath, &bsr.timestampsReader, true))
+	pe.Add(filestream.NewFileOpenerTask(valuesPath, &bsr.valuesReader, true))
+	pe.Add(filestream.NewFileOpenerTask(indexPath, &bsr.indexReader, true))
 
-	pfo.Run()
+	pe.Run()
 }
 
 // MustClose closes the bsr.
@@ -165,12 +166,11 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
 func (bsr *blockStreamReader) MustClose() {
 	// Close files in parallel in order to speed up this process on storage systems with high latency
 	// such as NFS or Ceph.
-	cs := []fs.MustCloser{
-		bsr.timestampsReader,
-		bsr.valuesReader,
-		bsr.indexReader,
-	}
-	fs.MustCloseParallel(cs)
+	var pe fsutil.ParallelExecutor
+	pe.Add(fs.NewCloserTask(bsr.timestampsReader))
+	pe.Add(fs.NewCloserTask(bsr.valuesReader))
+	pe.Add(fs.NewCloserTask(bsr.indexReader))
+	pe.Run()
 
 	bsr.reset()
 }
@@ -9,6 +9,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
@@ -92,22 +93,22 @@ func (bsw *blockStreamWriter) MustInitFromFilePart(path string, nocache bool, co
 	// Create part files in the directory in parallel in order to reduce the duration
 	// of the operation on high-latency storage systems such as NFS and Ceph.
 
-	var pfc filestream.ParallelFileCreator
+	var pe fsutil.ParallelExecutor
 
 	timestampsPath := filepath.Join(path, timestampsFilename)
 	valuesPath := filepath.Join(path, valuesFilename)
 	indexPath := filepath.Join(path, indexFilename)
 	metaindexPath := filepath.Join(path, metaindexFilename)
 
-	pfc.Add(timestampsPath, &bsw.timestampsWriter, nocache)
-	pfc.Add(valuesPath, &bsw.valuesWriter, nocache)
-	pfc.Add(indexPath, &bsw.indexWriter, nocache)
+	pe.Add(filestream.NewFileCreatorTask(timestampsPath, &bsw.timestampsWriter, nocache))
+	pe.Add(filestream.NewFileCreatorTask(valuesPath, &bsw.valuesWriter, nocache))
+	pe.Add(filestream.NewFileCreatorTask(indexPath, &bsw.indexWriter, nocache))
 
 	// Always cache metaindex file in OS page cache, since it is immediately
 	// read after the merge.
-	pfc.Add(metaindexPath, &bsw.metaindexWriter, false)
+	pe.Add(filestream.NewFileCreatorTask(metaindexPath, &bsw.metaindexWriter, false))
 
-	pfc.Run()
+	pe.Run()
 }
 
 // MustClose closes the bsw.
@@ -123,13 +124,12 @@ func (bsw *blockStreamWriter) MustClose() {
 
 	// Close writers in parallel in order to reduce the time needed for closing them
 	// on high-latency storage systems such as NFS or Ceph.
-	cs := []fs.MustCloser{
-		bsw.timestampsWriter,
-		bsw.valuesWriter,
-		bsw.indexWriter,
-		bsw.metaindexWriter,
-	}
-	fs.MustCloseParallel(cs)
+	var pe fsutil.ParallelExecutor
+	pe.Add(fs.NewCloserTask(bsw.timestampsWriter))
+	pe.Add(fs.NewCloserTask(bsw.valuesWriter))
+	pe.Add(fs.NewCloserTask(bsw.indexWriter))
+	pe.Add(fs.NewCloserTask(bsw.metaindexWriter))
+	pe.Run()
 
 	bsw.reset()
 }
@@ -8,6 +8,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
@@ -44,12 +45,13 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
 	indexPath := filepath.Join(path, indexFilename)
 	metaindexPath := filepath.Join(path, metaindexFilename)
 
-	var psw filestream.ParallelStreamWriter
-	psw.Add(timestampsPath, &mp.timestampsData)
-	psw.Add(valuesPath, &mp.valuesData)
-	psw.Add(indexPath, &mp.indexData)
-	psw.Add(metaindexPath, &mp.metaindexData)
-	psw.Run()
+	var pe fsutil.ParallelExecutor
+
+	pe.Add(filestream.NewStreamWriterTask(timestampsPath, &mp.timestampsData))
+	pe.Add(filestream.NewStreamWriterTask(valuesPath, &mp.valuesData))
+	pe.Add(filestream.NewStreamWriterTask(indexPath, &mp.indexData))
+	pe.Add(filestream.NewStreamWriterTask(metaindexPath, &mp.metaindexData))
+	pe.Run()
 
 	mp.ph.MustWriteMetadata(path)
@@ -8,6 +8,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
 )
@@ -59,7 +60,7 @@ func mustOpenFilePart(path string) *part {
 	// Open part files in parallel in order to speed up this process
 	// on high-latency storage systems such as NFS and Ceph.
 
-	var pro fs.ParallelReaderAtOpener
+	var pe fsutil.ParallelExecutor
 
 	timestampsPath := filepath.Join(path, timestampsFilename)
 	valuesPath := filepath.Join(path, valuesFilename)
@@ -67,17 +68,17 @@ func mustOpenFilePart(path string) *part {
 
 	var timestampsFile fs.MustReadAtCloser
 	var timestampsSize uint64
-	pro.Add(timestampsPath, &timestampsFile, &timestampsSize)
+	pe.Add(fs.NewReaderAtOpenerTask(timestampsPath, &timestampsFile, &timestampsSize))
 
 	var valuesFile fs.MustReadAtCloser
 	var valuesSize uint64
-	pro.Add(valuesPath, &valuesFile, &valuesSize)
+	pe.Add(fs.NewReaderAtOpenerTask(valuesPath, &valuesFile, &valuesSize))
 
 	var indexFile fs.MustReadAtCloser
 	var indexSize uint64
-	pro.Add(indexPath, &indexFile, &indexSize)
+	pe.Add(fs.NewReaderAtOpenerTask(indexPath, &indexFile, &indexSize))
 
-	pro.Run()
+	pe.Run()
 
 	size := timestampsSize + valuesSize + indexSize + metaindexSize
 	return newPart(&ph, path, size, metaindexFile, timestampsFile, valuesFile, indexFile)
@@ -118,12 +119,11 @@ func (p *part) String() string {
 func (p *part) MustClose() {
 	// Close files in parallel in order to speed up this process on storage systems with high latency
 	// such as NFS or Ceph.
-	cs := []fs.MustCloser{
-		p.timestampsFile,
-		p.valuesFile,
-		p.indexFile,
-	}
-	fs.MustCloseParallel(cs)
+	var pe fsutil.ParallelExecutor
+	pe.Add(fs.NewCloserTask(p.timestampsFile))
+	pe.Add(fs.NewCloserTask(p.valuesFile))
+	pe.Add(fs.NewCloserTask(p.indexFile))
+	pe.Run()
 
 	ibCache.RemoveBlocksForPart(p)
 }
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/block_stream_reader.go (generated, vendored; 63 lines changed)
@@ -9,6 +9,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
 )
@@ -95,10 +96,9 @@ func (r *bloomValuesReader) totalBytesRead() uint64 {
 	return r.bloom.bytesRead + r.values.bytesRead
 }
 
-func (r *bloomValuesReader) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
-	dst = append(dst, &r.bloom)
-	dst = append(dst, &r.values)
-	return dst
+func (r *bloomValuesReader) appendCloserTasks(pe *fsutil.ParallelExecutor) {
+	pe.Add(fs.NewCloserTask(&r.bloom))
+	pe.Add(fs.NewCloserTask(&r.values))
 }
 
 type bloomValuesStreamReader struct {
@@ -181,23 +181,22 @@ func (sr *streamReaders) totalBytesRead() uint64 {
 func (sr *streamReaders) MustClose() {
 	// Close files in parallel in order to reduce the time needed for this operation
 	// on high-latency storage systems such as NFS or Ceph.
-	cs := []fs.MustCloser{
-		&sr.columnNamesReader,
-		&sr.columnIdxsReader,
-		&sr.metaindexReader,
-		&sr.indexReader,
-		&sr.columnsHeaderIndexReader,
-		&sr.columnsHeaderReader,
-		&sr.timestampsReader,
-	}
+	var pe fsutil.ParallelExecutor
+	pe.Add(fs.NewCloserTask(&sr.columnNamesReader))
+	pe.Add(fs.NewCloserTask(&sr.columnIdxsReader))
+	pe.Add(fs.NewCloserTask(&sr.metaindexReader))
+	pe.Add(fs.NewCloserTask(&sr.indexReader))
+	pe.Add(fs.NewCloserTask(&sr.columnsHeaderIndexReader))
+	pe.Add(fs.NewCloserTask(&sr.columnsHeaderReader))
+	pe.Add(fs.NewCloserTask(&sr.timestampsReader))
 
-	cs = sr.messageBloomValuesReader.appendClosers(cs)
-	cs = sr.oldBloomValuesReader.appendClosers(cs)
+	sr.messageBloomValuesReader.appendCloserTasks(&pe)
+	sr.oldBloomValuesReader.appendCloserTasks(&pe)
 	for i := range sr.bloomValuesShards {
-		cs = sr.bloomValuesShards[i].appendClosers(cs)
+		sr.bloomValuesShards[i].appendCloserTasks(&pe)
 	}
 
-	fs.MustCloseParallel(cs)
+	pe.Run()
 }
 
 func (sr *streamReaders) getBloomValuesReaderForColumnName(name string) *bloomValuesReader {
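Helpers such as appendCloserTasks let composite structures register their own closers on a shared executor instead of returning a slice of fs.MustCloser for the caller to merge. A sketch of the same aggregation idea with a hypothetical filePair type standing in for bloomValuesReader:

package example

import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
)

// filePair is a hypothetical composite holding two files, the way
// bloomValuesReader holds its bloom and values streams.
type filePair struct {
	a, b fs.MustCloser
}

// appendCloserTasks registers both files on the caller's executor,
// mirroring the appendCloserTasks helpers in this diff.
func (p *filePair) appendCloserTasks(pe *fsutil.ParallelExecutor) {
	pe.Add(fs.NewCloserTask(p.a))
	pe.Add(fs.NewCloserTask(p.b))
}

// closePairs closes every pair in parallel with a single Run() call.
func closePairs(pairs []filePair) {
	var pe fsutil.ParallelExecutor
	for i := range pairs {
		pairs[i].appendCloserTasks(&pe)
	}
	pe.Run()
}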
@@ -355,63 +354,63 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
 	// Open data readers in parallel in order to reduce the time for this operation
 	// on high-latency storage systems such as NFS or Ceph.
 
-	var pfo filestream.ParallelFileOpener
+	var pe fsutil.ParallelExecutor
 
 	var columnNamesReader filestream.ReadCloser
 	if bsr.ph.FormatVersion >= 1 {
-		pfo.Add(columnNamesPath, &columnNamesReader, nocache)
+		pe.Add(filestream.NewFileOpenerTask(columnNamesPath, &columnNamesReader, nocache))
 	}
 
 	var columnIdxsReader filestream.ReadCloser
 	if bsr.ph.FormatVersion >= 3 {
-		pfo.Add(columnIdxsPath, &columnIdxsReader, nocache)
+		pe.Add(filestream.NewFileOpenerTask(columnIdxsPath, &columnIdxsReader, nocache))
 	}
 
 	var metaindexReader filestream.ReadCloser
-	pfo.Add(metaindexPath, &metaindexReader, nocache)
+	pe.Add(filestream.NewFileOpenerTask(metaindexPath, &metaindexReader, nocache))
 
 	var indexReader filestream.ReadCloser
-	pfo.Add(indexPath, &indexReader, nocache)
+	pe.Add(filestream.NewFileOpenerTask(indexPath, &indexReader, nocache))
 
 	var columnsHeaderIndexReader filestream.ReadCloser
 	if bsr.ph.FormatVersion >= 1 {
-		pfo.Add(columnsHeaderIndexPath, &columnsHeaderIndexReader, nocache)
+		pe.Add(filestream.NewFileOpenerTask(columnsHeaderIndexPath, &columnsHeaderIndexReader, nocache))
 	}
 
 	var columnsHeaderReader filestream.ReadCloser
-	pfo.Add(columnsHeaderPath, &columnsHeaderReader, nocache)
+	pe.Add(filestream.NewFileOpenerTask(columnsHeaderPath, &columnsHeaderReader, nocache))
 
 	var timestampsReader filestream.ReadCloser
-	pfo.Add(timestampsPath, &timestampsReader, nocache)
+	pe.Add(filestream.NewFileOpenerTask(timestampsPath, &timestampsReader, nocache))
 
 	messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
 	messageValuesPath := filepath.Join(path, messageValuesFilename)
 	var messageBloomValuesReader bloomValuesStreamReader
-	pfo.Add(messageBloomFilterPath, &messageBloomValuesReader.bloom, nocache)
-	pfo.Add(messageValuesPath, &messageBloomValuesReader.values, nocache)
+	pe.Add(filestream.NewFileOpenerTask(messageBloomFilterPath, &messageBloomValuesReader.bloom, nocache))
+	pe.Add(filestream.NewFileOpenerTask(messageValuesPath, &messageBloomValuesReader.values, nocache))
 
 	var oldBloomValuesReader bloomValuesStreamReader
 	var bloomValuesShards []bloomValuesStreamReader
 	if bsr.ph.FormatVersion < 1 {
 		bloomPath := filepath.Join(path, oldBloomFilename)
-		pfo.Add(bloomPath, &oldBloomValuesReader.bloom, nocache)
+		pe.Add(filestream.NewFileOpenerTask(bloomPath, &oldBloomValuesReader.bloom, nocache))
 
 		valuesPath := filepath.Join(path, oldValuesFilename)
-		pfo.Add(valuesPath, &oldBloomValuesReader.values, nocache)
+		pe.Add(filestream.NewFileOpenerTask(valuesPath, &oldBloomValuesReader.values, nocache))
 	} else {
 		bloomValuesShards = make([]bloomValuesStreamReader, bsr.ph.BloomValuesShardsCount)
 		for i := range bloomValuesShards {
 			shard := &bloomValuesShards[i]
 
 			bloomPath := getBloomFilePath(path, uint64(i))
-			pfo.Add(bloomPath, &shard.bloom, nocache)
+			pe.Add(filestream.NewFileOpenerTask(bloomPath, &shard.bloom, nocache))
 
 			valuesPath := getValuesFilePath(path, uint64(i))
-			pfo.Add(valuesPath, &shard.values, nocache)
+			pe.Add(filestream.NewFileOpenerTask(valuesPath, &shard.values, nocache))
 		}
 	}
 
-	pfo.Run()
+	pe.Run()
 
 	// Initialize streamReaders
 	bsr.streamReaders.init(bsr.ph.FormatVersion, columnNamesReader, columnIdxsReader, metaindexReader, indexReader,
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/block_stream_writer.go (generated, vendored; 53 lines changed)
@@ -7,6 +7,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
 )
@@ -84,10 +85,9 @@ func (w *bloomValuesWriter) totalBytesWritten() uint64 {
 	return w.bloom.bytesWritten + w.values.bytesWritten
 }
 
-func (w *bloomValuesWriter) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
-	dst = append(dst, &w.bloom)
-	dst = append(dst, &w.values)
-	return dst
+func (w *bloomValuesWriter) appendCloserTasks(pe *fsutil.ParallelExecutor) {
+	pe.Add(fs.NewCloserTask(&w.bloom))
+	pe.Add(fs.NewCloserTask(&w.values))
 }
 
 type bloomValuesStreamWriter struct {
@@ -158,22 +158,21 @@ func (sw *streamWriters) totalBytesWritten() uint64 {
 func (sw *streamWriters) MustClose() {
 	// Flush and close files in parallel in order to reduce the time needed for this operation
 	// on high-latency storage systems such as NFS or Ceph.
-	cs := []fs.MustCloser{
-		&sw.columnNamesWriter,
-		&sw.columnIdxsWriter,
-		&sw.metaindexWriter,
-		&sw.indexWriter,
-		&sw.columnsHeaderIndexWriter,
-		&sw.columnsHeaderWriter,
-		&sw.timestampsWriter,
-	}
+	var pe fsutil.ParallelExecutor
+	pe.Add(fs.NewCloserTask(&sw.columnNamesWriter))
+	pe.Add(fs.NewCloserTask(&sw.columnIdxsWriter))
+	pe.Add(fs.NewCloserTask(&sw.metaindexWriter))
+	pe.Add(fs.NewCloserTask(&sw.indexWriter))
+	pe.Add(fs.NewCloserTask(&sw.columnsHeaderIndexWriter))
+	pe.Add(fs.NewCloserTask(&sw.columnsHeaderWriter))
+	pe.Add(fs.NewCloserTask(&sw.timestampsWriter))
 
-	cs = sw.messageBloomValuesWriter.appendClosers(cs)
+	sw.messageBloomValuesWriter.appendCloserTasks(&pe)
 	for i := range sw.bloomValuesShards {
-		cs = sw.bloomValuesShards[i].appendClosers(cs)
+		sw.bloomValuesShards[i].appendCloserTasks(&pe)
 	}
 
-	fs.MustCloseParallel(cs)
+	pe.Run()
 }
 
 func (sw *streamWriters) getBloomValuesWriterForColumnName(name string) *bloomValuesWriter {
@@ -312,39 +311,39 @@ func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) {
 	columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
 	timestampsPath := filepath.Join(path, timestampsFilename)
 
-	var pfc filestream.ParallelFileCreator
+	var pe fsutil.ParallelExecutor
 
 	// Always cache columnNames file, since it is re-read immediately after part creation
 	var columnNamesWriter filestream.WriteCloser
-	pfc.Add(columnNamesPath, &columnNamesWriter, false)
+	pe.Add(filestream.NewFileCreatorTask(columnNamesPath, &columnNamesWriter, false))
 
 	// Always cache columnIdxs file, since it is re-read immediately after part creation
 	var columnIdxsWriter filestream.WriteCloser
-	pfc.Add(columnIdxsPath, &columnIdxsWriter, false)
+	pe.Add(filestream.NewFileCreatorTask(columnIdxsPath, &columnIdxsWriter, false))
 
 	// Always cache metaindex file, since it is re-read immediately after part creation
 	var metaindexWriter filestream.WriteCloser
-	pfc.Add(metaindexPath, &metaindexWriter, false)
+	pe.Add(filestream.NewFileCreatorTask(metaindexPath, &metaindexWriter, false))
 
 	var indexWriter filestream.WriteCloser
-	pfc.Add(indexPath, &indexWriter, nocache)
+	pe.Add(filestream.NewFileCreatorTask(indexPath, &indexWriter, nocache))
 
 	var columnsHeaderIndexWriter filestream.WriteCloser
-	pfc.Add(columnsHeaderIndexPath, &columnsHeaderIndexWriter, nocache)
+	pe.Add(filestream.NewFileCreatorTask(columnsHeaderIndexPath, &columnsHeaderIndexWriter, nocache))
 
 	var columnsHeaderWriter filestream.WriteCloser
-	pfc.Add(columnsHeaderPath, &columnsHeaderWriter, nocache)
+	pe.Add(filestream.NewFileCreatorTask(columnsHeaderPath, &columnsHeaderWriter, nocache))
 
 	var timestampsWriter filestream.WriteCloser
-	pfc.Add(timestampsPath, &timestampsWriter, nocache)
+	pe.Add(filestream.NewFileCreatorTask(timestampsPath, &timestampsWriter, nocache))
 
 	messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
 	messageValuesPath := filepath.Join(path, messageValuesFilename)
 	var messageBloomValuesWriter bloomValuesStreamWriter
-	pfc.Add(messageBloomFilterPath, &messageBloomValuesWriter.bloom, nocache)
-	pfc.Add(messageValuesPath, &messageBloomValuesWriter.values, nocache)
+	pe.Add(filestream.NewFileCreatorTask(messageBloomFilterPath, &messageBloomValuesWriter.bloom, nocache))
+	pe.Add(filestream.NewFileCreatorTask(messageValuesPath, &messageBloomValuesWriter.values, nocache))
 
-	pfc.Run()
+	pe.Run()
 
 	createBloomValuesWriter := func(shardIdx uint64) bloomValuesStreamWriter {
 		bloomPath := getBloomFilePath(path, shardIdx)
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/inmemory_part.go (generated, vendored; 27 lines changed)
@@ -8,6 +8,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 )
 
 // inmemoryPart is an in-memory part.
@@ -120,26 +121,26 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
 	messageValuesPath := filepath.Join(path, messageValuesFilename)
 	messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
 
-	var psw filestream.ParallelStreamWriter
+	var pe fsutil.ParallelExecutor
 
-	psw.Add(columnNamesPath, &mp.columnNames)
-	psw.Add(columnIdxsPath, &mp.columnIdxs)
-	psw.Add(metaindexPath, &mp.metaindex)
-	psw.Add(indexPath, &mp.index)
-	psw.Add(columnsHeaderIndexPath, &mp.columnsHeaderIndex)
-	psw.Add(columnsHeaderPath, &mp.columnsHeader)
-	psw.Add(timestampsPath, &mp.timestamps)
+	pe.Add(filestream.NewStreamWriterTask(columnNamesPath, &mp.columnNames))
+	pe.Add(filestream.NewStreamWriterTask(columnIdxsPath, &mp.columnIdxs))
+	pe.Add(filestream.NewStreamWriterTask(metaindexPath, &mp.metaindex))
+	pe.Add(filestream.NewStreamWriterTask(indexPath, &mp.index))
+	pe.Add(filestream.NewStreamWriterTask(columnsHeaderIndexPath, &mp.columnsHeaderIndex))
+	pe.Add(filestream.NewStreamWriterTask(columnsHeaderPath, &mp.columnsHeader))
+	pe.Add(filestream.NewStreamWriterTask(timestampsPath, &mp.timestamps))
 
-	psw.Add(messageBloomFilterPath, &mp.messageBloomValues.bloom)
-	psw.Add(messageValuesPath, &mp.messageBloomValues.values)
+	pe.Add(filestream.NewStreamWriterTask(messageBloomFilterPath, &mp.messageBloomValues.bloom))
+	pe.Add(filestream.NewStreamWriterTask(messageValuesPath, &mp.messageBloomValues.values))
 
 	bloomPath := getBloomFilePath(path, 0)
-	psw.Add(bloomPath, &mp.fieldBloomValues.bloom)
+	pe.Add(filestream.NewStreamWriterTask(bloomPath, &mp.fieldBloomValues.bloom))
 
 	valuesPath := getValuesFilePath(path, 0)
-	psw.Add(valuesPath, &mp.fieldBloomValues.values)
+	pe.Add(filestream.NewStreamWriterTask(valuesPath, &mp.fieldBloomValues.values))
 
-	psw.Run()
+	pe.Run()
 
 	mp.ph.mustWriteMetadata(path)
vendor/github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage/part.go (generated, vendored; 46 lines changed)
@@ -9,6 +9,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fsutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
@@ -54,10 +55,9 @@ type bloomValuesReaderAt struct {
 	values fs.MustReadAtCloser
 }
 
-func (r *bloomValuesReaderAt) appendClosers(dst []fs.MustCloser) []fs.MustCloser {
-	dst = append(dst, r.bloom)
-	dst = append(dst, r.values)
-	return dst
+func (r *bloomValuesReaderAt) appendCloserTasks(pe *fsutil.ParallelExecutor) {
+	pe.Add(fs.NewCloserTask(r.bloom))
+	pe.Add(fs.NewCloserTask(r.values))
 }
 
 func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part {
@@ -137,36 +137,36 @@ func mustOpenFilePart(pt *partition, path string) *part {
 	mrs.MustClose()
 
 	// Open data files
-	p.indexFile = fs.MustOpenReaderAt(indexPath)
+	p.indexFile = fs.OpenReaderAt(indexPath)
 	if p.ph.FormatVersion >= 1 {
-		p.columnsHeaderIndexFile = fs.MustOpenReaderAt(columnsHeaderIndexPath)
+		p.columnsHeaderIndexFile = fs.OpenReaderAt(columnsHeaderIndexPath)
 	}
-	p.columnsHeaderFile = fs.MustOpenReaderAt(columnsHeaderPath)
-	p.timestampsFile = fs.MustOpenReaderAt(timestampsPath)
+	p.columnsHeaderFile = fs.OpenReaderAt(columnsHeaderPath)
+	p.timestampsFile = fs.OpenReaderAt(timestampsPath)
 
 	// Open files with bloom filters and column values
 	messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
-	p.messageBloomValues.bloom = fs.MustOpenReaderAt(messageBloomFilterPath)
+	p.messageBloomValues.bloom = fs.OpenReaderAt(messageBloomFilterPath)
 
 	messageValuesPath := filepath.Join(path, messageValuesFilename)
-	p.messageBloomValues.values = fs.MustOpenReaderAt(messageValuesPath)
+	p.messageBloomValues.values = fs.OpenReaderAt(messageValuesPath)
 
 	if p.ph.FormatVersion < 1 {
 		bloomPath := filepath.Join(path, oldBloomFilename)
-		p.oldBloomValues.bloom = fs.MustOpenReaderAt(bloomPath)
+		p.oldBloomValues.bloom = fs.OpenReaderAt(bloomPath)
 
 		valuesPath := filepath.Join(path, oldValuesFilename)
-		p.oldBloomValues.values = fs.MustOpenReaderAt(valuesPath)
+		p.oldBloomValues.values = fs.OpenReaderAt(valuesPath)
 	} else {
 		p.bloomValuesShards = make([]bloomValuesReaderAt, p.ph.BloomValuesShardsCount)
 		for i := range p.bloomValuesShards {
 			shard := &p.bloomValuesShards[i]
 
 			bloomPath := getBloomFilePath(path, uint64(i))
-			shard.bloom = fs.MustOpenReaderAt(bloomPath)
+			shard.bloom = fs.OpenReaderAt(bloomPath)
 
 			valuesPath := getValuesFilePath(path, uint64(i))
-			shard.values = fs.MustOpenReaderAt(valuesPath)
+			shard.values = fs.OpenReaderAt(valuesPath)
 		}
 	}
@@ -176,25 +176,25 @@ func mustOpenFilePart(pt *partition, path string) *part {
 func mustClosePart(p *part) {
 	// Close files in parallel in order to speed up this operation
 	// on high-latency storage systems such as NFS and Ceph.
-	var cs []fs.MustCloser
+	var pe fsutil.ParallelExecutor
 
-	cs = append(cs, p.indexFile)
+	pe.Add(fs.NewCloserTask(p.indexFile))
 	if p.ph.FormatVersion >= 1 {
-		cs = append(cs, p.columnsHeaderIndexFile)
+		pe.Add(fs.NewCloserTask(p.columnsHeaderIndexFile))
 	}
-	cs = append(cs, p.columnsHeaderFile)
-	cs = append(cs, p.timestampsFile)
-	cs = p.messageBloomValues.appendClosers(cs)
+	pe.Add(fs.NewCloserTask(p.columnsHeaderFile))
+	pe.Add(fs.NewCloserTask(p.timestampsFile))
+	p.messageBloomValues.appendCloserTasks(&pe)
 
 	if p.ph.FormatVersion < 1 {
-		cs = p.oldBloomValues.appendClosers(cs)
+		p.oldBloomValues.appendCloserTasks(&pe)
 	} else {
 		for i := range p.bloomValuesShards {
-			cs = p.bloomValuesShards[i].appendClosers(cs)
+			p.bloomValuesShards[i].appendCloserTasks(&pe)
 		}
 	}
 
-	fs.MustCloseParallel(cs)
+	pe.Run()
 
 	p.pt = nil
 }