Compare commits

..

6 Commits

Author SHA1 Message Date
Jiekun
0baa091b77 feature: [memory limit] add cluster version 2025-07-04 15:41:28 +08:00
leiwingqueen
7bdd01e23e app/vmagent: rename flag remoteWrite.retryMaxTime
This commit renames flag `remoteWrite.retryMaxTime` into `remoteWrite.retryMaxInterval`. New name aligns with corresponding `MinInterval` flag. Previous flag name still could be used, but vmagent will log warning message with suggested migration.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9169
2025-07-03 18:32:56 +02:00
Vadim Alekseev
697bb33354 deployment/docker: add configuration examples of vlagent for Docker
Modified the existing HA setups to use vlagent instead of writing to two
sources

Related to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9328
2025-07-03 18:32:55 +02:00
Max Kotliar
fb07c59da4 lib/fs: enhance MustReadAt panic message
enhance `MustReadAt` panic message to include filename for easier debugging of out-of-range read

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9106
2025-07-03 18:32:55 +02:00
f41gh7
e06cc2c69b lib/fs: make linter happy for windows build
Signed-off-by: f41gh7 <nik@victoriametrics.com>
2025-07-03 18:16:04 +02:00
Andrei Baidarov
81c15c21a9 apptest: Add an integration test for /graphite/metrics/index.json (#9357)
While working on #8134 it has been discovered that
`/graphite/index.json` always returns an empty result. This PR adds a
test and a fix. This fix is a no-op for the master but adding it here in
order to reduce the diff in #8134.

Co-authored-by: Artem Fetishev <149964189+rtm0@users.noreply.github.com>
Co-authored-by: Artem Fetishev <rtm@victoriametrics.com>
2025-07-03 15:20:11 +02:00
19 changed files with 302 additions and 117 deletions

View File

@@ -38,8 +38,10 @@ var (
"By default, the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data "+
"is sent after temporary unavailability of the remote storage. See also -maxIngestionRate")
sendTimeout = flagutil.NewArrayDuration("remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to the corresponding -remoteWrite.url")
retryMinInterval = flagutil.NewArrayDuration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts to send a block of data to the corresponding -remoteWrite.url. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxTime")
retryMaxTime = flagutil.NewArrayDuration("remoteWrite.retryMaxTime", time.Minute, "The max time spent on retry attempts to send a block of data to the corresponding -remoteWrite.url. Change this value if it is expected for -remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
retryMinInterval = flagutil.NewArrayDuration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts to send a block of data to the corresponding -remoteWrite.url. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxInterval")
// deprecated in the future. use -remoteWrite.retryMaxInterval instead
retryMaxTime = flagutil.NewArrayDuration("remoteWrite.retryMaxTime", time.Minute, "The max time spent on retry attempts to send a block of data to the corresponding -remoteWrite.url. This flag is deprecated, use -remoteWrite.retryMaxInterval instead")
retryMaxInterval = flagutil.NewArrayDuration("remoteWrite.retryMaxInterval", time.Minute, "The maximum delay between retry attempts to send a block of data to the corresponding -remoteWrite.url. The delay doubles with each retry until this maximum is reached, after which it remains constant. See also -remoteWrite.retryMinInterval")
proxyURL = flagutil.NewArrayString("remoteWrite.proxyURL", "Optional proxy URL for writing data to the corresponding -remoteWrite.url. "+
"Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234")
@@ -97,7 +99,7 @@ type client struct {
hc *http.Client
retryMinInterval time.Duration
retryMaxTime time.Duration
retryMaxInterval time.Duration
sendBlock func(block []byte) bool
authCfg *promauth.Config
@@ -151,6 +153,10 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
Transport: authCfg.NewRoundTripper(tr),
Timeout: sendTimeout.GetOptionalArg(argIdx),
}
retryMaxIntervalFlag := retryMaxTime
if retryMaxInterval.String() != "" {
retryMaxIntervalFlag = retryMaxInterval
}
c := &client{
sanitizedURL: sanitizedURL,
remoteWriteURL: remoteWriteURL,
@@ -159,7 +165,7 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
fq: fq,
hc: hc,
retryMinInterval: retryMinInterval.GetOptionalArg(argIdx),
retryMaxTime: retryMaxTime.GetOptionalArg(argIdx),
retryMaxInterval: retryMaxIntervalFlag.GetOptionalArg(argIdx),
stopCh: make(chan struct{}),
}
c.sendBlock = c.sendBlockHTTP
@@ -404,7 +410,7 @@ func (c *client) newRequest(url string, body []byte) (*http.Request, error) {
// Otherwise, it tries sending the block to remote storage indefinitely.
func (c *client) sendBlockHTTP(block []byte) bool {
c.rl.Register(len(block))
maxRetryDuration := timeutil.AddJitterToDuration(c.retryMaxTime)
maxRetryDuration := timeutil.AddJitterToDuration(c.retryMaxInterval)
retryDuration := timeutil.AddJitterToDuration(c.retryMinInterval)
retriesCount := 0

View File

@@ -279,6 +279,9 @@ func initRemoteWriteCtxs(urls []string) {
}
rwctxs := make([]*remoteWriteCtx, len(urls))
rwctxIdx := make([]int, len(urls))
if retryMaxTime.String() != "" {
logger.Warnf("-remoteWrite.retryMaxTime is deprecated; use -remoteWrite.retryMaxInterval instead")
}
for i, remoteWriteURLRaw := range urls {
remoteWriteURL, err := url.Parse(remoteWriteURLRaw)
if err != nil {

View File

@@ -10,8 +10,6 @@ import (
"strings"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/clusternative"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadogsketches"
@@ -42,11 +40,13 @@ import (
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits"
"github.com/VictoriaMetrics/metrics"
)
var (
@@ -81,6 +81,7 @@ var (
maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 40, "The maximum number of labels per time series to be accepted. Series with superfluous labels are ignored. In this case the vm_rows_ignored_total{reason=\"too_many_labels\"} metric at /metrics page is incremented")
maxLabelNameLen = flag.Int("maxLabelNameLen", 256, "The maximum length of label name in the accepted time series. Series with longer label name are ignored. In this case the vm_rows_ignored_total{reason=\"too_long_label_name\"} metric at /metrics page is incremented")
maxLabelValueLen = flag.Int("maxLabelValueLen", 4*1024, "The maximum length of label values in the accepted time series. Series with longer label value are ignored. In this case the vm_rows_ignored_total{reason=\"too_long_label_value\"} metric at /metrics page is incremented")
maxMemoryUsage = flag.Int("insert.circuitBreakMemoryUsage", 90, "Reject insert requests when memory usage exceeds a certain percentage. 0 means no circuit breaking. An integer value from 1-100 represents 1%-100%.")
)
var (
@@ -217,6 +218,12 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
// This is not our link.
return false
}
if *maxMemoryUsage >= 1 && *maxMemoryUsage <= 100 {
if memory.CurrentPercentage() > *maxMemoryUsage {
httpserver.Errorf(w, r, "server overloaded, request rejected by circuit breaker")
return true
}
}
at, err := auth.NewTokenPossibleMultitenant(p.AuthToken)
if err != nil {
httpserver.Errorf(w, r, "auth error: %s", err)

View File

@@ -3,6 +3,7 @@ package graphite
import (
"flag"
"fmt"
"math"
"net/http"
"regexp"
"sort"
@@ -210,7 +211,7 @@ func MetricsIndexHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
deadline := searchutil.GetDeadlineForQuery(r, startTime)
jsonp := r.FormValue("jsonp")
denyPartialResponse := httputil.GetDenyPartialResponse(r)
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, 0, nil, 0)
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, math.MaxInt, nil, 0)
metricNames, isPartial, err := netstorage.LabelValues(nil, denyPartialResponse, "__name__", sq, 0, deadline)
if err != nil {
return fmt.Errorf(`cannot obtain metric names: %w`, err)

View File

@@ -25,6 +25,11 @@ type PrometheusQuerier interface {
PrometheusAPIV1QueryRange(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse
PrometheusAPIV1Series(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1SeriesResponse
PrometheusAPIV1ExportNative(t *testing.T, query string, opts QueryOpts) []byte
// TODO(@rtm0): Prometheus does not provide this API. Either move it to a
// separate interface or rename this interface to allow for multiple querier
// types.
GraphiteMetricsIndex(t *testing.T, opts QueryOpts) GraphiteMetricsIndexResponse
}
// Writer contains methods for writing new data
@@ -395,6 +400,10 @@ type TSDBStatusResponse struct {
Data TSDBStatusResponseData
}
// GraphiteMetricsIndexResponse is an in-memory representation of the json response
// returned by the /graphite/metrics/index.json endpoint.
type GraphiteMetricsIndexResponse = []string
// AdminTenantsResponse is an in-memory representation of the json response
// returned by the /api/v1/admin/tenants endpoint.
type AdminTenantsResponse struct {

View File

@@ -0,0 +1,62 @@
package tests
import (
"testing"
"github.com/google/go-cmp/cmp"
at "github.com/VictoriaMetrics/VictoriaMetrics/apptest"
)
func testMetricsIndex(t *testing.T, sut at.PrometheusWriteQuerier) {
// verify index is empty at the start
expected := at.GraphiteMetricsIndexResponse{}
tenant := "1:2"
got := sut.GraphiteMetricsIndex(t, at.QueryOpts{Tenant: tenant})
if diff := cmp.Diff(expected, got); diff != "" {
t.Errorf("unexpected response (-want, +got):\n%s", diff)
}
// Mon Feb 5 09:57:36 CET 2024
const ingestTimestamp = ` 1707123456700`
dataSet := []string{
`metric_name_1{label="foo"} 10`,
`metric_name_1{label="bar"} 10`,
`metric_name_2{label="baz"} 20`,
`metric_name_1{label="baz"} 10`,
`metric_name_3{label="baz"} 30`,
}
for idx := range dataSet {
dataSet[idx] += ingestTimestamp
}
sut.PrometheusAPIV1ImportPrometheus(t, dataSet, at.QueryOpts{Tenant: tenant})
sut.ForceFlush(t)
// verify ingested metrics correctly returned in index response
expected = []string{"metric_name_1", "metric_name_2", "metric_name_3"}
got = sut.GraphiteMetricsIndex(t, at.QueryOpts{Tenant: tenant})
if diff := cmp.Diff(expected, got); diff != "" {
t.Errorf("unexpected response (-want, +got):\n%s", diff)
}
}
func TestSingleMetricsIndex(t *testing.T) {
tc := at.NewTestCase(t)
defer tc.Stop()
sut := tc.MustStartDefaultVmsingle()
testMetricsIndex(tc.T(), sut)
}
func TestClusterMetricsIndex(t *testing.T) {
tc := at.NewTestCase(t)
defer tc.Stop()
sut := tc.MustStartDefaultCluster()
testMetricsIndex(tc.T(), sut)
}

View File

@@ -227,6 +227,25 @@ func (app *Vmselect) APIV1StatusTSDB(t *testing.T, matchQuery string, date strin
return status
}
// GraphiteMetricsIndex sends a query to a /graphite/metrics/index.json
//
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api
func (app *Vmselect) GraphiteMetricsIndex(t *testing.T, opts QueryOpts) GraphiteMetricsIndexResponse {
t.Helper()
seriesURL := fmt.Sprintf("http://%s/select/%s/graphite/metrics/index.json", app.httpListenAddr, opts.getTenant())
res, statusCode := app.cli.Get(t, seriesURL)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
var index GraphiteMetricsIndexResponse
if err := json.Unmarshal([]byte(res), &index); err != nil {
t.Fatalf("could not unmarshal metrics index response data:\n%s\n err: %v", res, err)
}
return index
}
// APIV1AdminTenants sends a query to a /admin/tenants endpoint
func (app *Vmselect) APIV1AdminTenants(t *testing.T) *AdminTenantsResponse {
t.Helper()

View File

@@ -318,6 +318,25 @@ func (app *Vmsingle) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts
return NewPrometheusAPIV1SeriesResponse(t, res)
}
// GraphiteMetricsIndex sends a query to a /metrics/index.json
//
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#metrics-api
func (app *Vmsingle) GraphiteMetricsIndex(t *testing.T, _ QueryOpts) GraphiteMetricsIndexResponse {
t.Helper()
seriesURL := fmt.Sprintf("http://%s/metrics/index.json", app.httpListenAddr)
res, statusCode := app.cli.Get(t, seriesURL)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
var index GraphiteMetricsIndexResponse
if err := json.Unmarshal([]byte(res), &index); err != nil {
t.Fatalf("could not unmarshal metrics index response data:\n%s\n err: %v", res, err)
}
return index
}
// APIV1StatusMetricNamesStats sends a query to a /api/v1/status/metric_names_stats endpoint
// and returns the statistics response for given params.
//

View File

@@ -44,6 +44,18 @@ services:
deploy:
replicas: 0
# vlagent is needed for HA setup and its replica count is set to 1 in compose-ha.yml file
vlagent:
image: victoriametrics/vlagent:v0.0.1
volumes:
- vlagent:/vlagent
command:
- '--remoteWrite.tmpDataPath=/vlagent'
- '--remoteWrite.url=http://victorialogs:9428/internal/insert'
- '--remoteWrite.url=http://victorialogs-2:9428/internal/insert'
deploy:
replicas: 0
victoriametrics:
image: victoriametrics/victoria-metrics:v1.112.0
ports:
@@ -63,3 +75,4 @@ volumes:
victorialogs:
victorialogs-2:
victoriametrics:
vlagent:

View File

@@ -2,3 +2,6 @@ services:
victorialogs-2:
deploy:
replicas: 1
vlagent:
deploy:
replicas: 1

View File

@@ -32,20 +32,8 @@
[OUTPUT]
Name http
Match *
host victorialogs
port 9428
compress gzip
uri /insert/jsonline?_stream_fields=stream,path&_msg_field=log&_time_field=date
format json_lines
json_date_format iso8601
header AccountID 0
header ProjectID 0
[OUTPUT]
Name http
Match *
host victorialogs-2
port 9428
host vlagent
port 9429
compress gzip
uri /insert/jsonline?_stream_fields=stream,path&_msg_field=log&_time_field=date
format json_lines

View File

@@ -13,12 +13,7 @@ input {
output {
http {
url => "http://victorialogs:9428/insert/jsonline?_stream_fields=host.name,stream&_msg_field=log&_time_field=time"
format => "json"
http_method => "post"
}
http {
url => "http://victorialogs-2:9428/insert/jsonline?_stream_fields=host.name,stream&_msg_field=log&_time_field=time"
url => "http://vlagent:9429/insert/jsonline?_stream_fields=host.name,stream&_msg_field=log&_time_field=time"
format => "json"
http_method => "post"
}

View File

@@ -1,8 +1,7 @@
exporters:
elasticsearch:
endpoints:
- http://victorialogs:9428/insert/elasticsearch
- http://victorialogs-2:9428/insert/elasticsearch
- http://vlagent:9429/insert/elasticsearch
receivers:
filelog:
include: [/var/lib/docker/containers/**/*.log]

View File

@@ -18,26 +18,7 @@ sinks:
type: http
inputs:
- parser
uri: http://victorialogs:9428/insert/jsonline
encoding:
codec: json
framing:
method: newline_delimited
compression: gzip
healthcheck:
enabled: false
request:
headers:
AccountID: '0'
ProjectID: '0'
VL-Stream-Fields: source_type,host,container_name,label.com.docker.compose.service
VL-Msg-Field: message.msg
VL-Time-Field: timestamp
vlogs-2:
type: http
inputs:
- parser
uri: http://victorialogs-2:9428/insert/jsonline
uri: http://vlagent:9429/insert/jsonline
encoding:
codec: json
framing:

View File

@@ -22,6 +22,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): support client-side TLS configuration for creating and deleting snapshots via `-snapshot.tls*` cmd-line flags.
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): remove duplicate kubernetes targets from [service-discovery-debug](https://docs.victoriametrics.com/victoriametrics/relabeling/#relabel-debugging) page. See [8626](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8626) issue for details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `concurrency` option to kafka remoteWrite producer. See [9249](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9249) issue for details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): introduce a new flag `-retryMaxInterval` as a replacement for the deprecated `-retryMaxTime` flag. The new flag more accurately reflects the behavior it controls. See [#9169](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9169) for more details.
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `/api/v1/notifiers` API endpoint for returning list of configured or discovered notifiers.
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `datasource_type` query argument for `/api/v1/rules` and `/api/v1/alerts` endpoints to filter response by rule's datasource [type](https://docs.victoriametrics.com/victoriametrics/vmalert/#groups). See [#8537](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8537).
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): respect group order defined in the rule file during [replay mode](https://docs.victoriametrics.com/victoriametrics/vmalert/#rules-backfilling) to allow chained group if needed. See [#9334](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9334).
@@ -29,7 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and [vmselect](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add ability to proxy `/api/v1/notifiers` to vmalert when `-vmalert.proxyURL` is set. See [9267](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9267) PR for details.
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `vm_cache_eviction_bytes_total` counter metrics to reflect cache evictions due to expiration, misses and cache size. See [9293](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9293) PR for details. Thanks to the @BenNF
* FEATURE: all the [VictoriaMetrics Enterprise](https://docs.victoriametrics.com/enterprise.html) components: improve error message when an empty license is provided via the `-license` or `-licenseFile` command-line flags. See [#9337](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9337) for the details.
* FEATURE: [vmselect](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): attempt to reuse an existing connection from the pool on dial or handshake failure when creating a new connection. The change should reduce transient handshake or dial errors between `vmselect` and `vmstorage`. See [#9345](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9345)
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): enhance `MustReadAt` panic message to include filename for easier debugging of out-of-range reads. See [#9106](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9106).
* BUGFIX: [dashboards/vmagent](https://grafana.com/grafana/dashboards/12683) and [dashboards/vmalert](https://grafana.com/grafana/dashboards/14950): fix ad-hoc filters auto-complete and filtering on panels that use MetricsQL specific expressions. See [#8657](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8657).
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): automatically retry requests failing with `Expired Token` errors. This helps to avoid failed backups when using [EKS Pod Identity](https://docs.aws.amazon.com/eks/latest/userguide/pod-id-how-it-works.html) for authentication. See [#9280](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9280).

View File

@@ -12,7 +12,7 @@ import (
// at windows only files could be synced
// Sync for directories is not supported.
func mustSyncPath(path string) {
func mustSyncPath(_ string) {
}
func mustRemoveDirAtomic(dir string) {
@@ -66,7 +66,7 @@ var (
mmapByAddr = map[uintptr]windows.Handle{}
)
func mmap(fd int, length int) ([]byte, error) {
func mmap(fd, length int) ([]byte, error) {
flProtect := uint32(windows.PAGE_READONLY)
dwDesiredAccess := uint32(windows.FILE_MAP_READ)
// https://learn.microsoft.com/en-us/windows/win32/memory/creating-a-file-mapping-object#file-mapping-size
@@ -81,7 +81,11 @@ func mmap(fd int, length int) ([]byte, error) {
windows.CloseHandle(h)
return nil, os.NewSyscallError("MapViewOfFile", errno)
}
data := unsafe.Slice((*byte)(unsafe.Pointer(addr)), length)
// mitigate go vet false positive
// https://github.com/golang/go/issues/58625
addrPtr := *(*unsafe.Pointer)(unsafe.Pointer(&addr))
data := unsafe.Slice((*byte)(addrPtr), length)
mmapByAddrLock.Lock()
mmapByAddr[addr] = h
@@ -121,7 +125,7 @@ func mustGetFreeSpace(path string) uint64 {
}
// stub
func fadviseSequentialRead(f *os.File, prefetch bool) error {
func fadviseSequentialRead(_ *os.File, _ bool) error {
return nil
}

View File

@@ -73,7 +73,7 @@ func (r *ReaderAt) MustReadAt(p []byte, off int64) {
}
} else {
if off > int64(len(mr.mmapData)-len(p)) {
logger.Panicf("BUG: off=%d is out of allowed range [0...%d] for len(p)=%d", off, len(mr.mmapData)-len(p), len(p))
logger.Panicf("BUG: off=%d is out of allowed range [0...%d] for len(p)=%d in file %q", off, len(mr.mmapData)-len(p), len(p), r.path)
}
src := mr.mmapData[off:]
// The copy() below may result in thread block as described at https://valyala.medium.com/mmap-in-go-considered-harmful-d92a25cb161d .

View File

@@ -1,18 +1,28 @@
package memory
import (
"bufio"
"bytes"
"errors"
"flag"
"fmt"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/metrics"
)
var (
allowedPercent = flag.Float64("memory.allowedPercent", 60, `Allowed percent of system memory VictoriaMetrics caches may occupy. See also -memory.allowedBytes. Too low a value may increase cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache which will result in higher disk IO usage`)
allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, `Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage`)
allowedPercent = flag.Float64("memory.allowedPercent", 60, `Allowed percent of system memory VictoriaMetrics caches may occupy. See also -memory.allowedBytes. Too low a value may increase cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache which will result in higher disk IO usage`)
allowedBytes = flagutil.NewBytes("memory.allowedBytes", 0, `Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage`)
memCheckInterval = flag.Duration("memory.checkInterval", 2*time.Second, "How often to check the memory usage.")
)
var _ = metrics.NewGauge("process_memory_limit_bytes", func() float64 {
@@ -20,10 +30,13 @@ var _ = metrics.NewGauge("process_memory_limit_bytes", func() float64 {
})
var (
allowedMemory int
remainingMemory int
memoryLimit int
allowedMemory int
remainingMemory int
memoryLimit int
currentMemory atomic.Int64
currentMemoryPercentage atomic.Int32
)
var once sync.Once
func initOnce() {
@@ -45,6 +58,35 @@ func initOnce() {
remainingMemory = memoryLimit - allowedMemory
logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedBytes=%s", allowedMemory, remainingMemory, allowedBytes.String())
}
if *memCheckInterval == 0 {
return
}
// enable memory detection if configured
currentAvailableBytes, _ := getAvailableMemory()
currentUsedBytes := max(0, memoryLimit-currentAvailableBytes)
currentMemory.Store(int64(currentUsedBytes))
currentMemoryPercentage.Store(int32(currentUsedBytes * 100 / memoryLimit))
go func() {
// Register SIGHUP handler for config reload before loadRelabelConfigs.
// This guarantees that the config will be re-read if the signal arrives just after loadRelabelConfig.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
sighupCh := procutil.NewSighupChan()
t := time.NewTicker(*memCheckInterval)
defer t.Stop()
for {
select {
case <-sighupCh:
return
case <-t.C:
currentAvailableBytes, _ = getAvailableMemory()
currentUsedBytes = max(0, memoryLimit-currentAvailableBytes)
currentMemory.Store(int64(currentUsedBytes))
currentMemoryPercentage.Store(int32(currentUsedBytes * 100 / memoryLimit))
logger.Infof("current: %dMiB, total: %dMiB, percent: %d%%", currentUsedBytes/1024/1024, memoryLimit/1024/1024, currentMemoryPercentage.Load())
}
}
}()
}
// Allowed returns the amount of system memory allowed to use by the app.
@@ -62,3 +104,58 @@ func Remaining() int {
once.Do(initOnce)
return remainingMemory
}
// Current return memory usage in byte. The value is updated every 5 seconds.
func Current() int {
once.Do(initOnce)
return int(currentMemory.Load())
}
// CurrentPercentage return memory usage percentage in [0-100] int. The value is updated every 5 seconds.
func CurrentPercentage() int {
once.Do(initOnce)
return int(currentMemoryPercentage.Load())
}
func sysCurrentMemory() int {
am, err := getAvailableMemory()
if err != nil {
return 0
}
return am
}
// getAvailableMemory parse /proc/meminfo and return MemAvailable in byte.
func getAvailableMemory() (int, error) {
b, err := os.ReadFile("/proc/meminfo")
if err != nil {
return 0, err
}
s := bufio.NewScanner(bytes.NewReader(b))
for s.Scan() {
fields := strings.Fields(s.Text())
if fields[0] != "MemAvailable:" {
continue
}
val, err := strconv.ParseInt(fields[1], 0, 64)
if err != nil {
return 0, err
}
switch len(fields) {
case 2:
return int(val), nil
case 3:
if fields[2] != "kB" {
return 0, fmt.Errorf("%w: unsupported unit in optional 3rd field %q", ErrFileParse, fields[2])
}
return int(1024 * val), nil
default:
return 0, fmt.Errorf("%w: malformed line %q", ErrFileParse, s.Text())
}
}
return 0, fmt.Errorf("AvailableMemory not found")
}
var (
ErrFileParse = errors.New("error parsing file")
)

View File

@@ -16,8 +16,6 @@ type ConnPool struct {
mu sync.Mutex
d *TCPDialer
cond sync.Cond
// concurrentDialsCh limits the number of concurrent dials the ConnPool can make.
// This should prevent from creating an excees number of connections during temporary
// spikes in workload at vmselect and vmstorage nodes.
@@ -31,7 +29,6 @@ type ConnPool struct {
conns []connWithTimestamp
isStopped bool
stopCh chan struct{}
// lastDialError contains the last error seen when dialing remote addr.
// When it is non-nil and conns is empty, then ConnPool.Get() return this error.
@@ -73,11 +70,7 @@ func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Fun
name: name,
handshakeFunc: handshakeFunc,
compressionLevel: compressionLevel,
stopCh: make(chan struct{}),
}
cp.cond.L = &cp.mu
cp.checkAvailability(true)
_ = ms.NewGauge(fmt.Sprintf(`vm_tcpdialer_conns_idle{name=%q, addr=%q}`, name, addr), func() float64 {
cp.mu.Lock()
@@ -108,7 +101,6 @@ func (cp *ConnPool) MustStop() {
cp.mu.Lock()
isStopped := cp.isStopped
cp.isStopped = true
close(cp.stopCh)
for _, c := range cp.conns {
_ = c.bc.Close()
}
@@ -142,57 +134,41 @@ func (cp *ConnPool) Addr() string {
// Get returns free connection from the pool.
func (cp *ConnPool) Get() (*handshake.BufferedConn, error) {
cp.cond.L.Lock()
defer cp.cond.L.Unlock()
bc, err := cp.tryGetConn()
if err != nil {
return nil, err
}
if bc != nil {
// Fast path - obtained the connection from pool.
return bc, nil
}
return cp.getConnSlow()
}
maxAttempts := 5
for i := 0; i < maxAttempts; i++ {
bc, err := cp.tryGetConn()
if err != nil {
return nil, err
}
if bc != nil {
// Fast path - obtained the connection from pool.
func (cp *ConnPool) getConnSlow() (*handshake.BufferedConn, error) {
for {
select {
// Limit the number of concurrent dials.
// This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552
case cp.concurrentDialsCh <- struct{}{}:
// Create new connection.
conn, err := cp.dialAndHandshake()
<-cp.concurrentDialsCh
return conn, err
default:
// Make attempt to get already established connections from the pool.
// It may appear there while waiting for cp.concurrentDialsCh.
bc, err := cp.tryGetConn()
if err != nil {
return nil, err
}
if bc == nil {
time.Sleep(100 * time.Millisecond)
continue
}
return bc, nil
}
// Slow path - no free connections in the pool.
go func() {
// notify waiting goroutines about the new connection
// notify even if err != nil, so that they can unblock and try to get a connection again
defer cp.cond.Signal()
select {
// Limit the number of concurrent dials.
// This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2552
case cp.concurrentDialsCh <- struct{}{}:
// Create new connection.
bc, err := cp.dialAndHandshake()
<-cp.concurrentDialsCh
if err != nil {
return
}
cp.mu.Lock()
defer cp.mu.Unlock()
if cp.isStopped {
_ = bc.Close()
return
}
cp.conns = append(cp.conns, connWithTimestamp{
bc: bc,
lastActiveTime: fasttime.UnixTimestamp(),
})
case <-cp.stopCh:
return
}
}()
cp.cond.Wait()
}
return nil, fmt.Errorf("cannot get connection from pool %s: no free connections after %d attempts", cp.name, maxAttempts)
}
func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
@@ -218,6 +194,9 @@ func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
}
func (cp *ConnPool) tryGetConn() (*handshake.BufferedConn, error) {
cp.mu.Lock()
defer cp.mu.Unlock()
if cp.isStopped {
return nil, fmt.Errorf("conn pool to %s cannot be used, since it is stopped", cp.d.addr)
}
@@ -249,7 +228,6 @@ func (cp *ConnPool) Put(bc *handshake.BufferedConn) {
bc: bc,
lastActiveTime: fasttime.UnixTimestamp(),
})
cp.cond.Signal()
}
cp.mu.Unlock()
}