lib/httputil: automatically initialize data transfer metrics for the created HTTP transports via NewTransport()

This commit is contained in:
Aliaksandr Valialkin
2025-03-27 11:35:03 +01:00
parent 7c05ec42fe
commit 35b31f904d
25 changed files with 57 additions and 54 deletions

View File

@@ -16,7 +16,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
@@ -126,8 +125,7 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
logger.Fatalf("cannot initialize AWS Config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
}
tr := httputil.NewTransport(false)
tr.DialContext = netutil.NewStatDialFunc("vmagent_remotewrite")
tr := httputil.NewTransport(false, "vmagent_remotewrite")
tr.TLSHandshakeTimeout = tlsHandshakeTimeout.GetOptionalArg(argIdx)
tr.MaxConnsPerHost = 2 * concurrency
tr.MaxIdleConnsPerHost = 2 * concurrency

View File

@@ -11,7 +11,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/vmalertutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
@@ -83,11 +82,10 @@ func Init(extraParams url.Values) (QuerierBuilder, error) {
if err := httputil.CheckURL(*addr); err != nil {
return nil, fmt.Errorf("invalid -datasource.url: %w", err)
}
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify)
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify, "vmalert_datasource")
if err != nil {
return nil, fmt.Errorf("failed to create transport for -datasource.url=%q: %w", *addr, err)
}
tr.DialContext = netutil.NewStatDialFunc("vmalert_datasource")
tr.DisableKeepAlives = *disableKeepAlive
tr.MaxIdleConnsPerHost = *maxIdleConnections
if tr.MaxIdleConns != 0 && tr.MaxIdleConns < tr.MaxIdleConnsPerHost {

View File

@@ -161,7 +161,7 @@ func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, authCfg proma
if authCfg.TLSConfig != nil {
tls = authCfg.TLSConfig
}
tr, err := promauth.NewTLSTransport(tls.CertFile, tls.KeyFile, tls.CAFile, tls.ServerName, tls.InsecureSkipVerify)
tr, err := promauth.NewTLSTransport(tls.CertFile, tls.KeyFile, tls.CAFile, tls.ServerName, tls.InsecureSkipVerify, "vmalert_notifier")
if err != nil {
return nil, fmt.Errorf("failed to create transport for alertmanager URL=%q: %w", alertManagerURL, err)
@@ -198,8 +198,10 @@ func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, authCfg proma
argFunc: fn,
authCfg: aCfg,
relabelConfigs: relabelCfg,
client: &http.Client{Transport: tr},
timeout: timeout,
metrics: newNotifierMetrics(alertManagerURL),
client: &http.Client{
Transport: tr,
},
timeout: timeout,
metrics: newNotifierMetrics(alertManagerURL),
}, nil
}

View File

@@ -10,7 +10,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/vmalertutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
@@ -70,12 +69,11 @@ func Init() (datasource.QuerierBuilder, error) {
if err := httputil.CheckURL(*addr); err != nil {
return nil, fmt.Errorf("invalid -remoteRead.url: %w", err)
}
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify)
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify, "vmalert_remoteread")
if err != nil {
return nil, fmt.Errorf("failed to create transport for -remoteRead.url=%q: %w", *addr, err)
}
tr.IdleConnTimeout = *idleConnectionTimeout
tr.DialContext = netutil.NewStatDialFunc("vmalert_remoteread")
endpointParams, err := flagutil.ParseJSONMap(*oauth2EndpointParams)
if err != nil {

View File

@@ -93,7 +93,7 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
cfg.FlushInterval = defaultFlushInterval
}
if cfg.Transport == nil {
cfg.Transport = httputil.NewTransport(false)
cfg.Transport = httputil.NewTransport(false, "vmalert_remotewrite")
}
cc := defaultConcurrency
if cfg.Concurrency > 0 {

View File

@@ -33,7 +33,7 @@ func NewDebugClient() (*DebugClient, error) {
if err := httputil.CheckURL(*addr); err != nil {
return nil, fmt.Errorf("invalid -remoteWrite.url: %w", err)
}
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify)
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify, "vmalert_remotewrite_debug")
if err != nil {
return nil, fmt.Errorf("failed to create transport for -remoteWrite.url=%q: %w", *addr, err)
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/vmalertutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
@@ -73,12 +72,11 @@ func Init(ctx context.Context) (*Client, error) {
if err := httputil.CheckURL(*addr); err != nil {
return nil, fmt.Errorf("invalid -remoteWrite.url: %w", err)
}
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify)
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify, "vmalert_remotewrite")
if err != nil {
return nil, fmt.Errorf("failed to create transport for -remoteWrite.url=%q: %w", *addr, err)
}
tr.IdleConnTimeout = *idleConnectionTimeout
tr.DialContext = netutil.NewStatDialFunc("vmalert_remotewrite")
endpointParams, err := flagutil.ParseJSONMap(*oauth2EndpointParams)
if err != nil {

View File

@@ -508,7 +508,7 @@ func newRoundTripper(caFileOpt, certFileOpt, keyFileOpt, serverNameOpt string, i
return nil, fmt.Errorf("cannot initialize promauth.Config: %w", err)
}
tr := httputil.NewTransport(false)
tr := httputil.NewTransport(false, "vmauth_backend")
tr.ResponseHeaderTimeout = *responseTimeout
// Automatic compression must be disabled in order to fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/535
tr.DisableCompression = true
@@ -517,7 +517,6 @@ func newRoundTripper(caFileOpt, certFileOpt, keyFileOpt, serverNameOpt string, i
if tr.MaxIdleConns != 0 && tr.MaxIdleConns < tr.MaxIdleConnsPerHost {
tr.MaxIdleConns = tr.MaxIdleConnsPerHost
}
tr.DialContext = netutil.NewStatDialFunc("vmauth_backend")
rt := cfg.NewRoundTripper(tr)
return rt, nil

View File

@@ -70,7 +70,7 @@ func main() {
return fmt.Errorf("invalid -%s: %w", otsdbAddr, err)
}
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify)
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify, "vmctl_opentsdb")
if err != nil {
return fmt.Errorf("failed to create transport for -%s=%q: %s", otsdbAddr, addr, err)
}
@@ -185,7 +185,7 @@ func main() {
serverName := c.String(remoteReadServerName)
insecureSkipVerify := c.Bool(remoteReadInsecureSkipVerify)
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify)
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify, "vmctl_remoteread")
if err != nil {
return fmt.Errorf("failed to create transport for -%s=%q: %s", remoteReadSrcAddr, addr, err)
}
@@ -315,7 +315,7 @@ func main() {
return fmt.Errorf("failed to create TLS Config: %s", err)
}
trSrc := httputil.NewTransport(false)
trSrc := httputil.NewTransport(false, "vmctl_src")
trSrc.DisableKeepAlives = disableKeepAlive
trSrc.TLSClientConfig = srcTC
@@ -345,7 +345,7 @@ func main() {
return fmt.Errorf("failed to create TLS Config: %s", err)
}
trDst := httputil.NewTransport(false)
trDst := httputil.NewTransport(false, "vmctl_dst")
trDst.DisableKeepAlives = disableKeepAlive
trDst.TLSClientConfig = dstTC
@@ -455,7 +455,7 @@ func initConfigVM(c *cli.Context) (vm.Config, error) {
serverName := c.String(vmServerName)
insecureSkipVerify := c.Bool(vmInsecureSkipVerify)
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify)
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify, "vmctl_client")
if err != nil {
return vm.Config{}, fmt.Errorf("failed to create transport for -%s=%q: %s", vmAddr, addr, err)
}

View File

@@ -73,7 +73,7 @@ func TestRemoteRead(t *testing.T) {
vmCfg: vm.Config{
Addr: "",
Concurrency: 1,
Transport: httputil.NewTransport(false),
Transport: httputil.NewTransport(false, "vmctl_test_read"),
},
start: "2022-09-26T11:23:05+02:00",
end: "2022-11-26T11:24:05+02:00",

View File

@@ -67,7 +67,7 @@ func TestVMNativeProcessorRun(t *testing.T) {
t.Fatalf("cannot add series to storage: %s", err)
}
tr := httputil.NewTransport(false)
tr := httputil.NewTransport(false, "test_client")
tr.DisableKeepAlives = false
srcClient := &native.Client{

View File

@@ -24,7 +24,7 @@ type Client struct {
func NewClient() *Client {
return &Client{
httpCli: &http.Client{
Transport: httputil.NewTransport(false),
Transport: httputil.NewTransport(false, "apptest_client"),
},
}
}

View File

@@ -136,14 +136,14 @@ func (fs *FS) Init() error {
return err
}
tr := httputil.NewTransport(false, "vmbackup_s3_client")
if fs.TLSInsecureSkipVerify {
tr := httputil.NewTransport(false)
tr.TLSClientConfig = &tls.Config{
InsecureSkipVerify: true,
}
cfg.HTTPClient = &http.Client{
Transport: tr,
}
}
cfg.HTTPClient = &http.Client{
Transport: tr,
}
var outerErr error

View File

@@ -2,10 +2,14 @@ package httputil
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
)
// NewTransport returns pre-initialized http.Transport with sane defaults.
//
// Data transfer for the returned transport is tracked with metrics, which are exposed with the given metricsPrefix.
//
// It is OK to change settings of the returned transport before its' usage.
//
// If enableHTTP2 is set, then the returned transport is ready for http2 requests.
@@ -13,9 +17,13 @@ import (
// It is recommended disabling http2 support, since it is too bloated, slow and contains many security breaches.
// See https://www.google.com/search?q=http2+security+issues .
// Also, http2 doesn't bring any advantages over http/1.1 when communicating with server backends.
func NewTransport(enableHTTP2 bool) *http.Transport {
func NewTransport(enableHTTP2 bool, metricsPrefix string) *http.Transport {
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.DialContext = netutil.NewStatDialFunc(metricsPrefix)
if !enableHTTP2 {
// Disable automatically enabled http2
tr.ForceAttemptHTTP2 = false
tr.TLSNextProto = nil
tr.Protocols = nil
}
return tr

View File

@@ -233,7 +233,7 @@ func urlValuesFromMap(m map[string]string) url.Values {
}
func (oi *oauth2ConfigInternal) initTokenSource() error {
tr := httputil.NewTransport(false)
tr := httputil.NewTransport(false, "vm_oauth_client")
tr.Proxy = oi.proxyURLFunc
c := &http.Client{
Transport: oi.ac.NewRoundTripper(tr),

View File

@@ -645,7 +645,7 @@ func TestTLSConfigWithCertificatesFilesUpdate(t *testing.T) {
t.Fatalf("unexpected error when parsing config: %s", err)
}
tr := httputil.NewTransport(false)
tr := httputil.NewTransport(false, "test_client")
client := http.Client{
Transport: ac.NewRoundTripper(tr),
}

View File

@@ -8,13 +8,13 @@ import (
)
// NewTLSTransport creates a new http.Transport from the provided args.
func NewTLSTransport(certFile, keyFile, caFile, serverName string, insecureSkipVerify bool) (*http.Transport, error) {
func NewTLSTransport(certFile, keyFile, caFile, serverName string, insecureSkipVerify bool, metricsPrefix string) (*http.Transport, error) {
tlsCfg, err := NewTLSConfig(certFile, keyFile, caFile, serverName, insecureSkipVerify)
if err != nil {
return nil, err
}
tr := httputil.NewTransport(false)
tr := httputil.NewTransport(false, metricsPrefix)
tr.TLSClientConfig = tlsCfg
return tr, nil

View File

@@ -50,7 +50,7 @@ func TestNewTLSTransport(t *testing.T) {
var certFile, keyFile, caFile, serverName string
var insecureSkipVerify bool
tr, err := NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify)
tr, err := NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify, "test_client")
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

View File

@@ -78,7 +78,7 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
}
}
tr := httputil.NewTransport(false)
tr := httputil.NewTransport(false, "vm_promscrape")
tr.Proxy = proxyURLFunc
tr.TLSHandshakeTimeout = 10 * time.Second
tr.IdleConnTimeout = 2 * sw.ScrapeInterval

View File

@@ -87,7 +87,7 @@ func (tps *testProxyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
tr := httputil.NewTransport(false)
tr := httputil.NewTransport(false, "test_client")
resp, err := tr.RoundTrip(r)
if err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)

View File

@@ -260,15 +260,8 @@ func getHTTPClient(ac *promauth.Config, proxyURL *url.URL) *http.Client {
return c
}
tr := httputil.NewTransport(true)
tr.DialContext = netutil.Dialer.DialContext
tr.TLSHandshakeTimeout = 10 * time.Second
tr.IdleConnTimeout = *apiServerTimeout
tr.MaxIdleConnsPerHost = 100
tr := newHTTPTransport(*useHTTP2Client)
if !*useHTTP2Client {
// Disable http2.
tr.Protocols = nil
// Proxy is not supported for http2 client.
// See https://github.com/golang/go/issues/26479
var proxy func(*http.Request) (*url.URL, error)
@@ -286,6 +279,15 @@ func getHTTPClient(ac *promauth.Config, proxyURL *url.URL) *http.Client {
return c
}
func newHTTPTransport(enableHTTP2 bool) *http.Transport {
tr := httputil.NewTransport(enableHTTP2, "vm_promscrape_discovery_kubernetes")
tr.DialContext = netutil.Dialer.DialContext
tr.TLSHandshakeTimeout = 10 * time.Second
tr.IdleConnTimeout = *apiServerTimeout
tr.MaxIdleConnsPerHost = 100
return tr
}
func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, attachNodeMetadata bool, proxyURL *url.URL) *groupWatcher {
client := getHTTPClient(ac, proxyURL)
ctx, cancel := context.WithCancel(context.Background())

View File

@@ -75,7 +75,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
port = 80
}
tr := httputil.NewTransport(false)
tr := httputil.NewTransport(false, "vm_promscrape_discovery_openstack")
tr.MaxIdleConnsPerHost = 100
cfg := &apiConfig{

View File

@@ -48,7 +48,7 @@ func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
}
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
tr := httputil.NewTransport(false)
tr := httputil.NewTransport(false, "vm_promscrape_discovery_yandex")
tr.MaxIdleConnsPerHost = 100
rt := http.RoundTripper(tr)

View File

@@ -111,7 +111,7 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy
proxyURLFunc = http.ProxyURL(pu)
}
tr := httputil.NewTransport(false)
tr := httputil.NewTransport(false, "vm_promscrape_discovery")
tr.Proxy = proxyURLFunc
tr.TLSHandshakeTimeout = 10 * time.Second
tr.MaxIdleConnsPerHost = *maxConcurrency
@@ -123,7 +123,7 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *proxy.URL, proxy
Transport: ac.NewRoundTripper(tr),
}
trBlocking := httputil.NewTransport(false)
trBlocking := httputil.NewTransport(false, "vm_promscrape_discovery")
trBlocking.Proxy = proxyURLFunc
trBlocking.TLSHandshakeTimeout = 10 * time.Second
trBlocking.MaxIdleConnsPerHost = 1000

View File

@@ -36,7 +36,7 @@ func Create(createSnapshotURL string) (string, error) {
}
// create Transport
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify)
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify, "vm_snapshot_client")
if err != nil {
return "", fmt.Errorf("failed to create transport for -snapshot.createURL=%q: %s", createSnapshotURL, err)
}
@@ -83,7 +83,7 @@ func Delete(deleteSnapshotURL string, snapshotName string) error {
return fmt.Errorf("cannot parse -snapshot.deleteURL: %w", err)
}
// create Transport
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify)
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify, "vm_snapshot_client")
if err != nil {
return fmt.Errorf("failed to create transport for -snapshot.deleteURL=%q: %s", deleteSnapshotURL, err)