mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-09 03:43:58 +03:00
Compare commits
6 Commits
gh-2388
...
dependabot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
200c744b6e | ||
|
|
3c192f9238 | ||
|
|
159bc15825 | ||
|
|
8db58ac410 | ||
|
|
42c1f729db | ||
|
|
6851a75c71 |
6
.github/workflows/codeql-analysis-go.yml
vendored
6
.github/workflows/codeql-analysis-go.yml
vendored
@@ -52,14 +52,14 @@ jobs:
|
||||
restore-keys: go-artifacts-${{ runner.os }}-codeql-analyze-${{ steps.go.outputs.go-version }}-
|
||||
|
||||
- name: Initialize CodeQL
|
||||
uses: github/codeql-action/init@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
|
||||
uses: github/codeql-action/init@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4.35.5
|
||||
with:
|
||||
languages: go
|
||||
|
||||
- name: Autobuild
|
||||
uses: github/codeql-action/autobuild@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
|
||||
uses: github/codeql-action/autobuild@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4.35.5
|
||||
|
||||
- name: Perform CodeQL Analysis
|
||||
uses: github/codeql-action/analyze@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
|
||||
uses: github/codeql-action/analyze@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4.35.5
|
||||
with:
|
||||
category: 'language:go'
|
||||
|
||||
@@ -151,20 +151,10 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
|
||||
}
|
||||
tr.Proxy = http.ProxyURL(pu)
|
||||
}
|
||||
|
||||
hc := &http.Client{
|
||||
Transport: authCfg.NewRoundTripper(tr),
|
||||
Timeout: sendTimeout.GetOptionalArg(argIdx),
|
||||
}
|
||||
rwURL, err := url.Parse(remoteWriteURL)
|
||||
if err != nil {
|
||||
logger.Fatalf("BUGL cannot parse already parsed -remoteWrite.url=%q: %s", remoteWriteURL, err)
|
||||
}
|
||||
if strings.HasPrefix(rwURL.Host, "dns+") {
|
||||
rwURL.Host = rwURL.Host[4:]
|
||||
remoteWriteURL = rwURL.String()
|
||||
hc.Transport = httputil.NewLoadBalancerTransport(hc.Transport, rwURL)
|
||||
}
|
||||
retryMaxIntervalFlag := retryMaxTime
|
||||
if retryMaxInterval.String() != "" {
|
||||
retryMaxIntervalFlag = retryMaxInterval
|
||||
|
||||
@@ -130,7 +130,7 @@
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"fields": "/^short_version$/",
|
||||
"fields": "/^version$/",
|
||||
"values": false
|
||||
},
|
||||
"showPercentChange": false,
|
||||
@@ -146,11 +146,10 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "vm_app_version{job=~\"$job\",instance=~\"$instance\"}",
|
||||
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"interval": "",
|
||||
"legendFormat": "{{short_version}}",
|
||||
"range": false,
|
||||
"refId": "A"
|
||||
}
|
||||
|
||||
@@ -791,7 +791,7 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
|
||||
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"range": false,
|
||||
|
||||
@@ -789,7 +789,7 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
|
||||
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"range": false,
|
||||
|
||||
@@ -131,7 +131,7 @@
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"fields": "/^short_version$/",
|
||||
"fields": "/^version$/",
|
||||
"values": false
|
||||
},
|
||||
"showPercentChange": false,
|
||||
@@ -147,11 +147,10 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "vm_app_version{job=~\"$job\",instance=~\"$instance\"}",
|
||||
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"interval": "",
|
||||
"legendFormat": "{{short_version}}",
|
||||
"range": false,
|
||||
"refId": "A"
|
||||
}
|
||||
|
||||
@@ -792,7 +792,7 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
|
||||
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"range": false,
|
||||
|
||||
@@ -790,7 +790,7 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
|
||||
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"range": false,
|
||||
|
||||
@@ -785,7 +785,7 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
|
||||
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"range": false,
|
||||
|
||||
@@ -566,7 +566,7 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
|
||||
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"range": false,
|
||||
|
||||
@@ -492,14 +492,14 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by (job, short_version)",
|
||||
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"range": false,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Version",
|
||||
"title": "",
|
||||
"type": "table"
|
||||
},
|
||||
{
|
||||
|
||||
@@ -784,7 +784,7 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
|
||||
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"range": false,
|
||||
|
||||
@@ -565,7 +565,7 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by(job, short_version)",
|
||||
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"range": false,
|
||||
|
||||
@@ -491,14 +491,14 @@
|
||||
},
|
||||
"editorMode": "code",
|
||||
"exemplar": false,
|
||||
"expr": "sum(vm_app_version{job=~\"$job\", instance=~\"$instance\"}) by (job, short_version)",
|
||||
"expr": "sum by(job, version) (label_replace(vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version!=\"\"}, \"version\", \"$1\", \"short_version\", \"(.*)\") or vm_app_version{job=~\"$job\", instance=~\"$instance\", short_version=\"\"})",
|
||||
"format": "table",
|
||||
"instant": true,
|
||||
"range": false,
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Version",
|
||||
"title": "",
|
||||
"type": "table"
|
||||
},
|
||||
{
|
||||
|
||||
@@ -32,16 +32,18 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `-opentelemetry.promoteAllResourceAttributes` and `-opentelemetry.promoteScopeMetadata` command-line flags to allow managing label promotion for resource attributes and OTel scope metadata. See [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) docs and [#10931](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10931).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) : introduce `vmagent_remotewrite_kafka_outbuf_latency_seconds` and `vmagent_remotewrite_kafka_rtt_seconds` metrics for [kafka integration](https://docs.victoriametrics.com/victoriametrics/integrations/kafka/). The metrics could help identify throughput bottlenecks. See [#10730](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10730).
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): properly log user information when a missing route error occurs. See [#11052](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11052).
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl/): add the ability to migrate data from Mimir object storage to VictoriaMetrics. See [#7717](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7717).
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl/): add the ability to migrate data from [Mimir](https://docs.victoriametrics.com/victoriametrics/vmctl/mimir/#) object storage to VictoriaMetrics. See [#7717](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7717).
|
||||
* FEATURE: [dashboards](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/dashboards): show the full `version` label in the `Version` panel when `short_version` label is empty (e.g. custom builds from feature branch). Previously, the panel could appear empty. See [#11047](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11047).
|
||||
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): fix the `Notifiers` page in web UI appearing blank despite the API returning notifier data correctly. See [#11035](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11035).
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): reset the group evaluation timestamp if it exceeds the current host time. Previously, vmalert could use future timestamps for evaluations if the system clock was shifted backward. See [#10985](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10985).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): properly parse [Prometheus Native Histograms](https://prometheus.io/docs/specs/native_histograms/), previously Protobuf parser could produce unexpected `vmrange` labels. See [#11041](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11041).
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): properly calculate number of loaded users to be printed in startup log. Previously, it was only accounting for static users and skipped JWT configuration entries.
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): properly calculate number of loaded users to be printed in startup log. Previously, it was only accounting for static users and skipped JWT configuration entries. See [#11050](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11050/).
|
||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/): `integrate()` no longer extrapolates the last sample's value past the end of the time series. Previously, querying `integrate(metric[1h])` at a timestamp where the series had already ended would keep accruing area as if the last value continued indefinitely, producing values much larger than the true integral. See [#9474](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9474). Thanks to @wtfashwin for contribution.
|
||||
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): avoid returning HTTP 503 for queries with partial results when a storage group is unavailable and `-search.denyPartialResponse` is disabled.
|
||||
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): avoid returning HTTP 503 for queries with partial results when a storage group is unavailable and `-search.denyPartialResponse` is disabled. See [#11009](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11009). Thanks to @fxrlv for the contribution.
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly escape `utf-8` label names for [/federate](https://docs.victoriametrics.com/victoriametrics/#federation) API requests. See [#10968](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10968).
|
||||
* BUGFIX: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): persist the `Disable deduplication` toggle under its own local storage key. Before this fix, the toggle state was lost after reload and could overwrite the `Compact view` table setting. See [#11004](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11004). Thanks to @immanuwell for the contribution.
|
||||
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix intermittent `write: connection timed out` errors caused by silently dropped TCP connections being reused from the connection pool. See [#10735](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10735#issuecomment-4535832301).
|
||||
|
||||
## [v1.144.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.144.0)
|
||||
|
||||
|
||||
@@ -431,22 +431,6 @@ and `-remoteWrite.streamAggr.config`:
|
||||
|
||||
There is also the `-promscrape.configCheckInterval` command-line flag, which can be used to automatically reload configs from the updated `-promscrape.config` file.
|
||||
|
||||
## DNS URLs
|
||||
|
||||
If `vmagent` encounters URLs with the `dns+` prefix in the hostname (such as `http://dns+some-addr:8428/some/path`), it resolves `some-addr` into IP addresses
|
||||
via [DNS A records](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1). The port from the original URL is appended to each discovered IP address.
|
||||
Each discovered IP address is used for round-robin balancing of write requests.
|
||||
|
||||
DNS URLs are supported in the following places:
|
||||
|
||||
* In `-remoteWrite.url` command-line flag. For example, if `victoria-metrics` [DNS A Record](https://datatracker.ietf.org/doc/html/rfc1035#section-3.4.1) record contains
|
||||
`192.168.1.15` IP address, then `-remoteWrite.url=http://dns+victoria-metrics:8428/api/v1/write` is automatically resolved into
|
||||
`-remoteWrite.url=http://192.168.1.15:8428/api/v1/write`.
|
||||
|
||||
DNS URLs are useful when client-side HTTP load balancing is needed. A good example
|
||||
is a [Kubernetes headless Service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services),
|
||||
which returns multiple IP addresses for a single hostname.
|
||||
|
||||
## SRV URLs
|
||||
|
||||
If `vmagent` encounters URLs with `srv+` prefix in hostname (such as `http://srv+some-addr/some/path`), then it resolves `some-addr` [DNS SRV](https://en.wikipedia.org/wiki/SRV_record)
|
||||
|
||||
@@ -1,136 +0,0 @@
|
||||
package httputil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
"net/url"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
)
|
||||
|
||||
// NewLoadBalancerTransport returns new RoundTripper that performs round-robin HTTP requests loadbalancing
|
||||
// based on discovered A dns records for the given url host
|
||||
func NewLoadBalancerTransport(origin http.RoundTripper, url *url.URL) http.RoundTripper {
|
||||
host, port, err := net.SplitHostPort(url.Host)
|
||||
if err != nil {
|
||||
host = url.Host
|
||||
}
|
||||
t := &loadbalancerTransport{
|
||||
host: host,
|
||||
port: port,
|
||||
tr: origin,
|
||||
}
|
||||
t.discoverBackends(context.Background())
|
||||
return t
|
||||
}
|
||||
|
||||
type loadbalancerTransport struct {
|
||||
tr http.RoundTripper
|
||||
host string
|
||||
port string
|
||||
|
||||
discovering atomic.Bool
|
||||
lastResolvedAt atomic.Uint64
|
||||
dbs atomic.Pointer[discoveredBackends]
|
||||
}
|
||||
|
||||
type discoveredBackends struct {
|
||||
backends []string
|
||||
idx atomic.Uint64
|
||||
}
|
||||
|
||||
// RoundTrip implements http.RoundTripper interface
|
||||
func (lb *loadbalancerTransport) RoundTrip(r *http.Request) (*http.Response, error) {
|
||||
backend := lb.pickBackend(r.Context())
|
||||
if backend == "" {
|
||||
return nil, fmt.Errorf("no backends found for hostname=%q", lb.host)
|
||||
}
|
||||
|
||||
r2 := r.Clone(r.Context())
|
||||
r2.URL.Host = backend
|
||||
if r2.Host == "" {
|
||||
r2.Host = r.URL.Host
|
||||
}
|
||||
resp, err := lb.tr.RoundTrip(r2)
|
||||
if err != nil {
|
||||
var dnsErr *net.DNSError
|
||||
// perform a single retry for in case of dns lookup error
|
||||
if errors.As(err, &dnsErr) && dnsErr.IsNotFound {
|
||||
lb.lastResolvedAt.Store(0)
|
||||
backend := lb.pickBackend(r.Context())
|
||||
if backend == "" {
|
||||
return nil, fmt.Errorf("no backends found for hostname=%q", lb.host)
|
||||
}
|
||||
|
||||
r2 = r.Clone(r.Context())
|
||||
if r2.Host == "" {
|
||||
r2.Host = r.URL.Host
|
||||
}
|
||||
r2.URL.Host = backend
|
||||
resp, err = lb.tr.RoundTrip(r2)
|
||||
}
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (lb *loadbalancerTransport) pickBackend(ctx context.Context) string {
|
||||
dbs := lb.dbs.Load()
|
||||
if dbs == nil || fasttime.UnixTimestamp()-lb.lastResolvedAt.Load() > 30 {
|
||||
if newDBS := lb.discoverBackends(ctx); newDBS != nil {
|
||||
dbs = newDBS
|
||||
}
|
||||
}
|
||||
if dbs == nil || len(dbs.backends) == 0 {
|
||||
return ""
|
||||
}
|
||||
idx := dbs.idx.Add(1) - 1
|
||||
return dbs.backends[idx%uint64(len(dbs.backends))]
|
||||
}
|
||||
|
||||
func (lb *loadbalancerTransport) discoverBackends(ctx context.Context) *discoveredBackends {
|
||||
// prevent concurrent dns lookup
|
||||
if !lb.discovering.CompareAndSwap(false, true) {
|
||||
return lb.dbs.Load()
|
||||
}
|
||||
defer lb.discovering.Store(false)
|
||||
|
||||
addrs, err := netutil.Resolver.LookupIPAddr(ctx, lb.host)
|
||||
if err != nil {
|
||||
logger.Errorf("cannot discover ips for host: %q: %s", lb.host, err)
|
||||
return lb.dbs.Load()
|
||||
}
|
||||
backends := make([]string, 0, len(addrs))
|
||||
for _, addr := range addrs {
|
||||
if !netutil.TCP6Enabled() {
|
||||
ip, ok := netip.AddrFromSlice(addr.IP)
|
||||
if !ok {
|
||||
logger.Panicf("BUG: cannot build ip from slice addr: %q", addr.IP.String())
|
||||
}
|
||||
if !ip.Unmap().Is4() {
|
||||
continue
|
||||
}
|
||||
}
|
||||
ip := addr.IP.String()
|
||||
if len(lb.port) > 0 {
|
||||
ip = net.JoinHostPort(ip, lb.port)
|
||||
}
|
||||
backends = append(backends, ip)
|
||||
}
|
||||
rand.Shuffle(len(backends), func(i, j int) {
|
||||
backends[i], backends[j] = backends[j], backends[i]
|
||||
})
|
||||
dbs := &discoveredBackends{
|
||||
backends: backends,
|
||||
}
|
||||
lb.dbs.Store(dbs)
|
||||
lb.lastResolvedAt.Store(fasttime.UnixTimestamp())
|
||||
return dbs
|
||||
}
|
||||
@@ -1,134 +0,0 @@
|
||||
package httputil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
"net/url"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
)
|
||||
|
||||
type testRemoteServer struct {
|
||||
mu sync.Mutex
|
||||
requestsPerHost map[string]int
|
||||
|
||||
totalRequests int
|
||||
firstError error
|
||||
}
|
||||
|
||||
func (trs *testRemoteServer) RoundTrip(r *http.Request) (*http.Response, error) {
|
||||
trs.mu.Lock()
|
||||
if trs.firstError != nil && trs.totalRequests == 0 {
|
||||
err := trs.firstError
|
||||
trs.firstError = nil
|
||||
trs.totalRequests++
|
||||
trs.mu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
trs.totalRequests++
|
||||
|
||||
if trs.requestsPerHost == nil {
|
||||
trs.requestsPerHost = make(map[string]int)
|
||||
}
|
||||
trs.requestsPerHost[r.URL.Host]++
|
||||
trs.mu.Unlock()
|
||||
|
||||
return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody}, nil
|
||||
}
|
||||
|
||||
type testDNSResolver struct {
|
||||
ips []net.IPAddr
|
||||
}
|
||||
|
||||
func (tdr *testDNSResolver) LookupSRV(_ context.Context, _, _, name string) (cname string, addrs []*net.SRV, err error) {
|
||||
return "", nil, fmt.Errorf("unexpected LookupMX call for name=%q", name)
|
||||
}
|
||||
func (tdr *testDNSResolver) LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error) {
|
||||
return tdr.ips, nil
|
||||
}
|
||||
|
||||
func (tdr *testDNSResolver) LookupMX(_ context.Context, name string) ([]*net.MX, error) {
|
||||
return nil, fmt.Errorf("unexpected LookupMX call for name=%q", name)
|
||||
}
|
||||
|
||||
func TestLoadbalancerTransport(t *testing.T) {
|
||||
f := func(discoveredIPs []string, trs *testRemoteServer) {
|
||||
t.Helper()
|
||||
|
||||
parsedIPs := make([]net.IPAddr, 0, len(discoveredIPs))
|
||||
for _, dIP := range discoveredIPs {
|
||||
pIP, err := netip.ParseAddr(dIP)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse IP=%q: %s", dIP, err)
|
||||
}
|
||||
parsedIPs = append(parsedIPs, net.IPAddr{IP: pIP.AsSlice()})
|
||||
}
|
||||
tdr := &testDNSResolver{ips: parsedIPs}
|
||||
originResolver := netutil.Resolver
|
||||
defer func() { netutil.Resolver = originResolver }()
|
||||
|
||||
netutil.Resolver = tdr
|
||||
requestURL, err := url.Parse("http://vmsingle.example.com:8429/api/v1/write")
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse url: %s", err)
|
||||
}
|
||||
lbt := NewLoadBalancerTransport(trs, requestURL)
|
||||
|
||||
if len(discoveredIPs) == 0 {
|
||||
r, err := http.NewRequest(http.MethodGet, requestURL.String(), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create http request: %s", err)
|
||||
}
|
||||
_, err = lbt.RoundTrip(r)
|
||||
if err == nil {
|
||||
t.Fatalf("expected no backends found error")
|
||||
}
|
||||
return
|
||||
}
|
||||
expectedRequestsPerHost := 2
|
||||
for range len(discoveredIPs) * expectedRequestsPerHost {
|
||||
r, err := http.NewRequest(http.MethodGet, requestURL.String(), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create http request: %s", err)
|
||||
}
|
||||
resp, err := lbt.RoundTrip(r)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
}
|
||||
requestsPerHost := trs.requestsPerHost
|
||||
|
||||
for _, dIP := range discoveredIPs {
|
||||
expectedHostPort := net.JoinHostPort(dIP, "8429")
|
||||
gotRequestsPerHost, ok := requestsPerHost[expectedHostPort]
|
||||
if !ok {
|
||||
t.Fatalf("not found expected backend request for: %q", expectedHostPort)
|
||||
}
|
||||
if gotRequestsPerHost != expectedRequestsPerHost {
|
||||
t.Fatalf("unexpected requests per host: %d:%d (-;+)", expectedRequestsPerHost, gotRequestsPerHost)
|
||||
}
|
||||
}
|
||||
}
|
||||
trs := testRemoteServer{}
|
||||
f([]string{"1.1.1.1"}, &trs)
|
||||
|
||||
trs = testRemoteServer{}
|
||||
f([]string{"1.1.1.1", "2.2.2.2", "5.5.5.5"}, &trs)
|
||||
|
||||
// retry dns resolve error
|
||||
trs = testRemoteServer{
|
||||
firstError: &net.DNSError{Err: "no such host", IsNotFound: true},
|
||||
}
|
||||
f([]string{"1.1.1.1", "2.2.2.2", "5.5.5.5"}, &trs)
|
||||
|
||||
// empty backends, expecting error
|
||||
trs = testRemoteServer{}
|
||||
f([]string{}, &trs)
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user