diff --git a/lib/bytesutil/bytebuffer.go b/lib/bytesutil/bytebuffer.go index fd3a937472..4edc73ef3d 100644 --- a/lib/bytesutil/bytebuffer.go +++ b/lib/bytesutil/bytebuffer.go @@ -25,6 +25,13 @@ func (bb *ByteBuffer) Reset() { bb.B = bb.B[:0] } +// Grow grows bb capacity, so it can accept n bytes without additional allocations. +func (bb *ByteBuffer) Grow(n int) { + bLen := len(bb.B) + bb.B = slicesutil.SetLength(bb.B, bLen+n) + bb.B = bb.B[:bLen] +} + // Write appends p to bb. func (bb *ByteBuffer) Write(p []byte) (int, error) { bb.B = append(bb.B, p...) diff --git a/lib/bytesutil/bytebuffer_test.go b/lib/bytesutil/bytebuffer_test.go index 85cb3c97e5..1e43074d16 100644 --- a/lib/bytesutil/bytebuffer_test.go +++ b/lib/bytesutil/bytebuffer_test.go @@ -44,6 +44,8 @@ func TestByteBuffer(t *testing.T) { t.Fatalf("unexpected bb.B; got %q; want %q", bb.B, data1) } + bb.Grow(10) + data2 := []byte("1") n, err = bb.Write(data2) if err != nil { diff --git a/lib/chunkedbuffer/chunked_buffer.go b/lib/chunkedbuffer/buffer.go similarity index 72% rename from lib/chunkedbuffer/chunked_buffer.go rename to lib/chunkedbuffer/buffer.go index 4967802d32..c85f1203a8 100644 --- a/lib/chunkedbuffer/chunked_buffer.go +++ b/lib/chunkedbuffer/buffer.go @@ -1,15 +1,40 @@ package chunkedbuffer import ( + "bytes" + "errors" "fmt" "io" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) const chunkSize = 4 * 1024 +// Get returns Buffer from the pool. +// +// Return back the Buffer to the pool via Put() call when it is no longer needed. +func Get() *Buffer { + v := cbPool.Get() + if v == nil { + return &Buffer{} + } + return v.(*Buffer) +} + +// Put returns cb to the pool, so it could be re-used via Get() call. +// +// The cb cannot be used after Put() call. +func Put(cb *Buffer) { + cb.Reset() + cbPool.Put(cb) +} + +var cbPool sync.Pool + // Buffer provides in-memory buffer optimized for storing big bytes volumes. // // It stores the data in chunks of fixed size. This reduces memory fragmentation @@ -89,12 +114,45 @@ func (cb *Buffer) MustReadAt(p []byte, off int64) { } } +// ReadFrom reads all the data from r and appends it to cb. +func (cb *Buffer) ReadFrom(r io.Reader) (int64, error) { + v := copyBufPool.Get() + if v == nil { + v = new([16 * 1024]byte) + } + b := (v.(*[16 * 1024]byte))[:] + + bytesRead := int64(0) + for { + n, err := r.Read(b) + cb.MustWrite(b[:n]) + bytesRead += int64(n) + if err != nil { + copyBufPool.Put(v) + if errors.Is(err, io.EOF) { + return bytesRead, nil + } + return bytesRead, err + } + } +} + +var copyBufPool sync.Pool + // WriteTo writes cb data to w. func (cb *Buffer) WriteTo(w io.Writer) (int64, error) { - if len(cb.chunks) == 0 { + bLen := cb.Len() + if bLen == 0 { return 0, nil } + switch t := w.(type) { + case *bytesutil.ByteBuffer: + t.Grow(bLen) + case *bytes.Buffer: + t.Grow(bLen) + } + nTotal := 0 // Write all the chunks except the last one, which may be incomplete. @@ -123,6 +181,16 @@ func (cb *Buffer) WriteTo(w io.Writer) (int64, error) { return int64(nTotal), nil } +// MustWriteTo writes cb contents w. +// +// Use this function only if w cannot return errors. For example, if w is bytes.Buffer of bytesutil.ByteBuffer. +// If w can return errors, then use WriteTo function instead. +func (cb *Buffer) MustWriteTo(w io.Writer) { + if _, err := cb.WriteTo(w); err != nil { + logger.Panicf("BUG: unexpected error writing Buffer data to the provided writer: %s", err) + } +} + // Path returns cb path. func (cb *Buffer) Path() string { return fmt.Sprintf("Buffer/%p/mem", cb) @@ -185,8 +253,7 @@ func (r *reader) MustClose() { func getChunk() *[chunkSize]byte { v := chunkPool.Get() if v == nil { - var chunk [chunkSize]byte - return &chunk + return new([chunkSize]byte) } return v.(*[chunkSize]byte) } diff --git a/lib/chunkedbuffer/chunked_buffer_test.go b/lib/chunkedbuffer/buffer_test.go similarity index 87% rename from lib/chunkedbuffer/chunked_buffer_test.go rename to lib/chunkedbuffer/buffer_test.go index 4306795e86..42ee5ecbc2 100644 --- a/lib/chunkedbuffer/chunked_buffer_test.go +++ b/lib/chunkedbuffer/buffer_test.go @@ -8,7 +8,9 @@ import ( ) func TestBuffer(t *testing.T) { - var cb Buffer + cb := Get() + defer Put(cb) + for i := 0; i < 10; i++ { cb.Reset() @@ -84,8 +86,8 @@ func TestBuffer(t *testing.T) { } // Copy the data to another chunked buffer via WriteTo. - var cb2 Buffer - n, err = cb.WriteTo(&cb2) + cb2 := Get() + n, err = cb.WriteTo(cb2) if err != nil { t.Fatalf("error when writing data to another chunked buffer: %s", err) } @@ -111,6 +113,40 @@ func TestBuffer(t *testing.T) { // Verify MustClose at chunked buffer cb2.MustClose() + + Put(cb2) + } +} + +func TestBuffer_ReadFrom(t *testing.T) { + cb := Get() + defer Put(cb) + + bb := bytes.NewBufferString("foo") + n, err := cb.ReadFrom(bb) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if n != 3 { + t.Fatalf("unexpected number of bytes written: %d; want 3", n) + } + + bb = bytes.NewBufferString("bar") + n, err = cb.ReadFrom(bb) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if n != 3 { + t.Fatalf("unexpected number of bytes written: %d; want 3", n) + } + + var bbResult bytes.Buffer + cb.MustWriteTo(&bbResult) + + result := bbResult.String() + resultExpected := "foobar" + if result != resultExpected { + t.Fatalf("unexpected result; got %q; want %q", result, resultExpected) } } @@ -164,13 +200,7 @@ func TestBuffer_ReaderSingleChunk(t *testing.T) { func TestBuffer_WriteToZeroData(t *testing.T) { var cb Buffer var bb bytes.Buffer - n, err := cb.WriteTo(&bb) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - if n != 0 { - t.Fatalf("unexpected data written from cb with len=%d", n) - } + cb.MustWriteTo(&bb) if bbLen := bb.Len(); bbLen != 0 { t.Fatalf("unexpected data written to bb with len=%d; data=%q", bbLen, bb.Bytes()) } diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index 495f8217e4..8b34adf1d5 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -12,7 +12,7 @@ import ( "github.com/VictoriaMetrics/metrics" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" @@ -40,6 +40,7 @@ type client struct { setHeaders func(req *http.Request) error setProxyHeaders func(req *http.Request) error maxScrapeSize int64 + disableCompression bool } func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) { @@ -82,12 +83,15 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) { tr.Proxy = proxyURLFunc tr.TLSHandshakeTimeout = 10 * time.Second tr.IdleConnTimeout = 2 * sw.ScrapeInterval - tr.DisableCompression = *disableCompression || sw.DisableCompression tr.DisableKeepAlives = *disableKeepAlive || sw.DisableKeepAlive tr.DialContext = dialFunc tr.MaxIdleConnsPerHost = 100 tr.MaxResponseHeaderBytes = int64(maxResponseHeadersSize.N) + // The client handles response compression manually in order to optimize it. + // See client.ReadData + tr.DisableCompression = true + hc := &http.Client{ Transport: ac.NewRoundTripper(tr), Timeout: sw.ScrapeTimeout, @@ -106,17 +110,19 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) { setHeaders: setHeaders, setProxyHeaders: setProxyHeaders, maxScrapeSize: sw.MaxScrapeSize, + disableCompression: *disableCompression || sw.DisableCompression, } return c, nil } -func (c *client) ReadData(dst *bytesutil.ByteBuffer) error { +func (c *client) ReadData(dst *chunkedbuffer.Buffer) (bool, error) { deadline := time.Now().Add(c.c.Timeout) ctx, cancel := context.WithDeadline(c.ctx, deadline) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.scrapeURL, nil) if err != nil { - cancel() - return fmt.Errorf("cannot create request for %q: %w", c.scrapeURL, err) + return false, fmt.Errorf("cannot create request for %q: %w", c.scrapeURL, err) } // The following `Accept` header has been copied from Prometheus sources. // See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 . @@ -129,28 +135,32 @@ func (c *client) ReadData(dst *bytesutil.ByteBuffer) error { req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr) req.Header.Set("User-Agent", "vm_promscrape") if err := c.setHeaders(req); err != nil { - cancel() - return fmt.Errorf("failed to set request headers for %q: %w", c.scrapeURL, err) + return false, fmt.Errorf("failed to set request headers for %q: %w", c.scrapeURL, err) } if err := c.setProxyHeaders(req); err != nil { - cancel() - return fmt.Errorf("failed to set proxy request headers for %q: %w", c.scrapeURL, err) + return false, fmt.Errorf("failed to set proxy request headers for %q: %w", c.scrapeURL, err) } + if !c.disableCompression { + req.Header.Set("Accept-Encoding", "gzip") + } + scrapeRequests.Inc() resp, err := c.c.Do(req) if err != nil { - cancel() if ue, ok := err.(*url.Error); ok && ue.Timeout() { scrapesTimedout.Inc() } - return fmt.Errorf("cannot perform request to %q: %w", c.scrapeURL, err) + return false, fmt.Errorf("cannot perform request to %q: %w", c.scrapeURL, err) } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_total{status_code="%d"}`, resp.StatusCode)).Inc() - respBody, _ := io.ReadAll(resp.Body) - _ = resp.Body.Close() - cancel() - return fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q", + respBody, err := io.ReadAll(resp.Body) + if err != nil { + respBody = []byte(err.Error()) + } + return false, fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q", c.scrapeURL, resp.StatusCode, http.StatusOK, respBody) } scrapesOK.Inc() @@ -158,21 +168,21 @@ func (c *client) ReadData(dst *bytesutil.ByteBuffer) error { // Read the data from resp.Body r := io.LimitReader(resp.Body, c.maxScrapeSize) _, err = dst.ReadFrom(r) - _ = resp.Body.Close() - cancel() if err != nil { if ue, ok := err.(*url.Error); ok && ue.Timeout() { scrapesTimedout.Inc() } - return fmt.Errorf("cannot read data from %s: %w", c.scrapeURL, err) + return false, fmt.Errorf("cannot read data from %s: %w", c.scrapeURL, err) } - if int64(len(dst.B)) >= c.maxScrapeSize { + if int64(dst.Len()) >= c.maxScrapeSize { maxScrapeSizeExceeded.Inc() - return fmt.Errorf("the response from %q exceeds -promscrape.maxScrapeSize or max_scrape_size in the scrape config (%d bytes). "+ + return false, fmt.Errorf("the response from %q exceeds -promscrape.maxScrapeSize or max_scrape_size in the scrape config (%d bytes). "+ "Possible solutions are: reduce the response size for the target, increase -promscrape.maxScrapeSize command-line flag, "+ "increase max_scrape_size value in scrape config for the given target", c.scrapeURL, c.maxScrapeSize) } - return nil + + isGzipped := resp.Header.Get("Content-Encoding") == "gzip" + return isGzipped, nil } var ( diff --git a/lib/promscrape/client_test.go b/lib/promscrape/client_test.go index 8654c69826..fbf33862bd 100644 --- a/lib/promscrape/client_test.go +++ b/lib/promscrape/client_test.go @@ -11,7 +11,7 @@ import ( "testing" "time" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" @@ -149,20 +149,25 @@ func TestClientProxyReadOk(t *testing.T) { // bump timeout for slow CIs ScrapeTimeout: 5 * time.Second, // force connection re-creating to avoid broken conns in slow CIs - DisableKeepAlive: true, - AuthConfig: newTestAuthConfig(t, isBackendTLS, backendAuth), - ProxyAuthConfig: newTestAuthConfig(t, isProxyTLS, proxyAuth), - MaxScrapeSize: 16000, + DisableKeepAlive: true, + AuthConfig: newTestAuthConfig(t, isBackendTLS, backendAuth), + ProxyAuthConfig: newTestAuthConfig(t, isProxyTLS, proxyAuth), + MaxScrapeSize: 16000, + DisableCompression: true, }) if err != nil { t.Fatalf("failed to create client: %s", err) } - var bb bytesutil.ByteBuffer - if err = c.ReadData(&bb); err != nil { + var cb chunkedbuffer.Buffer + isGzipped, err := c.ReadData(&cb) + if err != nil { t.Fatalf("unexpected error at ReadData: %s", err) } - got, err := io.ReadAll(bb.NewReader()) + if isGzipped { + t.Fatalf("the response musn't be gzipped") + } + got, err := io.ReadAll(cb.NewReader()) if err != nil { t.Fatalf("err read: %s", err) } diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index d7b99e38e5..2ea94d74e7 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -26,6 +27,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" @@ -188,7 +190,7 @@ type scrapeWork struct { Config *ScrapeWork // ReadData is called for reading the scrape response data into dst. - ReadData func(dst *bytesutil.ByteBuffer) error + ReadData func(dst *chunkedbuffer.Buffer) (bool, error) // PushData is called for pushing collected data. // @@ -323,6 +325,12 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}, globalStopCh <-chan struct{}) // Do not send staleness markers on graceful shutdown as Prometheus does. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2013#issuecomment-1006994079 default: + // The code below is CPU-bound, while it may allocate big amounts of memory. + // That's why it is a good idea to limit the number of concurrent goroutines, + // which may execute this code, in order to limit memory usage under high load + // without sacrificing the performance. + processScrapedDataConcurrencyLimitCh <- struct{}{} + // Send staleness markers to all the metrics scraped last time from the target // when the given target disappears as Prometheus does. // Use the current real timestamp for staleness markers, so queries @@ -332,6 +340,8 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}, globalStopCh <-chan struct{}) lastScrapeStr := bytesutil.ToUnsafeString(bbLastScrape.B) sw.sendStaleSeries(lastScrapeStr, "", t, true) leveledbytebufferpool.Put(bbLastScrape) + + <-processScrapedDataConcurrencyLimitCh } if sl := sw.getSeriesLimiter(); sl != nil { sl.MustStop() @@ -402,28 +412,33 @@ func (sw *scrapeWork) needStreamParseMode(responseSize int) bool { // getTargetResponse() fetches response from sw target in the same way as when scraping the target. func (sw *scrapeWork) getTargetResponse() ([]byte, error) { - var bb bytesutil.ByteBuffer - if err := sw.ReadData(&bb); err != nil { + cb := chunkedbuffer.Get() + defer chunkedbuffer.Put(cb) + + isGzipped, err := sw.ReadData(cb) + if err != nil { return nil, err } - return bb.B, nil + + var bb bytesutil.ByteBuffer + err = readFromBuffer(&bb, cb, isGzipped) + chunkedbuffer.Put(cb) + return bb.B, err } func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error { - body := leveledbytebufferpool.Get(sw.prevBodyLen) - - // Read the scrape response into body. - // It is OK to do for stream parsing mode, since the most of RAM + // Read the whole scrape response into cb. + // It is OK to do this for stream parsing mode, since the most of RAM // is occupied during parsing of the read response body below. // This also allows measuring the real scrape duration, which doesn't include // the time needed for processing of the read response. - err := sw.ReadData(body) + cb := chunkedbuffer.Get() + isGzipped, err := sw.ReadData(cb) // Measure scrape duration. endTimestamp := time.Now().UnixMilli() scrapeDurationSeconds := float64(endTimestamp-realTimestamp) / 1e3 scrapeDuration.Update(scrapeDurationSeconds) - scrapeResponseSize.Update(float64(len(body.B))) // The code below is CPU-bound, while it may allocate big amounts of memory. // That's why it is a good idea to limit the number of concurrent goroutines, @@ -431,7 +446,19 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error // without sacrificing the performance. processScrapedDataConcurrencyLimitCh <- struct{}{} - if err == nil && sw.needStreamParseMode(len(body.B)) { + // Copy the read scrape response to body in order to parse it and send + // the parsed results to remote storage. + body := leveledbytebufferpool.Get(sw.prevBodyLen) + if err == nil { + err = readFromBuffer(body, cb, isGzipped) + } + chunkedbuffer.Put(cb) + + bodyLen := len(body.B) + sw.prevBodyLen = bodyLen + scrapeResponseSize.Update(float64(bodyLen)) + + if err == nil && sw.needStreamParseMode(bodyLen) { // Process response body from scrape target in streaming manner. // This case is optimized for targets exposing more than ten thousand of metrics per target, // such as kube-state-metrics. @@ -443,15 +470,33 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error err = sw.processDataOneShot(scrapeTimestamp, realTimestamp, body.B, scrapeDurationSeconds, err) } - <-processScrapedDataConcurrencyLimitCh - leveledbytebufferpool.Put(body) + <-processScrapedDataConcurrencyLimitCh + return err } var processScrapedDataConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs()) +func readFromBuffer(dst *bytesutil.ByteBuffer, src *chunkedbuffer.Buffer, isGzipped bool) error { + if !isGzipped { + src.MustWriteTo(dst) + return nil + } + + reader, err := protoparserutil.GetUncompressedReader(src.NewReader(), "gzip") + if err != nil { + return fmt.Errorf("cannot decompress response body: %w", err) + } + _, err = dst.ReadFrom(reader) + protoparserutil.PutUncompressedReader(reader) + if err != nil { + return fmt.Errorf("cannot read gzipped response body: %w", err) + } + return nil +} + func (sw *scrapeWork) processDataOneShot(scrapeTimestamp, realTimestamp int64, body []byte, scrapeDurationSeconds float64, err error) error { up := 1 @@ -497,6 +542,7 @@ func (sw *scrapeWork) processDataOneShot(scrapeTimestamp, realTimestamp int64, b samplesDropped = wc.applySeriesLimit(sw) } responseSize := len(bodyString) + am := &autoMetrics{ up: up, scrapeDurationSeconds: scrapeDurationSeconds, @@ -507,9 +553,9 @@ func (sw *scrapeWork) processDataOneShot(scrapeTimestamp, realTimestamp int64, b seriesLimitSamplesDropped: samplesDropped, } wc.addAutoMetrics(sw, am, scrapeTimestamp) + sw.pushData(&wc.writeRequest) sw.prevLabelsLen = len(wc.labels) - sw.prevBodyLen = responseSize writeRequestCtxPool.Put(wc) if !areIdenticalSeries { @@ -607,7 +653,6 @@ func (sw *scrapeWork) processDataInStreamMode(scrapeTimestamp, realTimestamp int seriesLimitSamplesDropped: int(samplesDroppedTotal.Load()), } sw.pushAutoMetrics(am, scrapeTimestamp) - sw.prevBodyLen = responseSize if !areIdenticalSeries { // Send stale markers for disappeared metrics with the real scrape timestamp diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go index 5fadaac692..659f5c0742 100644 --- a/lib/promscrape/scrapework_test.go +++ b/lib/promscrape/scrapework_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil" @@ -93,9 +93,9 @@ func TestScrapeWorkScrapeInternalFailure(t *testing.T) { } readDataCalls := 0 - sw.ReadData = func(_ *bytesutil.ByteBuffer) error { + sw.ReadData = func(_ *chunkedbuffer.Buffer) (bool, error) { readDataCalls++ - return fmt.Errorf("error when reading data") + return false, fmt.Errorf("error when reading data") } pushDataCalls := 0 @@ -149,10 +149,10 @@ func testScrapeWorkScrapeInternalSuccess(t *testing.T, streamParse bool) { sw.Config = cfg readDataCalls := 0 - sw.ReadData = func(dst *bytesutil.ByteBuffer) error { + sw.ReadData = func(dst *chunkedbuffer.Buffer) (bool, error) { readDataCalls++ - dst.B = append(dst.B, data...) - return nil + dst.MustWrite([]byte(data)) + return false, nil } var pushDataMu sync.Mutex @@ -543,15 +543,14 @@ func TestScrapeWorkScrapeInternalStreamConcurrency(t *testing.T) { sw.Config = cfg readDataCalls := 0 - sw.ReadData = func(dst *bytesutil.ByteBuffer) error { + sw.ReadData = func(dst *chunkedbuffer.Buffer) (bool, error) { readDataCalls++ - dst.B = append(dst.B, data...) - return nil + dst.MustWrite([]byte(data)) + return false, nil } var pushDataCalls atomic.Int64 var pushedTimeseries atomic.Int64 - var pushDataErr error sw.PushData = func(_ *auth.Token, wr *prompbmarshal.WriteRequest) { pushDataCalls.Add(1) pushedTimeseries.Add(int64(len(wr.Timeseries))) @@ -568,9 +567,6 @@ func TestScrapeWorkScrapeInternalStreamConcurrency(t *testing.T) { } } tsmGlobal.Unregister(&sw) - if pushDataErr != nil { - t.Fatalf("unexpected error: %s", pushDataErr) - } if readDataCalls != 1 { t.Fatalf("unexpected number of readData calls; got %d; want %d", readDataCalls, 1) } @@ -630,8 +626,8 @@ func TestScrapeWorkScrapeInternalStreamConcurrency(t *testing.T) { func TestWriteRequestCtx_AddRowNoRelabeling(t *testing.T) { f := func(row string, cfg *ScrapeWork, dataExpected string) { t.Helper() - var wc writeRequestCtx r := parsePromRow(row) + var wc writeRequestCtx wc.addRow(cfg, r, r.Timestamp, false) tss := wc.writeRequest.Timeseries tssExpected := parseData(dataExpected) diff --git a/lib/promscrape/scrapework_timing_test.go b/lib/promscrape/scrapework_timing_test.go index 7913778f80..c8eedcb5e0 100644 --- a/lib/promscrape/scrapework_timing_test.go +++ b/lib/promscrape/scrapework_timing_test.go @@ -8,7 +8,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" ) @@ -133,12 +133,14 @@ func benchmarkScrapeWorkScrapeInternal(b *testing.B, data []byte, streamParse bo protoparserutil.StartUnmarshalWorkers() defer protoparserutil.StopUnmarshalWorkers() - readData := func(dst *bytesutil.ByteBuffer) error { - dst.B = append(dst.B, data...) - return nil + readData := func(dst *chunkedbuffer.Buffer) (bool, error) { + dst.MustWrite(data) + return false, nil } + b.ReportAllocs() b.SetBytes(int64(len(data))) + b.RunParallel(func(pb *testing.PB) { var sw scrapeWork sw.Config = &ScrapeWork{