Compare commits

...

5 Commits

Author SHA1 Message Date
f41gh7
31b5128285 lib/fs: add a check if NFS stale file present
File deletion may return no error and create .nfs file instead.
It actually depends on client version and used mount options.

 Add tests for happy path for tryRemoveDir
2026-03-16 22:43:25 +01:00
f41gh7
7f0fba60ea do not attemp remove nfs temporary files 2026-03-12 16:57:21 +01:00
f41gh7
9564779b6d add debug information for failed dir removal 2026-03-12 16:44:48 +01:00
f41gh7
25268af76d lib/fs: retry deleteDirFilename deletion 2026-03-09 22:43:50 +01:00
f41gh7
cc3c1b9bb1 Update docs/victoriametrics/changelog/CHANGELOG.md 2026-03-04 11:52:05 +01:00
2 changed files with 226 additions and 37 deletions

View File

@@ -1,11 +1,17 @@
package fs
import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
// directories with this filename are scheduled to be removed by MustRemoveDir().
@@ -48,45 +54,28 @@ func MustRemoveDir(dirPath string) {
// on high-latency storage systems such as NFS.
// Directories for VictoriaLogs parts may contain a big number of items when wide events are stored there.
// Also the number of parts in a partition may be quite big.
des := MustReadDir(dirPath)
var wg sync.WaitGroup
concurrencyCh := make(chan struct{}, min(32, len(des)-1))
for _, de := range des {
name := de.Name()
if name == deleteDirFilename {
continue
}
dirEntryPath := filepath.Join(dirPath, name)
concurrencyCh <- struct{}{}
wg.Go(func() {
if err := os.RemoveAll(dirEntryPath); err != nil {
logger.Panicf("FATAL: cannot remove %q: %s", dirEntryPath, err)
}
<-concurrencyCh
})
if tryRemoveDir(dirPath) {
return
}
wg.Wait()
// Make sure the deleted names are properly synced to the dirPath,
// so they are no longer visible after unclean shutdown.
MustSyncPath(dirPath)
// Remove the deleteDirFilename file, since there are no other entries left in the directory.
MustRemovePath(deleteFilePath)
// Sync the directory after the removing deletDirFilename file in order to make sure
// all the metadata files are removed at some exotic filesystems such as OSSFS2.
// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/649
// and https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9709
MustSyncPath(dirPath)
// Remove the dirPath itself
MustRemovePath(dirPath)
// Do not sync the parent directory for the dirPath - the caller can do this if needed.
// It is OK if the dirPath will remain undeleted after unclean shutdown - it will be deleted
// on the next startup.
// Schedule background removal of the directory on NFS.
// NFS may perform a "silly rename" before deletion if the client detects more than one reference to the file.
// Silly rename is an asynchronous operation, so the client may need additional time
// before the unlink operation succeeds and the file is actually deleted.
select {
case removeDirConcurrencyCh <- struct{}{}:
default:
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", dirPath, cap(removeDirConcurrencyCh))
}
dirRemoverWG.Go(func() {
for {
if tryRemoveDir(dirPath) {
return
}
time.Sleep(time.Second)
}
})
}
// IsPartiallyRemovedDir returns true if dirPath is partially removed because of unclean shutdown during the MustRemoveDir() call.
@@ -118,7 +107,8 @@ func IsPartiallyRemovedDir(dirPath string) bool {
// Use MustRemoveDir for removing non-empty directories.
func MustRemovePath(path string) {
if err := os.Remove(path); err != nil {
// NOTE(review): this span comes from a rendered diff without +/- markers; the Panicf call
// and the two statements after it look like the removed and the added side of the same hunk.
// Confirm which of them is the live code.
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
// Collect the names of the entries under path, so they are included into the failure message.
// NOTE(review): collectExistFiles lists path via MustReadDir, so it presumably expects path
// to be a directory - verify the behavior for callers, which pass a regular file.
dirFiles := collectExistFiles(path)
logger.Fatalf("FATAL: cannot remove %q: %s, exist files: %s", path, err, strings.Join(dirFiles, ","))
}
}
@@ -141,3 +131,156 @@ func MustRemoveDirContents(dir string) {
}
MustSyncPath(dir)
}
// tryRemoveDir makes a single attempt to remove dirPath together with its contents.
//
// It returns true if dirPath has been removed.
// It returns false if the attempt must be retried later, because dirPath contains
// a stale ".nfs*" file or because a removal failed with an error accepted by isTemporaryNFSError.
// Every such failed attempt is logged and counted in nfsDirRemoveFailedAttempts.
//
// Removal errors, which are not accepted by isTemporaryNFSError, are reported via logger.Fatalf.
func tryRemoveDir(dirPath string) bool {
	des := MustReadDir(dirPath)

	var wg sync.WaitGroup
	var mu sync.Mutex
	var firstNFSErr error
	// recordNFSErr remembers only the first temporary NFS error.
	// It is protected by mu, since it is called from concurrently running removal goroutines.
	recordNFSErr := func(err error) {
		mu.Lock()
		if firstNFSErr == nil {
			firstNFSErr = err
		}
		mu.Unlock()
	}
	// failAttempt reports a retryable failure in the same way for every retry path:
	// it logs the reason, increments the failed attempts counter and returns false.
	failAttempt := func(err error) bool {
		logger.Warnf("cannot removeDir due to NFS error: %s", err)
		nfsDirRemoveFailedAttempts.Inc()
		return false
	}

	// Remove directory entries concurrently; the number of in-flight removals is limited to 32.
	// max(1, ...) keeps the channel capacity positive for an empty directory.
	workerCount := max(1, min(32, len(des)))
	concurrencyCh := make(chan struct{}, workerCount)
	for _, de := range des {
		name := de.Name()
		if name == deleteDirFilename {
			// This file is removed last, after all the other entries are gone.
			continue
		}
		if strings.HasPrefix(name, ".nfs") {
			// Do not try removing the NFS temporary file; its presence means the attempt must be retried.
			recordNFSErr(fmt.Errorf("dir: %q contains stale NFS file: %q", dirPath, name))
			continue
		}
		dirEntryPath := filepath.Join(dirPath, name)
		concurrencyCh <- struct{}{}
		wg.Add(1)
		go func(dirEntryPath string) {
			defer func() {
				wg.Done()
				<-concurrencyCh
			}()
			if err := os.RemoveAll(dirEntryPath); err != nil {
				if !isTemporaryNFSError(err) {
					logger.Fatalf("FATAL: cannot remove %q: %s", dirEntryPath, err)
				}
				recordNFSErr(fmt.Errorf("cannot remove dirEntryPath: %q: %w", dirEntryPath, err))
			}
		}(dirEntryPath)
	}
	wg.Wait()
	// It is safe to read firstNFSErr without mu here, since all the goroutines are finished.
	if firstNFSErr != nil {
		return failAttempt(firstNFSErr)
	}

	// Make sure the deleted names are properly synced to the dirPath,
	// so they are no longer visible after unclean shutdown.
	MustSyncPath(dirPath)

	// New stale NFS files may have appeared since the loop
	if hasStaleNFSFiles(dirPath) {
		return failAttempt(fmt.Errorf("dir: %q contains stale NFS files", dirPath))
	}

	deleteFilePath := filepath.Join(dirPath, deleteDirFilename)
	// Remove the deleteDirFilename file, since there are no other entries left in the directory.
	// The file may be missing already, so the "not exist" error is ignored.
	if err := os.Remove(deleteFilePath); err != nil && !os.IsNotExist(err) {
		// At NFS filesystems, file deletion could fail for any reason and it should be retried
		if !isTemporaryNFSError(err) {
			logger.Fatalf("FATAL: cannot remove %q: %s", deleteFilePath, err)
		}
		return failAttempt(fmt.Errorf("cannot remove %q: %w", deleteFilePath, err))
	}

	// Sync the directory after removing the deleteDirFilename file in order to make sure
	// all the metadata files are removed at some exotic filesystems such as OSSFS2.
	// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/649
	// and https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9709
	MustSyncPath(dirPath)

	// Remove the dirPath itself
	MustRemovePath(dirPath)

	// Do not sync the parent directory for the dirPath - the caller can do this if needed.
	// It is OK if the dirPath will remain undeleted after unclean shutdown - it will be deleted
	// on the next startup.
	return true
}
var (
// dirRemoverWG tracks the background goroutines, which MustRemoveDir starts for retrying
// the directory removal. MustStopDirRemover waits on it.
dirRemoverWG sync.WaitGroup
// nfsDirRemoveFailedAttempts is incremented by tryRemoveDir when a removal attempt
// fails because of an NFS condition and has to be retried.
nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
// The gauge reports the number of currently occupied slots in removeDirConcurrencyCh.
_ = metrics.NewGauge(`vm_nfs_pending_dirs_to_remove`, func() float64 {
return float64(len(removeDirConcurrencyCh))
})
)
// removeDirConcurrencyCh limits the number of directories scheduled for background removal to 1024.
// MustRemoveDir takes a slot with a non-blocking send and calls logger.Panicf if the channel is full.
//
// NOTE(review): no receive from this channel is visible in this diff, so a slot looks like
// it is never released after the directory is removed - confirm that it is released elsewhere,
// otherwise the gauge above only grows and the queue fills up after 1024 scheduled removals.
var removeDirConcurrencyCh = make(chan struct{}, 1024)
// isTemporaryNFSError reports whether err is a removal error, which is expected
// to disappear on NFS after some time, so the failed removal can be retried.
//
// It returns false for nil err.
func isTemporaryNFSError(err error) bool {
	if err == nil {
		return false
	}

	// Some NFS implementations return EEXIST instead of ENOTEMPTY
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6398
	//
	// errors.Is inspects the whole error chain, so the errno is detected
	// regardless of the error type it is wrapped into.
	if errors.Is(err, syscall.EEXIST) || errors.Is(err, syscall.ENOTEMPTY) {
		return true
	}

	// Fall back to matching the error text.
	//
	// Do not check for NFS file handle error, usually it means that other client has opened file without proper lock
	// in this scenario it's better to panic.
	// User must configure proper locking options for the NFS client to prevent such error.
	// It must never have "nolock" or "local_lock=all" options to be set.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details.
	errStr := err.Error()
	return strings.Contains(errStr, "directory not empty") || strings.Contains(errStr, "device or resource busy")
}
// hasStaleNFSFiles reports whether dirPath contains at least one entry
// with the name starting with ".nfs".
func hasStaleNFSFiles(dirPath string) bool {
	entries := MustReadDir(dirPath)
	for i := range entries {
		if strings.HasPrefix(entries[i].Name(), ".nfs") {
			return true
		}
	}
	return false
}
// MustStopDirRemover must be called in the end of graceful shutdown.
//
// It waits for up to 10 seconds until the background goroutines tracked by dirRemoverWG are finished.
// If they are still running after that, an error is logged and the function returns.
//
// It is expected that nobody calls MustRemoveDir when MustStopDirRemover is called.
func MustStopDirRemover() {
	const maxWaitTime = 10 * time.Second

	// finishedCh is closed as soon as all the tracked goroutines are done.
	finishedCh := make(chan struct{})
	go func() {
		dirRemoverWG.Wait()
		close(finishedCh)
	}()

	timer := time.NewTimer(maxWaitTime)
	defer timer.Stop()

	select {
	case <-finishedCh:
	case <-timer.C:
		logger.Errorf("cannot stop dirRemover in %s; the remaining empty NFS directories should be automatically removed on the next startup", maxWaitTime)
	}
}
// collectExistFiles returns one `name="...",is_dir=...` description per entry read from dirPath.
//
// The result is nil if dirPath has no entries.
func collectExistFiles(dirPath string) []string {
	var descriptions []string
	for _, entry := range MustReadDir(dirPath) {
		description := fmt.Sprintf("name=%q,is_dir=%v", entry.Name(), entry.IsDir())
		descriptions = append(descriptions, description)
	}
	return descriptions
}

View File

@@ -1,6 +1,7 @@
package fs
import (
"fmt"
"os"
"path/filepath"
"testing"
@@ -32,3 +33,48 @@ func TestIsPartiallyRemovedDir(t *testing.T) {
f("empty_dir", "", true)
f("regular_dir", "index.bin", false)
}
// TestTryRemoveDir verifies both the value returned by tryRemoveDir
// and the resulting state of the directory on disk.
func TestTryRemoveDir(t *testing.T) {
	f := func(setup func(t *testing.T, wd string), want bool) {
		t.Helper()

		d := t.TempDir()
		setup(t, d)

		got := tryRemoveDir(d)
		if got != want {
			t.Fatalf("unexpected result from tryRemoveDir(%q); got %v; want %v", d, got, want)
		}

		// The returned value must match the real state of the directory.
		_, err := os.Stat(d)
		if want {
			if !os.IsNotExist(err) {
				t.Fatalf("directory %q must be removed; os.Stat error: %v", d, err)
			}
			return
		}
		if err != nil {
			t.Fatalf("directory %q must be left in place for the next attempt: %s", d, err)
		}
	}

	// writeEmptyFile creates a small placeholder file at filePath.
	writeEmptyFile := func(t *testing.T, filePath string) {
		t.Helper()
		err := os.WriteFile(filePath, []byte("empty"), os.ModePerm)
		if err != nil {
			t.Fatalf("cannot write file: %q: %s", filePath, err)
		}
	}

	// regular delete
	setup := func(t *testing.T, wd string) {
		writeEmptyFile(t, filepath.Join(wd, "metadata.bin"))
		writeEmptyFile(t, filepath.Join(wd, deleteDirFilename))
	}
	f(setup, true)

	// has stale nfs file
	setup = func(t *testing.T, wd string) {
		writeEmptyFile(t, filepath.Join(wd, ".nfs0000"))
		writeEmptyFile(t, filepath.Join(wd, deleteDirFilename))
	}
	f(setup, false)

	// empty dir
	f(func(_ *testing.T, _ string) {}, true)

	// delete many files concurrent; 60 files exceed the limit of 32 concurrent removals
	setup = func(t *testing.T, wd string) {
		for i := range 60 {
			writeEmptyFile(t, filepath.Join(wd, fmt.Sprintf("metadata_%d.bin", i)))
		}
		writeEmptyFile(t, filepath.Join(wd, deleteDirFilename))
	}
	f(setup, true)
}