diff --git a/app/victoria-metrics/main.go b/app/victoria-metrics/main.go index 3493efae13..66044e89a5 100644 --- a/app/victoria-metrics/main.go +++ b/app/victoria-metrics/main.go @@ -40,6 +40,8 @@ var ( "The saved data survives unclean shutdowns such as OOM crash, hardware reset, SIGKILL, etc. "+ "Bigger intervals may help increase the lifetime of flash storage with limited write cycles (e.g. Raspberry PI). "+ "Smaller intervals increase disk IO load. Minimum supported value is 1s") + maxIngestionRate = flag.Int("maxIngestionRate", 0, "The maximum number of samples vmsingle can receive per second. Data ingestion is paused when the limit is exceeded. "+ + "By default there are no limits on samples ingestion rate.") ) func main() { @@ -86,6 +88,7 @@ func main() { storage.SetDataFlushInterval(*inmemoryDataFlushInterval) vmstorage.Init(promql.ResetRollupResultCacheIfNeeded) vmselect.Init() + vminsertcommon.StartIngestionRateLimiter(*maxIngestionRate) vminsert.Init() startSelfScraper() @@ -107,6 +110,7 @@ func main() { } logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds()) vminsert.Stop() + vminsertcommon.StopIngestionRateLimiter() vmstorage.Stop() vmselect.Stop() diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index b5f1379e8a..0a98d70b2c 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -4,16 +4,48 @@ import ( "fmt" "net/http" + "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits" ) +// StartIngestionRateLimiter starts ingestion rate limiter. +// +// Ingestion rate limiter must be started before Init() call. +// +// StopIngestionRateLimiter must be called before Stop() call in order to unblock all the callers +// to ingestion rate limiter. Otherwise deadlock may occur at Stop() call. +func StartIngestionRateLimiter(maxIngestionRate int) { + if maxIngestionRate <= 0 { + return + } + ingestionRateLimitReached := metrics.NewCounter(`vm_max_ingestion_rate_limit_reached_total`) + ingestionRateLimiterStopCh = make(chan struct{}) + ingestionRateLimiter = ratelimiter.New(int64(maxIngestionRate), ingestionRateLimitReached, ingestionRateLimiterStopCh) +} + +// StopIngestionRateLimiter stops ingestion rate limiter. +func StopIngestionRateLimiter() { + if ingestionRateLimiterStopCh == nil { + return + } + close(ingestionRateLimiterStopCh) + ingestionRateLimiterStopCh = nil +} + +var ( + ingestionRateLimiter *ratelimiter.RateLimiter + ingestionRateLimiterStopCh chan struct{} +) + // InsertCtx contains common bits for data points insertion. 
type InsertCtx struct { Labels sortedLabels @@ -172,9 +204,12 @@ func (ctx *InsertCtx) FlushBufs() error { } matchIdxsPool.Put(matchIdxs) } + ingestionRateLimiter.Register(len(ctx.mrs)) + // There is no need in limiting the number of concurrent calls to vmstorage.AddRows() here, // since the number of concurrent FlushBufs() calls should be already limited via writeconcurrencylimiter // used at every stream.Parse() call under lib/protoparser/* + err := vmstorage.AddRows(ctx.mrs) ctx.Reset(0) if err == nil { diff --git a/apptest/client.go b/apptest/client.go index 4956ceea67..480ed9d6bd 100644 --- a/apptest/client.go +++ b/apptest/client.go @@ -128,7 +128,7 @@ func (app *ServesMetrics) GetMetric(t *testing.T, metricName string) float64 { return res } } - t.Fatalf("metic not found: %s", metricName) + t.Fatalf("metric not found: %s", metricName) return 0 } diff --git a/apptest/tests/key_concepts_test.go b/apptest/tests/key_concepts_test.go index 5eec915e2a..24f519e25d 100644 --- a/apptest/tests/key_concepts_test.go +++ b/apptest/tests/key_concepts_test.go @@ -3,9 +3,10 @@ package tests import ( "testing" - at "github.com/VictoriaMetrics/VictoriaMetrics/apptest" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + + at "github.com/VictoriaMetrics/VictoriaMetrics/apptest" ) // Data used in examples in diff --git a/apptest/tests/maxingestionrate_test.go b/apptest/tests/maxingestionrate_test.go new file mode 100644 index 0000000000..b0ca2fa3d9 --- /dev/null +++ b/apptest/tests/maxingestionrate_test.go @@ -0,0 +1,33 @@ +package tests + +import ( + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/apptest" +) + +// Data used in tests +var testData = []string{ + "foo_bar 1.00", + "foo_bar 2.00", +} + +func TestSingleMaxIngestionRateIncrementsMetric(t *testing.T) { + tc := apptest.NewTestCase(t) + defer tc.Stop() + sut := tc.MustStartVmsingle("vmsingle", []string{"-maxIngestionRate=1"}) + sut.PrometheusAPIV1ImportPrometheus(t, testData, 
apptest.QueryOpts{}) + if got := sut.GetMetric(t, "vm_max_ingestion_rate_limit_reached_total"); got <= 0 { + t.Fatalf("Unexpected vm_max_ingestion_rate_limit_reached_total: got %f, want >0", got) + } +} + +func TestSingleMaxIngestionRateDoesNotIncrementMetric(t *testing.T) { + tc := apptest.NewTestCase(t) + defer tc.Stop() + sut := tc.MustStartVmsingle("vmsingle", []string{"-maxIngestionRate=15"}) + sut.PrometheusAPIV1ImportPrometheus(t, testData, apptest.QueryOpts{}) + if got, want := sut.GetMetric(t, "vm_max_ingestion_rate_limit_reached_total"), 0.0; got != want { + t.Fatalf("Unexpected vm_max_ingestion_rate_limit_reached_total: got %f, want %f", got, want) + } +} diff --git a/docs/README.md b/docs/README.md index e9cde90770..249ddfca48 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1715,6 +1715,7 @@ See also [resource usage limits docs](#resource-usage-limits). By default, VictoriaMetrics is tuned for an optimal resource usage under typical workloads. Some workloads may need fine-grained resource usage limits. In these cases the following command-line flags may be useful: +- `-maxIngestionRate` limits samples/second ingested. This may be useful when CPU resources are limited or overloaded. - `-memory.allowedPercent` and `-memory.allowedBytes` limit the amounts of memory, which may be used for various internal caches at VictoriaMetrics. Note that VictoriaMetrics may use more memory, since these flags don't limit additional memory, which may be needed on a per-query basis. - `-search.maxMemoryPerQuery` limits the amounts of memory, which can be used for processing a single query. Queries, which need more memory, are rejected. @@ -1793,7 +1794,7 @@ By default, VictoriaMetrics is tuned for an optimal resource usage under typical - `-search.maxExportSeries` limits maximum number of time series, which can be returned from [/api/v1/export* APIs](#how-to-export-data-in-json-line-format). The duration of the export queries is limited via `-search.maxExportDuration` flag. 
This option allows limiting memory usage. - `-search.maxTSDBStatusSeries` limits maximum number of time series, which can be processed during the call to [/api/v1/status/tsdb](#tsdb-stats). - The duration of the status queries is limited via `-search.maxStatusRequestDuration` flag. This option allows limiting memory usage. + The duration of the status queries is limited via `-search.maxStatusRequestDuration` flag. This option allows limiting memory usage. See also [resource usage limits at VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/#resource-usage-limits), [cardinality limiter](#cardinality-limiter) and [capacity planning docs](#capacity-planning). @@ -2969,6 +2970,9 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit -maxConcurrentInserts int The maximum number of concurrent insert requests. Set higher value when clients send data over slow networks. Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage. See also -insert.maxQueueDuration (default 32) + -maxIngestionRate int + The maximum number of samples vmsingle can receive per second. Data ingestion is paused when the limit is exceeded. + By default there are no limits on samples ingestion rate. -maxInsertRequestSize size The maximum size in bytes of a single Prometheus remote_write API request Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 33554432) diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md index 7cafeb0c38..a960a1cd7f 100644 --- a/docs/changelog/CHANGELOG.md +++ b/docs/changelog/CHANGELOG.md @@ -22,6 +22,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). 
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add export data functionality for the `Raw Query` page and the ability to import exported data into the `Query Analyzer` page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7628). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add `markdown` support for comments during data export. [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7828). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): added `min` and `max` metrics for Datadog Sketches API metrics, changed `_` metric name separator to `.` if metrics are not sanitized for consistency. +* FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): support `-maxIngestionRate` cmd-line flag to ratelimit samples/sec ingested. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7377) for details. * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly set `host` field at debug information formatted with `dump_request_on_errors: true` setting. @@ -56,6 +57,7 @@ Released at 2024-12-13 * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): allow to start `vmauth` with empty configuration file. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6467) for details. * FEATURE: [vmalert-tool](https://docs.victoriametrics.com/vmalert-tool/): support debug mode for alerting rule. See [this doc](https://docs.victoriametrics.com/vmalert-tool/#debug-mode). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): update error messages for Clipboard API issues with docs links. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7677). 
+* FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): support `-maxIngestionRate` flag to ratelimit samples/sec ingested * BUGFIX: all VictoriaMetrics components: consistently deduplicate values with stale markers within deduplication interval. Previously, deduplication could randomly prefer stale marker or value on the deduplication interval. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7674) for details. Thanks to @tIGO for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7675). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): add missing common service labels for docker swarm service discovery when `role` is set to `tasks`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7800).