mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
expose topN average memory bytes consumption queries in /api/v1/status/top_queries (#10350)
### Describe Your Changes part of https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9330 ### Checklist The following checks are **mandatory**: - [x] My change adheres to [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist). - [x] My change adheres to [VictoriaMetrics development goals](https://docs.victoriametrics.com/victoriametrics/goals/). --------- Signed-off-by: JAYICE <1185430411@qq.com>
This commit is contained in:
@@ -1713,6 +1713,7 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
|
||||
return nil, err
|
||||
}
|
||||
defer rml.Put(uint64(rollupMemorySize))
|
||||
qs.addMemoryUsage(rollupMemorySize)
|
||||
qt.Printf("the rollup evaluation needs an estimated %d bytes of RAM for %d series and %d points per series (summary %d points)",
|
||||
rollupMemorySize, timeseriesLen, pointsPerSeries, rollupPoints)
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ func Exec(qt *querytracer.Tracer, ec *EvalConfig, q string, isFirstPointOnly boo
|
||||
if querystats.Enabled() {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
querystats.RegisterQuery(q, ec.End-ec.Start, startTime)
|
||||
querystats.RegisterQuery(q, ec.End-ec.Start, startTime, ec.QueryStats.memoryUsage())
|
||||
ec.QueryStats.addExecutionTimeMsec(startTime)
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -13,6 +13,8 @@ type QueryStats struct {
|
||||
ExecutionDuration atomic.Pointer[time.Duration]
|
||||
// SeriesFetched contains the number of series fetched from storage or cache.
|
||||
SeriesFetched atomic.Int64
|
||||
// MemoryUsage contains the estimated memory consumption of the query
|
||||
MemoryUsage atomic.Int64
|
||||
|
||||
at *auth.Token
|
||||
|
||||
@@ -53,3 +55,17 @@ func (qs *QueryStats) addExecutionTimeMsec(startTime time.Time) {
|
||||
d := time.Since(startTime)
|
||||
qs.ExecutionDuration.Store(&d)
|
||||
}
|
||||
|
||||
func (qs *QueryStats) addMemoryUsage(memoryUsage int64) {
|
||||
if qs == nil {
|
||||
return
|
||||
}
|
||||
qs.MemoryUsage.Store(memoryUsage)
|
||||
}
|
||||
|
||||
func (qs *QueryStats) memoryUsage() int64 {
|
||||
if qs == nil {
|
||||
return 0
|
||||
}
|
||||
return qs.MemoryUsage.Load()
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
|
||||
)
|
||||
@@ -15,7 +16,8 @@ import (
|
||||
var (
|
||||
lastQueriesCount = flag.Int("search.queryStats.lastQueriesCount", 20000, "Query stats for /api/v1/status/top_queries is tracked on this number of last queries. "+
|
||||
"Zero value disables query stats tracking")
|
||||
minQueryDuration = flag.Duration("search.queryStats.minQueryDuration", time.Millisecond, "The minimum duration for queries to track in query stats at /api/v1/status/top_queries. Queries with lower duration are ignored in query stats")
|
||||
minQueryDuration = flag.Duration("search.queryStats.minQueryDuration", time.Millisecond, "The minimum duration for queries to track in query stats at /api/v1/status/top_queries. Queries with lower duration are ignored in query stats")
|
||||
minQueryMemoryUsage = flagutil.NewBytes("search.queryStats.minQueryMemoryUsage", 1024, "The minimum memory bytes consumption for queries to track in query stats at /api/v1/status/top_queries. Queries with lower memory bytes consumption are ignored in query stats")
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -31,9 +33,9 @@ func Enabled() bool {
|
||||
// RegisterQuery registers the query on the given timeRangeMsecs, which has been started at startTime.
|
||||
//
|
||||
// RegisterQuery must be called when the query is finished.
|
||||
func RegisterQuery(query string, timeRangeMsecs int64, startTime time.Time) {
|
||||
func RegisterQuery(query string, timeRangeMsecs int64, startTime time.Time, memoryUsage int64) {
|
||||
initOnce.Do(initQueryStats)
|
||||
qsTracker.registerQuery(query, timeRangeMsecs, startTime)
|
||||
qsTracker.registerQuery(query, timeRangeMsecs, startTime, memoryUsage)
|
||||
}
|
||||
|
||||
// WriteJSONQueryStats writes query stats to given writer in json format.
|
||||
@@ -54,6 +56,7 @@ type queryStatRecord struct {
|
||||
timeRangeSecs int64
|
||||
registerTime time.Time
|
||||
duration time.Duration
|
||||
memoryUsage int64
|
||||
}
|
||||
|
||||
type queryStatKey struct {
|
||||
@@ -66,8 +69,8 @@ func initQueryStats() {
|
||||
if recordsCount <= 0 {
|
||||
recordsCount = 1
|
||||
} else {
|
||||
logger.Infof("enabled query stats tracking at `/api/v1/status/top_queries` with -search.queryStats.lastQueriesCount=%d, -search.queryStats.minQueryDuration=%s",
|
||||
*lastQueriesCount, *minQueryDuration)
|
||||
logger.Infof("enabled query stats tracking at `/api/v1/status/top_queries` with -search.queryStats.lastQueriesCount=%d, -search.queryStats.minQueryDuration=%s, -search.queryStats.minQueryMemoryUsage=%s",
|
||||
*lastQueriesCount, *minQueryDuration, minQueryMemoryUsage)
|
||||
}
|
||||
qsTracker = &queryStatsTracker{
|
||||
a: make([]queryStatRecord, recordsCount),
|
||||
@@ -78,6 +81,7 @@ func (qst *queryStatsTracker) writeJSONQueryStats(w io.Writer, topN int, maxLife
|
||||
fmt.Fprintf(w, `{"topN":"%d","maxLifetime":"%s",`, topN, maxLifetime)
|
||||
fmt.Fprintf(w, `"search.queryStats.lastQueriesCount":%d,`, *lastQueriesCount)
|
||||
fmt.Fprintf(w, `"search.queryStats.minQueryDuration":"%s",`, *minQueryDuration)
|
||||
fmt.Fprintf(w, `"search.queryStats.minQueryMemoryUsage":"%s",`, minQueryMemoryUsage)
|
||||
fmt.Fprintf(w, `"topByCount":[`)
|
||||
topByCount := qst.getTopByCount(topN, maxLifetime)
|
||||
for i, r := range topByCount {
|
||||
@@ -102,15 +106,28 @@ func (qst *queryStatsTracker) writeJSONQueryStats(w io.Writer, topN int, maxLife
|
||||
fmt.Fprintf(w, `,`)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Fprintf(w, `],"topByAvgMemoryUsage":[`)
|
||||
topByAvgMemoryConsumption := qst.getTopByAvgMemoryUsage(topN, maxLifetime)
|
||||
for i, r := range topByAvgMemoryConsumption {
|
||||
fmt.Fprintf(w, `{"query":%s,"timeRangeSeconds":%d,"avgMemoryBytes":%d,"count":%d}`, stringsutil.JSONString(r.query), r.timeRangeSecs, r.memoryUsage, r.count)
|
||||
if i+1 < len(topByAvgMemoryConsumption) {
|
||||
fmt.Fprintf(w, `,`)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Fprintf(w, `]}`)
|
||||
}
|
||||
|
||||
func (qst *queryStatsTracker) registerQuery(query string, timeRangeMsecs int64, startTime time.Time) {
|
||||
func (qst *queryStatsTracker) registerQuery(query string, timeRangeMsecs int64, startTime time.Time, memoryUsage int64) {
|
||||
registerTime := time.Now()
|
||||
duration := registerTime.Sub(startTime)
|
||||
if duration < *minQueryDuration {
|
||||
return
|
||||
}
|
||||
if memoryUsage < int64(minQueryMemoryUsage.IntN()) {
|
||||
return
|
||||
}
|
||||
|
||||
qst.mu.Lock()
|
||||
defer qst.mu.Unlock()
|
||||
@@ -126,6 +143,7 @@ func (qst *queryStatsTracker) registerQuery(query string, timeRangeMsecs int64,
|
||||
r.timeRangeSecs = timeRangeMsecs / 1000
|
||||
r.registerTime = registerTime
|
||||
r.duration = duration
|
||||
r.memoryUsage = memoryUsage
|
||||
}
|
||||
|
||||
func (r *queryStatRecord) matches(currentTime time.Time, maxLifetime time.Duration) bool {
|
||||
@@ -257,3 +275,47 @@ func (qst *queryStatsTracker) getTopBySumDuration(topN int, maxLifetime time.Dur
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
type queryStatByMemory struct {
|
||||
query string
|
||||
timeRangeSecs int64
|
||||
memoryUsage int64
|
||||
count int
|
||||
}
|
||||
|
||||
func (qst *queryStatsTracker) getTopByAvgMemoryUsage(topN int, maxLifetime time.Duration) []queryStatByMemory {
|
||||
currentTime := time.Now()
|
||||
qst.mu.Lock()
|
||||
type countSum struct {
|
||||
count int
|
||||
sum int64
|
||||
}
|
||||
m := make(map[queryStatKey]countSum)
|
||||
for _, r := range qst.a {
|
||||
if r.matches(currentTime, maxLifetime) {
|
||||
k := r.key()
|
||||
ks := m[k]
|
||||
ks.count++
|
||||
ks.sum += r.memoryUsage
|
||||
m[k] = ks
|
||||
}
|
||||
}
|
||||
qst.mu.Unlock()
|
||||
|
||||
var a []queryStatByMemory
|
||||
for k, ks := range m {
|
||||
a = append(a, queryStatByMemory{
|
||||
query: k.query,
|
||||
timeRangeSecs: k.timeRangeSecs,
|
||||
memoryUsage: ks.sum / int64(ks.count),
|
||||
count: ks.count,
|
||||
})
|
||||
}
|
||||
sort.Slice(a, func(i, j int) bool {
|
||||
return a[i].memoryUsage > a[j].memoryUsage
|
||||
})
|
||||
if len(a) > topN {
|
||||
a = a[:topN]
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
@@ -34,6 +34,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: [alerts](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules): add new alerting rules `PersistentQueueRunsOutOfSpaceIn12Hours` and `PersistentQueueRunsOutOfSpaceIn4Hours` for `vmagent` persistent queue capacity. These alerts help users to take proactive actions before `vmagent` starts dropping metrics due to insufficient persistent queue space. See [#10193](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10193)
|
||||
* FEATURE: All VictoriaMetrics components: add build version information to the home page for consistency with other projects. See [#10249](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10249).
|
||||
* FEATURE: all VictoriaMetrics components: add flag `fs.disableMincore`, which allows to disable `mincore` syscall. See [#10327](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10327).
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): expose topN average memory bytes consumption queries in `/api/v1/status/top_queries`. It can help users to find queries that consume a lot of memory and potentially cause OOM. See [#9330](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9330).
|
||||
|
||||
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): stop backend health checks for URL prefixes defined in `url_map` during configuration reloads. Previously, stale backends kept being health-checked and produced repeated warning logs after reloads. See [#10334](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10334).
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly return [/api/v1/status/tsdb](https://docs.victoriametrics.com/victoriametrics/#tsdb-stats) response for time range outside [partition index](https://docs.victoriametrics.com/victoriametrics/#indexdb). See [#10315](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10315).
|
||||
|
||||
Reference in New Issue
Block a user