mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
lib/backup: speed up restores on linux systems (#10661)
Related to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10680 We noticed that backup restores in our environment were much slower than the hardware/bandwidth constraints would suggest and we traced this down to a couple of bottlenecks. This PR attempts to address all of them. #### Lack of pre-allocation of files. This was causing writes far into files to be quite slow as new blocks needed to be continually allocated. This was particularly bad on ext4 for us, but will likely be applicable to most disks and filesystems; you'll see the impl here is linux specific but this is mostly because I don't have a test env for any other platform and didn't want to blindly make changes without a validation env. This comes with the downside of no longer being able to resume a restore mid file, and requiring the re-downloading of parts already in the file, since the file will appear at full size from the very start. This is I think _generally_ a good tradeoff for the restore speed gains; it is definitely a tradeoff, so I've included a flag to disable the pre-allocation behavior and fall back to the existing part diffing logic. #### Fsync after each part With many small parts in relatively few files, or in high concurrency setups, the writerCloser fsync on each part (actually a double fsync, since both `filestream.Writer.mustFlush` and `filestream.Writer.mustClose` fsync) was causing slowdowns since we would be continually queuing fsyncs. With the pre-allocation pattern the file is only "ready" once renamed, so I moved to a per file fsync after rename. #### Concurrent read/write The previous download pattern was to do a read from the remoteFs, with whatever latency that entailed, then sequentially do a write, again with whatever latency that entailed. This meant that throughput was limited to `blockSize / (readLatency + writeLatency)`. 
Similar to how `crossTypeCopy` is implemented in the backup process, we can instead use `io.Pipe` to allow two goroutines to work in parallel with a small buffer between them. #### Pagecache avoidance `filestream.Writer` does quite a lot to avoid polluting the page cache, but this is not relevant in a restore context, and with large sequential block writes it's much more efficient to let the OS flush the pagecache whenever it wants rather than doing a bunch of small buffer syscalls to flush blocks. Therefore this switches over to a much simpler directWriteCloser that does direct file IO and lets the OS handle flushes while mid write. ### Performance Before the changes we were seeing write speeds of only 100MBps; this was a restore from EBS volumes, ext4 with 1GB/s throughput. <img width="1613" height="586" alt="Screenshot 2026-03-16 at 1 29 46 PM" src="https://github.com/user-attachments/assets/5d54dcb7-cb59-43e0-9247-fda8c70feb2f" /> After these changes, in the same restore env, we're seeing 600MB/s flat rates. <img width="1611" height="471" alt="Screenshot 2026-03-16 at 1 31 33 PM" src="https://github.com/user-attachments/assets/ea8e2eb7-533a-48fa-99e0-0b38286e5572" /> Signed-off-by: Max Kotliar <kotlyar.maksim@gmail.com> Co-authored-by: Max Kotliar <mkotlyar@victoriametrics.com>
This commit is contained in:
committed by
GitHub
parent
f1f70e976e
commit
febafc1cf1
@@ -30,6 +30,7 @@ var (
|
||||
concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce restore duration")
|
||||
maxBytesPerSecond = flagutil.NewBytes("maxBytesPerSecond", 0, "The maximum download speed. There is no limit if it is set to 0")
|
||||
skipBackupCompleteCheck = flag.Bool("skipBackupCompleteCheck", false, "Whether to skip checking for 'backup complete' file in -src. This may be useful for restoring from old backups, which were created without 'backup complete' file")
|
||||
SkipPreallocation = flag.Bool("skipFilePreallocation", false, "Whether to skip pre-allocated files. This will likely be slower in most cases, but allows restores to resume mid file on failure")
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -63,6 +64,7 @@ func main() {
|
||||
Src: srcFS,
|
||||
Dst: dstFS,
|
||||
SkipBackupCompleteCheck: *skipBackupCompleteCheck,
|
||||
SkipPreallocation: *SkipPreallocation,
|
||||
}
|
||||
pushmetrics.Init()
|
||||
if err := a.Run(ctx); err != nil {
|
||||
|
||||
@@ -34,6 +34,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): extend JWT [claim matching](https://docs.victoriametrics.com/victoriametrics/vmauth/#jwt-claim-matching) with array claim values support. See [#10647](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10647). Thanks to @andriibeee for the contribution.
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): allow specifying `basic_auth` in scrape configs without `username`. Previously this resulted in a config error. Now a warning is logged instead. See [#6956](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6956). Thanks to @andriibeee for the contribution.
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): add support for negative buckets in [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) `ExponentialHistogram` during ingestion. See [#9896-comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9896#issuecomment-4037522985).
|
||||
* FEATURE: [vmrestore](https://docs.victoriametrics.com/victoriametrics/vmrestore/): Improve restore speed on linux systems by pre-allocating files and optimizing write path. This behavior can be disabled with `-skipFilePreallocation`. See [#10661](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10661/). Thanks to @BenNF for the contribution.
|
||||
|
||||
|
||||
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): retry RPC by dialing a new connection instead of reusing a pooled one when the previous attempt fails with `io.EOF`, `broken pipe` or `reset by peer`. This reduces query failures caused by stale connections to restarted vmstorage nodes. See [#10314](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10314)
|
||||
* BUGFIX: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): fix autocomplete dropdown not closing on the Raw Query page. See [#10665](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10665)
|
||||
|
||||
@@ -14,7 +14,7 @@ aliases:
|
||||
---
|
||||
`vmrestore` restores data from backups created by [vmbackup](https://docs.victoriametrics.com/victoriametrics/vmbackup/).
|
||||
|
||||
Restore process can be interrupted at any time. It is automatically resumed from the interruption point when restarting `vmrestore` with the same args.
|
||||
Restore process can be interrupted at any time. It is automatically resumed when restarting `vmrestore` with the same args. If file preallocation is enabled{{% available_from "#" %}}, it resumes from the last complete file, if file preallocation is disabled via `-skipFilePreallocation` then it resumes from the interruption point mid file.
|
||||
|
||||
## Usage
|
||||
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
package actions
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -40,6 +42,11 @@ type Restore struct {
|
||||
//
|
||||
// This may be needed for restoring from old backups with missing `backup complete` file.
|
||||
SkipBackupCompleteCheck bool
|
||||
|
||||
// SkipPreallocation may be set in order to skip preallocation of files during restore.
|
||||
//
|
||||
// This will likely be slower in most cases, but allows restores to resume mid file
|
||||
SkipPreallocation bool
|
||||
}
|
||||
|
||||
// Run runs r with the provided settings.
|
||||
@@ -58,6 +65,10 @@ func (r *Restore) Run(ctx context.Context) error {
|
||||
src := r.Src
|
||||
dst := r.Dst
|
||||
|
||||
if !r.SkipPreallocation {
|
||||
dst.UseTmpFiles = true
|
||||
}
|
||||
|
||||
if !r.SkipBackupCompleteCheck {
|
||||
ok, err := src.HasFile(backupnames.BackupCompleteFilename)
|
||||
if err != nil {
|
||||
@@ -71,6 +82,10 @@ func (r *Restore) Run(ctx context.Context) error {
|
||||
|
||||
logger.Infof("starting restore from %s to %s", src, dst)
|
||||
|
||||
if err := dst.CleanupTmpFiles(); err != nil {
|
||||
return fmt.Errorf("cannot cleanup temporary files at %s: %w", dst, err)
|
||||
}
|
||||
|
||||
logger.Infof("obtaining list of parts at %s", src)
|
||||
srcParts, err := src.ListParts()
|
||||
if err != nil {
|
||||
@@ -164,25 +179,21 @@ func (r *Restore) Run(ctx context.Context) error {
|
||||
var bytesDownloaded atomic.Uint64
|
||||
err = runParallelPerPath(ctx, concurrency, perPath, func(parts []common.Part) error {
|
||||
// Sort partsToCopy in order to properly grow file size during downloading
|
||||
// and to properly resume downloading of incomplete files on the next Restore.Run call.
|
||||
common.SortParts(parts)
|
||||
if !r.SkipPreallocation {
|
||||
if err := dst.PreallocateFile(parts[0]); err != nil {
|
||||
return fmt.Errorf("cannot preallocate %s: %w", parts[0].Path, err)
|
||||
}
|
||||
}
|
||||
for _, p := range parts {
|
||||
logger.Infof("downloading %s from %s to %s", &p, src, dst)
|
||||
wc, err := dst.NewWriteCloser(p)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create writer for %q to %s: %w", &p, dst, err)
|
||||
}
|
||||
sw := &statWriter{
|
||||
w: wc,
|
||||
bytesWritten: &bytesDownloaded,
|
||||
}
|
||||
if err := src.DownloadPart(p, sw); err != nil {
|
||||
return fmt.Errorf("cannot download %s to %s: %w", &p, dst, err)
|
||||
}
|
||||
if err := wc.Close(); err != nil {
|
||||
return fmt.Errorf("cannot close reader from %s from %s: %w", &p, src, err)
|
||||
if err := pipelinedDownload(src, dst, p, &bytesDownloaded); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := dst.FinalizeFile(parts[0]); err != nil {
|
||||
return fmt.Errorf("cannot finalize %s: %w", parts[0].Path, err)
|
||||
}
|
||||
return nil
|
||||
}, func(elapsed time.Duration) {
|
||||
if elapsed.Seconds() <= 0 {
|
||||
@@ -210,6 +221,69 @@ func (r *Restore) Run(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
const writeBufSize = 2 * 1024 * 1024
|
||||
|
||||
var writeBufPool = sync.Pool{
|
||||
New: func() any {
|
||||
buf := make([]byte, writeBufSize)
|
||||
return &buf
|
||||
},
|
||||
}
|
||||
|
||||
const readBufSize = 8 * 1024 * 1024
|
||||
|
||||
var readBufPool = sync.Pool{
|
||||
New: func() any {
|
||||
return bufio.NewWriterSize(nil, readBufSize)
|
||||
},
|
||||
}
|
||||
|
||||
func pipelinedDownload(src common.RemoteFS, dst *fslocal.FS, p common.Part, bytesDownloaded *atomic.Uint64) error {
|
||||
wc, err := dst.NewDirectWriteCloser(p)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create writer for %q to %s: %w", &p, dst, err)
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
errCh := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
buf := readBufPool.Get().(*bufio.Writer)
|
||||
buf.Reset(pw)
|
||||
err := src.DownloadPart(p, buf)
|
||||
if err == nil {
|
||||
err = buf.Flush()
|
||||
}
|
||||
readBufPool.Put(buf)
|
||||
pw.CloseWithError(err)
|
||||
errCh <- err
|
||||
}()
|
||||
|
||||
sw := &statWriter{
|
||||
w: wc,
|
||||
bytesWritten: bytesDownloaded,
|
||||
}
|
||||
bufp := writeBufPool.Get().(*[]byte)
|
||||
_, writeErr := io.CopyBuffer(sw, pr, *bufp)
|
||||
writeBufPool.Put(bufp)
|
||||
pr.Close()
|
||||
|
||||
downloadErr := <-errCh
|
||||
|
||||
closeErr := wc.Close()
|
||||
|
||||
if writeErr != nil {
|
||||
return fmt.Errorf("cannot write %s to %s: %w", &p, dst, writeErr)
|
||||
}
|
||||
if downloadErr != nil {
|
||||
return fmt.Errorf("cannot download %s from %s: %w", &p, src, downloadErr)
|
||||
}
|
||||
if closeErr != nil {
|
||||
return fmt.Errorf("cannot close writer for %s at %s: %w", &p, dst, closeErr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type statWriter struct {
|
||||
w io.Writer
|
||||
bytesWritten *atomic.Uint64
|
||||
|
||||
@@ -107,7 +107,7 @@ func appendFilesInternal(dst []string, d *os.File) ([]string, error) {
|
||||
}
|
||||
|
||||
func isSpecialFile(name string) bool {
|
||||
return name == "flock.lock" || name == backupnames.RestoreInProgressFilename || name == backupnames.RestoreMarkFileName
|
||||
return name == "flock.lock" || name == backupnames.RestoreInProgressFilename || name == backupnames.RestoreMarkFileName || strings.HasSuffix(name, ".tmp")
|
||||
}
|
||||
|
||||
// RemoveEmptyDirs recursively removes empty directories under the given dir.
|
||||
|
||||
@@ -25,6 +25,8 @@ type FS struct {
|
||||
MaxBytesPerSecond int
|
||||
|
||||
bl *bandwidthLimiter
|
||||
|
||||
UseTmpFiles bool
|
||||
}
|
||||
|
||||
// Init initializes fs.
|
||||
@@ -121,26 +123,87 @@ func (fs *FS) NewReadCloser(p common.Part) (io.ReadCloser, error) {
|
||||
return blrc, nil
|
||||
}
|
||||
|
||||
// NewWriteCloser returns io.WriteCloser for the given part p located in fs.
|
||||
func (fs *FS) NewWriteCloser(p common.Part) (io.WriteCloser, error) {
|
||||
path := fs.path(p)
|
||||
// NewDirectWriteCloser returns an io.WriteCloser that writes directly to the
|
||||
// underlying file without buffering, enabling large IO sizes from the caller.
|
||||
// On platforms with preallocation, writes go to a .tmp file that must be
|
||||
// finalized with FinalizeFile.
|
||||
func (fs *FS) NewDirectWriteCloser(p common.Part) (io.WriteCloser, error) {
|
||||
path := fs.writePath(p)
|
||||
if err := fs.mkdirAll(path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
w, err := filestream.OpenWriterAt(path, int64(p.Offset), true)
|
||||
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open writer for %q at offset %d: %w", path, p.Offset, err)
|
||||
return nil, fmt.Errorf("cannot open file %q: %w", path, err)
|
||||
}
|
||||
wc := &writeCloser{
|
||||
w: w,
|
||||
n: p.Size,
|
||||
path: path,
|
||||
if _, err := f.Seek(int64(p.Offset), io.SeekStart); err != nil {
|
||||
_ = f.Close()
|
||||
return nil, fmt.Errorf("cannot seek to offset %d in %q: %w", p.Offset, path, err)
|
||||
}
|
||||
dwc := &directWriteCloser{
|
||||
f: f,
|
||||
n: p.Size,
|
||||
}
|
||||
if fs.bl == nil {
|
||||
return wc, nil
|
||||
return dwc, nil
|
||||
}
|
||||
blwc := fs.bl.NewWriteCloser(wc)
|
||||
return blwc, nil
|
||||
// with a bandwidth limiter max throughput is not a concern
|
||||
// so we fallback to the filestream backed limited writerCloser
|
||||
return fs.bl.NewWriteCloser(dwc), nil
|
||||
}
|
||||
|
||||
// PreallocateFile pre-allocates disk space for the file being written.
|
||||
func (fs *FS) PreallocateFile(p common.Part) error {
|
||||
path := fs.writePath(p)
|
||||
if err := fs.mkdirAll(path); err != nil {
|
||||
return err
|
||||
}
|
||||
return preallocateFile(path, int64(p.FileSize))
|
||||
}
|
||||
|
||||
// FinalizeFile syncs the completed file to disk. On platforms with
|
||||
// preallocation, it first renames the .tmp file to its final path.
|
||||
func (fs *FS) FinalizeFile(p common.Part) error {
|
||||
finalPath := fs.path(p)
|
||||
if canPreallocate && fs.UseTmpFiles {
|
||||
tmpPath := fs.tmpPath(p)
|
||||
if err := os.Rename(tmpPath, finalPath); err != nil {
|
||||
return fmt.Errorf("cannot rename %q to %q: %w", tmpPath, finalPath, err)
|
||||
}
|
||||
}
|
||||
f, err := os.Open(finalPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot open %q for fsync: %w", finalPath, err)
|
||||
}
|
||||
if err := f.Sync(); err != nil {
|
||||
_ = f.Close()
|
||||
return fmt.Errorf("cannot fsync %q: %w", finalPath, err)
|
||||
}
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
// CleanupTmpFiles removes leftover .tmp files from interrupted restores.
|
||||
// On platforms without preallocation this is a no-op.
|
||||
func (fs *FS) CleanupTmpFiles() error {
|
||||
if !canPreallocate {
|
||||
return nil
|
||||
}
|
||||
if _, err := os.Stat(fs.Dir); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
return filepath.Walk(fs.Dir, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !info.IsDir() && strings.HasSuffix(path, ".tmp") {
|
||||
logger.Infof("removing incomplete temporary file %q", path)
|
||||
return os.Remove(path)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// DeletePath deletes the given path from fs and returns the size for the deleted file.
|
||||
@@ -188,6 +251,17 @@ func (fs *FS) path(p common.Part) string {
|
||||
return p.LocalPath(fs.Dir)
|
||||
}
|
||||
|
||||
func (fs *FS) tmpPath(p common.Part) string {
|
||||
return fs.path(p) + ".tmp"
|
||||
}
|
||||
|
||||
func (fs *FS) writePath(p common.Part) string {
|
||||
if canPreallocate && fs.UseTmpFiles {
|
||||
return fs.tmpPath(p)
|
||||
}
|
||||
return fs.path(p)
|
||||
}
|
||||
|
||||
type limitedReadCloser struct {
|
||||
r *filestream.Reader
|
||||
n uint64
|
||||
@@ -213,27 +287,26 @@ func (lrc *limitedReadCloser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type writeCloser struct {
|
||||
w *filestream.Writer
|
||||
n uint64
|
||||
path string
|
||||
type directWriteCloser struct {
|
||||
f *os.File
|
||||
n uint64
|
||||
}
|
||||
|
||||
func (wc *writeCloser) Write(p []byte) (int, error) {
|
||||
n, err := wc.w.Write(p)
|
||||
if uint64(n) > wc.n {
|
||||
return n, fmt.Errorf("too much data written; got %d bytes; want %d bytes", n, wc.n)
|
||||
func (dwc *directWriteCloser) Write(p []byte) (int, error) {
|
||||
n, err := dwc.f.Write(p)
|
||||
if uint64(n) > dwc.n {
|
||||
return n, fmt.Errorf("too much data written; got %d bytes; want %d bytes", n, dwc.n)
|
||||
}
|
||||
wc.n -= uint64(n)
|
||||
dwc.n -= uint64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (wc *writeCloser) Close() error {
|
||||
wc.w.MustFlush(true)
|
||||
wc.w.MustClose()
|
||||
if wc.n != 0 {
|
||||
return fmt.Errorf("missing data writes for %d bytes", wc.n)
|
||||
func (dwc *directWriteCloser) Close() error {
|
||||
if err := dwc.f.Close(); err != nil {
|
||||
return fmt.Errorf("cannot close file %q: %w", dwc.f.Name(), err)
|
||||
}
|
||||
if dwc.n != 0 {
|
||||
return fmt.Errorf("missing data writes for %d bytes", dwc.n)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
27
lib/backup/fslocal/prealloc_linux.go
Normal file
27
lib/backup/fslocal/prealloc_linux.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package fslocal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
const canPreallocate = true
|
||||
|
||||
func preallocateFile(path string, size int64) error {
|
||||
if size <= 0 {
|
||||
return nil
|
||||
}
|
||||
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot open %q for preallocation: %w", path, err)
|
||||
}
|
||||
err = syscall.Fallocate(int(f.Fd()), 0, 0, size)
|
||||
if err1 := f.Close(); err1 != nil && err == nil {
|
||||
err = err1
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot fallocate %d bytes for %q: %w", size, path, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
9
lib/backup/fslocal/prealloc_other.go
Normal file
9
lib/backup/fslocal/prealloc_other.go
Normal file
@@ -0,0 +1,9 @@
|
||||
//go:build !linux
|
||||
|
||||
package fslocal
|
||||
|
||||
const canPreallocate = false
|
||||
|
||||
func preallocateFile(_ string, _ int64) error {
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user