mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-09 03:43:58 +03:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5c5d3658a0 |
@@ -151,10 +151,20 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
|
||||
}
|
||||
tr.Proxy = http.ProxyURL(pu)
|
||||
}
|
||||
|
||||
hc := &http.Client{
|
||||
Transport: authCfg.NewRoundTripper(tr),
|
||||
Timeout: sendTimeout.GetOptionalArg(argIdx),
|
||||
}
|
||||
rwURL, err := url.Parse(remoteWriteURL)
|
||||
if err != nil {
|
||||
logger.Fatalf("BUGL cannot parse already parsed -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
||||
}
|
||||
if strings.HasPrefix(rwURL.Host, "dns+") {
|
||||
rwURL.Host = rwURL.Host[4:]
|
||||
remoteWriteURL = rwURL.String()
|
||||
hc.Transport = httputil.NewLoadBalancerTransport(hc.Transport, rwURL)
|
||||
}
|
||||
retryMaxIntervalFlag := retryMaxTime
|
||||
if retryMaxInterval.String() != "" {
|
||||
retryMaxIntervalFlag = retryMaxInterval
|
||||
|
||||
@@ -431,6 +431,22 @@ and `-remoteWrite.streamAggr.config`:
|
||||
|
||||
There is also the `-promscrape.configCheckInterval` command-line flag, which can be used to automatically reload configs from the updated `-promscrape.config` file.
|
||||
|
||||
## DNS URLs
|
||||
|
||||
If `vmagent` encounters URLs with the `dns+` prefix in the hostname (such as `http://dns+some-addr:8428/some/path`), it resolves `some-addr` into IP addresses
|
||||
via [DNS A records](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1). The port from the original URL is appended to each discovered IP address.
|
||||
Each discovered IP address is used for round-robin balancing of write requests.
|
||||
|
||||
DNS URLs are supported in the following places:
|
||||
|
||||
* In the `-remoteWrite.url` command-line flag. For example, if the `victoria-metrics` [DNS A record](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1) contains the
|
||||
`192.168.1.15` IP address, then `-remoteWrite.url=http://dns+victoria-metrics:8428/api/v1/write` is automatically resolved into
|
||||
`-remoteWrite.url=http://192.168.1.15:8428/api/v1/write`.
|
||||
|
||||
DNS URLs are useful when client-side HTTP load balancing is needed. A good example
|
||||
is a [Kubernetes headless Service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services),
|
||||
which returns multiple IP addresses for a single hostname.
|
||||
|
||||
## SRV URLs
|
||||
|
||||
If `vmagent` encounters URLs with `srv+` prefix in hostname (such as `http://srv+some-addr/some/path`), then it resolves `some-addr` [DNS SRV](https://en.wikipedia.org/wiki/SRV_record)
|
||||
|
||||
136
lib/httputil/transport_lb.go
Normal file
136
lib/httputil/transport_lb.go
Normal file
@@ -0,0 +1,136 @@
|
||||
package httputil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
"net/url"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
)
|
||||
|
||||
// NewLoadBalancerTransport returns new RoundTripper that performs round-robin HTTP requests loadbalancing
|
||||
// based on discovered A dns records for the given url host
|
||||
func NewLoadBalancerTransport(origin http.RoundTripper, url *url.URL) http.RoundTripper {
|
||||
host, port, err := net.SplitHostPort(url.Host)
|
||||
if err != nil {
|
||||
host = url.Host
|
||||
}
|
||||
t := &loadbalancerTransport{
|
||||
host: host,
|
||||
port: port,
|
||||
tr: origin,
|
||||
}
|
||||
t.discoverBackends(context.Background())
|
||||
return t
|
||||
}
|
||||
|
||||
type loadbalancerTransport struct {
|
||||
tr http.RoundTripper
|
||||
host string
|
||||
port string
|
||||
|
||||
discovering atomic.Bool
|
||||
lastResolvedAt atomic.Uint64
|
||||
dbs atomic.Pointer[discoveredBackends]
|
||||
}
|
||||
|
||||
// discoveredBackends is a snapshot of backend addresses together with the round-robin cursor over them.
type discoveredBackends struct {
	// idx is incremented on every pick; the picked backend is selected by idx modulo len(backends).
	idx atomic.Uint64
	// backends contains the discovered addresses in either "ip" or "ip:port" form.
	backends []string
}
|
||||
|
||||
// RoundTrip implements http.RoundTripper interface
|
||||
func (lb *loadbalancerTransport) RoundTrip(r *http.Request) (*http.Response, error) {
|
||||
backend := lb.pickBackend(r.Context())
|
||||
if backend == "" {
|
||||
return nil, fmt.Errorf("no backends found for hostname=%q", lb.host)
|
||||
}
|
||||
|
||||
r2 := r.Clone(r.Context())
|
||||
r2.URL.Host = backend
|
||||
if r2.Host == "" {
|
||||
r2.Host = r.URL.Host
|
||||
}
|
||||
resp, err := lb.tr.RoundTrip(r2)
|
||||
if err != nil {
|
||||
var dnsErr *net.DNSError
|
||||
// perform a single retry for in case of dns lookup error
|
||||
if errors.As(err, &dnsErr) && dnsErr.IsNotFound {
|
||||
lb.lastResolvedAt.Store(0)
|
||||
backend := lb.pickBackend(r.Context())
|
||||
if backend == "" {
|
||||
return nil, fmt.Errorf("no backends found for hostname=%q", lb.host)
|
||||
}
|
||||
|
||||
r2 = r.Clone(r.Context())
|
||||
if r2.Host == "" {
|
||||
r2.Host = r.URL.Host
|
||||
}
|
||||
r2.URL.Host = backend
|
||||
resp, err = lb.tr.RoundTrip(r2)
|
||||
}
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (lb *loadbalancerTransport) pickBackend(ctx context.Context) string {
|
||||
dbs := lb.dbs.Load()
|
||||
if dbs == nil || fasttime.UnixTimestamp()-lb.lastResolvedAt.Load() > 30 {
|
||||
if newDBS := lb.discoverBackends(ctx); newDBS != nil {
|
||||
dbs = newDBS
|
||||
}
|
||||
}
|
||||
if dbs == nil || len(dbs.backends) == 0 {
|
||||
return ""
|
||||
}
|
||||
idx := dbs.idx.Add(1) - 1
|
||||
return dbs.backends[idx%uint64(len(dbs.backends))]
|
||||
}
|
||||
|
||||
func (lb *loadbalancerTransport) discoverBackends(ctx context.Context) *discoveredBackends {
|
||||
// prevent concurrent dns lookup
|
||||
if !lb.discovering.CompareAndSwap(false, true) {
|
||||
return lb.dbs.Load()
|
||||
}
|
||||
defer lb.discovering.Store(false)
|
||||
|
||||
addrs, err := netutil.Resolver.LookupIPAddr(ctx, lb.host)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot discover ips for host: %q: %s", lb.host, err)
|
||||
return lb.dbs.Load()
|
||||
}
|
||||
backends := make([]string, 0, len(addrs))
|
||||
for _, addr := range addrs {
|
||||
if !netutil.TCP6Enabled() {
|
||||
ip, ok := netip.AddrFromSlice(addr.IP)
|
||||
if !ok {
|
||||
logger.Panicf("BUG: cannot build ip from slice addr: %q", addr.IP.String())
|
||||
}
|
||||
if !ip.Unmap().Is4() {
|
||||
continue
|
||||
}
|
||||
}
|
||||
ip := addr.IP.String()
|
||||
if len(lb.port) > 0 {
|
||||
ip = net.JoinHostPort(ip, lb.port)
|
||||
}
|
||||
backends = append(backends, ip)
|
||||
}
|
||||
rand.Shuffle(len(backends), func(i, j int) {
|
||||
backends[i], backends[j] = backends[j], backends[i]
|
||||
})
|
||||
dbs := &discoveredBackends{
|
||||
backends: backends,
|
||||
}
|
||||
lb.dbs.Store(dbs)
|
||||
lb.lastResolvedAt.Store(fasttime.UnixTimestamp())
|
||||
return dbs
|
||||
}
|
||||
134
lib/httputil/transport_lb_test.go
Normal file
134
lib/httputil/transport_lb_test.go
Normal file
@@ -0,0 +1,134 @@
|
||||
package httputil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
"net/url"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
)
|
||||
|
||||
// testRemoteServer is an http.RoundTripper stub, which counts requests per target host
// and optionally fails the very first request with firstError.
type testRemoteServer struct {
	mu sync.Mutex

	// firstError, if set, is returned for the first RoundTrip call and is cleared afterwards.
	firstError error
	// totalRequests counts all the RoundTrip calls, including the failed one.
	totalRequests int
	// requestsPerHost counts successful requests keyed by the request URL host.
	requestsPerHost map[string]int
}

// RoundTrip implements http.RoundTripper interface.
func (trs *testRemoteServer) RoundTrip(r *http.Request) (*http.Response, error) {
	trs.mu.Lock()
	defer trs.mu.Unlock()

	isFirst := trs.totalRequests == 0
	trs.totalRequests++
	if failure := trs.firstError; failure != nil && isFirst {
		trs.firstError = nil
		return nil, failure
	}

	if trs.requestsPerHost == nil {
		trs.requestsPerHost = make(map[string]int)
	}
	trs.requestsPerHost[r.URL.Host]++

	return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody}, nil
}
|
||||
|
||||
// testDNSResolver is a DNS resolver stub, which returns the preconfigured ips
// for every LookupIPAddr call and fails all the other lookups.
type testDNSResolver struct {
	ips []net.IPAddr
}

// LookupSRV always fails, since the stub is not meant to serve SRV lookups.
func (tdr *testDNSResolver) LookupSRV(_ context.Context, _, _, name string) (cname string, addrs []*net.SRV, err error) {
	return "", nil, fmt.Errorf("unexpected LookupSRV call for name=%q", name)
}

// LookupIPAddr returns the preconfigured ips regardless of the requested host.
func (tdr *testDNSResolver) LookupIPAddr(_ context.Context, _ string) ([]net.IPAddr, error) {
	return tdr.ips, nil
}

// LookupMX always fails, since the stub is not meant to serve MX lookups.
func (tdr *testDNSResolver) LookupMX(_ context.Context, name string) ([]*net.MX, error) {
	return nil, fmt.Errorf("unexpected LookupMX call for name=%q", name)
}
|
||||
|
||||
func TestLoadbalancerTransport(t *testing.T) {
|
||||
f := func(discoveredIPs []string, trs *testRemoteServer) {
|
||||
t.Helper()
|
||||
|
||||
parsedIPs := make([]net.IPAddr, 0, len(discoveredIPs))
|
||||
for _, dIP := range discoveredIPs {
|
||||
pIP, err := netip.ParseAddr(dIP)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse IP=%q: %s", dIP, err)
|
||||
}
|
||||
parsedIPs = append(parsedIPs, net.IPAddr{IP: pIP.AsSlice()})
|
||||
}
|
||||
tdr := &testDNSResolver{ips: parsedIPs}
|
||||
originResolver := netutil.Resolver
|
||||
defer func() { netutil.Resolver = originResolver }()
|
||||
|
||||
netutil.Resolver = tdr
|
||||
requestURL, err := url.Parse("http://vmsingle.example.com:8429/api/v1/write")
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse url: %s", err)
|
||||
}
|
||||
lbt := NewLoadBalancerTransport(trs, requestURL)
|
||||
|
||||
if len(discoveredIPs) == 0 {
|
||||
r, err := http.NewRequest(http.MethodGet, requestURL.String(), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create http request: %s", err)
|
||||
}
|
||||
_, err = lbt.RoundTrip(r)
|
||||
if err == nil {
|
||||
t.Fatalf("expected no backends found error")
|
||||
}
|
||||
return
|
||||
}
|
||||
expectedRequestsPerHost := 2
|
||||
for range len(discoveredIPs) * expectedRequestsPerHost {
|
||||
r, err := http.NewRequest(http.MethodGet, requestURL.String(), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create http request: %s", err)
|
||||
}
|
||||
resp, err := lbt.RoundTrip(r)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
}
|
||||
requestsPerHost := trs.requestsPerHost
|
||||
|
||||
for _, dIP := range discoveredIPs {
|
||||
expectedHostPort := net.JoinHostPort(dIP, "8429")
|
||||
gotRequestsPerHost, ok := requestsPerHost[expectedHostPort]
|
||||
if !ok {
|
||||
t.Fatalf("not found expected backend request for: %q", expectedHostPort)
|
||||
}
|
||||
if gotRequestsPerHost != expectedRequestsPerHost {
|
||||
t.Fatalf("unexpected requests per host: %d:%d (-;+)", expectedRequestsPerHost, gotRequestsPerHost)
|
||||
}
|
||||
}
|
||||
}
|
||||
trs := testRemoteServer{}
|
||||
f([]string{"1.1.1.1"}, &trs)
|
||||
|
||||
trs = testRemoteServer{}
|
||||
f([]string{"1.1.1.1", "2.2.2.2", "5.5.5.5"}, &trs)
|
||||
|
||||
// retry dns resolve error
|
||||
trs = testRemoteServer{
|
||||
firstError: &net.DNSError{Err: "no such host", IsNotFound: true},
|
||||
}
|
||||
f([]string{"1.1.1.1", "2.2.2.2", "5.5.5.5"}, &trs)
|
||||
|
||||
// empty backends, expecting error
|
||||
trs = testRemoteServer{}
|
||||
f([]string{}, &trs)
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user