mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-29 23:00:51 +03:00
Compare commits
6 Commits
cond-based
...
feature/me
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0baa091b77 | ||
|
|
7bdd01e23e | ||
|
|
697bb33354 | ||
|
|
fb07c59da4 | ||
|
|
e06cc2c69b | ||
|
|
81c15c21a9 |
@@ -38,8 +38,10 @@ var (
|
||||
"By default, the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data "+
|
||||
"is sent after temporary unavailability of the remote storage. See also -maxIngestionRate")
|
||||
sendTimeout = flagutil.NewArrayDuration("remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to the corresponding -remoteWrite.url")
|
||||
retryMinInterval = flagutil.NewArrayDuration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts to send a block of data to the corresponding -remoteWrite.url. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxTime")
|
||||
retryMaxTime = flagutil.NewArrayDuration("remoteWrite.retryMaxTime", time.Minute, "The max time spent on retry attempts to send a block of data to the corresponding -remoteWrite.url. Change this value if it is expected for -remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
|
||||
retryMinInterval = flagutil.NewArrayDuration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts to send a block of data to the corresponding -remoteWrite.url. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxInterval")
|
||||
// deprecated in the future. use -remoteWrite.retryMaxInterval instead
|
||||
retryMaxTime = flagutil.NewArrayDuration("remoteWrite.retryMaxTime", time.Minute, "The max time spent on retry attempts to send a block of data to the corresponding -remoteWrite.url. This flag is deprecated, use -remoteWrite.retryMaxInterval instead")
|
||||
retryMaxInterval = flagutil.NewArrayDuration("remoteWrite.retryMaxInterval", time.Minute, "The maximum delay between retry attempts to send a block of data to the corresponding -remoteWrite.url. The delay doubles with each retry until this maximum is reached, after which it remains constant. See also -remoteWrite.retryMinInterval")
|
||||
proxyURL = flagutil.NewArrayString("remoteWrite.proxyURL", "Optional proxy URL for writing data to the corresponding -remoteWrite.url. "+
|
||||
"Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234")
|
||||
|
||||
@@ -97,7 +99,7 @@ type client struct {
|
||||
hc *http.Client
|
||||
|
||||
retryMinInterval time.Duration
|
||||
retryMaxTime time.Duration
|
||||
retryMaxInterval time.Duration
|
||||
|
||||
sendBlock func(block []byte) bool
|
||||
authCfg *promauth.Config
|
||||
@@ -151,6 +153,10 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
|
||||
Transport: authCfg.NewRoundTripper(tr),
|
||||
Timeout: sendTimeout.GetOptionalArg(argIdx),
|
||||
}
|
||||
retryMaxIntervalFlag := retryMaxTime
|
||||
if retryMaxInterval.String() != "" {
|
||||
retryMaxIntervalFlag = retryMaxInterval
|
||||
}
|
||||
c := &client{
|
||||
sanitizedURL: sanitizedURL,
|
||||
remoteWriteURL: remoteWriteURL,
|
||||
@@ -159,7 +165,7 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
|
||||
fq: fq,
|
||||
hc: hc,
|
||||
retryMinInterval: retryMinInterval.GetOptionalArg(argIdx),
|
||||
retryMaxTime: retryMaxTime.GetOptionalArg(argIdx),
|
||||
retryMaxInterval: retryMaxIntervalFlag.GetOptionalArg(argIdx),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
c.sendBlock = c.sendBlockHTTP
|
||||
@@ -404,7 +410,7 @@ func (c *client) newRequest(url string, body []byte) (*http.Request, error) {
|
||||
// Otherwise, it tries sending the block to remote storage indefinitely.
|
||||
func (c *client) sendBlockHTTP(block []byte) bool {
|
||||
c.rl.Register(len(block))
|
||||
maxRetryDuration := timeutil.AddJitterToDuration(c.retryMaxTime)
|
||||
maxRetryDuration := timeutil.AddJitterToDuration(c.retryMaxInterval)
|
||||
retryDuration := timeutil.AddJitterToDuration(c.retryMinInterval)
|
||||
retriesCount := 0
|
||||
|
||||
|
||||
@@ -279,6 +279,9 @@ func initRemoteWriteCtxs(urls []string) {
|
||||
}
|
||||
rwctxs := make([]*remoteWriteCtx, len(urls))
|
||||
rwctxIdx := make([]int, len(urls))
|
||||
if retryMaxTime.String() != "" {
|
||||
logger.Warnf("-remoteWrite.retryMaxTime is deprecated; use -remoteWrite.retryMaxInterval instead")
|
||||
}
|
||||
for i, remoteWriteURLRaw := range urls {
|
||||
remoteWriteURL, err := url.Parse(remoteWriteURLRaw)
|
||||
if err != nil {
|
||||
|
||||
@@ -10,8 +10,6 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/clusternative"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogsketches"
|
||||
@@ -42,11 +40,13 @@ import (
|
||||
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
|
||||
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -81,6 +81,7 @@ var (
|
||||
maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 40, "The maximum number of labels per time series to be accepted. Series with superfluous labels are ignored. In this case the vm_rows_ignored_total{reason=\"too_many_labels\"} metric at /metrics page is incremented")
|
||||
maxLabelNameLen = flag.Int("maxLabelNameLen", 256, "The maximum length of label name in the accepted time series. Series with longer label name are ignored. In this case the vm_rows_ignored_total{reason=\"too_long_label_name\"} metric at /metrics page is incremented")
|
||||
maxLabelValueLen = flag.Int("maxLabelValueLen", 4*1024, "The maximum length of label values in the accepted time series. Series with longer label value are ignored. In this case the vm_rows_ignored_total{reason=\"too_long_label_value\"} metric at /metrics page is incremented")
|
||||
maxMemoryUsage = flag.Int("insert.circuitBreakMemoryUsage", 90, "Reject insert requests when memory usage exceeds a certain percentage. 0 means no circuit breaking. An integer value from 1-100 represents 1%-100%.")
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -217,6 +218,12 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
// This is not our link.
|
||||
return false
|
||||
}
|
||||
if *maxMemoryUsage >= 1 && *maxMemoryUsage <= 100 {
|
||||
if memory.CurrentPercentage() > *maxMemoryUsage {
|
||||
httpserver.Errorf(w, r, "server overloaded, request rejected by circuit breaker")
|
||||
return true
|
||||
}
|
||||
}
|
||||
at, err := auth.NewTokenPossibleMultitenant(p.AuthToken)
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "auth error: %s", err)
|
||||
|
||||
@@ -3,6 +3,7 @@ package graphite
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"sort"
|
||||
@@ -210,7 +211,7 @@ func MetricsIndexHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
|
||||
deadline := searchutil.GetDeadlineForQuery(r, startTime)
|
||||
jsonp := r.FormValue("jsonp")
|
||||
denyPartialResponse := httputil.GetDenyPartialResponse(r)
|
||||
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, 0, nil, 0)
|
||||
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, math.MaxInt, nil, 0)
|
||||
metricNames, isPartial, err := netstorage.LabelValues(nil, denyPartialResponse, "__name__", sq, 0, deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf(`cannot obtain metric names: %w`, err)
|
||||
|
||||
@@ -25,6 +25,11 @@ type PrometheusQuerier interface {
|
||||
PrometheusAPIV1QueryRange(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse
|
||||
PrometheusAPIV1Series(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1SeriesResponse
|
||||
PrometheusAPIV1ExportNative(t *testing.T, query string, opts QueryOpts) []byte
|
||||
|
||||
// TODO(@rtm0): Prometheus does not provide this API. Either move it to a
|
||||
// separate interface or rename this interface to allow for multiple querier
|
||||
// types.
|
||||
GraphiteMetricsIndex(t *testing.T, opts QueryOpts) GraphiteMetricsIndexResponse
|
||||
}
|
||||
|
||||
// Writer contains methods for writing new data
|
||||
@@ -395,6 +400,10 @@ type TSDBStatusResponse struct {
|
||||
Data TSDBStatusResponseData
|
||||
}
|
||||
|
||||
// GraphiteMetricsIndexResponse is an in-memory representation of the json response
|
||||
// returned by the /graphite/metrics/index.json endpoint.
|
||||
type GraphiteMetricsIndexResponse = []string
|
||||
|
||||
// AdminTenantsResponse is an in-memory representation of the json response
|
||||
// returned by the /api/v1/admin/tenants endpoint.
|
||||
type AdminTenantsResponse struct {
|
||||
|
||||
62
apptest/tests/graphite_test.go
Normal file
62
apptest/tests/graphite_test.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
at "github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
||||
)
|
||||
|
||||
func testMetricsIndex(t *testing.T, sut at.PrometheusWriteQuerier) {
|
||||
// verify index is empty at the start
|
||||
expected := at.GraphiteMetricsIndexResponse{}
|
||||
tenant := "1:2"
|
||||
got := sut.GraphiteMetricsIndex(t, at.QueryOpts{Tenant: tenant})
|
||||
if diff := cmp.Diff(expected, got); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
|
||||
// Mon Feb 5 09:57:36 CET 2024
|
||||
const ingestTimestamp = ` 1707123456700`
|
||||
dataSet := []string{
|
||||
`metric_name_1{label="foo"} 10`,
|
||||
`metric_name_1{label="bar"} 10`,
|
||||
`metric_name_2{label="baz"} 20`,
|
||||
`metric_name_1{label="baz"} 10`,
|
||||
`metric_name_3{label="baz"} 30`,
|
||||
}
|
||||
|
||||
for idx := range dataSet {
|
||||
dataSet[idx] += ingestTimestamp
|
||||
}
|
||||
|
||||
sut.PrometheusAPIV1ImportPrometheus(t, dataSet, at.QueryOpts{Tenant: tenant})
|
||||
sut.ForceFlush(t)
|
||||
|
||||
// verify ingested metrics correctly returned in index response
|
||||
expected = []string{"metric_name_1", "metric_name_2", "metric_name_3"}
|
||||
|
||||
got = sut.GraphiteMetricsIndex(t, at.QueryOpts{Tenant: tenant})
|
||||
if diff := cmp.Diff(expected, got); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSingleMetricsIndex(t *testing.T) {
|
||||
tc := at.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
sut := tc.MustStartDefaultVmsingle()
|
||||
|
||||
testMetricsIndex(tc.T(), sut)
|
||||
}
|
||||
|
||||
func TestClusterMetricsIndex(t *testing.T) {
|
||||
tc := at.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
sut := tc.MustStartDefaultCluster()
|
||||
|
||||
testMetricsIndex(tc.T(), sut)
|
||||
}
|
||||
@@ -227,6 +227,25 @@ func (app *Vmselect) APIV1StatusTSDB(t *testing.T, matchQuery string, date strin
|
||||
return status
|
||||
}
|
||||
|
||||
// GraphiteMetricsIndex sends a query to a /graphite/metrics/index.json
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api
|
||||
func (app *Vmselect) GraphiteMetricsIndex(t *testing.T, opts QueryOpts) GraphiteMetricsIndexResponse {
|
||||
t.Helper()
|
||||
|
||||
seriesURL := fmt.Sprintf("http://%s/select/%s/graphite/metrics/index.json", app.httpListenAddr, opts.getTenant())
|
||||
res, statusCode := app.cli.Get(t, seriesURL)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
|
||||
}
|
||||
|
||||
var index GraphiteMetricsIndexResponse
|
||||
if err := json.Unmarshal([]byte(res), &index); err != nil {
|
||||
t.Fatalf("could not unmarshal metrics index response data:\n%s\n err: %v", res, err)
|
||||
}
|
||||
return index
|
||||
}
|
||||
|
||||
// APIV1AdminTenants sends a query to a /admin/tenants endpoint
|
||||
func (app *Vmselect) APIV1AdminTenants(t *testing.T) *AdminTenantsResponse {
|
||||
t.Helper()
|
||||
|
||||
@@ -318,6 +318,25 @@ func (app *Vmsingle) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts
|
||||
return NewPrometheusAPIV1SeriesResponse(t, res)
|
||||
}
|
||||
|
||||
// GraphiteMetricsIndex sends a query to a /metrics/index.json
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api
|
||||
func (app *Vmsingle) GraphiteMetricsIndex(t *testing.T, _ QueryOpts) GraphiteMetricsIndexResponse {
|
||||
t.Helper()
|
||||
|
||||
seriesURL := fmt.Sprintf("http://%s/metrics/index.json", app.httpListenAddr)
|
||||
res, statusCode := app.cli.Get(t, seriesURL)
|
||||
if statusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
|
||||
}
|
||||
|
||||
var index GraphiteMetricsIndexResponse
|
||||
if err := json.Unmarshal([]byte(res), &index); err != nil {
|
||||
t.Fatalf("could not unmarshal metrics index response data:\n%s\n err: %v", res, err)
|
||||
}
|
||||
return index
|
||||
}
|
||||
|
||||
// APIV1StatusMetricNamesStats sends a query to a /api/v1/status/metric_names_stats endpoint
|
||||
// and returns the statistics response for given params.
|
||||
//
|
||||
|
||||
@@ -44,6 +44,18 @@ services:
|
||||
deploy:
|
||||
replicas: 0
|
||||
|
||||
# vlagent is needed for HA setup and its replica count is set to 1 in compose-ha.yml file
|
||||
vlagent:
|
||||
image: victoriametrics/vlagent:v0.0.1
|
||||
volumes:
|
||||
- vlagent:/vlagent
|
||||
command:
|
||||
- '--remoteWrite.tmpDataPath=/vlagent'
|
||||
- '--remoteWrite.url=http://victorialogs:9428/internal/insert'
|
||||
- '--remoteWrite.url=http://victorialogs-2:9428/internal/insert'
|
||||
deploy:
|
||||
replicas: 0
|
||||
|
||||
victoriametrics:
|
||||
image: victoriametrics/victoria-metrics:v1.112.0
|
||||
ports:
|
||||
@@ -63,3 +75,4 @@ volumes:
|
||||
victorialogs:
|
||||
victorialogs-2:
|
||||
victoriametrics:
|
||||
vlagent:
|
||||
|
||||
@@ -2,3 +2,6 @@ services:
|
||||
victorialogs-2:
|
||||
deploy:
|
||||
replicas: 1
|
||||
vlagent:
|
||||
deploy:
|
||||
replicas: 1
|
||||
|
||||
@@ -32,20 +32,8 @@
|
||||
[OUTPUT]
|
||||
Name http
|
||||
Match *
|
||||
host victorialogs
|
||||
port 9428
|
||||
compress gzip
|
||||
uri /insert/jsonline?_stream_fields=stream,path&_msg_field=log&_time_field=date
|
||||
format json_lines
|
||||
json_date_format iso8601
|
||||
header AccountID 0
|
||||
header ProjectID 0
|
||||
|
||||
[OUTPUT]
|
||||
Name http
|
||||
Match *
|
||||
host victorialogs-2
|
||||
port 9428
|
||||
host vlagent
|
||||
port 9429
|
||||
compress gzip
|
||||
uri /insert/jsonline?_stream_fields=stream,path&_msg_field=log&_time_field=date
|
||||
format json_lines
|
||||
|
||||
@@ -13,12 +13,7 @@ input {
|
||||
|
||||
output {
|
||||
http {
|
||||
url => "http://victorialogs:9428/insert/jsonline?_stream_fields=host.name,stream&_msg_field=log&_time_field=time"
|
||||
format => "json"
|
||||
http_method => "post"
|
||||
}
|
||||
http {
|
||||
url => "http://victorialogs-2:9428/insert/jsonline?_stream_fields=host.name,stream&_msg_field=log&_time_field=time"
|
||||
url => "http://vlagent:9429/insert/jsonline?_stream_fields=host.name,stream&_msg_field=log&_time_field=time"
|
||||
format => "json"
|
||||
http_method => "post"
|
||||
}
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
exporters:
|
||||
elasticsearch:
|
||||
endpoints:
|
||||
- http://victorialogs:9428/insert/elasticsearch
|
||||
- http://victorialogs-2:9428/insert/elasticsearch
|
||||
- http://vlagent:9429/insert/elasticsearch
|
||||
receivers:
|
||||
filelog:
|
||||
include: [/var/lib/docker/containers/**/*.log]
|
||||
|
||||
@@ -18,26 +18,7 @@ sinks:
|
||||
type: http
|
||||
inputs:
|
||||
- parser
|
||||
uri: http://victorialogs:9428/insert/jsonline
|
||||
encoding:
|
||||
codec: json
|
||||
framing:
|
||||
method: newline_delimited
|
||||
compression: gzip
|
||||
healthcheck:
|
||||
enabled: false
|
||||
request:
|
||||
headers:
|
||||
AccountID: '0'
|
||||
ProjectID: '0'
|
||||
VL-Stream-Fields: source_type,host,container_name,label.com.docker.compose.service
|
||||
VL-Msg-Field: message.msg
|
||||
VL-Time-Field: timestamp
|
||||
vlogs-2:
|
||||
type: http
|
||||
inputs:
|
||||
- parser
|
||||
uri: http://victorialogs-2:9428/insert/jsonline
|
||||
uri: http://vlagent:9429/insert/jsonline
|
||||
encoding:
|
||||
codec: json
|
||||
framing:
|
||||
|
||||
@@ -22,6 +22,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): support client-side TLS configuration for creating and deleting snapshots via `-snapshot.tls*` cmd-line flags.
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): remove duplicate kubernetes targets from [service-discovery-debug](https://docs.victoriametrics.com/victoriametrics/relabeling/#relabel-debugging) page. See [8626](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8626) issue for details.
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `concurrency` option to kafka remoteWrite producer. See [9249](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9249) issue for details.
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): introduce a new flag `-retryMaxInterval` as a replacement for the deprecated `-retryMaxTime` flag. The new flag more accurately reflects the behavior it controls. See [#9169](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9169) for more details.
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `/api/v1/notifiers` API endpoint for returning list of configured or discovered notifiers.
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `datasource_type` query argument for `/api/v1/rules` and `/api/v1/alerts` endpoints to filter response by rule's datasource [type](https://docs.victoriametrics.com/victoriametrics/vmalert/#groups). See [#8537](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8537).
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): respect group order defined in the rule file during [replay mode](https://docs.victoriametrics.com/victoriametrics/vmalert/#rules-backfilling) to allow chained group if needed. See [#9334](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9334).
|
||||
@@ -29,7 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and [vmselect](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add ability to proxy `/api/v1/notifiers` to vmalert when `-vmalert.proxyURL` is set. See [9267](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9267) PR for details.
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `vm_cache_eviction_bytes_total` counter metrics to reflect cache evictions due to expiration, misses and cache size. See [9293](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9293) PR for details. Thanks to the @BenNF
|
||||
* FEATURE: all the [VictoriaMetrics Enterprise](https://docs.victoriametrics.com/enterprise.html) components: improve error message when an empty license is provided via the `-license` or `-licenseFile` command-line flags. See [#9337](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9337) for the details.
|
||||
* FEATURE: [vmselect](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): attempt to reuse an existing connection from the pool on dial or handshake failure when creating a new connection. The change should reduce transient handshake or dial errors between `vmselect` and `vmstorage`. See [#9345](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9345)
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): enhance `MustReadAt` panic message to include filename for easier debugging of out-of-range reads. See [#9106](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9106).
|
||||
|
||||
* BUGFIX: [dashboards/vmagent](https://grafana.com/grafana/dashboards/12683) and [dashboards/vmalert](https://grafana.com/grafana/dashboards/14950): fix ad-hoc filters auto-complete and filtering on panels that use MetricsQL specific expressions. See [#8657](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8657).
|
||||
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): automatically retry requests failing with `Expired Token` errors. This helps to avoid failed backups when using [EKS Pod Identity](https://docs.aws.amazon.com/eks/latest/userguide/pod-id-how-it-works.html) for authentication. See [#9280](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9280).
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
|
||||
// at windows only files could be synced
|
||||
// Sync for directories is not supported.
|
||||
func mustSyncPath(path string) {
|
||||
func mustSyncPath(_ string) {
|
||||
}
|
||||
|
||||
func mustRemoveDirAtomic(dir string) {
|
||||
@@ -66,7 +66,7 @@ var (
|
||||
mmapByAddr = map[uintptr]windows.Handle{}
|
||||
)
|
||||
|
||||
func mmap(fd int, length int) ([]byte, error) {
|
||||
func mmap(fd, length int) ([]byte, error) {
|
||||
flProtect := uint32(windows.PAGE_READONLY)
|
||||
dwDesiredAccess := uint32(windows.FILE_MAP_READ)
|
||||
// https://learn.microsoft.com/en-us/windows/win32/memory/creating-a-file-mapping-object#file-mapping-size
|
||||
@@ -81,7 +81,11 @@ func mmap(fd int, length int) ([]byte, error) {
|
||||
windows.CloseHandle(h)
|
||||
return nil, os.NewSyscallError("MapViewOfFile", errno)
|
||||
}
|
||||
data := unsafe.Slice((*byte)(unsafe.Pointer(addr)), length)
|
||||
|
||||
// mitigate go vet false positive
|
||||
// https://github.com/golang/go/issues/58625
|
||||
addrPtr := *(*unsafe.Pointer)(unsafe.Pointer(&addr))
|
||||
data := unsafe.Slice((*byte)(addrPtr), length)
|
||||
|
||||
mmapByAddrLock.Lock()
|
||||
mmapByAddr[addr] = h
|
||||
@@ -121,7 +125,7 @@ func mustGetFreeSpace(path string) uint64 {
|
||||
}
|
||||
|
||||
// stub
|
||||
func fadviseSequentialRead(f *os.File, prefetch bool) error {
|
||||
func fadviseSequentialRead(_ *os.File, _ bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -73,7 +73,7 @@ func (r *ReaderAt) MustReadAt(p []byte, off int64) {
|
||||
}
|
||||
} else {
|
||||
if off > int64(len(mr.mmapData)-len(p)) {
|
||||
logger.Panicf("BUG: off=%d is out of allowed range [0...%d] for len(p)=%d", off, len(mr.mmapData)-len(p), len(p))
|
||||
logger.Panicf("BUG: off=%d is out of allowed range [0...%d] for len(p)=%d in file %q", off, len(mr.mmapData)-len(p), len(p), r.path)
|
||||
}
|
||||
src := mr.mmapData[off:]
|
||||
// The copy() below may result in thread block as described at https://valyala.medium.com/mmap-in-go-considered-harmful-d92a25cb161d .
|
||||
|
||||
@@ -1,18 +1,28 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
allowedPercent = flag.Float64("memory.allowedPercent", 60, `Allowed percent of system memory VictoriaMetrics caches may occupy. See also -memory.allowedBytes. Too low a value may increase cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache which will result in higher disk IO usage`)
|
||||
allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, `Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage`)
|
||||
allowedPercent = flag.Float64("memory.allowedPercent", 60, `Allowed percent of system memory VictoriaMetrics caches may occupy. See also -memory.allowedBytes. Too low a value may increase cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache which will result in higher disk IO usage`)
|
||||
allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, `Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage`)
|
||||
memCheckInterval = flag.Duration("memory.checkInterval", 2*time.Second, "How often to check the memory usage.")
|
||||
)
|
||||
|
||||
var _ = metrics.NewGauge("process_memory_limit_bytes", func() float64 {
|
||||
@@ -20,10 +30,13 @@ var _ = metrics.NewGauge("process_memory_limit_bytes", func() float64 {
|
||||
})
|
||||
|
||||
var (
|
||||
allowedMemory int
|
||||
remainingMemory int
|
||||
memoryLimit int
|
||||
allowedMemory int
|
||||
remainingMemory int
|
||||
memoryLimit int
|
||||
currentMemory atomic.Int64
|
||||
currentMemoryPercentage atomic.Int32
|
||||
)
|
||||
|
||||
var once sync.Once
|
||||
|
||||
func initOnce() {
|
||||
@@ -45,6 +58,35 @@ func initOnce() {
|
||||
remainingMemory = memoryLimit - allowedMemory
|
||||
logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%s", allowedMemory, remainingMemory, allowedBytes.String())
|
||||
}
|
||||
if *memCheckInterval == 0 {
|
||||
return
|
||||
}
|
||||
// enable memory detection if configured
|
||||
currentAvailableBytes, _ := getAvailableMemory()
|
||||
currentUsedBytes := max(0, memoryLimit-currentAvailableBytes)
|
||||
currentMemory.Store(int64(currentUsedBytes))
|
||||
currentMemoryPercentage.Store(int32(currentUsedBytes * 100 / memoryLimit))
|
||||
|
||||
go func() {
|
||||
// Register SIGHUP handler for config reload before loadRelabelConfigs.
|
||||
// This guarantees that the config will be re-read if the signal arrives just after loadRelabelConfig.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
||||
sighupCh := procutil.NewSighupChan()
|
||||
t := time.NewTicker(*memCheckInterval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-sighupCh:
|
||||
return
|
||||
case <-t.C:
|
||||
currentAvailableBytes, _ = getAvailableMemory()
|
||||
currentUsedBytes = max(0, memoryLimit-currentAvailableBytes)
|
||||
currentMemory.Store(int64(currentUsedBytes))
|
||||
currentMemoryPercentage.Store(int32(currentUsedBytes * 100 / memoryLimit))
|
||||
logger.Infof("current: %dMiB, total: %dMiB, percent: %d%%", currentUsedBytes/1024/1024, memoryLimit/1024/1024, currentMemoryPercentage.Load())
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Allowed returns the amount of system memory allowed to use by the app.
|
||||
@@ -62,3 +104,58 @@ func Remaining() int {
|
||||
once.Do(initOnce)
|
||||
return remainingMemory
|
||||
}
|
||||
|
||||
// Current return memory usage in byte. The value is updated every 5 seconds.
|
||||
func Current() int {
|
||||
once.Do(initOnce)
|
||||
return int(currentMemory.Load())
|
||||
}
|
||||
|
||||
// CurrentPercentage return memory usage percentage in [0-100] int. The value is updated every 5 seconds.
|
||||
func CurrentPercentage() int {
|
||||
once.Do(initOnce)
|
||||
return int(currentMemoryPercentage.Load())
|
||||
}
|
||||
|
||||
func sysCurrentMemory() int {
|
||||
am, err := getAvailableMemory()
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return am
|
||||
}
|
||||
|
||||
// getAvailableMemory parse /proc/meminfo and return MemAvailable in byte.
|
||||
func getAvailableMemory() (int, error) {
|
||||
b, err := os.ReadFile("/proc/meminfo")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
s := bufio.NewScanner(bytes.NewReader(b))
|
||||
for s.Scan() {
|
||||
fields := strings.Fields(s.Text())
|
||||
if fields[0] != "MemAvailable:" {
|
||||
continue
|
||||
}
|
||||
val, err := strconv.ParseInt(fields[1], 0, 64)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
switch len(fields) {
|
||||
case 2:
|
||||
return int(val), nil
|
||||
case 3:
|
||||
if fields[2] != "kB" {
|
||||
return 0, fmt.Errorf("%w: unsupported unit in optional 3rd field %q", ErrFileParse, fields[2])
|
||||
}
|
||||
return int(1024 * val), nil
|
||||
default:
|
||||
return 0, fmt.Errorf("%w: malformed line %q", ErrFileParse, s.Text())
|
||||
}
|
||||
}
|
||||
return 0, fmt.Errorf("AvailableMemory not found")
|
||||
}
|
||||
|
||||
var (
|
||||
ErrFileParse = errors.New("error parsing file")
|
||||
)
|
||||
|
||||
@@ -16,8 +16,6 @@ type ConnPool struct {
|
||||
mu sync.Mutex
|
||||
d *TCPDialer
|
||||
|
||||
cond sync.Cond
|
||||
|
||||
// concurrentDialsCh limits the number of concurrent dials the ConnPool can make.
|
||||
// This should prevent from creating an excees number of connections during temporary
|
||||
// spikes in workload at vmselect and vmstorage nodes.
|
||||
@@ -31,7 +29,6 @@ type ConnPool struct {
|
||||
conns []connWithTimestamp
|
||||
|
||||
isStopped bool
|
||||
stopCh chan struct{}
|
||||
|
||||
// lastDialError contains the last error seen when dialing remote addr.
|
||||
// When it is non-nil and conns is empty, then ConnPool.Get() return this error.
|
||||
@@ -73,11 +70,7 @@ func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Fun
|
||||
name: name,
|
||||
handshakeFunc: handshakeFunc,
|
||||
compressionLevel: compressionLevel,
|
||||
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
cp.cond.L = &cp.mu
|
||||
|
||||
cp.checkAvailability(true)
|
||||
_ = ms.NewGauge(fmt.Sprintf(`vm_tcpdialer_conns_idle{name=%q, addr=%q}`, name, addr), func() float64 {
|
||||
cp.mu.Lock()
|
||||
@@ -108,7 +101,6 @@ func (cp *ConnPool) MustStop() {
|
||||
cp.mu.Lock()
|
||||
isStopped := cp.isStopped
|
||||
cp.isStopped = true
|
||||
close(cp.stopCh)
|
||||
for _, c := range cp.conns {
|
||||
_ = c.bc.Close()
|
||||
}
|
||||
@@ -142,57 +134,41 @@ func (cp *ConnPool) Addr() string {
|
||||
|
||||
// Get returns free connection from the pool.
|
||||
func (cp *ConnPool) Get() (*handshake.BufferedConn, error) {
|
||||
cp.cond.L.Lock()
|
||||
defer cp.cond.L.Unlock()
|
||||
bc, err := cp.tryGetConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if bc != nil {
|
||||
// Fast path - obtained the connection from pool.
|
||||
return bc, nil
|
||||
}
|
||||
return cp.getConnSlow()
|
||||
}
|
||||
|
||||
maxAttempts := 5
|
||||
for i := 0; i < maxAttempts; i++ {
|
||||
bc, err := cp.tryGetConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if bc != nil {
|
||||
// Fast path - obtained the connection from pool.
|
||||
func (cp *ConnPool) getConnSlow() (*handshake.BufferedConn, error) {
|
||||
for {
|
||||
select {
|
||||
// Limit the number of concurrent dials.
|
||||
// This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552
|
||||
case cp.concurrentDialsCh <- struct{}{}:
|
||||
// Create new connection.
|
||||
conn, err := cp.dialAndHandshake()
|
||||
<-cp.concurrentDialsCh
|
||||
return conn, err
|
||||
default:
|
||||
// Make attempt to get already established connections from the pool.
|
||||
// It may appear there while waiting for cp.concurrentDialsCh.
|
||||
bc, err := cp.tryGetConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if bc == nil {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
// Slow path - no free connections in the pool.
|
||||
go func() {
|
||||
// notify waiting goroutines about the new connection
|
||||
// notify even if err != nil, so that they can unblock and try to get a connection again
|
||||
defer cp.cond.Signal()
|
||||
|
||||
select {
|
||||
// Limit the number of concurrent dials.
|
||||
// This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552
|
||||
case cp.concurrentDialsCh <- struct{}{}:
|
||||
// Create new connection.
|
||||
bc, err := cp.dialAndHandshake()
|
||||
<-cp.concurrentDialsCh
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
cp.mu.Lock()
|
||||
defer cp.mu.Unlock()
|
||||
if cp.isStopped {
|
||||
_ = bc.Close()
|
||||
return
|
||||
}
|
||||
cp.conns = append(cp.conns, connWithTimestamp{
|
||||
bc: bc,
|
||||
lastActiveTime: fasttime.UnixTimestamp(),
|
||||
})
|
||||
case <-cp.stopCh:
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
cp.cond.Wait()
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("cannot get connection from pool %s: no free connections after %d attempts", cp.name, maxAttempts)
|
||||
}
|
||||
|
||||
func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
|
||||
@@ -218,6 +194,9 @@ func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
|
||||
}
|
||||
|
||||
func (cp *ConnPool) tryGetConn() (*handshake.BufferedConn, error) {
|
||||
cp.mu.Lock()
|
||||
defer cp.mu.Unlock()
|
||||
|
||||
if cp.isStopped {
|
||||
return nil, fmt.Errorf("conn pool to %s cannot be used, since it is stopped", cp.d.addr)
|
||||
}
|
||||
@@ -249,7 +228,6 @@ func (cp *ConnPool) Put(bc *handshake.BufferedConn) {
|
||||
bc: bc,
|
||||
lastActiveTime: fasttime.UnixTimestamp(),
|
||||
})
|
||||
cp.cond.Signal()
|
||||
}
|
||||
cp.mu.Unlock()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user