From b843f0e229d4099faabaaa59befeec2c82512cd0 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 26 Mar 2022 10:17:37 +0200 Subject: [PATCH] app/vmselect: add fine-grained limits for the number of returned/scanned time series for various APIs --- app/vmselect/graphite/tags_api.go | 12 ++++---- app/vmselect/netstorage/netstorage.go | 14 +++++---- app/vmselect/prometheus/prometheus.go | 33 +++++++++++++-------- app/vmselect/promql/eval.go | 19 +++++++----- app/vmselect/promql/exec_test.go | 2 ++ app/vmstorage/transport/server.go | 42 +++++++++++++++++++++------ docs/CHANGELOG.md | 12 ++++++++ docs/README.md | 16 ++++++++-- docs/Single-server-VictoriaMetrics.md | 16 ++++++++-- lib/storage/index_db.go | 10 +++---- lib/storage/index_db_test.go | 4 +-- lib/storage/search.go | 26 ++++++++++++++--- lib/storage/storage.go | 4 +-- 13 files changed, 152 insertions(+), 58 deletions(-) diff --git a/app/vmselect/graphite/tags_api.go b/app/vmselect/graphite/tags_api.go index 984636d66c..84628e2dd0 100644 --- a/app/vmselect/graphite/tags_api.go +++ b/app/vmselect/graphite/tags_api.go @@ -54,7 +54,7 @@ func TagsDelSeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWr }) } tfss := joinTagFilterss(tfs, etfs) - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, ct, tfss) + sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, ct, tfss, 0) n, err := netstorage.DeleteSeries(at, sq, deadline) if err != nil { return fmt.Errorf("cannot delete series for %q: %w", sq, err) @@ -199,7 +199,7 @@ func TagsAutoCompleteValuesHandler(startTime time.Time, at *auth.Token, w http.R } } else { // Slow path: use netstorage.SearchMetricNames for applying `expr` filters. - sq, err := getSearchQueryForExprs(startTime, at, etfs, exprs) + sq, err := getSearchQueryForExprs(startTime, at, etfs, exprs, limit*10) if err != nil { return err } @@ -288,7 +288,7 @@ func TagsAutoCompleteTagsHandler(startTime time.Time, at *auth.Token, w http.Res } } else { // Slow path: use netstorage.SearchMetricNames for applying `expr` filters. - sq, err := getSearchQueryForExprs(startTime, at, etfs, exprs) + sq, err := getSearchQueryForExprs(startTime, at, etfs, exprs, limit*10) if err != nil { return err } @@ -356,7 +356,7 @@ func TagsFindSeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseW if err != nil { return fmt.Errorf("cannot setup tag filters: %w", err) } - sq, err := getSearchQueryForExprs(startTime, at, etfs, exprs) + sq, err := getSearchQueryForExprs(startTime, at, etfs, exprs, limit*10) if err != nil { return err } @@ -484,14 +484,14 @@ func getInt(r *http.Request, argName string) (int, error) { return n, nil } -func getSearchQueryForExprs(startTime time.Time, at *auth.Token, etfs [][]storage.TagFilter, exprs []string) (*storage.SearchQuery, error) { +func getSearchQueryForExprs(startTime time.Time, at *auth.Token, etfs [][]storage.TagFilter, exprs []string, maxMetrics int) (*storage.SearchQuery, error) { tfs, err := exprsToTagFilters(exprs) if err != nil { return nil, err } ct := startTime.UnixNano() / 1e6 tfss := joinTagFilterss(tfs, etfs) - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, ct, tfss) + sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, ct, tfss, maxMetrics) return sq, nil } diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 7d74f5fec1..5bc3b3fc30 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -1084,7 +1084,7 @@ func deduplicateStrings(a []string) []string { } // GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats -func GetTSDBStatusForDate(at *auth.Token, denyPartialResponse bool, deadline searchutils.Deadline, date uint64, topN int) (*storage.TSDBStatus, bool, error) { +func GetTSDBStatusForDate(at *auth.Token, denyPartialResponse bool, deadline searchutils.Deadline, date uint64, topN, maxMetrics int) (*storage.TSDBStatus, bool, error) { if deadline.Exceeded() { return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } @@ -1095,7 +1095,7 @@ func GetTSDBStatusForDate(at *auth.Token, denyPartialResponse bool, deadline sea } snr := startStorageNodesRequest(denyPartialResponse, func(idx int, sn *storageNode) interface{} { sn.tsdbStatusRequests.Inc() - status, err := sn.getTSDBStatusForDate(at.AccountID, at.ProjectID, date, topN, deadline) + status, err := sn.getTSDBStatusForDate(at.AccountID, at.ProjectID, date, topN, maxMetrics, deadline) if err != nil { sn.tsdbStatusErrors.Inc() err = fmt.Errorf("cannot obtain tsdb status from vmstorage %s: %w", sn.connPool.Addr(), err) @@ -1790,10 +1790,10 @@ func (sn *storageNode) getLabelEntries(accountID, projectID uint32, deadline sea return tagEntries, nil } -func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) { +func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date uint64, topN, maxMetrics int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) { var status *storage.TSDBStatus f := func(bc *handshake.BufferedConn) error { - st, err := sn.getTSDBStatusForDateOnConn(bc, accountID, projectID, date, topN) + st, err := sn.getTSDBStatusForDateOnConn(bc, accountID, projectID, date, topN, maxMetrics) if err != nil { return err } @@ -2254,7 +2254,7 @@ func (sn *storageNode) getLabelEntriesOnConn(bc *handshake.BufferedConn, account } } -func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, date uint64, topN int) (*storage.TSDBStatus, error) { +func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, date uint64, topN, maxMetrics int) (*storage.TSDBStatus, error) { // Send the request to sn. if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil { return nil, err @@ -2267,6 +2267,10 @@ func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, ac if err := writeUint32(bc, uint32(topN)); err != nil { return nil, fmt.Errorf("cannot send topN=%d to conn: %w", topN, err) } + // maxMetrics shouldn't exceed 32 bits, so send it as uint32. + if err := writeUint32(bc, uint32(maxMetrics)); err != nil { + return nil, fmt.Errorf("cannot send maxMetrics=%d to conn: %w", maxMetrics, err) + } if err := bc.Flush(); err != nil { return nil, fmt.Errorf("cannot flush tsdbStatus args to conn: %w", err) } diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 863749d6af..ff9da17cfa 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -45,6 +45,12 @@ var ( maxStepForPointsAdjustment = flag.Duration("search.maxStepForPointsAdjustment", time.Minute, "The maximum step when /api/v1/query_range handler adjusts "+ "points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data") selectNodes = flagutil.NewArray("selectNode", "Comma-serparated addresses of vmselect nodes; usage: -selectNode=vmselect-host1,...,vmselect-hostN") + + maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 300e3, "The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage") + maxFederateSeries = flag.Int("search.maxFederateSeries", 300e3, "The maximum number of time series, which can be returned from /federate. This option allows limiting memory usage") + maxExportSeries = flag.Int("search.maxExportSeries", 1e6, "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage") + maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 1e6, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage") + maxSeriesLimit = flag.Int("search.maxSeries", 10e3, "The maximum number of time series, which can be returned from /api/v1/series. This option allows limiting memory usage") ) // Default step used if not set. @@ -81,7 +87,7 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, if err != nil { return err } - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss) + sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss, *maxFederateSeries) denyPartialResponse := searchutils.GetDenyPartialResponse(r) rss, isPartial, err := netstorage.ProcessSearchQuery(at, denyPartialResponse, sq, true, deadline) if err != nil { @@ -142,7 +148,7 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter if err != nil { return err } - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss) + sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss, *maxExportSeries) w.Header().Set("Content-Type", "text/csv; charset=utf-8") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) @@ -242,7 +248,7 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri if err != nil { return err } - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss) + sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss, *maxExportSeries) w.Header().Set("Content-Type", "VictoriaMetrics/native") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) @@ -393,7 +399,8 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match return err } tagFilterss = searchutils.JoinTagFilterss(tagFilterss, etfs) - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss) + sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss, *maxExportSeries) + w.Header().Set("Content-Type", contentType) bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) @@ -497,7 +504,7 @@ func DeleteHandler(startTime time.Time, at *auth.Token, r *http.Request) error { return err } ct := startTime.UnixNano() / 1e6 - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, ct, tagFilterss) + sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, ct, tagFilterss, 0) deletedCount, err := netstorage.DeleteSeries(at, sq, deadline) if err != nil { return fmt.Errorf("cannot delete time series: %w", err) @@ -658,7 +665,7 @@ func labelValuesWithMatches(at *auth.Token, denyPartialResponse bool, labelName if len(tagFilterss) == 0 { logger.Panicf("BUG: tagFilterss must be non-empty") } - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss) + sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss, *maxSeriesLimit) m := make(map[string]struct{}) isPartial := false if end-start > 24*3600*1000 { @@ -777,12 +784,12 @@ func TSDBStatusHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite var status *storage.TSDBStatus var isPartial bool if len(matches) == 0 && len(etfs) == 0 { - status, isPartial, err = netstorage.GetTSDBStatusForDate(at, denyPartialResponse, deadline, date, topN) + status, isPartial, err = netstorage.GetTSDBStatusForDate(at, denyPartialResponse, deadline, date, topN, *maxTSDBStatusSeries) if err != nil { return fmt.Errorf(`cannot obtain tsdb status for date=%d, topN=%d: %w`, date, topN, err) } } else { - status, isPartial, err = tsdbStatusWithMatches(at, denyPartialResponse, matches, etfs, date, topN, deadline) + status, isPartial, err = tsdbStatusWithMatches(at, denyPartialResponse, matches, etfs, date, topN, *maxTSDBStatusSeries, deadline) if err != nil { return fmt.Errorf("cannot obtain tsdb status with matches for date=%d, topN=%d: %w", date, topN, err) } @@ -798,7 +805,7 @@ func TSDBStatusHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite return nil } -func tsdbStatusWithMatches(at *auth.Token, denyPartialResponse bool, matches []string, etfs [][]storage.TagFilter, date uint64, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, bool, error) { +func tsdbStatusWithMatches(at *auth.Token, denyPartialResponse bool, matches []string, etfs [][]storage.TagFilter, date uint64, topN, maxMetrics int, deadline searchutils.Deadline) (*storage.TSDBStatus, bool, error) { tagFilterss, err := getTagFilterssFromMatches(matches) if err != nil { return nil, false, err @@ -809,7 +816,7 @@ func tsdbStatusWithMatches(at *auth.Token, denyPartialResponse bool, matches []s } start := int64(date*secsPerDay) * 1000 end := int64(date*secsPerDay+secsPerDay) * 1000 - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss) + sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss, maxMetrics) status, isPartial, err := netstorage.GetTSDBStatusWithFilters(at, denyPartialResponse, deadline, sq, topN) if err != nil { return nil, false, err @@ -906,7 +913,7 @@ func labelsWithMatches(at *auth.Token, denyPartialResponse bool, matches []strin if len(tagFilterss) == 0 { logger.Panicf("BUG: tagFilterss must be non-empty") } - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss) + sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss, *maxSeriesLimit) m := make(map[string]struct{}) isPartial := false if end-start > 24*3600*1000 { @@ -1009,7 +1016,7 @@ func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r if start >= end { end = start + defaultStep } - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss) + sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss, *maxSeriesLimit) denyPartialResponse := searchutils.GetDenyPartialResponse(r) if end-start > 24*3600*1000 { // It is cheaper to call SearchMetricNames on time ranges exceeding a day. @@ -1158,6 +1165,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r Start: start, End: start, Step: step, + MaxSeries: *maxUniqueTimeseries, QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r), Deadline: deadline, LookbackDelta: lookbackDelta, @@ -1251,6 +1259,7 @@ func queryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite Start: start, End: end, Step: step, + MaxSeries: *maxUniqueTimeseries, QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r), Deadline: deadline, MayCache: mayCache, diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index ccb8888c51..74577f37a0 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -95,6 +95,10 @@ type EvalConfig struct { End int64 Step int64 + // MaxSeries is the maximum number of time series, which can be scanned by the query. + // Zero means 'no limit' + MaxSeries int + // QuotedRemoteAddr contains quoted remote address. QuotedRemoteAddr string @@ -121,13 +125,14 @@ type EvalConfig struct { timestampsOnce sync.Once } -// newEvalConfig returns new EvalConfig copy from src. -func newEvalConfig(src *EvalConfig) *EvalConfig { +// copyEvalConfig returns src copy. +func copyEvalConfig(src *EvalConfig) *EvalConfig { var ec EvalConfig ec.AuthToken = src.AuthToken ec.Start = src.Start ec.End = src.End ec.Step = src.Step + ec.MaxSeries = src.MaxSeries ec.Deadline = src.Deadline ec.MayCache = src.MayCache ec.LookbackDelta = src.LookbackDelta @@ -592,7 +597,7 @@ func evalRollupFunc(ec *EvalConfig, funcName string, rf rollupFunc, expr metrics return nil, fmt.Errorf("`@` modifier must return a single series; it returns %d series instead", len(tssAt)) } atTimestamp := int64(tssAt[0].Values[0] * 1000) - ecNew := newEvalConfig(ec) + ecNew := copyEvalConfig(ec) ecNew.Start = atTimestamp ecNew.End = atTimestamp tss, err := evalRollupFuncWithoutAt(ecNew, funcName, rf, expr, re, iafc) @@ -619,7 +624,7 @@ func evalRollupFuncWithoutAt(ec *EvalConfig, funcName string, rf rollupFunc, exp var offset int64 if re.Offset != nil { offset = re.Offset.Duration(ec.Step) - ecNew = newEvalConfig(ecNew) + ecNew = copyEvalConfig(ecNew) ecNew.Start -= offset ecNew.End -= offset // There is no need in calling AdjustStartEnd() on ecNew if ecNew.MayCache is set to true, @@ -632,7 +637,7 @@ func evalRollupFuncWithoutAt(ec *EvalConfig, funcName string, rf rollupFunc, exp // in order to obtain expected OHLC results. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462 step := ecNew.Step - ecNew = newEvalConfig(ecNew) + ecNew = copyEvalConfig(ecNew) ecNew.Start += step ecNew.End += step offset -= step @@ -697,7 +702,7 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, funcName string, rf rollupFunc, } window := re.Window.Duration(ec.Step) - ecSQ := newEvalConfig(ec) + ecSQ := copyEvalConfig(ec) ecSQ.Start -= window + maxSilenceInterval + step ecSQ.End += step ecSQ.Step = step @@ -853,7 +858,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, funcName string, rf rollupFunc } else { minTimestamp -= ec.Step } - sq := storage.NewSearchQuery(ec.AuthToken.AccountID, ec.AuthToken.ProjectID, minTimestamp, ec.End, tfss) + sq := storage.NewSearchQuery(ec.AuthToken.AccountID, ec.AuthToken.ProjectID, minTimestamp, ec.End, tfss, ec.MaxSeries) rss, isPartial, err := netstorage.ProcessSearchQuery(ec.AuthToken, ec.DenyPartialResponse, sq, true, ec.Deadline) if err != nil { return nil, err diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index c7f09f4d6c..c6f213119e 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -71,6 +71,7 @@ func TestExecSuccess(t *testing.T) { Start: start, End: end, Step: step, + MaxSeries: 1000, Deadline: searchutils.NewDeadline(time.Now(), time.Minute, ""), RoundDigits: 100, } @@ -7510,6 +7511,7 @@ func TestExecError(t *testing.T) { Start: 1000, End: 2000, Step: 100, + MaxSeries: 1000, Deadline: searchutils.NewDeadline(time.Now(), time.Minute, ""), RoundDigits: 100, } diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go index c3720aa47a..1db10b3933 100644 --- a/app/vmstorage/transport/server.go +++ b/app/vmstorage/transport/server.go @@ -28,7 +28,7 @@ var ( maxTagKeysPerSearch = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search") maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search") maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find") - maxMetricsPerSearch = flag.Int("search.maxUniqueTimeseries", 300e3, "The maximum number of unique time series a single query can process. This allows protecting against heavy queries, which select unexpectedly high number of series. See also -search.maxSamplesPerQuery and -search.maxSamplesPerSeries") + maxMetricsPerSearch = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be scanned during every query. This allows protecting against heavy queries, which select unexpectedly high number of series. Zero means 'no limit'. See also -search.max* command-line flags at vmselect") precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss") disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage") @@ -905,9 +905,17 @@ func (s *Server) processVMSelectTSDBStatus(ctx *vmselectRequestCtx) error { if err != nil { return fmt.Errorf("cannot read topN: %w", err) } + maxMetricsUint32, err := ctx.readUint32() + if err != nil { + return fmt.Errorf("cannot read MaxMetrics: %w", err) + } + maxMetrics := int(maxMetricsUint32) + if maxMetrics < 0 { + return fmt.Errorf("too big value for MaxMetrics=%d; must be smaller than 2e9", maxMetricsUint32) + } // Execute the request - status, err := s.storage.GetTSDBStatusWithFiltersForDate(accountID, projectID, nil, uint64(date), int(topN), ctx.deadline) + status, err := s.storage.GetTSDBStatusWithFiltersForDate(accountID, projectID, nil, uint64(date), int(topN), maxMetrics, ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -950,8 +958,9 @@ func (s *Server) processVMSelectTSDBStatusWithFilters(ctx *vmselectRequestCtx) e if err := ctx.setupTfss(s.storage, tr); err != nil { return ctx.writeErrorMessage(err) } + maxMetrics := ctx.getMaxMetrics() date := uint64(ctx.sq.MinTimestamp) / (24 * 3600 * 1000) - status, err := s.storage.GetTSDBStatusWithFiltersForDate(ctx.sq.AccountID, ctx.sq.ProjectID, ctx.tfss, date, int(topN), ctx.deadline) + status, err := s.storage.GetTSDBStatusWithFiltersForDate(ctx.sq.AccountID, ctx.sq.ProjectID, ctx.tfss, date, int(topN), maxMetrics, ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -1008,7 +1017,8 @@ func (s *Server) processVMSelectSearchMetricNames(ctx *vmselectRequestCtx) error if err := ctx.setupTfss(s.storage, tr); err != nil { return ctx.writeErrorMessage(err) } - mns, err := s.storage.SearchMetricNames(ctx.tfss, tr, *maxMetricsPerSearch, ctx.deadline) + maxMetrics := ctx.getMaxMetrics() + mns, err := s.storage.SearchMetricNames(ctx.tfss, tr, maxMetrics, ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -1056,7 +1066,8 @@ func (s *Server) processVMSelectSearch(ctx *vmselectRequestCtx) error { return ctx.writeErrorMessage(err) } startTime := time.Now() - ctx.sr.Init(s.storage, ctx.tfss, tr, *maxMetricsPerSearch, ctx.deadline) + maxMetrics := ctx.getMaxMetrics() + ctx.sr.Init(s.storage, ctx.tfss, tr, maxMetrics, ctx.deadline) indexSearchDuration.UpdateDuration(startTime) defer ctx.sr.MustClose() if err := ctx.sr.Error(); err != nil { @@ -1130,6 +1141,18 @@ var ( vmselectMetricRowsRead = metrics.NewCounter(`vm_vmselect_metric_rows_read_total`) ) +func (ctx *vmselectRequestCtx) getMaxMetrics() int { + maxMetrics := ctx.sq.MaxMetrics + if maxMetrics <= 0 || maxMetrics > *maxMetricsPerSearch { + maxMetrics = *maxMetricsPerSearch + } + if maxMetrics <= 0 { + // The limit is missing. + maxMetrics = 2e9 + } + return maxMetrics +} + func (ctx *vmselectRequestCtx) setupTfss(s *storage.Storage, tr storage.TimeRange) error { tfss := ctx.tfss[:0] accountID := ctx.sq.AccountID @@ -1140,13 +1163,14 @@ func (ctx *vmselectRequestCtx) setupTfss(s *storage.Storage, tr storage.TimeRang tf := &tagFilters[i] if string(tf.Key) == "__graphite__" { query := tf.Value - paths, err := s.SearchGraphitePaths(accountID, projectID, tr, query, *maxMetricsPerSearch, ctx.deadline) + maxMetrics := ctx.getMaxMetrics() + paths, err := s.SearchGraphitePaths(accountID, projectID, tr, query, maxMetrics, ctx.deadline) if err != nil { return fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err) } - if len(paths) >= *maxMetricsPerSearch { - return fmt.Errorf("more than -search.maxUniqueTimeseries=%d time series match Graphite query %q; "+ - "either narrow down the query or increase -search.maxUniqueTimeseries command-line flag value", *maxMetricsPerSearch, query) + if len(paths) >= maxMetrics { + return fmt.Errorf("more than %d time series match Graphite query %q; "+ + "either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes", maxMetrics, query) } tfs.AddGraphiteQuery(query, paths, tf.IsNegative) continue diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index baa280a056..3231853514 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,18 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* FEATURE: add the following command-line flags, which can be used for fine-grained limiting of CPU and memory usage during various API calls: + + * `-search.maxFederateSeries` for limiting the number of time series, which can be returned from [/federate](https://docs.victoriametrics.com/#federation). + * `-search.maxExportSeries` for limiting the number of time series, which can be returned from [/api/v1/export](https://docs.victoriametrics.com/#how-to-export-time-series). + * `-search.maxSeries` for limiting the number of time series, which can be returned from [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series). + * `-search.maxTSDBStatusSeries` for limiting the number of time series, which can be scanned during the request to [/api/v1/status/tsdb](https://docs.victoriametrics.com/#tsdb-stats). + * `-search.maxGraphiteSeries` for limiting the number of time series, which can be scanned during the request to [Graphite Render API](https://docs.victoriametrics.com/#graphite-render-api-usage). + +Previously the `-search.maxUniqueTimeseries` command-line flag was used as a global limit for all these APIs. Now the `-search.maxUniqueTimeseries` is used only for limiting the number of time series, which can be scanned during requests to [/api/v1/query](https://docs.victoriametrics.com/url-examples.html#apiv1query) and [/api/v1/query_range](https://docs.victoriametrics.com/url-examples.html#apiv1query_range). + +When using [cluster version of VictoriaMetrics](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), these command-line flags (including `-search.maxUniqueTimeseries`) must be passed to `vmselect` instead of `vmstorage`. + * BUGFIX: return `Content-Type: text/html` response header when requesting `/` HTTP path at VictoriaMetrics components. Previously `text/plain` response header was returned, which could lead to broken page formatting. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2323). ## [v1.75.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.75.0) diff --git a/docs/README.md b/docs/README.md index 9965210173..121413b6a5 100644 --- a/docs/README.md +++ b/docs/README.md @@ -820,13 +820,13 @@ Send a request to `http://:8428/api/v1/export/native?match where `` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) for metrics to export. Use `{__name__=~".*"}` selector for fetching all the time series. -On large databases you may experience problems with limit on unique timeseries (default value is 300000). In this case you need to adjust `-search.maxUniqueTimeseries` parameter: +On large databases you may experience problems with limit on the number of time series, which can be exported. In this case you need to adjust `-search.maxExportSeries` command-line flag: ```bash # count unique timeseries in database wget -O- -q 'http://your_victoriametrics_instance:8428/api/v1/series/count' | jq '.data[0]' -# relaunch victoriametrics with search.maxUniqueTimeseries more than value from previous command +# relaunch victoriametrics with search.maxExportSeries more than value from previous command ``` Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either @@ -1835,6 +1835,12 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li The maximum number of concurrent search requests. It shouldn't be high, since a single request can saturate all the CPU cores. See also -search.maxQueueDuration (default 8) -search.maxExportDuration duration The maximum duration for /api/v1/export call (default 720h0m0s) + -search.maxExportSeries int + The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage (default 1000000) + -search.maxFederateSeries int + The maximum number of time series, which can be returned from /federate. This option allows limiting memory usage (default 300000) + -search.maxGraphiteSeries int + The maximum number of time series, which can be scanned during queries to Graphite Render API. See https://docs.victoriametrics.com/#graphite-render-api-usage (default 300000) -search.maxLookback duration Synonym to -search.lookback-delta from Prometheus. The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons -search.maxPointsPerTimeseries int @@ -1850,12 +1856,16 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li The maximum number of raw samples a single query can process across all time series. This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries (default 1000000000) -search.maxSamplesPerSeries int The maximum number of raw samples a single query can scan per each time series. This option allows limiting memory usage (default 30000000) + -search.maxSeries int + The maximum number of time series, which can be returned from /api/v1/series. This option allows limiting memory usage (default 10000) -search.maxStalenessInterval duration The maximum interval for staleness calculations. By default it is automatically calculated from the median interval between samples. This flag could be useful for tuning Prometheus data model closer to Influx-style data model. See https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness for details. See also '-search.maxLookback' flag, which has the same meaning due to historical reasons -search.maxStatusRequestDuration duration The maximum duration for /api/v1/status/* requests (default 5m0s) -search.maxStepForPointsAdjustment duration The maximum step when /api/v1/query_range handler adjusts points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data (default 1m0s) + -search.maxTSDBStatusSeries int + The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage (default 1000000) -search.maxTagKeys int The maximum number of tag keys returned from /api/v1/labels (default 100000) -search.maxTagValueSuffixesPerSearch int @@ -1863,7 +1873,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -search.maxTagValues int The maximum number of tag values returned from /api/v1/label//values (default 100000) -search.maxUniqueTimeseries int - The maximum number of unique time series each search can scan. This option allows limiting memory usage (default 300000) + The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage (default 300000) -search.minStalenessInterval duration The minimum interval for staleness calculations. This flag could be useful for removing gaps on graphs generated from time series with irregular intervals between samples. See also '-search.maxStalenessInterval' -search.noStaleMarkers diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index e0d6e3a658..cf8aa1a6ec 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -824,13 +824,13 @@ Send a request to `http://:8428/api/v1/export/native?match where `` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) for metrics to export. Use `{__name__=~".*"}` selector for fetching all the time series. -On large databases you may experience problems with limit on unique timeseries (default value is 300000). In this case you need to adjust `-search.maxUniqueTimeseries` parameter: +On large databases you may experience problems with limit on the number of time series, which can be exported. In this case you need to adjust `-search.maxExportSeries` command-line flag: ```bash # count unique timeseries in database wget -O- -q 'http://your_victoriametrics_instance:8428/api/v1/series/count' | jq '.data[0]' -# relaunch victoriametrics with search.maxUniqueTimeseries more than value from previous command +# relaunch victoriametrics with search.maxExportSeries more than value from previous command ``` Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either @@ -1839,6 +1839,12 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li The maximum number of concurrent search requests. It shouldn't be high, since a single request can saturate all the CPU cores. See also -search.maxQueueDuration (default 8) -search.maxExportDuration duration The maximum duration for /api/v1/export call (default 720h0m0s) + -search.maxExportSeries int + The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage (default 1000000) + -search.maxFederateSeries int + The maximum number of time series, which can be returned from /federate. This option allows limiting memory usage (default 300000) + -search.maxGraphiteSeries int + The maximum number of time series, which can be scanned during queries to Graphite Render API. See https://docs.victoriametrics.com/#graphite-render-api-usage (default 300000) -search.maxLookback duration Synonym to -search.lookback-delta from Prometheus. The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons -search.maxPointsPerTimeseries int @@ -1854,12 +1860,16 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li The maximum number of raw samples a single query can process across all time series. This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries (default 1000000000) -search.maxSamplesPerSeries int The maximum number of raw samples a single query can scan per each time series. This option allows limiting memory usage (default 30000000) + -search.maxSeries int + The maximum number of time series, which can be returned from /api/v1/series. This option allows limiting memory usage (default 10000) -search.maxStalenessInterval duration The maximum interval for staleness calculations. By default it is automatically calculated from the median interval between samples. This flag could be useful for tuning Prometheus data model closer to Influx-style data model. See https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness for details. See also '-search.maxLookback' flag, which has the same meaning due to historical reasons -search.maxStatusRequestDuration duration The maximum duration for /api/v1/status/* requests (default 5m0s) -search.maxStepForPointsAdjustment duration The maximum step when /api/v1/query_range handler adjusts points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data (default 1m0s) + -search.maxTSDBStatusSeries int + The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage (default 1000000) -search.maxTagKeys int The maximum number of tag keys returned from /api/v1/labels (default 100000) -search.maxTagValueSuffixesPerSearch int @@ -1867,7 +1877,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -search.maxTagValues int The maximum number of tag values returned from /api/v1/label//values (default 100000) -search.maxUniqueTimeseries int - The maximum number of unique time series each search can scan. This option allows limiting memory usage (default 300000) + The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage (default 300000) -search.minStalenessInterval duration The minimum interval for staleness calculations. This flag could be useful for removing gaps on graphs generated from time series with irregular intervals between samples. See also '-search.maxStalenessInterval' -search.noStaleMarkers diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index dc6f559dc5..baaa104771 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -1338,9 +1338,9 @@ func (is *indexSearch) getSeriesCount() (uint64, error) { } // GetTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss, date, accountID and projectID. -func (db *indexDB) GetTSDBStatusWithFiltersForDate(accountID, projectID uint32, tfss []*TagFilters, date uint64, topN int, deadline uint64) (*TSDBStatus, error) { +func (db *indexDB) GetTSDBStatusWithFiltersForDate(accountID, projectID uint32, tfss []*TagFilters, date uint64, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) { is := db.getIndexSearch(accountID, projectID, deadline) - status, err := is.getTSDBStatusWithFiltersForDate(tfss, date, topN) + status, err := is.getTSDBStatusWithFiltersForDate(tfss, date, topN, maxMetrics) db.putIndexSearch(is) if err != nil { return nil, err @@ -1350,7 +1350,7 @@ func (db *indexDB) GetTSDBStatusWithFiltersForDate(accountID, projectID uint32, } ok := db.doExtDB(func(extDB *indexDB) { is := extDB.getIndexSearch(accountID, projectID, deadline) - status, err = is.getTSDBStatusWithFiltersForDate(tfss, date, topN) + status, err = is.getTSDBStatusWithFiltersForDate(tfss, date, topN, maxMetrics) extDB.putIndexSearch(is) }) if ok && err != nil { @@ -1360,14 +1360,14 @@ func (db *indexDB) GetTSDBStatusWithFiltersForDate(accountID, projectID uint32, } // getTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss and the given date. -func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint64, topN int) (*TSDBStatus, error) { +func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint64, topN, maxMetrics int) (*TSDBStatus, error) { var filter *uint64set.Set if len(tfss) > 0 { tr := TimeRange{ MinTimestamp: int64(date) * msecPerDay, MaxTimestamp: int64(date+1) * msecPerDay, } - metricIDs, err := is.searchMetricIDsInternal(tfss, tr, 2e9) + metricIDs, err := is.searchMetricIDsInternal(tfss, tr, maxMetrics) if err != nil { return nil, err } diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 7a0cf67d33..8cc8129aad 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -1846,7 +1846,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } // Check GetTSDBStatusWithFiltersForDate with nil filters. - status, err := db.GetTSDBStatusWithFiltersForDate(accountID, projectID, nil, baseDate, 5, noDeadline) + status, err := db.GetTSDBStatusWithFiltersForDate(accountID, projectID, nil, baseDate, 5, 1e6, noDeadline) if err != nil { t.Fatalf("error in GetTSDBStatusWithFiltersForDate with nil filters: %s", err) } @@ -1914,7 +1914,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { if err := tfs.Add([]byte("day"), []byte("0"), false, false); err != nil { t.Fatalf("cannot add filter: %s", err) } - status, err = db.GetTSDBStatusWithFiltersForDate(accountID, projectID, []*TagFilters{tfs}, baseDate, 5, noDeadline) + status, err = db.GetTSDBStatusWithFiltersForDate(accountID, projectID, []*TagFilters{tfs}, baseDate, 5, 1e6, noDeadline) if err != nil { t.Fatalf("error in GetTSDBStatusWithFiltersForDate: %s", err) } diff --git a/lib/storage/search.go b/lib/storage/search.go index 8033172c18..351fb8bd8f 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -256,21 +256,32 @@ func (s *Search) NextMetricBlock() bool { // SearchQuery is used for sending search queries from vmselect to vmstorage. type SearchQuery struct { - AccountID uint32 - ProjectID uint32 + AccountID uint32 + ProjectID uint32 + + // The time range for searching time series MinTimestamp int64 MaxTimestamp int64 - TagFilterss [][]TagFilter + + // Tag filters for the search query + TagFilterss [][]TagFilter + + // The maximum number of time series the search query can return. + MaxMetrics int } // NewSearchQuery creates new search query for the given args. -func NewSearchQuery(accountID, projectID uint32, start, end int64, tagFilterss [][]TagFilter) *SearchQuery { +func NewSearchQuery(accountID, projectID uint32, start, end int64, tagFilterss [][]TagFilter, maxMetrics int) *SearchQuery { + if maxMetrics <= 0 { + maxMetrics = 2e9 + } return &SearchQuery{ AccountID: accountID, ProjectID: projectID, MinTimestamp: start, MaxTimestamp: end, TagFilterss: tagFilterss, + MaxMetrics: maxMetrics, } } @@ -375,6 +386,7 @@ func (sq *SearchQuery) Marshal(dst []byte) []byte { dst = tagFilters[i].Marshal(dst) } } + dst = encoding.MarshalUint32(dst, uint32(sq.MaxMetrics)) return dst } @@ -438,6 +450,12 @@ func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) { sq.TagFilterss[i] = tagFilters } + if len(src) < 4 { + return src, fmt.Errorf("cannot unmarshal MaxMetrics: too short src len: %d; must be at least %d bytes", len(src), 4) + } + sq.MaxMetrics = int(encoding.UnmarshalUint32(src)) + src = src[4:] + return src, nil } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index f387cb931f..35717eece4 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1544,8 +1544,8 @@ func (s *Storage) GetSeriesCount(accountID, projectID uint32, deadline uint64) ( } // GetTSDBStatusWithFiltersForDate returns TSDB status data for /api/v1/status/tsdb with match[] filters and the given (accountID, projectID). -func (s *Storage) GetTSDBStatusWithFiltersForDate(accountID, projectID uint32, tfss []*TagFilters, date uint64, topN int, deadline uint64) (*TSDBStatus, error) { - return s.idb().GetTSDBStatusWithFiltersForDate(accountID, projectID, tfss, date, topN, deadline) +func (s *Storage) GetTSDBStatusWithFiltersForDate(accountID, projectID uint32, tfss []*TagFilters, date uint64, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) { + return s.idb().GetTSDBStatusWithFiltersForDate(accountID, projectID, tfss, date, topN, maxMetrics, deadline) } // MetricRow is a metric to insert into storage.