mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-12 21:34:04 +03:00
Compare commits
1 Commits
dependabot
...
gh-8833
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
faf6718da9 |
@@ -204,9 +204,13 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
|
||||
c.packetsDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_packets_dropped_total{url=%q}`, c.sanitizedURL))
|
||||
c.retriesCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.sanitizedURL))
|
||||
c.sendDuration = metrics.GetOrCreateFloatCounter(fmt.Sprintf(`vmagent_remotewrite_send_duration_seconds_total{url=%q}`, c.sanitizedURL))
|
||||
inmemoryWorkers := inmemoryQueueWorkers.GetOptionalArg(argIdx)
|
||||
metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queues{url=%q}`, c.sanitizedURL), func() float64 {
|
||||
return float64(concurrency)
|
||||
return float64(concurrency + inmemoryWorkers)
|
||||
})
|
||||
for range inmemoryWorkers {
|
||||
c.wg.Go(c.runWorkerForInmemoryQueue)
|
||||
}
|
||||
for range concurrency {
|
||||
c.wg.Go(c.runWorker)
|
||||
}
|
||||
@@ -348,6 +352,52 @@ func (c *client) runWorker() {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) runWorkerForInmemoryQueue() {
|
||||
var ok bool
|
||||
var block []byte
|
||||
ch := make(chan bool, 1)
|
||||
for {
|
||||
block, ok = c.fq.MustReadInMemoryBlockBlocking(block[:0])
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
startTime := time.Now()
|
||||
ch <- c.sendBlock(block)
|
||||
c.sendDuration.Add(time.Since(startTime).Seconds())
|
||||
}()
|
||||
select {
|
||||
case ok := <-ch:
|
||||
if ok {
|
||||
// The block has been sent successfully
|
||||
continue
|
||||
}
|
||||
// Return unsent block to the queue.
|
||||
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
|
||||
return
|
||||
case <-c.stopCh:
|
||||
// c must be stopped. Wait up to 5 seconds for the in-flight request to complete.
|
||||
// If it succeeds, drain the remaining in-memory queue before returning.
|
||||
stopCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case ok := <-ch:
|
||||
if !ok {
|
||||
// Return unsent block to the queue.
|
||||
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
|
||||
} else {
|
||||
c.drainInMemoryQueue(stopCtx, block[:0])
|
||||
}
|
||||
case <-stopCtx.Done():
|
||||
// Return unsent block to the queue.
|
||||
c.fq.MustWriteBlockIgnoreDisabledPQ(block)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) doRequest(url string, body []byte) (*http.Response, error) {
|
||||
req, err := c.newRequest(url, body)
|
||||
if err != nil {
|
||||
|
||||
@@ -66,6 +66,9 @@ var (
|
||||
queues = flagutil.NewArrayInt("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
|
||||
"isn't enough for sending high volume of collected data to remote storage. "+
|
||||
"Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage")
|
||||
inmemoryQueueWorkers = flagutil.NewArrayInt("remoteWrite.inmemoryQueueWorkers", 0, "The number of additional workers per each -remoteWrite.url, which send only recently ingested data from the in-memory queue, "+
|
||||
"while the file-based queue at -remoteWrite.tmpDataPath is drained by workers configured via -remoteWrite.queues. "+
|
||||
"This reduces delivery lag for fresh samples when the file-based queue contains a backlog accumulated during remote storage outages.")
|
||||
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
|
||||
"It is hidden by default, since it can contain sensitive info such as auth key")
|
||||
maxPendingBytesPerURL = flagutil.NewArrayBytes("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+
|
||||
@@ -923,7 +926,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
if maxInmemoryBlocks < 2 {
|
||||
maxInmemoryBlocks = 2
|
||||
}
|
||||
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled)
|
||||
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled, inmemoryQueueWorkers.GetOptionalArg(argIdx) > 0)
|
||||
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||
return float64(fq.GetPendingBytes())
|
||||
})
|
||||
|
||||
@@ -77,7 +77,7 @@ func TestRemoteWriteContext_TryPushTimeSeries(t *testing.T) {
|
||||
|
||||
path := "fast-queue-write-test"
|
||||
fs.MustRemoveDir(path)
|
||||
fq := persistentqueue.MustOpenFastQueue(path, "test", 100, 0, false)
|
||||
fq := persistentqueue.MustOpenFastQueue(path, "test", 100, 0, false, false)
|
||||
defer fs.MustRemoveDir(path)
|
||||
defer fq.MustClose()
|
||||
|
||||
|
||||
@@ -332,13 +332,11 @@ func TestSingleVMAgentDropOnOverload(t *testing.T) {
|
||||
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
|
||||
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
|
||||
}, apptest.QueryOpts{})
|
||||
|
||||
waitFor(
|
||||
func() bool {
|
||||
return vmagent.RemoteWriteRequests(t, url1) == 1 && vmagent.RemoteWriteRequests(t, url2) == 1
|
||||
},
|
||||
)
|
||||
|
||||
// Send 2 more requests, the first RW endpoint should receive everything, the second should add them to the queue
|
||||
// since worker is busy with the first request.
|
||||
for i := range 2 {
|
||||
@@ -641,3 +639,102 @@ func TestSingleVMAgentMultitenancy(t *testing.T) {
|
||||
t.Fatalf("expected vmagent_tenant_inserted_rows_total to have value 1 for accountID=5, projectID=0")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSingleVMAgentPriorizeRecentData(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
remoteWriteSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}))
|
||||
defer remoteWriteSrv.Close()
|
||||
|
||||
remoteWriteSrv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
}))
|
||||
defer remoteWriteSrv2.Close()
|
||||
|
||||
vmagent := tc.MustStartDefaultRWVmagent("vmagent", []string{
|
||||
fmt.Sprintf(`-remoteWrite.url=%s/api/v1/write`, remoteWriteSrv.URL),
|
||||
fmt.Sprintf(`-remoteWrite.url=%s/api/v1/write`, remoteWriteSrv2.URL),
|
||||
"-remoteWrite.disableOnDiskQueue=true",
|
||||
// use only 1 worker to get a full queue faster
|
||||
"-remoteWrite.queues=1",
|
||||
"-remoteWrite.flushInterval=1ms",
|
||||
"-remoteWrite.inmemoryQueueWorkers=1",
|
||||
// fastqueue size is roughly memory.Allowed() / len(urls) / *maxRowsPerBlock / 100
|
||||
// Use very large maxRowsPerBlock to get fastqueue of minimal length(2).
|
||||
// See initRemoteWriteCtxs function in remotewrite.go for details.
|
||||
"-remoteWrite.maxRowsPerBlock=1000000000",
|
||||
"-remoteWrite.tmpDataPath=" + tc.Dir() + "/vmagent",
|
||||
|
||||
// Delay retry logic to avoid race conditions with waitFor assertions.
|
||||
// It improves the test stability on resource-constrained runners.
|
||||
// Should be bigger than retries * period
|
||||
"-remoteWrite.retryMinInterval=3s",
|
||||
})
|
||||
|
||||
const (
|
||||
retries = 20
|
||||
period = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
waitFor := func(f func() bool) {
|
||||
t.Helper()
|
||||
for range retries {
|
||||
if f() {
|
||||
return
|
||||
}
|
||||
time.Sleep(period)
|
||||
}
|
||||
t.Fatalf("timed out waiting for retry #%d", retries)
|
||||
}
|
||||
|
||||
// Real remote write URLs are hidden in metrics
|
||||
url1 := "1:secret-url"
|
||||
url2 := "2:secret-url"
|
||||
|
||||
// Wait until first request got flushed to remote write server
|
||||
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
|
||||
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
|
||||
}, apptest.QueryOpts{})
|
||||
waitFor(
|
||||
func() bool {
|
||||
return vmagent.RemoteWriteRequests(t, url1) == 1 && vmagent.RemoteWriteRequests(t, url2) == 1
|
||||
},
|
||||
)
|
||||
// Wait until second request got flushed to remote write server
|
||||
// since there are 2 independent queues (general and in-memory) with minimal capacity of 1
|
||||
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
|
||||
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
|
||||
}, apptest.QueryOpts{})
|
||||
waitFor(
|
||||
func() bool {
|
||||
return vmagent.RemoteWriteRequests(t, url1) == 2 && vmagent.RemoteWriteRequests(t, url2) == 2
|
||||
},
|
||||
)
|
||||
// Send 2 more requests, the first RW endpoint should receive everything, the second should add them to the queue
|
||||
// since worker is busy with the first request.
|
||||
for i := range 2 {
|
||||
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
|
||||
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
|
||||
}, apptest.QueryOpts{})
|
||||
|
||||
waitFor(
|
||||
func() bool {
|
||||
return vmagent.RemoteWriteRequests(t, url1) == 3+i && vmagent.RemoteWritePendingInmemoryBlocks(t, url2) == 1+i
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// Send one more request.
|
||||
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
|
||||
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
|
||||
}, apptest.QueryOpts{})
|
||||
|
||||
waitFor(
|
||||
func() bool {
|
||||
return vmagent.RemoteWriteRequests(t, url1) == 5 && vmagent.RemoteWriteSamplesDropped(t, url2) > 0
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
## tip
|
||||
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add a new flag `-remoteWrite.inmemoryQueueWorkers` to prioritize recently ingested data over historical data stored at file-based [persistent queue](https://docs.victoriametrics.com/victoriametrics/vmagent/#on-disk-persistence-and-data-processing-order). See [#8833](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8833)
|
||||
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
|
||||
|
||||
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
|
||||
|
||||
@@ -928,6 +928,37 @@ vmagent will generate the following persistent queue folders:
|
||||
2_0AAFDF53E314A72A
|
||||
```
|
||||
|
||||
### On-disk persistence and data processing order
|
||||
|
||||
By default, vmagent processes data in FIFO order. If data has been written
|
||||
to the on-disk queue, it must be flushed to the remote storage before newly
|
||||
ingested data can be forwarded there. During long outages, vmagent may
|
||||
accumulate large amounts of data in the file-based queue, which can introduce
|
||||
a significant lag between the moment data is collected by vmagent and the
|
||||
moment it becomes visible at the remote storage.
|
||||
|
||||
This behavior can be changed with the `-remoteWrite.inmemoryQueueWorkers`
|
||||
command-line flag. When set to a non-zero value, vmagent starts the given
|
||||
number of additional workers, which send only recently ingested data from
|
||||
the in-memory queue, while the workers configured via `-remoteWrite.queues`
|
||||
drain the file-based backlog concurrently. This reduces the delivery lag for
|
||||
fresh samples after remote storage outages or slowdowns. The flag can be set
|
||||
individually per each `-remoteWrite.url`.
|
||||
|
||||
Note that these workers are started in addition to the workers configured
|
||||
via `-remoteWrite.queues`, so the total number of concurrent connections to
|
||||
the remote storage becomes the sum of both flags. Take this into account if
|
||||
the remote storage limits the number of concurrent requests.
|
||||
|
||||
This flag has the following possible limitations:
|
||||
|
||||
* Samples may arrive at the remote storage out of order, since recent data
|
||||
can be delivered before the older backlogged data. Do not use this option
|
||||
if the remote storage doesn't accept out-of-order samples.
|
||||
* Recent data isn't guaranteed to take the fast path: if the in-memory queue
|
||||
is full, newly ingested data is still written to the file-based queue and
|
||||
is delivered in FIFO order by the generic workers.
|
||||
|
||||
### Disabling On-disk persistence
|
||||
|
||||
There are cases when it is better to disable on-disk persistence for pending data on the `vmagent` side:
|
||||
|
||||
@@ -26,6 +26,8 @@ type FastQueue struct {
|
||||
// isPQDisabled is set to true when pq is disabled.
|
||||
isPQDisabled bool
|
||||
|
||||
prioritizeInMemoryData bool
|
||||
|
||||
// pq is file-based queue
|
||||
pq *queue
|
||||
|
||||
@@ -48,12 +50,13 @@ type FastQueue struct {
|
||||
// reaches maxPendingSize.
|
||||
// if isPQDisabled is set to true, then write requests that exceed in-memory buffer capacity are rejected.
|
||||
// in-memory queue part can be stored on disk during graceful shutdown.
|
||||
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool) *FastQueue {
|
||||
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool, prioritizeInMemoryData bool) *FastQueue {
|
||||
pq := mustOpen(path, name, maxPendingBytes)
|
||||
fq := &FastQueue{
|
||||
pq: pq,
|
||||
isPQDisabled: isPQDisabled,
|
||||
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
|
||||
pq: pq,
|
||||
isPQDisabled: isPQDisabled,
|
||||
prioritizeInMemoryData: prioritizeInMemoryData,
|
||||
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
|
||||
}
|
||||
fq.cond.L = &fq.mu
|
||||
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
||||
@@ -97,7 +100,7 @@ func (fq *FastQueue) IsWriteBlocked() bool {
|
||||
}
|
||||
fq.mu.Lock()
|
||||
defer fq.mu.Unlock()
|
||||
return len(fq.ch) == cap(fq.ch) || fq.pq.GetPendingBytes() > 0
|
||||
return len(fq.ch) == cap(fq.ch) || (fq.pq.GetPendingBytes() > 0 && !fq.prioritizeInMemoryData)
|
||||
}
|
||||
|
||||
// UnblockAllReaders unblocks all the readers.
|
||||
@@ -194,18 +197,25 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
|
||||
|
||||
isPQWriteAllowed := !fq.isPQDisabled || ignoreDisabledPQ
|
||||
|
||||
fq.flushInmemoryBlocksToFileIfNeededLocked()
|
||||
if n := fq.pq.GetPendingBytes(); n > 0 {
|
||||
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
|
||||
// So put the block to file-based queue.
|
||||
if len(fq.ch) > 0 {
|
||||
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
|
||||
if !isPQWriteAllowed && fq.pq.GetPendingBytes() > 0 {
|
||||
// fast path: there is pending data at file-based queue,
|
||||
// it must be drained before in-memory queue could be used.
|
||||
// File-based queue could be non-empty after vmagent restart
|
||||
// and vmagent couldn't flush in-memory queue during shutdown.
|
||||
return false
|
||||
}
|
||||
if !fq.prioritizeInMemoryData {
|
||||
fq.flushInmemoryBlocksToFileIfNeededLocked()
|
||||
if fq.pq.GetPendingBytes() > 0 {
|
||||
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
|
||||
// So put the block to file-based queue.
|
||||
|
||||
if !isPQWriteAllowed {
|
||||
return false
|
||||
}
|
||||
fq.pq.MustWriteBlock(block)
|
||||
return true
|
||||
}
|
||||
if !isPQWriteAllowed {
|
||||
return false
|
||||
}
|
||||
fq.pq.MustWriteBlock(block)
|
||||
return true
|
||||
}
|
||||
if len(fq.ch) == cap(fq.ch) {
|
||||
// There is no space left in the in-memory queue. Put the data to file-based queue.
|
||||
@@ -216,7 +226,7 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
|
||||
fq.pq.MustWriteBlock(block)
|
||||
return true
|
||||
}
|
||||
// Fast path - put the block to in-memory queue.
|
||||
|
||||
bb := blockBufPool.Get()
|
||||
bb.B = append(bb.B[:0], block...)
|
||||
fq.ch <- bb
|
||||
@@ -239,16 +249,15 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
||||
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
|
||||
return dst, false
|
||||
}
|
||||
if len(fq.ch) > 0 {
|
||||
return fq.mustReadInMemoryBlockLocked(dst), true
|
||||
}
|
||||
if n := fq.pq.GetPendingBytes(); n > 0 {
|
||||
data, ok := fq.pq.MustReadBlockNonblocking(dst)
|
||||
if ok {
|
||||
return data, true
|
||||
}
|
||||
dst = data
|
||||
continue
|
||||
}
|
||||
if len(fq.ch) > 0 {
|
||||
return fq.mustReadInMemoryBlockLocked(dst), true
|
||||
}
|
||||
if fq.stopDeadline > 0 {
|
||||
return dst, false
|
||||
@@ -259,6 +268,27 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// MustReadInMemoryBlockBlocking reads the next block from the in-memory queue into dst and returns it.
|
||||
// It blocks until a block is available or the stop deadline is exceeded, in which case it returns (dst, false).
|
||||
func (fq *FastQueue) MustReadInMemoryBlockBlocking(dst []byte) ([]byte, bool) {
|
||||
fq.mu.Lock()
|
||||
defer fq.mu.Unlock()
|
||||
|
||||
for {
|
||||
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
|
||||
return dst, false
|
||||
}
|
||||
if len(fq.ch) > 0 {
|
||||
return fq.mustReadInMemoryBlockLocked(dst), true
|
||||
}
|
||||
if fq.stopDeadline > 0 {
|
||||
return dst, false
|
||||
}
|
||||
// There are no blocks. Wait for new block.
|
||||
fq.cond.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
// MustReadInMemoryBlock reads the next block from the in-memory queue into dst and returns it.
|
||||
// It returns (dst, true) if a block was available, or (nil, false) if the in-memory queue is empty.
|
||||
// It does not block waiting for new blocks.
|
||||
@@ -277,9 +307,6 @@ func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) []byte {
|
||||
if len(fq.ch) == 0 {
|
||||
logger.Panicf("BUG: the function must not be called when in-memory queue is empty. Caller should verify the queue len upfront")
|
||||
}
|
||||
if n := fq.pq.GetPendingBytes(); n > 0 {
|
||||
logger.Panicf("BUG: the file-based queue must be empty when the in-memory queue is non-empty; it contains %d pending bytes", n)
|
||||
}
|
||||
bb := <-fq.ch
|
||||
fq.pendingInmemoryBytes -= uint64(len(bb.B))
|
||||
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
|
||||
|
||||
@@ -13,7 +13,7 @@ func TestFastQueueOpenClose(_ *testing.T) {
|
||||
path := "fast-queue-open-close"
|
||||
fs.MustRemoveDir(path)
|
||||
for range 10 {
|
||||
fq := MustOpenFastQueue(path, "foobar", 100, 0, false)
|
||||
fq := MustOpenFastQueue(path, "foobar", 100, 0, false, false)
|
||||
fq.MustClose()
|
||||
}
|
||||
fs.MustRemoveDir(path)
|
||||
@@ -24,7 +24,7 @@ func TestFastQueueWriteReadInmemory(t *testing.T) {
|
||||
fs.MustRemoveDir(path)
|
||||
|
||||
capacity := 100
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
|
||||
if n := fq.GetInmemoryQueueLen(); n != 0 {
|
||||
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
|
||||
}
|
||||
@@ -57,7 +57,7 @@ func TestFastQueueWriteReadMixed(t *testing.T) {
|
||||
fs.MustRemoveDir(path)
|
||||
|
||||
capacity := 100
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
|
||||
if n := fq.GetPendingBytes(); n != 0 {
|
||||
t.Fatalf("the number of pending bytes must be 0; got %d", n)
|
||||
}
|
||||
@@ -93,7 +93,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
|
||||
fs.MustRemoveDir(path)
|
||||
|
||||
capacity := 100
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false)
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
|
||||
if n := fq.GetPendingBytes(); n != 0 {
|
||||
t.Fatalf("the number of pending bytes must be 0; got %d", n)
|
||||
}
|
||||
@@ -106,7 +106,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
|
||||
|
||||
blocks = append(blocks, block)
|
||||
fq.MustClose()
|
||||
fq = MustOpenFastQueue(path, "foobar", capacity, 0, false)
|
||||
fq = MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
|
||||
}
|
||||
if n := fq.GetPendingBytes(); n == 0 {
|
||||
t.Fatalf("the number of pending bytes must be greater than 0")
|
||||
@@ -120,7 +120,7 @@ func TestFastQueueWriteReadWithCloses(t *testing.T) {
|
||||
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
|
||||
}
|
||||
fq.MustClose()
|
||||
fq = MustOpenFastQueue(path, "foobar", capacity, 0, false)
|
||||
fq = MustOpenFastQueue(path, "foobar", capacity, 0, false, false)
|
||||
}
|
||||
if n := fq.GetPendingBytes(); n != 0 {
|
||||
t.Fatalf("the number of pending bytes must be 0; got %d", n)
|
||||
@@ -133,7 +133,7 @@ func TestFastQueueReadUnblockByClose(t *testing.T) {
|
||||
path := "fast-queue-read-unblock-by-close"
|
||||
fs.MustRemoveDir(path)
|
||||
|
||||
fq := MustOpenFastQueue(path, "foorbar", 123, 0, false)
|
||||
fq := MustOpenFastQueue(path, "foorbar", 123, 0, false, false)
|
||||
resultCh := make(chan error)
|
||||
go func() {
|
||||
data, ok := fq.MustReadBlock(nil)
|
||||
@@ -163,7 +163,7 @@ func TestFastQueueReadUnblockByWrite(t *testing.T) {
|
||||
path := "fast-queue-read-unblock-by-write"
|
||||
fs.MustRemoveDir(path)
|
||||
|
||||
fq := MustOpenFastQueue(path, "foobar", 13, 0, false)
|
||||
fq := MustOpenFastQueue(path, "foobar", 13, 0, false, false)
|
||||
block := "foodsafdsaf sdf"
|
||||
resultCh := make(chan error)
|
||||
go func() {
|
||||
@@ -197,7 +197,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
|
||||
path := "fast-queue-read-write-concurrent"
|
||||
fs.MustRemoveDir(path)
|
||||
|
||||
fq := MustOpenFastQueue(path, "foobar", 5, 0, false)
|
||||
fq := MustOpenFastQueue(path, "foobar", 5, 0, false, false)
|
||||
|
||||
var blocks []string
|
||||
blocksMap := make(map[string]bool)
|
||||
@@ -259,7 +259,7 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
|
||||
readersWG.Wait()
|
||||
|
||||
// Collect the remaining data
|
||||
fq = MustOpenFastQueue(path, "foobar", 5, 0, false)
|
||||
fq = MustOpenFastQueue(path, "foobar", 5, 0, false, false)
|
||||
resultCh := make(chan error)
|
||||
go func() {
|
||||
for len(blocksMap) > 0 {
|
||||
@@ -293,7 +293,7 @@ func TestFastQueueWriteReadWithDisabledPQ(t *testing.T) {
|
||||
fs.MustRemoveDir(path)
|
||||
|
||||
capacity := 20
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true, false)
|
||||
if n := fq.GetInmemoryQueueLen(); n != 0 {
|
||||
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
|
||||
}
|
||||
@@ -310,7 +310,7 @@ func TestFastQueueWriteReadWithDisabledPQ(t *testing.T) {
|
||||
}
|
||||
|
||||
fq.MustClose()
|
||||
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true)
|
||||
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true, false)
|
||||
for _, block := range blocks {
|
||||
buf, ok := fq.MustReadBlock(nil)
|
||||
if !ok {
|
||||
@@ -329,7 +329,7 @@ func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) {
|
||||
fs.MustRemoveDir(path)
|
||||
|
||||
capacity := 20
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true)
|
||||
fq := MustOpenFastQueue(path, "foobar", capacity, 0, true, false)
|
||||
if n := fq.GetInmemoryQueueLen(); n != 0 {
|
||||
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
|
||||
}
|
||||
@@ -351,7 +351,7 @@ func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) {
|
||||
}
|
||||
|
||||
fq.MustClose()
|
||||
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true)
|
||||
fq = MustOpenFastQueue(path, "foobar", capacity, 0, true, false)
|
||||
for _, block := range blocks {
|
||||
buf, ok := fq.MustReadBlock(nil)
|
||||
if !ok {
|
||||
|
||||
@@ -17,7 +17,7 @@ func BenchmarkFastQueueThroughputSerial(b *testing.B) {
|
||||
b.SetBytes(int64(blockSize) * iterationsCount)
|
||||
path := fmt.Sprintf("bench-fast-queue-throughput-serial-%d", blockSize)
|
||||
fs.MustRemoveDir(path)
|
||||
fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0, false)
|
||||
fq := MustOpenFastQueue(path, "foobar", iterationsCount*2, 0, false, false)
|
||||
defer func() {
|
||||
fq.MustClose()
|
||||
fs.MustRemoveDir(path)
|
||||
@@ -38,7 +38,7 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) {
|
||||
b.SetBytes(int64(blockSize) * iterationsCount)
|
||||
path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize)
|
||||
fs.MustRemoveDir(path)
|
||||
fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0, false)
|
||||
fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0, false, false)
|
||||
defer func() {
|
||||
fq.MustClose()
|
||||
fs.MustRemoveDir(path)
|
||||
|
||||
Reference in New Issue
Block a user