mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-18 16:23:06 +03:00
Compare commits
1 Commits
vmsingle-v
...
docs-secur
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aafbde2712 |
17
Makefile
17
Makefile
@@ -471,9 +471,8 @@ test-full-386:
|
||||
|
||||
apptest:
|
||||
$(MAKE) victoria-metrics-race vmagent-race vmalert-race vmauth-race vmctl-race vmbackup-race vmrestore-race
|
||||
go test ./apptest/... -skip="^Test(Cluster|Mixed|Legacy).*"
|
||||
go test ./apptest/... -skip="^Test(Cluster|Legacy).*"
|
||||
|
||||
# App tests for legacy indexDB
|
||||
apptest-legacy: victoria-metrics-race vmbackup-race vmrestore-race
|
||||
OS=$$(uname | tr '[:upper:]' '[:lower:]'); \
|
||||
ARCH=$$(uname -m | tr '[:upper:]' '[:lower:]' | sed 's/x86_64/amd64/'); \
|
||||
@@ -490,20 +489,6 @@ apptest-legacy: victoria-metrics-race vmbackup-race vmrestore-race
|
||||
VMSTORAGE_V1_132_0_PATH=$${DIR}/vmstorage-prod \
|
||||
go test ./apptest/tests -run="^TestLegacySingle.*"
|
||||
|
||||
# App tests for mixed setups where vmsingle and vmcluster coexist.
|
||||
apptest-mixed: victoria-metrics-race
|
||||
OS=$$(uname | tr '[:upper:]' '[:lower:]'); \
|
||||
ARCH=$$(uname -m | tr '[:upper:]' '[:lower:]' | sed 's/x86_64/amd64/'); \
|
||||
VERSION=v1.145.0; \
|
||||
VMCLUSTER=victoria-metrics-$${OS}-$${ARCH}-$${VERSION}-cluster.tar.gz; \
|
||||
URL=https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/$${VERSION}; \
|
||||
DIR=/tmp/$${VERSION}; \
|
||||
test -d $${DIR} || (mkdir $${DIR} && \
|
||||
curl --output-dir /tmp -LO $${URL}/$${VMCLUSTER} && tar xzf /tmp/$${VMCLUSTER} -C $${DIR} \
|
||||
); \
|
||||
VMSELECT_PATH=$${DIR}/vmselect-prod \
|
||||
go test ./apptest/tests -run="^TestMixed.*"
|
||||
|
||||
benchmark:
|
||||
go test -run=NO_TESTS -bench=. ./lib/...
|
||||
go test -run=NO_TESTS -bench=. ./app/...
|
||||
|
||||
@@ -89,7 +89,7 @@ func main() {
|
||||
}
|
||||
logger.Infof("starting VictoriaMetrics at %q...", listenAddrs)
|
||||
startTime := time.Now()
|
||||
vmstorage.Init(*vmselectMaxConcurrentRequests, *vmselectMaxQueueDuration, promql.ResetRollupResultCacheIfNeeded)
|
||||
vmstorage.Init(*vmselectMaxConcurrentRequests, promql.ResetRollupResultCacheIfNeeded)
|
||||
vmselect.Init(*vmselectMaxConcurrentRequests, *vmselectMaxQueueDuration)
|
||||
vminsertcommon.StartIngestionRateLimiter(*maxIngestionRate)
|
||||
vminsert.Init()
|
||||
|
||||
@@ -282,8 +282,7 @@ func processFlags() {
|
||||
|
||||
func setUp() {
|
||||
const maxConcurrentRequests = 4
|
||||
maxQueueDuration := 5 * time.Second
|
||||
vmstorage.Init(maxConcurrentRequests, maxQueueDuration, promql.ResetRollupResultCacheIfNeeded)
|
||||
vmstorage.Init(maxConcurrentRequests, promql.ResetRollupResultCacheIfNeeded)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
readyCheckFunc := func() bool {
|
||||
|
||||
@@ -131,13 +131,16 @@ func (ac *authContext) initFromBasicAuthConfig(ba *BasicAuthConfig) error {
|
||||
if ba.Username == "" {
|
||||
return fmt.Errorf("missing `username` in `basic_auth` section")
|
||||
}
|
||||
ac.getAuthHeader = func() string {
|
||||
// See https://en.wikipedia.org/wiki/Basic_access_authentication
|
||||
token := ba.Username + ":" + ba.Password
|
||||
token64 := base64.StdEncoding.EncodeToString([]byte(token))
|
||||
return "Basic " + token64
|
||||
if ba.Password != "" {
|
||||
ac.getAuthHeader = func() string {
|
||||
// See https://en.wikipedia.org/wiki/Basic_access_authentication
|
||||
token := ba.Username + ":" + ba.Password
|
||||
token64 := base64.StdEncoding.EncodeToString([]byte(token))
|
||||
return "Basic " + token64
|
||||
}
|
||||
ac.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
|
||||
return nil
|
||||
}
|
||||
ac.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -69,8 +69,6 @@ const (
|
||||
vmAddr = "vm-addr"
|
||||
vmUser = "vm-user"
|
||||
vmPassword = "vm-password"
|
||||
vmHeaders = "vm-headers"
|
||||
vmBearerToken = "vm-bearer-token"
|
||||
vmAccountID = "vm-account-id"
|
||||
vmConcurrency = "vm-concurrency"
|
||||
vmCompress = "vm-compress"
|
||||
@@ -114,16 +112,6 @@ var (
|
||||
Usage: "VictoriaMetrics password for basic auth",
|
||||
EnvVars: []string{"VM_PASSWORD"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: vmHeaders,
|
||||
Usage: "Optional HTTP headers to send with each request to the corresponding destination address. \n" +
|
||||
"For example, --vm-headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding destination address. \n" +
|
||||
"Multiple headers must be delimited by '^^': --vm-headers='header1:value1^^header2:value2'",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: vmBearerToken,
|
||||
Usage: "Optional bearer auth token to use for the corresponding --vm-addr",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: vmAccountID,
|
||||
Usage: "AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). \n" +
|
||||
|
||||
@@ -457,7 +457,7 @@ func main() {
|
||||
auth.WithBearer(c.String(vmNativeDstBearerToken)),
|
||||
auth.WithHeaders(c.String(vmNativeDstHeaders)))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error initialize auth config for destination: %s: %s", dstAddr, err)
|
||||
return fmt.Errorf("error initialize auth config for destination: %s", dstAddr)
|
||||
}
|
||||
|
||||
// create TLS config
|
||||
@@ -596,18 +596,11 @@ func initConfigVM(c *cli.Context) (vm.Config, error) {
|
||||
return vm.Config{}, fmt.Errorf("failed to create backoff object: %w", err)
|
||||
}
|
||||
|
||||
authCfg, err := auth.Generate(
|
||||
auth.WithBasicAuth(c.String(vmUser), c.String(vmPassword)),
|
||||
auth.WithBearer(c.String(vmBearerToken)),
|
||||
auth.WithHeaders(c.String(vmHeaders)))
|
||||
if err != nil {
|
||||
return vm.Config{}, fmt.Errorf("error initialize auth config for destination: %s: %s", addr, err)
|
||||
}
|
||||
|
||||
return vm.Config{
|
||||
Addr: addr,
|
||||
Transport: tr,
|
||||
AuthCfg: authCfg,
|
||||
User: c.String(vmUser),
|
||||
Password: c.String(vmPassword),
|
||||
Concurrency: uint8(c.Int(vmConcurrency)),
|
||||
Compress: c.Bool(vmCompress),
|
||||
AccountID: c.String(vmAccountID),
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
|
||||
@@ -28,8 +27,6 @@ type Config struct {
|
||||
// --httpListenAddr value for single node version
|
||||
// --httpListenAddr value of vmselect component for cluster version
|
||||
Addr string
|
||||
|
||||
AuthCfg *auth.Config
|
||||
// Transport allows specifying custom http.Transport
|
||||
Transport *http.Transport
|
||||
// Concurrency defines number of worker
|
||||
@@ -43,6 +40,10 @@ type Config struct {
|
||||
// BatchSize defines how many samples
|
||||
// importer collects before sending the import request
|
||||
BatchSize int
|
||||
// User name for basic auth
|
||||
User string
|
||||
// Password for basic auth
|
||||
Password string
|
||||
// SignificantFigures defines the number of significant figures to leave
|
||||
// in metric values before importing.
|
||||
// Zero value saves all the significant decimal places
|
||||
@@ -64,10 +65,11 @@ type Config struct {
|
||||
// see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-time-series-data
|
||||
type Importer struct {
|
||||
addr string
|
||||
authCfg *auth.Config
|
||||
client *http.Client
|
||||
importPath string
|
||||
compress bool
|
||||
user string
|
||||
password string
|
||||
|
||||
close chan struct{}
|
||||
input chan *TimeSeries
|
||||
@@ -146,7 +148,8 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
|
||||
client: client,
|
||||
importPath: importPath,
|
||||
compress: cfg.Compress,
|
||||
authCfg: cfg.AuthCfg,
|
||||
user: cfg.User,
|
||||
password: cfg.Password,
|
||||
rl: limiter.NewLimiter(cfg.RateLimit),
|
||||
close: make(chan struct{}),
|
||||
input: make(chan *TimeSeries, cfg.Concurrency*4),
|
||||
@@ -301,8 +304,8 @@ func (im *Importer) Ping() error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
|
||||
}
|
||||
if im.authCfg != nil {
|
||||
im.authCfg.SetHeaders(req, true)
|
||||
if im.user != "" {
|
||||
req.SetBasicAuth(im.user, im.password)
|
||||
}
|
||||
resp, err := im.client.Do(req)
|
||||
if err != nil {
|
||||
@@ -331,8 +334,8 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
|
||||
im.importRequestsErrorsTotal.Inc()
|
||||
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
|
||||
}
|
||||
if im.authCfg != nil {
|
||||
im.authCfg.SetHeaders(req, true)
|
||||
if im.user != "" {
|
||||
req.SetBasicAuth(im.user, im.password)
|
||||
}
|
||||
if im.compress {
|
||||
req.Header.Set("Content-Encoding", "gzip")
|
||||
|
||||
@@ -6,6 +6,8 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
nethttputil "net/http/httputil"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -27,7 +29,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmalertproxy"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -37,10 +38,7 @@ var (
|
||||
resetCacheAuthKey = flagutil.NewPassword("search.resetCacheAuthKey", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call. It could be passed via authKey query arg. It overrides -httpAuth.*")
|
||||
logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging. "+
|
||||
"See also -search.logQueryMemoryUsage")
|
||||
|
||||
vmalertProxyURL = flag.String("vmalert.proxyURL", "", "Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , "+
|
||||
"then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules . "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmalert")
|
||||
vmalertProxyURL = flag.String("vmalert.proxyURL", "", "Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules")
|
||||
)
|
||||
|
||||
var slowQueries = metrics.NewCounter(`vm_slow_queries_total`)
|
||||
@@ -57,8 +55,8 @@ func Init(vmselectMaxConcurrentRequests int, vmselectMaxQueueDuration time.Durat
|
||||
concurrencyLimitCh = make(chan struct{}, maxConcurrentRequests)
|
||||
|
||||
initVMUIConfig()
|
||||
initVMAlertProxy()
|
||||
|
||||
vmalertproxy.Init(*vmalertProxyURL)
|
||||
flagutil.RegisterSecretFlag("vmalert.proxyURL")
|
||||
}
|
||||
|
||||
@@ -516,11 +514,10 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
|
||||
if len(*vmalertProxyURL) == 0 {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintf(w, "%s", `{"status":"error","msg":"the '-vmalert.proxyURL' command-line must be configured; `+
|
||||
`see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmalert"}`)
|
||||
fmt.Fprintf(w, "%s", `{"status":"error","msg":"for accessing vmalert flag '-vmalert.proxyURL' must be configured"}`)
|
||||
return true
|
||||
}
|
||||
vmalertproxy.HandleRequest(w, r, path)
|
||||
proxyVMAlertRequests(w, r, path)
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -558,7 +555,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
|
||||
case "/api/v1/rules", "/rules":
|
||||
rulesRequests.Inc()
|
||||
if len(*vmalertProxyURL) > 0 {
|
||||
vmalertproxy.HandleRequest(w, r, path)
|
||||
proxyVMAlertRequests(w, r, path)
|
||||
return true
|
||||
}
|
||||
// Return dumb placeholder for https://prometheus.io/docs/prometheus/latest/querying/api/#rules
|
||||
@@ -568,7 +565,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
|
||||
case "/api/v1/alerts", "/alerts":
|
||||
alertsRequests.Inc()
|
||||
if len(*vmalertProxyURL) > 0 {
|
||||
vmalertproxy.HandleRequest(w, r, path)
|
||||
proxyVMAlertRequests(w, r, path)
|
||||
return true
|
||||
}
|
||||
// Return dumb placeholder for https://prometheus.io/docs/prometheus/latest/querying/api/#alerts
|
||||
@@ -578,7 +575,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
|
||||
case "/api/v1/notifiers", "/notifiers":
|
||||
notifiersRequests.Inc()
|
||||
if len(*vmalertProxyURL) > 0 {
|
||||
vmalertproxy.HandleRequest(w, r, path)
|
||||
proxyVMAlertRequests(w, r, path)
|
||||
return true
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
@@ -725,7 +722,48 @@ var (
|
||||
metricNamesStatsResetErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/admin/status/metric_names_stats/reset"}`)
|
||||
)
|
||||
|
||||
var vmuiConfig string
|
||||
func proxyVMAlertRequests(w http.ResponseWriter, r *http.Request, path string) {
|
||||
defer func() {
|
||||
err := recover()
|
||||
if err == nil || err == http.ErrAbortHandler {
|
||||
// Suppress http.ErrAbortHandler panic.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1353
|
||||
return
|
||||
}
|
||||
// Forward other panics to the caller.
|
||||
panic(err)
|
||||
}()
|
||||
req := r.Clone(r.Context())
|
||||
req.URL.Path = strings.TrimPrefix(path, "prometheus")
|
||||
req.Host = vmalertProxyHost
|
||||
|
||||
if strings.HasPrefix(r.Header.Get(`User-Agent`), `Grafana`) {
|
||||
// Grafana currently supports only Prometheus-style alerts. If other alert types
|
||||
// (e.g. logs or traces) are returned, it may fail with "Error loading alerts".
|
||||
//
|
||||
// Grafana queries the vmalert API directly, bypassing the VictoriaMetrics datasource,
|
||||
// so query params (such as datasource_type) cannot be enforced on the Grafana side.
|
||||
//
|
||||
// To ensure compatibility, we detect Grafana requests via the User-Agent and enforce
|
||||
// `datasource_type=prometheus`.
|
||||
//
|
||||
// See:
|
||||
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/329#issuecomment-3847585443
|
||||
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/59
|
||||
q := req.URL.Query()
|
||||
q.Set("datasource_type", "prometheus")
|
||||
req.URL.RawQuery = q.Encode()
|
||||
req.RequestURI = ""
|
||||
}
|
||||
|
||||
vmalertProxy.ServeHTTP(w, req)
|
||||
}
|
||||
|
||||
var (
|
||||
vmalertProxyHost string
|
||||
vmalertProxy *nethttputil.ReverseProxy
|
||||
vmuiConfig string
|
||||
)
|
||||
|
||||
func initVMUIConfig() {
|
||||
var cfg struct {
|
||||
@@ -757,3 +795,16 @@ func initVMUIConfig() {
|
||||
}
|
||||
vmuiConfig = string(data)
|
||||
}
|
||||
|
||||
// initVMAlertProxy must be called after flag.Parse(), since it uses command-line flags.
|
||||
func initVMAlertProxy() {
|
||||
if len(*vmalertProxyURL) == 0 {
|
||||
return
|
||||
}
|
||||
proxyURL, err := url.Parse(*vmalertProxyURL)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot parse -vmalert.proxyURL=%q: %s", *vmalertProxyURL, err)
|
||||
}
|
||||
vmalertProxyHost = proxyURL.Host
|
||||
vmalertProxy = nethttputil.NewSingleHostReverseProxy(proxyURL)
|
||||
}
|
||||
|
||||
@@ -28,7 +28,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
@@ -526,7 +525,6 @@ func DeleteHandler(startTime time.Time, r *http.Request) error {
|
||||
if deletedCount > 0 {
|
||||
promql.ResetRollupResultCache()
|
||||
}
|
||||
logger.Infof("/api/v1/admin/tsdb/delete_series has been called for %q. Deleted %d series.", sq.FiltersString(), deletedCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -30,9 +30,6 @@ var (
|
||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention. See also -retentionFilter")
|
||||
futureRetention = flagutil.NewRetentionDuration("futureRetention", "2d", "Data with timestamps bigger than now+futureRetention is automatically deleted. "+
|
||||
"The minimum futureRetention is 2 days. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention")
|
||||
vmselectAddr = flag.String("vmselectAddr", "", "TCP address to accept connections from vmselect services")
|
||||
vmselectDisableRPCCompression = flag.Bool("rpc.disableCompression", false, "Whether to disable compression of the data sent from vmstorage to vmselect. "+
|
||||
"This reduces CPU usage at the cost of higher network bandwidth usage")
|
||||
snapshotAuthKey = flagutil.NewPassword("snapshotAuthKey", "authKey, which must be passed in query string to /snapshot* pages. It overrides -httpAuth.*")
|
||||
forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*")
|
||||
forceFlushAuthKey = flagutil.NewPassword("forceFlushAuthKey", "authKey, which must be passed in query string to /internal/force_flush pages. It overrides -httpAuth.*")
|
||||
@@ -111,7 +108,7 @@ func DataPath() string {
|
||||
}
|
||||
|
||||
// Init initializes vmstorage.
|
||||
func Init(vmselectMaxConcurrentRequests int, vmselectMaxQueueDuration time.Duration, resetCacheIfNeeded func(mrs []storage.MetricRow)) {
|
||||
func Init(vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []storage.MetricRow)) {
|
||||
storage.SetDedupInterval(*minScrapeInterval)
|
||||
storage.SetDataFlushInterval(*inmemoryDataFlushInterval)
|
||||
storage.LegacySetRetentionTimezoneOffset(*retentionTimezoneOffset)
|
||||
@@ -172,21 +169,6 @@ func Init(vmselectMaxConcurrentRequests int, vmselectMaxQueueDuration time.Durat
|
||||
storageMetrics.RegisterMetricsWriter(vmStorage.writeStorageMetrics)
|
||||
metrics.RegisterSet(storageMetrics)
|
||||
|
||||
if *vmselectAddr != "" {
|
||||
var err error
|
||||
limits := vmselectapi.Limits{
|
||||
MaxConcurrentRequests: vmselectMaxConcurrentRequests,
|
||||
MaxConcurrentRequestsFlagName: "search.maxConcurrentRequests",
|
||||
MaxQueueDuration: vmselectMaxQueueDuration,
|
||||
MaxQueueDurationFlagName: "search.maxQueueDuration",
|
||||
}
|
||||
api := newVMStorageWithTenantID(vmStorage)
|
||||
vmselectSrv, err = vmselectapi.NewServer(*vmselectAddr, api, limits, *vmselectDisableRPCCompression)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot create a server with -vmselectAddr=%s: %s", *vmselectAddr, err)
|
||||
}
|
||||
}
|
||||
|
||||
VMInsertAPI = vmStorage
|
||||
VMSelectAPI = vmStorage
|
||||
GetSearch = vmStorage.GetSearch
|
||||
@@ -209,8 +191,6 @@ var (
|
||||
|
||||
// TODO(@rtm0): Remove this dependency from vmalert-tool unit tests.
|
||||
DebugFlush func()
|
||||
|
||||
vmselectSrv *vmselectapi.Server
|
||||
)
|
||||
|
||||
// Stop stops the vmstorage
|
||||
@@ -221,10 +201,6 @@ func Stop() {
|
||||
|
||||
logger.Infof("gracefully closing the storage at %s", *storageDataPath)
|
||||
startTime := time.Now()
|
||||
|
||||
if vmselectSrv != nil {
|
||||
vmselectSrv.MustStop()
|
||||
}
|
||||
vmStorage.Stop()
|
||||
logger.Infof("successfully closed the storage in %.3f seconds", time.Since(startTime).Seconds())
|
||||
|
||||
|
||||
@@ -164,10 +164,6 @@ func (vms *VMStorage) IsReadOnly() bool {
|
||||
}
|
||||
|
||||
func (vms *VMStorage) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
|
||||
return vms.initSearch(qt, sq, nil, deadline)
|
||||
}
|
||||
|
||||
func (vms *VMStorage) initSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, marshal marshalFunc, deadline uint64) (vmselectapi.BlockIterator, error) {
|
||||
vms.wg.Add(1)
|
||||
|
||||
tr := sq.GetTimeRange()
|
||||
@@ -182,7 +178,6 @@ func (vms *VMStorage) initSearch(qt *querytracer.Tracer, sq *storage.SearchQuery
|
||||
return nil, fmt.Errorf("missing tag filters")
|
||||
}
|
||||
bi := getBlockIterator()
|
||||
bi.marshal = marshal
|
||||
bi.wgDone = vms.wg.Done
|
||||
bi.sr.Init(qt, vms.s, tfss, tr, maxMetrics, deadline)
|
||||
if err := bi.sr.Error(); err != nil {
|
||||
@@ -203,14 +198,11 @@ func (vms *VMStorage) getMaxMetrics(searchQueryLimit int) int {
|
||||
return searchQueryLimit
|
||||
}
|
||||
|
||||
type marshalFunc func(dst []byte, src *storage.MetricBlock) []byte
|
||||
|
||||
// blockIterator implements vmselectapi.BlockIterator
|
||||
type blockIterator struct {
|
||||
sr storage.Search
|
||||
mb storage.MetricBlock
|
||||
marshal marshalFunc
|
||||
wgDone func()
|
||||
sr storage.Search
|
||||
mb storage.MetricBlock
|
||||
wgDone func()
|
||||
}
|
||||
|
||||
var blockIteratorsPool sync.Pool
|
||||
@@ -239,11 +231,7 @@ func (bi *blockIterator) NextBlock(dst []byte) ([]byte, bool) {
|
||||
mb := bi.mb
|
||||
mb.MetricName = bi.sr.MetricBlockRef.MetricName
|
||||
bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block)
|
||||
if bi.marshal != nil {
|
||||
dst = bi.marshal(dst[:0], &mb)
|
||||
} else {
|
||||
dst = mb.Marshal(dst[:0])
|
||||
}
|
||||
dst = mb.Marshal(dst[:0])
|
||||
return dst, true
|
||||
}
|
||||
|
||||
|
||||
@@ -1,264 +0,0 @@
|
||||
package vmstorage
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
|
||||
)
|
||||
|
||||
var (
|
||||
accountID = flag.Uint64("accountID", 0, "The accountID of the stored data")
|
||||
projectID = flag.Uint64("projectID", 0, "The projectID of the stored data")
|
||||
)
|
||||
|
||||
func newVMStorageWithTenantID(vms *VMStorage) *VMStorageWithTenantID {
|
||||
if *accountID > math.MaxUint32 {
|
||||
logger.Fatalf("-clusternative.accountID must to be in the range [0, %d], got %d", uint32(math.MaxUint32), *accountID)
|
||||
}
|
||||
if *projectID > math.MaxUint32 {
|
||||
logger.Fatalf("-clusternative.projectID must to be in the range [0, %d], got %d", uint32(math.MaxUint32), *projectID)
|
||||
}
|
||||
return &VMStorageWithTenantID{
|
||||
vms: vms,
|
||||
accountID: uint32(*accountID),
|
||||
projectID: uint32(*projectID),
|
||||
}
|
||||
}
|
||||
|
||||
// VMStorageWithTenantID is a thin wrapper around VMStorage type that overrides
|
||||
// its methods to properly serve requests coming from a vmselect (require
|
||||
// tenantID).
|
||||
//
|
||||
// A new instance of this type should be created using
|
||||
// newVMStorageWithTenantID(). The created instance does not require closing.
|
||||
// The instance also does not take ownership of vms and it is the responsibility
|
||||
// of the caller to close vms.
|
||||
type VMStorageWithTenantID struct {
|
||||
vms *VMStorage
|
||||
|
||||
accountID uint32
|
||||
projectID uint32
|
||||
}
|
||||
|
||||
// InitSearch initializes a storage search for a request initiated by a
|
||||
// vmselect.
|
||||
//
|
||||
// The search is initialized iff the search query is either multitenant or its
|
||||
// accountID and projectID match -accountID and -projectID flag values.
|
||||
// Otherwise, the method returns an interator that will return no data.
|
||||
//
|
||||
// The method also overrides the data format of the data returned by the
|
||||
// iterator by prepending accountID and projectID bytes to the metric name and
|
||||
// the data block (a format used in vmcluster).
|
||||
func (vmst *VMStorageWithTenantID) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
|
||||
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
|
||||
return emptyBI, nil
|
||||
}
|
||||
return vmst.vms.initSearch(qt, sq, vmst.marshalMetricBlock, deadline)
|
||||
}
|
||||
|
||||
var emptyBI = &emptyBlockIterator{}
|
||||
|
||||
// emptyBlockIterator is an implementation of vmselectapi.BlockIterator that
|
||||
// always returns no data.
|
||||
type emptyBlockIterator struct{}
|
||||
|
||||
func (*emptyBlockIterator) MustClose() {}
|
||||
|
||||
func (*emptyBlockIterator) NextBlock(dst []byte) ([]byte, bool) {
|
||||
return dst, false
|
||||
}
|
||||
|
||||
func (*emptyBlockIterator) Error() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// marshalMetricBlock serializes a metric block in the format expected by
|
||||
// vmselect.
|
||||
//
|
||||
// vmselect expects metric names and data blocks to have the tenantID but
|
||||
// vmsingle does not have it. Therefore the tenantID needs to be included to
|
||||
// every metric name and block.
|
||||
func (vmst *VMStorageWithTenantID) marshalMetricBlock(dst []byte, src *storage.MetricBlock) []byte {
|
||||
// Marshal metric name:
|
||||
// 1. Marshal metric name length + accountID length + projectID length (in
|
||||
// bytes).
|
||||
// 2. append accountID and projectID bytes
|
||||
// 3. Finally append metric name bytes
|
||||
dst = encoding.MarshalVarUint64(dst, uint64(len(src.MetricName))+8)
|
||||
dst = encoding.MarshalUint32(dst, vmst.accountID)
|
||||
dst = encoding.MarshalUint32(dst, vmst.projectID)
|
||||
dst = append(dst, src.MetricName...)
|
||||
|
||||
// Marshal data block.
|
||||
dst = encoding.MarshalUint32(dst, vmst.accountID)
|
||||
dst = encoding.MarshalUint32(dst, vmst.projectID)
|
||||
dst = storage.MarshalBlock(dst, &src.Block)
|
||||
|
||||
return dst
|
||||
}
|
||||
|
||||
// SearchMetricNames searches the storage for metric names that match the query.
|
||||
//
|
||||
// If the query is not multitenant or the query accountID and projectID do not
|
||||
// match the -accoutID and -projectID flag values, the method will return an
|
||||
// empty result.
|
||||
//
|
||||
// Found metric names are prepended with accountID and projectID bytes (a format
|
||||
// used in vmcluster).
|
||||
func (vmst *VMStorageWithTenantID) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
|
||||
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
metricNames, err := vmst.vms.SearchMetricNames(qt, sq, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// vmselect expects metric names to have the tenantID but vmsingle does not
|
||||
// have it. Therefore the tenantID needs to be appended to every metric
|
||||
// name.
|
||||
dst := make([]byte, 0, 8)
|
||||
dst = encoding.MarshalUint32(dst, vmst.accountID)
|
||||
dst = encoding.MarshalUint32(dst, vmst.projectID)
|
||||
tenantID := string(dst)
|
||||
|
||||
for i, metricName := range metricNames {
|
||||
metricNames[i] = tenantID + metricName
|
||||
}
|
||||
return metricNames, nil
|
||||
}
|
||||
|
||||
// LabelValues searches the storage for values that match the query and
|
||||
// correspond to a label whose name is `labelName`. The returned result
|
||||
// will contain not more than `maxLabelValues`.
|
||||
//
|
||||
// If the query is not multitenant or the query accountID and projectID do not
|
||||
// match the -accoutID and -projectID flag values, the method will return an
|
||||
// empty result.
|
||||
func (vmst *VMStorageWithTenantID) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
|
||||
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
|
||||
return nil, nil
|
||||
}
|
||||
return vmst.vms.LabelValues(qt, sq, labelName, maxLabelValues, deadline)
|
||||
}
|
||||
|
||||
// TagValueSuffixes searches the storage for Graphite tag value suffixes. The
|
||||
// returned result will contain not more than `maxSuffixes`.
|
||||
//
|
||||
// If the query is not multitenant or the query accountID and projectID do not
|
||||
// match the -accoutID and -projectID flag values, the method will return an
|
||||
// empty result.
|
||||
func (vmst *VMStorageWithTenantID) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error) {
|
||||
if accountID != vmst.accountID || projectID != vmst.projectID {
|
||||
return nil, nil
|
||||
}
|
||||
return vmst.vms.TagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
|
||||
}
|
||||
|
||||
// LabelNames searches the storage for label names that match the query.
|
||||
// The returned result will contain not more than `maxLabelNames`.
|
||||
//
|
||||
// If the query is not multitenant or the query accountID and projectID do not
|
||||
// match the -accoutID and -projectID flag values, the method will return an
|
||||
// empty result.
|
||||
func (vmst *VMStorageWithTenantID) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
|
||||
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
|
||||
return nil, nil
|
||||
}
|
||||
return vmst.vms.LabelNames(qt, sq, maxLabelNames, deadline)
|
||||
}
|
||||
|
||||
// SeriesCount returns the total number of metrics stored in the database.
|
||||
//
|
||||
// The method may return inflated numbers. How inflated the count depends
|
||||
// on the churn rate and the retention period. For example, if a metric lasts
|
||||
// for 2 months, it will be counted twice.
|
||||
//
|
||||
// The method also counts the deleted metrics.
|
||||
//
|
||||
// If the query is not multitenant or the query accountID and projectID do not
|
||||
// match the -accoutID and -projectID flag values, the method will return 0.
|
||||
func (vmst *VMStorageWithTenantID) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
|
||||
if accountID != vmst.accountID || projectID != vmst.projectID {
|
||||
return 0, nil
|
||||
}
|
||||
return vmst.vms.SeriesCount(qt, accountID, projectID, deadline)
|
||||
}
|
||||
|
||||
// Tenants returns just one tenant consisting of the -accountID and -projectID
|
||||
// flag values.
|
||||
func (vmst *VMStorageWithTenantID) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
|
||||
tenantID := fmt.Sprintf("%d:%d", vmst.accountID, vmst.projectID)
|
||||
return []string{tenantID}, nil
|
||||
}
|
||||
|
||||
// TSDBStatus retrieves the status for metrics that match to the search query.
|
||||
//
|
||||
// If the query is not multitenant or the query accountID and projectID do not
|
||||
// match the -accoutID and -projectID flag values, the method will return empty
|
||||
// status.
|
||||
func (vmst *VMStorageWithTenantID) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
|
||||
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
|
||||
return &storage.TSDBStatus{}, nil
|
||||
}
|
||||
return vmst.vms.TSDBStatus(qt, sq, focusLabel, topN, deadline)
|
||||
}
|
||||
|
||||
// DeleteSeries marks as deleted metrics that match the search query.
|
||||
// The method returns the number of deleted metrics.
|
||||
//
|
||||
// If the query is not multitenant or the query accountID and projectID do not
|
||||
// match the -accoutID and -projectID flag values, no metrics will be deleted
|
||||
// and the method will return 0.
|
||||
func (vmst *VMStorageWithTenantID) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
|
||||
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
|
||||
return 0, nil
|
||||
}
|
||||
return vmst.vms.DeleteSeries(qt, sq, deadline)
|
||||
}
|
||||
|
||||
// RegisterMetricNames registers metric names in the index, the sample values
|
||||
// and timestamps are ignored.
|
||||
func (vmst *VMStorageWithTenantID) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error {
|
||||
return vmst.vms.RegisterMetricNames(qt, mrs, deadline)
|
||||
}
|
||||
|
||||
// GetMetricNamesUsageStats retrieves the usage stats for metrics whose name
|
||||
// matches the pattern.
|
||||
//
|
||||
// If the request is not multitenant or the request accountID and projectID do
|
||||
// not match the -accoutID and -projectID flag values, no metrics will be
|
||||
// deleted and the method will return 0.
|
||||
func (vmst *VMStorageWithTenantID) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (metricnamestats.StatsResult, error) {
|
||||
if tt != nil && (tt.AccountID != vmst.accountID || tt.ProjectID != vmst.projectID) {
|
||||
return metricnamestats.StatsResult{}, nil
|
||||
}
|
||||
return vmst.vms.GetMetricNamesUsageStats(qt, tt, limit, le, matchPattern, deadline)
|
||||
}
|
||||
|
||||
// ResetMetricNamesUsageStats resets the metric name usage stats.
|
||||
func (vmst *VMStorageWithTenantID) ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error {
|
||||
return vmst.vms.ResetMetricNamesUsageStats(qt, deadline)
|
||||
}
|
||||
|
||||
// GetMetadataRecords retrieves the metadata for the metricName.
|
||||
//
|
||||
// If the request is not multitenant or the request accountID and projectID do
|
||||
// not match the -accoutID and -projectID flag values, no metrics will be
|
||||
// deleted and the method will return 0.
|
||||
func (vmst *VMStorageWithTenantID) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) {
|
||||
if tt != nil && (tt.AccountID != vmst.accountID || tt.ProjectID != vmst.projectID) {
|
||||
return nil, nil
|
||||
}
|
||||
return vmst.vms.GetMetadataRecords(qt, tt, limit, metricName, deadline)
|
||||
}
|
||||
@@ -1,358 +0,0 @@
|
||||
package apptest
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
)
|
||||
|
||||
type TestData struct {
|
||||
Samples []string
|
||||
Step int64
|
||||
WantSeries []map[string]string
|
||||
WantLabels []string
|
||||
WantLabelValues []string
|
||||
WantQueryResults []*QueryResult
|
||||
WantMetadata map[string][]MetadataEntry
|
||||
WantMetricNamesStats []MetricNamesStatsRecord
|
||||
}
|
||||
|
||||
func GenerateTestData(prefix string, numMetrics, start, end int64) TestData {
|
||||
d := TestData{
|
||||
Samples: []string{},
|
||||
Step: (end - start) / numMetrics,
|
||||
WantSeries: make([]map[string]string, numMetrics),
|
||||
WantLabels: make([]string, numMetrics),
|
||||
WantLabelValues: make([]string, numMetrics),
|
||||
WantQueryResults: make([]*QueryResult, numMetrics),
|
||||
WantMetadata: make(map[string][]MetadataEntry),
|
||||
WantMetricNamesStats: make([]MetricNamesStatsRecord, numMetrics),
|
||||
}
|
||||
for i := range numMetrics {
|
||||
metricName := fmt.Sprintf("%s_%04d", prefix, i)
|
||||
metricHelp := fmt.Sprintf("# HELP %s some help message", metricName)
|
||||
metricType := fmt.Sprintf("# TYPE %s gauge", metricName)
|
||||
labelName := fmt.Sprintf("label_%04d", i)
|
||||
labelValue := fmt.Sprintf("value_%04d", i)
|
||||
value := i
|
||||
timestamp := start + i*d.Step
|
||||
sample := fmt.Sprintf(`%s{%s="value", label="%s"} %d %d`, metricName, labelName, labelValue, value, timestamp)
|
||||
|
||||
d.Samples = append(d.Samples, metricHelp, metricType, sample)
|
||||
d.WantSeries[i] = map[string]string{
|
||||
"__name__": metricName,
|
||||
labelName: "value",
|
||||
"label": labelValue,
|
||||
}
|
||||
d.WantLabels[i] = labelName
|
||||
d.WantLabelValues[i] = labelValue
|
||||
d.WantQueryResults[i] = &QueryResult{
|
||||
Metric: map[string]string{
|
||||
"__name__": metricName,
|
||||
labelName: "value",
|
||||
"label": labelValue,
|
||||
},
|
||||
Samples: []*Sample{{Timestamp: timestamp, Value: float64(value)}},
|
||||
}
|
||||
d.WantMetadata[metricName] = []MetadataEntry{{Help: "some help message", Type: "gauge"}}
|
||||
d.WantMetricNamesStats[i].MetricName = metricName
|
||||
}
|
||||
d.WantLabels = append(d.WantLabels, "__name__", "label")
|
||||
slices.Sort(d.WantLabels)
|
||||
return d
|
||||
}
|
||||
|
||||
// AssertSeries retrieves metric names from the storage and compares the result
|
||||
// with the expected one.
|
||||
func AssertSeries(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end int64, want []map[string]string) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/series response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1Series(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
}).Sort()
|
||||
},
|
||||
Want: &PrometheusAPIV1SeriesResponse{
|
||||
Status: "success",
|
||||
Data: want,
|
||||
},
|
||||
Retries: 1000,
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AssertSeriesCount retrieves series count and compares it with expected one.
|
||||
func AssertSeriesCount(tc *TestCase, app PrometheusQuerier, tenantID string, start, end int64, want uint64) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/series/count response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1SeriesCount(tc.T(), QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
})
|
||||
},
|
||||
Want: &PrometheusAPIV1SeriesCountResponse{
|
||||
Status: "success",
|
||||
Data: []uint64{want},
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AssertLabels retrieves label names from the storage and compares the result
|
||||
// with the expected one.
|
||||
func AssertLabels(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end int64, want []string) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/labels response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
res := app.PrometheusAPIV1Labels(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
})
|
||||
slices.Sort(res.Data)
|
||||
return res
|
||||
},
|
||||
Want: &PrometheusAPIV1LabelsResponse{
|
||||
Status: "success",
|
||||
Data: want,
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AssertLabelValues retrieves values for the label whose name is labelName for
|
||||
// the series whose name mathes metricNameRE, compares the result with the
|
||||
// expected one.
|
||||
func AssertLabelValues(tc *TestCase, app PrometheusQuerier, metricNameRE, labelName, tenantID string, start, end int64, want []string) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/labels/.../values response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
res := app.PrometheusAPIV1LabelValues(tc.T(), labelName, query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
})
|
||||
slices.Sort(res.Data)
|
||||
return res
|
||||
},
|
||||
Want: &PrometheusAPIV1LabelValuesResponse{
|
||||
Status: "success",
|
||||
Data: want,
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AssertQueryResults sends a data query to storage and compares the query
|
||||
// result with the expected one.
|
||||
func AssertQueryResults(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end, step int64, want []*QueryResult) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/query_range response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1QueryRange(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
Step: fmt.Sprintf("%dms", step),
|
||||
MaxLookback: fmt.Sprintf("%dms", step-1),
|
||||
NoCache: "1",
|
||||
})
|
||||
},
|
||||
Want: &PrometheusAPIV1QueryResponse{
|
||||
Status: "success",
|
||||
Data: &QueryData{
|
||||
ResultType: "matrix",
|
||||
Result: want,
|
||||
},
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
func AssertMetadata(tc *TestCase, app PrometheusQuerier, metricName, tenantID string, want map[string][]MetadataEntry) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/metadata response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1Metadata(tc.T(), metricName, 0, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: &PrometheusAPIV1Metadata{
|
||||
Status: "success",
|
||||
Data: want,
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
func AssertMetricNamesStats(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, want []MetricNamesStatsRecord) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/status/metric_names_stats response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1StatusMetricNamesStats(tc.T(), "", "", metricNameRE, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: MetricNamesStatsResponse{
|
||||
Records: want,
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// GraphiteTestData holds the data samples in Graphite Pickle format, distance
|
||||
// between samples in milliseconds and expected responses for various Graphite
|
||||
// API endpoints.
|
||||
type GraphiteTestData struct {
|
||||
Samples []string
|
||||
Step int64
|
||||
WantMetricsIndex []string
|
||||
WantMetricsFind []GraphiteMetric
|
||||
WantMetricsExpand []string
|
||||
WantRenderedTargets []GraphiteRenderedTarget
|
||||
}
|
||||
|
||||
// GenerateGraphiteTestData generates Graphite test data.
|
||||
func GenerateGraphiteTestData(prefix string, numMetrics, start, end int64) GraphiteTestData {
|
||||
d := GraphiteTestData{
|
||||
Samples: make([]string, numMetrics),
|
||||
Step: (end - start) / numMetrics,
|
||||
WantMetricsIndex: make([]string, numMetrics),
|
||||
WantMetricsFind: make([]GraphiteMetric, numMetrics),
|
||||
WantMetricsExpand: make([]string, numMetrics),
|
||||
WantRenderedTargets: make([]GraphiteRenderedTarget, numMetrics),
|
||||
}
|
||||
|
||||
datapoints := make([][2]float64, numMetrics)
|
||||
for i := range numMetrics {
|
||||
timestamp := (start + i*d.Step) / 1000
|
||||
datapoints[i][1] = float64(timestamp)
|
||||
}
|
||||
|
||||
for i := range numMetrics {
|
||||
suffix := fmt.Sprintf("%04d", i)
|
||||
metricName := fmt.Sprintf("%s.%s", prefix, suffix)
|
||||
value := i
|
||||
timestamp := (start + i*d.Step) / 1000
|
||||
sample := fmt.Sprintf(`%s %d %d`, metricName, value, timestamp)
|
||||
|
||||
d.Samples[i] = sample
|
||||
d.WantMetricsIndex[i] = metricName
|
||||
d.WantMetricsFind[i].Id = metricName
|
||||
d.WantMetricsFind[i].Text = suffix
|
||||
d.WantMetricsFind[i].Leaf = 1
|
||||
d.WantMetricsExpand[i] = metricName
|
||||
d.WantRenderedTargets[i].Target = metricName
|
||||
d.WantRenderedTargets[i].Datapoints = slices.Clone(datapoints)
|
||||
d.WantRenderedTargets[i].Datapoints[i][0] = float64(value)
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// AssertGraphiteMetricsIndex retrieves all metrics by sending a request to
|
||||
// /graphite/metrics/index.json and compares the result with the expected one.
|
||||
func AssertGraphiteMetricsIndex(tc *TestCase, app PrometheusQuerier, tenantID string, want []string) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /graphite/metrics/index.json response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.GraphiteMetricsIndex(tc.T(), QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: want,
|
||||
Retries: 30,
|
||||
FailNow: true,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// AssertGraphiteMetricsFind finds metric names by sending a request to
|
||||
// /graphite/metrics/find and compares the result with the expected one.
|
||||
func AssertGraphiteMetricsFind(tc *TestCase, app PrometheusQuerier, query, tenantID string, want []GraphiteMetric) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /graphite/metrics/find response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.GraphiteMetricsFind(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: want,
|
||||
FailNow: true,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// AssertGraphiteMetricsFind expands metric names by sending a request to
|
||||
// /graphite/metrics/expand and compares the result with the expected one.
|
||||
func AssertGraphiteMetricsExpand(tc *TestCase, app PrometheusQuerier, query, tenantID string, want []string) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /graphite/metrics/expand response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.GraphiteMetricsExpand(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: want,
|
||||
FailNow: true,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// AssertGraphiteRender retieves metric raw data by sending a request to
|
||||
// /graphite/render and compares the result with the expected one.
|
||||
func AssertGraphiteRender(tc *TestCase, app PrometheusQuerier, target, tenantID string, from, until, step int64, want []GraphiteRenderedTarget) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /graphite/render response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.GraphiteRender(tc.T(), target, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
From: fmt.Sprintf("%d", from/1000),
|
||||
Until: fmt.Sprintf("%d", until/1000),
|
||||
StorageStep: fmt.Sprintf("%dms", step),
|
||||
})
|
||||
},
|
||||
Want: want,
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
@@ -1,216 +0,0 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
)
|
||||
|
||||
func TestMixedPrometheusQueries(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
const (
|
||||
accountID1 = 12
|
||||
projectID1 = 34
|
||||
accountID2 = 56
|
||||
projectID2 = 78
|
||||
numMetrics = 10
|
||||
)
|
||||
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
|
||||
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
|
||||
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
data := apptest.GenerateTestData("metric", numMetrics, start, end)
|
||||
emptySeries := []map[string]string{}
|
||||
emptyLabels := []string{}
|
||||
emptyLabelValues := []string{}
|
||||
emptyQueryResults := []*apptest.QueryResult{}
|
||||
emptyMetadata := map[string][]apptest.MetadataEntry{}
|
||||
emptyMetricNamesStats := []apptest.MetricNamesStatsRecord{}
|
||||
|
||||
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
|
||||
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
|
||||
"-retentionPeriod=100y",
|
||||
fmt.Sprintf("-accountID=%d", accountID1),
|
||||
fmt.Sprintf("-projectID=%d", projectID1),
|
||||
})
|
||||
vmselect := tc.MustStartVmselect("vmselect", []string{
|
||||
"-storageNode=" + vmsingle.VmselectAddr(),
|
||||
})
|
||||
|
||||
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data.Samples, apptest.QueryOpts{})
|
||||
vmsingle.ForceFlush(t)
|
||||
|
||||
// Ensure vmsingle returns data.
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data.WantSeries)
|
||||
apptest.AssertSeriesCount(tc, vmsingle, "", start, end, numMetrics)
|
||||
apptest.AssertLabels(tc, vmsingle, "metric.*", "", start, end, data.WantLabels)
|
||||
apptest.AssertLabelValues(tc, vmsingle, "metric.*", "label", "", start, end, data.WantLabelValues)
|
||||
apptest.AssertQueryResults(tc, vmsingle, "metric.*", "", start, end, data.Step, data.WantQueryResults)
|
||||
apptest.AssertMetadata(tc, vmsingle, "", "", data.WantMetadata)
|
||||
for i := range data.WantMetricNamesStats {
|
||||
data.WantMetricNamesStats[i].QueryRequestsCount = 1
|
||||
}
|
||||
apptest.AssertMetricNamesStats(tc, vmsingle, "", "", data.WantMetricNamesStats)
|
||||
|
||||
// Check that current vmsingle tenant (configured via flags) is tenant1.
|
||||
gotAdminTenantsResponse := vmselect.APIV1AdminTenants(t, apptest.QueryOpts{})
|
||||
wantAdminTenantsResponse := &apptest.AdminTenantsResponse{
|
||||
Status: "success",
|
||||
Data: []string{tenantID1},
|
||||
}
|
||||
if diff := cmp.Diff(wantAdminTenantsResponse, gotAdminTenantsResponse); diff != "" {
|
||||
t.Fatalf("unexpected tenants (-want, +got):\n%s", diff)
|
||||
}
|
||||
|
||||
// Ensure vmselect returns data for tenant1.
|
||||
apptest.AssertSeries(tc, vmselect, "metric.*", tenantID1, start, end, data.WantSeries)
|
||||
apptest.AssertSeriesCount(tc, vmselect, tenantID1, start, end, numMetrics)
|
||||
apptest.AssertLabels(tc, vmselect, "metric.*", tenantID1, start, end, data.WantLabels)
|
||||
apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", tenantID1, start, end, data.WantLabelValues)
|
||||
apptest.AssertQueryResults(tc, vmselect, "metric.*", tenantID1, start, end, data.Step, data.WantQueryResults)
|
||||
apptest.AssertMetadata(tc, vmselect, "", tenantID1, data.WantMetadata)
|
||||
for i := range data.WantMetricNamesStats {
|
||||
data.WantMetricNamesStats[i].QueryRequestsCount = 2
|
||||
}
|
||||
apptest.AssertMetricNamesStats(tc, vmselect, "", tenantID1, data.WantMetricNamesStats)
|
||||
|
||||
// Ensure vmselect does not return any data for tenant2.
|
||||
apptest.AssertSeries(tc, vmselect, "metric.*", tenantID2, start, end, emptySeries)
|
||||
apptest.AssertSeriesCount(tc, vmselect, tenantID2, start, end, 0)
|
||||
apptest.AssertLabels(tc, vmselect, "metric.*", tenantID2, start, end, emptyLabels)
|
||||
apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", tenantID2, start, end, emptyLabelValues)
|
||||
apptest.AssertQueryResults(tc, vmselect, "metric.*", tenantID2, start, end, data.Step, emptyQueryResults)
|
||||
apptest.AssertMetadata(tc, vmselect, "", tenantID2, emptyMetadata)
|
||||
apptest.AssertMetricNamesStats(tc, vmselect, "", tenantID2, emptyMetricNamesStats)
|
||||
|
||||
// Ensure vmselect returns data for multitenant.
|
||||
for _, v := range data.WantSeries {
|
||||
v["vm_account_id"] = strconv.Itoa(accountID1)
|
||||
v["vm_project_id"] = strconv.Itoa(projectID1)
|
||||
}
|
||||
apptest.AssertSeries(tc, vmselect, "metric.*", "multitenant", start, end, data.WantSeries)
|
||||
data.WantLabels = append(data.WantLabels, "vm_account_id", "vm_project_id")
|
||||
apptest.AssertLabels(tc, vmselect, "metric.*", "multitenant", start, end, data.WantLabels)
|
||||
apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", "multitenant", start, end, data.WantLabelValues)
|
||||
for _, v := range data.WantQueryResults {
|
||||
v.Metric["vm_account_id"] = strconv.Itoa(accountID1)
|
||||
v.Metric["vm_project_id"] = strconv.Itoa(projectID1)
|
||||
}
|
||||
apptest.AssertQueryResults(tc, vmselect, "metric.*", "multitenant", start, end, data.Step, data.WantQueryResults)
|
||||
apptest.AssertMetadata(tc, vmselect, "", "multitenant", data.WantMetadata)
|
||||
for i := range data.WantMetricNamesStats {
|
||||
data.WantMetricNamesStats[i].QueryRequestsCount = 3
|
||||
}
|
||||
apptest.AssertMetricNamesStats(tc, vmselect, "", "multitenant", data.WantMetricNamesStats)
|
||||
}
|
||||
|
||||
func TestMixedDeleteSeries(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
const (
|
||||
accountID1 = 12
|
||||
projectID1 = 34
|
||||
accountID2 = 56
|
||||
projectID2 = 78
|
||||
numMetrics = 10
|
||||
)
|
||||
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
|
||||
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
|
||||
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
data1 := apptest.GenerateTestData("metric1", numMetrics, start, end)
|
||||
data2 := apptest.GenerateTestData("metric2", numMetrics, start, end)
|
||||
emptySeries := []map[string]string{}
|
||||
|
||||
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
|
||||
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
|
||||
"-retentionPeriod=100y",
|
||||
fmt.Sprintf("-accountID=%d", accountID1),
|
||||
fmt.Sprintf("-projectID=%d", projectID1),
|
||||
})
|
||||
vmselect := tc.MustStartVmselect("vmselect", []string{
|
||||
"-storageNode=" + vmsingle.VmselectAddr(),
|
||||
})
|
||||
|
||||
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data1.Samples, apptest.QueryOpts{})
|
||||
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data2.Samples, apptest.QueryOpts{})
|
||||
vmsingle.ForceFlush(t)
|
||||
|
||||
wantSeries12 := slices.Concat(data1.WantSeries, data2.WantSeries)
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, wantSeries12)
|
||||
|
||||
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric1.*"}`, apptest.QueryOpts{
|
||||
Tenant: tenantID1,
|
||||
})
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data2.WantSeries)
|
||||
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric2.*"}`, apptest.QueryOpts{
|
||||
Tenant: tenantID2,
|
||||
})
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data2.WantSeries)
|
||||
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric2.*"}`, apptest.QueryOpts{
|
||||
Tenant: "multitenant",
|
||||
})
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, emptySeries)
|
||||
}
|
||||
|
||||
func TestMixedGraphiteQueries(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
const (
|
||||
accountID1 = 12
|
||||
projectID1 = 34
|
||||
accountID2 = 56
|
||||
projectID2 = 78
|
||||
numMetrics = 10
|
||||
)
|
||||
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
|
||||
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
|
||||
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
data := apptest.GenerateGraphiteTestData("metric", numMetrics, start, end)
|
||||
emptyMetricsIndex := []string{}
|
||||
emptyMetricsFind := []apptest.GraphiteMetric{}
|
||||
emptyMetricsExpand := []string{}
|
||||
emptyRenderedTargets := []apptest.GraphiteRenderedTarget{}
|
||||
|
||||
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
|
||||
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
|
||||
"-retentionPeriod=100y",
|
||||
fmt.Sprintf("-accountID=%d", accountID1),
|
||||
fmt.Sprintf("-projectID=%d", projectID1),
|
||||
})
|
||||
vmselect := tc.MustStartVmselect("vmselect", []string{
|
||||
"-storageNode=" + vmsingle.VmselectAddr(),
|
||||
})
|
||||
|
||||
vmsingle.GraphiteWrite(tc.T(), data.Samples, apptest.QueryOpts{})
|
||||
vmsingle.ForceFlush(t)
|
||||
|
||||
// Ensure vmsingle returns data.
|
||||
apptest.AssertGraphiteMetricsIndex(tc, vmsingle, "", data.WantMetricsIndex)
|
||||
apptest.AssertGraphiteMetricsFind(tc, vmsingle, "metric.*", "", data.WantMetricsFind)
|
||||
apptest.AssertGraphiteMetricsExpand(tc, vmsingle, "metric.*", "", data.WantMetricsExpand)
|
||||
apptest.AssertGraphiteRender(tc, vmsingle, "metric.*", "", start, end, data.Step, data.WantRenderedTargets)
|
||||
|
||||
// Ensure vmselect returns data for tenant1.
|
||||
apptest.AssertGraphiteMetricsIndex(tc, vmselect, tenantID1, data.WantMetricsIndex)
|
||||
apptest.AssertGraphiteMetricsFind(tc, vmselect, "metric.*", tenantID1, data.WantMetricsFind)
|
||||
apptest.AssertGraphiteMetricsExpand(tc, vmselect, "metric.*", tenantID1, data.WantMetricsExpand)
|
||||
apptest.AssertGraphiteRender(tc, vmselect, "metric.*", tenantID1, start, end, data.Step, data.WantRenderedTargets)
|
||||
|
||||
// Ensure vmselect does not return any data for tenant2.
|
||||
apptest.AssertGraphiteMetricsIndex(tc, vmselect, tenantID2, emptyMetricsIndex)
|
||||
apptest.AssertGraphiteMetricsFind(tc, vmselect, "metric.*", tenantID2, emptyMetricsFind)
|
||||
apptest.AssertGraphiteMetricsExpand(tc, vmselect, "metric.*", tenantID2, emptyMetricsExpand)
|
||||
apptest.AssertGraphiteRender(tc, vmselect, "metric.*", tenantID2, start, end, data.Step, emptyRenderedTargets)
|
||||
}
|
||||
@@ -25,14 +25,12 @@ func StartVmsingle(instance string, flags []string, cli *Client, output io.Write
|
||||
"-httpListenAddr": "127.0.0.1:0",
|
||||
"-graphiteListenAddr": "127.0.0.1:0",
|
||||
"-opentsdbListenAddr": "127.0.0.1:0",
|
||||
"-vmselectAddr": "127.0.0.1:0",
|
||||
},
|
||||
extractREs: []*regexp.Regexp{
|
||||
storageDataPathRE,
|
||||
httpListenAddrRE,
|
||||
graphiteListenAddrRE,
|
||||
openTSDBListenAddrRE,
|
||||
vmselectAddrRE,
|
||||
},
|
||||
output: output,
|
||||
})
|
||||
@@ -45,7 +43,6 @@ func StartVmsingle(instance string, flags []string, cli *Client, output io.Write
|
||||
httpListenAddr: stderrExtracts[1],
|
||||
graphiteListenAddr: stderrExtracts[2],
|
||||
openTSDBListenAddr: stderrExtracts[3],
|
||||
vmselectAddr: stderrExtracts[4],
|
||||
}), nil
|
||||
}
|
||||
|
||||
@@ -54,7 +51,6 @@ type vmsingleRuntimeValues struct {
|
||||
httpListenAddr string
|
||||
graphiteListenAddr string
|
||||
openTSDBListenAddr string
|
||||
vmselectAddr string
|
||||
}
|
||||
|
||||
func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
|
||||
@@ -89,7 +85,6 @@ func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
|
||||
},
|
||||
storageDataPath: rt.storageDataPath,
|
||||
httpListenAddr: rt.httpListenAddr,
|
||||
vmselectAddr: rt.vmselectAddr,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,7 +99,6 @@ type Vmsingle struct {
|
||||
|
||||
storageDataPath string
|
||||
httpListenAddr string
|
||||
vmselectAddr string
|
||||
}
|
||||
|
||||
// HTTPAddr returns the address at which the vminsert process is
|
||||
@@ -113,12 +107,6 @@ func (app *Vmsingle) HTTPAddr() string {
|
||||
return app.httpListenAddr
|
||||
}
|
||||
|
||||
// VmselectAddr returns the address at which the vmsingle process is listening
|
||||
// for vmselect connections.
|
||||
func (app *Vmsingle) VmselectAddr() string {
|
||||
return app.vmselectAddr
|
||||
}
|
||||
|
||||
// String returns the string representation of the vmsingle app state.
|
||||
func (app *Vmsingle) String() string {
|
||||
return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q}", []any{
|
||||
|
||||
@@ -1712,30 +1712,21 @@ The following versions of VictoriaMetrics receive regular security fixes:
|
||||
| [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/) | ✅ |
|
||||
| other releases | ❌ |
|
||||
|
||||
### Software Bill of Materials (SBOM)
|
||||
|
||||
Every VictoriaMetrics container{{% available_from "v1.137.0" %}} image published to
|
||||
[Docker Hub](https://hub.docker.com/u/victoriametrics) and [Quay.io](https://quay.io/organization/victoriametrics) include an [SPDX](https://spdx.dev/) SBOM attestation generated automatically by BuildKit during `docker buildx build`.
|
||||
|
||||
To inspect the SBOM for an image:
|
||||
|
||||
```sh
|
||||
docker buildx imagetools inspect \
|
||||
docker.io/victoriametrics/victoria-metrics:latest \
|
||||
--format "{{ json .SBOM }}"
|
||||
```
|
||||
|
||||
To scan an image using its SBOM attestation with [Trivy](https://github.com/aquasecurity/trivy):
|
||||
|
||||
```sh
|
||||
trivy image --sbom-sources oci \
|
||||
docker.io/victoriametrics/victoria-metrics:latest
|
||||
```
|
||||
|
||||
### Reporting a Vulnerability
|
||||
|
||||
Please report any security issues to <security@victoriametrics.com>
|
||||
|
||||
### CVE handling policy
|
||||
|
||||
**Source code:** Go dependencies are scanned by [govulncheck](https://pkg.go.dev/golang.org/x/vuln/cmd/govulncheck) in CI.
|
||||
All vulnerabilities must be fixed before next scheduled release and backported to [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
|
||||
|
||||
**Docker images:** CVE findings in [Alpine](https://security.alpinelinux.org/) base image pose minimal risk since VictoriaMetrics binaries are statically compiled with no OS dependencies.
|
||||
When detected, only the Alpine base tag is updated.
|
||||
Releases proceed as planned even if upstream fixes are not yet available.
|
||||
For maximum security, hardened [scratch](https://hub.docker.com/_/scratch)-based images are also provided.
|
||||
All images are continuously scanned by Docker Hub and verified before release using [grype](https://github.com/anchore/grype).
|
||||
|
||||
### General security recommendations:
|
||||
|
||||
* All the VictoriaMetrics components must run in protected private networks without direct access from untrusted networks such as Internet.
|
||||
@@ -1749,14 +1740,24 @@ Please report any security issues to <security@victoriametrics.com>
|
||||
* Set reasonable [`Content-Security-Policy`](https://developer.mozilla.org/en-US/docs/Web/HTTP/CSP) header value to mitigate [XSS attacks](https://en.wikipedia.org/wiki/Cross-site_scripting). See `-http.header.csp` flag.
|
||||
* Set reasonable [`X-Frame-Options`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Frame-Options) header value to mitigate [clickjacking attacks](https://en.wikipedia.org/wiki/Clickjacking), for example `DENY`. See `-http.header.frameOptions` flag.
|
||||
|
||||
VictoriaMetrics provides the following security-related command-line flags:
|
||||
There are the following security-related command-line flags for all components with HTTP API:
|
||||
|
||||
* `-tls`, `-tlsCertFile` and `-tlsKeyFile` for switching from HTTP to HTTPS at `-httpListenAddr` (TCP port 8428 is listened by default).
|
||||
* `-tls`, `-tlsCertFile` and `-tlsKeyFile` for switching from HTTP to HTTPS at `-httpListenAddr`.
|
||||
[Enterprise version of VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports automatic issuing of TLS certificates.
|
||||
See [these docs](#automatic-issuing-of-tls-certificates).
|
||||
* `-mtls` and `-mtlsCAFile` for enabling [mTLS](https://en.wikipedia.org/wiki/Mutual_authentication) for requests to `-httpListenAddr`. See [these docs](#mtls-protection).
|
||||
* `-httpAuth.username` and `-httpAuth.password` for protecting all the HTTP endpoints
|
||||
with [HTTP Basic Authentication](https://en.wikipedia.org/wiki/Basic_access_authentication).
|
||||
* `-http.header.hsts`, `-http.header.csp`, and `-http.header.frameOptions` for serving `Strict-Transport-Security`, `Content-Security-Policy`
|
||||
and `X-Frame-Options` HTTP response headers.
|
||||
|
||||
### Protecting service endpoints
|
||||
|
||||
All VictoriaMetrics components expose internal metrics in Prometheus exposition format at `/metrics` page for [#Monitoring](https://docs.victoriametrics.com/victoriametrics/#monitoring).
|
||||
Consider limiting access to `/metrics` page to trusted networks only.
|
||||
|
||||
There are other service endpoints that might require protection:
|
||||
|
||||
* `-deleteAuthKey` for protecting the `/api/v1/admin/tsdb/delete_series` endpoint. See [how to delete time series](#how-to-delete-time-series).
|
||||
* `-snapshotAuthKey` for protecting the `/snapshot*` endpoints. See [how to work with snapshots](#how-to-work-with-snapshots).
|
||||
* `-forceFlushAuthKey` for protecting the `/internal/force_flush` endpoint. See [force flush docs](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#forced-flush).
|
||||
@@ -1768,8 +1769,6 @@ VictoriaMetrics provides the following security-related command-line flags:
|
||||
* `-pprofAuthKey` for protecting the `/debug/pprof/*` endpoints, which can be used for [profiling](#profiling).
|
||||
* `-metricNamesStatsResetAuthKey` for protecting the `/api/v1/admin/status/metric_names_stats/reset` endpoint, used for [Metric Names Tracker](#track-ingested-metrics-usage).
|
||||
* `-denyQueryTracing` for disallowing [query tracing](#query-tracing).
|
||||
* `-http.header.hsts`, `-http.header.csp`, and `-http.header.frameOptions` for serving `Strict-Transport-Security`, `Content-Security-Policy`
|
||||
and `X-Frame-Options` HTTP response headers.
|
||||
|
||||
Explicitly set internal network interface for TCP and UDP ports for data ingestion with Graphite and OpenTSDB formats.
|
||||
For example, substitute `-graphiteListenAddr=:2003` with `-graphiteListenAddr=<internal_iface_ip>:2003`. This protects from unexpected requests from untrusted network interfaces.
|
||||
@@ -1777,17 +1776,6 @@ For example, substitute `-graphiteListenAddr=:2003` with `-graphiteListenAddr=<i
|
||||
See also [security recommendation for VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#security)
|
||||
and [the general security page at VictoriaMetrics website](https://victoriametrics.com/security/).
|
||||
|
||||
### CVE handling policy
|
||||
|
||||
**Source code:** Go dependencies are scanned by [govulncheck](https://pkg.go.dev/golang.org/x/vuln/cmd/govulncheck) in CI.
|
||||
All vulnerabilities must be fixed before next scheduled release and backported to [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
|
||||
|
||||
**Docker images:** CVE findings in [Alpine](https://security.alpinelinux.org/) base image pose minimal risk since VictoriaMetrics binaries are statically compiled with no OS dependencies.
|
||||
When detected, only the Alpine base tag is updated.
|
||||
Releases proceed as planned even if upstream fixes are not yet available.
|
||||
For maximum security, hardened [scratch](https://hub.docker.com/_/scratch)-based images are also provided.
|
||||
All images are continuously scanned by Docker Hub and verified before release using [grype](https://github.com/anchore/grype).
|
||||
|
||||
### mTLS protection
|
||||
|
||||
By default `VictoriaMetrics` accepts http requests at `8428` port (this port can be changed via `-httpListenAddr` command-line flags).
|
||||
@@ -1817,6 +1805,26 @@ This functionality can be evaluated for free according to [these docs](https://d
|
||||
|
||||
See also [security recommendations](#security).
|
||||
|
||||
### Software Bill of Materials (SBOM)
|
||||
|
||||
Every VictoriaMetrics container{{% available_from "v1.137.0" %}} image published to
|
||||
[Docker Hub](https://hub.docker.com/u/victoriametrics) and [Quay.io](https://quay.io/organization/victoriametrics) include an [SPDX](https://spdx.dev/) SBOM attestation generated automatically by BuildKit during `docker buildx build`.
|
||||
|
||||
To inspect the SBOM for an image:
|
||||
|
||||
```sh
|
||||
docker buildx imagetools inspect \
|
||||
docker.io/victoriametrics/victoria-metrics:latest \
|
||||
--format "{{ json .SBOM }}"
|
||||
```
|
||||
|
||||
To scan an image using its SBOM attestation with [Trivy](https://github.com/aquasecurity/trivy):
|
||||
|
||||
```sh
|
||||
trivy image --sbom-sources oci \
|
||||
docker.io/victoriametrics/victoria-metrics:latest
|
||||
```
|
||||
|
||||
## Tuning
|
||||
|
||||
* No need in tuning for VictoriaMetrics - it uses reasonable defaults for command-line flags,
|
||||
|
||||
@@ -26,13 +26,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
## tip
|
||||
|
||||
* FEATURE: all VictoriaMetrics components: add `-http.header.disableServerHostname` command-line flag for disabling the `X-Server-Hostname` HTTP response header. See [#11067](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11067). Thanks to @zasdaym for contribution.
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): Add the support of vmselect RPC to vmsingle so that single node can be queried by a vmselect from a vmcluster deployment. See [4328](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4328) and [10926](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10926).
|
||||
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
|
||||
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): do not fail backup list if directory is absent while using `fs://` destination to align with other protocols. See [6c3c548](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/6c3c548ddb0385b749e731f52276f130e2a4e4a8)
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
|
||||
|
||||
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
|
||||
|
||||
|
||||
@@ -95,8 +95,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/
|
||||
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
|
||||
-http.header.csp string
|
||||
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
|
||||
-http.header.disableServerHostname
|
||||
Whether to disable 'X-Server-Hostname' header in HTTP responses
|
||||
-http.header.frameOptions string
|
||||
Value for 'X-Frame-Options' header
|
||||
-http.header.hsts string
|
||||
@@ -623,7 +621,7 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/
|
||||
-version
|
||||
Show VictoriaMetrics version
|
||||
-vmalert.proxyURL string
|
||||
Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules . See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmalert
|
||||
Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules
|
||||
-vmui.customDashboardsPath string
|
||||
Optional path to vmui dashboards. See https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/vmui/packages/vmui/public/dashboards
|
||||
-vmui.defaultTimezone string
|
||||
|
||||
@@ -74,8 +74,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
|
||||
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
|
||||
-http.header.csp string
|
||||
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
|
||||
-http.header.disableServerHostname
|
||||
Whether to disable 'X-Server-Hostname' header in HTTP responses
|
||||
-http.header.frameOptions string
|
||||
Value for 'X-Frame-Options' header
|
||||
-http.header.hsts string
|
||||
|
||||
@@ -115,8 +115,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmalert/ .
|
||||
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
|
||||
-http.header.csp string
|
||||
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
|
||||
-http.header.disableServerHostname
|
||||
Whether to disable 'X-Server-Hostname' header in HTTP responses
|
||||
-http.header.frameOptions string
|
||||
Value for 'X-Frame-Options' header
|
||||
-http.header.hsts string
|
||||
|
||||
@@ -59,8 +59,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmauth/ .
|
||||
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
|
||||
-http.header.csp string
|
||||
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
|
||||
-http.header.disableServerHostname
|
||||
Whether to disable 'X-Server-Hostname' header in HTTP responses
|
||||
-http.header.frameOptions string
|
||||
Value for 'X-Frame-Options' header
|
||||
-http.header.hsts string
|
||||
|
||||
@@ -400,8 +400,6 @@ Run `vmbackup -help` in order to see all the available options:
|
||||
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
|
||||
-http.header.csp string
|
||||
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
|
||||
-http.header.disableServerHostname
|
||||
Whether to disable 'X-Server-Hostname' header in HTTP responses
|
||||
-http.header.frameOptions string
|
||||
Value for 'X-Frame-Options' header
|
||||
-http.header.hsts string
|
||||
|
||||
@@ -575,8 +575,6 @@ command-line flags:
|
||||
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
|
||||
-http.header.csp string
|
||||
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
|
||||
-http.header.disableServerHostname
|
||||
Whether to disable 'X-Server-Hostname' header in HTTP responses
|
||||
-http.header.frameOptions string
|
||||
Value for 'X-Frame-Options' header
|
||||
-http.header.hsts string
|
||||
|
||||
@@ -496,8 +496,6 @@ Below is the list of configuration flags (it can be viewed by running `./vmgatew
|
||||
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
|
||||
-http.header.csp string
|
||||
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
|
||||
-http.header.disableServerHostname
|
||||
Whether to disable 'X-Server-Hostname' header in HTTP responses
|
||||
-http.header.frameOptions string
|
||||
Value for 'X-Frame-Options' header
|
||||
-http.header.hsts string
|
||||
|
||||
@@ -76,8 +76,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
|
||||
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
|
||||
-http.header.csp string
|
||||
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
|
||||
-http.header.disableServerHostname
|
||||
Whether to disable 'X-Server-Hostname' header in HTTP responses
|
||||
-http.header.frameOptions string
|
||||
Value for 'X-Frame-Options' header
|
||||
-http.header.hsts string
|
||||
|
||||
@@ -102,8 +102,6 @@ Run `vmrestore -help` in order to see all the available options:
|
||||
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
|
||||
-http.header.csp string
|
||||
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
|
||||
-http.header.disableServerHostname
|
||||
Whether to disable 'X-Server-Hostname' header in HTTP responses
|
||||
-http.header.frameOptions string
|
||||
Value for 'X-Frame-Options' header
|
||||
-http.header.hsts string
|
||||
|
||||
@@ -75,8 +75,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
|
||||
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
|
||||
-http.header.csp string
|
||||
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
|
||||
-http.header.disableServerHostname
|
||||
Whether to disable 'X-Server-Hostname' header in HTTP responses
|
||||
-http.header.frameOptions string
|
||||
Value for 'X-Frame-Options' header
|
||||
-http.header.hsts string
|
||||
@@ -325,7 +323,7 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
|
||||
-version
|
||||
Show VictoriaMetrics version
|
||||
-vmalert.proxyURL string
|
||||
Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules . See https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#vmalert
|
||||
Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules
|
||||
-vmstorageDialTimeout duration
|
||||
Timeout for establishing RPC connections from vmselect to vmstorage. See also -vmstorageUserTimeout (default 3s)
|
||||
-vmstorageUserTimeout duration
|
||||
|
||||
@@ -68,8 +68,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
|
||||
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
|
||||
-http.header.csp string
|
||||
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
|
||||
-http.header.disableServerHostname
|
||||
Whether to disable 'X-Server-Hostname' header in HTTP responses
|
||||
-http.header.frameOptions string
|
||||
Value for 'X-Frame-Options' header
|
||||
-http.header.hsts string
|
||||
|
||||
@@ -1,140 +0,0 @@
|
||||
package handshake
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
type bufferedWriter interface {
|
||||
Write(p []byte) (int, error)
|
||||
Flush() error
|
||||
}
|
||||
|
||||
// BufferedConn is a net.Conn with Flush suport.
|
||||
type BufferedConn struct {
|
||||
net.Conn
|
||||
|
||||
// IsLegacy defines if BufferedConn operates in legacy mode
|
||||
// and doesn't support RPC protocol
|
||||
IsLegacy bool
|
||||
|
||||
br io.Reader
|
||||
bw bufferedWriter
|
||||
|
||||
readDeadline time.Time
|
||||
writeDeadline time.Time
|
||||
}
|
||||
|
||||
const bufferSize = 64 * 1024
|
||||
|
||||
// newBufferedConn returns buffered connection with the given compression level.
|
||||
func newBufferedConn(c net.Conn, compressionLevel int, isReadCompressed bool) *BufferedConn {
|
||||
bc := &BufferedConn{
|
||||
Conn: c,
|
||||
}
|
||||
if compressionLevel <= 0 {
|
||||
bc.bw = bufio.NewWriterSize(c, bufferSize)
|
||||
} else {
|
||||
bc.bw = zstd.NewWriterLevel(c, compressionLevel)
|
||||
}
|
||||
if !isReadCompressed {
|
||||
bc.br = bufio.NewReaderSize(c, bufferSize)
|
||||
} else {
|
||||
bc.br = zstd.NewReader(c)
|
||||
}
|
||||
return bc
|
||||
}
|
||||
|
||||
// SetDeadline sets read and write deadlines for bc to t.
|
||||
//
|
||||
// Deadline is checked on each Read and Write call.
|
||||
func (bc *BufferedConn) SetDeadline(t time.Time) error {
|
||||
bc.readDeadline = t
|
||||
bc.writeDeadline = t
|
||||
return bc.Conn.SetDeadline(t)
|
||||
}
|
||||
|
||||
// SetReadDeadline sets read deadline for bc to t.
|
||||
//
|
||||
// Deadline is checked on each Read call.
|
||||
func (bc *BufferedConn) SetReadDeadline(t time.Time) error {
|
||||
bc.readDeadline = t
|
||||
return bc.Conn.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
// SetWriteDeadline sets write deadline for bc to t.
|
||||
//
|
||||
// Deadline is checked on each Write call.
|
||||
func (bc *BufferedConn) SetWriteDeadline(t time.Time) error {
|
||||
bc.writeDeadline = t
|
||||
return bc.Conn.SetWriteDeadline(t)
|
||||
}
|
||||
|
||||
// Read reads up to len(p) from bc to p.
|
||||
func (bc *BufferedConn) Read(p []byte) (int, error) {
|
||||
startTime := fasttime.UnixTimestamp()
|
||||
if deadlineExceeded(bc.readDeadline, startTime) {
|
||||
return 0, os.ErrDeadlineExceeded
|
||||
}
|
||||
n, err := bc.br.Read(p)
|
||||
if err != nil && err != io.EOF {
|
||||
err = fmt.Errorf("cannot read data in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Write writes p to bc.
|
||||
//
|
||||
// Do not forget to call Flush if needed.
|
||||
func (bc *BufferedConn) Write(p []byte) (int, error) {
|
||||
startTime := fasttime.UnixTimestamp()
|
||||
if deadlineExceeded(bc.writeDeadline, startTime) {
|
||||
return 0, os.ErrDeadlineExceeded
|
||||
}
|
||||
n, err := bc.bw.Write(p)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("cannot write data in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func deadlineExceeded(deadline time.Time, currentTimestamp uint64) bool {
|
||||
if deadline.IsZero() {
|
||||
return false
|
||||
}
|
||||
return currentTimestamp > uint64(deadline.Unix())
|
||||
}
|
||||
|
||||
// Close closes bc.
|
||||
func (bc *BufferedConn) Close() error {
|
||||
// Close the Conn at first. It is expected that all the required data
|
||||
// is already flushed to the Conn.
|
||||
err := bc.Conn.Close()
|
||||
bc.Conn = nil
|
||||
|
||||
if zr, ok := bc.br.(*zstd.Reader); ok {
|
||||
zr.Release()
|
||||
}
|
||||
bc.br = nil
|
||||
|
||||
if zw, ok := bc.bw.(*zstd.Writer); ok {
|
||||
// Do not call zw.Close(), since we already closed the underlying conn.
|
||||
zw.Release()
|
||||
}
|
||||
bc.bw = nil
|
||||
|
||||
bc.IsLegacy = false
|
||||
return err
|
||||
}
|
||||
|
||||
// Flush flushes internal write buffers to the underlying conn.
|
||||
func (bc *BufferedConn) Flush() error {
|
||||
return bc.bw.Flush()
|
||||
}
|
||||
@@ -1,318 +0,0 @@
|
||||
package handshake
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
||||
var rpcHandshakeTimeout = flag.Duration("rpc.handshakeTimeout", 5*time.Second, "Timeout for RPC handshake between vminsert/vmselect and vmstorage. Increase this value if transient handshake failures occur. See https://docs.victoriametrics.com/victoriametrics/troubleshooting/#cluster-instability section for more details.")
|
||||
|
||||
const (
|
||||
vminsertHelloLegacyVersion = "vminsert.02"
|
||||
vminsertHello = "vminsert.03"
|
||||
vmselectHello = "vmselect.01"
|
||||
|
||||
successResponse = "ok"
|
||||
)
|
||||
|
||||
// Func must perform handshake on the given c using the given compressionLevel.
|
||||
//
|
||||
// It must return BufferedConn wrapper for c on successful handshake.
|
||||
type Func func(c net.Conn, compressionLevel int) (*BufferedConn, error)
|
||||
|
||||
// VMInsertClientWithDialer performs client-side handshake for vminsert protocol.
|
||||
//
|
||||
// it uses provided dial func to establish connection to the server.
|
||||
// compressionLevel is a legacy option which defines the level used for compression of the data sent
|
||||
// to the server.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMInsertClientWithDialer(dial func() (net.Conn, error), compressionLevel int) (*BufferedConn, error) {
|
||||
c, err := dial()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dial error: %w", err)
|
||||
}
|
||||
bc, err := vminsertClient(c, 0)
|
||||
if err == nil {
|
||||
return bc, nil
|
||||
}
|
||||
_ = c.Close()
|
||||
if !strings.Contains(err.Error(), "cannot read success response after sending hello") {
|
||||
return nil, err
|
||||
}
|
||||
// try to fallback to the prev non-RPC API version
|
||||
// we cannot re-use exist connection, since vmstorage already closed it
|
||||
c, err = dial()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dial error: %w", err)
|
||||
}
|
||||
bc, err = genericClient(c, vminsertHelloLegacyVersion, compressionLevel)
|
||||
if err != nil {
|
||||
_ = c.Close()
|
||||
return nil, fmt.Errorf("legacy handshake error: %w", err)
|
||||
}
|
||||
bc.IsLegacy = true
|
||||
logger.Infof("server=%q doesn't support new RPC version, fallback to the legacy format", c.RemoteAddr())
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
func vminsertClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
return genericClient(c, vminsertHello, compressionLevel)
|
||||
}
|
||||
|
||||
// VMInsertClientWithHello performs client-side handshake for vminsert protocol.
|
||||
//
|
||||
// should be used for testing only
|
||||
func VMInsertClientWithHello(c net.Conn, helloMsg string, compressionLevel int) (*BufferedConn, error) {
|
||||
return genericClient(c, helloMsg, compressionLevel)
|
||||
}
|
||||
|
||||
// VMInsertServer performs server-side handshake for vminsert protocol.
|
||||
//
|
||||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the client.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMInsertServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
|
||||
var isRPCSupported bool
|
||||
bc, err := genericServer(c, compressionLevel, func(c net.Conn) error {
|
||||
buf, err := readData(c, len(vminsertHello))
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
// This is likely a TCP healthcheck, which must be ignored in order to prevent logs pollution.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1762
|
||||
return errTCPHealthcheck
|
||||
}
|
||||
return fmt.Errorf("cannot read hello: %w", err)
|
||||
}
|
||||
isRPCSupported = string(buf) == vminsertHello
|
||||
if !isRPCSupported {
|
||||
// try to fallback to the previous protocol version
|
||||
if string(buf) != vminsertHelloLegacyVersion {
|
||||
return fmt.Errorf("unexpected message obtained; got %q; want %q", buf, vminsertHello)
|
||||
}
|
||||
logger.Infof("client=%q doesn't support new RPC version, fallback to the legacy format", c.RemoteAddr())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bc.IsLegacy = !isRPCSupported
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
// VMInsertServerWithLegacyHello performs server-side handshake for vminsert protocol
|
||||
// with legacy hello message
|
||||
//
|
||||
// should be used for testing only
|
||||
func VMInsertServerWithLegacyHello(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
|
||||
bc, err := genericServer(c, compressionLevel, func(c net.Conn) error {
|
||||
return readMessage(c, vminsertHelloLegacyVersion)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bc.IsLegacy = true
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
// VMSelectClient performs client-side handshake for vmselect protocol.
|
||||
//
|
||||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the server.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
return genericClient(c, vmselectHello, compressionLevel)
|
||||
}
|
||||
|
||||
// VMSelectServer performs server-side handshake for vmselect protocol.
|
||||
//
|
||||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the client.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMSelectServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
return genericServer(c, compressionLevel, func(c net.Conn) error {
|
||||
err := readMessage(c, vmselectHello)
|
||||
if errors.Is(err, io.EOF) {
|
||||
// This is likely a TCP healthcheck, which must be ignored in order to prevent logs pollution.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1762 and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10786
|
||||
return errTCPHealthcheck
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// errTCPHealthcheck indicates that the connection was opened as part of a TCP health check
|
||||
// and was closed immediately after being established.
|
||||
//
|
||||
// This is expected behavior and can be safely ignored.
|
||||
var errTCPHealthcheck = fmt.Errorf("TCP health check connection – safe to ignore")
|
||||
|
||||
// IsTCPHealthcheck determines whether the provided error is a TCP health check
|
||||
func IsTCPHealthcheck(err error) bool {
|
||||
return errors.Is(err, errTCPHealthcheck)
|
||||
}
|
||||
|
||||
// IsClientNetworkError determines whether the provided error is a client-side network error,
|
||||
// such as io.EOF, io.ErrUnexpectedEOF, or a timeout.
|
||||
// These errors typically occur when a client disconnects abruptly or fails during the handshake,
|
||||
// and are generally non-actionable from the server point of view.
|
||||
// This function helps distinguish such errors from critical ones during the handshake process
|
||||
// and adjust logging accordingly.
|
||||
//
|
||||
// See: https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/pull/880
|
||||
func IsClientNetworkError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
return true
|
||||
}
|
||||
|
||||
if IsTimeoutNetworkError(err) {
|
||||
return true
|
||||
}
|
||||
|
||||
if errMsg := err.Error(); strings.Contains(errMsg, "broken pipe") || strings.Contains(errMsg, "reset by peer") {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// IsTimeoutNetworkError determines whether the provided error is a network error with a timeout.
|
||||
func IsTimeoutNetworkError(err error) bool {
|
||||
var ne net.Error
|
||||
if errors.As(err, &ne) && ne.Timeout() {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func genericServer(c net.Conn, compressionLevel int, readHelloMessage func(c net.Conn) error) (*BufferedConn, error) {
|
||||
if err := c.SetDeadline(time.Now().Add(*rpcHandshakeTimeout)); err != nil {
|
||||
return nil, fmt.Errorf("cannot set deadline: %w", err)
|
||||
}
|
||||
|
||||
if err := readHelloMessage(c); err != nil {
|
||||
return nil, fmt.Errorf("cannot read hello message : %w", err)
|
||||
}
|
||||
if err := writeMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
|
||||
}
|
||||
isRemoteCompressed, err := readIsCompressed(c)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read isCompressed flag: %w", err)
|
||||
}
|
||||
if err := writeMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
|
||||
}
|
||||
if err := writeIsCompressed(c, compressionLevel > 0); err != nil {
|
||||
return nil, fmt.Errorf("cannot write isCompressed flag: %w", err)
|
||||
}
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
|
||||
}
|
||||
|
||||
if err := c.SetDeadline(time.Time{}); err != nil {
|
||||
return nil, fmt.Errorf("cannot reset deadline: %w", err)
|
||||
}
|
||||
|
||||
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn, error) {
|
||||
if err := c.SetDeadline(time.Now().Add(*rpcHandshakeTimeout)); err != nil {
|
||||
return nil, fmt.Errorf("cannot set deadline: %w", err)
|
||||
}
|
||||
|
||||
if err := writeMessage(c, msg); err != nil {
|
||||
return nil, fmt.Errorf("cannot write hello: %w", err)
|
||||
}
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot read success response after sending hello: %w", err)
|
||||
}
|
||||
if err := writeIsCompressed(c, compressionLevel > 0); err != nil {
|
||||
return nil, fmt.Errorf("cannot write isCompressed flag: %w", err)
|
||||
}
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
|
||||
}
|
||||
isRemoteCompressed, err := readIsCompressed(c)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read isCompressed flag: %w", err)
|
||||
}
|
||||
if err := writeMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
|
||||
}
|
||||
|
||||
if err := c.SetDeadline(time.Time{}); err != nil {
|
||||
return nil, fmt.Errorf("cannot reset deadline: %w", err)
|
||||
}
|
||||
|
||||
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
func writeIsCompressed(c net.Conn, isCompressed bool) error {
|
||||
var buf [1]byte
|
||||
if isCompressed {
|
||||
buf[0] = 1
|
||||
}
|
||||
return writeMessage(c, string(buf[:]))
|
||||
}
|
||||
|
||||
func readIsCompressed(c net.Conn) (bool, error) {
|
||||
buf, err := readData(c, 1)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
isCompressed := buf[0] != 0
|
||||
return isCompressed, nil
|
||||
}
|
||||
|
||||
func writeMessage(c net.Conn, msg string) error {
|
||||
if _, err := io.WriteString(c, msg); err != nil {
|
||||
return fmt.Errorf("cannot write %q to server: %w", msg, err)
|
||||
}
|
||||
if fc, ok := c.(flusher); ok {
|
||||
if err := fc.Flush(); err != nil {
|
||||
return fmt.Errorf("cannot flush %q to server: %w", msg, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type flusher interface {
|
||||
Flush() error
|
||||
}
|
||||
|
||||
func readMessage(c net.Conn, msg string) error {
|
||||
buf, err := readData(c, len(msg))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if string(buf) != msg {
|
||||
return fmt.Errorf("unexpected message obtained; got %q; want %q", buf, msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func readData(c net.Conn, dataLen int) ([]byte, error) {
|
||||
data := make([]byte, dataLen)
|
||||
if n, err := io.ReadFull(c, data); err != nil {
|
||||
return nil, fmt.Errorf("cannot read message with size %d: %w; read only %d bytes", dataLen, err, n)
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
@@ -1,83 +0,0 @@
|
||||
package handshake
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestVMInsertHandshake(t *testing.T) {
|
||||
testHandshake(t, vminsertClient, VMInsertServer)
|
||||
}
|
||||
|
||||
func TestVMSelectHandshake(t *testing.T) {
|
||||
testHandshake(t, VMSelectClient, VMSelectServer)
|
||||
}
|
||||
|
||||
func TestVMSelectServerTCPHealthcheck(t *testing.T) {
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("cannot start listener: %s", err)
|
||||
}
|
||||
|
||||
c, err := net.Dial("tcp", ln.Addr().String())
|
||||
if err != nil {
|
||||
t.Fatalf("cannot dial: %s", err)
|
||||
}
|
||||
if err := c.Close(); err != nil {
|
||||
t.Fatalf("cannot close client conn: %s", err)
|
||||
}
|
||||
s, err := ln.Accept()
|
||||
if err != nil {
|
||||
t.Fatalf("cannot accept conn: %s", err)
|
||||
}
|
||||
if _, err := VMSelectServer(s, 0); !IsTCPHealthcheck(err) {
|
||||
t.Fatalf("unexpected error; got %v; want TCP healthcheck error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func testHandshake(t *testing.T, clientFunc, serverFunc Func) {
|
||||
t.Helper()
|
||||
|
||||
c, s := net.Pipe()
|
||||
ch := make(chan error, 1)
|
||||
go func() {
|
||||
bcs, err := serverFunc(s, 3)
|
||||
if err != nil {
|
||||
ch <- fmt.Errorf("error on outer handshake: %w", err)
|
||||
return
|
||||
}
|
||||
bcc, err := clientFunc(bcs, 3)
|
||||
if err != nil {
|
||||
ch <- fmt.Errorf("error on inner handshake: %w", err)
|
||||
return
|
||||
}
|
||||
if bcc == nil {
|
||||
ch <- fmt.Errorf("expecting non-nil conn")
|
||||
return
|
||||
}
|
||||
ch <- nil
|
||||
}()
|
||||
|
||||
bcc, err := clientFunc(c, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("error on outer handshake: %s", err)
|
||||
}
|
||||
bcs, err := serverFunc(bcc, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("error on inner handshake: %s", err)
|
||||
}
|
||||
if bcs == nil {
|
||||
t.Fatalf("expecting non-nil conn")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error on the server side: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -64,10 +64,9 @@ var (
|
||||
connTimeout = flag.Duration("http.connTimeout", 2*time.Minute, "Incoming connections to -httpListenAddr are closed after the configured timeout. "+
|
||||
"This may help evenly spreading load among a cluster of services behind TCP-level load balancer. Zero value disables closing of incoming connections")
|
||||
|
||||
headerHSTS = flag.String("http.header.hsts", "", "Value for 'Strict-Transport-Security' header, recommended: 'max-age=31536000; includeSubDomains'")
|
||||
headerFrameOptions = flag.String("http.header.frameOptions", "", "Value for 'X-Frame-Options' header")
|
||||
headerCSP = flag.String("http.header.csp", "", `Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"`)
|
||||
headerDisableServerHostname = flag.Bool("http.header.disableServerHostname", false, "Whether to disable 'X-Server-Hostname' header in HTTP responses")
|
||||
headerHSTS = flag.String("http.header.hsts", "", "Value for 'Strict-Transport-Security' header, recommended: 'max-age=31536000; includeSubDomains'")
|
||||
headerFrameOptions = flag.String("http.header.frameOptions", "", "Value for 'X-Frame-Options' header")
|
||||
headerCSP = flag.String("http.header.csp", "", `Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"`)
|
||||
|
||||
disableCORS = flag.Bool("http.disableCORS", false, `Disable CORS for all origins (*)`)
|
||||
)
|
||||
@@ -330,9 +329,7 @@ func handlerWrapper(w http.ResponseWriter, r *http.Request, rh RequestHandler) {
|
||||
if *headerCSP != "" {
|
||||
h.Add("Content-Security-Policy", *headerCSP)
|
||||
}
|
||||
if !*headerDisableServerHostname {
|
||||
h.Add("X-Server-Hostname", hostname)
|
||||
}
|
||||
h.Add("X-Server-Hostname", hostname)
|
||||
requestsTotal.Inc()
|
||||
if whetherToCloseConn(r) {
|
||||
connTimeoutClosedConns.Inc()
|
||||
|
||||
@@ -228,30 +228,4 @@ func TestHandlerWrapper(t *testing.T) {
|
||||
if got := h.Get("Content-Security-Policy"); got != cspHeader {
|
||||
t.Fatalf("unexpected CSP header; got %q; want %q", got, cspHeader)
|
||||
}
|
||||
if got := h.Get("X-Server-Hostname"); got != hostname {
|
||||
t.Fatalf("unexpected X-Server-Hostname header; got %q; want %q", got, hostname)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandlerWrapperDisableServerHostnameHeader(t *testing.T) {
|
||||
origDisableServerHostname := *headerDisableServerHostname
|
||||
*headerDisableServerHostname = true
|
||||
defer func() {
|
||||
*headerDisableServerHostname = origDisableServerHostname
|
||||
}()
|
||||
|
||||
req, _ := http.NewRequest("GET", "/health", nil)
|
||||
|
||||
srv := &server{s: &http.Server{}}
|
||||
w := &httptest.ResponseRecorder{}
|
||||
|
||||
handlerWrapper(w, req, func(w http.ResponseWriter, r *http.Request) bool {
|
||||
return builtinRoutesHandler(srv, r, w, func(_ http.ResponseWriter, _ *http.Request) bool {
|
||||
return true
|
||||
})
|
||||
})
|
||||
|
||||
if got := w.Header().Get("X-Server-Hostname"); got != "" {
|
||||
t.Fatalf("unexpected X-Server-Hostname header; got %q; want empty value", got)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,9 +52,6 @@ func TestGetTimeSuccess(t *testing.T) {
|
||||
f("292277025-08-18T07:12:54.999999999Z", maxTimeMsecs)
|
||||
f("1562529662.324", 1562529662324)
|
||||
f("1223372036.855", 1223372036855)
|
||||
|
||||
// relative duration that resolves to a timestamp before 1970
|
||||
f("-9223372036.854", minTimeMsecs)
|
||||
}
|
||||
|
||||
func TestGetTimeError(t *testing.T) {
|
||||
@@ -66,8 +63,8 @@ func TestGetTimeError(t *testing.T) {
|
||||
t.Fatalf("unexpected error in NewRequest: %s", err)
|
||||
}
|
||||
|
||||
if msec, err := GetTime(r, "s", 123); err == nil {
|
||||
t.Fatalf("expecting non-nil error in GetTime(%q); got %d", s, msec)
|
||||
if _, err := GetTime(r, "s", 123); err == nil {
|
||||
t.Fatalf("expecting non-nil error in GetTime(%q)", s)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,6 +84,7 @@ func TestGetTimeError(t *testing.T) {
|
||||
f("123md")
|
||||
f("-12.3md")
|
||||
|
||||
// relative duration outside the allowed range
|
||||
// relative duration that resolves to a timestamp before 1970
|
||||
f("-9223372036.854")
|
||||
f("-9223372036.855")
|
||||
}
|
||||
|
||||
@@ -325,17 +325,6 @@ func (s *Search) NextMetricBlock() bool {
|
||||
|
||||
// SearchQuery is used for sending search queries from vmselect to vmstorage.
|
||||
type SearchQuery struct {
|
||||
AccountID uint32
|
||||
ProjectID uint32
|
||||
|
||||
// TenantTokens and IsMultiTenant is artificial fields
|
||||
// they're only exist at runtime and cannot be transferred
|
||||
// via network calls for keeping communication protocol compatibility
|
||||
// TODO:@f41gh7 introduce breaking change to the protocol later
|
||||
// and use TenantTokens instead of AccountID and ProjectID
|
||||
TenantTokens []TenantToken
|
||||
IsMultiTenant bool
|
||||
|
||||
// The time range for searching time series
|
||||
MinTimestamp int64
|
||||
MaxTimestamp int64
|
||||
@@ -479,30 +468,15 @@ func (tf *TagFilter) Unmarshal(src []byte) ([]byte, error) {
|
||||
return src, nil
|
||||
}
|
||||
|
||||
// String returns string representation of the search query: tag filters and time range.
|
||||
// String returns string representation of the search query.
|
||||
func (sq *SearchQuery) String() string {
|
||||
start := TimestampToHumanReadableFormat(sq.MinTimestamp)
|
||||
end := TimestampToHumanReadableFormat(sq.MaxTimestamp)
|
||||
a := sq.FiltersString()
|
||||
if !sq.IsMultiTenant {
|
||||
return fmt.Sprintf("accountID=%d, projectID=%d, filters=%s, timeRange=[%s..%s]", sq.AccountID, sq.ProjectID, a, start, end)
|
||||
}
|
||||
tts := make([]string, len(sq.TenantTokens))
|
||||
for i, tt := range sq.TenantTokens {
|
||||
tts[i] = tt.String()
|
||||
}
|
||||
|
||||
return fmt.Sprintf("tenants=[%s], filters=%s, timeRange=[%s..%s]", strings.Join(tts, ","), a, start, end)
|
||||
|
||||
}
|
||||
|
||||
// FiltersString returns string representation of the tag filters.
|
||||
func (sq *SearchQuery) FiltersString() []string {
|
||||
a := make([]string, len(sq.TagFilterss))
|
||||
for i, tfs := range sq.TagFilterss {
|
||||
a[i] = tagFiltersToString(tfs)
|
||||
}
|
||||
return a
|
||||
start := TimestampToHumanReadableFormat(sq.MinTimestamp)
|
||||
end := TimestampToHumanReadableFormat(sq.MaxTimestamp)
|
||||
return fmt.Sprintf("filters=%s, timeRange=[%s..%s]", a, start, end)
|
||||
}
|
||||
|
||||
func tagFiltersToString(tfs []TagFilter) string {
|
||||
@@ -513,9 +487,8 @@ func tagFiltersToString(tfs []TagFilter) string {
|
||||
return "{" + strings.Join(a, ",") + "}"
|
||||
}
|
||||
|
||||
// MarshalWithoutTenant appends marshaled sq without AccountID/ProjectID to dst and returns the result.
|
||||
// It is expected that TenantToken is already marshaled to dst.
|
||||
func (sq *SearchQuery) MarshalWithoutTenant(dst []byte) []byte {
|
||||
// Marshal appends marshaled sq to dst and returns the result.
|
||||
func (sq *SearchQuery) Marshal(dst []byte) []byte {
|
||||
dst = encoding.MarshalVarInt64(dst, sq.MinTimestamp)
|
||||
dst = encoding.MarshalVarInt64(dst, sq.MaxTimestamp)
|
||||
dst = encoding.MarshalVarUint64(dst, uint64(len(sq.TagFilterss)))
|
||||
@@ -525,25 +498,11 @@ func (sq *SearchQuery) MarshalWithoutTenant(dst []byte) []byte {
|
||||
dst = tagFilters[i].Marshal(dst)
|
||||
}
|
||||
}
|
||||
dst = encoding.MarshalUint32(dst, uint32(sq.MaxMetrics))
|
||||
return dst
|
||||
}
|
||||
|
||||
// Unmarshal unmarshals sq from src and returns the tail.
|
||||
func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) {
|
||||
if len(src) < 4 {
|
||||
return src, fmt.Errorf("cannot unmarshal AccountID: too short src len: %d; must be at least %d bytes", len(src), 4)
|
||||
}
|
||||
sq.AccountID = encoding.UnmarshalUint32(src)
|
||||
src = src[4:]
|
||||
|
||||
if len(src) < 4 {
|
||||
return src, fmt.Errorf("cannot unmarshal ProjectID: too short src len: %d; must be at least %d bytes", len(src), 4)
|
||||
}
|
||||
sq.ProjectID = encoding.UnmarshalUint32(src)
|
||||
src = src[4:]
|
||||
|
||||
sq.TenantTokens = []TenantToken{{AccountID: sq.AccountID, ProjectID: sq.ProjectID}}
|
||||
minTs, nSize := encoding.UnmarshalVarInt64(src)
|
||||
if nSize <= 0 {
|
||||
return src, fmt.Errorf("cannot unmarshal MinTimestamp from varint")
|
||||
@@ -584,12 +543,6 @@ func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) {
|
||||
sq.TagFilterss[i] = tagFilters
|
||||
}
|
||||
|
||||
if len(src) < 4 {
|
||||
return src, fmt.Errorf("cannot unmarshal MaxMetrics: too short src len: %d; must be at least %d bytes", len(src), 4)
|
||||
}
|
||||
sq.MaxMetrics = int(encoding.UnmarshalUint32(src))
|
||||
src = src[4:]
|
||||
|
||||
return src, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -32,12 +32,7 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
|
||||
// Skip nil sq1.
|
||||
continue
|
||||
}
|
||||
tt := TenantToken{
|
||||
AccountID: sq1.AccountID,
|
||||
ProjectID: sq1.ProjectID,
|
||||
}
|
||||
buf = tt.Marshal(buf[:0])
|
||||
buf = sq1.MarshalWithoutTenant(buf)
|
||||
buf = sq1.Marshal(buf[:0])
|
||||
|
||||
tail, err := sq2.Unmarshal(buf)
|
||||
if err != nil {
|
||||
@@ -46,12 +41,6 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
|
||||
if len(tail) > 0 {
|
||||
t.Fatalf("unexpected tail left after SearchQuery unmarshaling; tail (len=%d): %q", len(tail), tail)
|
||||
}
|
||||
if sq2.AccountID != sq1.AccountID {
|
||||
t.Fatalf("unexpected AccountID; got %d; want %d", sq2.AccountID, sq1.AccountID)
|
||||
}
|
||||
if sq2.ProjectID != sq1.ProjectID {
|
||||
t.Fatalf("unexpected ProjectID; got %d; want %d", sq2.ProjectID, sq1.ProjectID)
|
||||
}
|
||||
if sq1.MinTimestamp != sq2.MinTimestamp {
|
||||
t.Fatalf("unexpected MinTimestamp; got %d; want %d", sq2.MinTimestamp, sq1.MinTimestamp)
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ func ParseDuration(s string) (time.Duration, error) {
|
||||
return 0, err
|
||||
}
|
||||
if ms < minValidMilli || maxValidMilli < ms {
|
||||
return 0, fmt.Errorf("duration %q must be in the range [%s, %s]", s, minDuration, maxDuration)
|
||||
return 0, fmt.Errorf("duration %q must be in the range [%v, %v]", s, minDuration, maxDuration)
|
||||
}
|
||||
return time.Duration(ms) * time.Millisecond, nil
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ func ParseTimeMsec(s string) (int64, error) {
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#timestamp-formats
|
||||
//
|
||||
// If s doesn't contain timezone information, then the local timezone is used.
|
||||
// The time must be in the range [1970-01-01T00:00:00Z, 2262-04-11T23:47:16Z].
|
||||
//
|
||||
// It returns unix timestamp in nanoseconds.
|
||||
func ParseTimeAt(s string, currentTimestamp int64) (int64, error) {
|
||||
@@ -70,10 +71,14 @@ func ParseTimeAt(s string, currentTimestamp int64) (int64, error) {
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if d < 0 {
|
||||
if d > 0 {
|
||||
d = -d
|
||||
}
|
||||
return subInt64NoOverflow(currentTimestamp, int64(d)), nil
|
||||
nsec := currentTimestamp + int64(d)
|
||||
if nsec < 0 {
|
||||
return 0, fmt.Errorf("time %s (%v) must be in the range [%v, %v]", sOrig, time.Unix(0, nsec).UTC(), minTime, maxTime)
|
||||
}
|
||||
return nsec, nil
|
||||
}
|
||||
if len(s) == 4 {
|
||||
// Parse YYYY
|
||||
@@ -110,28 +115,22 @@ func ParseTimeAt(s string, currentTimestamp int64) (int64, error) {
|
||||
return parseTimeAt(time.RFC3339, sOrig, 0, sOrig)
|
||||
}
|
||||
|
||||
func parseTimeAt(layout, value string, tzOffsetNsec int64, sOrig string) (int64, error) {
|
||||
var (
|
||||
minTime = time.Unix(0, 0).UTC()
|
||||
maxTime = time.Unix(0, math.MaxInt64).UTC()
|
||||
)
|
||||
|
||||
func parseTimeAt(layout, value string, tzOffsetNanos int64, sOrig string) (int64, error) {
|
||||
t, err := time.Parse(layout, value)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
nsec := t.UnixNano()
|
||||
|
||||
return subInt64NoOverflow(nsec, -tzOffsetNsec), nil
|
||||
}
|
||||
|
||||
func subInt64NoOverflow(a, b int64) int64 {
|
||||
if b >= 0 {
|
||||
if a < math.MinInt64+b {
|
||||
return math.MinInt64
|
||||
}
|
||||
return a - b
|
||||
tzOffset := time.Duration(tzOffsetNanos)
|
||||
t = t.UTC().Add(tzOffset)
|
||||
if t.Before(minTime) || t.After(maxTime) {
|
||||
return 0, fmt.Errorf("time %s (%v) must be in the range [%v, %v]", sOrig, t, minTime, maxTime)
|
||||
}
|
||||
|
||||
if a > math.MaxInt64+b {
|
||||
return math.MaxInt64
|
||||
}
|
||||
return a - b
|
||||
return t.UnixNano(), nil
|
||||
}
|
||||
|
||||
// TryParseUnixTimestamp parses s as unix timestamp in seconds, milliseconds, microseconds or nanoseconds and returns the parsed timestamp in nanoseconds.
|
||||
|
||||
@@ -210,7 +210,6 @@ func TestParseTimeAtLimits(t *testing.T) {
|
||||
|
||||
f := func(s string, wantTime time.Time) {
|
||||
t.Helper()
|
||||
|
||||
got, err := ParseTimeAt(s, now.UnixNano())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
@@ -232,38 +231,42 @@ func TestParseTimeAtLimits(t *testing.T) {
|
||||
west := location(t, "Etc/GMT+12") // UTC-12:00
|
||||
var s string
|
||||
|
||||
// min timestamp
|
||||
f("0", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
s = fmt.Sprintf("-%d", now.Unix())
|
||||
f(s, time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
s = fmt.Sprintf("now-%d", now.Unix())
|
||||
f(s, time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
|
||||
// min year
|
||||
f("1678Z", time.Date(1678, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
f("1678+14:00", time.Date(1678, 1, 1, 0, 0, 0, 0, east))
|
||||
f("1678-12:00", time.Date(1678, 1, 1, 0, 0, 0, 0, west))
|
||||
f("1970Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
f("1971+14:00", time.Date(1971, 1, 1, 0, 0, 0, 0, east))
|
||||
f("1970-12:00", time.Date(1970, 1, 1, 0, 0, 0, 0, west))
|
||||
|
||||
// min month
|
||||
f("1677-10Z", time.Date(1677, 10, 1, 0, 0, 0, 0, time.UTC))
|
||||
f("1677-10+14:00", time.Date(1677, 10, 1, 0, 0, 0, 0, east))
|
||||
f("1677-10-12:00", time.Date(1677, 10, 1, 0, 0, 0, 0, west))
|
||||
f("1970-01Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
f("1970-02+14:00", time.Date(1970, 2, 1, 0, 0, 0, 0, east))
|
||||
f("1970-01-12:00", time.Date(1970, 1, 1, 0, 0, 0, 0, west))
|
||||
|
||||
// min day
|
||||
f("1677-09-22Z", time.Date(1677, 9, 22, 0, 0, 0, 0, time.UTC))
|
||||
f("1677-09-22+14:00", time.Date(1677, 9, 22, 0, 0, 0, 0, east))
|
||||
f("1677-09-22-12:00", time.Date(1677, 9, 22, 0, 0, 0, 0, west))
|
||||
f("1970-01-01Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
f("1970-01-02+14:00", time.Date(1970, 1, 2, 0, 0, 0, 0, east))
|
||||
f("1970-01-01-12:00", time.Date(1970, 1, 1, 0, 0, 0, 0, west))
|
||||
|
||||
// min hour
|
||||
f("1677-09-21T01Z", time.Date(1677, 9, 21, 1, 0, 0, 0, time.UTC))
|
||||
f("1677-09-21T15+14:00", time.Date(1677, 9, 21, 15, 0, 0, 0, east))
|
||||
f("1677-09-21T01+14:00", time.Unix(0, math.MinInt64))
|
||||
f("1677-09-21T01-12:00", time.Date(1677, 9, 21, 1, 0, 0, 0, west))
|
||||
f("1970-01-01T00Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
f("1970-01-01T14+14:00", time.Date(1970, 1, 1, 14, 0, 0, 0, east))
|
||||
f("1969-12-31T12-12:00", time.Date(1969, 12, 31, 12, 0, 0, 0, west))
|
||||
|
||||
// min minute
|
||||
f("1677-09-21T00:12Z", time.Date(1677, 9, 21, 0, 12, 0, 0, time.UTC))
|
||||
f("1677-09-21T15:12Z+14:00", time.Date(1677, 9, 21, 15, 12, 0, 0, east))
|
||||
f("1677-09-21T00:13Z+14:00", time.Unix(0, math.MinInt64))
|
||||
f("1677-09-21T00:13Z-12:00", time.Date(1677, 9, 21, 0, 13, 0, 0, west))
|
||||
f("1970-01-01T00:00Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
f("1970-01-01T14:00+14:00", time.Date(1970, 1, 1, 14, 0, 0, 0, east))
|
||||
f("1969-12-31T12:00-12:00", time.Date(1969, 12, 31, 12, 0, 0, 0, west))
|
||||
|
||||
// min second
|
||||
f("1677-09-21T00:12:43Z", time.Date(1677, 9, 21, 0, 12, 43, 0, time.UTC))
|
||||
f("1677-09-21T15:12:43Z+14:00", time.Date(1677, 9, 21, 15, 12, 43, 0, east))
|
||||
f("1677-09-21T00:12:44Z+14:00", time.Unix(0, math.MinInt64))
|
||||
f("1677-09-21T00:12:44Z-12:00", time.Date(1677, 9, 21, 0, 12, 44, 0, west))
|
||||
f("1970-01-01T00:00:00Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
f("1970-01-01T14:00:00+14:00", time.Date(1970, 1, 1, 14, 0, 0, 0, east))
|
||||
f("1969-12-31T12:00:00-12:00", time.Date(1969, 12, 31, 12, 0, 0, 0, west))
|
||||
|
||||
// max year
|
||||
f("2262Z", time.Date(2262, 1, 1, 0, 0, 0, 0, time.UTC))
|
||||
@@ -277,26 +280,23 @@ func TestParseTimeAtLimits(t *testing.T) {
|
||||
|
||||
// max day
|
||||
f("2262-04-11Z", time.Date(2262, 4, 11, 0, 0, 0, 0, time.UTC))
|
||||
f("2262-04-11+14:00", time.Date(2262, 4, 11, 0, 0, 0, 0, east))
|
||||
f("2262-04-12+14:00", time.Date(2262, 4, 12, 0, 0, 0, 0, east))
|
||||
f("2262-04-11-12:00", time.Date(2262, 4, 11, 0, 0, 0, 0, west))
|
||||
|
||||
// max hour
|
||||
f("2262-04-11T23Z", time.Date(2262, 4, 11, 23, 0, 0, 0, time.UTC))
|
||||
f("2262-04-11T23+14:00", time.Date(2262, 4, 11, 23, 0, 0, 0, east))
|
||||
f("2262-04-12T13+14:00", time.Date(2262, 4, 12, 13, 0, 0, 0, east))
|
||||
f("2262-04-11T11-12:00", time.Date(2262, 4, 11, 11, 0, 0, 0, west))
|
||||
f("2262-04-11T23-12:00", time.Unix(0, math.MaxInt64))
|
||||
|
||||
// max minute
|
||||
f("2262-04-11T23:47Z", time.Date(2262, 4, 11, 23, 47, 0, 0, time.UTC))
|
||||
f("2262-04-11T23:47+14:00", time.Date(2262, 4, 11, 23, 47, 0, 0, east))
|
||||
f("2262-04-12T13:47+14:00", time.Date(2262, 4, 12, 13, 47, 0, 0, east))
|
||||
f("2262-04-11T11:47-12:00", time.Date(2262, 4, 11, 11, 47, 0, 0, west))
|
||||
f("2262-04-11T23:47-12:00", time.Unix(0, math.MaxInt64))
|
||||
|
||||
// max second
|
||||
f("2262-04-11T23:47:16Z", time.Date(2262, 4, 11, 23, 47, 16, 0, time.UTC))
|
||||
f("2262-04-11T23:47:16+14:00", time.Date(2262, 4, 11, 23, 47, 16, 0, east))
|
||||
f("2262-04-12T13:47:16+14:00", time.Date(2262, 4, 12, 13, 47, 16, 0, east))
|
||||
f("2262-04-11T11:47:16-12:00", time.Date(2262, 4, 11, 11, 47, 16, 0, west))
|
||||
f("2262-04-11T23:47:16-12:00", time.Unix(0, math.MaxInt64))
|
||||
|
||||
// max timestamp
|
||||
s = fmt.Sprintf("%d", int64(maxValidSecond))
|
||||
@@ -324,6 +324,85 @@ func TestParseTimeAtLimits(t *testing.T) {
|
||||
f(s, time.Date(1970, 4, 17, 18, 2, 52, 36_854_776, time.UTC))
|
||||
}
|
||||
|
||||
func TestParseTimeAtOutsideLimits(t *testing.T) {
|
||||
now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
got, err := ParseTimeAt(s, now.UnixNano())
|
||||
if err == nil {
|
||||
t.Fatalf("expected error but got %d", got)
|
||||
}
|
||||
if !strings.Contains(err.Error(), "must be in the range") {
|
||||
t.Fatalf("expected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// min timestamp
|
||||
f(fmt.Sprintf("-%d", now.Unix()+1))
|
||||
f(fmt.Sprintf("now-%d", now.Unix()+1))
|
||||
|
||||
// min year
|
||||
f("1969Z")
|
||||
f("1970+14:00")
|
||||
f("1969-12:00")
|
||||
|
||||
// min month
|
||||
f("1969-12Z")
|
||||
f("1970-01+14:00")
|
||||
f("1969-12-12:00")
|
||||
|
||||
// min day
|
||||
f("1969-12-31Z")
|
||||
f("1970-01-01+14:00")
|
||||
f("1969-12-31-12:00")
|
||||
|
||||
// min hour
|
||||
f("1969-12-31T23Z")
|
||||
f("1970-01-01T13+14:00")
|
||||
f("1969-12-31T11-12:00")
|
||||
|
||||
// min minute
|
||||
f("1969-12-31T23:59Z")
|
||||
f("1970-01-01T13:59+14:00")
|
||||
f("1969-12-31T11:59-12:00")
|
||||
|
||||
// min second
|
||||
f("1969-12-31T23:59:59Z")
|
||||
f("1970-01-01T13:59:59+14:00")
|
||||
f("1969-12-31T11:59:59-12:00")
|
||||
|
||||
// max year
|
||||
f("2263Z")
|
||||
f("2263+14:00")
|
||||
f("2263-12:00")
|
||||
|
||||
// max month
|
||||
f("2262-05Z")
|
||||
f("2262-05+14:00")
|
||||
f("2262-05-12:00")
|
||||
|
||||
// max day
|
||||
f("2262-04-12Z")
|
||||
f("2262-04-13+14:00")
|
||||
f("2262-04-12-12:00")
|
||||
|
||||
// max hour
|
||||
f("2262-04-12T00Z")
|
||||
f("2262-04-12T14+14:00")
|
||||
f("2262-04-11T12-12:00")
|
||||
|
||||
// max minute
|
||||
f("2262-04-11T23:48Z")
|
||||
f("2262-04-12T13:48+14:00")
|
||||
f("2262-04-11T11:48-12:00")
|
||||
|
||||
// max second
|
||||
f("2262-04-11T23:47:17Z")
|
||||
f("2262-04-12T13:47:17+14:00")
|
||||
f("2262-04-11T11:47:17-12:00")
|
||||
}
|
||||
|
||||
func TestParseTimeAtOutsideLimits_Nanos(t *testing.T) {
|
||||
now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
@@ -355,6 +434,7 @@ func TestParseTimeMsecFailure(t *testing.T) {
|
||||
}
|
||||
|
||||
f("")
|
||||
f("2263")
|
||||
f("23-45:50")
|
||||
f("1223-fo:ba")
|
||||
f("1223-12:ba")
|
||||
|
||||
@@ -1,68 +0,0 @@
|
||||
package vmalertproxy
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
||||
// Init initializes proxying requests to the given proxyURL when calling HandleRequest.
|
||||
//
|
||||
// Init must be called after flag.Parse(), since it uses command-line flags.
|
||||
func Init(proxyURL string) {
|
||||
if len(proxyURL) == 0 {
|
||||
return
|
||||
}
|
||||
pu, err := url.Parse(proxyURL)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot parse -vmalert.proxyURL=%q: %s", proxyURL, err)
|
||||
}
|
||||
vmalertProxyHost = pu.Host
|
||||
vmalertProxy = httputil.NewSingleHostReverseProxy(pu)
|
||||
}
|
||||
|
||||
// HandleRequest proxies the given request path to vmalert at proxyURL passed to Init().
|
||||
func HandleRequest(w http.ResponseWriter, r *http.Request, path string) {
|
||||
defer func() {
|
||||
err := recover()
|
||||
if err == nil || err == http.ErrAbortHandler {
|
||||
// Suppress http.ErrAbortHandler panic.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1353
|
||||
return
|
||||
}
|
||||
// Forward other panics to the caller.
|
||||
panic(err)
|
||||
}()
|
||||
req := r.Clone(r.Context())
|
||||
req.URL.Path = path
|
||||
req.Host = vmalertProxyHost
|
||||
|
||||
if strings.HasPrefix(r.Header.Get(`User-Agent`), `Grafana`) {
|
||||
// Grafana currently supports only Prometheus-style alerts. If other alert types
|
||||
// (e.g. logs or traces) are returned, it may fail with "Error loading alerts".
|
||||
//
|
||||
// Grafana queries the vmalert API directly, bypassing the VictoriaMetrics datasource,
|
||||
// so query params (such as datasource_type) cannot be enforced on the Grafana side.
|
||||
//
|
||||
// To ensure compatibility, we detect Grafana requests via the User-Agent and enforce
|
||||
// `datasource_type=prometheus`.
|
||||
//
|
||||
// See:
|
||||
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/329#issuecomment-3847585443
|
||||
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/59
|
||||
q := req.URL.Query()
|
||||
q.Set("datasource_type", "prometheus")
|
||||
req.URL.RawQuery = q.Encode()
|
||||
req.RequestURI = ""
|
||||
}
|
||||
|
||||
vmalertProxy.ServeHTTP(w, req)
|
||||
}
|
||||
|
||||
var (
|
||||
vmalertProxyHost string
|
||||
vmalertProxy *httputil.ReverseProxy
|
||||
)
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user