Compare commits

...

2 Commits

Author SHA1 Message Date
f41gh7
c1a28c8fa4 address review comments 2026-06-17 11:36:02 +02:00
f41gh7
faf6718da9 app/vmagent: add new flag to prioritize recently ingested data
This commit adds a new flag remoteWrite.inmemoryQueueWorkers. Which
 dedicates set of workers for processing only
in-memory part of vmagent persistentqueue. It should help to mitigate
an issue when stale data at file-based queue prevents from ingestion
recent data.

 There is a downside in case of remote storage is not reachable,
it's possible to get only a part of data ingested and other part queued
into file-based queue. But it should be acceptable, because there is no
strong guarantees for the data ingestion order.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8833
2026-06-11 17:41:23 +02:00
9 changed files with 213 additions and 54 deletions

View File

@@ -204,11 +204,19 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
c.packetsDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_packets_dropped_total{url=%q}`, c.sanitizedURL))
c.retriesCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.sanitizedURL))
c.sendDuration = metrics.GetOrCreateFloatCounter(fmt.Sprintf(`vmagent_remotewrite_send_duration_seconds_total{url=%q}`, c.sanitizedURL))
inmemoryWorkers := inmemoryQueueWorkers.GetOptionalArg(argIdx)
metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queues{url=%q}`, c.sanitizedURL), func() float64 {
return float64(concurrency)
return float64(concurrency + inmemoryWorkers)
})
for range inmemoryWorkers {
c.wg.Go(func() {
c.runWorker(c.fq.MustReadInMemoryBlockBlocking)
})
}
for range concurrency {
c.wg.Go(c.runWorker)
c.wg.Go(func() {
c.runWorker(c.fq.MustReadBlock)
})
}
logger.Infof("initialized client for -remoteWrite.url=%q", c.sanitizedURL)
}
@@ -302,12 +310,12 @@ func getAWSAPIConfig(argIdx int) (*awsapi.Config, error) {
return cfg, nil
}
func (c *client) runWorker() {
func (c *client) runWorker(readBlock func(dst []byte) ([]byte, bool)) {
var ok bool
var block []byte
ch := make(chan bool, 1)
for {
block, ok = c.fq.MustReadBlock(block[:0])
block, ok = readBlock(block[:0])
if !ok {
return
}

View File

@@ -66,6 +66,9 @@ var (
queues = flagutil.NewArrayInt("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
"isn't enough for sending high volume of collected data to remote storage. "+
"Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage")
inmemoryQueueWorkers = flagutil.NewArrayInt("remoteWrite.inmemoryQueueWorkers", 0, "The number of additional workers per each -remoteWrite.url, which send only recently ingested data from the in-memory queue, "+
"while the file-based queue at -remoteWrite.tmpDataPath is drained by workers configured via -remoteWrite.queues. "+
"This reduces delivery lag for fresh samples when the file-based queue contains a backlog accumulated during remote storage outages.")
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
"It is hidden by default, since it can contain sensitive info such as auth key")
maxPendingBytesPerURL = flagutil.NewArrayBytes("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+
@@ -923,7 +926,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
if maxInmemoryBlocks < 2 {
maxInmemoryBlocks = 2
}
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled)
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled, inmemoryQueueWorkers.GetOptionalArg(argIdx) > 0)
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(fq.GetPendingBytes())
})

View File

@@ -77,7 +77,7 @@ func TestRemoteWriteContext_TryPushTimeSeries(t *testing.T) {
path := "fast-queue-write-test"
fs.MustRemoveDir(path)
fq := persistentqueue.MustOpenFastQueue(path, "test", 100, 0, false)
fq := persistentqueue.MustOpenFastQueue(path, "test", 100, 0, false, false)
defer fs.MustRemoveDir(path)
defer fq.MustClose()

View File

@@ -332,13 +332,11 @@ func TestSingleVMAgentDropOnOverload(t *testing.T) {
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 1 && vmagent.RemoteWriteRequests(t, url2) == 1
},
)
// Send 2 more requests, the first RW endpoint should receive everything, the second should add them to the queue
// since worker is busy with the first request.
for i := range 2 {
@@ -641,3 +639,102 @@ func TestSingleVMAgentMultitenancy(t *testing.T) {
t.Fatalf("expected vmagent_tenant_inserted_rows_total to have value 1 for accountID=5, projectID=0")
}
}
func TestSingleVMAgentPriorizeRecentData(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
remoteWriteSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer remoteWriteSrv.Close()
remoteWriteSrv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
}))
defer remoteWriteSrv2.Close()
vmagent := tc.MustStartDefaultRWVmagent("vmagent", []string{
fmt.Sprintf(`-remoteWrite.url=%s/api/v1/write`, remoteWriteSrv.URL),
fmt.Sprintf(`-remoteWrite.url=%s/api/v1/write`, remoteWriteSrv2.URL),
"-remoteWrite.disableOnDiskQueue=true",
// use only 1 worker to get a full queue faster
"-remoteWrite.queues=1",
"-remoteWrite.flushInterval=1ms",
"-remoteWrite.inmemoryQueueWorkers=1",
// fastqueue size is roughly memory.Allowed() / len(urls) / *maxRowsPerBlock / 100
// Use very large maxRowsPerBlock to get fastqueue of minimal length(2).
// See initRemoteWriteCtxs function in remotewrite.go for details.
"-remoteWrite.maxRowsPerBlock=1000000000",
"-remoteWrite.tmpDataPath=" + tc.Dir() + "/vmagent",
// Delay retry logic to avoid race conditions with waitFor assertions.
// It improves the test stability on resource-constrained runners.
// Should be bigger than retries * period
"-remoteWrite.retryMinInterval=3s",
})
const (
retries = 20
period = 100 * time.Millisecond
)
waitFor := func(f func() bool) {
t.Helper()
for range retries {
if f() {
return
}
time.Sleep(period)
}
t.Fatalf("timed out waiting for retry #%d", retries)
}
// Real remote write URLs are hidden in metrics
url1 := "1:secret-url"
url2 := "2:secret-url"
// Wait until first request got flushed to remote write server
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 1 && vmagent.RemoteWriteRequests(t, url2) == 1
},
)
// Wait until second request got flushed to remote write server
// since there are 2 independent queues (general and in-memory) with minimal capacity of 1
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 2 && vmagent.RemoteWriteRequests(t, url2) == 2
},
)
// Send 2 more requests, the first RW endpoint should receive everything, the second should add them to the queue
// since worker is busy with the first request.
for i := range 2 {
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 3+i && vmagent.RemoteWritePendingInmemoryBlocks(t, url2) == 1+i
},
)
}
// Send one more request.
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 5 && vmagent.RemoteWriteSamplesDropped(t, url2) > 0
},
)
}

View File

@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add a new flag `-remoteWrite.inmemoryQueueWorkers` to prioritize recently ingested data over historical data stored at file-based [persistent queue](https://docs.victoriametrics.com/victoriametrics/vmagent/#on-disk-persistence-and-data-processing-order). See [#8833](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8833)
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)

View File

@@ -928,6 +928,29 @@ vmagent will generate the following persistent queue folders:
2_0AAFDF53E314A72A
```
### On-disk persistence and data processing order
By default, vmagent processes data in FIFO order. If data has been written to the on-disk queue,
it must be flushed to the remote storage before newly ingested data can be forwarded there.
During long outages, vmagent may accumulate large amounts of data in the file-based queue,
which can introduce a significant lag between the moment data is collected by vmagent and the
moment it becomes visible at the remote storage.
This behavior can be changed with the `-remoteWrite.inmemoryQueueWorkers` {{% available_from "#" %}} command-line flag.
When set to a non-zero value, vmagent starts the given number of additional workers,
which send only recently ingested data from the in-memory queue, while the workers configured via `-remoteWrite.queues` drain the file-based backlog concurrently.
This reduces the delivery lag for fresh samples after remote storage outages or slowdowns. The flag can be set individually per each `-remoteWrite.url`.
Note that these workers are started in addition to the workers configured via `-remoteWrite.queues`, so the total number of concurrent connections to
the remote storage becomes the sum of both flags. Take this into account if the remote storage limits the number of concurrent requests.
This flag has the following possible limitations:
* Samples may arrive at the remote storage out of order, since recent data can be delivered before the older backlogged data.
Do not use this option if the remote storage doesn't accept out-of-order samples.
* Recent data isn't guaranteed to take the fast path: if the in-memory queue is full,
newly ingested data is still written to the file-based queue and is delivered in FIFO order by the generic workers.
### Disabling On-disk persistence
There are cases when it is better to disable on-disk persistence for pending data on the `vmagent` side:

View File

@@ -26,6 +26,8 @@ type FastQueue struct {
// isPQDisabled is set to true when pq is disabled.
isPQDisabled bool
prioritizeInMemoryData bool
// pq is file-based queue
pq *queue
@@ -48,12 +50,13 @@ type FastQueue struct {
// reaches maxPendingSize.
// if isPQDisabled is set to true, then write requests that exceed in-memory buffer capacity are rejected.
// in-memory queue part can be stored on disk during graceful shutdown.
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool) *FastQueue {
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool, prioritizeInMemoryData bool) *FastQueue {
pq := mustOpen(path, name, maxPendingBytes)
fq := &FastQueue{
pq: pq,
isPQDisabled: isPQDisabled,
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
pq: pq,
isPQDisabled: isPQDisabled,
prioritizeInMemoryData: prioritizeInMemoryData,
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
}
fq.cond.L = &fq.mu
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
@@ -97,7 +100,7 @@ func (fq *FastQueue) IsWriteBlocked() bool {
}
fq.mu.Lock()
defer fq.mu.Unlock()
return len(fq.ch) == cap(fq.ch) || fq.pq.GetPendingBytes() > 0
return len(fq.ch) == cap(fq.ch) || (fq.pq.GetPendingBytes() > 0 && !fq.prioritizeInMemoryData)
}
// UnblockAllReaders unblocks all the readers.
@@ -193,19 +196,25 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
defer fq.mu.Unlock()
isPQWriteAllowed := !fq.isPQDisabled || ignoreDisabledPQ
if !isPQWriteAllowed && fq.pq.GetPendingBytes() > 0 {
// fast path: there is pending data at file-based queue,
// it must be drained before in-memory queue could be used.
// File-based queue could be non-empty after vmagent restart
// and vmagent couldn't flush in-memory queue during shutdown.
return false
}
if !fq.prioritizeInMemoryData {
fq.flushInmemoryBlocksToFileIfNeededLocked()
if fq.pq.GetPendingBytes() > 0 {
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
// So put the block to file-based queue.
fq.flushInmemoryBlocksToFileIfNeededLocked()
if n := fq.pq.GetPendingBytes(); n > 0 {
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
// So put the block to file-based queue.
if len(fq.ch) > 0 {
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
if !isPQWriteAllowed {
return false
}
fq.pq.MustWriteBlock(block)
return true
}
if !isPQWriteAllowed {
return false
}
fq.pq.MustWriteBlock(block)
return true
}
if len(fq.ch) == cap(fq.ch) {
// There is no space left in the in-memory queue. Put the data to file-based queue.
@@ -216,7 +225,7 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
fq.pq.MustWriteBlock(block)
return true
}
// Fast path - put the block to in-memory queue.
bb := blockBufPool.Get()
bb.B = append(bb.B[:0], block...)
fq.ch <- bb
@@ -229,12 +238,41 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
}
// MustReadBlock reads the next block from fq into dst and returns it.
// It first reads from the in-memory queue, then checks file-based queue.
// It first reads from the file-based queue, then checks in-memory queue.
// It blocks until a block is available or the stop deadline is exceeded, in which case it returns (dst, false).
func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
fq.mu.Lock()
defer fq.mu.Unlock()
for {
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
return dst, false
}
if n := fq.pq.GetPendingBytes(); n > 0 {
data, ok := fq.pq.MustReadBlockNonblocking(dst)
if ok {
return data, true
}
dst = data
}
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
}
if fq.stopDeadline > 0 {
return dst, false
}
// There are no blocks. Wait for new block.
fq.pq.ResetIfEmpty()
fq.cond.Wait()
}
}
// MustReadInMemoryBlockBlocking reads the next block from the in-memory queue into dst and returns it.
// It blocks until a block is available or the stop deadline is exceeded, in which case it returns (dst, false).
func (fq *FastQueue) MustReadInMemoryBlockBlocking(dst []byte) ([]byte, bool) {
fq.mu.Lock()
defer fq.mu.Unlock()
for {
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
return dst, false
@@ -242,19 +280,10 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
}
if n := fq.pq.GetPendingBytes(); n > 0 {
data, ok := fq.pq.MustReadBlockNonblocking(dst)
if ok {
return data, true
}
dst = data
continue
}
if fq.stopDeadline > 0 {
return dst, false
}
// There are no blocks. Wait for new block.
fq.pq.ResetIfEmpty()
fq.cond.Wait()
}
}
@@ -277,9 +306,6 @@ func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) []byte {
if len(fq.ch) == 0 {
logger.Panicf("BUG: the function must not be called when in-memory queue is empty. Caller should verify the queue len upfront")
}
if n := fq.pq.GetPendingBytes(); n > 0 {
logger.Panicf("BUG: the file-based queue must be empty when the in-memory queue is non-empty; it contains %d pending bytes", n)
}
bb := <-fq.ch
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()

View File

@@ -13,7 +13,7 @@ func TestFastQueueOpenClose(_ *testing.T) {
path := "fast-queue-open-close"
fs.MustRemoveDir(path)
for range 10 {
fq := MustOpenFastQueue(path, "foobar", 100, 0, false)
fq := MustOpenFastQueue(path, "foobar", 100, 0, false, false)
fq.MustClose()
}
fs.MustRemoveDir(path)
@@ -24,7 +24,7 @@ func TestFastQueueWriteReadInmemory(t *testing.T) {
fs.MustRemoveDir(path)
capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
}
@@ -57,7 +57,7 @@ func TestFastQueueWriteReadMixed(t *testing.T) {
fs.MustRemoveDir(path)
capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
if n := fq.GetPendingBytes(); n != 0 {
t.Fatalf("the number of pending bytes must be 0; got %d", n)
}
@@ -93,7 +93,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
fs.MustRemoveDir(path)
capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
if n := fq.GetPendingBytes(); n != 0 {
t.Fatalf("the number of pending bytes must be 0; got %d", n)
}
@@ -106,7 +106,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
blocks = append(blocks, block)
fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity, 0, false)
fq = MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
}
if n := fq.GetPendingBytes(); n == 0 {
t.Fatalf("the number of pending bytes must be greater than 0")
@@ -120,7 +120,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
}
fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity, 0, false)
fq = MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
}
if n := fq.GetPendingBytes(); n != 0 {
t.Fatalf("the number of pending bytes must be 0; got %d", n)
@@ -133,7 +133,7 @@ func TestFastQueueReadUnblockByClose(t *testing.T) {
path := "fast-queue-read-unblock-by-close"
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foorbar", 123, 0, false)
fq := MustOpenFastQueue(path, "foorbar", 123, 0, false, false)
resultCh := make(chan error)
go func() {
data, ok := fq.MustReadBlock(nil)
@@ -163,7 +163,7 @@ func TestFastQueueReadUnblockByWrite(t *testing.T) {
path := "fast-queue-read-unblock-by-write"
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", 13, 0, false)
fq := MustOpenFastQueue(path, "foobar", 13, 0, false, false)
block := "foodsafdsaf sdf"
resultCh := make(chan error)
go func() {
@@ -197,7 +197,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
path := "fast-queue-read-write-concurrent"
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", 5, 0, false)
fq := MustOpenFastQueue(path, "foobar", 5, 0, false, false)
var blocks []string
blocksMap := make(map[string]bool)
@@ -259,7 +259,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
readersWG.Wait()
// Collect the remaining data
fq = MustOpenFastQueue(path, "foobar", 5, 0, false)
fq = MustOpenFastQueue(path, "foobar", 5, 0, false, false)
resultCh := make(chan error)
go func() {
for len(blocksMap) > 0 {
@@ -293,7 +293,7 @@ func TestFastQueueWriteReadWithDisabledPQ(t *testing.T) {
fs.MustRemoveDir(path)
capacity := 20
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true, false)
if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
}
@@ -310,7 +310,7 @@ func TestFastQueueWriteReadWithDisabledPQ(t *testing.T) {
}
fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true)
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true, false)
for _, block := range blocks {
buf, ok := fq.MustReadBlock(nil)
if !ok {
@@ -329,7 +329,7 @@ func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) {
fs.MustRemoveDir(path)
capacity := 20
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true, false)
if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
}
@@ -351,7 +351,7 @@ func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) {
}
fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true)
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true, false)
for _, block := range blocks {
buf, ok := fq.MustReadBlock(nil)
if !ok {

View File

@@ -17,7 +17,7 @@ func BenchmarkFastQueueThroughputSerial(b *testing.B) {
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-fast-queue-throughput-serial-%d", blockSize)
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0, false)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0, false, false)
defer func() {
fq.MustClose()
fs.MustRemoveDir(path)
@@ -38,7 +38,7 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) {
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize)
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0, false)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0, false, false)
defer func() {
fq.MustClose()
fs.MustRemoveDir(path)