Compare commits

..

1 Commits

Author SHA1 Message Date
Andrii Chubatiuk
90029442cb lib/streamaggr: use stale samples for increase, total and rate outputs state update
Before, with ignore_old_samples or enable_windows set, old samples were just dropped.
With this change, these samples are used to update per-series state in stateful aggregation outputs: rate, total, and increase, but do not contribute to the aggregated output.
This ensures the next in-interval sample computes the correct per-interval increase relative to the most recent pre-interval value, rather than a stale one from a previous flush cycle.
2026-06-10 15:00:20 +03:00
79 changed files with 793 additions and 522 deletions

View File

@@ -52,14 +52,14 @@ jobs:
restore-keys: go-artifacts-${{ runner.os }}-codeql-analyze-${{ steps.go.outputs.go-version }}-
- name: Initialize CodeQL
uses: github/codeql-action/init@7211b7c8077ea37d8641b6271f6a365a22a5fbfa # v4.36.0
uses: github/codeql-action/init@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
with:
languages: go
- name: Autobuild
uses: github/codeql-action/autobuild@7211b7c8077ea37d8641b6271f6a365a22a5fbfa # v4.36.0
uses: github/codeql-action/autobuild@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@7211b7c8077ea37d8641b6271f6a365a22a5fbfa # v4.36.0
uses: github/codeql-action/analyze@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
with:
category: 'language:go'

View File

@@ -10,7 +10,7 @@ import (
func Compress(wr WriteRequest) []byte {
data, err := wr.Marshal()
if err != nil {
panic(fmt.Errorf("BUG: cannot compress WriteRequest: %w", err))
panic(fmt.Errorf("BUG: cannot compress WriteRequest: %s", err))
}
return snappy.Encode(nil, data)
}

View File

@@ -61,15 +61,15 @@ func parseInputSeries(input []series, interval *promutil.Duration, startStamp ti
for _, data := range input {
expr, err := metricsql.Parse(data.Series)
if err != nil {
return res, fmt.Errorf("failed to parse series %s: %w", data.Series, err)
return res, fmt.Errorf("failed to parse series %s: %v", data.Series, err)
}
promvals, err := parseInputValue(data.Values, true)
if err != nil {
return res, fmt.Errorf("failed to parse input series value %s: %w", data.Values, err)
return res, fmt.Errorf("failed to parse input series value %s: %v", data.Values, err)
}
metricExpr, ok := expr.(*metricsql.MetricExpr)
if !ok || len(metricExpr.LabelFilterss) != 1 {
return res, fmt.Errorf("got invalid input series %s: %w", data.Series, err)
return res, fmt.Errorf("got invalid input series %s: %v", data.Series, err)
}
samples := make([]testutil.Sample, 0, len(promvals))
ts := startStamp

View File

@@ -53,13 +53,13 @@ Outer:
if s.Labels != "" {
metricsqlExpr, err := metricsql.Parse(s.Labels)
if err != nil {
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %w", mt.Expr,
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %v", mt.Expr,
mt.EvalTime.Duration().String(), fmt.Errorf("failed to parse labels %q: %w", s.Labels, err)))
continue Outer
}
metricsqlMetricExpr, ok := metricsqlExpr.(*metricsql.MetricExpr)
if !ok || len(metricsqlMetricExpr.LabelFilterss) > 1 {
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %w", mt.Expr,
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %v", mt.Expr,
mt.EvalTime.Duration().String(), fmt.Errorf("got invalid exp_samples: %q", s.Labels)))
continue Outer
}

View File

@@ -329,11 +329,11 @@ func (tg *testGroup) test(evalInterval time.Duration, groupOrderMap map[string]i
q, err := datasource.Init(nil)
if err != nil {
return []error{fmt.Errorf("failed to init datasource: %w", err)}
return []error{fmt.Errorf("failed to init datasource: %v", err)}
}
rw, err := remotewrite.NewDebugClient()
if err != nil {
return []error{fmt.Errorf("failed to init wr: %w", err)}
return []error{fmt.Errorf("failed to init wr: %v", err)}
}
alertEvalTimesMap := map[time.Duration]struct{}{}

View File

@@ -89,7 +89,7 @@ func (pi *promInstant) Unmarshal(b []byte) error {
labels.Visit(func(key []byte, v *fastjson.Value) {
lv, errLocal := v.StringBytes()
if errLocal != nil {
err = fmt.Errorf("error when parsing label value %q: %w", v, errLocal)
err = fmt.Errorf("error when parsing label value %q: %s", v, errLocal)
return
}
r.Labels = append(r.Labels, prompb.Label{
@@ -112,7 +112,7 @@ func (pi *promInstant) Unmarshal(b []byte) error {
r.Timestamps = []int64{sample[0].GetInt64()}
val, err := sample[1].StringBytes()
if err != nil {
return fmt.Errorf("error when parsing `value` object %q: %w", sample[1], err)
return fmt.Errorf("error when parsing `value` object %q: %s", sample[1], err)
}
f, err := strconv.ParseFloat(bytesutil.ToUnsafeString(val), 64)
if err != nil {

View File

@@ -601,7 +601,7 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
func (ar *AlertingRule) expandLabelTemplates(m datasource.Metric, qFn templates.QueryFn) (*labelSet, error) {
ls, err := ar.toLabels(m, qFn)
if err != nil {
return ls, fmt.Errorf("failed to expand label templates: %w", err)
return ls, fmt.Errorf("failed to expand label templates: %s", err)
}
return ls, nil
}
@@ -620,7 +620,7 @@ func (ar *AlertingRule) expandAnnotationTemplates(m datasource.Metric, qFn templ
}
as, err := notifier.ExecTemplate(qFn, ar.Annotations, tplData)
if err != nil {
return as, fmt.Errorf("failed to expand annotation templates: %w", err)
return as, fmt.Errorf("failed to expand annotation templates: %s", err)
}
return as, nil
}

View File

@@ -77,7 +77,7 @@ var (
func marshalJson(v any, kind string) ([]byte, *httpserver.ErrorWithStatusCode) {
data, err := json.Marshal(v)
if err != nil {
return nil, errResponse(fmt.Errorf("failed to marshal %s: %w", kind, err), http.StatusInternalServerError)
return nil, errResponse(fmt.Errorf("failed to marshal %s: %s", kind, err), http.StatusInternalServerError)
}
return data, nil
}

View File

@@ -1639,7 +1639,7 @@ func (w *fakeResponseWriter) WriteHeader(statusCode int) {
"X-Content-Type-Options": true,
})
if err != nil {
panic(fmt.Errorf("cannot marshal headers: %w", err))
panic(fmt.Errorf("cannot marshal headers: %s", err))
}
}

View File

@@ -161,7 +161,7 @@ func fetchAndParseJWKs(ctx context.Context, jwksURI string) (*jwt.VerifierPool,
vp, err := jwt.ParseJWKs(b)
if err != nil {
return nil, fmt.Errorf("failed to parse jwks keys from %q: %w", jwksURI, err)
return nil, fmt.Errorf("failed to parse jwks keys from %q: %v", jwksURI, err)
}
return vp, nil
@@ -188,7 +188,7 @@ func getOpenIDConfiguration(ctx context.Context, issuer string) (openidConfig, e
var cfg openidConfig
if err := json.NewDecoder(resp.Body).Decode(&cfg); err != nil {
return openidConfig{}, fmt.Errorf("failed to decode openid config from %q: %w", configURL, err)
return openidConfig{}, fmt.Errorf("failed to decode openid config from %q: %s", configURL, err)
}
return cfg, nil

View File

@@ -43,7 +43,7 @@ func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator st
func (ip *influxProcessor) run(ctx context.Context) error {
series, err := ip.ic.Explore()
if err != nil {
return fmt.Errorf("explore query failed: %w", err)
return fmt.Errorf("explore query failed: %s", err)
}
if len(series) < 1 {
return fmt.Errorf("found no timeseries to import")
@@ -71,7 +71,7 @@ func (ip *influxProcessor) run(ctx context.Context) error {
for s := range seriesCh {
if err := ip.do(s); err != nil {
influxErrorsTotal.Inc()
errCh <- fmt.Errorf("request failed for %q.%q: %w", s.Measurement, s.Field, err)
errCh <- fmt.Errorf("request failed for %q.%q: %s", s.Measurement, s.Field, err)
return
}
influxSeriesProcessed.Inc()
@@ -84,10 +84,10 @@ func (ip *influxProcessor) run(ctx context.Context) error {
for _, s := range series {
select {
case infErr := <-errCh:
return fmt.Errorf("influx error: %w", infErr)
return fmt.Errorf("influx error: %s", infErr)
case vmErr := <-ip.im.Errors():
influxErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, ip.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose))
case seriesCh <- s:
}
}
@@ -100,11 +100,11 @@ func (ip *influxProcessor) run(ctx context.Context) error {
for vmErr := range ip.im.Errors() {
if vmErr.Err != nil {
influxErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, ip.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose))
}
}
for err := range errCh {
return fmt.Errorf("import process failed: %w", err)
return fmt.Errorf("import process failed: %s", err)
}
log.Println("Import finished!")
@@ -119,7 +119,7 @@ const valueField = "value"
func (ip *influxProcessor) do(s *influx.Series) error {
cr, err := ip.ic.FetchDataPoints(s)
if err != nil {
return fmt.Errorf("failed to fetch datapoints: %w", err)
return fmt.Errorf("failed to fetch datapoints: %s", err)
}
defer func() {
_ = cr.Close()

View File

@@ -96,10 +96,10 @@ func NewClient(cfg Config) (*Client, error) {
}
hc, err := influx.NewHTTPClient(c)
if err != nil {
return nil, fmt.Errorf("failed to establish conn: %w", err)
return nil, fmt.Errorf("failed to establish conn: %s", err)
}
if _, _, err := hc.Ping(time.Second); err != nil {
return nil, fmt.Errorf("ping failed: %w", err)
return nil, fmt.Errorf("ping failed: %s", err)
}
chunkSize := cfg.ChunkSize
@@ -155,7 +155,7 @@ func (c *Client) Explore() ([]*Series, error) {
// {"measurement1": ["value1", "value2"]}
mFields, err := c.fieldsByMeasurement()
if err != nil {
return nil, fmt.Errorf("failed to get field keys: %w", err)
return nil, fmt.Errorf("failed to get field keys: %s", err)
}
if len(mFields) < 1 {
@@ -165,12 +165,12 @@ func (c *Client) Explore() ([]*Series, error) {
// {"measurement1": {"tag1", "tag2"}}
measurementTags, err := c.getMeasurementTags()
if err != nil {
return nil, fmt.Errorf("failed to get tags of measurements: %w", err)
return nil, fmt.Errorf("failed to get tags of measurements: %s", err)
}
series, err := c.getSeries()
if err != nil {
return nil, fmt.Errorf("failed to get series: %w", err)
return nil, fmt.Errorf("failed to get series: %s", err)
}
var iSeries []*Series
@@ -237,7 +237,7 @@ func (cr *ChunkedResponse) Next() ([]int64, []float64, error) {
return nil, nil, err
}
if resp.Error() != nil {
return nil, nil, fmt.Errorf("response error for %s: %w", cr.iq.Command, resp.Error())
return nil, nil, fmt.Errorf("response error for %s: %s", cr.iq.Command, resp.Error())
}
if len(resp.Results) != 1 {
return nil, nil, fmt.Errorf("unexpected number of results in response: %d", len(resp.Results))
@@ -265,7 +265,8 @@ func (cr *ChunkedResponse) Next() ([]int64, []float64, error) {
for i, fv := range fieldValues {
v, err := toFloat64(fv)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert value %q.%v to float64: %w", cr.field, v, err)
return nil, nil, fmt.Errorf("failed to convert value %q.%v to float64: %s",
cr.field, v, err)
}
values[i] = v
}
@@ -293,7 +294,7 @@ func (c *Client) FetchDataPoints(s *Series) (*ChunkedResponse, error) {
}
cr, err := c.QueryAsChunk(iq)
if err != nil {
return nil, fmt.Errorf("query %q err: %w", iq.Command, err)
return nil, fmt.Errorf("query %q err: %s", iq.Command, err)
}
return &ChunkedResponse{cr, iq, s.Field}, nil
}
@@ -307,7 +308,7 @@ func (c *Client) fieldsByMeasurement() (map[string][]string, error) {
log.Printf("fetching fields: %s", stringify(q))
qValues, err := c.do(q)
if err != nil {
return nil, fmt.Errorf("error while executing query %q: %w", q.Command, err)
return nil, fmt.Errorf("error while executing query %q: %s", q.Command, err)
}
var total int
@@ -351,7 +352,7 @@ func (c *Client) getSeries() ([]*Series, error) {
log.Printf("fetching series: %s", stringify(q))
cr, err := c.QueryAsChunk(q)
if err != nil {
return nil, fmt.Errorf("error while executing query %q: %w", q.Command, err)
return nil, fmt.Errorf("error while executing query %q: %s", q.Command, err)
}
const key = "key"
@@ -365,7 +366,7 @@ func (c *Client) getSeries() ([]*Series, error) {
return nil, err
}
if resp.Error() != nil {
return nil, fmt.Errorf("response error for query %q: %w", q.Command, resp.Error())
return nil, fmt.Errorf("response error for query %q: %s", q.Command, resp.Error())
}
qValues, err := parseResult(resp.Results[0])
if err != nil {
@@ -416,7 +417,7 @@ func (c *Client) getMeasurementTags() (map[string]map[string]struct{}, error) {
log.Printf("fetching tag keys: %s", stringify(q))
cr, err := c.QueryAsChunk(q)
if err != nil {
return nil, fmt.Errorf("error while executing query %q: %w", q.Command, err)
return nil, fmt.Errorf("error while executing query %q: %s", q.Command, err)
}
const tagKey = "tagKey"
@@ -431,7 +432,7 @@ func (c *Client) getMeasurementTags() (map[string]map[string]struct{}, error) {
return nil, err
}
if resp.Error() != nil {
return nil, fmt.Errorf("response error for query %q: %w", q.Command, resp.Error())
return nil, fmt.Errorf("response error for query %q: %s", q.Command, resp.Error())
}
qValues, err := parseResult(resp.Results[0])
if err != nil {
@@ -454,10 +455,10 @@ func (c *Client) getMeasurementTags() (map[string]map[string]struct{}, error) {
func (c *Client) do(q influx.Query) ([]queryValues, error) {
res, err := c.Query(q)
if err != nil {
return nil, fmt.Errorf("query error: %w", err)
return nil, fmt.Errorf("query error: %s", err)
}
if res.Error() != nil {
return nil, fmt.Errorf("response error: %w", res.Error())
return nil, fmt.Errorf("response error: %s", res.Error())
}
if len(res.Results) < 1 {
return nil, fmt.Errorf("query returned 0 results")

View File

@@ -71,7 +71,7 @@ func toFloat64(v any) (float64, error) {
func parseDate(dateStr string) (int64, error) {
startTime, err := time.Parse(time.RFC3339, dateStr)
if err != nil {
return 0, fmt.Errorf("cannot parse %q: %w", dateStr, err)
return 0, fmt.Errorf("cannot parse %q: %s", dateStr, err)
}
return startTime.UnixNano() / 1e6, nil
}
@@ -92,7 +92,7 @@ func (s *Series) unmarshal(v string) error {
var err error
s.LabelPairs, err = unmarshalTags(v[n+1:], noEscapeChars)
if err != nil {
return fmt.Errorf("failed to unmarhsal tags: %w", err)
return fmt.Errorf("failed to unmarhsal tags: %s", err)
}
return nil
}

View File

@@ -88,7 +88,7 @@ func main() {
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify, "vmctl_opentsdb")
if err != nil {
return fmt.Errorf("failed to create transport for -%s=%q: %w", otsdbAddr, addr, err)
return fmt.Errorf("failed to create transport for -%s=%q: %s", otsdbAddr, addr, err)
}
oCfg := opentsdb.Config{
Addr: addr,
@@ -103,17 +103,17 @@ func main() {
}
otsdbClient, err := opentsdb.NewClient(oCfg)
if err != nil {
return fmt.Errorf("failed to create opentsdb client: %w", err)
return fmt.Errorf("failed to create opentsdb client: %s", err)
}
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %w", err)
return fmt.Errorf("failed to init VM configuration: %s", err)
}
importer, err := vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %w", err)
return fmt.Errorf("failed to create VM importer: %s", err)
}
otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency), c.Bool(globalVerbose))
@@ -137,7 +137,7 @@ func main() {
tc, err := promauth.NewTLSConfig(certFile, keyFile, caFile, serverName, insecureSkipVerify)
if err != nil {
return fmt.Errorf("failed to create TLS Config: %w", err)
return fmt.Errorf("failed to create TLS Config: %s", err)
}
iCfg := influx.Config{
@@ -157,17 +157,17 @@ func main() {
influxClient, err := influx.NewClient(iCfg)
if err != nil {
return fmt.Errorf("failed to create influx client: %w", err)
return fmt.Errorf("failed to create influx client: %s", err)
}
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %w", err)
return fmt.Errorf("failed to init VM configuration: %s", err)
}
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %w", err)
return fmt.Errorf("failed to create VM importer: %s", err)
}
processor := newInfluxProcessor(
@@ -203,7 +203,7 @@ func main() {
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify, "vmctl_remoteread")
if err != nil {
return fmt.Errorf("failed to create transport for -%s=%q: %w", remoteReadSrcAddr, addr, err)
return fmt.Errorf("failed to create transport for -%s=%q: %s", remoteReadSrcAddr, addr, err)
}
// Backwards compatible default values if none provided by user
@@ -227,17 +227,17 @@ func main() {
DisablePathAppend: c.Bool(remoteReadDisablePathAppend),
})
if err != nil {
return fmt.Errorf("error create remote read client: %w", err)
return fmt.Errorf("error create remote read client: %s", err)
}
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %w", err)
return fmt.Errorf("failed to init VM configuration: %s", err)
}
importer, err := vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %w", err)
return fmt.Errorf("failed to create VM importer: %s", err)
}
rmp := remoteReadProcessor{
@@ -265,12 +265,12 @@ func main() {
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %w", err)
return fmt.Errorf("failed to init VM configuration: %s", err)
}
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %w", err)
return fmt.Errorf("failed to create VM importer: %s", err)
}
promCfg := prometheus.Config{
@@ -285,7 +285,7 @@ func main() {
}
cl, err := prometheus.NewClient(promCfg)
if err != nil {
return fmt.Errorf("failed to create prometheus client: %w", err)
return fmt.Errorf("failed to create prometheus client: %s", err)
}
pp := prometheusProcessor{
@@ -307,12 +307,12 @@ func main() {
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %w", err)
return fmt.Errorf("failed to init VM configuration: %s", err)
}
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %w", err)
return fmt.Errorf("failed to create VM importer: %s", err)
}
mCfg := mimir.Config{
@@ -335,7 +335,7 @@ func main() {
}
cl, err := mimir.NewClient(ctx, mCfg)
if err != nil {
return fmt.Errorf("failed to create mimir client: %w", err)
return fmt.Errorf("failed to create mimir client: %s", err)
}
pp := prometheusProcessor{
@@ -356,12 +356,12 @@ func main() {
fmt.Println("Thanos import mode")
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %w", err)
return fmt.Errorf("failed to init VM configuration: %s", err)
}
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %w", err)
return fmt.Errorf("failed to create VM importer: %s", err)
}
thanosCfg := thanos.Config{
Snapshot: c.String(thanosSnapshot),
@@ -374,7 +374,7 @@ func main() {
}
cl, err := thanos.NewClient(thanosCfg)
if err != nil {
return fmt.Errorf("failed to create thanos client: %w", err)
return fmt.Errorf("failed to create thanos client: %s", err)
}
var aggrTypes []thanos.AggrType
@@ -382,7 +382,7 @@ func main() {
for _, typeStr := range aggrTypesStr {
aggrType, err := thanos.ParseAggrType(typeStr)
if err != nil {
return fmt.Errorf("failed to parse aggregate type %q: %w", typeStr, err)
return fmt.Errorf("failed to parse aggregate type %q: %s", typeStr, err)
}
aggrTypes = append(aggrTypes, aggrType)
}
@@ -415,7 +415,7 @@ func main() {
bfMinDuration := c.Duration(vmNativeBackoffMinDuration)
bf, err := backoff.New(bfRetries, bfFactor, bfMinDuration)
if err != nil {
return fmt.Errorf("failed to create backoff object: %w", err)
return fmt.Errorf("failed to create backoff object: %s", err)
}
disableKeepAlive := c.Bool(vmNativeDisableHTTPKeepAlive)
@@ -439,7 +439,7 @@ func main() {
srcTC, err := promauth.NewTLSConfig(srcCertFile, srcKeyFile, srcCAFile, srcServerName, srcInsecureSkipVerify)
if err != nil {
return fmt.Errorf("failed to create TLS Config: %w", err)
return fmt.Errorf("failed to create TLS Config: %s", err)
}
trSrc := httputil.NewTransport(false, "vmctl_src")
@@ -469,7 +469,7 @@ func main() {
dstTC, err := promauth.NewTLSConfig(dstCertFile, dstKeyFile, dstCAFile, dstServerName, dstInsecureSkipVerify)
if err != nil {
return fmt.Errorf("failed to create TLS Config: %w", err)
return fmt.Errorf("failed to create TLS Config: %s", err)
}
trDst := httputil.NewTransport(false, "vmctl_dst")
@@ -534,7 +534,7 @@ func main() {
log.Printf("verifying block at path=%q", blockPath)
f, err := os.OpenFile(blockPath, os.O_RDONLY, 0600)
if err != nil {
return cli.Exit(fmt.Errorf("cannot open exported block at path=%q: %w", blockPath, err), 1)
return cli.Exit(fmt.Errorf("cannot open exported block at path=%q err=%w", blockPath, err), 1)
}
defer f.Close()
var blocksCount atomic.Uint64
@@ -542,7 +542,7 @@ func main() {
blocksCount.Add(1)
return nil
}); err != nil {
return cli.Exit(fmt.Errorf("cannot parse block at path=%q, blocksCount=%d: %w", blockPath, blocksCount.Load(), err), 1)
return cli.Exit(fmt.Errorf("cannot parse block at path=%q, blocksCount=%d, err=%w", blockPath, blocksCount.Load(), err), 1)
}
log.Printf("successfully verified block at path=%q, blockCount=%d", blockPath, blocksCount.Load())
return nil
@@ -585,7 +585,7 @@ func initConfigVM(c *cli.Context) (vm.Config, error) {
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify, "vmctl_client")
if err != nil {
return vm.Config{}, fmt.Errorf("failed to create transport for -%s=%q: %w", vmAddr, addr, err)
return vm.Config{}, fmt.Errorf("failed to create transport for -%s=%q: %s", vmAddr, addr, err)
}
bfRetries := c.Int(vmBackoffRetries)
@@ -593,7 +593,7 @@ func initConfigVM(c *cli.Context) (vm.Config, error) {
bfMinDuration := c.Duration(vmBackoffMinDuration)
bf, err := backoff.New(bfRetries, bfFactor, bfMinDuration)
if err != nil {
return vm.Config{}, fmt.Errorf("failed to create backoff object: %w", err)
return vm.Config{}, fmt.Errorf("failed to create backoff object: %s", err)
}
return vm.Config{

View File

@@ -54,7 +54,7 @@ func (lbr *lazyBlockReader) initialize() error {
// fetching block and parse it and store it in lbr.reader
temp, err := lbr.mkTempDir()
if err != nil {
return fmt.Errorf("failed to create temp dir: %w", err)
return fmt.Errorf("failed to create temp dir: %s", err)
}
lbr.tempDirPath = temp
@@ -85,7 +85,7 @@ func (lbr *lazyBlockReader) initialize() error {
return fmt.Errorf("failed to fetch chunk file: %q: %w", chunkName, err)
}
if err := lbr.writeFile(temp, blockChunkPath, chunk); err != nil {
return fmt.Errorf("failed to write chunk file: %q: %w", chunkName, err)
return fmt.Errorf("failed to write chunk file: %q: %s", chunkName, err)
}
}
@@ -135,7 +135,7 @@ func (lbr *lazyBlockReader) Meta() tsdb.BlockMeta {
// Size returns the number of bytes that the block takes up on disk.
func (lbr *lazyBlockReader) Size() int64 {
if err := lbr.initialize(); err != nil {
lbr.err = fmt.Errorf("error get Size of the block: %w, return zero size", err)
lbr.err = fmt.Errorf("error get Size of the block: %s, return zero size", err)
return 0
}
return lbr.reader.Size()
@@ -167,11 +167,11 @@ func (lbr *lazyBlockReader) Close() error {
func (lbr *lazyBlockReader) mkTempDir() (string, error) {
temp, err := os.MkdirTemp("", lbr.ID.String())
if err != nil {
return "", fmt.Errorf("failed to create temp dir: %w", err)
return "", fmt.Errorf("failed to create temp dir: %s", err)
}
err = os.Mkdir(filepath.Join(temp, "chunks"), os.ModePerm)
if err != nil {
return "", fmt.Errorf("failed to create temp dir: %w", err)
return "", fmt.Errorf("failed to create temp dir: %s", err)
}
return temp, nil
}

View File

@@ -133,11 +133,11 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
c.RemoteFS = rfs
timeMin, err := utils.ParseTime(cfg.Filter.TimeMin)
if err != nil {
return nil, fmt.Errorf("failed to parse min time in filter: %w", err)
return nil, fmt.Errorf("failed to parse min time in filter: %s", err)
}
timeMax, err := utils.ParseTime(cfg.Filter.TimeMax)
if err != nil {
return nil, fmt.Errorf("failed to parse max time in filter: %w", err)
return nil, fmt.Errorf("failed to parse max time in filter: %s", err)
}
c.filter = filter{
min: timeMin.UnixMilli(),
@@ -156,7 +156,7 @@ func (c *Client) Explore() ([]tsdb.BlockReader, error) {
indexFile, err := c.fetchIndexFile()
if err != nil {
return nil, fmt.Errorf("failed to fetch index file: %w", err)
return nil, fmt.Errorf("failed to fetch index file: %s", err)
}
var blocksToImport []tsdb.BlockReader
@@ -172,7 +172,7 @@ func (c *Client) Explore() ([]tsdb.BlockReader, error) {
lazyBlockReader, err := newLazyBlockReader(block, c.RemoteFS)
if err != nil {
return nil, fmt.Errorf("failed to create lazy block reader: %w", err)
return nil, fmt.Errorf("failed to create lazy block reader: %s", err)
}
blocksToImport = append(blocksToImport, lazyBlockReader)
}
@@ -185,7 +185,7 @@ func (c *Client) Explore() ([]tsdb.BlockReader, error) {
func (c *Client) Read(ctx context.Context, block tsdb.BlockReader) (*prometheus.CloseableSeriesSet, error) {
meta := block.Meta()
if b, ok := block.(*lazyBlockReader); ok && b.Err() != nil {
return nil, fmt.Errorf("failed to read block: %w", b.Err())
return nil, fmt.Errorf("failed to read block: %s", b.Err())
}
if meta.ULID.String() == "" {
@@ -218,20 +218,20 @@ func (c *Client) fetchIndexFile() (*Index, error) {
file, err := c.ReadFile(bucketIndexCompressedFilename)
if err != nil {
return nil, fmt.Errorf("failed to read bucket index: %w", err)
return nil, fmt.Errorf("failed to read bucket index: %s", err)
}
r := bytes.NewReader(file)
// Read all the content.
gzipReader, err := gzip.NewReader(r)
if err != nil {
return nil, fmt.Errorf("failed to create gzip reader: %w", err)
return nil, fmt.Errorf("failed to create gzip reader: %s", err)
}
var indexFile Index
err = json.NewDecoder(gzipReader).Decode(&indexFile)
if err != nil {
return nil, fmt.Errorf("failed to decode bucket index: %w", err)
return nil, fmt.Errorf("failed to decode bucket index: %s", err)
}
return &indexFile, nil

View File

@@ -47,7 +47,7 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string, start,
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
exploreRequestsErrorsTotal.Inc()
return nil, fmt.Errorf("cannot create request to %q: %w", url, err)
return nil, fmt.Errorf("cannot create request to %q: %s", url, err)
}
params := req.URL.Query()
@@ -60,14 +60,14 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string, start,
if err != nil {
exploreRequestsErrorsTotal.Inc()
exploreDuration.UpdateDuration(startTime)
return nil, fmt.Errorf("series request failed: %w", err)
return nil, fmt.Errorf("series request failed: %s", err)
}
var response Response
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
exploreRequestsErrorsTotal.Inc()
exploreDuration.UpdateDuration(startTime)
return nil, fmt.Errorf("cannot decode series response: %w", err)
return nil, fmt.Errorf("cannot decode series response: %s", err)
}
exploreDuration.UpdateDuration(startTime)
return response.MetricNames, resp.Body.Close()
@@ -80,19 +80,19 @@ func (c *Client) ImportPipe(ctx context.Context, dstURL string, pr *io.PipeReade
req, err := http.NewRequestWithContext(ctx, http.MethodPost, dstURL, pr)
if err != nil {
importRequestsErrorsTotal.Inc()
return fmt.Errorf("cannot create import request to %q: %w", c.Addr, err)
return fmt.Errorf("cannot create import request to %q: %s", c.Addr, err)
}
importResp, err := c.do(req, http.StatusNoContent)
if err != nil {
importRequestsErrorsTotal.Inc()
importDuration.UpdateDuration(startTime)
return fmt.Errorf("import request failed: %w", err)
return fmt.Errorf("import request failed: %s", err)
}
if err := importResp.Body.Close(); err != nil {
importRequestsErrorsTotal.Inc()
importDuration.UpdateDuration(startTime)
return fmt.Errorf("cannot close import response body: %w", err)
return fmt.Errorf("cannot close import response body: %s", err)
}
importDuration.UpdateDuration(startTime)
return nil
@@ -105,7 +105,7 @@ func (c *Client) ExportPipe(ctx context.Context, url string, f Filter) (io.ReadC
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
exportRequestsErrorsTotal.Inc()
return nil, fmt.Errorf("cannot create request to %q: %w", c.Addr, err)
return nil, fmt.Errorf("cannot create request to %q: %s", c.Addr, err)
}
params := req.URL.Query()
@@ -136,7 +136,7 @@ func (c *Client) GetSourceTenants(ctx context.Context, f Filter) ([]string, erro
u := fmt.Sprintf("%s/%s", c.Addr, nativeTenantsAddr)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, fmt.Errorf("cannot create request to %q: %w", u, err)
return nil, fmt.Errorf("cannot create request to %q: %s", u, err)
}
params := req.URL.Query()
@@ -150,18 +150,18 @@ func (c *Client) GetSourceTenants(ctx context.Context, f Filter) ([]string, erro
resp, err := c.do(req, http.StatusOK)
if err != nil {
return nil, fmt.Errorf("tenants request failed: %w", err)
return nil, fmt.Errorf("tenants request failed: %s", err)
}
var r struct {
Tenants []string `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return nil, fmt.Errorf("cannot decode tenants response: %w", err)
return nil, fmt.Errorf("cannot decode tenants response: %s", err)
}
if err := resp.Body.Close(); err != nil {
return nil, fmt.Errorf("cannot close tenants response body: %w", err)
return nil, fmt.Errorf("cannot close tenants response body: %s", err)
}
return r.Tenants, nil
@@ -180,7 +180,7 @@ func (c *Client) do(req *http.Request, expSC int) (*http.Response, error) {
if resp.StatusCode != expSC {
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body for status code %d: %w", resp.StatusCode, err)
return nil, fmt.Errorf("failed to read response body for status code %d: %s", resp.StatusCode, err)
}
return nil, fmt.Errorf("unexpected response code %d: %s", resp.StatusCode, string(body))
}

View File

@@ -47,7 +47,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
q := fmt.Sprintf("%s/api/suggest?type=metrics&q=%s&max=%d", op.oc.Addr, filter, op.oc.Limit)
m, err := op.oc.FindMetrics(q)
if err != nil {
return fmt.Errorf("metric discovery failed for %q: %w", q, err)
return fmt.Errorf("metric discovery failed for %q: %s", q, err)
}
metrics = append(metrics, m...)
}
@@ -76,7 +76,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
log.Printf("Starting work on %s", metric)
serieslist, err := op.oc.FindSeries(metric)
if err != nil {
return fmt.Errorf("couldn't retrieve series list for %s: %w", metric, err)
return fmt.Errorf("couldn't retrieve series list for %s : %s", metric, err)
}
/*
Create channels for collecting/processing series and errors
@@ -95,7 +95,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
for s := range seriesCh {
if err := op.do(s); err != nil {
otsdbErrorsTotal.Inc()
errCh <- fmt.Errorf("couldn't retrieve series for %s: %w", metric, err)
errCh <- fmt.Errorf("couldn't retrieve series for %s : %s", metric, err)
return
}
otsdbSeriesProcessed.Inc()
@@ -112,7 +112,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
// check for any lingering errors on the query side
for otsdbErr := range errCh {
if runErr == nil {
runErr = fmt.Errorf("import process failed:\n%w", otsdbErr)
runErr = fmt.Errorf("import process failed: \n%s", otsdbErr)
}
}
bar.Finish()
@@ -125,7 +125,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
for vmErr := range op.im.Errors() {
if vmErr.Err != nil {
otsdbErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, op.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
}
}
log.Println("Import finished!")
@@ -141,12 +141,12 @@ func (op *otsdbProcessor) sendQueries(ctx context.Context, serieslist []opentsdb
for _, tr := range rt.QueryRanges {
select {
case <-ctx.Done():
return fmt.Errorf("context canceled: %w", ctx.Err())
return fmt.Errorf("context canceled: %s", ctx.Err())
case otsdbErr := <-errCh:
otsdbErrorsTotal.Inc()
return fmt.Errorf("opentsdb error: %w", otsdbErr)
return fmt.Errorf("opentsdb error: %s", otsdbErr)
case vmErr := <-op.im.Errors():
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, op.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
case seriesCh <- queryObj{
Tr: tr, StartTime: startTime,
Series: series, Rt: opentsdb.RetentionMeta{
@@ -166,7 +166,7 @@ func (op *otsdbProcessor) do(s queryObj) error {
end := s.StartTime - s.Tr.End
data, err := op.oc.GetData(s.Series, s.Rt, start, end, op.oc.MsecsTime)
if err != nil {
return fmt.Errorf("failed to collect data for %v in %v:%v :: %w", s.Series, s.Rt, s.Tr, err)
return fmt.Errorf("failed to collect data for %v in %v:%v :: %v", s.Series, s.Rt, s.Tr, err)
}
if len(data.Timestamps) < 1 || len(data.Values) < 1 {
log.Printf("no data found for %v in %v:%v...skipping", s.Series, s.Rt, s.Tr)

View File

@@ -106,7 +106,7 @@ func (c Client) FindMetrics(q string) ([]string, error) {
resp, err := c.c.Get(q)
if err != nil {
return nil, fmt.Errorf("failed to send GET request to %q: %w", q, err)
return nil, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != 200 {
@@ -114,12 +114,12 @@ func (c Client) FindMetrics(q string) ([]string, error) {
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve metric data from %q: %w", q, err)
return nil, fmt.Errorf("could not retrieve metric data from %q: %s", q, err)
}
var metriclist []string
err = json.Unmarshal(body, &metriclist)
if err != nil {
return nil, fmt.Errorf("failed to read response from %q: %w", q, err)
return nil, fmt.Errorf("failed to read response from %q: %s", q, err)
}
return metriclist, nil
}
@@ -130,7 +130,7 @@ func (c Client) FindSeries(metric string) ([]Meta, error) {
q := fmt.Sprintf("%s/api/search/lookup?m=%s&limit=%d", c.Addr, metric, c.Limit)
resp, err := c.c.Get(q)
if err != nil {
return nil, fmt.Errorf("failed to send GET request to %q: %w", q, err)
return nil, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != 200 {
@@ -138,12 +138,12 @@ func (c Client) FindSeries(metric string) ([]Meta, error) {
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve series data from %q: %w", q, err)
return nil, fmt.Errorf("could not retrieve series data from %q: %s", q, err)
}
var results MetaResults
err = json.Unmarshal(body, &results)
if err != nil {
return nil, fmt.Errorf("failed to read response from %q: %w", q, err)
return nil, fmt.Errorf("failed to read response from %q: %s", q, err)
}
return results.Results, nil
}
@@ -183,7 +183,7 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64, m
q := fmt.Sprintf("%s/api/query?%s", c.Addr, queryStr)
resp, err := c.c.Get(q)
if err != nil {
return Metric{}, fmt.Errorf("failed to send GET request to %q: %w", q, err)
return Metric{}, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
defer func() { _ = resp.Body.Close() }()
/*
@@ -303,7 +303,7 @@ func NewClient(cfg Config) (*Client, error) {
for _, r := range cfg.Retentions {
ret, err := convertRetention(r, offsetSecs, cfg.MsecsTime)
if err != nil {
return &Client{}, fmt.Errorf("couldn't parse retention %q :: %w", r, err)
return &Client{}, fmt.Errorf("couldn't parse retention %q :: %v", r, err)
}
retentions = append(retentions, ret)
}

View File

@@ -88,7 +88,7 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
}
queryLengthDuration, err := convertDuration(chunks[2])
if err != nil {
return Retention{}, fmt.Errorf("invalid ttl (second order) duration string: %q: %w", chunks[2], err)
return Retention{}, fmt.Errorf("invalid ttl (second order) duration string: %q: %s", chunks[2], err)
}
// set ttl in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds
queryLength := queryLengthDuration.Milliseconds()
@@ -110,7 +110,7 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
aggTimeDuration, err := convertDuration(aggregates[1])
if err != nil {
return Retention{}, fmt.Errorf("invalid aggregation time duration string: %q: %w", aggregates[1], err)
return Retention{}, fmt.Errorf("invalid aggregation time duration string: %q: %s", aggregates[1], err)
}
aggTime := aggTimeDuration.Milliseconds()
if !msecTime {
@@ -119,7 +119,7 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
rowLengthDuration, err := convertDuration(chunks[1])
if err != nil {
return Retention{}, fmt.Errorf("invalid row length (first order) duration string: %q: %w", chunks[1], err)
return Retention{}, fmt.Errorf("invalid row length (first order) duration string: %q: %s", chunks[1], err)
}
// set length of each row in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds
rowLength := rowLengthDuration.Milliseconds()

View File

@@ -46,7 +46,7 @@ type prometheusProcessor struct {
func (pp *prometheusProcessor) run(ctx context.Context) error {
blocks, err := pp.cl.Explore()
if err != nil {
return fmt.Errorf("explore failed: %w", err)
return fmt.Errorf("explore failed: %s", err)
}
if len(blocks) < 1 {
return fmt.Errorf("found no blocks to import")
@@ -57,7 +57,7 @@ func (pp *prometheusProcessor) run(ctx context.Context) error {
}
if err := pp.processBlocks(ctx, blocks); err != nil {
return fmt.Errorf("migration failed: %w", err)
return fmt.Errorf("migration failed: %s", err)
}
log.Println("Import finished!")
@@ -68,7 +68,7 @@ func (pp *prometheusProcessor) run(ctx context.Context) error {
func (pp *prometheusProcessor) do(ctx context.Context, b tsdb.BlockReader) error {
css, err := pp.cl.Read(ctx, b)
if err != nil {
return fmt.Errorf("failed to read block: %w", err)
return fmt.Errorf("failed to read block: %s", err)
}
defer func() {
if err := css.Close(); err != nil {
@@ -146,7 +146,7 @@ func (pp *prometheusProcessor) processBlocks(ctx context.Context, blocks []tsdb.
for br := range blockReadersCh {
if err := pp.do(ctx, br); err != nil {
promErrorsTotal.Inc()
errCh <- fmt.Errorf("cannot read block %q: %w", br.Meta().ULID, err)
errCh <- fmt.Errorf("cannot read block %q: %s", br.Meta().ULID, err)
return
}
if cb, ok := br.(io.Closer); ok {
@@ -164,11 +164,11 @@ func (pp *prometheusProcessor) processBlocks(ctx context.Context, blocks []tsdb.
select {
case promErr := <-errCh:
close(blockReadersCh)
return fmt.Errorf("prometheus error: %w", promErr)
return fmt.Errorf("prometheus error: %s", promErr)
case vmErr := <-pp.im.Errors():
close(blockReadersCh)
promErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, pp.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, pp.isVerbose))
case blockReadersCh <- br:
}
}
@@ -182,11 +182,11 @@ func (pp *prometheusProcessor) processBlocks(ctx context.Context, blocks []tsdb.
for vmErr := range pp.im.Errors() {
if vmErr.Err != nil {
promErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, pp.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, pp.isVerbose))
}
}
for err := range errCh {
return fmt.Errorf("import process failed: %w", err)
return fmt.Errorf("import process failed: %s", err)
}
return nil

View File

@@ -59,12 +59,12 @@ func (f filter) inRange(minV, maxV int64) bool {
func NewClient(cfg Config) (*Client, error) {
db, err := tsdb.OpenDBReadOnly(cfg.Snapshot, cfg.TemporaryDir, nil)
if err != nil {
return nil, fmt.Errorf("failed to open snapshot %q: %w", cfg.Snapshot, err)
return nil, fmt.Errorf("failed to open snapshot %q: %s", cfg.Snapshot, err)
}
c := &Client{DBReadOnly: db}
timeMin, timeMax, err := parseTime(cfg.Filter.TimeMin, cfg.Filter.TimeMax)
if err != nil {
return nil, fmt.Errorf("failed to parse time in filter: %w", err)
return nil, fmt.Errorf("failed to parse time in filter: %s", err)
}
c.filter = filter{
min: timeMin,
@@ -83,7 +83,7 @@ func NewClient(cfg Config) (*Client, error) {
func (c *Client) Explore() ([]tsdb.BlockReader, error) {
blocks, err := c.Blocks()
if err != nil {
return nil, fmt.Errorf("failed to fetch blocks: %w", err)
return nil, fmt.Errorf("failed to fetch blocks: %s", err)
}
s := &vmctlutil.Stats{
Filtered: c.filter.min != 0 || c.filter.max != 0 || c.filter.label != "",
@@ -142,14 +142,14 @@ func parseTime(start, end string) (int64, int64, error) {
if start != "" {
v, err := time.Parse(time.RFC3339, start)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse %q: %w", start, err)
return 0, 0, fmt.Errorf("failed to parse %q: %s", start, err)
}
s = v.UnixNano() / int64(time.Millisecond)
}
if end != "" {
v, err := time.Parse(time.RFC3339, end)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse %q: %w", end, err)
return 0, 0, fmt.Errorf("failed to parse %q: %s", end, err)
}
e = v.UnixNano() / int64(time.Millisecond)
}

View File

@@ -44,7 +44,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
ranges, err := stepper.SplitDateRange(*rrp.filter.timeStart, *rrp.filter.timeEnd, rrp.filter.chunk, rrp.filter.timeReverse)
if err != nil {
return fmt.Errorf("failed to create date ranges for the given time filters: %w", err)
return fmt.Errorf("failed to create date ranges for the given time filters: %v", err)
}
question := fmt.Sprintf("Selected time range %q - %q will be split into %d ranges according to %q step. Continue?",
@@ -74,7 +74,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
for r := range rangeC {
if err := rrp.do(ctx, r); err != nil {
remoteReadErrorsTotal.Inc()
errCh <- fmt.Errorf("request failed for: %w", err)
errCh <- fmt.Errorf("request failed for: %s", err)
return
}
remoteReadRangesProcessed.Inc()
@@ -86,10 +86,10 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
for _, r := range ranges {
select {
case infErr := <-errCh:
return fmt.Errorf("remote read error: %w", infErr)
return fmt.Errorf("remote read error: %s", infErr)
case vmErr := <-rrp.dst.Errors():
remoteReadErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, rrp.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose))
case rangeC <- &remoteread.Filter{
StartTimestampMs: r[0].UnixMilli(),
EndTimestampMs: r[1].UnixMilli(),
@@ -105,11 +105,11 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
for vmErr := range rrp.dst.Errors() {
if vmErr.Err != nil {
remoteReadErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, rrp.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose))
}
}
for err := range errCh {
return fmt.Errorf("import process failed: %w", err)
return fmt.Errorf("import process failed: %s", err)
}
return nil
@@ -119,7 +119,7 @@ func (rrp *remoteReadProcessor) do(ctx context.Context, filter *remoteread.Filte
return rrp.src.Read(ctx, filter, func(series *vm.TimeSeries) error {
if err := rrp.dst.Input(series); err != nil {
return fmt.Errorf(
"failed to read data for time range start: %d, end: %d: %w",
"failed to read data for time range start: %d, end: %d, %s",
filter.StartTimestampMs, filter.EndTimestampMs, err)
}
return nil

View File

@@ -157,7 +157,7 @@ func (c *Client) Read(ctx context.Context, filter *Filter, streamCb StreamCallba
if errors.Is(err, context.Canceled) {
return fmt.Errorf("fetch request has ben cancelled")
}
return fmt.Errorf("error while fetching data from remote storage: %w", err)
return fmt.Errorf("error while fetching data from remote storage: %s", err)
}
return nil
}

View File

@@ -52,7 +52,7 @@ func (f filter) inRange(minV, maxV int64) bool {
func NewClient(cfg Config) (*Client, error) {
minTime, maxTime, err := parseTime(cfg.Filter.TimeMin, cfg.Filter.TimeMax)
if err != nil {
return nil, fmt.Errorf("failed to parse time in filter: %w", err)
return nil, fmt.Errorf("failed to parse time in filter: %s", err)
}
return &Client{
snapshotPath: cfg.Snapshot,
@@ -183,14 +183,14 @@ func parseTime(start, end string) (int64, int64, error) {
if start != "" {
v, err := time.Parse(time.RFC3339, start)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse %q: %w", start, err)
return 0, 0, fmt.Errorf("failed to parse %q: %s", start, err)
}
s = v.UnixNano() / int64(time.Millisecond)
}
if end != "" {
v, err := time.Parse(time.RFC3339, end)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse %q: %w", end, err)
return 0, 0, fmt.Errorf("failed to parse %q: %s", end, err)
}
e = v.UnixNano() / int64(time.Millisecond)
}

View File

@@ -36,7 +36,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
// Use the first aggregate type to explore blocks (block list is the same for all types)
blocks, err := tp.cl.Explore(tp.aggrTypes[0])
if err != nil {
return fmt.Errorf("explore failed: %w", err)
return fmt.Errorf("explore failed: %s", err)
}
if len(blocks) < 1 {
return fmt.Errorf("found no blocks to import")
@@ -84,7 +84,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
log.Println("Processing raw blocks (resolution=0)...")
stats, err := tp.processBlocks(rawBlocks, thanos.AggrTypeNone, bar)
if err != nil {
return fmt.Errorf("migration failed for raw blocks: %w", err)
return fmt.Errorf("migration failed for raw blocks: %s", err)
}
phases = append(phases, phaseStats{
name: "raw",
@@ -108,7 +108,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
aggrBlocks, err := tp.cl.Explore(aggrType)
if err != nil {
return fmt.Errorf("explore failed for aggr type %s: %w", aggrType, err)
return fmt.Errorf("explore failed for aggr type %s: %s", aggrType, err)
}
var downsampledOnly []thanos.BlockInfo
@@ -128,7 +128,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
stats, err := tp.processBlocks(downsampledOnly, aggrType, bar)
thanos.CloseBlocks(aggrBlocks)
if err != nil {
return fmt.Errorf("migration failed for aggr type %s: %w", aggrType, err)
return fmt.Errorf("migration failed for aggr type %s: %s", aggrType, err)
}
phases = append(phases, phaseStats{
name: aggrType.String(),
@@ -153,7 +153,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
for vmErr := range tp.im.Errors() {
if vmErr.Err != nil {
thanosErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, tp.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, tp.isVerbose))
}
}
@@ -184,7 +184,7 @@ func (tp *thanosProcessor) processBlocks(blocks []thanos.BlockInfo, aggrType tha
seriesCount, samplesCount, err := tp.do(bi, aggrType)
if err != nil {
thanosErrorsTotal.Inc()
errCh <- fmt.Errorf("read failed for block %q with aggr %s: %w", bi.Block.Meta().ULID, aggrType, err)
errCh <- fmt.Errorf("read failed for block %q with aggr %s: %s", bi.Block.Meta().ULID, aggrType, err)
return
}
@@ -209,12 +209,12 @@ func (tp *thanosProcessor) processBlocks(blocks []thanos.BlockInfo, aggrType tha
case thanosErr := <-errCh:
close(blockReadersCh)
wg.Wait()
return processBlocksStats{}, fmt.Errorf("thanos error: %w", thanosErr)
return processBlocksStats{}, fmt.Errorf("thanos error: %s", thanosErr)
case vmErr := <-tp.im.Errors():
close(blockReadersCh)
wg.Wait()
thanosErrorsTotal.Inc()
return processBlocksStats{}, fmt.Errorf("import process failed: %w", wrapErr(vmErr, tp.isVerbose))
return processBlocksStats{}, fmt.Errorf("import process failed: %s", wrapErr(vmErr, tp.isVerbose))
case blockReadersCh <- bi:
}
}
@@ -223,7 +223,7 @@ func (tp *thanosProcessor) processBlocks(blocks []thanos.BlockInfo, aggrType tha
wg.Wait()
close(errCh)
for err := range errCh {
return processBlocksStats{}, fmt.Errorf("import process failed: %w", err)
return processBlocksStats{}, fmt.Errorf("import process failed: %s", err)
}
return processBlocksStats{
@@ -236,7 +236,7 @@ func (tp *thanosProcessor) processBlocks(blocks []thanos.BlockInfo, aggrType tha
func (tp *thanosProcessor) do(bi thanos.BlockInfo, aggrType thanos.AggrType) (uint64, uint64, error) {
ss, err := tp.cl.Read(bi)
if err != nil {
return 0, 0, fmt.Errorf("failed to read block: %w", err)
return 0, 0, fmt.Errorf("failed to read block: %s", err)
}
defer ss.Close() // Ensure querier is closed even on early returns

View File

@@ -163,7 +163,7 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
importDuration: metrics.GetOrCreateHistogram(`vmctl_importer_request_duration_seconds`),
}
if err := im.Ping(); err != nil {
return nil, fmt.Errorf("ping to %q failed: %w", addr, err)
return nil, fmt.Errorf("ping to %q failed: %s", addr, err)
}
if cfg.BatchSize < 1 {
@@ -289,7 +289,7 @@ func (im *Importer) flush(ctx context.Context, b []*TimeSeries) error {
retryableFunc := func() error { return im.Import(b) }
attempts, err := im.backoff.Retry(ctx, retryableFunc)
if err != nil {
return fmt.Errorf("import failed with %d retries: %w", attempts, err)
return fmt.Errorf("import failed with %d retries: %s", attempts, err)
}
im.s.Lock()
im.s.retries = attempts
@@ -302,7 +302,7 @@ func (im *Importer) Ping() error {
url := fmt.Sprintf("%s/health", im.addr)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
return fmt.Errorf("cannot create request to %q: %s", im.addr, err)
}
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
@@ -332,7 +332,7 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
req, err := http.NewRequest(http.MethodPost, im.importPath, pr)
if err != nil {
im.importRequestsErrorsTotal.Inc()
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
return fmt.Errorf("cannot create request to %q: %s", im.addr, err)
}
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
@@ -352,7 +352,7 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
zw, err := gzip.NewWriterLevel(w, 1)
if err != nil {
im.importRequestsErrorsTotal.Inc()
return fmt.Errorf("unexpected error when creating gzip writer: %w", err)
return fmt.Errorf("unexpected error when creating gzip writer: %s", err)
}
w = zw
}
@@ -411,7 +411,7 @@ var ErrBadRequest = errors.New("bad request")
func (im *Importer) do(req *http.Request) error {
resp, err := im.client.Do(req)
if err != nil {
return fmt.Errorf("unexpected error when performing request: %w", err)
return fmt.Errorf("unexpected error when performing request: %s", err)
}
defer func() {
_ = resp.Body.Close()
@@ -419,7 +419,7 @@ func (im *Importer) do(req *http.Request) error {
if resp.StatusCode != http.StatusNoContent {
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body for status code %d: %w", resp.StatusCode, err)
return fmt.Errorf("failed to read response body for status code %d: %s", resp.StatusCode, err)
}
if resp.StatusCode == http.StatusBadRequest {
return fmt.Errorf("%w: unexpected response code %d: %s", ErrBadRequest, resp.StatusCode, string(body))

View File

@@ -55,14 +55,14 @@ func (p *vmNativeProcessor) run(ctx context.Context) error {
start, err := vmctlutil.ParseTime(p.filter.TimeStart)
if err != nil {
return fmt.Errorf("failed to parse %s, provided: %s: %w", vmNativeFilterTimeStart, p.filter.TimeStart, err)
return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeStart, p.filter.TimeStart, err)
}
end := time.Now().In(start.Location())
if p.filter.TimeEnd != "" {
end, err = vmctlutil.ParseTime(p.filter.TimeEnd)
if err != nil {
return fmt.Errorf("failed to parse %s, provided: %s: %w", vmNativeFilterTimeEnd, p.filter.TimeEnd, err)
return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeEnd, p.filter.TimeEnd, err)
}
}
@@ -91,7 +91,7 @@ func (p *vmNativeProcessor) run(ctx context.Context) error {
err := p.runBackfilling(ctx, tenantID, ranges)
if err != nil {
migrationErrorsTotal.Inc()
return fmt.Errorf("migration failed: %w", err)
return fmt.Errorf("migration failed: %s", err)
}
if p.interCluster {
@@ -157,7 +157,7 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU
}
default:
}
return fmt.Errorf("failed to write into %q: %w", p.dst.Addr, err)
return fmt.Errorf("failed to write into %q: %s", p.dst.Addr, err)
}
p.s.Lock()
@@ -184,7 +184,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
importAddr, err := vm.AddExtraLabelsToImportPath(importAddr, p.dst.ExtraLabels)
if err != nil {
return fmt.Errorf("failed to add labels to import path: %w", err)
return fmt.Errorf("failed to add labels to import path: %s", err)
}
dstURL := fmt.Sprintf("%s/%s", p.dst.Addr, importAddr)
@@ -222,7 +222,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
format = fmt.Sprintf(nativeWithBackoffTpl, barPrefix)
metricsMap, err = p.explore(ctx, p.src, tenantID, ranges)
if err != nil {
return fmt.Errorf("failed to explore metric names: %w", err)
return fmt.Errorf("failed to explore metric names: %s", err)
}
if len(metricsMap) == 0 {
errMsg := "no metrics found"
@@ -295,7 +295,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
case <-ctx.Done():
return fmt.Errorf("context canceled")
case infErr := <-errCh:
return fmt.Errorf("export/import error: %w", infErr)
return fmt.Errorf("export/import error: %s", infErr)
case filterCh <- native.Filter{
Match: match,
TimeStart: times[0].Format(time.RFC3339),
@@ -313,7 +313,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
close(errCh)
for err := range errCh {
return fmt.Errorf("import process failed: %w", err)
return fmt.Errorf("import process failed: %s", err)
}
return nil

View File

@@ -10,6 +10,8 @@ import (
"strings"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@@ -21,7 +23,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vminsertapi"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
"github.com/VictoriaMetrics/metrics"
)
var (
@@ -152,7 +153,7 @@ func Init(vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []stora
LogNewSeries: *logNewSeries,
}
strg := storage.MustOpenStorage(*storageDataPath, opts)
vmStorage = newVMStorage(strg, vmselectMaxConcurrentRequests, resetCacheIfNeeded)
vmStorage = newVMStorageSingleNode(strg, vmselectMaxConcurrentRequests, resetCacheIfNeeded)
var m storage.Metrics
strg.UpdateMetrics(&m)
@@ -174,15 +175,15 @@ func Init(vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []stora
GetSearch = vmStorage.GetSearch
PutSearch = vmStorage.PutSearch
RequestHandler = vmStorage.requestHandler
DebugFlush = vmStorage.s.DebugFlush
DebugFlush = vmStorage.vms.s.DebugFlush
}
var storageMetrics *metrics.Set
var (
// vmStorage is an instance of vmstorage used by vminsert and
// vmStorageSingleNode is an instance of vmstorage used by vminsert and
// vmselect for writing and reading data.
vmStorage *VMStorage
vmStorage *VMStorageSingleNode
VMInsertAPI vminsertapi.API
VMSelectAPI vmselectapi.API
GetSearch func(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (*storage.Search, int, error)
@@ -208,12 +209,15 @@ func Stop() {
logger.Infof("the vmstorage has been stopped")
}
func (vmssn *VMStorageSingleNode) requestHandler(w http.ResponseWriter, r *http.Request) bool {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.requestHandler(w, r)
}
// requestHandler is a storage request handler.
// TODO(@rtm0): Move to a separate file, request_handler.go
func (vms *VMStorage) requestHandler(w http.ResponseWriter, r *http.Request) bool {
vms.wg.Add(1)
defer vms.wg.Done()
path := r.URL.Path
if path == "/internal/force_merge" {
if !httpserver.CheckAuthFlag(w, r, forceMergeAuthKey) {
@@ -253,7 +257,7 @@ func (vms *VMStorage) requestHandler(w http.ResponseWriter, r *http.Request) boo
dealine, err = strconv.Atoi(deadlineStr)
if err != nil {
logger.Errorf("cannot parse `seconds` arg %q: %s", deadlineStr, err)
jsonResponseError(w, fmt.Errorf("cannot parse `seconds` arg %q: %w", deadlineStr, err))
jsonResponseError(w, fmt.Errorf("cannot parse `seconds` arg %q: %s", deadlineStr, err))
return true
}
}
@@ -358,10 +362,14 @@ var (
)
// TODO(@rtm0): Move to metrics.go.
func (vms *VMStorage) writeStorageMetrics(w io.Writer) {
vms.wg.Add(1)
defer vms.wg.Done()
func (vmssn *VMStorageSingleNode) writeStorageMetrics(w io.Writer) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
vmssn.vms.writeStorageMetrics(w)
}
// TODO(@rtm0): Move to metrics.go.
func (vms *VMStorage) writeStorageMetrics(w io.Writer) {
strg := vms.s
var m storage.Metrics
strg.UpdateMetrics(&m)

View File

@@ -1,7 +1,6 @@
package vmstorage
import (
"errors"
"flag"
"fmt"
"sync"
@@ -15,7 +14,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
@@ -36,7 +34,7 @@ var (
// newVMStorage creates a new instance of of VMStorage.
//
// The created VMStorage instance takes ownership of s.
func newVMStorage(s *storage.Storage, vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []storage.MetricRow)) *VMStorage {
func newVMStorage(s *storage.Storage, vmselectMaxConcurrentRequests int) *VMStorage {
if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
logger.Fatalf("invalid -precisionBits=%d: %s", *precisionBits, err)
}
@@ -51,8 +49,6 @@ func newVMStorage(s *storage.Storage, vmselectMaxConcurrentRequests int, resetCa
maxUniqueTimeseries: *maxUniqueTimeseries,
maxUniqueTimeSeriesCalculated: maxUniqueTimeseriesCalculated,
staleSnapshotsRemoverCh: make(chan struct{}),
wg: syncwg.WaitGroup{},
resetCacheIfNeeded: resetCacheIfNeeded,
}
vms.initStaleSnapshotsRemover()
return vms
@@ -82,17 +78,6 @@ type VMStorage struct {
maxUniqueTimeSeriesCalculated int
staleSnapshotsRemoverCh chan struct{}
staleSnapshotsRemoverWG sync.WaitGroup
// wg is used to wrap every storage call into wg.Add(1) ... wg.Done()
// for proper graceful shutdown when Stop is called.
//
// Use syncwg instead of sync, since Add is called from concurrent
// goroutines.
wg syncwg.WaitGroup
// resetCacheIfNeeded is a callback for automatic resetting of response
// cache if needed.
resetCacheIfNeeded func(mrs []storage.MetricRow)
}
func (vms *VMStorage) initStaleSnapshotsRemover() {
@@ -118,7 +103,6 @@ func (vms *VMStorage) initStaleSnapshotsRemover() {
func (vms *VMStorage) Stop() {
close(vms.staleSnapshotsRemoverCh)
vms.staleSnapshotsRemoverWG.Wait()
vms.wg.WaitAndBlock()
vms.s.MustClose()
}
@@ -127,14 +111,6 @@ func (vms *VMStorage) Stop() {
// The caller should limit the number of concurrent calls to WriteRows() in
// order to limit memory usage.
func (vms *VMStorage) WriteRows(rows []storage.MetricRow) error {
vms.wg.Add(1)
defer vms.wg.Done()
if vms.s.IsReadOnly() {
return errReadOnly
}
vms.resetCacheIfNeeded(rows)
vms.s.AddRows(rows, uint8(*precisionBits))
return nil
}
@@ -144,41 +120,26 @@ func (vms *VMStorage) WriteRows(rows []storage.MetricRow) error {
// The caller should limit the number of concurrent calls to WriteMetadata() in
// order to limit memory usage.
func (vms *VMStorage) WriteMetadata(rows []metricsmetadata.Row) error {
vms.wg.Add(1)
defer vms.wg.Done()
if vms.s.IsReadOnly() {
return errReadOnly
}
vms.s.AddMetadataRows(rows)
return nil
}
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
// IsReadOnly returns true is the storage is in read-only mode.
func (vms *VMStorage) IsReadOnly() bool {
vms.wg.Add(1)
defer vms.wg.Done()
return vms.s.IsReadOnly()
}
func (vms *VMStorage) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
vms.wg.Add(1)
tr := sq.GetTimeRange()
maxMetrics := vms.getMaxMetrics(sq.MaxMetrics)
tfss, err := vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
vms.wg.Done()
return nil, err
}
if len(tfss) == 0 {
vms.wg.Done()
return nil, fmt.Errorf("missing tag filters")
}
bi := getBlockIterator()
bi.wgDone = vms.wg.Done
bi.sr.Init(qt, vms.s, tfss, tr, maxMetrics, deadline)
if err := bi.sr.Error(); err != nil {
bi.MustClose()
@@ -200,9 +161,8 @@ func (vms *VMStorage) getMaxMetrics(searchQueryLimit int) int {
// blockIterator implements vmselectapi.BlockIterator
type blockIterator struct {
sr storage.Search
mb storage.MetricBlock
wgDone func()
sr storage.Search
mb storage.MetricBlock
}
var blockIteratorsPool sync.Pool
@@ -211,8 +171,6 @@ func (bi *blockIterator) MustClose() {
bi.sr.MustClose()
bi.mb.MetricName = nil
bi.mb.Block.Reset()
bi.wgDone()
bi.wgDone = nil
blockIteratorsPool.Put(bi)
}
@@ -239,63 +197,8 @@ func (bi *blockIterator) Error() error {
return bi.sr.Error()
}
// GetSearch sets up an instance of storage search and returns it to the caller
// along with the max series count that the search can return.
//
// This method is not part of the vmselectapi.API and must only be used by
// vmsingle HTTP handlers.
//
// Callers of this method must call PutSearch() once the search instance is not
// needed anymore.
func (vms *VMStorage) GetSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (*storage.Search, int, error) {
vms.wg.Add(1)
tr := sq.GetTimeRange()
maxMetrics := vms.getMaxMetrics(sq.MaxMetrics)
tfss, err := vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
vms.wg.Done()
return nil, 0, err
}
sr := getSearch()
maxSeriesCount := sr.Init(qt, vms.s, tfss, tr, sq.MaxMetrics, deadline)
return sr, maxSeriesCount, nil
}
// PutSearch resets the search once it is not needed anymore and puts it aside
// for future reuse.
//
// This method is not part of the vmselectapi.API and must only be used by
// vmsingle HTTP handlers.
//
// The method must only be used on search instances that have been created with
// GetSearch().
func (vms *VMStorage) PutSearch(sr *storage.Search) {
putSearch(sr)
vms.wg.Done()
}
func getSearch() *storage.Search {
v := ssPool.Get()
if v == nil {
return &storage.Search{}
}
return v.(*storage.Search)
}
func putSearch(sr *storage.Search) {
sr.MustClose()
ssPool.Put(sr)
}
var ssPool sync.Pool
// SearchMetricNames returns metric names for the given tfss on the given tr.
func (vms *VMStorage) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
vms.wg.Add(1)
defer vms.wg.Done()
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
@@ -316,9 +219,6 @@ func (vms *VMStorage) SearchMetricNames(qt *querytracer.Tracer, sq *storage.Sear
// SearchLabelValues searches for label values for the given labelName, tfss and
// tr.
func (vms *VMStorage) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
vms.wg.Add(1)
defer vms.wg.Done()
tr := sq.GetTimeRange()
if maxLabelValues <= 0 || maxLabelValues > *maxTagValues {
maxLabelValues = *maxTagValues
@@ -344,9 +244,6 @@ func (vms *VMStorage) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuer
// similar APIs.
func (vms *VMStorage) TagValueSuffixes(qt *querytracer.Tracer, _, _ uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
maxSuffixes int, deadline uint64) ([]string, error) {
vms.wg.Add(1)
defer vms.wg.Done()
if maxSuffixes <= 0 || maxSuffixes > *maxTagValueSuffixesPerSearch {
maxSuffixes = *maxTagValueSuffixesPerSearch
}
@@ -363,9 +260,6 @@ func (vms *VMStorage) TagValueSuffixes(qt *querytracer.Tracer, _, _ uint32, tr s
// SearchLabelNames searches for tag keys matching the given tfss on tr.
func (vms *VMStorage) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
vms.wg.Add(1)
defer vms.wg.Done()
tr := sq.GetTimeRange()
if maxLabelNames <= 0 || maxLabelNames > *maxTagKeys {
maxLabelNames = *maxTagKeys
@@ -384,8 +278,6 @@ func (vms *VMStorage) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery
}
func (vms *VMStorage) SeriesCount(_ *querytracer.Tracer, _, _ uint32, deadline uint64) (uint64, error) {
vms.wg.Add(1)
defer vms.wg.Done()
return vms.s.GetSeriesCount(deadline)
}
@@ -395,9 +287,6 @@ func (vms *VMStorage) Tenants(_ *querytracer.Tracer, _ storage.TimeRange, _ uint
// GetTSDBStatus returns TSDB status for given filters on the given date.
func (vms *VMStorage) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
vms.wg.Add(1)
defer vms.wg.Done()
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
@@ -417,9 +306,6 @@ func (vms *VMStorage) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery
//
// Returns the number of deleted series.
func (vms *VMStorage) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
vms.wg.Add(1)
defer vms.wg.Done()
tr := sq.GetTimeRange()
maxMetrics := sq.MaxMetrics
if maxMetrics <= 0 {
@@ -438,26 +324,17 @@ func (vms *VMStorage) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQue
}
func (vms *VMStorage) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error {
vms.wg.Add(1)
defer vms.wg.Done()
vms.s.RegisterMetricNames(qt, mrs)
return nil
}
// GetMetricNamesUsageStats returns metric name usage stats.
func (vms *VMStorage) GetMetricNamesUsageStats(qt *querytracer.Tracer, _ *storage.TenantToken, limit, le int, matchPattern string, _ uint64) (metricnamestats.StatsResult, error) {
vms.wg.Add(1)
defer vms.wg.Done()
return vms.s.GetMetricNamesStats(qt, limit, le, matchPattern), nil
}
// ResetMetricNamesStats resets state for metric names usage tracker
func (vms *VMStorage) ResetMetricNamesUsageStats(qt *querytracer.Tracer, _ uint64) error {
vms.wg.Add(1)
defer vms.wg.Done()
vms.s.ResetMetricNamesStats(qt)
return nil
}
@@ -494,8 +371,6 @@ func (vms *VMStorage) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery,
}
func (vms *VMStorage) GetMetadataRecords(qt *querytracer.Tracer, _ *storage.TenantToken, limit int, metricName string, _ uint64) ([]*metricsmetadata.Row, error) {
vms.wg.Add(1)
defer vms.wg.Done()
return vms.s.GetMetadataRows(qt, limit, metricName), nil
}

View File

@@ -0,0 +1,213 @@
package vmstorage
import (
"errors"
"fmt"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
// newVMStorageSingleNode creates a new instance of of VMStorage for vmsingle.
func newVMStorageSingleNode(s *storage.Storage, maxConcurrentRequests int, resetCacheIfNeeded func(mrs []storage.MetricRow)) *VMStorageSingleNode {
vms := newVMStorage(s, maxConcurrentRequests)
return &VMStorageSingleNode{
vms: vms,
wg: syncwg.WaitGroup{},
resetCacheIfNeeded: resetCacheIfNeeded,
}
}
type VMStorageSingleNode struct {
vms *VMStorage
// wg is used to wrap every storage call into wg.Add(1) ... wg.Done()
// for proper graceful shutdown when Stop is called.
//
// Use syncwg instead of sync, since Add is called from concurrent
// goroutines.
wg syncwg.WaitGroup
// resetCacheIfNeeded is a callback for automatic resetting of response
// cache if needed.
resetCacheIfNeeded func(mrs []storage.MetricRow)
}
func (vmssn *VMStorageSingleNode) Stop() {
vmssn.wg.WaitAndBlock()
vmssn.vms.Stop()
}
// WriteRows writes metric rows to the storage.
//
// Returns an error if the storage is in read-only mode.
//
// The caller should limit the number of concurrent calls to WriteRows() in
// order to limit memory usage.
func (vmssn *VMStorageSingleNode) WriteRows(rows []storage.MetricRow) error {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
if vmssn.vms.IsReadOnly() {
return errReadOnly
}
vmssn.resetCacheIfNeeded(rows)
return vmssn.vms.WriteRows(rows)
}
// WriteMetadata writes metrics metadata to storage.
//
// Returns an error if the storage is in read-only mode.
//
// The caller should limit the number of concurrent calls to WriteMetadata() in
// order to limit memory usage.
func (vmssn *VMStorageSingleNode) WriteMetadata(rows []metricsmetadata.Row) error {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
if vmssn.vms.IsReadOnly() {
return errReadOnly
}
return vmssn.vms.WriteMetadata(rows)
}
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
func (vmssn *VMStorageSingleNode) IsReadOnly() bool {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.IsReadOnly()
}
func (vmssn *VMStorageSingleNode) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
return nil, fmt.Errorf("not implemented in vmsingle")
}
// GetSearch sets up an instance of storage search and returns it to the caller
// along with the max series count that the search can return.
//
// This method is not part of the vmselectapi.API and must only be used by
// vmsingle HTTP handlers.
//
// Callers of this method must call PutSearch() once the search instance is not
// needed anymore.
func (vmssn *VMStorageSingleNode) GetSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (*storage.Search, int, error) {
vmssn.wg.Add(1)
tr := sq.GetTimeRange()
maxMetrics := vmssn.vms.getMaxMetrics(sq.MaxMetrics)
tfss, err := vmssn.vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
vmssn.wg.Done()
return nil, 0, err
}
sr := getSearch()
maxSeriesCount := sr.Init(qt, vmssn.vms.s, tfss, tr, sq.MaxMetrics, deadline)
return sr, maxSeriesCount, nil
}
// PutSearch resets the search once it is not needed anymore and puts it aside
// for future reuse.
//
// This method is not part of the vmselectapi.API and must only be used by
// vmsingle HTTP handlers.
//
// The method must only be used on search instances that have been created with
// GetSearch().
func (vmssn *VMStorageSingleNode) PutSearch(sr *storage.Search) {
putSearch(sr)
vmssn.wg.Done()
}
func getSearch() *storage.Search {
v := ssPool.Get()
if v == nil {
return &storage.Search{}
}
return v.(*storage.Search)
}
func putSearch(sr *storage.Search) {
sr.MustClose()
ssPool.Put(sr)
}
var ssPool sync.Pool
func (vmssn *VMStorageSingleNode) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.SearchMetricNames(qt, sq, deadline)
}
func (vmssn *VMStorageSingleNode) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.LabelValues(qt, sq, labelName, maxLabelValues, deadline)
}
func (vmssn *VMStorageSingleNode) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.TagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
}
func (vmssn *VMStorageSingleNode) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.LabelNames(qt, sq, maxLabelNames, deadline)
}
func (vmssn *VMStorageSingleNode) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.SeriesCount(qt, accountID, projectID, deadline)
}
func (vmssn *VMStorageSingleNode) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.Tenants(qt, tr, deadline)
}
func (vmssn *VMStorageSingleNode) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.TSDBStatus(qt, sq, focusLabel, topN, deadline)
}
func (vmssn *VMStorageSingleNode) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.DeleteSeries(qt, sq, deadline)
}
func (vmssn *VMStorageSingleNode) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.RegisterMetricNames(qt, mrs, deadline)
}
func (vmssn *VMStorageSingleNode) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (metricnamestats.StatsResult, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.GetMetricNamesUsageStats(qt, tt, limit, le, matchPattern, deadline)
}
func (vmssn *VMStorageSingleNode) ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.ResetMetricNamesUsageStats(qt, deadline)
}
func (vmssn *VMStorageSingleNode) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) {
vmssn.wg.Add(1)
defer vmssn.wg.Done()
return vmssn.vms.GetMetadataRecords(qt, tt, limit, metricName, deadline)
}

View File

@@ -48,7 +48,7 @@ func TestGetMaxMetrics(t *testing.T) {
t.Helper()
*maxUniqueTimeseries = storageMaxUniqueTimeseries
s := storage.MustOpenStorage(t.Name(), storage.OpenOptions{})
vms := newVMStorage(s, maxConcurrentRequests, func(mrs []storage.MetricRow) {})
vms := newVMStorage(s, maxConcurrentRequests)
defer vms.Stop()
maxMetrics := vms.getMaxMetrics(searchQueryLimit)
if maxMetrics != expect {

View File

@@ -156,14 +156,14 @@ func readAllAndClose(t *testing.T, responseBody io.ReadCloser) string {
//
// This type is expected to be embedded by the apps that serve metrics.
type metricsClient struct {
cli *Client
url string
metricsCli *Client
url string
}
func newMetricsClient(cli *Client, addr string) *metricsClient {
return &metricsClient{
cli: cli,
url: fmt.Sprintf("http://%s/metrics", addr),
metricsCli: cli,
url: fmt.Sprintf("http://%s/metrics", addr),
}
}
@@ -179,7 +179,7 @@ func (c *metricsClient) GetIntMetric(t *testing.T, metricName string) int {
func (c *metricsClient) GetMetric(t *testing.T, metricName string) float64 {
t.Helper()
metrics, statusCode := c.cli.Get(t, c.url, nil)
metrics, statusCode := c.metricsCli.Get(t, c.url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -205,7 +205,7 @@ func (c *metricsClient) GetMetricsByPrefix(t *testing.T, prefix string) []float6
values := []float64{}
metrics, statusCode := c.cli.Get(t, c.url, nil)
metrics, statusCode := c.metricsCli.Get(t, c.url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -234,7 +234,7 @@ func (c *metricsClient) GetMetricsByRegexp(t *testing.T, re *regexp.Regexp) []fl
values := []float64{}
metrics, statusCode := c.cli.Get(t, c.url, nil)
metrics, statusCode := c.metricsCli.Get(t, c.url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -270,7 +270,7 @@ func (c *metricsClient) rpcRowsSentTotal(t *testing.T) int {
}
type vmselectClient struct {
cli *Client
vmselectCli *Client
url func(op, path string, opts QueryOpts) string
metricNamesStatsResetURL string
tenantsURL string
@@ -287,7 +287,7 @@ func (c *vmselectClient) PrometheusAPIV1Export(t *testing.T, query string, opts
values := opts.asURLValues()
values.Add("match[]", query)
values.Add("format", "promapi")
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1QueryResponse(t, res)
}
@@ -302,7 +302,7 @@ func (c *vmselectClient) PrometheusAPIV1ExportNative(t *testing.T, query string,
values := opts.asURLValues()
values.Add("match[]", query)
values.Add("format", "promapi")
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
return []byte(res)
}
@@ -315,7 +315,7 @@ func (c *vmselectClient) PrometheusAPIV1Query(t *testing.T, query string, opts Q
url := c.url("select", "prometheus/api/v1/query", opts)
values := opts.asURLValues()
values.Add("query", query)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1QueryResponse(t, res)
}
@@ -329,7 +329,7 @@ func (c *vmselectClient) PrometheusAPIV1QueryRange(t *testing.T, query string, o
url := c.url("select", "prometheus/api/v1/query_range", opts)
values := opts.asURLValues()
values.Add("query", query)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1QueryResponse(t, res)
}
@@ -342,7 +342,7 @@ func (c *vmselectClient) PrometheusAPIV1Series(t *testing.T, matchQuery string,
url := c.url("select", "prometheus/api/v1/series", opts)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1SeriesResponse(t, res)
}
@@ -354,7 +354,7 @@ func (c *vmselectClient) PrometheusAPIV1SeriesCount(t *testing.T, opts QueryOpts
t.Helper()
url := c.url("select", "prometheus/api/v1/series/count", opts)
values := opts.asURLValues()
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1SeriesCountResponse(t, res)
}
@@ -367,7 +367,7 @@ func (c *vmselectClient) PrometheusAPIV1Labels(t *testing.T, matchQuery string,
url := c.url("select", "prometheus/api/v1/labels", opts)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1LabelsResponse(t, res)
}
@@ -382,7 +382,7 @@ func (c *vmselectClient) PrometheusAPIV1LabelValues(t *testing.T, labelName, mat
url := c.url("select", path, opts)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1LabelValuesResponse(t, res)
}
@@ -394,7 +394,7 @@ func (c *vmselectClient) PrometheusAPIV1Metadata(t *testing.T, metric string, li
values := opts.asURLValues()
values.Add("metric", metric)
values.Add("limit", strconv.Itoa(limit))
res, _ := c.cli.PostForm(t, url, values, opts.Headers)
res, _ := c.vmselectCli.PostForm(t, url, values, opts.Headers)
return NewPrometheusAPIV1Metadata(t, res)
}
@@ -408,7 +408,7 @@ func (c *vmselectClient) PrometheusAPIV1AdminTSDBDeleteSeries(t *testing.T, matc
url := c.url("delete", "prometheus/api/v1/admin/tsdb/delete_series", opts)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
res, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
}
@@ -426,7 +426,7 @@ func (c *vmselectClient) PrometheusAPIV1StatusMetricNamesStats(t *testing.T, lim
values.Add("limit", limit)
values.Add("le", le)
values.Add("match_pattern", matchPattern)
res, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
@@ -455,7 +455,7 @@ func (c *vmselectClient) PrometheusAPIV1StatusTSDB(t *testing.T, matchQuery stri
addNonEmpty("match[]", matchQuery)
addNonEmpty("topN", topN)
addNonEmpty("date", date)
res, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
res, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
@@ -476,7 +476,7 @@ func (c *vmselectClient) GraphiteMetricsIndex(t *testing.T, opts QueryOpts) Grap
t.Helper()
url := c.url("select", "graphite/metrics/index.json", opts)
res, statusCode := c.cli.Get(t, url, opts.Headers)
res, statusCode := c.vmselectCli.Get(t, url, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
@@ -499,7 +499,7 @@ func (c *vmselectClient) GraphiteMetricsFind(t *testing.T, query string, opts Qu
url := c.url("select", "graphite/metrics/find", opts)
values := opts.asURLValues()
values.Add("query", query)
resText, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText)
}
@@ -522,7 +522,7 @@ func (c *vmselectClient) GraphiteMetricsExpand(t *testing.T, query string, opts
url := c.url("select", "graphite/metrics/expand", opts)
values := opts.asURLValues()
values.Add("query", query)
resText, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText)
}
@@ -546,7 +546,7 @@ func (c *vmselectClient) GraphiteRender(t *testing.T, target string, opts QueryO
values := opts.asURLValues()
values.Add("format", "json")
values.Add("target", target)
resText, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
resText, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, resText)
}
@@ -567,7 +567,7 @@ func (c *vmselectClient) GraphiteTagsTagSeries(t *testing.T, record string, opts
url := c.url("select", "graphite/tags/tagSeries", opts)
values := opts.asURLValues()
values.Add("path", record)
_, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
_, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
if got, want := statusCode, http.StatusNotImplemented; got != want {
t.Fatalf("unexpected status code: got %d, want %d", got, want)
}
@@ -584,7 +584,7 @@ func (c *vmselectClient) GraphiteTagsTagMultiSeries(t *testing.T, records []stri
for _, rec := range records {
values.Add("path", rec)
}
_, statusCode := c.cli.PostForm(t, url, values, opts.Headers)
_, statusCode := c.vmselectCli.PostForm(t, url, values, opts.Headers)
if got, want := statusCode, http.StatusNotImplemented; got != want {
t.Fatalf("unexpected status code: got %d, want %d", got, want)
}
@@ -598,7 +598,7 @@ func (c *vmselectClient) GraphiteTagsTagMultiSeries(t *testing.T, records []stri
func (c *vmselectClient) PrometheusAPIV1AdminStatusMetricNamesStatsReset(t *testing.T, opts QueryOpts) {
t.Helper()
values := opts.asURLValues()
res, statusCode := c.cli.PostForm(t, c.metricNamesStatsResetURL, values, opts.Headers)
res, statusCode := c.vmselectCli.PostForm(t, c.metricNamesStatsResetURL, values, opts.Headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusNoContent, res)
}
@@ -608,7 +608,7 @@ func (c *vmselectClient) PrometheusAPIV1AdminStatusMetricNamesStatsReset(t *test
// /admin/tenants endpoint.
func (c *vmselectClient) APIV1AdminTenants(t *testing.T, opts QueryOpts) *AdminTenantsResponse {
t.Helper()
res, statusCode := c.cli.Get(t, c.tenantsURL, opts.Headers)
res, statusCode := c.vmselectCli.Get(t, c.tenantsURL, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
}
@@ -622,7 +622,7 @@ func (c *vmselectClient) APIV1AdminTenants(t *testing.T, opts QueryOpts) *AdminT
}
type vminsertClient struct {
cli *Client
vminsertCli *Client
url func(op, path string, opts QueryOpts) string
openTSDBURL func(op, path string, opts QueryOpts) string
graphiteListenAddr string
@@ -647,7 +647,7 @@ func (c *vminsertClient) PrometheusAPIV1ImportCSV(t *testing.T, records []string
headers := opts.getHeaders()
headers.Set("Content-Type", "text/plain")
c.sendBlocking(t, len(records), func() {
_, statusCode := c.cli.Post(t, url, data, headers)
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -671,7 +671,7 @@ func (c *vminsertClient) PrometheusAPIV1ImportNative(t *testing.T, data []byte,
headers := opts.getHeaders()
headers.Set("Content-Type", "text/plain")
c.sendBlocking(t, 1, func() {
_, statusCode := c.cli.Post(t, url, data, headers)
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -693,7 +693,7 @@ func (c *vminsertClient) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteReque
headers := opts.getHeaders()
headers.Set("Content-Type", "application/x-protobuf")
c.sendBlocking(t, recordsCount, func() {
_, statusCode := c.cli.Post(t, url, data, headers)
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -745,7 +745,7 @@ func (c *vminsertClient) PrometheusAPIV1ImportPrometheus(t *testing.T, records [
headers := opts.getHeaders()
headers.Set("Content-Type", "text/plain")
c.sendBlocking(t, recordsCount, func() {
_, statusCode := c.cli.Post(t, url, data, headers)
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -771,7 +771,7 @@ func (c *vminsertClient) InfluxWrite(t *testing.T, records []string, opts QueryO
headers.Set("Content-Type", "text/plain")
c.sendBlocking(t, len(records), func() {
t.Helper()
_, statusCode := c.cli.Post(t, url, data, headers)
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -805,7 +805,7 @@ func (c *vminsertClient) OpentelemetryV1Metrics(t *testing.T, md otlppb.MetricsD
headers := opts.getHeaders()
headers.Set("Content-Type", "application/x-protobuf")
c.sendBlocking(t, recordsCount, func() {
_, statusCode := c.cli.Post(t, url, data, headers)
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -830,7 +830,7 @@ func (c *vminsertClient) OpenTSDBAPIPut(t *testing.T, records []string, opts Que
headers := opts.getHeaders()
headers.Set("Content-Type", "application/json")
c.sendBlocking(t, len(records), func() {
_, statusCode := c.cli.Post(t, url, data, headers)
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
}
@@ -853,7 +853,7 @@ func (c *vminsertClient) ZabbixConnectorHistory(t *testing.T, records []string,
headers := opts.getHeaders()
headers.Set("Content-Type", "application/json")
c.sendBlocking(t, len(records), func() {
_, statusCode := c.cli.Post(t, url, data, headers)
_, statusCode := c.vminsertCli.Post(t, url, data, headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -867,11 +867,11 @@ func (c *vminsertClient) ZabbixConnectorHistory(t *testing.T, records []string,
// See https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#ingesting
func (c *vminsertClient) GraphiteWrite(t *testing.T, records []string, _ QueryOpts) {
t.Helper()
c.cli.Write(t, c.graphiteListenAddr, records)
c.vminsertCli.Write(t, c.graphiteListenAddr, records)
}
type vmstorageClient struct {
cli *Client
vmstorageCli *Client
httpListenAddr string
}
@@ -881,7 +881,7 @@ func (c *vmstorageClient) ForceFlush(t *testing.T) {
t.Helper()
url := fmt.Sprintf("http://%s/internal/force_flush", c.httpListenAddr)
_, statusCode := c.cli.Get(t, url, nil)
_, statusCode := c.vmstorageCli.Get(t, url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -892,7 +892,7 @@ func (c *vmstorageClient) ForceMerge(t *testing.T) {
t.Helper()
url := fmt.Sprintf("http://%s/internal/force_merge", c.httpListenAddr)
_, statusCode := c.cli.Get(t, url, nil)
_, statusCode := c.vmstorageCli.Get(t, url, nil)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusOK)
}
@@ -905,7 +905,7 @@ func (c *vmstorageClient) ForceMerge(t *testing.T) {
func (c *vmstorageClient) SnapshotCreate(t *testing.T) *SnapshotCreateResponse {
t.Helper()
data, statusCode := c.cli.Post(t, c.SnapshotCreateURL(), nil, nil)
data, statusCode := c.vmstorageCli.Post(t, c.SnapshotCreateURL(), nil, nil)
if got, want := statusCode, http.StatusOK; got != want {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
}
@@ -931,7 +931,7 @@ func (c *vmstorageClient) APIV1AdminTSDBSnapshot(t *testing.T) *APIV1AdminTSDBSn
t.Helper()
url := fmt.Sprintf("http://%s/api/v1/admin/tsdb/snapshot", c.httpListenAddr)
data, statusCode := c.cli.Post(t, url, nil, nil)
data, statusCode := c.vmstorageCli.Post(t, url, nil, nil)
if got, want := statusCode, http.StatusOK; got != want {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
}
@@ -952,7 +952,7 @@ func (c *vmstorageClient) SnapshotList(t *testing.T) *SnapshotListResponse {
t.Helper()
url := fmt.Sprintf("http://%s/snapshot/list", c.httpListenAddr)
data, statusCode := c.cli.Get(t, url, nil)
data, statusCode := c.vmstorageCli.Get(t, url, nil)
if got, want := statusCode, http.StatusOK; got != want {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
}
@@ -973,7 +973,7 @@ func (c *vmstorageClient) SnapshotDelete(t *testing.T, snapshotName string) *Sna
t.Helper()
url := fmt.Sprintf("http://%s/snapshot/delete?snapshot=%s", c.httpListenAddr, snapshotName)
data, statusCode := c.cli.Delete(t, url)
data, statusCode := c.vmstorageCli.Delete(t, url)
wantStatusCodes := map[int]bool{
http.StatusOK: true,
http.StatusInternalServerError: true,
@@ -998,7 +998,7 @@ func (c *vmstorageClient) SnapshotDeleteAll(t *testing.T) *SnapshotDeleteAllResp
t.Helper()
url := fmt.Sprintf("http://%s/snapshot/delete_all", c.httpListenAddr)
data, statusCode := c.cli.Post(t, url, nil, nil)
data, statusCode := c.vmstorageCli.Post(t, url, nil, nil)
if got, want := statusCode, http.StatusOK; got != want {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", got, want, data)
}

View File

@@ -77,7 +77,7 @@ type vminsertRuntimeValues struct {
func newVminsert(app *app, cli *Client, rt vminsertRuntimeValues) *Vminsert {
metricsClient := newMetricsClient(cli, rt.httpListenAddr)
vminsertClient := &vminsertClient{
cli: cli,
vminsertCli: cli,
url: func(op, path string, opts QueryOpts) string {
return getClusterPath(rt.httpListenAddr, op, path, opts)
},

View File

@@ -48,7 +48,7 @@ func newVmselect(app *app, cli *Client, rt vmselectRuntimeValues) *Vmselect {
app: app,
metricsClient: newMetricsClient(cli, rt.httpListenAddr),
vmselectClient: &vmselectClient{
cli: cli,
vmselectCli: cli,
url: func(op, path string, opts QueryOpts) string {
return getClusterPath(rt.httpListenAddr, op, path, opts)
},

View File

@@ -58,11 +58,11 @@ func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
app: app,
metricsClient: newMetricsClient(cli, rt.httpListenAddr),
vmstorageClient: &vmstorageClient{
cli: cli,
vmstorageCli: cli,
httpListenAddr: rt.httpListenAddr,
},
vmselectClient: &vmselectClient{
cli: cli,
vmselectCli: cli,
url: func(op, path string, opts QueryOpts) string {
return fmt.Sprintf("http://%s/%s", rt.httpListenAddr, path)
},
@@ -70,7 +70,7 @@ func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
tenantsURL: "vmsingle-does-not-serve-tenants",
},
vminsertClient: &vminsertClient{
cli: cli,
vminsertCli: cli,
url: func(_, path string, _ QueryOpts) string {
return fmt.Sprintf("http://%s/%s", rt.httpListenAddr, path)
},

View File

@@ -63,7 +63,7 @@ func newVmstorage(app *app, cli *Client, rt vmstorageRuntimeValues) *Vmstorage {
app: app,
metricsClient: newMetricsClient(cli, rt.httpListenAddr),
vmstorageClient: &vmstorageClient{
cli: cli,
vmstorageCli: cli,
httpListenAddr: rt.httpListenAddr,
},
storageDataPath: rt.storageDataPath,

View File

@@ -59,7 +59,7 @@ services:
- '--external.alert.source=explore?orgId=1&left=["now-1h","now","VictoriaMetrics",{"expr": },{"mode":"Metrics"},{"ui":[true,true,true,"none"]}]'
restart: always
vmanomaly:
image: victoriametrics/vmanomaly:v1.29.5
image: victoriametrics/vmanomaly:v1.29.4
depends_on:
- "victoriametrics"
ports:

View File

@@ -43,7 +43,7 @@
"content": "If you don't observe any data initially, please wait a few minutes for it to appear. \n\nUpon the first running the guide (if there is not enough node_exporter monitoring data collected in your system), you may notice a significant number of false positive anomalies found. The predictions will become more accurate with at least two weeks' (full `fit_window`) worth of data provided to vmanomaly.\n\nEach row displays information for a distinct mode. The query used for anomaly detection is `sum(rate(node_cpu_seconds_total[5m])) by (mode, instance, job)`.\n",
"mode": "markdown"
},
"pluginVersion": "12.2.0",
"pluginVersion": "10.2.1",
"title": "Overview",
"type": "text"
},

View File

@@ -14,21 +14,6 @@ aliases:
---
Please find the changelog for VictoriaMetrics Anomaly Detection below.
## v1.29.5
Released: 2026-06-11
- UI: Updated [vmanomaly UI](https://docs.victoriametrics.com/anomaly-detection/ui/) from [v1.7.0](https://docs.victoriametrics.com/anomaly-detection/ui/#v170) to [v1.7.1](https://docs.victoriametrics.com/anomaly-detection/ui/#v171), see respective [release notes](https://docs.victoriametrics.com/anomaly-detection/ui/#v171) for details.
- IMPROVEMENT: Redesigned [hot reload](https://docs.victoriametrics.com/anomaly-detection/components/#hot-reload) config change detection to content-based polling with configurable `-configCheckInterval`, improving reliability for Kubernetes ConfigMap symlink rotations and other filesystems where event delivery can be inconsistent.
- IMPROVEMENT: Refined config validation errors for broken or invalid config sections, so startup and reload failures point to the affected section more clearly (e.g. YAML indentation typos).
- IMPROVEMENT: Tightened config validation for [`PeriodicScheduler`](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#periodic-scheduler) `infer_every` and [`IsolationForestModel`](https://docs.victoriametrics.com/anomaly-detection/components/models/#isolation-forest-multivariate) `contamination`, including clearer handling of missing scheduler intervals, numeric contamination strings, and invalid non-finite values.
- BUGFIX: Fixed a multiprocessing startup issue with `settings.n_workers > 1` that could leave scheduled data fetch or successive inference jobs stuck and repeatedly skipped by internal scheduler.
- BUGFIX: Bounded [`VmReader`](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vm-reader) and [`VLogsReader`](https://docs.victoriametrics.com/anomaly-detection/components/reader/#victorialogs-reader) data fetch and post-fetch processing waits so stalled datasource reads or multiprocessing dataframe creation no longer keep [`PeriodicScheduler`](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#periodic-scheduler) `data_fetch` jobs running indefinitely. Previously, such stuck jobs could keep internal scheduler's `max_instances=1` slot per (scheduler, query) pair occupied, causing future data fetch, fit, or infer runs to be skipped until vmanomaly was restarted. The config validator now also warns when the configured reader timeout budget can exceed the connected scheduler interval.
## v1.29.4
Released: 2026-05-15

View File

@@ -423,7 +423,7 @@ services:
# ...
vmanomaly:
container_name: vmanomaly
image: victoriametrics/vmanomaly:v1.29.5
image: victoriametrics/vmanomaly:v1.29.4
# ...
restart: always
volumes:
@@ -641,7 +641,7 @@ options:
Heres an example of using the config splitter to divide configurations based on the `extra_filters` argument from the reader section:
```sh
docker pull victoriametrics/vmanomaly:v1.29.5 && docker image tag victoriametrics/vmanomaly:v1.29.5 vmanomaly
docker pull victoriametrics/vmanomaly:v1.29.4 && docker image tag victoriametrics/vmanomaly:v1.29.4 vmanomaly
```
```sh

View File

@@ -45,7 +45,8 @@ There are 2 types of compatibility to consider when migrating in stateful mode:
| Group start | Group end | Compatibility | Notes |
|---------|--------- |------------|-------|
| [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) | [v1.29.5](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1295) | Fully Compatible | - |
| [v1.29.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1293) | Latest* | Fully Compatible | Just a placeholder for new releases |
| [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) | [v1.29.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1293) | Fully Compatible | - |
| [v1.28.7](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1287) | [v1.29.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1290) | Partially compatible* | Dumped models of class [prophet](https://docs.victoriametrics.com/anomaly-detection/components/models/#prophet) and [seasonal quantile](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-seasonal-quantile) have problems with loading to [v1.29.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1290) due to dropped `pytz` library. **Upgrading directly from v1.28.7 to [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) with a fix is suggested** |
| [v1.26.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1262) | [v1.28.7](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1287) | Fully Compatible | [v1.28.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1280) introduced [rolling](https://docs.victoriametrics.com/anomaly-detection/components/models/#rolling-models) model class drop in favor of [online](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-models) models (`rolling_quantile` and `std` models), however, it does not impact compatibility, as artifacts were not produced by default for rolling models. Also, offline `mad` and `zscore` models are redirecting to their respective online counterparts since [v1.28.4](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1284). |
| [v1.25.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1253) | [v1.26.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1270) | Partially Compatible* | [v1.25.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1253) introduced `forecast_at` argument for base [univariate](https://docs.victoriametrics.com/anomaly-detection/components/models/#univariate-models) and `Prophet` [models](https://docs.victoriametrics.com/anomaly-detection/components/models/#prophet), however, itself remains backward-reversible from newer states like [v1.26.2](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1262), [v1.27.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1270). (All models except `isolation_forest_multivariate` class will be dropped) |

View File

@@ -37,39 +37,29 @@ The `vmanomaly` service supports a set of command-line arguments to configure it
> Single-dashed command-line argument {{% available_from "v1.23.3" anomaly %}} format can be used, e.g. `-license.forceOffline` in addition to `--license.forceOffline`. This aligns better with other VictoriaMetrics ecosystem components. Mixing the two styles is also supported, e.g. `-license.forceOffline --loggerLevel INFO`.
```shellhelp
usage: vmanomaly.py [-h] [--license STRING | --licenseFile PATH] [--license.forceOffline] [--loggerLevel {DEBUG,INFO,WARNING,ERROR,FATAL}] [--watch] [-configCheckInterval DURATION] [--dryRun] [--outputSpec PATH] config [config ...]
usage: vmanomaly.py [-h] [--license STRING | --licenseFile PATH] [--license.forceOffline] [--loggerLevel {DEBUG,WARNING,FATAL,ERROR,INFO}] [--watch] [--dryRun] [--outputSpec PATH] config [config ...]
VictoriaMetrics Anomaly Detection Service
positional arguments:
config YAML config file(s) or directories containing YAML files. Multiple files will recursively merge each other
values so multiple configs can be combined. If a directory is provided, all `.yaml` files inside will be
merged, without recursion. Default: vmanomaly.yaml is expected in the current directory.
config YAML config file(s) or directories containing YAML files. Multiple files will recursively merge each other values so multiple configs can be combined. If a directory is provided,
all `.yaml` files inside will be merged, without recursion. Default: vmanomaly.yaml is expected in the current directory.
options:
-h Show this help message and exit
--license STRING License key for VictoriaMetrics Enterprise. See https://victoriametrics.com/products/enterprise/trial/ to
obtain a trial license.
--licenseFile PATH Path to file with license key for VictoriaMetrics Enterprise. See
https://victoriametrics.com/products/enterprise/trial/ to obtain a trial license.
--license STRING License key for VictoriaMetrics Enterprise. See https://victoriametrics.com/products/enterprise/trial/ to obtain a trial license.
--licenseFile PATH Path to file with license key for VictoriaMetrics Enterprise. See https://victoriametrics.com/products/enterprise/trial/ to obtain a trial license.
--license.forceOffline
Whether to force offline verification for VictoriaMetrics Enterprise license key, which has been passed either
via -license or via -licenseFile command-line flag. The issued license key must support offline verification
feature. Contact info@victoriametrics.com if you need offline license verification.
--loggerLevel {DEBUG,INFO,WARNING,ERROR,FATAL}
Minimum level to log. Possible values: {'DEBUG', 'INFO', 'WARNING', 'ERROR', 'FATAL'}.
--watch Watch config files for changes and trigger hot reloads. Watches the specified config file or directory for
modifications, deletions, or additions. Upon detecting changes, triggers config reload. If new config
validation fails, continues with previous valid config and state.
-configCheckInterval DURATION
Interval for checking watched config files for content changes. Default: 30s.
--dryRun Validate only: parse + merge all YAML(s) and run schema checks, then exit. Does not require a license to run.
Does not expose metrics, or launch vmanomaly service(s).
Whether to force offline verification for VictoriaMetrics Enterprise license key, which has been passed either via -license or via -licenseFile command-line flag. The issued
license key must support offline verification feature. Contact info@victoriametrics.com if you need offline license verification.
--loggerLevel {DEBUG,WARNING,FATAL,ERROR,INFO}
Minimum level to log. Possible values: DEBUG, INFO, WARNING, ERROR, FATAL.
--watch Watch config files for changes and trigger hot reloads. Watches the specified config file or directory for modifications, deletions, or additions. Upon detecting changes,
triggers config reload. If new config validation fails, continues with previous valid config and state.
--dryRun Validate only: parse + merge all YAML(s) and run schema checks, then exit. Does not require a license to run. Does not expose metrics, or launch vmanomaly service(s).
--outputSpec PATH Target location of .yaml output spec.
```
{{% available_from "v1.29.5" anomaly %}} When `--watch` is enabled, config changes are detected by fixed-interval content polling instead of filesystem event delivery. The polling frequency is controlled by `-configCheckInterval` (default: `30s`). The same option can also be passed as `--configCheckInterval`, `--config.check.interval`, `--config-check-interval`, `--config_check_interval`, or in key-value form such as `configCheckInterval=30s`.
You can specify these options when running `vmanomaly` to fine-tune logging levels or handle licensing configurations, as per your requirements.
### Licensing
@@ -132,7 +122,7 @@ Below are the steps to get `vmanomaly` up and running inside a Docker container:
1. Pull Docker image:
```sh
docker pull victoriametrics/vmanomaly:v1.29.5
docker pull victoriametrics/vmanomaly:v1.29.4
```
2. Create the license file with your license key.
@@ -152,7 +142,7 @@ docker run -it \
-v ./license:/license \
-v ./config.yaml:/config.yaml \
-p 8490:8490 \
victoriametrics/vmanomaly:v1.29.5 \
victoriametrics/vmanomaly:v1.29.4 \
/config.yaml \
--licenseFile=/license \
--loggerLevel=INFO \
@@ -169,7 +159,7 @@ docker run -it \
-e VMANOMALY_DATA_DUMPS_DIR=/tmp/vmanomaly/data \
-e VMANOMALY_MODEL_DUMPS_DIR=/tmp/vmanomaly/models \
-p 8490:8490 \
victoriametrics/vmanomaly:v1.29.5 \
victoriametrics/vmanomaly:v1.29.4 \
/config.yaml \
--licenseFile=/license \
--loggerLevel=INFO \
@@ -182,7 +172,7 @@ services:
# ...
vmanomaly:
container_name: vmanomaly
image: victoriametrics/vmanomaly:v1.29.5
image: victoriametrics/vmanomaly:v1.29.4
# ...
restart: always
volumes:

View File

@@ -315,7 +315,7 @@ docker run -it --rm \
-e VMANOMALY_MCP_SERVER_URL=http://mcp-vmanomaly:8081/mcp \
-p 8080:8080 \
-p 8490:8490 \
victoriametrics/vmanomaly:v1.29.5 \
victoriametrics/vmanomaly:v1.29.4 \
vmanomaly_config.yaml
```
@@ -640,17 +640,6 @@ If the **results** look good and the **model configuration should be deployed in
## Changelog
### v1.7.1
Released: 2026-06-11
vmanomaly version: [v1.29.5](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1295)
- FEATURE: Added bulk Apply/Decline actions for [Copilot](#ai-assistance) chat suggestions.
- BUGFIX: Fixed modal windows closing when the mouse is released outside the window during text selection.
- BUGFIX: Fixed tooltip hover behavior so tooltips do not disappear while the cursor moves into the hover content.
### v1.7.0
Released: 2026-05-15

View File

@@ -143,14 +143,11 @@ server:
> This feature is better used in conjunction with [stateful service](https://docs.victoriametrics.com/anomaly-detection/components/settings/#state-restoration) to preserve the state of the models and schedulers between restarts and reuse what can be reused, thus avoiding unnecessary re-training of models, re-initialization of schedulers and re-reading of data.
{{% available_from "v1.25.0" anomaly %}} Service supports hot reload of configuration files, which allows for automatic reloading of configurations on config files change without the need of explicit service restart. This can be enabled via the `--watch` [CLI argument](https://docs.victoriametrics.com/anomaly-detection/quickstart/#command-line-arguments). `vmanomaly_config_reload_enabled` flag in [self-monitoring metrics](https://docs.victoriametrics.com/anomaly-detection/components/monitoring/#startup-metrics) will be set to 1 (if enabled) or 0 (if disabled).
> [!NOTE]
> {{% deprecated_from "v1.29.5" anomaly %}} File system event-based hot reload has been deprecated in favor of content-based polling with configurable `-configCheckInterval` due to reliability issues with Kubernetes ConfigMap symlink rotations and other filesystems where event delivery can be inconsistent. If you were using file system event-based hot reload, please switch to content-based polling by enabling `--watch` flag and configuring `-configCheckInterval` as needed.
{{% available_from "v1.25.0" anomaly %}} Service supports hot reload of configuration files, which allows for automatic reloading of configurations on config files change filesystem events without the need of explicit service restart. This can be enabled via the `--watch` [CLI argument](https://docs.victoriametrics.com/anomaly-detection/quickstart/#command-line-arguments). `vmanomaly_config_reload_enabled` flag in [self-monitoring metrics](https://docs.victoriametrics.com/anomaly-detection/components/monitoring/#startup-metrics) will be set to 1 (if enabled) or 0 (if disabled).
### How it works
It works by checking watched `.yml|.yaml` file contents in the specified files or directories on the configured interval `-configCheckInterval` (default is `30s`) {{% available_from "v1.29.5" anomaly %}}. When a content change is detected, the service will attempt to reload the configuration files after the existing debounce window, rebuild the [global config](https://docs.victoriametrics.com/anomaly-detection/scaling-vmanomaly/#global-configuration) and reinitialize the components. If the reload is successful, the `vmanomaly_config_reloads_total` metric will be incremented for `status="success"` label, otherwise it will be incremented with `status="failure"` label and a respective error message on config validation failure(s) will be logged.
It works by watching for file system events, such as modifications, creations, or deletions of `.yml|.yaml` files in the specified directories. When a change is detected, the service will attempt to reload the configuration files, rebuild the [global config](https://docs.victoriametrics.com/anomaly-detection/scaling-vmanomaly/#global-configuration) and reinitialize the components. If the reload is successful, the `vmanomaly_config_reloads_total` metric will be incremented for `status="success"` label, otherwise it will be incremented with `status="failure"` label and a respective error message on config validation failure(s) will be logged.
> If the reload fails, the service will log an error message indicating the reason for the failure, and the **previous configuration will remain active until a successful reload occurs** to preserve the service's stability. This means that if there are errors in the new configuration, the service will continue to operate with the last valid configuration until the issues are resolved.

View File

@@ -449,9 +449,9 @@ models:
> The `decay` argument works only in combination with [online models](#online-models) like [`ZScoreOnlineModel`](#online-z-score) or [`OnlineQuantileModel`](#online-seasonal-quantile).
The `decay` {{% available_from "v1.23.0" anomaly %}} argument is used to control the (exponential) **decay factor** for online models, which determines how quickly the model adapts to new data. It is a positive float value from `(0.0, 1.0]` interval, where:
- Value `1.0` means no decay (the model treats all data points equally, without giving more weight to recent ones). This is the default value for backward compatibility.
- Values less than `1.0` mean that the model will give more weight to recent data, effectively "forgetting" older data over time.
The `decay` {{% available_from "v1.23.0" anomaly %}} argument is used to control the (exponential) **decay factor** for online models, which determines how quickly the model adapts to new data. It is a float value between `0.0` and `1.0`, where:
- `1.0` means no decay (the model treats all data equally, without giving more weight to recent data). This is the default value for backward compatibility.
- Less than `1.0` means that the model will give more weight to recent data, effectively "forgetting" older data over time.
Roughly speaking, for the recent N datapoints model processes `decay` = `d` means that these datapoints will contribute to the model as [1 - d^X] percent of total importance, for example decay of
- `0.99` means that 100 recent datapoints will contribute as [1 - 0.99^100] = 63.23% of total importance
@@ -998,7 +998,7 @@ Here we use Isolation Forest implementation from `scikit-learn` [library](https:
* `class` (string) - model class name `"model.isolation_forest.IsolationForestMultivariateModel"` (or `isolation_forest_multivariate` with class alias support {{% available_from "v1.13.0" anomaly %}})
* `contamination` (float or string, optional) - The amount of contamination of the data set, i.e. the proportion of outliers in the data set. Used when fitting to define the threshold on the scores of the samples. Default value - "auto". Should be either `"auto"` or be in the range (0.0, 0.5]. {{% available_from "v1.29.5" anomaly %}} Numeric strings, such as `"0.01"`, are accepted, while invalid non-finite values, such as `nan`, `inf`, and `-inf`, are rejected during config validation.
* `contamination` (float or string, optional) - The amount of contamination of the data set, i.e. the proportion of outliers in the data set. Used when fitting to define the threshold on the scores of the samples. Default value - "auto". Should be either `"auto"` or be in the range (0.0, 0.5].
* `seasonal_features` (list of string) - List of seasonality to encode through [cyclical encoding](https://towardsdatascience.com/cyclical-features-encoding-its-about-time-ce23581845ca), i.e. `dow` (day of week). **Introduced in [1.12.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1120)**.
- Empty by default for backward compatibility.
@@ -1265,7 +1265,7 @@ monitoring:
Let's pull the docker image for `vmanomaly`:
```sh
docker pull victoriametrics/vmanomaly:v1.29.5
docker pull victoriametrics/vmanomaly:v1.29.4
```
Now we can run the docker container putting as volumes both config and model file:
@@ -1279,7 +1279,7 @@ docker run -it \
-v $(PWD)/license:/license \
-v $(PWD)/custom_model.py:/vmanomaly/model/custom.py \
-v $(PWD)/custom.yaml:/config.yaml \
victoriametrics/vmanomaly:v1.29.5 /config.yaml \
victoriametrics/vmanomaly:v1.29.4 /config.yaml \
--licenseFile=/license
--watch
```

View File

@@ -10,12 +10,12 @@ sitemap:
- To use *vmanomaly*, part of the enterprise package, a license key is required. Obtain your key [here](https://victoriametrics.com/products/enterprise/trial/) for this tutorial or for enterprise use.
- In the tutorial, we'll be using the following VictoriaMetrics components:
- [VictoriaMetrics Single-Node](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) (v1.145.0)
- [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/) (v1.145.0)
- [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) (v1.145.0)
- [Grafana](https://grafana.com/) (v12.2.0)
- [VictoriaMetrics Single-Node](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) (v1.137.0)
- [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/) (v1.137.0)
- [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) (v1.137.0)
- [Grafana](https://grafana.com/) (v.10.2.1)
- [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/)
- [Node exporter](https://github.com/prometheus/node_exporter#node-exporter) (v1.9.1) and [Alertmanager](https://prometheus.io/docs/alerting/latest/alertmanager/) (v0.28.1)
- [Node exporter](https://github.com/prometheus/node_exporter#node-exporter) (v1.7.0) and [Alertmanager](https://prometheus.io/docs/alerting/latest/alertmanager/) (v0.27.0)
![typical setup diagram](guide-vmanomaly-vmalert_overview.webp)
@@ -323,7 +323,7 @@ Let's wrap it all up together into the `docker-compose.yml` file.
services:
vmagent:
container_name: vmagent
image: victoriametrics/vmagent:v1.145.0
image: victoriametrics/vmagent:v1.137.0
depends_on:
- "victoriametrics"
ports:
@@ -340,7 +340,7 @@ services:
victoriametrics:
container_name: victoriametrics
image: victoriametrics/victoria-metrics:v1.145.0
image: victoriametrics/victoria-metrics:v1.137.0
ports:
- 8428:8428
volumes:
@@ -356,7 +356,7 @@ services:
grafana:
container_name: grafana
image: grafana/grafana:12.2.0
image: grafana/grafana-oss:10.2.1
depends_on:
- "victoriametrics"
ports:
@@ -373,7 +373,7 @@ services:
vmalert:
container_name: vmalert
image: victoriametrics/vmalert:v1.145.0
image: victoriametrics/vmalert:v1.137.0
depends_on:
- "victoriametrics"
ports:
@@ -395,7 +395,7 @@ services:
restart: always
vmanomaly:
container_name: vmanomaly
image: victoriametrics/vmanomaly:v1.29.5
image: victoriametrics/vmanomaly:v1.29.4
depends_on:
- "victoriametrics"
ports:
@@ -412,7 +412,7 @@ services:
- "--licenseFile=/license"
alertmanager:
container_name: alertmanager
image: prom/alertmanager:v0.28.1
image: prom/alertmanager:v0.27.0
volumes:
- ./alertmanager.yml:/config/alertmanager.yml
command:
@@ -424,7 +424,7 @@ services:
restart: always
node-exporter:
image: quay.io/prometheus/node-exporter:v1.9.1
image: quay.io/prometheus/node-exporter:v1.7.0
container_name: node-exporter
ports:
- 9100:9100

View File

@@ -27,6 +27,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix `increase` and `increase_prometheus` outputs producing inflated values when old samples update the baseline across interval boundaries with `ignore_old_samples: true` or `enable_windows: true`.
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)

View File

@@ -121,7 +121,7 @@ func (p *Password) initRandomValue() {
_, err := io.ReadFull(rand.Reader, buf[:])
if err != nil {
// cannot use lib/logger here, since it can be uninitialized yet
panic(fmt.Errorf("FATAL: cannot read random data: %w", err))
panic(fmt.Errorf("FATAL: cannot read random data: %s", err))
}
s := string(buf[:])
p.value.Store(&s)

View File

@@ -16,7 +16,7 @@ func ParseKey(key []byte) (any, error) {
k, err := x509.ParsePKIXPublicKey(b.Bytes)
if err != nil {
return nil, fmt.Errorf("failed to parse key %q: %w", key, err)
return nil, fmt.Errorf("failed to parse key %q: %v", key, err)
}
return k, nil

View File

@@ -14,7 +14,7 @@ func BenchmarkWriteRequestUnmarshalProtobuf(b *testing.B) {
wru := &WriteRequestUnmarshaler{}
for pb.Next() {
if _, err := wru.UnmarshalProtobuf(data); err != nil {
panic(fmt.Errorf("unexpected error: %w", err))
panic(fmt.Errorf("unexpected error: %s", err))
}
}
})

View File

@@ -97,12 +97,12 @@ func getDedicatedServerDetails(cfg *apiConfig, dedicatedServerName string) (*ded
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
}
var dedicatedServerDetails dedicatedServer
if err = json.Unmarshal(resp, &dedicatedServerDetails); err != nil {
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
}
// get IPs for this dedicated server.
@@ -113,12 +113,12 @@ func getDedicatedServerDetails(cfg *apiConfig, dedicatedServerName string) (*ded
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
}
var ips []string
if err = json.Unmarshal(resp, &ips); err != nil {
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
}
// handle different IP formats
@@ -141,11 +141,11 @@ func getDedicatedServerList(cfg *apiConfig) ([]string, error) {
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
}
if err = json.Unmarshal(resp, &dedicatedServerList); err != nil {
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
}
return dedicatedServerList, nil

View File

@@ -117,12 +117,12 @@ func getVPSDetails(cfg *apiConfig, vpsName string) (*virtualPrivateServer, error
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
}
var vpsDetails virtualPrivateServer
if err = json.Unmarshal(resp, &vpsDetails); err != nil {
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
}
// get IPs for this vps.
@@ -133,12 +133,12 @@ func getVPSDetails(cfg *apiConfig, vpsName string) (*virtualPrivateServer, error
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
}
var ips []string
if err = json.Unmarshal(resp, &ips); err != nil {
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
}
// handle different IP formats
@@ -162,12 +162,12 @@ func getVPSList(cfg *apiConfig) ([]string, error) {
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
}
var vpsList []string
if err = json.Unmarshal(resp, &vpsList); err != nil {
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
}
return vpsList, nil

View File

@@ -33,7 +33,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
}
parsedURL, err := url.Parse(sdc.URL)
if err != nil {
return nil, fmt.Errorf("cannot parse %s: %w", sdc.URL, err)
return nil, fmt.Errorf("parse URL %s error: %v", sdc.URL, err)
}
if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" {
return nil, fmt.Errorf("URL %s scheme must be 'http' or 'https'", sdc.URL)

View File

@@ -221,7 +221,7 @@ func getIAMToken(cfg *apiConfig) (*iamToken, error) {
body := bytes.NewBuffer(passport)
resp, err := cfg.client.Post(iamURL, "application/json", body)
if err != nil {
return nil, fmt.Errorf("cannot send request to yandex cloud iam api %q: %w", iamURL, err)
return nil, fmt.Errorf("cannot send request to yandex cloud iam api %q: %s", iamURL, err)
}
data, err := readResponseBody(resp, iamURL)
if err != nil {

View File

@@ -186,7 +186,7 @@ func (s *Series) unmarshalProtobuf(src []byte) (err error) {
}
pt := &points[len(points)-1]
if err := pt.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal point: %w", err)
return fmt.Errorf("cannot unmarshal point: %s", err)
}
case 1:
data, ok := fc.MessageData()

View File

@@ -30,7 +30,7 @@ func ProcessRequestBody(b []byte) ([]byte, error) {
}
}
if err := json.Unmarshal(b, &req); err != nil {
return nil, fmt.Errorf("cannot unmarshal Firehose JSON in request body: %w", err)
return nil, fmt.Errorf("cannot unmarshal Firehose JSON in request body: %s", err)
}
var dst []byte

View File

@@ -99,17 +99,17 @@ func (r *Row) unmarshal(o *fastjson.Value) error {
n, err := getFloat64(o, "value")
if err != nil {
return fmt.Errorf("missing `value` element: %w", err)
return fmt.Errorf("missing `value` element, %s", err)
}
r.Value = n
cl, err := getInt64(o, "clock")
if err != nil {
return fmt.Errorf("missing `clock` element: %w", err)
return fmt.Errorf("missing `clock` element, %s", err)
}
ns, err := getInt64(o, "ns")
if err != nil {
return fmt.Errorf("missing `ns` element: %w", err)
return fmt.Errorf("missing `ns` element, %s", err)
}
// clock - Number of seconds since Epoch to the moment when value was collected (integer part).
// ns - Number of nanoseconds to be added to clock to get a precise value collection time.
@@ -121,7 +121,7 @@ func (r *Row) unmarshal(o *fastjson.Value) error {
if len(groupValue) != 0 {
groups, err := getArray(o, "groups")
if err != nil {
return fmt.Errorf("missing `groups` element: %w", err)
return fmt.Errorf("missing `groups` element, %s", err)
}
for _, g := range groups {
k := g.GetStringBytes()
@@ -141,7 +141,7 @@ func (r *Row) unmarshal(o *fastjson.Value) error {
itemTags, err := getArray(o, "item_tags")
if err != nil {
return fmt.Errorf("missing `item_tags` element: %w", err)
return fmt.Errorf("missing `item_tags` element, %s", err)
}
if len(duplicateTagsSeparator) == 0 { // Do not merge tags

View File

@@ -71,9 +71,9 @@ func Create(ctx context.Context, createSnapshotURL string) (string, error) {
return snap.Snapshot, nil
}
if snap.Status == "error" {
return "", fmt.Errorf("snapshot status: %q; msg: %q", snap.Status, snap.Msg)
return "", errors.New(snap.Msg)
}
return "", fmt.Errorf("snapshot status unknown: %q", snap.Status)
return "", fmt.Errorf("unknown status: %v", snap.Status)
}
// Delete deletes a snapshot via the provided api endpoint
@@ -121,14 +121,14 @@ func Delete(ctx context.Context, deleteSnapshotURL string, snapshotName string)
if snap.Status == "error" {
return errors.New(snap.Msg)
}
return fmt.Errorf("snapshot status unknown: %q", snap.Status)
return fmt.Errorf("unknown status: %v", snap.Status)
}
// GetHTTPClient returns a new HTTP client configured for snapshot operations.
func GetHTTPClient() (*http.Client, error) {
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify, "vm_snapshot_client")
if err != nil {
return nil, fmt.Errorf("failed to create transport: %w", err)
return nil, fmt.Errorf("failed to create transport: %s", err)
}
hc := &http.Client{
Transport: tr,

View File

@@ -649,7 +649,7 @@ func (is *indexSearch) searchLabelNamesWithFiltersOnDate(qt *querytracer.Tracer,
} else {
_, key, err := unmarshalCompositeTagKey(labelName)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal composite tag key: %w", err)
return nil, fmt.Errorf("cannot unmarshal composite tag key: %s", err)
}
lns[string(key)] = struct{}{}
}

View File

@@ -6,6 +6,9 @@ type avgAggrValue struct {
}
func (av *avgAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.sum += sample.value
av.count++
}

View File

@@ -4,7 +4,10 @@ type countSamplesAggrValue struct {
count uint64
}
func (av *countSamplesAggrValue) pushSample(_ aggrConfig, _ *pushSample, _ string, _ int64) {
func (av *countSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.count++
}

View File

@@ -9,7 +9,10 @@ type countSeriesAggrValue struct {
samples map[uint64]struct{}
}
func (av *countSeriesAggrValue) pushSample(_ aggrConfig, _ *pushSample, key string, _ int64) {
func (av *countSeriesAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string, _ int64) {
if sample.stateOnly {
return
}
// Count unique hashes over the keys instead of unique key values.
// This reduces memory usage at the cost of possible hash collisions for distinct key values.
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))

View File

@@ -45,6 +45,7 @@ type dedupAggrShardNopad struct {
type dedupAggrSample struct {
value float64
timestamp int64
stateOnly bool
}
func newDedupAggr() *dedupAggr {
@@ -189,6 +190,7 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
s = &samplesBuf[len(samplesBuf)-1]
s.value = sample.value
s.timestamp = sample.timestamp
s.stateOnly = sample.stateOnly
key := bytesutil.InternString(sample.key)
state.m[key] = s
@@ -197,28 +199,33 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
state.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
continue
}
s.timestamp, s.value = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
var newWins bool
s.timestamp, s.value, newWins = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
if newWins {
s.stateOnly = sample.stateOnly
}
}
state.samplesBuf = samplesBuf
}
// deduplicateSamples returns deduplicated timestamp and value results.
// deduplicateSamples returns deduplicated timestamp and value results,
// along with a boolean indicating whether the new sample won.
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#deduplication
func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64) {
func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64, bool) {
if newT > oldT {
return newT, newV
return newT, newV, true
}
// if both samples have the same timestamp, choose the maximum value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333;
// always prefer a non-decimal.StaleNaN value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10196
if newT == oldT {
if decimal.IsStaleNaN(oldV) {
return newT, newV
return newT, newV, true
}
if newV > oldV {
return newT, newV
return newT, newV, true
}
}
return oldT, oldV
return oldT, oldV, false
}
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
@@ -250,6 +257,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
key: key,
value: s.value,
timestamp: s.timestamp,
stateOnly: s.stateOnly,
})
// Limit the number of samples per each flush in order to limit memory usage.

View File

@@ -24,8 +24,8 @@ func TestDedupAggrSerial(t *testing.T) {
}
da.pushSamples(samples, 0, false)
if n := da.sizeBytes(); n > 5_000_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 5_000_000 bytes", n)
if n := da.sizeBytes(); n > 6_000_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 6_000_000 bytes", n)
}
if n := da.itemsCount(); n != seriesCount {
t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)
@@ -81,7 +81,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
func TestDeduplicateSamples(t *testing.T) {
f := func(oldT, newT int64, oldV, newV float64, expectedT int64, expectedV float64) {
t.Helper()
dedupT, dedupV := deduplicateSamples(oldT, newT, oldV, newV)
dedupT, dedupV, _ := deduplicateSamples(oldT, newT, oldV, newV)
if dedupT != expectedT || dedupV != expectedV {
t.Fatalf("unexpected deduplicated result for oldT=%d, newT=%d, oldV=%f, newV=%f; got dedupT=%d, dedupV=%f; want dedupT=%d, dedupV=%f",
oldT, newT, oldV, newV, dedupT, dedupV, expectedT, expectedV)

View File

@@ -11,6 +11,9 @@ type histogramBucketAggrValue struct {
}
func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.h.Update(sample.value)
}

109
lib/streamaggr/increase.go Normal file
View File

@@ -0,0 +1,109 @@
package streamaggr
import (
"fmt"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
type increaseLastValue struct {
value float64
timestamp int64
deleteDeadline int64
}
type increaseAggrValueShared struct {
lastValues map[string]increaseLastValue
}
type increaseAggrValue struct {
total float64
shared *increaseAggrValueShared
}
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
ac := c.(*increaseAggrConfig)
currentTime := fasttime.UnixTimestamp()
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
lv, ok := av.shared.lastValues[key]
if ok || keepFirstSample {
if sample.timestamp < lv.timestamp {
// Skip out of order sample
return
}
if !sample.stateOnly {
if sample.value >= lv.value {
av.total += sample.value - lv.value
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
}
}
}
lv.value = sample.value
lv.timestamp = sample.timestamp
lv.deleteDeadline = deleteDeadline
key = bytesutil.InternString(key)
av.shared.lastValues[key] = lv
}
func (av *increaseAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*increaseAggrConfig)
suffix := ac.getSuffix()
total := av.total
av.total = 0
lvs := av.shared.lastValues
for lk, lv := range lvs {
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
delete(lvs, lk)
}
}
ctx.appendSeries(key, suffix, total)
}
func (av *increaseAggrValue) state() any {
return av.shared
}
func newIncreaseAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &increaseAggrConfig{
keepFirstSample: keepFirstSample,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
return cfg
}
type increaseAggrConfig struct {
keepFirstSample bool
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
ignoreFirstSampleDeadline uint64
counterResetsTotal *metrics.Counter
}
func (*increaseAggrConfig) getValue(s any) aggrValue {
var shared *increaseAggrValueShared
if s == nil {
shared = &increaseAggrValueShared{
lastValues: make(map[string]increaseLastValue),
}
} else {
shared = s.(*increaseAggrValueShared)
}
return &increaseAggrValue{
shared: shared,
}
}
func (ac *increaseAggrConfig) getSuffix() string {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}

View File

@@ -6,6 +6,9 @@ type lastAggrValue struct {
}
func (av *lastAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if sample.timestamp >= av.timestamp {
av.last = sample.value
av.timestamp = sample.timestamp

View File

@@ -6,6 +6,9 @@ type maxAggrValue struct {
}
func (av *maxAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if sample.value > av.max || !av.defined {
av.max = sample.value
}

View File

@@ -6,6 +6,9 @@ type minAggrValue struct {
}
func (av *minAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if sample.value < av.min || !av.defined {
av.min = sample.value
}

View File

@@ -13,6 +13,9 @@ type quantilesAggrValue struct {
}
func (av *quantilesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if av.h == nil {
av.h = histogram.GetFast()
}

View File

@@ -34,6 +34,7 @@ var rateAggrStateValuePool sync.Pool
func putRateAggrStateValue(v *rateAggrStateValue) {
v.timestamp = 0
v.lastTimestamp = 0
v.increase = 0
rateAggrStateValuePool.Put(v)
}
@@ -88,6 +89,10 @@ type rateAggrStateValue struct {
// increase stores cumulative increase for the current time series on the current aggregation interval
increase float64
timestamp int64
// lastTimestamp is the latest timestamp seen for this series including state-only samples.
// It is used for out-of-order detection, while timestamp (above) is only updated by
// non-state-only samples and is used for rate calculation.
lastTimestamp int64
}
type rateAggrValue struct {
@@ -101,16 +106,20 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
sv, ok := av.shared[key]
if ok {
state = sv.getState(av.isGreen)
if sample.timestamp < state.timestamp {
if sample.timestamp < state.lastTimestamp {
// Skip out of order sample
return
}
if sample.value >= sv.value {
state.increase += sample.value - sv.value
if !sample.stateOnly {
if sample.value >= sv.value {
state.increase += sample.value - sv.value
} else {
// counter reset
state.increase += sample.value
ac.counterResetsTotal.Inc()
}
} else {
// counter reset
state.increase += sample.value
ac.counterResetsTotal.Inc()
sv.prevTimestamp = sample.timestamp
}
} else {
sv = getRateAggrSharedValue(av.isGreen)
@@ -121,7 +130,10 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
}
sv.value = sample.value
sv.deleteDeadline = deleteDeadline
state.timestamp = sample.timestamp
state.lastTimestamp = sample.timestamp
if !sample.stateOnly {
state.timestamp = sample.timestamp
}
}
func (av *rateAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {

View File

@@ -12,6 +12,9 @@ type stdAggrValue struct {
}
func (av *stdAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.count++
avg := av.avg + (sample.value-av.avg)/av.count
av.q += (sample.value - av.avg) * (sample.value - avg)

View File

@@ -762,9 +762,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "histogram_bucket":
return newHistogramBucketAggrConfig(useSharedState), nil
case "increase":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "increase_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
case "last":
return newLastAggrConfig(), nil
case "max":
@@ -782,9 +782,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "sum_samples":
return newSumSamplesAggrConfig(), nil
case "total":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "total_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
case "unique_samples":
return newUniqueSamplesAggrConfig(), nil
default:
@@ -1006,26 +1006,28 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
a.ignoredNaNSamples.Inc()
continue
}
if (ignoreOldSamples || enableWindows) && s.Timestamp < minDeadline {
// Skip old samples outside the current aggregation interval
stateOnly := (ignoreOldSamples || enableWindows) && s.Timestamp < minDeadline
if stateOnly {
a.ignoredOldSamples.Inc()
continue
}
lagMsec := nowMsec - s.Timestamp
if lagMsec > maxLagMsec {
maxLagMsec = lagMsec
} else {
lagMsec := nowMsec - s.Timestamp
if lagMsec > maxLagMsec {
maxLagMsec = lagMsec
}
}
if enableWindows && s.Timestamp <= cs.maxDeadline == cs.isGreen {
ctx.green = append(ctx.green, pushSample{
key: key,
value: s.Value,
timestamp: s.Timestamp,
stateOnly: stateOnly,
})
} else {
ctx.blue = append(ctx.blue, pushSample{
key: key,
value: s.Value,
timestamp: s.Timestamp,
stateOnly: stateOnly,
})
}
}
@@ -1099,6 +1101,10 @@ type pushSample struct {
key string
value float64
timestamp int64
// stateOnly marks samples older than minDeadline: update tracking state in stateful outputs
// (total, rate, increase) but do not contribute to the aggregation output.
stateOnly bool
}
func getPushCtx() *pushCtx {

View File

@@ -688,7 +688,9 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.125
outputs: [rate_sum, rate_avg]
`, "11111")
// test rate_sum and rate_avg, when two aggregation intervals are empty
// test rate_sum and rate_avg, when two aggregation intervals are empty.
// abc=777 arrives slightly before the start of each interval (-10ms) but still
// updates prevTimestamp, so it contributes to rate_sum alongside abc=123 and abc=456.
f([]string{`
foo{abc="123", cde="1"} 1
foo{abc="123", cde="1"} 2 1
@@ -807,4 +809,55 @@ foo:1m_sum_samples{baz="qwe"} 10
dedup_interval: 30s
outputs: [sum_samples]
`, "11111111")
// total with ignore_old_samples: an old sample (30s before the interval boundary) must
// update the state reference without contributing to the interval total, so the subsequent
// current-interval sample (250) computes increase 250-150=100 instead of 250-100=150.
// Cumulative total: 100 (interval1) + 100 (interval2) = 200.
f([]string{`
foo 100
`, `
foo 150 -30
foo 250
`}, time.Minute, `foo:1m_total 100
foo:1m_total 200
`, `
- interval: 1m
outputs: [total]
ignore_old_samples: true
ignore_first_sample_interval: 0s
`, "111")
// increase with ignore_old_samples: same correctness check for increase output.
// Per-interval: 100 (first sample from 0) and 100 (250-150=100 thanks to stateOnly update).
f([]string{`
foo 100
`, `
foo 150 -30
foo 250
`}, time.Minute, `foo:1m_increase 100
foo:1m_increase 100
`, `
- interval: 1m
outputs: [increase]
ignore_old_samples: true
ignore_first_sample_interval: 0s
`, "111")
// rate with ignore_old_samples: out-of-order stateOnly samples must not overwrite sv.value,
// and the winning stateOnly sample's timestamp is used as the denominator start.
// foo 120 -40 (ts=T0+20s) is rejected as OOO after foo 150 -30 (ts=T0+30s),
// so the baseline is 150 at T0+30s, giving rate=(200-150)/30 ≈ 1.667.
f([]string{`
foo 100
`, `
foo 150 -30
foo 120 -40
foo 200
`}, time.Minute, `foo:1m_rate_sum 1.6666666666666667
`, `
- interval: 1m
outputs: [rate_sum]
ignore_old_samples: true
`, "1111")
}

View File

@@ -74,7 +74,7 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
`, strings.Join(outputsQuoted, ","))
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %w", err))
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
}
return a
}
@@ -133,7 +133,7 @@ func newPerOutputBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregat
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %w", err))
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
}
return a
}

View File

@@ -5,6 +5,9 @@ type sumSamplesAggrValue struct {
}
func (av *sumSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.sum += sample.value
}

View File

@@ -36,12 +36,14 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
// Skip out of order sample
return
}
if sample.value >= lv.value {
av.total += sample.value - lv.value
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
if !sample.stateOnly {
if sample.value >= lv.value {
av.total += sample.value - lv.value
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
}
}
}
lv.value = sample.value
@@ -54,7 +56,6 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*totalAggrConfig)
suffix := ac.getSuffix()
// check for stale entries
total := av.shared.total + av.total
av.total = 0
lvs := av.shared.lastValues
@@ -63,9 +64,7 @@ func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast
delete(lvs, lk)
}
}
if ac.resetTotalOnFlush {
av.shared.total = 0
} else if math.Abs(total) >= (1 << 53) {
if math.Abs(total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
av.shared.total = 0
} else {
@@ -78,11 +77,10 @@ func (av *totalAggrValue) state() any {
return av.shared
}
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &totalAggrConfig{
keepFirstSample: keepFirstSample,
resetTotalOnFlush: resetTotalOnFlush,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
@@ -90,8 +88,6 @@ func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleI
}
type totalAggrConfig struct {
resetTotalOnFlush bool
// Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool
@@ -117,12 +113,6 @@ func (*totalAggrConfig) getValue(s any) aggrValue {
}
func (ac *totalAggrConfig) getSuffix() string {
if ac.resetTotalOnFlush {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}
if ac.keepFirstSample {
return "total"
}

View File

@@ -5,6 +5,9 @@ type uniqueSamplesAggrValue struct {
}
func (av *uniqueSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if _, ok := av.samples[sample.value]; !ok {
av.samples[sample.value] = struct{}{}
}