Compare commits

...

1 Commits

Author SHA1 Message Date
f41gh7
5c5d3658a0 lib/httputil: add load-balancing http transport
This commit adds round-robin load-balancing with DNS discovery to the http client.
 It allows the http client to route HTTP requests evenly across all the IP addresses
discovered from the DNS A records of the target hostname.

 Discovered IP addresses are cached locally for 30 seconds and are re-discovered automatically
on a `hostname not found` error.

 This makes it possible to remove the intermediate vmauth used as a load-balancer between
 vmagent and remote storages, which simplifies component management and
 reduces operational overhead.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2388
2026-06-08 19:38:13 +02:00
4 changed files with 296 additions and 0 deletions

View File

@@ -151,10 +151,20 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
}
tr.Proxy = http.ProxyURL(pu)
}
hc := &http.Client{
Transport: authCfg.NewRoundTripper(tr),
Timeout: sendTimeout.GetOptionalArg(argIdx),
}
rwURL, err := url.Parse(remoteWriteURL)
if err != nil {
logger.Fatalf("BUGL cannot parse already parsed -remoteWrite.url=%q: %s", remoteWriteURL, err)
}
if strings.HasPrefix(rwURL.Host, "dns+") {
rwURL.Host = rwURL.Host[4:]
remoteWriteURL = rwURL.String()
hc.Transport = httputil.NewLoadBalancerTransport(hc.Transport, rwURL)
}
retryMaxIntervalFlag := retryMaxTime
if retryMaxInterval.String() != "" {
retryMaxIntervalFlag = retryMaxInterval

View File

@@ -431,6 +431,22 @@ and `-remoteWrite.streamAggr.config`:
There is also the `-promscrape.configCheckInterval` command-line flag, which can be used to automatically reload configs from the updated `-promscrape.config` file.
## DNS URLs
If `vmagent` encounters URLs with the `dns+` prefix in the hostname (such as `http://dns+some-addr:8428/some/path`), it resolves `some-addr` into IP addresses
via [DNS A records](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1). The port from the original URL is appended to each discovered IP address.
Each discovered IP address is used for round-robin balancing of write requests.
DNS URLs are supported in the following places:
* In `-remoteWrite.url` command-line flag. For example, if the `victoria-metrics` [DNS A record](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1) contains
`192.168.1.15` IP address, then `-remoteWrite.url=http://dns+victoria-metrics:8428/api/v1/write` is automatically resolved into
`-remoteWrite.url=http://192.168.1.15:8428/api/v1/write`.
DNS URLs are useful when client-side HTTP load balancing is needed. A good example
is a [Kubernetes headless Service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services),
which returns multiple IP addresses for a single hostname.
## SRV URLs
If `vmagent` encounters URLs with `srv+` prefix in hostname (such as `http://srv+some-addr/some/path`), then it resolves `some-addr` [DNS SRV](https://en.wikipedia.org/wiki/SRV_record)

View File

@@ -0,0 +1,136 @@
package httputil
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"net/http"
"net/netip"
"net/url"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
)
// NewLoadBalancerTransport returns new RoundTripper that performs round-robin HTTP requests loadbalancing
// based on discovered A dns records for the given url host
func NewLoadBalancerTransport(origin http.RoundTripper, url *url.URL) http.RoundTripper {
host, port, err := net.SplitHostPort(url.Host)
if err != nil {
host = url.Host
}
t := &loadbalancerTransport{
host: host,
port: port,
tr: origin,
}
t.discoverBackends(context.Background())
return t
}
// loadbalancerTransport is an http.RoundTripper, which spreads requests
// in round-robin order across the backends discovered for host.
type loadbalancerTransport struct {
// tr is the underlying round tripper, which performs the actual HTTP requests.
tr http.RoundTripper
// host is the hostname to resolve into backend IP addresses.
host string
// port is appended to every discovered IP address if it is non-empty.
port string
// discovering is set while a DNS lookup is in progress
// in order to prevent concurrent lookups.
discovering atomic.Bool
// lastResolvedAt holds fasttime.UnixTimestamp() of the last successful discovery.
// Storing 0 forces the discovery on the next backend pick.
lastResolvedAt atomic.Uint64
// dbs points to the most recently discovered set of backends.
// It is nil until the first successful discovery.
dbs atomic.Pointer[discoveredBackends]
}
// discoveredBackends is a set of backends obtained by a single discovery
// together with its own round-robin position.
type discoveredBackends struct {
// backends contains discovered addresses in `ip` or `ip:port` form.
backends []string
// idx is incremented on every pick; the picked backend is backends[idx % len(backends)].
idx atomic.Uint64
}
// RoundTrip implements http.RoundTripper interface.
//
// It sends a clone of r to the next backend in round-robin order via the underlying
// round tripper. The original r isn't modified. The Host of the cloned request is set
// to the original URL host when it is empty, so only the target address changes.
//
// If the underlying round tripper fails with a "not found" DNS error, the backends
// are re-discovered and the request is retried once.
//
// An error is returned if there are no discovered backends.
func (lb *loadbalancerTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	backend := lb.pickBackend(r.Context())
	if backend == "" {
		// http.RoundTripper must always close the request body, including on errors.
		// The body hasn't been handed over to lb.tr yet, so close it here.
		if r.Body != nil {
			_ = r.Body.Close()
		}
		return nil, fmt.Errorf("no backends found for hostname=%q", lb.host)
	}

	// send routes a fresh clone of r to the given backend.
	send := func(backend string) (*http.Response, error) {
		r2 := r.Clone(r.Context())
		r2.URL.Host = backend
		if r2.Host == "" {
			r2.Host = r.URL.Host
		}
		return lb.tr.RoundTrip(r2)
	}

	resp, err := send(backend)
	var dnsErr *net.DNSError
	if err == nil || !errors.As(err, &dnsErr) || !dnsErr.IsNotFound {
		return resp, err
	}

	// Perform a single retry in case of dns lookup error.
	// Resetting lastResolvedAt forces pickBackend to re-discover backends.
	lb.lastResolvedAt.Store(0)
	backend = lb.pickBackend(r.Context())
	if backend == "" {
		// The request body has been already passed to lb.tr by the first attempt,
		// which is responsible for closing it.
		return nil, fmt.Errorf("no backends found for hostname=%q", lb.host)
	}
	return send(backend)
}
// pickBackend returns the next backend address in round-robin order.
//
// The backends are re-discovered if they are missing or if more than 30 seconds
// have passed since the last successful discovery. The previously discovered
// backends are used if the re-discovery returns nothing.
//
// An empty string is returned if there are no backends.
func (lb *loadbalancerTransport) pickBackend(ctx context.Context) string {
	current := lb.dbs.Load()
	stale := current == nil || fasttime.UnixTimestamp()-lb.lastResolvedAt.Load() > 30
	if stale {
		refreshed := lb.discoverBackends(ctx)
		if refreshed != nil {
			current = refreshed
		}
	}
	if current == nil {
		return ""
	}
	n := uint64(len(current.backends))
	if n == 0 {
		return ""
	}
	// Add returns the incremented value, so subtract 1 to start from the first backend.
	next := current.idx.Add(1) - 1
	return current.backends[next%n]
}
// discoverBackends resolves lb.host into IP addresses, stores them as the current
// set of backends and returns this set.
//
// The currently stored set (which may be nil) is returned without performing
// the lookup if another discovery is already in progress, or if the lookup fails.
func (lb *loadbalancerTransport) discoverBackends(ctx context.Context) *discoveredBackends {
	// Only a single dns lookup may be in flight at any time.
	if !lb.discovering.CompareAndSwap(false, true) {
		return lb.dbs.Load()
	}
	defer lb.discovering.Store(false)

	resolved, err := netutil.Resolver.LookupIPAddr(ctx, lb.host)
	if err != nil {
		logger.Errorf("cannot discover ips for host: %q: %s", lb.host, err)
		return lb.dbs.Load()
	}

	backends := make([]string, 0, len(resolved))
	for i := range resolved {
		rawIP := resolved[i].IP
		if !netutil.TCP6Enabled() {
			// Keep only IPv4 addresses (including IPv4-mapped IPv6 ones).
			parsed, ok := netip.AddrFromSlice(rawIP)
			if !ok {
				logger.Panicf("BUG: cannot build ip from slice addr: %q", rawIP.String())
			}
			if !parsed.Unmap().Is4() {
				continue
			}
		}
		hostPort := rawIP.String()
		if lb.port != "" {
			hostPort = net.JoinHostPort(hostPort, lb.port)
		}
		backends = append(backends, hostPort)
	}

	// Randomize the order of the backends before they are used for round-robin.
	rand.Shuffle(len(backends), func(a, b int) {
		backends[a], backends[b] = backends[b], backends[a]
	})

	discovered := &discoveredBackends{backends: backends}
	lb.dbs.Store(discovered)
	lb.lastResolvedAt.Store(fasttime.UnixTimestamp())
	return discovered
}

View File

@@ -0,0 +1,134 @@
package httputil
import (
"context"
"fmt"
"net"
"net/http"
"net/netip"
"net/url"
"sync"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
)
// testRemoteServer is an http.RoundTripper stub, which counts the requests
// it receives per target host instead of sending them over the network.
type testRemoteServer struct {
	// mu protects the fields below.
	mu sync.Mutex
	// requestsPerHost maps the request URL host to the number of successful requests.
	requestsPerHost map[string]int
	// totalRequests is the number of RoundTrip calls, including the failed one.
	totalRequests int
	// firstError, if set, is returned by the very first RoundTrip call and then cleared.
	firstError error
}

// RoundTrip implements http.RoundTripper interface.
//
// The very first call fails with firstError if it is set.
// Every other call is registered in requestsPerHost and gets an empty 200 OK response.
func (trs *testRemoteServer) RoundTrip(r *http.Request) (*http.Response, error) {
	trs.mu.Lock()
	defer trs.mu.Unlock()

	isFirst := trs.totalRequests == 0
	trs.totalRequests++
	if isFirst && trs.firstError != nil {
		failure := trs.firstError
		trs.firstError = nil
		return nil, failure
	}

	if trs.requestsPerHost == nil {
		trs.requestsPerHost = make(map[string]int)
	}
	trs.requestsPerHost[r.URL.Host]++
	return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody}, nil
}
// testDNSResolver is a DNS resolver stub, which returns the preconfigured
// list of IP addresses for any hostname.
type testDNSResolver struct {
	// ips is returned by every LookupIPAddr call.
	ips []net.IPAddr
}

// LookupSRV always fails, since SRV lookups aren't expected in the tests.
func (tdr *testDNSResolver) LookupSRV(_ context.Context, _, _, name string) (cname string, addrs []*net.SRV, err error) {
	return "", nil, fmt.Errorf("unexpected LookupSRV call for name=%q", name)
}

// LookupIPAddr returns the preconfigured ips regardless of the requested host.
func (tdr *testDNSResolver) LookupIPAddr(_ context.Context, _ string) ([]net.IPAddr, error) {
	return tdr.ips, nil
}

// LookupMX always fails, since MX lookups aren't expected in the tests.
func (tdr *testDNSResolver) LookupMX(_ context.Context, name string) ([]*net.MX, error) {
	return nil, fmt.Errorf("unexpected LookupMX call for name=%q", name)
}
// TestLoadbalancerTransport verifies that the load-balancing transport spreads requests
// evenly across all the discovered IP addresses, retries a request on a "not found"
// DNS error and returns an error when no backends are discovered.
func TestLoadbalancerTransport(t *testing.T) {
	f := func(discoveredIPs []string, trs *testRemoteServer) {
		t.Helper()

		// Substitute the global DNS resolver with a stub returning discoveredIPs.
		resolved := make([]net.IPAddr, len(discoveredIPs))
		for i, rawIP := range discoveredIPs {
			addr, err := netip.ParseAddr(rawIP)
			if err != nil {
				t.Fatalf("cannot parse IP=%q: %s", rawIP, err)
			}
			resolved[i] = net.IPAddr{IP: addr.AsSlice()}
		}
		prevResolver := netutil.Resolver
		netutil.Resolver = &testDNSResolver{ips: resolved}
		defer func() { netutil.Resolver = prevResolver }()

		requestURL, err := url.Parse("http://vmsingle.example.com:8429/api/v1/write")
		if err != nil {
			t.Fatalf("cannot parse url: %s", err)
		}
		lbt := NewLoadBalancerTransport(trs, requestURL)

		// doRequest sends a single GET request through the transport under test.
		doRequest := func() (*http.Response, error) {
			req, err := http.NewRequest(http.MethodGet, requestURL.String(), nil)
			if err != nil {
				t.Fatalf("cannot create http request: %s", err)
			}
			return lbt.RoundTrip(req)
		}

		if len(discoveredIPs) == 0 {
			if _, err := doRequest(); err == nil {
				t.Fatalf("expected no backends found error")
			}
			return
		}

		const expectedRequestsPerHost = 2
		totalRequests := len(discoveredIPs) * expectedRequestsPerHost
		for i := 0; i < totalRequests; i++ {
			resp, err := doRequest()
			if err != nil {
				t.Fatalf("unexpected error: %s", err)
			}
			resp.Body.Close()
		}

		// Every discovered IP must receive the same number of requests.
		for _, dIP := range discoveredIPs {
			wantHostPort := net.JoinHostPort(dIP, "8429")
			got, ok := trs.requestsPerHost[wantHostPort]
			switch {
			case !ok:
				t.Fatalf("not found expected backend request for: %q", wantHostPort)
			case got != expectedRequestsPerHost:
				t.Fatalf("unexpected requests per host: %d:%d (-;+)", expectedRequestsPerHost, got)
			}
		}
	}

	f([]string{"1.1.1.1"}, &testRemoteServer{})

	f([]string{"1.1.1.1", "2.2.2.2", "5.5.5.5"}, &testRemoteServer{})

	// retry dns resolve error
	f([]string{"1.1.1.1", "2.2.2.2", "5.5.5.5"}, &testRemoteServer{
		firstError: &net.DNSError{Err: "no such host", IsNotFound: true},
	})

	// empty backends, expecting error
	f([]string{}, &testRemoteServer{})
}