lib/httputil: add NewTransport() function for creating pre-initialized net/http.Transport

This commit is contained in:
Aliaksandr Valialkin
2025-03-26 18:38:24 +01:00
parent cd94593383
commit e5f4826964
18 changed files with 136 additions and 72 deletions

View File

@@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
@@ -124,14 +125,15 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
if err != nil {
logger.Fatalf("cannot initialize AWS Config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
}
tr := &http.Transport{
DialContext: netutil.NewStatDialFunc("vmagent_remotewrite"),
TLSHandshakeTimeout: tlsHandshakeTimeout.GetOptionalArg(argIdx),
MaxConnsPerHost: 2 * concurrency,
MaxIdleConnsPerHost: 2 * concurrency,
IdleConnTimeout: time.Minute,
WriteBufferSize: 64 * 1024,
}
tr := httputil.NewTransport(false)
tr.DialContext = netutil.NewStatDialFunc("vmagent_remotewrite")
tr.TLSHandshakeTimeout = tlsHandshakeTimeout.GetOptionalArg(argIdx)
tr.MaxConnsPerHost = 2 * concurrency
tr.MaxIdleConnsPerHost = 2 * concurrency
tr.IdleConnTimeout = time.Minute
tr.WriteBufferSize = 64 * 1024
pURL := proxyURL.GetOptionalArg(argIdx)
if len(pURL) > 0 {
if !strings.Contains(pURL, "://") {

View File

@@ -16,6 +16,7 @@ import (
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
@@ -92,7 +93,7 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
cfg.FlushInterval = defaultFlushInterval
}
if cfg.Transport == nil {
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
cfg.Transport = httputil.NewTransport(false)
}
cc := defaultConcurrency
if cfg.Concurrency > 0 {

View File

@@ -23,6 +23,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
@@ -507,7 +508,7 @@ func newRoundTripper(caFileOpt, certFileOpt, keyFileOpt, serverNameOpt string, i
return nil, fmt.Errorf("cannot initialize promauth.Config: %w", err)
}
tr := http.DefaultTransport.(*http.Transport).Clone()
tr := httputil.NewTransport(false)
tr.ResponseHeaderTimeout = *responseTimeout
// Automatic compression must be disabled in order to fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/535
tr.DisableCompression = true

View File

@@ -308,10 +308,13 @@ func main() {
return fmt.Errorf("failed to create TLS Config: %s", err)
}
srcHTTPClient := &http.Client{Transport: &http.Transport{
DisableKeepAlives: disableKeepAlive,
TLSClientConfig: srcTC,
}}
trSrc := httputil.NewTransport(false)
trSrc.DisableKeepAlives = disableKeepAlive
trSrc.TLSClientConfig = srcTC
srcHTTPClient := &http.Client{
Transport: trSrc,
}
dstAddr := strings.Trim(c.String(vmNativeDstAddr), "/")
dstExtraLabels := c.StringSlice(vmExtraLabel)
@@ -335,10 +338,13 @@ func main() {
return fmt.Errorf("failed to create TLS Config: %s", err)
}
dstHTTPClient := &http.Client{Transport: &http.Transport{
DisableKeepAlives: disableKeepAlive,
TLSClientConfig: dstTC,
}}
trDst := httputil.NewTransport(false)
trDst.DisableKeepAlives = disableKeepAlive
trDst.TLSClientConfig = dstTC
dstHTTPClient := &http.Client{
Transport: trDst,
}
p := vmNativeProcessor{
rateLimit: c.Int64(vmRateLimit),

View File

@@ -2,7 +2,6 @@ package main
import (
"context"
"net/http"
"testing"
"time"
@@ -14,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
)
func TestRemoteRead(t *testing.T) {
@@ -70,8 +70,11 @@ func TestRemoteRead(t *testing.T) {
{
name: "step month on month time range",
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
vmCfg: vm.Config{Addr: "", Concurrency: 1,
Transport: http.DefaultTransport.(*http.Transport)},
vmCfg: vm.Config{
Addr: "",
Concurrency: 1,
Transport: httputil.NewTransport(false),
},
start: "2022-09-26T11:23:05+02:00",
end: "2022-11-26T11:24:05+02:00",
numOfSamples: 2,

View File

@@ -17,6 +17,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
@@ -65,17 +66,24 @@ func TestVMNativeProcessorRun(t *testing.T) {
t.Fatalf("cannot add series to storage: %s", err)
}
tr := httputil.NewTransport(false)
tr.DisableKeepAlives = false
srcClient := &native.Client{
AuthCfg: nil,
Addr: src.URL(),
ExtraLabels: []string{},
HTTPClient: &http.Client{Transport: &http.Transport{DisableKeepAlives: false}},
HTTPClient: &http.Client{
Transport: tr,
},
}
dstClient := &native.Client{
AuthCfg: nil,
Addr: dst.URL(),
ExtraLabels: []string{},
HTTPClient: &http.Client{Transport: &http.Transport{DisableKeepAlives: false}},
HTTPClient: &http.Client{
Transport: tr,
},
}
isSilent = true

View File

@@ -8,6 +8,8 @@ import (
"strconv"
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
)
// Client is used for interacting with the apps over the network.
@@ -22,7 +24,7 @@ type Client struct {
func NewClient() *Client {
return &Client{
httpCli: &http.Client{
Transport: &http.Transport{},
Transport: httputil.NewTransport(false),
},
}
}

View File

@@ -20,6 +20,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -136,10 +137,13 @@ func (fs *FS) Init() error {
}
if fs.TLSInsecureSkipVerify {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
tr := httputil.NewTransport(false)
tr.TLSClientConfig = &tls.Config{
InsecureSkipVerify: true,
}
cfg.HTTPClient = &http.Client{
Transport: tr,
}
cfg.HTTPClient = &http.Client{Transport: tr}
}
var outerErr error

View File

@@ -15,13 +15,13 @@ func Transport(URL, certFile, keyFile, caFile, serverName string, insecureSkipVe
if err != nil {
return nil, fmt.Errorf("failed to parse URL: %w", err)
}
t := http.DefaultTransport.(*http.Transport).Clone()
tr := NewTransport(false)
tlsCfg, err := TLSConfig(certFile, keyFile, caFile, serverName, insecureSkipVerify)
if err != nil {
return nil, err
}
t.TLSClientConfig = tlsCfg
return t, nil
tr.TLSClientConfig = tlsCfg
return tr, nil
}
// TLSConfig creates tls.Config object from provided arguments

22
lib/httputil/transport.go Normal file
View File

@@ -0,0 +1,22 @@
package httputil
import (
"net/http"
)
// NewTransport returns pre-initialized http.Transport with sane defaults.
//
// It is OK to change settings of the returned transport before its' usage.
//
// If enableHTTP2 is set, then the returned transport is ready for http2 requests.
//
// It is recommended disabling http2 support, since it is too bloated, slow and contains many security breaches.
// See https://www.google.com/search?q=http2+security+issues .
// Also, http2 doesn't bring any advantages over http/1.1 when communicating with server backends.
func NewTransport(enableHTTP2 bool) *http.Transport {
tr := http.DefaultTransport.(*http.Transport).Clone()
if !enableHTTP2 {
tr.Protocols = nil
}
return tr
}

View File

@@ -17,6 +17,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
)
@@ -232,10 +233,10 @@ func urlValuesFromMap(m map[string]string) url.Values {
}
func (oi *oauth2ConfigInternal) initTokenSource() error {
tr := httputil.NewTransport(false)
tr.Proxy = oi.proxyURLFunc
c := &http.Client{
Transport: oi.ac.NewRoundTripper(&http.Transport{
Proxy: oi.proxyURLFunc,
}),
Transport: oi.ac.NewRoundTripper(tr),
}
oi.ctx = context.WithValue(context.Background(), oauth2.HTTPClient, c)
oi.tokenSource = oi.cfg.TokenSource(oi.ctx)

View File

@@ -18,6 +18,8 @@ import (
"time"
"gopkg.in/yaml.v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
)
func TestOptionsNewConfigFailure(t *testing.T) {
@@ -643,8 +645,9 @@ func TestTLSConfigWithCertificatesFilesUpdate(t *testing.T) {
t.Fatalf("unexpected error when parsing config: %s", err)
}
tr := httputil.NewTransport(false)
client := http.Client{
Transport: ac.NewRoundTripper(&http.Transport{}),
Transport: ac.NewRoundTripper(tr),
}
resp, err := client.Do(&http.Request{

View File

@@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
)
@@ -77,18 +78,19 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
}
}
tr := httputil.NewTransport(false)
tr.Proxy = proxyURLFunc
tr.TLSHandshakeTimeout = 10 * time.Second
tr.IdleConnTimeout = 2 * sw.ScrapeInterval
tr.DisableCompression = *disableCompression || sw.DisableCompression
tr.DisableKeepAlives = *disableKeepAlive || sw.DisableKeepAlive
tr.DialContext = dialFunc
tr.MaxIdleConnsPerHost = 100
tr.MaxResponseHeaderBytes = int64(maxResponseHeadersSize.N)
hc := &http.Client{
Transport: ac.NewRoundTripper(&http.Transport{
Proxy: proxyURLFunc,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: 2 * sw.ScrapeInterval,
DisableCompression: *disableCompression || sw.DisableCompression,
DisableKeepAlives: *disableKeepAlive || sw.DisableKeepAlive,
DialContext: dialFunc,
MaxIdleConnsPerHost: 100,
MaxResponseHeaderBytes: int64(maxResponseHeadersSize.N),
}),
Timeout: sw.ScrapeTimeout,
Transport: ac.NewRoundTripper(tr),
Timeout: sw.ScrapeTimeout,
}
if sw.DenyRedirects {
hc.CheckRedirect = func(_ *http.Request, _ []*http.Request) error {

View File

@@ -12,6 +12,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
)
@@ -86,7 +87,8 @@ func (tps *testProxyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
resp, err := http.DefaultTransport.RoundTrip(r)
tr := httputil.NewTransport(false)
resp, err := tr.RoundTrip(r)
if err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return

View File

@@ -20,6 +20,7 @@ import (
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
@@ -259,7 +260,7 @@ func getHTTPClient(ac *promauth.Config, proxyURL *url.URL) *http.Client {
return c
}
tr := http.DefaultTransport.(*http.Transport).Clone()
tr := httputil.NewTransport(true)
tr.DialContext = netutil.Dialer.DialContext
tr.TLSHandshakeTimeout = 10 * time.Second
tr.IdleConnTimeout = *apiServerTimeout

View File

@@ -13,6 +13,7 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutil"
@@ -73,11 +74,13 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
if port == 0 {
port = 80
}
tr := httputil.NewTransport(false)
tr.MaxIdleConnsPerHost = 100
cfg := &apiConfig{
client: &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 100,
},
Transport: tr,
},
availability: sdc.Availability,
region: sdc.Region,
@@ -94,9 +97,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
cfg.client.CloseIdleConnections()
return nil, fmt.Errorf("cannot parse TLS config: %w", err)
}
cfg.client.Transport = ac.NewRoundTripper(&http.Transport{
MaxIdleConnsPerHost: 100,
})
cfg.client.Transport = ac.NewRoundTripper(tr)
}
// use public compute endpoint by default
if len(cfg.availability) == 0 {

View File

@@ -10,6 +10,7 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutil"
@@ -47,10 +48,10 @@ func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
}
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
tr := &http.Transport{
MaxIdleConnsPerHost: 100,
}
tr := httputil.NewTransport(false)
tr.MaxIdleConnsPerHost = 100
rt := http.RoundTripper(tr)
if sdc.TLSConfig != nil {
opts := &promauth.Options{
BaseDir: baseDir,

View File

@@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
@@ -110,25 +111,28 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy
proxyURLFunc = http.ProxyURL(pu)
}
tr := httputil.NewTransport(false)
tr.Proxy = proxyURLFunc
tr.TLSHandshakeTimeout = 10 * time.Second
tr.MaxIdleConnsPerHost = *maxConcurrency
tr.ResponseHeaderTimeout = DefaultClientReadTimeout
tr.DialContext = dialFunc
client := &http.Client{
Timeout: DefaultClientReadTimeout,
Transport: ac.NewRoundTripper(&http.Transport{
Proxy: proxyURLFunc,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: *maxConcurrency,
ResponseHeaderTimeout: DefaultClientReadTimeout,
DialContext: dialFunc,
}),
Timeout: DefaultClientReadTimeout,
Transport: ac.NewRoundTripper(tr),
}
trBlocking := httputil.NewTransport(false)
trBlocking.Proxy = proxyURLFunc
trBlocking.TLSHandshakeTimeout = 10 * time.Second
trBlocking.MaxIdleConnsPerHost = 1000
trBlocking.ResponseHeaderTimeout = BlockingClientReadTimeout
trBlocking.DialContext = dialFunc
blockingClient := &http.Client{
Timeout: BlockingClientReadTimeout,
Transport: ac.NewRoundTripper(&http.Transport{
Proxy: proxyURLFunc,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: 1000,
ResponseHeaderTimeout: BlockingClientReadTimeout,
DialContext: dialFunc,
}),
Timeout: BlockingClientReadTimeout,
Transport: ac.NewRoundTripper(trBlocking),
}
setHTTPHeaders := func(_ *http.Request) error { return nil }