Compare commits

..

15 Commits

Author SHA1 Message Date
Max Kotliar
94218bd4dd app/vmauth: wrap readDurationTrackingBody if r.Body != nil 2026-01-15 18:07:36 +02:00
Max Kotliar
c7cc0a0332 app/vmauth: measure client request read single op durtsion 2026-01-15 17:39:21 +02:00
Yury Moladau
8657470068 app/vmui: bump package versions (#10291)
### Describe Your Changes

Updated project dependencies to the latest versions.

### Checklist

The following checks are **mandatory**:

- [x] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [x] My change adheres to [VictoriaMetrics development
goals](https://docs.victoriametrics.com/victoriametrics/goals/).

Signed-off-by: Yury Molodov <yurymolodov@gmail.com>
2026-01-15 16:53:23 +02:00
Aliaksandr Valialkin
3f16bc7cb2 docs/victoriametrics/Articles.md: add https://www.keyvalue.systems/blog/kubernetes-observability-with-victoriametrics-loki-grafana/ 2026-01-15 13:53:26 +01:00
Aliaksandr Valialkin
655a0eb0c3 app/vmstorage/main.go: typo fix after the commit 7cbd2a8600: partition -> snapshot 2026-01-15 12:50:24 +01:00
Aliaksandr Valialkin
7cbd2a8600 app/vmstorage: delete just created snapshot if the client canceled the request for creating the snapshot
It is better to delete the snapshot, since the client is no longer interested in it.
This should prevent from creating many unused snapshots when clients cancel creating snapshots
because of timeouts. This is the real production case from one of VictoriaMetrics users:
the disk IO subsystem became very slow, so creating a snapshot took a lot of time, so vmbackup
was canceling creating the snapshot because of the timeout. But vmstorage was still continue
creating the snapshot. This resulted in the increasing number of created but unused snapshots.
2026-01-15 12:36:48 +01:00
Max Kotliar
5f67f04f6b app/vmauth: measure client cancelled requests
Without measuring this, we have a blind spot. Exposing it as a metric
improves visibility and should save time during future debugging
sessions.

Inspired by review commit
c9596a0364 (r173621968)
2026-01-15 12:13:35 +01:00
Nikolay
2056e5b46d lib/mergeset: do no cache inmemoryBlock with single item
indexDB mergeset has an edge for single item inmemoryBlock. It stores
such items blocks in-memory at blockheader firstItem. So there is no
need to perform on-disk read operations and storing copy of it at cache.

 It also may result in incorrect search results, inmemoryBlock with a
 single item has always zero index block offset. Which causes collisions
if it's cached with the next index block at part.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10239
Probably fixes
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10063
2026-01-15 12:12:08 +01:00
Hui Wang
4d1f262ec4 vmalert: add support for $isPartial variable in alerting rule annotation templating
fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4531
2026-01-15 12:10:07 +01:00
Vadim Rutkovsky
afca599a46 app/vmauth: Ffx typo in auth config warning message 2026-01-15 12:09:36 +01:00
Yury Moladau
d667f694bc app/vmui: fix tenant ID handling via URL path (#10287)
**Problem**

* VMUI had two tenant ID sources:

  * URL path: `/select/<accountID>/vmui/`
  * Query param: `tenantID`
* These could differ, causing confusion and inconsistent behavior.

**Solution**

* Removed the legacy `tenantID` query parameter.
* Use the URL path as the single source of truth for tenant ID.
* Changing the tenant in the UI now updates the URL path.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10232
2026-01-15 12:05:12 +01:00
Aliaksandr Valialkin
fe2c60c79b dashboards: follow-up for the commit 36460f6297
Use $__range duration instead of 1h duration for the 'Retention errors' stats panel
in the similar way it was done in the commit 36460f6297
for the 'Backup errors' stats panel.

While at it, run `make dashboards-sync` in order to sync the dashboards
in the dasbhoards/vm/ folder. See https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/README.md
for details.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10279
2026-01-14 23:30:08 +01:00
Stephan Burns
36460f6297 Make stats panel use the range specified in grafana (#10279)
### Describe Your Changes

The Backups errors panel uses a hard coded rate, when looking over a
large period of time this number would likely stay low do to the hard
coded rate when in reality the amount of errors is much larger.

This change addresses this by using the __rate variable in Grafana so
the rate will align with the date/time range in Grafana.

### Checklist

The following checks are **mandatory**:

- [x] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [x] My change adheres to [VictoriaMetrics development
goals](https://docs.victoriametrics.com/victoriametrics/goals/).
2026-01-14 23:21:16 +01:00
Aliaksandr Valialkin
d107dee9c7 lib/writeconcurrencylimiter: remove Reader.DecConcurrency() method
Call decConcurrency() inside Reader.Read() before calling the Read() at the underlying reader.
This reduces chances of improper use of the writeconcurrencylimiter.Reader by callers.

While at it, move the creation of writeconcurrencylimiter.GetReader() to the top of stream parser functions
at lib/protoparser/* packages, and call incConcurrency() inside GetReader() call.
This reduces the frequency of decConcurrency() / incConcurrency() calls
for typical buffered reads when parsing the incoming data. This, in turn,
reduces the contention on the concurrencyLimitCh.
2026-01-14 22:55:17 +01:00
Max Kotliar
b33d7c3ef9 dashboards: remove timezone from vmagent dashboard
The bug introduced in
https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10267 and breaks
helm charts customization, see discussion
415ff27c74 (r174600675)
2026-01-14 13:28:35 +02:00
41 changed files with 1187 additions and 1092 deletions

View File

@@ -80,14 +80,15 @@ func (as AlertState) String() string {
// AlertTplData is used to execute templating
type AlertTplData struct {
Type string
Labels map[string]string
Value float64
Expr string
AlertID uint64
GroupID uint64
ActiveAt time.Time
For time.Duration
Type string
Labels map[string]string
Value float64
Expr string
AlertID uint64
GroupID uint64
ActiveAt time.Time
For time.Duration
IsPartial bool
}
var tplHeaders = []string{
@@ -101,6 +102,7 @@ var tplHeaders = []string{
"{{ $groupID := .GroupID }}",
"{{ $activeAt := .ActiveAt }}",
"{{ $for := .For }}",
"{{ $isPartial := .IsPartial }}",
}
// ExecTemplate executes the Alert template for given

View File

@@ -346,6 +346,8 @@ func (ar *AlertingRule) toLabels(m datasource.Metric, qFn templates.QueryFn) (*l
ls.processed[l.Name] = l.Value
}
// labels only support limited templating variables,
// including `labels`, `value` and `expr`, to avoid breaking alert states or causing cardinality issue with results
extraLabels, err := notifier.ExecTemplate(qFn, ar.Labels, notifier.AlertTplData{
Labels: ls.origin,
Value: m.Values[0],
@@ -457,7 +459,8 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
return nil, fmt.Errorf("failed to execute query %q: %w", ar.Expr, err)
}
ar.logDebugf(ts, nil, "query returned %d series (elapsed: %s, isPartial: %t)", curState.Samples, curState.Duration, isPartialResponse(res))
isPartial := isPartialResponse(res)
ar.logDebugf(ts, nil, "query returned %d series (elapsed: %s, isPartial: %t)", curState.Samples, curState.Duration, isPartial)
qFn := func(query string) ([]datasource.Metric, error) {
res, _, err := ar.q.Query(ctx, query, ts)
return res.Data, err
@@ -483,7 +486,7 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
at = a.ActiveAt
}
}
as, err := ar.expandAnnotationTemplates(m, qFn, at, ls)
as, err := ar.expandAnnotationTemplates(m, qFn, at, ls, isPartial)
if err != nil {
// only set error in current state, but do not break alert processing
curState.Err = err
@@ -601,16 +604,17 @@ func (ar *AlertingRule) expandLabelTemplates(m datasource.Metric, qFn templates.
return ls, nil
}
func (ar *AlertingRule) expandAnnotationTemplates(m datasource.Metric, qFn templates.QueryFn, activeAt time.Time, ls *labelSet) (map[string]string, error) {
func (ar *AlertingRule) expandAnnotationTemplates(m datasource.Metric, qFn templates.QueryFn, activeAt time.Time, ls *labelSet, isPartial bool) (map[string]string, error) {
tplData := notifier.AlertTplData{
Value: m.Values[0],
Type: ar.Type.String(),
Labels: ls.origin,
Expr: ar.Expr,
AlertID: hash(ls.processed),
GroupID: ar.GroupID,
ActiveAt: activeAt,
For: ar.For,
Value: m.Values[0],
Type: ar.Type.String(),
Labels: ls.origin,
Expr: ar.Expr,
AlertID: hash(ls.processed),
GroupID: ar.GroupID,
ActiveAt: activeAt,
For: ar.For,
IsPartial: isPartial,
}
as, err := notifier.ExecTemplate(qFn, ar.Annotations, tplData)
if err != nil {

View File

@@ -1120,7 +1120,7 @@ func TestAlertingRuleLimit_Success(t *testing.T) {
}
func TestAlertingRule_Template(t *testing.T) {
f := func(rule *AlertingRule, metrics []datasource.Metric, alertsExpected map[uint64]*notifier.Alert) {
f := func(rule *AlertingRule, metrics []datasource.Metric, isResponsePartial bool, alertsExpected map[uint64]*notifier.Alert) {
t.Helper()
fakeGroup := Group{
@@ -1133,6 +1133,7 @@ func TestAlertingRule_Template(t *testing.T) {
entries: make([]StateEntry, 10),
}
fq.Add(metrics...)
fq.SetPartialResponse(isResponsePartial)
if _, err := rule.exec(context.TODO(), time.Now(), 0); err != nil {
t.Fatalf("unexpected error: %s", err)
@@ -1163,7 +1164,7 @@ func TestAlertingRule_Template(t *testing.T) {
}, []datasource.Metric{
metricWithValueAndLabels(t, 1, "instance", "foo"),
metricWithValueAndLabels(t, 1, "instance", "bar"),
}, map[uint64]*notifier.Alert{
}, false, map[uint64]*notifier.Alert{
hash(map[string]string{alertNameLabel: "common", "region": "east", "instance": "foo"}): {
Annotations: map[string]string{
"summary": `common: Too high connection number for "foo"`,
@@ -1192,14 +1193,14 @@ func TestAlertingRule_Template(t *testing.T) {
"instance": "{{ $labels.instance }}",
},
Annotations: map[string]string{
"summary": `{{ $labels.__name__ }}: Too high connection number for "{{ $labels.instance }}"`,
"summary": `{{ $labels.__name__ }}: Too high connection number for "{{ $labels.instance }}".{{ if $isPartial }} WARNING: Partial response detected - this alert may be incomplete. Please verify the results manually.{{ end }}`,
"description": `{{ $labels.alertname}}: It is {{ $value }} connections for "{{ $labels.instance }}"`,
},
alerts: make(map[uint64]*notifier.Alert),
}, []datasource.Metric{
metricWithValueAndLabels(t, 2, "__name__", "first", "instance", "foo", alertNameLabel, "override"),
metricWithValueAndLabels(t, 10, "__name__", "second", "instance", "bar", alertNameLabel, "override"),
}, map[uint64]*notifier.Alert{
}, false, map[uint64]*notifier.Alert{
hash(map[string]string{alertNameLabel: "override label", "exported_alertname": "override", "instance": "foo"}): {
Labels: map[string]string{
alertNameLabel: "override label",
@@ -1207,7 +1208,7 @@ func TestAlertingRule_Template(t *testing.T) {
"instance": "foo",
},
Annotations: map[string]string{
"summary": `first: Too high connection number for "foo"`,
"summary": `first: Too high connection number for "foo".`,
"description": `override: It is 2 connections for "foo"`,
},
},
@@ -1218,7 +1219,7 @@ func TestAlertingRule_Template(t *testing.T) {
"instance": "bar",
},
Annotations: map[string]string{
"summary": `second: Too high connection number for "bar"`,
"summary": `second: Too high connection number for "bar".`,
"description": `override: It is 10 connections for "bar"`,
},
},
@@ -1231,7 +1232,7 @@ func TestAlertingRule_Template(t *testing.T) {
"instance": "{{ $labels.instance }}",
},
Annotations: map[string]string{
"summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}`,
"summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}.{{ if $isPartial }} WARNING: Partial response detected - this alert may be incomplete. Please verify the results manually.{{ end }}`,
},
alerts: make(map[uint64]*notifier.Alert),
}, []datasource.Metric{
@@ -1239,7 +1240,7 @@ func TestAlertingRule_Template(t *testing.T) {
alertNameLabel, "originAlertname",
alertGroupNameLabel, "originGroupname",
"instance", "foo"),
}, map[uint64]*notifier.Alert{
}, true, map[uint64]*notifier.Alert{
hash(map[string]string{
alertNameLabel: "OriginLabels",
"exported_alertname": "originAlertname",
@@ -1255,7 +1256,7 @@ func TestAlertingRule_Template(t *testing.T) {
"instance": "foo",
},
Annotations: map[string]string{
"summary": `Alert "originAlertname(originGroupname)" for instance foo`,
"summary": `Alert "originAlertname(originGroupname)" for instance foo. WARNING: Partial response detected - this alert may be incomplete. Please verify the results manually.`,
},
},
})
@@ -1385,7 +1386,7 @@ func TestAlertingRule_ToLabels(t *testing.T) {
"group": "vmalert",
"alertname": "ConfigurationReloadFailure",
"alertgroup": "vmalert",
"invalid_label": `error evaluating template: template: :1:268: executing "" at <.Values.mustRuntimeFail>: can't evaluate field Values in type notifier.tplData`,
"invalid_label": `error evaluating template: template: :1:298: executing "" at <.Values.mustRuntimeFail>: can't evaluate field Values in type notifier.tplData`,
}
expectedProcessedLabels := map[string]string{
@@ -1395,7 +1396,7 @@ func TestAlertingRule_ToLabels(t *testing.T) {
"exported_alertname": "ConfigurationReloadFailure",
"group": "vmalert",
"alertgroup": "vmalert",
"invalid_label": `error evaluating template: template: :1:268: executing "" at <.Values.mustRuntimeFail>: can't evaluate field Values in type notifier.tplData`,
"invalid_label": `error evaluating template: template: :1:298: executing "" at <.Values.mustRuntimeFail>: can't evaluate field Values in type notifier.tplData`,
}
ls, err := ar.toLabels(metric, nil)

View File

@@ -394,7 +394,7 @@ func (bu *backendURL) runHealthCheck() {
if errors.Is(bu.healthCheckContext.Err(), context.Canceled) {
return
}
logger.Warnf("ignoring the backend at %s for %s becasue of dial error: %s", addr, *failTimeout, err)
logger.Warnf("ignoring the backend at %s for %s because of dial error: %s", addr, *failTimeout, err)
continue
}
@@ -809,7 +809,7 @@ func reloadAuthConfig() (bool, error) {
ok, err := reloadAuthConfigData(data)
if err != nil {
return false, fmt.Errorf("failed to pars -auth.config=%q: %w", *authConfigPath, err)
return false, fmt.Errorf("failed to parse -auth.config=%q: %w", *authConfigPath, err)
}
if !ok {
return false, nil

View File

@@ -156,6 +156,10 @@ func requestHandlerWithInternalRoutes(w http.ResponseWriter, r *http.Request) bo
}
func requestHandler(w http.ResponseWriter, r *http.Request) bool {
if r.Body != nil {
r.Body = &readDurationTrackingBody{r: r.Body}
}
ats := getAuthTokensFromRequest(r)
if len(ats) == 0 {
// Process requests for unauthorized users
@@ -349,14 +353,37 @@ func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url
err = ctxErr
}
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Do not retry canceled or timed out requests
if errors.Is(err, errReadTimeout) {
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
if errors.Is(err, context.DeadlineExceeded) {
// Timed out request must be counted as errors, since this usually means that the backend is slow.
logger.Warnf("remoteAddr: %s; requestURI: %s; timeout while proxying the response from %s: %s", remoteAddr, requestURI, targetURL, err)
logger.Warnf("remoteAddr: %s; requestURI: %s; client %s request exceeded single read timeout -readTimeout=%s, closing connection", remoteAddr, requestURI, ui.name(), *readTimeout)
rejectSlowClientRequests.Inc()
if w1, ok := w.(http.Hijacker); ok {
conn, _, connErr := w1.Hijack()
if connErr != nil {
logger.Errorf("cannot hijack connection for slow read timeout handling for %s: %s", targetURL, connErr)
return true, false
}
_ = conn.Close()
return true, false
}
return true, false
}
// Do not retry canceled
if errors.Is(err, context.Canceled) {
clientCanceledRequests.Inc()
return true, false
}
// Do not retry timed out requests
if errors.Is(err, context.DeadlineExceeded) {
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
// Timed out request must be counted as errors, since this usually means that the backend is slow.
logger.Warnf("remoteAddr: %s; requestURI: %s; timeout while proxying the response from %s: %s", remoteAddr, requestURI, targetURL, err)
return false, false
}
if !rtbOK || !rtb.canRetry() {
@@ -413,7 +440,10 @@ func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url
err = copyStreamToClient(w, res.Body)
_ = res.Body.Close()
if err != nil && !netutil.IsTrivialNetworkError(err) && !errors.Is(err, context.Canceled) {
if errors.Is(err, context.Canceled) {
clientCanceledRequests.Inc()
return true, false
} else if err != nil && !netutil.IsTrivialNetworkError(err) {
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
@@ -546,6 +576,8 @@ var (
configReloadRequests = metrics.NewCounter(`vmauth_http_requests_total{path="/-/reload"}`)
invalidAuthTokenRequests = metrics.NewCounter(`vmauth_http_request_errors_total{reason="invalid_auth_token"}`)
missingRouteRequests = metrics.NewCounter(`vmauth_http_request_errors_total{reason="missing_route"}`)
clientCanceledRequests = metrics.NewCounter(`vmauth_http_request_errors_total{reason="client_canceled"}`)
rejectSlowClientRequests = metrics.NewCounter(`vmauth_http_request_errors_total{reason="reject_slow_client"}`)
)
func newRoundTripper(caFileOpt, certFileOpt, keyFileOpt, serverNameOpt string, insecureSkipVerifyP *bool) (http.RoundTripper, error) {
@@ -633,6 +665,7 @@ func handleConcurrencyLimitError(w http.ResponseWriter, r *http.Request, err err
if errors.Is(ctx.Err(), context.Canceled) {
// Do not return any response for the request canceled by the client,
// since the connection to the client is already closed.
clientCanceledRequests.Inc()
return
}
@@ -771,3 +804,34 @@ func debugInfo(u *url.URL, r *http.Request) string {
fmt.Fprint(s, ")")
return s.String()
}
var slowReadDuration = metrics.NewSummary(`vmauth_request_slow_read_duration_seconds`)
var readTimeout = flag.Duration("readTimeout", 0, "The maximum duration for a single read call when exceeded the connection is closed. Zero disables request read timeout. "+
"See also -writeTimeout")
var errReadTimeout = fmt.Errorf("request read timeout")
type readDurationTrackingBody struct {
r io.ReadCloser
}
func (r *readDurationTrackingBody) Read(p []byte) (n int, err error) {
start := time.Now()
n, err = r.r.Read(p)
dur := time.Since(start)
// Record slow read durations only to avoid overhead for fast reads.
if dur > time.Millisecond {
slowReadDuration.Update(dur.Seconds())
}
if err == nil && *readTimeout > 0 && dur > *readTimeout {
return n, errReadTimeout
}
return n, err
}
func (r *readDurationTrackingBody) Close() error {
return r.r.Close()
}

View File

@@ -389,11 +389,23 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
case "/create":
snapshotsCreateTotal.Inc()
w.Header().Set("Content-Type", "application/json")
snapshotPath := Storage.MustCreateSnapshot()
snapshotName := Storage.MustCreateSnapshot()
// Verify whether the client already closed the connection.
// In this case it is better to drop the created snapshot, since the client isn't interested in it.
if err := r.Context().Err(); err != nil {
logger.Infof("deleting already created snapshot at %s because the client canceled the request", snapshotName)
if err := deleteSnapshot(snapshotName); err != nil {
logger.Infof("cannot delete just created snapshot: %s", err)
return true
}
return true
}
if prometheusCompatibleResponse {
fmt.Fprintf(w, `{"status":"success","data":{"name":%s}}`, stringsutil.JSONString(snapshotPath))
fmt.Fprintf(w, `{"status":"success","data":{"name":%s}}`, stringsutil.JSONString(snapshotName))
} else {
fmt.Fprintf(w, `{"status":"ok","snapshot":%s}`, stringsutil.JSONString(snapshotPath))
fmt.Fprintf(w, `{"status":"ok","snapshot":%s}`, stringsutil.JSONString(snapshotName))
}
return true
case "/list":
@@ -413,23 +425,12 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
snapshotsDeleteTotal.Inc()
w.Header().Set("Content-Type", "application/json")
snapshotName := r.FormValue("snapshot")
snapshots := Storage.MustListSnapshots()
for _, snName := range snapshots {
if snName == snapshotName {
if err := Storage.DeleteSnapshot(snName); err != nil {
err = fmt.Errorf("cannot delete snapshot %q: %w", snName, err)
jsonResponseError(w, err)
snapshotsDeleteErrorsTotal.Inc()
return true
}
fmt.Fprintf(w, `{"status":"ok"}`)
return true
}
if err := deleteSnapshot(snapshotName); err != nil {
jsonResponseError(w, err)
snapshotsDeleteErrorsTotal.Inc()
return true
}
err := fmt.Errorf("cannot find snapshot %q", snapshotName)
jsonResponseError(w, err)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/delete_all":
snapshotsDeleteAllTotal.Inc()
@@ -450,6 +451,19 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
}
}
func deleteSnapshot(snapshotName string) error {
snapshots := Storage.MustListSnapshots()
for _, snName := range snapshots {
if snName == snapshotName {
if err := Storage.DeleteSnapshot(snName); err != nil {
return fmt.Errorf("cannot delete snapshot %q: %w", snName, err)
}
return nil
}
}
return fmt.Errorf("cannot find snapshot %q", snapshotName)
}
func initStaleSnapshotsRemover(strg *storage.Storage) {
staleSnapshotsRemoverCh = make(chan struct{})
if snapshotsMaxAge.Duration() <= 0 {

File diff suppressed because it is too large Load Diff

View File

@@ -23,43 +23,43 @@
},
"dependencies": {
"classnames": "^2.5.1",
"dayjs": "^1.11.13",
"dayjs": "^1.11.19",
"lodash.debounce": "^4.0.8",
"marked": "^16.0.0",
"preact": "^10.26.9",
"qs": "^6.14.0",
"marked": "^17.0.1",
"preact": "^10.28.2",
"qs": "^6.14.1",
"react-input-mask": "^2.0.4",
"react-router-dom": "^7.6.3",
"react-router-dom": "^7.12.0",
"uplot": "^1.6.32",
"vite": "^7.1.11",
"web-vitals": "^5.0.3"
"vite": "^7.3.1",
"web-vitals": "^5.1.0"
},
"devDependencies": {
"@eslint/eslintrc": "^3.3.1",
"@eslint/js": "^9.30.1",
"@eslint/eslintrc": "^3.3.3",
"@eslint/js": "^9.39.2",
"@preact/preset-vite": "^2.10.2",
"@testing-library/jest-dom": "^6.6.3",
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/preact": "^3.2.4",
"@types/lodash.debounce": "^4.0.9",
"@types/node": "^24.0.12",
"@types/node": "^25.0.8",
"@types/qs": "^6.14.0",
"@types/react": "^19.1.8",
"@types/react": "^19.2.8",
"@types/react-input-mask": "^3.0.6",
"@types/react-router-dom": "^5.3.3",
"@typescript-eslint/eslint-plugin": "^8.36.0",
"@typescript-eslint/parser": "^8.36.0",
"cross-env": "^7.0.3",
"eslint": "^9.30.1",
"@typescript-eslint/eslint-plugin": "^8.53.0",
"@typescript-eslint/parser": "^8.53.0",
"cross-env": "^10.1.0",
"eslint": "^9.39.2",
"eslint-plugin-react": "^7.37.5",
"eslint-plugin-unused-imports": "^4.1.4",
"globals": "^16.3.0",
"eslint-plugin-unused-imports": "^4.3.0",
"globals": "^17.0.0",
"http-proxy-middleware": "^3.0.5",
"jsdom": "^26.1.0",
"jsdom": "^27.4.0",
"postcss": "^8.5.6",
"rollup-plugin-visualizer": "^6.0.3",
"sass-embedded": "^1.89.2",
"typescript": "^5.8.3",
"vitest": "^3.2.4"
"rollup-plugin-visualizer": "^6.0.5",
"sass-embedded": "^1.97.2",
"typescript": "^5.9.3",
"vitest": "^4.0.17"
},
"browserslist": {
"production": [

View File

@@ -9,7 +9,6 @@ import { getFromStorage, removeFromStorage, saveToStorage } from "../../../../ut
import useBoolean from "../../../../hooks/useBoolean";
import { ChildComponentHandle } from "../GlobalSettings";
import { useAppDispatch, useAppState } from "../../../../state/common/StateContext";
import { getTenantIdFromUrl } from "../../../../utils/tenants";
interface ServerConfiguratorProps {
onClose: () => void;
@@ -39,10 +38,6 @@ const ServerConfigurator = forwardRef<ChildComponentHandle, ServerConfiguratorPr
};
const handleApply = useCallback(() => {
const tenantIdFromUrl = getTenantIdFromUrl(serverUrl);
if (tenantIdFromUrl !== "") {
dispatch({ type: "SET_TENANT_ID", payload: tenantIdFromUrl });
}
dispatch({ type: "SET_SERVER", payload: serverUrl });
onClose();
}, [serverUrl]);
@@ -60,12 +55,6 @@ const ServerConfigurator = forwardRef<ChildComponentHandle, ServerConfiguratorPr
}
}, [enabledStorage]);
useEffect(() => {
if (enabledStorage) {
saveToStorage("SERVER_URL", serverUrl);
}
}, [serverUrl]);
useEffect(() => {
// the tenant selector can change the serverUrl
if (stateServerUrl === serverUrl) return;

View File

@@ -1,4 +1,4 @@
import { FC, useState, useRef, useEffect, useMemo } from "preact/compat";
import { FC, useState, useRef, useMemo } from "preact/compat";
import { useAppDispatch, useAppState } from "../../../../state/common/StateContext";
import { useTimeDispatch } from "../../../../state/time/TimeStateContext";
import { ArrowDownIcon, StorageIcon } from "../../../Main/Icons";
@@ -10,14 +10,14 @@ import { getAppModeEnable } from "../../../../utils/app-mode";
import Tooltip from "../../../Main/Tooltip/Tooltip";
import useDeviceDetect from "../../../../hooks/useDeviceDetect";
import TextField from "../../../Main/TextField/TextField";
import { getTenantIdFromUrl, replaceTenantId } from "../../../../utils/tenants";
import { replaceTenantId } from "../../../../utils/tenants";
import useBoolean from "../../../../hooks/useBoolean";
const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
const appModeEnable = getAppModeEnable();
const { isMobile } = useDeviceDetect();
const { tenantId: tenantIdState, serverUrl } = useAppState();
const { tenantId, serverUrl } = useAppState();
const dispatch = useAppDispatch();
const timeDispatch = useTimeDispatch();
@@ -48,10 +48,8 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
}, [accountIds]);
const createHandlerChange = (value: string) => () => {
const tenant = value;
dispatch({ type: "SET_TENANT_ID", payload: tenant });
if (serverUrl) {
const updateServerUrl = replaceTenantId(serverUrl, tenant);
const updateServerUrl = replaceTenantId(serverUrl, value);
if (updateServerUrl === serverUrl) return;
dispatch({ type: "SET_SERVER", payload: updateServerUrl });
timeDispatch({ type: "RUN_QUERY" });
@@ -59,16 +57,6 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
handleCloseOptions();
};
useEffect(() => {
const id = getTenantIdFromUrl(serverUrl);
if (tenantIdState && tenantIdState !== id) {
createHandlerChange(tenantIdState)();
} else {
createHandlerChange(id)();
}
}, [serverUrl]);
if (!showTenantSelector) return null;
return (
@@ -83,7 +71,7 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
<span className="vm-mobile-option__icon"><StorageIcon/></span>
<div className="vm-mobile-option-text">
<span className="vm-mobile-option-text__label">Tenant ID</span>
<span className="vm-mobile-option-text__value">{tenantIdState}</span>
<span className="vm-mobile-option-text__value">{tenantId}</span>
</div>
<span className="vm-mobile-option__arrow"><ArrowDownIcon/></span>
</div>
@@ -106,7 +94,7 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
)}
onClick={toggleOpenOptions}
>
{tenantIdState}
{tenantId}
</Button>
)}
</div>
@@ -138,7 +126,7 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
className={classNames({
"vm-list-item": true,
"vm-list-item_mobile": isMobile,
"vm-list-item_active": id === tenantIdState
"vm-list-item_active": id === tenantId
})}
key={id}
onClick={createHandlerChange(id)}

View File

@@ -3,19 +3,18 @@ import { useEffect, useMemo, useState } from "preact/compat";
import { ErrorTypes } from "../../../../../types";
import { getAccountIds } from "../../../../../api/accountId";
import { getAppModeEnable, getAppModeParams } from "../../../../../utils/app-mode";
import { getTenantIdFromUrl } from "../../../../../utils/tenants";
export const useFetchAccountIds = () => {
const { useTenantID } = getAppModeParams();
const appModeEnable = getAppModeEnable();
const { serverUrl } = useAppState();
const { tenantId, serverUrl } = useAppState();
const [isLoading, setIsLoading] = useState(false);
const [error, setError] = useState<ErrorTypes | string>();
const [accountIds, setAccountIds] = useState<string[]>([]);
const fetchUrl = useMemo(() => getAccountIds(serverUrl), [serverUrl]);
const isServerUrlWithTenant = useMemo(() => !!getTenantIdFromUrl(serverUrl), [serverUrl]);
const isServerUrlWithTenant = useMemo(() => !!tenantId, [tenantId]);
const preventFetch = appModeEnable ? !useTenantID : !isServerUrlWithTenant;
useEffect(() => {

View File

@@ -1,4 +1,5 @@
@use "src/styles/variables" as *;
@use 'sass:meta';
$button-radius: 6px;
@@ -42,6 +43,8 @@ $button-radius: 6px;
svg {
width: 14px;
min-width: 14px;
max-width: 14px;
}
}
@@ -51,6 +54,8 @@ $button-radius: 6px;
svg {
width: 16px;
min-width: 16px;
max-width: 16px;
}
}
@@ -60,6 +65,8 @@ $button-radius: 6px;
svg {
width: 18px;
min-width: 18px;
max-width: 18px;
line-height: 16px;
}
}
@@ -128,8 +135,14 @@ $button-radius: 6px;
);
@each $name, $color in $button-colors {
@include contained-button($name, $color, if($name == white, $color-black, $color-white));
@include outlined-button($name, $color, if($name == white, $color-white, $color));
@include text-button($name, if($name == white, $color-white, $color));
@if $name == white {
@include contained-button($name, $color, $color-black);
@include outlined-button($name, $color, $color-white);
@include text-button($name, $color-white);
} @else {
@include contained-button($name, $color, $color-white);
@include outlined-button($name, $color, $color);
@include text-button($name, $color);
}
}
}

View File

@@ -7,7 +7,6 @@ import AppConfigurator from "../appConfigurator";
import { useSearchParams } from "react-router-dom";
import dayjs from "dayjs";
import { DATE_FORMAT } from "../../../constants/date";
import { getTenantIdFromUrl } from "../../../utils/tenants";
import usePrevious from "../../../hooks/usePrevious";
export const useFetchQuery = (): {
@@ -27,7 +26,7 @@ export const useFetchQuery = (): {
const prevDate = usePrevious(date);
const prevTotal = useRef<{ data: TSDBStatus }>();
const { serverUrl } = useAppState();
const { tenantId, serverUrl } = useAppState();
const [isLoading, setIsLoading] = useState(false);
const [error, setError] = useState<ErrorTypes | string>();
const [tsdbStatus, setTSDBStatus] = useState<TSDBStatus>(appConfigurator.defaultTSDBStatus);
@@ -158,9 +157,8 @@ export const useFetchQuery = (): {
}, [error]);
useEffect(() => {
const id = getTenantIdFromUrl(serverUrl);
setIsCluster(!!id);
}, [serverUrl]);
setIsCluster(!!tenantId);
}, [tenantId]);
appConfigurator.tsdbStatusData = tsdbStatus;

View File

@@ -1,7 +1,6 @@
import { useEffect, useState } from "react";
import { useTimeDispatch, useTimeState } from "../../../state/time/TimeStateContext";
import { useCustomPanelDispatch, useCustomPanelState } from "../../../state/customPanel/CustomPanelStateContext";
import { useAppDispatch, useAppState } from "../../../state/common/StateContext";
import { useQueryDispatch, useQueryState } from "../../../state/query/QueryStateContext";
import { displayTypeTabs } from "../DisplayTypeSwitch";
import { useGraphDispatch, useGraphState } from "../../../state/graph/GraphStateContext";
@@ -15,14 +14,12 @@ import { arrayEquals } from "../../../utils/array";
import { isEqualURLSearchParams } from "../../../utils/url";
export const useSetQueryParams = () => {
const { tenantId } = useAppState();
const { displayType } = useCustomPanelState();
const { query } = useQueryState();
const { duration, relativeTime, period: { date, step } } = useTimeState();
const { customStep } = useGraphState();
const [searchParams, setSearchParams] = useSearchParams();
const dispatch = useAppDispatch();
const timeDispatch = useTimeDispatch();
const graphDispatch = useGraphDispatch();
const queryDispatch = useQueryDispatch();
@@ -72,10 +69,6 @@ export const useSetQueryParams = () => {
if (searchParams.get(`${group}.tab`) !== displayTypeCode) {
newSearchParams.set(`${group}.tab`, `${displayTypeCode}`);
}
if (searchParams.get(`${group}.tenantID`) !== tenantId && tenantId) {
newSearchParams.set(`${group}.tenantID`, tenantId);
}
});
// Remove extra parameters that exceed the request size
@@ -89,7 +82,7 @@ export const useSetQueryParams = () => {
if (isEqualURLSearchParams(newSearchParams, searchParams) || !newSearchParams.size) return;
setSearchParams(newSearchParams);
}, [tenantId, displayType, query, duration, relativeTime, date, step, customStep]);
}, [displayType, query, duration, relativeTime, date, step, customStep]);
useEffect(() => {
const timer = setTimeout(setterSearchParams, 200);
@@ -114,11 +107,6 @@ export const useSetQueryParams = () => {
customPanelDispatch({ type: "SET_DISPLAY_TYPE", payload: displayTypeFromUrl });
}
const tenantIdFromUrl = searchParams.get("g0.tenantID") || "";
if (tenantIdFromUrl !== tenantId) {
dispatch({ type: "SET_TENANT_ID", payload: tenantIdFromUrl });
}
const queryFromUrl = getQueryArray();
if (!arrayEquals(queryFromUrl, query)) {
queryDispatch({ type: "SET_QUERY", payload: queryFromUrl });

View File

@@ -1,7 +1,8 @@
import { createContext, FC, useContext, useMemo, useReducer } from "preact/compat";
import { createContext, FC, useContext, useEffect, useMemo, useReducer } from "preact/compat";
import { Action, AppState, initialState, reducer } from "./reducer";
import { getQueryStringValue } from "../../utils/query-string";
import { Dispatch } from "react";
import { getFromStorage, removeFromStorage, saveToStorage } from "../../utils/storage";
type StateContextType = { state: AppState, dispatch: Dispatch<Action> };
@@ -23,6 +24,17 @@ export const AppStateProvider: FC = ({ children }) => {
return { state, dispatch };
}, [state, dispatch]);
useEffect(() => {
if (!state.serverUrl) return;
const enabledStorage = !!getFromStorage("SERVER_URL");
if (enabledStorage) {
saveToStorage("SERVER_URL", state.serverUrl);
} else {
removeFromStorage(["SERVER_URL"]);
}
}, [state.serverUrl]);
return <StateContext.Provider value={contextValue}>
{children}
</StateContext.Provider>;

View File

@@ -1,9 +1,9 @@
import { getDefaultServer } from "../../utils/default-server-url";
import { getQueryStringValue } from "../../utils/query-string";
import { getFromStorage, saveToStorage } from "../../utils/storage";
import { AppConfig, Theme } from "../../types";
import { isDarkTheme } from "../../utils/theme";
import { removeTrailingSlash } from "../../utils/url";
import { getTenantIdFromUrl } from "../../utils/tenants";
export interface AppState {
serverUrl: string;
@@ -16,15 +16,14 @@ export interface AppState {
export type Action =
| { type: "SET_SERVER", payload: string }
| { type: "SET_THEME", payload: Theme }
| { type: "SET_TENANT_ID", payload: string }
| { type: "SET_APP_CONFIG", payload: AppConfig }
| { type: "SET_DARK_THEME" }
const tenantId = getQueryStringValue("g0.tenantID", "") as string;
const serverUrl = removeTrailingSlash(getDefaultServer());
export const initialState: AppState = {
serverUrl: removeTrailingSlash(getDefaultServer(tenantId)),
tenantId,
serverUrl,
tenantId: getTenantIdFromUrl(serverUrl),
theme: (getFromStorage("THEME") || Theme.system) as Theme,
isDarkTheme: null,
appConfig: {}
@@ -35,13 +34,9 @@ export function reducer(state: AppState, action: Action): AppState {
case "SET_SERVER":
return {
...state,
tenantId: getTenantIdFromUrl(action.payload),
serverUrl: removeTrailingSlash(action.payload)
};
case "SET_TENANT_ID":
return {
...state,
tenantId: action.payload
};
case "SET_THEME":
saveToStorage("THEME", action.payload);
return {

View File

@@ -1,5 +1,4 @@
import { getAppModeParams } from "./app-mode";
import { replaceTenantId } from "./tenants";
import { APP_TYPE, AppType } from "../constants/appType";
import { getFromStorage } from "./storage";
@@ -7,7 +6,7 @@ export const getDefaultURL = (u: string) => {
return u.replace(/(\/(?:prometheus\/)?(?:graph|vmui)\/.*|\/#\/.*)/, "/prometheus");
};
export const getDefaultServer = (tenantId?: string): string => {
export const getDefaultServer = (): string => {
const { serverURL } = getAppModeParams();
const storageURL = getFromStorage("SERVER_URL") as string;
const anomalyURL = `${window.location.origin}${window.location.pathname.replace(/^\/vmui/, "")}`;
@@ -18,6 +17,6 @@ export const getDefaultServer = (tenantId?: string): string => {
case AppType.vmanomaly:
return storageURL || anomalyURL;
default:
return tenantId ? replaceTenantId(url, tenantId) : url;
return url;
}
};

View File

@@ -0,0 +1,89 @@
import { describe, it, expect } from "vitest";
import {
replaceTenantId,
getTenantIdFromUrl,
getUrlWithoutTenant,
} from "./tenants";
describe("tenant url helpers", () => {
describe("getTenantIdFromUrl", () => {
it("returns accountID", () => {
expect(getTenantIdFromUrl("http://vmselect:8481/select/0/vmui/")).toBe("0");
});
it("returns accountID:projectID", () => {
expect(getTenantIdFromUrl("http://vmselect:8481/select/12:7/vmui/")).toBe("12:7");
});
it("returns empty string if tenant is missing", () => {
expect(getTenantIdFromUrl("http://vmselect:8481/select/vmui/")).toBe("");
});
it("returns empty string for unrelated paths", () => {
expect(getTenantIdFromUrl("http://vmselect:8481/foo/bar")).toBe("");
});
it("returns accountID when url ends right after tenant", () => {
expect(getTenantIdFromUrl("http://vmselect:8481/select/0")).toBe("0");
});
});
describe("replaceTenantId", () => {
it("replaces accountID with another accountID", () => {
expect(
replaceTenantId("http://vmselect:8481/select/0/vmui/", "2")
).toBe("http://vmselect:8481/select/2/vmui/");
});
it("replaces accountID with accountID:projectID", () => {
expect(
replaceTenantId("http://vmselect:8481/select/0/prometheus/", "1:9")
).toBe("http://vmselect:8481/select/1:9/prometheus/");
});
it("keeps the rest of the path intact", () => {
expect(
replaceTenantId("http://vmselect:8481/select/3:4/prometheus/api/v1/query", "7")
).toBe("http://vmselect:8481/select/7/prometheus/api/v1/query");
});
it("does not change url if it doesn't match expected pattern", () => {
expect(
replaceTenantId("http://vmselect:8481/foo/bar", "2")
).toBe("http://vmselect:8481/foo/bar");
});
});
describe("getUrlWithoutTenant", () => {
it("removes /select/<tenant>/... and returns base url", () => {
expect(
getUrlWithoutTenant("http://vmselect:8481/select/0/vmui/")
).toBe("http://vmselect:8481");
});
it("removes /select/<tenant>/... for accountID:projectID and returns base url", () => {
expect(
getUrlWithoutTenant("http://vmselect:8481/select/5:6/prometheus/")
).toBe("http://vmselect:8481");
});
it("works with deep paths and returns base url", () => {
expect(
getUrlWithoutTenant("http://vmselect:8481/select/1:2/prometheus/api/v1/query")
).toBe("http://vmselect:8481");
});
it("does not change url if it doesn't match expected pattern", () => {
expect(
getUrlWithoutTenant("http://vmselect:8481/foo/bar")
).toBe("http://vmselect:8481/foo/bar");
});
it("removes url ending right after tenant", () => {
expect(
getUrlWithoutTenant("http://vmselect:8481/select/0")
).toBe("http://vmselect:8481");
});
});
});

View File

@@ -1,13 +1,21 @@
const regexp = /(\/select\/)([^/])(\/)(.+)/;
const TENANT_REGEXP = /(\/select\/)(\d+(?::\d+)?)(\/.*)?$/;
export const replaceTenantId = (serverUrl: string, tenantId: string) => {
return serverUrl.replace(regexp, `$1${tenantId}/$4`);
return serverUrl.replace(TENANT_REGEXP, `$1${tenantId}$3`);
};
export const getTenantIdFromUrl = (url: string): string => {
return url.match(regexp)?.[2] || "";
return url.match(TENANT_REGEXP)?.[2] ?? "";
};
export const getUrlWithoutTenant = (url: string): string => {
return url.replace(regexp, "");
return url.replace(TENANT_REGEXP, "");
};
export const updateBrowserUrlTenant = (tenantId: string) => {
const base = `${window.location.origin}${window.location.pathname}${window.location.search}`;
const nextBase = replaceTenantId(base, tenantId);
const nextUrl = `${nextBase}${window.location.hash}`;
window.history.replaceState(null, "", nextUrl);
};

View File

@@ -1,7 +1,7 @@
{
"compilerOptions": {
"target": "ESNext",
"types": ["vite/client", "vitest/globals"],
"types": ["vite/client", "vitest/globals", "node"],
"lib": [
"dom",
"dom.iterable",

View File

@@ -452,7 +452,7 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(increase(vm_backup_errors_total{job=~\"$job\", instance=~\"$instance\"}[1h]))",
"expr": "sum(increase(vm_backup_errors_total{job=~\"$job\", instance=~\"$instance\"}[$__range]))",
"legendFormat": "__auto",
"range": true,
"refId": "A"
@@ -605,7 +605,7 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(increase(vm_retention_errors_total{job=~\"$job\", instance=~\"$instance\"}[1h]))",
"expr": "sum(increase(vm_retention_errors_total{job=~\"$job\", instance=~\"$instance\"}[$__range]))",
"legendFormat": "__auto",
"range": true,
"refId": "A"

View File

@@ -11458,4 +11458,4 @@
"title": "VictoriaMetrics - cluster",
"uid": "oS7Bi_0Wz",
"version": 1
}
}

View File

@@ -453,7 +453,7 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(increase(vm_backup_errors_total{job=~\"$job\", instance=~\"$instance\"}[1h]))",
"expr": "sum(increase(vm_backup_errors_total{job=~\"$job\", instance=~\"$instance\"}[$__range]))",
"legendFormat": "__auto",
"range": true,
"refId": "A"
@@ -606,7 +606,7 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(increase(vm_retention_errors_total{job=~\"$job\", instance=~\"$instance\"}[1h]))",
"expr": "sum(increase(vm_retention_errors_total{job=~\"$job\", instance=~\"$instance\"}[$__range]))",
"legendFormat": "__auto",
"range": true,
"refId": "A"

View File

@@ -11459,4 +11459,4 @@
"title": "VictoriaMetrics - cluster (VM)",
"uid": "oS7Bi_0Wz_vm",
"version": 1
}
}

View File

@@ -106,6 +106,7 @@ See also [case studies](https://docs.victoriametrics.com/victoriametrics/casestu
* [Why I Switched to VictoriaMetrics: Scaling from Small Business to Enterprise](https://blackmetalz.github.io/why-i-switched-to-victoriametrics-scaling-from-small-business-to-enterprise.html)
* [Backing up VictoriaMetrics Data: A Complete Guide](https://medium.com/@kanakaraju896/backing-up-victoriametrics-data-a-complete-guide-24473c74450f)
* [Unlocking the Power of VictoriaMetrics: A Prometheus Alternative](https://developer-friendly.blog/blog/2024/06/17/unlocking-the-power-of-victoriametrics-a-prometheus-alternative/)
* [How to Master Kubernetes Observability: Multi-Cluster Monitoring with VictoriaMetrics, Loki, and Grafana](https://www.keyvalue.systems/blog/kubernetes-observability-with-victoriametrics-loki-grafana/)
## Third-party articles and slides about VictoriaLogs

View File

@@ -31,11 +31,15 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): expose `vm_rollup_result_cache_requests_total` which tracks the number of requests to the query rollup cache. See [#10117](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10117).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add `localStorage` availability checks with error reporting. See [#10085](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10085).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add `VMUI:`-prefixed `localStorage` keys and legacy key migration.
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): add a metric `vmauth_http_request_errors_total{reason="client_canceled"}` to measure client cancelled requests. This should help with debugging vmauth issues. See [#10233](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10233).
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): do not retry client canceled requests. See [#10233](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10233).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add explicit month duration unit (`M`) for `-retentionPeriod` flag. This allows users to specify retention periods in months more explicitly. See [#10181](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10181).
* FEATURE: [dashboards/vmagent](https://grafana.com/grafana/dashboards/12683): add `Persistent queue Full ETA` panel to the `Drilldown` section. This panel helps estimate how much time remains until `vmagent` starts dropping incoming metrics. See [#10193](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10193).
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add support for `$isPartial` variable in alerting rule annotation [templating](https://docs.victoriametrics.com/victoriametrics/vmalert/#templating). This allows users to include an additional warning message in alerts triggered by partial query responses. See [#4531](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4531).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix configuration reloading for `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig` when vmagent is launched with empty files. Previously, if vmagent started with an empty config, subsequent config reloads were ignored. See [#10211](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10211).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): Fixed a missing path error for `http://<victoriametrics-addr>:8428/zabbixconnector/api/v1/history`. See PR [10214](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10214)
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): Fixed a missing path error for `http://<victoriametrics-addr>:8428/zabbixconnector/api/v1/history`. See PR [10214](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10214).
* BUGFIX: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): remove legacy `tenantID` query param and use the URL path as the single source of truth for multitenancy. See [#10232](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10232).
## [v1.133.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.133.0)
@@ -1089,4 +1093,4 @@ See changes [here](https://docs.victoriametrics.com/victoriametrics/changelog/ch
## Previous releases
See [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
See [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).

View File

@@ -286,7 +286,7 @@ expr: <string>
# In case of conflicts, original labels are kept with prefix `exported_`.
#
# Labels only support limited templating variables in https://docs.victoriametrics.com/victoriametrics/vmalert/#templating,
# including `$labels`, `$value` and `expr`, to avoid breaking alert states or causing cardinality issue with results.
# including `$labels`, `$value` and `$expr`, to avoid breaking alert states or causing cardinality issue with results.
# Note: be careful set dynamic label values like `$value`, because each time the $value changes - the new alert will be
# generated which also break `for` condition.
labels:
@@ -316,6 +316,7 @@ The following variables are available in templating:
| $for or .For | Alert's configured for param. | Number of connections is too high for more than {{ .For }} |
| $externalLabels or .ExternalLabels | List of labels configured via `-external.label` command-line flag. | Issues with {{ $labels.instance }} (datacenter-{{ $externalLabels.dc }}) |
| $externalURL or .ExternalURL | URL configured via `-external.url` command-line flag. Used for cases when vmalert is hidden behind proxy. | Visit {{ $externalURL }} for more details |
| $isPartial or .IsPartial | Indicates whether the latest rule query response from the datasource(that supports returning `isPartial` option, such as vmcluster) could be partial. | {{ if $isPartial }}WARNING: The latest alert state may be a false alarm due to a partial response from the datasource.{{ end }}
Additionally, `vmalert` provides some extra templating functions listed in [template functions](#template-functions) and [reusable templates](#reusable-templates).

View File

@@ -336,6 +336,19 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD
}
func (ib *inmemoryBlock) unmarshalSingleItem(commonPrefix, firstItem []byte, mt marshalType) {
if mt != marshalTypePlain {
logger.Panicf("BUG: single item block must be always encoded with TypePlain")
}
ib.commonPrefix = append(ib.commonPrefix[:0], commonPrefix...)
ib.items = slicesutil.SetLength(ib.items, 1)
ib.data = bytesutil.ResizeNoCopyNoOverallocate(ib.data, len(firstItem))
ib.data = append(ib.data[:0], firstItem...)
item := &ib.items[0]
item.Start = 0
item.End = uint32(len(ib.data))
}
// UnmarshalData decodes itemsCount items from sb and firstItem and stores them to ib.
func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix []byte, itemsCount uint32, mt marshalType) error {
ib.Reset()

View File

@@ -331,6 +331,14 @@ func (ps *partSearch) getInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error)
if ps.sparse {
cache = ibSparseCache
}
if bh.itemsCount == 1 {
// special case for single item
// there is no need to cache it, since firstItem is always stored in-memory
ib := ps.tmpIB
ib.Reset()
ib.unmarshalSingleItem(bh.commonPrefix, bh.firstItem, bh.marshalType)
return ib, nil
}
ibKey := blockcache.Key{
Part: ps.p,
Offset: bh.itemsBlockOffset,

View File

@@ -163,3 +163,80 @@ func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []stri
p := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
return p, items, nil
}
func TestGetInmemoryBlockWithZeroSizeBlock(t *testing.T) {
var ph partHeader
var ip inmemoryPart
var bsw blockStreamWriter
bsw.MustInitFromInmemoryPart(&ip, -3)
buildBlock := func(items ...string) inmemoryBlock {
var ib inmemoryBlock
for _, item := range items {
if !ib.Add([]byte(item)) {
t.Fatalf("cannot add item %q", item)
}
}
ib.SortItems()
return ib
}
writeBlock := func(ib inmemoryBlock) {
if len(ib.items) == 0 {
t.Fatalf("block must contain items")
}
data := ib.data
ph.itemsCount += uint64(len(ib.items))
if ph.blocksCount == 0 {
ph.firstItem = append(ph.firstItem[:0], ib.items[0].Bytes(data)...)
}
ph.lastItem = append(ph.lastItem[:0], ib.items[len(ib.items)-1].Bytes(data)...)
ph.blocksCount++
bsw.WriteBlock(&ib)
}
writeBlock(buildBlock("a"))
writeBlock(buildBlock("b0", "b1"))
bsw.MustClose()
p := newPart(&ph, "test", ip.size(), ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
defer p.MustClose()
var ps partSearch
ps.Init(p, false)
ps.mrs = p.mrs
if err := ps.nextBHS(); err != nil {
t.Fatalf("cannot read block headers: %s", err)
}
if len(ps.bhs) != 2 {
t.Fatalf("unexpected block headers count: %d", len(ps.bhs))
}
if ps.bhs[0].itemsBlockOffset != ps.bhs[1].itemsBlockOffset {
t.Fatalf("blocks must share itemsBlockOffset for the test: %d vs %d", ps.bhs[0].itemsBlockOffset, ps.bhs[1].itemsBlockOffset)
}
if ps.bhs[0].itemsBlockSize != 0 {
t.Fatalf("the first block must have zero itemsBlockSize; got %d", ps.bhs[0].itemsBlockSize)
}
// iterate 4 times in order to place block into the cache
// storage caches it after 2 missed requests according to the flag blockcache.missesBeforeCaching=2
for i := range 4 {
if _, err := ps.getInmemoryBlock(&ps.bhs[1]); err != nil {
t.Fatalf("cannot load non-empty block at iteration %d: %s", i, err)
}
}
assertBlockAt := func(bhIdx int, wantFirstItemValue string) {
block, err := ps.getInmemoryBlock(&ps.bhs[bhIdx])
if err != nil {
t.Fatalf("cannot block=%d : %s", bhIdx, err)
}
if len(block.items) != int(ps.bhs[bhIdx].itemsCount) {
t.Fatalf("unexpected items count in block=%d; got %d; want %d", bhIdx, len(block.items), ps.bhs[bhIdx].itemsCount)
}
if got := string(block.items[0].Bytes(block.data)); got != wantFirstItemValue {
t.Fatalf("unexpected item in block=%d; got %q; want %q", bhIdx, got, wantFirstItemValue)
}
}
assertBlockAt(0, "a")
assertBlockAt(1, "b0")
}

View File

@@ -34,17 +34,19 @@ func Parse(req *http.Request, callback func(rows []csvimport.Row) error) error {
return fmt.Errorf("cannot parse the provided csv format: %w", err)
}
wcr, err := writeconcurrencylimiter.GetReader(req.Body)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
encoding := req.Header.Get("Content-Encoding")
reader, err := protoparserutil.GetUncompressedReader(req.Body, encoding)
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
if err != nil {
return fmt.Errorf("cannot decode csv data: %w", err)
}
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
ctx := getStreamContext(reader)
defer putStreamContext(ctx)
for ctx.Read() {
@@ -55,7 +57,6 @@ func Parse(req *http.Request, callback func(rows []csvimport.Row) error) error {
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -27,16 +27,18 @@ var (
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, encoding string, callback func(rows []graphite.Row) error) error {
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
if err != nil {
return fmt.Errorf("cannot decode graphite data: %w", err)
}
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
ctx := getStreamContext(reader)
defer putStreamContext(ctx)
@@ -47,7 +49,6 @@ func Parse(r io.Reader, encoding string, callback func(rows []graphite.Row) erro
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -61,16 +61,18 @@ func Parse(r io.Reader, encoding string, isStreamMode bool, precision, db string
}
func parseStreamMode(r io.Reader, encoding string, tsMultiplier int64, db string, callback func(db string, rows []influx.Row) error) error {
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
if err != nil {
return fmt.Errorf("cannot decode influx line protocol data: %w; see https://docs.victoriametrics.com/victoriametrics/integrations/influxdb/", err)
}
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
ctx := getStreamContext(reader)
defer putStreamContext(ctx)
for ctx.Read() {
@@ -82,7 +84,6 @@ func parseStreamMode(r io.Reader, encoding string, tsMultiplier int64, db string
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -20,16 +20,18 @@ import (
//
// callback shouldn't hold block after returning.
func Parse(r io.Reader, contentEncoding string, callback func(block *Block) error) error {
reader, err := protoparserutil.GetUncompressedReader(r, contentEncoding)
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
reader, err := protoparserutil.GetUncompressedReader(wcr, contentEncoding)
if err != nil {
return fmt.Errorf("cannot decode vmimport data: %w", err)
}
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
br := getBufferedReader(reader)
defer putBufferedReader(br)
@@ -104,7 +106,6 @@ func Parse(r io.Reader, contentEncoding string, callback func(block *Block) erro
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
}

View File

@@ -27,11 +27,13 @@ var (
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, callback func(rows []opentsdb.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
ctx := getStreamContext(r)
ctx := getStreamContext(wcr)
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
@@ -40,7 +42,6 @@ func Parse(r io.Reader, callback func(rows []opentsdb.Row) error) error {
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -23,20 +23,23 @@ import (
// limitConcurrency defines whether to control the number of concurrent calls to this function.
// It is recommended setting limitConcurrency=true if the caller doesn't have concurrency limits set,
// like /api/v1/write calls.
func Parse(r io.Reader, defaultTimestamp int64, encoding string, limitConcurrency, enableMetadata bool, callback func(rows []prometheus.Row, metadataList []prometheus.Metadata) error, errLogger func(string)) error {
func Parse(r io.Reader, defaultTimestamp int64, encoding string, limitConcurrency, enableMetadata bool,
callback func(rows []prometheus.Row, metadataList []prometheus.Metadata) error, errLogger func(string)) error {
if limitConcurrency {
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
}
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
if err != nil {
return fmt.Errorf("cannot decode Prometheus text exposition data: %w", err)
}
defer protoparserutil.PutUncompressedReader(reader)
var wcr *writeconcurrencylimiter.Reader
if limitConcurrency {
wcr = writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
}
ctx := getStreamContext(reader)
defer putStreamContext(ctx)
for ctx.Read() {
@@ -49,9 +52,6 @@ func Parse(r io.Reader, defaultTimestamp int64, encoding string, limitConcurrenc
uw.enableMetadata = enableMetadata
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
if wcr != nil {
wcr.DecConcurrency()
}
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -23,11 +23,13 @@ var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*102
//
// callback shouldn't hold tss after returning.
func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSeries, mms []prompb.MetricMetadata) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
ctx := getPushCtx(r)
ctx := getPushCtx(wcr)
defer putPushCtx(ctx)
if err := ctx.Read(); err != nil {
return err
@@ -38,7 +40,6 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
bb := bodyBufferPool.Get()
defer bodyBufferPool.Put(bb)
var err error
if isVMRemoteWrite {
bb.B, err = encoding.DecompressZSTDLimited(bb.B[:0], ctx.reqBuf.B, maxInsertRequestSize.IntN())
if err != nil {

View File

@@ -31,7 +31,10 @@ const maxSnappyBlockSize = 56_000_000
//
// The callback must not hold references to the data after returning.
func ReadUncompressedData(r io.Reader, contentType string, maxDataSize *flagutil.Bytes, callback func(data []byte) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
if contentType == "zstd" {

View File

@@ -23,16 +23,18 @@ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 10*1024*1024, "The maxim
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, encoding string, callback func(rows []vmimport.Row) error) error {
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
if err != nil {
return fmt.Errorf("cannot decode vmimport data: %w", err)
}
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
ctx := getStreamContext(reader)
defer putStreamContext(ctx)
for ctx.Read() {
@@ -42,7 +44,6 @@ func Parse(r io.Reader, encoding string, callback func(rows []vmimport.Row) erro
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -26,16 +26,18 @@ var (
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, encoding string, callback func(rows []zabbixconnector.Row) error) error {
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
if err != nil {
return fmt.Errorf("cannot decode zabbixconnector data: %w", err)
}
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
ctx := getStreamContext(reader)
defer putStreamContext(ctx)
for ctx.Read() {
@@ -45,7 +47,6 @@ func Parse(r io.Reader, encoding string, callback func(rows []zabbixconnector.Ro
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -24,77 +24,67 @@ var (
"concurrent insert requests are executed")
)
// Reader is a reader, which increases the concurrency after the first Read() call
// Reader is a reader, which decreases the concurrency before every Read() call
// and increases the concurrency after Read() call.
//
// The concurrency can be reduced by calling DecConcurrency().
// Then the concurrency is increased after the next Read() call.
// It effectively limits the number of concurrent goroutines,
// which may process results returned by concurrently processed Reader structs.
//
// The Reader must be obtained via GetReader() call.
type Reader struct {
r io.Reader
increasedConcurrency bool
r io.Reader
}
// GetReader returns the Reader for r.
//
// The PutReader() must be called when the returned Reader is no longer needed.
func GetReader(r io.Reader) *Reader {
func GetReader(r io.Reader) (*Reader, error) {
if err := incConcurrency(); err != nil {
return nil, err
}
v := readerPool.Get()
if v == nil {
return &Reader{
r: r,
}
v = &Reader{}
}
rr := v.(*Reader)
rr.r = r
return rr
return rr, nil
}
// PutReader returns the r to the pool.
//
// It decreases the concurrency if r has increased concurrency.
// It decreases the concurrency.
func PutReader(r *Reader) {
r.DecConcurrency()
r.r = nil
readerPool.Put(r)
decConcurrency()
}
var readerPool sync.Pool
// Read implements io.Reader.
//
// It increases concurrency after the first call or after the next call after DecConcurrency() call.
func (r *Reader) Read(p []byte) (int, error) {
decConcurrency()
n, err := r.r.Read(p)
if !r.increasedConcurrency {
if !incConcurrency() {
err = &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are executed. "+
"Possible solutions: to reduce workload; to increase compute resources at the server; "+
"to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts",
maxQueueDuration.Seconds(), *maxConcurrentInserts),
StatusCode: http.StatusServiceUnavailable,
}
return 0, err
}
r.increasedConcurrency = true
if errC := incConcurrency(); errC != nil {
return n, errC
}
if errors.Is(err, io.ErrUnexpectedEOF) {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8704
err = fmt.Errorf("%w: while reading the request body. This might be caused by a timeout on the client side. "+
"Possible solutions: to lower -insert.maxQueueDuration below the clients timeout; to increase the client-side timeout; "+
"to scale up vmagent (e.g., adding more CPU resources); to increase -maxConcurrentInserts if CPU capacity allows", err)
"to increase compute resources at the server; to increase -maxConcurrentInserts", err)
}
return n, err
}
// DecConcurrency decreases the concurrency, so it could be increased again after the next Read() call.
func (r *Reader) DecConcurrency() {
if r.increasedConcurrency {
decConcurrency()
r.increasedConcurrency = false
}
}
func initConcurrencyLimitCh() {
concurrencyLimitCh = make(chan struct{}, *maxConcurrentInserts)
}
@@ -104,12 +94,12 @@ var (
concurrencyLimitChOnce sync.Once
)
func incConcurrency() bool {
func incConcurrency() error {
concurrencyLimitChOnce.Do(initConcurrencyLimitCh)
select {
case concurrencyLimitCh <- struct{}{}:
return true
return nil
default:
}
@@ -118,10 +108,16 @@ func incConcurrency() bool {
defer timerpool.Put(t)
select {
case concurrencyLimitCh <- struct{}{}:
return true
return nil
case <-t.C:
concurrencyLimitTimeout.Inc()
return false
return &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are executed. "+
"Possible solutions: to reduce workload; to increase compute resources at the server; "+
"to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts",
maxQueueDuration.Seconds(), *maxConcurrentInserts),
StatusCode: http.StatusServiceUnavailable,
}
}
}