lib/topology: add remote write target discovery metric

Dedicated lib/topology package with background DNS resolution for
configured remote write targets. Exposes Register/Stop API and a
separate vm_topology_discovery_targets metric joinable by url label.

Does not modify existing vmagent_remotewrite_* metrics.

Known limitations:
- single 5s context shared across all targets in refresh cycle
- Register/Stop not hardened for concurrent use

Addresses #10093
This commit is contained in:
kirillyu
2026-04-11 10:26:11 +02:00
parent 7514511c68
commit 4bfee70a5f
4 changed files with 558 additions and 0 deletions

View File

@@ -25,6 +25,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/topology"
)
var (
@@ -204,6 +205,7 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queues{url=%q}`, c.sanitizedURL), func() float64 {
return float64(concurrency)
})
topology.Register(c.remoteWriteURL, c.sanitizedURL)
for range concurrency {
c.wg.Go(c.runWorker)
}

View File

@@ -36,6 +36,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/topology"
)
var (
@@ -339,6 +340,8 @@ func Stop() {
close(configReloaderStopCh)
configReloaderWG.Wait()
topology.Stop()
sasGlobal.Load().MustStop()
if deduplicatorGlobal != nil {
deduplicatorGlobal.MustStop()

339
lib/topology/topology.go Normal file
View File

@@ -0,0 +1,339 @@
// Package topology performs background DNS resolution of registered remote
// write targets and exposes the resolved IPs via the
// vm_topology_discovery_targets metric.
package topology
import (
"context"
"fmt"
"io"
"net"
"net/url"
"slices"
"sort"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics"
)
const (
// discoveryInterval is the period of the ticker that triggers a refresh
// of all registered targets.
discoveryInterval = 30 * time.Second
// discoveryTimeout is the deadline of a single refresh cycle; every lookup
// performed during that cycle shares it.
discoveryTimeout = 5 * time.Second
)
// global is the package-level discovery state used by Register and Stop.
var global state
// state holds the registered targets together with the background goroutine
// and the metrics set that publish their resolved IPs.
type state struct {
// mu is held by register, stop, refresh, snapshots and samples while they
// access the fields below.
mu sync.RWMutex
// ms is the metrics set created on first register; nil when nothing is registered.
ms *metrics.Set
// refreshCh (capacity 1) requests an out-of-schedule refresh.
refreshCh chan struct{}
// stopCh is closed by stop in order to terminate run; nil when run is not started.
stopCh chan struct{}
// wg tracks the goroutine executing run.
wg sync.WaitGroup
// targets maps target.urlLabel to the target.
targets map[string]*target
}
// target is a single registered remote write destination.
type target struct {
// urlLabel is the value of the `url` metric label (the sanitized URL passed to Register).
urlLabel string
// addrLabel is the value of the `addr` metric label, built by joinAddr from host and port.
addrLabel string
// host is the name passed to resolveIPs; it may carry the `srv+` prefix.
host string
// resolvedIPs is the result of the last successful resolution.
resolvedIPs []string
// hasResolved is set once resolvedIPs has been populated at least once.
hasResolved bool
}
// targetSnapshot is a copy of the target fields needed for resolution,
// taken by snapshots so that refresh can resolve without holding state.mu.
type targetSnapshot struct {
// urlLabel identifies the target in state.targets.
urlLabel string
// host is the name to resolve.
host string
}
// targetSample is one (target, resolved IP) pair; writeMetrics emits one
// vm_topology_discovery_targets line per sample.
type targetSample struct {
// urlLabel is the value of the `url` label.
urlLabel string
// addrLabel is the value of the `addr` label.
addrLabel string
// ip is the value of the `resolved_ip` label.
ip string
}
// Register adds a remote write target to background topology discovery.
//
// rawURL supplies the host to resolve; sanitizedURL becomes the `url` metric
// label and is the only URL form mentioned in the log message below.
// A target that cannot be built from rawURL is logged and skipped.
func Register(rawURL, sanitizedURL string) {
	tg, err := newTarget(rawURL, sanitizedURL)
	if err == nil {
		global.register(tg)
		return
	}
	logger.Errorf("cannot register topology target for -remoteWrite.url=%q: %s", sanitizedURL, err)
}
// Stop stops background topology discovery and unregisters topology metrics.
//
// It delegates to the package-level state; all targets registered so far
// are dropped.
func Stop() {
global.stop()
}
// register stores t under its url label, lazily creates and registers the
// metrics set, starts the discovery goroutine if it is not running yet and
// requests an immediate refresh.
func (s *state) register(t *target) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.ms == nil {
		set := metrics.NewSet()
		set.RegisterMetricsWriter(s.writeMetrics)
		metrics.RegisterSet(set)
		s.ms = set
	}

	if s.targets == nil {
		s.targets = make(map[string]*target)
	}
	s.targets[t.urlLabel] = t

	// A nil stopCh means the background goroutine is not running.
	if s.stopCh == nil {
		s.refreshCh = make(chan struct{}, 1)
		s.stopCh = make(chan struct{})
		s.wg.Add(1)
		go s.run(s.stopCh, s.refreshCh)
	}
	s.notifyRefreshLocked()
}
// stop terminates the background discovery goroutine, unregisters the metrics
// set and drops all registered targets.
//
// s.mu is released while waiting for the goroutine and while unregistering
// the set, since refresh (executed by the goroutine) and writeMetrics both
// acquire s.mu.
//
// NOTE(review): a register call landing between the two critical sections
// below starts a new goroutine whose targets are then cleared by the second
// section; stop is not hardened for concurrent use with Register.
func (s *state) stop() {
s.mu.Lock()
stopCh := s.stopCh
ms := s.ms
// Reset the channels so that a later register starts a fresh goroutine.
s.refreshCh = nil
s.stopCh = nil
s.mu.Unlock()
if stopCh != nil {
close(stopCh)
s.wg.Wait()
}
if ms != nil {
metrics.UnregisterSet(ms, true)
}
s.mu.Lock()
s.ms = nil
s.targets = nil
s.mu.Unlock()
}
// run is the background discovery loop. It refreshes all targets on every
// discoveryInterval tick and on every signal from refreshCh, and returns
// once stopCh is closed.
func (s *state) run(stopCh, refreshCh chan struct{}) {
	defer s.wg.Done()

	tick := time.NewTicker(discoveryInterval)
	defer tick.Stop()

	for {
		select {
		case <-stopCh:
			return
		case <-tick.C:
			s.refresh()
		case <-refreshCh:
			s.refresh()
		}
	}
}
// notifyRefreshLocked requests a refresh without blocking: when the send on
// s.refreshCh cannot proceed immediately, the request is dropped.
//
// register calls it while holding s.mu.
func (s *state) notifyRefreshLocked() {
select {
case s.refreshCh <- struct{}{}:
default:
}
}
// refresh resolves every registered target and stores the IPs of the targets
// that resolved successfully. Targets that fail to resolve keep their
// previously stored IPs.
//
// All targets are resolved concurrently under a single discoveryTimeout
// deadline. Resolving them one after another under the shared deadline would
// let a single slow lookup consume the whole budget and fail every target
// that comes after it; running them in parallel removes that starvation
// while keeping the worst-case duration of a refresh at discoveryTimeout.
//
// DNS lookups are performed without holding s.mu; the lock is taken only for
// reading the target list and for applying the results.
func (s *state) refresh() {
	snapshots := s.snapshots()
	if len(snapshots) == 0 {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), discoveryTimeout)
	defer cancel()

	// resolved[i] holds the IPs for snapshots[i]; it stays nil on failure.
	// Every goroutine writes to its own index, so no extra locking is needed.
	resolved := make([][]string, len(snapshots))
	var wg sync.WaitGroup
	for i, snap := range snapshots {
		wg.Add(1)
		go func(i int, host string) {
			defer wg.Done()
			if ips, ok := resolveIPs(ctx, host); ok {
				resolved[i] = ips
			}
		}(i, snap.host)
	}
	wg.Wait()

	s.mu.Lock()
	defer s.mu.Unlock()
	for i, snap := range snapshots {
		ips := resolved[i]
		if len(ips) == 0 {
			continue
		}
		// The target may be missing if the state changed during resolution.
		t := s.targets[snap.urlLabel]
		if t == nil {
			continue
		}
		t.applyResolvedIPs(ips)
	}
}
// snapshots returns a copy of the (urlLabel, host) pair of every registered
// target, ordered by urlLabel.
func (s *state) snapshots() []targetSnapshot {
	s.mu.RLock()
	defer s.mu.RUnlock()

	out := make([]targetSnapshot, 0, len(s.targets))
	for _, tg := range s.targets {
		out = append(out, targetSnapshot{urlLabel: tg.urlLabel, host: tg.host})
	}

	// Map iteration order is random; sort for a deterministic result.
	byURLLabel := func(i, j int) bool {
		return out[i].urlLabel < out[j].urlLabel
	}
	sort.Slice(out, byURLLabel)
	return out
}
// writeMetrics writes one vm_topology_discovery_targets line with value 1
// to w for every sample returned by s.samples.
func (s *state) writeMetrics(w io.Writer) {
	const lineFormat = "vm_topology_discovery_targets{url=%q,addr=%q,resolved_ip=%q} 1\n"

	samples := s.samples()
	for i := range samples {
		smp := &samples[i]
		fmt.Fprintf(w, lineFormat, smp.urlLabel, smp.addrLabel, smp.ip)
	}
}
// samples returns one entry per resolved IP of every target that has been
// resolved at least once, ordered by urlLabel, then addrLabel, then ip.
func (s *state) samples() []targetSample {
	s.mu.RLock()
	defer s.mu.RUnlock()

	out := make([]targetSample, 0, len(s.targets))
	for _, tg := range s.targets {
		if !tg.hasResolved {
			// Nothing to report until the first successful resolution.
			continue
		}
		for _, ip := range tg.resolvedIPs {
			out = append(out, targetSample{urlLabel: tg.urlLabel, addrLabel: tg.addrLabel, ip: ip})
		}
	}

	sort.Slice(out, func(i, j int) bool {
		x, y := out[i], out[j]
		switch {
		case x.urlLabel != y.urlLabel:
			return x.urlLabel < y.urlLabel
		case x.addrLabel != y.addrLabel:
			return x.addrLabel < y.addrLabel
		default:
			return x.ip < y.ip
		}
	})
	return out
}
// applyResolvedIPs stores resolvedIPs as the latest resolution result and
// marks the target as resolved. An empty list is ignored, so the previously
// stored IPs are kept.
func (t *target) applyResolvedIPs(resolvedIPs []string) {
	if len(resolvedIPs) > 0 {
		t.resolvedIPs = resolvedIPs
		t.hasResolved = true
	}
}
// newTarget builds a target from rawURL, using sanitizedURL as its url label.
//
// It returns an error if rawURL cannot be parsed or if no host/port can be
// derived from it (see getURLHostPort).
//
// The returned errors intentionally never contain rawURL: Register logs them
// next to sanitizedURL, and embedding the raw URL there would defeat the
// sanitization (rawURL may carry credentials or tokens).
func newTarget(rawURL, sanitizedURL string) (*target, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		// url.Parse reports failures as *url.Error, whose message embeds the
		// whole input URL. Keep only the underlying cause.
		if uerr, ok := err.(*url.Error); ok {
			err = uerr.Err
		}
		return nil, fmt.Errorf("cannot parse raw URL: %w", err)
	}

	host, port, ok := getURLHostPort(u)
	if !ok {
		return nil, fmt.Errorf("cannot determine topology addr: missing host, or missing port with unsupported scheme %q", u.Scheme)
	}

	return &target{
		urlLabel:  sanitizedURL,
		addrLabel: joinAddr(host, port),
		host:      host,
	}, nil
}
// getURLHostPort extracts the host and the port from u.
//
// An explicit port is returned as is. When the port is missing, hosts with
// the `srv+` prefix are returned with an empty port, while other hosts get
// the default port of the scheme: 80 for http and 443 for https.
//
// The last return value is false if u is nil, has no host, or has neither
// a port nor a scheme with a known default port.
func getURLHostPort(u *url.URL) (string, string, bool) {
	if u == nil || u.Host == "" {
		return "", "", false
	}
	host := u.Hostname()
	if host == "" {
		return "", "", false
	}

	if port := u.Port(); port != "" || strings.HasPrefix(host, "srv+") {
		return host, port, true
	}

	// No explicit port: fall back to the scheme default.
	switch u.Scheme {
	case "http":
		return host, "80", true
	case "https":
		return host, "443", true
	}
	return "", "", false
}
// resolveIPs resolves host into a list of IPs. Hosts with the `srv+` prefix
// are resolved via resolveSRV with the prefix removed; all other hosts are
// resolved via resolveIPAddrs.
func resolveIPs(ctx context.Context, host string) ([]string, bool) {
	if srvName, isSRV := strings.CutPrefix(host, "srv+"); isSRV {
		return resolveSRV(ctx, srvName)
	}
	return resolveIPAddrs(ctx, host)
}
// resolveSRV looks up SRV records for host and resolves every record target
// into IPs. It returns the sorted, de-duplicated union of those IPs.
//
// The second return value is false if the SRV lookup fails, returns no
// records, or none of the record targets resolves to an IP. SRV targets that
// fail to resolve are skipped.
func resolveSRV(ctx context.Context, host string) ([]string, bool) {
	_, records, err := netutil.Resolver.LookupSRV(ctx, "", "", host)
	switch {
	case err != nil:
		logger.Warnf("cannot resolve topology SRV addr %q: %s", host, err)
		return nil, false
	case len(records) == 0:
		logger.Warnf("missing topology SRV records for %q", host)
		return nil, false
	}

	var collected []string
	for _, rec := range records {
		// SRV targets may end with a dot; strip it before the lookup.
		name := strings.TrimSuffix(rec.Target, ".")
		if ips, ok := resolveIPAddrs(ctx, name); ok {
			collected = append(collected, ips...)
		}
	}

	// sortAndDedupStrings returns nil for an empty input.
	unique := sortAndDedupStrings(collected)
	if len(unique) == 0 {
		return nil, false
	}
	return unique, true
}
// resolveIPAddrs resolves host via netutil.Resolver and returns the sorted,
// de-duplicated string forms of the returned addresses.
//
// The second return value is false if the lookup fails or returns nothing;
// both cases are logged as warnings.
func resolveIPAddrs(ctx context.Context, host string) ([]string, bool) {
	addrs, err := netutil.Resolver.LookupIPAddr(ctx, host)
	switch {
	case err != nil:
		logger.Warnf("cannot resolve topology IPs for %q: %s", host, err)
		return nil, false
	case len(addrs) == 0:
		logger.Warnf("missing topology IPs for %q", host)
		return nil, false
	}

	out := make([]string, 0, len(addrs))
	for _, addr := range addrs {
		out = append(out, addr.String())
	}
	return sortAndDedupStrings(out), true
}
// sortAndDedupStrings sorts a in place and returns it with adjacent
// duplicates removed. It returns nil for an empty input.
//
// The returned slice shares its backing array with a.
func sortAndDedupStrings(a []string) []string {
	if len(a) > 0 {
		sort.Strings(a)
		return slices.Compact(a)
	}
	return nil
}
// joinAddr returns host:port, with host enclosed in square brackets when
// net.JoinHostPort requires it. It returns host alone if port is empty.
func joinAddr(host, port string) string {
	if port != "" {
		return net.JoinHostPort(host, port)
	}
	return host
}

View File

@@ -0,0 +1,214 @@
package topology
import (
"bytes"
"context"
"fmt"
"net"
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
)
// TestNewTarget verifies that newTarget derives the url label, the addr label
// and the host from supported URLs, and that it rejects URLs from which no
// host or port can be determined.
func TestNewTarget(t *testing.T) {
	f := func(rawURL, sanitizedURL string, want *target) {
		t.Helper()
		got, err := newTarget(rawURL, sanitizedURL)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}
		if !reflect.DeepEqual(got, want) {
			t.Fatalf("unexpected target\ngot:\n%#v\nwant:\n%#v", got, want)
		}
	}
	fErr := func(rawURL string) {
		t.Helper()
		got, err := newTarget(rawURL, "1:secret-url")
		if err == nil {
			t.Fatalf("expecting non-nil error for rawURL=%q; got target %#v", rawURL, got)
		}
		if got != nil {
			t.Fatalf("expecting nil target on error for rawURL=%q; got %#v", rawURL, got)
		}
	}

	// explicit port
	f("http://vminsert:8480/api/v1/write", "1:secret-url", &target{
		urlLabel:  "1:secret-url",
		addrLabel: "vminsert:8480",
		host:      "vminsert",
	})
	// default http port
	f("http://vminsert/api/v1/write", "1:secret-url", &target{
		urlLabel:  "1:secret-url",
		addrLabel: "vminsert:80",
		host:      "vminsert",
	})
	// default https port
	f("https://vminsert/api/v1/write", "1:secret-url", &target{
		urlLabel:  "1:secret-url",
		addrLabel: "vminsert:443",
		host:      "vminsert",
	})
	// srv+ host without port
	f("http://srv+vmselect/api/v1/write", "1:secret-url", &target{
		urlLabel:  "1:secret-url",
		addrLabel: "srv+vmselect",
		host:      "srv+vmselect",
	})
	// IPv6 literal
	f("http://[2001:db8::1]:8480/api/v1/write", "1:secret-url", &target{
		urlLabel:  "1:secret-url",
		addrLabel: "[2001:db8::1]:8480",
		host:      "2001:db8::1",
	})

	// empty URL
	fErr("")
	// missing host
	fErr("http://")
	// missing port with a scheme that has no default port
	fErr("ftp://vminsert/api/v1/write")
	// unparsable URL
	fErr("://vminsert:8480/api/v1/write")
}
func TestResolveIPsDirect(t *testing.T) {
withResolver(t, &fakeResolver{
lookupIPAddrResults: map[string][]net.IPAddr{
"vminsert": {
{IP: net.ParseIP("10.20.30.40")},
{IP: net.ParseIP("10.20.30.40")},
{IP: net.ParseIP("10.20.30.41")},
},
},
}, func() {
got, ok := resolveIPs(context.Background(), "vminsert")
if !ok {
t.Fatalf("expected successful direct resolution")
}
want := []string{"10.20.30.40", "10.20.30.41"}
if !reflect.DeepEqual(got, want) {
t.Fatalf("unexpected resolved IPs\ngot:\n%v\nwant:\n%v", got, want)
}
})
}
func TestResolveIPsSRV(t *testing.T) {
withResolver(t, &fakeResolver{
lookupSRVResults: map[string][]*net.SRV{
"vmselect": {
{Target: "vmselect-0.local.", Port: 8481},
{Target: "vmselect-1.local.", Port: 8481},
},
},
lookupIPAddrResults: map[string][]net.IPAddr{
"vmselect-0.local": {
{IP: net.ParseIP("10.20.30.50")},
},
"vmselect-1.local": {
{IP: net.ParseIP("10.20.30.51")},
},
},
}, func() {
got, ok := resolveIPs(context.Background(), "srv+vmselect")
if !ok {
t.Fatalf("expected successful SRV resolution")
}
want := []string{"10.20.30.50", "10.20.30.51"}
if !reflect.DeepEqual(got, want) {
t.Fatalf("unexpected resolved IPs\ngot:\n%v\nwant:\n%v", got, want)
}
})
}
// TestTargetKeepsLastSuccessfulIPs verifies that a target exposes no samples
// until its first successful resolution and keeps the last successfully
// resolved IPs when a later resolution fails.
func TestTargetKeepsLastSuccessfulIPs(t *testing.T) {
	tg := &target{
		urlLabel:  "1:secret-url",
		addrLabel: "vminsert:8480",
		host:      "vminsert",
	}
	st := &state{
		targets: map[string]*target{
			tg.urlLabel: tg,
		},
	}
	want := []targetSample{{
		urlLabel:  "1:secret-url",
		addrLabel: "vminsert:8480",
		ip:        "10.20.30.40",
	}}

	// refreshWith runs a refresh cycle against r and returns the resulting samples.
	refreshWith := func(r *fakeResolver) []targetSample {
		t.Helper()
		var got []targetSample
		withResolver(t, r, func() {
			st.refresh()
			got = st.samples()
		})
		return got
	}
	working := &fakeResolver{
		lookupIPAddrResults: map[string][]net.IPAddr{
			"vminsert": {
				{IP: net.ParseIP("10.20.30.40")},
			},
		},
	}

	// No samples before first successful resolution.
	if got := refreshWith(&fakeResolver{}); len(got) != 0 {
		t.Fatalf("expected no samples before first successful resolution; got %v", got)
	}

	// Samples appear after successful resolution.
	if got := refreshWith(working); !reflect.DeepEqual(got, want) {
		t.Fatalf("unexpected samples after successful refresh\ngot:\n%v\nwant:\n%v", got, want)
	}

	// Last successful set retained after failed resolution.
	if got := refreshWith(&fakeResolver{}); !reflect.DeepEqual(got, want) {
		t.Fatalf("unexpected samples after failed refresh fallback\ngot:\n%v\nwant:\n%v", got, want)
	}
}
func TestWriteMetrics(t *testing.T) {
st := &state{
targets: map[string]*target{
"2:secret-url": {
urlLabel: "2:secret-url",
addrLabel: "srv+vmselect",
resolvedIPs: []string{"10.20.30.51", "10.20.30.50"},
hasResolved: true,
},
"1:secret-url": {
urlLabel: "1:secret-url",
addrLabel: "vminsert:8480",
resolvedIPs: []string{"10.20.30.40"},
hasResolved: true,
},
},
}
var buf bytes.Buffer
st.writeMetrics(&buf)
const want = "" +
"vm_topology_discovery_targets{url=\"1:secret-url\",addr=\"vminsert:8480\",resolved_ip=\"10.20.30.40\"} 1\n" +
"vm_topology_discovery_targets{url=\"2:secret-url\",addr=\"srv+vmselect\",resolved_ip=\"10.20.30.50\"} 1\n" +
"vm_topology_discovery_targets{url=\"2:secret-url\",addr=\"srv+vmselect\",resolved_ip=\"10.20.30.51\"} 1\n"
if got := buf.String(); got != want {
t.Fatalf("unexpected metrics output\ngot:\n%s\nwant:\n%s", got, want)
}
}
// fakeResolver is a map-backed resolver substituted for netutil.Resolver in tests.
// Names missing from the maps fail to resolve.
type fakeResolver struct {
// lookupSRVResults maps the name passed to LookupSRV to the returned records.
lookupSRVResults map[string][]*net.SRV
// lookupIPAddrResults maps the host passed to LookupIPAddr to the returned addresses.
lookupIPAddrResults map[string][]net.IPAddr
}
// LookupSRV returns the records configured for name in lookupSRVResults,
// or an error if name has no entry there. The service and proto arguments
// are ignored.
func (r *fakeResolver) LookupSRV(_ context.Context, _, _, name string) (string, []*net.SRV, error) {
	records, found := r.lookupSRVResults[name]
	if !found {
		return name, nil, fmt.Errorf("no SRV results found for host: %s", name)
	}
	return name, records, nil
}
// LookupIPAddr returns the addresses configured for host in
// lookupIPAddrResults, or an error if host has no entry there.
func (r *fakeResolver) LookupIPAddr(_ context.Context, host string) ([]net.IPAddr, error) {
	addrs, found := r.lookupIPAddrResults[host]
	if !found {
		return nil, fmt.Errorf("no IP results found for host: %s", host)
	}
	return addrs, nil
}
// LookupMX always fails: the tests in this file configure no MX records.
func (r *fakeResolver) LookupMX(_ context.Context, host string) ([]*net.MX, error) {
return nil, fmt.Errorf("no MX results found for host: %s", host)
}
// withResolver runs f with netutil.Resolver replaced by resolver and
// restores the previous resolver when f returns.
func withResolver(t *testing.T, resolver *fakeResolver, f func()) {
	t.Helper()

	prev := netutil.Resolver
	defer func() { netutil.Resolver = prev }()

	netutil.Resolver = resolver
	f()
}