mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Revert "app/vmauth: align request body buffering flags"
This reverts commitb3c03c023c. Reason for revert: the original logic was correct from the user's perspective: - The -maxRequestBodySizeToRetry command-line flag controls the size of the request body, which could be retried on backend failure. The meaining of this flag wasn't changed after the introduction of the -requestBufferSize flag in the commite31abfc25c(see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10309 ) - The -requestBufferSize flag controls the size of the buffer for reading request body before sending sending it to the backend and before applying concurrency limits. These flags are independent from user's perspective. The fact that these flags share the implementation, sholdn't be known to the user - this is an implementation detail, which allows avoiding double buffering. Both flags enable request buffering. If the user wants disabling of all the request buffering, then both flags must be set to 0. That's why these flags are cross-mentioned in their -help descriptions. Also the reverted commit had the following issues: - It reduced the default value for the -requestBufferSize flag from 32KiB to 16KiB. The 32KiB value has been calculated and justified at https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10309 . It shouldn't increase vmagent memory usage too much for typical workloads. For example, if vmagent handles 10K concurrent requests, then the memory overhead for the request buffering will be 10K*32KiB=320MiB. This is a small price for being able to efficiently handling 10K concurrent requests. - It added a dot to the end of the https://docs.victoriametrics.com/victoriametrics/vmauth/#request-body-buffering link in the description for the description of the -requestBufferSize flag. This breaks clicking the link in some environments, since the trailing dot is considered as a part of the url. - It added a superflouous whitespace in front of the 'Disabling request buffering' text inside the description for the -requstBufferSize flag. - It introduced an unnecessary complexity to the user by mentioning that the zero value at -maxBufferSize disables buffering for request reties (these things must be independent from the user's perspective). - It changed the bufferedBody logic in non-trivial ways, which aren't related to the original issue. If these changes are needed, then they must be justified in a separate issue and must be prepared in a separate pull request / commit. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10675 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10677
This commit is contained in:
@@ -47,13 +47,12 @@ var (
|
||||
"It is recommended setting this value to values smaller than -http.idleConnTimeout set at backend services")
|
||||
responseTimeout = flag.Duration("responseTimeout", 5*time.Minute, "The timeout for receiving a response from backend")
|
||||
|
||||
requestBufferSize = flagutil.NewBytes("requestBufferSize", defaultBodyBufferSize, "The size of the buffer for reading the request body before proxying the request to backends. "+
|
||||
requestBufferSize = flagutil.NewBytes("requestBufferSize", 32*1024, "The size of the buffer for reading the request body before proxying the request to backends. "+
|
||||
"This allows reducing the consumption of backend resources when processing requests from clients connected via slow networks. "+
|
||||
"Set to 0 to disable request buffering. See https://docs.victoriametrics.com/victoriametrics/vmauth/#request-body-buffering. "+
|
||||
" Disabling request buffering also disables request retries configured with '-maxRequestBodySizeToRetry'")
|
||||
maxRequestBodySizeToRetry = flagutil.NewBytes("maxRequestBodySizeToRetry", defaultBodyBufferSize, "The maximum request body size in memory for potential retries at other backends. "+
|
||||
"Request bodies larger than this size cannot be retried if the backend fails. Zero or negative value disables request retries. "+
|
||||
"See also '-requestBufferSize' flag, which controlls a size of the buffer for potential retry.")
|
||||
"Set to 0 to disable request buffering. See https://docs.victoriametrics.com/victoriametrics/vmauth/#request-body-buffering")
|
||||
maxRequestBodySizeToRetry = flagutil.NewBytes("maxRequestBodySizeToRetry", 16*1024, "The maximum request body size to buffer in memory for potential retries at other backends. "+
|
||||
"Request bodies larger than this size cannot be retried if the backend fails. Zero or negative value disables request body buffering and retries. "+
|
||||
"See also -requestBufferSize")
|
||||
|
||||
maxConcurrentRequests = flag.Int("maxConcurrentRequests", 1000, "The maximum number of concurrent requests vmauth can process simultaneously. "+
|
||||
"Requests exceeding this limit are queued for up to -maxQueueDuration and then rejected with '429 Too Many Requests' http status code if the limit is still reached. "+
|
||||
@@ -90,8 +89,6 @@ var (
|
||||
"Recommended when vmauth is exposed to the internet.")
|
||||
)
|
||||
|
||||
const defaultBodyBufferSize = 16 * 1024
|
||||
|
||||
func main() {
|
||||
// Write flags and help message to stdout, since it is easier to grep or pipe.
|
||||
flag.CommandLine.SetOutput(os.Stdout)
|
||||
@@ -352,25 +349,13 @@ func endConcurrencyLimit() {
|
||||
<-concurrencyLimitCh
|
||||
}
|
||||
|
||||
func getMaxRequestBufSize() int {
|
||||
rbs := requestBufferSize.IntN()
|
||||
rbstr := maxRequestBodySizeToRetry.IntN()
|
||||
if rbs <= 0 {
|
||||
// request buffering disabled
|
||||
// it also disables request retries as a side effect
|
||||
return 0
|
||||
}
|
||||
// for backward compatibility we must use max value for both flags
|
||||
return max(rbs, rbstr)
|
||||
}
|
||||
|
||||
func bufferRequestBody(ctx context.Context, r io.ReadCloser, userName string) (io.ReadCloser, error) {
|
||||
if r == nil {
|
||||
// This is a GET request with nil reader.
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
maxBufSize := getMaxRequestBufSize()
|
||||
maxBufSize := max(requestBufferSize.IntN(), maxRequestBodySizeToRetry.IntN())
|
||||
if maxBufSize <= 0 {
|
||||
return r, nil
|
||||
}
|
||||
@@ -401,7 +386,7 @@ func bufferRequestBody(ctx context.Context, r io.ReadCloser, userName string) (i
|
||||
}
|
||||
}
|
||||
|
||||
bb := newBufferedBody(r, buf, maxBufSize, maxRequestBodySizeToRetry.IntN())
|
||||
bb := newBufferedBody(r, buf, maxBufSize)
|
||||
return bb, nil
|
||||
}
|
||||
|
||||
@@ -825,14 +810,11 @@ type bufferedBody struct {
|
||||
// bufOffset is the offset at buf for already read bytes.
|
||||
bufOffset int
|
||||
|
||||
// cannotRetry is set to true if buf size exceeds max retry body size
|
||||
// cannotRetry is set to true after Close() call on non-nil r.
|
||||
cannotRetry bool
|
||||
|
||||
// bodyWasClosed is set to true at Close if request retry is not possible
|
||||
bodyWasClosed bool
|
||||
}
|
||||
|
||||
func newBufferedBody(r io.ReadCloser, buf []byte, maxBufSize, maxBufRetrySize int) *bufferedBody {
|
||||
func newBufferedBody(r io.ReadCloser, buf []byte, maxBufSize int) *bufferedBody {
|
||||
// Do not use sync.Pool here, since http.RoundTrip may still use request body after return.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8051
|
||||
|
||||
@@ -842,15 +824,14 @@ func newBufferedBody(r io.ReadCloser, buf []byte, maxBufSize, maxBufRetrySize in
|
||||
}
|
||||
|
||||
return &bufferedBody{
|
||||
r: r,
|
||||
buf: buf,
|
||||
cannotRetry: len(buf) > maxBufRetrySize || maxBufRetrySize == 0,
|
||||
r: r,
|
||||
buf: buf,
|
||||
}
|
||||
}
|
||||
|
||||
// Read implements io.Reader interface.
|
||||
func (bb *bufferedBody) Read(p []byte) (int, error) {
|
||||
if bb.bodyWasClosed {
|
||||
if bb.cannotRetry {
|
||||
return 0, fmt.Errorf("cannot read already closed body")
|
||||
}
|
||||
if bb.bufOffset < len(bb.buf) {
|
||||
@@ -865,16 +846,14 @@ func (bb *bufferedBody) Read(p []byte) (int, error) {
|
||||
}
|
||||
|
||||
func (bb *bufferedBody) canRetry() bool {
|
||||
return !bb.cannotRetry
|
||||
return bb.r == nil
|
||||
}
|
||||
|
||||
// Close implements io.Closer interface.
|
||||
func (bb *bufferedBody) Close() error {
|
||||
bb.resetReader()
|
||||
if bb.cannotRetry {
|
||||
bb.bodyWasClosed = true
|
||||
}
|
||||
if bb.r != nil {
|
||||
bb.cannotRetry = true
|
||||
return bb.r.Close()
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -1714,6 +1714,13 @@ func TestBufferRequestBody_Failure(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
defaultMaxRequestBodySizeToRetry := maxRequestBodySizeToRetry.String()
|
||||
defer func() {
|
||||
if err := maxRequestBodySizeToRetry.Set(defaultMaxRequestBodySizeToRetry); err != nil {
|
||||
t.Fatalf("cannot reset maxRequestBodySizeToRetry: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
defaultMaxQueueDuration := *maxQueueDuration
|
||||
defer func() {
|
||||
*maxQueueDuration = defaultMaxQueueDuration
|
||||
@@ -1722,6 +1729,9 @@ func TestBufferRequestBody_Failure(t *testing.T) {
|
||||
f := func(body *mockBody, expectedResponse string) {
|
||||
t.Helper()
|
||||
|
||||
if err := maxRequestBodySizeToRetry.Set("0"); err != nil {
|
||||
t.Fatalf("cannot set maxRequestBodySizeToRetry: %s", err)
|
||||
}
|
||||
if err := requestBufferSize.Set("2048"); err != nil {
|
||||
t.Fatalf("cannot set requestBufferSize: %s", err)
|
||||
}
|
||||
@@ -1834,6 +1844,16 @@ func TestBufferedBody_RetrySuccess(t *testing.T) {
|
||||
t.Fatalf("cannot set requestBufferSize: %s", err)
|
||||
}
|
||||
|
||||
defaultMaxRequestBodySizeToRetry := maxRequestBodySizeToRetry.String()
|
||||
defer func() {
|
||||
if err := maxRequestBodySizeToRetry.Set(defaultMaxRequestBodySizeToRetry); err != nil {
|
||||
t.Fatalf("cannot reset maxRequestBodySizeToRetry: %s", err)
|
||||
}
|
||||
}()
|
||||
if err := maxRequestBodySizeToRetry.Set("0"); err != nil {
|
||||
t.Fatalf("cannot set maxRequestBodySizeToRetry: %s", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
rb, err := bufferRequestBody(ctx, io.NopCloser(bytes.NewBufferString(s)), "foo")
|
||||
if err != nil {
|
||||
@@ -1882,6 +1902,16 @@ func TestBufferedBody_RetrySuccessPartialRead(t *testing.T) {
|
||||
t.Fatalf("cannot set requestBufferSize: %s", err)
|
||||
}
|
||||
|
||||
defaultMaxRequestBodySizeToRetry := maxRequestBodySizeToRetry.String()
|
||||
defer func() {
|
||||
if err := maxRequestBodySizeToRetry.Set(defaultMaxRequestBodySizeToRetry); err != nil {
|
||||
t.Fatalf("cannot reset maxRequestBodySizeToRetry: %s", err)
|
||||
}
|
||||
}()
|
||||
if err := maxRequestBodySizeToRetry.Set("0"); err != nil {
|
||||
t.Fatalf("cannot set maxRequestBodySizeToRetry: %s", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
rb, err := bufferRequestBody(ctx, io.NopCloser(bytes.NewBufferString(s)), "foo")
|
||||
if err != nil {
|
||||
@@ -1934,6 +1964,16 @@ func TestBufferedBody_RetryFailureTooBigBody(t *testing.T) {
|
||||
f := func(s string, maxBodySize int) {
|
||||
t.Helper()
|
||||
|
||||
defaultRequestBufferSize := requestBufferSize.String()
|
||||
defer func() {
|
||||
if err := requestBufferSize.Set(defaultRequestBufferSize); err != nil {
|
||||
t.Fatalf("cannot reset requestBufferSize: %s", err)
|
||||
}
|
||||
}()
|
||||
if err := requestBufferSize.Set("0"); err != nil {
|
||||
t.Fatalf("cannot set requestBufferSize: %s", err)
|
||||
}
|
||||
|
||||
defaultMaxRequestBodySizeToRetry := maxRequestBodySizeToRetry.String()
|
||||
defer func() {
|
||||
if err := maxRequestBodySizeToRetry.Set(defaultMaxRequestBodySizeToRetry); err != nil {
|
||||
@@ -2009,9 +2049,10 @@ func TestBufferedBody_RetryFailureZeroOrNegativeMaxBodySize(t *testing.T) {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
bb, ok := rb.(*bufferedBody)
|
||||
canRetry := ok && bb.canRetry()
|
||||
if canRetry {
|
||||
t.Fatalf("canRetry() must return false before reading anything")
|
||||
canRetry := !ok || bb.canRetry()
|
||||
|
||||
if !canRetry {
|
||||
t.Fatalf("canRetry() must return true before reading anything")
|
||||
}
|
||||
data, err := io.ReadAll(rb)
|
||||
if err != nil {
|
||||
@@ -2028,8 +2069,8 @@ func TestBufferedBody_RetryFailureZeroOrNegativeMaxBodySize(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error in io.ReadAll: %s", err)
|
||||
}
|
||||
if len(data) > 0 {
|
||||
t.Fatalf("unexpected non-empty data read\ngot\n%s", data)
|
||||
if string(data) != s {
|
||||
t.Fatalf("unexpected data read\ngot\n%s\nwant\n%s", data, s)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user