Compare commits

..

1 Commits

Author SHA1 Message Date
hagen1778
aafbde2712 docs: update security recommendations
- move SBOM page down as general security recommendations must be mentioned first
- separate httplib security-related flags to a separate section, so it can be cross-referenced
  by products that use this lib
- mention that /metrics might require to be protected

Signed-off-by: hagen1778 <roman@victoriametrics.com>
2026-06-18 11:15:51 +02:00
92 changed files with 441 additions and 777 deletions

View File

@@ -66,8 +66,6 @@ jobs:
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
- name: Setup Go
id: go

View File

@@ -17,7 +17,6 @@ jobs:
with:
# needed for proper diff
fetch-depth: 0
persist-credentials: false
- name: 'Validate that changelog changes are under ## tip'
run: |

View File

@@ -15,7 +15,6 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
fetch-depth: 0 # we need full history for commit verification
persist-credentials: false
- name: Check commit signatures
run: |

View File

@@ -18,8 +18,6 @@ jobs:
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
- name: Setup Go
id: go

View File

@@ -32,8 +32,6 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
- name: Set up Go
id: go

View File

@@ -21,7 +21,6 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
path: __vm
persist-credentials: false
- name: Checkout private code
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
@@ -29,7 +28,6 @@ jobs:
repository: VictoriaMetrics/vmdocs
token: ${{ secrets.VM_BOT_GH_TOKEN }}
path: __vm-docs
persist-credentials: true
- name: Import GPG key
uses: crazy-max/ghaction-import-gpg@2dc316deee8e90f13e1a351ab510b4d5bc0c82cd # v7.0.0

View File

@@ -35,8 +35,6 @@ jobs:
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
- name: Setup Go
id: go
@@ -80,8 +78,6 @@ jobs:
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
- name: Setup Go
id: go
@@ -107,8 +103,6 @@ jobs:
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
- name: Setup Go
id: go

View File

@@ -33,8 +33,6 @@ jobs:
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
- name: Cache node_modules
id: cache

View File

@@ -1,18 +1,9 @@
version: "2"
linters:
enable:
- errorlint
settings:
errcheck:
exclude-functions:
- (net/http.ResponseWriter).Write
errorlint:
errorf: true
# Do not enable `comparison` and `asserts`: they produce false positives,
# since many call sites intentionally compare sentinel errors directly (e.g. err == io.EOF)
# when the producer is documented to return them unwrapped. See https://github.com/VictoriaMetrics/VictoriaLogs/pull/1490
comparison: false
asserts: false
exclusions:
generated: lax
presets:

View File

@@ -315,11 +315,6 @@ func configReload(ctx context.Context, m *manager, groupsCfg []config.Group, sig
parseFn := config.Parse
for {
select {
case <-ctx.Done():
return
default:
}
select {
case <-ctx.Done():
return

View File

@@ -840,11 +840,6 @@ func authConfigReloader(sighupCh <-chan os.Signal) {
}
for {
select {
case <-stopCh:
return
default:
}
select {
case <-stopCh:
return

View File

@@ -131,13 +131,16 @@ func (ac *authContext) initFromBasicAuthConfig(ba *BasicAuthConfig) error {
if ba.Username == "" {
return fmt.Errorf("missing `username` in `basic_auth` section")
}
ac.getAuthHeader = func() string {
// See https://en.wikipedia.org/wiki/Basic_access_authentication
token := ba.Username + ":" + ba.Password
token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64
if ba.Password != "" {
ac.getAuthHeader = func() string {
// See https://en.wikipedia.org/wiki/Basic_access_authentication
token := ba.Username + ":" + ba.Password
token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64
}
ac.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
return nil
}
ac.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
return nil
}

View File

@@ -69,8 +69,6 @@ const (
vmAddr = "vm-addr"
vmUser = "vm-user"
vmPassword = "vm-password"
vmHeaders = "vm-headers"
vmBearerToken = "vm-bearer-token"
vmAccountID = "vm-account-id"
vmConcurrency = "vm-concurrency"
vmCompress = "vm-compress"
@@ -114,16 +112,6 @@ var (
Usage: "VictoriaMetrics password for basic auth",
EnvVars: []string{"VM_PASSWORD"},
},
&cli.StringFlag{
Name: vmHeaders,
Usage: "Optional HTTP headers to send with each request to the corresponding destination address. \n" +
"For example, --vm-headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding destination address. \n" +
"Multiple headers must be delimited by '^^': --vm-headers='header1:value1^^header2:value2'",
},
&cli.StringFlag{
Name: vmBearerToken,
Usage: "Optional bearer auth token to use for the corresponding --vm-addr",
},
&cli.StringFlag{
Name: vmAccountID,
Usage: "AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). \n" +

View File

@@ -457,7 +457,7 @@ func main() {
auth.WithBearer(c.String(vmNativeDstBearerToken)),
auth.WithHeaders(c.String(vmNativeDstHeaders)))
if err != nil {
return fmt.Errorf("error initialize auth config for destination: %s: %w", dstAddr, err)
return fmt.Errorf("error initialize auth config for destination: %s", dstAddr)
}
// create TLS config
@@ -596,18 +596,11 @@ func initConfigVM(c *cli.Context) (vm.Config, error) {
return vm.Config{}, fmt.Errorf("failed to create backoff object: %w", err)
}
authCfg, err := auth.Generate(
auth.WithBasicAuth(c.String(vmUser), c.String(vmPassword)),
auth.WithBearer(c.String(vmBearerToken)),
auth.WithHeaders(c.String(vmHeaders)))
if err != nil {
return vm.Config{}, fmt.Errorf("error initialize auth config for destination: %s: %w", addr, err)
}
return vm.Config{
Addr: addr,
Transport: tr,
AuthCfg: authCfg,
User: c.String(vmUser),
Password: c.String(vmPassword),
Concurrency: uint8(c.Int(vmConcurrency)),
Compress: c.Bool(vmCompress),
AccountID: c.String(vmAccountID),

View File

@@ -74,9 +74,9 @@ func wrapErr(vmErr *vm.ImportError, verbose bool) error {
verboseMsg = "(enable `--verbose` output to get more details)"
}
if vmErr.Err == nil {
return fmt.Errorf("%w\n\tLatest delivered batch for timestamps range %d - %d %s\n%s",
return fmt.Errorf("%s\n\tLatest delivered batch for timestamps range %d - %d %s\n%s",
vmErr.Err, minTS, maxTS, verboseMsg, errTS)
}
return fmt.Errorf("%w\n\tImporting batch failed for timestamps range %d - %d %s\n%s",
return fmt.Errorf("%s\n\tImporting batch failed for timestamps range %d - %d %s\n%s",
vmErr.Err, minTS, maxTS, verboseMsg, errTS)
}

View File

@@ -12,7 +12,6 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
@@ -28,8 +27,6 @@ type Config struct {
// --httpListenAddr value for single node version
// --httpListenAddr value of vmselect component for cluster version
Addr string
AuthCfg *auth.Config
// Transport allows specifying custom http.Transport
Transport *http.Transport
// Concurrency defines number of worker
@@ -43,6 +40,10 @@ type Config struct {
// BatchSize defines how many samples
// importer collects before sending the import request
BatchSize int
// User name for basic auth
User string
// Password for basic auth
Password string
// SignificantFigures defines the number of significant figures to leave
// in metric values before importing.
// Zero value saves all the significant decimal places
@@ -64,10 +65,11 @@ type Config struct {
// see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-time-series-data
type Importer struct {
addr string
authCfg *auth.Config
client *http.Client
importPath string
compress bool
user string
password string
close chan struct{}
input chan *TimeSeries
@@ -146,7 +148,8 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
client: client,
importPath: importPath,
compress: cfg.Compress,
authCfg: cfg.AuthCfg,
user: cfg.User,
password: cfg.Password,
rl: limiter.NewLimiter(cfg.RateLimit),
close: make(chan struct{}),
input: make(chan *TimeSeries, cfg.Concurrency*4),
@@ -301,8 +304,8 @@ func (im *Importer) Ping() error {
if err != nil {
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
}
if im.authCfg != nil {
im.authCfg.SetHeaders(req, true)
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
}
resp, err := im.client.Do(req)
if err != nil {
@@ -331,8 +334,8 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
im.importRequestsErrorsTotal.Inc()
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
}
if im.authCfg != nil {
im.authCfg.SetHeaders(req, true)
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
}
if im.compress {
req.Header.Set("Content-Encoding", "gzip")

View File

@@ -6,6 +6,8 @@ import (
"flag"
"fmt"
"net/http"
nethttputil "net/http/httputil"
"net/url"
"strings"
"time"
@@ -27,7 +29,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmalertproxy"
)
var (
@@ -37,10 +38,7 @@ var (
resetCacheAuthKey = flagutil.NewPassword("search.resetCacheAuthKey", "Optional authKey for resetting rollup cache via /internal/resetRollupResultCache call. It could be passed via authKey query arg. It overrides -httpAuth.*")
logSlowQueryDuration = flag.Duration("search.logSlowQueryDuration", 5*time.Second, "Log queries with execution time exceeding this value. Zero disables slow query logging. "+
"See also -search.logQueryMemoryUsage")
vmalertProxyURL = flag.String("vmalert.proxyURL", "", "Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , "+
"then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules . "+
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmalert")
vmalertProxyURL = flag.String("vmalert.proxyURL", "", "Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules")
)
var slowQueries = metrics.NewCounter(`vm_slow_queries_total`)
@@ -57,8 +55,8 @@ func Init(vmselectMaxConcurrentRequests int, vmselectMaxQueueDuration time.Durat
concurrencyLimitCh = make(chan struct{}, maxConcurrentRequests)
initVMUIConfig()
initVMAlertProxy()
vmalertproxy.Init(*vmalertProxyURL)
flagutil.RegisterSecretFlag("vmalert.proxyURL")
}
@@ -516,11 +514,10 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
if len(*vmalertProxyURL) == 0 {
w.WriteHeader(http.StatusBadRequest)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, "%s", `{"status":"error","msg":"the '-vmalert.proxyURL' command-line must be configured; `+
`see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmalert"}`)
fmt.Fprintf(w, "%s", `{"status":"error","msg":"for accessing vmalert flag '-vmalert.proxyURL' must be configured"}`)
return true
}
vmalertproxy.HandleRequest(w, r, path)
proxyVMAlertRequests(w, r, path)
return true
}
@@ -558,7 +555,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
case "/api/v1/rules", "/rules":
rulesRequests.Inc()
if len(*vmalertProxyURL) > 0 {
vmalertproxy.HandleRequest(w, r, path)
proxyVMAlertRequests(w, r, path)
return true
}
// Return dumb placeholder for https://prometheus.io/docs/prometheus/latest/querying/api/#rules
@@ -568,7 +565,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
case "/api/v1/alerts", "/alerts":
alertsRequests.Inc()
if len(*vmalertProxyURL) > 0 {
vmalertproxy.HandleRequest(w, r, path)
proxyVMAlertRequests(w, r, path)
return true
}
// Return dumb placeholder for https://prometheus.io/docs/prometheus/latest/querying/api/#alerts
@@ -578,7 +575,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
case "/api/v1/notifiers", "/notifiers":
notifiersRequests.Inc()
if len(*vmalertProxyURL) > 0 {
vmalertproxy.HandleRequest(w, r, path)
proxyVMAlertRequests(w, r, path)
return true
}
w.Header().Set("Content-Type", "application/json")
@@ -725,7 +722,48 @@ var (
metricNamesStatsResetErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/admin/status/metric_names_stats/reset"}`)
)
var vmuiConfig string
func proxyVMAlertRequests(w http.ResponseWriter, r *http.Request, path string) {
defer func() {
err := recover()
if err == nil || err == http.ErrAbortHandler {
// Suppress http.ErrAbortHandler panic.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1353
return
}
// Forward other panics to the caller.
panic(err)
}()
req := r.Clone(r.Context())
req.URL.Path = strings.TrimPrefix(path, "prometheus")
req.Host = vmalertProxyHost
if strings.HasPrefix(r.Header.Get(`User-Agent`), `Grafana`) {
// Grafana currently supports only Prometheus-style alerts. If other alert types
// (e.g. logs or traces) are returned, it may fail with "Error loading alerts".
//
// Grafana queries the vmalert API directly, bypassing the VictoriaMetrics datasource,
// so query params (such as datasource_type) cannot be enforced on the Grafana side.
//
// To ensure compatibility, we detect Grafana requests via the User-Agent and enforce
// `datasource_type=prometheus`.
//
// See:
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/329#issuecomment-3847585443
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/59
q := req.URL.Query()
q.Set("datasource_type", "prometheus")
req.URL.RawQuery = q.Encode()
req.RequestURI = ""
}
vmalertProxy.ServeHTTP(w, req)
}
var (
vmalertProxyHost string
vmalertProxy *nethttputil.ReverseProxy
vmuiConfig string
)
func initVMUIConfig() {
var cfg struct {
@@ -757,3 +795,16 @@ func initVMUIConfig() {
}
vmuiConfig = string(data)
}
// initVMAlertProxy must be called after flag.Parse(), since it uses command-line flags.
func initVMAlertProxy() {
if len(*vmalertProxyURL) == 0 {
return
}
proxyURL, err := url.Parse(*vmalertProxyURL)
if err != nil {
logger.Fatalf("cannot parse -vmalert.proxyURL=%q: %s", *vmalertProxyURL, err)
}
vmalertProxyHost = proxyURL.Host
vmalertProxy = nethttputil.NewSingleHostReverseProxy(proxyURL)
}

View File

@@ -28,7 +28,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@@ -526,7 +525,6 @@ func DeleteHandler(startTime time.Time, r *http.Request) error {
if deletedCount > 0 {
promql.ResetRollupResultCache()
}
logger.Infof("/api/v1/admin/tsdb/delete_series has been called for %q. Deleted %d series.", sq.FiltersString(), deletedCount)
return nil
}

View File

@@ -5,7 +5,7 @@ import uPlot from "uplot";
import Button from "../../Main/Button/Button";
import { CloseIcon, DragIcon } from "../../Main/Icons";
import { SeriesItemStatsFormatted } from "../../../types";
import { STATS_ORDER_TOOLTIP } from "../../../constants/graph";
import { STATS_ORDER } from "../../../constants/graph";
export interface ChartTooltipProps {
u?: uPlot;
@@ -164,7 +164,7 @@ const ChartTooltip: FC<ChartTooltipProps> = ({
</div>
{statsFormatted && (
<table className="vm-chart-tooltip-stats">
{STATS_ORDER_TOOLTIP.map((key, i) => (
{STATS_ORDER.map((key, i) => (
<div
className="vm-chart-tooltip-stats-row"
key={i}

View File

@@ -61,7 +61,7 @@ const LegendConfigs: FC<Props> = ({ data, isCompact }) => {
label: "Hide Statistics",
value: hideStats,
onChange: onChangeStats,
info: "If enabled, hides the display of min, median, max, and last values.",
info: "If enabled, hides the display of min, median, and max values.",
}
];

View File

@@ -5,7 +5,7 @@ import "./style.scss";
import classNames from "classnames";
import { getFreeFields } from "./helpers";
import useCopyToClipboard from "../../../../../hooks/useCopyToClipboard";
import { STATS_ORDER_LEGEND } from "../../../../../constants/graph";
import { STATS_ORDER } from "../../../../../constants/graph";
import { useShowStats } from "../hooks/useShowStats";
import { useLegendFormat } from "../hooks/useLegendFormat";
import { getLabelAlias } from "../../../../../utils/metric";
@@ -80,7 +80,7 @@ const LegendItem: FC<LegendItemProps> = ({ legend, onChange, duplicateFields })
</div>
{!hideStats && showStats && (
<div className="vm-legend-item-stats">
{STATS_ORDER_LEGEND.map((key, i) => (
{STATS_ORDER.map((key, i) => (
<div
className="vm-legend-item-stats-row"
key={i}

View File

@@ -4,11 +4,11 @@ import "./style.scss";
import { LegendItemType } from "../../../../../types";
import { MouseEvent } from "react";
import classNames from "classnames";
import { STATS_ORDER_LEGEND } from "../../../../../constants/graph";
import { STATS_ORDER } from "../../../../../constants/graph";
import { useShowStats } from "../hooks/useShowStats";
import { getValueByPath } from "../../../../../utils/object";
const statsColumns = STATS_ORDER_LEGEND.map(k => ({
const statsColumns = STATS_ORDER.map(k => ({
key: `statsFormatted.${k}`,
title: k
}));

View File

@@ -26,5 +26,4 @@ export const GRAPH_SIZES: GraphSize[] = [
},
];
export const STATS_ORDER_LEGEND: (keyof SeriesItemStatsFormatted)[] = ["min", "median", "max", "last"];
export const STATS_ORDER_TOOLTIP: (keyof SeriesItemStatsFormatted)[] = ["min", "median", "max"];
export const STATS_ORDER: (keyof SeriesItemStatsFormatted)[] = ["min", "median", "max"];

View File

@@ -4,7 +4,6 @@ export interface SeriesItemStatsFormatted {
min: string,
max: string,
median: string,
last: string,
}
export interface SeriesItem extends Series {

View File

@@ -53,7 +53,6 @@ const getSeriesStatistics = (d: MetricResult) => {
min: formatPrettyNumber(min, min, max),
max: formatPrettyNumber(max, min, max),
median: formatPrettyNumber(median, min, max),
last: formatPrettyNumber(values.at(-1), min, max),
},
};
};

View File

@@ -28,7 +28,7 @@ If you like VictoriaMetrics and want to contribute, then it would be great:
## Issues
When making a new issue, make sure to create no duplicates. Use GitHub search to find whether similar issues exist already.
The new issue should be written in English and contain a concise description of the problem and the environment where it exists.
The new issue should be written in English and contain concise description of the problem and environment where it exists.
We'd very much prefer to have a specific use-case included in the description, since it could have workaround or alternative solutions.
When looking for an issue to contribute, always prefer working on [bugs](https://github.com/VictoriaMetrics/VictoriaMetrics/issues?q=is%3Aopen+is%3Aissue+label%3Abug)
@@ -48,7 +48,7 @@ We use [labels](https://docs.github.com/en/issues/using-labels-and-milestones-to
1. `need more info`, assigned to issues that require elaboration from the issue creator.
For example, if we weren't able to reproduce the reported bug based on the ticket description then we ask additional
questions which could help to reproduce the issue and add `need more info` label. This label helps other maintainers
to understand that this issue wasn't forgotten but waits for the feedback from the user.
to understand that this issue wasn't forgotten but waits for the feedback from user.
1. `completed`, assigned to issues that required code changes and those changes were merged to upstream, but not released yet.
Once a release is made, maintainers go through all labeled issues, leave a comment about the new release, and close the issue.
1. `vmui`, assigned to issues related to [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui) or [VictoriaLogs webui](https://docs.victoriametrics.com/victorialogs/querying/#web-ui)
@@ -63,31 +63,32 @@ Pull requests requirements:
1. Don't use `master` branch for making PRs, as it makes it impossible for reviewers to modify the changes.
1. All commits need to be [signed](https://docs.github.com/en/authentication/managing-commit-signature-verification/signing-commits).
1. Pull request title should be prefixed with `<dir>/<component>:` to show what component has been changed, i.e. `app/vmalert: fix...`.
Pull request description should contain a clear and concise description of what was done, why it is needed and for what purpose.
Pull request description should contain clear and concise description of what was done, why it is needed and for what purpose.
Use clear language, so reviewers can quickly understand the change and its impact.
1. A link to the issue(s) related to the change, if any. Use `Fixes [issue link]` if the PR resolves the issue, or `Related to [issue link]` for reference.
1. Tests proving that the change is effective. Tests are expected for non-trivial new functionality or non-trivial modifications.
Bug fixes must include tests unless a maintainer explicitly agrees otherwise.
See [this style guide](https://itnext.io/f-tests-as-a-replacement-for-table-driven-tests-in-go-8814a8b19e9e) for tests. See [this section](#testing) for how to run tests.
See [this style guide](https://itnext.io/f-tests-as-a-replacement-for-table-driven-tests-in-go-8814a8b19e9e) for tests.
To run tests and code checks locally, execute commands `make test-full` and `make check-all`.
1. Try to not extend the scope of the pull requests outside the issue, do not make unrelated changes.
1. Update [docs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/docs) if needed. For example, adding a new flag or changing the behavior of existing flags or features
1. Update [docs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/docs) if needed. For example, adding a new flag or changing behavior of existing flags or features
requires reflecting these changes in the documentation. For new features add `{{%/* available_from "#" */%}}` shortcode to the documentation.
It will be later automatically replaced with an actual release version.
1. A line in the [changelog](https://docs.victoriametrics.com/victoriametrics/changelog/#tip) mentioning the change and related issue in a way
that would be clear to other readers even if they don't have the full context.
1. Avoid modifying code in the `/vendor` folder manually, even when the vendored package originates from the VictoriaMetrics GitHub organization.
1. Avoid modifying code in the `/vendor` folder manually, even when the vendored package originates are from the VictoriaMetrics GitHub organization.
For instance, VictoriaLogs vendors packages under the `/lib` folder from VictoriaMetrics, and VictoriaTraces vendors the `/lib/logstorage` package from VictoriaLogs.
Submit a pull request to the upstream repository first. Afterward, a separate pull request can be opened to update the version of the vendored folder in the downstream repository.
Submit a pull request to the upstream repository first. Afterward, a separate pull request can be opened to update the version of the vendored folder in downstream repository.
* For common packages, the vendored package can be updated with this command: `go get <dependency>@vX.Y.Z`.
* For VictoriaMetrics packages, use `go get <dependency>@canonical_commit_hash`.
Finally, run `go mod tidy` and `go mod vendor` to update `go.mod`, `go.sum`, and `/vendor`.
1. Ping reviewers who you think have the best expertise on the matter.
See a good example of a [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6487).
See good example of a [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6487).
## Merging Pull Request
The person who merges the Pull Request is responsible for satisfying the requirements below:
The person who merges the Pull Request is responsible for satisfying requirements below:
1. Make sure that PR satisfies [Pull Request checklist](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist),
it is approved by at least one reviewer, all CI checks are green.
@@ -96,9 +97,9 @@ The person who merges the Pull Request is responsible for satisfying the require
1. If applicable, cherry-pick the change to [LTS release lines](https://docs.victoriametrics.com/victoriametrics/lts-releases/)
and mention in the PR comment what was or wasn't cherry-picked.
1. Update related issues with a meaningful message of what has changed and when it will be
released. _This helps users to understand the change without reading the PR._
released. _This helps users to understand the change without reading PR._
1. Add label `completed` to related issues.
1. Do not close related tickets until the release is made. If the ticket was auto-closed by GitHub or a user - re-open it.
1. Do not close related tickets until release is made. If ticket was auto-closed by GitHub or user - re-open it.
## KISS principle
@@ -114,9 +115,9 @@ We are open to third-party pull requests provided they follow [KISS design princ
- Minimize the number of moving parts in the distributed system.
- Avoid automated decisions, which may hurt cluster availability, consistency, performance or debuggability.
Adhering to the `KISS` principle, simplifies the resulting code and architecture so it can be reviewed, understood and debugged by a wider audience.
Adhering to `KISS` principle, simplifies the resulting code and architecture so it can be reviewed, understood and debugged by a wider audience.
Due to `KISS`, [cluster version of VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) has none of the following "features" popular in distributed computing:
Due to `KISS`, [cluster version of VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) has none of the following "features" popular in distributed computing world:
- Fragile gossip protocols. See [failed attempt in Thanos](https://github.com/improbable-eng/thanos/blob/030bc345c12c446962225221795f4973848caab5/docs/proposals/completed/201809_gossip-removal.md).
- Hard-to-understand-and-implement-properly [Paxos protocols](https://www.quora.com/In-distributed-systems-what-is-a-simple-explanation-of-the-Paxos-algorithm).
@@ -125,17 +126,3 @@ Due to `KISS`, [cluster version of VictoriaMetrics](https://docs.victoriametrics
- Automatic cluster resizing, which may cost you a lot of money if improperly configured.
- Automatic discovering and addition of new nodes in the cluster, which may mix data between dev and prod clusters :)
- Automatic leader election, which may result in split brain disaster on network errors.
## Testing
We recommend running the following sequence of checks and tests before submitting a pull request:
```sh
# run static checks
make check-all
# run unit test
make test-full
# run integration tests
make apptest
```

View File

@@ -1712,30 +1712,21 @@ The following versions of VictoriaMetrics receive regular security fixes:
| [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/) | ✅ |
| other releases | ❌ |
### Software Bill of Materials (SBOM)
Every VictoriaMetrics container{{% available_from "v1.137.0" %}} image published to
[Docker Hub](https://hub.docker.com/u/victoriametrics) and [Quay.io](https://quay.io/organization/victoriametrics) include an [SPDX](https://spdx.dev/) SBOM attestation generated automatically by BuildKit during `docker buildx build`.
To inspect the SBOM for an image:
```sh
docker buildx imagetools inspect \
docker.io/victoriametrics/victoria-metrics:latest \
--format "{{ json .SBOM }}"
```
To scan an image using its SBOM attestation with [Trivy](https://github.com/aquasecurity/trivy):
```sh
trivy image --sbom-sources oci \
docker.io/victoriametrics/victoria-metrics:latest
```
### Reporting a Vulnerability
Please report any security issues to <security@victoriametrics.com>
### CVE handling policy
**Source code:** Go dependencies are scanned by [govulncheck](https://pkg.go.dev/golang.org/x/vuln/cmd/govulncheck) in CI.
All vulnerabilities must be fixed before next scheduled release and backported to [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
**Docker images:** CVE findings in [Alpine](https://security.alpinelinux.org/) base image pose minimal risk since VictoriaMetrics binaries are statically compiled with no OS dependencies.
When detected, only the Alpine base tag is updated.
Releases proceed as planned even if upstream fixes are not yet available.
For maximum security, hardened [scratch](https://hub.docker.com/_/scratch)-based images are also provided.
All images are continuously scanned by Docker Hub and verified before release using [grype](https://github.com/anchore/grype).
### General security recommendations:
* All the VictoriaMetrics components must run in protected private networks without direct access from untrusted networks such as Internet.
@@ -1749,14 +1740,24 @@ Please report any security issues to <security@victoriametrics.com>
* Set reasonable [`Content-Security-Policy`](https://developer.mozilla.org/en-US/docs/Web/HTTP/CSP) header value to mitigate [XSS attacks](https://en.wikipedia.org/wiki/Cross-site_scripting). See `-http.header.csp` flag.
* Set reasonable [`X-Frame-Options`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Frame-Options) header value to mitigate [clickjacking attacks](https://en.wikipedia.org/wiki/Clickjacking), for example `DENY`. See `-http.header.frameOptions` flag.
VictoriaMetrics provides the following security-related command-line flags:
There are the following security-related command-line flags for all components with HTTP API:
* `-tls`, `-tlsCertFile` and `-tlsKeyFile` for switching from HTTP to HTTPS at `-httpListenAddr` (TCP port 8428 is listened by default).
* `-tls`, `-tlsCertFile` and `-tlsKeyFile` for switching from HTTP to HTTPS at `-httpListenAddr`.
[Enterprise version of VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports automatic issuing of TLS certificates.
See [these docs](#automatic-issuing-of-tls-certificates).
* `-mtls` and `-mtlsCAFile` for enabling [mTLS](https://en.wikipedia.org/wiki/Mutual_authentication) for requests to `-httpListenAddr`. See [these docs](#mtls-protection).
* `-httpAuth.username` and `-httpAuth.password` for protecting all the HTTP endpoints
with [HTTP Basic Authentication](https://en.wikipedia.org/wiki/Basic_access_authentication).
* `-http.header.hsts`, `-http.header.csp`, and `-http.header.frameOptions` for serving `Strict-Transport-Security`, `Content-Security-Policy`
and `X-Frame-Options` HTTP response headers.
### Protecting service endpoints
All VictoriaMetrics components expose internal metrics in Prometheus exposition format at `/metrics` page for [#Monitoring](https://docs.victoriametrics.com/victoriametrics/#monitoring).
Consider limiting access to `/metrics` page to trusted networks only.
There are other service endpoints that might require protection:
* `-deleteAuthKey` for protecting the `/api/v1/admin/tsdb/delete_series` endpoint. See [how to delete time series](#how-to-delete-time-series).
* `-snapshotAuthKey` for protecting the `/snapshot*` endpoints. See [how to work with snapshots](#how-to-work-with-snapshots).
* `-forceFlushAuthKey` for protecting the `/internal/force_flush` endpoint. See [force flush docs](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#forced-flush).
@@ -1768,8 +1769,6 @@ VictoriaMetrics provides the following security-related command-line flags:
* `-pprofAuthKey` for protecting the `/debug/pprof/*` endpoints, which can be used for [profiling](#profiling).
* `-metricNamesStatsResetAuthKey` for protecting the `/api/v1/admin/status/metric_names_stats/reset` endpoint, used for [Metric Names Tracker](#track-ingested-metrics-usage).
* `-denyQueryTracing` for disallowing [query tracing](#query-tracing).
* `-http.header.hsts`, `-http.header.csp`, and `-http.header.frameOptions` for serving `Strict-Transport-Security`, `Content-Security-Policy`
and `X-Frame-Options` HTTP response headers.
Explicitly set internal network interface for TCP and UDP ports for data ingestion with Graphite and OpenTSDB formats.
For example, substitute `-graphiteListenAddr=:2003` with `-graphiteListenAddr=<internal_iface_ip>:2003`. This protects from unexpected requests from untrusted network interfaces.
@@ -1777,17 +1776,6 @@ For example, substitute `-graphiteListenAddr=:2003` with `-graphiteListenAddr=<i
See also [security recommendation for VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#security)
and [the general security page at VictoriaMetrics website](https://victoriametrics.com/security/).
### CVE handling policy
**Source code:** Go dependencies are scanned by [govulncheck](https://pkg.go.dev/golang.org/x/vuln/cmd/govulncheck) in CI.
All vulnerabilities must be fixed before next scheduled release and backported to [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
**Docker images:** CVE findings in [Alpine](https://security.alpinelinux.org/) base image pose minimal risk since VictoriaMetrics binaries are statically compiled with no OS dependencies.
When detected, only the Alpine base tag is updated.
Releases proceed as planned even if upstream fixes are not yet available.
For maximum security, hardened [scratch](https://hub.docker.com/_/scratch)-based images are also provided.
All images are continuously scanned by Docker Hub and verified before release using [grype](https://github.com/anchore/grype).
### mTLS protection
By default `VictoriaMetrics` accepts http requests at `8428` port (this port can be changed via `-httpListenAddr` command-line flags).
@@ -1817,6 +1805,26 @@ This functionality can be evaluated for free according to [these docs](https://d
See also [security recommendations](#security).
### Software Bill of Materials (SBOM)
Every VictoriaMetrics container{{% available_from "v1.137.0" %}} image published to
[Docker Hub](https://hub.docker.com/u/victoriametrics) and [Quay.io](https://quay.io/organization/victoriametrics) include an [SPDX](https://spdx.dev/) SBOM attestation generated automatically by BuildKit during `docker buildx build`.
To inspect the SBOM for an image:
```sh
docker buildx imagetools inspect \
docker.io/victoriametrics/victoria-metrics:latest \
--format "{{ json .SBOM }}"
```
To scan an image using its SBOM attestation with [Trivy](https://github.com/aquasecurity/trivy):
```sh
trivy image --sbom-sources oci \
docker.io/victoriametrics/victoria-metrics:latest
```
## Tuning
* No need in tuning for VictoriaMetrics - it uses reasonable defaults for command-line flags,

View File

@@ -26,15 +26,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: all VictoriaMetrics components: add `-http.header.disableServerHostname` command-line flag for disabling the `X-Server-Hostname` HTTP response header. See [#11067](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11067). Thanks to @zasdaym for contribution.
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/),[vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/),[vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): fix rare unbounded shutdown delay when config reload takes longer than `-configCheckInterval`. See [#11107](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11107). Thanks to @PleasingFungus for contribution.
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix corrupted metrics metadata when a response contains multiple rows. See [#11115](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11115). Thanks for @fxrlv for the contribution.
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): do not fail backup list if directory is absent while using `fs://` destination to align with other protocols. See [6c3c548](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/6c3c548ddb0385b749e731f52276f130e2a4e4a8)
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)

View File

@@ -95,8 +95,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string
@@ -623,7 +621,7 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/
-version
Show VictoriaMetrics version
-vmalert.proxyURL string
Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules . See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmalert
Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules
-vmui.customDashboardsPath string
Optional path to vmui dashboards. See https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/vmui/packages/vmui/public/dashboards
-vmui.defaultTimezone string

View File

@@ -74,8 +74,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -115,8 +115,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmalert/ .
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -59,8 +59,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmauth/ .
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -400,8 +400,6 @@ Run `vmbackup -help` in order to see all the available options:
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -575,8 +575,6 @@ command-line flags:
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -496,8 +496,6 @@ Below is the list of configuration flags (it can be viewed by running `./vmgatew
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -76,8 +76,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -102,8 +102,6 @@ Run `vmrestore -help` in order to see all the available options:
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

View File

@@ -75,8 +75,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string
@@ -325,7 +323,7 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
-version
Show VictoriaMetrics version
-vmalert.proxyURL string
Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules . See https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#vmalert
Optional URL for proxying requests to vmalert. For example, if -vmalert.proxyURL=http://vmalert:8880 , then alerting API requests such as /api/v1/rules from Grafana will be proxied to http://vmalert:8880/api/v1/rules
-vmstorageDialTimeout duration
Timeout for establishing RPC connections from vmselect to vmstorage. See also -vmstorageUserTimeout (default 3s)
-vmstorageUserTimeout duration

View File

@@ -68,8 +68,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/cluster-victori
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
-http.header.csp string
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
-http.header.disableServerHostname
Whether to disable 'X-Server-Hostname' header in HTTP responses
-http.header.frameOptions string
Value for 'X-Frame-Options' header
-http.header.hsts string

2
go.mod
View File

@@ -7,7 +7,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260616132739-c901a1e31cb3
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260415124154-6b7a6357aec0
github.com/VictoriaMetrics/easyproto v1.2.0
github.com/VictoriaMetrics/fastcache v1.13.3
github.com/VictoriaMetrics/metrics v1.43.2

4
go.sum
View File

@@ -52,8 +52,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.56.0/go.mod h1:6ZZMQhZKDvUvkJw2rc+oDP90tMMzuU/J+5HG1ZmPOmE=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260616132739-c901a1e31cb3 h1:W5gA6Jo/kvi/LyAgmm1D5C1nTcCouKtzNy2pgRHpPoE=
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260616132739-c901a1e31cb3/go.mod h1:H4sDxcvk6OmC6zOt++IlDyrwfbn4F1eSLwMpR+kpRt8=
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260415124154-6b7a6357aec0 h1:2x1Tszv41PnCdSMumEtejz/On1RQ45kHQ+hhKT53sOk=
github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260415124154-6b7a6357aec0/go.mod h1:fQtmzaSUL+HJmHozeAKmnTJTOMBT+vBccv/VWQEwhUQ=
github.com/VictoriaMetrics/easyproto v1.2.0 h1:FJT9uNXA2isppFuJErbLqD306KoFlehl7Wn2dg/6oIE=
github.com/VictoriaMetrics/easyproto v1.2.0/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo3/GThPs2KH23mv710=
github.com/VictoriaMetrics/fastcache v1.13.3 h1:rBabE0iIxcqKEMCwUmwHZ9dgEqXerg8FRbRDUvC7OVc=

View File

@@ -64,10 +64,9 @@ var (
connTimeout = flag.Duration("http.connTimeout", 2*time.Minute, "Incoming connections to -httpListenAddr are closed after the configured timeout. "+
"This may help evenly spreading load among a cluster of services behind TCP-level load balancer. Zero value disables closing of incoming connections")
headerHSTS = flag.String("http.header.hsts", "", "Value for 'Strict-Transport-Security' header, recommended: 'max-age=31536000; includeSubDomains'")
headerFrameOptions = flag.String("http.header.frameOptions", "", "Value for 'X-Frame-Options' header")
headerCSP = flag.String("http.header.csp", "", `Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"`)
headerDisableServerHostname = flag.Bool("http.header.disableServerHostname", false, "Whether to disable 'X-Server-Hostname' header in HTTP responses")
headerHSTS = flag.String("http.header.hsts", "", "Value for 'Strict-Transport-Security' header, recommended: 'max-age=31536000; includeSubDomains'")
headerFrameOptions = flag.String("http.header.frameOptions", "", "Value for 'X-Frame-Options' header")
headerCSP = flag.String("http.header.csp", "", `Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"`)
disableCORS = flag.Bool("http.disableCORS", false, `Disable CORS for all origins (*)`)
)
@@ -330,9 +329,7 @@ func handlerWrapper(w http.ResponseWriter, r *http.Request, rh RequestHandler) {
if *headerCSP != "" {
h.Add("Content-Security-Policy", *headerCSP)
}
if !*headerDisableServerHostname {
h.Add("X-Server-Hostname", hostname)
}
h.Add("X-Server-Hostname", hostname)
requestsTotal.Inc()
if whetherToCloseConn(r) {
connTimeoutClosedConns.Inc()

View File

@@ -228,30 +228,4 @@ func TestHandlerWrapper(t *testing.T) {
if got := h.Get("Content-Security-Policy"); got != cspHeader {
t.Fatalf("unexpected CSP header; got %q; want %q", got, cspHeader)
}
if got := h.Get("X-Server-Hostname"); got != hostname {
t.Fatalf("unexpected X-Server-Hostname header; got %q; want %q", got, hostname)
}
}
func TestHandlerWrapperDisableServerHostnameHeader(t *testing.T) {
origDisableServerHostname := *headerDisableServerHostname
*headerDisableServerHostname = true
defer func() {
*headerDisableServerHostname = origDisableServerHostname
}()
req, _ := http.NewRequest("GET", "/health", nil)
srv := &server{s: &http.Server{}}
w := &httptest.ResponseRecorder{}
handlerWrapper(w, req, func(w http.ResponseWriter, r *http.Request) bool {
return builtinRoutesHandler(srv, r, w, func(_ http.ResponseWriter, _ *http.Request) bool {
return true
})
})
if got := w.Header().Get("X-Server-Hostname"); got != "" {
t.Fatalf("unexpected X-Server-Hostname header; got %q; want empty value", got)
}
}

View File

@@ -52,9 +52,6 @@ func TestGetTimeSuccess(t *testing.T) {
f("292277025-08-18T07:12:54.999999999Z", maxTimeMsecs)
f("1562529662.324", 1562529662324)
f("1223372036.855", 1223372036855)
// relative duration that resolves to a timestamp before 1970
f("-9223372036.854", minTimeMsecs)
}
func TestGetTimeError(t *testing.T) {
@@ -66,8 +63,8 @@ func TestGetTimeError(t *testing.T) {
t.Fatalf("unexpected error in NewRequest: %s", err)
}
if msec, err := GetTime(r, "s", 123); err == nil {
t.Fatalf("expecting non-nil error in GetTime(%q); got %d", s, msec)
if _, err := GetTime(r, "s", 123); err == nil {
t.Fatalf("expecting non-nil error in GetTime(%q)", s)
}
}
@@ -87,6 +84,7 @@ func TestGetTimeError(t *testing.T) {
f("123md")
f("-12.3md")
// relative duration outside the allowed range
// relative duration that resolves to a timestamp before 1970
f("-9223372036.854")
f("-9223372036.855")
}

View File

@@ -48,12 +48,6 @@ const maxPartSize = 400e9
// The interval for flushing buffered data to parts, so it becomes visible to search.
const pendingItemsFlushInterval = time.Second
// The default interval for calling flushCallback when there is pending data to flush.
//
// It is set relatively high in order to improve the effectiveness of caches reset by flushCallback.
// It is used when the flushCallbackInterval arg at MustOpenTable is set to zero.
const defaultFlushCallbackInterval = 10 * time.Second
// maxItemsPerCachedPart is the maximum items per created part by the merge,
// which must be cached in the OS page cache.
//
@@ -94,7 +88,6 @@ type Table struct {
flushInterval time.Duration
flushCallback func()
flushCallbackInterval time.Duration
needFlushCallbackCall atomic.Bool
prepareBlock PrepareBlockCallback
@@ -339,14 +332,11 @@ func (pw *partWrapper) decRef() {
// Optional flushCallback is called every time new data batch is flushed
// to the underlying storage and becomes visible to search.
//
// The flushCallbackInterval is how often flushCallback is invoked when there is
// pending data to flush. If it is set to zero, then defaultFlushCallbackInterval is used.
//
// Optional prepareBlock is called during merge before flushing the prepared block
// to persistent storage.
//
// The table is created if it doesn't exist yet.
func MustOpenTable(path string, flushInterval time.Duration, flushCallback func(), flushCallbackInterval time.Duration, prepareBlock PrepareBlockCallback, isReadOnly *atomic.Bool) *Table {
func MustOpenTable(path string, flushInterval time.Duration, flushCallback func(), prepareBlock PrepareBlockCallback, isReadOnly *atomic.Bool) *Table {
path = filepath.Clean(path)
if flushInterval < pendingItemsFlushInterval {
@@ -355,10 +345,6 @@ func MustOpenTable(path string, flushInterval time.Duration, flushCallback func(
flushInterval = pendingItemsFlushInterval
}
if flushCallbackInterval <= 0 {
flushCallbackInterval = defaultFlushCallbackInterval
}
// Create a directory at the path if it doesn't exist yet.
fs.MustMkdirIfNotExist(path)
@@ -369,15 +355,14 @@ func MustOpenTable(path string, flushInterval time.Duration, flushCallback func(
fs.MustSyncPathAndParentDir(path)
tb := &Table{
path: path,
flushInterval: flushInterval,
flushCallback: flushCallback,
flushCallbackInterval: flushCallbackInterval,
prepareBlock: prepareBlock,
isReadOnly: isReadOnly,
fileParts: pws,
inmemoryPartsLimitCh: make(chan struct{}, maxInmemoryParts),
stopCh: make(chan struct{}),
path: path,
flushInterval: flushInterval,
flushCallback: flushCallback,
prepareBlock: prepareBlock,
isReadOnly: isReadOnly,
fileParts: pws,
inmemoryPartsLimitCh: make(chan struct{}, maxInmemoryParts),
stopCh: make(chan struct{}),
}
tb.mergeIdx.Store(uint64(time.Now().UnixNano()))
tb.rawItems.init()
@@ -444,9 +429,9 @@ func (tb *Table) startFlushCallbackWorker() {
}
tb.wg.Go(func() {
// call flushCallback at flushCallbackInterval in order to improve the effectiveness
// of caches, which are reset by the flushCallback.
d := timeutil.AddJitterToDuration(tb.flushCallbackInterval)
// call flushCallback once per 10 seconds in order to improve the effectiveness of caches,
// which are reset by the flushCallback.
d := timeutil.AddJitterToDuration(time.Second * 10)
tc := time.NewTicker(d)
for {
select {

View File

@@ -40,7 +40,7 @@ func TestTableSearchSerial(t *testing.T) {
func() {
// Re-open the table and verify the search works.
var isReadOnly atomic.Bool
tb := MustOpenTable(path, 0, nil, 0, nil, &isReadOnly)
tb := MustOpenTable(path, 0, nil, nil, &isReadOnly)
defer tb.MustClose()
if err := testTableSearchSerial(tb, items); err != nil {
t.Fatalf("unexpected error: %s", err)
@@ -70,7 +70,7 @@ func TestTableSearchConcurrent(t *testing.T) {
// Re-open the table and verify the search works.
func() {
var isReadOnly atomic.Bool
tb := MustOpenTable(path, 0, nil, 0, nil, &isReadOnly)
tb := MustOpenTable(path, 0, nil, nil, &isReadOnly)
defer tb.MustClose()
if err := testTableSearchConcurrent(tb, items); err != nil {
t.Fatalf("unexpected error: %s", err)
@@ -144,7 +144,7 @@ func newTestTable(r *rand.Rand, path string, itemsCount int) (*Table, []string,
flushes.Add(1)
}
var isReadOnly atomic.Bool
tb := MustOpenTable(path, 0, flushCallback, 0, nil, &isReadOnly)
tb := MustOpenTable(path, 0, flushCallback, nil, &isReadOnly)
items := make([]string, itemsCount)
for i := range itemsCount {
item := fmt.Sprintf("%d:%d", r.Intn(1e9), i)

View File

@@ -33,7 +33,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
// Force finishing pending merges
tb.MustClose()
var isReadOnly atomic.Bool
tb = MustOpenTable(path, 0, nil, 0, nil, &isReadOnly)
tb = MustOpenTable(path, 0, nil, nil, &isReadOnly)
defer tb.MustClose()
keys := make([][]byte, len(items))
@@ -94,7 +94,7 @@ func benchmarkTableSearchKeysExt(b *testing.B, tb *Table, keys [][]byte, stripSu
}
ts.Seek(searchKey)
if !ts.NextItem() {
panic(fmt.Errorf("BUG: NextItem must return true for searchKeys[%d]=%q; err=%w", i, searchKey, ts.Error()))
panic(fmt.Errorf("BUG: NextItem must return true for searchKeys[%d]=%q; err=%v", i, searchKey, ts.Error()))
}
if !bytes.HasPrefix(ts.Item, searchKey) {
panic(fmt.Errorf("BUG: unexpected item found for searchKey[%d]=%q; got %q; want %q", i, searchKey, ts.Item, key))

View File

@@ -18,14 +18,14 @@ func TestTableOpenClose(t *testing.T) {
// Create a new table
var isReadOnly atomic.Bool
tb := MustOpenTable(path, 0, nil, 0, nil, &isReadOnly)
tb := MustOpenTable(path, 0, nil, nil, &isReadOnly)
// Close it
tb.MustClose()
// Re-open created table multiple times.
for range 4 {
tb := MustOpenTable(path, 0, nil, 0, nil, &isReadOnly)
tb := MustOpenTable(path, 0, nil, nil, &isReadOnly)
tb.MustClose()
}
}
@@ -35,7 +35,7 @@ func TestTableAddItemsTooLongItem(t *testing.T) {
fs.MustRemoveDir(path)
var isReadOnly atomic.Bool
tb := MustOpenTable(path, 0, nil, 0, nil, &isReadOnly)
tb := MustOpenTable(path, 0, nil, nil, &isReadOnly)
tb.AddItems([][]byte{make([]byte, maxInmemoryBlockSize+1)})
tb.MustClose()
fs.MustRemoveDir(path)
@@ -52,7 +52,7 @@ func TestTableAddItemsSerial(t *testing.T) {
flushes.Add(1)
}
var isReadOnly atomic.Bool
tb := MustOpenTable(path, 0, flushCallback, 0, nil, &isReadOnly)
tb := MustOpenTable(path, 0, flushCallback, nil, &isReadOnly)
const itemsCount = 10e3
testAddItemsSerial(r, tb, itemsCount)
@@ -75,7 +75,7 @@ func TestTableAddItemsSerial(t *testing.T) {
testReopenTable(t, path, itemsCount)
// Add more items in order to verify merge between inmemory parts and file-based parts.
tb = MustOpenTable(path, 0, nil, 0, nil, &isReadOnly)
tb = MustOpenTable(path, 0, nil, nil, &isReadOnly)
const moreItemsCount = itemsCount * 3
testAddItemsSerial(r, tb, moreItemsCount)
tb.MustClose()
@@ -99,7 +99,7 @@ func TestTableCreateSnapshotAt(t *testing.T) {
fs.MustRemoveDir(path)
var isReadOnly atomic.Bool
tb := MustOpenTable(path, 0, nil, 0, nil, &isReadOnly)
tb := MustOpenTable(path, 0, nil, nil, &isReadOnly)
// Write a lot of items into the table, so background merges would start.
const itemsCount = 3e5
@@ -111,7 +111,7 @@ func TestTableCreateSnapshotAt(t *testing.T) {
// Close and open the table in order to flush all the data to disk before creating snapshots.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4272#issuecomment-1550221840
tb.MustClose()
tb = MustOpenTable(path, 0, nil, 0, nil, &isReadOnly)
tb = MustOpenTable(path, 0, nil, nil, &isReadOnly)
// Create multiple snapshots.
snapshot1 := path + "-test-snapshot1"
@@ -121,8 +121,8 @@ func TestTableCreateSnapshotAt(t *testing.T) {
tb.MustCreateSnapshotAt(snapshot2)
// Verify snapshots contain all the data.
tb1 := MustOpenTable(snapshot1, 0, nil, 0, nil, &isReadOnly)
tb2 := MustOpenTable(snapshot2, 0, nil, 0, nil, &isReadOnly)
tb1 := MustOpenTable(snapshot1, 0, nil, nil, &isReadOnly)
tb2 := MustOpenTable(snapshot2, 0, nil, nil, &isReadOnly)
var ts, ts1, ts2 TableSearch
ts.Init(tb, false)
@@ -197,7 +197,7 @@ func TestTableAddItemsConcurrentStress(t *testing.T) {
}
var isReadOnly atomic.Bool
tb := MustOpenTable(path, 0, flushCallback, 0, prepareBlock, &isReadOnly)
tb := MustOpenTable(path, 0, flushCallback, prepareBlock, &isReadOnly)
testAddItems(tb)
@@ -232,7 +232,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
return data, items
}
var isReadOnly atomic.Bool
tb := MustOpenTable(path, 0, flushCallback, 0, prepareBlock, &isReadOnly)
tb := MustOpenTable(path, 0, flushCallback, prepareBlock, &isReadOnly)
const itemsCount = 10e3
testAddItemsConcurrent(tb, itemsCount)
@@ -255,7 +255,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
testReopenTable(t, path, itemsCount)
// Add more items in order to verify merge between inmemory parts and file-based parts.
tb = MustOpenTable(path, 0, nil, 0, nil, &isReadOnly)
tb = MustOpenTable(path, 0, nil, nil, &isReadOnly)
const moreItemsCount = itemsCount * 3
testAddItemsConcurrent(tb, moreItemsCount)
tb.MustClose()
@@ -292,7 +292,7 @@ func testReopenTable(t *testing.T, path string, itemsCount int) {
for range 10 {
var isReadOnly atomic.Bool
tb := MustOpenTable(path, 0, nil, 0, nil, &isReadOnly)
tb := MustOpenTable(path, 0, nil, nil, &isReadOnly)
var m TableMetrics
tb.UpdateMetrics(&m)
if n := m.TotalItemsCount(); n != uint64(itemsCount) {
@@ -308,7 +308,7 @@ func TestTableMustMergeInmemoryPartsFinal_pwsRefCount(t *testing.T) {
defer fs.MustRemoveDir(path)
var isReadOnly atomic.Bool
tb := MustOpenTable(path, 0, nil, 0, nil, &isReadOnly)
tb := MustOpenTable(path, 0, nil, nil, &isReadOnly)
defer tb.MustClose()
generatePartWrappers := func(n int) []*partWrapper {

View File

@@ -155,22 +155,9 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompb.Writ
tickerCh = ticker.C
defer ticker.Stop()
}
stop := func() {
cfg.mustStop()
logger.Infof("stopping Prometheus scrapers")
startTime := time.Now()
scs.stop()
logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
}
for {
scs.updateConfig(cfg)
waitForChans:
select {
case <-globalStopCh:
stop()
return
default:
}
select {
case <-sighupCh:
logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile)
@@ -209,7 +196,11 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompb.Writ
configReloads.Inc()
configTimestamp.Set(fasttime.UnixTimestamp())
case <-globalStopCh:
stop()
cfg.mustStop()
logger.Infof("stopping Prometheus scrapers")
startTime := time.Now()
scs.stop()
logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
return
}
}

View File

@@ -172,7 +172,7 @@ func mustOpenIndexDB(id uint64, tr TimeRange, name, path string, s *Storage, isR
}
tfssCache := lrucache.NewCache(getTagFiltersCacheSize)
tb := mergeset.MustOpenTable(path, dataFlushInterval, tfssCache.Reset, 0, mergeTagToMetricIDsRows, isReadOnly)
tb := mergeset.MustOpenTable(path, dataFlushInterval, tfssCache.Reset, mergeTagToMetricIDsRows, isReadOnly)
db := &indexDB{
legacyMinMissingTimestampByKey: make(map[string]int64),
id: id,

View File

@@ -468,21 +468,15 @@ func (tf *TagFilter) Unmarshal(src []byte) ([]byte, error) {
return src, nil
}
// String returns string representation of the search query: tag filters and time range.
// String returns string representation of the search query.
func (sq *SearchQuery) String() string {
start := TimestampToHumanReadableFormat(sq.MinTimestamp)
end := TimestampToHumanReadableFormat(sq.MaxTimestamp)
a := sq.FiltersString()
return fmt.Sprintf("filters=%s, timeRange=[%s..%s]", a, start, end)
}
// FiltersString returns string representation of the tag filters.
func (sq *SearchQuery) FiltersString() []string {
a := make([]string, len(sq.TagFilterss))
for i, tfs := range sq.TagFilterss {
a[i] = tagFiltersToString(tfs)
}
return a
start := TimestampToHumanReadableFormat(sq.MinTimestamp)
end := TimestampToHumanReadableFormat(sq.MaxTimestamp)
return fmt.Sprintf("filters=%s, timeRange=[%s..%s]", a, start, end)
}
func tagFiltersToString(tfs []TagFilter) string {

View File

@@ -19,7 +19,7 @@ func ParseDuration(s string) (time.Duration, error) {
return 0, err
}
if ms < minValidMilli || maxValidMilli < ms {
return 0, fmt.Errorf("duration %q must be in the range [%s, %s]", s, minDuration, maxDuration)
return 0, fmt.Errorf("duration %q must be in the range [%v, %v]", s, minDuration, maxDuration)
}
return time.Duration(ms) * time.Millisecond, nil
}

View File

@@ -28,6 +28,7 @@ func ParseTimeMsec(s string) (int64, error) {
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#timestamp-formats
//
// If s doesn't contain timezone information, then the local timezone is used.
// The time must be in the range [1970-01-01T00:00:00Z, 2262-04-11T23:47:16Z].
//
// It returns unix timestamp in nanoseconds.
func ParseTimeAt(s string, currentTimestamp int64) (int64, error) {
@@ -70,10 +71,14 @@ func ParseTimeAt(s string, currentTimestamp int64) (int64, error) {
if err != nil {
return 0, err
}
if d < 0 {
if d > 0 {
d = -d
}
return subInt64NoOverflow(currentTimestamp, int64(d)), nil
nsec := currentTimestamp + int64(d)
if nsec < 0 {
return 0, fmt.Errorf("time %s (%v) must be in the range [%v, %v]", sOrig, time.Unix(0, nsec).UTC(), minTime, maxTime)
}
return nsec, nil
}
if len(s) == 4 {
// Parse YYYY
@@ -110,28 +115,22 @@ func ParseTimeAt(s string, currentTimestamp int64) (int64, error) {
return parseTimeAt(time.RFC3339, sOrig, 0, sOrig)
}
func parseTimeAt(layout, value string, tzOffsetNsec int64, sOrig string) (int64, error) {
var (
minTime = time.Unix(0, 0).UTC()
maxTime = time.Unix(0, math.MaxInt64).UTC()
)
func parseTimeAt(layout, value string, tzOffsetNanos int64, sOrig string) (int64, error) {
t, err := time.Parse(layout, value)
if err != nil {
return 0, err
}
nsec := t.UnixNano()
return subInt64NoOverflow(nsec, -tzOffsetNsec), nil
}
func subInt64NoOverflow(a, b int64) int64 {
if b >= 0 {
if a < math.MinInt64+b {
return math.MinInt64
}
return a - b
tzOffset := time.Duration(tzOffsetNanos)
t = t.UTC().Add(tzOffset)
if t.Before(minTime) || t.After(maxTime) {
return 0, fmt.Errorf("time %s (%v) must be in the range [%v, %v]", sOrig, t, minTime, maxTime)
}
if a > math.MaxInt64+b {
return math.MaxInt64
}
return a - b
return t.UnixNano(), nil
}
// TryParseUnixTimestamp parses s as unix timestamp in seconds, milliseconds, microseconds or nanoseconds and returns the parsed timestamp in nanoseconds.

View File

@@ -210,7 +210,6 @@ func TestParseTimeAtLimits(t *testing.T) {
f := func(s string, wantTime time.Time) {
t.Helper()
got, err := ParseTimeAt(s, now.UnixNano())
if err != nil {
t.Fatalf("unexpected error: %v", err)
@@ -232,38 +231,42 @@ func TestParseTimeAtLimits(t *testing.T) {
west := location(t, "Etc/GMT+12") // UTC-12:00
var s string
// min timestamp
f("0", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
s = fmt.Sprintf("-%d", now.Unix())
f(s, time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
s = fmt.Sprintf("now-%d", now.Unix())
f(s, time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
// min year
f("1678Z", time.Date(1678, 1, 1, 0, 0, 0, 0, time.UTC))
f("1678+14:00", time.Date(1678, 1, 1, 0, 0, 0, 0, east))
f("1678-12:00", time.Date(1678, 1, 1, 0, 0, 0, 0, west))
f("1970Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
f("1971+14:00", time.Date(1971, 1, 1, 0, 0, 0, 0, east))
f("1970-12:00", time.Date(1970, 1, 1, 0, 0, 0, 0, west))
// min month
f("1677-10Z", time.Date(1677, 10, 1, 0, 0, 0, 0, time.UTC))
f("1677-10+14:00", time.Date(1677, 10, 1, 0, 0, 0, 0, east))
f("1677-10-12:00", time.Date(1677, 10, 1, 0, 0, 0, 0, west))
f("1970-01Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
f("1970-02+14:00", time.Date(1970, 2, 1, 0, 0, 0, 0, east))
f("1970-01-12:00", time.Date(1970, 1, 1, 0, 0, 0, 0, west))
// min day
f("1677-09-22Z", time.Date(1677, 9, 22, 0, 0, 0, 0, time.UTC))
f("1677-09-22+14:00", time.Date(1677, 9, 22, 0, 0, 0, 0, east))
f("1677-09-22-12:00", time.Date(1677, 9, 22, 0, 0, 0, 0, west))
f("1970-01-01Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
f("1970-01-02+14:00", time.Date(1970, 1, 2, 0, 0, 0, 0, east))
f("1970-01-01-12:00", time.Date(1970, 1, 1, 0, 0, 0, 0, west))
// min hour
f("1677-09-21T01Z", time.Date(1677, 9, 21, 1, 0, 0, 0, time.UTC))
f("1677-09-21T15+14:00", time.Date(1677, 9, 21, 15, 0, 0, 0, east))
f("1677-09-21T01+14:00", time.Unix(0, math.MinInt64))
f("1677-09-21T01-12:00", time.Date(1677, 9, 21, 1, 0, 0, 0, west))
f("1970-01-01T00Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
f("1970-01-01T14+14:00", time.Date(1970, 1, 1, 14, 0, 0, 0, east))
f("1969-12-31T12-12:00", time.Date(1969, 12, 31, 12, 0, 0, 0, west))
// min minute
f("1677-09-21T00:12Z", time.Date(1677, 9, 21, 0, 12, 0, 0, time.UTC))
f("1677-09-21T15:12Z+14:00", time.Date(1677, 9, 21, 15, 12, 0, 0, east))
f("1677-09-21T00:13Z+14:00", time.Unix(0, math.MinInt64))
f("1677-09-21T00:13Z-12:00", time.Date(1677, 9, 21, 0, 13, 0, 0, west))
f("1970-01-01T00:00Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
f("1970-01-01T14:00+14:00", time.Date(1970, 1, 1, 14, 0, 0, 0, east))
f("1969-12-31T12:00-12:00", time.Date(1969, 12, 31, 12, 0, 0, 0, west))
// min second
f("1677-09-21T00:12:43Z", time.Date(1677, 9, 21, 0, 12, 43, 0, time.UTC))
f("1677-09-21T15:12:43Z+14:00", time.Date(1677, 9, 21, 15, 12, 43, 0, east))
f("1677-09-21T00:12:44Z+14:00", time.Unix(0, math.MinInt64))
f("1677-09-21T00:12:44Z-12:00", time.Date(1677, 9, 21, 0, 12, 44, 0, west))
f("1970-01-01T00:00:00Z", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC))
f("1970-01-01T14:00:00+14:00", time.Date(1970, 1, 1, 14, 0, 0, 0, east))
f("1969-12-31T12:00:00-12:00", time.Date(1969, 12, 31, 12, 0, 0, 0, west))
// max year
f("2262Z", time.Date(2262, 1, 1, 0, 0, 0, 0, time.UTC))
@@ -277,26 +280,23 @@ func TestParseTimeAtLimits(t *testing.T) {
// max day
f("2262-04-11Z", time.Date(2262, 4, 11, 0, 0, 0, 0, time.UTC))
f("2262-04-11+14:00", time.Date(2262, 4, 11, 0, 0, 0, 0, east))
f("2262-04-12+14:00", time.Date(2262, 4, 12, 0, 0, 0, 0, east))
f("2262-04-11-12:00", time.Date(2262, 4, 11, 0, 0, 0, 0, west))
// max hour
f("2262-04-11T23Z", time.Date(2262, 4, 11, 23, 0, 0, 0, time.UTC))
f("2262-04-11T23+14:00", time.Date(2262, 4, 11, 23, 0, 0, 0, east))
f("2262-04-12T13+14:00", time.Date(2262, 4, 12, 13, 0, 0, 0, east))
f("2262-04-11T11-12:00", time.Date(2262, 4, 11, 11, 0, 0, 0, west))
f("2262-04-11T23-12:00", time.Unix(0, math.MaxInt64))
// max minute
f("2262-04-11T23:47Z", time.Date(2262, 4, 11, 23, 47, 0, 0, time.UTC))
f("2262-04-11T23:47+14:00", time.Date(2262, 4, 11, 23, 47, 0, 0, east))
f("2262-04-12T13:47+14:00", time.Date(2262, 4, 12, 13, 47, 0, 0, east))
f("2262-04-11T11:47-12:00", time.Date(2262, 4, 11, 11, 47, 0, 0, west))
f("2262-04-11T23:47-12:00", time.Unix(0, math.MaxInt64))
// max second
f("2262-04-11T23:47:16Z", time.Date(2262, 4, 11, 23, 47, 16, 0, time.UTC))
f("2262-04-11T23:47:16+14:00", time.Date(2262, 4, 11, 23, 47, 16, 0, east))
f("2262-04-12T13:47:16+14:00", time.Date(2262, 4, 12, 13, 47, 16, 0, east))
f("2262-04-11T11:47:16-12:00", time.Date(2262, 4, 11, 11, 47, 16, 0, west))
f("2262-04-11T23:47:16-12:00", time.Unix(0, math.MaxInt64))
// max timestamp
s = fmt.Sprintf("%d", int64(maxValidSecond))
@@ -324,6 +324,85 @@ func TestParseTimeAtLimits(t *testing.T) {
f(s, time.Date(1970, 4, 17, 18, 2, 52, 36_854_776, time.UTC))
}
func TestParseTimeAtOutsideLimits(t *testing.T) {
now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
f := func(s string) {
t.Helper()
got, err := ParseTimeAt(s, now.UnixNano())
if err == nil {
t.Fatalf("expected error but got %d", got)
}
if !strings.Contains(err.Error(), "must be in the range") {
t.Fatalf("expected error: %v", err)
}
}
// min timestamp
f(fmt.Sprintf("-%d", now.Unix()+1))
f(fmt.Sprintf("now-%d", now.Unix()+1))
// min year
f("1969Z")
f("1970+14:00")
f("1969-12:00")
// min month
f("1969-12Z")
f("1970-01+14:00")
f("1969-12-12:00")
// min day
f("1969-12-31Z")
f("1970-01-01+14:00")
f("1969-12-31-12:00")
// min hour
f("1969-12-31T23Z")
f("1970-01-01T13+14:00")
f("1969-12-31T11-12:00")
// min minute
f("1969-12-31T23:59Z")
f("1970-01-01T13:59+14:00")
f("1969-12-31T11:59-12:00")
// min second
f("1969-12-31T23:59:59Z")
f("1970-01-01T13:59:59+14:00")
f("1969-12-31T11:59:59-12:00")
// max year
f("2263Z")
f("2263+14:00")
f("2263-12:00")
// max month
f("2262-05Z")
f("2262-05+14:00")
f("2262-05-12:00")
// max day
f("2262-04-12Z")
f("2262-04-13+14:00")
f("2262-04-12-12:00")
// max hour
f("2262-04-12T00Z")
f("2262-04-12T14+14:00")
f("2262-04-11T12-12:00")
// max minute
f("2262-04-11T23:48Z")
f("2262-04-12T13:48+14:00")
f("2262-04-11T11:48-12:00")
// max second
f("2262-04-11T23:47:17Z")
f("2262-04-12T13:47:17+14:00")
f("2262-04-11T11:47:17-12:00")
}
func TestParseTimeAtOutsideLimits_Nanos(t *testing.T) {
now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
@@ -355,6 +434,7 @@ func TestParseTimeMsecFailure(t *testing.T) {
}
f("")
f("2263")
f("23-45:50")
f("1223-fo:ba")
f("1223-12:ba")

View File

@@ -1,68 +0,0 @@
package vmalertproxy
import (
"net/http"
"net/http/httputil"
"net/url"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// Init initializes proxying requests to the given proxyURL when calling HandleRequest.
//
// Init must be called after flag.Parse(), since it uses command-line flags.
func Init(proxyURL string) {
if len(proxyURL) == 0 {
return
}
pu, err := url.Parse(proxyURL)
if err != nil {
logger.Fatalf("cannot parse -vmalert.proxyURL=%q: %s", proxyURL, err)
}
vmalertProxyHost = pu.Host
vmalertProxy = httputil.NewSingleHostReverseProxy(pu)
}
// HandleRequest proxies the given request path to vmalert at proxyURL passed to Init().
func HandleRequest(w http.ResponseWriter, r *http.Request, path string) {
defer func() {
err := recover()
if err == nil || err == http.ErrAbortHandler {
// Suppress http.ErrAbortHandler panic.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1353
return
}
// Forward other panics to the caller.
panic(err)
}()
req := r.Clone(r.Context())
req.URL.Path = path
req.Host = vmalertProxyHost
if strings.HasPrefix(r.Header.Get(`User-Agent`), `Grafana`) {
// Grafana currently supports only Prometheus-style alerts. If other alert types
// (e.g. logs or traces) are returned, it may fail with "Error loading alerts".
//
// Grafana queries the vmalert API directly, bypassing the VictoriaMetrics datasource,
// so query params (such as datasource_type) cannot be enforced on the Grafana side.
//
// To ensure compatibility, we detect Grafana requests via the User-Agent and enforce
// `datasource_type=prometheus`.
//
// See:
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/329#issuecomment-3847585443
// - https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/59
q := req.URL.Query()
q.Set("datasource_type", "prometheus")
req.URL.RawQuery = q.Encode()
req.RequestURI = ""
}
vmalertProxy.ServeHTTP(w, req)
}
var (
vmalertProxyHost string
vmalertProxy *httputil.ReverseProxy
)

View File

@@ -150,7 +150,7 @@ func (c *column) mustWriteTo(ch *columnHeader, sw *streamWriters) {
putValuesEncoder(ve)
ch.valuesSize = uint64(len(bb.B))
if ch.valuesSize > maxValuesBlockSize {
logger.Panicf("BUG: too big valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize)
logger.Panicf("BUG: too valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize)
}
ch.valuesOffset = bloomValuesWriter.values.bytesWritten
bloomValuesWriter.values.MustWrite(bb.B)

View File

@@ -6,7 +6,7 @@ import (
var maxStringRangeValue = string([]byte{255, 255, 255, 255})
// filterStringRange matches the given string range [minValue..maxValue)
// filterStringRange matches tie given string range [minValue..maxValue)
//
// Note that the minValue is included in the range, while the maxValue isn't included in the range.
// This simplifies querying distinct log sets with string_range(A, B), string_range(B, C), etc.

View File

@@ -34,9 +34,6 @@ func parseIfFilter(lex *lexer) (*ifFilter, error) {
if err != nil {
return nil, fmt.Errorf("cannot parse 'if' filter: %w", err)
}
if lex.isKeyword(";") {
lex.nextToken()
}
if !lex.isKeyword(")") {
return nil, fmt.Errorf("unexpected token %q after 'if' filter; expecting ')'", lex.token)
}

View File

@@ -7,7 +7,6 @@ import (
"sort"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@@ -106,7 +105,7 @@ func mustOpenIndexdb(path, partitionName string, s *Storage) *indexdb {
s: s,
}
var isReadOnly atomic.Bool
idb.tb = mergeset.MustOpenTable(path, s.flushInterval, idb.invalidateStreamFilterCache, time.Second, mergeTagToStreamIDsRows, &isReadOnly)
idb.tb = mergeset.MustOpenTable(path, s.flushInterval, idb.invalidateStreamFilterCache, mergeTagToStreamIDsRows, &isReadOnly)
return idb
}

View File

@@ -226,24 +226,6 @@ func (lex *lexer) checkPrevAdjacentToken(tokens ...string) error {
return nil
}
func (lex *lexer) isQueryPartTrailer() bool {
return lex.isKeywordAny(queryPartTrailers)
}
var queryPartTrailers = []string{
// filters and pipes are delimited by |
"|",
// query finish with )
")",
// query finish with ;
";",
// query finish with EOF
"",
}
func (lex *lexer) isKeyword(keywords ...string) bool {
return lex.isKeywordAny(keywords)
}
@@ -901,7 +883,10 @@ func (q *Query) addTimeFilterNoSubqueries(start, end int64) {
q.f = addTimeFilter(q.f, start, end, q.opts.timeOffset)
q.initStatsRateFuncStepsNoSubqueries()
// Initialize rate functions with the step calculated from _time:[start, end] filter.
// This fixes the bug where rate_sum() doesn't divide by stepSeconds when
// time filter is specified via HTTP params instead of LogsQL expression
q.initStatsRateFuncsFromTimeFilter()
}
func addTimeFilter(f filter, start, end, offset int64) filter {
@@ -1191,7 +1176,8 @@ func (q *Query) GetStatsLabelsAddGroupingByTime(step, offset int64) ([]string, e
// add _time:step to by (...) list at stats pipes.
q.addByTimeFieldToStatsPipes(step, offset)
q.initStatsRateFuncStepsNoSubqueries()
// propagate the step into rate* funcs at stats pipes.
q.initStatsRateFuncs(step)
// add 'partition by (_time)' to 'sort', 'first' and 'last' pipes.
q.addPartitionByTime(step)
@@ -1843,34 +1829,28 @@ func ParseQueryAtTimestamp(s string, timestamp int64) (*Query, error) {
return nil, fmt.Errorf("unexpected unparsed tail after [%s]; context: [%s]; tail: [%s]", q, lex.context(), lex.rawToken+lex.s)
}
q.optimize()
q.initStatsRateFuncSteps()
q.initStatsRateFuncsFromTimeFilter()
return q, nil
}
func (q *Query) initStatsRateFuncSteps() {
q.visitSubqueries(func(q *Query) {
q.initStatsRateFuncStepsNoSubqueries()
})
func (q *Query) initStatsRateFuncsFromTimeFilter() {
start, end := q.GetFilterTimeRange()
if start != math.MinInt64 && end != math.MaxInt64 {
step := end - start
// The increment of the step is needed in order to cover the
// last nanosecond in the selected time range [start, end].
step++
q.initStatsRateFuncs(step)
}
}
func (q *Query) initStatsRateFuncStepsNoSubqueries() {
start, end := q.GetFilterTimeRange()
step := int64(0)
if start != math.MinInt64 && end != math.MaxInt64 {
step = end - start
// The HTTP layer already converted the exclusive end into end-1, and the _time
// filter is inclusive ([start, end]). So (end - start) is 1ns short of the real
// window, and step++ adds that 1ns back.
step++
}
func (q *Query) initStatsRateFuncs(step int64) {
for _, p := range q.pipes {
if ps, ok := p.(*pipeStats); ok {
if !ps.initRateFuncsFromTimeBucket() {
ps.initRateFuncs(step)
}
ps.initRateFuncs(step)
}
}
}
@@ -1945,11 +1925,6 @@ func parseQuery(lex *lexer) (*Query, error) {
q.pipes = pipes
}
// Skip optional trailing semicolon
if lex.isKeyword(";") {
lex.nextToken()
}
return &q, nil
}
@@ -2114,7 +2089,7 @@ func parseQueryOptions(dstOpts *queryOptions, lex *lexer) error {
}
func parseFilter(lex *lexer, allowPipeKeywords bool) (filter, error) {
if lex.isQueryPartTrailer() {
if lex.isKeyword("|", ")", "") {
return nil, fmt.Errorf("missing query")
}
@@ -2143,7 +2118,7 @@ func parseFilterOr(lex *lexer, fieldName string) (filter, error) {
}
filters = append(filters, f)
switch {
case lex.isQueryPartTrailer():
case lex.isKeyword("|", ")", ""):
if len(filters) == 1 {
return filters[0], nil
}
@@ -2164,7 +2139,7 @@ func parseFilterAnd(lex *lexer, fieldName string) (filter, error) {
}
filters = append(filters, f)
switch {
case lex.isKeyword("or") || lex.isQueryPartTrailer():
case lex.isKeyword("or", "|", ")", ""):
if len(filters) == 1 {
return filters[0], nil
}
@@ -2177,10 +2152,6 @@ func parseFilterAnd(lex *lexer, fieldName string) (filter, error) {
}
func parseFilterGeneric(lex *lexer, fieldName string) (filter, error) {
if lex.isKeyword("") {
return nil, fmt.Errorf("unexpected end of query after %q; expecting a filter", lex.prevRawToken)
}
// Verify the previous adjacent token
if lex.isKeyword("(") {
if err := lex.checkPrevAdjacentToken("|", ":", "(", "!", "-", "not", "and", "or"); err != nil {
@@ -2306,6 +2277,13 @@ func parseFilterPhrase(lex *lexer, fieldName string) (filter, error) {
}
}
// The phrase is either a search phrase or a search prefix.
if !lex.isSkippedSpace && lex.isKeyword("*") {
// The phrase is a search prefix in the form `foo*`.
lex.nextToken()
return newFilterPrefix(fieldName, phrase), nil
}
// The phrase is a search phrase.
return newFilterPhrase(fieldName, phrase), nil
}
@@ -2352,7 +2330,7 @@ func parseAnyCaseFilter(lex *lexer, fieldName string) (filter, error) {
})
}
func parseFuncArgMaybePrefix(lex *lexer, fieldName string, callback func(arg string, isPrefixFilter bool) (filter, error)) (filter, error) {
func parseFuncArgMaybePrefix(lex *lexer, fieldName string, callback func(arg string, isPrefiFilter bool) (filter, error)) (filter, error) {
lexState := lex.backupState()
funcName := lex.token
@@ -2565,19 +2543,19 @@ func tryParseIPv6CIDR(s string) ([16]byte, [16]byte, bool) {
func parseFilterContainsAll(lex *lexer, fieldName string) (filter, error) {
var fi filterContainsAll
fg := newFilterGeneric(fieldName, &fi)
return parseInValues(lex, fg, &fi.values)
return parseInValues(lex, fieldName, fg, &fi.values)
}
func parseFilterContainsAny(lex *lexer, fieldName string) (filter, error) {
var fi filterContainsAny
fg := newFilterGeneric(fieldName, &fi)
return parseInValues(lex, fg, &fi.values)
return parseInValues(lex, fieldName, fg, &fi.values)
}
func parseFilterIn(lex *lexer, fieldName string) (filter, error) {
var fi filterIn
fg := newFilterGeneric(fieldName, &fi)
return parseInValues(lex, fg, &fi.values)
return parseInValues(lex, fieldName, fg, &fi.values)
}
func parseFilterContainsCommonCase(lex *lexer, fieldName string) (filter, error) {
@@ -2610,10 +2588,10 @@ func parseFilterEqualsCommonCase(lex *lexer, fieldName string) (filter, error) {
return fi, nil
}
func parseInValues(lex *lexer, f filter, iv *inValues) (filter, error) {
func parseInValues(lex *lexer, fieldName string, f filter, iv *inValues) (filter, error) {
// Try parsing in(arg1, ..., argN) at first
lexState := lex.backupState()
fi, err := parseFuncArgsPossibleWildcard(lex, func(args []string) (filter, error) {
fi, err := parseFuncArgsPossibleWildcard(lex, fieldName, func(args []string) (filter, error) {
iv.values = args
return f, nil
})
@@ -2713,7 +2691,7 @@ func parseFilterStar(lex *lexer, fieldName string) (filter, error) {
return parseFilterGeneric(lex, "*")
}
if lex.isSkippedSpace || lex.isQueryPartTrailer() {
if lex.isSkippedSpace || lex.isKeyword("", ")", "|") {
// '*' or 'fieldName:*' filter
return newFilterPrefix(fieldName, ""), nil
}
@@ -2728,7 +2706,7 @@ func parseFilterStar(lex *lexer, fieldName string) (filter, error) {
}
lex.nextToken()
if !lex.isSkippedSpace && !lex.isQueryPartTrailer() {
if !lex.isSkippedSpace && !lex.isKeyword("", ")", "|") {
return nil, fmt.Errorf("missing whitespace between *%q* and %q", phrase, lex.token)
}
return newFilterSubstring(fieldName, phrase), nil
@@ -2991,7 +2969,7 @@ func parseFuncArgs(lex *lexer, fieldName string, callback func(funcName string,
return callback(funcName, args)
}
func parseFuncArgsPossibleWildcard(lex *lexer, callback func(args []string) (filter, error)) (filter, error) {
func parseFuncArgsPossibleWildcard(lex *lexer, fieldName string, callback func(args []string) (filter, error)) (filter, error) {
funcName := lex.token
lex.nextToken()
@@ -3026,7 +3004,7 @@ func parseArgsInParens(lex *lexer) ([]string, error) {
}
arg, err := lex.nextCompoundToken()
if err != nil {
return nil, fmt.Errorf("cannot parse arg: %w", err)
return nil, fmt.Errorf("cannot parse arg")
}
args = append(args, arg)
if lex.isKeyword(")") {
@@ -3064,7 +3042,7 @@ func parseArgsInParensPossibleWildcard(lex *lexer) ([]string, bool, error) {
} else {
token, err := lex.nextCompoundToken()
if err != nil {
return nil, false, fmt.Errorf("cannot parse arg: %w", err)
return nil, false, fmt.Errorf("cannot parse arg")
}
arg = token
}
@@ -3698,7 +3676,7 @@ func parseFilterStreamIDIn(lex *lexer) (filter, error) {
// Try parsing in(arg1, ..., argN) at first
lexState := lex.backupState()
fs, err := parseFuncArgsPossibleWildcard(lex, func(args []string) (filter, error) {
fs, err := parseFuncArgsPossibleWildcard(lex, "_stream_id", func(args []string) (filter, error) {
streamIDs := make([]streamID, len(args))
for i, arg := range args {
if !streamIDs[i].tryUnmarshalFromString(arg) {

View File

@@ -122,13 +122,12 @@ func parsePipes(lex *lexer) ([]pipe, error) {
pipes = append(pipes, p)
switch {
case lex.isQueryPartTrailer():
if !lex.isKeyword("|") {
return pipes, nil
}
case lex.isKeyword("|"):
lex.nextToken()
case lex.isKeyword(")", ""):
return pipes, nil
default:
return nil, fmt.Errorf("unexpected token after [%s]: %q; expecting '|', ';' or ')'", pipes[len(pipes)-1], lex.token)
return nil, fmt.Errorf("unexpected token after [%s]: %q; expecting '|' or ')'", pipes[len(pipes)-1], lex.token)
}
}
}
@@ -179,7 +178,6 @@ func initPipeParsers() {
pipeParsers = map[string]pipeParseFunc{
"block_stats": parsePipeBlockStats,
"blocks_count": parsePipeBlocksCount,
"coalesce": parsePipeCoalesce,
"collapse_nums": parsePipeCollapseNums,
"copy": parsePipeCopy,
"cp": parsePipeCopy,

View File

@@ -133,7 +133,7 @@ func parsePipeBlocksCount(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse result name for 'blocks_count': %w", err)
}
resultName = name
} else if !lex.isQueryPartTrailer() {
} else if !lex.isKeyword("", "|") {
name, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for 'blocks_count': %w", err)

View File

@@ -1,204 +0,0 @@
package logstorage
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter"
)
// pipeCoalesce implements '| coalesce (...) as ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#coalesce-pipe
type pipeCoalesce struct {
srcFieldFilters []string
dstField string
defaultValue string
}
func (pc *pipeCoalesce) String() string {
if len(pc.srcFieldFilters) == 0 {
logger.Panicf("BUG: pipeCoalesce must contain at least one srcField")
}
s := "coalesce(" + fieldNamesString(pc.srcFieldFilters) + ")"
if pc.defaultValue != "" {
s += " default " + quoteTokenIfNeeded(pc.defaultValue)
}
if pc.dstField != "_msg" {
s += " as " + quoteTokenIfNeeded(pc.dstField)
}
return s
}
func (pc *pipeCoalesce) splitToRemoteAndLocal(_ int64) (pipe, []pipe) {
return pc, nil
}
func (pc *pipeCoalesce) canLiveTail() bool {
return true
}
func (pc *pipeCoalesce) canReturnLastNResults() bool {
return pc.dstField != "_time"
}
func (pc *pipeCoalesce) isFixedOutputFieldsOrder() bool {
return false
}
func (pc *pipeCoalesce) updateNeededFields(pf *prefixfilter.Filter) {
if pf.MatchString(pc.dstField) {
pf.AddDenyFilter(pc.dstField)
pf.AddAllowFilters(pc.srcFieldFilters)
}
}
func (pc *pipeCoalesce) hasFilterInWithQuery() bool {
return false
}
func (pc *pipeCoalesce) initFilterInValues(_ *inValuesCache, _ getFieldValuesFunc) (pipe, error) {
return pc, nil
}
func (pc *pipeCoalesce) visitSubqueries(_ func(q *Query)) {
// nothing to do
}
func (pc *pipeCoalesce) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
return &pipeCoalesceProcessor{
pc: pc,
ppNext: ppNext,
}
}
// pipeCoalesceProcessor processes the coalesce pipe
type pipeCoalesceProcessor struct {
pc *pipeCoalesce
ppNext pipeProcessor
shards atomicutil.Slice[pipeCoalesceProcessorShard]
}
type pipeCoalesceProcessorShard struct {
rc resultColumn
cs []*blockResultColumn
}
func (pcp *pipeCoalesceProcessor) writeBlock(workerID uint, br *blockResult) {
if br.rowsLen == 0 {
return
}
shard := pcp.shards.Get(workerID)
pc := pcp.pc
// Initialize shard.cs
cs := br.getColumns()
for _, ff := range pc.srcFieldFilters {
if !prefixfilter.IsWildcardFilter(ff) {
c := br.getColumnByName(ff)
shard.addColumn(c)
continue
}
for _, c := range cs {
if prefixfilter.MatchFilter(ff, c.name) {
shard.addColumn(c)
}
}
}
// Fill the shard.rc
for rowIdx := range br.rowsLen {
value := ""
for _, c := range shard.cs {
v := c.getValueAtRow(br, rowIdx)
if v != "" {
value = v
break
}
}
if value == "" {
value = pc.defaultValue
}
shard.rc.addValue(value)
}
shard.rc.name = pc.dstField
br.addResultColumn(shard.rc)
pcp.ppNext.writeBlock(workerID, br)
shard.rc.reset()
clear(shard.cs)
shard.cs = shard.cs[:0]
}
func (shard *pipeCoalesceProcessorShard) addColumn(c *blockResultColumn) {
// verify whether the given column already exists in shard.cs
for _, col := range shard.cs {
if col.name == c.name {
// Nothing to add - the column already exists
return
}
}
// Add the column to cs.
shard.cs = append(shard.cs, c)
}
func (pcp *pipeCoalesceProcessor) flush() error {
return nil
}
// parsePipeCoalesce parses '| coalesce(field1, field2, field3) default "default value" as result_field'
func parsePipeCoalesce(lex *lexer) (pipe, error) {
if !lex.isKeyword("coalesce") {
return nil, fmt.Errorf("expecting 'coalesce'; got %q", lex.token)
}
lex.nextToken()
srcFieldFilters, err := parseFieldFiltersInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse field names: %w", err)
}
if len(srcFieldFilters) == 0 {
return nil, fmt.Errorf("coalesce requires at least one field name")
}
// Parse optional 'default' keyword and value
defaultValue := ""
if lex.isKeyword("default") {
lex.nextToken()
v, err := lex.nextCompoundToken()
if err != nil {
return nil, fmt.Errorf("cannot parse default value: %w", err)
}
defaultValue = v
}
// Parse 'as' token
dstField := "_msg"
if lex.isKeyword("as") {
lex.nextToken()
v, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field name: %w", err)
}
dstField = v
}
pc := &pipeCoalesce{
srcFieldFilters: srcFieldFilters,
dstField: dstField,
defaultValue: defaultValue,
}
return pc, nil
}

View File

@@ -126,7 +126,7 @@ func parsePipeCopy(lex *lexer) (pipe, error) {
dstFieldFilters = append(dstFieldFilters, dstFieldFilter)
switch {
case lex.isQueryPartTrailer():
case lex.isKeyword("|", ")", ""):
pc := &pipeCopy{
srcFieldFilters: srcFieldFilters,
dstFieldFilters: dstFieldFilters,
@@ -134,7 +134,7 @@ func parsePipeCopy(lex *lexer) (pipe, error) {
return pc, nil
case lex.isKeyword(","):
default:
return nil, fmt.Errorf("unexpected token: %q; expecting ',', '|', ';' or ')'", lex.token)
return nil, fmt.Errorf("unexpected token: %q; expecting ',', '|' or ')'", lex.token)
}
}
}

View File

@@ -72,7 +72,7 @@ func parsePipeDecolorize(lex *lexer) (pipe, error) {
lex.nextToken()
field := "_msg"
if !lex.isQueryPartTrailer() {
if !lex.isKeyword("|", ")", "") {
f, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse field name after 'decolorize': %w", err)

View File

@@ -264,7 +264,7 @@ func parsePipeFieldNames(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err)
}
resultName = name
} else if !lex.isQueryPartTrailer() {
} else if !lex.isKeyword("", "|") {
name, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err)

View File

@@ -173,7 +173,7 @@ func parsePipeHash(lex *lexer) (pipe, error) {
if lex.isKeyword("as") {
lex.nextToken()
}
if !lex.isQueryPartTrailer() {
if !lex.isKeyword("|", ")", "") {
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field after 'hash(%s)': %w", quoteTokenIfNeeded(fieldName), err)

View File

@@ -182,7 +182,7 @@ func parsePipeJSONArrayLen(lex *lexer) (pipe, error) {
if lex.isKeyword("as") {
lex.nextToken()
}
if !lex.isQueryPartTrailer() {
if !lex.isKeyword("|", ")", "") {
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field after 'len(%s)': %w", quoteTokenIfNeeded(fieldName), err)

View File

@@ -102,7 +102,7 @@ func parsePipeLast(lex *lexer) (pipe, error) {
func parsePipeLastFirst(lex *lexer) (*pipeSort, error) {
var ps pipeSort
ps.limit = 1
if !lex.isKeyword("by", "partition", "rank", "(") && !lex.isQueryPartTrailer() {
if !lex.isKeyword("by", "partition", "rank", "(", "|", ")", "") {
s, err := lex.nextCompoundToken()
if err != nil {
return nil, fmt.Errorf("cannot parse number: %w", err)
@@ -144,7 +144,7 @@ func parsePipeLastFirst(lex *lexer) (*pipeSort, error) {
if lex.isKeyword("rank") {
rankFieldName, err := parseRankFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot read rank field name: %w", err)
return nil, fmt.Errorf("cannot read rank field name: %s", err)
}
ps.rankFieldName = rankFieldName
}

View File

@@ -165,7 +165,7 @@ func parsePipeLen(lex *lexer) (pipe, error) {
if lex.isKeyword("as") {
lex.nextToken()
}
if !lex.isQueryPartTrailer() {
if !lex.isKeyword("|", ")", "") {
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field after 'len(%s)': %w", quoteTokenIfNeeded(fieldName), err)

View File

@@ -113,7 +113,7 @@ func parsePipeLimit(lex *lexer) (pipe, error) {
lex.nextToken()
limit := uint64(10)
if !lex.isQueryPartTrailer() {
if !lex.isKeyword("|", ")", "") {
limitStr, err := lex.nextCompoundToken()
if err != nil {
return nil, fmt.Errorf("cannot parse rows limit: %w", err)

View File

@@ -460,7 +460,7 @@ func parsePipeMath(lex *lexer) (pipe, error) {
switch {
case lex.isKeyword(","):
lex.nextToken()
case lex.isQueryPartTrailer():
case lex.isKeyword("|", ")", ""):
if len(mes) == 0 {
return nil, fmt.Errorf("missing 'math' expressions")
}
@@ -469,7 +469,7 @@ func parsePipeMath(lex *lexer) (pipe, error) {
}
return pm, nil
default:
return nil, fmt.Errorf("unexpected token after 'math' expression [%s]: %q; expecting ',', '|', ';' or ')'", mes[len(mes)-1], lex.token)
return nil, fmt.Errorf("unexpected token after 'math' expression [%s]: %q; expecting ',', '|' or ')'", mes[len(mes)-1], lex.token)
}
}
}
@@ -481,7 +481,7 @@ func parseMathEntry(lex *lexer) (*mathEntry, error) {
}
resultField := ""
if lex.isKeyword(",") || lex.isQueryPartTrailer() {
if lex.isKeyword(",", "|", ")", "") {
resultField = me.String()
} else {
if lex.isKeyword("as") {

View File

@@ -88,7 +88,7 @@ func parsePipePackJSON(lex *lexer) (pipe, error) {
if lex.isKeyword("as") {
lex.nextToken()
}
if !lex.isQueryPartTrailer() {
if !lex.isKeyword("|", ")", "") {
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field for 'pack_json': %w", err)

View File

@@ -88,7 +88,7 @@ func parsePipePackLogfmt(lex *lexer) (pipe, error) {
if lex.isKeyword("as") {
lex.nextToken()
}
if !lex.isQueryPartTrailer() {
if !lex.isKeyword("|", ")", "") {
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field for 'pack_logfmt': %w", err)

View File

@@ -135,7 +135,7 @@ func parsePipeRename(lex *lexer) (pipe, error) {
dstFieldFilters = append(dstFieldFilters, dstFieldFilter)
switch {
case lex.isQueryPartTrailer():
case lex.isKeyword("|", ")", ""):
pr := &pipeRename{
srcFieldFilters: srcFieldFilters,
dstFieldFilters: dstFieldFilters,

View File

@@ -415,7 +415,7 @@ func parsePipeRunningStatsExt(lex *lexer, pipeName string) (pipe, error) {
f.f = sf
resultName := ""
if lex.isKeyword(",") || lex.isQueryPartTrailer() {
if lex.isKeyword(",", "|", ")", "") {
resultName = sf.String()
} else {
if lex.isKeyword("as") {
@@ -435,7 +435,7 @@ func parsePipeRunningStatsExt(lex *lexer, pipeName string) (pipe, error) {
funcs = append(funcs, f)
if lex.isQueryPartTrailer() {
if lex.isKeyword("|", ")", "") {
ps.funcs = funcs
return &ps, nil
}

View File

@@ -836,7 +836,7 @@ func parsePipeSort(lex *lexer) (pipe, error) {
case lex.isKeyword("rank"):
rankFieldName, err := parseRankFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot read rank field name: %w", err)
return nil, fmt.Errorf("cannot read rank field name: %s", err)
}
ps.rankFieldName = rankFieldName
case lex.isKeyword("partition"):
@@ -930,7 +930,7 @@ func parseLimit(lex *lexer) (uint64, error) {
limitStr, err := lex.nextCompoundToken()
if err != nil {
return 0, fmt.Errorf("cannot parse 'limit': %w", err)
return 0, fmt.Errorf("cannot parse 'limit': %s", err)
}
n, ok := tryParseUint64(limitStr)
@@ -949,7 +949,7 @@ func parseOffset(lex *lexer) (uint64, error) {
limitStr, err := lex.nextCompoundToken()
if err != nil {
return 0, fmt.Errorf("cannot parse 'offset': %w", err)
return 0, fmt.Errorf("cannot parse 'offset': %s", err)
}
n, ok := tryParseUint64(limitStr)

View File

@@ -334,7 +334,7 @@ func (shard *pipeTopkProcessorShard) getRowsByPartition(partition string) *pipeT
}
partition = strings.Clone(partition)
shard.rowsByPartition[partition] = rs
shard.stateSizeBudget -= int(unsafe.Sizeof(*rs)+unsafe.Sizeof(rs)) + len(partition)
shard.stateSizeBudget += int(unsafe.Sizeof(*rs)+unsafe.Sizeof(rs)) + len(partition)
}
return rs
}

View File

@@ -142,7 +142,7 @@ func parsePipeSplit(lex *lexer) (pipe, error) {
}
srcField := "_msg"
if !lex.isKeyword("as") && !lex.isQueryPartTrailer() {
if !lex.isKeyword("as", ")", "|", "") {
if lex.isKeyword("from") {
lex.nextToken()
}
@@ -154,7 +154,7 @@ func parsePipeSplit(lex *lexer) (pipe, error) {
}
dstField := srcField
if !lex.isQueryPartTrailer() {
if !lex.isKeyword(")", "|", "") {
if lex.isKeyword("as") {
lex.nextToken()
}

View File

@@ -415,16 +415,6 @@ func (ps *pipeStats) initRateFuncs(step int64) {
}
}
func (ps *pipeStats) initRateFuncsFromTimeBucket() bool {
for _, bf := range ps.byFields {
if bf.name == "_time" && bf.bucketSize > 0 {
ps.initRateFuncs(int64(bf.bucketSize))
return true
}
}
return false
}
const stateSizeBudgetChunk = 1 << 20
func (ps *pipeStats) newPipeProcessor(concurrency int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
@@ -1399,11 +1389,11 @@ func parsePipeStatsExt(lex *lexer, needStatsKeyword bool) (pipe, error) {
}
ps.entries = append(ps.entries, e)
if lex.isQueryPartTrailer() {
if lex.isKeyword("|", ")", "") {
break
}
if !lex.isKeyword(",") {
return nil, fmt.Errorf("unexpected token %q after [%s]; want ',', '|', ';' or ')'", lex.token, e)
return nil, fmt.Errorf("unexpected token %q after [%s]; want ',', '|' or ')'", lex.token, e)
}
lex.nextToken()
}
@@ -1452,7 +1442,7 @@ func parseStatsEntry(lex *lexer) (pipeStatsEntry, error) {
}
resultName := ""
if lex.isKeyword(",") || lex.isQueryPartTrailer() {
if lex.isKeyword(",", "|", ")", "") {
resultName = sf.String()
if iff != nil {
resultName += " " + iff.String()

View File

@@ -620,7 +620,7 @@ func parsePipeTop(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'by(...)': %w", err)
}
byFields = bfs
} else if !lex.isKeyword("hits", "rank") && !lex.isQueryPartTrailer() {
} else if !lex.isKeyword("hits", "rank", ")", "|", "") {
bfs, err := parseCommaSeparatedFields(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'by ...': %w", err)
@@ -674,11 +674,11 @@ func parseRankFieldName(lex *lexer) (string, error) {
rankFieldName := "rank"
if lex.isKeyword("as") {
lex.nextToken()
if lex.isKeyword("(") || lex.isQueryPartTrailer() {
if lex.isKeyword("", "|", ")", "(") {
return "", fmt.Errorf("missing rank name")
}
}
if !lex.isKeyword("limit") && !lex.isQueryPartTrailer() {
if !lex.isKeyword("", "|", ")", "limit") {
s, err := parseFieldName(lex)
if err != nil {
return "", err

View File

@@ -550,7 +550,7 @@ func parsePipeUniq(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'by(...)': %w", err)
}
byFields = bfs
} else if !lex.isKeyword("filter", "with", "hits", "limit") && !lex.isQueryPartTrailer() {
} else if !lex.isKeyword("filter", "with", "hits", "limit", ")", "|", "") {
bfs, err := parseCommaSeparatedFields(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'by ...': %w", err)

View File

@@ -100,7 +100,6 @@ func (pu *pipeUnpackJSON) visitSubqueries(visitFunc func(q *Query)) {
func (pu *pipeUnpackJSON) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
unpackJSON := func(uctx *fieldsUnpackerContext, s string) {
s = trimJSONWhitespace(s)
if len(s) == 0 || s[0] != '{' {
// This isn't a JSON object
return
@@ -160,7 +159,7 @@ func parsePipeUnpackJSON(lex *lexer) (pipe, error) {
}
fromField := "_msg"
if !lex.isKeyword("fields", "preserve_keys", "result_prefix", "keep_original_fields", "skip_empty_results") && !lex.isQueryPartTrailer() {
if !lex.isKeyword("fields", "preserve_keys", "result_prefix", "keep_original_fields", "skip_empty_results", ")", "|", "") {
if lex.isKeyword("from") {
lex.nextToken()
}

View File

@@ -142,7 +142,7 @@ func parsePipeUnpackLogfmt(lex *lexer) (pipe, error) {
}
fromField := "_msg"
if !lex.isKeyword("fields", "result_prefix", "keep_original_fields", "skip_empty_results") && !lex.isQueryPartTrailer() {
if !lex.isKeyword("fields", "result_prefix", "keep_original_fields", "skip_empty_results", ")", "|", "") {
if lex.isKeyword("from") {
lex.nextToken()
}

View File

@@ -2,7 +2,6 @@ package logstorage
import (
"fmt"
"strings"
"sync/atomic"
"time"
@@ -92,7 +91,6 @@ func (pu *pipeUnpackSyslog) newPipeProcessor(_ int, _ <-chan struct{}, _ func(),
year := currentYear.Load()
p := GetSyslogParser(int(year), pu.offsetTimezone)
s = strings.TrimLeft(s, " \t\n\r")
p.Parse(s)
for _, f := range p.Fields {
uctx.addField(f.Name, f.Value)
@@ -137,7 +135,7 @@ func parsePipeUnpackSyslog(lex *lexer) (pipe, error) {
}
fromField := "_msg"
if !lex.isKeyword("offset", "result_prefix", "keep_original_fields") && !lex.isQueryPartTrailer() {
if !lex.isKeyword("offset", "result_prefix", "keep_original_fields", ")", "|", "") {
if lex.isKeyword("from") {
lex.nextToken()
}

View File

@@ -139,7 +139,7 @@ func parsePipeUnpackWords(lex *lexer) (pipe, error) {
lex.nextToken()
srcField := "_msg"
if !lex.isKeyword("drop_duplicates", "as") && !lex.isQueryPartTrailer() {
if !lex.isKeyword("drop_duplicates", "as", ")", "|", "") {
if lex.isKeyword("from") {
lex.nextToken()
}
@@ -151,7 +151,7 @@ func parsePipeUnpackWords(lex *lexer) (pipe, error) {
}
dstField := srcField
if !lex.isKeyword("drop_duplicates") && !lex.isQueryPartTrailer() {
if !lex.isKeyword("drop_duplicates", ")", "|", "") {
if lex.isKeyword("as") {
lex.nextToken()
}

View File

@@ -256,7 +256,6 @@ func parsePipeUnroll(lex *lexer) (pipe, error) {
}
func unpackJSONArray(dst []string, a *arena, s string) []string {
s = trimJSONWhitespace(s)
if s == "" || s[0] != '[' {
return dst
}

View File

@@ -67,7 +67,7 @@ func parseRunningStatsLast(lex *lexer) (runningStatsFunc, error) {
return nil, err
}
if len(args) != 1 {
return nil, fmt.Errorf("unexpected number of args for the last() function; got %d; want 1; args: %q", len(args), args)
return nil, fmt.Errorf("unexpeccted number of args for the last() function; got %d; want 1; args: %q", len(args), args)
}
fieldName := args[0]

View File

@@ -170,7 +170,7 @@ type Storage struct {
// partitions are sorted by time, e.g. partitions[0] has the smallest time.
partitions []*partitionWrapper
// ptwHot is the "hot" partition, where the last rows were ingested.
// ptwHot is the "hot" partition, were the last rows were ingested.
//
// It must be accessed under partitionsLock.
ptwHot *partitionWrapper
@@ -197,7 +197,7 @@ type Storage struct {
// the check whether the given stream is already registered in the persistent storage.
streamIDCache *cache
// filterStreamCache caches streamIDs keyed by (partition, []TenantID, StreamFilter).
// filterStreamCache caches streamIDs keyed by (partition, []TenanID, StreamFilter).
//
// It reduces the load on persistent storage during querying by _stream:{...} filter.
filterStreamCache *cache
@@ -235,7 +235,7 @@ func (s *Storage) PartitionAttach(name string) error {
// Verify whether the given partition already exists in the attached partitions list.
for _, ptw := range s.partitions {
if ptw.pt.name == name {
return fmt.Errorf("cannot attach the partition %q, because it is already attached", name)
return fmt.Errorf("cannot attach the partition %q, because it is arleady attached", name)
}
}
@@ -381,7 +381,7 @@ func getSnapshotPaths(ptws []*partitionWrapper) []string {
func (s *Storage) PartitionSnapshotDelete(snapshotPath string) error {
snapshotName := filepath.Base(snapshotPath)
if err := snapshotutil.Validate(snapshotName); err != nil {
return fmt.Errorf("unsupported snapshot name %q at %q: %w", snapshotName, snapshotPath, err)
return fmt.Errorf("unsupported snapshot name %q at %q: %s", snapshotName, snapshotPath, err)
}
snapshotDir := filepath.Dir(snapshotPath)
@@ -442,7 +442,7 @@ func (s *Storage) MustDeleteStalePartitionSnapshots(maxAge time.Duration) []stri
// DeleteRunTask starts deletion of logs according to the given filter f for the given tenantIDs.
//
// The taskID must contain a unique id of the task. It is used for tracking the task at the list returned by DeleteActiveTasks().
// The taskID must contain an unique id of the task. It is used for tracking the task at the list returned by DeleteActiveTasks().
// The timestamp must contain the timestamp in seconds when the task is started.
func (s *Storage) DeleteRunTask(_ context.Context, taskID string, timestamp int64, tenantIDs []TenantID, f *Filter) error {
// Register the task in the list of active delete tasks, so it survives application restarts and crashes.
@@ -968,7 +968,7 @@ func (s *Storage) watchDeleteTasks() {
s.deleteTasks = s.deleteTasks[1:]
if !ok {
// The delete task couldn't be completed now. Try it later.
// The delete task coudn't be completed now. Try it later.
s.deleteTasks = append(s.deleteTasks, dt)
}
s.mustSaveDeleteTasksLocked()
@@ -1031,7 +1031,7 @@ func (s *Storage) processDeleteTask(ctx context.Context, dt *DeleteTask) bool {
}
// The task couldn't be processed at the moment
logger.Warnf("cannot proceed with the delete task with task_id=%q in %.3f seconds; retrying it later", dt.TaskID, time.Since(startTime).Seconds())
logger.Warnf("cannot proceeed with the delete task with task_id=%q in %.3f seconds; retrying it later", dt.TaskID, time.Since(startTime).Seconds())
return false
}

View File

@@ -23,7 +23,7 @@ import (
"github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter"
)
// QueryContext is used for executing the query passed to NewQueryContext().
// QueryContext is used for execting the query passed to NewQueryContext()
type QueryContext struct {
// Context is the context for executing the Query.
Context context.Context
@@ -43,12 +43,12 @@ type QueryContext struct {
// HiddenFieldsFilters is an optional list of field filters, which must be hidden during query execution.
//
// The list may contain full field names and field prefixes ending with *.
// Prefixes match all the fields starting with the given prefix.
// Prefix match all the fields starting with the given prefix.
HiddenFieldsFilters []string
// startTime is creation time for the QueryContext.
//
// It is used for calculating query duration.
// It is used for calculating query druation.
startTime time.Time
}
@@ -128,14 +128,13 @@ type storageSearchOptions struct {
// hiddenFieldsFilter is the filter of fields, which must be hidden during query
hiddenFieldsFilter *prefixfilter.Filter
// timeOffset is the offset in nanoseconds, which must be subtracted from the selected _time values
// before these values are passed to query pipes.
// timeOffset is the offset in nanoseconds, which must be subtracted from the selected the _time values before these values are passed to query pipes.
timeOffset int64
}
// partitionSearchOptions is search options for the partition.
//
// This struct must be created via partition.getSearchOptions() call.
// this struct must be created via partition.getSearchOptions() call.
type partitionSearchOptions struct {
// Optional sorted list of tenantIDs for the search.
// If it is empty, then the search is performed by streamIDs
@@ -1127,7 +1126,7 @@ func (db *DataBlock) RowsCount() int {
// GetColumns returns columns from db.
//
// If needSortColumns is set, then the returned columns are sorted in alphabetical order.
// If needSortSolumns is set, then the returned columns are sorted in alphabetical order
func (db *DataBlock) GetColumns(needSortColumns bool) []BlockColumn {
if needSortColumns {
sort.Slice(db.columns, func(i, j int) bool {

View File

@@ -135,7 +135,7 @@ func MarshalTenantIDsToJSON(tenantIDs []TenantID) []byte {
func UnmarshalTenantIDsFromJSON(src []byte) ([]TenantID, error) {
var tenantIDs []TenantID
if err := json.Unmarshal(src, &tenantIDs); err != nil {
return nil, fmt.Errorf("cannot unmarshal tenantIDs from JSON array: %w", err)
return nil, fmt.Errorf("cannot unmarshal tenantIDs from JSON array: %s", err)
}
return tenantIDs, nil
}

4
vendor/modules.txt vendored
View File

@@ -132,8 +132,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric
# github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.56.0
## explicit; go 1.24.0
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping
# github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260616132739-c901a1e31cb3
## explicit; go 1.26.4
# github.com/VictoriaMetrics/VictoriaLogs v1.50.1-0.20260415124154-6b7a6357aec0
## explicit; go 1.26.2
github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage
github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter
# github.com/VictoriaMetrics/easyproto v1.2.0