Compare commits

...

6 Commits

Author SHA1 Message Date
Artem Fetishev
bd4e63c834 sync changes with vmsingle
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-01 20:11:48 +02:00
Artem Fetishev
68021999e2 sync changes with vmsingle
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-01 19:23:29 +02:00
Artem Fetishev
c6a1f55372 minor fixes
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-01 18:52:50 +02:00
Artem Fetishev
74006ef552 move servers to main.go
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-01 13:01:53 +02:00
Artem Fetishev
2df0147d14 move limits to vmstorage
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-01 13:01:53 +02:00
Artem Fetishev
2dd42f5fba vmstorage: move servers to main package 2026-06-01 13:01:49 +02:00
6 changed files with 369 additions and 385 deletions

View File

@@ -36,9 +36,6 @@ var (
func NewVMSelectServer(addr string) (*vmselectapi.Server, error) {
api := &vmstorageAPI{}
limits := vmselectapi.Limits{
MaxLabelNames: *maxTagKeys,
MaxLabelValues: *maxTagValues,
MaxTagValueSuffixes: *maxTagValueSuffixesPerSearch,
MaxConcurrentRequests: *maxConcurrentRequests,
MaxConcurrentRequestsFlagName: "clusternative.maxConcurrentRequests",
MaxQueueDuration: *maxQueueDuration,
@@ -69,6 +66,9 @@ func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.S
}
func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
if maxLabelValues <= 0 || maxLabelValues > *maxTagValues {
maxLabelValues = *maxTagValues
}
dl := searchutil.DeadlineFromTimestamp(deadline)
labelValues, _, err := netstorage.LabelValues(qt, true, labelName, sq, maxLabelValues, dl)
return labelValues, wrapClusterNativeError(err)
@@ -76,12 +76,18 @@ func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQ
func (api *vmstorageAPI) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
maxSuffixes int, deadline uint64) ([]string, error) {
if maxSuffixes <= 0 || maxSuffixes > *maxTagValueSuffixesPerSearch {
maxSuffixes = *maxTagValueSuffixesPerSearch
}
dl := searchutil.DeadlineFromTimestamp(deadline)
suffixes, _, err := netstorage.TagValueSuffixes(qt, accountID, projectID, true, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, dl)
return suffixes, wrapClusterNativeError(err)
}
func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
if maxLabelNames <= 0 || maxLabelNames > *maxTagKeys {
maxLabelNames = *maxTagKeys
}
dl := searchutil.DeadlineFromTimestamp(deadline)
labelNames, _, err := netstorage.LabelNames(qt, true, sq, maxLabelNames, dl)
return labelNames, wrapClusterNativeError(err)

View File

@@ -9,12 +9,10 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/servers"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
@@ -29,7 +27,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vminsertapi"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
var (
@@ -42,12 +41,23 @@ var (
useProxyProtocol = flagutil.NewArrayBool("httpListenAddr.useProxyProtocol", "Whether to use proxy protocol for connections accepted at the given -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt . "+
"With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing")
vminsertAddr = flag.String("vminsertAddr", ":8400", "TCP address to accept connections from vminsert services")
vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services")
vminsertAddr = flag.String("vminsertAddr", ":8400", "TCP address to accept connections from vminsert services")
vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services")
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", 2*cgroup.AvailableCPUs(), "The maximum number of concurrent vmselect requests "+
"the vmstorage can process at -vmselectAddr. It shouldn't be high, since a single request usually saturates a CPU core, and many concurrently executed requests "+
"may require high amounts of memory. See also -search.maxQueueDuration")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the incoming vmselect request waits for execution "+
"when -search.maxConcurrentRequests limit is reached")
disableRPCCompression = flag.Bool("rpc.disableCompression", false, "Whether to disable compression of the data sent from vmstorage to vmselect. "+
"This reduces CPU usage at the cost of higher network bandwidth usage")
vminsertConnsShutdownDuration = flag.Duration("storage.vminsertConnsShutdownDuration", 10*time.Second, "The time needed for gradual closing of vminsert connections during "+
"graceful shutdown. Bigger duration reduces spikes in CPU, RAM and disk IO load on the remaining vmstorage nodes during rolling restart. "+
"Smaller duration reduces the time needed to close all the vminsert connections, thus reducing the time for graceful shutdown. "+
"Configured value must always be lower than the graceful shutdown period configured by the orchestration platform (terminationGracePeriodSeconds for Kubernetes). "+
"See https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#improving-re-routing-performance-during-restart")
snapshotAuthKey = flagutil.NewPassword("snapshotAuthKey", "authKey, which must be passed in query string to /snapshot* pages. It overrides -httpAuth.*")
forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*")
forceFlushAuthKey = flagutil.NewPassword("forceFlushAuthKey", "authKey, which must be passed in query string to /internal/force_flush pages. It overrides -httpAuth.*")
snapshotsMaxAge = flagutil.NewRetentionDuration("snapshotsMaxAge", "3d", "Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted")
_ = flag.Duration("snapshotCreateTimeout", 0, "Deprecated: this flag does nothing")
_ = flag.Duration("finalMergeDelay", 0, "Deprecated: this flag does nothing")
@@ -119,7 +129,7 @@ var (
)
func main() {
// vmstoage is optimized for reduced memory allocations,
// vmstorage is optimized for reduced memory allocations,
// so it can run with the reduced GOGC in order to reduce the used memory,
// while keeping CPU usage spent in GC at low levels.
//
@@ -178,7 +188,7 @@ func main() {
LogNewSeries: *logNewSeries,
}
strg := storage.MustOpenStorage(*storageDataPath, opts)
initStaleSnapshotsRemover(strg)
vmStorage := newVMStorage(strg, *maxConcurrentRequests)
var m storage.Metrics
strg.UpdateMetrics(&m)
@@ -193,18 +203,24 @@ func main() {
// register storage metrics
storageMetrics := metrics.NewSet()
storageMetrics.RegisterMetricsWriter(func(w io.Writer) {
writeStorageMetrics(w, strg)
vmStorage.writeStorageMetrics(w)
})
metrics.RegisterSet(storageMetrics)
protoparserutil.StartUnmarshalWorkers()
servers.GetMaxUniqueTimeSeries() // for init and logging only.
vminsertSrv, err := servers.NewVMInsertServer(*vminsertAddr, strg)
vminsertSrv, err := vminsertapi.NewVMInsertServer(*vminsertAddr, *vminsertConnsShutdownDuration, "vminsert", vmStorage, nil)
if err != nil {
logger.Fatalf("cannot create a server with -vminsertAddr=%s: %s", *vminsertAddr, err)
}
vmselectSrv, err := servers.NewVMSelectServer(*vmselectAddr, strg)
limits := vmselectapi.Limits{
MaxConcurrentRequests: *maxConcurrentRequests,
MaxConcurrentRequestsFlagName: "search.maxConcurrentRequests",
MaxQueueDuration: *maxQueueDuration,
MaxQueueDurationFlagName: "search.maxQueueDuration",
}
vmselectSrv, err := vmselectapi.NewServer(*vmselectAddr, vmStorage, limits, *disableRPCCompression)
if err != nil {
logger.Fatalf("cannot create a server with -vmselectAddr=%s: %s", *vmselectAddr, err)
}
@@ -213,8 +229,7 @@ func main() {
if len(listenAddrs) == 0 {
listenAddrs = []string{":8482"}
}
requestHandler := newRequestHandler(strg)
go httpserver.Serve(listenAddrs, requestHandler, httpserver.ServeOptions{UseProxyProtocol: useProxyProtocol})
go httpserver.Serve(listenAddrs, vmStorage.requestHandler, httpserver.ServeOptions{UseProxyProtocol: useProxyProtocol})
pushmetrics.Init()
sig := procutil.WaitForSigterm()
@@ -234,7 +249,6 @@ func main() {
metrics.UnregisterSet(storageMetrics, true)
storageMetrics = nil
stopStaleSnapshotsRemover()
vmselectSrv.MustStop()
vminsertSrv.MustStop()
protoparserutil.StopUnmarshalWorkers()
@@ -242,31 +256,29 @@ func main() {
logger.Infof("gracefully closing the storage at %s", *storageDataPath)
startTime = time.Now()
strg.MustClose()
vmStorage.Stop()
logger.Infof("successfully closed the storage in %.3f seconds", time.Since(startTime).Seconds())
fs.MustStopDirRemover()
logger.Infof("the vmstorage has been stopped")
}
func newRequestHandler(strg *storage.Storage) httpserver.RequestHandler {
return func(w http.ResponseWriter, r *http.Request) bool {
if r.URL.Path == "/" {
if r.Method != http.MethodGet {
return false
}
w.Header().Add("Content-Type", "text/html; charset=utf-8")
fmt.Fprintf(w, `vmstorage - a component of VictoriaMetrics cluster<br/>
// requestHandler is a storage request handler.
// TODO(@rtm0): Move to a separate file, request_handler.go
func (api *VMStorage) requestHandler(w http.ResponseWriter, r *http.Request) bool {
path := r.URL.Path
if path == "/" {
if r.Method != http.MethodGet {
return false
}
w.Header().Add("Content-Type", "text/html; charset=utf-8")
fmt.Fprintf(w, `vmstorage - a component of VictoriaMetrics cluster<br/>
<a href="https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/">docs</a><br>
`)
return true
}
return requestHandler(w, r, strg)
return true
}
}
func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storage) bool {
path := r.URL.Path
if path == "/internal/force_merge" {
if !httpserver.CheckAuthFlag(w, r, forceMergeAuthKey) {
return true
@@ -278,7 +290,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
defer activeForceMerges.Dec()
logger.Infof("forced merge for partition_prefix=%q has been started", partitionNamePrefix)
startTime := time.Now()
if err := strg.ForceMergePartitions(partitionNamePrefix); err != nil {
if err := api.s.ForceMergePartitions(partitionNamePrefix); err != nil {
logger.Errorf("error in forced merge for partition_prefix=%q: %s", partitionNamePrefix, err)
return
}
@@ -291,7 +303,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
return true
}
logger.Infof("flushing storage to make pending data available for reading")
strg.DebugFlush()
api.s.DebugFlush()
return true
}
@@ -311,7 +323,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
}
logger.Infof("enabling logging of new series for the next %s. This may increase resource usage during this period.", time.Duration(dealine)*time.Second)
endTime := fasttime.UnixTimestamp() + uint64(dealine)
strg.SetLogNewSeriesUntil(endTime)
api.s.SetLogNewSeriesUntil(endTime)
fmt.Fprintf(w, `{"status":"success","data":{"logEndTime":%q}}`, time.Unix(int64(endTime), 0))
return true
}
@@ -327,13 +339,13 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
case "/create":
snapshotsCreateTotal.Inc()
w.Header().Set("Content-Type", "application/json")
snapshotName := strg.MustCreateSnapshot()
snapshotName := api.s.MustCreateSnapshot()
// Verify whether the client already closed the connection.
// In this case it is better to drop the created snapshot, since the client isn't interested in it.
if err := r.Context().Err(); err != nil {
logger.Infof("deleting already created snapshot at %s because the client canceled the request", snapshotName)
if err := deleteSnapshot(strg, snapshotName); err != nil {
if err := api.deleteSnapshot(snapshotName); err != nil {
logger.Infof("cannot delete just created snapshot: %s", err)
return true
}
@@ -345,7 +357,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
case "/list":
snapshotsListTotal.Inc()
w.Header().Set("Content-Type", "application/json")
snapshots := strg.MustListSnapshots()
snapshots := api.s.MustListSnapshots()
fmt.Fprintf(w, `{"status":"ok","snapshots":[`)
if len(snapshots) > 0 {
for _, snapshot := range snapshots[:len(snapshots)-1] {
@@ -359,7 +371,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
snapshotsDeleteTotal.Inc()
w.Header().Set("Content-Type", "application/json")
snapshotName := r.FormValue("snapshot")
if err := deleteSnapshot(strg, snapshotName); err != nil {
if err := api.deleteSnapshot(snapshotName); err != nil {
jsonResponseError(w, err)
snapshotsDeleteErrorsTotal.Inc()
return true
@@ -369,9 +381,10 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
case "/delete_all":
snapshotsDeleteAllTotal.Inc()
w.Header().Set("Content-Type", "application/json")
snapshots := strg.MustListSnapshots()
snapshots := api.s.MustListSnapshots()
for _, snapshotName := range snapshots {
if err := strg.DeleteSnapshot(snapshotName); err != nil {
// TODO(@rtm0): Use VMStorage.deleteSnapshot()?
if err := api.s.DeleteSnapshot(snapshotName); err != nil {
err = fmt.Errorf("cannot delete snapshot %q: %w", snapshotName, err)
jsonResponseError(w, err)
snapshotsDeleteAllErrorsTotal.Inc()
@@ -385,50 +398,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
}
}
func deleteSnapshot(strg *storage.Storage, snapshotName string) error {
snapshots := strg.MustListSnapshots()
for _, snName := range snapshots {
if snName == snapshotName {
if err := strg.DeleteSnapshot(snName); err != nil {
return fmt.Errorf("cannot delete snapshot %q: %w", snName, err)
}
return nil
}
}
return fmt.Errorf("cannot find snapshot %q", snapshotName)
}
func initStaleSnapshotsRemover(strg *storage.Storage) {
staleSnapshotsRemoverCh = make(chan struct{})
if snapshotsMaxAge.Duration() <= 0 {
return
}
snapshotsMaxAgeDur := snapshotsMaxAge.Duration()
staleSnapshotsRemoverWG.Go(func() {
d := timeutil.AddJitterToDuration(time.Second * 11)
t := time.NewTicker(d)
defer t.Stop()
for {
select {
case <-staleSnapshotsRemoverCh:
return
case <-t.C:
}
strg.MustDeleteStaleSnapshots(snapshotsMaxAgeDur)
}
})
}
func stopStaleSnapshotsRemover() {
close(staleSnapshotsRemoverCh)
staleSnapshotsRemoverWG.Wait()
}
var (
staleSnapshotsRemoverCh chan struct{}
staleSnapshotsRemoverWG sync.WaitGroup
)
var (
activeForceMerges = metrics.NewCounter("vm_active_force_merges")
@@ -443,7 +412,9 @@ var (
snapshotsDeleteAllErrorsTotal = metrics.NewCounter(`vm_http_request_errors_total{path="/snapshot/delete_all"}`)
)
func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
// TODO(@rtm0): Move to metrics.go.
func (api *VMStorage) writeStorageMetrics(w io.Writer) {
strg := api.s
var m storage.Metrics
strg.UpdateMetrics(&m)
tm := &m.TableMetrics
@@ -667,7 +638,7 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteGaugeUint64(w, `vm_downsampling_partitions_scheduled`, tm.ScheduledDownsamplingPartitions)
metrics.WriteGaugeUint64(w, `vm_downsampling_partitions_scheduled_size_bytes`, tm.ScheduledDownsamplingPartitionsSize)
metrics.WriteGaugeUint64(w, `vm_search_max_unique_timeseries`, uint64(servers.GetMaxUniqueTimeSeries()))
metrics.WriteGaugeUint64(w, `vm_search_max_unique_timeseries`, uint64(api.maxUniqueTimeSeriesCalculated))
metrics.WriteGaugeUint64(w, `vm_metrics_metadata_storage_items`, m.MetadataStorageItemsCurrent)
metrics.WriteCounterUint64(w, `vm_metrics_metadata_storage_size_bytes`, m.MetadataStorageCurrentSizeBytes)

View File

@@ -1,55 +0,0 @@
package servers
import (
"flag"
"fmt"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vminsertapi"
)
var (
precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression "+
"at the cost of precision loss")
vminsertConnsShutdownDuration = flag.Duration("storage.vminsertConnsShutdownDuration", 10*time.Second, "The time needed for gradual closing of vminsert connections during "+
"graceful shutdown. Bigger duration reduces spikes in CPU, RAM and disk IO load on the remaining vmstorage nodes during rolling restart. "+
"Smaller duration reduces the time needed to close all the vminsert connections, thus reducing the time for graceful shutdown. "+
"Configured value must always be lower than the graceful shutdown period configured by the orchestration platform (terminationGracePeriodSeconds for Kubernetes). "+
"See https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#improving-re-routing-performance-during-restart")
)
// NewVMInsertServer starts vminsertapi.VMInsertServer at the given addr serving the given storage.
func NewVMInsertServer(addr string, storage *storage.Storage) (*vminsertapi.VMInsertServer, error) {
if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
return nil, fmt.Errorf("invalid -precisionBits: %w", err)
}
api := &vminsertAPI{
storage: storage,
}
return vminsertapi.NewVMInsertServer(addr, *vminsertConnsShutdownDuration, "vminsert", api, nil)
}
type vminsertAPI struct {
storage *storage.Storage
}
// WriteRows implements lib/vminsertapi.API interface
func (v *vminsertAPI) WriteRows(rows []storage.MetricRow) error {
v.storage.AddRows(rows, uint8(*precisionBits))
return nil
}
// WriteMetadata implements lib/vminsertapi.API interface
func (v *vminsertAPI) WriteMetadata(rows []metricsmetadata.Row) error {
v.storage.AddMetadataRows(rows)
return nil
}
// IsReadOnly implements lib/vminsertapi.API interface
func (v *vminsertAPI) IsReadOnly() bool {
return v.storage.IsReadOnly()
}

View File

@@ -1,4 +1,4 @@
package servers
package main
import (
"flag"
@@ -6,17 +6,21 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
var (
precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression "+
"at the cost of precision loss")
maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be scanned during every query. "+
"This allows protecting against heavy queries, which select unexpectedly high number of series. When set to zero, the limit is automatically calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional). See also -search.max* command-line flags at vmselect")
maxTagKeys = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search. "+
@@ -24,46 +28,110 @@ var (
maxTagValues = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search. "+
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", 2*cgroup.AvailableCPUs(), "The maximum number of concurrent vmselect requests "+
"the vmstorage can process at -vmselectAddr. It shouldn't be high, since a single request usually saturates a CPU core, and many concurrently executed requests "+
"may require high amounts of memory. See also -search.maxQueueDuration")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the incoming vmselect request waits for execution "+
"when -search.maxConcurrentRequests limit is reached")
disableRPCCompression = flag.Bool("rpc.disableCompression", false, "Whether to disable compression of the data sent from vmstorage to vmselect. "+
"This reduces CPU usage at the cost of higher network bandwidth usage")
snapshotsMaxAge = flagutil.NewRetentionDuration("snapshotsMaxAge", "3d", "Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted")
)
var (
maxUniqueTimeseriesValue int
maxUniqueTimeseriesValueOnce sync.Once
)
// newVMStorage creates a new instance of of VMStorage.
//
// The created VMStorage instance takes ownership of s.
func newVMStorage(s *storage.Storage, maxConcurrentRequests int) *VMStorage {
if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
logger.Fatalf("invalid -precisionBits: %d", err)
}
// NewVMSelectServer starts new server at the given addr, which serves vmselect requests from the given s.
func NewVMSelectServer(addr string, s *storage.Storage) (*vmselectapi.Server, error) {
api := &vmstorageAPI{
s: s,
maxUniqueTimeseriesCalculated := *maxUniqueTimeseries
if maxUniqueTimeseriesCalculated <= 0 {
maxUniqueTimeseriesCalculated = calculateMaxUniqueTimeseries(maxConcurrentRequests, memory.Remaining())
}
limits := vmselectapi.Limits{
MaxLabelNames: *maxTagKeys,
MaxLabelValues: *maxTagValues,
MaxTagValueSuffixes: *maxTagValueSuffixesPerSearch,
MaxConcurrentRequests: *maxConcurrentRequests,
MaxConcurrentRequestsFlagName: "search.maxConcurrentRequests",
MaxQueueDuration: *maxQueueDuration,
MaxQueueDurationFlagName: "search.maxQueueDuration",
vms := &VMStorage{
s: s,
maxUniqueTimeseries: *maxUniqueTimeseries,
maxUniqueTimeSeriesCalculated: maxUniqueTimeseriesCalculated,
staleSnapshotsRemoverCh: make(chan struct{}),
}
return vmselectapi.NewServer(addr, api, limits, *disableRPCCompression)
vms.initStaleSnapshotsRemover()
return vms
}
// vmstorageAPI impelements vmselectapi.API
type vmstorageAPI struct {
s *storage.Storage
// calculateMaxUniqueTimeseries calculates the maxUniqueTimeseries based on the
// available system resources.
func calculateMaxUniqueTimeseries(maxConcurrentRequests, remainingMemory int) int {
if maxConcurrentRequests <= 0 {
// This line should NOT be reached unless the user has set an incorrect `search.maxConcurrentRequests`.
// In such cases, fallback to unlimited.
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
return 2e9
}
// Calculate the max metrics limit for a single request in the worst-case concurrent scenario.
// The approximate size of 1 unique series that could occupy in the vmstorage is 200 bytes.
mts := remainingMemory / 200 / maxConcurrentRequests
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
return mts
}
func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
// VMStorage impelements vmselectapi.API and vminsertapi.API.
type VMStorage struct {
s *storage.Storage
maxUniqueTimeseries int
maxUniqueTimeSeriesCalculated int
staleSnapshotsRemoverCh chan struct{}
staleSnapshotsRemoverWG sync.WaitGroup
}
func (api *VMStorage) initStaleSnapshotsRemover() {
if snapshotsMaxAge.Duration() <= 0 {
return
}
snapshotsMaxAgeDuration := snapshotsMaxAge.Duration()
api.staleSnapshotsRemoverWG.Go(func() {
d := timeutil.AddJitterToDuration(time.Second * 11)
t := time.NewTicker(d)
defer t.Stop()
for {
select {
case <-api.staleSnapshotsRemoverCh:
return
case <-t.C:
}
api.s.MustDeleteStaleSnapshots(snapshotsMaxAgeDuration)
}
})
}
func (api *VMStorage) Stop() {
close(api.staleSnapshotsRemoverCh)
api.staleSnapshotsRemoverWG.Wait()
api.s.MustClose()
}
// WriteRows writes metric rows to the storage.
//
// The caller should limit the number of concurrent calls to WriteRows() in
// order to limit memory usage.
func (api *VMStorage) WriteRows(rows []storage.MetricRow) error {
api.s.AddRows(rows, uint8(*precisionBits))
return nil
}
// WriteMetadata writes metrics metadata to storage.
//
// The caller should limit the number of concurrent calls to WriteMetadata() in
// order to limit memory usage.
func (api *VMStorage) WriteMetadata(rows []metricsmetadata.Row) error {
api.s.AddMetadataRows(rows)
return nil
}
// IsReadOnly returns true is the storage is in read-only mode.
func (api *VMStorage) IsReadOnly() bool {
return api.s.IsReadOnly()
}
func (api *VMStorage) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
tr := sq.GetTimeRange()
maxMetrics := getMaxMetrics(sq.MaxMetrics)
maxMetrics := api.getMaxMetrics(sq.MaxMetrics)
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
@@ -80,158 +148,15 @@ func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQu
return bi, nil
}
func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = GetMaxUniqueTimeSeries()
func (api *VMStorage) getMaxMetrics(searchQueryLimit int) int {
if searchQueryLimit <= 0 {
return api.maxUniqueTimeSeriesCalculated
}
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
// searchQueryLimit cannot exceed `-search.maxUniqueTimeseries`
if api.maxUniqueTimeseries != 0 && searchQueryLimit > api.maxUniqueTimeseries {
searchQueryLimit = api.maxUniqueTimeseries
}
if len(tfss) == 0 {
return nil, fmt.Errorf("missing tag filters")
}
return api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
}
func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = GetMaxUniqueTimeSeries()
}
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
return api.s.SearchLabelValues(qt, sq.AccountID, sq.ProjectID, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline)
}
func (api *vmstorageAPI) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
maxSuffixes int, deadline uint64) ([]string, error) {
suffixes, err := api.s.SearchTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
if err != nil {
return nil, err
}
if len(suffixes) >= maxSuffixes {
return nil, fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d suffixes returned; "+
"either narrow down the search or increase -search.maxTagValueSuffixesPerSearch command-line flag value", maxSuffixes)
}
return suffixes, nil
}
func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = GetMaxUniqueTimeSeries()
}
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
return api.s.SearchLabelNames(qt, sq.AccountID, sq.ProjectID, tfss, tr, maxLabelNames, maxMetrics, deadline)
}
func (api *vmstorageAPI) SeriesCount(_ *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
return api.s.GetSeriesCount(accountID, projectID, deadline)
}
func (api *vmstorageAPI) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
return api.s.SearchTenants(qt, tr, deadline)
}
func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = GetMaxUniqueTimeSeries()
}
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
date := uint64(sq.MinTimestamp) / (24 * 3600 * 1000)
return api.s.GetTSDBStatus(qt, sq.AccountID, sq.ProjectID, tfss, date, focusLabel, topN, maxMetrics, deadline)
}
func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = GetMaxUniqueTimeSeries()
}
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return 0, err
}
if len(tfss) == 0 {
return 0, fmt.Errorf("missing tag filters")
}
return api.s.DeleteSeries(qt, tfss, maxMetrics)
}
func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error {
api.s.RegisterMetricNames(qt, mrs)
return nil
}
func (api *vmstorageAPI) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, _ uint64) (metricnamestats.StatsResult, error) {
return api.s.GetMetricNamesStats(qt, tt, limit, le, matchPattern), nil
}
func (api *vmstorageAPI) ResetMetricNamesUsageStats(qt *querytracer.Tracer, _ uint64) error {
api.s.ResetMetricNamesStats(qt)
return nil
}
func (api *vmstorageAPI) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) {
tfss := make([]*storage.TagFilters, 0, len(sq.TagFilterss))
accountID := sq.AccountID
projectID := sq.ProjectID
for _, tagFilters := range sq.TagFilterss {
tfs := storage.NewTagFilters(accountID, projectID)
for i := range tagFilters {
tf := &tagFilters[i]
if string(tf.Key) == "__graphite__" {
query := tf.Value
qtChild := qt.NewChild("searching for series matching __graphite__=%q", query)
paths, err := api.s.SearchGraphitePaths(qtChild, accountID, projectID, tr, query, maxMetrics, deadline)
qtChild.Donef("found %d series", len(paths))
if err != nil {
return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
}
if len(paths) >= maxMetrics {
return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+
"either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes; "+
"see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#resource-usage-limits", maxMetrics, query)
}
tfs.AddGraphiteQuery(query, paths, tf.IsNegative)
continue
}
if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
}
}
tfss = append(tfss, tfs)
}
return tfss, nil
}
func (api *vmstorageAPI) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) {
return api.s.GetMetadataRows(qt, tt, limit, metricName, deadline)
return searchQueryLimit
}
// blockIterator implements vmselectapi.BlockIterator
@@ -272,41 +197,197 @@ func (bi *blockIterator) Error() error {
return bi.sr.Error()
}
func getMaxMetrics(searchQueryLimit int) int {
if searchQueryLimit <= 0 {
return GetMaxUniqueTimeSeries()
// SearchMetricNames returns metric names for the given tfss on the given tr.
func (api *VMStorage) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = api.maxUniqueTimeSeriesCalculated
}
// searchQueryLimit cannot exceed `-search.maxUniqueTimeseries`
if *maxUniqueTimeseries != 0 && searchQueryLimit > *maxUniqueTimeseries {
searchQueryLimit = *maxUniqueTimeseries
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
return searchQueryLimit
if len(tfss) == 0 {
return nil, fmt.Errorf("missing tag filters")
}
return api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
}
// GetMaxUniqueTimeSeries returns `-search.maxUniqueTimeseries` or the auto-calculated value based on available resources.
// The calculation is split into calculateMaxUniqueTimeSeriesForResource for unit testing.
func GetMaxUniqueTimeSeries() int {
maxUniqueTimeseriesValueOnce.Do(func() {
maxUniqueTimeseriesValue = *maxUniqueTimeseries
if maxUniqueTimeseriesValue <= 0 {
maxUniqueTimeseriesValue = calculateMaxUniqueTimeSeriesForResource(*maxConcurrentRequests, memory.Remaining())
// SearchLabelValues searches for label values for the given labelName, tfss and
// tr.
func (api *VMStorage) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
tr := sq.GetTimeRange()
if maxLabelValues <= 0 || maxLabelValues > *maxTagValues {
maxLabelValues = *maxTagValues
}
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = api.maxUniqueTimeSeriesCalculated
}
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
return api.s.SearchLabelValues(qt, sq.AccountID, sq.ProjectID, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline)
}
// TagValueSuffixes returns all the tag value suffixes for the given tagKey and
// tagValuePrefix on the given tr.
//
// This allows implementing
// https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or
// similar APIs.
func (api *VMStorage) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
maxSuffixes int, deadline uint64) ([]string, error) {
if maxSuffixes <= 0 || maxSuffixes > *maxTagValueSuffixesPerSearch {
maxSuffixes = *maxTagValueSuffixesPerSearch
}
suffixes, err := api.s.SearchTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
if err != nil {
return nil, err
}
if len(suffixes) >= maxSuffixes {
return nil, fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d suffixes returned; "+
"either narrow down the search or increase -search.maxTagValueSuffixesPerSearch command-line flag value", maxSuffixes)
}
return suffixes, nil
}
// SearchLabelNames searches for tag keys matching the given tfss on tr.
func (api *VMStorage) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
tr := sq.GetTimeRange()
if maxLabelNames <= 0 || maxLabelNames > *maxTagKeys {
maxLabelNames = *maxTagKeys
}
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = api.maxUniqueTimeSeriesCalculated
}
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
return api.s.SearchLabelNames(qt, sq.AccountID, sq.ProjectID, tfss, tr, maxLabelNames, maxMetrics, deadline)
}
func (api *VMStorage) SeriesCount(_ *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
return api.s.GetSeriesCount(accountID, projectID, deadline)
}
func (api *VMStorage) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
return api.s.SearchTenants(qt, tr, deadline)
}
// GetTSDBStatus returns TSDB status for given filters on the given date.
func (api *VMStorage) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = api.maxUniqueTimeSeriesCalculated
}
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
date := uint64(sq.MinTimestamp) / (24 * 3600 * 1000)
return api.s.GetTSDBStatus(qt, sq.AccountID, sq.ProjectID, tfss, date, focusLabel, topN, maxMetrics, deadline)
}
// DeleteSeries deletes series matching tfss.
//
// Returns the number of deleted series.
func (api *VMStorage) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
// fallback to maxUniqueTimeSeries if no limit is provided,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
maxMetrics = api.maxUniqueTimeSeriesCalculated
}
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return 0, err
}
if len(tfss) == 0 {
return 0, fmt.Errorf("missing tag filters")
}
return api.s.DeleteSeries(qt, tfss, maxMetrics)
}
func (api *VMStorage) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error {
api.s.RegisterMetricNames(qt, mrs)
return nil
}
// GetMetricNamesUsageStats returns metric name usage stats.
func (api *VMStorage) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, _ uint64) (metricnamestats.StatsResult, error) {
return api.s.GetMetricNamesStats(qt, tt, limit, le, matchPattern), nil
}
// ResetMetricNamesStats resets state for metric names usage tracker
func (api *VMStorage) ResetMetricNamesUsageStats(qt *querytracer.Tracer, _ uint64) error {
api.s.ResetMetricNamesStats(qt)
return nil
}
func (api *VMStorage) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) {
tfss := make([]*storage.TagFilters, 0, len(sq.TagFilterss))
accountID := sq.AccountID
projectID := sq.ProjectID
for _, tagFilters := range sq.TagFilterss {
tfs := storage.NewTagFilters(accountID, projectID)
for i := range tagFilters {
tf := &tagFilters[i]
if string(tf.Key) == "__graphite__" {
query := tf.Value
qtChild := qt.NewChild("searching for series matching __graphite__=%q", query)
paths, err := api.s.SearchGraphitePaths(qtChild, accountID, projectID, tr, query, maxMetrics, deadline)
qtChild.Donef("found %d series", len(paths))
if err != nil {
return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
}
if len(paths) >= maxMetrics {
return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+
"either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes; "+
"see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#resource-usage-limits", maxMetrics, query)
}
tfs.AddGraphiteQuery(query, paths, tf.IsNegative)
continue
}
if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
}
}
})
return maxUniqueTimeseriesValue
}
// calculateMaxUniqueTimeSeriesForResource calculate the max metrics limit calculated by available resources.
func calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, remainingMemory int) int {
if maxConcurrentRequests <= 0 {
// This line should NOT be reached unless the user has set an incorrect `search.maxConcurrentRequests`.
// In such cases, fallback to unlimited.
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
return 2e9
tfss = append(tfss, tfs)
}
// Calculate the max metrics limit for a single request in the worst-case concurrent scenario.
// The approximate size of 1 unique series that could occupy in the vmstorage is 200 bytes.
mts := remainingMemory / 200 / maxConcurrentRequests
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
return mts
return tfss, nil
}
func (api *VMStorage) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) {
return api.s.GetMetadataRows(qt, tt, limit, metricName, deadline)
}
// deleteSnapshot deletes a snapshot by its name.
//
// Callers must wrap the call with wg.Add(1)...wg.Done().
func (api *VMStorage) deleteSnapshot(snapshotName string) error {
snapshots := api.s.MustListSnapshots()
for _, snName := range snapshots {
if snName == snapshotName {
if err := api.s.DeleteSnapshot(snName); err != nil {
return fmt.Errorf("cannot delete snapshot %q: %w", snName, err)
}
return nil
}
}
return fmt.Errorf("cannot find snapshot %q", snapshotName)
}

View File

@@ -1,15 +1,18 @@
package servers
package main
import (
"math"
"strconv"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
func TestCalculateMaxMetricsLimitByResource(t *testing.T) {
f := func(maxConcurrentRequest, remainingMemory, expect int) {
t.Helper()
maxMetricsLimit := calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequest, remainingMemory)
maxMetricsLimit := calculateMaxUniqueTimeseries(maxConcurrentRequest, remainingMemory)
if maxMetricsLimit != expect {
t.Fatalf("unexpected max metrics limit: got %d, want %d", maxMetricsLimit, expect)
}
@@ -37,10 +40,14 @@ func TestGetMaxMetrics(t *testing.T) {
defer func() {
*maxUniqueTimeseries = originalMaxUniqueTimeSeries
}()
maxConcurrentRequests := 2 * cgroup.AvailableCPUs()
f := func(searchQueryLimit, storageMaxUniqueTimeseries, expect int) {
t.Helper()
*maxUniqueTimeseries = storageMaxUniqueTimeseries
maxMetrics := getMaxMetrics(searchQueryLimit)
s := &storage.Storage{}
vmstorage := newVMStorage(s, maxConcurrentRequests)
maxMetrics := vmstorage.getMaxMetrics(searchQueryLimit)
if maxMetrics != expect {
t.Fatalf("unexpected max metrics: got %d, want %d", maxMetrics, expect)
}

View File

@@ -76,15 +76,6 @@ type Server struct {
// Limits contains various limits for Server.
type Limits struct {
// MaxLabelNames is the maximum label names, which may be returned from labelNames request.
MaxLabelNames int
// MaxLabelValues is the maximum label values, which may be returned from labelValues request.
MaxLabelValues int
// MaxTagValueSuffixes is the maximum number of entries, which can be returned from tagValueSuffixes request.
MaxTagValueSuffixes int
// MaxConcurrentRequests is the maximum number of concurrent requests a server can process.
//
// The remaining requests wait for up to MaxQueueDuration for their execution.
@@ -683,9 +674,6 @@ func (s *Server) processLabelNames(ctx *vmselectRequestCtx) error {
if err != nil {
return fmt.Errorf("cannot read maxLabelNames: %w", err)
}
if maxLabelNames <= 0 || maxLabelNames > s.limits.MaxLabelNames {
maxLabelNames = s.limits.MaxLabelNames
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
@@ -737,9 +725,6 @@ func (s *Server) processLabelValues(ctx *vmselectRequestCtx) error {
if err != nil {
return fmt.Errorf("cannot read maxLabelValues: %w", err)
}
if maxLabelValues <= 0 || maxLabelValues > s.limits.MaxLabelValues {
maxLabelValues = s.limits.MaxLabelValues
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
@@ -802,9 +787,6 @@ func (s *Server) processTagValueSuffixes(ctx *vmselectRequestCtx) error {
if err != nil {
return fmt.Errorf("cannot read maxTagValueSuffixes: %w", err)
}
if maxSuffixes <= 0 || maxSuffixes > s.limits.MaxTagValueSuffixes {
maxSuffixes = s.limits.MaxTagValueSuffixes
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
@@ -817,14 +799,6 @@ func (s *Server) processTagValueSuffixes(ctx *vmselectRequestCtx) error {
return ctx.writeErrorMessage(err)
}
if len(suffixes) >= s.limits.MaxTagValueSuffixes {
err := fmt.Errorf("more than %d tag value suffixes found "+
"for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s; "+
"either narrow down the query or increase -search.max* command-line flag value; see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#resource-usage-limits",
s.limits.MaxTagValueSuffixes, tagKey, tagValuePrefix, delimiter, tr.String())
return ctx.writeErrorMessage(err)
}
// Send an empty error message to vmselect.
if err := ctx.writeString(""); err != nil {
return fmt.Errorf("cannot send empty error message: %w", err)