Compare commits

...

1 Commits

Author SHA1 Message Date
Andrii Chubatiuk
a7404304fc obtain tenant information from headers 2025-03-08 11:17:34 +02:00
2 changed files with 46 additions and 27 deletions

View File

@@ -212,17 +212,18 @@ func getOpenTSDBHTTPInsertHandler() func(req *http.Request) error {
}
}
return func(req *http.Request) error {
path := strings.Replace(req.URL.Path, "//", "/", -1)
at, err := getAuthTokenFromPath(path)
at, err := getAuthTokenFromReq(req)
if err != nil {
return fmt.Errorf("cannot obtain auth token from path %q: %w", path, err)
return fmt.Errorf("cannot obtain auth token: %w", err)
}
return opentsdbhttp.InsertHandler(at, req)
}
}
func getAuthTokenFromPath(path string) (*auth.Token, error) {
p, err := httpserver.ParsePath(path)
func getAuthTokenFromReq(req *http.Request) (*auth.Token, error) {
tenantID := req.Header.Get("TenantID")
path := strings.Replace(req.URL.Path, "//", "/", -1)
p, err := httpserver.ParsePath(path, tenantID)
if err != nil {
return nil, fmt.Errorf("cannot parse multitenant path: %w", err)
}
@@ -257,10 +258,20 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
var at *auth.Token
var err error
if tenantID := r.Header.Get("TenantID"); tenantID != "" {
at, err = auth.NewToken(tenantID)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
}
}
path := strings.Replace(r.URL.Path, "//", "/", -1)
if strings.HasPrefix(path, "/prometheus/api/v1/import/prometheus") || strings.HasPrefix(path, "/api/v1/import/prometheus") {
prometheusimportRequests.Inc()
if err := prometheusimport.InsertHandler(nil, r); err != nil {
if err := prometheusimport.InsertHandler(at, r); err != nil {
prometheusimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@@ -280,13 +291,14 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2670
path = strings.TrimSuffix(path, "/")
}
switch path {
case "/prometheus/api/v1/write", "/api/v1/write", "/api/v1/push", "/prometheus/api/v1/push":
if common.HandleVMProtoServerHandshake(w, r) {
return true
}
prometheusWriteRequests.Inc()
if err := promremotewrite.InsertHandler(nil, r); err != nil {
if err := promremotewrite.InsertHandler(at, r); err != nil {
prometheusWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@@ -295,7 +307,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/prometheus/api/v1/import", "/api/v1/import":
vmimportRequests.Inc()
if err := vmimport.InsertHandler(nil, r); err != nil {
if err := vmimport.InsertHandler(at, r); err != nil {
vmimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@@ -304,7 +316,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/prometheus/api/v1/import/csv", "/api/v1/import/csv":
csvimportRequests.Inc()
if err := csvimport.InsertHandler(nil, r); err != nil {
if err := csvimport.InsertHandler(at, r); err != nil {
csvimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@@ -313,7 +325,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/prometheus/api/v1/import/native", "/api/v1/import/native":
nativeimportRequests.Inc()
if err := native.InsertHandler(nil, r); err != nil {
if err := native.InsertHandler(at, r); err != nil {
nativeimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@@ -322,7 +334,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/influx/write", "/influx/api/v2/write", "/write", "/api/v2/write":
influxWriteRequests.Inc()
if err := influx.InsertHandlerForHTTP(nil, r); err != nil {
if err := influx.InsertHandlerForHTTP(at, r); err != nil {
influxWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@@ -339,7 +351,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/opentelemetry/api/v1/push", "/opentelemetry/v1/metrics":
opentelemetryPushRequests.Inc()
if err := opentelemetry.InsertHandler(nil, r); err != nil {
if err := opentelemetry.InsertHandler(at, r); err != nil {
opentelemetryPushErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@@ -360,7 +372,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/newrelic/infra/v2/metrics/events/bulk":
newrelicWriteRequests.Inc()
if err := newrelic.InsertHandlerForHTTP(nil, r); err != nil {
if err := newrelic.InsertHandlerForHTTP(at, r); err != nil {
newrelicWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@@ -371,7 +383,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/datadog/api/v1/series":
datadogv1WriteRequests.Inc()
if err := datadogv1.InsertHandlerForHTTP(nil, r); err != nil {
if err := datadogv1.InsertHandlerForHTTP(at, r); err != nil {
datadogv1WriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@@ -382,7 +394,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/datadog/api/v2/series":
datadogv2WriteRequests.Inc()
if err := datadogv2.InsertHandlerForHTTP(nil, r); err != nil {
if err := datadogv2.InsertHandlerForHTTP(at, r); err != nil {
datadogv2WriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@@ -394,7 +406,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/datadog/api/beta/sketches":
datadogsketchesWriteRequests.Inc()
if err := datadogsketches.InsertHandlerForHTTP(nil, r); err != nil {
if err := datadogsketches.InsertHandlerForHTTP(at, r); err != nil {
datadogsketchesWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@@ -504,7 +516,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
}
func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path string) bool {
p, err := httpserver.ParsePath(path)
p, err := httpserver.ParsePath(path, "")
if err != nil {
// Cannot parse multitenant path. Skip it - probably it will be parsed later.
return false

View File

@@ -16,9 +16,9 @@ type Path struct {
}
// ParsePath parses the given path.
func ParsePath(path string) (*Path, error) {
func ParsePath(path string, tenantID string) (*Path, error) {
// The path must have the following form:
// /{prefix}/{authToken}/{suffix}
// /{prefix}/[{authToken}/]{suffix}
//
// - prefix must contain `select`, `insert` or `delete`.
// - authToken contains `accountID[:projectID]`, where projectID is optional.
@@ -31,18 +31,25 @@ func ParsePath(path string) (*Path, error) {
s := skipPrefixSlashes(path)
n := strings.IndexByte(s, '/')
if n < 0 {
return nil, fmt.Errorf("cannot find {prefix} in %q; expecting /{prefix}/{authToken}/{suffix} format", path)
if len(tenantID) == 0 {
return nil, fmt.Errorf("cannot find {prefix} in %q; expecting /{prefix}/{tenantID}/{suffix} format; "+
"see https://docs.victoriametrics.com/cluster-victoriametrics/#url-format", path)
}
return nil, fmt.Errorf("cannot find {prefix} in %q; expecting /{prefix}/{suffix} format; "+
"see https://docs.victoriametrics.com/cluster-victoriametrics/#url-format", path)
}
prefix := s[:n]
s = skipPrefixSlashes(s[n+1:])
n = strings.IndexByte(s, '/')
if n < 0 {
return nil, fmt.Errorf("cannot find {authToken} in %q; expecting /{prefix}/{authToken}/{suffix} format", path)
if len(tenantID) == 0 {
n = strings.IndexByte(s, '/')
if n < 0 {
return nil, fmt.Errorf("cannot find {tenantID} in %q; expecting /{prefix}/{tenantID}/{suffix} format; "+
"see https://docs.victoriametrics.com/cluster-victoriametrics/#url-format", path)
}
tenantID = s[:n]
s = skipPrefixSlashes(s[n+1:])
}
authToken := s[:n]
s = skipPrefixSlashes(s[n+1:])
// Substitute double slashes with single slashes in the path, since such slashes
// may appear due improper copy-pasting of the url.
@@ -50,7 +57,7 @@ func ParsePath(path string) (*Path, error) {
p := &Path{
Prefix: prefix,
AuthToken: authToken,
AuthToken: tenantID,
Suffix: suffix,
}
return p, nil