VictoriaMetrics/lib/persistentqueue/fastqueue_test.go

package persistentqueue

import (
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)

func TestFastQueueOpenClose(_ *testing.T) {
	path := "fast-queue-open-close"
	fs.MustRemoveDir(path)
	for i := 0; i < 10; i++ {
		fq := MustOpenFastQueue(path, "foobar", 100, 0, false)
		fq.MustClose()
	}
	fs.MustRemoveDir(path)
}

func TestFastQueueWriteReadInmemory(t *testing.T) {
	path := "fast-queue-write-read-inmemory"
	fs.MustRemoveDir(path)
	capacity := 100
	fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
	if n := fq.GetInmemoryQueueLen(); n != 0 {
		t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
	}
	var blocks []string
	for i := 0; i < capacity; i++ {
		block := fmt.Sprintf("block %d", i)
		if !fq.TryWriteBlock([]byte(block)) {
			t.Fatalf("TryWriteBlock must return true in this context")
		}
		blocks = append(blocks, block)
	}
	if n := fq.GetInmemoryQueueLen(); n != capacity {
		t.Fatalf("unexpected size of inmemory queue; got %d; want %d", n, capacity)
	}
	for _, block := range blocks {
		buf, ok := fq.MustReadBlock(nil)
		if !ok {
			t.Fatalf("unexpected ok=false")
		}
		if string(buf) != block {
			t.Fatalf("unexpected block read; got %q; want %q", buf, block)
		}
	}
	fq.MustClose()
	fs.MustRemoveDir(path)
}

func TestFastQueueWriteReadMixed(t *testing.T) {
	path := "fast-queue-write-read-mixed"
	fs.MustRemoveDir(path)
	capacity := 100
	fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
	if n := fq.GetPendingBytes(); n != 0 {
		t.Fatalf("the number of pending bytes must be 0; got %d", n)
	}
	var blocks []string
	for i := 0; i < 2*capacity; i++ {
		block := fmt.Sprintf("block %d", i)
		if !fq.TryWriteBlock([]byte(block)) {
			t.Fatalf("TryWriteBlock must return true in this context")
		}
		blocks = append(blocks, block)
	}
	if n := fq.GetPendingBytes(); n == 0 {
		t.Fatalf("the number of pending bytes must be greater than 0")
	}
	for _, block := range blocks {
		buf, ok := fq.MustReadBlock(nil)
		if !ok {
			t.Fatalf("unexpected ok=false")
		}
		if string(buf) != block {
			t.Fatalf("unexpected block read; got %q; want %q", buf, block)
		}
	}
	if n := fq.GetPendingBytes(); n != 0 {
		t.Fatalf("the number of pending bytes must be 0; got %d", n)
	}
	fq.MustClose()
	fs.MustRemoveDir(path)
}

func TestFastQueueWriteReadWithCloses(t *testing.T) {
	path := "fast-queue-write-read-with-closes"
	fs.MustRemoveDir(path)
	capacity := 100
	fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
	if n := fq.GetPendingBytes(); n != 0 {
		t.Fatalf("the number of pending bytes must be 0; got %d", n)
	}
	var blocks []string
	for i := 0; i < 2*capacity; i++ {
		block := fmt.Sprintf("block %d", i)
		if !fq.TryWriteBlock([]byte(block)) {
			t.Fatalf("TryWriteBlock must return true in this context")
		}
		blocks = append(blocks, block)
		fq.MustClose()
		fq = MustOpenFastQueue(path, "foobar", capacity, 0, false)
	}
	if n := fq.GetPendingBytes(); n == 0 {
		t.Fatalf("the number of pending bytes must be greater than 0")
	}
	for _, block := range blocks {
		buf, ok := fq.MustReadBlock(nil)
		if !ok {
			t.Fatalf("unexpected ok=false")
		}
		if string(buf) != block {
			t.Fatalf("unexpected block read; got %q; want %q", buf, block)
		}
		fq.MustClose()
		fq = MustOpenFastQueue(path, "foobar", capacity, 0, false)
	}
	if n := fq.GetPendingBytes(); n != 0 {
		t.Fatalf("the number of pending bytes must be 0; got %d", n)
	}
	fq.MustClose()
fs.MustRemoveDir(path)
}
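// TestFastQueueReadUnblockByClose verifies that a reader blocked in MustReadBlock
// is released with ok=false and empty data when the queue is closed.
func TestFastQueueReadUnblockByClose(t *testing.T) {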
path := "fast-queue-read-unblock-by-close"
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foorbar", 123, 0, false)
resultCh := make(chan error)
go func() {
data, ok := fq.MustReadBlock(nil)
if ok {
resultCh <- fmt.Errorf("unexpected ok=true")
return
}
if len(data) != 0 {
resultCh <- fmt.Errorf("unexpected non-empty data=%q", data)
return
}
resultCh <- nil
}()
fq.MustClose()
select {
case err := <-resultCh:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(time.Second):
t.Fatalf("timeout")
}
fs.MustRemoveDir(path)
}
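// TestFastQueueReadUnblockByWrite verifies that a reader blocked in MustReadBlock
// is released with the written block once TryWriteBlock succeeds.
func TestFastQueueReadUnblockByWrite(t *testing.T) {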
path := "fast-queue-read-unblock-by-write"
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", 13, 0, false)
block := "foodsafdsaf sdf"
resultCh := make(chan error)
go func() {
data, ok := fq.MustReadBlock(nil)
if !ok {
resultCh <- fmt.Errorf("unexpected ok=false")
return
}
if string(data) != block {
resultCh <- fmt.Errorf("unexpected block read; got %q; want %q", data, block)
return
}
resultCh <- nil
}()
if !fq.TryWriteBlock([]byte(block)) {
t.Fatalf("TryWriteBlock must return true in this context")
}
select {
case err := <-resultCh:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(time.Second):
t.Fatalf("timeout")
}
fq.MustClose()
fs.MustRemoveDir(path)
}
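// TestFastQueueReadWriteConcurrent writes 1000 distinct blocks from 10 concurrent writers
// while 10 concurrent readers consume them, then reopens the queue and checks
// that every block not yet read can still be read exactly once.
func TestFastQueueReadWriteConcurrent(t *testing.T) {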
path := "fast-queue-read-write-concurrent"
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", 5, 0, false)
var blocks []string
blocksMap := make(map[string]bool)
var blocksMapLock sync.Mutex
for i := 0; i < 1000; i++ {
block := fmt.Sprintf("block %d", i)
blocks = append(blocks, block)
blocksMap[block] = true
}
// Start readers
var readersWG sync.WaitGroup
for i := 0; i < 10; i++ {
readersWG.Add(1)
go func() {
defer readersWG.Done()
for {
data, ok := fq.MustReadBlock(nil)
if !ok {
return
}
blocksMapLock.Lock()
if !blocksMap[string(data)] {
panic(fmt.Errorf("unexpected data read from the queue: %q", data))
}
delete(blocksMap, string(data))
blocksMapLock.Unlock()
}
}()
}
// Start writers
blocksCh := make(chan string)
var writersWG sync.WaitGroup
for i := 0; i < 10; i++ {
writersWG.Add(1)
go func() {
defer writersWG.Done()
for block := range blocksCh {
if !fq.TryWriteBlock([]byte(block)) {
panic(fmt.Errorf("TryWriteBlock must return true in this context"))
}
}
}()
}
// Feed writers
for _, block := range blocks {
blocksCh <- block
}
close(blocksCh)
// Wait for writers to finish
writersWG.Wait()
// Wait for a while, so readers can catch up
time.Sleep(100 * time.Millisecond)
// Close fq
fq.MustClose()
// Wait for readers to finish
readersWG.Wait()
// Collect the remaining data
fq = MustOpenFastQueue(path, "foobar", 5, 0, false)
resultCh := make(chan error)
go func() {
for len(blocksMap) > 0 {
data, ok := fq.MustReadBlock(nil)
if !ok {
resultCh <- fmt.Errorf("unexpected ok=false")
return
}
if !blocksMap[string(data)] {
resultCh <- fmt.Errorf("unexpected data read from fq: %q", data)
return
}
delete(blocksMap, string(data))
}
resultCh <- nil
}()
select {
case err := <-resultCh:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(time.Second * 5):
t.Fatalf("timeout")
}
fq.MustClose()
fs.MustRemoveDir(path)
}
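// TestFastQueueWriteReadWithDisabledPQ verifies that, with the persistent queue disabled,
// the queue accepts exactly capacity blocks, rejects the next write,
// and returns the accepted blocks in write order after being closed and reopened.
func TestFastQueueWriteReadWithDisabledPQ(t *testing.T) {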
path := "fast-queue-write-read-inmemory-disabled-pq"
fs.MustRemoveDir(path)
capacity := 20
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
}
var blocks []string
for i := 0; i < capacity; i++ {
block := fmt.Sprintf("block %d", i)
if !fq.TryWriteBlock([]byte(block)) {
t.Fatalf("TryWriteBlock must return true in this context")
}
blocks = append(blocks, block)
}
if fq.TryWriteBlock([]byte("error-block")) {
t.Fatalf("TryWriteBlock must return false when the queue is full")
}
fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true)
for _, block := range blocks {
buf, ok := fq.MustReadBlock(nil)
if !ok {
t.Fatalf("unexpected ok=false")
}
if string(buf) != block {
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
}
}
fq.MustClose()
fs.MustRemoveDir(path)
}
func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) {
path := "fast-queue-write-read-inmemory-disabled-pq-force-write"
	fs.MustRemoveDir(path)
	capacity := 20
	fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
	if n := fq.GetInmemoryQueueLen(); n != 0 {
		t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
	}
	// Fill the in-memory queue up to its capacity; every write must succeed.
	var blocks []string
	for i := 0; i < capacity; i++ {
		block := fmt.Sprintf("block %d", i)
		if !fq.TryWriteBlock([]byte(block)) {
			t.Fatalf("TryWriteBlock must return true in this context")
		}
		blocks = append(blocks, block)
	}
	// The in-memory queue is full now, so the next TryWriteBlock call must fail.
	if fq.TryWriteBlock([]byte("error-block")) {
		t.Fatalf("expect false due to full queue")
	}
	// MustWriteBlockIgnoreDisabledPQ must accept blocks even though the queue is full.
	for i := 0; i < capacity; i++ {
		block := fmt.Sprintf("block %d-%d", i, i)
		fq.MustWriteBlockIgnoreDisabledPQ([]byte(block))
		blocks = append(blocks, block)
	}
	fq.MustClose()
	// Reopen the queue and verify that all the blocks are read back in the order they were written.
	fq = MustOpenFastQueue(path, "foobar", capacity, 0, true)
	for _, block := range blocks {
		buf, ok := fq.MustReadBlock(nil)
		if !ok {
			t.Fatalf("unexpected ok=false")
		}
		if string(buf) != block {
			t.Fatalf("unexpected block read; got %q; want %q", buf, block)
		}
	}
	fq.MustClose()
	fs.MustRemoveDir(path)
}