Compare commits

...

15 Commits

Author SHA1 Message Date
Artem Fetishev
5551715af9 fix code review remarks
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-04 14:08:08 +02:00
Artem Fetishev
7a5e63128a fix code review remarks
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-04 14:08:08 +02:00
Artem Fetishev
81c3e7998a fix ai code review remarks
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-04 14:08:07 +02:00
Artem Fetishev
ad5d1b014d sync changes from cluster
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-04 14:08:07 +02:00
Artem Fetishev
c2dc13f7a7 remove a todo
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-04 14:08:06 +02:00
Artem Fetishev
91f982d516 fix code review remarks
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-04 14:08:06 +02:00
Artem Fetishev
dd4ff824b7 extract vmsingle-specific code into vmstorageSingleNode type
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-04 14:08:06 +02:00
Artem Fetishev
eb8791600b sync change with vmstorage
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-04 14:08:05 +02:00
Artem Fetishev
e286966d20 minor fixes
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-04 14:08:05 +02:00
Artem Fetishev
9f5fc7228f reuse vmstorage type throughout vmsingle code
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-04 14:08:04 +02:00
Artem Fetishev
39800154b6 vmstorage: extract code that is the same in cluster into a separate type
Initial commit:

- Copied the vmstorage.go and deps from origin/shared-vmstorage-cluster branch,
- made necessary corrections to make the code compile,
- the binary fails to run due to flag dublicates.

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-04 14:08:04 +02:00
JAYICE
1baaaf3b31 app/vmagent: introduce metrics for kafka saturation breakdown
This commit adds new metrics for kafka remotewrite:

* vmagent_remotewrite_kafka_outbuf_latency_seconds
* vmagent_remotewrite_kafka_rtt_seconds

 It helps to detect possible network saturation between vmagent and kafka brokers.

fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10730
2026-06-04 13:28:05 +02:00
Zhu Jiekun
ec88b9cac6 app/vmauth: properly log user information when a missing route error occurs
properly log user information when a missing route error occurs.

examples:
- `user foo missing route for "http://foo:secret@some-host.com/a/b`
- `user unauthorized missing route for "http://some-host.com/abc?de=fg"`

---

Previously, the error log was not good enough to identify the source of
the request:
```
2026-06-03T06:48:09.020Z	warn	VictoriaMetrics/app/vmauth/main.go:427	remoteAddr: "**.**.**.**, X-Forwarded-For: **.**.**.**"; requestURI: /prometheus/api/v1/write; missing route for "/prometheus/api/v1/write" 
```
It takes extra efforts to check it by metrics such as
`vmauth_user_requests_total`.

Related PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11052
2026-06-04 13:14:55 +02:00
Max Kotliar
661fbf947c docs/changelog: add empty line
follow-up on prev commit
5c176838d1
2026-06-03 13:26:14 +03:00
JAYICE
5c176838d1 deployment: upgrade golang builder from 1.26.3 to 1.26.4 (#11053)
upgrade golang builder to fix failed PR pipeline
https://github.com/VictoriaMetrics/VictoriaMetrics/actions/runs/26870814471/job/79245400038?pr=11052

PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11053
2026-06-03 13:21:38 +03:00
25 changed files with 1015 additions and 461 deletions

View File

@@ -34,8 +34,21 @@ var (
"This can be changed with -promscrape.config.strictParse=false command-line flag")
maxIngestionRate = flag.Int("maxIngestionRate", 0, "The maximum number of samples vmsingle can receive per second. Data ingestion is paused when the limit is exceeded. "+
"By default there are no limits on samples ingestion rate.")
vmselectMaxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
"It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+
"See also -search.maxQueueDuration and -search.maxMemoryPerQuery")
vmselectMaxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests "+
"limit is reached; see also -search.maxQueryDuration")
)
func getDefaultMaxConcurrentRequests() int {
// A single request can saturate all the CPU cores, so there is no sense
// in allowing higher number of concurrent requests - they will just contend
// for unavailable CPU time.
n := min(cgroup.AvailableCPUs()*2, 16)
return n
}
func main() {
// VictoriaMetrics is optimized for reduced memory allocations,
// so it can run with the reduced GOGC in order to reduce the used memory,
@@ -76,8 +89,8 @@ func main() {
}
logger.Infof("starting VictoriaMetrics at %q...", listenAddrs)
startTime := time.Now()
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
vmselect.Init()
vmstorage.Init(*vmselectMaxConcurrentRequests, promql.ResetRollupResultCacheIfNeeded)
vmselect.Init(*vmselectMaxConcurrentRequests, *vmselectMaxQueueDuration)
vminsertcommon.StartIngestionRateLimiter(*maxIngestionRate)
vminsert.Init()

View File

@@ -93,7 +93,7 @@ func selfScraper(scrapeInterval time.Duration) {
mr.Value = r.Value
}
}
if err := vmstorage.AddRows(mrs); err != nil {
if err := vmstorage.VMInsertAPI.WriteRows(mrs); err != nil {
logger.Errorf("cannot store self-scraped metrics: %s", err)
}
if len(metadataRows.Rows) > 0 {
@@ -105,7 +105,7 @@ func selfScraper(scrapeInterval time.Duration) {
Type: mm.Type,
})
}
if err := vmstorage.AddMetadataRows(mms); err != nil {
if err := vmstorage.VMInsertAPI.WriteMetadata(mms); err != nil {
logger.Errorf("cannot store self-scraped metrics metadata: %s", err)
}
}

View File

@@ -52,7 +52,7 @@ func writeInputSeries(input []series, interval *promutil.Duration, startStamp ti
data := testutil.Compress(r)
// write input series to vm
httpWrite(dst, bytes.NewBuffer(data))
vmstorage.Storage.DebugFlush()
vmstorage.DebugFlush()
return nil
}

View File

@@ -108,7 +108,9 @@ func UnitTest(files []string, disableGroupLabel bool, externalLabels []string, e
storagePath = tmpFolder
processFlags()
vminsert.Init()
vmselect.Init()
const maxConcurrentRequests = 4
maxQueueDuration := 5 * time.Second
vmselect.Init(maxConcurrentRequests, maxQueueDuration)
// storagePath will be created again when closing vmselect, so remove it again.
defer fs.MustRemoveDir(storagePath)
defer vminsert.Stop()
@@ -279,7 +281,8 @@ func processFlags() {
}
func setUp() {
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
const maxConcurrentRequests = 4
vmstorage.Init(maxConcurrentRequests, promql.ResetRollupResultCacheIfNeeded)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
readyCheckFunc := func() bool {
@@ -384,7 +387,7 @@ func (tg *testGroup) test(evalInterval time.Duration, groupOrderMap map[string]i
}
}
// flush series after each group evaluation
vmstorage.Storage.DebugFlush()
vmstorage.DebugFlush()
}
// check alert_rule_test case at every eval time

View File

@@ -317,7 +317,7 @@ func processUserRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo, tk
defer ui.endConcurrencyLimit()
// Process the request.
processRequest(w, r, ui, tkn)
processRequest(w, r, ui, tkn, userName)
}
func beginConcurrencyLimit(ctx context.Context) error {
@@ -391,7 +391,7 @@ func bufferRequestBody(ctx context.Context, r io.ReadCloser, userName string) (i
return bb, nil
}
func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo, tkn *jwt.Token) {
func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo, tkn *jwt.Token, userName string) {
u := normalizeURL(r.URL)
up, hc := ui.getURLPrefixAndHeaders(u, r.Host, r.Header)
isDefault := false
@@ -409,7 +409,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo, tkn *j
if ui.DumpRequestOnErrors {
di = debugInfo(u, r)
}
httpserver.Errorf(w, r, "missing route for %q%s", u.String(), di)
httpserver.Errorf(w, r, "user %s missing route for %q%s", userName, u.String(), di)
return
}
up, hc = ui.DefaultURL, ui.HeadersConf
@@ -455,7 +455,7 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo, tkn *j
ui.backendErrors.Inc()
}
err := &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("all the %d backends for the user %q are unavailable for proxying the request - check previous WARN logs to see the exact error for each failed backend", up.getBackendsCount(), ui.name()),
Err: fmt.Errorf("all the %d backends for the user %q are unavailable for proxying the request - check previous WARN logs to see the exact error for each failed backend", up.getBackendsCount(), userName),
StatusCode: http.StatusBadGateway,
}
httpserver.Errorf(w, r, "%s", err)

View File

@@ -307,6 +307,24 @@ statusCode=200
requested_url={BACKEND}/bar/a/b`
f(cfgStr, requestURL, backendHandler, responseExpected)
// correct authorization but unexisted path, hence missing route error.
cfgStr = `
users:
- username: foo
password: secret
url_map:
- src_paths:
- "/api/v1/write"
url_prefix: "{BACKEND}/bar"`
requestURL = "http://foo:secret@some-host.com/a/b"
backendHandler = func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "requested_url=http://%s%s", r.Host, r.URL)
}
responseExpected = `
statusCode=400
user foo missing route for "http://foo:secret@some-host.com/a/b"`
f(cfgStr, requestURL, backendHandler, responseExpected)
// verify how path cleanup works
cfgStr = `
unauthorized_user:
@@ -403,7 +421,7 @@ unauthorized_user:
}
responseExpected = `
statusCode=400
missing route for "http://some-host.com/abc?de=fg"`
user unauthorized missing route for "http://some-host.com/abc?de=fg"`
f(cfgStr, requestURL, backendHandler, responseExpected)
// missing default_url and default url_prefix for unauthorized user with dump_request_on_errors enabled
@@ -419,7 +437,7 @@ unauthorized_user:
}
responseExpected = `
statusCode=400
missing route for "http://some-host.com/abc?de=fg" (host: "some-host.com"; path: "/abc"; args: "de=fg"; headers:Connection: Some-Header,Other-Header
user unauthorized missing route for "http://some-host.com/abc?de=fg" (host: "some-host.com"; path: "/abc"; args: "de=fg"; headers:Connection: Some-Header,Other-Header
Pass-Header: abc
Some-Header: foobar
X-Forwarded-For: 12.34.56.78
@@ -461,7 +479,7 @@ unauthorized_user:
}
responseExpected = `
statusCode=502
all the 2 backends for the user "" are unavailable for proxying the request - check previous WARN logs to see the exact error for each failed backend`
all the 2 backends for the user "unauthorized" are unavailable for proxying the request - check previous WARN logs to see the exact error for each failed backend`
f(cfgStr, requestURL, backendHandler, responseExpected)
// all the backend_urls are unavailable for authorized user
@@ -501,7 +519,7 @@ unauthorized_user:
}
responseExpected = `
statusCode=502
all the 0 backends for the user "" are unavailable for proxying the request - check previous WARN logs to see the exact error for each failed backend`
all the 0 backends for the user "unauthorized" are unavailable for proxying the request - check previous WARN logs to see the exact error for each failed backend`
f(cfgStr, requestURL, backendHandler, responseExpected)
netutil.Resolver = origResolver
@@ -518,7 +536,7 @@ unauthorized_user:
}
responseExpected = `
statusCode=502
all the 2 backends for the user "" are unavailable for proxying the request - check previous WARN logs to see the exact error for each failed backend`
all the 2 backends for the user "unauthorized" are unavailable for proxying the request - check previous WARN logs to see the exact error for each failed backend`
f(cfgStr, requestURL, backendHandler, responseExpected)
if n := retries.Load(); n != 2 {
t.Fatalf("unexpected number of retries; got %d; want 2", n)

View File

@@ -184,7 +184,7 @@ func (ctx *InsertCtx) WriteMetadata(mmpbs []prompb.MetricMetadata) error {
}
ctx.mms = mms
err := vmstorage.AddMetadataRows(mms)
err := vmstorage.VMInsertAPI.WriteMetadata(mms)
if err != nil {
return &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot store metrics metadata: %w", err),
@@ -209,7 +209,7 @@ func (ctx *InsertCtx) WritePromMetadata(mmps []prometheus.Metadata) error {
}
ctx.mms = mms
err := vmstorage.AddMetadataRows(mms)
err := vmstorage.VMInsertAPI.WriteMetadata(mms)
if err != nil {
return &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot store prometheus metrics metadata: %w", err),
@@ -278,7 +278,7 @@ func (ctx *InsertCtx) FlushBufs() error {
// since the number of concurrent FlushBufs() calls should be already limited via writeconcurrencylimiter
// used at every stream.Parse() call under lib/protoparser/*
err := vmstorage.AddRows(ctx.mrs)
err := vmstorage.VMInsertAPI.WriteRows(ctx.mrs)
ctx.Reset(0)
if err == nil {
return nil

View File

@@ -283,7 +283,7 @@ func pushAggregateSeries(tss []prompb.TimeSeries) {
}
// There is no need in limiting the number of concurrent calls to vmstorage.AddRows() here,
// since the number of concurrent pushAggregateSeries() calls should be already limited by lib/streamaggr.
if err := vmstorage.AddRows(ctx.mrs); err != nil {
if err := vmstorage.VMInsertAPI.WriteRows(ctx.mrs); err != nil {
logger.Errorf("cannot flush aggregate series: %s", err)
}
}

View File

@@ -1,7 +1,6 @@
package graphite
import (
"flag"
"fmt"
"math"
"net/http"
@@ -21,8 +20,6 @@ import (
"github.com/VictoriaMetrics/metricsql"
)
var maxTagValueSuffixes = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
// MetricsFindHandler implements /metrics/find handler.
//
// See https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
@@ -222,10 +219,11 @@ func MetricsIndexHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
// metricsFind searches for label values that match the given qHead and qTail.
func metricsFind(tr storage.TimeRange, label, qHead, qTail string, delimiter byte, isExpand bool, deadline searchutil.Deadline) ([]string, error) {
maxSuffixes := 0 // let vmstorage use its maxTagValueSuffixesPerSearch limit
n := strings.IndexAny(qTail, "*{[")
if n < 0 {
query := qHead + qTail
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, *maxTagValueSuffixes, deadline)
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, maxSuffixes, deadline)
if err != nil {
return nil, err
}
@@ -245,7 +243,7 @@ func metricsFind(tr storage.TimeRange, label, qHead, qTail string, delimiter byt
}
if n == len(qTail)-1 && strings.HasSuffix(qTail, "*") {
query := qHead + qTail[:len(qTail)-1]
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, *maxTagValueSuffixes, deadline)
suffixes, err := netstorage.TagValueSuffixes(nil, tr, label, query, delimiter, maxSuffixes, deadline)
if err != nil {
return nil, err
}

View File

@@ -138,7 +138,9 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request
mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels)
mr.Timestamp = ct
}
vmstorage.RegisterMetricNames(nil, mrs)
if err := vmstorage.VMSelectAPI.RegisterMetricNames(nil, mrs, 0); err != nil {
return err
}
// Return response
contentType := "text/plain; charset=utf-8"

View File

@@ -21,7 +21,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/stats"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@@ -36,12 +35,6 @@ var (
deleteAuthKey = flagutil.NewPassword("deleteAuthKey", "authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries. It could be passed via authKey query arg. It overrides -httpAuth.*")
metricNamesStatsResetAuthKey = flagutil.NewPassword("metricNamesStatsResetAuthKey", "authKey for resetting metric names usage cache via /api/v1/admin/status/metric_names_stats/reset. It overrides -httpAuth.*. "+
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#track-ingested-metrics-usage")
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
"It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+
"See also -search.maxQueueDuration and -search.maxMemoryPerQuery")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests "+
"limit is reached; see also -search.maxQueryDuration")
resetCacheAuthKey = flagutil.NewPassword("search.resetCacheAuthKey", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call. It could be passed via authKey query arg. It overrides -httpAuth.*")
logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging. "+
"See also -search.logQueryMemoryUsage")
@@ -50,23 +43,17 @@ var (
var slowQueries = metrics.NewCounter(`vm_slow_queries_total`)
func getDefaultMaxConcurrentRequests() int {
// A single request can saturate all the CPU cores, so there is no sense
// in allowing higher number of concurrent requests - they will just contend
// for unavailable CPU time.
n := min(cgroup.AvailableCPUs()*2, 16)
return n
}
// Init initializes vmselect
func Init() {
func Init(vmselectMaxConcurrentRequests int, vmselectMaxQueueDuration time.Duration) {
tmpDirPath := vmstorage.DataPath() + "/tmp"
fs.MustRemoveDirContents(tmpDirPath)
netstorage.InitTmpBlocksDir(tmpDirPath)
promql.InitRollupResultCache(vmstorage.DataPath() + "/cache/rollupResult")
prometheus.InitMaxUniqueTimeseries(*maxConcurrentRequests)
concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)
maxConcurrentRequests = vmselectMaxConcurrentRequests
maxQueueDuration = vmselectMaxQueueDuration
concurrencyLimitCh = make(chan struct{}, maxConcurrentRequests)
initVMUIConfig()
initVMAlertProxy()
@@ -78,7 +65,11 @@ func Stop() {
promql.StopRollupResultCache()
}
var concurrencyLimitCh chan struct{}
var (
maxConcurrentRequests int
maxQueueDuration time.Duration
concurrencyLimitCh chan struct{}
)
var (
concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_select_limit_reached_total`)
@@ -90,9 +81,6 @@ var (
_ = metrics.NewGauge(`vm_concurrent_select_current`, func() float64 {
return float64(len(concurrencyLimitCh))
})
_ = metrics.NewGauge(`vm_search_max_unique_timeseries`, func() float64 {
return float64(prometheus.GetMaxUniqueTimeSeries())
})
)
//go:embed vmui
@@ -131,12 +119,12 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
default:
// Sleep for a while until giving up. This should resolve short bursts in requests.
concurrencyLimitReached.Inc()
d := min(searchutil.GetMaxQueryDuration(r), *maxQueueDuration)
d := min(searchutil.GetMaxQueryDuration(r), maxQueueDuration)
t := timerpool.Get(d)
select {
case concurrencyLimitCh <- struct{}{}:
timerpool.Put(t)
qt.Printf("wait in queue because -search.maxConcurrentRequests=%d concurrent requests are executed", *maxConcurrentRequests)
qt.Printf("wait in queue because -search.maxConcurrentRequests=%d concurrent requests are executed", maxConcurrentRequests)
defer func() { <-concurrencyLimitCh }()
case <-r.Context().Done():
timerpool.Put(t)
@@ -152,7 +140,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -search.maxConcurrentRequests=%d concurrent requests "+
"are executed. Possible solutions: to reduce query load; to add more compute resources to the server; "+
"to increase -search.maxQueueDuration=%s; to increase -search.maxQueryDuration; to increase -search.maxConcurrentRequests",
d.Seconds(), *maxConcurrentRequests, maxQueueDuration),
d.Seconds(), maxConcurrentRequests, maxQueueDuration),
StatusCode: http.StatusTooManyRequests,
}
w.Header().Add("Retry-After", "10")

View File

@@ -27,10 +27,6 @@ import (
)
var (
maxTagKeysPerSearch = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned from /api/v1/labels . "+
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned from /api/v1/label/<label_name>/values . "+
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
maxSamplesPerSeries = flag.Int("search.maxSamplesPerSeries", 30e6, "The maximum number of raw samples a single query can scan per each time series. This option allows limiting memory usage")
maxSamplesPerQuery = flag.Int("search.maxSamplesPerQuery", 1e9, "The maximum number of raw samples a single query can process across all time series. "+
"This protects from heavy queries, which select unexpectedly high number of raw samples. See also -search.maxSamplesPerSeries")
@@ -80,7 +76,7 @@ func (rss *Results) Cancel() {
}
func (rss *Results) mustClose() {
putStorageSearch(rss.sr)
vmstorage.PutSearch(rss.sr)
rss.sr = nil
putTmpBlocksFile(rss.tbf)
rss.tbf = nil
@@ -758,12 +754,7 @@ var sbhPool sync.Pool
func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutil.Deadline) (int, error) {
qt = qt.NewChild("delete series: %s", sq)
defer qt.Done()
tr := sq.GetTimeRange()
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
if err != nil {
return 0, err
}
return vmstorage.DeleteSeries(qt, tfss, sq.MaxMetrics)
return vmstorage.VMSelectAPI.DeleteSeries(qt, sq, deadline.Deadline())
}
// LabelNames returns label names matching the given sq until the given deadline.
@@ -773,15 +764,7 @@ func LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames i
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
if maxLabelNames > *maxTagKeysPerSearch || maxLabelNames <= 0 {
maxLabelNames = *maxTagKeysPerSearch
}
tr := sq.GetTimeRange()
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
if err != nil {
return nil, err
}
labels, err := vmstorage.SearchLabelNames(qt, tfss, tr, maxLabelNames, sq.MaxMetrics, deadline.Deadline())
labels, err := vmstorage.VMSelectAPI.LabelNames(qt, sq, maxLabelNames, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("error during labels search on time range: %w", err)
}
@@ -841,15 +824,7 @@ func LabelValues(qt *querytracer.Tracer, labelName string, sq *storage.SearchQue
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
if maxLabelValues > *maxTagValuesPerSearch || maxLabelValues <= 0 {
maxLabelValues = *maxTagValuesPerSearch
}
tr := sq.GetTimeRange()
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
if err != nil {
return nil, err
}
labelValues, err := vmstorage.SearchLabelValues(qt, labelName, tfss, tr, maxLabelValues, sq.MaxMetrics, deadline.Deadline())
labelValues, err := vmstorage.VMSelectAPI.LabelValues(qt, sq, labelName, maxLabelValues, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("error during label values search on time range for labelName=%q: %w", labelName, err)
}
@@ -864,7 +839,10 @@ func GetMetricsMetadata(qt *querytracer.Tracer, limit int, metricName string) ([
qt = qt.NewChild("get metrics metadata: limit=%d, metric_name=%q", limit, metricName)
defer qt.Done()
metadata := vmstorage.Storage.GetMetadataRows(qt, limit, metricName)
metadata, err := vmstorage.VMSelectAPI.GetMetadataRecords(qt, nil, limit, metricName, 0)
if err != nil {
return nil, err
}
sort.Slice(metadata, func(i, j int) bool {
return string(metadata[i].MetricFamilyName) < string(metadata[j].MetricFamilyName)
@@ -912,16 +890,11 @@ func TagValueSuffixes(qt *querytracer.Tracer, tr storage.TimeRange, tagKey, tagV
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
suffixes, err := vmstorage.SearchTagValueSuffixes(qt, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline.Deadline())
suffixes, err := vmstorage.VMSelectAPI.TagValueSuffixes(qt, 0, 0, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("error during search for suffixes for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s: %w",
tagKey, tagValuePrefix, delimiter, tr.String(), err)
}
if len(suffixes) >= maxSuffixes {
return nil, fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d tag value suffixes found for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s; "+
"either narrow down the query or increase -search.maxTagValueSuffixesPerSearch command-line flag value",
maxSuffixes, tagKey, tagValuePrefix, delimiter, tr.String())
}
return suffixes, nil
}
@@ -934,13 +907,7 @@ func TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel stri
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
tr := sq.GetTimeRange()
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
if err != nil {
return nil, err
}
date := uint64(tr.MinTimestamp) / (3600 * 24 * 1000)
status, err := vmstorage.GetTSDBStatus(qt, tfss, date, focusLabel, topN, sq.MaxMetrics, deadline.Deadline())
status, err := vmstorage.VMSelectAPI.TSDBStatus(qt, sq, focusLabel, topN, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("error during tsdb status request: %w", err)
}
@@ -954,28 +921,13 @@ func SeriesCount(qt *querytracer.Tracer, deadline searchutil.Deadline) (uint64,
if deadline.Exceeded() {
return 0, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
n, err := vmstorage.GetSeriesCount(deadline.Deadline())
n, err := vmstorage.VMSelectAPI.SeriesCount(qt, 0, 0, deadline.Deadline())
if err != nil {
return 0, fmt.Errorf("error during series count request: %w", err)
}
return n, nil
}
func getStorageSearch() *storage.Search {
v := ssPool.Get()
if v == nil {
return &storage.Search{}
}
return v.(*storage.Search)
}
func putStorageSearch(sr *storage.Search) {
sr.MustClose()
ssPool.Put(sr)
}
var ssPool sync.Pool
// ExportBlocks searches for time series matching sq and calls f for each found block.
//
// f is called in parallel from multiple goroutines.
@@ -989,18 +941,13 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
if deadline.Exceeded() {
return fmt.Errorf("timeout exceeded before starting data export: %s", deadline.String())
}
tr := sq.GetTimeRange()
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
sr, _, err := vmstorage.GetSearch(qt, sq, deadline.Deadline())
if err != nil {
return err
}
vmstorage.WG.Add(1)
defer vmstorage.WG.Done()
sr := getStorageSearch()
defer putStorageSearch(sr)
sr.Init(qt, vmstorage.Storage, tfss, tr, sq.MaxMetrics, deadline.Deadline())
defer vmstorage.PutSearch(sr)
// Start workers that call f in parallel on available CPU cores.
workCh := make(chan *exportWork, gomaxprocs*8)
@@ -1093,14 +1040,7 @@ func SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline
return nil, fmt.Errorf("timeout exceeded before starting to search metric names: %s", deadline.String())
}
// Setup search.
tr := sq.GetTimeRange()
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
if err != nil {
return nil, err
}
metricNames, err := vmstorage.SearchMetricNames(qt, tfss, tr, sq.MaxMetrics, deadline.Deadline())
metricNames, err := vmstorage.VMSelectAPI.SearchMetricNames(qt, sq, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("cannot find metric names: %w", err)
}
@@ -1119,18 +1059,11 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadlin
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
// Setup search.
tr := sq.GetTimeRange()
tfss, err := setupTfss(qt, tr, sq.TagFilterss, sq.MaxMetrics, deadline)
sr, maxSeriesCount, err := vmstorage.GetSearch(qt, sq, deadline.Deadline())
if err != nil {
return nil, err
}
vmstorage.WG.Add(1)
defer vmstorage.WG.Done()
sr := getStorageSearch()
maxSeriesCount := sr.Init(qt, vmstorage.Storage, tfss, tr, sq.MaxMetrics, deadline.Deadline())
type blockRefs struct {
brs []blockRef
}
@@ -1168,7 +1101,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadlin
blocksRead++
if deadline.Exceeded() {
putTmpBlocksFile(tbf)
putStorageSearch(sr)
vmstorage.PutSearch(sr)
return nil, fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String())
}
br := sr.MetricBlockRef.BlockRef
@@ -1180,7 +1113,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadlin
samples += br.RowsCount()
if *maxSamplesPerQuery > 0 && samples > *maxSamplesPerQuery {
putTmpBlocksFile(tbf)
putStorageSearch(sr)
vmstorage.PutSearch(sr)
return nil, fmt.Errorf("cannot select more than -search.maxSamplesPerQuery=%d samples; possible solutions: increase the -search.maxSamplesPerQuery; "+
"reduce time range for the query; use more specific label filters in order to select fewer series", *maxSamplesPerQuery)
}
@@ -1189,7 +1122,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadlin
addr, err := tbf.WriteBlockRefData(buf)
if err != nil {
putTmpBlocksFile(tbf)
putStorageSearch(sr)
vmstorage.PutSearch(sr)
return nil, fmt.Errorf("cannot write %d bytes to temporary file: %w", len(buf), err)
}
@@ -1247,7 +1180,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadlin
if err := sr.Error(); err != nil {
putTmpBlocksFile(tbf)
putStorageSearch(sr)
vmstorage.PutSearch(sr)
if errors.Is(err, storage.ErrDeadlineExceeded) {
return nil, fmt.Errorf("timeout exceeded during the query: %s", deadline.String())
}
@@ -1255,13 +1188,13 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadlin
}
if err := tbf.Finalize(); err != nil {
putTmpBlocksFile(tbf)
putStorageSearch(sr)
vmstorage.PutSearch(sr)
return nil, fmt.Errorf("cannot finalize temporary file: %w", err)
}
qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(m), blocksRead, samples, tbf.Len())
var rss Results
rss.tr = tr
rss.tr = sq.GetTimeRange()
rss.deadline = deadline
pts := make([]packedTimeseries, len(orderedMetricNames))
for i, metricName := range orderedMetricNames {
@@ -1302,35 +1235,6 @@ func getBlockRefsEnd(a []blockRef) uintptr {
return uintptr(unsafe.Pointer(unsafe.SliceData(a))) + uintptr(len(a))*unsafe.Sizeof(blockRef{})
}
func setupTfss(qt *querytracer.Tracer, tr storage.TimeRange, tagFilterss [][]storage.TagFilter, maxMetrics int, deadline searchutil.Deadline) ([]*storage.TagFilters, error) {
tfss := make([]*storage.TagFilters, 0, len(tagFilterss))
for _, tagFilters := range tagFilterss {
tfs := storage.NewTagFilters()
for i := range tagFilters {
tf := &tagFilters[i]
if string(tf.Key) == "__graphite__" {
query := tf.Value
paths, err := vmstorage.SearchGraphitePaths(qt, tr, query, maxMetrics, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
}
if len(paths) >= maxMetrics {
return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+
"either narrow down the query or increase the corresponding -search.max* command-line flag value; "+
"see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#resource-usage-limits", maxMetrics, query)
}
tfs.AddGraphiteQuery(query, paths, tf.IsNegative)
continue
}
if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
}
}
tfss = append(tfss, tfs)
}
return tfss, nil
}
func applyGraphiteRegexpFilter(filter string, ss []string) ([]string, error) {
// Anchor filter regexp to the beginning of the string as Graphite does.
// See https://github.com/graphite-project/graphite-web/blob/3ad279df5cb90b211953e39161df416e54a84948/webapp/graphite/tags/localdatabase.py#L157
@@ -1357,13 +1261,12 @@ const maxFastAllocBlockSize = 32 * 1024
func GetMetricNamesStats(qt *querytracer.Tracer, limit, le int, matchPattern string) (metricnamestats.StatsResult, error) {
qt = qt.NewChild("get metric names usage statistics with limit: %d, less or equal to: %d, match pattern=%q", limit, le, matchPattern)
defer qt.Done()
return vmstorage.GetMetricNamesStats(qt, limit, le, matchPattern)
return vmstorage.VMSelectAPI.GetMetricNamesUsageStats(qt, nil, limit, le, matchPattern, 0)
}
// ResetMetricNamesStats resets state of metric names usage
func ResetMetricNamesStats(qt *querytracer.Tracer) error {
qt = qt.NewChild("reset metric names usage stats")
defer qt.Done()
vmstorage.ResetMetricNamesStats(qt)
return nil
return vmstorage.VMSelectAPI.ResetMetricNamesUsageStats(qt, 0)
}

View File

@@ -28,8 +28,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@@ -50,9 +48,6 @@ var (
"If set to true, the query model becomes closer to InfluxDB data model. If set to true, then -search.maxLookback and -search.maxStalenessInterval are ignored")
maxStepForPointsAdjustment = flag.Duration("search.maxStepForPointsAdjustment", time.Minute, "The maximum step when /api/v1/query_range handler adjusts "+
"points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data")
maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage. "+
"When set to zero, the limit is automatically calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional).")
maxFederateSeries = flag.Int("search.maxFederateSeries", 1e6, "The maximum number of time series, which can be returned from /federate. This option allows limiting memory usage")
maxExportSeries = flag.Int("search.maxExportSeries", 10e6, "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage")
maxTSDBStatusSeries = flag.Int("search.maxTSDBStatusSeries", 10e6, "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage")
@@ -873,7 +868,7 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWr
End: start,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: GetMaxUniqueTimeSeries(),
MaxSeries: 0, // let vmstorage use maxUniqueTimeseries by default
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
MayCache: mayCache,
@@ -984,7 +979,7 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
End: end,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: GetMaxUniqueTimeSeries(),
MaxSeries: 0, // let vmstorage use maxUniqueTimeseries by default
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
MayCache: mayCache,
@@ -1320,43 +1315,6 @@ func (sw *scalableWriter) flush() error {
return sw.bw.Flush()
}
var (
maxUniqueTimeseriesValueOnce sync.Once
maxUniqueTimeseriesValue int
)
// InitMaxUniqueTimeseries init the max metrics limit calculated by available resources.
// The calculation is split into calculateMaxUniqueTimeSeriesForResource for unit testing.
func InitMaxUniqueTimeseries(maxConcurrentRequests int) {
maxUniqueTimeseriesValueOnce.Do(func() {
maxUniqueTimeseriesValue = *maxUniqueTimeseries
if maxUniqueTimeseriesValue <= 0 {
maxUniqueTimeseriesValue = calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, memory.Remaining())
}
})
}
// calculateMaxUniqueTimeSeriesForResource calculate the max metrics limit calculated by available resources.
func calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, remainingMemory int) int {
if maxConcurrentRequests <= 0 {
// This line should NOT be reached unless the user has set an incorrect `search.maxConcurrentRequests`.
// In such cases, fallback to unlimited.
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
return 2e9
}
// Calculate the max metrics limit for a single request in the worst-case concurrent scenario.
// The approximate size of 1 unique series that could occupy in the vmstorage is 200 bytes.
mts := remainingMemory / 200 / maxConcurrentRequests
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
return mts
}
// GetMaxUniqueTimeSeries returns the max metrics limit calculated by available resources.
func GetMaxUniqueTimeSeries() int {
return maxUniqueTimeseriesValue
}
// copied from https://github.com/prometheus/common/blob/adea6285c1c7447fcb7bfdeb6abfc6eff893e0a7/model/metric.go#L483
// it's not possible to use direct import due to increased binary size
func unescapePrometheusLabelName(name string) string {

View File

@@ -4,7 +4,6 @@ import (
"math"
"net/http"
"reflect"
"runtime"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
@@ -230,29 +229,3 @@ func TestGetLatencyOffsetMillisecondsFailure(t *testing.T) {
}
f("http://localhost?latency_offset=foobar")
}
func TestCalculateMaxMetricsLimitByResource(t *testing.T) {
f := func(maxConcurrentRequest, remainingMemory, expect int) {
t.Helper()
maxMetricsLimit := calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequest, remainingMemory)
if maxMetricsLimit != expect {
t.Fatalf("unexpected max metrics limit: got %d, want %d", maxMetricsLimit, expect)
}
}
// Skip when GOARCH=386
if runtime.GOARCH != "386" {
// 8 CPU & 32 GiB
f(16, int(math.Round(32*1024*1024*1024*0.4)), 4294967)
// 4 CPU & 32 GiB
f(8, int(math.Round(32*1024*1024*1024*0.4)), 8589934)
}
// 2 CPU & 4 GiB
f(4, int(math.Round(4*1024*1024*1024*0.4)), 2147483)
// other edge cases
f(0, int(math.Round(4*1024*1024*1024*0.4)), 2e9)
f(4, 0, 0)
}

View File

@@ -1,7 +1,6 @@
package vmstorage
import (
"errors"
"flag"
"fmt"
"io"
@@ -9,12 +8,10 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@@ -23,11 +20,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vminsertapi"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
var (
@@ -39,11 +34,8 @@ var (
snapshotAuthKey = flagutil.NewPassword("snapshotAuthKey", "authKey, which must be passed in query string to /snapshot* pages. It overrides -httpAuth.*")
forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*")
forceFlushAuthKey = flagutil.NewPassword("forceFlushAuthKey", "authKey, which must be passed in query string to /internal/force_flush pages. It overrides -httpAuth.*")
snapshotsMaxAge = flagutil.NewRetentionDuration("snapshotsMaxAge", "3d", "Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted")
_ = flag.Duration("snapshotCreateTimeout", 0, "Deprecated: this flag does nothing")
precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss")
_ = flag.Duration("finalMergeDelay", 0, "Deprecated: this flag does nothing")
_ = flag.Int("bigMergeConcurrency", 0, "Deprecated: this flag does nothing")
_ = flag.Int("smallMergeConcurrency", 0, "Deprecated: this flag does nothing")
@@ -117,11 +109,7 @@ func DataPath() string {
}
// Init initializes vmstorage.
func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
logger.Fatalf("invalid `-precisionBits`: %s", err)
}
func Init(vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []storage.MetricRow)) {
storage.SetDedupInterval(*minScrapeInterval)
storage.SetDataFlushInterval(*inmemoryDataFlushInterval)
storage.LegacySetRetentionTimezoneOffset(*retentionTimezoneOffset)
@@ -165,7 +153,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
LogNewSeries: *logNewSeries,
}
strg := storage.MustOpenStorage(*storageDataPath, opts)
initStaleSnapshotsRemover(strg)
vmStorage = newVMStorageSingleNode(strg, vmselectMaxConcurrentRequests, resetCacheIfNeeded)
var m storage.Metrics
strg.UpdateMetrics(&m)
@@ -179,151 +167,32 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
// register storage metrics
storageMetrics = metrics.NewSet()
storageMetrics.RegisterMetricsWriter(func(w io.Writer) {
writeStorageMetrics(w, strg)
})
storageMetrics.RegisterMetricsWriter(vmStorage.writeStorageMetrics)
metrics.RegisterSet(storageMetrics)
WG = syncwg.WaitGroup{}
resetResponseCacheIfNeeded = resetCacheIfNeeded
Storage = strg
VMInsertAPI = vmStorage
VMSelectAPI = vmStorage
GetSearch = vmStorage.GetSearch
PutSearch = vmStorage.PutSearch
RequestHandler = vmStorage.requestHandler
DebugFlush = vmStorage.vms.s.DebugFlush
}
var storageMetrics *metrics.Set
// Storage is a storage.
//
// Every storage call must be wrapped into WG.Add(1) ... WG.Done()
// for proper graceful shutdown when Stop is called.
var Storage *storage.Storage
var (
// vmStorageSingleNode is an instance of vmstorage used by vminsert and
// vmselect for writing and reading data.
vmStorage *VMStorageSingleNode
VMInsertAPI vminsertapi.API
VMSelectAPI vmselectapi.API
GetSearch func(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (*storage.Search, int, error)
PutSearch func(sr *storage.Search)
RequestHandler func(w http.ResponseWriter, r *http.Request) bool
// WG must be incremented before Storage call.
//
// Use syncwg instead of sync, since Add is called from concurrent goroutines.
var WG syncwg.WaitGroup
// resetResponseCacheIfNeeded is a callback for automatic resetting of response cache if needed.
var resetResponseCacheIfNeeded func(mrs []storage.MetricRow)
// AddRows adds mrs to the storage.
//
// The caller should limit the number of concurrent calls to AddRows() in order to limit memory usage.
func AddRows(mrs []storage.MetricRow) error {
if Storage.IsReadOnly() {
return errReadOnly
}
resetResponseCacheIfNeeded(mrs)
WG.Add(1)
Storage.AddRows(mrs, uint8(*precisionBits))
WG.Done()
return nil
}
// AddMetadataRows adds mrs to the storage.
//
// The caller should limit the number of concurrent calls to AddMetadataRows() in order to limit memory usage.
func AddMetadataRows(mms []metricsmetadata.Row) error {
if Storage.IsReadOnly() {
return errReadOnly
}
WG.Add(1)
Storage.AddMetadataRows(mms)
WG.Done()
return nil
}
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
// RegisterMetricNames registers all the metrics from mrs in the storage.
func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow) {
WG.Add(1)
Storage.RegisterMetricNames(qt, mrs)
WG.Done()
}
// DeleteSeries deletes series matching tfss.
//
// Returns the number of deleted series.
func DeleteSeries(qt *querytracer.Tracer, tfss []*storage.TagFilters, maxMetrics int) (int, error) {
WG.Add(1)
n, err := Storage.DeleteSeries(qt, tfss, maxMetrics)
WG.Done()
return n, err
}
// GetMetricNamesStats returns metric names usage stats with give limit and lte predicate
func GetMetricNamesStats(qt *querytracer.Tracer, limit, le int, matchPattern string) (metricnamestats.StatsResult, error) {
WG.Add(1)
r := Storage.GetMetricNamesStats(qt, limit, le, matchPattern)
WG.Done()
return r, nil
}
// ResetMetricNamesStats resets state for metric names usage tracker
func ResetMetricNamesStats(qt *querytracer.Tracer) {
WG.Add(1)
Storage.ResetMetricNamesStats(qt)
WG.Done()
}
// SearchMetricNames returns metric names for the given tfss on the given tr.
func SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]string, error) {
WG.Add(1)
metricNames, err := Storage.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
WG.Done()
return metricNames, err
}
// SearchLabelNames searches for tag keys matching the given tfss on tr.
func SearchLabelNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxTagKeys, maxMetrics int, deadline uint64) ([]string, error) {
WG.Add(1)
labelNames, err := Storage.SearchLabelNames(qt, tfss, tr, maxTagKeys, maxMetrics, deadline)
WG.Done()
return labelNames, err
}
// SearchLabelValues searches for label values for the given labelName, tfss and
// tr.
func SearchLabelValues(qt *querytracer.Tracer, labelName string, tfss []*storage.TagFilters, tr storage.TimeRange, maxLabelValues, maxMetrics int, deadline uint64) ([]string, error) {
WG.Add(1)
labelValues, err := Storage.SearchLabelValues(qt, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline)
WG.Done()
return labelValues, err
}
// SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr.
//
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
func SearchTagValueSuffixes(qt *querytracer.Tracer, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
WG.Add(1)
suffixes, err := Storage.SearchTagValueSuffixes(qt, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline)
WG.Done()
return suffixes, err
}
// SearchGraphitePaths returns all the metric names matching the given Graphite query.
func SearchGraphitePaths(qt *querytracer.Tracer, tr storage.TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) {
WG.Add(1)
paths, err := Storage.SearchGraphitePaths(qt, tr, query, maxPaths, deadline)
WG.Done()
return paths, err
}
// GetTSDBStatus returns TSDB status for given filters on the given date.
func GetTSDBStatus(qt *querytracer.Tracer, tfss []*storage.TagFilters, date uint64, focusLabel string, topN, maxMetrics int, deadline uint64) (*storage.TSDBStatus, error) {
WG.Add(1)
status, err := Storage.GetTSDBStatus(qt, tfss, date, focusLabel, topN, maxMetrics, deadline)
WG.Done()
return status, err
}
// GetSeriesCount returns the number of time series in the storage.
func GetSeriesCount(deadline uint64) (uint64, error) {
WG.Add(1)
n, err := Storage.GetSeriesCount(deadline)
WG.Done()
return n, err
}
// TODO(@rtm0): Remove this dependency from vmalert-tool unit tests.
DebugFlush func()
)
// Stop stops the vmstorage
func Stop() {
@@ -333,17 +202,22 @@ func Stop() {
logger.Infof("gracefully closing the storage at %s", *storageDataPath)
startTime := time.Now()
WG.WaitAndBlock()
stopStaleSnapshotsRemover()
Storage.MustClose()
vmStorage.Stop()
logger.Infof("successfully closed the storage in %.3f seconds", time.Since(startTime).Seconds())
fs.MustStopDirRemover()
logger.Infof("the storage has been stopped")
logger.Infof("the vmstorage has been stopped")
}
// RequestHandler is a storage request handler.
func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
func (vmssn *VMStorageSingleNode) requestHandler(w http.ResponseWriter, r *http.Request) bool {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.requestHandler(w, r)
}
// requestHandler is a storage request handler.
// TODO(@rtm0): Move to a separate file, request_handler.go
func (vms *VMStorage) requestHandler(w http.ResponseWriter, r *http.Request) bool {
path := r.URL.Path
if path == "/internal/force_merge" {
if !httpserver.CheckAuthFlag(w, r, forceMergeAuthKey) {
@@ -356,7 +230,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
defer activeForceMerges.Dec()
logger.Infof("forced merge for partition_prefix=%q has been started", partitionNamePrefix)
startTime := time.Now()
if err := Storage.ForceMergePartitions(partitionNamePrefix); err != nil {
if err := vms.s.ForceMergePartitions(partitionNamePrefix); err != nil {
logger.Errorf("error in forced merge for partition_prefix=%q: %s", partitionNamePrefix, err)
return
}
@@ -369,7 +243,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
logger.Infof("flushing storage to make pending data available for reading")
Storage.DebugFlush()
vms.s.DebugFlush()
return true
}
@@ -389,7 +263,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
}
logger.Infof("enabling logging of new series for the next %s. This may increase resource usage during this period.", time.Duration(dealine)*time.Second)
endTime := fasttime.UnixTimestamp() + uint64(dealine)
Storage.SetLogNewSeriesUntil(endTime)
vms.s.SetLogNewSeriesUntil(endTime)
fmt.Fprintf(w, `{"status":"success","data":{"logEndTime":%q}}`, time.Unix(int64(endTime), 0))
return true
}
@@ -411,13 +285,13 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
case "/create":
snapshotsCreateTotal.Inc()
w.Header().Set("Content-Type", "application/json")
snapshotName := Storage.MustCreateSnapshot()
snapshotName := vms.s.MustCreateSnapshot()
// Verify whether the client already closed the connection.
// In this case it is better to drop the created snapshot, since the client isn't interested in it.
if err := r.Context().Err(); err != nil {
logger.Infof("deleting already created snapshot at %s because the client canceled the request", snapshotName)
if err := deleteSnapshot(snapshotName); err != nil {
if err := vms.deleteSnapshot(snapshotName); err != nil {
logger.Infof("cannot delete just created snapshot: %s", err)
return true
}
@@ -433,7 +307,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
case "/list":
snapshotsListTotal.Inc()
w.Header().Set("Content-Type", "application/json")
snapshots := Storage.MustListSnapshots()
snapshots := vms.s.MustListSnapshots()
fmt.Fprintf(w, `{"status":"ok","snapshots":[`)
if len(snapshots) > 0 {
for _, snapshot := range snapshots[:len(snapshots)-1] {
@@ -447,7 +321,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
snapshotsDeleteTotal.Inc()
w.Header().Set("Content-Type", "application/json")
snapshotName := r.FormValue("snapshot")
if err := deleteSnapshot(snapshotName); err != nil {
if err := vms.deleteSnapshot(snapshotName); err != nil {
jsonResponseError(w, err)
snapshotsDeleteErrorsTotal.Inc()
return true
@@ -457,9 +331,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
case "/delete_all":
snapshotsDeleteAllTotal.Inc()
w.Header().Set("Content-Type", "application/json")
snapshots := Storage.MustListSnapshots()
snapshots := vms.s.MustListSnapshots()
for _, snapshotName := range snapshots {
if err := Storage.DeleteSnapshot(snapshotName); err != nil {
if err := vms.s.DeleteSnapshot(snapshotName); err != nil {
err = fmt.Errorf("cannot delete snapshot %q: %w", snapshotName, err)
jsonResponseError(w, err)
snapshotsDeleteAllErrorsTotal.Inc()
@@ -473,50 +347,6 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
}
}
func deleteSnapshot(snapshotName string) error {
snapshots := Storage.MustListSnapshots()
for _, snName := range snapshots {
if snName == snapshotName {
if err := Storage.DeleteSnapshot(snName); err != nil {
return fmt.Errorf("cannot delete snapshot %q: %w", snName, err)
}
return nil
}
}
return fmt.Errorf("cannot find snapshot %q", snapshotName)
}
func initStaleSnapshotsRemover(strg *storage.Storage) {
staleSnapshotsRemoverCh = make(chan struct{})
if snapshotsMaxAge.Duration() <= 0 {
return
}
snapshotsMaxAgeDur := snapshotsMaxAge.Duration()
staleSnapshotsRemoverWG.Go(func() {
d := timeutil.AddJitterToDuration(time.Second * 11)
t := time.NewTicker(d)
defer t.Stop()
for {
select {
case <-staleSnapshotsRemoverCh:
return
case <-t.C:
}
strg.MustDeleteStaleSnapshots(snapshotsMaxAgeDur)
}
})
}
func stopStaleSnapshotsRemover() {
close(staleSnapshotsRemoverCh)
staleSnapshotsRemoverWG.Wait()
}
var (
staleSnapshotsRemoverCh chan struct{}
staleSnapshotsRemoverWG sync.WaitGroup
)
var (
activeForceMerges = metrics.NewCounter("vm_active_force_merges")
@@ -531,7 +361,16 @@ var (
snapshotsDeleteAllErrorsTotal = metrics.NewCounter(`vm_http_request_errors_total{path="/snapshot/delete_all"}`)
)
func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
// TODO(@rtm0): Move to metrics.go.
func (vmssn *VMStorageSingleNode) writeStorageMetrics(w io.Writer) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
vmssn.vms.writeStorageMetrics(w)
}
// TODO(@rtm0): Move to metrics.go.
func (vms *VMStorage) writeStorageMetrics(w io.Writer) {
strg := vms.s
var m storage.Metrics
strg.UpdateMetrics(&m)
tm := &m.TableMetrics
@@ -755,6 +594,8 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteGaugeUint64(w, `vm_downsampling_partitions_scheduled`, tm.ScheduledDownsamplingPartitions)
metrics.WriteGaugeUint64(w, `vm_downsampling_partitions_scheduled_size_bytes`, tm.ScheduledDownsamplingPartitionsSize)
metrics.WriteGaugeUint64(w, `vm_search_max_unique_timeseries`, uint64(vms.maxUniqueTimeSeriesCalculated))
metrics.WriteGaugeUint64(w, `vm_metrics_metadata_storage_items`, m.MetadataStorageItemsCurrent)
metrics.WriteCounterUint64(w, `vm_metrics_metadata_storage_size_bytes`, m.MetadataStorageCurrentSizeBytes)
metrics.WriteCounterUint64(w, `vm_metrics_metadata_storage_max_size_bytes`, m.MetadataStorageMaxSizeBytes)

391
app/vmstorage/vmstorage.go Normal file
View File

@@ -0,0 +1,391 @@
package vmstorage
import (
"flag"
"fmt"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
var (
precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression "+
"at the cost of precision loss")
maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be scanned during every query. "+
"This allows protecting against heavy queries, which select unexpectedly high number of series. When set to zero, the limit is automatically calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional). See also -search.max* command-line flags at vmselect")
maxTagKeys = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search. "+
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
maxTagValues = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search. "+
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
snapshotsMaxAge = flagutil.NewRetentionDuration("snapshotsMaxAge", "3d", "Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted")
)
// newVMStorage creates a new instance of of VMStorage.
//
// The created VMStorage instance takes ownership of s.
func newVMStorage(s *storage.Storage, vmselectMaxConcurrentRequests int) *VMStorage {
if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
logger.Fatalf("invalid -precisionBits=%d: %s", *precisionBits, err)
}
maxUniqueTimeseriesCalculated := *maxUniqueTimeseries
if maxUniqueTimeseriesCalculated <= 0 {
maxUniqueTimeseriesCalculated = calculateMaxUniqueTimeseries(vmselectMaxConcurrentRequests, memory.Remaining())
}
vms := &VMStorage{
s: s,
maxUniqueTimeseries: *maxUniqueTimeseries,
maxUniqueTimeSeriesCalculated: maxUniqueTimeseriesCalculated,
staleSnapshotsRemoverCh: make(chan struct{}),
}
vms.initStaleSnapshotsRemover()
return vms
}
// calculateMaxUniqueTimeseries calculates the maxUniqueTimeseries based on the
// available system resources.
func calculateMaxUniqueTimeseries(maxConcurrentRequests, remainingMemory int) int {
if maxConcurrentRequests <= 0 {
// This line should NOT be reached unless the user has set an incorrect `search.maxConcurrentRequests`.
// In such cases, fallback to unlimited.
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
return 2e9
}
// Calculate the max metrics limit for a single request in the worst-case concurrent scenario.
// The approximate size of 1 unique series that could occupy in the vmstorage is 200 bytes.
mts := remainingMemory / 200 / maxConcurrentRequests
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
return mts
}
// VMStorage impelements vmselectapi.API and vminsertapi.API.
type VMStorage struct {
s *storage.Storage
maxUniqueTimeseries int
maxUniqueTimeSeriesCalculated int
staleSnapshotsRemoverCh chan struct{}
staleSnapshotsRemoverWG sync.WaitGroup
}
func (vms *VMStorage) initStaleSnapshotsRemover() {
if snapshotsMaxAge.Duration() <= 0 {
return
}
snapshotsMaxAgeDuration := snapshotsMaxAge.Duration()
vms.staleSnapshotsRemoverWG.Go(func() {
d := timeutil.AddJitterToDuration(time.Second * 11)
t := time.NewTicker(d)
defer t.Stop()
for {
select {
case <-vms.staleSnapshotsRemoverCh:
return
case <-t.C:
}
vms.s.MustDeleteStaleSnapshots(snapshotsMaxAgeDuration)
}
})
}
func (vms *VMStorage) Stop() {
close(vms.staleSnapshotsRemoverCh)
vms.staleSnapshotsRemoverWG.Wait()
vms.s.MustClose()
}
// WriteRows writes metric rows to the storage.
//
// The caller should limit the number of concurrent calls to WriteRows() in
// order to limit memory usage.
func (vms *VMStorage) WriteRows(rows []storage.MetricRow) error {
vms.s.AddRows(rows, uint8(*precisionBits))
return nil
}
// WriteMetadata writes metrics metadata to storage.
//
// The caller should limit the number of concurrent calls to WriteMetadata() in
// order to limit memory usage.
func (vms *VMStorage) WriteMetadata(rows []metricsmetadata.Row) error {
vms.s.AddMetadataRows(rows)
return nil
}
// IsReadOnly returns true is the storage is in read-only mode.
func (vms *VMStorage) IsReadOnly() bool {
return vms.s.IsReadOnly()
}
func (vms *VMStorage) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
tr := sq.GetTimeRange()
maxMetrics := vms.getMaxMetrics(sq.MaxMetrics)
tfss, err := vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
if len(tfss) == 0 {
return nil, fmt.Errorf("missing tag filters")
}
bi := getBlockIterator()
bi.sr.Init(qt, vms.s, tfss, tr, maxMetrics, deadline)
if err := bi.sr.Error(); err != nil {
bi.MustClose()
return nil, err
}
return bi, nil
}
func (vms *VMStorage) getMaxMetrics(searchQueryLimit int) int {
if searchQueryLimit <= 0 {
return vms.maxUniqueTimeSeriesCalculated
}
// searchQueryLimit cannot exceed `-search.maxUniqueTimeseries`
if vms.maxUniqueTimeseries != 0 && searchQueryLimit > vms.maxUniqueTimeseries {
searchQueryLimit = vms.maxUniqueTimeseries
}
return searchQueryLimit
}
// blockIterator implements vmselectapi.BlockIterator
type blockIterator struct {
sr storage.Search
mb storage.MetricBlock
}
var blockIteratorsPool sync.Pool
func (bi *blockIterator) MustClose() {
bi.sr.MustClose()
bi.mb.MetricName = nil
bi.mb.Block.Reset()
blockIteratorsPool.Put(bi)
}
func getBlockIterator() *blockIterator {
v := blockIteratorsPool.Get()
if v == nil {
v = &blockIterator{}
}
return v.(*blockIterator)
}
func (bi *blockIterator) NextBlock(dst []byte) ([]byte, bool) {
if !bi.sr.NextMetricBlock() {
return dst, false
}
mb := bi.mb
mb.MetricName = bi.sr.MetricBlockRef.MetricName
bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block)
dst = mb.Marshal(dst[:0])
return dst, true
}
func (bi *blockIterator) Error() error {
return bi.sr.Error()
}
// SearchMetricNames returns metric names for the given tfss on the given tr.
func (vms *VMStorage) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = vms.maxUniqueTimeSeriesCalculated
}
tfss, err := vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
if len(tfss) == 0 {
return nil, fmt.Errorf("missing tag filters")
}
return vms.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
}
// SearchLabelValues searches for label values for the given labelName, tfss and
// tr.
func (vms *VMStorage) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
tr := sq.GetTimeRange()
if maxLabelValues <= 0 || maxLabelValues > *maxTagValues {
maxLabelValues = *maxTagValues
}
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = vms.maxUniqueTimeSeriesCalculated
}
tfss, err := vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
return vms.s.SearchLabelValues(qt, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline)
}
// TagValueSuffixes returns all the tag value suffixes for the given tagKey and
// tagValuePrefix on the given tr.
//
// This allows implementing
// https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or
// similar APIs.
func (vms *VMStorage) TagValueSuffixes(qt *querytracer.Tracer, _, _ uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
maxSuffixes int, deadline uint64) ([]string, error) {
if maxSuffixes <= 0 || maxSuffixes > *maxTagValueSuffixesPerSearch {
maxSuffixes = *maxTagValueSuffixesPerSearch
}
suffixes, err := vms.s.SearchTagValueSuffixes(qt, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
if err != nil {
return nil, err
}
if len(suffixes) >= maxSuffixes {
return nil, fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d suffixes returned; "+
"either narrow down the search or increase -search.maxTagValueSuffixesPerSearch command-line flag value", maxSuffixes)
}
return suffixes, nil
}
// SearchLabelNames searches for tag keys matching the given tfss on tr.
func (vms *VMStorage) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
tr := sq.GetTimeRange()
if maxLabelNames <= 0 || maxLabelNames > *maxTagKeys {
maxLabelNames = *maxTagKeys
}
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = vms.maxUniqueTimeSeriesCalculated
}
tfss, err := vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
return vms.s.SearchLabelNames(qt, tfss, tr, maxLabelNames, maxMetrics, deadline)
}
func (vms *VMStorage) SeriesCount(_ *querytracer.Tracer, _, _ uint32, deadline uint64) (uint64, error) {
return vms.s.GetSeriesCount(deadline)
}
func (vms *VMStorage) Tenants(_ *querytracer.Tracer, _ storage.TimeRange, _ uint64) ([]string, error) {
return nil, nil
}
// GetTSDBStatus returns TSDB status for given filters on the given date.
func (vms *VMStorage) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = vms.maxUniqueTimeSeriesCalculated
}
tfss, err := vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
date := uint64(sq.MinTimestamp) / (24 * 3600 * 1000)
return vms.s.GetTSDBStatus(qt, tfss, date, focusLabel, topN, maxMetrics, deadline)
}
// DeleteSeries deletes series matching tfss.
//
// Returns the number of deleted series.
func (vms *VMStorage) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = vms.maxUniqueTimeSeriesCalculated
}
tfss, err := vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return 0, err
}
if len(tfss) == 0 {
return 0, fmt.Errorf("missing tag filters")
}
return vms.s.DeleteSeries(qt, tfss, maxMetrics)
}
func (vms *VMStorage) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error {
vms.s.RegisterMetricNames(qt, mrs)
return nil
}
// GetMetricNamesUsageStats returns metric name usage stats.
func (vms *VMStorage) GetMetricNamesUsageStats(qt *querytracer.Tracer, _ *storage.TenantToken, limit, le int, matchPattern string, _ uint64) (metricnamestats.StatsResult, error) {
return vms.s.GetMetricNamesStats(qt, limit, le, matchPattern), nil
}
// ResetMetricNamesStats resets state for metric names usage tracker
func (vms *VMStorage) ResetMetricNamesUsageStats(qt *querytracer.Tracer, _ uint64) error {
vms.s.ResetMetricNamesStats(qt)
return nil
}
func (vms *VMStorage) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) {
tfss := make([]*storage.TagFilters, 0, len(sq.TagFilterss))
for _, tagFilters := range sq.TagFilterss {
tfs := storage.NewTagFilters()
for i := range tagFilters {
tf := &tagFilters[i]
if string(tf.Key) == "__graphite__" {
query := tf.Value
qtChild := qt.NewChild("searching for series matching __graphite__=%q", query)
paths, err := vms.s.SearchGraphitePaths(qtChild, tr, query, maxMetrics, deadline)
qtChild.Donef("found %d series", len(paths))
if err != nil {
return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
}
if len(paths) >= maxMetrics {
return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+
"either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes; "+
"see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#resource-usage-limits", maxMetrics, query)
}
tfs.AddGraphiteQuery(query, paths, tf.IsNegative)
continue
}
if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
}
}
tfss = append(tfss, tfs)
}
return tfss, nil
}
func (vms *VMStorage) GetMetadataRecords(qt *querytracer.Tracer, _ *storage.TenantToken, limit int, metricName string, _ uint64) ([]*metricsmetadata.Row, error) {
return vms.s.GetMetadataRows(qt, limit, metricName), nil
}
// deleteSnapshot deletes a snapshot by its name.
//
// Callers must wrap the call with wg.Add(1)...wg.Done().
func (vms *VMStorage) deleteSnapshot(snapshotName string) error {
snapshots := vms.s.MustListSnapshots()
for _, snName := range snapshots {
if snName == snapshotName {
if err := vms.s.DeleteSnapshot(snName); err != nil {
return fmt.Errorf("cannot delete snapshot %q: %w", snName, err)
}
return nil
}
}
return fmt.Errorf("cannot find snapshot %q", snapshotName)
}

View File

@@ -0,0 +1,213 @@
package vmstorage
import (
"errors"
"fmt"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
// newVMStorageSingleNode creates a new instance of of VMStorage for vmsingle.
func newVMStorageSingleNode(s *storage.Storage, maxConcurrentRequests int, resetCacheIfNeeded func(mrs []storage.MetricRow)) *VMStorageSingleNode {
vms := newVMStorage(s, maxConcurrentRequests)
return &VMStorageSingleNode{
vms: vms,
wg: syncwg.WaitGroup{},
resetCacheIfNeeded: resetCacheIfNeeded,
}
}
type VMStorageSingleNode struct {
vms *VMStorage
// wg is used to wrap every storage call into wg.Add(1) ... wg.Done()
// for proper graceful shutdown when Stop is called.
//
// Use syncwg instead of sync, since Add is called from concurrent
// goroutines.
wg syncwg.WaitGroup
// resetCacheIfNeeded is a callback for automatic resetting of response
// cache if needed.
resetCacheIfNeeded func(mrs []storage.MetricRow)
}
func (vmssn *VMStorageSingleNode) Stop() {
vmssn.wg.WaitAndBlock()
vmssn.vms.Stop()
}
// WriteRows writes metric rows to the storage.
//
// Returns an error if the storage is in read-only mode.
//
// The caller should limit the number of concurrent calls to WriteRows() in
// order to limit memory usage.
func (vmssn *VMStorageSingleNode) WriteRows(rows []storage.MetricRow) error {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
if vmssn.vms.IsReadOnly() {
return errReadOnly
}
vmssn.resetCacheIfNeeded(rows)
return vmssn.vms.WriteRows(rows)
}
// WriteMetadata writes metrics metadata to storage.
//
// Returns an error if the storage is in read-only mode.
//
// The caller should limit the number of concurrent calls to WriteMetadata() in
// order to limit memory usage.
func (vmssn *VMStorageSingleNode) WriteMetadata(rows []metricsmetadata.Row) error {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
if vmssn.vms.IsReadOnly() {
return errReadOnly
}
return vmssn.vms.WriteMetadata(rows)
}
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
func (vmssn *VMStorageSingleNode) IsReadOnly() bool {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.IsReadOnly()
}
func (vmssn *VMStorageSingleNode) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
return nil, fmt.Errorf("not implemented in vmsingle")
}
// GetSearch sets up an instance of storage search and returns it to the caller
// along with the max series count that the search can return.
//
// This method is not part of the vmselectapi.API and must only be used by
// vmsingle HTTP handlers.
//
// Callers of this method must call PutSearch() once the search instance is not
// needed anymore.
func (vmssn *VMStorageSingleNode) GetSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (*storage.Search, int, error) {
vmssn.wg.Add(1)
tr := sq.GetTimeRange()
maxMetrics := vmssn.vms.getMaxMetrics(sq.MaxMetrics)
tfss, err := vmssn.vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
vmssn.wg.Done()
return nil, 0, err
}
sr := getSearch()
maxSeriesCount := sr.Init(qt, vmssn.vms.s, tfss, tr, sq.MaxMetrics, deadline)
return sr, maxSeriesCount, nil
}
// PutSearch resets the search once it is not needed anymore and puts it aside
// for future reuse.
//
// This method is not part of the vmselectapi.API and must only be used by
// vmsingle HTTP handlers.
//
// The method must only be used on search instances that have been created with
// GetSearch().
func (vmssn *VMStorageSingleNode) PutSearch(sr *storage.Search) {
putSearch(sr)
vmssn.wg.Done()
}
func getSearch() *storage.Search {
v := ssPool.Get()
if v == nil {
return &storage.Search{}
}
return v.(*storage.Search)
}
func putSearch(sr *storage.Search) {
sr.MustClose()
ssPool.Put(sr)
}
var ssPool sync.Pool
func (vmssn *VMStorageSingleNode) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.SearchMetricNames(qt, sq, deadline)
}
func (vmssn *VMStorageSingleNode) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.LabelValues(qt, sq, labelName, maxLabelValues, deadline)
}
func (vmssn *VMStorageSingleNode) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.TagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
}
func (vmssn *VMStorageSingleNode) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.LabelNames(qt, sq, maxLabelNames, deadline)
}
func (vmssn *VMStorageSingleNode) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.SeriesCount(qt, accountID, projectID, deadline)
}
func (vmssn *VMStorageSingleNode) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.Tenants(qt, tr, deadline)
}
func (vmssn *VMStorageSingleNode) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.TSDBStatus(qt, sq, focusLabel, topN, deadline)
}
func (vmssn *VMStorageSingleNode) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.DeleteSeries(qt, sq, deadline)
}
func (vmssn *VMStorageSingleNode) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.RegisterMetricNames(qt, mrs, deadline)
}
func (vmssn *VMStorageSingleNode) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (metricnamestats.StatsResult, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.GetMetricNamesUsageStats(qt, tt, limit, le, matchPattern, deadline)
}
func (vmssn *VMStorageSingleNode) ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.ResetMetricNamesUsageStats(qt, deadline)
}
func (vmssn *VMStorageSingleNode) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.GetMetadataRecords(qt, tt, limit, metricName, deadline)
}

View File

@@ -0,0 +1,62 @@
package vmstorage
import (
"math"
"strconv"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
func TestCalculateMaxMetricsLimitByResource(t *testing.T) {
f := func(maxConcurrentRequest, remainingMemory, expect int) {
t.Helper()
maxMetricsLimit := calculateMaxUniqueTimeseries(maxConcurrentRequest, remainingMemory)
if maxMetricsLimit != expect {
t.Fatalf("unexpected max metrics limit: got %d, want %d", maxMetricsLimit, expect)
}
}
// 64-bit architectures support memory sizes > 4GB.
if strconv.IntSize == 64 {
// 8 CPU & 32 GiB
f(16, int(math.Round(32*1024*1024*1024*0.4)), 4294967)
// 4 CPU & 32 GiB
f(8, int(math.Round(32*1024*1024*1024*0.4)), 8589934)
}
// 2 CPU & 4 GiB
f(4, int(math.Round(4*1024*1024*1024*0.4)), 2147483)
// other edge cases
f(0, int(math.Round(4*1024*1024*1024*0.4)), 2e9)
f(4, 0, 0)
}
func TestGetMaxMetrics(t *testing.T) {
originalMaxUniqueTimeSeries := *maxUniqueTimeseries
defer func() {
*maxUniqueTimeseries = originalMaxUniqueTimeSeries
fs.MustRemoveDir(t.Name())
}()
maxConcurrentRequests := 2 * cgroup.AvailableCPUs()
f := func(searchQueryLimit, storageMaxUniqueTimeseries, expect int) {
t.Helper()
*maxUniqueTimeseries = storageMaxUniqueTimeseries
s := storage.MustOpenStorage(t.Name(), storage.OpenOptions{})
vms := newVMStorage(s, maxConcurrentRequests)
defer vms.Stop()
maxMetrics := vms.getMaxMetrics(searchQueryLimit)
if maxMetrics != expect {
t.Fatalf("unexpected max metrics: got %d, want %d", maxMetrics, expect)
}
}
f(0, 1e6, 1e6)
f(2e6, 0, 2e6)
f(2e6, 1e6, 1e6)
}

View File

@@ -1,4 +1,4 @@
FROM golang:1.26.3 AS build-web-stage
FROM golang:1.26.4 AS build-web-stage
COPY build /build
WORKDIR /build

View File

@@ -7,7 +7,7 @@ ROOT_IMAGE ?= alpine:3.23.4
ROOT_IMAGE_SCRATCH ?= scratch
CERTS_IMAGE := alpine:3.23.4
GO_BUILDER_IMAGE := golang:1.26.3
GO_BUILDER_IMAGE := golang:1.26.4
BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr :/ __)-1
BASE_IMAGE := local/base:1.1.4-$(shell echo $(ROOT_IMAGE) | tr :/ __)-$(shell echo $(CERTS_IMAGE) | tr :/ __)

View File

@@ -26,8 +26,12 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* SECURITY: upgrade Go builder from Go1.26.3 to Go1.26.4. See [the list of issues addressed in Go1.26.4](https://github.com/golang/go/issues?q=milestone%3AGo1.26.4%20label%3ACherryPickApproved).
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): support `match[]=<label_selector>` query parameters in `/api/v1/rules` and `/api/v1/alerts` APIs to return only the rules that have configured labels satisfying the provided label selectors. See [11020](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11020).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `-opentelemetry.promoteAllResourceAttributes` and `-opentelemetry.promoteScopeMetadata` command-line flags to allow managing label promotion for resource attributes and OTel scope metadata. See [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) docs and [#10931](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10931).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) : introduce `vmagent_remotewrite_kafka_outbuf_latency_seconds` and `vmagent_remotewrite_kafka_rtt_seconds` metrics for [kafka integration](https://docs.victoriametrics.com/victoriametrics/integrations/kafka/). The metrics could help identify throughput bottlenecks. See [#10730](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10730).
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): properly log user information when a missing route error occurs. See [#11052](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11052).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl/): add the ability to migrate data from Mimir object storage to VictoriaMetrics. See [#7717](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7717).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): fix the `Notifiers` page in web UI appearing blank despite the API returning notifier data correctly. See [#11035](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11035).

2
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/VictoriaMetrics/VictoriaMetrics
go 1.26.3
go 1.26.4
require (
cloud.google.com/go/storage v1.62.1

View File

@@ -90,6 +90,77 @@ type MetricBlockRef struct {
BlockRef *BlockRef
}
// MetricBlock is a time series block for a single metric.
type MetricBlock struct {
// MetricName is metric name for the given Block.
MetricName []byte
// Block is a block for the given MetricName
Block Block
}
// Marshal marshals MetricBlock to dst
func (mb *MetricBlock) Marshal(dst []byte) []byte {
dst = encoding.MarshalBytes(dst, mb.MetricName)
return MarshalBlock(dst, &mb.Block)
}
// CopyFrom copies src to mb.
func (mb *MetricBlock) CopyFrom(src *MetricBlock) {
mb.MetricName = append(mb.MetricName[:0], src.MetricName...)
mb.Block.CopyFrom(&src.Block)
}
// MarshalBlock marshals b to dst.
//
// b.MarshalData must be called on b before calling MarshalBlock.
func MarshalBlock(dst []byte, b *Block) []byte {
dst = b.bh.Marshal(dst)
dst = encoding.MarshalBytes(dst, b.timestampsData)
dst = encoding.MarshalBytes(dst, b.valuesData)
return dst
}
// Unmarshal unmarshals MetricBlock from src
func (mb *MetricBlock) Unmarshal(src []byte) ([]byte, error) {
mb.Block.Reset()
mn, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return src, fmt.Errorf("cannot unmarshal MetricName")
}
src = src[nSize:]
mb.MetricName = append(mb.MetricName[:0], mn...)
return UnmarshalBlock(&mb.Block, src)
}
// UnmarshalBlock unmarshal Block from src to dst.
//
// dst.UnmarshalData isn't called on the block.
func UnmarshalBlock(dst *Block, src []byte) ([]byte, error) {
tail, err := dst.bh.Unmarshal(src)
if err != nil {
return tail, fmt.Errorf("cannot unmarshal blockHeader: %w", err)
}
src = tail
tds, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return tail, fmt.Errorf("cannot unmarshal timestampsData")
}
src = src[nSize:]
dst.timestampsData = append(dst.timestampsData[:0], tds...)
vd, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return tail, fmt.Errorf("cannot unmarshal valuesData")
}
src = src[nSize:]
dst.valuesData = append(dst.valuesData[:0], vd...)
return src, nil
}
// Search is a search for time series.
type Search struct {
// MetricBlockRef is updated with each Search.NextMetricBlock call.
@@ -290,6 +361,24 @@ func NewSearchQuery(start, end int64, tagFilterss [][]TagFilter, maxMetrics int)
}
}
// TenantToken represents a tenant (accountID, projectID) pair.
type TenantToken struct {
AccountID uint32
ProjectID uint32
}
// String returns string representation of t.
func (t *TenantToken) String() string {
return fmt.Sprintf("{accountID=%d, projectID=%d}", t.AccountID, t.ProjectID)
}
// Marshal appends marshaled t to dst and returns the result.
func (t *TenantToken) Marshal(dst []byte) []byte {
dst = encoding.MarshalUint32(dst, t.AccountID)
dst = encoding.MarshalUint32(dst, t.ProjectID)
return dst
}
// TagFilter represents a single tag filter from SearchQuery.
type TagFilter struct {
Key []byte

30
lib/vminsertapi/api.go Normal file
View File

@@ -0,0 +1,30 @@
package vminsertapi
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
)
// RPCCall defines rpc call from vminsert to vmstorage
type RPCCall struct {
Name string
VersionedName string
}
var (
MetricRowsRpcCall = RPCCall{
Name: "metric_rows",
VersionedName: "writeRows_v1",
}
MetricMetadataRpcCall = RPCCall{
Name: "metricmetadata_rows",
VersionedName: "writeMetadata_v1",
}
)
// API must implement vminsert API.
type API interface {
WriteRows(rows []storage.MetricRow) error
WriteMetadata(mrs []metricsmetadata.Row) error
IsReadOnly() bool
}

68
lib/vmselectapi/api.go Normal file
View File

@@ -0,0 +1,68 @@
package vmselectapi
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
)
// API must implement vmselect API.
type API interface {
// InitSearch initialize series search for the given sq.
//
// The returned BlockIterator must be closed with MustClose to free up resources when it is no longer needed.
InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (BlockIterator, error)
// SearchMetricNames returns metric names matching the given sq.
SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error)
// LabelValues returns values for labelName label acorss series matching the given sq.
LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error)
// TagValueSuffixes returns tag value suffixes for the given args.
TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error)
// LabelNames returns lable names for series matching the given sq.
LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLableNames int, deadline uint64) ([]string, error)
// SeriesCount returns the number of series for the given (accountID, projectID).
SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error)
// TSDBStatus returns tsdb status for the given sq.
TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error)
// DeleteSeries deletes series matching the given sq.
DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error)
// RegisterMetricNames registers the given mrs in the storage.
RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error
// Tenants returns list of tenants in the storage on the given tr.
Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error)
// GetMetricNamesUsageStats returns statistics for metric names
GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (metricnamestats.StatsResult, error)
// ResetMetricNamesUsageStats resets internal state of metric names tracker
ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error
// GetMetadataRecords returns metrics metadata.
GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error)
}
// BlockIterator must iterate through series blocks found by VMSelect.InitSearch.
//
// MustClose must be called in order to free up allocated resources when BlockIterator is no longer needed.
type BlockIterator interface {
// NextBlock marshals next storage.MetricBlock into dst.
//
// It returns true on success, false on error or if no blocks to read.
NextBlock(dst []byte) ([]byte, bool)
// MustClose frees up resources allocated by BlockIterator.
MustClose()
// Error returns the last error occurred in NextBlock(), which returns false.
Error() error
}