Compare commits

..

1 Commits

Author SHA1 Message Date
hagen1778
aafbde2712 docs: update security recommendations
- move SBOM page down as general security recommendations must be mentioned first
- separate httplib security-related flags to a separate section, so it can be cross-referenced
  by products that use this lib
- mention that /metrics might require to be protected

Signed-off-by: hagen1778 <roman@victoriametrics.com>
2026-06-18 11:15:51 +02:00
41 changed files with 285 additions and 3012 deletions

View File

@@ -471,9 +471,8 @@ test-full-386:
apptest:
$(MAKE) victoria-metrics-race vmagent-race vmalert-race vmauth-race vmctl-race vmbackup-race vmrestore-race
go test ./apptest/... -skip="^Test(Cluster|Mixed|Legacy).*"
go test ./apptest/... -skip="^Test(Cluster|Legacy).*"
# App tests for legacy indexDB
apptest-legacy: victoria-metrics-race vmbackup-race vmrestore-race
OS=$$(uname | tr '[:upper:]' '[:lower:]'); \
ARCH=$$(uname -m | tr '[:upper:]' '[:lower:]' | sed 's/x86_64/amd64/'); \
@@ -490,20 +489,6 @@ apptest-legacy: victoria-metrics-race vmbackup-race vmrestore-race
VMSTORAGE_V1_132_0_PATH=$${DIR}/vmstorage-prod \
go test ./apptest/tests -run="^TestLegacySingle.*"
# App tests for mixed setups where vmsingle and vmcluster coexist.
apptest-mixed: victoria-metrics-race
OS=$$(uname | tr '[:upper:]' '[:lower:]'); \
ARCH=$$(uname -m | tr '[:upper:]' '[:lower:]' | sed 's/x86_64/amd64/'); \
VERSION=v1.145.0; \
VMCLUSTER=victoria-metrics-$${OS}-$${ARCH}-$${VERSION}-cluster.tar.gz; \
URL=https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/$${VERSION}; \
DIR=/tmp/$${VERSION}; \
test -d $${DIR} || (mkdir $${DIR} && \
curl --output-dir /tmp -LO $${URL}/$${VMCLUSTER} && tar xzf /tmp/$${VMCLUSTER} -C $${DIR} \
); \
VMSELECT_PATH=$${DIR}/vmselect-prod \
go test ./apptest/tests -run="^TestMixed.*"
benchmark:
go test -run=NO_TESTS -bench=. ./lib/...
go test -run=NO_TESTS -bench=. ./app/...

View File

@@ -89,7 +89,7 @@ func main() {
}
logger.Infof("starting VictoriaMetrics at %q...", listenAddrs)
startTime := time.Now()
vmstorage.Init(*vmselectMaxConcurrentRequests, *vmselectMaxQueueDuration, promql.ResetRollupResultCacheIfNeeded)
vmstorage.Init(*vmselectMaxConcurrentRequests, promql.ResetRollupResultCacheIfNeeded)
vmselect.Init(*vmselectMaxConcurrentRequests, *vmselectMaxQueueDuration)
vminsertcommon.StartIngestionRateLimiter(*maxIngestionRate)
vminsert.Init()

View File

@@ -282,8 +282,7 @@ func processFlags() {
func setUp() {
const maxConcurrentRequests = 4
maxQueueDuration := 5 * time.Second
vmstorage.Init(maxConcurrentRequests, maxQueueDuration, promql.ResetRollupResultCacheIfNeeded)
vmstorage.Init(maxConcurrentRequests, promql.ResetRollupResultCacheIfNeeded)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
readyCheckFunc := func() bool {

View File

@@ -131,13 +131,16 @@ func (ac *authContext) initFromBasicAuthConfig(ba *BasicAuthConfig) error {
if ba.Username == "" {
return fmt.Errorf("missing `username` in `basic_auth` section")
}
ac.getAuthHeader = func() string {
// See https://en.wikipedia.org/wiki/Basic_access_authentication
token := ba.Username + ":" + ba.Password
token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64
if ba.Password != "" {
ac.getAuthHeader = func() string {
// See https://en.wikipedia.org/wiki/Basic_access_authentication
token := ba.Username + ":" + ba.Password
token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64
}
ac.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
return nil
}
ac.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
return nil
}

View File

@@ -69,8 +69,6 @@ const (
vmAddr = "vm-addr"
vmUser = "vm-user"
vmPassword = "vm-password"
vmHeaders = "vm-headers"
vmBearerToken = "vm-bearer-token"
vmAccountID = "vm-account-id"
vmConcurrency = "vm-concurrency"
vmCompress = "vm-compress"
@@ -114,16 +112,6 @@ var (
Usage: "VictoriaMetrics password for basic auth",
EnvVars: []string{"VM_PASSWORD"},
},
&cli.StringFlag{
Name: vmHeaders,
Usage: "Optional HTTP headers to send with each request to the corresponding destination address. \n" +
"For example, --vm-headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding destination address. \n" +
"Multiple headers must be delimited by '^^': --vm-headers='header1:value1^^header2:value2'",
},
&cli.StringFlag{
Name: vmBearerToken,
Usage: "Optional bearer auth token to use for the corresponding --vm-addr",
},
&cli.StringFlag{
Name: vmAccountID,
Usage: "AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). \n" +

View File

@@ -457,7 +457,7 @@ func main() {
auth.WithBearer(c.String(vmNativeDstBearerToken)),
auth.WithHeaders(c.String(vmNativeDstHeaders)))
if err != nil {
return fmt.Errorf("error initialize auth config for destination: %s: %s", dstAddr, err)
return fmt.Errorf("error initialize auth config for destination: %s", dstAddr)
}
// create TLS config
@@ -596,18 +596,11 @@ func initConfigVM(c *cli.Context) (vm.Config, error) {
return vm.Config{}, fmt.Errorf("failed to create backoff object: %w", err)
}
authCfg, err := auth.Generate(
auth.WithBasicAuth(c.String(vmUser), c.String(vmPassword)),
auth.WithBearer(c.String(vmBearerToken)),
auth.WithHeaders(c.String(vmHeaders)))
if err != nil {
return vm.Config{}, fmt.Errorf("error initialize auth config for destination: %s: %s", addr, err)
}
return vm.Config{
Addr: addr,
Transport: tr,
AuthCfg: authCfg,
User: c.String(vmUser),
Password: c.String(vmPassword),
Concurrency: uint8(c.Int(vmConcurrency)),
Compress: c.Bool(vmCompress),
AccountID: c.String(vmAccountID),

View File

@@ -12,7 +12,6 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
@@ -28,8 +27,6 @@ type Config struct {
// --httpListenAddr value for single node version
// --httpListenAddr value of vmselect component for cluster version
Addr string
AuthCfg *auth.Config
// Transport allows specifying custom http.Transport
Transport *http.Transport
// Concurrency defines number of worker
@@ -43,6 +40,10 @@ type Config struct {
// BatchSize defines how many samples
// importer collects before sending the import request
BatchSize int
// User name for basic auth
User string
// Password for basic auth
Password string
// SignificantFigures defines the number of significant figures to leave
// in metric values before importing.
// Zero value saves all the significant decimal places
@@ -64,10 +65,11 @@ type Config struct {
// see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-time-series-data
type Importer struct {
addr string
authCfg *auth.Config
client *http.Client
importPath string
compress bool
user string
password string
close chan struct{}
input chan *TimeSeries
@@ -146,7 +148,8 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
client: client,
importPath: importPath,
compress: cfg.Compress,
authCfg: cfg.AuthCfg,
user: cfg.User,
password: cfg.Password,
rl: limiter.NewLimiter(cfg.RateLimit),
close: make(chan struct{}),
input: make(chan *TimeSeries, cfg.Concurrency*4),
@@ -301,8 +304,8 @@ func (im *Importer) Ping() error {
if err != nil {
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
}
if im.authCfg != nil {
im.authCfg.SetHeaders(req, true)
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
}
resp, err := im.client.Do(req)
if err != nil {
@@ -331,8 +334,8 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
im.importRequestsErrorsTotal.Inc()
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
}
if im.authCfg != nil {
im.authCfg.SetHeaders(req, true)
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
}
if im.compress {
req.Header.Set("Content-Encoding", "gzip")

View File

@@ -6,6 +6,8 @@ import (
"flag"
"fmt"
"net/http"
nethttputil "net/http/httputil"
"net/url"
"strings"
"time"
@@ -27,7 +29,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmalertproxy"
)
var (
@@ -37,10 +38,7 @@ var (
resetCacheAuthKey = flagutil.NewPassword("search.resetCacheAuthKey", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call. It could be passed via authKey query arg. It overrides -httpAuth.*")
logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging. "+
"See also -search.logQueryMemoryUsage")
vmalertProxyURL = flag.String("vmalert.proxyURL", "", "Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , "+
"then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules . "+
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmalert")
vmalertProxyURL = flag.String("vmalert.proxyURL", "", "Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules")
)
var slowQueries = metrics.NewCounter(`vm_slow_queries_total`)
@@ -57,8 +55,8 @@ func Init(vmselectMaxConcurrentRequests int, vmselectMaxQueueDuration time.Durat
concurrencyLimitCh = make(chan struct{}, maxConcurrentRequests)
initVMUIConfig()
initVMAlertProxy()
vmalertproxy.Init(*vmalertProxyURL)
flagutil.RegisterSecretFlag("vmalert.proxyURL")
}
@@ -516,11 +514,10 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
if len(*vmalertProxyURL) == 0 {
w.WriteHeader(http.StatusBadRequest)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, "%s", `{"status":"error","msg":"the '-vmalert.proxyURL' command-line must be configured; `+
`see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmalert"}`)
fmt.Fprintf(w, "%s", `{"status":"error","msg":"for accessing vmalert flag '-vmalert.proxyURL' must be configured"}`)
return true
}
vmalertproxy.HandleRequest(w, r, path)
proxyVMAlertRequests(w, r, path)
return true
}
@@ -558,7 +555,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
case "/api/v1/rules", "/rules":
rulesRequests.Inc()
if len(*vmalertProxyURL) > 0 {
vmalertproxy.HandleRequest(w, r, path)
proxyVMAlertRequests(w, r, path)
return true
}
// Return dumb placeholder for https://prometheus.io/docs/prometheus/latest/querying/api/#rules
@@ -568,7 +565,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
case "/api/v1/alerts", "/alerts":
alertsRequests.Inc()
if len(*vmalertProxyURL) > 0 {
vmalertproxy.HandleRequest(w, r, path)
proxyVMAlertRequests(w, r, path)
return true
}
// Return dumb placeholder for https://prometheus.io/docs/prometheus/latest/querying/api/#alerts
@@ -578,7 +575,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
case "/api/v1/notifiers", "/notifiers":
notifiersRequests.Inc()
if len(*vmalertProxyURL) > 0 {
vmalertproxy.HandleRequest(w, r, path)
proxyVMAlertRequests(w, r, path)
return true
}
w.Header().Set("Content-Type", "application/json")
@@ -725,7 +722,48 @@ var (
metricNamesStatsResetErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/admin/status/metric_names_stats/reset"}`)
)
var vmuiConfig string
func proxyVMAlertRequests(w http.ResponseWriter, r *http.Request, path string) {
defer func() {
err := recover()
if err == nil || err == http.ErrAbortHandler {
// Suppress http.ErrAbortHandler panic.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1353
return
}
// Forward other panics to the caller.
panic(err)
}()
req := r.Clone(r.Context())
req.URL.Path = strings.TrimPrefix(path, "prometheus")
req.Host = vmalertProxyHost
if strings.HasPrefix(r.Header.Get(`User-Agent`), `Grafana`) {
// Grafana currently supports only Prometheus-style alerts. If other alert types
// (e.g. logs or traces) are returned, it may fail with "Error loading alerts".
//
// Grafana queries the vmalert API directly, bypassing the VictoriaMetrics datasource,
// so query params (such as datasource_type) cannot be enforced on the Grafana side.
//
// To ensure compatibility, we detect Grafana requests via the User-Agent and enforce
// `datasource_type=prometheus`.
//
// See:
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/329#issuecomment-3847585443
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/59
q := req.URL.Query()
q.Set("datasource_type", "prometheus")
req.URL.RawQuery = q.Encode()
req.RequestURI = ""
}
vmalertProxy.ServeHTTP(w, req)
}
var (
vmalertProxyHost string
vmalertProxy *nethttputil.ReverseProxy
vmuiConfig string
)
func initVMUIConfig() {
var cfg struct {
@@ -757,3 +795,16 @@ func initVMUIConfig() {
}
vmuiConfig = string(data)
}
// initVMAlertProxy must be called after flag.Parse(), since it uses command-line flags.
func initVMAlertProxy() {
if len(*vmalertProxyURL) == 0 {
return
}
proxyURL, err := url.Parse(*vmalertProxyURL)
if err != nil {
logger.Fatalf("cannot parse -vmalert.proxyURL=%q: %s", *vmalertProxyURL, err)
}
vmalertProxyHost = proxyURL.Host
vmalertProxy = nethttputil.NewSingleHostReverseProxy(proxyURL)
}

View File

@@ -28,7 +28,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@@ -526,7 +525,6 @@ func DeleteHandler(startTime time.Time, r *http.Request) error {
if deletedCount > 0 {
promql.ResetRollupResultCache()
}
logger.Infof("/api/v1/admin/tsdb/delete_series has been called for %q. Deleted %d series.", sq.FiltersString(), deletedCount)
return nil
}

View File

@@ -30,9 +30,6 @@ var (
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention. See also -retentionFilter")
futureRetention = flagutil.NewRetentionDuration("futureRetention", "2d", "Data with timestamps bigger than now+futureRetention is automatically deleted. "+
"The minimum futureRetention is 2 days. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention")
vmselectAddr = flag.String("vmselectAddr", "", "TCP address to accept connections from vmselect services")
vmselectDisableRPCCompression = flag.Bool("rpc.disableCompression", false, "Whether to disable compression of the data sent from vmstorage to vmselect. "+
"This reduces CPU usage at the cost of higher network bandwidth usage")
snapshotAuthKey = flagutil.NewPassword("snapshotAuthKey", "authKey, which must be passed in query string to /snapshot* pages. It overrides -httpAuth.*")
forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*")
forceFlushAuthKey = flagutil.NewPassword("forceFlushAuthKey", "authKey, which must be passed in query string to /internal/force_flush pages. It overrides -httpAuth.*")
@@ -111,7 +108,7 @@ func DataPath() string {
}
// Init initializes vmstorage.
func Init(vmselectMaxConcurrentRequests int, vmselectMaxQueueDuration time.Duration, resetCacheIfNeeded func(mrs []storage.MetricRow)) {
func Init(vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []storage.MetricRow)) {
storage.SetDedupInterval(*minScrapeInterval)
storage.SetDataFlushInterval(*inmemoryDataFlushInterval)
storage.LegacySetRetentionTimezoneOffset(*retentionTimezoneOffset)
@@ -172,21 +169,6 @@ func Init(vmselectMaxConcurrentRequests int, vmselectMaxQueueDuration time.Durat
storageMetrics.RegisterMetricsWriter(vmStorage.writeStorageMetrics)
metrics.RegisterSet(storageMetrics)
if *vmselectAddr != "" {
var err error
limits := vmselectapi.Limits{
MaxConcurrentRequests: vmselectMaxConcurrentRequests,
MaxConcurrentRequestsFlagName: "search.maxConcurrentRequests",
MaxQueueDuration: vmselectMaxQueueDuration,
MaxQueueDurationFlagName: "search.maxQueueDuration",
}
api := newVMStorageWithTenantID(vmStorage)
vmselectSrv, err = vmselectapi.NewServer(*vmselectAddr, api, limits, *vmselectDisableRPCCompression)
if err != nil {
logger.Fatalf("cannot create a server with -vmselectAddr=%s: %s", *vmselectAddr, err)
}
}
VMInsertAPI = vmStorage
VMSelectAPI = vmStorage
GetSearch = vmStorage.GetSearch
@@ -209,8 +191,6 @@ var (
// TODO(@rtm0): Remove this dependency from vmalert-tool unit tests.
DebugFlush func()
vmselectSrv *vmselectapi.Server
)
// Stop stops the vmstorage
@@ -221,10 +201,6 @@ func Stop() {
logger.Infof("gracefully closing the storage at %s", *storageDataPath)
startTime := time.Now()
if vmselectSrv != nil {
vmselectSrv.MustStop()
}
vmStorage.Stop()
logger.Infof("successfully closed the storage in %.3f seconds", time.Since(startTime).Seconds())

View File

@@ -164,10 +164,6 @@ func (vms *VMStorage) IsReadOnly() bool {
}
func (vms *VMStorage) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
return vms.initSearch(qt, sq, nil, deadline)
}
func (vms *VMStorage) initSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, marshal marshalFunc, deadline uint64) (vmselectapi.BlockIterator, error) {
vms.wg.Add(1)
tr := sq.GetTimeRange()
@@ -182,7 +178,6 @@ func (vms *VMStorage) initSearch(qt *querytracer.Tracer, sq *storage.SearchQuery
return nil, fmt.Errorf("missing tag filters")
}
bi := getBlockIterator()
bi.marshal = marshal
bi.wgDone = vms.wg.Done
bi.sr.Init(qt, vms.s, tfss, tr, maxMetrics, deadline)
if err := bi.sr.Error(); err != nil {
@@ -203,14 +198,11 @@ func (vms *VMStorage) getMaxMetrics(searchQueryLimit int) int {
return searchQueryLimit
}
type marshalFunc func(dst []byte, src *storage.MetricBlock) []byte
// blockIterator implements vmselectapi.BlockIterator
type blockIterator struct {
sr storage.Search
mb storage.MetricBlock
marshal marshalFunc
wgDone func()
sr storage.Search
mb storage.MetricBlock
wgDone func()
}
var blockIteratorsPool sync.Pool
@@ -239,11 +231,7 @@ func (bi *blockIterator) NextBlock(dst []byte) ([]byte, bool) {
mb := bi.mb
mb.MetricName = bi.sr.MetricBlockRef.MetricName
bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block)
if bi.marshal != nil {
dst = bi.marshal(dst[:0], &mb)
} else {
dst = mb.Marshal(dst[:0])
}
dst = mb.Marshal(dst[:0])
return dst, true
}

View File

@@ -1,264 +0,0 @@
package vmstorage
import (
"flag"
"fmt"
"math"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
var (
accountID = flag.Uint64("accountID", 0, "The accountID of the stored data")
projectID = flag.Uint64("projectID", 0, "The projectID of the stored data")
)
func newVMStorageWithTenantID(vms *VMStorage) *VMStorageWithTenantID {
if *accountID > math.MaxUint32 {
logger.Fatalf("-clusternative.accountID must to be in the range [0, %d], got %d", uint32(math.MaxUint32), *accountID)
}
if *projectID > math.MaxUint32 {
logger.Fatalf("-clusternative.projectID must to be in the range [0, %d], got %d", uint32(math.MaxUint32), *projectID)
}
return &VMStorageWithTenantID{
vms: vms,
accountID: uint32(*accountID),
projectID: uint32(*projectID),
}
}
// VMStorageWithTenantID is a thin wrapper around VMStorage type that overrides
// its methods to properly serve requests coming from a vmselect (require
// tenantID).
//
// A new instance of this type should be created using
// newVMStorageWithTenantID(). The created instance does not require closing.
// The instance also does not take ownership of vms and it is the responsibility
// of the caller to close vms.
type VMStorageWithTenantID struct {
vms *VMStorage
accountID uint32
projectID uint32
}
// InitSearch initializes a storage search for a request initiated by a
// vmselect.
//
// The search is initialized iff the search query is either multitenant or its
// accountID and projectID match -accountID and -projectID flag values.
// Otherwise, the method returns an interator that will return no data.
//
// The method also overrides the data format of the data returned by the
// iterator by prepending accountID and projectID bytes to the metric name and
// the data block (a format used in vmcluster).
func (vmst *VMStorageWithTenantID) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
return emptyBI, nil
}
return vmst.vms.initSearch(qt, sq, vmst.marshalMetricBlock, deadline)
}
var emptyBI = &emptyBlockIterator{}
// emptyBlockIterator is an implementation of vmselectapi.BlockIterator that
// always returns no data.
type emptyBlockIterator struct{}
func (*emptyBlockIterator) MustClose() {}
func (*emptyBlockIterator) NextBlock(dst []byte) ([]byte, bool) {
return dst, false
}
func (*emptyBlockIterator) Error() error {
return nil
}
// marshalMetricBlock serializes a metric block in the format expected by
// vmselect.
//
// vmselect expects metric names and data blocks to have the tenantID but
// vmsingle does not have it. Therefore the tenantID needs to be included to
// every metric name and block.
func (vmst *VMStorageWithTenantID) marshalMetricBlock(dst []byte, src *storage.MetricBlock) []byte {
// Marshal metric name:
// 1. Marshal metric name length + accountID length + projectID length (in
// bytes).
// 2. append accountID and projectID bytes
// 3. Finally append metric name bytes
dst = encoding.MarshalVarUint64(dst, uint64(len(src.MetricName))+8)
dst = encoding.MarshalUint32(dst, vmst.accountID)
dst = encoding.MarshalUint32(dst, vmst.projectID)
dst = append(dst, src.MetricName...)
// Marshal data block.
dst = encoding.MarshalUint32(dst, vmst.accountID)
dst = encoding.MarshalUint32(dst, vmst.projectID)
dst = storage.MarshalBlock(dst, &src.Block)
return dst
}
// SearchMetricNames searches the storage for metric names that match the query.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, the method will return an
// empty result.
//
// Found metric names are prepended with accountID and projectID bytes (a format
// used in vmcluster).
func (vmst *VMStorageWithTenantID) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
return nil, nil
}
metricNames, err := vmst.vms.SearchMetricNames(qt, sq, deadline)
if err != nil {
return nil, err
}
// vmselect expects metric names to have the tenantID but vmsingle does not
// have it. Therefore the tenantID needs to be appended to every metric
// name.
dst := make([]byte, 0, 8)
dst = encoding.MarshalUint32(dst, vmst.accountID)
dst = encoding.MarshalUint32(dst, vmst.projectID)
tenantID := string(dst)
for i, metricName := range metricNames {
metricNames[i] = tenantID + metricName
}
return metricNames, nil
}
// LabelValues searches the storage for values that match the query and
// correspond to a label whose name is `labelName`. The returned result
// will contain not more than `maxLabelValues`.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, the method will return an
// empty result.
func (vmst *VMStorageWithTenantID) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
return nil, nil
}
return vmst.vms.LabelValues(qt, sq, labelName, maxLabelValues, deadline)
}
// TagValueSuffixes searches the storage for Graphite tag value suffixes. The
// returned result will contain not more than `maxSuffixes`.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, the method will return an
// empty result.
func (vmst *VMStorageWithTenantID) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error) {
if accountID != vmst.accountID || projectID != vmst.projectID {
return nil, nil
}
return vmst.vms.TagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
}
// LabelNames searches the storage for label names that match the query.
// The returned result will contain not more than `maxLabelNames`.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, the method will return an
// empty result.
func (vmst *VMStorageWithTenantID) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
return nil, nil
}
return vmst.vms.LabelNames(qt, sq, maxLabelNames, deadline)
}
// SeriesCount returns the total number of metrics stored in the database.
//
// The method may return inflated numbers. How inflated the count depends
// on the churn rate and the retention period. For example, if a metric lasts
// for 2 months, it will be counted twice.
//
// The method also counts the deleted metrics.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, the method will return 0.
func (vmst *VMStorageWithTenantID) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
if accountID != vmst.accountID || projectID != vmst.projectID {
return 0, nil
}
return vmst.vms.SeriesCount(qt, accountID, projectID, deadline)
}
// Tenants returns just one tenant consisting of the -accountID and -projectID
// flag values.
func (vmst *VMStorageWithTenantID) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
tenantID := fmt.Sprintf("%d:%d", vmst.accountID, vmst.projectID)
return []string{tenantID}, nil
}
// TSDBStatus retrieves the status for metrics that match to the search query.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, the method will return empty
// status.
func (vmst *VMStorageWithTenantID) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
return &storage.TSDBStatus{}, nil
}
return vmst.vms.TSDBStatus(qt, sq, focusLabel, topN, deadline)
}
// DeleteSeries marks as deleted metrics that match the search query.
// The method returns the number of deleted metrics.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, no metrics will be deleted
// and the method will return 0.
func (vmst *VMStorageWithTenantID) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
return 0, nil
}
return vmst.vms.DeleteSeries(qt, sq, deadline)
}
// RegisterMetricNames registers metric names in the index, the sample values
// and timestamps are ignored.
func (vmst *VMStorageWithTenantID) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error {
return vmst.vms.RegisterMetricNames(qt, mrs, deadline)
}
// GetMetricNamesUsageStats retrieves the usage stats for metrics whose name
// matches the pattern.
//
// If the request is not multitenant or the request accountID and projectID do
// not match the -accoutID and -projectID flag values, no metrics will be
// deleted and the method will return 0.
func (vmst *VMStorageWithTenantID) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (metricnamestats.StatsResult, error) {
if tt != nil && (tt.AccountID != vmst.accountID || tt.ProjectID != vmst.projectID) {
return metricnamestats.StatsResult{}, nil
}
return vmst.vms.GetMetricNamesUsageStats(qt, tt, limit, le, matchPattern, deadline)
}
// ResetMetricNamesUsageStats resets the metric name usage stats.
func (vmst *VMStorageWithTenantID) ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error {
return vmst.vms.ResetMetricNamesUsageStats(qt, deadline)
}
// GetMetadataRecords retrieves the metadata for the metricName.
//
// If the request is not multitenant or the request accountID and projectID do
// not match the -accoutID and -projectID flag values, no metrics will be
// deleted and the method will return 0.
func (vmst *VMStorageWithTenantID) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) {
if tt != nil && (tt.AccountID != vmst.accountID || tt.ProjectID != vmst.projectID) {
return nil, nil
}
return vmst.vms.GetMetadataRecords(qt, tt, limit, metricName, deadline)
}

View File

@@ -1,358 +0,0 @@
package apptest
import (
"fmt"
"slices"
)
type TestData struct {
Samples []string
Step int64
WantSeries []map[string]string
WantLabels []string
WantLabelValues []string
WantQueryResults []*QueryResult
WantMetadata map[string][]MetadataEntry
WantMetricNamesStats []MetricNamesStatsRecord
}
func GenerateTestData(prefix string, numMetrics, start, end int64) TestData {
d := TestData{
Samples: []string{},
Step: (end - start) / numMetrics,
WantSeries: make([]map[string]string, numMetrics),
WantLabels: make([]string, numMetrics),
WantLabelValues: make([]string, numMetrics),
WantQueryResults: make([]*QueryResult, numMetrics),
WantMetadata: make(map[string][]MetadataEntry),
WantMetricNamesStats: make([]MetricNamesStatsRecord, numMetrics),
}
for i := range numMetrics {
metricName := fmt.Sprintf("%s_%04d", prefix, i)
metricHelp := fmt.Sprintf("# HELP %s some help message", metricName)
metricType := fmt.Sprintf("# TYPE %s gauge", metricName)
labelName := fmt.Sprintf("label_%04d", i)
labelValue := fmt.Sprintf("value_%04d", i)
value := i
timestamp := start + i*d.Step
sample := fmt.Sprintf(`%s{%s="value", label="%s"} %d %d`, metricName, labelName, labelValue, value, timestamp)
d.Samples = append(d.Samples, metricHelp, metricType, sample)
d.WantSeries[i] = map[string]string{
"__name__": metricName,
labelName: "value",
"label": labelValue,
}
d.WantLabels[i] = labelName
d.WantLabelValues[i] = labelValue
d.WantQueryResults[i] = &QueryResult{
Metric: map[string]string{
"__name__": metricName,
labelName: "value",
"label": labelValue,
},
Samples: []*Sample{{Timestamp: timestamp, Value: float64(value)}},
}
d.WantMetadata[metricName] = []MetadataEntry{{Help: "some help message", Type: "gauge"}}
d.WantMetricNamesStats[i].MetricName = metricName
}
d.WantLabels = append(d.WantLabels, "__name__", "label")
slices.Sort(d.WantLabels)
return d
}
// AssertSeries retrieves metric names from the storage and compares the result
// with the expected one.
func AssertSeries(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end int64, want []map[string]string) {
tc.T().Helper()
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/series response",
Got: func() any {
tc.T().Helper()
return app.PrometheusAPIV1Series(tc.T(), query, QueryOpts{
Tenant: tenantID,
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
}).Sort()
},
Want: &PrometheusAPIV1SeriesResponse{
Status: "success",
Data: want,
},
Retries: 1000,
FailNow: true,
})
}
// AssertSeriesCount retrieves series count and compares it with expected one.
func AssertSeriesCount(tc *TestCase, app PrometheusQuerier, tenantID string, start, end int64, want uint64) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/series/count response",
Got: func() any {
tc.T().Helper()
return app.PrometheusAPIV1SeriesCount(tc.T(), QueryOpts{
Tenant: tenantID,
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
})
},
Want: &PrometheusAPIV1SeriesCountResponse{
Status: "success",
Data: []uint64{want},
},
FailNow: true,
})
}
// AssertLabels retrieves label names from the storage and compares the result
// with the expected one.
func AssertLabels(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end int64, want []string) {
tc.T().Helper()
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/labels response",
Got: func() any {
tc.T().Helper()
res := app.PrometheusAPIV1Labels(tc.T(), query, QueryOpts{
Tenant: tenantID,
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
})
slices.Sort(res.Data)
return res
},
Want: &PrometheusAPIV1LabelsResponse{
Status: "success",
Data: want,
},
FailNow: true,
})
}
// AssertLabelValues retrieves values for the label whose name is labelName for
// the series whose name mathes metricNameRE, compares the result with the
// expected one.
func AssertLabelValues(tc *TestCase, app PrometheusQuerier, metricNameRE, labelName, tenantID string, start, end int64, want []string) {
tc.T().Helper()
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/labels/.../values response",
Got: func() any {
tc.T().Helper()
res := app.PrometheusAPIV1LabelValues(tc.T(), labelName, query, QueryOpts{
Tenant: tenantID,
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
})
slices.Sort(res.Data)
return res
},
Want: &PrometheusAPIV1LabelValuesResponse{
Status: "success",
Data: want,
},
FailNow: true,
})
}
// AssertQueryResults sends a data query to storage and compares the query
// result with the expected one.
func AssertQueryResults(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end, step int64, want []*QueryResult) {
tc.T().Helper()
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/query_range response",
Got: func() any {
tc.T().Helper()
return app.PrometheusAPIV1QueryRange(tc.T(), query, QueryOpts{
Tenant: tenantID,
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
Step: fmt.Sprintf("%dms", step),
MaxLookback: fmt.Sprintf("%dms", step-1),
NoCache: "1",
})
},
Want: &PrometheusAPIV1QueryResponse{
Status: "success",
Data: &QueryData{
ResultType: "matrix",
Result: want,
},
},
FailNow: true,
})
}
func AssertMetadata(tc *TestCase, app PrometheusQuerier, metricName, tenantID string, want map[string][]MetadataEntry) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/metadata response",
Got: func() any {
tc.T().Helper()
return app.PrometheusAPIV1Metadata(tc.T(), metricName, 0, QueryOpts{
Tenant: tenantID,
})
},
Want: &PrometheusAPIV1Metadata{
Status: "success",
Data: want,
},
FailNow: true,
})
}
func AssertMetricNamesStats(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, want []MetricNamesStatsRecord) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/status/metric_names_stats response",
Got: func() any {
tc.T().Helper()
return app.PrometheusAPIV1StatusMetricNamesStats(tc.T(), "", "", metricNameRE, QueryOpts{
Tenant: tenantID,
})
},
Want: MetricNamesStatsResponse{
Records: want,
},
FailNow: true,
})
}
// GraphiteTestData holds the data samples in Graphite Pickle format, distance
// between samples in milliseconds and expected responses for various Graphite
// API endpoints.
type GraphiteTestData struct {
Samples []string
Step int64
WantMetricsIndex []string
WantMetricsFind []GraphiteMetric
WantMetricsExpand []string
WantRenderedTargets []GraphiteRenderedTarget
}
// GenerateGraphiteTestData generates Graphite test data.
func GenerateGraphiteTestData(prefix string, numMetrics, start, end int64) GraphiteTestData {
d := GraphiteTestData{
Samples: make([]string, numMetrics),
Step: (end - start) / numMetrics,
WantMetricsIndex: make([]string, numMetrics),
WantMetricsFind: make([]GraphiteMetric, numMetrics),
WantMetricsExpand: make([]string, numMetrics),
WantRenderedTargets: make([]GraphiteRenderedTarget, numMetrics),
}
datapoints := make([][2]float64, numMetrics)
for i := range numMetrics {
timestamp := (start + i*d.Step) / 1000
datapoints[i][1] = float64(timestamp)
}
for i := range numMetrics {
suffix := fmt.Sprintf("%04d", i)
metricName := fmt.Sprintf("%s.%s", prefix, suffix)
value := i
timestamp := (start + i*d.Step) / 1000
sample := fmt.Sprintf(`%s %d %d`, metricName, value, timestamp)
d.Samples[i] = sample
d.WantMetricsIndex[i] = metricName
d.WantMetricsFind[i].Id = metricName
d.WantMetricsFind[i].Text = suffix
d.WantMetricsFind[i].Leaf = 1
d.WantMetricsExpand[i] = metricName
d.WantRenderedTargets[i].Target = metricName
d.WantRenderedTargets[i].Datapoints = slices.Clone(datapoints)
d.WantRenderedTargets[i].Datapoints[i][0] = float64(value)
}
return d
}
// AssertGraphiteMetricsIndex retrieves all metrics by sending a request to
// /graphite/metrics/index.json and compares the result with the expected one.
func AssertGraphiteMetricsIndex(tc *TestCase, app PrometheusQuerier, tenantID string, want []string) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /graphite/metrics/index.json response",
Got: func() any {
tc.T().Helper()
return app.GraphiteMetricsIndex(tc.T(), QueryOpts{
Tenant: tenantID,
})
},
Want: want,
Retries: 30,
FailNow: true,
})
}
// AssertGraphiteMetricsFind finds metric names by sending a request to
// /graphite/metrics/find and compares the result with the expected one.
func AssertGraphiteMetricsFind(tc *TestCase, app PrometheusQuerier, query, tenantID string, want []GraphiteMetric) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /graphite/metrics/find response",
Got: func() any {
tc.T().Helper()
return app.GraphiteMetricsFind(tc.T(), query, QueryOpts{
Tenant: tenantID,
})
},
Want: want,
FailNow: true,
})
}
// AssertGraphiteMetricsFind expands metric names by sending a request to
// /graphite/metrics/expand and compares the result with the expected one.
func AssertGraphiteMetricsExpand(tc *TestCase, app PrometheusQuerier, query, tenantID string, want []string) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /graphite/metrics/expand response",
Got: func() any {
tc.T().Helper()
return app.GraphiteMetricsExpand(tc.T(), query, QueryOpts{
Tenant: tenantID,
})
},
Want: want,
FailNow: true,
})
}
// AssertGraphiteRender retieves metric raw data by sending a request to
// /graphite/render and compares the result with the expected one.
func AssertGraphiteRender(tc *TestCase, app PrometheusQuerier, target, tenantID string, from, until, step int64, want []GraphiteRenderedTarget) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /graphite/render response",
Got: func() any {
tc.T().Helper()
return app.GraphiteRender(tc.T(), target, QueryOpts{
Tenant: tenantID,
From: fmt.Sprintf("%d", from/1000),
Until: fmt.Sprintf("%d", until/1000),
StorageStep: fmt.Sprintf("%dms", step),
})
},
Want: want,
FailNow: true,
})
}

View File

@@ -1,216 +0,0 @@
package tests
import (
"fmt"
"path/filepath"
"slices"
"strconv"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
"github.com/google/go-cmp/cmp"
)
func TestMixedPrometheusQueries(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
const (
accountID1 = 12
projectID1 = 34
accountID2 = 56
projectID2 = 78
numMetrics = 10
)
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
data := apptest.GenerateTestData("metric", numMetrics, start, end)
emptySeries := []map[string]string{}
emptyLabels := []string{}
emptyLabelValues := []string{}
emptyQueryResults := []*apptest.QueryResult{}
emptyMetadata := map[string][]apptest.MetadataEntry{}
emptyMetricNamesStats := []apptest.MetricNamesStatsRecord{}
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
"-retentionPeriod=100y",
fmt.Sprintf("-accountID=%d", accountID1),
fmt.Sprintf("-projectID=%d", projectID1),
})
vmselect := tc.MustStartVmselect("vmselect", []string{
"-storageNode=" + vmsingle.VmselectAddr(),
})
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data.Samples, apptest.QueryOpts{})
vmsingle.ForceFlush(t)
// Ensure vmsingle returns data.
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data.WantSeries)
apptest.AssertSeriesCount(tc, vmsingle, "", start, end, numMetrics)
apptest.AssertLabels(tc, vmsingle, "metric.*", "", start, end, data.WantLabels)
apptest.AssertLabelValues(tc, vmsingle, "metric.*", "label", "", start, end, data.WantLabelValues)
apptest.AssertQueryResults(tc, vmsingle, "metric.*", "", start, end, data.Step, data.WantQueryResults)
apptest.AssertMetadata(tc, vmsingle, "", "", data.WantMetadata)
for i := range data.WantMetricNamesStats {
data.WantMetricNamesStats[i].QueryRequestsCount = 1
}
apptest.AssertMetricNamesStats(tc, vmsingle, "", "", data.WantMetricNamesStats)
// Check that current vmsingle tenant (configured via flags) is tenant1.
gotAdminTenantsResponse := vmselect.APIV1AdminTenants(t, apptest.QueryOpts{})
wantAdminTenantsResponse := &apptest.AdminTenantsResponse{
Status: "success",
Data: []string{tenantID1},
}
if diff := cmp.Diff(wantAdminTenantsResponse, gotAdminTenantsResponse); diff != "" {
t.Fatalf("unexpected tenants (-want, +got):\n%s", diff)
}
// Ensure vmselect returns data for tenant1.
apptest.AssertSeries(tc, vmselect, "metric.*", tenantID1, start, end, data.WantSeries)
apptest.AssertSeriesCount(tc, vmselect, tenantID1, start, end, numMetrics)
apptest.AssertLabels(tc, vmselect, "metric.*", tenantID1, start, end, data.WantLabels)
apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", tenantID1, start, end, data.WantLabelValues)
apptest.AssertQueryResults(tc, vmselect, "metric.*", tenantID1, start, end, data.Step, data.WantQueryResults)
apptest.AssertMetadata(tc, vmselect, "", tenantID1, data.WantMetadata)
for i := range data.WantMetricNamesStats {
data.WantMetricNamesStats[i].QueryRequestsCount = 2
}
apptest.AssertMetricNamesStats(tc, vmselect, "", tenantID1, data.WantMetricNamesStats)
// Ensure vmselect does not return any data for tenant2.
apptest.AssertSeries(tc, vmselect, "metric.*", tenantID2, start, end, emptySeries)
apptest.AssertSeriesCount(tc, vmselect, tenantID2, start, end, 0)
apptest.AssertLabels(tc, vmselect, "metric.*", tenantID2, start, end, emptyLabels)
apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", tenantID2, start, end, emptyLabelValues)
apptest.AssertQueryResults(tc, vmselect, "metric.*", tenantID2, start, end, data.Step, emptyQueryResults)
apptest.AssertMetadata(tc, vmselect, "", tenantID2, emptyMetadata)
apptest.AssertMetricNamesStats(tc, vmselect, "", tenantID2, emptyMetricNamesStats)
// Ensure vmselect returns data for multitenant.
for _, v := range data.WantSeries {
v["vm_account_id"] = strconv.Itoa(accountID1)
v["vm_project_id"] = strconv.Itoa(projectID1)
}
apptest.AssertSeries(tc, vmselect, "metric.*", "multitenant", start, end, data.WantSeries)
data.WantLabels = append(data.WantLabels, "vm_account_id", "vm_project_id")
apptest.AssertLabels(tc, vmselect, "metric.*", "multitenant", start, end, data.WantLabels)
apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", "multitenant", start, end, data.WantLabelValues)
for _, v := range data.WantQueryResults {
v.Metric["vm_account_id"] = strconv.Itoa(accountID1)
v.Metric["vm_project_id"] = strconv.Itoa(projectID1)
}
apptest.AssertQueryResults(tc, vmselect, "metric.*", "multitenant", start, end, data.Step, data.WantQueryResults)
apptest.AssertMetadata(tc, vmselect, "", "multitenant", data.WantMetadata)
for i := range data.WantMetricNamesStats {
data.WantMetricNamesStats[i].QueryRequestsCount = 3
}
apptest.AssertMetricNamesStats(tc, vmselect, "", "multitenant", data.WantMetricNamesStats)
}
func TestMixedDeleteSeries(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
const (
accountID1 = 12
projectID1 = 34
accountID2 = 56
projectID2 = 78
numMetrics = 10
)
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
data1 := apptest.GenerateTestData("metric1", numMetrics, start, end)
data2 := apptest.GenerateTestData("metric2", numMetrics, start, end)
emptySeries := []map[string]string{}
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
"-retentionPeriod=100y",
fmt.Sprintf("-accountID=%d", accountID1),
fmt.Sprintf("-projectID=%d", projectID1),
})
vmselect := tc.MustStartVmselect("vmselect", []string{
"-storageNode=" + vmsingle.VmselectAddr(),
})
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data1.Samples, apptest.QueryOpts{})
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data2.Samples, apptest.QueryOpts{})
vmsingle.ForceFlush(t)
wantSeries12 := slices.Concat(data1.WantSeries, data2.WantSeries)
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, wantSeries12)
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric1.*"}`, apptest.QueryOpts{
Tenant: tenantID1,
})
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data2.WantSeries)
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric2.*"}`, apptest.QueryOpts{
Tenant: tenantID2,
})
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data2.WantSeries)
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric2.*"}`, apptest.QueryOpts{
Tenant: "multitenant",
})
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, emptySeries)
}
func TestMixedGraphiteQueries(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
const (
accountID1 = 12
projectID1 = 34
accountID2 = 56
projectID2 = 78
numMetrics = 10
)
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
data := apptest.GenerateGraphiteTestData("metric", numMetrics, start, end)
emptyMetricsIndex := []string{}
emptyMetricsFind := []apptest.GraphiteMetric{}
emptyMetricsExpand := []string{}
emptyRenderedTargets := []apptest.GraphiteRenderedTarget{}
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
"-retentionPeriod=100y",
fmt.Sprintf("-accountID=%d", accountID1),
fmt.Sprintf("-projectID=%d", projectID1),
})
vmselect := tc.MustStartVmselect("vmselect", []string{
"-storageNode=" + vmsingle.VmselectAddr(),
})
vmsingle.GraphiteWrite(tc.T(), data.Samples, apptest.QueryOpts{})
vmsingle.ForceFlush(t)
// Ensure vmsingle returns data.
apptest.AssertGraphiteMetricsIndex(tc, vmsingle, "", data.WantMetricsIndex)
apptest.AssertGraphiteMetricsFind(tc, vmsingle, "metric.*", "", data.WantMetricsFind)
apptest.AssertGraphiteMetricsExpand(tc, vmsingle, "metric.*", "", data.WantMetricsExpand)
apptest.AssertGraphiteRender(tc, vmsingle, "metric.*", "", start, end, data.Step, data.WantRenderedTargets)
// Ensure vmselect returns data for tenant1.
apptest.AssertGraphiteMetricsIndex(tc, vmselect, tenantID1, data.WantMetricsIndex)
apptest.AssertGraphiteMetricsFind(tc, vmselect, "metric.*", tenantID1, data.WantMetricsFind)
apptest.AssertGraphiteMetricsExpand(tc, vmselect, "metric.*", tenantID1, data.WantMetricsExpand)
apptest.AssertGraphiteRender(tc, vmselect, "metric.*", tenantID1, start, end, data.Step, data.WantRenderedTargets)
// Ensure vmselect does not return any data for tenant2.
apptest.AssertGraphiteMetricsIndex(tc, vmselect, tenantID2, emptyMetricsIndex)
apptest.AssertGraphiteMetricsFind(tc, vmselect, "metric.*", tenantID2, emptyMetricsFind)
apptest.AssertGraphiteMetricsExpand(tc, vmselect, "metric.*", tenantID2, emptyMetricsExpand)
apptest.AssertGraphiteRender(tc, vmselect, "metric.*", tenantID2, start, end, data.Step, emptyRenderedTargets)
}

View File

@@ -25,14 +25,12 @@ func StartVmsingle(instance string, flags []string, cli *Client, output io.Write
"-httpListenAddr": "127.0.0.1:0",
"-graphiteListenAddr": "127.0.0.1:0",
"-opentsdbListenAddr": "127.0.0.1:0",
"-vmselectAddr": "127.0.0.1:0",
},
extractREs: []*regexp.Regexp{
storageDataPathRE,
httpListenAddrRE,
graphiteListenAddrRE,
openTSDBListenAddrRE,
vmselectAddrRE,
},
output: output,
})
@@ -45,7 +43,6 @@ func StartVmsingle(instance string, flags []string, cli *Client, output io.Write
httpListenAddr: stderrExtracts[1],
graphiteListenAddr: stderrExtracts[2],
openTSDBListenAddr: stderrExtracts[3],
vmselectAddr: stderrExtracts[4],
}), nil
}
@@ -54,7 +51,6 @@ type vmsingleRuntimeValues struct {
httpListenAddr string
graphiteListenAddr string
openTSDBListenAddr string
vmselectAddr string
}
func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
@@ -89,7 +85,6 @@ func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
},
storageDataPath: rt.storageDataPath,
httpListenAddr: rt.httpListenAddr,
vmselectAddr: rt.vmselectAddr,
}
}
@@ -104,7 +99,6 @@ type Vmsingle struct {
storageDataPath string
httpListenAddr string
vmselectAddr string
}
// HTTPAddr returns the address at which the vminsert process is
@@ -113,12 +107,6 @@ func (app *Vmsingle) HTTPAddr() string {
return app.httpListenAddr
}
// VmselectAddr returns the address at which the vmsingle process is listening
// for vmselect connections.
func (app *Vmsingle) VmselectAddr() string {
return app.vmselectAddr
}
// String returns the string representation of the vmsingle app state.
func (app *Vmsingle) String() string {
return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q}", []any{

View File

@@ -1712,30 +1712,21 @@ The following versions of VictoriaMetrics receive regular security fixes:
| [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/) | ✅ |
| other releases | ❌ |
### Software Bill of Materials (SBOM)
Every VictoriaMetrics container{{% available_from "v1.137.0" %}} image published to
[Docker Hub](https://hub.docker.com/u/victoriametrics) and [Quay.io](https://quay.io/organization/victoriametrics) include an [SPDX](https://spdx.dev/) SBOM attestation generated automatically by BuildKit during `docker buildx build`.
To inspect the SBOM for an image:
```sh
docker buildx imagetools inspect \
docker.io/victoriametrics/victoria-metrics:latest \
--format "{{ json .SBOM }}"
```
To scan an image using its SBOM attestation with [Trivy](https://github.com/aquasecurity/trivy):
```sh
trivy image --sbom-sources oci \
docker.io/victoriametrics/victoria-metrics:latest
```
### Reporting a Vulnerability
Please report any security issues to <security@victoriametrics.com>
### CVE handling policy
**Source code:** Go dependencies are scanned by [govulncheck](https://pkg.go.dev/golang.org/x/vuln/cmd/govulncheck) in CI.
All vulnerabilities must be fixed before next scheduled release and backported to [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
**Docker images:** CVE findings in [Alpine](https://security.alpinelinux.org/) base image pose minimal risk since VictoriaMetrics binaries are statically compiled with no OS dependencies.
When detected, only the Alpine base tag is updated.
Releases proceed as planned even if upstream fixes are not yet available.
For maximum security, hardened [scratch](https://hub.docker.com/_/scratch)-based images are also provided.
All images are continuously scanned by Docker Hub and verified before release using [grype](https://github.com/anchore/grype).
### General security recommendations:
* All the VictoriaMetrics components must run in protected private networks without direct access from untrusted networks such as Internet.
@@ -1749,14 +1740,24 @@ Please report any security issues to <security@victoriametrics.com>
* Set reasonable [`Content-Security-Policy`](https://developer.mozilla.org/en-US/docs/Web/HTTP/CSP) header value to mitigate [XSS attacks](https://en.wikipedia.org/wiki/Cross-site_scripting). See `-http.header.csp` flag.
* Set reasonable [`X-Frame-Options`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Frame-Options) header value to mitigate [clickjacking attacks](https://en.wikipedia.org/wiki/Clickjacking), for example `DENY`. See `-http.header.frameOptions` flag.
VictoriaMetrics provides the following security-related command-line flags:
There are the following security-related command-line flags for all components with HTTP API:
* `-tls`, `-tlsCertFile` and `-tlsKeyFile` for switching from HTTP to HTTPS at `-httpListenAddr` (TCP port 8428 is listened by default).
* `-tls`, `-tlsCertFile` and `-tlsKeyFile` for switching from HTTP to HTTPS at `-httpListenAddr`.
[Enterprise version of VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports automatic issuing of TLS certificates.
See [these docs](#automatic-issuing-of-tls-certificates).
* `-mtls` and `-mtlsCAFile` for enabling [mTLS](https://en.wikipedia.org/wiki/Mutual_authentication) for requests to `-httpListenAddr`. See [these docs](#mtls-protection).
* `-httpAuth.username` and `-httpAuth.password` for protecting all the HTTP endpoints
with [HTTP Basic Authentication](https://en.wikipedia.org/wiki/Basic_access_authentication).
* `-http.header.hsts`, `-http.header.csp`, and `-http.header.frameOptions` for serving `Strict-Transport-Security`, `Content-Security-Policy`
and `X-Frame-Options` HTTP response headers.
### Protecting service endpoints
All VictoriaMetrics components expose internal metrics in Prometheus exposition format at `/metrics` page for [#Monitoring](https://docs.victoriametrics.com/victoriametrics/#monitoring).
Consider limiting access to `/metrics` page to trusted networks only.
There are other service endpoints that might require protection:
* `-deleteAuthKey` for protecting the `/api/v1/admin/tsdb/delete_series` endpoint. See [how to delete time series](#how-to-delete-time-series).
* `-snapshotAuthKey` for protecting the `/snapshot*` endpoints. See [how to work with snapshots](#how-to-work-with-snapshots).
* `-forceFlushAuthKey` for protecting the `/internal/force_flush` endpoint. See [force flush docs](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#forced-flush).
@@ -1768,8 +1769,6 @@ VictoriaMetrics provides the following security-related command-line flags:
* `-pprofAuthKey` for protecting the `/debug/pprof/*` endpoints, which can be used for [profiling](#profiling).
* `-metricNamesStatsResetAuthKey` for protecting the `/api/v1/admin/status/metric_names_stats/reset` endpoint, used for [Metric Names Tracker](#track-ingested-metrics-usage).
* `-denyQueryTracing` for disallowing [query tracing](#query-tracing).
* `-http.header.hsts`, `-http.header.csp`, and `-http.header.frameOptions` for serving `Strict-Transport-Security`, `Content-Security-Policy`
and `X-Frame-Options` HTTP response headers.
Explicitly set internal network interface for TCP and UDP ports for data ingestion with Graphite and OpenTSDB formats.
For example, substitute `-graphiteListenAddr=:2003` with `-graphiteListenAddr=<internal_iface_ip>:2003`. This protects from unexpected requests from untrusted network interfaces.
@@ -1777,17 +1776,6 @@ For example, substitute `-graphiteListenAddr=:2003` with `-graphiteListenAddr=<i
See also [security recommendation for VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#security)
and [the general security page at VictoriaMetrics website](https://victoriametrics.com/security/).
### CVE handling policy
**Source code:** Go dependencies are scanned by [govulncheck](https://pkg.go.dev/golang.org/x/vuln/cmd/govulncheck) in CI.
All vulnerabilities must be fixed before next scheduled release and backported to [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
**Docker images:** CVE findings in [Alpine](https://security.alpinelinux.org/) base image pose minimal risk since VictoriaMetrics binaries are statically compiled with no OS dependencies.
When detected, only the Alpine base tag is updated.
Releases proceed as planned even if upstream fixes are not yet available.
For maximum security, hardened [scratch](https://hub.docker.com/_/scratch)-based images are also provided.
All images are continuously scanned by Docker Hub and verified before release using [grype](https://github.com/anchore/grype).
### mTLS protection
By default `VictoriaMetrics` accepts http requests at `8428` port (this port can be changed via `-httpListenAddr` command-line flags).
@@ -1817,6 +1805,26 @@ This functionality can be evaluated for free according to [these docs](https://d
See also [security recommendations](#security).
### Software Bill of Materials (SBOM)
Every VictoriaMetrics container{{% available_from "v1.137.0" %}} image published to
[Docker Hub](https://hub.docker.com/u/victoriametrics) and [Quay.io](https://quay.io/organization/victoriametrics) include an [SPDX](https://spdx.dev/) SBOM attestation generated automatically by BuildKit during `docker buildx build`.
To inspect the SBOM for an image:
```sh
docker buildx imagetools inspect \
docker.io/victoriametrics/victoria-metrics:latest \
--format "{{ json .SBOM }}"
```
To scan an image using its SBOM attestation with [Trivy](https://github.com/aquasecurity/trivy):
```sh
trivy image --sbom-sources oci \
docker.io/victoriametrics/victoria-metrics:latest
```
## Tuning
* No need in tuning for VictoriaMetrics - it uses reasonable defaults for command-line flags,

View File

@@ -26,13 +26,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: all VictoriaMetrics components: add `-http.header.disableServerHostname` command-line flag for disabling the `X-Server-Hostname` HTTP response header. See [#11067](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11067). Thanks to @zasdaym for contribution.
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): Add the support of vmselect RPC to vmsingle so that single node can be queried by a vmselect from a vmcluster deployment. See [4328](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4328) and [10926](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10926).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): do not fail backup list if directory is absent while using `fs://` destination to align with other protocols. See [6c3c548](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/6c3c548ddb0385b749e731f52276f130e2a4e4a8)
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)

View File

@@ -95,8 +95,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string
@@ -623,7 +621,7 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/
-version
Show VictoriaMetrics version
-vmalert.proxyURL string
Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules . See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmalert
Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules
-vmui.customDashboardsPath string
Optional path to vmui dashboards. See https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/vmui/packages/vmui/public/dashboards
-vmui.defaultTimezone string

View File

@@ -74,8 +74,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -115,8 +115,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmalert/ .
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -59,8 +59,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmauth/ .
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -400,8 +400,6 @@ Run `vmbackup -help` in order to see all the available options:
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -575,8 +575,6 @@ command-line flags:
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -496,8 +496,6 @@ Below is the list of configuration flags (it can be viewed by running `./vmgatew
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -76,8 +76,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -102,8 +102,6 @@ Run `vmrestore -help` in order to see all the available options:
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -75,8 +75,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string
@@ -325,7 +323,7 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
-version
Show VictoriaMetrics version
-vmalert.proxyURL string
Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules . See https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#vmalert
Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules
-vmstorageDialTimeout duration
Timeout for establishing RPC connections from vmselect to vmstorage. See also -vmstorageUserTimeout (default 3s)
-vmstorageUserTimeout duration

View File

@@ -68,8 +68,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -1,140 +0,0 @@
package handshake
import (
"bufio"
"fmt"
"io"
"net"
"os"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
type bufferedWriter interface {
Write(p []byte) (int, error)
Flush() error
}
// BufferedConn is a net.Conn with Flush suport.
type BufferedConn struct {
net.Conn
// IsLegacy defines if BufferedConn operates in legacy mode
// and doesn't support RPC protocol
IsLegacy bool
br io.Reader
bw bufferedWriter
readDeadline time.Time
writeDeadline time.Time
}
const bufferSize = 64 * 1024
// newBufferedConn returns buffered connection with the given compression level.
func newBufferedConn(c net.Conn, compressionLevel int, isReadCompressed bool) *BufferedConn {
bc := &BufferedConn{
Conn: c,
}
if compressionLevel <= 0 {
bc.bw = bufio.NewWriterSize(c, bufferSize)
} else {
bc.bw = zstd.NewWriterLevel(c, compressionLevel)
}
if !isReadCompressed {
bc.br = bufio.NewReaderSize(c, bufferSize)
} else {
bc.br = zstd.NewReader(c)
}
return bc
}
// SetDeadline sets read and write deadlines for bc to t.
//
// Deadline is checked on each Read and Write call.
func (bc *BufferedConn) SetDeadline(t time.Time) error {
bc.readDeadline = t
bc.writeDeadline = t
return bc.Conn.SetDeadline(t)
}
// SetReadDeadline sets read deadline for bc to t.
//
// Deadline is checked on each Read call.
func (bc *BufferedConn) SetReadDeadline(t time.Time) error {
bc.readDeadline = t
return bc.Conn.SetReadDeadline(t)
}
// SetWriteDeadline sets write deadline for bc to t.
//
// Deadline is checked on each Write call.
func (bc *BufferedConn) SetWriteDeadline(t time.Time) error {
bc.writeDeadline = t
return bc.Conn.SetWriteDeadline(t)
}
// Read reads up to len(p) from bc to p.
func (bc *BufferedConn) Read(p []byte) (int, error) {
startTime := fasttime.UnixTimestamp()
if deadlineExceeded(bc.readDeadline, startTime) {
return 0, os.ErrDeadlineExceeded
}
n, err := bc.br.Read(p)
if err != nil && err != io.EOF {
err = fmt.Errorf("cannot read data in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
}
return n, err
}
// Write writes p to bc.
//
// Do not forget to call Flush if needed.
func (bc *BufferedConn) Write(p []byte) (int, error) {
startTime := fasttime.UnixTimestamp()
if deadlineExceeded(bc.writeDeadline, startTime) {
return 0, os.ErrDeadlineExceeded
}
n, err := bc.bw.Write(p)
if err != nil {
err = fmt.Errorf("cannot write data in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
}
return n, err
}
func deadlineExceeded(deadline time.Time, currentTimestamp uint64) bool {
if deadline.IsZero() {
return false
}
return currentTimestamp > uint64(deadline.Unix())
}
// Close closes bc.
func (bc *BufferedConn) Close() error {
// Close the Conn at first. It is expected that all the required data
// is already flushed to the Conn.
err := bc.Conn.Close()
bc.Conn = nil
if zr, ok := bc.br.(*zstd.Reader); ok {
zr.Release()
}
bc.br = nil
if zw, ok := bc.bw.(*zstd.Writer); ok {
// Do not call zw.Close(), since we already closed the underlying conn.
zw.Release()
}
bc.bw = nil
bc.IsLegacy = false
return err
}
// Flush flushes internal write buffers to the underlying conn.
func (bc *BufferedConn) Flush() error {
return bc.bw.Flush()
}

View File

@@ -1,318 +0,0 @@
package handshake
import (
"errors"
"flag"
"fmt"
"io"
"net"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
var rpcHandshakeTimeout = flag.Duration("rpc.handshakeTimeout", 5*time.Second, "Timeout for RPC handshake between vminsert/vmselect and vmstorage. Increase this value if transient handshake failures occur. See https://docs.victoriametrics.com/victoriametrics/troubleshooting/#cluster-instability section for more details.")
const (
vminsertHelloLegacyVersion = "vminsert.02"
vminsertHello = "vminsert.03"
vmselectHello = "vmselect.01"
successResponse = "ok"
)
// Func must perform handshake on the given c using the given compressionLevel.
//
// It must return BufferedConn wrapper for c on successful handshake.
type Func func(c net.Conn, compressionLevel int) (*BufferedConn, error)
// VMInsertClientWithDialer performs client-side handshake for vminsert protocol.
//
// it uses provided dial func to establish connection to the server.
// compressionLevel is a legacy option which defines the level used for compression of the data sent
// to the server.
// compressionLevel <= 0 means 'no compression'
func VMInsertClientWithDialer(dial func() (net.Conn, error), compressionLevel int) (*BufferedConn, error) {
c, err := dial()
if err != nil {
return nil, fmt.Errorf("dial error: %w", err)
}
bc, err := vminsertClient(c, 0)
if err == nil {
return bc, nil
}
_ = c.Close()
if !strings.Contains(err.Error(), "cannot read success response after sending hello") {
return nil, err
}
// try to fallback to the prev non-RPC API version
// we cannot re-use exist connection, since vmstorage already closed it
c, err = dial()
if err != nil {
return nil, fmt.Errorf("dial error: %w", err)
}
bc, err = genericClient(c, vminsertHelloLegacyVersion, compressionLevel)
if err != nil {
_ = c.Close()
return nil, fmt.Errorf("legacy handshake error: %w", err)
}
bc.IsLegacy = true
logger.Infof("server=%q doesn't support new RPC version, fallback to the legacy format", c.RemoteAddr())
return bc, nil
}
func vminsertClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
return genericClient(c, vminsertHello, compressionLevel)
}
// VMInsertClientWithHello performs client-side handshake for vminsert protocol.
//
// should be used for testing only
func VMInsertClientWithHello(c net.Conn, helloMsg string, compressionLevel int) (*BufferedConn, error) {
return genericClient(c, helloMsg, compressionLevel)
}
// VMInsertServer performs server-side handshake for vminsert protocol.
//
// compressionLevel is the level used for compression of the data sent
// to the client.
// compressionLevel <= 0 means 'no compression'
func VMInsertServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
var isRPCSupported bool
bc, err := genericServer(c, compressionLevel, func(c net.Conn) error {
buf, err := readData(c, len(vminsertHello))
if err != nil {
if errors.Is(err, io.EOF) {
// This is likely a TCP healthcheck, which must be ignored in order to prevent logs pollution.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1762
return errTCPHealthcheck
}
return fmt.Errorf("cannot read hello: %w", err)
}
isRPCSupported = string(buf) == vminsertHello
if !isRPCSupported {
// try to fallback to the previous protocol version
if string(buf) != vminsertHelloLegacyVersion {
return fmt.Errorf("unexpected message obtained; got %q; want %q", buf, vminsertHello)
}
logger.Infof("client=%q doesn't support new RPC version, fallback to the legacy format", c.RemoteAddr())
}
return nil
})
if err != nil {
return nil, err
}
bc.IsLegacy = !isRPCSupported
return bc, nil
}
// VMInsertServerWithLegacyHello performs server-side handshake for vminsert protocol
// with legacy hello message
//
// should be used for testing only
func VMInsertServerWithLegacyHello(c net.Conn, compressionLevel int) (*BufferedConn, error) {
bc, err := genericServer(c, compressionLevel, func(c net.Conn) error {
return readMessage(c, vminsertHelloLegacyVersion)
})
if err != nil {
return nil, err
}
bc.IsLegacy = true
return bc, nil
}
// VMSelectClient performs client-side handshake for vmselect protocol.
//
// compressionLevel is the level used for compression of the data sent
// to the server.
// compressionLevel <= 0 means 'no compression'
func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
return genericClient(c, vmselectHello, compressionLevel)
}
// VMSelectServer performs server-side handshake for vmselect protocol.
//
// compressionLevel is the level used for compression of the data sent
// to the client.
// compressionLevel <= 0 means 'no compression'
func VMSelectServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
return genericServer(c, compressionLevel, func(c net.Conn) error {
err := readMessage(c, vmselectHello)
if errors.Is(err, io.EOF) {
// This is likely a TCP healthcheck, which must be ignored in order to prevent logs pollution.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1762 and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10786
return errTCPHealthcheck
}
return err
})
}
// errTCPHealthcheck indicates that the connection was opened as part of a TCP health check
// and was closed immediately after being established.
//
// This is expected behavior and can be safely ignored.
var errTCPHealthcheck = fmt.Errorf("TCP health check connection safe to ignore")
// IsTCPHealthcheck determines whether the provided error is a TCP health check
func IsTCPHealthcheck(err error) bool {
return errors.Is(err, errTCPHealthcheck)
}
// IsClientNetworkError determines whether the provided error is a client-side network error,
// such as io.EOF, io.ErrUnexpectedEOF, or a timeout.
// These errors typically occur when a client disconnects abruptly or fails during the handshake,
// and are generally non-actionable from the server point of view.
// This function helps distinguish such errors from critical ones during the handshake process
// and adjust logging accordingly.
//
// See: https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/pull/880
func IsClientNetworkError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return true
}
if IsTimeoutNetworkError(err) {
return true
}
if errMsg := err.Error(); strings.Contains(errMsg, "broken pipe") || strings.Contains(errMsg, "reset by peer") {
return true
}
return false
}
// IsTimeoutNetworkError determines whether the provided error is a network error with a timeout.
func IsTimeoutNetworkError(err error) bool {
var ne net.Error
if errors.As(err, &ne) && ne.Timeout() {
return true
}
return false
}
func genericServer(c net.Conn, compressionLevel int, readHelloMessage func(c net.Conn) error) (*BufferedConn, error) {
if err := c.SetDeadline(time.Now().Add(*rpcHandshakeTimeout)); err != nil {
return nil, fmt.Errorf("cannot set deadline: %w", err)
}
if err := readHelloMessage(c); err != nil {
return nil, fmt.Errorf("cannot read hello message : %w", err)
}
if err := writeMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
}
isRemoteCompressed, err := readIsCompressed(c)
if err != nil {
return nil, fmt.Errorf("cannot read isCompressed flag: %w", err)
}
if err := writeMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
}
if err := writeIsCompressed(c, compressionLevel > 0); err != nil {
return nil, fmt.Errorf("cannot write isCompressed flag: %w", err)
}
if err := readMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
}
if err := c.SetDeadline(time.Time{}); err != nil {
return nil, fmt.Errorf("cannot reset deadline: %w", err)
}
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
return bc, nil
}
func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn, error) {
if err := c.SetDeadline(time.Now().Add(*rpcHandshakeTimeout)); err != nil {
return nil, fmt.Errorf("cannot set deadline: %w", err)
}
if err := writeMessage(c, msg); err != nil {
return nil, fmt.Errorf("cannot write hello: %w", err)
}
if err := readMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot read success response after sending hello: %w", err)
}
if err := writeIsCompressed(c, compressionLevel > 0); err != nil {
return nil, fmt.Errorf("cannot write isCompressed flag: %w", err)
}
if err := readMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
}
isRemoteCompressed, err := readIsCompressed(c)
if err != nil {
return nil, fmt.Errorf("cannot read isCompressed flag: %w", err)
}
if err := writeMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
}
if err := c.SetDeadline(time.Time{}); err != nil {
return nil, fmt.Errorf("cannot reset deadline: %w", err)
}
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
return bc, nil
}
func writeIsCompressed(c net.Conn, isCompressed bool) error {
var buf [1]byte
if isCompressed {
buf[0] = 1
}
return writeMessage(c, string(buf[:]))
}
func readIsCompressed(c net.Conn) (bool, error) {
buf, err := readData(c, 1)
if err != nil {
return false, err
}
isCompressed := buf[0] != 0
return isCompressed, nil
}
func writeMessage(c net.Conn, msg string) error {
if _, err := io.WriteString(c, msg); err != nil {
return fmt.Errorf("cannot write %q to server: %w", msg, err)
}
if fc, ok := c.(flusher); ok {
if err := fc.Flush(); err != nil {
return fmt.Errorf("cannot flush %q to server: %w", msg, err)
}
}
return nil
}
type flusher interface {
Flush() error
}
func readMessage(c net.Conn, msg string) error {
buf, err := readData(c, len(msg))
if err != nil {
return err
}
if string(buf) != msg {
return fmt.Errorf("unexpected message obtained; got %q; want %q", buf, msg)
}
return nil
}
func readData(c net.Conn, dataLen int) ([]byte, error) {
data := make([]byte, dataLen)
if n, err := io.ReadFull(c, data); err != nil {
return nil, fmt.Errorf("cannot read message with size %d: %w; read only %d bytes", dataLen, err, n)
}
return data, nil
}

View File

@@ -1,83 +0,0 @@
package handshake
import (
"fmt"
"net"
"testing"
"time"
)
func TestVMInsertHandshake(t *testing.T) {
testHandshake(t, vminsertClient, VMInsertServer)
}
func TestVMSelectHandshake(t *testing.T) {
testHandshake(t, VMSelectClient, VMSelectServer)
}
func TestVMSelectServerTCPHealthcheck(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("cannot start listener: %s", err)
}
c, err := net.Dial("tcp", ln.Addr().String())
if err != nil {
t.Fatalf("cannot dial: %s", err)
}
if err := c.Close(); err != nil {
t.Fatalf("cannot close client conn: %s", err)
}
s, err := ln.Accept()
if err != nil {
t.Fatalf("cannot accept conn: %s", err)
}
if _, err := VMSelectServer(s, 0); !IsTCPHealthcheck(err) {
t.Fatalf("unexpected error; got %v; want TCP healthcheck error", err)
}
}
func testHandshake(t *testing.T, clientFunc, serverFunc Func) {
t.Helper()
c, s := net.Pipe()
ch := make(chan error, 1)
go func() {
bcs, err := serverFunc(s, 3)
if err != nil {
ch <- fmt.Errorf("error on outer handshake: %w", err)
return
}
bcc, err := clientFunc(bcs, 3)
if err != nil {
ch <- fmt.Errorf("error on inner handshake: %w", err)
return
}
if bcc == nil {
ch <- fmt.Errorf("expecting non-nil conn")
return
}
ch <- nil
}()
bcc, err := clientFunc(c, 0)
if err != nil {
t.Fatalf("error on outer handshake: %s", err)
}
bcs, err := serverFunc(bcc, 0)
if err != nil {
t.Fatalf("error on inner handshake: %s", err)
}
if bcs == nil {
t.Fatalf("expecting non-nil conn")
}
select {
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error on the server side: %s", err)
}
}
}

View File

@@ -64,10 +64,9 @@ var (
connTimeout = flag.Duration("http.connTimeout", 2*time.Minute, "Incoming connections to -httpListenAddr are closed after the configured timeout. "+
"This may help evenly spreading load among a cluster of services behind TCP-level load balancer. Zero value disables closing of incoming connections")
headerHSTS = flag.String("http.header.hsts", "", "Value for 'Strict-Transport-Security' header, recommended: 'max-age=31536000; includeSubDomains'")
headerFrameOptions = flag.String("http.header.frameOptions", "", "Value for 'X-Frame-Options' header")
headerCSP = flag.String("http.header.csp", "", `Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"`)
headerDisableServerHostname = flag.Bool("http.header.disableServerHostname", false, "Whether to disable 'X-Server-Hostname' header in HTTP responses")
headerHSTS = flag.String("http.header.hsts", "", "Value for 'Strict-Transport-Security' header, recommended: 'max-age=31536000; includeSubDomains'")
headerFrameOptions = flag.String("http.header.frameOptions", "", "Value for 'X-Frame-Options' header")
headerCSP = flag.String("http.header.csp", "", `Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"`)
disableCORS = flag.Bool("http.disableCORS", false, `Disable CORS for all origins (*)`)
)
@@ -330,9 +329,7 @@ func handlerWrapper(w http.ResponseWriter, r *http.Request, rh RequestHandler) {
if *headerCSP != "" {
h.Add("Content-Security-Policy", *headerCSP)
}
if !*headerDisableServerHostname {
h.Add("X-Server-Hostname", hostname)
}
h.Add("X-Server-Hostname", hostname)
requestsTotal.Inc()
if whetherToCloseConn(r) {
connTimeoutClosedConns.Inc()

View File

@@ -228,30 +228,4 @@ func TestHandlerWrapper(t *testing.T) {
if got := h.Get("Content-Security-Policy"); got != cspHeader {
t.Fatalf("unexpected CSP header; got %q; want %q", got, cspHeader)
}
if got := h.Get("X-Server-Hostname"); got != hostname {
t.Fatalf("unexpected X-Server-Hostname header; got %q; want %q", got, hostname)
}
}
func TestHandlerWrapperDisableServerHostnameHeader(t *testing.T) {
origDisableServerHostname := *headerDisableServerHostname
*headerDisableServerHostname = true
defer func() {
*headerDisableServerHostname = origDisableServerHostname
}()
req, _ := http.NewRequest("GET", "/health", nil)
srv := &server{s: &http.Server{}}
w := &httptest.ResponseRecorder{}
handlerWrapper(w, req, func(w http.ResponseWriter, r *http.Request) bool {
return builtinRoutesHandler(srv, r, w, func(_ http.ResponseWriter, _ *http.Request) bool {
return true
})
})
if got := w.Header().Get("X-Server-Hostname"); got != "" {
t.Fatalf("unexpected X-Server-Hostname header; got %q; want empty value", got)
}
}

View File

@@ -52,9 +52,6 @@ func TestGetTimeSuccess(t *testing.T) {
f("292277025-08-18T07:12:54.999999999Z", maxTimeMsecs)
f("1562529662.324", 1562529662324)
f("1223372036.855", 1223372036855)
// relative duration that resolves to a timestamp before 1970
f("-9223372036.854", minTimeMsecs)
}
func TestGetTimeError(t *testing.T) {
@@ -66,8 +63,8 @@ func TestGetTimeError(t *testing.T) {
t.Fatalf("unexpected error in NewRequest: %s", err)
}
if msec, err := GetTime(r, "s", 123); err == nil {
t.Fatalf("expecting non-nil error in GetTime(%q); got %d", s, msec)
if _, err := GetTime(r, "s", 123); err == nil {
t.Fatalf("expecting non-nil error in GetTime(%q)", s)
}
}
@@ -87,6 +84,7 @@ func TestGetTimeError(t *testing.T) {
f("123md")
f("-12.3md")
// relative duration outside the allowed range
// relative duration that resolves to a timestamp before 1970
f("-9223372036.854")
f("-9223372036.855")
}

View File

@@ -325,17 +325,6 @@ func (s *Search) NextMetricBlock() bool {
// SearchQuery is used for sending search queries from vmselect to vmstorage.
type SearchQuery struct {
AccountID uint32
ProjectID uint32
// TenantTokens and IsMultiTenant is artificial fields
// they're only exist at runtime and cannot be transferred
// via network calls for keeping communication protocol compatibility
// TODO:@f41gh7 introduce breaking change to the protocol later
// and use TenantTokens instead of AccountID and ProjectID
TenantTokens []TenantToken
IsMultiTenant bool
// The time range for searching time series
MinTimestamp int64
MaxTimestamp int64
@@ -479,30 +468,15 @@ func (tf *TagFilter) Unmarshal(src []byte) ([]byte, error) {
return src, nil
}
// String returns string representation of the search query: tag filters and time range.
// String returns string representation of the search query.
func (sq *SearchQuery) String() string {
start := TimestampToHumanReadableFormat(sq.MinTimestamp)
end := TimestampToHumanReadableFormat(sq.MaxTimestamp)
a := sq.FiltersString()
if !sq.IsMultiTenant {
return fmt.Sprintf("accountID=%d, projectID=%d, filters=%s, timeRange=[%s..%s]", sq.AccountID, sq.ProjectID, a, start, end)
}
tts := make([]string, len(sq.TenantTokens))
for i, tt := range sq.TenantTokens {
tts[i] = tt.String()
}
return fmt.Sprintf("tenants=[%s], filters=%s, timeRange=[%s..%s]", strings.Join(tts, ","), a, start, end)
}
// FiltersString returns string representation of the tag filters.
func (sq *SearchQuery) FiltersString() []string {
a := make([]string, len(sq.TagFilterss))
for i, tfs := range sq.TagFilterss {
a[i] = tagFiltersToString(tfs)
}
return a
start := TimestampToHumanReadableFormat(sq.MinTimestamp)
end := TimestampToHumanReadableFormat(sq.MaxTimestamp)
return fmt.Sprintf("filters=%s, timeRange=[%s..%s]", a, start, end)
}
func tagFiltersToString(tfs []TagFilter) string {
@@ -513,9 +487,8 @@ func tagFiltersToString(tfs []TagFilter) string {
return "{" + strings.Join(a, ",") + "}"
}
// MarshalWithoutTenant appends marshaled sq without AccountID/ProjectID to dst and returns the result.
// It is expected that TenantToken is already marshaled to dst.
func (sq *SearchQuery) MarshalWithoutTenant(dst []byte) []byte {
// Marshal appends marshaled sq to dst and returns the result.
func (sq *SearchQuery) Marshal(dst []byte) []byte {
dst = encoding.MarshalVarInt64(dst, sq.MinTimestamp)
dst = encoding.MarshalVarInt64(dst, sq.MaxTimestamp)
dst = encoding.MarshalVarUint64(dst, uint64(len(sq.TagFilterss)))
@@ -525,25 +498,11 @@ func (sq *SearchQuery) MarshalWithoutTenant(dst []byte) []byte {
dst = tagFilters[i].Marshal(dst)
}
}
dst = encoding.MarshalUint32(dst, uint32(sq.MaxMetrics))
return dst
}
// Unmarshal unmarshals sq from src and returns the tail.
func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) {
if len(src) < 4 {
return src, fmt.Errorf("cannot unmarshal AccountID: too short src len: %d; must be at least %d bytes", len(src), 4)
}
sq.AccountID = encoding.UnmarshalUint32(src)
src = src[4:]
if len(src) < 4 {
return src, fmt.Errorf("cannot unmarshal ProjectID: too short src len: %d; must be at least %d bytes", len(src), 4)
}
sq.ProjectID = encoding.UnmarshalUint32(src)
src = src[4:]
sq.TenantTokens = []TenantToken{{AccountID: sq.AccountID, ProjectID: sq.ProjectID}}
minTs, nSize := encoding.UnmarshalVarInt64(src)
if nSize <= 0 {
return src, fmt.Errorf("cannot unmarshal MinTimestamp from varint")
@@ -584,12 +543,6 @@ func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) {
sq.TagFilterss[i] = tagFilters
}
if len(src) < 4 {
return src, fmt.Errorf("cannot unmarshal MaxMetrics: too short src len: %d; must be at least %d bytes", len(src), 4)
}
sq.MaxMetrics = int(encoding.UnmarshalUint32(src))
src = src[4:]
return src, nil
}

View File

@@ -32,12 +32,7 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
// Skip nil sq1.
continue
}
tt := TenantToken{
AccountID: sq1.AccountID,
ProjectID: sq1.ProjectID,
}
buf = tt.Marshal(buf[:0])
buf = sq1.MarshalWithoutTenant(buf)
buf = sq1.Marshal(buf[:0])
tail, err := sq2.Unmarshal(buf)
if err != nil {
@@ -46,12 +41,6 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
if len(tail) > 0 {
t.Fatalf("unexpected tail left after SearchQuery unmarshaling; tail (len=%d): %q", len(tail), tail)
}
if sq2.AccountID != sq1.AccountID {
t.Fatalf("unexpected AccountID; got %d; want %d", sq2.AccountID, sq1.AccountID)
}
if sq2.ProjectID != sq1.ProjectID {
t.Fatalf("unexpected ProjectID; got %d; want %d", sq2.ProjectID, sq1.ProjectID)
}
if sq1.MinTimestamp != sq2.MinTimestamp {
t.Fatalf("unexpected MinTimestamp; got %d; want %d", sq2.MinTimestamp, sq1.MinTimestamp)
}

View File

@@ -19,7 +19,7 @@ func ParseDuration(s string) (time.Duration, error) {
return 0, err
}
if ms < minValidMilli || maxValidMilli < ms {
return 0, fmt.Errorf("duration %q must be in the range [%s, %s]", s, minDuration, maxDuration)
return 0, fmt.Errorf("duration %q must be in the range [%v, %v]", s, minDuration, maxDuration)
}
return time.Duration(ms) * time.Millisecond, nil
}

View File

@@ -28,6 +28,7 @@ func ParseTimeMsec(s string) (int64, error) {
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#timestamp-formats
//
// If s doesn't contain timezone information, then the local timezone is used.
// The time must be in the range [1970-01-01T00:00:00Z, 2262-04-11T23:47:16Z].
//
// It returns unix timestamp in nanoseconds.
func ParseTimeAt(s string, currentTimestamp int64) (int64, error) {
@@ -70,10 +71,14 @@ func ParseTimeAt(s string, currentTimestamp int64) (int64, error) {
if err != nil {
return 0, err
}
if d < 0 {
if d > 0 {
d = -d
}
return subInt64NoOverflow(currentTimestamp, int64(d)), nil
nsec := currentTimestamp + int64(d)
if nsec < 0 {
return 0, fmt.Errorf("time %s (%v) must be in the range [%v, %v]", sOrig, time.Unix(0, nsec).UTC(), minTime, maxTime)
}
return nsec, nil
}
if len(s) == 4 {
// Parse YYYY
@@ -110,28 +115,22 @@ func ParseTimeAt(s string, currentTimestamp int64) (int64, error) {
return parseTimeAt(time.RFC3339, sOrig, 0, sOrig)
}
func parseTimeAt(layout, value string, tzOffsetNsec int64, sOrig string) (int64, error) {
var (
minTime = time.Unix(0, 0).UTC()
maxTime = time.Unix(0, math.MaxInt64).UTC()
)
func parseTimeAt(layout, value string, tzOffsetNanos int64, sOrig string) (int64, error) {
t, err := time.Parse(layout, value)
if err != nil {
return 0, err
}
nsec := t.UnixNano()
return subInt64NoOverflow(nsec, -tzOffsetNsec), nil
}
func subInt64NoOverflow(a, b int64) int64 {
if b >= 0 {
if a < math.MinInt64+b {
return math.MinInt64
}
return a - b
tzOffset := time.Duration(tzOffsetNanos)
t = t.UTC().Add(tzOffset)
if t.Before(minTime) || t.After(maxTime) {
return 0, fmt.Errorf("time %s (%v) must be in the range [%v, %v]", sOrig, t, minTime, maxTime)
}
if a > math.MaxInt64+b {
return math.MaxInt64
}
return a - b
return t.UnixNano(), nil
}
// TryParseUnixTimestamp parses s as unix timestamp in seconds, milliseconds, microseconds or nanoseconds and returns the parsed timestamp in nanoseconds.

View File

@@ -210,7 +210,6 @@ func TestParseTimeAtLimits(t *testing.T) {
f := func(s string, wantTime time.Time) {
t.Helper()
got, err := ParseTimeAt(s, now.UnixNano())
if err != nil {
t.Fatalf("unexpected error: %v", err)
@@ -232,38 +231,42 @@ func TestParseTimeAtLimits(t *testing.T) {
west := location(t, "Etc/GMT+12") // UTC-12:00
var s string
// min timestamp
f("0", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
s = fmt.Sprintf("-%d", now.Unix())
f(s, time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
s = fmt.Sprintf("now-%d", now.Unix())
f(s, time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
// min year
f("1678Z", time.Date(1678, 1, 1, 0, 0, 0, 0, time.UTC))
f("1678+14:00", time.Date(1678, 1, 1, 0, 0, 0, 0, east))
f("1678-12:00", time.Date(1678, 1, 1, 0, 0, 0, 0, west))
f("1970Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
f("1971+14:00", time.Date(1971, 1, 1, 0, 0, 0, 0, east))
f("1970-12:00", time.Date(1970, 1, 1, 0, 0, 0, 0, west))
// min month
f("1677-10Z", time.Date(1677, 10, 1, 0, 0, 0, 0, time.UTC))
f("1677-10+14:00", time.Date(1677, 10, 1, 0, 0, 0, 0, east))
f("1677-10-12:00", time.Date(1677, 10, 1, 0, 0, 0, 0, west))
f("1970-01Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
f("1970-02+14:00", time.Date(1970, 2, 1, 0, 0, 0, 0, east))
f("1970-01-12:00", time.Date(1970, 1, 1, 0, 0, 0, 0, west))
// min day
f("1677-09-22Z", time.Date(1677, 9, 22, 0, 0, 0, 0, time.UTC))
f("1677-09-22+14:00", time.Date(1677, 9, 22, 0, 0, 0, 0, east))
f("1677-09-22-12:00", time.Date(1677, 9, 22, 0, 0, 0, 0, west))
f("1970-01-01Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
f("1970-01-02+14:00", time.Date(1970, 1, 2, 0, 0, 0, 0, east))
f("1970-01-01-12:00", time.Date(1970, 1, 1, 0, 0, 0, 0, west))
// min hour
f("1677-09-21T01Z", time.Date(1677, 9, 21, 1, 0, 0, 0, time.UTC))
f("1677-09-21T15+14:00", time.Date(1677, 9, 21, 15, 0, 0, 0, east))
f("1677-09-21T01+14:00", time.Unix(0, math.MinInt64))
f("1677-09-21T01-12:00", time.Date(1677, 9, 21, 1, 0, 0, 0, west))
f("1970-01-01T00Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
f("1970-01-01T14+14:00", time.Date(1970, 1, 1, 14, 0, 0, 0, east))
f("1969-12-31T12-12:00", time.Date(1969, 12, 31, 12, 0, 0, 0, west))
// min minute
f("1677-09-21T00:12Z", time.Date(1677, 9, 21, 0, 12, 0, 0, time.UTC))
f("1677-09-21T15:12Z+14:00", time.Date(1677, 9, 21, 15, 12, 0, 0, east))
f("1677-09-21T00:13Z+14:00", time.Unix(0, math.MinInt64))
f("1677-09-21T00:13Z-12:00", time.Date(1677, 9, 21, 0, 13, 0, 0, west))
f("1970-01-01T00:00Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
f("1970-01-01T14:00+14:00", time.Date(1970, 1, 1, 14, 0, 0, 0, east))
f("1969-12-31T12:00-12:00", time.Date(1969, 12, 31, 12, 0, 0, 0, west))
// min second
f("1677-09-21T00:12:43Z", time.Date(1677, 9, 21, 0, 12, 43, 0, time.UTC))
f("1677-09-21T15:12:43Z+14:00", time.Date(1677, 9, 21, 15, 12, 43, 0, east))
f("1677-09-21T00:12:44Z+14:00", time.Unix(0, math.MinInt64))
f("1677-09-21T00:12:44Z-12:00", time.Date(1677, 9, 21, 0, 12, 44, 0, west))
f("1970-01-01T00:00:00Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
f("1970-01-01T14:00:00+14:00", time.Date(1970, 1, 1, 14, 0, 0, 0, east))
f("1969-12-31T12:00:00-12:00", time.Date(1969, 12, 31, 12, 0, 0, 0, west))
// max year
f("2262Z", time.Date(2262, 1, 1, 0, 0, 0, 0, time.UTC))
@@ -277,26 +280,23 @@ func TestParseTimeAtLimits(t *testing.T) {
// max day
f("2262-04-11Z", time.Date(2262, 4, 11, 0, 0, 0, 0, time.UTC))
f("2262-04-11+14:00", time.Date(2262, 4, 11, 0, 0, 0, 0, east))
f("2262-04-12+14:00", time.Date(2262, 4, 12, 0, 0, 0, 0, east))
f("2262-04-11-12:00", time.Date(2262, 4, 11, 0, 0, 0, 0, west))
// max hour
f("2262-04-11T23Z", time.Date(2262, 4, 11, 23, 0, 0, 0, time.UTC))
f("2262-04-11T23+14:00", time.Date(2262, 4, 11, 23, 0, 0, 0, east))
f("2262-04-12T13+14:00", time.Date(2262, 4, 12, 13, 0, 0, 0, east))
f("2262-04-11T11-12:00", time.Date(2262, 4, 11, 11, 0, 0, 0, west))
f("2262-04-11T23-12:00", time.Unix(0, math.MaxInt64))
// max minute
f("2262-04-11T23:47Z", time.Date(2262, 4, 11, 23, 47, 0, 0, time.UTC))
f("2262-04-11T23:47+14:00", time.Date(2262, 4, 11, 23, 47, 0, 0, east))
f("2262-04-12T13:47+14:00", time.Date(2262, 4, 12, 13, 47, 0, 0, east))
f("2262-04-11T11:47-12:00", time.Date(2262, 4, 11, 11, 47, 0, 0, west))
f("2262-04-11T23:47-12:00", time.Unix(0, math.MaxInt64))
// max second
f("2262-04-11T23:47:16Z", time.Date(2262, 4, 11, 23, 47, 16, 0, time.UTC))
f("2262-04-11T23:47:16+14:00", time.Date(2262, 4, 11, 23, 47, 16, 0, east))
f("2262-04-12T13:47:16+14:00", time.Date(2262, 4, 12, 13, 47, 16, 0, east))
f("2262-04-11T11:47:16-12:00", time.Date(2262, 4, 11, 11, 47, 16, 0, west))
f("2262-04-11T23:47:16-12:00", time.Unix(0, math.MaxInt64))
// max timestamp
s = fmt.Sprintf("%d", int64(maxValidSecond))
@@ -324,6 +324,85 @@ func TestParseTimeAtLimits(t *testing.T) {
f(s, time.Date(1970, 4, 17, 18, 2, 52, 36_854_776, time.UTC))
}
func TestParseTimeAtOutsideLimits(t *testing.T) {
now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
f := func(s string) {
t.Helper()
got, err := ParseTimeAt(s, now.UnixNano())
if err == nil {
t.Fatalf("expected error but got %d", got)
}
if !strings.Contains(err.Error(), "must be in the range") {
t.Fatalf("expected error: %v", err)
}
}
// min timestamp
f(fmt.Sprintf("-%d", now.Unix()+1))
f(fmt.Sprintf("now-%d", now.Unix()+1))
// min year
f("1969Z")
f("1970+14:00")
f("1969-12:00")
// min month
f("1969-12Z")
f("1970-01+14:00")
f("1969-12-12:00")
// min day
f("1969-12-31Z")
f("1970-01-01+14:00")
f("1969-12-31-12:00")
// min hour
f("1969-12-31T23Z")
f("1970-01-01T13+14:00")
f("1969-12-31T11-12:00")
// min minute
f("1969-12-31T23:59Z")
f("1970-01-01T13:59+14:00")
f("1969-12-31T11:59-12:00")
// min second
f("1969-12-31T23:59:59Z")
f("1970-01-01T13:59:59+14:00")
f("1969-12-31T11:59:59-12:00")
// max year
f("2263Z")
f("2263+14:00")
f("2263-12:00")
// max month
f("2262-05Z")
f("2262-05+14:00")
f("2262-05-12:00")
// max day
f("2262-04-12Z")
f("2262-04-13+14:00")
f("2262-04-12-12:00")
// max hour
f("2262-04-12T00Z")
f("2262-04-12T14+14:00")
f("2262-04-11T12-12:00")
// max minute
f("2262-04-11T23:48Z")
f("2262-04-12T13:48+14:00")
f("2262-04-11T11:48-12:00")
// max second
f("2262-04-11T23:47:17Z")
f("2262-04-12T13:47:17+14:00")
f("2262-04-11T11:47:17-12:00")
}
func TestParseTimeAtOutsideLimits_Nanos(t *testing.T) {
now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
@@ -355,6 +434,7 @@ func TestParseTimeMsecFailure(t *testing.T) {
}
f("")
f("2263")
f("23-45:50")
f("1223-fo:ba")
f("1223-12:ba")

View File

@@ -1,68 +0,0 @@
package vmalertproxy
import (
"net/http"
"net/http/httputil"
"net/url"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// Init initializes proxying requests to the given proxyURL when calling HandleRequest.
//
// Init must be called after flag.Parse(), since it uses command-line flags.
func Init(proxyURL string) {
if len(proxyURL) == 0 {
return
}
pu, err := url.Parse(proxyURL)
if err != nil {
logger.Fatalf("cannot parse -vmalert.proxyURL=%q: %s", proxyURL, err)
}
vmalertProxyHost = pu.Host
vmalertProxy = httputil.NewSingleHostReverseProxy(pu)
}
// HandleRequest proxies the given request path to vmalert at proxyURL passed to Init().
func HandleRequest(w http.ResponseWriter, r *http.Request, path string) {
defer func() {
err := recover()
if err == nil || err == http.ErrAbortHandler {
// Suppress http.ErrAbortHandler panic.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1353
return
}
// Forward other panics to the caller.
panic(err)
}()
req := r.Clone(r.Context())
req.URL.Path = path
req.Host = vmalertProxyHost
if strings.HasPrefix(r.Header.Get(`User-Agent`), `Grafana`) {
// Grafana currently supports only Prometheus-style alerts. If other alert types
// (e.g. logs or traces) are returned, it may fail with "Error loading alerts".
//
// Grafana queries the vmalert API directly, bypassing the VictoriaMetrics datasource,
// so query params (such as datasource_type) cannot be enforced on the Grafana side.
//
// To ensure compatibility, we detect Grafana requests via the User-Agent and enforce
// `datasource_type=prometheus`.
//
// See:
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/329#issuecomment-3847585443
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/59
q := req.URL.Query()
q.Set("datasource_type", "prometheus")
req.URL.RawQuery = q.Encode()
req.RequestURI = ""
}
vmalertProxy.ServeHTTP(w, req)
}
var (
vmalertProxyHost string
vmalertProxy *httputil.ReverseProxy
)

File diff suppressed because it is too large Load Diff