mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-28 22:31:26 +03:00
Compare commits
1 Commits
vmauth-rea
...
rollback-v
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
33d5b8f12b |
@@ -80,15 +80,14 @@ func (as AlertState) String() string {
|
||||
|
||||
// AlertTplData is used to execute templating
|
||||
type AlertTplData struct {
|
||||
Type string
|
||||
Labels map[string]string
|
||||
Value float64
|
||||
Expr string
|
||||
AlertID uint64
|
||||
GroupID uint64
|
||||
ActiveAt time.Time
|
||||
For time.Duration
|
||||
IsPartial bool
|
||||
Type string
|
||||
Labels map[string]string
|
||||
Value float64
|
||||
Expr string
|
||||
AlertID uint64
|
||||
GroupID uint64
|
||||
ActiveAt time.Time
|
||||
For time.Duration
|
||||
}
|
||||
|
||||
var tplHeaders = []string{
|
||||
@@ -102,7 +101,6 @@ var tplHeaders = []string{
|
||||
"{{ $groupID := .GroupID }}",
|
||||
"{{ $activeAt := .ActiveAt }}",
|
||||
"{{ $for := .For }}",
|
||||
"{{ $isPartial := .IsPartial }}",
|
||||
}
|
||||
|
||||
// ExecTemplate executes the Alert template for given
|
||||
|
||||
@@ -346,8 +346,6 @@ func (ar *AlertingRule) toLabels(m datasource.Metric, qFn templates.QueryFn) (*l
|
||||
ls.processed[l.Name] = l.Value
|
||||
}
|
||||
|
||||
// labels only support limited templating variables,
|
||||
// including `labels`, `value` and `expr`, to avoid breaking alert states or causing cardinality issue with results
|
||||
extraLabels, err := notifier.ExecTemplate(qFn, ar.Labels, notifier.AlertTplData{
|
||||
Labels: ls.origin,
|
||||
Value: m.Values[0],
|
||||
@@ -459,8 +457,7 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
|
||||
return nil, fmt.Errorf("failed to execute query %q: %w", ar.Expr, err)
|
||||
}
|
||||
|
||||
isPartial := isPartialResponse(res)
|
||||
ar.logDebugf(ts, nil, "query returned %d series (elapsed: %s, isPartial: %t)", curState.Samples, curState.Duration, isPartial)
|
||||
ar.logDebugf(ts, nil, "query returned %d series (elapsed: %s, isPartial: %t)", curState.Samples, curState.Duration, isPartialResponse(res))
|
||||
qFn := func(query string) ([]datasource.Metric, error) {
|
||||
res, _, err := ar.q.Query(ctx, query, ts)
|
||||
return res.Data, err
|
||||
@@ -486,7 +483,7 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
|
||||
at = a.ActiveAt
|
||||
}
|
||||
}
|
||||
as, err := ar.expandAnnotationTemplates(m, qFn, at, ls, isPartial)
|
||||
as, err := ar.expandAnnotationTemplates(m, qFn, at, ls)
|
||||
if err != nil {
|
||||
// only set error in current state, but do not break alert processing
|
||||
curState.Err = err
|
||||
@@ -604,17 +601,16 @@ func (ar *AlertingRule) expandLabelTemplates(m datasource.Metric, qFn templates.
|
||||
return ls, nil
|
||||
}
|
||||
|
||||
func (ar *AlertingRule) expandAnnotationTemplates(m datasource.Metric, qFn templates.QueryFn, activeAt time.Time, ls *labelSet, isPartial bool) (map[string]string, error) {
|
||||
func (ar *AlertingRule) expandAnnotationTemplates(m datasource.Metric, qFn templates.QueryFn, activeAt time.Time, ls *labelSet) (map[string]string, error) {
|
||||
tplData := notifier.AlertTplData{
|
||||
Value: m.Values[0],
|
||||
Type: ar.Type.String(),
|
||||
Labels: ls.origin,
|
||||
Expr: ar.Expr,
|
||||
AlertID: hash(ls.processed),
|
||||
GroupID: ar.GroupID,
|
||||
ActiveAt: activeAt,
|
||||
For: ar.For,
|
||||
IsPartial: isPartial,
|
||||
Value: m.Values[0],
|
||||
Type: ar.Type.String(),
|
||||
Labels: ls.origin,
|
||||
Expr: ar.Expr,
|
||||
AlertID: hash(ls.processed),
|
||||
GroupID: ar.GroupID,
|
||||
ActiveAt: activeAt,
|
||||
For: ar.For,
|
||||
}
|
||||
as, err := notifier.ExecTemplate(qFn, ar.Annotations, tplData)
|
||||
if err != nil {
|
||||
|
||||
@@ -1120,7 +1120,7 @@ func TestAlertingRuleLimit_Success(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAlertingRule_Template(t *testing.T) {
|
||||
f := func(rule *AlertingRule, metrics []datasource.Metric, isResponsePartial bool, alertsExpected map[uint64]*notifier.Alert) {
|
||||
f := func(rule *AlertingRule, metrics []datasource.Metric, alertsExpected map[uint64]*notifier.Alert) {
|
||||
t.Helper()
|
||||
|
||||
fakeGroup := Group{
|
||||
@@ -1133,7 +1133,6 @@ func TestAlertingRule_Template(t *testing.T) {
|
||||
entries: make([]StateEntry, 10),
|
||||
}
|
||||
fq.Add(metrics...)
|
||||
fq.SetPartialResponse(isResponsePartial)
|
||||
|
||||
if _, err := rule.exec(context.TODO(), time.Now(), 0); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
@@ -1164,7 +1163,7 @@ func TestAlertingRule_Template(t *testing.T) {
|
||||
}, []datasource.Metric{
|
||||
metricWithValueAndLabels(t, 1, "instance", "foo"),
|
||||
metricWithValueAndLabels(t, 1, "instance", "bar"),
|
||||
}, false, map[uint64]*notifier.Alert{
|
||||
}, map[uint64]*notifier.Alert{
|
||||
hash(map[string]string{alertNameLabel: "common", "region": "east", "instance": "foo"}): {
|
||||
Annotations: map[string]string{
|
||||
"summary": `common: Too high connection number for "foo"`,
|
||||
@@ -1193,14 +1192,14 @@ func TestAlertingRule_Template(t *testing.T) {
|
||||
"instance": "{{ $labels.instance }}",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"summary": `{{ $labels.__name__ }}: Too high connection number for "{{ $labels.instance }}".{{ if $isPartial }} WARNING: Partial response detected - this alert may be incomplete. Please verify the results manually.{{ end }}`,
|
||||
"summary": `{{ $labels.__name__ }}: Too high connection number for "{{ $labels.instance }}"`,
|
||||
"description": `{{ $labels.alertname}}: It is {{ $value }} connections for "{{ $labels.instance }}"`,
|
||||
},
|
||||
alerts: make(map[uint64]*notifier.Alert),
|
||||
}, []datasource.Metric{
|
||||
metricWithValueAndLabels(t, 2, "__name__", "first", "instance", "foo", alertNameLabel, "override"),
|
||||
metricWithValueAndLabels(t, 10, "__name__", "second", "instance", "bar", alertNameLabel, "override"),
|
||||
}, false, map[uint64]*notifier.Alert{
|
||||
}, map[uint64]*notifier.Alert{
|
||||
hash(map[string]string{alertNameLabel: "override label", "exported_alertname": "override", "instance": "foo"}): {
|
||||
Labels: map[string]string{
|
||||
alertNameLabel: "override label",
|
||||
@@ -1208,7 +1207,7 @@ func TestAlertingRule_Template(t *testing.T) {
|
||||
"instance": "foo",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"summary": `first: Too high connection number for "foo".`,
|
||||
"summary": `first: Too high connection number for "foo"`,
|
||||
"description": `override: It is 2 connections for "foo"`,
|
||||
},
|
||||
},
|
||||
@@ -1219,7 +1218,7 @@ func TestAlertingRule_Template(t *testing.T) {
|
||||
"instance": "bar",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"summary": `second: Too high connection number for "bar".`,
|
||||
"summary": `second: Too high connection number for "bar"`,
|
||||
"description": `override: It is 10 connections for "bar"`,
|
||||
},
|
||||
},
|
||||
@@ -1232,7 +1231,7 @@ func TestAlertingRule_Template(t *testing.T) {
|
||||
"instance": "{{ $labels.instance }}",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}.{{ if $isPartial }} WARNING: Partial response detected - this alert may be incomplete. Please verify the results manually.{{ end }}`,
|
||||
"summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}`,
|
||||
},
|
||||
alerts: make(map[uint64]*notifier.Alert),
|
||||
}, []datasource.Metric{
|
||||
@@ -1240,7 +1239,7 @@ func TestAlertingRule_Template(t *testing.T) {
|
||||
alertNameLabel, "originAlertname",
|
||||
alertGroupNameLabel, "originGroupname",
|
||||
"instance", "foo"),
|
||||
}, true, map[uint64]*notifier.Alert{
|
||||
}, map[uint64]*notifier.Alert{
|
||||
hash(map[string]string{
|
||||
alertNameLabel: "OriginLabels",
|
||||
"exported_alertname": "originAlertname",
|
||||
@@ -1256,7 +1255,7 @@ func TestAlertingRule_Template(t *testing.T) {
|
||||
"instance": "foo",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"summary": `Alert "originAlertname(originGroupname)" for instance foo. WARNING: Partial response detected - this alert may be incomplete. Please verify the results manually.`,
|
||||
"summary": `Alert "originAlertname(originGroupname)" for instance foo`,
|
||||
},
|
||||
},
|
||||
})
|
||||
@@ -1386,7 +1385,7 @@ func TestAlertingRule_ToLabels(t *testing.T) {
|
||||
"group": "vmalert",
|
||||
"alertname": "ConfigurationReloadFailure",
|
||||
"alertgroup": "vmalert",
|
||||
"invalid_label": `error evaluating template: template: :1:298: executing "" at <.Values.mustRuntimeFail>: can't evaluate field Values in type notifier.tplData`,
|
||||
"invalid_label": `error evaluating template: template: :1:268: executing "" at <.Values.mustRuntimeFail>: can't evaluate field Values in type notifier.tplData`,
|
||||
}
|
||||
|
||||
expectedProcessedLabels := map[string]string{
|
||||
@@ -1396,7 +1395,7 @@ func TestAlertingRule_ToLabels(t *testing.T) {
|
||||
"exported_alertname": "ConfigurationReloadFailure",
|
||||
"group": "vmalert",
|
||||
"alertgroup": "vmalert",
|
||||
"invalid_label": `error evaluating template: template: :1:298: executing "" at <.Values.mustRuntimeFail>: can't evaluate field Values in type notifier.tplData`,
|
||||
"invalid_label": `error evaluating template: template: :1:268: executing "" at <.Values.mustRuntimeFail>: can't evaluate field Values in type notifier.tplData`,
|
||||
}
|
||||
|
||||
ls, err := ar.toLabels(metric, nil)
|
||||
|
||||
@@ -394,7 +394,7 @@ func (bu *backendURL) runHealthCheck() {
|
||||
if errors.Is(bu.healthCheckContext.Err(), context.Canceled) {
|
||||
return
|
||||
}
|
||||
logger.Warnf("ignoring the backend at %s for %s because of dial error: %s", addr, *failTimeout, err)
|
||||
logger.Warnf("ignoring the backend at %s for %s becasue of dial error: %s", addr, *failTimeout, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -809,7 +809,7 @@ func reloadAuthConfig() (bool, error) {
|
||||
|
||||
ok, err := reloadAuthConfigData(data)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to parse -auth.config=%q: %w", *authConfigPath, err)
|
||||
return false, fmt.Errorf("failed to pars -auth.config=%q: %w", *authConfigPath, err)
|
||||
}
|
||||
if !ok {
|
||||
return false, nil
|
||||
|
||||
@@ -156,10 +156,6 @@ func requestHandlerWithInternalRoutes(w http.ResponseWriter, r *http.Request) bo
|
||||
}
|
||||
|
||||
func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
if r.Body != nil {
|
||||
r.Body = &readDurationTrackingBody{r: r.Body}
|
||||
}
|
||||
|
||||
ats := getAuthTokensFromRequest(r)
|
||||
if len(ats) == 0 {
|
||||
// Process requests for unauthorized users
|
||||
@@ -353,37 +349,14 @@ func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url
|
||||
err = ctxErr
|
||||
}
|
||||
if err != nil {
|
||||
if errors.Is(err, errReadTimeout) {
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
// Do not retry canceled or timed out requests
|
||||
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
|
||||
requestURI := httpserver.GetRequestURI(r)
|
||||
|
||||
logger.Warnf("remoteAddr: %s; requestURI: %s; client %s request exceeded single read timeout -readTimeout=%s, closing connection", remoteAddr, requestURI, ui.name(), *readTimeout)
|
||||
|
||||
rejectSlowClientRequests.Inc()
|
||||
if w1, ok := w.(http.Hijacker); ok {
|
||||
conn, _, connErr := w1.Hijack()
|
||||
if connErr != nil {
|
||||
logger.Errorf("cannot hijack connection for slow read timeout handling for %s: %s", targetURL, connErr)
|
||||
return true, false
|
||||
}
|
||||
_ = conn.Close()
|
||||
return true, false
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
// Timed out request must be counted as errors, since this usually means that the backend is slow.
|
||||
logger.Warnf("remoteAddr: %s; requestURI: %s; timeout while proxying the response from %s: %s", remoteAddr, requestURI, targetURL, err)
|
||||
}
|
||||
|
||||
return true, false
|
||||
}
|
||||
|
||||
// Do not retry canceled
|
||||
if errors.Is(err, context.Canceled) {
|
||||
clientCanceledRequests.Inc()
|
||||
return true, false
|
||||
}
|
||||
// Do not retry timed out requests
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
|
||||
requestURI := httpserver.GetRequestURI(r)
|
||||
// Timed out request must be counted as errors, since this usually means that the backend is slow.
|
||||
logger.Warnf("remoteAddr: %s; requestURI: %s; timeout while proxying the response from %s: %s", remoteAddr, requestURI, targetURL, err)
|
||||
return false, false
|
||||
}
|
||||
if !rtbOK || !rtb.canRetry() {
|
||||
@@ -440,10 +413,7 @@ func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url
|
||||
|
||||
err = copyStreamToClient(w, res.Body)
|
||||
_ = res.Body.Close()
|
||||
if errors.Is(err, context.Canceled) {
|
||||
clientCanceledRequests.Inc()
|
||||
return true, false
|
||||
} else if err != nil && !netutil.IsTrivialNetworkError(err) {
|
||||
if err != nil && !netutil.IsTrivialNetworkError(err) && !errors.Is(err, context.Canceled) {
|
||||
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
|
||||
requestURI := httpserver.GetRequestURI(r)
|
||||
|
||||
@@ -576,8 +546,6 @@ var (
|
||||
configReloadRequests = metrics.NewCounter(`vmauth_http_requests_total{path="/-/reload"}`)
|
||||
invalidAuthTokenRequests = metrics.NewCounter(`vmauth_http_request_errors_total{reason="invalid_auth_token"}`)
|
||||
missingRouteRequests = metrics.NewCounter(`vmauth_http_request_errors_total{reason="missing_route"}`)
|
||||
clientCanceledRequests = metrics.NewCounter(`vmauth_http_request_errors_total{reason="client_canceled"}`)
|
||||
rejectSlowClientRequests = metrics.NewCounter(`vmauth_http_request_errors_total{reason="reject_slow_client"}`)
|
||||
)
|
||||
|
||||
func newRoundTripper(caFileOpt, certFileOpt, keyFileOpt, serverNameOpt string, insecureSkipVerifyP *bool) (http.RoundTripper, error) {
|
||||
@@ -665,7 +633,6 @@ func handleConcurrencyLimitError(w http.ResponseWriter, r *http.Request, err err
|
||||
if errors.Is(ctx.Err(), context.Canceled) {
|
||||
// Do not return any response for the request canceled by the client,
|
||||
// since the connection to the client is already closed.
|
||||
clientCanceledRequests.Inc()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -804,34 +771,3 @@ func debugInfo(u *url.URL, r *http.Request) string {
|
||||
fmt.Fprint(s, ")")
|
||||
return s.String()
|
||||
}
|
||||
|
||||
var slowReadDuration = metrics.NewSummary(`vmauth_request_slow_read_duration_seconds`)
|
||||
|
||||
var readTimeout = flag.Duration("readTimeout", 0, "The maximum duration for a single read call when exceeded the connection is closed. Zero disables request read timeout. "+
|
||||
"See also -writeTimeout")
|
||||
|
||||
var errReadTimeout = fmt.Errorf("request read timeout")
|
||||
|
||||
type readDurationTrackingBody struct {
|
||||
r io.ReadCloser
|
||||
}
|
||||
|
||||
func (r *readDurationTrackingBody) Read(p []byte) (n int, err error) {
|
||||
start := time.Now()
|
||||
n, err = r.r.Read(p)
|
||||
dur := time.Since(start)
|
||||
|
||||
// Record slow read durations only to avoid overhead for fast reads.
|
||||
if dur > time.Millisecond {
|
||||
slowReadDuration.Update(dur.Seconds())
|
||||
}
|
||||
if err == nil && *readTimeout > 0 && dur > *readTimeout {
|
||||
return n, errReadTimeout
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (r *readDurationTrackingBody) Close() error {
|
||||
return r.r.Close()
|
||||
}
|
||||
|
||||
@@ -389,23 +389,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
case "/create":
|
||||
snapshotsCreateTotal.Inc()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
snapshotName := Storage.MustCreateSnapshot()
|
||||
|
||||
// Verify whether the client already closed the connection.
|
||||
// In this case it is better to drop the created snapshot, since the client isn't interested in it.
|
||||
if err := r.Context().Err(); err != nil {
|
||||
logger.Infof("deleting already created snapshot at %s because the client canceled the request", snapshotName)
|
||||
if err := deleteSnapshot(snapshotName); err != nil {
|
||||
logger.Infof("cannot delete just created snapshot: %s", err)
|
||||
return true
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
snapshotPath := Storage.MustCreateSnapshot()
|
||||
if prometheusCompatibleResponse {
|
||||
fmt.Fprintf(w, `{"status":"success","data":{"name":%s}}`, stringsutil.JSONString(snapshotName))
|
||||
fmt.Fprintf(w, `{"status":"success","data":{"name":%s}}`, stringsutil.JSONString(snapshotPath))
|
||||
} else {
|
||||
fmt.Fprintf(w, `{"status":"ok","snapshot":%s}`, stringsutil.JSONString(snapshotName))
|
||||
fmt.Fprintf(w, `{"status":"ok","snapshot":%s}`, stringsutil.JSONString(snapshotPath))
|
||||
}
|
||||
return true
|
||||
case "/list":
|
||||
@@ -425,12 +413,23 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
snapshotsDeleteTotal.Inc()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
snapshotName := r.FormValue("snapshot")
|
||||
if err := deleteSnapshot(snapshotName); err != nil {
|
||||
jsonResponseError(w, err)
|
||||
snapshotsDeleteErrorsTotal.Inc()
|
||||
return true
|
||||
|
||||
snapshots := Storage.MustListSnapshots()
|
||||
for _, snName := range snapshots {
|
||||
if snName == snapshotName {
|
||||
if err := Storage.DeleteSnapshot(snName); err != nil {
|
||||
err = fmt.Errorf("cannot delete snapshot %q: %w", snName, err)
|
||||
jsonResponseError(w, err)
|
||||
snapshotsDeleteErrorsTotal.Inc()
|
||||
return true
|
||||
}
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
return true
|
||||
}
|
||||
}
|
||||
fmt.Fprintf(w, `{"status":"ok"}`)
|
||||
|
||||
err := fmt.Errorf("cannot find snapshot %q", snapshotName)
|
||||
jsonResponseError(w, err)
|
||||
return true
|
||||
case "/delete_all":
|
||||
snapshotsDeleteAllTotal.Inc()
|
||||
@@ -451,19 +450,6 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
}
|
||||
}
|
||||
|
||||
func deleteSnapshot(snapshotName string) error {
|
||||
snapshots := Storage.MustListSnapshots()
|
||||
for _, snName := range snapshots {
|
||||
if snName == snapshotName {
|
||||
if err := Storage.DeleteSnapshot(snName); err != nil {
|
||||
return fmt.Errorf("cannot delete snapshot %q: %w", snName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("cannot find snapshot %q", snapshotName)
|
||||
}
|
||||
|
||||
func initStaleSnapshotsRemover(strg *storage.Storage) {
|
||||
staleSnapshotsRemoverCh = make(chan struct{})
|
||||
if snapshotsMaxAge.Duration() <= 0 {
|
||||
|
||||
1491
app/vmui/packages/vmui/package-lock.json
generated
1491
app/vmui/packages/vmui/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -23,43 +23,43 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"classnames": "^2.5.1",
|
||||
"dayjs": "^1.11.19",
|
||||
"dayjs": "^1.11.13",
|
||||
"lodash.debounce": "^4.0.8",
|
||||
"marked": "^17.0.1",
|
||||
"preact": "^10.28.2",
|
||||
"qs": "^6.14.1",
|
||||
"marked": "^16.0.0",
|
||||
"preact": "^10.26.9",
|
||||
"qs": "^6.14.0",
|
||||
"react-input-mask": "^2.0.4",
|
||||
"react-router-dom": "^7.12.0",
|
||||
"react-router-dom": "^7.6.3",
|
||||
"uplot": "^1.6.32",
|
||||
"vite": "^7.3.1",
|
||||
"web-vitals": "^5.1.0"
|
||||
"vite": "^7.1.11",
|
||||
"web-vitals": "^5.0.3"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@eslint/eslintrc": "^3.3.3",
|
||||
"@eslint/js": "^9.39.2",
|
||||
"@eslint/eslintrc": "^3.3.1",
|
||||
"@eslint/js": "^9.30.1",
|
||||
"@preact/preset-vite": "^2.10.2",
|
||||
"@testing-library/jest-dom": "^6.9.1",
|
||||
"@testing-library/jest-dom": "^6.6.3",
|
||||
"@testing-library/preact": "^3.2.4",
|
||||
"@types/lodash.debounce": "^4.0.9",
|
||||
"@types/node": "^25.0.8",
|
||||
"@types/node": "^24.0.12",
|
||||
"@types/qs": "^6.14.0",
|
||||
"@types/react": "^19.2.8",
|
||||
"@types/react": "^19.1.8",
|
||||
"@types/react-input-mask": "^3.0.6",
|
||||
"@types/react-router-dom": "^5.3.3",
|
||||
"@typescript-eslint/eslint-plugin": "^8.53.0",
|
||||
"@typescript-eslint/parser": "^8.53.0",
|
||||
"cross-env": "^10.1.0",
|
||||
"eslint": "^9.39.2",
|
||||
"@typescript-eslint/eslint-plugin": "^8.36.0",
|
||||
"@typescript-eslint/parser": "^8.36.0",
|
||||
"cross-env": "^7.0.3",
|
||||
"eslint": "^9.30.1",
|
||||
"eslint-plugin-react": "^7.37.5",
|
||||
"eslint-plugin-unused-imports": "^4.3.0",
|
||||
"globals": "^17.0.0",
|
||||
"eslint-plugin-unused-imports": "^4.1.4",
|
||||
"globals": "^16.3.0",
|
||||
"http-proxy-middleware": "^3.0.5",
|
||||
"jsdom": "^27.4.0",
|
||||
"jsdom": "^26.1.0",
|
||||
"postcss": "^8.5.6",
|
||||
"rollup-plugin-visualizer": "^6.0.5",
|
||||
"sass-embedded": "^1.97.2",
|
||||
"typescript": "^5.9.3",
|
||||
"vitest": "^4.0.17"
|
||||
"rollup-plugin-visualizer": "^6.0.3",
|
||||
"sass-embedded": "^1.89.2",
|
||||
"typescript": "^5.8.3",
|
||||
"vitest": "^3.2.4"
|
||||
},
|
||||
"browserslist": {
|
||||
"production": [
|
||||
|
||||
@@ -9,6 +9,7 @@ import { getFromStorage, removeFromStorage, saveToStorage } from "../../../../ut
|
||||
import useBoolean from "../../../../hooks/useBoolean";
|
||||
import { ChildComponentHandle } from "../GlobalSettings";
|
||||
import { useAppDispatch, useAppState } from "../../../../state/common/StateContext";
|
||||
import { getTenantIdFromUrl } from "../../../../utils/tenants";
|
||||
|
||||
interface ServerConfiguratorProps {
|
||||
onClose: () => void;
|
||||
@@ -38,6 +39,10 @@ const ServerConfigurator = forwardRef<ChildComponentHandle, ServerConfiguratorPr
|
||||
};
|
||||
|
||||
const handleApply = useCallback(() => {
|
||||
const tenantIdFromUrl = getTenantIdFromUrl(serverUrl);
|
||||
if (tenantIdFromUrl !== "") {
|
||||
dispatch({ type: "SET_TENANT_ID", payload: tenantIdFromUrl });
|
||||
}
|
||||
dispatch({ type: "SET_SERVER", payload: serverUrl });
|
||||
onClose();
|
||||
}, [serverUrl]);
|
||||
@@ -55,6 +60,12 @@ const ServerConfigurator = forwardRef<ChildComponentHandle, ServerConfiguratorPr
|
||||
}
|
||||
}, [enabledStorage]);
|
||||
|
||||
useEffect(() => {
|
||||
if (enabledStorage) {
|
||||
saveToStorage("SERVER_URL", serverUrl);
|
||||
}
|
||||
}, [serverUrl]);
|
||||
|
||||
useEffect(() => {
|
||||
// the tenant selector can change the serverUrl
|
||||
if (stateServerUrl === serverUrl) return;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { FC, useState, useRef, useMemo } from "preact/compat";
|
||||
import { FC, useState, useRef, useEffect, useMemo } from "preact/compat";
|
||||
import { useAppDispatch, useAppState } from "../../../../state/common/StateContext";
|
||||
import { useTimeDispatch } from "../../../../state/time/TimeStateContext";
|
||||
import { ArrowDownIcon, StorageIcon } from "../../../Main/Icons";
|
||||
@@ -10,14 +10,14 @@ import { getAppModeEnable } from "../../../../utils/app-mode";
|
||||
import Tooltip from "../../../Main/Tooltip/Tooltip";
|
||||
import useDeviceDetect from "../../../../hooks/useDeviceDetect";
|
||||
import TextField from "../../../Main/TextField/TextField";
|
||||
import { replaceTenantId } from "../../../../utils/tenants";
|
||||
import { getTenantIdFromUrl, replaceTenantId } from "../../../../utils/tenants";
|
||||
import useBoolean from "../../../../hooks/useBoolean";
|
||||
|
||||
const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
|
||||
const appModeEnable = getAppModeEnable();
|
||||
const { isMobile } = useDeviceDetect();
|
||||
|
||||
const { tenantId, serverUrl } = useAppState();
|
||||
const { tenantId: tenantIdState, serverUrl } = useAppState();
|
||||
const dispatch = useAppDispatch();
|
||||
const timeDispatch = useTimeDispatch();
|
||||
|
||||
@@ -48,8 +48,10 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
|
||||
}, [accountIds]);
|
||||
|
||||
const createHandlerChange = (value: string) => () => {
|
||||
const tenant = value;
|
||||
dispatch({ type: "SET_TENANT_ID", payload: tenant });
|
||||
if (serverUrl) {
|
||||
const updateServerUrl = replaceTenantId(serverUrl, value);
|
||||
const updateServerUrl = replaceTenantId(serverUrl, tenant);
|
||||
if (updateServerUrl === serverUrl) return;
|
||||
dispatch({ type: "SET_SERVER", payload: updateServerUrl });
|
||||
timeDispatch({ type: "RUN_QUERY" });
|
||||
@@ -57,6 +59,16 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
|
||||
handleCloseOptions();
|
||||
};
|
||||
|
||||
useEffect(() => {
|
||||
const id = getTenantIdFromUrl(serverUrl);
|
||||
|
||||
if (tenantIdState && tenantIdState !== id) {
|
||||
createHandlerChange(tenantIdState)();
|
||||
} else {
|
||||
createHandlerChange(id)();
|
||||
}
|
||||
}, [serverUrl]);
|
||||
|
||||
if (!showTenantSelector) return null;
|
||||
|
||||
return (
|
||||
@@ -71,7 +83,7 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
|
||||
<span className="vm-mobile-option__icon"><StorageIcon/></span>
|
||||
<div className="vm-mobile-option-text">
|
||||
<span className="vm-mobile-option-text__label">Tenant ID</span>
|
||||
<span className="vm-mobile-option-text__value">{tenantId}</span>
|
||||
<span className="vm-mobile-option-text__value">{tenantIdState}</span>
|
||||
</div>
|
||||
<span className="vm-mobile-option__arrow"><ArrowDownIcon/></span>
|
||||
</div>
|
||||
@@ -94,7 +106,7 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
|
||||
)}
|
||||
onClick={toggleOpenOptions}
|
||||
>
|
||||
{tenantId}
|
||||
{tenantIdState}
|
||||
</Button>
|
||||
)}
|
||||
</div>
|
||||
@@ -126,7 +138,7 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
|
||||
className={classNames({
|
||||
"vm-list-item": true,
|
||||
"vm-list-item_mobile": isMobile,
|
||||
"vm-list-item_active": id === tenantId
|
||||
"vm-list-item_active": id === tenantIdState
|
||||
})}
|
||||
key={id}
|
||||
onClick={createHandlerChange(id)}
|
||||
|
||||
@@ -3,18 +3,19 @@ import { useEffect, useMemo, useState } from "preact/compat";
|
||||
import { ErrorTypes } from "../../../../../types";
|
||||
import { getAccountIds } from "../../../../../api/accountId";
|
||||
import { getAppModeEnable, getAppModeParams } from "../../../../../utils/app-mode";
|
||||
import { getTenantIdFromUrl } from "../../../../../utils/tenants";
|
||||
|
||||
export const useFetchAccountIds = () => {
|
||||
const { useTenantID } = getAppModeParams();
|
||||
const appModeEnable = getAppModeEnable();
|
||||
const { tenantId, serverUrl } = useAppState();
|
||||
const { serverUrl } = useAppState();
|
||||
|
||||
const [isLoading, setIsLoading] = useState(false);
|
||||
const [error, setError] = useState<ErrorTypes | string>();
|
||||
const [accountIds, setAccountIds] = useState<string[]>([]);
|
||||
|
||||
const fetchUrl = useMemo(() => getAccountIds(serverUrl), [serverUrl]);
|
||||
const isServerUrlWithTenant = useMemo(() => !!tenantId, [tenantId]);
|
||||
const isServerUrlWithTenant = useMemo(() => !!getTenantIdFromUrl(serverUrl), [serverUrl]);
|
||||
const preventFetch = appModeEnable ? !useTenantID : !isServerUrlWithTenant;
|
||||
|
||||
useEffect(() => {
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
@use "src/styles/variables" as *;
|
||||
@use 'sass:meta';
|
||||
|
||||
$button-radius: 6px;
|
||||
|
||||
@@ -43,8 +42,6 @@ $button-radius: 6px;
|
||||
|
||||
svg {
|
||||
width: 14px;
|
||||
min-width: 14px;
|
||||
max-width: 14px;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,8 +51,6 @@ $button-radius: 6px;
|
||||
|
||||
svg {
|
||||
width: 16px;
|
||||
min-width: 16px;
|
||||
max-width: 16px;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,8 +60,6 @@ $button-radius: 6px;
|
||||
|
||||
svg {
|
||||
width: 18px;
|
||||
min-width: 18px;
|
||||
max-width: 18px;
|
||||
line-height: 16px;
|
||||
}
|
||||
}
|
||||
@@ -135,14 +128,8 @@ $button-radius: 6px;
|
||||
);
|
||||
|
||||
@each $name, $color in $button-colors {
|
||||
@if $name == white {
|
||||
@include contained-button($name, $color, $color-black);
|
||||
@include outlined-button($name, $color, $color-white);
|
||||
@include text-button($name, $color-white);
|
||||
} @else {
|
||||
@include contained-button($name, $color, $color-white);
|
||||
@include outlined-button($name, $color, $color);
|
||||
@include text-button($name, $color);
|
||||
}
|
||||
@include contained-button($name, $color, if($name == white, $color-black, $color-white));
|
||||
@include outlined-button($name, $color, if($name == white, $color-white, $color));
|
||||
@include text-button($name, if($name == white, $color-white, $color));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import AppConfigurator from "../appConfigurator";
|
||||
import { useSearchParams } from "react-router-dom";
|
||||
import dayjs from "dayjs";
|
||||
import { DATE_FORMAT } from "../../../constants/date";
|
||||
import { getTenantIdFromUrl } from "../../../utils/tenants";
|
||||
import usePrevious from "../../../hooks/usePrevious";
|
||||
|
||||
export const useFetchQuery = (): {
|
||||
@@ -26,7 +27,7 @@ export const useFetchQuery = (): {
|
||||
const prevDate = usePrevious(date);
|
||||
const prevTotal = useRef<{ data: TSDBStatus }>();
|
||||
|
||||
const { tenantId, serverUrl } = useAppState();
|
||||
const { serverUrl } = useAppState();
|
||||
const [isLoading, setIsLoading] = useState(false);
|
||||
const [error, setError] = useState<ErrorTypes | string>();
|
||||
const [tsdbStatus, setTSDBStatus] = useState<TSDBStatus>(appConfigurator.defaultTSDBStatus);
|
||||
@@ -157,8 +158,9 @@ export const useFetchQuery = (): {
|
||||
}, [error]);
|
||||
|
||||
useEffect(() => {
|
||||
setIsCluster(!!tenantId);
|
||||
}, [tenantId]);
|
||||
const id = getTenantIdFromUrl(serverUrl);
|
||||
setIsCluster(!!id);
|
||||
}, [serverUrl]);
|
||||
|
||||
|
||||
appConfigurator.tsdbStatusData = tsdbStatus;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { useEffect, useState } from "react";
|
||||
import { useTimeDispatch, useTimeState } from "../../../state/time/TimeStateContext";
|
||||
import { useCustomPanelDispatch, useCustomPanelState } from "../../../state/customPanel/CustomPanelStateContext";
|
||||
import { useAppDispatch, useAppState } from "../../../state/common/StateContext";
|
||||
import { useQueryDispatch, useQueryState } from "../../../state/query/QueryStateContext";
|
||||
import { displayTypeTabs } from "../DisplayTypeSwitch";
|
||||
import { useGraphDispatch, useGraphState } from "../../../state/graph/GraphStateContext";
|
||||
@@ -14,12 +15,14 @@ import { arrayEquals } from "../../../utils/array";
|
||||
import { isEqualURLSearchParams } from "../../../utils/url";
|
||||
|
||||
export const useSetQueryParams = () => {
|
||||
const { tenantId } = useAppState();
|
||||
const { displayType } = useCustomPanelState();
|
||||
const { query } = useQueryState();
|
||||
const { duration, relativeTime, period: { date, step } } = useTimeState();
|
||||
const { customStep } = useGraphState();
|
||||
const [searchParams, setSearchParams] = useSearchParams();
|
||||
|
||||
const dispatch = useAppDispatch();
|
||||
const timeDispatch = useTimeDispatch();
|
||||
const graphDispatch = useGraphDispatch();
|
||||
const queryDispatch = useQueryDispatch();
|
||||
@@ -69,6 +72,10 @@ export const useSetQueryParams = () => {
|
||||
if (searchParams.get(`${group}.tab`) !== displayTypeCode) {
|
||||
newSearchParams.set(`${group}.tab`, `${displayTypeCode}`);
|
||||
}
|
||||
|
||||
if (searchParams.get(`${group}.tenantID`) !== tenantId && tenantId) {
|
||||
newSearchParams.set(`${group}.tenantID`, tenantId);
|
||||
}
|
||||
});
|
||||
|
||||
// Remove extra parameters that exceed the request size
|
||||
@@ -82,7 +89,7 @@ export const useSetQueryParams = () => {
|
||||
|
||||
if (isEqualURLSearchParams(newSearchParams, searchParams) || !newSearchParams.size) return;
|
||||
setSearchParams(newSearchParams);
|
||||
}, [displayType, query, duration, relativeTime, date, step, customStep]);
|
||||
}, [tenantId, displayType, query, duration, relativeTime, date, step, customStep]);
|
||||
|
||||
useEffect(() => {
|
||||
const timer = setTimeout(setterSearchParams, 200);
|
||||
@@ -107,6 +114,11 @@ export const useSetQueryParams = () => {
|
||||
customPanelDispatch({ type: "SET_DISPLAY_TYPE", payload: displayTypeFromUrl });
|
||||
}
|
||||
|
||||
const tenantIdFromUrl = searchParams.get("g0.tenantID") || "";
|
||||
if (tenantIdFromUrl !== tenantId) {
|
||||
dispatch({ type: "SET_TENANT_ID", payload: tenantIdFromUrl });
|
||||
}
|
||||
|
||||
const queryFromUrl = getQueryArray();
|
||||
if (!arrayEquals(queryFromUrl, query)) {
|
||||
queryDispatch({ type: "SET_QUERY", payload: queryFromUrl });
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import { createContext, FC, useContext, useEffect, useMemo, useReducer } from "preact/compat";
|
||||
import { createContext, FC, useContext, useMemo, useReducer } from "preact/compat";
|
||||
import { Action, AppState, initialState, reducer } from "./reducer";
|
||||
import { getQueryStringValue } from "../../utils/query-string";
|
||||
import { Dispatch } from "react";
|
||||
import { getFromStorage, removeFromStorage, saveToStorage } from "../../utils/storage";
|
||||
|
||||
type StateContextType = { state: AppState, dispatch: Dispatch<Action> };
|
||||
|
||||
@@ -24,17 +23,6 @@ export const AppStateProvider: FC = ({ children }) => {
|
||||
return { state, dispatch };
|
||||
}, [state, dispatch]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!state.serverUrl) return;
|
||||
const enabledStorage = !!getFromStorage("SERVER_URL");
|
||||
|
||||
if (enabledStorage) {
|
||||
saveToStorage("SERVER_URL", state.serverUrl);
|
||||
} else {
|
||||
removeFromStorage(["SERVER_URL"]);
|
||||
}
|
||||
}, [state.serverUrl]);
|
||||
|
||||
return <StateContext.Provider value={contextValue}>
|
||||
{children}
|
||||
</StateContext.Provider>;
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { getDefaultServer } from "../../utils/default-server-url";
|
||||
import { getQueryStringValue } from "../../utils/query-string";
|
||||
import { getFromStorage, saveToStorage } from "../../utils/storage";
|
||||
import { AppConfig, Theme } from "../../types";
|
||||
import { isDarkTheme } from "../../utils/theme";
|
||||
import { removeTrailingSlash } from "../../utils/url";
|
||||
import { getTenantIdFromUrl } from "../../utils/tenants";
|
||||
|
||||
export interface AppState {
|
||||
serverUrl: string;
|
||||
@@ -16,14 +16,15 @@ export interface AppState {
|
||||
export type Action =
|
||||
| { type: "SET_SERVER", payload: string }
|
||||
| { type: "SET_THEME", payload: Theme }
|
||||
| { type: "SET_TENANT_ID", payload: string }
|
||||
| { type: "SET_APP_CONFIG", payload: AppConfig }
|
||||
| { type: "SET_DARK_THEME" }
|
||||
|
||||
const serverUrl = removeTrailingSlash(getDefaultServer());
|
||||
const tenantId = getQueryStringValue("g0.tenantID", "") as string;
|
||||
|
||||
export const initialState: AppState = {
|
||||
serverUrl,
|
||||
tenantId: getTenantIdFromUrl(serverUrl),
|
||||
serverUrl: removeTrailingSlash(getDefaultServer(tenantId)),
|
||||
tenantId,
|
||||
theme: (getFromStorage("THEME") || Theme.system) as Theme,
|
||||
isDarkTheme: null,
|
||||
appConfig: {}
|
||||
@@ -34,9 +35,13 @@ export function reducer(state: AppState, action: Action): AppState {
|
||||
case "SET_SERVER":
|
||||
return {
|
||||
...state,
|
||||
tenantId: getTenantIdFromUrl(action.payload),
|
||||
serverUrl: removeTrailingSlash(action.payload)
|
||||
};
|
||||
case "SET_TENANT_ID":
|
||||
return {
|
||||
...state,
|
||||
tenantId: action.payload
|
||||
};
|
||||
case "SET_THEME":
|
||||
saveToStorage("THEME", action.payload);
|
||||
return {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { getAppModeParams } from "./app-mode";
|
||||
import { replaceTenantId } from "./tenants";
|
||||
import { APP_TYPE, AppType } from "../constants/appType";
|
||||
import { getFromStorage } from "./storage";
|
||||
|
||||
@@ -6,7 +7,7 @@ export const getDefaultURL = (u: string) => {
|
||||
return u.replace(/(\/(?:prometheus\/)?(?:graph|vmui)\/.*|\/#\/.*)/, "/prometheus");
|
||||
};
|
||||
|
||||
export const getDefaultServer = (): string => {
|
||||
export const getDefaultServer = (tenantId?: string): string => {
|
||||
const { serverURL } = getAppModeParams();
|
||||
const storageURL = getFromStorage("SERVER_URL") as string;
|
||||
const anomalyURL = `${window.location.origin}${window.location.pathname.replace(/^\/vmui/, "")}`;
|
||||
@@ -17,6 +18,6 @@ export const getDefaultServer = (): string => {
|
||||
case AppType.vmanomaly:
|
||||
return storageURL || anomalyURL;
|
||||
default:
|
||||
return url;
|
||||
return tenantId ? replaceTenantId(url, tenantId) : url;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,89 +0,0 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
|
||||
import {
|
||||
replaceTenantId,
|
||||
getTenantIdFromUrl,
|
||||
getUrlWithoutTenant,
|
||||
} from "./tenants";
|
||||
|
||||
describe("tenant url helpers", () => {
|
||||
describe("getTenantIdFromUrl", () => {
|
||||
it("returns accountID", () => {
|
||||
expect(getTenantIdFromUrl("http://vmselect:8481/select/0/vmui/")).toBe("0");
|
||||
});
|
||||
|
||||
it("returns accountID:projectID", () => {
|
||||
expect(getTenantIdFromUrl("http://vmselect:8481/select/12:7/vmui/")).toBe("12:7");
|
||||
});
|
||||
|
||||
it("returns empty string if tenant is missing", () => {
|
||||
expect(getTenantIdFromUrl("http://vmselect:8481/select/vmui/")).toBe("");
|
||||
});
|
||||
|
||||
it("returns empty string for unrelated paths", () => {
|
||||
expect(getTenantIdFromUrl("http://vmselect:8481/foo/bar")).toBe("");
|
||||
});
|
||||
|
||||
it("returns accountID when url ends right after tenant", () => {
|
||||
expect(getTenantIdFromUrl("http://vmselect:8481/select/0")).toBe("0");
|
||||
});
|
||||
});
|
||||
|
||||
describe("replaceTenantId", () => {
|
||||
it("replaces accountID with another accountID", () => {
|
||||
expect(
|
||||
replaceTenantId("http://vmselect:8481/select/0/vmui/", "2")
|
||||
).toBe("http://vmselect:8481/select/2/vmui/");
|
||||
});
|
||||
|
||||
it("replaces accountID with accountID:projectID", () => {
|
||||
expect(
|
||||
replaceTenantId("http://vmselect:8481/select/0/prometheus/", "1:9")
|
||||
).toBe("http://vmselect:8481/select/1:9/prometheus/");
|
||||
});
|
||||
|
||||
it("keeps the rest of the path intact", () => {
|
||||
expect(
|
||||
replaceTenantId("http://vmselect:8481/select/3:4/prometheus/api/v1/query", "7")
|
||||
).toBe("http://vmselect:8481/select/7/prometheus/api/v1/query");
|
||||
});
|
||||
|
||||
it("does not change url if it doesn't match expected pattern", () => {
|
||||
expect(
|
||||
replaceTenantId("http://vmselect:8481/foo/bar", "2")
|
||||
).toBe("http://vmselect:8481/foo/bar");
|
||||
});
|
||||
});
|
||||
|
||||
describe("getUrlWithoutTenant", () => {
|
||||
it("removes /select/<tenant>/... and returns base url", () => {
|
||||
expect(
|
||||
getUrlWithoutTenant("http://vmselect:8481/select/0/vmui/")
|
||||
).toBe("http://vmselect:8481");
|
||||
});
|
||||
|
||||
it("removes /select/<tenant>/... for accountID:projectID and returns base url", () => {
|
||||
expect(
|
||||
getUrlWithoutTenant("http://vmselect:8481/select/5:6/prometheus/")
|
||||
).toBe("http://vmselect:8481");
|
||||
});
|
||||
|
||||
it("works with deep paths and returns base url", () => {
|
||||
expect(
|
||||
getUrlWithoutTenant("http://vmselect:8481/select/1:2/prometheus/api/v1/query")
|
||||
).toBe("http://vmselect:8481");
|
||||
});
|
||||
|
||||
it("does not change url if it doesn't match expected pattern", () => {
|
||||
expect(
|
||||
getUrlWithoutTenant("http://vmselect:8481/foo/bar")
|
||||
).toBe("http://vmselect:8481/foo/bar");
|
||||
});
|
||||
|
||||
it("removes url ending right after tenant", () => {
|
||||
expect(
|
||||
getUrlWithoutTenant("http://vmselect:8481/select/0")
|
||||
).toBe("http://vmselect:8481");
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,21 +1,13 @@
|
||||
const TENANT_REGEXP = /(\/select\/)(\d+(?::\d+)?)(\/.*)?$/;
|
||||
const regexp = /(\/select\/)([^/])(\/)(.+)/;
|
||||
|
||||
export const replaceTenantId = (serverUrl: string, tenantId: string) => {
|
||||
return serverUrl.replace(TENANT_REGEXP, `$1${tenantId}$3`);
|
||||
return serverUrl.replace(regexp, `$1${tenantId}/$4`);
|
||||
};
|
||||
|
||||
export const getTenantIdFromUrl = (url: string): string => {
|
||||
return url.match(TENANT_REGEXP)?.[2] ?? "";
|
||||
return url.match(regexp)?.[2] || "";
|
||||
};
|
||||
|
||||
export const getUrlWithoutTenant = (url: string): string => {
|
||||
return url.replace(TENANT_REGEXP, "");
|
||||
};
|
||||
|
||||
export const updateBrowserUrlTenant = (tenantId: string) => {
|
||||
const base = `${window.location.origin}${window.location.pathname}${window.location.search}`;
|
||||
const nextBase = replaceTenantId(base, tenantId);
|
||||
|
||||
const nextUrl = `${nextBase}${window.location.hash}`;
|
||||
window.history.replaceState(null, "", nextUrl);
|
||||
return url.replace(regexp, "");
|
||||
};
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"compilerOptions": {
|
||||
"target": "ESNext",
|
||||
"types": ["vite/client", "vitest/globals", "node"],
|
||||
"types": ["vite/client", "vitest/globals"],
|
||||
"lib": [
|
||||
"dom",
|
||||
"dom.iterable",
|
||||
|
||||
@@ -452,7 +452,7 @@
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(increase(vm_backup_errors_total{job=~\"$job\", instance=~\"$instance\"}[$__range]))",
|
||||
"expr": "sum(increase(vm_backup_errors_total{job=~\"$job\", instance=~\"$instance\"}[1h]))",
|
||||
"legendFormat": "__auto",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
@@ -605,7 +605,7 @@
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(increase(vm_retention_errors_total{job=~\"$job\", instance=~\"$instance\"}[$__range]))",
|
||||
"expr": "sum(increase(vm_retention_errors_total{job=~\"$job\", instance=~\"$instance\"}[1h]))",
|
||||
"legendFormat": "__auto",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
|
||||
@@ -11458,4 +11458,4 @@
|
||||
"title": "VictoriaMetrics - cluster",
|
||||
"uid": "oS7Bi_0Wz",
|
||||
"version": 1
|
||||
}
|
||||
}
|
||||
@@ -453,7 +453,7 @@
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(increase(vm_backup_errors_total{job=~\"$job\", instance=~\"$instance\"}[$__range]))",
|
||||
"expr": "sum(increase(vm_backup_errors_total{job=~\"$job\", instance=~\"$instance\"}[1h]))",
|
||||
"legendFormat": "__auto",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
@@ -606,7 +606,7 @@
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(increase(vm_retention_errors_total{job=~\"$job\", instance=~\"$instance\"}[$__range]))",
|
||||
"expr": "sum(increase(vm_retention_errors_total{job=~\"$job\", instance=~\"$instance\"}[1h]))",
|
||||
"legendFormat": "__auto",
|
||||
"range": true,
|
||||
"refId": "A"
|
||||
|
||||
@@ -11459,4 +11459,4 @@
|
||||
"title": "VictoriaMetrics - cluster (VM)",
|
||||
"uid": "oS7Bi_0Wz_vm",
|
||||
"version": 1
|
||||
}
|
||||
}
|
||||
@@ -106,7 +106,6 @@ See also [case studies](https://docs.victoriametrics.com/victoriametrics/casestu
|
||||
* [Why I Switched to VictoriaMetrics: Scaling from Small Business to Enterprise](https://blackmetalz.github.io/why-i-switched-to-victoriametrics-scaling-from-small-business-to-enterprise.html)
|
||||
* [Backing up VictoriaMetrics Data: A Complete Guide](https://medium.com/@kanakaraju896/backing-up-victoriametrics-data-a-complete-guide-24473c74450f)
|
||||
* [Unlocking the Power of VictoriaMetrics: A Prometheus Alternative](https://developer-friendly.blog/blog/2024/06/17/unlocking-the-power-of-victoriametrics-a-prometheus-alternative/)
|
||||
* [How to Master Kubernetes Observability: Multi-Cluster Monitoring with VictoriaMetrics, Loki, and Grafana](https://www.keyvalue.systems/blog/kubernetes-observability-with-victoriametrics-loki-grafana/)
|
||||
|
||||
## Third-party articles and slides about VictoriaLogs
|
||||
|
||||
|
||||
@@ -31,15 +31,11 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): expose `vm_rollup_result_cache_requests_total` which tracks the number of requests to the query rollup cache. See [#10117](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10117).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add `localStorage` availability checks with error reporting. See [#10085](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10085).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add `VMUI:`-prefixed `localStorage` keys and legacy key migration.
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): add a metric `vmauth_http_request_errors_total{reason="client_canceled"}` to measure client cancelled requests. This should help with debugging vmauth issues. See [#10233](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10233).
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): do not retry client canceled requests. See [#10233](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10233).
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add explicit month duration unit (`M`) for `-retentionPeriod` flag. This allows users to specify retention periods in months more explicitly. See [#10181](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10181).
|
||||
* FEATURE: [dashboards/vmagent](https://grafana.com/grafana/dashboards/12683): add `Persistent queue Full ETA` panel to the `Drilldown` section. This panel helps estimate how much time remains until `vmagent` starts dropping incoming metrics. See [#10193](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10193).
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add support for `$isPartial` variable in alerting rule annotation [templating](https://docs.victoriametrics.com/victoriametrics/vmalert/#templating). This allows users to include an additional warning message in alerts triggered by partial query responses. See [#4531](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4531).
|
||||
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix configuration reloading for `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig` when vmagent is launched with empty files. Previously, if vmagent started with an empty config, subsequent config reloads were ignored. See [#10211](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10211).
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): Fixed a missing path error for `http://<victoriametrics-addr>:8428/zabbixconnector/api/v1/history`. See PR [10214](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10214).
|
||||
* BUGFIX: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): remove legacy `tenantID` query param and use the URL path as the single source of truth for multitenancy. See [#10232](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10232).
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): Fixed a missing path error for `http://<victoriametrics-addr>:8428/zabbixconnector/api/v1/history`. See PR [10214](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10214)
|
||||
|
||||
## [v1.133.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.133.0)
|
||||
|
||||
@@ -1093,4 +1089,4 @@ See changes [here](https://docs.victoriametrics.com/victoriametrics/changelog/ch
|
||||
|
||||
## Previous releases
|
||||
|
||||
See [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
|
||||
See [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
|
||||
@@ -286,7 +286,7 @@ expr: <string>
|
||||
# In case of conflicts, original labels are kept with prefix `exported_`.
|
||||
#
|
||||
# Labels only support limited templating variables in https://docs.victoriametrics.com/victoriametrics/vmalert/#templating,
|
||||
# including `$labels`, `$value` and `$expr`, to avoid breaking alert states or causing cardinality issue with results.
|
||||
# including `$labels`, `$value` and `expr`, to avoid breaking alert states or causing cardinality issue with results.
|
||||
# Note: be careful set dynamic label values like `$value`, because each time the $value changes - the new alert will be
|
||||
# generated which also break `for` condition.
|
||||
labels:
|
||||
@@ -316,7 +316,6 @@ The following variables are available in templating:
|
||||
| $for or .For | Alert's configured for param. | Number of connections is too high for more than {{ .For }} |
|
||||
| $externalLabels or .ExternalLabels | List of labels configured via `-external.label` command-line flag. | Issues with {{ $labels.instance }} (datacenter-{{ $externalLabels.dc }}) |
|
||||
| $externalURL or .ExternalURL | URL configured via `-external.url` command-line flag. Used for cases when vmalert is hidden behind proxy. | Visit {{ $externalURL }} for more details |
|
||||
| $isPartial or .IsPartial | Indicates whether the latest rule query response from the datasource(that supports returning `isPartial` option, such as vmcluster) could be partial. | {{ if $isPartial }}WARNING: The latest alert state may be a false alarm due to a partial response from the datasource.{{ end }}
|
||||
|
||||
Additionally, `vmalert` provides some extra templating functions listed in [template functions](#template-functions) and [reusable templates](#reusable-templates).
|
||||
|
||||
|
||||
@@ -336,19 +336,6 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi
|
||||
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD
|
||||
}
|
||||
|
||||
func (ib *inmemoryBlock) unmarshalSingleItem(commonPrefix, firstItem []byte, mt marshalType) {
|
||||
if mt != marshalTypePlain {
|
||||
logger.Panicf("BUG: single item block must be always encoded with TypePlain")
|
||||
}
|
||||
ib.commonPrefix = append(ib.commonPrefix[:0], commonPrefix...)
|
||||
ib.items = slicesutil.SetLength(ib.items, 1)
|
||||
ib.data = bytesutil.ResizeNoCopyNoOverallocate(ib.data, len(firstItem))
|
||||
ib.data = append(ib.data[:0], firstItem...)
|
||||
item := &ib.items[0]
|
||||
item.Start = 0
|
||||
item.End = uint32(len(ib.data))
|
||||
}
|
||||
|
||||
// UnmarshalData decodes itemsCount items from sb and firstItem and stores them to ib.
|
||||
func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix []byte, itemsCount uint32, mt marshalType) error {
|
||||
ib.Reset()
|
||||
|
||||
@@ -331,14 +331,6 @@ func (ps *partSearch) getInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error)
|
||||
if ps.sparse {
|
||||
cache = ibSparseCache
|
||||
}
|
||||
if bh.itemsCount == 1 {
|
||||
// special case for single item
|
||||
// there is no need to cache it, since firstItem is always stored in-memory
|
||||
ib := ps.tmpIB
|
||||
ib.Reset()
|
||||
ib.unmarshalSingleItem(bh.commonPrefix, bh.firstItem, bh.marshalType)
|
||||
return ib, nil
|
||||
}
|
||||
ibKey := blockcache.Key{
|
||||
Part: ps.p,
|
||||
Offset: bh.itemsBlockOffset,
|
||||
|
||||
@@ -163,80 +163,3 @@ func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []stri
|
||||
p := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
|
||||
return p, items, nil
|
||||
}
|
||||
|
||||
func TestGetInmemoryBlockWithZeroSizeBlock(t *testing.T) {
|
||||
var ph partHeader
|
||||
var ip inmemoryPart
|
||||
var bsw blockStreamWriter
|
||||
bsw.MustInitFromInmemoryPart(&ip, -3)
|
||||
|
||||
buildBlock := func(items ...string) inmemoryBlock {
|
||||
var ib inmemoryBlock
|
||||
for _, item := range items {
|
||||
if !ib.Add([]byte(item)) {
|
||||
t.Fatalf("cannot add item %q", item)
|
||||
}
|
||||
}
|
||||
ib.SortItems()
|
||||
return ib
|
||||
}
|
||||
|
||||
writeBlock := func(ib inmemoryBlock) {
|
||||
if len(ib.items) == 0 {
|
||||
t.Fatalf("block must contain items")
|
||||
}
|
||||
data := ib.data
|
||||
ph.itemsCount += uint64(len(ib.items))
|
||||
if ph.blocksCount == 0 {
|
||||
ph.firstItem = append(ph.firstItem[:0], ib.items[0].Bytes(data)...)
|
||||
}
|
||||
ph.lastItem = append(ph.lastItem[:0], ib.items[len(ib.items)-1].Bytes(data)...)
|
||||
ph.blocksCount++
|
||||
bsw.WriteBlock(&ib)
|
||||
}
|
||||
|
||||
writeBlock(buildBlock("a"))
|
||||
writeBlock(buildBlock("b0", "b1"))
|
||||
bsw.MustClose()
|
||||
|
||||
p := newPart(&ph, "test", ip.size(), ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
|
||||
defer p.MustClose()
|
||||
|
||||
var ps partSearch
|
||||
ps.Init(p, false)
|
||||
ps.mrs = p.mrs
|
||||
if err := ps.nextBHS(); err != nil {
|
||||
t.Fatalf("cannot read block headers: %s", err)
|
||||
}
|
||||
if len(ps.bhs) != 2 {
|
||||
t.Fatalf("unexpected block headers count: %d", len(ps.bhs))
|
||||
}
|
||||
if ps.bhs[0].itemsBlockOffset != ps.bhs[1].itemsBlockOffset {
|
||||
t.Fatalf("blocks must share itemsBlockOffset for the test: %d vs %d", ps.bhs[0].itemsBlockOffset, ps.bhs[1].itemsBlockOffset)
|
||||
}
|
||||
if ps.bhs[0].itemsBlockSize != 0 {
|
||||
t.Fatalf("the first block must have zero itemsBlockSize; got %d", ps.bhs[0].itemsBlockSize)
|
||||
}
|
||||
|
||||
// iterate 4 times in order to place block into the cache
|
||||
// storage caches it after 2 missed requests according to the flag blockcache.missesBeforeCaching=2
|
||||
for i := range 4 {
|
||||
if _, err := ps.getInmemoryBlock(&ps.bhs[1]); err != nil {
|
||||
t.Fatalf("cannot load non-empty block at iteration %d: %s", i, err)
|
||||
}
|
||||
}
|
||||
assertBlockAt := func(bhIdx int, wantFirstItemValue string) {
|
||||
block, err := ps.getInmemoryBlock(&ps.bhs[bhIdx])
|
||||
if err != nil {
|
||||
t.Fatalf("cannot block=%d : %s", bhIdx, err)
|
||||
}
|
||||
if len(block.items) != int(ps.bhs[bhIdx].itemsCount) {
|
||||
t.Fatalf("unexpected items count in block=%d; got %d; want %d", bhIdx, len(block.items), ps.bhs[bhIdx].itemsCount)
|
||||
}
|
||||
if got := string(block.items[0].Bytes(block.data)); got != wantFirstItemValue {
|
||||
t.Fatalf("unexpected item in block=%d; got %q; want %q", bhIdx, got, wantFirstItemValue)
|
||||
}
|
||||
}
|
||||
assertBlockAt(0, "a")
|
||||
assertBlockAt(1, "b0")
|
||||
}
|
||||
|
||||
@@ -34,19 +34,17 @@ func Parse(req *http.Request, callback func(rows []csvimport.Row) error) error {
|
||||
return fmt.Errorf("cannot parse the provided csv format: %w", err)
|
||||
}
|
||||
|
||||
wcr, err := writeconcurrencylimiter.GetReader(req.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
|
||||
encoding := req.Header.Get("Content-Encoding")
|
||||
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
|
||||
reader, err := protoparserutil.GetUncompressedReader(req.Body, encoding)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot decode csv data: %w", err)
|
||||
}
|
||||
defer protoparserutil.PutUncompressedReader(reader)
|
||||
|
||||
wcr := writeconcurrencylimiter.GetReader(reader)
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
reader = wcr
|
||||
|
||||
ctx := getStreamContext(reader)
|
||||
defer putStreamContext(ctx)
|
||||
for ctx.Read() {
|
||||
@@ -57,6 +55,7 @@ func Parse(req *http.Request, callback func(rows []csvimport.Row) error) error {
|
||||
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
|
||||
ctx.wg.Add(1)
|
||||
protoparserutil.ScheduleUnmarshalWork(uw)
|
||||
wcr.DecConcurrency()
|
||||
}
|
||||
ctx.wg.Wait()
|
||||
if err := ctx.Error(); err != nil {
|
||||
|
||||
@@ -27,18 +27,16 @@ var (
|
||||
//
|
||||
// callback shouldn't hold rows after returning.
|
||||
func Parse(r io.Reader, encoding string, callback func(rows []graphite.Row) error) error {
|
||||
wcr, err := writeconcurrencylimiter.GetReader(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
|
||||
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
|
||||
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot decode graphite data: %w", err)
|
||||
}
|
||||
defer protoparserutil.PutUncompressedReader(reader)
|
||||
|
||||
wcr := writeconcurrencylimiter.GetReader(reader)
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
reader = wcr
|
||||
|
||||
ctx := getStreamContext(reader)
|
||||
defer putStreamContext(ctx)
|
||||
|
||||
@@ -49,6 +47,7 @@ func Parse(r io.Reader, encoding string, callback func(rows []graphite.Row) erro
|
||||
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
|
||||
ctx.wg.Add(1)
|
||||
protoparserutil.ScheduleUnmarshalWork(uw)
|
||||
wcr.DecConcurrency()
|
||||
}
|
||||
ctx.wg.Wait()
|
||||
if err := ctx.Error(); err != nil {
|
||||
|
||||
@@ -61,18 +61,16 @@ func Parse(r io.Reader, encoding string, isStreamMode bool, precision, db string
|
||||
}
|
||||
|
||||
func parseStreamMode(r io.Reader, encoding string, tsMultiplier int64, db string, callback func(db string, rows []influx.Row) error) error {
|
||||
wcr, err := writeconcurrencylimiter.GetReader(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
|
||||
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
|
||||
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot decode influx line protocol data: %w; see https://docs.victoriametrics.com/victoriametrics/integrations/influxdb/", err)
|
||||
}
|
||||
defer protoparserutil.PutUncompressedReader(reader)
|
||||
|
||||
wcr := writeconcurrencylimiter.GetReader(reader)
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
reader = wcr
|
||||
|
||||
ctx := getStreamContext(reader)
|
||||
defer putStreamContext(ctx)
|
||||
for ctx.Read() {
|
||||
@@ -84,6 +82,7 @@ func parseStreamMode(r io.Reader, encoding string, tsMultiplier int64, db string
|
||||
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
|
||||
ctx.wg.Add(1)
|
||||
protoparserutil.ScheduleUnmarshalWork(uw)
|
||||
wcr.DecConcurrency()
|
||||
}
|
||||
ctx.wg.Wait()
|
||||
if err := ctx.Error(); err != nil {
|
||||
|
||||
@@ -20,18 +20,16 @@ import (
|
||||
//
|
||||
// callback shouldn't hold block after returning.
|
||||
func Parse(r io.Reader, contentEncoding string, callback func(block *Block) error) error {
|
||||
wcr, err := writeconcurrencylimiter.GetReader(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
|
||||
reader, err := protoparserutil.GetUncompressedReader(wcr, contentEncoding)
|
||||
reader, err := protoparserutil.GetUncompressedReader(r, contentEncoding)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot decode vmimport data: %w", err)
|
||||
}
|
||||
defer protoparserutil.PutUncompressedReader(reader)
|
||||
|
||||
wcr := writeconcurrencylimiter.GetReader(reader)
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
reader = wcr
|
||||
|
||||
br := getBufferedReader(reader)
|
||||
defer putBufferedReader(br)
|
||||
|
||||
@@ -106,6 +104,7 @@ func Parse(r io.Reader, contentEncoding string, callback func(block *Block) erro
|
||||
|
||||
ctx.wg.Add(1)
|
||||
protoparserutil.ScheduleUnmarshalWork(uw)
|
||||
wcr.DecConcurrency()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -27,13 +27,11 @@ var (
|
||||
//
|
||||
// callback shouldn't hold rows after returning.
|
||||
func Parse(r io.Reader, callback func(rows []opentsdb.Row) error) error {
|
||||
wcr, err := writeconcurrencylimiter.GetReader(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
wcr := writeconcurrencylimiter.GetReader(r)
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
r = wcr
|
||||
|
||||
ctx := getStreamContext(wcr)
|
||||
ctx := getStreamContext(r)
|
||||
defer putStreamContext(ctx)
|
||||
for ctx.Read() {
|
||||
uw := getUnmarshalWork()
|
||||
@@ -42,6 +40,7 @@ func Parse(r io.Reader, callback func(rows []opentsdb.Row) error) error {
|
||||
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
|
||||
ctx.wg.Add(1)
|
||||
protoparserutil.ScheduleUnmarshalWork(uw)
|
||||
wcr.DecConcurrency()
|
||||
}
|
||||
ctx.wg.Wait()
|
||||
if err := ctx.Error(); err != nil {
|
||||
|
||||
@@ -23,23 +23,20 @@ import (
|
||||
// limitConcurrency defines whether to control the number of concurrent calls to this function.
|
||||
// It is recommended setting limitConcurrency=true if the caller doesn't have concurrency limits set,
|
||||
// like /api/v1/write calls.
|
||||
func Parse(r io.Reader, defaultTimestamp int64, encoding string, limitConcurrency, enableMetadata bool,
|
||||
callback func(rows []prometheus.Row, metadataList []prometheus.Metadata) error, errLogger func(string)) error {
|
||||
if limitConcurrency {
|
||||
wcr, err := writeconcurrencylimiter.GetReader(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
r = wcr
|
||||
}
|
||||
|
||||
func Parse(r io.Reader, defaultTimestamp int64, encoding string, limitConcurrency, enableMetadata bool, callback func(rows []prometheus.Row, metadataList []prometheus.Metadata) error, errLogger func(string)) error {
|
||||
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot decode Prometheus text exposition data: %w", err)
|
||||
}
|
||||
defer protoparserutil.PutUncompressedReader(reader)
|
||||
|
||||
var wcr *writeconcurrencylimiter.Reader
|
||||
if limitConcurrency {
|
||||
wcr = writeconcurrencylimiter.GetReader(reader)
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
reader = wcr
|
||||
}
|
||||
|
||||
ctx := getStreamContext(reader)
|
||||
defer putStreamContext(ctx)
|
||||
for ctx.Read() {
|
||||
@@ -52,6 +49,9 @@ func Parse(r io.Reader, defaultTimestamp int64, encoding string, limitConcurrenc
|
||||
uw.enableMetadata = enableMetadata
|
||||
ctx.wg.Add(1)
|
||||
protoparserutil.ScheduleUnmarshalWork(uw)
|
||||
if wcr != nil {
|
||||
wcr.DecConcurrency()
|
||||
}
|
||||
}
|
||||
ctx.wg.Wait()
|
||||
if err := ctx.Error(); err != nil {
|
||||
|
||||
@@ -23,13 +23,11 @@ var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*102
|
||||
//
|
||||
// callback shouldn't hold tss after returning.
|
||||
func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSeries, mms []prompb.MetricMetadata) error) error {
|
||||
wcr, err := writeconcurrencylimiter.GetReader(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
wcr := writeconcurrencylimiter.GetReader(r)
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
r = wcr
|
||||
|
||||
ctx := getPushCtx(wcr)
|
||||
ctx := getPushCtx(r)
|
||||
defer putPushCtx(ctx)
|
||||
if err := ctx.Read(); err != nil {
|
||||
return err
|
||||
@@ -40,6 +38,7 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
|
||||
bb := bodyBufferPool.Get()
|
||||
defer bodyBufferPool.Put(bb)
|
||||
var err error
|
||||
if isVMRemoteWrite {
|
||||
bb.B, err = encoding.DecompressZSTDLimited(bb.B[:0], ctx.reqBuf.B, maxInsertRequestSize.IntN())
|
||||
if err != nil {
|
||||
|
||||
@@ -31,10 +31,7 @@ const maxSnappyBlockSize = 56_000_000
|
||||
//
|
||||
// The callback must not hold references to the data after returning.
|
||||
func ReadUncompressedData(r io.Reader, contentType string, maxDataSize *flagutil.Bytes, callback func(data []byte) error) error {
|
||||
wcr, err := writeconcurrencylimiter.GetReader(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
wcr := writeconcurrencylimiter.GetReader(r)
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
|
||||
if contentType == "zstd" {
|
||||
|
||||
@@ -23,18 +23,16 @@ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 10*1024*1024, "The maxim
|
||||
//
|
||||
// callback shouldn't hold rows after returning.
|
||||
func Parse(r io.Reader, encoding string, callback func(rows []vmimport.Row) error) error {
|
||||
wcr, err := writeconcurrencylimiter.GetReader(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
|
||||
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
|
||||
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot decode vmimport data: %w", err)
|
||||
}
|
||||
defer protoparserutil.PutUncompressedReader(reader)
|
||||
|
||||
wcr := writeconcurrencylimiter.GetReader(reader)
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
reader = wcr
|
||||
|
||||
ctx := getStreamContext(reader)
|
||||
defer putStreamContext(ctx)
|
||||
for ctx.Read() {
|
||||
@@ -44,6 +42,7 @@ func Parse(r io.Reader, encoding string, callback func(rows []vmimport.Row) erro
|
||||
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
|
||||
ctx.wg.Add(1)
|
||||
protoparserutil.ScheduleUnmarshalWork(uw)
|
||||
wcr.DecConcurrency()
|
||||
}
|
||||
ctx.wg.Wait()
|
||||
if err := ctx.Error(); err != nil {
|
||||
|
||||
@@ -26,18 +26,16 @@ var (
|
||||
//
|
||||
// callback shouldn't hold rows after returning.
|
||||
func Parse(r io.Reader, encoding string, callback func(rows []zabbixconnector.Row) error) error {
|
||||
wcr, err := writeconcurrencylimiter.GetReader(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
|
||||
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
|
||||
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot decode zabbixconnector data: %w", err)
|
||||
}
|
||||
defer protoparserutil.PutUncompressedReader(reader)
|
||||
|
||||
wcr := writeconcurrencylimiter.GetReader(reader)
|
||||
defer writeconcurrencylimiter.PutReader(wcr)
|
||||
reader = wcr
|
||||
|
||||
ctx := getStreamContext(reader)
|
||||
defer putStreamContext(ctx)
|
||||
for ctx.Read() {
|
||||
@@ -47,6 +45,7 @@ func Parse(r io.Reader, encoding string, callback func(rows []zabbixconnector.Ro
|
||||
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
|
||||
ctx.wg.Add(1)
|
||||
protoparserutil.ScheduleUnmarshalWork(uw)
|
||||
wcr.DecConcurrency()
|
||||
}
|
||||
ctx.wg.Wait()
|
||||
if err := ctx.Error(); err != nil {
|
||||
@@ -24,67 +24,77 @@ var (
|
||||
"concurrent insert requests are executed")
|
||||
)
|
||||
|
||||
// Reader is a reader, which decreases the concurrency before every Read() call
|
||||
// and increases the concurrency after Read() call.
|
||||
// Reader is a reader, which increases the concurrency after the first Read() call
|
||||
//
|
||||
// It effectively limits the number of concurrent goroutines,
|
||||
// which may process results returned by concurrently processed Reader structs.
|
||||
//
|
||||
// The Reader must be obtained via GetReader() call.
|
||||
// The concurrency can be reduced by calling DecConcurrency().
|
||||
// Then the concurrency is increased after the next Read() call.
|
||||
type Reader struct {
|
||||
r io.Reader
|
||||
r io.Reader
|
||||
increasedConcurrency bool
|
||||
}
|
||||
|
||||
// GetReader returns the Reader for r.
|
||||
//
|
||||
// The PutReader() must be called when the returned Reader is no longer needed.
|
||||
func GetReader(r io.Reader) (*Reader, error) {
|
||||
if err := incConcurrency(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func GetReader(r io.Reader) *Reader {
|
||||
v := readerPool.Get()
|
||||
if v == nil {
|
||||
v = &Reader{}
|
||||
return &Reader{
|
||||
r: r,
|
||||
}
|
||||
}
|
||||
rr := v.(*Reader)
|
||||
rr.r = r
|
||||
|
||||
return rr, nil
|
||||
return rr
|
||||
}
|
||||
|
||||
// PutReader returns the r to the pool.
|
||||
//
|
||||
// It decreases the concurrency.
|
||||
// It decreases the concurrency if r has increased concurrency.
|
||||
func PutReader(r *Reader) {
|
||||
r.DecConcurrency()
|
||||
r.r = nil
|
||||
readerPool.Put(r)
|
||||
|
||||
decConcurrency()
|
||||
}
|
||||
|
||||
var readerPool sync.Pool
|
||||
|
||||
// Read implements io.Reader.
|
||||
//
|
||||
// It increases concurrency after the first call or after the next call after DecConcurrency() call.
|
||||
func (r *Reader) Read(p []byte) (int, error) {
|
||||
decConcurrency()
|
||||
|
||||
n, err := r.r.Read(p)
|
||||
|
||||
if errC := incConcurrency(); errC != nil {
|
||||
return n, errC
|
||||
if !r.increasedConcurrency {
|
||||
if !incConcurrency() {
|
||||
err = &httpserver.ErrorWithStatusCode{
|
||||
Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are executed. "+
|
||||
"Possible solutions: to reduce workload; to increase compute resources at the server; "+
|
||||
"to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts",
|
||||
maxQueueDuration.Seconds(), *maxConcurrentInserts),
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
r.increasedConcurrency = true
|
||||
}
|
||||
|
||||
if errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8704
|
||||
err = fmt.Errorf("%w: while reading the request body. This might be caused by a timeout on the client side. "+
|
||||
"Possible solutions: to lower -insert.maxQueueDuration below the client’s timeout; to increase the client-side timeout; "+
|
||||
"to increase compute resources at the server; to increase -maxConcurrentInserts", err)
|
||||
"to scale up vmagent (e.g., adding more CPU resources); to increase -maxConcurrentInserts if CPU capacity allows", err)
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
// DecConcurrency decreases the concurrency, so it could be increased again after the next Read() call.
|
||||
func (r *Reader) DecConcurrency() {
|
||||
if r.increasedConcurrency {
|
||||
decConcurrency()
|
||||
r.increasedConcurrency = false
|
||||
}
|
||||
}
|
||||
|
||||
func initConcurrencyLimitCh() {
|
||||
concurrencyLimitCh = make(chan struct{}, *maxConcurrentInserts)
|
||||
}
|
||||
@@ -94,12 +104,12 @@ var (
|
||||
concurrencyLimitChOnce sync.Once
|
||||
)
|
||||
|
||||
func incConcurrency() error {
|
||||
func incConcurrency() bool {
|
||||
concurrencyLimitChOnce.Do(initConcurrencyLimitCh)
|
||||
|
||||
select {
|
||||
case concurrencyLimitCh <- struct{}{}:
|
||||
return nil
|
||||
return true
|
||||
default:
|
||||
}
|
||||
|
||||
@@ -108,16 +118,10 @@ func incConcurrency() error {
|
||||
defer timerpool.Put(t)
|
||||
select {
|
||||
case concurrencyLimitCh <- struct{}{}:
|
||||
return nil
|
||||
return true
|
||||
case <-t.C:
|
||||
concurrencyLimitTimeout.Inc()
|
||||
return &httpserver.ErrorWithStatusCode{
|
||||
Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are executed. "+
|
||||
"Possible solutions: to reduce workload; to increase compute resources at the server; "+
|
||||
"to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts",
|
||||
maxQueueDuration.Seconds(), *maxConcurrentInserts),
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user