Compare commits

...

1 Commits

Author SHA1 Message Date
f41gh7
faf6718da9 app/vmagent: add new flag to prioritize recently ingested data
This commit adds a new flag remoteWrite.inmemoryQueueWorkers. Which
 dedicates set of workers for processing only
in-memory part of vmagent persistentqueue. It should help to mitigate
an issue when stale data at file-based queue prevents from ingestion
recent data.

 There is a downside in case of remote storage is not reachable,
it's possible to get only a part of data ingested and other part queued
into file-based queue. But it should be acceptable, because there is no
strong guarantees for the data ingestion order.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8833
2026-06-11 17:41:23 +02:00
9 changed files with 255 additions and 45 deletions

View File

@@ -204,9 +204,13 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
c.packetsDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_packets_dropped_total{url=%q}`, c.sanitizedURL))
c.retriesCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.sanitizedURL))
c.sendDuration = metrics.GetOrCreateFloatCounter(fmt.Sprintf(`vmagent_remotewrite_send_duration_seconds_total{url=%q}`, c.sanitizedURL))
inmemoryWorkers := inmemoryQueueWorkers.GetOptionalArg(argIdx)
metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queues{url=%q}`, c.sanitizedURL), func() float64 {
return float64(concurrency)
return float64(concurrency + inmemoryWorkers)
})
for range inmemoryWorkers {
c.wg.Go(c.runWorkerForInmemoryQueue)
}
for range concurrency {
c.wg.Go(c.runWorker)
}
@@ -348,6 +352,52 @@ func (c *client) runWorker() {
}
}
func (c *client) runWorkerForInmemoryQueue() {
var ok bool
var block []byte
ch := make(chan bool, 1)
for {
block, ok = c.fq.MustReadInMemoryBlockBlocking(block[:0])
if !ok {
return
}
go func() {
startTime := time.Now()
ch <- c.sendBlock(block)
c.sendDuration.Add(time.Since(startTime).Seconds())
}()
select {
case ok := <-ch:
if ok {
// The block has been sent successfully
continue
}
// Return unsent block to the queue.
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
return
case <-c.stopCh:
// c must be stopped. Wait up to 5 seconds for the in-flight request to complete.
// If it succeeds, drain the remaining in-memory queue before returning.
stopCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
select {
case ok := <-ch:
if !ok {
// Return unsent block to the queue.
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
} else {
c.drainInMemoryQueue(stopCtx, block[:0])
}
case <-stopCtx.Done():
// Return unsent block to the queue.
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
}
return
}
}
}
func (c *client) doRequest(url string, body []byte) (*http.Response, error) {
req, err := c.newRequest(url, body)
if err != nil {

View File

@@ -66,6 +66,9 @@ var (
queues = flagutil.NewArrayInt("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
"isn't enough for sending high volume of collected data to remote storage. "+
"Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage")
inmemoryQueueWorkers = flagutil.NewArrayInt("remoteWrite.inmemoryQueueWorkers", 0, "The number of additional workers per each -remoteWrite.url, which send only recently ingested data from the in-memory queue, "+
"while the file-based queue at -remoteWrite.tmpDataPath is drained by workers configured via -remoteWrite.queues. "+
"This reduces delivery lag for fresh samples when the file-based queue contains a backlog accumulated during remote storage outages.")
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
"It is hidden by default, since it can contain sensitive info such as auth key")
maxPendingBytesPerURL = flagutil.NewArrayBytes("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+
@@ -923,7 +926,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
if maxInmemoryBlocks < 2 {
maxInmemoryBlocks = 2
}
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled)
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled, inmemoryQueueWorkers.GetOptionalArg(argIdx) > 0)
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(fq.GetPendingBytes())
})

View File

@@ -77,7 +77,7 @@ func TestRemoteWriteContext_TryPushTimeSeries(t *testing.T) {
path := "fast-queue-write-test"
fs.MustRemoveDir(path)
fq := persistentqueue.MustOpenFastQueue(path, "test", 100, 0, false)
fq := persistentqueue.MustOpenFastQueue(path, "test", 100, 0, false, false)
defer fs.MustRemoveDir(path)
defer fq.MustClose()

View File

@@ -332,13 +332,11 @@ func TestSingleVMAgentDropOnOverload(t *testing.T) {
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 1 && vmagent.RemoteWriteRequests(t, url2) == 1
},
)
// Send 2 more requests, the first RW endpoint should receive everything, the second should add them to the queue
// since worker is busy with the first request.
for i := range 2 {
@@ -641,3 +639,102 @@ func TestSingleVMAgentMultitenancy(t *testing.T) {
t.Fatalf("expected vmagent_tenant_inserted_rows_total to have value 1 for accountID=5, projectID=0")
}
}
func TestSingleVMAgentPriorizeRecentData(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
remoteWriteSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer remoteWriteSrv.Close()
remoteWriteSrv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
}))
defer remoteWriteSrv2.Close()
vmagent := tc.MustStartDefaultRWVmagent("vmagent", []string{
fmt.Sprintf(`-remoteWrite.url=%s/api/v1/write`, remoteWriteSrv.URL),
fmt.Sprintf(`-remoteWrite.url=%s/api/v1/write`, remoteWriteSrv2.URL),
"-remoteWrite.disableOnDiskQueue=true",
// use only 1 worker to get a full queue faster
"-remoteWrite.queues=1",
"-remoteWrite.flushInterval=1ms",
"-remoteWrite.inmemoryQueueWorkers=1",
// fastqueue size is roughly memory.Allowed() / len(urls) / *maxRowsPerBlock / 100
// Use very large maxRowsPerBlock to get fastqueue of minimal length(2).
// See initRemoteWriteCtxs function in remotewrite.go for details.
"-remoteWrite.maxRowsPerBlock=1000000000",
"-remoteWrite.tmpDataPath=" + tc.Dir() + "/vmagent",
// Delay retry logic to avoid race conditions with waitFor assertions.
// It improves the test stability on resource-constrained runners.
// Should be bigger than retries * period
"-remoteWrite.retryMinInterval=3s",
})
const (
retries = 20
period = 100 * time.Millisecond
)
waitFor := func(f func() bool) {
t.Helper()
for range retries {
if f() {
return
}
time.Sleep(period)
}
t.Fatalf("timed out waiting for retry #%d", retries)
}
// Real remote write URLs are hidden in metrics
url1 := "1:secret-url"
url2 := "2:secret-url"
// Wait until first request got flushed to remote write server
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 1 && vmagent.RemoteWriteRequests(t, url2) == 1
},
)
// Wait until second request got flushed to remote write server
// since there are 2 independent queues (general and in-memory) with minimal capacity of 1
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 2 && vmagent.RemoteWriteRequests(t, url2) == 2
},
)
// Send 2 more requests, the first RW endpoint should receive everything, the second should add them to the queue
// since worker is busy with the first request.
for i := range 2 {
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 3+i && vmagent.RemoteWritePendingInmemoryBlocks(t, url2) == 1+i
},
)
}
// Send one more request.
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 5 && vmagent.RemoteWriteSamplesDropped(t, url2) > 0
},
)
}

View File

@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add a new flag `-remoteWrite.inmemoryQueueWorkers` to prioritize recently ingested data over historical data stored at file-based [persistent queue](https://docs.victoriametrics.com/victoriametrics/vmagent/#on-disk-persistence-and-data-processing-order). See [#8833](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8833)
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)

View File

@@ -928,6 +928,37 @@ vmagent will generate the following persistent queue folders:
2_0AAFDF53E314A72A
```
### On-disk persistence and data processing order
By default, vmagent processes data in FIFO order. If data has been written
to the on-disk queue, it must be flushed to the remote storage before newly
ingested data can be forwarded there. During long outages, vmagent may
accumulate large amounts of data in the file-based queue, which can introduce
a significant lag between the moment data is collected by vmagent and the
moment it becomes visible at the remote storage.
This behavior can be changed with the `-remoteWrite.inmemoryQueueWorkers`
command-line flag. When set to a non-zero value, vmagent starts the given
number of additional workers, which send only recently ingested data from
the in-memory queue, while the workers configured via `-remoteWrite.queues`
drain the file-based backlog concurrently. This reduces the delivery lag for
fresh samples after remote storage outages or slowdowns. The flag can be set
individually per each `-remoteWrite.url`.
Note that these workers are started in addition to the workers configured
via `-remoteWrite.queues`, so the total number of concurrent connections to
the remote storage becomes the sum of both flags. Take this into account if
the remote storage limits the number of concurrent requests.
This flag has the following possible limitations:
* Samples may arrive at the remote storage out of order, since recent data
can be delivered before the older backlogged data. Do not use this option
if the remote storage doesn't accept out-of-order samples.
* Recent data isn't guaranteed to take the fast path: if the in-memory queue
is full, newly ingested data is still written to the file-based queue and
is delivered in FIFO order by the generic workers.
### Disabling On-disk persistence
There are cases when it is better to disable on-disk persistence for pending data on the `vmagent` side:

View File

@@ -26,6 +26,8 @@ type FastQueue struct {
// isPQDisabled is set to true when pq is disabled.
isPQDisabled bool
prioritizeInMemoryData bool
// pq is file-based queue
pq *queue
@@ -48,12 +50,13 @@ type FastQueue struct {
// reaches maxPendingSize.
// if isPQDisabled is set to true, then write requests that exceed in-memory buffer capacity are rejected.
// in-memory queue part can be stored on disk during graceful shutdown.
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool) *FastQueue {
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool, prioritizeInMemoryData bool) *FastQueue {
pq := mustOpen(path, name, maxPendingBytes)
fq := &FastQueue{
pq: pq,
isPQDisabled: isPQDisabled,
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
pq: pq,
isPQDisabled: isPQDisabled,
prioritizeInMemoryData: prioritizeInMemoryData,
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
}
fq.cond.L = &fq.mu
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
@@ -97,7 +100,7 @@ func (fq *FastQueue) IsWriteBlocked() bool {
}
fq.mu.Lock()
defer fq.mu.Unlock()
return len(fq.ch) == cap(fq.ch) || fq.pq.GetPendingBytes() > 0
return len(fq.ch) == cap(fq.ch) || (fq.pq.GetPendingBytes() > 0 && !fq.prioritizeInMemoryData)
}
// UnblockAllReaders unblocks all the readers.
@@ -194,18 +197,25 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
isPQWriteAllowed := !fq.isPQDisabled || ignoreDisabledPQ
fq.flushInmemoryBlocksToFileIfNeededLocked()
if n := fq.pq.GetPendingBytes(); n > 0 {
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
// So put the block to file-based queue.
if len(fq.ch) > 0 {
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
if !isPQWriteAllowed && fq.pq.GetPendingBytes() > 0 {
// fast path: there is pending data at file-based queue,
// it must be drained before in-memory queue could be used.
// File-based queue could be non-empty after vmagent restart
// and vmagent couldn't flush in-memory queue during shutdown.
return false
}
if !fq.prioritizeInMemoryData {
fq.flushInmemoryBlocksToFileIfNeededLocked()
if fq.pq.GetPendingBytes() > 0 {
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
// So put the block to file-based queue.
if !isPQWriteAllowed {
return false
}
fq.pq.MustWriteBlock(block)
return true
}
if !isPQWriteAllowed {
return false
}
fq.pq.MustWriteBlock(block)
return true
}
if len(fq.ch) == cap(fq.ch) {
// There is no space left in the in-memory queue. Put the data to file-based queue.
@@ -216,7 +226,7 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
fq.pq.MustWriteBlock(block)
return true
}
// Fast path - put the block to in-memory queue.
bb := blockBufPool.Get()
bb.B = append(bb.B[:0], block...)
fq.ch <- bb
@@ -239,16 +249,15 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
return dst, false
}
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
}
if n := fq.pq.GetPendingBytes(); n > 0 {
data, ok := fq.pq.MustReadBlockNonblocking(dst)
if ok {
return data, true
}
dst = data
continue
}
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
}
if fq.stopDeadline > 0 {
return dst, false
@@ -259,6 +268,27 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
}
}
// MustReadInMemoryBlockBlocking reads the next block from the in-memory queue into dst and returns it.
// It blocks until a block is available or the stop deadline is exceeded, in which case it returns (dst, false).
func (fq *FastQueue) MustReadInMemoryBlockBlocking(dst []byte) ([]byte, bool) {
fq.mu.Lock()
defer fq.mu.Unlock()
for {
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
return dst, false
}
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
}
if fq.stopDeadline > 0 {
return dst, false
}
// There are no blocks. Wait for new block.
fq.cond.Wait()
}
}
// MustReadInMemoryBlock reads the next block from the in-memory queue into dst and returns it.
// It returns (dst, true) if a block was available, or (nil, false) if the in-memory queue is empty.
// It does not block waiting for new blocks.
@@ -277,9 +307,6 @@ func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) []byte {
if len(fq.ch) == 0 {
logger.Panicf("BUG: the function must not be called when in-memory queue is empty. Caller should verify the queue len upfront")
}
if n := fq.pq.GetPendingBytes(); n > 0 {
logger.Panicf("BUG: the file-based queue must be empty when the in-memory queue is non-empty; it contains %d pending bytes", n)
}
bb := <-fq.ch
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()

View File

@@ -13,7 +13,7 @@ func TestFastQueueOpenClose(_ *testing.T) {
path := "fast-queue-open-close"
fs.MustRemoveDir(path)
for range 10 {
fq := MustOpenFastQueue(path, "foobar", 100, 0, false)
fq := MustOpenFastQueue(path, "foobar", 100, 0, false, false)
fq.MustClose()
}
fs.MustRemoveDir(path)
@@ -24,7 +24,7 @@ func TestFastQueueWriteReadInmemory(t *testing.T) {
fs.MustRemoveDir(path)
capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
}
@@ -57,7 +57,7 @@ func TestFastQueueWriteReadMixed(t *testing.T) {
fs.MustRemoveDir(path)
capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
if n := fq.GetPendingBytes(); n != 0 {
t.Fatalf("the number of pending bytes must be 0; got %d", n)
}
@@ -93,7 +93,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
fs.MustRemoveDir(path)
capacity := 100
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
if n := fq.GetPendingBytes(); n != 0 {
t.Fatalf("the number of pending bytes must be 0; got %d", n)
}
@@ -106,7 +106,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
blocks = append(blocks, block)
fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity, 0, false)
fq = MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
}
if n := fq.GetPendingBytes(); n == 0 {
t.Fatalf("the number of pending bytes must be greater than 0")
@@ -120,7 +120,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
}
fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity, 0, false)
fq = MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
}
if n := fq.GetPendingBytes(); n != 0 {
t.Fatalf("the number of pending bytes must be 0; got %d", n)
@@ -133,7 +133,7 @@ func TestFastQueueReadUnblockByClose(t *testing.T) {
path := "fast-queue-read-unblock-by-close"
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foorbar", 123, 0, false)
fq := MustOpenFastQueue(path, "foorbar", 123, 0, false, false)
resultCh := make(chan error)
go func() {
data, ok := fq.MustReadBlock(nil)
@@ -163,7 +163,7 @@ func TestFastQueueReadUnblockByWrite(t *testing.T) {
path := "fast-queue-read-unblock-by-write"
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", 13, 0, false)
fq := MustOpenFastQueue(path, "foobar", 13, 0, false, false)
block := "foodsafdsaf sdf"
resultCh := make(chan error)
go func() {
@@ -197,7 +197,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
path := "fast-queue-read-write-concurrent"
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", 5, 0, false)
fq := MustOpenFastQueue(path, "foobar", 5, 0, false, false)
var blocks []string
blocksMap := make(map[string]bool)
@@ -259,7 +259,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
readersWG.Wait()
// Collect the remaining data
fq = MustOpenFastQueue(path, "foobar", 5, 0, false)
fq = MustOpenFastQueue(path, "foobar", 5, 0, false, false)
resultCh := make(chan error)
go func() {
for len(blocksMap) > 0 {
@@ -293,7 +293,7 @@ func TestFastQueueWriteReadWithDisabledPQ(t *testing.T) {
fs.MustRemoveDir(path)
capacity := 20
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true, false)
if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
}
@@ -310,7 +310,7 @@ func TestFastQueueWriteReadWithDisabledPQ(t *testing.T) {
}
fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true)
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true, false)
for _, block := range blocks {
buf, ok := fq.MustReadBlock(nil)
if !ok {
@@ -329,7 +329,7 @@ func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) {
fs.MustRemoveDir(path)
capacity := 20
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true, false)
if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
}
@@ -351,7 +351,7 @@ func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) {
}
fq.MustClose()
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true)
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true, false)
for _, block := range blocks {
buf, ok := fq.MustReadBlock(nil)
if !ok {

View File

@@ -17,7 +17,7 @@ func BenchmarkFastQueueThroughputSerial(b *testing.B) {
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-fast-queue-throughput-serial-%d", blockSize)
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0, false)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0, false, false)
defer func() {
fq.MustClose()
fs.MustRemoveDir(path)
@@ -38,7 +38,7 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) {
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize)
fs.MustRemoveDir(path)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0, false)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0, false, false)
defer func() {
fq.MustClose()
fs.MustRemoveDir(path)