Compare commits

..

1 Commits

Author SHA1 Message Date
Jayice
33d5b8f12b reset timezone to none in vmagent dashboard 2026-01-14 19:20:54 +08:00
41 changed files with 1092 additions and 1187 deletions

View File

@@ -80,15 +80,14 @@ func (as AlertState) String() string {
// AlertTplData is used to execute templating
type AlertTplData struct {
Type string
Labels map[string]string
Value float64
Expr string
AlertID uint64
GroupID uint64
ActiveAt time.Time
For time.Duration
IsPartial bool
Type string
Labels map[string]string
Value float64
Expr string
AlertID uint64
GroupID uint64
ActiveAt time.Time
For time.Duration
}
var tplHeaders = []string{
@@ -102,7 +101,6 @@ var tplHeaders = []string{
"{{ $groupID := .GroupID }}",
"{{ $activeAt := .ActiveAt }}",
"{{ $for := .For }}",
"{{ $isPartial := .IsPartial }}",
}
// ExecTemplate executes the Alert template for given

View File

@@ -346,8 +346,6 @@ func (ar *AlertingRule) toLabels(m datasource.Metric, qFn templates.QueryFn) (*l
ls.processed[l.Name] = l.Value
}
// labels only support limited templating variables,
// including `labels`, `value` and `expr`, to avoid breaking alert states or causing cardinality issue with results
extraLabels, err := notifier.ExecTemplate(qFn, ar.Labels, notifier.AlertTplData{
Labels: ls.origin,
Value: m.Values[0],
@@ -459,8 +457,7 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
return nil, fmt.Errorf("failed to execute query %q: %w", ar.Expr, err)
}
isPartial := isPartialResponse(res)
ar.logDebugf(ts, nil, "query returned %d series (elapsed: %s, isPartial: %t)", curState.Samples, curState.Duration, isPartial)
ar.logDebugf(ts, nil, "query returned %d series (elapsed: %s, isPartial: %t)", curState.Samples, curState.Duration, isPartialResponse(res))
qFn := func(query string) ([]datasource.Metric, error) {
res, _, err := ar.q.Query(ctx, query, ts)
return res.Data, err
@@ -486,7 +483,7 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
at = a.ActiveAt
}
}
as, err := ar.expandAnnotationTemplates(m, qFn, at, ls, isPartial)
as, err := ar.expandAnnotationTemplates(m, qFn, at, ls)
if err != nil {
// only set error in current state, but do not break alert processing
curState.Err = err
@@ -604,17 +601,16 @@ func (ar *AlertingRule) expandLabelTemplates(m datasource.Metric, qFn templates.
return ls, nil
}
func (ar *AlertingRule) expandAnnotationTemplates(m datasource.Metric, qFn templates.QueryFn, activeAt time.Time, ls *labelSet, isPartial bool) (map[string]string, error) {
func (ar *AlertingRule) expandAnnotationTemplates(m datasource.Metric, qFn templates.QueryFn, activeAt time.Time, ls *labelSet) (map[string]string, error) {
tplData := notifier.AlertTplData{
Value: m.Values[0],
Type: ar.Type.String(),
Labels: ls.origin,
Expr: ar.Expr,
AlertID: hash(ls.processed),
GroupID: ar.GroupID,
ActiveAt: activeAt,
For: ar.For,
IsPartial: isPartial,
Value: m.Values[0],
Type: ar.Type.String(),
Labels: ls.origin,
Expr: ar.Expr,
AlertID: hash(ls.processed),
GroupID: ar.GroupID,
ActiveAt: activeAt,
For: ar.For,
}
as, err := notifier.ExecTemplate(qFn, ar.Annotations, tplData)
if err != nil {

View File

@@ -1120,7 +1120,7 @@ func TestAlertingRuleLimit_Success(t *testing.T) {
}
func TestAlertingRule_Template(t *testing.T) {
f := func(rule *AlertingRule, metrics []datasource.Metric, isResponsePartial bool, alertsExpected map[uint64]*notifier.Alert) {
f := func(rule *AlertingRule, metrics []datasource.Metric, alertsExpected map[uint64]*notifier.Alert) {
t.Helper()
fakeGroup := Group{
@@ -1133,7 +1133,6 @@ func TestAlertingRule_Template(t *testing.T) {
entries: make([]StateEntry, 10),
}
fq.Add(metrics...)
fq.SetPartialResponse(isResponsePartial)
if _, err := rule.exec(context.TODO(), time.Now(), 0); err != nil {
t.Fatalf("unexpected error: %s", err)
@@ -1164,7 +1163,7 @@ func TestAlertingRule_Template(t *testing.T) {
}, []datasource.Metric{
metricWithValueAndLabels(t, 1, "instance", "foo"),
metricWithValueAndLabels(t, 1, "instance", "bar"),
}, false, map[uint64]*notifier.Alert{
}, map[uint64]*notifier.Alert{
hash(map[string]string{alertNameLabel: "common", "region": "east", "instance": "foo"}): {
Annotations: map[string]string{
"summary": `common: Too high connection number for "foo"`,
@@ -1193,14 +1192,14 @@ func TestAlertingRule_Template(t *testing.T) {
"instance": "{{ $labels.instance }}",
},
Annotations: map[string]string{
"summary": `{{ $labels.__name__ }}: Too high connection number for "{{ $labels.instance }}".{{ if $isPartial }} WARNING: Partial response detected - this alert may be incomplete. Please verify the results manually.{{ end }}`,
"summary": `{{ $labels.__name__ }}: Too high connection number for "{{ $labels.instance }}"`,
"description": `{{ $labels.alertname}}: It is {{ $value }} connections for "{{ $labels.instance }}"`,
},
alerts: make(map[uint64]*notifier.Alert),
}, []datasource.Metric{
metricWithValueAndLabels(t, 2, "__name__", "first", "instance", "foo", alertNameLabel, "override"),
metricWithValueAndLabels(t, 10, "__name__", "second", "instance", "bar", alertNameLabel, "override"),
}, false, map[uint64]*notifier.Alert{
}, map[uint64]*notifier.Alert{
hash(map[string]string{alertNameLabel: "override label", "exported_alertname": "override", "instance": "foo"}): {
Labels: map[string]string{
alertNameLabel: "override label",
@@ -1208,7 +1207,7 @@ func TestAlertingRule_Template(t *testing.T) {
"instance": "foo",
},
Annotations: map[string]string{
"summary": `first: Too high connection number for "foo".`,
"summary": `first: Too high connection number for "foo"`,
"description": `override: It is 2 connections for "foo"`,
},
},
@@ -1219,7 +1218,7 @@ func TestAlertingRule_Template(t *testing.T) {
"instance": "bar",
},
Annotations: map[string]string{
"summary": `second: Too high connection number for "bar".`,
"summary": `second: Too high connection number for "bar"`,
"description": `override: It is 10 connections for "bar"`,
},
},
@@ -1232,7 +1231,7 @@ func TestAlertingRule_Template(t *testing.T) {
"instance": "{{ $labels.instance }}",
},
Annotations: map[string]string{
"summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}.{{ if $isPartial }} WARNING: Partial response detected - this alert may be incomplete. Please verify the results manually.{{ end }}`,
"summary": `Alert "{{ $labels.alertname }}({{ $labels.alertgroup }})" for instance {{ $labels.instance }}`,
},
alerts: make(map[uint64]*notifier.Alert),
}, []datasource.Metric{
@@ -1240,7 +1239,7 @@ func TestAlertingRule_Template(t *testing.T) {
alertNameLabel, "originAlertname",
alertGroupNameLabel, "originGroupname",
"instance", "foo"),
}, true, map[uint64]*notifier.Alert{
}, map[uint64]*notifier.Alert{
hash(map[string]string{
alertNameLabel: "OriginLabels",
"exported_alertname": "originAlertname",
@@ -1256,7 +1255,7 @@ func TestAlertingRule_Template(t *testing.T) {
"instance": "foo",
},
Annotations: map[string]string{
"summary": `Alert "originAlertname(originGroupname)" for instance foo. WARNING: Partial response detected - this alert may be incomplete. Please verify the results manually.`,
"summary": `Alert "originAlertname(originGroupname)" for instance foo`,
},
},
})
@@ -1386,7 +1385,7 @@ func TestAlertingRule_ToLabels(t *testing.T) {
"group": "vmalert",
"alertname": "ConfigurationReloadFailure",
"alertgroup": "vmalert",
"invalid_label": `error evaluating template: template: :1:298: executing "" at <.Values.mustRuntimeFail>: can't evaluate field Values in type notifier.tplData`,
"invalid_label": `error evaluating template: template: :1:268: executing "" at <.Values.mustRuntimeFail>: can't evaluate field Values in type notifier.tplData`,
}
expectedProcessedLabels := map[string]string{
@@ -1396,7 +1395,7 @@ func TestAlertingRule_ToLabels(t *testing.T) {
"exported_alertname": "ConfigurationReloadFailure",
"group": "vmalert",
"alertgroup": "vmalert",
"invalid_label": `error evaluating template: template: :1:298: executing "" at <.Values.mustRuntimeFail>: can't evaluate field Values in type notifier.tplData`,
"invalid_label": `error evaluating template: template: :1:268: executing "" at <.Values.mustRuntimeFail>: can't evaluate field Values in type notifier.tplData`,
}
ls, err := ar.toLabels(metric, nil)

View File

@@ -394,7 +394,7 @@ func (bu *backendURL) runHealthCheck() {
if errors.Is(bu.healthCheckContext.Err(), context.Canceled) {
return
}
logger.Warnf("ignoring the backend at %s for %s because of dial error: %s", addr, *failTimeout, err)
logger.Warnf("ignoring the backend at %s for %s becasue of dial error: %s", addr, *failTimeout, err)
continue
}
@@ -809,7 +809,7 @@ func reloadAuthConfig() (bool, error) {
ok, err := reloadAuthConfigData(data)
if err != nil {
return false, fmt.Errorf("failed to parse -auth.config=%q: %w", *authConfigPath, err)
return false, fmt.Errorf("failed to pars -auth.config=%q: %w", *authConfigPath, err)
}
if !ok {
return false, nil

View File

@@ -156,10 +156,6 @@ func requestHandlerWithInternalRoutes(w http.ResponseWriter, r *http.Request) bo
}
func requestHandler(w http.ResponseWriter, r *http.Request) bool {
if r.Body != nil {
r.Body = &readDurationTrackingBody{r: r.Body}
}
ats := getAuthTokensFromRequest(r)
if len(ats) == 0 {
// Process requests for unauthorized users
@@ -353,37 +349,14 @@ func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url
err = ctxErr
}
if err != nil {
if errors.Is(err, errReadTimeout) {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Do not retry canceled or timed out requests
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
logger.Warnf("remoteAddr: %s; requestURI: %s; client %s request exceeded single read timeout -readTimeout=%s, closing connection", remoteAddr, requestURI, ui.name(), *readTimeout)
rejectSlowClientRequests.Inc()
if w1, ok := w.(http.Hijacker); ok {
conn, _, connErr := w1.Hijack()
if connErr != nil {
logger.Errorf("cannot hijack connection for slow read timeout handling for %s: %s", targetURL, connErr)
return true, false
}
_ = conn.Close()
return true, false
if errors.Is(err, context.DeadlineExceeded) {
// Timed out request must be counted as errors, since this usually means that the backend is slow.
logger.Warnf("remoteAddr: %s; requestURI: %s; timeout while proxying the response from %s: %s", remoteAddr, requestURI, targetURL, err)
}
return true, false
}
// Do not retry canceled
if errors.Is(err, context.Canceled) {
clientCanceledRequests.Inc()
return true, false
}
// Do not retry timed out requests
if errors.Is(err, context.DeadlineExceeded) {
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
// Timed out request must be counted as errors, since this usually means that the backend is slow.
logger.Warnf("remoteAddr: %s; requestURI: %s; timeout while proxying the response from %s: %s", remoteAddr, requestURI, targetURL, err)
return false, false
}
if !rtbOK || !rtb.canRetry() {
@@ -440,10 +413,7 @@ func tryProcessingRequest(w http.ResponseWriter, r *http.Request, targetURL *url
err = copyStreamToClient(w, res.Body)
_ = res.Body.Close()
if errors.Is(err, context.Canceled) {
clientCanceledRequests.Inc()
return true, false
} else if err != nil && !netutil.IsTrivialNetworkError(err) {
if err != nil && !netutil.IsTrivialNetworkError(err) && !errors.Is(err, context.Canceled) {
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
@@ -576,8 +546,6 @@ var (
configReloadRequests = metrics.NewCounter(`vmauth_http_requests_total{path="/-/reload"}`)
invalidAuthTokenRequests = metrics.NewCounter(`vmauth_http_request_errors_total{reason="invalid_auth_token"}`)
missingRouteRequests = metrics.NewCounter(`vmauth_http_request_errors_total{reason="missing_route"}`)
clientCanceledRequests = metrics.NewCounter(`vmauth_http_request_errors_total{reason="client_canceled"}`)
rejectSlowClientRequests = metrics.NewCounter(`vmauth_http_request_errors_total{reason="reject_slow_client"}`)
)
func newRoundTripper(caFileOpt, certFileOpt, keyFileOpt, serverNameOpt string, insecureSkipVerifyP *bool) (http.RoundTripper, error) {
@@ -665,7 +633,6 @@ func handleConcurrencyLimitError(w http.ResponseWriter, r *http.Request, err err
if errors.Is(ctx.Err(), context.Canceled) {
// Do not return any response for the request canceled by the client,
// since the connection to the client is already closed.
clientCanceledRequests.Inc()
return
}
@@ -804,34 +771,3 @@ func debugInfo(u *url.URL, r *http.Request) string {
fmt.Fprint(s, ")")
return s.String()
}
var slowReadDuration = metrics.NewSummary(`vmauth_request_slow_read_duration_seconds`)
var readTimeout = flag.Duration("readTimeout", 0, "The maximum duration for a single read call when exceeded the connection is closed. Zero disables request read timeout. "+
"See also -writeTimeout")
var errReadTimeout = fmt.Errorf("request read timeout")
type readDurationTrackingBody struct {
r io.ReadCloser
}
func (r *readDurationTrackingBody) Read(p []byte) (n int, err error) {
start := time.Now()
n, err = r.r.Read(p)
dur := time.Since(start)
// Record slow read durations only to avoid overhead for fast reads.
if dur > time.Millisecond {
slowReadDuration.Update(dur.Seconds())
}
if err == nil && *readTimeout > 0 && dur > *readTimeout {
return n, errReadTimeout
}
return n, err
}
func (r *readDurationTrackingBody) Close() error {
return r.r.Close()
}

View File

@@ -389,23 +389,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
case "/create":
snapshotsCreateTotal.Inc()
w.Header().Set("Content-Type", "application/json")
snapshotName := Storage.MustCreateSnapshot()
// Verify whether the client already closed the connection.
// In this case it is better to drop the created snapshot, since the client isn't interested in it.
if err := r.Context().Err(); err != nil {
logger.Infof("deleting already created snapshot at %s because the client canceled the request", snapshotName)
if err := deleteSnapshot(snapshotName); err != nil {
logger.Infof("cannot delete just created snapshot: %s", err)
return true
}
return true
}
snapshotPath := Storage.MustCreateSnapshot()
if prometheusCompatibleResponse {
fmt.Fprintf(w, `{"status":"success","data":{"name":%s}}`, stringsutil.JSONString(snapshotName))
fmt.Fprintf(w, `{"status":"success","data":{"name":%s}}`, stringsutil.JSONString(snapshotPath))
} else {
fmt.Fprintf(w, `{"status":"ok","snapshot":%s}`, stringsutil.JSONString(snapshotName))
fmt.Fprintf(w, `{"status":"ok","snapshot":%s}`, stringsutil.JSONString(snapshotPath))
}
return true
case "/list":
@@ -425,12 +413,23 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
snapshotsDeleteTotal.Inc()
w.Header().Set("Content-Type", "application/json")
snapshotName := r.FormValue("snapshot")
if err := deleteSnapshot(snapshotName); err != nil {
jsonResponseError(w, err)
snapshotsDeleteErrorsTotal.Inc()
return true
snapshots := Storage.MustListSnapshots()
for _, snName := range snapshots {
if snName == snapshotName {
if err := Storage.DeleteSnapshot(snName); err != nil {
err = fmt.Errorf("cannot delete snapshot %q: %w", snName, err)
jsonResponseError(w, err)
snapshotsDeleteErrorsTotal.Inc()
return true
}
fmt.Fprintf(w, `{"status":"ok"}`)
return true
}
}
fmt.Fprintf(w, `{"status":"ok"}`)
err := fmt.Errorf("cannot find snapshot %q", snapshotName)
jsonResponseError(w, err)
return true
case "/delete_all":
snapshotsDeleteAllTotal.Inc()
@@ -451,19 +450,6 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
}
}
func deleteSnapshot(snapshotName string) error {
snapshots := Storage.MustListSnapshots()
for _, snName := range snapshots {
if snName == snapshotName {
if err := Storage.DeleteSnapshot(snName); err != nil {
return fmt.Errorf("cannot delete snapshot %q: %w", snName, err)
}
return nil
}
}
return fmt.Errorf("cannot find snapshot %q", snapshotName)
}
func initStaleSnapshotsRemover(strg *storage.Storage) {
staleSnapshotsRemoverCh = make(chan struct{})
if snapshotsMaxAge.Duration() <= 0 {

File diff suppressed because it is too large Load Diff

View File

@@ -23,43 +23,43 @@
},
"dependencies": {
"classnames": "^2.5.1",
"dayjs": "^1.11.19",
"dayjs": "^1.11.13",
"lodash.debounce": "^4.0.8",
"marked": "^17.0.1",
"preact": "^10.28.2",
"qs": "^6.14.1",
"marked": "^16.0.0",
"preact": "^10.26.9",
"qs": "^6.14.0",
"react-input-mask": "^2.0.4",
"react-router-dom": "^7.12.0",
"react-router-dom": "^7.6.3",
"uplot": "^1.6.32",
"vite": "^7.3.1",
"web-vitals": "^5.1.0"
"vite": "^7.1.11",
"web-vitals": "^5.0.3"
},
"devDependencies": {
"@eslint/eslintrc": "^3.3.3",
"@eslint/js": "^9.39.2",
"@eslint/eslintrc": "^3.3.1",
"@eslint/js": "^9.30.1",
"@preact/preset-vite": "^2.10.2",
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/jest-dom": "^6.6.3",
"@testing-library/preact": "^3.2.4",
"@types/lodash.debounce": "^4.0.9",
"@types/node": "^25.0.8",
"@types/node": "^24.0.12",
"@types/qs": "^6.14.0",
"@types/react": "^19.2.8",
"@types/react": "^19.1.8",
"@types/react-input-mask": "^3.0.6",
"@types/react-router-dom": "^5.3.3",
"@typescript-eslint/eslint-plugin": "^8.53.0",
"@typescript-eslint/parser": "^8.53.0",
"cross-env": "^10.1.0",
"eslint": "^9.39.2",
"@typescript-eslint/eslint-plugin": "^8.36.0",
"@typescript-eslint/parser": "^8.36.0",
"cross-env": "^7.0.3",
"eslint": "^9.30.1",
"eslint-plugin-react": "^7.37.5",
"eslint-plugin-unused-imports": "^4.3.0",
"globals": "^17.0.0",
"eslint-plugin-unused-imports": "^4.1.4",
"globals": "^16.3.0",
"http-proxy-middleware": "^3.0.5",
"jsdom": "^27.4.0",
"jsdom": "^26.1.0",
"postcss": "^8.5.6",
"rollup-plugin-visualizer": "^6.0.5",
"sass-embedded": "^1.97.2",
"typescript": "^5.9.3",
"vitest": "^4.0.17"
"rollup-plugin-visualizer": "^6.0.3",
"sass-embedded": "^1.89.2",
"typescript": "^5.8.3",
"vitest": "^3.2.4"
},
"browserslist": {
"production": [

View File

@@ -9,6 +9,7 @@ import { getFromStorage, removeFromStorage, saveToStorage } from "../../../../ut
import useBoolean from "../../../../hooks/useBoolean";
import { ChildComponentHandle } from "../GlobalSettings";
import { useAppDispatch, useAppState } from "../../../../state/common/StateContext";
import { getTenantIdFromUrl } from "../../../../utils/tenants";
interface ServerConfiguratorProps {
onClose: () => void;
@@ -38,6 +39,10 @@ const ServerConfigurator = forwardRef<ChildComponentHandle, ServerConfiguratorPr
};
const handleApply = useCallback(() => {
const tenantIdFromUrl = getTenantIdFromUrl(serverUrl);
if (tenantIdFromUrl !== "") {
dispatch({ type: "SET_TENANT_ID", payload: tenantIdFromUrl });
}
dispatch({ type: "SET_SERVER", payload: serverUrl });
onClose();
}, [serverUrl]);
@@ -55,6 +60,12 @@ const ServerConfigurator = forwardRef<ChildComponentHandle, ServerConfiguratorPr
}
}, [enabledStorage]);
useEffect(() => {
if (enabledStorage) {
saveToStorage("SERVER_URL", serverUrl);
}
}, [serverUrl]);
useEffect(() => {
// the tenant selector can change the serverUrl
if (stateServerUrl === serverUrl) return;

View File

@@ -1,4 +1,4 @@
import { FC, useState, useRef, useMemo } from "preact/compat";
import { FC, useState, useRef, useEffect, useMemo } from "preact/compat";
import { useAppDispatch, useAppState } from "../../../../state/common/StateContext";
import { useTimeDispatch } from "../../../../state/time/TimeStateContext";
import { ArrowDownIcon, StorageIcon } from "../../../Main/Icons";
@@ -10,14 +10,14 @@ import { getAppModeEnable } from "../../../../utils/app-mode";
import Tooltip from "../../../Main/Tooltip/Tooltip";
import useDeviceDetect from "../../../../hooks/useDeviceDetect";
import TextField from "../../../Main/TextField/TextField";
import { replaceTenantId } from "../../../../utils/tenants";
import { getTenantIdFromUrl, replaceTenantId } from "../../../../utils/tenants";
import useBoolean from "../../../../hooks/useBoolean";
const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
const appModeEnable = getAppModeEnable();
const { isMobile } = useDeviceDetect();
const { tenantId, serverUrl } = useAppState();
const { tenantId: tenantIdState, serverUrl } = useAppState();
const dispatch = useAppDispatch();
const timeDispatch = useTimeDispatch();
@@ -48,8 +48,10 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
}, [accountIds]);
const createHandlerChange = (value: string) => () => {
const tenant = value;
dispatch({ type: "SET_TENANT_ID", payload: tenant });
if (serverUrl) {
const updateServerUrl = replaceTenantId(serverUrl, value);
const updateServerUrl = replaceTenantId(serverUrl, tenant);
if (updateServerUrl === serverUrl) return;
dispatch({ type: "SET_SERVER", payload: updateServerUrl });
timeDispatch({ type: "RUN_QUERY" });
@@ -57,6 +59,16 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
handleCloseOptions();
};
useEffect(() => {
const id = getTenantIdFromUrl(serverUrl);
if (tenantIdState && tenantIdState !== id) {
createHandlerChange(tenantIdState)();
} else {
createHandlerChange(id)();
}
}, [serverUrl]);
if (!showTenantSelector) return null;
return (
@@ -71,7 +83,7 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
<span className="vm-mobile-option__icon"><StorageIcon/></span>
<div className="vm-mobile-option-text">
<span className="vm-mobile-option-text__label">Tenant ID</span>
<span className="vm-mobile-option-text__value">{tenantId}</span>
<span className="vm-mobile-option-text__value">{tenantIdState}</span>
</div>
<span className="vm-mobile-option__arrow"><ArrowDownIcon/></span>
</div>
@@ -94,7 +106,7 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
)}
onClick={toggleOpenOptions}
>
{tenantId}
{tenantIdState}
</Button>
)}
</div>
@@ -126,7 +138,7 @@ const TenantsConfiguration: FC<{accountIds: string[]}> = ({ accountIds }) => {
className={classNames({
"vm-list-item": true,
"vm-list-item_mobile": isMobile,
"vm-list-item_active": id === tenantId
"vm-list-item_active": id === tenantIdState
})}
key={id}
onClick={createHandlerChange(id)}

View File

@@ -3,18 +3,19 @@ import { useEffect, useMemo, useState } from "preact/compat";
import { ErrorTypes } from "../../../../../types";
import { getAccountIds } from "../../../../../api/accountId";
import { getAppModeEnable, getAppModeParams } from "../../../../../utils/app-mode";
import { getTenantIdFromUrl } from "../../../../../utils/tenants";
export const useFetchAccountIds = () => {
const { useTenantID } = getAppModeParams();
const appModeEnable = getAppModeEnable();
const { tenantId, serverUrl } = useAppState();
const { serverUrl } = useAppState();
const [isLoading, setIsLoading] = useState(false);
const [error, setError] = useState<ErrorTypes | string>();
const [accountIds, setAccountIds] = useState<string[]>([]);
const fetchUrl = useMemo(() => getAccountIds(serverUrl), [serverUrl]);
const isServerUrlWithTenant = useMemo(() => !!tenantId, [tenantId]);
const isServerUrlWithTenant = useMemo(() => !!getTenantIdFromUrl(serverUrl), [serverUrl]);
const preventFetch = appModeEnable ? !useTenantID : !isServerUrlWithTenant;
useEffect(() => {

View File

@@ -1,5 +1,4 @@
@use "src/styles/variables" as *;
@use 'sass:meta';
$button-radius: 6px;
@@ -43,8 +42,6 @@ $button-radius: 6px;
svg {
width: 14px;
min-width: 14px;
max-width: 14px;
}
}
@@ -54,8 +51,6 @@ $button-radius: 6px;
svg {
width: 16px;
min-width: 16px;
max-width: 16px;
}
}
@@ -65,8 +60,6 @@ $button-radius: 6px;
svg {
width: 18px;
min-width: 18px;
max-width: 18px;
line-height: 16px;
}
}
@@ -135,14 +128,8 @@ $button-radius: 6px;
);
@each $name, $color in $button-colors {
@if $name == white {
@include contained-button($name, $color, $color-black);
@include outlined-button($name, $color, $color-white);
@include text-button($name, $color-white);
} @else {
@include contained-button($name, $color, $color-white);
@include outlined-button($name, $color, $color);
@include text-button($name, $color);
}
@include contained-button($name, $color, if($name == white, $color-black, $color-white));
@include outlined-button($name, $color, if($name == white, $color-white, $color));
@include text-button($name, if($name == white, $color-white, $color));
}
}

View File

@@ -7,6 +7,7 @@ import AppConfigurator from "../appConfigurator";
import { useSearchParams } from "react-router-dom";
import dayjs from "dayjs";
import { DATE_FORMAT } from "../../../constants/date";
import { getTenantIdFromUrl } from "../../../utils/tenants";
import usePrevious from "../../../hooks/usePrevious";
export const useFetchQuery = (): {
@@ -26,7 +27,7 @@ export const useFetchQuery = (): {
const prevDate = usePrevious(date);
const prevTotal = useRef<{ data: TSDBStatus }>();
const { tenantId, serverUrl } = useAppState();
const { serverUrl } = useAppState();
const [isLoading, setIsLoading] = useState(false);
const [error, setError] = useState<ErrorTypes | string>();
const [tsdbStatus, setTSDBStatus] = useState<TSDBStatus>(appConfigurator.defaultTSDBStatus);
@@ -157,8 +158,9 @@ export const useFetchQuery = (): {
}, [error]);
useEffect(() => {
setIsCluster(!!tenantId);
}, [tenantId]);
const id = getTenantIdFromUrl(serverUrl);
setIsCluster(!!id);
}, [serverUrl]);
appConfigurator.tsdbStatusData = tsdbStatus;

View File

@@ -1,6 +1,7 @@
import { useEffect, useState } from "react";
import { useTimeDispatch, useTimeState } from "../../../state/time/TimeStateContext";
import { useCustomPanelDispatch, useCustomPanelState } from "../../../state/customPanel/CustomPanelStateContext";
import { useAppDispatch, useAppState } from "../../../state/common/StateContext";
import { useQueryDispatch, useQueryState } from "../../../state/query/QueryStateContext";
import { displayTypeTabs } from "../DisplayTypeSwitch";
import { useGraphDispatch, useGraphState } from "../../../state/graph/GraphStateContext";
@@ -14,12 +15,14 @@ import { arrayEquals } from "../../../utils/array";
import { isEqualURLSearchParams } from "../../../utils/url";
export const useSetQueryParams = () => {
const { tenantId } = useAppState();
const { displayType } = useCustomPanelState();
const { query } = useQueryState();
const { duration, relativeTime, period: { date, step } } = useTimeState();
const { customStep } = useGraphState();
const [searchParams, setSearchParams] = useSearchParams();
const dispatch = useAppDispatch();
const timeDispatch = useTimeDispatch();
const graphDispatch = useGraphDispatch();
const queryDispatch = useQueryDispatch();
@@ -69,6 +72,10 @@ export const useSetQueryParams = () => {
if (searchParams.get(`${group}.tab`) !== displayTypeCode) {
newSearchParams.set(`${group}.tab`, `${displayTypeCode}`);
}
if (searchParams.get(`${group}.tenantID`) !== tenantId && tenantId) {
newSearchParams.set(`${group}.tenantID`, tenantId);
}
});
// Remove extra parameters that exceed the request size
@@ -82,7 +89,7 @@ export const useSetQueryParams = () => {
if (isEqualURLSearchParams(newSearchParams, searchParams) || !newSearchParams.size) return;
setSearchParams(newSearchParams);
}, [displayType, query, duration, relativeTime, date, step, customStep]);
}, [tenantId, displayType, query, duration, relativeTime, date, step, customStep]);
useEffect(() => {
const timer = setTimeout(setterSearchParams, 200);
@@ -107,6 +114,11 @@ export const useSetQueryParams = () => {
customPanelDispatch({ type: "SET_DISPLAY_TYPE", payload: displayTypeFromUrl });
}
const tenantIdFromUrl = searchParams.get("g0.tenantID") || "";
if (tenantIdFromUrl !== tenantId) {
dispatch({ type: "SET_TENANT_ID", payload: tenantIdFromUrl });
}
const queryFromUrl = getQueryArray();
if (!arrayEquals(queryFromUrl, query)) {
queryDispatch({ type: "SET_QUERY", payload: queryFromUrl });

View File

@@ -1,8 +1,7 @@
import { createContext, FC, useContext, useEffect, useMemo, useReducer } from "preact/compat";
import { createContext, FC, useContext, useMemo, useReducer } from "preact/compat";
import { Action, AppState, initialState, reducer } from "./reducer";
import { getQueryStringValue } from "../../utils/query-string";
import { Dispatch } from "react";
import { getFromStorage, removeFromStorage, saveToStorage } from "../../utils/storage";
type StateContextType = { state: AppState, dispatch: Dispatch<Action> };
@@ -24,17 +23,6 @@ export const AppStateProvider: FC = ({ children }) => {
return { state, dispatch };
}, [state, dispatch]);
useEffect(() => {
if (!state.serverUrl) return;
const enabledStorage = !!getFromStorage("SERVER_URL");
if (enabledStorage) {
saveToStorage("SERVER_URL", state.serverUrl);
} else {
removeFromStorage(["SERVER_URL"]);
}
}, [state.serverUrl]);
return <StateContext.Provider value={contextValue}>
{children}
</StateContext.Provider>;

View File

@@ -1,9 +1,9 @@
import { getDefaultServer } from "../../utils/default-server-url";
import { getQueryStringValue } from "../../utils/query-string";
import { getFromStorage, saveToStorage } from "../../utils/storage";
import { AppConfig, Theme } from "../../types";
import { isDarkTheme } from "../../utils/theme";
import { removeTrailingSlash } from "../../utils/url";
import { getTenantIdFromUrl } from "../../utils/tenants";
export interface AppState {
serverUrl: string;
@@ -16,14 +16,15 @@ export interface AppState {
export type Action =
| { type: "SET_SERVER", payload: string }
| { type: "SET_THEME", payload: Theme }
| { type: "SET_TENANT_ID", payload: string }
| { type: "SET_APP_CONFIG", payload: AppConfig }
| { type: "SET_DARK_THEME" }
const serverUrl = removeTrailingSlash(getDefaultServer());
const tenantId = getQueryStringValue("g0.tenantID", "") as string;
export const initialState: AppState = {
serverUrl,
tenantId: getTenantIdFromUrl(serverUrl),
serverUrl: removeTrailingSlash(getDefaultServer(tenantId)),
tenantId,
theme: (getFromStorage("THEME") || Theme.system) as Theme,
isDarkTheme: null,
appConfig: {}
@@ -34,9 +35,13 @@ export function reducer(state: AppState, action: Action): AppState {
case "SET_SERVER":
return {
...state,
tenantId: getTenantIdFromUrl(action.payload),
serverUrl: removeTrailingSlash(action.payload)
};
case "SET_TENANT_ID":
return {
...state,
tenantId: action.payload
};
case "SET_THEME":
saveToStorage("THEME", action.payload);
return {

View File

@@ -1,4 +1,5 @@
import { getAppModeParams } from "./app-mode";
import { replaceTenantId } from "./tenants";
import { APP_TYPE, AppType } from "../constants/appType";
import { getFromStorage } from "./storage";
@@ -6,7 +7,7 @@ export const getDefaultURL = (u: string) => {
return u.replace(/(\/(?:prometheus\/)?(?:graph|vmui)\/.*|\/#\/.*)/, "/prometheus");
};
export const getDefaultServer = (): string => {
export const getDefaultServer = (tenantId?: string): string => {
const { serverURL } = getAppModeParams();
const storageURL = getFromStorage("SERVER_URL") as string;
const anomalyURL = `${window.location.origin}${window.location.pathname.replace(/^\/vmui/, "")}`;
@@ -17,6 +18,6 @@ export const getDefaultServer = (): string => {
case AppType.vmanomaly:
return storageURL || anomalyURL;
default:
return url;
return tenantId ? replaceTenantId(url, tenantId) : url;
}
};

View File

@@ -1,89 +0,0 @@
import { describe, it, expect } from "vitest";
import {
replaceTenantId,
getTenantIdFromUrl,
getUrlWithoutTenant,
} from "./tenants";
describe("tenant url helpers", () => {
describe("getTenantIdFromUrl", () => {
it("returns accountID", () => {
expect(getTenantIdFromUrl("http://vmselect:8481/select/0/vmui/")).toBe("0");
});
it("returns accountID:projectID", () => {
expect(getTenantIdFromUrl("http://vmselect:8481/select/12:7/vmui/")).toBe("12:7");
});
it("returns empty string if tenant is missing", () => {
expect(getTenantIdFromUrl("http://vmselect:8481/select/vmui/")).toBe("");
});
it("returns empty string for unrelated paths", () => {
expect(getTenantIdFromUrl("http://vmselect:8481/foo/bar")).toBe("");
});
it("returns accountID when url ends right after tenant", () => {
expect(getTenantIdFromUrl("http://vmselect:8481/select/0")).toBe("0");
});
});
describe("replaceTenantId", () => {
it("replaces accountID with another accountID", () => {
expect(
replaceTenantId("http://vmselect:8481/select/0/vmui/", "2")
).toBe("http://vmselect:8481/select/2/vmui/");
});
it("replaces accountID with accountID:projectID", () => {
expect(
replaceTenantId("http://vmselect:8481/select/0/prometheus/", "1:9")
).toBe("http://vmselect:8481/select/1:9/prometheus/");
});
it("keeps the rest of the path intact", () => {
expect(
replaceTenantId("http://vmselect:8481/select/3:4/prometheus/api/v1/query", "7")
).toBe("http://vmselect:8481/select/7/prometheus/api/v1/query");
});
it("does not change url if it doesn't match expected pattern", () => {
expect(
replaceTenantId("http://vmselect:8481/foo/bar", "2")
).toBe("http://vmselect:8481/foo/bar");
});
});
describe("getUrlWithoutTenant", () => {
it("removes /select/<tenant>/... and returns base url", () => {
expect(
getUrlWithoutTenant("http://vmselect:8481/select/0/vmui/")
).toBe("http://vmselect:8481");
});
it("removes /select/<tenant>/... for accountID:projectID and returns base url", () => {
expect(
getUrlWithoutTenant("http://vmselect:8481/select/5:6/prometheus/")
).toBe("http://vmselect:8481");
});
it("works with deep paths and returns base url", () => {
expect(
getUrlWithoutTenant("http://vmselect:8481/select/1:2/prometheus/api/v1/query")
).toBe("http://vmselect:8481");
});
it("does not change url if it doesn't match expected pattern", () => {
expect(
getUrlWithoutTenant("http://vmselect:8481/foo/bar")
).toBe("http://vmselect:8481/foo/bar");
});
it("removes url ending right after tenant", () => {
expect(
getUrlWithoutTenant("http://vmselect:8481/select/0")
).toBe("http://vmselect:8481");
});
});
});

View File

@@ -1,21 +1,13 @@
const TENANT_REGEXP = /(\/select\/)(\d+(?::\d+)?)(\/.*)?$/;
const regexp = /(\/select\/)([^/])(\/)(.+)/;
export const replaceTenantId = (serverUrl: string, tenantId: string) => {
return serverUrl.replace(TENANT_REGEXP, `$1${tenantId}$3`);
return serverUrl.replace(regexp, `$1${tenantId}/$4`);
};
export const getTenantIdFromUrl = (url: string): string => {
return url.match(TENANT_REGEXP)?.[2] ?? "";
return url.match(regexp)?.[2] || "";
};
export const getUrlWithoutTenant = (url: string): string => {
return url.replace(TENANT_REGEXP, "");
};
export const updateBrowserUrlTenant = (tenantId: string) => {
const base = `${window.location.origin}${window.location.pathname}${window.location.search}`;
const nextBase = replaceTenantId(base, tenantId);
const nextUrl = `${nextBase}${window.location.hash}`;
window.history.replaceState(null, "", nextUrl);
return url.replace(regexp, "");
};

View File

@@ -1,7 +1,7 @@
{
"compilerOptions": {
"target": "ESNext",
"types": ["vite/client", "vitest/globals", "node"],
"types": ["vite/client", "vitest/globals"],
"lib": [
"dom",
"dom.iterable",

View File

@@ -452,7 +452,7 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(increase(vm_backup_errors_total{job=~\"$job\", instance=~\"$instance\"}[$__range]))",
"expr": "sum(increase(vm_backup_errors_total{job=~\"$job\", instance=~\"$instance\"}[1h]))",
"legendFormat": "__auto",
"range": true,
"refId": "A"
@@ -605,7 +605,7 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(increase(vm_retention_errors_total{job=~\"$job\", instance=~\"$instance\"}[$__range]))",
"expr": "sum(increase(vm_retention_errors_total{job=~\"$job\", instance=~\"$instance\"}[1h]))",
"legendFormat": "__auto",
"range": true,
"refId": "A"

View File

@@ -11458,4 +11458,4 @@
"title": "VictoriaMetrics - cluster",
"uid": "oS7Bi_0Wz",
"version": 1
}
}

View File

@@ -453,7 +453,7 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(increase(vm_backup_errors_total{job=~\"$job\", instance=~\"$instance\"}[$__range]))",
"expr": "sum(increase(vm_backup_errors_total{job=~\"$job\", instance=~\"$instance\"}[1h]))",
"legendFormat": "__auto",
"range": true,
"refId": "A"
@@ -606,7 +606,7 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(increase(vm_retention_errors_total{job=~\"$job\", instance=~\"$instance\"}[$__range]))",
"expr": "sum(increase(vm_retention_errors_total{job=~\"$job\", instance=~\"$instance\"}[1h]))",
"legendFormat": "__auto",
"range": true,
"refId": "A"

View File

@@ -11459,4 +11459,4 @@
"title": "VictoriaMetrics - cluster (VM)",
"uid": "oS7Bi_0Wz_vm",
"version": 1
}
}

View File

@@ -106,7 +106,6 @@ See also [case studies](https://docs.victoriametrics.com/victoriametrics/casestu
* [Why I Switched to VictoriaMetrics: Scaling from Small Business to Enterprise](https://blackmetalz.github.io/why-i-switched-to-victoriametrics-scaling-from-small-business-to-enterprise.html)
* [Backing up VictoriaMetrics Data: A Complete Guide](https://medium.com/@kanakaraju896/backing-up-victoriametrics-data-a-complete-guide-24473c74450f)
* [Unlocking the Power of VictoriaMetrics: A Prometheus Alternative](https://developer-friendly.blog/blog/2024/06/17/unlocking-the-power-of-victoriametrics-a-prometheus-alternative/)
* [How to Master Kubernetes Observability: Multi-Cluster Monitoring with VictoriaMetrics, Loki, and Grafana](https://www.keyvalue.systems/blog/kubernetes-observability-with-victoriametrics-loki-grafana/)
## Third-party articles and slides about VictoriaLogs

View File

@@ -31,15 +31,11 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): expose `vm_rollup_result_cache_requests_total` which tracks the number of requests to the query rollup cache. See [#10117](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10117).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add `localStorage` availability checks with error reporting. See [#10085](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10085).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add `VMUI:`-prefixed `localStorage` keys and legacy key migration.
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): add a metric `vmauth_http_request_errors_total{reason="client_canceled"}` to measure client cancelled requests. This should help with debugging vmauth issues. See [#10233](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10233).
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): do not retry client canceled requests. See [#10233](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10233).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add explicit month duration unit (`M`) for `-retentionPeriod` flag. This allows users to specify retention periods in months more explicitly. See [#10181](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10181).
* FEATURE: [dashboards/vmagent](https://grafana.com/grafana/dashboards/12683): add `Persistent queue Full ETA` panel to the `Drilldown` section. This panel helps estimate how much time remains until `vmagent` starts dropping incoming metrics. See [#10193](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10193).
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add support for `$isPartial` variable in alerting rule annotation [templating](https://docs.victoriametrics.com/victoriametrics/vmalert/#templating). This allows users to include an additional warning message in alerts triggered by partial query responses. See [#4531](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4531).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix configuration reloading for `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig` when vmagent is launched with empty files. Previously, if vmagent started with an empty config, subsequent config reloads were ignored. See [#10211](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10211).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): Fixed a missing path error for `http://<victoriametrics-addr>:8428/zabbixconnector/api/v1/history`. See PR [10214](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10214).
* BUGFIX: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): remove legacy `tenantID` query param and use the URL path as the single source of truth for multitenancy. See [#10232](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10232).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): Fixed a missing path error for `http://<victoriametrics-addr>:8428/zabbixconnector/api/v1/history`. See PR [10214](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10214)
## [v1.133.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.133.0)
@@ -1093,4 +1089,4 @@ See changes [here](https://docs.victoriametrics.com/victoriametrics/changelog/ch
## Previous releases
See [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
See [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).

View File

@@ -286,7 +286,7 @@ expr: <string>
# In case of conflicts, original labels are kept with prefix `exported_`.
#
# Labels only support limited templating variables in https://docs.victoriametrics.com/victoriametrics/vmalert/#templating,
# including `$labels`, `$value` and `$expr`, to avoid breaking alert states or causing cardinality issue with results.
# including `$labels`, `$value` and `expr`, to avoid breaking alert states or causing cardinality issue with results.
# Note: be careful set dynamic label values like `$value`, because each time the $value changes - the new alert will be
# generated which also break `for` condition.
labels:
@@ -316,7 +316,6 @@ The following variables are available in templating:
| $for or .For | Alert's configured for param. | Number of connections is too high for more than {{ .For }} |
| $externalLabels or .ExternalLabels | List of labels configured via `-external.label` command-line flag. | Issues with {{ $labels.instance }} (datacenter-{{ $externalLabels.dc }}) |
| $externalURL or .ExternalURL | URL configured via `-external.url` command-line flag. Used for cases when vmalert is hidden behind proxy. | Visit {{ $externalURL }} for more details |
| $isPartial or .IsPartial | Indicates whether the latest rule query response from the datasource(that supports returning `isPartial` option, such as vmcluster) could be partial. | {{ if $isPartial }}WARNING: The latest alert state may be a false alarm due to a partial response from the datasource.{{ end }}
Additionally, `vmalert` provides some extra templating functions listed in [template functions](#template-functions) and [reusable templates](#reusable-templates).

View File

@@ -336,19 +336,6 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD
}
func (ib *inmemoryBlock) unmarshalSingleItem(commonPrefix, firstItem []byte, mt marshalType) {
if mt != marshalTypePlain {
logger.Panicf("BUG: single item block must be always encoded with TypePlain")
}
ib.commonPrefix = append(ib.commonPrefix[:0], commonPrefix...)
ib.items = slicesutil.SetLength(ib.items, 1)
ib.data = bytesutil.ResizeNoCopyNoOverallocate(ib.data, len(firstItem))
ib.data = append(ib.data[:0], firstItem...)
item := &ib.items[0]
item.Start = 0
item.End = uint32(len(ib.data))
}
// UnmarshalData decodes itemsCount items from sb and firstItem and stores them to ib.
func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix []byte, itemsCount uint32, mt marshalType) error {
ib.Reset()

View File

@@ -331,14 +331,6 @@ func (ps *partSearch) getInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error)
if ps.sparse {
cache = ibSparseCache
}
if bh.itemsCount == 1 {
// special case for single item
// there is no need to cache it, since firstItem is always stored in-memory
ib := ps.tmpIB
ib.Reset()
ib.unmarshalSingleItem(bh.commonPrefix, bh.firstItem, bh.marshalType)
return ib, nil
}
ibKey := blockcache.Key{
Part: ps.p,
Offset: bh.itemsBlockOffset,

View File

@@ -163,80 +163,3 @@ func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []stri
p := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
return p, items, nil
}
func TestGetInmemoryBlockWithZeroSizeBlock(t *testing.T) {
var ph partHeader
var ip inmemoryPart
var bsw blockStreamWriter
bsw.MustInitFromInmemoryPart(&ip, -3)
buildBlock := func(items ...string) inmemoryBlock {
var ib inmemoryBlock
for _, item := range items {
if !ib.Add([]byte(item)) {
t.Fatalf("cannot add item %q", item)
}
}
ib.SortItems()
return ib
}
writeBlock := func(ib inmemoryBlock) {
if len(ib.items) == 0 {
t.Fatalf("block must contain items")
}
data := ib.data
ph.itemsCount += uint64(len(ib.items))
if ph.blocksCount == 0 {
ph.firstItem = append(ph.firstItem[:0], ib.items[0].Bytes(data)...)
}
ph.lastItem = append(ph.lastItem[:0], ib.items[len(ib.items)-1].Bytes(data)...)
ph.blocksCount++
bsw.WriteBlock(&ib)
}
writeBlock(buildBlock("a"))
writeBlock(buildBlock("b0", "b1"))
bsw.MustClose()
p := newPart(&ph, "test", ip.size(), ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
defer p.MustClose()
var ps partSearch
ps.Init(p, false)
ps.mrs = p.mrs
if err := ps.nextBHS(); err != nil {
t.Fatalf("cannot read block headers: %s", err)
}
if len(ps.bhs) != 2 {
t.Fatalf("unexpected block headers count: %d", len(ps.bhs))
}
if ps.bhs[0].itemsBlockOffset != ps.bhs[1].itemsBlockOffset {
t.Fatalf("blocks must share itemsBlockOffset for the test: %d vs %d", ps.bhs[0].itemsBlockOffset, ps.bhs[1].itemsBlockOffset)
}
if ps.bhs[0].itemsBlockSize != 0 {
t.Fatalf("the first block must have zero itemsBlockSize; got %d", ps.bhs[0].itemsBlockSize)
}
// iterate 4 times in order to place block into the cache
// storage caches it after 2 missed requests according to the flag blockcache.missesBeforeCaching=2
for i := range 4 {
if _, err := ps.getInmemoryBlock(&ps.bhs[1]); err != nil {
t.Fatalf("cannot load non-empty block at iteration %d: %s", i, err)
}
}
assertBlockAt := func(bhIdx int, wantFirstItemValue string) {
block, err := ps.getInmemoryBlock(&ps.bhs[bhIdx])
if err != nil {
t.Fatalf("cannot block=%d : %s", bhIdx, err)
}
if len(block.items) != int(ps.bhs[bhIdx].itemsCount) {
t.Fatalf("unexpected items count in block=%d; got %d; want %d", bhIdx, len(block.items), ps.bhs[bhIdx].itemsCount)
}
if got := string(block.items[0].Bytes(block.data)); got != wantFirstItemValue {
t.Fatalf("unexpected item in block=%d; got %q; want %q", bhIdx, got, wantFirstItemValue)
}
}
assertBlockAt(0, "a")
assertBlockAt(1, "b0")
}

View File

@@ -34,19 +34,17 @@ func Parse(req *http.Request, callback func(rows []csvimport.Row) error) error {
return fmt.Errorf("cannot parse the provided csv format: %w", err)
}
wcr, err := writeconcurrencylimiter.GetReader(req.Body)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
encoding := req.Header.Get("Content-Encoding")
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
reader, err := protoparserutil.GetUncompressedReader(req.Body, encoding)
if err != nil {
return fmt.Errorf("cannot decode csv data: %w", err)
}
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
ctx := getStreamContext(reader)
defer putStreamContext(ctx)
for ctx.Read() {
@@ -57,6 +55,7 @@ func Parse(req *http.Request, callback func(rows []csvimport.Row) error) error {
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -27,18 +27,16 @@ var (
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, encoding string, callback func(rows []graphite.Row) error) error {
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
if err != nil {
return fmt.Errorf("cannot decode graphite data: %w", err)
}
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
ctx := getStreamContext(reader)
defer putStreamContext(ctx)
@@ -49,6 +47,7 @@ func Parse(r io.Reader, encoding string, callback func(rows []graphite.Row) erro
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -61,18 +61,16 @@ func Parse(r io.Reader, encoding string, isStreamMode bool, precision, db string
}
func parseStreamMode(r io.Reader, encoding string, tsMultiplier int64, db string, callback func(db string, rows []influx.Row) error) error {
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
if err != nil {
return fmt.Errorf("cannot decode influx line protocol data: %w; see https://docs.victoriametrics.com/victoriametrics/integrations/influxdb/", err)
}
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
ctx := getStreamContext(reader)
defer putStreamContext(ctx)
for ctx.Read() {
@@ -84,6 +82,7 @@ func parseStreamMode(r io.Reader, encoding string, tsMultiplier int64, db string
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -20,18 +20,16 @@ import (
//
// callback shouldn't hold block after returning.
func Parse(r io.Reader, contentEncoding string, callback func(block *Block) error) error {
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
reader, err := protoparserutil.GetUncompressedReader(wcr, contentEncoding)
reader, err := protoparserutil.GetUncompressedReader(r, contentEncoding)
if err != nil {
return fmt.Errorf("cannot decode vmimport data: %w", err)
}
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
br := getBufferedReader(reader)
defer putBufferedReader(br)
@@ -106,6 +104,7 @@ func Parse(r io.Reader, contentEncoding string, callback func(block *Block) erro
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
}

View File

@@ -27,13 +27,11 @@ var (
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, callback func(rows []opentsdb.Row) error) error {
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
ctx := getStreamContext(wcr)
ctx := getStreamContext(r)
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
@@ -42,6 +40,7 @@ func Parse(r io.Reader, callback func(rows []opentsdb.Row) error) error {
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -23,23 +23,20 @@ import (
// limitConcurrency defines whether to control the number of concurrent calls to this function.
// It is recommended setting limitConcurrency=true if the caller doesn't have concurrency limits set,
// like /api/v1/write calls.
func Parse(r io.Reader, defaultTimestamp int64, encoding string, limitConcurrency, enableMetadata bool,
callback func(rows []prometheus.Row, metadataList []prometheus.Metadata) error, errLogger func(string)) error {
if limitConcurrency {
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
}
func Parse(r io.Reader, defaultTimestamp int64, encoding string, limitConcurrency, enableMetadata bool, callback func(rows []prometheus.Row, metadataList []prometheus.Metadata) error, errLogger func(string)) error {
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
if err != nil {
return fmt.Errorf("cannot decode Prometheus text exposition data: %w", err)
}
defer protoparserutil.PutUncompressedReader(reader)
var wcr *writeconcurrencylimiter.Reader
if limitConcurrency {
wcr = writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
}
ctx := getStreamContext(reader)
defer putStreamContext(ctx)
for ctx.Read() {
@@ -52,6 +49,9 @@ func Parse(r io.Reader, defaultTimestamp int64, encoding string, limitConcurrenc
uw.enableMetadata = enableMetadata
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
if wcr != nil {
wcr.DecConcurrency()
}
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -23,13 +23,11 @@ var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*102
//
// callback shouldn't hold tss after returning.
func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSeries, mms []prompb.MetricMetadata) error) error {
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
ctx := getPushCtx(wcr)
ctx := getPushCtx(r)
defer putPushCtx(ctx)
if err := ctx.Read(); err != nil {
return err
@@ -40,6 +38,7 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
bb := bodyBufferPool.Get()
defer bodyBufferPool.Put(bb)
var err error
if isVMRemoteWrite {
bb.B, err = encoding.DecompressZSTDLimited(bb.B[:0], ctx.reqBuf.B, maxInsertRequestSize.IntN())
if err != nil {

View File

@@ -31,10 +31,7 @@ const maxSnappyBlockSize = 56_000_000
//
// The callback must not hold references to the data after returning.
func ReadUncompressedData(r io.Reader, contentType string, maxDataSize *flagutil.Bytes, callback func(data []byte) error) error {
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
if contentType == "zstd" {

View File

@@ -23,18 +23,16 @@ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 10*1024*1024, "The maxim
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, encoding string, callback func(rows []vmimport.Row) error) error {
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
if err != nil {
return fmt.Errorf("cannot decode vmimport data: %w", err)
}
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
ctx := getStreamContext(reader)
defer putStreamContext(ctx)
for ctx.Read() {
@@ -44,6 +42,7 @@ func Parse(r io.Reader, encoding string, callback func(rows []vmimport.Row) erro
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -26,18 +26,16 @@ var (
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, encoding string, callback func(rows []zabbixconnector.Row) error) error {
wcr, err := writeconcurrencylimiter.GetReader(r)
if err != nil {
return err
}
defer writeconcurrencylimiter.PutReader(wcr)
reader, err := protoparserutil.GetUncompressedReader(wcr, encoding)
reader, err := protoparserutil.GetUncompressedReader(r, encoding)
if err != nil {
return fmt.Errorf("cannot decode zabbixconnector data: %w", err)
}
defer protoparserutil.PutUncompressedReader(reader)
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
reader = wcr
ctx := getStreamContext(reader)
defer putStreamContext(ctx)
for ctx.Read() {
@@ -47,6 +45,7 @@ func Parse(r io.Reader, encoding string, callback func(rows []zabbixconnector.Ro
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View File

@@ -24,67 +24,77 @@ var (
"concurrent insert requests are executed")
)
// Reader is a reader, which decreases the concurrency before every Read() call
// and increases the concurrency after Read() call.
// Reader is a reader, which increases the concurrency after the first Read() call
//
// It effectively limits the number of concurrent goroutines,
// which may process results returned by concurrently processed Reader structs.
//
// The Reader must be obtained via GetReader() call.
// The concurrency can be reduced by calling DecConcurrency().
// Then the concurrency is increased after the next Read() call.
type Reader struct {
r io.Reader
r io.Reader
increasedConcurrency bool
}
// GetReader returns the Reader for r.
//
// The PutReader() must be called when the returned Reader is no longer needed.
func GetReader(r io.Reader) (*Reader, error) {
if err := incConcurrency(); err != nil {
return nil, err
}
func GetReader(r io.Reader) *Reader {
v := readerPool.Get()
if v == nil {
v = &Reader{}
return &Reader{
r: r,
}
}
rr := v.(*Reader)
rr.r = r
return rr, nil
return rr
}
// PutReader returns the r to the pool.
//
// It decreases the concurrency.
// It decreases the concurrency if r has increased concurrency.
func PutReader(r *Reader) {
r.DecConcurrency()
r.r = nil
readerPool.Put(r)
decConcurrency()
}
var readerPool sync.Pool
// Read implements io.Reader.
//
// It increases concurrency after the first call or after the next call after DecConcurrency() call.
func (r *Reader) Read(p []byte) (int, error) {
decConcurrency()
n, err := r.r.Read(p)
if errC := incConcurrency(); errC != nil {
return n, errC
if !r.increasedConcurrency {
if !incConcurrency() {
err = &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are executed. "+
"Possible solutions: to reduce workload; to increase compute resources at the server; "+
"to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts",
maxQueueDuration.Seconds(), *maxConcurrentInserts),
StatusCode: http.StatusServiceUnavailable,
}
return 0, err
}
r.increasedConcurrency = true
}
if errors.Is(err, io.ErrUnexpectedEOF) {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8704
err = fmt.Errorf("%w: while reading the request body. This might be caused by a timeout on the client side. "+
"Possible solutions: to lower -insert.maxQueueDuration below the clients timeout; to increase the client-side timeout; "+
"to increase compute resources at the server; to increase -maxConcurrentInserts", err)
"to scale up vmagent (e.g., adding more CPU resources); to increase -maxConcurrentInserts if CPU capacity allows", err)
}
return n, err
}
// DecConcurrency decreases the concurrency, so it could be increased again after the next Read() call.
func (r *Reader) DecConcurrency() {
if r.increasedConcurrency {
decConcurrency()
r.increasedConcurrency = false
}
}
func initConcurrencyLimitCh() {
concurrencyLimitCh = make(chan struct{}, *maxConcurrentInserts)
}
@@ -94,12 +104,12 @@ var (
concurrencyLimitChOnce sync.Once
)
func incConcurrency() error {
func incConcurrency() bool {
concurrencyLimitChOnce.Do(initConcurrencyLimitCh)
select {
case concurrencyLimitCh <- struct{}{}:
return nil
return true
default:
}
@@ -108,16 +118,10 @@ func incConcurrency() error {
defer timerpool.Put(t)
select {
case concurrencyLimitCh <- struct{}{}:
return nil
return true
case <-t.C:
concurrencyLimitTimeout.Inc()
return &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are executed. "+
"Possible solutions: to reduce workload; to increase compute resources at the server; "+
"to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts",
maxQueueDuration.Seconds(), *maxConcurrentInserts),
StatusCode: http.StatusServiceUnavailable,
}
return false
}
}