lib/fs: simplify the code for directory removal and make it compatible with object storage (S3) and NFS

- Drop the code needed for asynchronous removal of the directory on NFS shares.
  This code was needed when VictoriaMetrics could keep open files after their deletion
  or renaming. This is no longer the case after the commit 43b24164ef .
  Now files are deleted only after all the readers close them.
  This updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61

- Unify MustRemoveAll() and MustRemoveDirAtomic() into MustRemoveDir() and MustRemovePath()
  functions:

  - The MustRemoveDir() deletes the given directory with all its contents, in an "atomic" way:
    it creates a special `.delete-this-dir` file in the directory, then removes all its contents
    except for this file, and later removes the `.delete-this-dir` file together with the directory
    itself. This makes it easy to determine whether the given directory needs to be deleted
    after unclean shutdown - if it contains the `.delete-this-dir` file or if it is empty, it must be deleted.
    Add IsPartiallyRemovedDir() function, which can be used for detecting whether the given directory must be removed
    at startup.

    Previously the MustRemoveDirAtomic() was using a "trick" for atomic directory removal: it was "atomically" renaming
    the directory to a temporary directory with '.must-remove.' marker in the directory name, and after that it
    was removing the renamed directory. On startup all the directories with the `.must-remove.` marker were deleted
    if they are left after unclean shutdown. This "trick" doesn't work for NFS and object storage such as S3,
    since these storage systems do not support atomic renaming of directories with multiple entries inside.
    The new MustRemoveDir() function doesn't use this "trick", so it can be safely used in NFS and S3-like storage systems.

    This is based on the pull request from @func25 - https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9486/files .

  - The MustRemovePath() deletes the given file or an empty directory.

- Delete the existing parts and partitions at startup if they were partially deleted.

- Consistently use fs.MustRemoveDir() and fs.MustRemovePath() instead of os.RemoveAll() across the codebase.
  This reduces the amount of boilerplate code related to error handling.

- Consistently use fs.MustWriteSync() instead of os.WriteFile() across the codebase.
This commit is contained in:
Aliaksandr Valialkin
2025-07-25 18:41:17 +02:00
parent e8c622766b
commit 83da33d8cf
53 changed files with 403 additions and 594 deletions

View File

@@ -17,7 +17,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
@@ -125,8 +124,6 @@ func main() {
vmstorage.Stop()
vmselect.Stop()
fs.MustStopDirRemover()
logger.Infof("the VictoriaMetrics has been stopped in %.3f seconds", time.Since(startTime).Seconds())
}

View File

@@ -55,6 +55,6 @@ func tearDown() {
srv.Close()
logger.ResetOutputForTest()
tmpDataDir := flag.Lookup("remoteWrite.tmpDataPath").Value.String()
fs.MustRemoveAll(tmpDataDir)
fs.MustRemoveDir(tmpDataDir)
}

View File

@@ -253,7 +253,7 @@ func dropDanglingQueues() {
if _, ok := existingQueues[dirname]; !ok {
logger.Infof("removing dangling queue %q", dirname)
fullPath := filepath.Join(queuesDir, dirname)
fs.MustRemoveAll(fullPath)
fs.MustRemoveDir(fullPath)
removed++
}
}

View File

@@ -106,7 +106,7 @@ func UnitTest(files []string, disableGroupLabel bool, externalLabels []string, e
vminsert.Init()
vmselect.Init()
// storagePath will be created again when closing vmselect, so remove it again.
defer fs.MustRemoveAll(storagePath)
defer fs.MustRemoveDir(storagePath)
defer vminsert.Stop()
defer vmselect.Stop()
disableAlertgroupLabel = disableGroupLabel
@@ -303,7 +303,7 @@ checkCheck:
func tearDown() {
vmstorage.Stop()
metrics.UnregisterAllMetrics()
fs.MustRemoveAll(storagePath)
fs.MustRemoveDir(storagePath)
}
func (tg *testGroup) test(evalInterval time.Duration, groupOrderMap map[string]int, testGroups []vmalertconfig.Group, externalLabels map[string]string) (checkErrs []error) {

View File

@@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/rule"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
)
@@ -109,7 +110,7 @@ groups:
t.Fatal(err)
}
defer func() { _ = os.Remove(f.Name()) }()
writeToFile(t, f.Name(), rules1)
writeToFile(f.Name(), rules1)
*configCheckInterval = 200 * time.Millisecond
*rulePath = []string{f.Name()}
@@ -164,7 +165,7 @@ groups:
t.Fatalf("expected to have exactly 1 group loaded; got %d", groupsLen)
}
writeToFile(t, f.Name(), rules2)
writeToFile(f.Name(), rules2)
time.Sleep(*configCheckInterval * 2)
checkCfg(nil)
groupsLen = lenLocked(m)
@@ -172,7 +173,7 @@ groups:
t.Fatalf("expected to have exactly 2 groups loaded; got %d", groupsLen)
}
writeToFile(t, f.Name(), rules1)
writeToFile(f.Name(), rules1)
procutil.SelfSIGHUP()
time.Sleep(*configCheckInterval / 2)
checkCfg(nil)
@@ -181,7 +182,7 @@ groups:
t.Fatalf("expected to have exactly 1 group loaded; got %d", groupsLen)
}
writeToFile(t, f.Name(), `corrupted`)
writeToFile(f.Name(), `corrupted`)
procutil.SelfSIGHUP()
time.Sleep(*configCheckInterval / 2)
checkCfg(fmt.Errorf("config error"))
@@ -194,10 +195,6 @@ groups:
<-syncCh
}
func writeToFile(t *testing.T, file, b string) {
t.Helper()
err := os.WriteFile(file, []byte(b), 0644)
if err != nil {
t.Fatal(err)
}
func writeToFile(file, b string) {
fs.MustWriteSync(file, []byte(b))
}

View File

@@ -10,6 +10,7 @@ import (
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
)
@@ -21,7 +22,7 @@ func TestConfigWatcherReload(t *testing.T) {
}
defer func() { _ = os.Remove(f.Name()) }()
writeToFile(t, f.Name(), `
writeToFile(f.Name(), `
static_configs:
- targets:
- localhost:9093
@@ -43,7 +44,7 @@ static_configs:
}
defer func() { _ = os.Remove(f2.Name()) }()
writeToFile(t, f2.Name(), `
writeToFile(f2.Name(), `
static_configs:
- targets:
- 127.0.0.1:9093
@@ -75,7 +76,7 @@ func TestConfigWatcherStart(t *testing.T) {
}
defer func() { _ = os.Remove(consulSDFile.Name()) }()
writeToFile(t, consulSDFile.Name(), fmt.Sprintf(`
writeToFile(consulSDFile.Name(), fmt.Sprintf(`
scheme: https
path_prefix: proxy
consul_sd_configs:
@@ -126,7 +127,7 @@ func TestConfigWatcherReloadConcurrent(t *testing.T) {
}
defer func() { _ = os.Remove(consulSDFile.Name()) }()
writeToFile(t, consulSDFile.Name(), fmt.Sprintf(`
writeToFile(consulSDFile.Name(), fmt.Sprintf(`
consul_sd_configs:
- server: %s
services:
@@ -142,7 +143,7 @@ consul_sd_configs:
}
defer func() { _ = os.Remove(staticAndConsulSDFile.Name()) }()
writeToFile(t, staticAndConsulSDFile.Name(), fmt.Sprintf(`
writeToFile(staticAndConsulSDFile.Name(), fmt.Sprintf(`
static_configs:
- targets:
- localhost:9093
@@ -187,9 +188,8 @@ consul_sd_configs:
wg.Wait()
}
func writeToFile(t *testing.T, file, b string) {
t.Helper()
checkErr(t, os.WriteFile(file, []byte(b), 0644))
func writeToFile(file, b string) {
fs.MustWriteSync(file, []byte(b))
}
func checkErr(t *testing.T, err error) {

View File

@@ -61,7 +61,7 @@ func getDefaultMaxConcurrentRequests() int {
// Init initializes vmselect
func Init() {
tmpDirPath := *vmstorage.DataPath + "/tmp"
fs.RemoveDirContents(tmpDirPath)
fs.MustRemoveDirContents(tmpDirPath)
netstorage.InitTmpBlocksDir(tmpDirPath)
promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
prometheus.InitMaxUniqueTimeseries(*maxConcurrentRequests)

View File

@@ -27,7 +27,7 @@ func InitTmpBlocksDir(tmpDirPath string) {
tmpDirPath = os.TempDir()
}
tmpBlocksDir = filepath.Join(tmpDirPath, "searchResults")
fs.MustRemoveAll(tmpBlocksDir)
fs.MustRemoveDirContents(tmpBlocksDir)
fs.MustMkdirIfNotExist(tmpBlocksDir)
}

View File

@@ -122,7 +122,7 @@ func InitRollupResultCache(cachePath string) {
if len(rollupResultCachePath) > 0 {
if *resetRollupResultCacheOnStartup {
logger.Infof("removing rollupResult cache at %q because -search.resetRollupResultCacheOnStartup command-line flag is set", rollupResultCachePath)
fs.MustRemoveAll(rollupResultCachePath)
fs.MustRemoveDir(rollupResultCachePath)
} else {
logger.Infof("loading rollupResult cache from %q...", rollupResultCachePath)
}

View File

@@ -22,8 +22,8 @@ func TestRollupResultCacheInitStop(t *testing.T) {
InitRollupResultCache(cacheFilePath)
StopRollupResultCache()
}
fs.MustRemoveAll(cacheFilePath)
fs.MustRemoveAll(cacheFilePath + ".key.prefix")
fs.MustRemoveDir(cacheFilePath)
fs.MustRemovePath(cacheFilePath + ".key.prefix")
})
}

View File

@@ -2,7 +2,6 @@ package apptest
import (
"fmt"
"os"
"path"
"path/filepath"
"testing"
@@ -60,7 +59,7 @@ func (tc *TestCase) Stop() {
app.Stop()
}
if !tc.t.Failed() {
fs.MustRemoveAll(tc.Dir())
fs.MustRemoveDir(tc.Dir())
}
}
@@ -149,9 +148,7 @@ func (tc *TestCase) MustStartVmagent(instance string, flags []string, promScrape
tc.t.Helper()
promScrapeConfigFilePath := path.Join(tc.t.TempDir(), "prometheus.yml")
if err := os.WriteFile(promScrapeConfigFilePath, []byte(promScrapeConfigFileYAML), os.ModePerm); err != nil {
tc.t.Fatalf("cannot init vmagent: prom config file write failed: %s", err)
}
fs.MustWriteSync(promScrapeConfigFilePath, []byte(promScrapeConfigFileYAML))
app, err := StartVmagent(instance, flags, tc.cli, promScrapeConfigFilePath)
if err != nil {
tc.t.Fatalf("Could not start %s: %v", instance, err)
@@ -195,9 +192,7 @@ func (tc *TestCase) MustStartVmauth(instance string, flags []string, configFileY
tc.t.Helper()
configFilePath := path.Join(tc.t.TempDir(), "config.yaml")
if err := os.WriteFile(configFilePath, []byte(configFileYAML), os.ModePerm); err != nil {
tc.t.Fatalf("cannot init vmauth: config file write failed: %s", err)
}
fs.MustWriteSync(configFilePath, []byte(configFileYAML))
app, err := StartVmauth(instance, flags, tc.cli, configFilePath)
if err != nil {
tc.t.Fatalf("Could not start %s: %v", instance, err)

View File

@@ -2,13 +2,13 @@ package tests
import (
"fmt"
"os"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
at "github.com/VictoriaMetrics/VictoriaMetrics/apptest"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
@@ -43,9 +43,7 @@ func TestSingleIngestionWithRelabeling(t *testing.T) {
target_label: __name__
`
relabelFilePath := fmt.Sprintf("%s/%s", t.TempDir(), relabelFileName)
if err := os.WriteFile(relabelFilePath, []byte(relabelingRules), os.ModePerm); err != nil {
t.Fatalf("cannot create file=%q: %s", relabelFilePath, err)
}
fs.MustWriteSync(relabelFilePath, []byte(relabelingRules))
sut := tc.MustStartVmsingle("relabeling-ingest",
[]string{fmt.Sprintf(`-relabelConfig=%s`, relabelFilePath),
`-retentionPeriod=100y`})

View File

@@ -4,11 +4,11 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"sync"
"testing"
at "github.com/VictoriaMetrics/VictoriaMetrics/apptest"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
// TestSingleVMAgentReloadConfigs verifies that vmagent reload new configurations on SIGHUP signal
@@ -23,9 +23,7 @@ func TestSingleVMAgentReloadConfigs(t *testing.T) {
target_label: label1
`
relabelFilePath := fmt.Sprintf("%s/%s", t.TempDir(), "relabel_config.yaml")
if err := os.WriteFile(relabelFilePath, []byte(relabelingRules), os.ModePerm); err != nil {
t.Fatalf("cannot create file=%q: %s", relabelFilePath, err)
}
fs.MustWriteSync(relabelFilePath, []byte(relabelingRules))
vmagent := tc.MustStartVmagent("vmagent", []string{
`-remoteWrite.flushInterval=50ms`,
@@ -60,9 +58,7 @@ func TestSingleVMAgentReloadConfigs(t *testing.T) {
target_label: label1
`
if err := os.WriteFile(relabelFilePath, []byte(relabelingRules), os.ModePerm); err != nil {
t.Fatalf("cannot create file=%q: %s", relabelFilePath, err)
}
fs.MustWriteSync(relabelFilePath, []byte(relabelingRules))
vmagent.ReloadRelabelConfigs(t)

View File

@@ -2,11 +2,12 @@ package apptest
import (
"fmt"
"os"
"regexp"
"syscall"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
var httpBuilitinListenAddrRE = regexp.MustCompile(`pprof handlers are exposed at http://(.*:\d{1,5})/debug/pprof/`)
@@ -59,9 +60,7 @@ func StartVmauth(instance string, flags []string, cli *Client, configFilePath st
func (app *Vmauth) UpdateConfiguration(t *testing.T, configFileYAML string) {
t.Helper()
ct := int(time.Now().Unix())
if err := os.WriteFile(app.configFilePath, []byte(configFileYAML), os.ModePerm); err != nil {
t.Fatalf("unexpected error at UpdateConfiguration, cannot write configFile content: %s", err)
}
fs.MustWriteSync(app.configFilePath, []byte(configFileYAML))
if err := app.process.Signal(syscall.SIGHUP); err != nil {
t.Fatalf("unexpected signal error: %s", err)
}

View File

@@ -4,11 +4,12 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestParseMetadataSecurityCredentialsFailure(t *testing.T) {
@@ -131,10 +132,7 @@ func TestGetAPICredentials(t *testing.T) {
if len(c.webTokenPath) > 0 {
tempDir := t.TempDir()
c.webTokenPath = filepath.Join(tempDir, c.webTokenPath)
err := os.WriteFile(c.webTokenPath, []byte("webtoken"), 0644)
if err != nil {
t.Fatalf("Failed to create webtoken file: %v", err)
}
fs.MustWriteSync(c.webTokenPath, []byte("webtoken"))
}
rt := &fakeRoundTripper{
responses: make(map[string]*http.Response),

View File

@@ -192,7 +192,8 @@ func (r *Restore) Run(ctx context.Context) error {
logger.Infof("restored %d bytes from backup in %.3f seconds; deleted %d bytes; downloaded %d bytes",
backupSize, time.Since(startTime).Seconds(), deleteSize, downloadSize)
return removeRestoreLock(r.Dst.Dir)
removeRestoreLock(r.Dst.Dir)
return nil
}
type statWriter struct {
@@ -218,10 +219,7 @@ func createRestoreLock(dstDir string) error {
return f.Close()
}
func removeRestoreLock(dstDir string) error {
func removeRestoreLock(dstDir string) {
lockF := path.Join(dstDir, backupnames.RestoreInProgressFilename)
if err := os.Remove(lockF); err != nil {
return fmt.Errorf("cannot remove restore lock file %q: %w", lockF, err)
}
return nil
fs.MustRemovePath(lockF)
}

View File

@@ -2,101 +2,123 @@ package fs
import (
"os"
"strings"
"time"
"path/filepath"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
"github.com/VictoriaMetrics/metrics"
)
// MustRemoveAll removes path with all the contents.
// directories with this filename are scheduled to be removed by MustRemoveDir().
const deleteDirFilename = ".delete-this-dir"
// MustRemoveDir removes the dirPath with all its contents.
//
// It properly fsyncs the parent directory after path removal.
//
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
func MustRemoveAll(path string) {
if tryRemoveAll(path) {
// The dirPath contents may be partially deleted if unclean shutdown happens during the removal.
// The caller must verify whether the given directory is partially removed via IsPartiallyRemovedDir() call
// on the startup before using it. If the directory is partially removed, it must be removed again
// via MustRemoveDir() call.
func MustRemoveDir(dirPath string) {
if !IsPathExist(dirPath) {
// Nothing do delete.
return
}
select {
case removeDirConcurrencyCh <- struct{}{}:
default:
logger.Panicf("FATAL: cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirConcurrencyCh))
// The code below is written in the way that partially deleted directories could be deleted
// on the next start after unclean shutdown, by verifying them with IsPartiallyRemovedDir() call.
//
// The code below doesn't depend on atomic renaming of directories, since it isn't supported
// by NFS and object storage.
// Create a deleteDirFilename file, which indicates that the dirPath must be removed.
deleteFilePath := filepath.Join(dirPath, deleteDirFilename)
f, err := os.Create(deleteFilePath)
if err != nil {
logger.Panicf("FATAL: cannot create %q while deleting %q: %s", deleteFilePath, dirPath, err)
}
dirRemoverWG.Add(1)
go func() {
defer func() {
dirRemoverWG.Done()
<-removeDirConcurrencyCh
}()
for {
time.Sleep(time.Second)
if tryRemoveAll(path) {
return
}
if err := f.Close(); err != nil {
logger.Panicf("FATAL: cannot close %q: %s", deleteFilePath, err)
}
// Make sure the deleteDirFilename file is visible in the dirPath.
MustSyncPath(dirPath)
// Remove the contents of the dirPath except of the deleteDirFilename file.
des := MustReadDir(dirPath)
for _, de := range des {
name := de.Name()
if name == deleteDirFilename {
continue
}
}()
dirEntryPath := filepath.Join(dirPath, name)
if err := os.RemoveAll(dirEntryPath); err != nil {
logger.Panicf("FATAL: cannot remove %q: %s", dirEntryPath, err)
}
}
// Make sure the deleted names are properly synced to the dirPath,
// so they are no longer visible after unclean shutdown.
MustSyncPath(dirPath)
// Remove the deleteDirFilename file
MustRemovePath(deleteFilePath)
// Remove the dirPath itself
MustRemovePath(dirPath)
// Do not sync the parent directory for the dirPath - the caller can do this if needed.
// It is OK if the dirPath will remain undeleted after unclean shutdown - it will be deleted
// on the next startup.
}
var dirRemoverWG syncwg.WaitGroup
func tryRemoveAll(path string) bool {
err := os.RemoveAll(path)
if err == nil || isStaleNFSFileHandleError(err) {
// Make sure the parent directory doesn't contain references
// to the current directory.
mustSyncParentDirIfExists(path)
// IsPartiallyRemovedDir returns true if dirPath is partially removed because of unclean shutdown during the MustRemoveDir() call.
//
// The caller must call MustRemoveDir(dirPath) on partially removed dirPath.
func IsPartiallyRemovedDir(dirPath string) bool {
des := MustReadDir(dirPath)
if len(des) == 0 {
// Delete empty dirs too, since they may appear when the unclean shutdown happens after the deleteDirFilename is deleted,
// but before the directory is deleted istelf.
return true
}
if !isTemporaryNFSError(err) {
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
deleteFilePath := filepath.Join(dirPath, deleteDirFilename)
for _, de := range des {
if de.IsDir() {
continue
}
name := de.Name()
if name == deleteFilePath {
// The directory contains the deleteDirFilename. This means it is partially deleted.
return true
}
}
// NFS prevents from removing directories with open files.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
// Schedule for later directory removal.
nfsDirRemoveFailedAttempts.Inc()
return false
}
var (
nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)
_ = metrics.NewGauge(`vm_nfs_pending_dirs_to_remove`, func() float64 {
return float64(len(removeDirConcurrencyCh))
})
)
var removeDirConcurrencyCh = make(chan struct{}, 1024)
func isStaleNFSFileHandleError(err error) bool {
errStr := err.Error()
return strings.Contains(errStr, "stale NFS file handle")
}
func isTemporaryNFSError(err error) bool {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6396 for details.
errStr := err.Error()
return strings.Contains(errStr, "directory not empty") ||
strings.Contains(errStr, "device or resource busy") ||
strings.Contains(errStr, "file exists")
}
// MustStopDirRemover must be called in the end of graceful shutdown
// in order to wait for removing the remaining directories from removeDirConcurrencyCh.
// MustRemovePath removes the given path. It must be either a file or an empty directory.
//
// It is expected that nobody calls MustRemoveAll when MustStopDirRemover is called.
func MustStopDirRemover() {
doneCh := make(chan struct{})
go func() {
dirRemoverWG.Wait()
close(doneCh)
}()
const maxWaitTime = 10 * time.Second
select {
case <-doneCh:
return
case <-time.After(maxWaitTime):
logger.Errorf("cannot stop dirRemover in %s; the remaining empty NFS directories should be automatically removed on the next startup", maxWaitTime)
// Use MustRemoveDir for removing non-empty directories.
func MustRemovePath(path string) {
if err := os.Remove(path); err != nil {
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
}
}
// MustRemoveDirContents removes all the contents of the given dir if it exists.
//
// It doesn't remove the dir itself, so the dir may be mounted to a separate partition.
func MustRemoveDirContents(dir string) {
	if !IsPathExist(dir) {
		// The path doesn't exist, so nothing to remove.
		return
	}
	for _, de := range MustReadDir(dir) {
		fullPath := filepath.Join(dir, de.Name())
		if err := os.RemoveAll(fullPath); err != nil {
			logger.Panicf("FATAL: cannot remove %s: %s", fullPath, err)
		}
	}
	// Persist the removals in the directory entry list.
	MustSyncPath(dir)
}

View File

@@ -6,9 +6,7 @@ import (
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -43,7 +41,7 @@ func MustWriteStreamSync(path string, src io.WriterTo) {
f := filestream.MustCreate(path, false)
if _, err := src.WriteTo(f); err != nil {
f.MustClose()
// Do not call MustRemoveAll(path), so the user could inspect
// Do not call MustRemovePath(path), so the user could inspect
// the file contents during investigation of the issue.
logger.Panicf("FATAL: cannot write data to %q: %s", path, err)
}
@@ -62,7 +60,7 @@ func MustWriteSync(path string, data []byte) {
f := filestream.MustCreate(path, false)
if _, err := f.Write(data); err != nil {
f.MustClose()
// Do not call MustRemoveAll(path), so the user could inspect
// Do not call MustRemovePath(path), so the user could inspect
// the file contents during investigation of the issue.
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(data), path, err)
}
@@ -94,7 +92,7 @@ func MustWriteAtomic(path string, data []byte, canOverwrite bool) {
// Atomically move the temporary file from tmpPath to path.
if err := os.Rename(tmpPath, path); err != nil {
// do not call MustRemoveAll(tmpPath) here, so the user could inspect
// do not call MustRemovePath(tmpPath) here, so the user could inspect
// the file contents during investigation of the issue.
logger.Panicf("FATAL: cannot move temporary file %q to %q: %s", tmpPath, path, err)
}
@@ -146,35 +144,6 @@ func mustMkdirSync(path string) {
MustSyncPath(parentDirPath)
}
// RemoveDirContents removes all the contents of the given dir if it exists.
//
// It doesn't remove the dir itself, so the dir may be mounted
// to a separate partition.
func RemoveDirContents(dir string) {
if !IsPathExist(dir) {
// The path doesn't exist, so nothing to remove.
return
}
d, err := os.Open(dir)
if err != nil {
logger.Panicf("FATAL: cannot open dir: %s", err)
}
defer MustClose(d)
names, err := d.Readdirnames(-1)
if err != nil {
logger.Panicf("FATAL: cannot read contents of the dir %q: %s", dir, err)
}
for _, name := range names {
if name == "." || name == ".." || name == "lost+found" {
// Skip special dirs.
continue
}
fullPath := filepath.Join(dir, name)
MustRemoveAll(fullPath)
}
MustSyncPath(dir)
}
// MustClose must close the given file f.
func MustClose(f *os.File) {
fname := f.Name()
@@ -206,39 +175,6 @@ func IsPathExist(path string) bool {
return true
}
func mustSyncParentDirIfExists(path string) {
parentDirPath := filepath.Dir(path)
if !IsPathExist(parentDirPath) {
return
}
MustSyncPath(parentDirPath)
}
// MustRemoveDirAtomic removes the given dir atomically.
//
// It uses the following algorithm:
//
// 1. Atomically rename the "<dir>" to "<dir>.must-remove.<XYZ>",
// where <XYZ> is an unique number.
// 2. Remove the "<dir>.must-remove.XYZ" in background.
//
// If the process crashes after the step 1, then the directory must be removed
// on the next process start by calling MustRemoveTemporaryDirs on the parent directory.
func MustRemoveDirAtomic(dir string) {
if !IsPathExist(dir) {
return
}
mustRemoveDirAtomic(dir)
parentDir := filepath.Dir(dir)
MustSyncPath(parentDir)
}
var atomicDirRemoveCounter = func() *atomicutil.Uint64 {
var x atomicutil.Uint64
x.Store(uint64(time.Now().UnixNano()))
return &x
}()
// MustReadDir reads directory entries at the given dir.
func MustReadDir(dir string) []os.DirEntry {
des, err := os.ReadDir(dir)
@@ -248,25 +184,6 @@ func MustReadDir(dir string) []os.DirEntry {
return des
}
// MustRemoveTemporaryDirs removes all the subdirectories with ".must-remove.<XYZ>" suffix.
//
// Such directories may be left on unclean shutdown during MustRemoveDirAtomic call.
func MustRemoveTemporaryDirs(dir string) {
des := MustReadDir(dir)
for _, de := range des {
if !IsDirOrSymlink(de) {
// Skip non-directories
continue
}
dirName := de.Name()
if IsScheduledForRemoval(dirName) {
fullPath := filepath.Join(dir, dirName)
MustRemoveAll(fullPath)
}
}
MustSyncPath(dir)
}
// MustHardLinkFiles makes hard links for all the files from srcDir in dstDir.
func MustHardLinkFiles(srcDir, dstDir string) {
mustMkdirSync(dstDir)
@@ -406,11 +323,6 @@ type freeSpaceEntry struct {
freeSpace uint64
}
// IsScheduledForRemoval returns true if the filename contains .must-remove. substring
func IsScheduledForRemoval(filename string) bool {
return strings.Contains(filename, ".must-remove.")
}
// IsDirOrSymlink returns true if de is directory or symlink.
func IsDirOrSymlink(de os.DirEntry) bool {
return de.IsDir() || (de.Type()&os.ModeSymlink == os.ModeSymlink)

View File

@@ -1,10 +1,6 @@
package fs
import (
"fmt"
"os"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"golang.org/x/sys/unix"
)
@@ -14,15 +10,6 @@ func freeSpace(stat statfs_t) uint64 {
return uint64(stat.Bavail) * uint64(stat.Bsize)
}
func mustRemoveDirAtomic(dir string) {
n := atomicDirRemoveCounter.Add(1)
tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n)
if err := os.Rename(dir, tmpDir); err != nil {
logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err)
}
MustRemoveAll(tmpDir)
}
func statfs(path string, buf *statfs_t) (err error) {
return unix.Statvfs(path, buf)
}

View File

@@ -3,10 +3,6 @@
package fs
import (
"fmt"
"os"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"golang.org/x/sys/unix"
)
@@ -16,15 +12,6 @@ func freeSpace(stat statfs_t) uint64 {
return uint64(stat.Bavail) * uint64(stat.Bsize)
}
func mustRemoveDirAtomic(dir string) {
n := atomicDirRemoveCounter.Add(1)
tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n)
if err := os.Rename(dir, tmpDir); err != nil {
logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err)
}
MustRemoveAll(tmpDir)
}
func statfs(path string, stat *statfs_t) (err error) {
return unix.Statfs(path, stat)
}

View File

@@ -1,10 +1,6 @@
package fs
import (
"fmt"
"os"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"golang.org/x/sys/unix"
)
@@ -14,15 +10,6 @@ func freeSpace(stat statfs_t) uint64 {
return uint64(stat.F_bavail) * uint64(stat.F_bsize)
}
func mustRemoveDirAtomic(dir string) {
n := atomicDirRemoveCounter.Add(1)
tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n)
if err := os.Rename(dir, tmpDir); err != nil {
logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err)
}
MustRemoveAll(tmpDir)
}
func statfs(path string, stat *statfs_t) (err error) {
return unix.Statfs(path, stat)
}

View File

@@ -9,15 +9,6 @@ import (
"golang.org/x/sys/unix"
)
func mustRemoveDirAtomic(dir string) {
n := atomicDirRemoveCounter.Add(1)
tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n)
if err := os.Rename(dir, tmpDir); err != nil {
logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err)
}
MustRemoveAll(tmpDir)
}
func mmap(fd int, length int) (data []byte, err error) {
return unix.Mmap(fd, 0, length, unix.PROT_READ, unix.MAP_SHARED)

View File

@@ -15,18 +15,6 @@ import (
func mustSyncPath(_ string) {
}
func mustRemoveDirAtomic(dir string) {
n := atomicDirRemoveCounter.Add(1)
tmpDir := fmt.Sprintf("%s.must-remove.%d", dir, n)
if err := os.Rename(dir, tmpDir); err != nil {
logger.Panicf("FATAL: cannot move %s to %s: %s", dir, tmpDir, err)
}
if err := os.RemoveAll(tmpDir); err != nil {
logger.Warnf("cannot remove dir: %q: %s; restart VictoriaMetrics to complete dir removal; "+
"see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70#issuecomment-1491529183", tmpDir, err)
}
}
const (
lockfileExclusiveLock = 2
fileFlagNormal = 0x00000080

View File

@@ -2,7 +2,6 @@ package fs
import (
"fmt"
"os"
"testing"
)
@@ -18,10 +17,8 @@ func testReaderAt(t *testing.T, bufSize int) {
path := "TestReaderAt"
const fileSize = 8 * 1024 * 1024
data := make([]byte, fileSize)
if err := os.WriteFile(path, data, 0600); err != nil {
t.Fatalf("cannot create %q: %s", path, err)
}
defer MustRemoveAll(path)
MustWriteSync(path, data)
defer MustRemovePath(path)
r := MustOpenReaderAt(path)
defer r.MustClose()

View File

@@ -2,7 +2,6 @@ package fs
import (
"fmt"
"os"
"testing"
)
@@ -25,10 +24,8 @@ func benchmarkReaderAtMustReadAt(b *testing.B, isMmap bool) {
path := "BenchmarkReaderAtMustReadAt"
const fileSize = 8 * 1024 * 1024
data := make([]byte, fileSize)
if err := os.WriteFile(path, data, 0600); err != nil {
b.Fatalf("cannot create %q: %s", path, err)
}
defer MustRemoveAll(path)
MustWriteSync(path, data)
defer MustRemovePath(path)
r := MustOpenReaderAt(path)
defer r.MustClose()

View File

@@ -327,7 +327,7 @@ func (pw *partWrapper) decRef() {
pw.p = nil
if deletePath != "" {
fs.MustRemoveAll(deletePath)
fs.MustRemoveDir(deletePath)
}
}
@@ -1483,12 +1483,11 @@ func (tb *Table) nextMergeIdx() uint64 {
func mustOpenParts(path string) []*partWrapper {
// The path can be missing after restoring from backup, so create it if needed.
fs.MustMkdirIfNotExist(path)
fs.MustRemoveTemporaryDirs(path)
// Remove txn and tmp directories, which may be left after the upgrade
// to v1.90.0 and newer versions.
fs.MustRemoveAll(filepath.Join(path, "txn"))
fs.MustRemoveAll(filepath.Join(path, "tmp"))
fs.MustRemoveDir(filepath.Join(path, "txn"))
fs.MustRemoveDir(filepath.Join(path, "tmp"))
partsFile := filepath.Join(path, partsFilename)
partNames := mustReadPartNames(partsFile, path)
@@ -1505,8 +1504,8 @@ func mustOpenParts(path string) []*partWrapper {
partPath := filepath.Join(path, partName)
if !fs.IsPathExist(partPath) {
logger.Panicf("FATAL: part %q is listed in %q, but is missing on disk; "+
"ensure %q contents is not corrupted; remove %q to rebuild its content from the list of existing parts",
partPath, partsFile, partsFile, partsFile)
"ensure %q contents is not corrupted; remove %q from %q in order to restore access to the remaining data",
partPath, partsFile, partsFile, partPath, partsFile)
}
m[partName] = struct{}{}
@@ -1520,7 +1519,7 @@ func mustOpenParts(path string) []*partWrapper {
if _, ok := m[fn]; !ok {
deletePath := filepath.Join(path, fn)
logger.Infof("deleting %q because it isn't listed in %q; this is the expected case after unclean shutdown", deletePath, partsFile)
fs.MustRemoveAll(deletePath)
fs.MustRemoveDir(deletePath)
}
}
fs.MustSyncPath(path)
@@ -1804,5 +1803,5 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([
func isSpecialDir(name string) bool {
// Snapshots and cache dirs aren't used anymore.
// Keep them here for backwards compatibility.
return name == "tmp" || name == "txn" || name == "snapshots" || name == "cache" || fs.IsScheduledForRemoval(name)
return name == "tmp" || name == "txn" || name == "snapshots" || name == "cache"
}

View File

@@ -8,6 +8,8 @@ import (
"sync/atomic"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestMain(m *testing.M) {
@@ -17,12 +19,8 @@ func TestMain(m *testing.M) {
func TestTableSearchSerial(t *testing.T) {
const path = "TestTableSearchSerial"
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
defer func() {
_ = os.RemoveAll(path)
}()
fs.MustRemoveDir(path)
defer fs.MustRemoveDir(path)
const itemsCount = 1e5
@@ -52,12 +50,8 @@ func TestTableSearchSerial(t *testing.T) {
func TestTableSearchConcurrent(t *testing.T) {
const path = "TestTableSearchConcurrent"
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
defer func() {
_ = os.RemoveAll(path)
}()
fs.MustRemoveDir(path)
defer fs.MustRemoveDir(path)
const itemsCount = 1e5
items := func() []string {

View File

@@ -4,9 +4,10 @@ import (
"bytes"
"fmt"
"math/rand"
"os"
"sync/atomic"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func BenchmarkTableSearch(b *testing.B) {
@@ -21,12 +22,8 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
r := rand.New(rand.NewSource(1))
path := fmt.Sprintf("BenchmarkTableSearch-%d", itemsCount)
if err := os.RemoveAll(path); err != nil {
b.Fatalf("cannot remove %q: %s", path, err)
}
defer func() {
_ = os.RemoveAll(path)
}()
fs.MustRemoveDir(path)
defer fs.MustRemoveDir(path)
tb, items, err := newTestTable(r, path, itemsCount)
if err != nil {

View File

@@ -4,20 +4,17 @@ import (
"bytes"
"fmt"
"math/rand"
"os"
"sync"
"sync/atomic"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestTableOpenClose(t *testing.T) {
const path = "TestTableOpenClose"
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
defer func() {
_ = os.RemoveAll(path)
}()
fs.MustRemoveDir(path)
defer fs.MustRemoveDir(path)
// Create a new table
var isReadOnly atomic.Bool
@@ -35,26 +32,20 @@ func TestTableOpenClose(t *testing.T) {
func TestTableAddItemsTooLongItem(t *testing.T) {
const path = "TestTableAddItemsTooLongItem"
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
fs.MustRemoveDir(path)
var isReadOnly atomic.Bool
tb := MustOpenTable(path, 0, nil, nil, &isReadOnly)
tb.AddItems([][]byte{make([]byte, maxInmemoryBlockSize+1)})
tb.MustClose()
_ = os.RemoveAll(path)
fs.MustRemoveDir(path)
}
func TestTableAddItemsSerial(t *testing.T) {
r := rand.New(rand.NewSource(1))
const path = "TestTableAddItemsSerial"
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
defer func() {
_ = os.RemoveAll(path)
}()
fs.MustRemoveDir(path)
defer fs.MustRemoveDir(path)
var flushes atomic.Uint64
flushCallback := func() {
@@ -105,9 +96,7 @@ func testAddItemsSerial(r *rand.Rand, tb *Table, itemsCount int) {
func TestTableCreateSnapshotAt(t *testing.T) {
const path = "TestTableCreateSnapshotAt"
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
fs.MustRemoveDir(path)
var isReadOnly atomic.Bool
tb := MustOpenTable(path, 0, nil, nil, &isReadOnly)
@@ -168,19 +157,15 @@ func TestTableCreateSnapshotAt(t *testing.T) {
tb1.MustClose()
tb.MustClose()
_ = os.RemoveAll(snapshot2)
_ = os.RemoveAll(snapshot1)
_ = os.RemoveAll(path)
fs.MustRemoveDir(snapshot2)
fs.MustRemoveDir(snapshot1)
fs.MustRemoveDir(path)
}
func TestTableAddItemsConcurrentStress(t *testing.T) {
const path = "TestTableAddItemsConcurrentStress"
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
defer func() {
_ = os.RemoveAll(path)
}()
fs.MustRemoveDir(path)
defer fs.MustRemoveDir(path)
rawItemsShardsPerTableOrig := rawItemsShardsPerTable
maxBlocksPerShardOrig := maxBlocksPerShard
@@ -235,12 +220,8 @@ func TestTableAddItemsConcurrentStress(t *testing.T) {
func TestTableAddItemsConcurrent(t *testing.T) {
const path = "TestTableAddItemsConcurrent"
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
defer func() {
_ = os.RemoveAll(path)
}()
fs.MustRemoveDir(path)
defer fs.MustRemoveDir(path)
var flushes atomic.Uint64
flushCallback := func() {

View File

@@ -3,10 +3,10 @@ package netutil
import (
"crypto/tls"
"errors"
"fmt"
"os"
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestCipherSuitesFromNamesSucces(t *testing.T) {
@@ -122,8 +122,8 @@ func TestGetServerTLSConfig(t *testing.T) {
mustCreateFile("test.crt", testCRT)
mustCreateFile("test.key", testPK)
defer func() {
_ = os.Remove("test.crt")
_ = os.Remove("test.key")
fs.MustRemovePath("test.crt")
fs.MustRemovePath("test.key")
}()
// check cert file not exist
@@ -187,7 +187,5 @@ YwXfJbKUZnJlv9XplwR7Dw==
)
func mustCreateFile(path, contents string) {
if err := os.WriteFile(path, []byte(contents), 0600); err != nil {
panic(fmt.Errorf("cannot create file %q with %d bytes contents: %w", path, len(contents), err))
}
fs.MustWriteSync(path, []byte(contents))
}

View File

@@ -5,21 +5,23 @@ import (
"sync"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestFastQueueOpenClose(_ *testing.T) {
path := "fast-queue-open-close"
mustDeleteDir(path)
fs.MustRemoveDir(path)
for i := 0; i < 10; i++ {
fq := MustOpenFastQueue(path, "foobar", 100, 0, false)
fq.MustClose()
}
mustDeleteDir(path)
fs.MustRemoveDir(path)
}
func TestFastQueueWriteReadInmemory(t *testing.T) {
path := "fast-queue-write-read-inmemory"
mustDeleteDir(path)
fs.MustRemoveDir(path)
capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
@@ -47,12 +49,12 @@ func TestFastQueueWriteReadInmemory(t *testing.T) {
}
}
fq.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}
func TestFastQueueWriteReadMixed(t *testing.T) {
path := "fast-queue-write-read-mixed"
mustDeleteDir(path)
fs.MustRemoveDir(path)
capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
@@ -83,12 +85,12 @@ func TestFastQueueWriteReadMixed(t *testing.T) {
t.Fatalf("the number of pending bytes must be 0; got %d", n)
}
fq.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}
func TestFastQueueWriteReadWithCloses(t *testing.T) {
path := "fast-queue-write-read-with-closes"
mustDeleteDir(path)
fs.MustRemoveDir(path)
capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
@@ -124,12 +126,12 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
t.Fatalf("the number of pending bytes must be 0; got %d", n)
}
fq.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}
func TestFastQueueReadUnblockByClose(t *testing.T) {
path := "fast-queue-read-unblock-by-close"
mustDeleteDir(path)
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foorbar", 123, 0, false)
resultCh := make(chan error)
@@ -154,12 +156,12 @@ func TestFastQueueReadUnblockByClose(t *testing.T) {
case <-time.After(time.Second):
t.Fatalf("timeout")
}
mustDeleteDir(path)
fs.MustRemoveDir(path)
}
func TestFastQueueReadUnblockByWrite(t *testing.T) {
path := "fast-queue-read-unblock-by-write"
mustDeleteDir(path)
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", 13, 0, false)
block := "foodsafdsaf sdf"
@@ -188,12 +190,12 @@ func TestFastQueueReadUnblockByWrite(t *testing.T) {
t.Fatalf("timeout")
}
fq.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}
func TestFastQueueReadWriteConcurrent(t *testing.T) {
path := "fast-queue-read-write-concurrent"
mustDeleteDir(path)
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", 5, 0, false)
@@ -287,12 +289,12 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
t.Fatalf("timeout")
}
fq.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}
func TestFastQueueWriteReadWithDisabledPQ(t *testing.T) {
path := "fast-queue-write-read-inmemory-disabled-pq"
mustDeleteDir(path)
fs.MustRemoveDir(path)
capacity := 20
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
@@ -323,12 +325,12 @@ func TestFastQueueWriteReadWithDisabledPQ(t *testing.T) {
}
}
fq.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}
func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) {
path := "fast-queue-write-read-inmemory-disabled-pq-force-write"
mustDeleteDir(path)
fs.MustRemoveDir(path)
capacity := 20
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
@@ -364,5 +366,5 @@ func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) {
}
}
fq.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}

View File

@@ -5,6 +5,7 @@ import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func BenchmarkFastQueueThroughputSerial(b *testing.B) {
@@ -15,11 +16,11 @@ func BenchmarkFastQueueThroughputSerial(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-fast-queue-throughput-serial-%d", blockSize)
mustDeleteDir(path)
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0, false)
defer func() {
fq.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}()
for i := 0; i < b.N; i++ {
writeReadIterationFastQueue(fq, block, iterationsCount)
@@ -36,11 +37,11 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize)
mustDeleteDir(path)
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0, false)
defer func() {
fq.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {

View File

@@ -84,7 +84,7 @@ func (q *queue) mustResetFiles() {
}
q.reader.MustClose()
q.writer.MustClose()
fs.MustRemoveAll(q.readerPath)
fs.MustRemovePath(q.readerPath)
q.writerOffset = 0
q.writerLocalOffset = 0
@@ -136,7 +136,7 @@ func mustOpenInternal(path, name string, chunkFileSize, maxBlockSize, maxPending
q, err := tryOpeningQueue(path, name, chunkFileSize, maxBlockSize, maxPendingBytes)
if err != nil {
logger.Errorf("cannot open persistent queue at %q: %s; cleaning it up and trying again", path, err)
fs.RemoveDirContents(path)
fs.MustRemoveDirContents(path)
q, err = tryOpeningQueue(path, name, chunkFileSize, maxBlockSize, maxPendingBytes)
if err != nil {
logger.Panicf("FATAL: %s", err)
@@ -189,7 +189,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
// path contents is broken or missing. Re-create it from scratch.
fs.MustClose(q.flockF)
fs.RemoveDirContents(path)
fs.MustRemoveDirContents(path)
q.flockF = fs.MustCreateFlockFile(path)
mi.Reset()
mi.Name = q.name
@@ -229,17 +229,17 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
}
if offset%q.chunkFileSize != 0 {
logger.Errorf("unexpected offset for chunk file %q: %d; it must be multiple of %d; removing the file", filepath, offset, q.chunkFileSize)
fs.MustRemoveAll(filepath)
fs.MustRemovePath(filepath)
continue
}
if mi.ReaderOffset >= offset+q.chunkFileSize {
logger.Errorf("unexpected chunk file found from the past: %q; removing it", filepath)
fs.MustRemoveAll(filepath)
fs.MustRemovePath(filepath)
continue
}
if mi.WriterOffset < offset {
logger.Errorf("unexpected chunk file found from the future: %q; removing it", filepath)
fs.MustRemoveAll(filepath)
fs.MustRemovePath(filepath)
continue
}
if mi.ReaderOffset >= offset && mi.ReaderOffset < offset+q.chunkFileSize {
@@ -254,13 +254,13 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
if fileSize := fs.MustFileSize(q.readerPath); fileSize < q.readerLocalOffset {
logger.Errorf("chunk file %q size is too small for the given reader offset; file size %d bytes; reader offset: %d bytes; removing the file",
q.readerPath, fileSize, q.readerLocalOffset)
fs.MustRemoveAll(q.readerPath)
fs.MustRemovePath(q.readerPath)
continue
}
r, err := filestream.OpenReaderAt(q.readerPath, int64(q.readerLocalOffset), true)
if err != nil {
logger.Errorf("cannot open %q for reading at offset %d: %s; removing this file", q.readerPath, q.readerLocalOffset, err)
fs.MustRemoveAll(filepath)
fs.MustRemovePath(filepath)
continue
}
q.reader = r
@@ -279,7 +279,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
if fileSize < q.writerLocalOffset {
logger.Errorf("%q size (%d bytes) is smaller than the writer offset (%d bytes); removing the file",
q.writerPath, fileSize, q.writerLocalOffset)
fs.MustRemoveAll(q.writerPath)
fs.MustRemovePath(q.writerPath)
continue
}
logger.Warnf("%q size (%d bytes) is bigger than writer offset (%d bytes); "+
@@ -289,7 +289,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
w, err := filestream.OpenWriterAt(q.writerPath, int64(q.writerLocalOffset), false)
if err != nil {
logger.Errorf("cannot open %q for writing at offset %d: %s; removing this file", q.writerPath, q.writerLocalOffset, err)
fs.MustRemoveAll(filepath)
fs.MustRemovePath(filepath)
continue
}
q.writer = w
@@ -528,7 +528,7 @@ var errEmptyQueue = fmt.Errorf("the queue is empty")
func (q *queue) nextChunkFileForRead() error {
// Remove the current chunk and go to the next chunk.
q.reader.MustClose()
fs.MustRemoveAll(q.readerPath)
fs.MustRemovePath(q.readerPath)
if n := q.readerOffset % q.chunkFileSize; n > 0 {
q.readerOffset += q.chunkFileSize - n
}
@@ -642,9 +642,7 @@ func (mi *metainfo) WriteToFile(path string) error {
if err != nil {
return fmt.Errorf("cannot marshal persistent queue metainfo %#v: %w", mi, err)
}
if err := os.WriteFile(path, data, 0600); err != nil {
return fmt.Errorf("cannot write persistent queue metainfo to %q: %w", path, err)
}
fs.MustWriteSync(path, data)
fs.MustSyncPath(path)
return nil
}

View File

@@ -6,11 +6,13 @@ import (
"path/filepath"
"strconv"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestQueueOpenClose(t *testing.T) {
path := "queue-open-close"
mustDeleteDir(path)
fs.MustRemoveDir(path)
for i := 0; i < 3; i++ {
q := mustOpen(path, "foobar", 0)
if n := q.GetPendingBytes(); n > 0 {
@@ -18,7 +20,7 @@ func TestQueueOpenClose(t *testing.T) {
}
q.MustClose()
}
mustDeleteDir(path)
fs.MustRemoveDir(path)
}
func TestQueueOpen(t *testing.T) {
@@ -28,7 +30,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateFile(filepath.Join(path, metainfoFilename), "foobarbaz")
q := mustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
})
t.Run("junk-files-and-dirs", func(_ *testing.T) {
path := "queue-open-junk-files-and-dir"
@@ -38,7 +40,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateDir(filepath.Join(path, "junk-dir"))
q := mustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
})
t.Run("invalid-chunk-offset", func(_ *testing.T) {
path := "queue-open-invalid-chunk-offset"
@@ -47,7 +49,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateFile(filepath.Join(path, fmt.Sprintf("%016X", 1234)), "qwere")
q := mustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
})
t.Run("too-new-chunk", func(_ *testing.T) {
path := "queue-open-too-new-chunk"
@@ -56,7 +58,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateFile(filepath.Join(path, fmt.Sprintf("%016X", 100*uint64(DefaultChunkFileSize))), "asdf")
q := mustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
})
t.Run("too-old-chunk", func(t *testing.T) {
path := "queue-open-too-old-chunk"
@@ -72,7 +74,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateFile(filepath.Join(path, fmt.Sprintf("%016X", 0)), "adfsfd")
q := mustOpen(path, mi.Name, 0)
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
})
t.Run("too-big-reader-offset", func(t *testing.T) {
path := "queue-open-too-big-reader-offset"
@@ -86,7 +88,7 @@ func TestQueueOpen(t *testing.T) {
}
q := mustOpen(path, mi.Name, 0)
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
})
t.Run("metainfo-dir", func(_ *testing.T) {
path := "queue-open-metainfo-dir"
@@ -94,7 +96,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateDir(filepath.Join(path, metainfoFilename))
q := mustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
})
t.Run("too-small-reader-file", func(t *testing.T) {
path := "too-small-reader-file"
@@ -110,7 +112,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateFile(filepath.Join(path, fmt.Sprintf("%016X", 0)), "sdf")
q := mustOpen(path, mi.Name, 0)
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
})
t.Run("invalid-writer-file-size", func(_ *testing.T) {
path := "too-small-reader-file"
@@ -119,7 +121,7 @@ func TestQueueOpen(t *testing.T) {
mustCreateFile(filepath.Join(path, fmt.Sprintf("%016X", 0)), "sdfdsf")
q := mustOpen(path, "foobar", 0)
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
})
t.Run("invalid-queue-name", func(t *testing.T) {
path := "invalid-queue-name"
@@ -133,17 +135,17 @@ func TestQueueOpen(t *testing.T) {
mustCreateFile(filepath.Join(path, fmt.Sprintf("%016X", 0)), "sdf")
q := mustOpen(path, "baz", 0)
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
})
}
func TestQueueResetIfEmpty(t *testing.T) {
path := "queue-reset-if-empty"
mustDeleteDir(path)
fs.MustRemoveDir(path)
q := mustOpen(path, "foobar", 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}()
block := make([]byte, 1024*1024)
@@ -170,11 +172,11 @@ func TestQueueResetIfEmpty(t *testing.T) {
func TestQueueWriteRead(t *testing.T) {
path := "queue-write-read"
mustDeleteDir(path)
fs.MustRemoveDir(path)
q := mustOpen(path, "foobar", 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}()
for j := 0; j < 5; j++ {
@@ -206,11 +208,11 @@ func TestQueueWriteRead(t *testing.T) {
func TestQueueWriteCloseRead(t *testing.T) {
path := "queue-write-close-read"
mustDeleteDir(path)
fs.MustRemoveDir(path)
q := mustOpen(path, "foobar", 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}()
for j := 0; j < 5; j++ {
@@ -247,11 +249,11 @@ func TestQueueWriteCloseRead(t *testing.T) {
func TestQueueChunkManagementSimple(t *testing.T) {
path := "queue-chunk-management-simple"
mustDeleteDir(path)
fs.MustRemoveDir(path)
const chunkFileSize = 100
const maxBlockSize = 20
q := mustOpenInternal(path, "foobar", chunkFileSize, maxBlockSize, 0)
defer mustDeleteDir(path)
defer fs.MustRemoveDir(path)
defer q.MustClose()
var blocks []string
for i := 0; i < 100; i++ {
@@ -278,13 +280,13 @@ func TestQueueChunkManagementSimple(t *testing.T) {
func TestQueueChunkManagementPeriodicClose(t *testing.T) {
path := "queue-chunk-management-periodic-close"
mustDeleteDir(path)
fs.MustRemoveDir(path)
const chunkFileSize = 100
const maxBlockSize = 20
q := mustOpenInternal(path, "foobar", chunkFileSize, maxBlockSize, 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}()
var blocks []string
for i := 0; i < 100; i++ {
@@ -316,11 +318,11 @@ func TestQueueChunkManagementPeriodicClose(t *testing.T) {
func TestQueueLimitedSize(t *testing.T) {
const maxPendingBytes = 1000
path := "queue-limited-size"
mustDeleteDir(path)
fs.MustRemoveDir(path)
q := mustOpen(path, "foobar", maxPendingBytes)
defer func() {
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}()
// Check that small blocks are successfully buffered and read
@@ -371,24 +373,16 @@ func TestQueueLimitedSize(t *testing.T) {
}
func mustCreateFile(path, contents string) {
if err := os.WriteFile(path, []byte(contents), 0600); err != nil {
panic(fmt.Errorf("cannot create file %q with %d bytes contents: %w", path, len(contents), err))
}
fs.MustWriteSync(path, []byte(contents))
}
func mustCreateDir(path string) {
mustDeleteDir(path)
fs.MustRemoveDir(path)
if err := os.MkdirAll(path, 0700); err != nil {
panic(fmt.Errorf("cannot create dir %q: %w", path, err))
}
}
// mustDeleteDir removes the directory at path together with all its contents.
// It is a no-op when the path does not exist (os.RemoveAll returns nil in that case).
// It panics on any removal error, so it is intended for tests and benchmarks only,
// where aborting on an unexpected filesystem error is acceptable.
func mustDeleteDir(path string) {
if err := os.RemoveAll(path); err != nil {
panic(fmt.Errorf("cannot remove dir %q: %w", path, err))
}
}
func mustCreateEmptyMetainfo(path, name string) {
var mi metainfo
mi.Name = name

View File

@@ -6,6 +6,7 @@ import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func BenchmarkQueueThroughputSerial(b *testing.B) {
@@ -16,11 +17,11 @@ func BenchmarkQueueThroughputSerial(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-queue-throughput-serial-%d", blockSize)
mustDeleteDir(path)
fs.MustRemoveDir(path)
q := mustOpen(path, "foobar", 0)
defer func() {
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}()
for i := 0; i < b.N; i++ {
writeReadIteration(q, block, iterationsCount)
@@ -37,12 +38,12 @@ func BenchmarkQueueThroughputConcurrent(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-queue-throughput-concurrent-%d", blockSize)
mustDeleteDir(path)
fs.MustRemoveDir(path)
q := mustOpen(path, "foobar", 0)
var qLock sync.Mutex
defer func() {
q.MustClose()
mustDeleteDir(path)
fs.MustRemoveDir(path)
}()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {

View File

@@ -13,12 +13,12 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"
"time"
"gopkg.in/yaml.v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
)
@@ -611,11 +611,9 @@ func TestConfigHeaders(t *testing.T) {
func TestTLSConfigWithCertificatesFilesUpdate(t *testing.T) {
// Generate and save a self-signed CA certificate and a certificate signed by the CA
caPEM, certPEM, keyPEM := mustGenerateCertificates()
_ = os.WriteFile("testdata/ca.pem", caPEM, 0644)
fs.MustWriteSync("testdata/ca.pem", caPEM)
defer func() {
_ = os.Remove("testdata/ca.pem")
}()
defer fs.MustRemovePath("testdata/ca.pem")
cert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
@@ -663,7 +661,7 @@ func TestTLSConfigWithCertificatesFilesUpdate(t *testing.T) {
// Update CA file with new CA and get config
ca2PEM, _, _ := mustGenerateCertificates()
_ = os.WriteFile("testdata/ca.pem", ca2PEM, 0644)
fs.MustWriteSync("testdata/ca.pem", ca2PEM)
// Wait for cert cache expiration
time.Sleep(2 * time.Second)

View File

@@ -354,7 +354,7 @@ func (db *indexDB) decRef() {
}
logger.Infof("dropping indexDB %q", tbPath)
fs.MustRemoveDirAtomic(tbPath)
fs.MustRemoveDir(tbPath)
logger.Infof("indexDB %q has been dropped", tbPath)
}

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"math/rand"
"os"
"reflect"
"regexp"
"sort"
@@ -69,7 +68,7 @@ func TestTagFiltersToMetricIDsCache(t *testing.T) {
t.Helper()
path := t.Name()
defer fs.MustRemoveAll(path)
defer fs.MustRemoveDir(path)
s := MustOpenStorage(path, OpenOptions{})
defer s.MustClose()
@@ -96,7 +95,7 @@ func TestTagFiltersToMetricIDsCache(t *testing.T) {
func TestTagFiltersToMetricIDsCache_EmptyMetricIDList(t *testing.T) {
path := t.Name()
defer fs.MustRemoveAll(path)
defer fs.MustRemoveDir(path)
s := MustOpenStorage(path, OpenOptions{})
defer s.MustClose()
idb, putIndexDB := s.getCurrIndexDB()
@@ -574,9 +573,7 @@ func TestIndexDBOpenClose(t *testing.T) {
db := mustOpenIndexDB(tableName, &s, &isReadOnly)
db.MustClose()
}
if err := os.RemoveAll(tableName); err != nil {
t.Fatalf("cannot remove indexDB: %s", err)
}
fs.MustRemoveDir(tableName)
}
func TestIndexDB(t *testing.T) {
@@ -608,7 +605,7 @@ func TestIndexDB(t *testing.T) {
putIndexDB()
s.MustClose()
fs.MustRemoveAll(path)
fs.MustRemoveDir(path)
})
t.Run("concurrent", func(t *testing.T) {
@@ -645,7 +642,7 @@ func TestIndexDB(t *testing.T) {
putIndexDB()
s.MustClose()
fs.MustRemoveAll(path)
fs.MustRemoveDir(path)
})
}
@@ -1621,9 +1618,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
putIndexDB()
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
fs.MustRemoveDir(path)
}
func TestSearchTSIDWithTimeRange(t *testing.T) {
@@ -2113,7 +2108,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
putIndexDB()
s.MustClose()
fs.MustRemoveAll(path)
fs.MustRemoveDir(path)
}
func toTFPointers(tfs []tagFilter) []*tagFilter {
@@ -2142,12 +2137,12 @@ func stopTestStorage(s *Storage) {
s.metricIDCache.Stop()
s.metricNameCache.Stop()
s.tsidCache.Stop()
fs.MustRemoveDirAtomic(s.cachePath)
fs.MustRemoveDir(s.cachePath)
}
func TestSearchContainsTimeRange(t *testing.T) {
path := t.Name()
os.RemoveAll(path)
fs.MustRemoveDir(path)
s := MustOpenStorage(path, OpenOptions{})
db, putIndexDB := s.getCurrIndexDB()
@@ -2271,5 +2266,5 @@ func TestSearchContainsTimeRange(t *testing.T) {
db.extDB.putIndexSearch(isExt)
putIndexDB()
s.MustClose()
fs.MustRemoveAll(path)
fs.MustRemoveDir(path)
}

View File

@@ -74,7 +74,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
putIndexDB()
s.MustClose()
fs.MustRemoveAll(path)
fs.MustRemoveDir(path)
}
func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricName, timestamp int64, startOffset, recordsPerLoop int) {
@@ -256,7 +256,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
putIndexDB()
s.MustClose()
fs.MustRemoveAll(path)
fs.MustRemoveDir(path)
}
func BenchmarkIndexDBGetTSIDs(b *testing.B) {
@@ -311,7 +311,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
putIndexDB()
s.MustClose()
fs.MustRemoveAll(path)
fs.MustRemoveDir(path)
}
func BenchmarkMarshalUnmarshalMetricIDs(b *testing.B) {

View File

@@ -188,7 +188,7 @@ func (pw *partWrapper) decRef() {
pw.p = nil
if deletePath != "" {
fs.MustRemoveAll(deletePath)
fs.MustRemoveDir(deletePath)
}
}
@@ -232,8 +232,8 @@ func (pt *partition) startBackgroundWorkers() {
func (pt *partition) Drop() {
logger.Infof("dropping partition %q at smallPartsPath=%q, bigPartsPath=%q", pt.name, pt.smallPartsPath, pt.bigPartsPath)
fs.MustRemoveDirAtomic(pt.smallPartsPath)
fs.MustRemoveDirAtomic(pt.bigPartsPath)
fs.MustRemoveDir(pt.smallPartsPath)
fs.MustRemoveDir(pt.bigPartsPath)
logger.Infof("partition %q has been dropped", pt.name)
}
@@ -1596,7 +1596,7 @@ func (pt *partition) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew *
if ph.RowsCount == 0 {
// The created part is empty. Remove it
if mpNew == nil {
fs.MustRemoveAll(dstPartPath)
fs.MustRemoveDir(dstPartPath)
}
return nil
}
@@ -1919,12 +1919,11 @@ func getPartsSize(pws []*partWrapper) uint64 {
func mustOpenParts(partsFile, path string, partNames []string) []*partWrapper {
// The path can be missing after restoring from backup, so create it if needed.
fs.MustMkdirIfNotExist(path)
fs.MustRemoveTemporaryDirs(path)
// Remove txn and tmp directories, which may be left after the upgrade
// to v1.90.0 and newer versions.
fs.MustRemoveAll(filepath.Join(path, "txn"))
fs.MustRemoveAll(filepath.Join(path, "tmp"))
fs.MustRemoveDir(filepath.Join(path, "txn"))
fs.MustRemoveDir(filepath.Join(path, "tmp"))
// Remove dirs missing in partNames. These dirs may be left after unclean shutdown
// or after the update from versions prior to v1.90.0.
@@ -1938,8 +1937,8 @@ func mustOpenParts(partsFile, path string, partNames []string) []*partWrapper {
partPath := filepath.Join(path, partName)
if !fs.IsPathExist(partPath) {
logger.Panicf("FATAL: part %q is listed in %q, but is missing on disk; "+
"ensure %q contents is not corrupted; remove %q to rebuild its content from the list of existing parts",
partPath, partsFile, partsFile, partsFile)
"ensure %q contents is not corrupted; remove %q from %q in order to restore access to the remaining data",
partPath, partsFile, partsFile, partPath, partsFile)
}
m[partName] = struct{}{}
@@ -1953,7 +1952,7 @@ func mustOpenParts(partsFile, path string, partNames []string) []*partWrapper {
if _, ok := m[fn]; !ok {
deletePath := filepath.Join(path, fn)
logger.Infof("deleting %q because it isn't listed in %q; this is the expected case after unclean shutdown", deletePath, partsFile)
fs.MustRemoveAll(deletePath)
fs.MustRemoveDir(deletePath)
}
}
fs.MustSyncPath(path)
@@ -2107,5 +2106,5 @@ func mustReadPartNamesFromDir(srcDir string) []string {
}
func isSpecialDir(name string) bool {
return name == "tmp" || name == "txn" || name == snapshotsDirname || fs.IsScheduledForRemoval(name)
return name == "tmp" || name == "txn" || name == snapshotsDirname
}

View File

@@ -97,7 +97,7 @@ func BenchmarkSearch_VariousTimeRanges(b *testing.B) {
}
s.MustClose()
fs.MustRemoveAll(b.Name())
fs.MustRemoveDir(b.Name())
// Start timer again to conclude the benchmark correctly.
b.StartTimer()

View File

@@ -203,10 +203,10 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1447 for details.
if fs.IsPathExist(filepath.Join(s.cachePath, resetCacheOnStartupFilename)) {
logger.Infof("removing cache directory at %q, since it contains `%s` file...", s.cachePath, resetCacheOnStartupFilename)
// Do not use fs.MustRemoveAll() here, since the cache directory may be mounted
// to a separate filesystem. In this case the fs.MustRemoveAll() will fail while
// Do not use fs.MustRemoveDir() here, since the cache directory may be mounted
// to a separate filesystem. In this case the fs.MustRemoveDir() will fail while
// trying to remove the mount root.
fs.RemoveDirContents(s.cachePath)
fs.MustRemoveDirContents(s.cachePath)
logger.Infof("cache directory at %q has been successfully removed", s.cachePath)
}
@@ -222,7 +222,6 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage {
// Pre-create snapshots directory if it is missing.
snapshotsPath := filepath.Join(path, snapshotsDirname)
fs.MustMkdirIfNotExist(snapshotsPath)
fs.MustRemoveTemporaryDirs(snapshotsPath)
// Initialize series cardinality limiter.
if opts.MaxHourlySeries > 0 {
@@ -272,7 +271,6 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage {
idbPath := filepath.Join(path, indexdbDirname)
idbSnapshotsPath := filepath.Join(idbPath, snapshotsDirname)
fs.MustMkdirIfNotExist(idbSnapshotsPath)
fs.MustRemoveTemporaryDirs(idbSnapshotsPath)
idbNext, idbCurr, idbPrev := s.mustOpenIndexDBTables(idbPath)
idbCurr.SetExtDB(idbPrev)
@@ -498,8 +496,8 @@ func (s *Storage) DeleteSnapshot(snapshotName string) error {
s.tb.MustDeleteSnapshot(snapshotName)
idbPath := filepath.Join(s.path, indexdbDirname, snapshotsDirname, snapshotName)
fs.MustRemoveDirAtomic(idbPath)
fs.MustRemoveDirAtomic(snapshotPath)
fs.MustRemoveDir(idbPath)
fs.MustRemoveDir(snapshotPath)
logger.Infof("deleted snapshot %q in %.3f seconds", snapshotPath, time.Since(startTime).Seconds())
@@ -1121,9 +1119,7 @@ func (s *Storage) mustSaveNextDayMetricIDs(e *byDateMetricIDEntry) {
// Marshal e.v
dst = marshalUint64Set(dst, &e.v)
if err := os.WriteFile(path, dst, 0644); err != nil {
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
}
fs.MustWriteSync(path, dst)
}
func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
@@ -1136,9 +1132,7 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
// Marshal hm.m
dst = marshalUint64Set(dst, hm.m)
if err := os.WriteFile(path, dst, 0644); err != nil {
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
}
fs.MustWriteSync(path, dst)
}
func unmarshalUint64Set(src []byte) (*uint64set.Set, []byte, error) {
@@ -2595,7 +2589,6 @@ func (s *Storage) storeTSIDToCache(tsid *generationTSID, metricName []byte) {
func (s *Storage) mustOpenIndexDBTables(path string) (next, curr, prev *indexDB) {
fs.MustMkdirIfNotExist(path)
fs.MustRemoveTemporaryDirs(path)
// Search for the three most recent tables - the prev, curr and next.
des := fs.MustReadDir(path)
@@ -2610,6 +2603,13 @@ func (s *Storage) mustOpenIndexDBTables(path string) (next, curr, prev *indexDB)
// Skip invalid directories.
continue
}
tableDirPath := filepath.Join(path, tableName)
if fs.IsPartiallyRemovedDir(tableDirPath) {
// Finish the removal of partially deleted directory, which can occur
// when the directory was removed during unclean shutdown.
fs.MustRemoveDir(tableDirPath)
continue
}
tableNames = append(tableNames, tableName)
}
sort.Slice(tableNames, func(i, j int) bool {
@@ -2633,7 +2633,7 @@ func (s *Storage) mustOpenIndexDBTables(path string) (next, curr, prev *indexDB)
for _, tn := range tableNames[:len(tableNames)-3] {
pathToRemove := filepath.Join(path, tn)
logger.Infof("removing obsolete indexdb dir %q...", pathToRemove)
fs.MustRemoveAll(pathToRemove)
fs.MustRemoveDir(pathToRemove)
logger.Infof("removed obsolete indexdb dir %q", pathToRemove)
}
fs.MustSyncPath(path)

View File

@@ -2,7 +2,6 @@ package storage
import (
"fmt"
"io/fs"
"math"
"math/rand"
"os"
@@ -18,7 +17,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
vmfs "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/google/go-cmp/cmp"
)
@@ -514,9 +513,7 @@ func TestStorageOpenClose(t *testing.T) {
s := MustOpenStorage(path, opts)
s.MustClose()
}
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
fs.MustRemoveDir(path)
}
func TestStorageRandTimestamps(t *testing.T) {
@@ -558,9 +555,7 @@ func TestStorageRandTimestamps(t *testing.T) {
}
})
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
fs.MustRemoveDir(path)
}
func testStorageRandTimestamps(s *Storage) error {
@@ -793,9 +788,7 @@ func TestStorageDeleteSeries(t *testing.T) {
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
fs.MustRemoveDir(path)
}
func testStorageDeleteSeries(s *Storage, workerNum int) error {
@@ -1451,9 +1444,7 @@ func TestStorageRegisterMetricNamesSerial(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
fs.MustRemoveDir(path)
}
func TestStorageRegisterMetricNamesConcurrent(t *testing.T) {
@@ -1476,9 +1467,7 @@ func TestStorageRegisterMetricNamesConcurrent(t *testing.T) {
}
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
fs.MustRemoveDir(path)
}
func testStorageRegisterMetricNames(s *Storage) error {
@@ -1614,9 +1603,7 @@ func TestStorageAddRowsSerial(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
fs.MustRemoveDir(path)
}
func TestStorageAddRowsConcurrent(t *testing.T) {
@@ -1645,9 +1632,7 @@ func TestStorageAddRowsConcurrent(t *testing.T) {
}
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
fs.MustRemoveDir(path)
}
func testGenerateMetricRows(rng *rand.Rand, rows uint64, timestampMin, timestampMax int64) []MetricRow {
@@ -2097,7 +2082,7 @@ func testRotateIndexDB(t *testing.T, mrs []MetricRow, op func(s *Storage)) {
func testListDirEntries(t *testing.T, root string, ignorePrefix ...string) []string {
t.Helper()
var paths []string
f := func(path string, _ fs.DirEntry, err error) error {
f := func(path string, _ os.DirEntry, err error) error {
if err != nil {
return err
}
@@ -2201,7 +2186,7 @@ func TestStorageSnapshots_CreateListDelete(t *testing.T) {
assertPathDoesNotExist := func(path string) {
t.Helper()
if vmfs.IsPathExist(path) {
if fs.IsPathExist(path) {
t.Fatalf("path was not expected to exist: %q", path)
}
}
@@ -2251,9 +2236,7 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) {
t.Fatalf("expecting zero snapshots; got %q", snapshots)
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
fs.MustRemoveDir(path)
}
// testRemoveAll removes all storage data produced by a test if the test hasn't
@@ -2264,7 +2247,7 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) {
func testRemoveAll(t *testing.T) {
defer func() {
if !t.Failed() {
vmfs.MustRemoveAll(t.Name())
fs.MustRemoveDir(t.Name())
}
}()
}
@@ -4477,9 +4460,10 @@ func TestMustOpenIndexDBTables_prevOnly(t *testing.T) {
storageDataPath := t.Name()
idbPath := filepath.Join(storageDataPath, indexdbDirname)
prevName := "123456789ABCDEF0"
prevPath := filepath.Join(idbPath, prevName)
vmfs.MustMkdirIfNotExist(prevPath)
createEmptyIndexdb(prevPath)
assertPathsExist(t, prevPath)
s := MustOpenStorage(storageDataPath, OpenOptions{})
@@ -4497,12 +4481,15 @@ func TestMustOpenIndexDBTables_currAndPrev(t *testing.T) {
storageDataPath := t.Name()
idbPath := filepath.Join(storageDataPath, indexdbDirname)
prevName := "123456789ABCDEF0"
prevPath := filepath.Join(idbPath, prevName)
vmfs.MustMkdirIfNotExist(prevPath)
createEmptyIndexdb(prevPath)
currName := "123456789ABCDEF1"
currPath := filepath.Join(idbPath, currName)
vmfs.MustMkdirIfNotExist(currPath)
createEmptyIndexdb(currPath)
assertPathsExist(t, prevPath, currPath)
s := MustOpenStorage(storageDataPath, OpenOptions{})
@@ -4522,13 +4509,16 @@ func TestMustOpenIndexDBTables_nextAndCurrAndPrev(t *testing.T) {
idbPath := filepath.Join(storageDataPath, indexdbDirname)
prevName := "123456789ABCDEF0"
prevPath := filepath.Join(idbPath, prevName)
vmfs.MustMkdirIfNotExist(prevPath)
createEmptyIndexdb(prevPath)
currName := "123456789ABCDEF1"
currPath := filepath.Join(idbPath, currName)
vmfs.MustMkdirIfNotExist(currPath)
createEmptyIndexdb(currPath)
nextName := "123456789ABCDEF2"
nextPath := filepath.Join(idbPath, nextName)
vmfs.MustMkdirIfNotExist(nextPath)
createEmptyIndexdb(nextPath)
assertPathsExist(t, prevPath, currPath, nextPath)
s := MustOpenStorage(storageDataPath, OpenOptions{})
@@ -4546,21 +4536,27 @@ func TestMustOpenIndexDBTables_ObsoleteDirsAreRemoved(t *testing.T) {
storageDataPath := t.Name()
idbPath := filepath.Join(storageDataPath, indexdbDirname)
obsolete1Name := "123456789ABCDEEE"
obsolete1Path := filepath.Join(idbPath, obsolete1Name)
vmfs.MustMkdirIfNotExist(obsolete1Path)
createEmptyIndexdb(obsolete1Path)
obsolete2Name := "123456789ABCDEEF"
obsolete2Path := filepath.Join(idbPath, obsolete2Name)
vmfs.MustMkdirIfNotExist(obsolete2Path)
createEmptyIndexdb(obsolete2Path)
prevName := "123456789ABCDEF0"
prevPath := filepath.Join(idbPath, prevName)
vmfs.MustMkdirIfNotExist(prevPath)
createEmptyIndexdb(prevPath)
currName := "123456789ABCDEF1"
currPath := filepath.Join(idbPath, currName)
vmfs.MustMkdirIfNotExist(currPath)
createEmptyIndexdb(currPath)
nextName := "123456789ABCDEF2"
nextPath := filepath.Join(idbPath, nextName)
vmfs.MustMkdirIfNotExist(nextPath)
createEmptyIndexdb(nextPath)
assertPathsExist(t, obsolete1Path, obsolete2Path, prevPath, currPath, nextPath)
s := MustOpenStorage(storageDataPath, OpenOptions{})
@@ -4579,15 +4575,19 @@ func TestMustRotateIndexDBs_dirNames(t *testing.T) {
storageDataPath := t.Name()
idbPath := filepath.Join(storageDataPath, indexdbDirname)
prevName := "123456789ABCDEF0"
prevPath := filepath.Join(idbPath, prevName)
vmfs.MustMkdirIfNotExist(prevPath)
createEmptyIndexdb(prevPath)
currName := "123456789ABCDEF1"
currPath := filepath.Join(idbPath, currName)
vmfs.MustMkdirIfNotExist(currPath)
createEmptyIndexdb(currPath)
nextName := "123456789ABCDEF2"
nextPath := filepath.Join(idbPath, nextName)
vmfs.MustMkdirIfNotExist(nextPath)
createEmptyIndexdb(nextPath)
assertPathsExist(t, prevPath, currPath, nextPath)
s := MustOpenStorage(storageDataPath, OpenOptions{})
@@ -4614,11 +4614,17 @@ func TestMustRotateIndexDBs_dirNames(t *testing.T) {
}
}
// createEmptyIndexdb creates a minimal valid indexdb table directory at path:
// the directory itself plus a parts.json file containing an empty JSON list ("[]").
// Presumably this is the smallest layout that mustOpenIndexDBTables accepts as a
// valid table dir — used by tests to simulate pre-existing indexdb tables.
func createEmptyIndexdb(path string) {
fs.MustMkdirIfNotExist(path)
partsFilePath := filepath.Join(path, "parts.json")
// MustWriteAtomic with canOverwrite=false: the file must not already exist.
fs.MustWriteAtomic(partsFilePath, []byte("[]"), false)
}
func assertPathsExist(t *testing.T, paths ...string) {
t.Helper()
for _, path := range paths {
if !vmfs.IsPathExist(path) {
if !fs.IsPathExist(path) {
t.Fatalf("path does not exist: %s", path)
}
}
@@ -4628,7 +4634,7 @@ func assertPathsDoNotExist(t *testing.T, paths ...string) {
t.Helper()
for _, path := range paths {
if vmfs.IsPathExist(path) {
if fs.IsPathExist(path) {
t.Fatalf("path exists: %s", path)
}
}

View File

@@ -2,19 +2,19 @@ package storage
import (
"fmt"
"io/fs"
"os"
"path/filepath"
"slices"
"sync/atomic"
"testing"
"time"
vmfs "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/google/go-cmp/cmp"
)
func BenchmarkStorageAddRows(b *testing.B) {
defer vmfs.MustRemoveAll(b.Name())
defer fs.MustRemoveDir(b.Name())
f := func(b *testing.B, numRows int) {
b.Helper()
@@ -92,7 +92,7 @@ func BenchmarkStorageAddRows_VariousTimeRanges(b *testing.B) {
b.StopTimer()
s.MustClose()
vmfs.MustRemoveAll(b.Name())
fs.MustRemoveDir(b.Name())
// Start timer again to conclude the benchmark correctly.
b.StartTimer()
@@ -167,7 +167,7 @@ func BenchmarkStorageSearchMetricNames_VariousTimeRanges(b *testing.B) {
}
s.MustClose()
vmfs.MustRemoveAll(b.Name())
fs.MustRemoveDir(b.Name())
// Start timer again to conclude the benchmark correctly.
b.StartTimer()
@@ -235,7 +235,7 @@ func BenchmarkStorageSearchLabelNames_VariousTimeRanges(b *testing.B) {
}
s.MustClose()
vmfs.MustRemoveAll(b.Name())
fs.MustRemoveDir(b.Name())
// Start timer again to conclude the benchmark correctly.
b.StartTimer()
@@ -301,7 +301,7 @@ func BenchmarkStorageSearchLabelValues_VariousTimeRanges(b *testing.B) {
}
s.MustClose()
vmfs.MustRemoveAll(b.Name())
fs.MustRemoveDir(b.Name())
// Start timer again to conclude the benchmark correctly.
b.StartTimer()
@@ -359,7 +359,7 @@ func BenchmarkStorageSearchTagValueSuffixes_VariousTimeRanges(b *testing.B) {
}
s.MustClose()
vmfs.MustRemoveAll(b.Name())
fs.MustRemoveDir(b.Name())
// Start timer again to conclude the benchmark correctly.
b.StartTimer()
@@ -417,7 +417,7 @@ func BenchmarkStorageSearchGraphitePaths_VariousTimeRanges(b *testing.B) {
}
s.MustClose()
vmfs.MustRemoveAll(b.Name())
fs.MustRemoveDir(b.Name())
// Start timer again to conclude the benchmark correctly.
b.StartTimer()
@@ -566,7 +566,7 @@ func BenchmarkStorageInsertWithAndWithoutPerDayIndex(b *testing.B) {
indexSize = benchmarkDirSize(path + "/indexdb")
s.MustClose()
vmfs.MustRemoveAll(path)
fs.MustRemoveDir(path)
}
b.ReportMetric(float64(rowsAddedTotal)/float64(b.Elapsed().Seconds()), "rows/s")
@@ -594,7 +594,7 @@ func BenchmarkStorageInsertWithAndWithoutPerDayIndex(b *testing.B) {
// benchmarkDirSize calculates the size of a directory.
func benchmarkDirSize(path string) int64 {
var size int64
err := fs.WalkDir(os.DirFS(path), ".", func(_ string, d fs.DirEntry, err error) error {
err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error {
if err != nil {
panic(err)
}

View File

@@ -95,19 +95,15 @@ func mustOpenTable(path string, s *Storage) *table {
// Create directories for small and big partitions if they don't exist yet.
smallPartitionsPath := filepath.Join(path, smallDirname)
fs.MustMkdirIfNotExist(smallPartitionsPath)
fs.MustRemoveTemporaryDirs(smallPartitionsPath)
smallSnapshotsPath := filepath.Join(smallPartitionsPath, snapshotsDirname)
fs.MustMkdirIfNotExist(smallSnapshotsPath)
fs.MustRemoveTemporaryDirs(smallSnapshotsPath)
bigPartitionsPath := filepath.Join(path, bigDirname)
fs.MustMkdirIfNotExist(bigPartitionsPath)
fs.MustRemoveTemporaryDirs(bigPartitionsPath)
bigSnapshotsPath := filepath.Join(bigPartitionsPath, snapshotsDirname)
fs.MustMkdirIfNotExist(bigSnapshotsPath)
fs.MustRemoveTemporaryDirs(bigSnapshotsPath)
// Open partitions.
pts := mustOpenPartitions(smallPartitionsPath, bigPartitionsPath, s)
@@ -160,9 +156,9 @@ func (tb *table) MustCreateSnapshot(snapshotName string) (string, string) {
// MustDeleteSnapshot deletes snapshot with the given snapshotName.
func (tb *table) MustDeleteSnapshot(snapshotName string) {
smallDir := filepath.Join(tb.path, smallDirname, snapshotsDirname, snapshotName)
fs.MustRemoveDirAtomic(smallDir)
fs.MustRemoveDir(smallDir)
bigDir := filepath.Join(tb.path, bigDirname, snapshotsDirname, snapshotName)
fs.MustRemoveDirAtomic(bigDir)
fs.MustRemoveDir(bigDir)
}
func (tb *table) addPartitionLocked(pt *partition) {
@@ -591,6 +587,14 @@ func mustPopulatePartitionNames(partitionsPath string, ptNames map[string]bool)
// Skip directory with snapshots
continue
}
ptDirPath := filepath.Join(partitionsPath, ptName)
if fs.IsPartiallyRemovedDir(ptDirPath) {
// Finish the removal of partially deleted partition directories.
// Partially deleted partition directories may occur when unclean shutdown happens
// in the middle of directory removal.
fs.MustRemoveDir(ptDirPath)
continue
}
ptNames[ptName] = true
}
}

View File

@@ -3,10 +3,11 @@ package storage
import (
"fmt"
"math/rand"
"os"
"sort"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestTableSearch(t *testing.T) {
@@ -185,11 +186,7 @@ func testTableSearchEx(t *testing.T, rng *rand.Rand, trData, trSearch TimeRange,
// Create a table from rowss and test search on it.
strg := newTestStorage()
tb := mustOpenTable("test-table", strg)
defer func() {
if err := os.RemoveAll("test-table"); err != nil {
t.Fatalf("cannot remove table directory: %s", err)
}
}()
defer fs.MustRemoveDir("test-table")
for _, rows := range rowss {
tb.MustAddRows(rows)

View File

@@ -11,14 +11,13 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestMain(m *testing.M) {
isDebug = true
n := m.Run()
if err := os.RemoveAll("benchmarkTableSearch"); err != nil {
panic(fmt.Errorf("cannot remove benchmark tables: %w", err))
}
fs.MustRemoveDir("benchmarkTableSearch")
os.Exit(n)
}

View File

@@ -1,20 +1,17 @@
package storage
import (
"os"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestTableOpenClose(t *testing.T) {
const path = "TestTableOpenClose"
const retention = 123 * retention31Days
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
defer func() {
_ = os.RemoveAll(path)
}()
fs.MustRemoveDir(path)
defer fs.MustRemoveDir(path)
// Create a new table
strg := newTestStorage()

View File

@@ -3,11 +3,11 @@ package storage
import (
"fmt"
"math/rand"
"os"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func BenchmarkTableAddRows(b *testing.B) {
@@ -100,9 +100,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
tb.MustClose()
// Remove the table.
if err := os.RemoveAll(tablePath); err != nil {
b.Fatalf("cannot remove table %q: %s", tablePath, err)
}
fs.MustRemoveDir(tablePath)
}
stopTestStorage(strg)
}

View File

@@ -8,12 +8,14 @@ import (
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/fastcache"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
func TestLoadFromFileOrNewError(t *testing.T) {
defer os.RemoveAll(t.Name())
defer fs.MustRemoveDir(t.Name())
f := func(path string, expErr string) {
logBuffer := &bytes.Buffer{}
@@ -40,15 +42,11 @@ func TestLoadFromFileOrNewError(t *testing.T) {
f(path, "missing files; init new cache")
path = initCacheForTest(t, `missingMetadata`, 10000)
if err := os.Remove(filepath.Join(path, `metadata.bin`)); err != nil {
t.Fatalf("failed to remove metadata.bin file: %v", err)
}
fs.MustRemovePath(filepath.Join(path, `metadata.bin`))
f(path, "missing files; init new cache")
path = initCacheForTest(t, `invalidMetadata`, 10000)
if err := os.WriteFile(filepath.Join(path, `metadata.bin`), []byte(""), 0644); err != nil {
t.Fatalf("failed to write test metadata file: %v", err)
}
fs.MustWriteSync(filepath.Join(path, `metadata.bin`), nil)
f(path, "invalid: cannot read maxBucketChunks")
path = initCacheForTest(t, `cacheMismatch`, 87654321)
@@ -56,7 +54,7 @@ func TestLoadFromFileOrNewError(t *testing.T) {
}
func TestLoadFromFileOrNewOK(t *testing.T) {
defer os.RemoveAll(t.Name())
defer fs.MustRemoveDir(t.Name())
cachePath := initCacheForTest(t, `ok`, 10000)

View File

@@ -144,7 +144,7 @@ func (pw *partWrapper) decRef() {
pw.p = nil
if deletePath != "" {
fs.MustRemoveAll(deletePath)
fs.MustRemoveDir(deletePath)
}
}
@@ -570,7 +570,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
if needStop(stopCh) {
// Remove incomplete destination part
if dstPartType != partInmemory {
fs.MustRemoveAll(dstPartPath)
fs.MustRemoveDir(dstPartPath)
}
return
}
@@ -644,7 +644,7 @@ func (ddb *datadb) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew *in
if ph.RowsCount == 0 {
// The created part is empty. Remove it
if mpNew == nil {
fs.MustRemoveAll(dstPartPath)
fs.MustRemoveDir(dstPartPath)
}
return nil
}
@@ -1217,7 +1217,7 @@ func mustRemoveUnusedDirs(path string, partNames []string) {
fn := de.Name()
if _, ok := m[fn]; !ok {
deletePath := filepath.Join(path, fn)
fs.MustRemoveAll(deletePath)
fs.MustRemoveDir(deletePath)
removedDirs++
}
}

View File

@@ -53,7 +53,7 @@ func mustCreatePartition(path string) {
//
// The partition must be closed with MustClose before deleting it.
func mustDeletePartition(path string) {
fs.MustRemoveAll(path)
fs.MustRemoveDir(path)
}
// mustOpenPartition opens partition at the given path for the given Storage.

View File

@@ -273,6 +273,13 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
for i, de := range des {
fname := de.Name()
partitionDir := filepath.Join(partitionsPath, fname)
if fs.IsPartiallyRemovedDir(partitionDir) {
// Drop the partially removed partition directory. This may occur when an unclean shutdown interrupts partition deletion.
fs.MustRemoveDir(partitionDir)
continue
}
wg.Add(1)
concurrencyLimiterCh <- struct{}{}
go func(idx int) {