Compare commits

...

3 Commits

Author SHA1 Message Date
f41gh7
e848e6c05f add changelog entry 2026-06-09 15:49:46 +02:00
f41gh7
35bb0cbf07 add loadbalancer to vmalert 2026-06-09 12:10:35 +02:00
f41gh7
a991f6279c lib/httputil: add load-balancing http transport
This commit adds http client round-robin load-balancing with DNS
 discovery. It http client to route HTTP requests evenly for each discovered IP
address for A DNS record.

 Discovered IP addresses are cached locally for 30 seconds and discovered automatically
on `hostname not found` error.

 It's useful to remove intermediate vmauth as a load-balancer between
 vmagent and remote storages. Which simplifies components management and
 reduces operational overhead.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2388
2026-06-09 12:10:35 +02:00
9 changed files with 357 additions and 9 deletions

View File

@@ -151,10 +151,20 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
}
tr.Proxy = http.ProxyURL(pu)
}
hc := &http.Client{
Transport: authCfg.NewRoundTripper(tr),
Timeout: sendTimeout.GetOptionalArg(argIdx),
}
rwURL, err := url.Parse(remoteWriteURL)
if err != nil {
logger.Fatalf("BUGL cannot parse already parsed -remoteWrite.url=%q: %s", remoteWriteURL, err)
}
if strings.HasPrefix(rwURL.Host, "dns+") {
rwURL.Host = rwURL.Host[4:]
remoteWriteURL = rwURL.String()
hc.Transport = httputil.NewLoadBalancerTransport(hc.Transport, rwURL)
}
retryMaxIntervalFlag := retryMaxTime
if retryMaxInterval.String() != "" {
retryMaxIntervalFlag = retryMaxInterval

View File

@@ -11,6 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/vmalertutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
@@ -94,6 +95,16 @@ func Init(extraParams url.Values) (QuerierBuilder, error) {
tr.MaxIdleConns = tr.MaxIdleConnsPerHost
}
tr.IdleConnTimeout = *idleConnectionTimeout
hc := &http.Client{Transport: tr}
datasourceURL, err := url.Parse(*addr)
if err != nil {
logger.Fatalf("BUG: cannot parse already parsed -datasource.url=%q: %s", *addr, err)
}
if strings.HasPrefix(datasourceURL.Host, "dns+") {
datasourceURL.Host = datasourceURL.Host[4:]
tr := httputil.NewLoadBalancerTransport(tr, datasourceURL)
hc.Transport = tr
}
if extraParams == nil {
extraParams = url.Values{}
@@ -120,9 +131,9 @@ func Init(extraParams url.Values) (QuerierBuilder, error) {
}
return &Client{
c: &http.Client{Transport: tr},
c: hc,
authCfg: authCfg,
datasourceURL: strings.TrimSuffix(*addr, "/"),
datasourceURL: strings.TrimSuffix(datasourceURL.String(), "/"),
appendTypePrefix: *appendTypePrefix,
queryStep: *queryStep,
extraParams: extraParams,

View File

@@ -4,12 +4,15 @@ import (
"flag"
"fmt"
"net/http"
"net/url"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/vmalertutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
@@ -76,6 +79,15 @@ func Init() (datasource.QuerierBuilder, error) {
return nil, fmt.Errorf("failed to create transport for -remoteRead.url=%q: %w", *addr, err)
}
tr.IdleConnTimeout = *idleConnectionTimeout
c := &http.Client{Transport: tr}
rrURL, err := url.Parse(*addr)
if err != nil {
logger.Fatalf("BUG: cannot parse already parsed -remoteRead.url=%q: %s", *addr, err)
}
if strings.HasPrefix(rrURL.Host, "dns+") {
rrURL.Host = rrURL.Host[4:]
c.Transport = httputil.NewLoadBalancerTransport(tr, rrURL)
}
endpointParams, err := flagutil.ParseJSONMap(*oauth2EndpointParams)
if err != nil {
@@ -89,6 +101,5 @@ func Init() (datasource.QuerierBuilder, error) {
if err != nil {
return nil, fmt.Errorf("failed to configure auth: %w", err)
}
c := &http.Client{Transport: tr}
return datasource.NewPrometheusClient(*addr, authCfg, false, c), nil
return datasource.NewPrometheusClient(rrURL.String(), authCfg, false, c), nil
}

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"path"
"strings"
"sync"
@@ -111,12 +112,21 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
if cfg.Concurrency > 0 {
cc = cfg.Concurrency
}
hc := &http.Client{
Timeout: *sendTimeout,
Transport: cfg.Transport,
}
rwURL, err := url.Parse(cfg.Addr)
if err != nil {
logger.Fatalf("cannot parse already parsed -remoteWrite.url=%q: %s", cfg.Addr, err)
}
if strings.HasPrefix(rwURL.Host, "dns+") {
rwURL.Host = rwURL.Host[4:]
hc.Transport = httputil.NewLoadBalancerTransport(hc.Transport, rwURL)
}
c := &Client{
c: &http.Client{
Timeout: *sendTimeout,
Transport: cfg.Transport,
},
addr: strings.TrimSuffix(cfg.Addr, "/"),
c: hc,
addr: strings.TrimSuffix(rwURL.String(), "/"),
authCfg: cfg.AuthCfg,
flushInterval: cfg.FlushInterval,
maxBatchSize: cfg.MaxBatchSize,

View File

@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add client side round-robin load-balancing with `DNS` discovery. See [#2388](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2388) and these [vmagent DNS URLs](https://docs.victoriametrics.com/victoriametrics/vmagent/#dns-urls), [vmalert DNS URLs](https://docs.victoriametrics.com/victoriametrics/vmalert/#dns-urls).
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
Released at 2026-06-08

View File

@@ -431,6 +431,22 @@ and `-remoteWrite.streamAggr.config`:
There is also the `-promscrape.configCheckInterval` command-line flag, which can be used to automatically reload configs from the updated `-promscrape.config` file.
## DNS URLs
If `vmagent` encounters URLs with the `dns+` prefix in the hostname (such as `http://dns+some-addr:8428/some/path`), it resolves `some-addr` into IP addresses
via [DNS A records](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1). The port from the original URL is appended to each discovered IP address.
Each discovered IP address is used for round-robin balancing of write requests.
DNS URLs are supported in the following places:
* In `-remoteWrite.url` command-line flag. For example, if `victoria-metrics` [DNS A Record](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1) record contains
`192.168.1.15` IP address, then `-remoteWrite.url=http://dns+victoria-metrics:8428/api/v1/write` is automatically resolved into
`-remoteWrite.url=http://192.168.1.15:8428/api/v1/write`.
DNS URLs are useful when client-side HTTP load balancing is needed. A good example
is a [Kubernetes headless Service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services),
which returns multiple IP addresses for a single hostname.
## SRV URLs
If `vmagent` encounters URLs with `srv+` prefix in hostname (such as `http://srv+some-addr/some/path`), then it resolves `some-addr` [DNS SRV](https://en.wikipedia.org/wiki/SRV_record)

View File

@@ -1470,6 +1470,22 @@ alert_relabel_configs:
The configuration file can be [hot-reloaded](#hot-config-reload).
## DNS URLs
If `vmalert` encounters URLs with the `dns+` prefix in the hostname (such as `http://dns+some-addr:8428/some/path`), it resolves `some-addr` into IP addresses
via [DNS A records](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1). The port from the original URL is appended to each discovered IP address.
Each discovered IP address is used for round-robin balancing of write requests.
DNS URLs are supported in the following places:
* In `-remoteWrite.url`, `-remoteRead.url` and `-datasource.url` command-line flags. For example, if `victoria-metrics` [DNS A Record](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1) record contains
`192.168.1.15` IP address, then `-remoteWrite.url=http://dns+victoria-metrics:8428` is automatically resolved into
`-remoteWrite.url=http://192.168.1.15:8428`.
DNS URLs are useful when client-side HTTP load balancing is needed. A good example
is a [Kubernetes headless Service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services),
which returns multiple IP addresses for a single hostname.
## Contributing
`vmalert` is mostly designed and built by VictoriaMetrics community.

View File

@@ -0,0 +1,138 @@
package httputil
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"net/http"
"net/netip"
"net/url"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
)
// NewLoadBalancerTransport returns new RoundTripper that performs round-robin HTTP requests loadbalancing
// based on discovered A dns records for the given url host
func NewLoadBalancerTransport(origin http.RoundTripper, url *url.URL) http.RoundTripper {
host, port, err := net.SplitHostPort(url.Host)
if err != nil {
host = url.Host
}
t := &loadbalancerTransport{
host: host,
port: port,
tr: origin,
}
t.discoverBackendsLocked(context.Background())
return t
}
type loadbalancerTransport struct {
tr http.RoundTripper
host string
port string
// mu protects fields below
mu sync.Mutex
lastResolvedAt time.Time
dbs *discoveredBackends
}
type discoveredBackends struct {
backends []string
idx uint64
}
// RoundTrip implements http.RoundTripper interface
func (lb *loadbalancerTransport) RoundTrip(r *http.Request) (*http.Response, error) {
backend := lb.pickBackend(r.Context(), false)
if backend == "" {
return nil, fmt.Errorf("no backends found for hostname=%q", lb.host)
}
r2 := r.Clone(r.Context())
r2.URL.Host = backend
if r2.Host == "" {
r2.Host = r.URL.Host
}
resp, err := lb.tr.RoundTrip(r2)
if err != nil {
var dnsErr *net.DNSError
// perform a single retry for in case of dns lookup error
if errors.As(err, &dnsErr) && dnsErr.IsNotFound {
backend := lb.pickBackend(r.Context(), true)
if backend == "" {
return nil, fmt.Errorf("no backends found for hostname=%q", lb.host)
}
r2 = r.Clone(r.Context())
if r2.Host == "" {
r2.Host = r.URL.Host
}
r2.URL.Host = backend
resp, err = lb.tr.RoundTrip(r2)
}
}
return resp, err
}
func (lb *loadbalancerTransport) pickBackend(ctx context.Context, forceDiscovery bool) string {
ct := time.Now()
lb.mu.Lock()
defer lb.mu.Unlock()
if forceDiscovery && !ct.Before(lb.lastResolvedAt) {
// prevent concurrent force discovery
lb.lastResolvedAt = time.Time{}
}
if lb.dbs == nil || ct.Sub(lb.lastResolvedAt) > 30*time.Second {
lb.discoverBackendsLocked(ctx)
}
if lb.dbs == nil || len(lb.dbs.backends) == 0 {
return ""
}
idx := lb.dbs.idx
lb.dbs.idx++
return lb.dbs.backends[idx%uint64(len(lb.dbs.backends))]
}
func (lb *loadbalancerTransport) discoverBackendsLocked(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
addrs, err := netutil.Resolver.LookupIPAddr(ctx, lb.host)
if err != nil {
logger.Errorf("cannot discover ips for host: %q: %s", lb.host, err)
return
}
backends := make([]string, 0, len(addrs))
for _, addr := range addrs {
if !netutil.TCP6Enabled() {
ip, ok := netip.AddrFromSlice(addr.IP)
if !ok {
logger.Panicf("BUG: cannot build netip Addr from slice addr: %q", addr.IP.String())
}
if !ip.Unmap().Is4() {
continue
}
}
ip := addr.IP.String()
if len(lb.port) > 0 {
ip = net.JoinHostPort(ip, lb.port)
}
backends = append(backends, ip)
}
rand.Shuffle(len(backends), func(i, j int) {
backends[i], backends[j] = backends[j], backends[i]
})
dbs := &discoveredBackends{
backends: backends,
}
lb.dbs = dbs
lb.lastResolvedAt = time.Now()
}

View File

@@ -0,0 +1,134 @@
package httputil
import (
"context"
"fmt"
"net"
"net/http"
"net/netip"
"net/url"
"sync"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
)
type testRemoteServer struct {
mu sync.Mutex
requestsPerHost map[string]int
totalRequests int
firstError error
}
func (trs *testRemoteServer) RoundTrip(r *http.Request) (*http.Response, error) {
trs.mu.Lock()
if trs.firstError != nil && trs.totalRequests == 0 {
err := trs.firstError
trs.firstError = nil
trs.totalRequests++
trs.mu.Unlock()
return nil, err
}
trs.totalRequests++
if trs.requestsPerHost == nil {
trs.requestsPerHost = make(map[string]int)
}
trs.requestsPerHost[r.URL.Host]++
trs.mu.Unlock()
return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody}, nil
}
type testDNSResolver struct {
ips []net.IPAddr
}
func (tdr *testDNSResolver) LookupSRV(_ context.Context, _, _, name string) (cname string, addrs []*net.SRV, err error) {
return "", nil, fmt.Errorf("unexpected LookupMX call for name=%q", name)
}
func (tdr *testDNSResolver) LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error) {
return tdr.ips, nil
}
func (tdr *testDNSResolver) LookupMX(_ context.Context, name string) ([]*net.MX, error) {
return nil, fmt.Errorf("unexpected LookupMX call for name=%q", name)
}
func TestLoadbalancerTransport(t *testing.T) {
f := func(discoveredIPs []string, trs *testRemoteServer) {
t.Helper()
parsedIPs := make([]net.IPAddr, 0, len(discoveredIPs))
for _, dIP := range discoveredIPs {
pIP, err := netip.ParseAddr(dIP)
if err != nil {
t.Fatalf("cannot parse IP=%q: %s", dIP, err)
}
parsedIPs = append(parsedIPs, net.IPAddr{IP: pIP.AsSlice()})
}
tdr := &testDNSResolver{ips: parsedIPs}
originResolver := netutil.Resolver
defer func() { netutil.Resolver = originResolver }()
netutil.Resolver = tdr
requestURL, err := url.Parse("http://vmsingle.example.com:8429/api/v1/write")
if err != nil {
t.Fatalf("cannot parse url: %s", err)
}
lbt := NewLoadBalancerTransport(trs, requestURL)
if len(discoveredIPs) == 0 {
r, err := http.NewRequest(http.MethodGet, requestURL.String(), nil)
if err != nil {
t.Fatalf("cannot create http request: %s", err)
}
_, err = lbt.RoundTrip(r)
if err == nil {
t.Fatalf("expected no backends found error")
}
return
}
expectedRequestsPerHost := 2
for range len(discoveredIPs) * expectedRequestsPerHost {
r, err := http.NewRequest(http.MethodGet, requestURL.String(), nil)
if err != nil {
t.Fatalf("cannot create http request: %s", err)
}
resp, err := lbt.RoundTrip(r)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resp.Body.Close()
}
requestsPerHost := trs.requestsPerHost
for _, dIP := range discoveredIPs {
expectedHostPort := net.JoinHostPort(dIP, "8429")
gotRequestsPerHost, ok := requestsPerHost[expectedHostPort]
if !ok {
t.Fatalf("not found expected backend request for: %q", expectedHostPort)
}
if gotRequestsPerHost != expectedRequestsPerHost {
t.Fatalf("unexpected requests per host: %d:%d (-;+)", expectedRequestsPerHost, gotRequestsPerHost)
}
}
}
trs := testRemoteServer{}
f([]string{"1.1.1.1"}, &trs)
trs = testRemoteServer{}
f([]string{"1.1.1.1", "2.2.2.2", "5.5.5.5"}, &trs)
// retry dns resolve error
trs = testRemoteServer{
firstError: &net.DNSError{Err: "no such host", IsNotFound: true},
}
f([]string{"1.1.1.1", "2.2.2.2", "5.5.5.5"}, &trs)
// empty backends, expecting error
trs = testRemoteServer{}
f([]string{}, &trs)
}