mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-02 16:42:10 +03:00
Compare commits
5 Commits
shared-vms
...
cluster-nf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
31b5128285 | ||
|
|
7f0fba60ea | ||
|
|
9564779b6d | ||
|
|
25268af76d | ||
|
|
cc3c1b9bb1 |
@@ -1,11 +1,17 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// directories with this filename are scheduled to be removed by MustRemoveDir().
|
||||
@@ -48,45 +54,28 @@ func MustRemoveDir(dirPath string) {
|
||||
// on high-latency storage systems such as NFS.
|
||||
// Directories for VitoriaLogs parts may contain big number of items when wide events are stored there.
|
||||
// Also the number of parts in a partition may be quite big.
|
||||
des := MustReadDir(dirPath)
|
||||
var wg sync.WaitGroup
|
||||
concurrencyCh := make(chan struct{}, min(32, len(des)-1))
|
||||
for _, de := range des {
|
||||
name := de.Name()
|
||||
if name == deleteDirFilename {
|
||||
continue
|
||||
}
|
||||
dirEntryPath := filepath.Join(dirPath, name)
|
||||
|
||||
concurrencyCh <- struct{}{}
|
||||
wg.Go(func() {
|
||||
if err := os.RemoveAll(dirEntryPath); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", dirEntryPath, err)
|
||||
}
|
||||
<-concurrencyCh
|
||||
})
|
||||
if tryRemoveDir(dirPath) {
|
||||
return
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Make sure the deleted names are properly synced to the dirPath,
|
||||
// so they are no longer visible after unclean shutdown.
|
||||
MustSyncPath(dirPath)
|
||||
|
||||
// Remove the deleteDirFilename file, since there are no other entries left in the directory.
|
||||
MustRemovePath(deleteFilePath)
|
||||
|
||||
// Sync the directory after the removing deletDirFilename file in order to make sure
|
||||
// all the metadata files are removed at some exotic filesystems such as OSSFS2.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/649
|
||||
// and https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9709
|
||||
MustSyncPath(dirPath)
|
||||
|
||||
// Remove the dirPath itself
|
||||
MustRemovePath(dirPath)
|
||||
|
||||
// Do not sync the parent directory for the dirPath - the caller can do this if needed.
|
||||
// It is OK if the dirPath will remain undeleted after unclean shutdown - it will be deleted
|
||||
// on the next startup.
|
||||
// schedule NFS background dir removal.
|
||||
// NFS may perform "silly rename" before deletion, if client detects more than 1 file reference.
|
||||
// Silly rename is async operation and client may take an additional time before
|
||||
// unlink operation will succeed and could be actually deleted.
|
||||
select {
|
||||
case removeDirConcurrencyCh <- struct{}{}:
|
||||
default:
|
||||
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", dirPath, cap(removeDirConcurrencyCh))
|
||||
}
|
||||
dirRemoverWG.Go(func() {
|
||||
for {
|
||||
if tryRemoveDir(dirPath) {
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// IsPartiallyRemovedDir returns true if dirPath is partially removed because of unclean shutdown during the MustRemoveDir() call.
|
||||
@@ -118,7 +107,8 @@ func IsPartiallyRemovedDir(dirPath string) bool {
|
||||
// Use MustRemoveDir for removing non-empty directories.
|
||||
func MustRemovePath(path string) {
|
||||
if err := os.Remove(path); err != nil {
|
||||
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
|
||||
dirFiles := collectExistFiles(path)
|
||||
logger.Fatalf("FATAL: cannot remove %q: %s, exist files: %s", path, err, strings.Join(dirFiles, ","))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,3 +131,156 @@ func MustRemoveDirContents(dir string) {
|
||||
}
|
||||
MustSyncPath(dir)
|
||||
}
|
||||
|
||||
// tryRemoveDir performs removal of directory
|
||||
// it checks error for the first NFS temporary error
|
||||
func tryRemoveDir(dirPath string) bool {
|
||||
des := MustReadDir(dirPath)
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
var firstNFSErr error
|
||||
recordNFSErr := func(err error) {
|
||||
mu.Lock()
|
||||
if firstNFSErr == nil {
|
||||
firstNFSErr = err
|
||||
}
|
||||
mu.Unlock()
|
||||
}
|
||||
workerCount := max(1, min(32, len(des)))
|
||||
concurrencyCh := make(chan struct{}, workerCount)
|
||||
for _, de := range des {
|
||||
name := de.Name()
|
||||
if name == deleteDirFilename {
|
||||
continue
|
||||
}
|
||||
if strings.HasPrefix(name, ".nfs") {
|
||||
recordNFSErr(fmt.Errorf("dir: %q contains stale NFS file: %q", dirPath, name))
|
||||
continue
|
||||
}
|
||||
dirEntryPath := filepath.Join(dirPath, name)
|
||||
|
||||
concurrencyCh <- struct{}{}
|
||||
wg.Add(1)
|
||||
go func(dirEntryPath string) {
|
||||
defer func() {
|
||||
wg.Done()
|
||||
<-concurrencyCh
|
||||
}()
|
||||
if err := os.RemoveAll(dirEntryPath); err != nil {
|
||||
if !isTemporaryNFSError(err) {
|
||||
logger.Fatalf("FATAL: cannot remove %q: %s", dirEntryPath, err)
|
||||
}
|
||||
recordNFSErr(fmt.Errorf("cannot remove dirEntryPath: %q: %w", dirEntryPath, err))
|
||||
}
|
||||
}(dirEntryPath)
|
||||
}
|
||||
wg.Wait()
|
||||
if firstNFSErr != nil {
|
||||
logger.Warnf("cannot removeDir due to NFS error: %s", firstNFSErr)
|
||||
nfsDirRemoveFailedAttempts.Inc()
|
||||
return false
|
||||
}
|
||||
// Make sure the deleted names are properly synced to the dirPath,
|
||||
// so they are no longer visible after unclean shutdown.
|
||||
MustSyncPath(dirPath)
|
||||
|
||||
// New stale NFS files may have appeared since the loop
|
||||
if hasStaleNFSFiles(dirPath) {
|
||||
nfsDirRemoveFailedAttempts.Inc()
|
||||
return false
|
||||
}
|
||||
|
||||
deleteFilePath := filepath.Join(dirPath, deleteDirFilename)
|
||||
// Remove the deleteDirFilename file, since there are no other entries left in the directory.
|
||||
if err := os.Remove(deleteFilePath); err != nil && !os.IsNotExist(err) {
|
||||
// At NFS filesystems, file deletion could fail for any reason and it should be retried
|
||||
if !isTemporaryNFSError(err) {
|
||||
logger.Fatalf("FATAL: cannot remove %q: %s", deleteFilePath, err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Sync the directory after the removing deletDirFilename file in order to make sure
|
||||
// all the metadata files are removed at some exotic filesystems such as OSSFS2.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/649
|
||||
// and https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9709
|
||||
MustSyncPath(dirPath)
|
||||
|
||||
// Remove the dirPath itself
|
||||
MustRemovePath(dirPath)
|
||||
|
||||
// Do not sync the parent directory for the dirPath - the caller can do this if needed.
|
||||
// It is OK if the dirPath will remain undeleted after unclean shutdown - it will be deleted
|
||||
// on the next startup.
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
var (
|
||||
dirRemoverWG sync.WaitGroup
|
||||
nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
|
||||
_ = metrics.NewGauge(`vm_nfs_pending_dirs_to_remove`, func() float64 {
|
||||
return float64(len(removeDirConcurrencyCh))
|
||||
})
|
||||
)
|
||||
|
||||
var removeDirConcurrencyCh = make(chan struct{}, 1024)
|
||||
|
||||
func isTemporaryNFSError(err error) bool {
|
||||
var pathErr *os.PathError
|
||||
if errors.As(err, &pathErr) {
|
||||
// Some NFS implementations return EEXIST instead of ENOTEMPTY
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6398
|
||||
if errors.Is(pathErr.Err, syscall.EEXIST) || errors.Is(pathErr.Err, syscall.ENOTEMPTY) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
// Do not check for NFS file handle error, usually it means that other client has opened file without proper lock
|
||||
// in this scenario it's better to panic.
|
||||
// User must configure proper locking options for the NFS client to prevent such error.
|
||||
// It must never have "nolock" or "local_lock=all" options to be set.
|
||||
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details.
|
||||
errStr := err.Error()
|
||||
return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy")
|
||||
}
|
||||
|
||||
func hasStaleNFSFiles(dirPath string) bool {
|
||||
des := MustReadDir(dirPath)
|
||||
for _, de := range des {
|
||||
name := de.Name()
|
||||
if strings.HasPrefix(name, ".nfs") {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// MustStopDirRemover must be called in the end of graceful shutdown
|
||||
// in order to wait for removing the remaining directories from removeDirConcurrencyCh.
|
||||
//
|
||||
// It is expected that nobody calls MustRemoveAll when MustStopDirRemover is called.
|
||||
func MustStopDirRemover() {
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
dirRemoverWG.Wait()
|
||||
close(doneCh)
|
||||
}()
|
||||
const maxWaitTime = 10 * time.Second
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
case <-time.After(maxWaitTime):
|
||||
logger.Errorf("cannot stop dirRemover in %s; the remaining empty NFS directories should be automatically removed on the next startup", maxWaitTime)
|
||||
}
|
||||
}
|
||||
|
||||
func collectExistFiles(dirPath string) []string {
|
||||
des := MustReadDir(dirPath)
|
||||
var files []string
|
||||
for _, de := range des {
|
||||
files = append(files, fmt.Sprintf("name=%q,is_dir=%v", de.Name(), de.IsDir()))
|
||||
}
|
||||
return files
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
@@ -32,3 +33,48 @@ func TestIsPartiallyRemovedDir(t *testing.T) {
|
||||
f("empty_dir", "", true)
|
||||
f("regular_dir", "index.bin", false)
|
||||
}
|
||||
|
||||
func TestTryRemoveDir(t *testing.T) {
|
||||
f := func(setup func(t *testing.T, wd string), want bool) {
|
||||
t.Helper()
|
||||
d := t.TempDir()
|
||||
setup(t, d)
|
||||
got := tryRemoveDir(d)
|
||||
if got != want {
|
||||
t.Fatalf("unexpected error: (-%v;+%v)", want, got)
|
||||
}
|
||||
}
|
||||
|
||||
writeEmptyFile := func(t *testing.T, filePath string) {
|
||||
t.Helper()
|
||||
err := os.WriteFile(filePath, []byte("empty"), os.ModePerm)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot write file: %q: %s", filePath, err)
|
||||
}
|
||||
}
|
||||
// regular delete
|
||||
setup := func(t *testing.T, wd string) {
|
||||
writeEmptyFile(t, filepath.Join(wd, "metadata.bin"))
|
||||
writeEmptyFile(t, filepath.Join(wd, deleteDirFilename))
|
||||
}
|
||||
f(setup, true)
|
||||
|
||||
// has stale nfs file
|
||||
setup = func(t *testing.T, wd string) {
|
||||
writeEmptyFile(t, filepath.Join(wd, ".nfs0000"))
|
||||
writeEmptyFile(t, filepath.Join(wd, deleteDirFilename))
|
||||
}
|
||||
f(setup, false)
|
||||
|
||||
// empty dir
|
||||
f(func(_ *testing.T, _ string) {}, true)
|
||||
|
||||
// delete many files concurrent
|
||||
setup = func(t *testing.T, wd string) {
|
||||
for i := range 60 {
|
||||
writeEmptyFile(t, filepath.Join(wd, fmt.Sprintf("metadata_%d.bin", i)))
|
||||
}
|
||||
writeEmptyFile(t, filepath.Join(wd, deleteDirFilename))
|
||||
}
|
||||
f(setup, true)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user