mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
lib/promscrape: use chunkedbuffer.Buffer instead of bytesutil.ByteBuffer for reading response body from scrape targets
This reduces memory usage when reading large response bodies, because the underlying buffer no longer needs to be re-allocated while a large response body is being read into it. Also decompress the response body under processScrapedDataConcurrencyLimitCh. This slightly reduces CPU and RAM usage when scraping thousands of targets.
This commit is contained in:
@@ -25,6 +25,13 @@ func (bb *ByteBuffer) Reset() {
|
||||
bb.B = bb.B[:0]
|
||||
}
|
||||
|
||||
// Grow grows bb capacity, so it can accept n bytes without additional allocations.
|
||||
func (bb *ByteBuffer) Grow(n int) {
|
||||
bLen := len(bb.B)
|
||||
bb.B = slicesutil.SetLength(bb.B, bLen+n)
|
||||
bb.B = bb.B[:bLen]
|
||||
}
|
||||
|
||||
// Write appends p to bb.
|
||||
func (bb *ByteBuffer) Write(p []byte) (int, error) {
|
||||
bb.B = append(bb.B, p...)
|
||||
|
||||
@@ -44,6 +44,8 @@ func TestByteBuffer(t *testing.T) {
|
||||
t.Fatalf("unexpected bb.B; got %q; want %q", bb.B, data1)
|
||||
}
|
||||
|
||||
bb.Grow(10)
|
||||
|
||||
data2 := []byte("1")
|
||||
n, err = bb.Write(data2)
|
||||
if err != nil {
|
||||
|
||||
@@ -1,15 +1,40 @@
|
||||
package chunkedbuffer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
||||
// chunkSize is the size in bytes of the fixed-size arrays handed out by getChunk.
const chunkSize = 4 * 1024
|
||||
|
||||
// Get returns Buffer from the pool.
|
||||
//
|
||||
// Return back the Buffer to the pool via Put() call when it is no longer needed.
|
||||
func Get() *Buffer {
|
||||
v := cbPool.Get()
|
||||
if v == nil {
|
||||
return &Buffer{}
|
||||
}
|
||||
return v.(*Buffer)
|
||||
}
|
||||
|
||||
// Put returns cb to the pool, so it could be re-used via Get() call.
|
||||
//
|
||||
// The cb cannot be used after Put() call.
|
||||
func Put(cb *Buffer) {
|
||||
cb.Reset()
|
||||
cbPool.Put(cb)
|
||||
}
|
||||
|
||||
// cbPool holds *Buffer values for re-use; it is filled by Put() and drained by Get().
var cbPool sync.Pool
|
||||
|
||||
// Buffer provides in-memory buffer optimized for storing big bytes volumes.
|
||||
//
|
||||
// It stores the data in chunks of fixed size. This reduces memory fragmentation
|
||||
@@ -89,12 +114,45 @@ func (cb *Buffer) MustReadAt(p []byte, off int64) {
|
||||
}
|
||||
}
|
||||
|
||||
// ReadFrom reads all the data from r and appends it to cb.
|
||||
func (cb *Buffer) ReadFrom(r io.Reader) (int64, error) {
|
||||
v := copyBufPool.Get()
|
||||
if v == nil {
|
||||
v = new([16 * 1024]byte)
|
||||
}
|
||||
b := (v.(*[16 * 1024]byte))[:]
|
||||
|
||||
bytesRead := int64(0)
|
||||
for {
|
||||
n, err := r.Read(b)
|
||||
cb.MustWrite(b[:n])
|
||||
bytesRead += int64(n)
|
||||
if err != nil {
|
||||
copyBufPool.Put(v)
|
||||
if errors.Is(err, io.EOF) {
|
||||
return bytesRead, nil
|
||||
}
|
||||
return bytesRead, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// copyBufPool holds *[16 * 1024]byte scratch arrays used by Buffer.ReadFrom.
var copyBufPool sync.Pool
|
||||
|
||||
// WriteTo writes cb data to w.
|
||||
func (cb *Buffer) WriteTo(w io.Writer) (int64, error) {
|
||||
if len(cb.chunks) == 0 {
|
||||
bLen := cb.Len()
|
||||
if bLen == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
switch t := w.(type) {
|
||||
case *bytesutil.ByteBuffer:
|
||||
t.Grow(bLen)
|
||||
case *bytes.Buffer:
|
||||
t.Grow(bLen)
|
||||
}
|
||||
|
||||
nTotal := 0
|
||||
|
||||
// Write all the chunks except the last one, which may be incomplete.
|
||||
@@ -123,6 +181,16 @@ func (cb *Buffer) WriteTo(w io.Writer) (int64, error) {
|
||||
return int64(nTotal), nil
|
||||
}
|
||||
|
||||
// MustWriteTo writes cb contents w.
|
||||
//
|
||||
// Use this function only if w cannot return errors. For example, if w is bytes.Buffer of bytesutil.ByteBuffer.
|
||||
// If w can return errors, then use WriteTo function instead.
|
||||
func (cb *Buffer) MustWriteTo(w io.Writer) {
|
||||
if _, err := cb.WriteTo(w); err != nil {
|
||||
logger.Panicf("BUG: unexpected error writing Buffer data to the provided writer: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Path returns cb path.
|
||||
func (cb *Buffer) Path() string {
|
||||
return fmt.Sprintf("Buffer/%p/mem", cb)
|
||||
@@ -185,8 +253,7 @@ func (r *reader) MustClose() {
|
||||
func getChunk() *[chunkSize]byte {
|
||||
v := chunkPool.Get()
|
||||
if v == nil {
|
||||
var chunk [chunkSize]byte
|
||||
return &chunk
|
||||
return new([chunkSize]byte)
|
||||
}
|
||||
return v.(*[chunkSize]byte)
|
||||
}
|
||||
@@ -8,7 +8,9 @@ import (
|
||||
)
|
||||
|
||||
func TestBuffer(t *testing.T) {
|
||||
var cb Buffer
|
||||
cb := Get()
|
||||
defer Put(cb)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
cb.Reset()
|
||||
|
||||
@@ -84,8 +86,8 @@ func TestBuffer(t *testing.T) {
|
||||
}
|
||||
|
||||
// Copy the data to another chunked buffer via WriteTo.
|
||||
var cb2 Buffer
|
||||
n, err = cb.WriteTo(&cb2)
|
||||
cb2 := Get()
|
||||
n, err = cb.WriteTo(cb2)
|
||||
if err != nil {
|
||||
t.Fatalf("error when writing data to another chunked buffer: %s", err)
|
||||
}
|
||||
@@ -111,6 +113,40 @@ func TestBuffer(t *testing.T) {
|
||||
|
||||
// Verify MustClose at chunked buffer
|
||||
cb2.MustClose()
|
||||
|
||||
Put(cb2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuffer_ReadFrom(t *testing.T) {
|
||||
cb := Get()
|
||||
defer Put(cb)
|
||||
|
||||
bb := bytes.NewBufferString("foo")
|
||||
n, err := cb.ReadFrom(bb)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
if n != 3 {
|
||||
t.Fatalf("unexpected number of bytes written: %d; want 3", n)
|
||||
}
|
||||
|
||||
bb = bytes.NewBufferString("bar")
|
||||
n, err = cb.ReadFrom(bb)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
if n != 3 {
|
||||
t.Fatalf("unexpected number of bytes written: %d; want 3", n)
|
||||
}
|
||||
|
||||
var bbResult bytes.Buffer
|
||||
cb.MustWriteTo(&bbResult)
|
||||
|
||||
result := bbResult.String()
|
||||
resultExpected := "foobar"
|
||||
if result != resultExpected {
|
||||
t.Fatalf("unexpected result; got %q; want %q", result, resultExpected)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,13 +200,7 @@ func TestBuffer_ReaderSingleChunk(t *testing.T) {
|
||||
func TestBuffer_WriteToZeroData(t *testing.T) {
|
||||
var cb Buffer
|
||||
var bb bytes.Buffer
|
||||
n, err := cb.WriteTo(&bb)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
if n != 0 {
|
||||
t.Fatalf("unexpected data written from cb with len=%d", n)
|
||||
}
|
||||
cb.MustWriteTo(&bb)
|
||||
if bbLen := bb.Len(); bbLen != 0 {
|
||||
t.Fatalf("unexpected data written to bb with len=%d; data=%q", bbLen, bb.Bytes())
|
||||
}
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
@@ -40,6 +40,7 @@ type client struct {
|
||||
setHeaders func(req *http.Request) error
|
||||
setProxyHeaders func(req *http.Request) error
|
||||
maxScrapeSize int64
|
||||
disableCompression bool
|
||||
}
|
||||
|
||||
func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
|
||||
@@ -82,12 +83,15 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
|
||||
tr.Proxy = proxyURLFunc
|
||||
tr.TLSHandshakeTimeout = 10 * time.Second
|
||||
tr.IdleConnTimeout = 2 * sw.ScrapeInterval
|
||||
tr.DisableCompression = *disableCompression || sw.DisableCompression
|
||||
tr.DisableKeepAlives = *disableKeepAlive || sw.DisableKeepAlive
|
||||
tr.DialContext = dialFunc
|
||||
tr.MaxIdleConnsPerHost = 100
|
||||
tr.MaxResponseHeaderBytes = int64(maxResponseHeadersSize.N)
|
||||
|
||||
// The client handles response compression manually in order to optimize it.
|
||||
// See client.ReadData
|
||||
tr.DisableCompression = true
|
||||
|
||||
hc := &http.Client{
|
||||
Transport: ac.NewRoundTripper(tr),
|
||||
Timeout: sw.ScrapeTimeout,
|
||||
@@ -106,17 +110,19 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
|
||||
setHeaders: setHeaders,
|
||||
setProxyHeaders: setProxyHeaders,
|
||||
maxScrapeSize: sw.MaxScrapeSize,
|
||||
disableCompression: *disableCompression || sw.DisableCompression,
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *client) ReadData(dst *bytesutil.ByteBuffer) error {
|
||||
func (c *client) ReadData(dst *chunkedbuffer.Buffer) (bool, error) {
|
||||
deadline := time.Now().Add(c.c.Timeout)
|
||||
ctx, cancel := context.WithDeadline(c.ctx, deadline)
|
||||
defer cancel()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.scrapeURL, nil)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return fmt.Errorf("cannot create request for %q: %w", c.scrapeURL, err)
|
||||
return false, fmt.Errorf("cannot create request for %q: %w", c.scrapeURL, err)
|
||||
}
|
||||
// The following `Accept` header has been copied from Prometheus sources.
|
||||
// See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 .
|
||||
@@ -129,28 +135,32 @@ func (c *client) ReadData(dst *bytesutil.ByteBuffer) error {
|
||||
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
|
||||
req.Header.Set("User-Agent", "vm_promscrape")
|
||||
if err := c.setHeaders(req); err != nil {
|
||||
cancel()
|
||||
return fmt.Errorf("failed to set request headers for %q: %w", c.scrapeURL, err)
|
||||
return false, fmt.Errorf("failed to set request headers for %q: %w", c.scrapeURL, err)
|
||||
}
|
||||
if err := c.setProxyHeaders(req); err != nil {
|
||||
cancel()
|
||||
return fmt.Errorf("failed to set proxy request headers for %q: %w", c.scrapeURL, err)
|
||||
return false, fmt.Errorf("failed to set proxy request headers for %q: %w", c.scrapeURL, err)
|
||||
}
|
||||
if !c.disableCompression {
|
||||
req.Header.Set("Accept-Encoding", "gzip")
|
||||
}
|
||||
|
||||
scrapeRequests.Inc()
|
||||
resp, err := c.c.Do(req)
|
||||
if err != nil {
|
||||
cancel()
|
||||
if ue, ok := err.(*url.Error); ok && ue.Timeout() {
|
||||
scrapesTimedout.Inc()
|
||||
}
|
||||
return fmt.Errorf("cannot perform request to %q: %w", c.scrapeURL, err)
|
||||
return false, fmt.Errorf("cannot perform request to %q: %w", c.scrapeURL, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_total{status_code="%d"}`, resp.StatusCode)).Inc()
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
cancel()
|
||||
return fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q",
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
respBody = []byte(err.Error())
|
||||
}
|
||||
return false, fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q",
|
||||
c.scrapeURL, resp.StatusCode, http.StatusOK, respBody)
|
||||
}
|
||||
scrapesOK.Inc()
|
||||
@@ -158,21 +168,21 @@ func (c *client) ReadData(dst *bytesutil.ByteBuffer) error {
|
||||
// Read the data from resp.Body
|
||||
r := io.LimitReader(resp.Body, c.maxScrapeSize)
|
||||
_, err = dst.ReadFrom(r)
|
||||
_ = resp.Body.Close()
|
||||
cancel()
|
||||
if err != nil {
|
||||
if ue, ok := err.(*url.Error); ok && ue.Timeout() {
|
||||
scrapesTimedout.Inc()
|
||||
}
|
||||
return fmt.Errorf("cannot read data from %s: %w", c.scrapeURL, err)
|
||||
return false, fmt.Errorf("cannot read data from %s: %w", c.scrapeURL, err)
|
||||
}
|
||||
if int64(len(dst.B)) >= c.maxScrapeSize {
|
||||
if int64(dst.Len()) >= c.maxScrapeSize {
|
||||
maxScrapeSizeExceeded.Inc()
|
||||
return fmt.Errorf("the response from %q exceeds -promscrape.maxScrapeSize or max_scrape_size in the scrape config (%d bytes). "+
|
||||
return false, fmt.Errorf("the response from %q exceeds -promscrape.maxScrapeSize or max_scrape_size in the scrape config (%d bytes). "+
|
||||
"Possible solutions are: reduce the response size for the target, increase -promscrape.maxScrapeSize command-line flag, "+
|
||||
"increase max_scrape_size value in scrape config for the given target", c.scrapeURL, c.maxScrapeSize)
|
||||
}
|
||||
return nil
|
||||
|
||||
isGzipped := resp.Header.Get("Content-Encoding") == "gzip"
|
||||
return isGzipped, nil
|
||||
}
|
||||
|
||||
var (
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||
@@ -149,20 +149,25 @@ func TestClientProxyReadOk(t *testing.T) {
|
||||
// bump timeout for slow CIs
|
||||
ScrapeTimeout: 5 * time.Second,
|
||||
// force connection re-creating to avoid broken conns in slow CIs
|
||||
DisableKeepAlive: true,
|
||||
AuthConfig: newTestAuthConfig(t, isBackendTLS, backendAuth),
|
||||
ProxyAuthConfig: newTestAuthConfig(t, isProxyTLS, proxyAuth),
|
||||
MaxScrapeSize: 16000,
|
||||
DisableKeepAlive: true,
|
||||
AuthConfig: newTestAuthConfig(t, isBackendTLS, backendAuth),
|
||||
ProxyAuthConfig: newTestAuthConfig(t, isProxyTLS, proxyAuth),
|
||||
MaxScrapeSize: 16000,
|
||||
DisableCompression: true,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create client: %s", err)
|
||||
}
|
||||
|
||||
var bb bytesutil.ByteBuffer
|
||||
if err = c.ReadData(&bb); err != nil {
|
||||
var cb chunkedbuffer.Buffer
|
||||
isGzipped, err := c.ReadData(&cb)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error at ReadData: %s", err)
|
||||
}
|
||||
got, err := io.ReadAll(bb.NewReader())
|
||||
if isGzipped {
|
||||
t.Fatalf("the response musn't be gzipped")
|
||||
}
|
||||
got, err := io.ReadAll(cb.NewReader())
|
||||
if err != nil {
|
||||
t.Fatalf("err read: %s", err)
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
@@ -26,6 +27,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
@@ -188,7 +190,7 @@ type scrapeWork struct {
|
||||
Config *ScrapeWork
|
||||
|
||||
// ReadData is called for reading the scrape response data into dst.
|
||||
ReadData func(dst *bytesutil.ByteBuffer) error
|
||||
ReadData func(dst *chunkedbuffer.Buffer) (bool, error)
|
||||
|
||||
// PushData is called for pushing collected data.
|
||||
//
|
||||
@@ -323,6 +325,12 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}, globalStopCh <-chan struct{})
|
||||
// Do not send staleness markers on graceful shutdown as Prometheus does.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2013#issuecomment-1006994079
|
||||
default:
|
||||
// The code below is CPU-bound, while it may allocate big amounts of memory.
|
||||
// That's why it is a good idea to limit the number of concurrent goroutines,
|
||||
// which may execute this code, in order to limit memory usage under high load
|
||||
// without sacrificing the performance.
|
||||
processScrapedDataConcurrencyLimitCh <- struct{}{}
|
||||
|
||||
// Send staleness markers to all the metrics scraped last time from the target
|
||||
// when the given target disappears as Prometheus does.
|
||||
// Use the current real timestamp for staleness markers, so queries
|
||||
@@ -332,6 +340,8 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}, globalStopCh <-chan struct{})
|
||||
lastScrapeStr := bytesutil.ToUnsafeString(bbLastScrape.B)
|
||||
sw.sendStaleSeries(lastScrapeStr, "", t, true)
|
||||
leveledbytebufferpool.Put(bbLastScrape)
|
||||
|
||||
<-processScrapedDataConcurrencyLimitCh
|
||||
}
|
||||
if sl := sw.getSeriesLimiter(); sl != nil {
|
||||
sl.MustStop()
|
||||
@@ -402,28 +412,33 @@ func (sw *scrapeWork) needStreamParseMode(responseSize int) bool {
|
||||
|
||||
// getTargetResponse() fetches response from sw target in the same way as when scraping the target.
|
||||
func (sw *scrapeWork) getTargetResponse() ([]byte, error) {
|
||||
var bb bytesutil.ByteBuffer
|
||||
if err := sw.ReadData(&bb); err != nil {
|
||||
cb := chunkedbuffer.Get()
|
||||
defer chunkedbuffer.Put(cb)
|
||||
|
||||
isGzipped, err := sw.ReadData(cb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return bb.B, nil
|
||||
|
||||
var bb bytesutil.ByteBuffer
|
||||
err = readFromBuffer(&bb, cb, isGzipped)
|
||||
chunkedbuffer.Put(cb)
|
||||
return bb.B, err
|
||||
}
|
||||
|
||||
func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error {
|
||||
body := leveledbytebufferpool.Get(sw.prevBodyLen)
|
||||
|
||||
// Read the scrape response into body.
|
||||
// It is OK to do for stream parsing mode, since the most of RAM
|
||||
// Read the whole scrape response into cb.
|
||||
// It is OK to do this for stream parsing mode, since the most of RAM
|
||||
// is occupied during parsing of the read response body below.
|
||||
// This also allows measuring the real scrape duration, which doesn't include
|
||||
// the time needed for processing of the read response.
|
||||
err := sw.ReadData(body)
|
||||
cb := chunkedbuffer.Get()
|
||||
isGzipped, err := sw.ReadData(cb)
|
||||
|
||||
// Measure scrape duration.
|
||||
endTimestamp := time.Now().UnixMilli()
|
||||
scrapeDurationSeconds := float64(endTimestamp-realTimestamp) / 1e3
|
||||
scrapeDuration.Update(scrapeDurationSeconds)
|
||||
scrapeResponseSize.Update(float64(len(body.B)))
|
||||
|
||||
// The code below is CPU-bound, while it may allocate big amounts of memory.
|
||||
// That's why it is a good idea to limit the number of concurrent goroutines,
|
||||
@@ -431,7 +446,19 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
||||
// without sacrificing the performance.
|
||||
processScrapedDataConcurrencyLimitCh <- struct{}{}
|
||||
|
||||
if err == nil && sw.needStreamParseMode(len(body.B)) {
|
||||
// Copy the read scrape response to body in order to parse it and send
|
||||
// the parsed results to remote storage.
|
||||
body := leveledbytebufferpool.Get(sw.prevBodyLen)
|
||||
if err == nil {
|
||||
err = readFromBuffer(body, cb, isGzipped)
|
||||
}
|
||||
chunkedbuffer.Put(cb)
|
||||
|
||||
bodyLen := len(body.B)
|
||||
sw.prevBodyLen = bodyLen
|
||||
scrapeResponseSize.Update(float64(bodyLen))
|
||||
|
||||
if err == nil && sw.needStreamParseMode(bodyLen) {
|
||||
// Process response body from scrape target in streaming manner.
|
||||
// This case is optimized for targets exposing more than ten thousand of metrics per target,
|
||||
// such as kube-state-metrics.
|
||||
@@ -443,15 +470,33 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
||||
err = sw.processDataOneShot(scrapeTimestamp, realTimestamp, body.B, scrapeDurationSeconds, err)
|
||||
}
|
||||
|
||||
<-processScrapedDataConcurrencyLimitCh
|
||||
|
||||
leveledbytebufferpool.Put(body)
|
||||
|
||||
<-processScrapedDataConcurrencyLimitCh
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// processScrapedDataConcurrencyLimitCh limits the number of goroutines, which concurrently
// process scraped data: a goroutine sends to the channel before the processing
// and receives from it afterwards. The channel capacity equals cgroup.AvailableCPUs().
var processScrapedDataConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
|
||||
|
||||
func readFromBuffer(dst *bytesutil.ByteBuffer, src *chunkedbuffer.Buffer, isGzipped bool) error {
|
||||
if !isGzipped {
|
||||
src.MustWriteTo(dst)
|
||||
return nil
|
||||
}
|
||||
|
||||
reader, err := protoparserutil.GetUncompressedReader(src.NewReader(), "gzip")
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot decompress response body: %w", err)
|
||||
}
|
||||
_, err = dst.ReadFrom(reader)
|
||||
protoparserutil.PutUncompressedReader(reader)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read gzipped response body: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sw *scrapeWork) processDataOneShot(scrapeTimestamp, realTimestamp int64, body []byte, scrapeDurationSeconds float64, err error) error {
|
||||
up := 1
|
||||
|
||||
@@ -497,6 +542,7 @@ func (sw *scrapeWork) processDataOneShot(scrapeTimestamp, realTimestamp int64, b
|
||||
samplesDropped = wc.applySeriesLimit(sw)
|
||||
}
|
||||
responseSize := len(bodyString)
|
||||
|
||||
am := &autoMetrics{
|
||||
up: up,
|
||||
scrapeDurationSeconds: scrapeDurationSeconds,
|
||||
@@ -507,9 +553,9 @@ func (sw *scrapeWork) processDataOneShot(scrapeTimestamp, realTimestamp int64, b
|
||||
seriesLimitSamplesDropped: samplesDropped,
|
||||
}
|
||||
wc.addAutoMetrics(sw, am, scrapeTimestamp)
|
||||
|
||||
sw.pushData(&wc.writeRequest)
|
||||
sw.prevLabelsLen = len(wc.labels)
|
||||
sw.prevBodyLen = responseSize
|
||||
writeRequestCtxPool.Put(wc)
|
||||
|
||||
if !areIdenticalSeries {
|
||||
@@ -607,7 +653,6 @@ func (sw *scrapeWork) processDataInStreamMode(scrapeTimestamp, realTimestamp int
|
||||
seriesLimitSamplesDropped: int(samplesDroppedTotal.Load()),
|
||||
}
|
||||
sw.pushAutoMetrics(am, scrapeTimestamp)
|
||||
sw.prevBodyLen = responseSize
|
||||
|
||||
if !areIdenticalSeries {
|
||||
// Send stale markers for disappeared metrics with the real scrape timestamp
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
|
||||
@@ -93,9 +93,9 @@ func TestScrapeWorkScrapeInternalFailure(t *testing.T) {
|
||||
}
|
||||
|
||||
readDataCalls := 0
|
||||
sw.ReadData = func(_ *bytesutil.ByteBuffer) error {
|
||||
sw.ReadData = func(_ *chunkedbuffer.Buffer) (bool, error) {
|
||||
readDataCalls++
|
||||
return fmt.Errorf("error when reading data")
|
||||
return false, fmt.Errorf("error when reading data")
|
||||
}
|
||||
|
||||
pushDataCalls := 0
|
||||
@@ -149,10 +149,10 @@ func testScrapeWorkScrapeInternalSuccess(t *testing.T, streamParse bool) {
|
||||
sw.Config = cfg
|
||||
|
||||
readDataCalls := 0
|
||||
sw.ReadData = func(dst *bytesutil.ByteBuffer) error {
|
||||
sw.ReadData = func(dst *chunkedbuffer.Buffer) (bool, error) {
|
||||
readDataCalls++
|
||||
dst.B = append(dst.B, data...)
|
||||
return nil
|
||||
dst.MustWrite([]byte(data))
|
||||
return false, nil
|
||||
}
|
||||
|
||||
var pushDataMu sync.Mutex
|
||||
@@ -543,15 +543,14 @@ func TestScrapeWorkScrapeInternalStreamConcurrency(t *testing.T) {
|
||||
sw.Config = cfg
|
||||
|
||||
readDataCalls := 0
|
||||
sw.ReadData = func(dst *bytesutil.ByteBuffer) error {
|
||||
sw.ReadData = func(dst *chunkedbuffer.Buffer) (bool, error) {
|
||||
readDataCalls++
|
||||
dst.B = append(dst.B, data...)
|
||||
return nil
|
||||
dst.MustWrite([]byte(data))
|
||||
return false, nil
|
||||
}
|
||||
|
||||
var pushDataCalls atomic.Int64
|
||||
var pushedTimeseries atomic.Int64
|
||||
var pushDataErr error
|
||||
sw.PushData = func(_ *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||
pushDataCalls.Add(1)
|
||||
pushedTimeseries.Add(int64(len(wr.Timeseries)))
|
||||
@@ -568,9 +567,6 @@ func TestScrapeWorkScrapeInternalStreamConcurrency(t *testing.T) {
|
||||
}
|
||||
}
|
||||
tsmGlobal.Unregister(&sw)
|
||||
if pushDataErr != nil {
|
||||
t.Fatalf("unexpected error: %s", pushDataErr)
|
||||
}
|
||||
if readDataCalls != 1 {
|
||||
t.Fatalf("unexpected number of readData calls; got %d; want %d", readDataCalls, 1)
|
||||
}
|
||||
@@ -630,8 +626,8 @@ func TestScrapeWorkScrapeInternalStreamConcurrency(t *testing.T) {
|
||||
func TestWriteRequestCtx_AddRowNoRelabeling(t *testing.T) {
|
||||
f := func(row string, cfg *ScrapeWork, dataExpected string) {
|
||||
t.Helper()
|
||||
var wc writeRequestCtx
|
||||
r := parsePromRow(row)
|
||||
var wc writeRequestCtx
|
||||
wc.addRow(cfg, r, r.Timestamp, false)
|
||||
tss := wc.writeRequest.Timeseries
|
||||
tssExpected := parseData(dataExpected)
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
|
||||
)
|
||||
@@ -133,12 +133,14 @@ func benchmarkScrapeWorkScrapeInternal(b *testing.B, data []byte, streamParse bo
|
||||
protoparserutil.StartUnmarshalWorkers()
|
||||
defer protoparserutil.StopUnmarshalWorkers()
|
||||
|
||||
readData := func(dst *bytesutil.ByteBuffer) error {
|
||||
dst.B = append(dst.B, data...)
|
||||
return nil
|
||||
readData := func(dst *chunkedbuffer.Buffer) (bool, error) {
|
||||
dst.MustWrite(data)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(data)))
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var sw scrapeWork
|
||||
sw.Config = &ScrapeWork{
|
||||
|
||||
Reference in New Issue
Block a user