mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-20 02:06:31 +03:00
Compare commits
1 Commits
weakpointe
...
tenant-fro
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a7404304fc |
@@ -212,17 +212,18 @@ func getOpenTSDBHTTPInsertHandler() func(req *http.Request) error {
|
||||
}
|
||||
}
|
||||
return func(req *http.Request) error {
|
||||
path := strings.Replace(req.URL.Path, "//", "/", -1)
|
||||
at, err := getAuthTokenFromPath(path)
|
||||
at, err := getAuthTokenFromReq(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot obtain auth token from path %q: %w", path, err)
|
||||
return fmt.Errorf("cannot obtain auth token: %w", err)
|
||||
}
|
||||
return opentsdbhttp.InsertHandler(at, req)
|
||||
}
|
||||
}
|
||||
|
||||
func getAuthTokenFromPath(path string) (*auth.Token, error) {
|
||||
p, err := httpserver.ParsePath(path)
|
||||
func getAuthTokenFromReq(req *http.Request) (*auth.Token, error) {
|
||||
tenantID := req.Header.Get("TenantID")
|
||||
path := strings.Replace(req.URL.Path, "//", "/", -1)
|
||||
p, err := httpserver.ParsePath(path, tenantID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse multitenant path: %w", err)
|
||||
}
|
||||
@@ -257,10 +258,20 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
var at *auth.Token
|
||||
var err error
|
||||
if tenantID := r.Header.Get("TenantID"); tenantID != "" {
|
||||
at, err = auth.NewToken(tenantID)
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
path := strings.Replace(r.URL.Path, "//", "/", -1)
|
||||
if strings.HasPrefix(path, "/prometheus/api/v1/import/prometheus") || strings.HasPrefix(path, "/api/v1/import/prometheus") {
|
||||
prometheusimportRequests.Inc()
|
||||
if err := prometheusimport.InsertHandler(nil, r); err != nil {
|
||||
if err := prometheusimport.InsertHandler(at, r); err != nil {
|
||||
prometheusimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
@@ -280,13 +291,14 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2670
|
||||
path = strings.TrimSuffix(path, "/")
|
||||
}
|
||||
|
||||
switch path {
|
||||
case "/prometheus/api/v1/write", "/api/v1/write", "/api/v1/push", "/prometheus/api/v1/push":
|
||||
if common.HandleVMProtoServerHandshake(w, r) {
|
||||
return true
|
||||
}
|
||||
prometheusWriteRequests.Inc()
|
||||
if err := promremotewrite.InsertHandler(nil, r); err != nil {
|
||||
if err := promremotewrite.InsertHandler(at, r); err != nil {
|
||||
prometheusWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
@@ -295,7 +307,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
return true
|
||||
case "/prometheus/api/v1/import", "/api/v1/import":
|
||||
vmimportRequests.Inc()
|
||||
if err := vmimport.InsertHandler(nil, r); err != nil {
|
||||
if err := vmimport.InsertHandler(at, r); err != nil {
|
||||
vmimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
@@ -304,7 +316,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
return true
|
||||
case "/prometheus/api/v1/import/csv", "/api/v1/import/csv":
|
||||
csvimportRequests.Inc()
|
||||
if err := csvimport.InsertHandler(nil, r); err != nil {
|
||||
if err := csvimport.InsertHandler(at, r); err != nil {
|
||||
csvimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
@@ -313,7 +325,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
return true
|
||||
case "/prometheus/api/v1/import/native", "/api/v1/import/native":
|
||||
nativeimportRequests.Inc()
|
||||
if err := native.InsertHandler(nil, r); err != nil {
|
||||
if err := native.InsertHandler(at, r); err != nil {
|
||||
nativeimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
@@ -322,7 +334,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
return true
|
||||
case "/influx/write", "/influx/api/v2/write", "/write", "/api/v2/write":
|
||||
influxWriteRequests.Inc()
|
||||
if err := influx.InsertHandlerForHTTP(nil, r); err != nil {
|
||||
if err := influx.InsertHandlerForHTTP(at, r); err != nil {
|
||||
influxWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
@@ -339,7 +351,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
return true
|
||||
case "/opentelemetry/api/v1/push", "/opentelemetry/v1/metrics":
|
||||
opentelemetryPushRequests.Inc()
|
||||
if err := opentelemetry.InsertHandler(nil, r); err != nil {
|
||||
if err := opentelemetry.InsertHandler(at, r); err != nil {
|
||||
opentelemetryPushErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
@@ -360,7 +372,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
return true
|
||||
case "/newrelic/infra/v2/metrics/events/bulk":
|
||||
newrelicWriteRequests.Inc()
|
||||
if err := newrelic.InsertHandlerForHTTP(nil, r); err != nil {
|
||||
if err := newrelic.InsertHandlerForHTTP(at, r); err != nil {
|
||||
newrelicWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
@@ -371,7 +383,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
return true
|
||||
case "/datadog/api/v1/series":
|
||||
datadogv1WriteRequests.Inc()
|
||||
if err := datadogv1.InsertHandlerForHTTP(nil, r); err != nil {
|
||||
if err := datadogv1.InsertHandlerForHTTP(at, r); err != nil {
|
||||
datadogv1WriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
@@ -382,7 +394,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
return true
|
||||
case "/datadog/api/v2/series":
|
||||
datadogv2WriteRequests.Inc()
|
||||
if err := datadogv2.InsertHandlerForHTTP(nil, r); err != nil {
|
||||
if err := datadogv2.InsertHandlerForHTTP(at, r); err != nil {
|
||||
datadogv2WriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
@@ -394,7 +406,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
return true
|
||||
case "/datadog/api/beta/sketches":
|
||||
datadogsketchesWriteRequests.Inc()
|
||||
if err := datadogsketches.InsertHandlerForHTTP(nil, r); err != nil {
|
||||
if err := datadogsketches.InsertHandlerForHTTP(at, r); err != nil {
|
||||
datadogsketchesWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
@@ -504,7 +516,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
}
|
||||
|
||||
func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path string) bool {
|
||||
p, err := httpserver.ParsePath(path)
|
||||
p, err := httpserver.ParsePath(path, "")
|
||||
if err != nil {
|
||||
// Cannot parse multitenant path. Skip it - probably it will be parsed later.
|
||||
return false
|
||||
|
||||
@@ -16,9 +16,9 @@ type Path struct {
|
||||
}
|
||||
|
||||
// ParsePath parses the given path.
|
||||
func ParsePath(path string) (*Path, error) {
|
||||
func ParsePath(path string, tenantID string) (*Path, error) {
|
||||
// The path must have the following form:
|
||||
// /{prefix}/{authToken}/{suffix}
|
||||
// /{prefix}/[{authToken}/]{suffix}
|
||||
//
|
||||
// - prefix must contain `select`, `insert` or `delete`.
|
||||
// - authToken contains `accountID[:projectID]`, where projectID is optional.
|
||||
@@ -31,18 +31,25 @@ func ParsePath(path string) (*Path, error) {
|
||||
s := skipPrefixSlashes(path)
|
||||
n := strings.IndexByte(s, '/')
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("cannot find {prefix} in %q; expecting /{prefix}/{authToken}/{suffix} format", path)
|
||||
if len(tenantID) == 0 {
|
||||
return nil, fmt.Errorf("cannot find {prefix} in %q; expecting /{prefix}/{tenantID}/{suffix} format; "+
|
||||
"see https://docs.victoriametrics.com/cluster-victoriametrics/#url-format", path)
|
||||
}
|
||||
return nil, fmt.Errorf("cannot find {prefix} in %q; expecting /{prefix}/{suffix} format; "+
|
||||
"see https://docs.victoriametrics.com/cluster-victoriametrics/#url-format", path)
|
||||
}
|
||||
prefix := s[:n]
|
||||
|
||||
s = skipPrefixSlashes(s[n+1:])
|
||||
n = strings.IndexByte(s, '/')
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("cannot find {authToken} in %q; expecting /{prefix}/{authToken}/{suffix} format", path)
|
||||
if len(tenantID) == 0 {
|
||||
n = strings.IndexByte(s, '/')
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("cannot find {tenantID} in %q; expecting /{prefix}/{tenantID}/{suffix} format; "+
|
||||
"see https://docs.victoriametrics.com/cluster-victoriametrics/#url-format", path)
|
||||
}
|
||||
tenantID = s[:n]
|
||||
s = skipPrefixSlashes(s[n+1:])
|
||||
}
|
||||
authToken := s[:n]
|
||||
|
||||
s = skipPrefixSlashes(s[n+1:])
|
||||
|
||||
// Substitute double slashes with single slashes in the path, since such slashes
|
||||
// may appear due improper copy-pasting of the url.
|
||||
@@ -50,7 +57,7 @@ func ParsePath(path string) (*Path, error) {
|
||||
|
||||
p := &Path{
|
||||
Prefix: prefix,
|
||||
AuthToken: authToken,
|
||||
AuthToken: tenantID,
|
||||
Suffix: suffix,
|
||||
}
|
||||
return p, nil
|
||||
|
||||
Reference in New Issue
Block a user