Compare commits

...

8 Commits

Author SHA1 Message Date
Artem Fetishev
6653f6a5e7 apptest: rename client fields to generic cli (#11088)
Initially, the fields were given a different name because I mistakenly
believed that the fields with the same name will clash when the
corresponding types are embedded in some other type (for example,
vmselectClient and vminsertClient are embedded into vmsingle).

But it appears not to be the case. Thus, changing the field name to
something more generic.

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-10 17:01:44 +02:00
Max Kotliar
71c7a73716 app/vmbackupmanager: make assertions in TestHTTPClient_CreateSnapshot more explisit (#1021)
The change should help surface why the test fails sometimes. My expectations
that CreateSnapshot returns another error than expect but since we never
check it we move to the next assert which than fail

```
2026-04-09T14:04:32.674Z    info
/home/runner/work/VictoriaMetrics-enterprise/VictoriaMetrics-enterprise/lib/snapshot/snapshot.go:35
Creating snapshot
--- FAIL: TestHTTPClient_CreateSnapshot (0.00s)
    client_test.go:53: unexpected error
2026-04-09T14:04:32.675Z    info
/home/runner/work/VictoriaMetrics-enterprise/VictoriaMetrics-enterprise/lib/snapshot/snapshot.go:81
Deleting snapshot foo
```

PR https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/pull/1021
CI example: https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/actions/runs/24194217809/job/70619495033
2026-06-10 13:05:15 +02:00
Aliaksandr Valialkin
10eb212d2b all: consistently use "%w" for wrapping errors in fmt.Errorf()
This is needed in order to avoid issues similar to https://github.com/VictoriaMetrics/VictoriaLogs/issues/1419 ,
when the returned error is unwrapped in order to decide how to handle it.

See the fix for this issue at c38ca01c25
2026-06-10 12:37:16 +02:00
Aliaksandr Valialkin
5e005f5dbb app/vmctl: consistently use "%w" for wrapping errors in fmt.Errorf()
This is needed in order to avoid issues similar to https://github.com/VictoriaMetrics/VictoriaLogs/issues/1419 ,
when the returned error is unwrapped in order to decide how to handle it.

See the fix for this issue at c38ca01c25
2026-06-10 12:31:58 +02:00
Aliaksandr Valialkin
04993f2187 Makefile: update golangci-lint from v2.9.0 to v2.12.2
See https://github.com/golangci/golangci-lint/releases/tag/v2.12.2

While at it, actualize .golangci.yml .
2026-06-10 12:13:01 +02:00
Yury Moladau
73a40a4178 app/vmui: update npm dependencies (#11084)
### Describe Your Changes

Updates npm dependencies and refreshes `package-lock.json`.

Signed-off-by: Yury Molodov <yurymolodov@gmail.com>
2026-06-10 11:59:05 +02:00
Andrii Chubatiuk
66f8ec81f3 lib/streamaggr: uncoditionally advance flush timer (#10808)
Due to possible time drift the flush time needs to be advanced unconditionally.
Otherwise, it is possible for flush time to remain the same during two consequitive
flushes and produce duplicated data points.

See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
2026-06-10 11:58:49 +02:00
Roman Khavronenko
66672f216b github/ci: use dedicated runners for critical pipelines (#11075)
* builds, tests and linter require extra speed and resources
* add comment on why apttest uses dedicated pull

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
2026-06-10 11:56:28 +02:00
58 changed files with 1077 additions and 1044 deletions

View File

@@ -33,7 +33,8 @@ jobs:
name: ${{ matrix.os }}-${{ matrix.arch }}
permissions:
contents: read
runs-on: ubuntu-latest
# Runs on dedicated runner with extra resources to increase build speed.
runs-on: 'vm-runner'
strategy:
fail-fast: false
matrix:

View File

@@ -30,7 +30,8 @@ jobs:
name: lint
permissions:
contents: read
runs-on: ubuntu-latest
# Runs on dedicated runner with extra resources since golangci-lint requires extra memory
runs-on: 'vm-runner'
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
@@ -64,7 +65,8 @@ jobs:
name: unit
permissions:
contents: read
runs-on: ubuntu-latest
# Runs on dedicated runner with extra resources to increase tests speed.
runs-on: 'vm-runner'
strategy:
matrix:
@@ -95,6 +97,7 @@ jobs:
name: apptest
permissions:
contents: read
# Runs on dedicated runner to isolate app tests from other tests.
runs-on: apptest
steps:

View File

@@ -3,27 +3,14 @@ linters:
settings:
errcheck:
exclude-functions:
- fmt.Fprintf
- fmt.Fprint
- (net/http.ResponseWriter).Write
exclusions:
generated: lax
presets:
- common-false-positives
- legacy
- std-error-handling
rules:
- linters:
- staticcheck
text: 'SA(4003|1019|5011):'
paths:
- third_party$
- builtin$
- examples$
formatters:
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
- ^app/vmui/

View File

@@ -17,7 +17,7 @@ EXTRA_GO_BUILD_TAGS ?=
GO_BUILDINFO = -X '$(PKG_PREFIX)/lib/buildinfo.Version=$(APP_NAME)-$(DATEINFO_TAG)-$(BUILDINFO_TAG)'
TAR_OWNERSHIP ?= --owner=1000 --group=1000
GOLANGCI_LINT_VERSION := 2.9.0
GOLANGCI_LINT_VERSION := 2.12.2
.PHONY: $(MAKECMDGOALS)
@@ -527,7 +527,7 @@ golangci-lint: install-golangci-lint
golangci-lint run --build-tags 'synctest'
install-golangci-lint:
which golangci-lint && (golangci-lint --version | grep -q $(GOLANGCI_LINT_VERSION)) || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v$(GOLANGCI_LINT_VERSION)
which golangci-lint && (golangci-lint --version | grep -q $(GOLANGCI_LINT_VERSION)) || curl -sSfL https://golangci-lint.run/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v$(GOLANGCI_LINT_VERSION)
remove-golangci-lint:
rm -rf `which golangci-lint`

View File

@@ -10,7 +10,7 @@ import (
func Compress(wr WriteRequest) []byte {
data, err := wr.Marshal()
if err != nil {
panic(fmt.Errorf("BUG: cannot compress WriteRequest: %s", err))
panic(fmt.Errorf("BUG: cannot compress WriteRequest: %w", err))
}
return snappy.Encode(nil, data)
}

View File

@@ -61,15 +61,15 @@ func parseInputSeries(input []series, interval *promutil.Duration, startStamp ti
for _, data := range input {
expr, err := metricsql.Parse(data.Series)
if err != nil {
return res, fmt.Errorf("failed to parse series %s: %v", data.Series, err)
return res, fmt.Errorf("failed to parse series %s: %w", data.Series, err)
}
promvals, err := parseInputValue(data.Values, true)
if err != nil {
return res, fmt.Errorf("failed to parse input series value %s: %v", data.Values, err)
return res, fmt.Errorf("failed to parse input series value %s: %w", data.Values, err)
}
metricExpr, ok := expr.(*metricsql.MetricExpr)
if !ok || len(metricExpr.LabelFilterss) != 1 {
return res, fmt.Errorf("got invalid input series %s: %v", data.Series, err)
return res, fmt.Errorf("got invalid input series %s: %w", data.Series, err)
}
samples := make([]testutil.Sample, 0, len(promvals))
ts := startStamp

View File

@@ -53,13 +53,13 @@ Outer:
if s.Labels != "" {
metricsqlExpr, err := metricsql.Parse(s.Labels)
if err != nil {
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %v", mt.Expr,
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %w", mt.Expr,
mt.EvalTime.Duration().String(), fmt.Errorf("failed to parse labels %q: %w", s.Labels, err)))
continue Outer
}
metricsqlMetricExpr, ok := metricsqlExpr.(*metricsql.MetricExpr)
if !ok || len(metricsqlMetricExpr.LabelFilterss) > 1 {
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %v", mt.Expr,
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %w", mt.Expr,
mt.EvalTime.Duration().String(), fmt.Errorf("got invalid exp_samples: %q", s.Labels)))
continue Outer
}

View File

@@ -329,11 +329,11 @@ func (tg *testGroup) test(evalInterval time.Duration, groupOrderMap map[string]i
q, err := datasource.Init(nil)
if err != nil {
return []error{fmt.Errorf("failed to init datasource: %v", err)}
return []error{fmt.Errorf("failed to init datasource: %w", err)}
}
rw, err := remotewrite.NewDebugClient()
if err != nil {
return []error{fmt.Errorf("failed to init wr: %v", err)}
return []error{fmt.Errorf("failed to init wr: %w", err)}
}
alertEvalTimesMap := map[time.Duration]struct{}{}

View File

@@ -173,9 +173,9 @@ func (r *Rule) String() string {
if r.Alert != "" {
ruleType = "alerting"
}
b := strings.Builder{}
b.WriteString(fmt.Sprintf("%s rule %q", ruleType, r.Name()))
b.WriteString(fmt.Sprintf("; expr: %q", r.Expr))
var b strings.Builder
fmt.Fprintf(&b, "%s rule %q", ruleType, r.Name())
fmt.Fprintf(&b, "; expr: %q", r.Expr)
kv := sortMap(r.Labels)
for i := range kv {

View File

@@ -89,7 +89,7 @@ func (pi *promInstant) Unmarshal(b []byte) error {
labels.Visit(func(key []byte, v *fastjson.Value) {
lv, errLocal := v.StringBytes()
if errLocal != nil {
err = fmt.Errorf("error when parsing label value %q: %s", v, errLocal)
err = fmt.Errorf("error when parsing label value %q: %w", v, errLocal)
return
}
r.Labels = append(r.Labels, prompb.Label{
@@ -112,7 +112,7 @@ func (pi *promInstant) Unmarshal(b []byte) error {
r.Timestamps = []int64{sample[0].GetInt64()}
val, err := sample[1].StringBytes()
if err != nil {
return fmt.Errorf("error when parsing `value` object %q: %s", sample[1], err)
return fmt.Errorf("error when parsing `value` object %q: %w", sample[1], err)
}
f, err := strconv.ParseFloat(bytesutil.ToUnsafeString(val), 64)
if err != nil {

View File

@@ -601,7 +601,7 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
func (ar *AlertingRule) expandLabelTemplates(m datasource.Metric, qFn templates.QueryFn) (*labelSet, error) {
ls, err := ar.toLabels(m, qFn)
if err != nil {
return ls, fmt.Errorf("failed to expand label templates: %s", err)
return ls, fmt.Errorf("failed to expand label templates: %w", err)
}
return ls, nil
}
@@ -620,7 +620,7 @@ func (ar *AlertingRule) expandAnnotationTemplates(m datasource.Metric, qFn templ
}
as, err := notifier.ExecTemplate(qFn, ar.Annotations, tplData)
if err != nil {
return as, fmt.Errorf("failed to expand annotation templates: %s", err)
return as, fmt.Errorf("failed to expand annotation templates: %w", err)
}
return as, nil
}

View File

@@ -77,7 +77,7 @@ var (
func marshalJson(v any, kind string) ([]byte, *httpserver.ErrorWithStatusCode) {
data, err := json.Marshal(v)
if err != nil {
return nil, errResponse(fmt.Errorf("failed to marshal %s: %s", kind, err), http.StatusInternalServerError)
return nil, errResponse(fmt.Errorf("failed to marshal %s: %w", kind, err), http.StatusInternalServerError)
}
return data, nil
}

View File

@@ -1639,7 +1639,7 @@ func (w *fakeResponseWriter) WriteHeader(statusCode int) {
"X-Content-Type-Options": true,
})
if err != nil {
panic(fmt.Errorf("cannot marshal headers: %s", err))
panic(fmt.Errorf("cannot marshal headers: %w", err))
}
}

View File

@@ -161,7 +161,7 @@ func fetchAndParseJWKs(ctx context.Context, jwksURI string) (*jwt.VerifierPool,
vp, err := jwt.ParseJWKs(b)
if err != nil {
return nil, fmt.Errorf("failed to parse jwks keys from %q: %v", jwksURI, err)
return nil, fmt.Errorf("failed to parse jwks keys from %q: %w", jwksURI, err)
}
return vp, nil
@@ -188,7 +188,7 @@ func getOpenIDConfiguration(ctx context.Context, issuer string) (openidConfig, e
var cfg openidConfig
if err := json.NewDecoder(resp.Body).Decode(&cfg); err != nil {
return openidConfig{}, fmt.Errorf("failed to decode openid config from %q: %s", configURL, err)
return openidConfig{}, fmt.Errorf("failed to decode openid config from %q: %w", configURL, err)
}
return cfg, nil

View File

@@ -43,7 +43,7 @@ func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator st
func (ip *influxProcessor) run(ctx context.Context) error {
series, err := ip.ic.Explore()
if err != nil {
return fmt.Errorf("explore query failed: %s", err)
return fmt.Errorf("explore query failed: %w", err)
}
if len(series) < 1 {
return fmt.Errorf("found no timeseries to import")
@@ -71,7 +71,7 @@ func (ip *influxProcessor) run(ctx context.Context) error {
for s := range seriesCh {
if err := ip.do(s); err != nil {
influxErrorsTotal.Inc()
errCh <- fmt.Errorf("request failed for %q.%q: %s", s.Measurement, s.Field, err)
errCh <- fmt.Errorf("request failed for %q.%q: %w", s.Measurement, s.Field, err)
return
}
influxSeriesProcessed.Inc()
@@ -84,10 +84,10 @@ func (ip *influxProcessor) run(ctx context.Context) error {
for _, s := range series {
select {
case infErr := <-errCh:
return fmt.Errorf("influx error: %s", infErr)
return fmt.Errorf("influx error: %w", infErr)
case vmErr := <-ip.im.Errors():
influxErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose))
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, ip.isVerbose))
case seriesCh <- s:
}
}
@@ -100,11 +100,11 @@ func (ip *influxProcessor) run(ctx context.Context) error {
for vmErr := range ip.im.Errors() {
if vmErr.Err != nil {
influxErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose))
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, ip.isVerbose))
}
}
for err := range errCh {
return fmt.Errorf("import process failed: %s", err)
return fmt.Errorf("import process failed: %w", err)
}
log.Println("Import finished!")
@@ -119,7 +119,7 @@ const valueField = "value"
func (ip *influxProcessor) do(s *influx.Series) error {
cr, err := ip.ic.FetchDataPoints(s)
if err != nil {
return fmt.Errorf("failed to fetch datapoints: %s", err)
return fmt.Errorf("failed to fetch datapoints: %w", err)
}
defer func() {
_ = cr.Close()

View File

@@ -96,10 +96,10 @@ func NewClient(cfg Config) (*Client, error) {
}
hc, err := influx.NewHTTPClient(c)
if err != nil {
return nil, fmt.Errorf("failed to establish conn: %s", err)
return nil, fmt.Errorf("failed to establish conn: %w", err)
}
if _, _, err := hc.Ping(time.Second); err != nil {
return nil, fmt.Errorf("ping failed: %s", err)
return nil, fmt.Errorf("ping failed: %w", err)
}
chunkSize := cfg.ChunkSize
@@ -155,7 +155,7 @@ func (c *Client) Explore() ([]*Series, error) {
// {"measurement1": ["value1", "value2"]}
mFields, err := c.fieldsByMeasurement()
if err != nil {
return nil, fmt.Errorf("failed to get field keys: %s", err)
return nil, fmt.Errorf("failed to get field keys: %w", err)
}
if len(mFields) < 1 {
@@ -165,12 +165,12 @@ func (c *Client) Explore() ([]*Series, error) {
// {"measurement1": {"tag1", "tag2"}}
measurementTags, err := c.getMeasurementTags()
if err != nil {
return nil, fmt.Errorf("failed to get tags of measurements: %s", err)
return nil, fmt.Errorf("failed to get tags of measurements: %w", err)
}
series, err := c.getSeries()
if err != nil {
return nil, fmt.Errorf("failed to get series: %s", err)
return nil, fmt.Errorf("failed to get series: %w", err)
}
var iSeries []*Series
@@ -237,7 +237,7 @@ func (cr *ChunkedResponse) Next() ([]int64, []float64, error) {
return nil, nil, err
}
if resp.Error() != nil {
return nil, nil, fmt.Errorf("response error for %s: %s", cr.iq.Command, resp.Error())
return nil, nil, fmt.Errorf("response error for %s: %w", cr.iq.Command, resp.Error())
}
if len(resp.Results) != 1 {
return nil, nil, fmt.Errorf("unexpected number of results in response: %d", len(resp.Results))
@@ -265,8 +265,7 @@ func (cr *ChunkedResponse) Next() ([]int64, []float64, error) {
for i, fv := range fieldValues {
v, err := toFloat64(fv)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert value %q.%v to float64: %s",
cr.field, v, err)
return nil, nil, fmt.Errorf("failed to convert value %q.%v to float64: %w", cr.field, v, err)
}
values[i] = v
}
@@ -294,7 +293,7 @@ func (c *Client) FetchDataPoints(s *Series) (*ChunkedResponse, error) {
}
cr, err := c.QueryAsChunk(iq)
if err != nil {
return nil, fmt.Errorf("query %q err: %s", iq.Command, err)
return nil, fmt.Errorf("query %q err: %w", iq.Command, err)
}
return &ChunkedResponse{cr, iq, s.Field}, nil
}
@@ -308,7 +307,7 @@ func (c *Client) fieldsByMeasurement() (map[string][]string, error) {
log.Printf("fetching fields: %s", stringify(q))
qValues, err := c.do(q)
if err != nil {
return nil, fmt.Errorf("error while executing query %q: %s", q.Command, err)
return nil, fmt.Errorf("error while executing query %q: %w", q.Command, err)
}
var total int
@@ -352,7 +351,7 @@ func (c *Client) getSeries() ([]*Series, error) {
log.Printf("fetching series: %s", stringify(q))
cr, err := c.QueryAsChunk(q)
if err != nil {
return nil, fmt.Errorf("error while executing query %q: %s", q.Command, err)
return nil, fmt.Errorf("error while executing query %q: %w", q.Command, err)
}
const key = "key"
@@ -366,7 +365,7 @@ func (c *Client) getSeries() ([]*Series, error) {
return nil, err
}
if resp.Error() != nil {
return nil, fmt.Errorf("response error for query %q: %s", q.Command, resp.Error())
return nil, fmt.Errorf("response error for query %q: %w", q.Command, resp.Error())
}
qValues, err := parseResult(resp.Results[0])
if err != nil {
@@ -417,7 +416,7 @@ func (c *Client) getMeasurementTags() (map[string]map[string]struct{}, error) {
log.Printf("fetching tag keys: %s", stringify(q))
cr, err := c.QueryAsChunk(q)
if err != nil {
return nil, fmt.Errorf("error while executing query %q: %s", q.Command, err)
return nil, fmt.Errorf("error while executing query %q: %w", q.Command, err)
}
const tagKey = "tagKey"
@@ -432,7 +431,7 @@ func (c *Client) getMeasurementTags() (map[string]map[string]struct{}, error) {
return nil, err
}
if resp.Error() != nil {
return nil, fmt.Errorf("response error for query %q: %s", q.Command, resp.Error())
return nil, fmt.Errorf("response error for query %q: %w", q.Command, resp.Error())
}
qValues, err := parseResult(resp.Results[0])
if err != nil {
@@ -455,10 +454,10 @@ func (c *Client) getMeasurementTags() (map[string]map[string]struct{}, error) {
func (c *Client) do(q influx.Query) ([]queryValues, error) {
res, err := c.Query(q)
if err != nil {
return nil, fmt.Errorf("query error: %s", err)
return nil, fmt.Errorf("query error: %w", err)
}
if res.Error() != nil {
return nil, fmt.Errorf("response error: %s", res.Error())
return nil, fmt.Errorf("response error: %w", res.Error())
}
if len(res.Results) < 1 {
return nil, fmt.Errorf("query returned 0 results")

View File

@@ -71,7 +71,7 @@ func toFloat64(v any) (float64, error) {
func parseDate(dateStr string) (int64, error) {
startTime, err := time.Parse(time.RFC3339, dateStr)
if err != nil {
return 0, fmt.Errorf("cannot parse %q: %s", dateStr, err)
return 0, fmt.Errorf("cannot parse %q: %w", dateStr, err)
}
return startTime.UnixNano() / 1e6, nil
}
@@ -92,7 +92,7 @@ func (s *Series) unmarshal(v string) error {
var err error
s.LabelPairs, err = unmarshalTags(v[n+1:], noEscapeChars)
if err != nil {
return fmt.Errorf("failed to unmarhsal tags: %s", err)
return fmt.Errorf("failed to unmarhsal tags: %w", err)
}
return nil
}

View File

@@ -88,7 +88,7 @@ func main() {
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify, "vmctl_opentsdb")
if err != nil {
return fmt.Errorf("failed to create transport for -%s=%q: %s", otsdbAddr, addr, err)
return fmt.Errorf("failed to create transport for -%s=%q: %w", otsdbAddr, addr, err)
}
oCfg := opentsdb.Config{
Addr: addr,
@@ -103,17 +103,17 @@ func main() {
}
otsdbClient, err := opentsdb.NewClient(oCfg)
if err != nil {
return fmt.Errorf("failed to create opentsdb client: %s", err)
return fmt.Errorf("failed to create opentsdb client: %w", err)
}
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %s", err)
return fmt.Errorf("failed to init VM configuration: %w", err)
}
importer, err := vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %s", err)
return fmt.Errorf("failed to create VM importer: %w", err)
}
otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency), c.Bool(globalVerbose))
@@ -137,7 +137,7 @@ func main() {
tc, err := promauth.NewTLSConfig(certFile, keyFile, caFile, serverName, insecureSkipVerify)
if err != nil {
return fmt.Errorf("failed to create TLS Config: %s", err)
return fmt.Errorf("failed to create TLS Config: %w", err)
}
iCfg := influx.Config{
@@ -157,17 +157,17 @@ func main() {
influxClient, err := influx.NewClient(iCfg)
if err != nil {
return fmt.Errorf("failed to create influx client: %s", err)
return fmt.Errorf("failed to create influx client: %w", err)
}
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %s", err)
return fmt.Errorf("failed to init VM configuration: %w", err)
}
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %s", err)
return fmt.Errorf("failed to create VM importer: %w", err)
}
processor := newInfluxProcessor(
@@ -203,7 +203,7 @@ func main() {
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify, "vmctl_remoteread")
if err != nil {
return fmt.Errorf("failed to create transport for -%s=%q: %s", remoteReadSrcAddr, addr, err)
return fmt.Errorf("failed to create transport for -%s=%q: %w", remoteReadSrcAddr, addr, err)
}
// Backwards compatible default values if none provided by user
@@ -227,17 +227,17 @@ func main() {
DisablePathAppend: c.Bool(remoteReadDisablePathAppend),
})
if err != nil {
return fmt.Errorf("error create remote read client: %s", err)
return fmt.Errorf("error create remote read client: %w", err)
}
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %s", err)
return fmt.Errorf("failed to init VM configuration: %w", err)
}
importer, err := vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %s", err)
return fmt.Errorf("failed to create VM importer: %w", err)
}
rmp := remoteReadProcessor{
@@ -265,12 +265,12 @@ func main() {
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %s", err)
return fmt.Errorf("failed to init VM configuration: %w", err)
}
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %s", err)
return fmt.Errorf("failed to create VM importer: %w", err)
}
promCfg := prometheus.Config{
@@ -285,7 +285,7 @@ func main() {
}
cl, err := prometheus.NewClient(promCfg)
if err != nil {
return fmt.Errorf("failed to create prometheus client: %s", err)
return fmt.Errorf("failed to create prometheus client: %w", err)
}
pp := prometheusProcessor{
@@ -307,12 +307,12 @@ func main() {
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %s", err)
return fmt.Errorf("failed to init VM configuration: %w", err)
}
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %s", err)
return fmt.Errorf("failed to create VM importer: %w", err)
}
mCfg := mimir.Config{
@@ -335,7 +335,7 @@ func main() {
}
cl, err := mimir.NewClient(ctx, mCfg)
if err != nil {
return fmt.Errorf("failed to create mimir client: %s", err)
return fmt.Errorf("failed to create mimir client: %w", err)
}
pp := prometheusProcessor{
@@ -356,12 +356,12 @@ func main() {
fmt.Println("Thanos import mode")
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %s", err)
return fmt.Errorf("failed to init VM configuration: %w", err)
}
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %s", err)
return fmt.Errorf("failed to create VM importer: %w", err)
}
thanosCfg := thanos.Config{
Snapshot: c.String(thanosSnapshot),
@@ -374,7 +374,7 @@ func main() {
}
cl, err := thanos.NewClient(thanosCfg)
if err != nil {
return fmt.Errorf("failed to create thanos client: %s", err)
return fmt.Errorf("failed to create thanos client: %w", err)
}
var aggrTypes []thanos.AggrType
@@ -382,7 +382,7 @@ func main() {
for _, typeStr := range aggrTypesStr {
aggrType, err := thanos.ParseAggrType(typeStr)
if err != nil {
return fmt.Errorf("failed to parse aggregate type %q: %s", typeStr, err)
return fmt.Errorf("failed to parse aggregate type %q: %w", typeStr, err)
}
aggrTypes = append(aggrTypes, aggrType)
}
@@ -415,7 +415,7 @@ func main() {
bfMinDuration := c.Duration(vmNativeBackoffMinDuration)
bf, err := backoff.New(bfRetries, bfFactor, bfMinDuration)
if err != nil {
return fmt.Errorf("failed to create backoff object: %s", err)
return fmt.Errorf("failed to create backoff object: %w", err)
}
disableKeepAlive := c.Bool(vmNativeDisableHTTPKeepAlive)
@@ -439,7 +439,7 @@ func main() {
srcTC, err := promauth.NewTLSConfig(srcCertFile, srcKeyFile, srcCAFile, srcServerName, srcInsecureSkipVerify)
if err != nil {
return fmt.Errorf("failed to create TLS Config: %s", err)
return fmt.Errorf("failed to create TLS Config: %w", err)
}
trSrc := httputil.NewTransport(false, "vmctl_src")
@@ -469,7 +469,7 @@ func main() {
dstTC, err := promauth.NewTLSConfig(dstCertFile, dstKeyFile, dstCAFile, dstServerName, dstInsecureSkipVerify)
if err != nil {
return fmt.Errorf("failed to create TLS Config: %s", err)
return fmt.Errorf("failed to create TLS Config: %w", err)
}
trDst := httputil.NewTransport(false, "vmctl_dst")
@@ -534,7 +534,7 @@ func main() {
log.Printf("verifying block at path=%q", blockPath)
f, err := os.OpenFile(blockPath, os.O_RDONLY, 0600)
if err != nil {
return cli.Exit(fmt.Errorf("cannot open exported block at path=%q err=%w", blockPath, err), 1)
return cli.Exit(fmt.Errorf("cannot open exported block at path=%q: %w", blockPath, err), 1)
}
defer f.Close()
var blocksCount atomic.Uint64
@@ -542,7 +542,7 @@ func main() {
blocksCount.Add(1)
return nil
}); err != nil {
return cli.Exit(fmt.Errorf("cannot parse block at path=%q, blocksCount=%d, err=%w", blockPath, blocksCount.Load(), err), 1)
return cli.Exit(fmt.Errorf("cannot parse block at path=%q, blocksCount=%d: %w", blockPath, blocksCount.Load(), err), 1)
}
log.Printf("successfully verified block at path=%q, blockCount=%d", blockPath, blocksCount.Load())
return nil
@@ -585,7 +585,7 @@ func initConfigVM(c *cli.Context) (vm.Config, error) {
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify, "vmctl_client")
if err != nil {
return vm.Config{}, fmt.Errorf("failed to create transport for -%s=%q: %s", vmAddr, addr, err)
return vm.Config{}, fmt.Errorf("failed to create transport for -%s=%q: %w", vmAddr, addr, err)
}
bfRetries := c.Int(vmBackoffRetries)
@@ -593,7 +593,7 @@ func initConfigVM(c *cli.Context) (vm.Config, error) {
bfMinDuration := c.Duration(vmBackoffMinDuration)
bf, err := backoff.New(bfRetries, bfFactor, bfMinDuration)
if err != nil {
return vm.Config{}, fmt.Errorf("failed to create backoff object: %s", err)
return vm.Config{}, fmt.Errorf("failed to create backoff object: %w", err)
}
return vm.Config{

View File

@@ -54,7 +54,7 @@ func (lbr *lazyBlockReader) initialize() error {
// fetching block and parse it and store it in lbr.reader
temp, err := lbr.mkTempDir()
if err != nil {
return fmt.Errorf("failed to create temp dir: %s", err)
return fmt.Errorf("failed to create temp dir: %w", err)
}
lbr.tempDirPath = temp
@@ -85,7 +85,7 @@ func (lbr *lazyBlockReader) initialize() error {
return fmt.Errorf("failed to fetch chunk file: %q: %w", chunkName, err)
}
if err := lbr.writeFile(temp, blockChunkPath, chunk); err != nil {
return fmt.Errorf("failed to write chunk file: %q: %s", chunkName, err)
return fmt.Errorf("failed to write chunk file: %q: %w", chunkName, err)
}
}
@@ -135,7 +135,7 @@ func (lbr *lazyBlockReader) Meta() tsdb.BlockMeta {
// Size returns the number of bytes that the block takes up on disk.
func (lbr *lazyBlockReader) Size() int64 {
if err := lbr.initialize(); err != nil {
lbr.err = fmt.Errorf("error get Size of the block: %s, return zero size", err)
lbr.err = fmt.Errorf("error get Size of the block: %w, return zero size", err)
return 0
}
return lbr.reader.Size()
@@ -167,11 +167,11 @@ func (lbr *lazyBlockReader) Close() error {
func (lbr *lazyBlockReader) mkTempDir() (string, error) {
temp, err := os.MkdirTemp("", lbr.ID.String())
if err != nil {
return "", fmt.Errorf("failed to create temp dir: %s", err)
return "", fmt.Errorf("failed to create temp dir: %w", err)
}
err = os.Mkdir(filepath.Join(temp, "chunks"), os.ModePerm)
if err != nil {
return "", fmt.Errorf("failed to create temp dir: %s", err)
return "", fmt.Errorf("failed to create temp dir: %w", err)
}
return temp, nil
}

View File

@@ -133,11 +133,11 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
c.RemoteFS = rfs
timeMin, err := utils.ParseTime(cfg.Filter.TimeMin)
if err != nil {
return nil, fmt.Errorf("failed to parse min time in filter: %s", err)
return nil, fmt.Errorf("failed to parse min time in filter: %w", err)
}
timeMax, err := utils.ParseTime(cfg.Filter.TimeMax)
if err != nil {
return nil, fmt.Errorf("failed to parse max time in filter: %s", err)
return nil, fmt.Errorf("failed to parse max time in filter: %w", err)
}
c.filter = filter{
min: timeMin.UnixMilli(),
@@ -156,7 +156,7 @@ func (c *Client) Explore() ([]tsdb.BlockReader, error) {
indexFile, err := c.fetchIndexFile()
if err != nil {
return nil, fmt.Errorf("failed to fetch index file: %s", err)
return nil, fmt.Errorf("failed to fetch index file: %w", err)
}
var blocksToImport []tsdb.BlockReader
@@ -172,7 +172,7 @@ func (c *Client) Explore() ([]tsdb.BlockReader, error) {
lazyBlockReader, err := newLazyBlockReader(block, c.RemoteFS)
if err != nil {
return nil, fmt.Errorf("failed to create lazy block reader: %s", err)
return nil, fmt.Errorf("failed to create lazy block reader: %w", err)
}
blocksToImport = append(blocksToImport, lazyBlockReader)
}
@@ -185,7 +185,7 @@ func (c *Client) Explore() ([]tsdb.BlockReader, error) {
func (c *Client) Read(ctx context.Context, block tsdb.BlockReader) (*prometheus.CloseableSeriesSet, error) {
meta := block.Meta()
if b, ok := block.(*lazyBlockReader); ok && b.Err() != nil {
return nil, fmt.Errorf("failed to read block: %s", b.Err())
return nil, fmt.Errorf("failed to read block: %w", b.Err())
}
if meta.ULID.String() == "" {
@@ -218,20 +218,20 @@ func (c *Client) fetchIndexFile() (*Index, error) {
file, err := c.ReadFile(bucketIndexCompressedFilename)
if err != nil {
return nil, fmt.Errorf("failed to read bucket index: %s", err)
return nil, fmt.Errorf("failed to read bucket index: %w", err)
}
r := bytes.NewReader(file)
// Read all the content.
gzipReader, err := gzip.NewReader(r)
if err != nil {
return nil, fmt.Errorf("failed to create gzip reader: %s", err)
return nil, fmt.Errorf("failed to create gzip reader: %w", err)
}
var indexFile Index
err = json.NewDecoder(gzipReader).Decode(&indexFile)
if err != nil {
return nil, fmt.Errorf("failed to decode bucket index: %s", err)
return nil, fmt.Errorf("failed to decode bucket index: %w", err)
}
return &indexFile, nil

View File

@@ -47,7 +47,7 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string, start,
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
exploreRequestsErrorsTotal.Inc()
return nil, fmt.Errorf("cannot create request to %q: %s", url, err)
return nil, fmt.Errorf("cannot create request to %q: %w", url, err)
}
params := req.URL.Query()
@@ -60,14 +60,14 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string, start,
if err != nil {
exploreRequestsErrorsTotal.Inc()
exploreDuration.UpdateDuration(startTime)
return nil, fmt.Errorf("series request failed: %s", err)
return nil, fmt.Errorf("series request failed: %w", err)
}
var response Response
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
exploreRequestsErrorsTotal.Inc()
exploreDuration.UpdateDuration(startTime)
return nil, fmt.Errorf("cannot decode series response: %s", err)
return nil, fmt.Errorf("cannot decode series response: %w", err)
}
exploreDuration.UpdateDuration(startTime)
return response.MetricNames, resp.Body.Close()
@@ -80,19 +80,19 @@ func (c *Client) ImportPipe(ctx context.Context, dstURL string, pr *io.PipeReade
req, err := http.NewRequestWithContext(ctx, http.MethodPost, dstURL, pr)
if err != nil {
importRequestsErrorsTotal.Inc()
return fmt.Errorf("cannot create import request to %q: %s", c.Addr, err)
return fmt.Errorf("cannot create import request to %q: %w", c.Addr, err)
}
importResp, err := c.do(req, http.StatusNoContent)
if err != nil {
importRequestsErrorsTotal.Inc()
importDuration.UpdateDuration(startTime)
return fmt.Errorf("import request failed: %s", err)
return fmt.Errorf("import request failed: %w", err)
}
if err := importResp.Body.Close(); err != nil {
importRequestsErrorsTotal.Inc()
importDuration.UpdateDuration(startTime)
return fmt.Errorf("cannot close import response body: %s", err)
return fmt.Errorf("cannot close import response body: %w", err)
}
importDuration.UpdateDuration(startTime)
return nil
@@ -105,7 +105,7 @@ func (c *Client) ExportPipe(ctx context.Context, url string, f Filter) (io.ReadC
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
exportRequestsErrorsTotal.Inc()
return nil, fmt.Errorf("cannot create request to %q: %s", c.Addr, err)
return nil, fmt.Errorf("cannot create request to %q: %w", c.Addr, err)
}
params := req.URL.Query()
@@ -136,7 +136,7 @@ func (c *Client) GetSourceTenants(ctx context.Context, f Filter) ([]string, erro
u := fmt.Sprintf("%s/%s", c.Addr, nativeTenantsAddr)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, fmt.Errorf("cannot create request to %q: %s", u, err)
return nil, fmt.Errorf("cannot create request to %q: %w", u, err)
}
params := req.URL.Query()
@@ -150,18 +150,18 @@ func (c *Client) GetSourceTenants(ctx context.Context, f Filter) ([]string, erro
resp, err := c.do(req, http.StatusOK)
if err != nil {
return nil, fmt.Errorf("tenants request failed: %s", err)
return nil, fmt.Errorf("tenants request failed: %w", err)
}
var r struct {
Tenants []string `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return nil, fmt.Errorf("cannot decode tenants response: %s", err)
return nil, fmt.Errorf("cannot decode tenants response: %w", err)
}
if err := resp.Body.Close(); err != nil {
return nil, fmt.Errorf("cannot close tenants response body: %s", err)
return nil, fmt.Errorf("cannot close tenants response body: %w", err)
}
return r.Tenants, nil
@@ -180,7 +180,7 @@ func (c *Client) do(req *http.Request, expSC int) (*http.Response, error) {
if resp.StatusCode != expSC {
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body for status code %d: %s", resp.StatusCode, err)
return nil, fmt.Errorf("failed to read response body for status code %d: %w", resp.StatusCode, err)
}
return nil, fmt.Errorf("unexpected response code %d: %s", resp.StatusCode, string(body))
}

View File

@@ -47,7 +47,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
q := fmt.Sprintf("%s/api/suggest?type=metrics&q=%s&max=%d", op.oc.Addr, filter, op.oc.Limit)
m, err := op.oc.FindMetrics(q)
if err != nil {
return fmt.Errorf("metric discovery failed for %q: %s", q, err)
return fmt.Errorf("metric discovery failed for %q: %w", q, err)
}
metrics = append(metrics, m...)
}
@@ -76,7 +76,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
log.Printf("Starting work on %s", metric)
serieslist, err := op.oc.FindSeries(metric)
if err != nil {
return fmt.Errorf("couldn't retrieve series list for %s : %s", metric, err)
return fmt.Errorf("couldn't retrieve series list for %s: %w", metric, err)
}
/*
Create channels for collecting/processing series and errors
@@ -95,7 +95,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
for s := range seriesCh {
if err := op.do(s); err != nil {
otsdbErrorsTotal.Inc()
errCh <- fmt.Errorf("couldn't retrieve series for %s : %s", metric, err)
errCh <- fmt.Errorf("couldn't retrieve series for %s: %w", metric, err)
return
}
otsdbSeriesProcessed.Inc()
@@ -112,7 +112,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
// check for any lingering errors on the query side
for otsdbErr := range errCh {
if runErr == nil {
runErr = fmt.Errorf("import process failed: \n%s", otsdbErr)
runErr = fmt.Errorf("import process failed:\n%w", otsdbErr)
}
}
bar.Finish()
@@ -125,7 +125,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
for vmErr := range op.im.Errors() {
if vmErr.Err != nil {
otsdbErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, op.isVerbose))
}
}
log.Println("Import finished!")
@@ -141,12 +141,12 @@ func (op *otsdbProcessor) sendQueries(ctx context.Context, serieslist []opentsdb
for _, tr := range rt.QueryRanges {
select {
case <-ctx.Done():
return fmt.Errorf("context canceled: %s", ctx.Err())
return fmt.Errorf("context canceled: %w", ctx.Err())
case otsdbErr := <-errCh:
otsdbErrorsTotal.Inc()
return fmt.Errorf("opentsdb error: %s", otsdbErr)
return fmt.Errorf("opentsdb error: %w", otsdbErr)
case vmErr := <-op.im.Errors():
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, op.isVerbose))
case seriesCh <- queryObj{
Tr: tr, StartTime: startTime,
Series: series, Rt: opentsdb.RetentionMeta{
@@ -166,7 +166,7 @@ func (op *otsdbProcessor) do(s queryObj) error {
end := s.StartTime - s.Tr.End
data, err := op.oc.GetData(s.Series, s.Rt, start, end, op.oc.MsecsTime)
if err != nil {
return fmt.Errorf("failed to collect data for %v in %v:%v :: %v", s.Series, s.Rt, s.Tr, err)
return fmt.Errorf("failed to collect data for %v in %v:%v :: %w", s.Series, s.Rt, s.Tr, err)
}
if len(data.Timestamps) < 1 || len(data.Values) < 1 {
log.Printf("no data found for %v in %v:%v...skipping", s.Series, s.Rt, s.Tr)

View File

@@ -106,7 +106,7 @@ func (c Client) FindMetrics(q string) ([]string, error) {
resp, err := c.c.Get(q)
if err != nil {
return nil, fmt.Errorf("failed to send GET request to %q: %s", q, err)
return nil, fmt.Errorf("failed to send GET request to %q: %w", q, err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != 200 {
@@ -114,12 +114,12 @@ func (c Client) FindMetrics(q string) ([]string, error) {
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve metric data from %q: %s", q, err)
return nil, fmt.Errorf("could not retrieve metric data from %q: %w", q, err)
}
var metriclist []string
err = json.Unmarshal(body, &metriclist)
if err != nil {
return nil, fmt.Errorf("failed to read response from %q: %s", q, err)
return nil, fmt.Errorf("failed to read response from %q: %w", q, err)
}
return metriclist, nil
}
@@ -130,7 +130,7 @@ func (c Client) FindSeries(metric string) ([]Meta, error) {
q := fmt.Sprintf("%s/api/search/lookup?m=%s&limit=%d", c.Addr, metric, c.Limit)
resp, err := c.c.Get(q)
if err != nil {
return nil, fmt.Errorf("failed to send GET request to %q: %s", q, err)
return nil, fmt.Errorf("failed to send GET request to %q: %w", q, err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != 200 {
@@ -138,12 +138,12 @@ func (c Client) FindSeries(metric string) ([]Meta, error) {
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve series data from %q: %s", q, err)
return nil, fmt.Errorf("could not retrieve series data from %q: %w", q, err)
}
var results MetaResults
err = json.Unmarshal(body, &results)
if err != nil {
return nil, fmt.Errorf("failed to read response from %q: %s", q, err)
return nil, fmt.Errorf("failed to read response from %q: %w", q, err)
}
return results.Results, nil
}
@@ -183,7 +183,7 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64, m
q := fmt.Sprintf("%s/api/query?%s", c.Addr, queryStr)
resp, err := c.c.Get(q)
if err != nil {
return Metric{}, fmt.Errorf("failed to send GET request to %q: %s", q, err)
return Metric{}, fmt.Errorf("failed to send GET request to %q: %w", q, err)
}
defer func() { _ = resp.Body.Close() }()
/*
@@ -303,7 +303,7 @@ func NewClient(cfg Config) (*Client, error) {
for _, r := range cfg.Retentions {
ret, err := convertRetention(r, offsetSecs, cfg.MsecsTime)
if err != nil {
return &Client{}, fmt.Errorf("couldn't parse retention %q :: %v", r, err)
return &Client{}, fmt.Errorf("couldn't parse retention %q :: %w", r, err)
}
retentions = append(retentions, ret)
}

View File

@@ -88,7 +88,7 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
}
queryLengthDuration, err := convertDuration(chunks[2])
if err != nil {
return Retention{}, fmt.Errorf("invalid ttl (second order) duration string: %q: %s", chunks[2], err)
return Retention{}, fmt.Errorf("invalid ttl (second order) duration string: %q: %w", chunks[2], err)
}
// set ttl in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds
queryLength := queryLengthDuration.Milliseconds()
@@ -110,7 +110,7 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
aggTimeDuration, err := convertDuration(aggregates[1])
if err != nil {
return Retention{}, fmt.Errorf("invalid aggregation time duration string: %q: %s", aggregates[1], err)
return Retention{}, fmt.Errorf("invalid aggregation time duration string: %q: %w", aggregates[1], err)
}
aggTime := aggTimeDuration.Milliseconds()
if !msecTime {
@@ -119,7 +119,7 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
rowLengthDuration, err := convertDuration(chunks[1])
if err != nil {
return Retention{}, fmt.Errorf("invalid row length (first order) duration string: %q: %s", chunks[1], err)
return Retention{}, fmt.Errorf("invalid row length (first order) duration string: %q: %w", chunks[1], err)
}
// set length of each row in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds
rowLength := rowLengthDuration.Milliseconds()

View File

@@ -46,7 +46,7 @@ type prometheusProcessor struct {
func (pp *prometheusProcessor) run(ctx context.Context) error {
blocks, err := pp.cl.Explore()
if err != nil {
return fmt.Errorf("explore failed: %s", err)
return fmt.Errorf("explore failed: %w", err)
}
if len(blocks) < 1 {
return fmt.Errorf("found no blocks to import")
@@ -57,7 +57,7 @@ func (pp *prometheusProcessor) run(ctx context.Context) error {
}
if err := pp.processBlocks(ctx, blocks); err != nil {
return fmt.Errorf("migration failed: %s", err)
return fmt.Errorf("migration failed: %w", err)
}
log.Println("Import finished!")
@@ -68,7 +68,7 @@ func (pp *prometheusProcessor) run(ctx context.Context) error {
func (pp *prometheusProcessor) do(ctx context.Context, b tsdb.BlockReader) error {
css, err := pp.cl.Read(ctx, b)
if err != nil {
return fmt.Errorf("failed to read block: %s", err)
return fmt.Errorf("failed to read block: %w", err)
}
defer func() {
if err := css.Close(); err != nil {
@@ -146,7 +146,7 @@ func (pp *prometheusProcessor) processBlocks(ctx context.Context, blocks []tsdb.
for br := range blockReadersCh {
if err := pp.do(ctx, br); err != nil {
promErrorsTotal.Inc()
errCh <- fmt.Errorf("cannot read block %q: %s", br.Meta().ULID, err)
errCh <- fmt.Errorf("cannot read block %q: %w", br.Meta().ULID, err)
return
}
if cb, ok := br.(io.Closer); ok {
@@ -164,11 +164,11 @@ func (pp *prometheusProcessor) processBlocks(ctx context.Context, blocks []tsdb.
select {
case promErr := <-errCh:
close(blockReadersCh)
return fmt.Errorf("prometheus error: %s", promErr)
return fmt.Errorf("prometheus error: %w", promErr)
case vmErr := <-pp.im.Errors():
close(blockReadersCh)
promErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, pp.isVerbose))
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, pp.isVerbose))
case blockReadersCh <- br:
}
}
@@ -182,11 +182,11 @@ func (pp *prometheusProcessor) processBlocks(ctx context.Context, blocks []tsdb.
for vmErr := range pp.im.Errors() {
if vmErr.Err != nil {
promErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, pp.isVerbose))
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, pp.isVerbose))
}
}
for err := range errCh {
return fmt.Errorf("import process failed: %s", err)
return fmt.Errorf("import process failed: %w", err)
}
return nil

View File

@@ -59,12 +59,12 @@ func (f filter) inRange(minV, maxV int64) bool {
func NewClient(cfg Config) (*Client, error) {
db, err := tsdb.OpenDBReadOnly(cfg.Snapshot, cfg.TemporaryDir, nil)
if err != nil {
return nil, fmt.Errorf("failed to open snapshot %q: %s", cfg.Snapshot, err)
return nil, fmt.Errorf("failed to open snapshot %q: %w", cfg.Snapshot, err)
}
c := &Client{DBReadOnly: db}
timeMin, timeMax, err := parseTime(cfg.Filter.TimeMin, cfg.Filter.TimeMax)
if err != nil {
return nil, fmt.Errorf("failed to parse time in filter: %s", err)
return nil, fmt.Errorf("failed to parse time in filter: %w", err)
}
c.filter = filter{
min: timeMin,
@@ -83,7 +83,7 @@ func NewClient(cfg Config) (*Client, error) {
func (c *Client) Explore() ([]tsdb.BlockReader, error) {
blocks, err := c.Blocks()
if err != nil {
return nil, fmt.Errorf("failed to fetch blocks: %s", err)
return nil, fmt.Errorf("failed to fetch blocks: %w", err)
}
s := &vmctlutil.Stats{
Filtered: c.filter.min != 0 || c.filter.max != 0 || c.filter.label != "",
@@ -142,14 +142,14 @@ func parseTime(start, end string) (int64, int64, error) {
if start != "" {
v, err := time.Parse(time.RFC3339, start)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse %q: %s", start, err)
return 0, 0, fmt.Errorf("failed to parse %q: %w", start, err)
}
s = v.UnixNano() / int64(time.Millisecond)
}
if end != "" {
v, err := time.Parse(time.RFC3339, end)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse %q: %s", end, err)
return 0, 0, fmt.Errorf("failed to parse %q: %w", end, err)
}
e = v.UnixNano() / int64(time.Millisecond)
}

View File

@@ -44,7 +44,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
ranges, err := stepper.SplitDateRange(*rrp.filter.timeStart, *rrp.filter.timeEnd, rrp.filter.chunk, rrp.filter.timeReverse)
if err != nil {
return fmt.Errorf("failed to create date ranges for the given time filters: %v", err)
return fmt.Errorf("failed to create date ranges for the given time filters: %w", err)
}
question := fmt.Sprintf("Selected time range %q - %q will be split into %d ranges according to %q step. Continue?",
@@ -74,7 +74,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
for r := range rangeC {
if err := rrp.do(ctx, r); err != nil {
remoteReadErrorsTotal.Inc()
errCh <- fmt.Errorf("request failed for: %s", err)
errCh <- fmt.Errorf("request failed for: %w", err)
return
}
remoteReadRangesProcessed.Inc()
@@ -86,10 +86,10 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
for _, r := range ranges {
select {
case infErr := <-errCh:
return fmt.Errorf("remote read error: %s", infErr)
return fmt.Errorf("remote read error: %w", infErr)
case vmErr := <-rrp.dst.Errors():
remoteReadErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose))
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, rrp.isVerbose))
case rangeC <- &remoteread.Filter{
StartTimestampMs: r[0].UnixMilli(),
EndTimestampMs: r[1].UnixMilli(),
@@ -105,11 +105,11 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
for vmErr := range rrp.dst.Errors() {
if vmErr.Err != nil {
remoteReadErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose))
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, rrp.isVerbose))
}
}
for err := range errCh {
return fmt.Errorf("import process failed: %s", err)
return fmt.Errorf("import process failed: %w", err)
}
return nil
@@ -119,7 +119,7 @@ func (rrp *remoteReadProcessor) do(ctx context.Context, filter *remoteread.Filte
return rrp.src.Read(ctx, filter, func(series *vm.TimeSeries) error {
if err := rrp.dst.Input(series); err != nil {
return fmt.Errorf(
"failed to read data for time range start: %d, end: %d, %s",
"failed to read data for time range start: %d, end: %d: %w",
filter.StartTimestampMs, filter.EndTimestampMs, err)
}
return nil

View File

@@ -157,7 +157,7 @@ func (c *Client) Read(ctx context.Context, filter *Filter, streamCb StreamCallba
if errors.Is(err, context.Canceled) {
return fmt.Errorf("fetch request has ben cancelled")
}
return fmt.Errorf("error while fetching data from remote storage: %s", err)
return fmt.Errorf("error while fetching data from remote storage: %w", err)
}
return nil
}

View File

@@ -52,7 +52,7 @@ func (f filter) inRange(minV, maxV int64) bool {
func NewClient(cfg Config) (*Client, error) {
minTime, maxTime, err := parseTime(cfg.Filter.TimeMin, cfg.Filter.TimeMax)
if err != nil {
return nil, fmt.Errorf("failed to parse time in filter: %s", err)
return nil, fmt.Errorf("failed to parse time in filter: %w", err)
}
return &Client{
snapshotPath: cfg.Snapshot,
@@ -183,14 +183,14 @@ func parseTime(start, end string) (int64, int64, error) {
if start != "" {
v, err := time.Parse(time.RFC3339, start)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse %q: %s", start, err)
return 0, 0, fmt.Errorf("failed to parse %q: %w", start, err)
}
s = v.UnixNano() / int64(time.Millisecond)
}
if end != "" {
v, err := time.Parse(time.RFC3339, end)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse %q: %s", end, err)
return 0, 0, fmt.Errorf("failed to parse %q: %w", end, err)
}
e = v.UnixNano() / int64(time.Millisecond)
}

View File

@@ -36,7 +36,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
// Use the first aggregate type to explore blocks (block list is the same for all types)
blocks, err := tp.cl.Explore(tp.aggrTypes[0])
if err != nil {
return fmt.Errorf("explore failed: %s", err)
return fmt.Errorf("explore failed: %w", err)
}
if len(blocks) < 1 {
return fmt.Errorf("found no blocks to import")
@@ -84,7 +84,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
log.Println("Processing raw blocks (resolution=0)...")
stats, err := tp.processBlocks(rawBlocks, thanos.AggrTypeNone, bar)
if err != nil {
return fmt.Errorf("migration failed for raw blocks: %s", err)
return fmt.Errorf("migration failed for raw blocks: %w", err)
}
phases = append(phases, phaseStats{
name: "raw",
@@ -108,7 +108,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
aggrBlocks, err := tp.cl.Explore(aggrType)
if err != nil {
return fmt.Errorf("explore failed for aggr type %s: %s", aggrType, err)
return fmt.Errorf("explore failed for aggr type %s: %w", aggrType, err)
}
var downsampledOnly []thanos.BlockInfo
@@ -128,7 +128,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
stats, err := tp.processBlocks(downsampledOnly, aggrType, bar)
thanos.CloseBlocks(aggrBlocks)
if err != nil {
return fmt.Errorf("migration failed for aggr type %s: %s", aggrType, err)
return fmt.Errorf("migration failed for aggr type %s: %w", aggrType, err)
}
phases = append(phases, phaseStats{
name: aggrType.String(),
@@ -153,7 +153,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
for vmErr := range tp.im.Errors() {
if vmErr.Err != nil {
thanosErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, tp.isVerbose))
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, tp.isVerbose))
}
}
@@ -184,7 +184,7 @@ func (tp *thanosProcessor) processBlocks(blocks []thanos.BlockInfo, aggrType tha
seriesCount, samplesCount, err := tp.do(bi, aggrType)
if err != nil {
thanosErrorsTotal.Inc()
errCh <- fmt.Errorf("read failed for block %q with aggr %s: %s", bi.Block.Meta().ULID, aggrType, err)
errCh <- fmt.Errorf("read failed for block %q with aggr %s: %w", bi.Block.Meta().ULID, aggrType, err)
return
}
@@ -209,12 +209,12 @@ func (tp *thanosProcessor) processBlocks(blocks []thanos.BlockInfo, aggrType tha
case thanosErr := <-errCh:
close(blockReadersCh)
wg.Wait()
return processBlocksStats{}, fmt.Errorf("thanos error: %s", thanosErr)
return processBlocksStats{}, fmt.Errorf("thanos error: %w", thanosErr)
case vmErr := <-tp.im.Errors():
close(blockReadersCh)
wg.Wait()
thanosErrorsTotal.Inc()
return processBlocksStats{}, fmt.Errorf("import process failed: %s", wrapErr(vmErr, tp.isVerbose))
return processBlocksStats{}, fmt.Errorf("import process failed: %w", wrapErr(vmErr, tp.isVerbose))
case blockReadersCh <- bi:
}
}
@@ -223,7 +223,7 @@ func (tp *thanosProcessor) processBlocks(blocks []thanos.BlockInfo, aggrType tha
wg.Wait()
close(errCh)
for err := range errCh {
return processBlocksStats{}, fmt.Errorf("import process failed: %s", err)
return processBlocksStats{}, fmt.Errorf("import process failed: %w", err)
}
return processBlocksStats{
@@ -236,7 +236,7 @@ func (tp *thanosProcessor) processBlocks(blocks []thanos.BlockInfo, aggrType tha
func (tp *thanosProcessor) do(bi thanos.BlockInfo, aggrType thanos.AggrType) (uint64, uint64, error) {
ss, err := tp.cl.Read(bi)
if err != nil {
return 0, 0, fmt.Errorf("failed to read block: %s", err)
return 0, 0, fmt.Errorf("failed to read block: %w", err)
}
defer ss.Close() // Ensure querier is closed even on early returns

View File

@@ -163,7 +163,7 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
importDuration: metrics.GetOrCreateHistogram(`vmctl_importer_request_duration_seconds`),
}
if err := im.Ping(); err != nil {
return nil, fmt.Errorf("ping to %q failed: %s", addr, err)
return nil, fmt.Errorf("ping to %q failed: %w", addr, err)
}
if cfg.BatchSize < 1 {
@@ -289,7 +289,7 @@ func (im *Importer) flush(ctx context.Context, b []*TimeSeries) error {
retryableFunc := func() error { return im.Import(b) }
attempts, err := im.backoff.Retry(ctx, retryableFunc)
if err != nil {
return fmt.Errorf("import failed with %d retries: %s", attempts, err)
return fmt.Errorf("import failed with %d retries: %w", attempts, err)
}
im.s.Lock()
im.s.retries = attempts
@@ -302,7 +302,7 @@ func (im *Importer) Ping() error {
url := fmt.Sprintf("%s/health", im.addr)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("cannot create request to %q: %s", im.addr, err)
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
}
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
@@ -332,7 +332,7 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
req, err := http.NewRequest(http.MethodPost, im.importPath, pr)
if err != nil {
im.importRequestsErrorsTotal.Inc()
return fmt.Errorf("cannot create request to %q: %s", im.addr, err)
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
}
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
@@ -352,7 +352,7 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
zw, err := gzip.NewWriterLevel(w, 1)
if err != nil {
im.importRequestsErrorsTotal.Inc()
return fmt.Errorf("unexpected error when creating gzip writer: %s", err)
return fmt.Errorf("unexpected error when creating gzip writer: %w", err)
}
w = zw
}
@@ -411,7 +411,7 @@ var ErrBadRequest = errors.New("bad request")
func (im *Importer) do(req *http.Request) error {
resp, err := im.client.Do(req)
if err != nil {
return fmt.Errorf("unexpected error when performing request: %s", err)
return fmt.Errorf("unexpected error when performing request: %w", err)
}
defer func() {
_ = resp.Body.Close()
@@ -419,7 +419,7 @@ func (im *Importer) do(req *http.Request) error {
if resp.StatusCode != http.StatusNoContent {
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body for status code %d: %s", resp.StatusCode, err)
return fmt.Errorf("failed to read response body for status code %d: %w", resp.StatusCode, err)
}
if resp.StatusCode == http.StatusBadRequest {
return fmt.Errorf("%w: unexpected response code %d: %s", ErrBadRequest, resp.StatusCode, string(body))

View File

@@ -55,14 +55,14 @@ func (p *vmNativeProcessor) run(ctx context.Context) error {
start, err := vmctlutil.ParseTime(p.filter.TimeStart)
if err != nil {
return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeStart, p.filter.TimeStart, err)
return fmt.Errorf("failed to parse %s, provided: %s: %w", vmNativeFilterTimeStart, p.filter.TimeStart, err)
}
end := time.Now().In(start.Location())
if p.filter.TimeEnd != "" {
end, err = vmctlutil.ParseTime(p.filter.TimeEnd)
if err != nil {
return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeEnd, p.filter.TimeEnd, err)
return fmt.Errorf("failed to parse %s, provided: %s: %w", vmNativeFilterTimeEnd, p.filter.TimeEnd, err)
}
}
@@ -91,7 +91,7 @@ func (p *vmNativeProcessor) run(ctx context.Context) error {
err := p.runBackfilling(ctx, tenantID, ranges)
if err != nil {
migrationErrorsTotal.Inc()
return fmt.Errorf("migration failed: %s", err)
return fmt.Errorf("migration failed: %w", err)
}
if p.interCluster {
@@ -157,7 +157,7 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU
}
default:
}
return fmt.Errorf("failed to write into %q: %s", p.dst.Addr, err)
return fmt.Errorf("failed to write into %q: %w", p.dst.Addr, err)
}
p.s.Lock()
@@ -184,7 +184,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
importAddr, err := vm.AddExtraLabelsToImportPath(importAddr, p.dst.ExtraLabels)
if err != nil {
return fmt.Errorf("failed to add labels to import path: %s", err)
return fmt.Errorf("failed to add labels to import path: %w", err)
}
dstURL := fmt.Sprintf("%s/%s", p.dst.Addr, importAddr)
@@ -222,7 +222,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
format = fmt.Sprintf(nativeWithBackoffTpl, barPrefix)
metricsMap, err = p.explore(ctx, p.src, tenantID, ranges)
if err != nil {
return fmt.Errorf("failed to explore metric names: %s", err)
return fmt.Errorf("failed to explore metric names: %w", err)
}
if len(metricsMap) == 0 {
errMsg := "no metrics found"
@@ -295,7 +295,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
case <-ctx.Done():
return fmt.Errorf("context canceled")
case infErr := <-errCh:
return fmt.Errorf("export/import error: %s", infErr)
return fmt.Errorf("export/import error: %w", infErr)
case filterCh <- native.Filter{
Match: match,
TimeStart: times[0].Format(time.RFC3339),
@@ -313,7 +313,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
close(errCh)
for err := range errCh {
return fmt.Errorf("import process failed: %s", err)
return fmt.Errorf("import process failed: %w", err)
}
return nil

View File

@@ -257,7 +257,7 @@ func (vms *VMStorage) requestHandler(w http.ResponseWriter, r *http.Request) boo
dealine, err = strconv.Atoi(deadlineStr)
if err != nil {
logger.Errorf("cannot parse `seconds` arg %q: %s", deadlineStr, err)
jsonResponseError(w, fmt.Errorf("cannot parse `seconds` arg %q: %s", deadlineStr, err))
jsonResponseError(w, fmt.Errorf("cannot parse `seconds` arg %q: %w", deadlineStr, err))
return true
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -21,16 +21,16 @@
},
"dependencies": {
"classnames": "^2.5.1",
"dayjs": "^1.11.20",
"dayjs": "^1.11.21",
"lodash.debounce": "^4.0.8",
"marked": "^18.0.2",
"preact": "^10.29.1",
"qs": "^6.15.1",
"marked": "^18.0.5",
"preact": "^10.29.2",
"qs": "^6.15.2",
"react-input-mask": "^2.0.4",
"react-router-dom": "^7.14.1",
"react-router-dom": "^7.17.0",
"uplot": "^1.6.32",
"vite": "^8.0.8",
"web-vitals": "^5.2.0"
"vite": "^8.0.16",
"web-vitals": "^5.3.0"
},
"devDependencies": {
"@eslint/eslintrc": "^3.3.5",
@@ -39,24 +39,24 @@
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/preact": "^3.2.4",
"@types/lodash.debounce": "^4.0.9",
"@types/node": "^25.6.0",
"@types/qs": "^6.15.0",
"@types/react": "^19.2.14",
"@types/node": "^25.9.2",
"@types/qs": "^6.15.1",
"@types/react": "^19.2.17",
"@types/react-input-mask": "^3.0.6",
"@types/react-router-dom": "^5.3.3",
"@typescript-eslint/eslint-plugin": "^8.58.2",
"@typescript-eslint/parser": "^8.58.2",
"@typescript-eslint/eslint-plugin": "^8.61.0",
"@typescript-eslint/parser": "^8.61.0",
"cross-env": "^10.1.0",
"eslint": "^9.39.2",
"eslint-plugin-react": "^7.37.5",
"eslint-plugin-unused-imports": "^4.4.1",
"globals": "^17.5.0",
"http-proxy-middleware": "^3.0.5",
"jsdom": "^29.0.2",
"postcss": "^8.5.10",
"sass-embedded": "^1.99.0",
"typescript": "^6.0.2",
"vitest": "^4.1.4"
"globals": "^17.6.0",
"http-proxy-middleware": "^4.1.0",
"jsdom": "^29.1.1",
"postcss": "^8.5.15",
"sass-embedded": "^1.100.0",
"typescript": "^6.0.3",
"vitest": "^4.1.8"
},
"browserslist": {
"production": [

View File

@@ -156,14 +156,14 @@ func readAllAndClose(t *testing.T, responseBody io.ReadCloser) string {
//
// This type is expected to be embedded by the apps that serve metrics.
type metricsClient struct {
metricsCli *Client
url string
cli *Client
url string
}
func newMetricsClient(cli *Client, addr string) *metricsClient {
return &metricsClient{
metricsCli: cli,
url: fmt.Sprintf("http://%s/metrics", addr),
cli: cli,
url: fmt.Sprintf("http://%s/metrics", addr),
}
}
@@ -179,7 +179,7 @@ func (c *metricsClient) GetIntMetric(t *testing.T, metricName string) int {
func (c *metricsClient) GetMetric(t *testing.T, metricName string) float64 {
t.Helper()
metrics, statusCode := c.metricsCli.Get(t, c.url, nil)
metrics, statusCode := c.cli.Get(t, c.url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -205,7 +205,7 @@ func (c *metricsClient) GetMetricsByPrefix(t *testing.T, prefix string) []float6
values := []float64{}
metrics, statusCode := c.metricsCli.Get(t, c.url, nil)
metrics, statusCode := c.cli.Get(t, c.url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -234,7 +234,7 @@ func (c *metricsClient) GetMetricsByRegexp(t *testing.T, re *regexp.Regexp) []fl
values := []float64{}
metrics, statusCode := c.metricsCli.Get(t, c.url, nil)
metrics, statusCode := c.cli.Get(t, c.url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -270,7 +270,7 @@ func (c *metricsClient) rpcRowsSentTotal(t *testing.T) int {
}
type vmselectClient struct {
vmselectCli *Client
cli *Client
url func(op, path string, opts QueryOpts) string
metricNamesStatsResetURL string
tenantsURL string
@@ -287,7 +287,7 @@ func (c *vmselectClient) PrometheusAPIV1Export(t *testing.T, query string, opts
values := opts.asURLValues()
values.Add("match[]", query)
values.Add("format", "promapi")
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1QueryResponse(t, res)
}
@@ -302,7 +302,7 @@ func (c *vmselectClient) PrometheusAPIV1ExportNative(t *testing.T, query string,
values := opts.asURLValues()
values.Add("match[]", query)
values.Add("format", "promapi")
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return []byte(res)
}
@@ -315,7 +315,7 @@ func (c *vmselectClient) PrometheusAPIV1Query(t *testing.T, query string, opts Q
url := c.url("select", "prometheus/api/v1/query", opts)
values := opts.asURLValues()
values.Add("query", query)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1QueryResponse(t, res)
}
@@ -329,7 +329,7 @@ func (c *vmselectClient) PrometheusAPIV1QueryRange(t *testing.T, query string, o
url := c.url("select", "prometheus/api/v1/query_range", opts)
values := opts.asURLValues()
values.Add("query", query)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1QueryResponse(t, res)
}
@@ -342,7 +342,7 @@ func (c *vmselectClient) PrometheusAPIV1Series(t *testing.T, matchQuery string,
url := c.url("select", "prometheus/api/v1/series", opts)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1SeriesResponse(t, res)
}
@@ -354,7 +354,7 @@ func (c *vmselectClient) PrometheusAPIV1SeriesCount(t *testing.T, opts QueryOpts
t.Helper()
url := c.url("select", "prometheus/api/v1/series/count", opts)
values := opts.asURLValues()
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1SeriesCountResponse(t, res)
}
@@ -367,7 +367,7 @@ func (c *vmselectClient) PrometheusAPIV1Labels(t *testing.T, matchQuery string,
url := c.url("select", "prometheus/api/v1/labels", opts)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1LabelsResponse(t, res)
}
@@ -382,7 +382,7 @@ func (c *vmselectClient) PrometheusAPIV1LabelValues(t *testing.T, labelName, mat
url := c.url("select", path, opts)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1LabelValuesResponse(t, res)
}
@@ -394,7 +394,7 @@ func (c *vmselectClient) PrometheusAPIV1Metadata(t *testing.T, metric string, li
values := opts.asURLValues()
values.Add("metric", metric)
values.Add("limit", strconv.Itoa(limit))
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1Metadata(t, res)
}
@@ -408,7 +408,7 @@ func (c *vmselectClient) PrometheusAPIV1AdminTSDBDeleteSeries(t *testing.T, matc
url := c.url("delete", "prometheus/api/v1/admin/tsdb/delete_series", opts)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
}
@@ -426,7 +426,7 @@ func (c *vmselectClient) PrometheusAPIV1StatusMetricNamesStats(t *testing.T, lim
values.Add("limit", limit)
values.Add("le", le)
values.Add("match_pattern", matchPattern)
res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
@@ -455,7 +455,7 @@ func (c *vmselectClient) PrometheusAPIV1StatusTSDB(t *testing.T, matchQuery stri
addNonEmpty("match[]", matchQuery)
addNonEmpty("topN", topN)
addNonEmpty("date", date)
res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
res, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
@@ -476,7 +476,7 @@ func (c *vmselectClient) GraphiteMetricsIndex(t *testing.T, opts QueryOpts) Grap
t.Helper()
url := c.url("select", "graphite/metrics/index.json", opts)
res, statusCode := c.vmselectCli.Get(t, url, opts.Headers)
res, statusCode := c.cli.Get(t, url, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
@@ -499,7 +499,7 @@ func (c *vmselectClient) GraphiteMetricsFind(t *testing.T, query string, opts Qu
url := c.url("select", "graphite/metrics/find", opts)
values := opts.asURLValues()
values.Add("query", query)
resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
resText, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText)
}
@@ -522,7 +522,7 @@ func (c *vmselectClient) GraphiteMetricsExpand(t *testing.T, query string, opts
url := c.url("select", "graphite/metrics/expand", opts)
values := opts.asURLValues()
values.Add("query", query)
resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
resText, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText)
}
@@ -546,7 +546,7 @@ func (c *vmselectClient) GraphiteRender(t *testing.T, target string, opts QueryO
values := opts.asURLValues()
values.Add("format", "json")
values.Add("target", target)
resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
resText, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText)
}
@@ -567,7 +567,7 @@ func (c *vmselectClient) GraphiteTagsTagSeries(t *testing.T, record string, opts
url := c.url("select", "graphite/tags/tagSeries", opts)
values := opts.asURLValues()
values.Add("path", record)
_, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
_, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if got, want := statusCode, http.StatusNotImplemented; got != want {
t.Fatalf("unexpected status code: got %d, want %d", got, want)
}
@@ -584,7 +584,7 @@ func (c *vmselectClient) GraphiteTagsTagMultiSeries(t *testing.T, records []stri
for _, rec := range records {
values.Add("path", rec)
}
_, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
_, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
if got, want := statusCode, http.StatusNotImplemented; got != want {
t.Fatalf("unexpected status code: got %d, want %d", got, want)
}
@@ -598,7 +598,7 @@ func (c *vmselectClient) GraphiteTagsTagMultiSeries(t *testing.T, records []stri
func (c *vmselectClient) PrometheusAPIV1AdminStatusMetricNamesStatsReset(t *testing.T, opts QueryOpts) {
t.Helper()
values := opts.asURLValues()
res, statusCode := c.vmselectCli.PostForm(t, c.metricNamesStatsResetURL, values, opts.Headers)
res, statusCode := c.cli.PostForm(t, c.metricNamesStatsResetURL, values, opts.Headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
}
@@ -608,7 +608,7 @@ func (c *vmselectClient) PrometheusAPIV1AdminStatusMetricNamesStatsReset(t *test
// /admin/tenants endpoint.
func (c *vmselectClient) APIV1AdminTenants(t *testing.T, opts QueryOpts) *AdminTenantsResponse {
t.Helper()
res, statusCode := c.vmselectCli.Get(t, c.tenantsURL, opts.Headers)
res, statusCode := c.cli.Get(t, c.tenantsURL, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
@@ -622,7 +622,7 @@ func (c *vmselectClient) APIV1AdminTenants(t *testing.T, opts QueryOpts) *AdminT
}
type vminsertClient struct {
vminsertCli *Client
cli *Client
url func(op, path string, opts QueryOpts) string
openTSDBURL func(op, path string, opts QueryOpts) string
graphiteListenAddr string
@@ -647,7 +647,7 @@ func (c *vminsertClient) PrometheusAPIV1ImportCSV(t *testing.T, records []string
headers := opts.getHeaders()
headers.Set("Content-Type", "text/plain")
c.sendBlocking(t, len(records), func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -671,7 +671,7 @@ func (c *vminsertClient) PrometheusAPIV1ImportNative(t *testing.T, data []byte,
headers := opts.getHeaders()
headers.Set("Content-Type", "text/plain")
c.sendBlocking(t, 1, func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -693,7 +693,7 @@ func (c *vminsertClient) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteReque
headers := opts.getHeaders()
headers.Set("Content-Type", "application/x-protobuf")
c.sendBlocking(t, recordsCount, func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -745,7 +745,7 @@ func (c *vminsertClient) PrometheusAPIV1ImportPrometheus(t *testing.T, records [
headers := opts.getHeaders()
headers.Set("Content-Type", "text/plain")
c.sendBlocking(t, recordsCount, func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -771,7 +771,7 @@ func (c *vminsertClient) InfluxWrite(t *testing.T, records []string, opts QueryO
headers.Set("Content-Type", "text/plain")
c.sendBlocking(t, len(records), func() {
t.Helper()
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -805,7 +805,7 @@ func (c *vminsertClient) OpentelemetryV1Metrics(t *testing.T, md otlppb.MetricsD
headers := opts.getHeaders()
headers.Set("Content-Type", "application/x-protobuf")
c.sendBlocking(t, recordsCount, func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -830,7 +830,7 @@ func (c *vminsertClient) OpenTSDBAPIPut(t *testing.T, records []string, opts Que
headers := opts.getHeaders()
headers.Set("Content-Type", "application/json")
c.sendBlocking(t, len(records), func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -853,7 +853,7 @@ func (c *vminsertClient) ZabbixConnectorHistory(t *testing.T, records []string,
headers := opts.getHeaders()
headers.Set("Content-Type", "application/json")
c.sendBlocking(t, len(records), func() {
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
_, statusCode := c.cli.Post(t, url, data, headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -867,11 +867,11 @@ func (c *vminsertClient) ZabbixConnectorHistory(t *testing.T, records []string,
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#ingesting
func (c *vminsertClient) GraphiteWrite(t *testing.T, records []string, _ QueryOpts) {
t.Helper()
c.vminsertCli.Write(t, c.graphiteListenAddr, records)
c.cli.Write(t, c.graphiteListenAddr, records)
}
type vmstorageClient struct {
vmstorageCli *Client
cli *Client
httpListenAddr string
}
@@ -881,7 +881,7 @@ func (c *vmstorageClient) ForceFlush(t *testing.T) {
t.Helper()
url := fmt.Sprintf("http://%s/internal/force_flush", c.httpListenAddr)
_, statusCode := c.vmstorageCli.Get(t, url, nil)
_, statusCode := c.cli.Get(t, url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -892,7 +892,7 @@ func (c *vmstorageClient) ForceMerge(t *testing.T) {
t.Helper()
url := fmt.Sprintf("http://%s/internal/force_merge", c.httpListenAddr)
_, statusCode := c.vmstorageCli.Get(t, url, nil)
_, statusCode := c.cli.Get(t, url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -905,7 +905,7 @@ func (c *vmstorageClient) ForceMerge(t *testing.T) {
func (c *vmstorageClient) SnapshotCreate(t *testing.T) *SnapshotCreateResponse {
t.Helper()
data, statusCode := c.vmstorageCli.Post(t, c.SnapshotCreateURL(), nil, nil)
data, statusCode := c.cli.Post(t, c.SnapshotCreateURL(), nil, nil)
if got, want := statusCode, http.StatusOK; got != want {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
}
@@ -931,7 +931,7 @@ func (c *vmstorageClient) APIV1AdminTSDBSnapshot(t *testing.T) *APIV1AdminTSDBSn
t.Helper()
url := fmt.Sprintf("http://%s/api/v1/admin/tsdb/snapshot", c.httpListenAddr)
data, statusCode := c.vmstorageCli.Post(t, url, nil, nil)
data, statusCode := c.cli.Post(t, url, nil, nil)
if got, want := statusCode, http.StatusOK; got != want {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
}
@@ -952,7 +952,7 @@ func (c *vmstorageClient) SnapshotList(t *testing.T) *SnapshotListResponse {
t.Helper()
url := fmt.Sprintf("http://%s/snapshot/list", c.httpListenAddr)
data, statusCode := c.vmstorageCli.Get(t, url, nil)
data, statusCode := c.cli.Get(t, url, nil)
if got, want := statusCode, http.StatusOK; got != want {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
}
@@ -973,7 +973,7 @@ func (c *vmstorageClient) SnapshotDelete(t *testing.T, snapshotName string) *Sna
t.Helper()
url := fmt.Sprintf("http://%s/snapshot/delete?snapshot=%s", c.httpListenAddr, snapshotName)
data, statusCode := c.vmstorageCli.Delete(t, url)
data, statusCode := c.cli.Delete(t, url)
wantStatusCodes := map[int]bool{
http.StatusOK: true,
http.StatusInternalServerError: true,
@@ -998,7 +998,7 @@ func (c *vmstorageClient) SnapshotDeleteAll(t *testing.T) *SnapshotDeleteAllResp
t.Helper()
url := fmt.Sprintf("http://%s/snapshot/delete_all", c.httpListenAddr)
data, statusCode := c.vmstorageCli.Post(t, url, nil, nil)
data, statusCode := c.cli.Post(t, url, nil, nil)
if got, want := statusCode, http.StatusOK; got != want {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
}

View File

@@ -77,7 +77,7 @@ type vminsertRuntimeValues struct {
func newVminsert(app *app, cli *Client, rt vminsertRuntimeValues) *Vminsert {
metricsClient := newMetricsClient(cli, rt.httpListenAddr)
vminsertClient := &vminsertClient{
vminsertCli: cli,
cli: cli,
url: func(op, path string, opts QueryOpts) string {
return getClusterPath(rt.httpListenAddr, op, path, opts)
},

View File

@@ -48,7 +48,7 @@ func newVmselect(app *app, cli *Client, rt vmselectRuntimeValues) *Vmselect {
app: app,
metricsClient: newMetricsClient(cli, rt.httpListenAddr),
vmselectClient: &vmselectClient{
vmselectCli: cli,
cli: cli,
url: func(op, path string, opts QueryOpts) string {
return getClusterPath(rt.httpListenAddr, op, path, opts)
},

View File

@@ -58,11 +58,11 @@ func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
app: app,
metricsClient: newMetricsClient(cli, rt.httpListenAddr),
vmstorageClient: &vmstorageClient{
vmstorageCli: cli,
cli: cli,
httpListenAddr: rt.httpListenAddr,
},
vmselectClient: &vmselectClient{
vmselectCli: cli,
cli: cli,
url: func(op, path string, opts QueryOpts) string {
return fmt.Sprintf("http://%s/%s", rt.httpListenAddr, path)
},
@@ -70,7 +70,7 @@ func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
tenantsURL: "vmsingle-does-not-serve-tenants",
},
vminsertClient: &vminsertClient{
vminsertCli: cli,
cli: cli,
url: func(_, path string, _ QueryOpts) string {
return fmt.Sprintf("http://%s/%s", rt.httpListenAddr, path)
},

View File

@@ -63,7 +63,7 @@ func newVmstorage(app *app, cli *Client, rt vmstorageRuntimeValues) *Vmstorage {
app: app,
metricsClient: newMetricsClient(cli, rt.httpListenAddr),
vmstorageClient: &vmstorageClient{
vmstorageCli: cli,
cli: cli,
httpListenAddr: rt.httpListenAddr,
},
storageDataPath: rt.storageDataPath,

View File

@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
Released at 2026-06-08

View File

@@ -121,7 +121,7 @@ func (p *Password) initRandomValue() {
_, err := io.ReadFull(rand.Reader, buf[:])
if err != nil {
// cannot use lib/logger here, since it can be uninitialized yet
panic(fmt.Errorf("FATAL: cannot read random data: %s", err))
panic(fmt.Errorf("FATAL: cannot read random data: %w", err))
}
s := string(buf[:])
p.value.Store(&s)

View File

@@ -16,7 +16,7 @@ func ParseKey(key []byte) (any, error) {
k, err := x509.ParsePKIXPublicKey(b.Bytes)
if err != nil {
return nil, fmt.Errorf("failed to parse key %q: %v", key, err)
return nil, fmt.Errorf("failed to parse key %q: %w", key, err)
}
return k, nil

View File

@@ -14,7 +14,7 @@ func BenchmarkWriteRequestUnmarshalProtobuf(b *testing.B) {
wru := &WriteRequestUnmarshaler{}
for pb.Next() {
if _, err := wru.UnmarshalProtobuf(data); err != nil {
panic(fmt.Errorf("unexpected error: %s", err))
panic(fmt.Errorf("unexpected error: %w", err))
}
}
})

View File

@@ -97,12 +97,12 @@ func getDedicatedServerDetails(cfg *apiConfig, dedicatedServerName string) (*ded
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
}
var dedicatedServerDetails dedicatedServer
if err = json.Unmarshal(resp, &dedicatedServerDetails); err != nil {
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
}
// get IPs for this dedicated server.
@@ -113,12 +113,12 @@ func getDedicatedServerDetails(cfg *apiConfig, dedicatedServerName string) (*ded
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
}
var ips []string
if err = json.Unmarshal(resp, &ips); err != nil {
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
}
// handle different IP formats
@@ -141,11 +141,11 @@ func getDedicatedServerList(cfg *apiConfig) ([]string, error) {
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
}
if err = json.Unmarshal(resp, &dedicatedServerList); err != nil {
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
}
return dedicatedServerList, nil

View File

@@ -117,12 +117,12 @@ func getVPSDetails(cfg *apiConfig, vpsName string) (*virtualPrivateServer, error
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
}
var vpsDetails virtualPrivateServer
if err = json.Unmarshal(resp, &vpsDetails); err != nil {
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
}
// get IPs for this vps.
@@ -133,12 +133,12 @@ func getVPSDetails(cfg *apiConfig, vpsName string) (*virtualPrivateServer, error
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
}
var ips []string
if err = json.Unmarshal(resp, &ips); err != nil {
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
}
// handle different IP formats
@@ -162,12 +162,12 @@ func getVPSList(cfg *apiConfig) ([]string, error) {
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
}
var vpsList []string
if err = json.Unmarshal(resp, &vpsList); err != nil {
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
}
return vpsList, nil

View File

@@ -33,7 +33,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
}
parsedURL, err := url.Parse(sdc.URL)
if err != nil {
return nil, fmt.Errorf("parse URL %s error: %v", sdc.URL, err)
return nil, fmt.Errorf("cannot parse %s: %w", sdc.URL, err)
}
if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" {
return nil, fmt.Errorf("URL %s scheme must be 'http' or 'https'", sdc.URL)

View File

@@ -221,7 +221,7 @@ func getIAMToken(cfg *apiConfig) (*iamToken, error) {
body := bytes.NewBuffer(passport)
resp, err := cfg.client.Post(iamURL, "application/json", body)
if err != nil {
return nil, fmt.Errorf("cannot send request to yandex cloud iam api %q: %s", iamURL, err)
return nil, fmt.Errorf("cannot send request to yandex cloud iam api %q: %w", iamURL, err)
}
data, err := readResponseBody(resp, iamURL)
if err != nil {

View File

@@ -667,11 +667,11 @@ func TestScrapeWorkScrapeInternalStreamConcurrency(t *testing.T) {
}
generateScrape := func(n int) string {
w := strings.Builder{}
var w strings.Builder
for i := range n {
w.WriteString(fmt.Sprintf("fooooo_%d 1\n", i))
fmt.Fprintf(&w, "fooooo_%d 1\n", i)
if i%100 == 0 {
w.WriteString(fmt.Sprintf("# HELP fooooo_%d This is a test\n", i))
fmt.Fprintf(&w, "# HELP fooooo_%d This is a test\n", i)
}
}
return w.String()
@@ -1005,9 +1005,9 @@ func TestSendStaleSeries(t *testing.T) {
}
}
generateScrape := func(n int) string {
w := strings.Builder{}
var w strings.Builder
for i := range n {
w.WriteString(fmt.Sprintf("foo_%d 1\n", i))
fmt.Fprintf(&w, "foo_%d 1\n", i)
}
return w.String()
}

View File

@@ -186,7 +186,7 @@ func (s *Series) unmarshalProtobuf(src []byte) (err error) {
}
pt := &points[len(points)-1]
if err := pt.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal point: %s", err)
return fmt.Errorf("cannot unmarshal point: %w", err)
}
case 1:
data, ok := fc.MessageData()

View File

@@ -30,7 +30,7 @@ func ProcessRequestBody(b []byte) ([]byte, error) {
}
}
if err := json.Unmarshal(b, &req); err != nil {
return nil, fmt.Errorf("cannot unmarshal Firehose JSON in request body: %s", err)
return nil, fmt.Errorf("cannot unmarshal Firehose JSON in request body: %w", err)
}
var dst []byte

View File

@@ -99,17 +99,17 @@ func (r *Row) unmarshal(o *fastjson.Value) error {
n, err := getFloat64(o, "value")
if err != nil {
return fmt.Errorf("missing `value` element, %s", err)
return fmt.Errorf("missing `value` element: %w", err)
}
r.Value = n
cl, err := getInt64(o, "clock")
if err != nil {
return fmt.Errorf("missing `clock` element, %s", err)
return fmt.Errorf("missing `clock` element: %w", err)
}
ns, err := getInt64(o, "ns")
if err != nil {
return fmt.Errorf("missing `ns` element, %s", err)
return fmt.Errorf("missing `ns` element: %w", err)
}
// clock - Number of seconds since Epoch to the moment when value was collected (integer part).
// ns - Number of nanoseconds to be added to clock to get a precise value collection time.
@@ -121,7 +121,7 @@ func (r *Row) unmarshal(o *fastjson.Value) error {
if len(groupValue) != 0 {
groups, err := getArray(o, "groups")
if err != nil {
return fmt.Errorf("missing `groups` element, %s", err)
return fmt.Errorf("missing `groups` element: %w", err)
}
for _, g := range groups {
k := g.GetStringBytes()
@@ -141,7 +141,7 @@ func (r *Row) unmarshal(o *fastjson.Value) error {
itemTags, err := getArray(o, "item_tags")
if err != nil {
return fmt.Errorf("missing `item_tags` element, %s", err)
return fmt.Errorf("missing `item_tags` element: %w", err)
}
if len(duplicateTagsSeparator) == 0 { // Do not merge tags

View File

@@ -71,9 +71,9 @@ func Create(ctx context.Context, createSnapshotURL string) (string, error) {
return snap.Snapshot, nil
}
if snap.Status == "error" {
return "", errors.New(snap.Msg)
return "", fmt.Errorf("snapshot status: %q; msg: %q", snap.Status, snap.Msg)
}
return "", fmt.Errorf("unknown status: %v", snap.Status)
return "", fmt.Errorf("snapshot status unknown: %q", snap.Status)
}
// Delete deletes a snapshot via the provided api endpoint
@@ -121,14 +121,14 @@ func Delete(ctx context.Context, deleteSnapshotURL string, snapshotName string)
if snap.Status == "error" {
return errors.New(snap.Msg)
}
return fmt.Errorf("unknown status: %v", snap.Status)
return fmt.Errorf("snapshot status unknown: %q", snap.Status)
}
// GetHTTPClient returns a new HTTP client configured for snapshot operations.
func GetHTTPClient() (*http.Client, error) {
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify, "vm_snapshot_client")
if err != nil {
return nil, fmt.Errorf("failed to create transport: %s", err)
return nil, fmt.Errorf("failed to create transport: %w", err)
}
hc := &http.Client{
Transport: tr,

View File

@@ -649,7 +649,7 @@ func (is *indexSearch) searchLabelNamesWithFiltersOnDate(qt *querytracer.Tracer,
} else {
_, key, err := unmarshalCompositeTagKey(labelName)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal composite tag key: %s", err)
return nil, fmt.Errorf("cannot unmarshal composite tag key: %w", err)
}
lns[string(key)] = struct{}{}
}

View File

@@ -231,6 +231,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
logger.Warnf("deduplication couldn't be finished in the configured dedupInterval=%s; it took %.03fs; "+
"possible solutions: increase dedupInterval; reduce samples' ingestion rate", d.interval, duration.Seconds())
}
deadlineTime = deadlineTime.Add(d.interval)
for time.Now().After(deadlineTime) {
deadlineTime = deadlineTime.Add(d.interval)
}

View File

@@ -845,6 +845,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlu
} else {
a.flush(pf, flushTime, cs, false)
}
flushTime = flushTime.Add(a.interval)
for time.Now().After(flushTime) {
flushTime = flushTime.Add(a.interval)
}

View File

@@ -1,5 +1,3 @@
//go:build synctest
package streamaggr
import (
@@ -485,10 +483,8 @@ foo 3.3
`, ``, ``, ``, ``}, time.Minute, `foo:1m_count_series 1
foo:1m_count_series{bar="baz"} 1
foo:1m_sum_samples 0
foo:1m_sum_samples 0
foo:1m_sum_samples 4.3
foo:1m_sum_samples{bar="baz"} 0
foo:1m_sum_samples{bar="baz"} 0
foo:1m_sum_samples{bar="baz"} 2
foo:5m_by_bar_sum_samples 4.3
foo:5m_by_bar_sum_samples{bar="baz"} 2
@@ -694,21 +690,29 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.125
// test rate_sum and rate_avg, when two aggregation intervals are empty
f([]string{`
foo{abc="123", cde="1"} 2
foo{abc="456", cde="1"} 8
foo{abc="777", cde="1"} 9 -10
foo{abc="123", cde="1"} 1
foo{abc="123", cde="1"} 2 1
foo{abc="456", cde="1"} 7
foo{abc="456", cde="1"} 8 1
foo{abc="777", cde="1"} 8
foo{abc="777", cde="1"} 9 1
`, ``, ``, `
foo{abc="123", cde="1"} 20
foo{abc="123", cde="1"} 19
foo{abc="123", cde="1"} 20 1
foo{abc="456", cde="1"} 26
foo{abc="777", cde="1"} 27 -10
`}, time.Minute, `foo:1m_by_cde_rate_avg{cde="1"} 0.1
foo:1m_by_cde_rate_sum{cde="1"} 0.2
foo{abc="456", cde="1"} 27 1
foo{abc="777", cde="1"} 27
foo{abc="777", cde="1"} 28 1
`}, time.Minute, `foo:1m_by_cde_rate_avg{cde="1"} 1
foo:1m_by_cde_rate_avg{cde="1"} 1
foo:1m_by_cde_rate_sum{cde="1"} 3
foo:1m_by_cde_rate_sum{cde="1"} 3
`, `
- interval: 1m
by: [cde]
outputs: [rate_sum, rate_avg]
enable_windows: true
`, "111111")
`, "111111111111")
// rate_sum and rate_avg with duplicated events
f([]string{`

View File

@@ -74,7 +74,7 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
`, strings.Join(outputsQuoted, ","))
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
panic(fmt.Errorf("unexpected error when initializing aggregators: %w", err))
}
return a
}
@@ -133,7 +133,7 @@ func newPerOutputBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregat
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
panic(fmt.Errorf("unexpected error when initializing aggregators: %w", err))
}
return a
}