Compare commits

..

1 Commits

Author SHA1 Message Date
Haley Wang
884da8ecff add extra log for resolving backend address 2026-04-19 15:28:36 +08:00
90 changed files with 229 additions and 856 deletions

View File

@@ -1,3 +1 @@
**PLEASE REMOVE LINE BELOW BEFORE SUBMITTING**
Before creating the PR, make sure you have read and followed the [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
Before creating the PR, please read [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist) and remove this line after confirming you understand and follow them.

View File

@@ -27,7 +27,7 @@ jobs:
- run: go version
- name: Cache Go artifacts
uses: actions/cache@v5
uses: actions/cache@v4
with:
path: |
~/.cache/go-build

View File

@@ -40,7 +40,7 @@ jobs:
- run: go version
- name: Cache Go artifacts
uses: actions/cache@v5
uses: actions/cache@v4
with:
path: |
~/.cache/go-build
@@ -50,14 +50,14 @@ jobs:
restore-keys: go-artifacts-${{ runner.os }}-codeql-analyze-${{ steps.go.outputs.go-version }}-
- name: Initialize CodeQL
uses: github/codeql-action/init@v4.35.1
uses: github/codeql-action/init@v4
with:
languages: go
- name: Autobuild
uses: github/codeql-action/autobuild@v4.35.1
uses: github/codeql-action/autobuild@v4
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v4.35.1
uses: github/codeql-action/analyze@v4
with:
category: 'language:go'

View File

@@ -47,7 +47,7 @@ jobs:
- run: go version
- name: Cache golangci-lint
uses: actions/cache@v5
uses: actions/cache@v4
with:
path: |
~/.cache/golangci-lint

View File

@@ -222,9 +222,6 @@ func (r *Rule) Validate() error {
if r.Expr == "" {
return fmt.Errorf("expression can't be empty")
}
if _, ok := r.Labels["__name__"]; ok {
return fmt.Errorf("invalid rule label __name__")
}
return checkOverflow(r.XXX, "rule")
}

View File

@@ -136,9 +136,6 @@ func TestRuleValidate(t *testing.T) {
if err := (&Rule{Alert: "alert"}).Validate(); err == nil {
t.Fatalf("expected empty expr error")
}
if err := (&Rule{Record: "record", Expr: "sum(test)", Labels: map[string]string{"__name__": "test"}}).Validate(); err == nil {
t.Fatalf("invalid rule label; got %s", err)
}
if err := (&Rule{Alert: "alert", Expr: "test>0"}).Validate(); err != nil {
t.Fatalf("expected valid rule; got %s", err)
}

View File

@@ -87,7 +87,6 @@ func (m *Metric) DelLabel(key string) {
for i, l := range m.Labels {
if l.Name == key {
m.Labels = append(m.Labels[:i], m.Labels[i+1:]...)
break
}
}
}

View File

@@ -312,11 +312,9 @@ type labelSet struct {
// On k conflicts in origin set, the original value is preferred and copied
// to processed with `exported_%k` key. The copy happens only if passed v isn't equal to origin[k] value.
func (ls *labelSet) add(k, v string) {
// do not add label with empty value to the result, as it has no meaning:
// if the label already exists in the original query result, remove it to preserve compatibility with relabeling, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10766.
// otherwise, ignore the label, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9984.
// do not add label with empty value, since it has no meaning.
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9984
if v == "" {
delete(ls.processed, k)
return
}
ls.processed[k] = v

View File

@@ -1363,7 +1363,6 @@ func TestAlertingRule_ToLabels(t *testing.T) {
{Name: "instance", Value: "0.0.0.0:8800"},
{Name: "group", Value: "vmalert"},
{Name: "alertname", Value: "ConfigurationReloadFailure"},
{Name: "pod", Value: "vmalert-0"},
},
Values: []float64{1},
Timestamps: []int64{time.Now().UnixNano()},
@@ -1375,7 +1374,6 @@ func TestAlertingRule_ToLabels(t *testing.T) {
"group": "vmalert", // this shouldn't have effect since value in metric is equal
"invalid_label": "{{ .Values.mustRuntimeFail }}",
"empty_label": "", // this should be dropped
"pod": "", // this should remove the pod label from query result
},
Expr: "sum(vmalert_alerting_rules_error) by(instance, group, alertname) > 0",
Name: "AlertingRulesError",
@@ -1387,7 +1385,6 @@ func TestAlertingRule_ToLabels(t *testing.T) {
"group": "vmalert",
"alertname": "ConfigurationReloadFailure",
"alertgroup": "vmalert",
"pod": "vmalert-0",
"invalid_label": `error evaluating template: template: :1:298: executing "" at <.Values.mustRuntimeFail>: can't evaluate field Values in type notifier.tplData`,
}

View File

@@ -409,9 +409,6 @@ func (g *Group) Start(ctx context.Context, rw remotewrite.RWClient, rr datasourc
g.mu.Unlock()
defer g.evalCancel()
// start the interval ticker before the first evaluation,
// so that the evaluation timestamps of groups with the `eval_offset` option are also aligned,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10773
t := time.NewTicker(g.Interval)
defer t.Stop()

View File

@@ -293,11 +293,9 @@ func (rr *RecordingRule) toTimeSeries(m datasource.Metric) prompb.TimeSeries {
}
// add extra labels configured by user
for k := range rr.Labels {
// do not add label with empty value to the result, as it has no meaning:
// if the label already exists in the original query result, remove it to preserve compatibility with relabeling, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10766.
// otherwise, ignore the label, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9984.
// do not add label with empty value, since it has no meaning.
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9984
if rr.Labels[k] == "" {
m.DelLabel(k)
continue
}
existingLabel := promrelabel.GetLabelByName(m.Labels, k)

View File

@@ -163,13 +163,11 @@ func TestRecordingRule_Exec(t *testing.T) {
f(&RecordingRule{
Name: "job:foo",
Labels: map[string]string{
"source": "test",
"empty_label": "", // this should be dropped
"pod": "", // this should remove the pod label from query result
"source": "test",
},
}, [][]datasource.Metric{{
metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "foo", "pod", "vmalert-0"),
metricWithValueAndLabels(t, 1, "__name__", "bar", "job", "bar", "source", "origin", "pod", "vmalert-1"),
metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "foo"),
metricWithValueAndLabels(t, 1, "__name__", "bar", "job", "bar", "source", "origin"),
metricWithValueAndLabels(t, 1, "__name__", "baz", "job", "baz", "source", "test"),
}}, [][]prompb.TimeSeries{{
newTimeSeries([]float64{2}, []int64{ts.UnixNano()}, []prompb.Label{

View File

@@ -362,62 +362,40 @@ func (up *URLPrefix) setLoadBalancingPolicy(loadBalancingPolicy string) error {
}
type backendURLs struct {
bhc backendHealthCheck
healthChecksContext context.Context
healthChecksCancel func()
healthChecksWG sync.WaitGroup
bus []*backendURL
}
type backendHealthCheck struct {
ctx context.Context
// mu protects fields below
cancel func()
mu sync.Mutex
isStopped bool
wg sync.WaitGroup
}
func (bhc *backendHealthCheck) run(hc func()) {
bhc.mu.Lock()
defer bhc.mu.Unlock()
if bhc.isStopped {
return
}
bhc.wg.Go(hc)
}
func (bhc *backendHealthCheck) stop() {
bhc.mu.Lock()
bhc.cancel()
bhc.isStopped = true
bhc.mu.Unlock()
bhc.wg.Wait()
}
func newBackendURLs() *backendURLs {
ctx, cancel := context.WithCancel(context.Background())
return &backendURLs{
bhc: backendHealthCheck{
ctx: ctx,
cancel: cancel,
},
healthChecksContext: ctx,
healthChecksCancel: cancel,
}
}
func (bus *backendURLs) add(u *url.URL) {
bus.bus = append(bus.bus, &backendURL{
url: u,
bhc: &bus.bhc,
hasPlaceHolders: hasAnyPlaceholders(u),
url: u,
healthCheckContext: bus.healthChecksContext,
healthCheckWG: &bus.healthChecksWG,
hasPlaceHolders: hasAnyPlaceholders(u),
})
}
func (bus *backendURLs) stopHealthChecks() {
bus.bhc.stop()
bus.healthChecksCancel()
bus.healthChecksWG.Wait()
}
type backendURL struct {
broken atomic.Bool
bhc *backendHealthCheck
healthCheckContext context.Context
healthCheckWG *sync.WaitGroup
concurrentRequests atomic.Int32
@@ -432,7 +410,7 @@ func (bu *backendURL) isBroken() bool {
func (bu *backendURL) setBroken() {
if bu.broken.CompareAndSwap(false, true) {
bu.bhc.run(func() {
bu.healthCheckWG.Go(func() {
bu.runHealthCheck()
bu.broken.Store(false)
})
@@ -454,11 +432,11 @@ func (bu *backendURL) runHealthCheck() {
case <-t.C:
// Verify network connectivity via TCP dial before marking backend healthy.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9997
ctx, cancel := context.WithTimeout(bu.bhc.ctx, time.Second)
ctx, cancel := context.WithTimeout(bu.healthCheckContext, time.Second)
c, err := netutil.Dialer.DialContext(ctx, "tcp", addr)
cancel()
if err != nil {
if errors.Is(bu.bhc.ctx.Err(), context.Canceled) {
if errors.Is(bu.healthCheckContext.Err(), context.Canceled) {
return
}
logger.Warnf("ignoring the backend at %s for %s because of dial error: %s", addr, *failTimeout, err)
@@ -467,7 +445,7 @@ func (bu *backendURL) runHealthCheck() {
_ = c.Close()
return
case <-bu.bhc.ctx.Done():
case <-bu.healthCheckContext.Done():
return
}
}
@@ -539,6 +517,7 @@ func (up *URLPrefix) discoverBackendAddrsIfNeeded() {
continue
}
logger.Infof("try to resolve backend IPs for %s", host)
var resolvedAddrs []string
if strings.HasPrefix(host, "srv+") {
// The host has the format 'srv+realhost'. Strip 'srv+' prefix before performing the lookup.
@@ -566,6 +545,7 @@ func (up *URLPrefix) discoverBackendAddrsIfNeeded() {
resolvedAddrs = make([]string, len(addrs))
for i, addr := range addrs {
resolvedAddrs[i] = net.JoinHostPort(addr.String(), port)
logger.Infof("discover backend IPs for %s into %d addresses, one is %s", bu, len(resolvedAddrs), resolvedAddrs[i])
}
}
}
@@ -588,6 +568,7 @@ func (up *URLPrefix) discoverBackendAddrsIfNeeded() {
bus := up.bus.Load()
if areEqualBackendURLs(bus.bus, busNew.bus) {
logger.Infof("resolved addr are the same as the original one")
return
}

View File

@@ -8,10 +8,10 @@ import (
"time"
vmetrics "github.com/VictoriaMetrics/metrics"
"github.com/cheggaaa/pb/v3"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/cheggaaa/pb/v3"
)
type otsdbProcessor struct {
@@ -89,6 +89,9 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
// we're going to make serieslist * queryRanges queries, so we should represent that in the progress bar
otsdbSeriesTotal.Add(len(serieslist) * queryRanges)
bar := pb.StartNew(len(serieslist) * queryRanges)
defer func(bar *pb.ProgressBar) {
bar.Finish()
}(bar)
var wg sync.WaitGroup
for range op.otsdbcc {
wg.Go(func() {
@@ -103,22 +106,41 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
}
})
}
runErr := op.sendQueries(ctx, serieslist, seriesCh, errCh, startTime)
/*
Loop through all series for this metric, processing all retentions and time ranges
requested. This loop is our primary "collect data from OpenTSDB loop" and should
be async, sending data to VictoriaMetrics over time.
// Always drain channels and wait for workers to prevent goroutine leaks
The idea with having the select at the inner-most loop is to ensure quick
short-circuiting on error.
*/
for _, series := range serieslist {
for _, rt := range op.oc.Retentions {
for _, tr := range rt.QueryRanges {
select {
case otsdbErr := <-errCh:
return fmt.Errorf("opentsdb error: %s", otsdbErr)
case vmErr := <-op.im.Errors():
otsdbErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
case seriesCh <- queryObj{
Tr: tr, StartTime: startTime,
Series: series, Rt: opentsdb.RetentionMeta{
FirstOrder: rt.FirstOrder, SecondOrder: rt.SecondOrder, AggTime: rt.AggTime}}:
}
}
}
}
// Drain channels per metric
close(seriesCh)
wg.Wait()
close(errCh)
// check for any lingering errors on the query side
for otsdbErr := range errCh {
if runErr == nil {
runErr = fmt.Errorf("import process failed: \n%s", otsdbErr)
}
return fmt.Errorf("import process failed: \n%s", otsdbErr)
}
bar.Finish()
if runErr != nil {
return runErr
}
log.Print(op.im.Stats())
}
op.im.Close()
@@ -133,34 +155,6 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
return nil
}
// sendQueries iterates over all series and retention ranges, sending queries to workers.
// It returns early if ctx is canceled or an error is received.
func (op *otsdbProcessor) sendQueries(ctx context.Context, serieslist []opentsdb.Meta, seriesCh chan<- queryObj, errCh <-chan error, startTime int64) error {
for _, series := range serieslist {
for _, rt := range op.oc.Retentions {
for _, tr := range rt.QueryRanges {
select {
case <-ctx.Done():
return fmt.Errorf("context canceled: %s", ctx.Err())
case otsdbErr := <-errCh:
otsdbErrorsTotal.Inc()
return fmt.Errorf("opentsdb error: %s", otsdbErr)
case vmErr := <-op.im.Errors():
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
case seriesCh <- queryObj{
Tr: tr, StartTime: startTime,
Series: series, Rt: opentsdb.RetentionMeta{
FirstOrder: rt.FirstOrder,
SecondOrder: rt.SecondOrder,
AggTime: rt.AggTime,
}}:
}
}
}
}
return nil
}
func (op *otsdbProcessor) do(s queryObj) error {
start := s.StartTime - s.Tr.Start
end := s.StartTime - s.Tr.End
@@ -169,7 +163,6 @@ func (op *otsdbProcessor) do(s queryObj) error {
return fmt.Errorf("failed to collect data for %v in %v:%v :: %v", s.Series, s.Rt, s.Tr, err)
}
if len(data.Timestamps) < 1 || len(data.Values) < 1 {
log.Printf("no data found for %v in %v:%v...skipping", s.Series, s.Rt, s.Tr)
return nil
}
labels := make([]vm.LabelPair, 0, len(data.Tags))

View File

@@ -108,10 +108,10 @@ func (c Client) FindMetrics(q string) ([]string, error) {
if err != nil {
return nil, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != 200 {
return nil, fmt.Errorf("bad return from OpenTSDB: %d: %v", resp.StatusCode, resp)
}
defer func() { _ = resp.Body.Close() }()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve metric data from %q: %s", q, err)
@@ -130,12 +130,12 @@ func (c Client) FindSeries(metric string) ([]Meta, error) {
q := fmt.Sprintf("%s/api/search/lookup?m=%s&limit=%d", c.Addr, metric, c.Limit)
resp, err := c.c.Get(q)
if err != nil {
return nil, fmt.Errorf("failed to send GET request to %q: %s", q, err)
return nil, fmt.Errorf("failed to set GET request to %q: %s", q, err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != 200 {
return nil, fmt.Errorf("bad return from OpenTSDB: %d: %v", resp.StatusCode, resp)
}
defer func() { _ = resp.Body.Close() }()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve series data from %q: %s", q, err)
@@ -185,7 +185,6 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64, m
if err != nil {
return Metric{}, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
defer func() { _ = resp.Body.Close() }()
/*
There are three potential failures here, none of which should kill the entire
migration run:
@@ -197,6 +196,7 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64, m
log.Printf("bad response code from OpenTSDB query %v for %q...skipping", resp.StatusCode, q)
return Metric{}, nil
}
defer func() { _ = resp.Body.Close() }()
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Println("couldn't read response body from OpenTSDB query...skipping")
@@ -239,20 +239,27 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64, m
In all "bad" cases, we don't end the migration, we just don't process that particular message
*/
if len(output) < 1 {
// no results returned...return an empty object without error
return Metric{}, nil
}
if len(output) > 1 {
return Metric{}, fmt.Errorf("unexpected number of series returned: %d for query %q; expected 1", len(output), q)
// multiple series returned for a single query. We can't process this right, so...
return Metric{}, nil
}
if len(output[0].AggregateTags) > 0 {
return Metric{}, fmt.Errorf("aggregate tags %v present in response for query %q; series may be suppressed", output[0].AggregateTags, q)
// This failure means we've suppressed potential series somehow...
return Metric{}, nil
}
data := Metric{}
data.Metric = output[0].Metric
data.Tags = output[0].Tags
/*
We evaluate data for correctness before formatting the actual values
to skip a little bit of time if the series has invalid formatting
*/
data, err = modifyData(data, c.Normalize)
if err != nil {
return Metric{}, fmt.Errorf("failed to convert metric data for query %q: %w", q, err)
return Metric{}, nil
}
/*

View File

@@ -32,7 +32,7 @@ func convertDuration(duration string) (time.Duration, error) {
var err error
var timeValue int
if strings.HasSuffix(duration, "y") {
timeValue, err = strconv.Atoi(strings.TrimSuffix(duration, "y"))
timeValue, err = strconv.Atoi(strings.Trim(duration, "y"))
if err != nil {
return 0, fmt.Errorf("invalid time range: %q", duration)
}
@@ -42,7 +42,7 @@ func convertDuration(duration string) (time.Duration, error) {
return 0, fmt.Errorf("invalid time range: %q", duration)
}
} else if strings.HasSuffix(duration, "w") {
timeValue, err = strconv.Atoi(strings.TrimSuffix(duration, "w"))
timeValue, err = strconv.Atoi(strings.Trim(duration, "w"))
if err != nil {
return 0, fmt.Errorf("invalid time range: %q", duration)
}
@@ -52,7 +52,7 @@ func convertDuration(duration string) (time.Duration, error) {
return 0, fmt.Errorf("invalid time range: %q", duration)
}
} else if strings.HasSuffix(duration, "d") {
timeValue, err = strconv.Atoi(strings.TrimSuffix(duration, "d"))
timeValue, err = strconv.Atoi(strings.Trim(duration, "d"))
if err != nil {
return 0, fmt.Errorf("invalid time range: %q", duration)
}
@@ -95,9 +95,6 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
if !msecTime {
queryLength = queryLength / 1000
}
if queryLength <= 0 {
return Retention{}, fmt.Errorf("ttl %q resolves to non-positive query range %d; use a larger duration", chunks[2], queryLength)
}
queryRange := queryLength
// bump by the offset so we don't look at empty ranges any time offset > ttl
queryLength += offset
@@ -141,29 +138,16 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
2. we discover the actual size of each "chunk"
This is second division step
*/
divisor := queryRange / (rowLength * 4)
if divisor == 0 {
querySize = queryRange
} else {
querySize = queryRange / divisor
}
querySize = int64(queryRange / (queryRange / (rowLength * 4)))
} else {
/*
Unless the aggTime (how long a range of data we're requesting per individual point)
is greater than the row size. Then we'll need to use that to determine
how big each individual query should be
*/
divisor := queryRange / (aggTime * 4)
if divisor == 0 {
querySize = queryRange
} else {
querySize = queryRange / divisor
}
querySize = int64(queryRange / (queryRange / (aggTime * 4)))
}
if querySize <= 0 {
return Retention{}, fmt.Errorf("computed non-positive querySize=%d for retention %q; check parameters", querySize, retention)
}
var timeChunks []TimeRange
var i int64
for i = offset; i <= queryLength; i = i + querySize {

View File

@@ -93,14 +93,6 @@ type QueryOpts struct {
Headers http.Header
}
// getTenant returns tenant with optional default value
func (qos *QueryOpts) getTenant() string {
if qos.Tenant == "" {
return "0"
}
return qos.Tenant
}
func (qos *QueryOpts) getHeaders() http.Header {
if qos.Headers == nil {
qos.Headers = make(http.Header)

View File

@@ -1,186 +0,0 @@
package tests
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"sort"
"strconv"
"strings"
"testing"
)
// openTSDBPoint is a single data point served by the mock OpenTSDB server.
type openTSDBPoint struct {
Metric string
Tags map[string]string
Timestamp int64
Value float64
}
// openTSDBMockServer implements the minimal subset of the OpenTSDB HTTP API
// used by vmctl opentsdb: /api/suggest, /api/search/lookup, /api/query.
type openTSDBMockServer struct {
server *httptest.Server
points []openTSDBPoint
}
// newOpenTSDBMockServer starts an httptest server serving the given points.
func newOpenTSDBMockServer(t *testing.T, points []openTSDBPoint) *openTSDBMockServer {
t.Helper()
s := &openTSDBMockServer{points: points}
mux := http.NewServeMux()
mux.HandleFunc("/api/suggest", s.handleSuggest)
mux.HandleFunc("/api/search/lookup", s.handleLookup)
mux.HandleFunc("/api/query", s.handleQuery)
s.server = httptest.NewServer(mux)
return s
}
// close shuts down the server.
func (s *openTSDBMockServer) close() { s.server.Close() }
// httpAddr returns the server URL.
func (s *openTSDBMockServer) httpAddr() string { return s.server.URL }
// handleSuggest serves https://opentsdb.net/docs/build/html/api_http/suggest.html
func (s *openTSDBMockServer) handleSuggest(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query().Get("q")
seen := make(map[string]bool, len(s.points))
var out []string
for _, p := range s.points {
if seen[p.Metric] {
continue
}
if q != "" && !strings.Contains(p.Metric, q) {
continue
}
seen[p.Metric] = true
out = append(out, p.Metric)
}
_ = json.NewEncoder(w).Encode(out)
}
// handleLookup serves https://opentsdb.net/docs/build/html/api_http/search/lookup.html
func (s *openTSDBMockServer) handleLookup(w http.ResponseWriter, r *http.Request) {
metric := r.URL.Query().Get("m")
type meta struct {
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
}
seen := make(map[string]bool, len(s.points))
var results []meta
for _, p := range s.points {
if p.Metric != metric {
continue
}
key := tagsKey(p.Tags)
if seen[key] {
continue
}
seen[key] = true
results = append(results, meta{p.Metric, p.Tags})
}
_ = json.NewEncoder(w).Encode(map[string]any{
"type": "LOOKUP",
"metric": metric,
"results": results,
})
}
// handleQuery serves https://opentsdb.net/docs/build/html/api_http/query/index.html
func (s *openTSDBMockServer) handleQuery(w http.ResponseWriter, r *http.Request) {
m := r.URL.Query().Get("m")
metric, tagFilter, ok := parseQuery(m)
if !ok {
http.Error(w, "bad query param", http.StatusBadRequest)
return
}
start, err := strconv.ParseInt(r.URL.Query().Get("start"), 10, 64)
if err != nil {
http.Error(w, "bad start param", http.StatusBadRequest)
return
}
end, err := strconv.ParseInt(r.URL.Query().Get("end"), 10, 64)
if err != nil {
http.Error(w, "bad end param", http.StatusBadRequest)
return
}
type resp struct {
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
AggregateTags []string `json:"aggregateTags"`
Dps map[string]float64 `json:"dps"`
}
grouped := make(map[string]*resp, len(s.points))
for _, p := range s.points {
if p.Metric != metric {
continue
}
if !matchTags(p.Tags, tagFilter) {
continue
}
if p.Timestamp < start || p.Timestamp > end {
continue
}
key := tagsKey(p.Tags)
if _, exists := grouped[key]; !exists {
grouped[key] = &resp{
Metric: p.Metric,
Tags: p.Tags,
AggregateTags: []string{},
Dps: map[string]float64{},
}
}
grouped[key].Dps[fmt.Sprintf("%d", p.Timestamp)] = p.Value
}
out := make([]*resp, 0, len(grouped))
for _, v := range grouped {
out = append(out, v)
}
_ = json.NewEncoder(w).Encode(out)
}
// parseQuery parses the OpenTSDB m= query parameter.
// Format: "<agg>:<bucket>-<agg>-none:<metric>{k=v,k=v}"
func parseQuery(m string) (string, map[string]string, bool) {
parts := strings.SplitN(m, ":", 3)
if len(parts) != 3 {
return "", nil, false
}
metric, tagStr, _ := strings.Cut(parts[2], "{")
tags := make(map[string]string, 4)
tagStr = strings.TrimSuffix(tagStr, "}")
for _, kv := range strings.Split(tagStr, ",") {
if k, v, ok := strings.Cut(kv, "="); ok {
tags[k] = v
}
}
return metric, tags, true
}
func matchTags(got, filter map[string]string) bool {
for k, v := range filter {
if v == "*" {
continue
}
if got[k] != v {
return false
}
}
return true
}
func tagsKey(tags map[string]string) string {
keys := make([]string, 0, len(tags))
for k := range tags {
keys = append(keys, k)
}
sort.Strings(keys)
parts := make([]string, 0, len(keys))
for _, k := range keys {
parts = append(parts, k+"="+tags[k])
}
return strings.Join(parts, ",")
}

View File

@@ -1,167 +0,0 @@
package tests
import (
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestSingleVmctlOpenTSDBProtocol(t *testing.T) {
fs.MustRemoveDir(t.Name())
tc := apptest.NewTestCase(t)
defer tc.Stop()
vmsingleDst := tc.MustStartDefaultVmsingle()
vmAddr := fmt.Sprintf("http://%s/", vmsingleDst.HTTPAddr())
// Generate 60 points at 1-minute intervals starting 2 hours ago.
// This ensures data falls within vmctl's default query window (now - retention).
baseTS := time.Now().Add(-2 * time.Hour).Truncate(time.Minute).Unix()
points := make([]openTSDBPoint, 0, 60)
for i := range 60 {
points = append(points, openTSDBPoint{
Metric: "test.cpu",
Tags: map[string]string{"host": "h1", "env": "prod"},
Timestamp: baseTS + int64(i*60),
Value: float64(i),
})
}
otsdb := newOpenTSDBMockServer(t, points)
defer otsdb.close()
vmctlFlags := []string{
`opentsdb`,
`--otsdb-addr=` + otsdb.httpAddr(),
`--vm-addr=` + vmAddr,
`--otsdb-retentions=ssum-1m-avg:1d:1d`,
`--otsdb-filters=test`,
`--otsdb-normalize`,
`--disable-progress-bar=true`,
`-s`,
}
testOpenTSDBProtocol(tc, vmsingleDst, vmctlFlags, points, "test_cpu", baseTS)
}
func TestClusterVmctlOpenTSDBProtocol(t *testing.T) {
fs.MustRemoveDir(t.Name())
tc := apptest.NewTestCase(t)
defer tc.Stop()
cluster := tc.MustStartDefaultCluster()
vmAddr := fmt.Sprintf("http://%s/", cluster.Vminsert.HTTPAddr())
// Generate 60 points at 1-minute intervals starting 2 hours ago.
baseTS := time.Now().Add(-2 * time.Hour).Truncate(time.Minute).Unix()
points := make([]openTSDBPoint, 0, 60)
for i := range 60 {
points = append(points, openTSDBPoint{
Metric: "test.mem",
Tags: map[string]string{"host": "h1"},
Timestamp: baseTS + int64(i*60),
Value: float64(i * 2),
})
}
otsdb := newOpenTSDBMockServer(t, points)
defer otsdb.close()
vmctlFlags := []string{
`opentsdb`,
`--otsdb-addr=` + otsdb.httpAddr(),
`--vm-addr=` + vmAddr,
`--otsdb-retentions=sum-1m-avg:1d:1d`,
`--otsdb-filters=test`,
`--otsdb-normalize`,
`--disable-progress-bar=true`,
`--vm-account-id=0`,
`-s`,
}
testOpenTSDBProtocol(tc, cluster, vmctlFlags, points, "test_mem", baseTS)
}
func testOpenTSDBProtocol(
tc *apptest.TestCase,
queries apptest.PrometheusWriteQuerier,
vmctlFlags []string,
points []openTSDBPoint,
vmMetricName string,
baseTS int64,
) {
t := tc.T()
t.Helper()
// Build dynamic time range covering all data points with 1-hour padding.
queryStart := time.Unix(baseTS-3600, 0).UTC().Format(time.RFC3339)
queryEnd := time.Unix(baseTS+7200, 0).UTC().Format(time.RFC3339)
cmpOpt := cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType")
got := queries.PrometheusAPIV1Query(t, `{__name__=~".*"}`, apptest.QueryOpts{
Step: "5m",
Time: queryStart,
})
want := apptest.NewPrometheusAPIV1QueryResponse(t, `{"data":{"result":[]}}`)
if diff := cmp.Diff(want, got, cmpOpt); diff != "" {
t.Errorf("unexpected response (-want, +got):\n%s", diff)
}
tc.MustStartVmctl("vmctl", vmctlFlags)
queries.ForceFlush(t)
expected := buildExpectedOpenTSDBResult(points, vmMetricName)
tc.Assert(&apptest.AssertOptions{
Retries: 300,
Msg: `unexpected metrics stored via opentsdb protocol`,
Got: func() any {
r := queries.PrometheusAPIV1Export(t, fmt.Sprintf(`{__name__=%q}`, vmMetricName), apptest.QueryOpts{
Start: queryStart,
End: queryEnd,
})
r.Sort()
return r.Data.Result
},
Want: expected,
CmpOpts: []cmp.Option{
cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType"),
},
})
}
func buildExpectedOpenTSDBResult(points []openTSDBPoint, vmMetricName string) []*apptest.QueryResult {
grouped := map[string]*apptest.QueryResult{}
for _, p := range points {
metric := map[string]string{"__name__": vmMetricName}
for k, v := range p.Tags {
metric[k] = v
}
key := tagsKey(metric)
if _, ok := grouped[key]; !ok {
grouped[key] = &apptest.QueryResult{Metric: metric}
}
grouped[key].Samples = append(grouped[key].Samples, &apptest.Sample{
Timestamp: p.Timestamp * 1000,
Value: p.Value,
})
}
out := make([]*apptest.QueryResult, 0, len(grouped))
for _, v := range grouped {
out = append(out, v)
}
resp := apptest.PrometheusAPIV1QueryResponse{
Data: &apptest.QueryData{Result: out},
}
resp.Sort()
return resp.Data.Result
}

View File

@@ -106,7 +106,7 @@ func (app *Vminsert) HTTPAddr() string {
func (app *Vminsert) InfluxWrite(t *testing.T, records []string, opts QueryOpts) {
t.Helper()
url := fmt.Sprintf("http://%s/insert/%s/influx/write", app.httpListenAddr, opts.getTenant())
url := fmt.Sprintf("http://%s/insert/%s/influx/write", app.httpListenAddr, opts.Tenant)
uv := opts.asURLValues()
uvs := uv.Encode()
if len(uvs) > 0 {
@@ -141,7 +141,7 @@ func (app *Vminsert) GraphiteWrite(t *testing.T, records []string, _ QueryOpts)
func (app *Vminsert) PrometheusAPIV1ImportCSV(t *testing.T, records []string, opts QueryOpts) {
t.Helper()
url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/import/csv", app.httpListenAddr, opts.getTenant())
url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/import/csv", app.httpListenAddr, opts.Tenant)
uv := opts.asURLValues()
uvs := uv.Encode()
if len(uvs) > 0 {
@@ -166,7 +166,7 @@ func (app *Vminsert) PrometheusAPIV1ImportCSV(t *testing.T, records []string, op
func (app *Vminsert) PrometheusAPIV1ImportNative(t *testing.T, data []byte, opts QueryOpts) {
t.Helper()
url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/import/native", app.httpListenAddr, opts.getTenant())
url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/import/native", app.httpListenAddr, opts.Tenant)
uv := opts.asURLValues()
uvs := uv.Encode()
if len(uvs) > 0 {
@@ -190,7 +190,7 @@ func (app *Vminsert) PrometheusAPIV1ImportNative(t *testing.T, data []byte, opts
func (app *Vminsert) OpenTSDBAPIPut(t *testing.T, records []string, opts QueryOpts) {
t.Helper()
url := fmt.Sprintf("http://%s/insert/%s/opentsdb/api/put", app.openTSDBListenAddr, opts.getTenant())
url := fmt.Sprintf("http://%s/insert/%s/opentsdb/api/put", app.openTSDBListenAddr, opts.Tenant)
uv := opts.asURLValues()
uvs := uv.Encode()
if len(uvs) > 0 {
@@ -213,7 +213,7 @@ func (app *Vminsert) OpenTSDBAPIPut(t *testing.T, records []string, opts QueryOp
func (app *Vminsert) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteRequest, opts QueryOpts) {
t.Helper()
url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/write", app.httpListenAddr, opts.getTenant())
url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/write", app.httpListenAddr, opts.Tenant)
data := snappy.Encode(nil, wr.MarshalProtobuf(nil))
recordsCount := len(wr.Timeseries)
if prommetadata.IsEnabled() {
@@ -238,7 +238,7 @@ func (app *Vminsert) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteRequest,
func (app *Vminsert) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts) {
t.Helper()
url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/import/prometheus", app.httpListenAddr, opts.getTenant())
url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/import/prometheus", app.httpListenAddr, opts.Tenant)
uv := opts.asURLValues()
uvs := uv.Encode()
if len(uvs) > 0 {
@@ -287,7 +287,7 @@ func (app *Vminsert) PrometheusAPIV1ImportPrometheus(t *testing.T, records []str
func (app *Vminsert) ZabbixConnectorHistory(t *testing.T, records []string, opts QueryOpts) {
t.Helper()
url := fmt.Sprintf("http://%s/insert/%s/zabbixconnector/api/v1/history", app.httpListenAddr, opts.getTenant())
url := fmt.Sprintf("http://%s/insert/%s/zabbixconnector/api/v1/history", app.httpListenAddr, opts.Tenant)
uv := opts.asURLValues()
uvs := uv.Encode()
if len(uvs) > 0 {

View File

@@ -72,7 +72,7 @@ func (app *Vmselect) HTTPAddr() string {
func (app *Vmselect) PrometheusAPIV1Export(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
t.Helper()
exportURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/export", app.httpListenAddr, opts.getTenant())
exportURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/export", app.httpListenAddr, opts.Tenant)
values := opts.asURLValues()
values.Add("match[]", query)
values.Add("format", "promapi")
@@ -88,7 +88,7 @@ func (app *Vmselect) PrometheusAPIV1Export(t *testing.T, query string, opts Quer
func (app *Vmselect) PrometheusAPIV1ExportNative(t *testing.T, query string, opts QueryOpts) []byte {
t.Helper()
exportURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/export/native", app.httpListenAddr, opts.getTenant())
exportURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/export/native", app.httpListenAddr, opts.Tenant)
values := opts.asURLValues()
values.Add("match[]", query)
values.Add("format", "promapi")
@@ -104,7 +104,7 @@ func (app *Vmselect) PrometheusAPIV1ExportNative(t *testing.T, query string, opt
func (app *Vmselect) PrometheusAPIV1Query(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
t.Helper()
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/query", app.httpListenAddr, opts.getTenant())
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/query", app.httpListenAddr, opts.Tenant)
values := opts.asURLValues()
values.Add("query", query)
@@ -120,7 +120,7 @@ func (app *Vmselect) PrometheusAPIV1Query(t *testing.T, query string, opts Query
func (app *Vmselect) PrometheusAPIV1QueryRange(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
t.Helper()
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/query_range", app.httpListenAddr, opts.getTenant())
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/query_range", app.httpListenAddr, opts.Tenant)
values := opts.asURLValues()
values.Add("query", query)
@@ -135,7 +135,7 @@ func (app *Vmselect) PrometheusAPIV1QueryRange(t *testing.T, query string, opts
func (app *Vmselect) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1SeriesResponse {
t.Helper()
seriesURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/series", app.httpListenAddr, opts.getTenant())
seriesURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/series", app.httpListenAddr, opts.Tenant)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
@@ -150,7 +150,7 @@ func (app *Vmselect) PrometheusAPIV1Series(t *testing.T, matchQuery string, opts
func (app *Vmselect) PrometheusAPIV1SeriesCount(t *testing.T, opts QueryOpts) *PrometheusAPIV1SeriesCountResponse {
t.Helper()
seriesURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/series/count", app.httpListenAddr, opts.getTenant())
seriesURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/series/count", app.httpListenAddr, opts.Tenant)
values := opts.asURLValues()
res, _ := app.cli.PostForm(t, seriesURL, values, opts.Headers)
@@ -167,7 +167,7 @@ func (app *Vmselect) PrometheusAPIV1Labels(t *testing.T, matchQuery string, opts
values := opts.asURLValues()
values.Add("match[]", matchQuery)
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/labels", app.httpListenAddr, opts.getTenant())
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/labels", app.httpListenAddr, opts.Tenant)
res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers)
return NewPrometheusAPIV1LabelsResponse(t, res)
}
@@ -181,7 +181,7 @@ func (app *Vmselect) PrometheusAPIV1LabelValues(t *testing.T, labelName, matchQu
values := opts.asURLValues()
values.Add("match[]", matchQuery)
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/label/%s/values", app.httpListenAddr, opts.getTenant(), labelName)
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/label/%s/values", app.httpListenAddr, opts.Tenant, labelName)
res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers)
return NewPrometheusAPIV1LabelValuesResponse(t, res)
@@ -195,7 +195,7 @@ func (app *Vmselect) PrometheusAPIV1Metadata(t *testing.T, metric string, limit
values := opts.asURLValues()
values.Add("metric", metric)
values.Add("limit", strconv.Itoa(limit))
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/metadata", app.httpListenAddr, opts.getTenant())
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/metadata", app.httpListenAddr, opts.Tenant)
res, _ := app.cli.PostForm(t, queryURL, values, opts.Headers)
return NewPrometheusAPIV1Metadata(t, res)
@@ -208,7 +208,7 @@ func (app *Vmselect) PrometheusAPIV1Metadata(t *testing.T, metric string, limit
func (app *Vmselect) APIV1AdminTSDBDeleteSeries(t *testing.T, matchQuery string, opts QueryOpts) {
t.Helper()
queryURL := fmt.Sprintf("http://%s/delete/%s/prometheus/api/v1/admin/tsdb/delete_series", app.httpListenAddr, opts.getTenant())
queryURL := fmt.Sprintf("http://%s/delete/%s/prometheus/api/v1/admin/tsdb/delete_series", app.httpListenAddr, opts.Tenant)
values := opts.asURLValues()
values.Add("match[]", matchQuery)
@@ -229,7 +229,7 @@ func (app *Vmselect) MetricNamesStats(t *testing.T, limit, le, matchPattern stri
values.Add("limit", limit)
values.Add("le", le)
values.Add("match_pattern", matchPattern)
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/status/metric_names_stats", app.httpListenAddr, opts.getTenant())
queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/status/metric_names_stats", app.httpListenAddr, opts.Tenant)
res, statusCode := app.cli.PostForm(t, queryURL, values, opts.Headers)
if statusCode != http.StatusOK {
@@ -263,7 +263,7 @@ func (app *Vmselect) MetricNamesStatsReset(t *testing.T, opts QueryOpts) {
func (app *Vmselect) APIV1StatusTSDB(t *testing.T, matchQuery string, date string, topN string, opts QueryOpts) TSDBStatusResponse {
t.Helper()
seriesURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/status/tsdb", app.httpListenAddr, opts.getTenant())
seriesURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/status/tsdb", app.httpListenAddr, opts.Tenant)
values := opts.asURLValues()
addNonEmpty := func(name, value string) {
if len(value) == 0 {
@@ -294,7 +294,7 @@ func (app *Vmselect) APIV1StatusTSDB(t *testing.T, matchQuery string, date strin
func (app *Vmselect) GraphiteMetricsIndex(t *testing.T, opts QueryOpts) GraphiteMetricsIndexResponse {
t.Helper()
seriesURL := fmt.Sprintf("http://%s/select/%s/graphite/metrics/index.json", app.httpListenAddr, opts.getTenant())
seriesURL := fmt.Sprintf("http://%s/select/%s/graphite/metrics/index.json", app.httpListenAddr, opts.Tenant)
res, statusCode := app.cli.Get(t, seriesURL, opts.Headers)
if statusCode != http.StatusOK {
t.Fatalf("unexpected status code: got %d, want %d, resp text=%q", statusCode, http.StatusOK, res)
@@ -313,7 +313,7 @@ func (app *Vmselect) GraphiteMetricsIndex(t *testing.T, opts QueryOpts) Graphite
func (app *Vmselect) GraphiteTagsTagSeries(t *testing.T, record string, opts QueryOpts) {
t.Helper()
url := fmt.Sprintf("http://%s/select/%s/graphite/tags/tagSeries", app.httpListenAddr, opts.getTenant())
url := fmt.Sprintf("http://%s/select/%s/graphite/tags/tagSeries", app.httpListenAddr, opts.Tenant)
values := opts.asURLValues()
values.Add("path", record)
@@ -326,7 +326,7 @@ func (app *Vmselect) GraphiteTagsTagSeries(t *testing.T, record string, opts Que
func (app *Vmselect) GraphiteTagsTagMultiSeries(t *testing.T, records []string, opts QueryOpts) {
t.Helper()
url := fmt.Sprintf("http://%s/select/%s/graphite/tags/tagMultiSeries", app.httpListenAddr, opts.getTenant())
url := fmt.Sprintf("http://%s/select/%s/graphite/tags/tagMultiSeries", app.httpListenAddr, opts.Tenant)
values := opts.asURLValues()
for _, rec := range records {
values.Add("path", rec)

View File

@@ -151,7 +151,7 @@ Some alerting rules thresholds are just recommendations and could require an adj
The list of alerting rules is the following:
* [alerts-health.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-health.yml):
alerting rules related to all VictoriaMetrics components for tracking their "health" state;
* [alerts-single-node.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-single-node.yml):
* [alerts.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts.yml):
alerting rules related to [single-server VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) installation;
* [alerts-cluster.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-cluster.yml):
alerting rules related to [cluster version of VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/);

View File

@@ -125,7 +125,7 @@ services:
ports:
- 8880:8880
volumes:
- ./rules/alerts-cluster.yml:/etc/alerts/alerts-cluster.yml
- ./rules/alerts-cluster.yml:/etc/alerts/alerts.yml
- ./rules/alerts-health.yml:/etc/alerts/alerts-health.yml
- ./rules/alerts-vmagent.yml:/etc/alerts/alerts-vmagent.yml
- ./rules/alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml

View File

@@ -66,7 +66,7 @@ services:
ports:
- 8880:8880
volumes:
- ./rules/alerts-single-node.yml:/etc/alerts/alerts-single-node.yml
- ./rules/alerts.yml:/etc/alerts/alerts.yml
- ./rules/alerts-health.yml:/etc/alerts/alerts-health.yml
- ./rules/alerts-vmagent.yml:/etc/alerts/alerts-vmagent.yml
- ./rules/alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml

View File

@@ -170,57 +170,3 @@ groups:
is saturated by more than 90% and vminsert won't be able to keep up.\n
This usually means that more vminsert or vmstorage nodes must be added to the cluster in order to increase
the total number of vminsert -> vmstorage links."
- alert: MetadataCacheUtilizationIsTooHigh
expr: |
vm_metrics_metadata_storage_size_bytes / vm_metrics_metadata_storage_max_size_bytes > 0.95
for: 15m
labels:
severity: warning
annotations:
summary: "Metadata cache capacity on {{ $labels.instance }} (job={{ $labels.job }}) is utilized for more than 95% for the last 15min"
description: "Metadata cache stores meta information about ingested time series - see https://docs.victoriametrics.com/victoriametrics/#metrics-metadata.
When cache is overutilized, the oldest entries will be dropped out automatically. It may result into incomplete
response for /api/v1/metadata API calls. It doesn't impact regular queries or alerts. Cache size is controlled
via -storage.maxMetadataStorageSize cmd-line flag."
- alert: MetricNameStatsCacheUtilizationIsTooHigh
expr: |
vm_cache_size_bytes{type="storage/metricNamesStatsTracker"} / vm_cache_size_max_bytes{type="storage/metricNamesStatsTracker"} > 0.95
for: 15m
labels:
severity: warning
annotations:
summary: "Cache capacity for tracking metric names usage on {{ $labels.instance }} (job={{ $labels.job }}) is utilized for more than 95% during the last 15min"
description: "Metric names usage cache stores information about unique metric names and how frequently they are queried - see https://docs.victoriametrics.com/victoriametrics/#track-ingested-metrics-usage.
When cache is overutilized, it will stop tracking the new metric names. It has no other negative impact.
Usually, the number of unique metric names is very limited (thousands). The cache can be overutilized only if metric names
are changing too frequently or if the cache size is too low. There are following ways to mitigate cache overutilization:
- disable cache via `--storage.trackMetricNamesStats=false` flag, so metric names usage will stop tracking
- increase the cache size via `--storage.cacheSizeMetricNamesStats` flag
- reset the cache (see docs for details)"
- alert: IndexDBRecordsDrop
expr: increase(vm_indexdb_items_dropped_total[5m]) > 0
labels:
severity: critical
annotations:
summary: "IndexDB skipped registering items during data ingestion with reason={{ $labels.reason }}."
description: |
VictoriaMetrics could skip registering new timeseries during ingestion if they fail the validation process.
For example, `reason=too_long_item` means that time series cannot exceed 64KB. Please, reduce the number
of labels or label values for such series. Or enforce these limits via `-maxLabelsPerTimeseries` and
`-maxLabelValueLen` command-line flags.
- alert: TooManyTSIDMisses
expr: increase(vm_missing_tsids_for_metric_id_total[5m]) > 0
for: 15m
labels:
severity: critical
annotations:
summary: "Unexpected TSID misses for job \"{{ $labels.job }}\" ({{ $labels.instance }}) for the last 15 minutes"
description: |
Unexpected TSID misses for \"{{ $labels.job }}\" ({{ $labels.instance }}) for the last 15 minutes.
If this happens after unclean shutdown of VictoriaMetrics process (via \"kill -9\", OOM or power off),
then this is OK - the alert must go away in a few minutes after the restart.
Otherwise this may point to the corruption of index data.

View File

@@ -82,6 +82,19 @@ groups:
Check the logs for the given target. Check also the \"location\" label at the vm_log_messages_total metric if -loggerLevel command-line flag is set to value other than INFO.
This label contains code locations responsible for generating log messages suppressed by -loggerLevel.
- alert: TooManyTSIDMisses
expr: increase(vm_missing_tsids_for_metric_id_total[5m]) > 0
for: 15m
labels:
severity: critical
annotations:
summary: "Unexpected TSID misses for job \"{{ $labels.job }}\" ({{ $labels.instance }}) for the last 15 minutes"
description: |
Unexpected TSID misses for \"{{ $labels.job }}\" ({{ $labels.instance }}) for the last 15 minutes.
If this happens after unclean shutdown of VictoriaMetrics process (via \"kill -9\", OOM or power off),
then this is OK - the alert must go away in a few minutes after the restart.
Otherwise this may point to the corruption of index data.
- alert: ConcurrentInsertsHitTheLimit
expr: avg_over_time(vm_concurrent_insert_current[1m]) >= vm_concurrent_insert_capacity
for: 15m
@@ -96,6 +109,28 @@ groups:
making write attempts. If vmagent's or vminsert's CPU usage and network saturation are at normal level, then
it might be worth adjusting `-maxConcurrentInserts` cmd-line flag.
- alert: IndexDBRecordsDrop
expr: increase(vm_indexdb_items_dropped_total[5m]) > 0
labels:
severity: critical
annotations:
summary: "IndexDB skipped registering items during data ingestion with reason={{ $labels.reason }}."
description: |
VictoriaMetrics could skip registering new timeseries during ingestion if they fail the validation process.
For example, `reason=too_long_item` means that time series cannot exceed 64KB. Please, reduce the number
of labels or label values for such series. Or enforce these limits via `-maxLabelsPerTimeseries` and
`-maxLabelValueLen` command-line flags.
- alert: RowsRejectedOnIngestion
expr: rate(vm_rows_ignored_total[5m]) > 0
for: 15m
labels:
severity: warning
annotations:
summary: "Some rows are rejected on \"{{ $labels.instance }}\" on ingestion attempt"
description: "Ingested rows on instance \"{{ $labels.instance }}\" are rejected due to the
following reason: \"{{ $labels.reason }}\""
- alert: TooHighQueryLoad
expr: increase(vm_concurrent_select_limit_timeout_total[5m]) > 0
for: 15m
@@ -113,14 +148,3 @@ groups:
* increase compute resources or number of replicas;
* adjust limits `-search.maxConcurrentRequests` and `-search.maxQueueDuration`.
See more at https://docs.victoriametrics.com/victoriametrics/troubleshooting/#slow-queries.
- alert: RowsRejectedOnIngestion
expr: rate(vm_rows_ignored_total[5m]) > 0
for: 15m
labels:
severity: warning
annotations:
summary: "Some rows are rejected on \"{{ $labels.instance }}\" on ingestion attempt"
description: "Ingested rows on instance \"{{ $labels.instance }}\" are rejected due to the
following reason: \"{{ $labels.reason }}\""

View File

@@ -148,45 +148,4 @@ groups:
description: "Metadata cache stores meta information about ingested time series - see https://docs.victoriametrics.com/victoriametrics/#metrics-metadata.
When cache is overutilized, the oldest entries will be dropped out automatically. It may result into incomplete
response for /api/v1/metadata API calls. It doesn't impact regular queries or alerts. Cache size is controlled
via -storage.maxMetadataStorageSize cmd-line flag."
- alert: MetricNameStatsCacheUtilizationIsTooHigh
expr: |
vm_cache_size_bytes{type="storage/metricNamesStatsTracker"} / vm_cache_size_max_bytes{type="storage/metricNamesStatsTracker"} > 0.95
for: 15m
labels:
severity: warning
annotations:
summary: "Cache capacity for tracking metric names usage on {{ $labels.instance }} (job={{ $labels.job }}) is utilized for more than 95% during the last 15min"
description: "Metric names usage cache stores information about unique metric names and how frequently they are queried - see https://docs.victoriametrics.com/victoriametrics/#track-ingested-metrics-usage.
When cache is overutilized, it will stop tracking the new metric names. It has no other negative impact.
Usually, the number of unique metric names is very limited (thousands). The cache can be overutilized only if metric names
are changing too frequently or if the cache size is too low. There are following ways to mitigate cache overutilization:
- disable cache via `--storage.trackMetricNamesStats=false` flag, so metric names usage will stop tracking
- increase the cache size via `--storage.cacheSizeMetricNamesStats` flag
- reset the cache (see docs for details)"
- alert: IndexDBRecordsDrop
expr: increase(vm_indexdb_items_dropped_total[5m]) > 0
labels:
severity: critical
annotations:
summary: "IndexDB skipped registering items during data ingestion with reason={{ $labels.reason }}."
description: |
VictoriaMetrics could skip registering new timeseries during ingestion if they fail the validation process.
For example, `reason=too_long_item` means that time series cannot exceed 64KB. Please, reduce the number
of labels or label values for such series. Or enforce these limits via `-maxLabelsPerTimeseries` and
`-maxLabelValueLen` command-line flags.
- alert: TooManyTSIDMisses
expr: increase(vm_missing_tsids_for_metric_id_total[5m]) > 0
for: 15m
labels:
severity: critical
annotations:
summary: "Unexpected TSID misses for job \"{{ $labels.job }}\" ({{ $labels.instance }}) for the last 15 minutes"
description: |
Unexpected TSID misses for \"{{ $labels.job }}\" ({{ $labels.instance }}) for the last 15 minutes.
If this happens after unclean shutdown of VictoriaMetrics process (via \"kill -9\", OOM or power off),
then this is OK - the alert must go away in a few minutes after the restart.
Otherwise this may point to the corruption of index data.
via -storage.maxMetadataStorageSize cmd-line flag."

View File

@@ -48,15 +48,13 @@ Please see example graph illustrating this logic below:
## What data does vmanomaly operate on?
> [!NOTE]
> `vmanomaly` operates on timeseries (metrics) data, and supports both **VictoriaMetrics** and **VictoriaLogs/VictoriaTraces** as data sources to get metrics-compatible data. Choose the source depending on the use case. Single-node / Cluster and OpenSource / Enterprise datasources are supported as well, `vmanomaly` is compatible with both, yet itself requires an [Enterprise license](https://victoriametrics.com/products/enterprise/) to run.
`vmanomaly` operates on timeseries (metrics) data, and supports both **VictoriaMetrics** and **VictoriaLogs** as data sources. Choose the source depending on the use case.
**VictoriaMetrics (metrics):** use full [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/) for selection, sampling, and processing; [global filters](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#prometheus-querying-api-enhancements) are also supported. See the [VmReader](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vm-reader) for the details.
**VictoriaLogs (logs → metrics):** {{% available_from "v1.26.0" anomaly %}} use [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/) via the [`VLogsReader`](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vlogs-reader) to create log-derived or traces-derived metrics for anomaly detection (e.g., error rates, request latencies, error spans count).
**VictoriaLogs (logs → metrics):** {{% available_from "v1.26.0" anomaly %}} use [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/) via the [`VLogsReader`](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vlogs-reader) to create log-derived metrics for anomaly detection (e.g., error rates, request latencies).
> [!NOTE]
> Please note that only LogsQL queries with [stats pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) functions [subset](https://docs.victoriametrics.com/anomaly-detection/components/reader/#valid-stats-functions) are supported, as they produce **numeric** time series.
> Please note that only LogsQL queries with [stats pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) functions [subset](https://docs.victoriametrics.com/anomaly-detection/components/reader/#valid-stats-functions) are supported, as they produce **numeric** time series.
## Using offsets

View File

@@ -9,17 +9,14 @@ sitemap:
In today's fast-paced and complex landscape of system monitoring, [VictoriaMetrics Anomaly Detection](https://victoriametrics.com/products/enterprise/anomaly-detection/) (`vmanomaly`), a part of our [Enterprise offering](https://victoriametrics.com/products/enterprise/), serves as an **observability layer** for SREs and DevOps teams atop of collected data to **automate the detection of anomalies in time-series data**, reducing manual efforts required to identify abnormal system behavior.
Unlike traditional threshold-based alerting, which relies on **raw metric values** and requires constant tuning and maintenance of thresholds and alerting rules, `vmanomaly` introduces a **unified, interpretable [anomaly score](https://docs.victoriametrics.com/anomaly-detection/faq/#what-is-anomaly-score)** - a **de-trended, de-seasonalized metric** generated through machine learning. This approach eliminates the need for frequent manual adjustments by enabling **stable, long-term static thresholds (as simple as `anomaly_score > 1`)** that remain effective over time through continuous model retraining and updates.
Unlike traditional threshold-based alerting, which relies on **raw metric values** and requires constant tuning and maintenance of thresholds and alerting rules, `vmanomaly` introduces a **unified, interpretable [anomaly score](https://docs.victoriametrics.com/anomaly-detection/faq/#what-is-anomaly-score)** - a **de-trended, de-seasonalized metric** generated through machine learning. This approach eliminates the need for frequent manual adjustments by enabling **stable, long-term static thresholds (as simple as `anomaly_score > 1`)** that remain effective over time through continuous model retraining.
By shifting to anomaly-based detection, teams can **identify and respond to potential issues faster**, enhancing system reliability and operational efficiency while significantly **reducing the engineering effort spent on handcrafting and maintaining alerting rules**.
## What does it do?
`vmanomaly` is designed to **periodically analyze new data points** across selected metrics - either requested from [VictoriaMetrics TSDB](https://docs.victoriametrics.com/victoriametrics/) or produced by [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) or [VictoriaTraces](https://docs.victoriametrics.com/victoriatraces/) metrics [endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats) - to generate a **unified metric** called [anomaly score](https://docs.victoriametrics.com/anomaly-detection/faq/#what-is-anomaly-score).
> [!NOTE]
> `vmanomaly` can use both single-node and cluster versions of VictoriaMetrics/VictoriaLogs/VictoriaTraces as a data source, and is compatible with both OpenSource and Enterprise versions of it. However, `vmanomaly` itself requires an Enterprise license to run, and is part of our [Enterprise offering](https://victoriametrics.com/products/enterprise/).
`vmanomaly` is designed to **periodically analyze new data points** across selected metrics (either requested from [VictoriaMetrics TSDB](https://docs.victoriametrics.com/victoriametrics/) or produced by [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) metrics [endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats)), generating a **unified metric** called [anomaly score](https://docs.victoriametrics.com/anomaly-detection/faq/#what-is-anomaly-score).
Key functions:
- **Automated anomaly detection** - continuously scans time-series data to identify deviations from expected behavior.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 357 KiB

After

Width:  |  Height:  |  Size: 51 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 467 KiB

After

Width:  |  Height:  |  Size: 54 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 188 KiB

After

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 234 KiB

After

Width:  |  Height:  |  Size: 25 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 282 KiB

After

Width:  |  Height:  |  Size: 106 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.0 MiB

After

Width:  |  Height:  |  Size: 181 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 929 KiB

After

Width:  |  Height:  |  Size: 164 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 563 KiB

After

Width:  |  Height:  |  Size: 104 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 871 KiB

After

Width:  |  Height:  |  Size: 331 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 310 KiB

After

Width:  |  Height:  |  Size: 93 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 944 KiB

After

Width:  |  Height:  |  Size: 122 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 19 KiB

After

Width:  |  Height:  |  Size: 2.6 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 303 KiB

After

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 681 KiB

After

Width:  |  Height:  |  Size: 140 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 805 KiB

After

Width:  |  Height:  |  Size: 109 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 111 KiB

After

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.2 MiB

After

Width:  |  Height:  |  Size: 160 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 189 KiB

After

Width:  |  Height:  |  Size: 236 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 206 KiB

After

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 206 KiB

After

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 12 KiB

After

Width:  |  Height:  |  Size: 1.5 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 206 KiB

After

Width:  |  Height:  |  Size: 53 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 428 KiB

After

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 37 KiB

After

Width:  |  Height:  |  Size: 9.7 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 740 KiB

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 2.4 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 225 KiB

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 189 KiB

After

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

After

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 88 KiB

After

Width:  |  Height:  |  Size: 25 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 160 KiB

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 93 KiB

After

Width:  |  Height:  |  Size: 19 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 425 KiB

After

Width:  |  Height:  |  Size: 80 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 514 KiB

After

Width:  |  Height:  |  Size: 53 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 193 KiB

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 64 KiB

After

Width:  |  Height:  |  Size: 19 KiB

View File

@@ -26,31 +26,21 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: all VictoriaMetrics components: add support for reading cpu/memory limits configured via [systemd slices](https://www.freedesktop.org/software/systemd/man/latest/systemd.slice.html). Previously, only limits set directly on the process's own cgroup were detected. See [#10635](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10635). Thanks to @andriibeee for the contribution.
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): improve error handling at opentsdb migration. See [#10797](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10797)
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): now `Run query` link on the Alerting Rules page correctly propagates the alerts interval and evaluation time. See [#10366](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10366).
* FEATURE: [alerts](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules): add new `MetricNameStatsCacheUtilizationIsTooHigh` alerting rule to track overutilization of [Metric names usage stats tracker](https://docs.victoriametrics.com/victoriametrics/#track-ingested-metrics-usage) (used in [Cardinality Explorer](https://docs.victoriametrics.com/victoriametrics/#cardinality-explorer)). See [#10840](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10840).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): add `vm_streamaggr_counter_resets_total` metric for `total*`, `increase*` and `rate*` outputs that is useful for aggregation behaviour tracking. These metrics help to identify issues described in [Troubleshooting: counter resets](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#counter-resets).
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix increased memory usage after upgrade to [v1.140.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.140.0) by properly accounting for internal buffer count when calculating per-storage buffer size. See [#10725](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10725#issuecomment-4282256709).
* BUGFIX: all VictoriaMetrics components: properly parse IPv6 source address when accepting connections with proxy protocol v2 enabled. See [#10839](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10839). Thanks to @andriibeee for the contribution.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): `-maxScrapeSize` is now correctly applied when reading response bodies, including non-OK scrape error responses. See [#10804](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10804).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix `ec2_sd_configs` returning 401 `AuthFailure` from AWS when credentials are obtained via IRSA, instance role or `AWS_CONTAINER_CREDENTIALS_*` env vars. The regression was introduced in [v1.140.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.140.0). See [#10815](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10815).
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): fix leak of backend TCP connections, file descriptors and goroutines when the client cancels the request after the backend response has been received. See [#10833](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10833). Thanks to @andriibeee for the contribution.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): fix a rare panic during config reload when a backend is marked as broken. See [#10806](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10806).
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): stop logging warnings about failed handshakes when the `clusternative` port receives TCP healthchecks from load balancers. See [#10786](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10786). Thanks to @andriibeee for the contribution.
* BUGFIX: [vmrestore](https://docs.victoriametrics.com/victoriametrics/vmrestore/): fix an issue where vmrestore could hang indefinitely when interrupted during backup download. See [#10794](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10794).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): properly execute graceful shutdown for vmsingle if `-maxIngestionRate` is configured. See [#10795](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10795).
* BUGFIX: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): fix time display on Alerting Rules page to use selected timezone. See [#10827](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10827).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): delete labels from rule results if they are specified with an empty string value in rule or group labels. See [#10766](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10766).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix incorrect evaluation of binary operations caused by an ordering bug (e.g. `10 - (3 + 3 + 4)` being evaluated as `10 - 3 + 3 + 4`). The issue was introduced in v1.140.0, v1.136.4, and v1.122.19. See [#10856](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10856).
## [v1.140.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.140.0)
Released at 2026-04-10
**Update Note 1:** [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): [CSV export](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-export-csv-data) (`/api/v1/export/csv`) now adds a header row as the first line of the response, so existing CSV-processing scripts may need to skip this header. See [#10666](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10666).
**Update Note 2:** [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/): Due to an ordering bug in binary operations, some queries may produce incorrect results. For example, `10 - (3 + 3 + 4)` is evaluated as `10 - 3 + 3 + 4`. The issue was introduced in versions v1.140.0, v1.136.4, v1.122.19, and is addressed in upcoming releases. It is strongly recommended to avoid these versions entirely. See [#10856](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10856).
* SECURITY: upgrade Go builder from Go1.26.1 to Go1.26.2. See [the list of issues addressed in Go1.26.2](https://github.com/golang/go/issues?q=milestone%3AGo1.26.2%20label%3ACherryPickApproved).
@@ -155,20 +145,8 @@ It enables back `Discovered targets` debug UI by default.
* BUGFIX: `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly apply `extra_filters[]` filter when querying `vm_account_id` or `vm_project_id` labels via [multitenant](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy) request for `/api/v1/label/…/values` API. Before, `extra_filters` was ignored. See [#10503](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10503).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): revert the use of rollup result cache for [instant queries](https://docs.victoriametrics.com/keyConcepts.html#instant-query) that contain [`rate`](https://docs.victoriametrics.com/MetricsQL.html#rate) function with a lookbehind window larger than `-search.minWindowForInstantRollupOptimization`. The cache usage was removed since [v1.132.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.132.0). See [#10098](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10098#issuecomment-3895011084) for more details.
## [v1.136.5](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.136.5)
Released at 2026-04-23
**v1.136.x is a line of [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/). It contains important up-to-date bugfixes for [VictoriaMetrics enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/).
All these fixes are also included in [the latest community release](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/latest).
The v1.136.x line will be supported for at least 12 months since [v1.136.0](https://docs.victoriametrics.com/victoriametrics/changelog/#v11360) release**
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix incorrect evaluation of binary operations caused by an ordering bug (e.g. `10 - (3 + 3 + 4)` being evaluated as `10 - 3 + 3 + 4`). The issue was introduced in v1.140.0, v1.136.4, and v1.122.19. See [#10856](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10856).
## [v1.136.4](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.136.4)
**Update Note 1:** [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/): Due to an ordering bug in binary operations, some queries may produce incorrect results. For example, `10 - (3 + 3 + 4)` is evaluated as `10 - 3 + 3 + 4`. The issue was introduced in versions v1.140.0, v1.136.4, v1.122.19, and is addressed in upcoming releases. It is strongly recommended to avoid these versions entirely. See [#10856](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10856).
Released at 2026-04-10
**v1.136.x is a line of [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/). It contains important up-to-date bugfixes for [VictoriaMetrics enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/).
@@ -387,20 +365,8 @@ See changes [here](https://docs.victoriametrics.com/victoriametrics/changelog/ch
See changes [here](https://docs.victoriametrics.com/victoriametrics/changelog/changelog_2025/#v11230)
## [v1.122.20](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.122.20)
Released at 2026-04-23
**v1.122.x is a line of [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/). It contains important up-to-date bugfixes for [VictoriaMetrics enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/).
All these fixes are also included in [the latest community release](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/latest).
The v1.122.x line will be supported for at least 12 months since [v1.122.0](https://docs.victoriametrics.com/victoriametrics/changelog/#v11220) release**
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix incorrect evaluation of binary operations caused by an ordering bug (e.g. `10 - (3 + 3 + 4)` being evaluated as `10 - 3 + 3 + 4`). The issue was introduced in v1.140.0, v1.136.4, and v1.122.19. See [#10856](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10856).
## [v1.122.19](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.122.19)
**Update Note 1:** [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/): Due to an ordering bug in binary operations, some queries may produce incorrect results. For example, `10 - (3 + 3 + 4)` is evaluated as `10 - 3 + 3 + 4`. The issue was introduced in versions v1.140.0, v1.136.4, v1.122.19, and is addressed in upcoming releases. It is strongly recommended to avoid these versions entirely. See [#10856](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10856).
Released at 2026-04-10
**v1.122.x is a line of [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/). It contains important up-to-date bugfixes for [VictoriaMetrics enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/).
@@ -1550,4 +1516,4 @@ See changes [here](https://docs.victoriametrics.com/victoriametrics/changelog/ch
## Previous releases
See [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
See [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).

View File

@@ -571,7 +571,6 @@ Below is an example of an `aggr.yaml` configuration that drops the `replica` and
# Troubleshooting
- [Unexpected spikes for `total` or `increase` outputs](#staleness).
- [Excessively large values for `total*`, `increase*`, and `rate*` outputs](#counter-resets).
- [Lower than expected values for `total_prometheus` and `increase_prometheus` outputs](#staleness).
- [High memory usage and CPU usage](#high-resource-usage).
- [Unexpected results in vmagent cluster mode](#cluster-mode).
@@ -602,10 +601,6 @@ the following settings:
- `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#stream-aggregation-config).
It allows enabling aggregation windows for a specific aggregator.
## Counter resets
If counter-specific outputs, such as `total*`, `rate*`, and `increase*`, produce values that are significantly higher than anticipated, then check the `vm_streamaggr_counter_resets_total` metric. This metric increments each time when [counter reset event](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#counter) is happening and could be caused by duplication or collision of raw samples. If you observe duplication or collision - try solving this problem by either fixing the source of these metrics or by [deduplicating](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication) these samples before aggregation.
## Staleness
The following outputs track the last seen per-series values in order to properly calculate output values:

View File

@@ -284,12 +284,9 @@ expr: <string>
# Available starting from https://docs.victoriametrics.com/victoriametrics/changelog/#v1860
[ update_entries_limit: <integer> | default 0 ]
# Labels to add or overwrite labels from other external label sources, such as group labels, for each alert.
# Labels to add or overwrite for each alert.
# Labels are merged with labels received from `expr` evaluation and uniquely identify each generated alert.
#
# In case of conflicts, original labels are kept with prefix `exported_`.
# As a special case, specifying a label with an empty string value removes the label from the result if it exists
# in the original query result; otherwise, it is ignored.
#
# Labels only support limited templating variables in https://docs.victoriametrics.com/victoriametrics/vmalert/#templating,
# including `$labels`, `$value` and `$expr`, to avoid breaking alert states or causing cardinality issue with results.
@@ -419,11 +416,8 @@ record: <string>
# must contain valid Graphite expression.
expr: <string>
# Labels to add or overwrite labels from other external label sources, such as group labels, before storing the result.
#
# Labels to add or overwrite before storing the result.
# In case of conflicts, original labels are kept with prefix `exported_`.
# As a special case, specifying a label with an empty string value removes the label from the result if it exists
# in the original query result; otherwise, it is ignored.
#
# Labels do not support templating in https://docs.victoriametrics.com/victoriametrics/vmalert/#templating due to cardinality concerns. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8171.
labels:

View File

@@ -1523,11 +1523,7 @@ It is recommended to protect the following endpoints with authKeys:
* `/metrics` with `-metricsAuthKey` command-line flag, so unauthorized users couldn't access [vmauth metrics](https://docs.victoriametrics.com/victoriametrics/vmauth/#monitoring).
* `/debug/pprof` with `-pprofAuthKey` command-line flag, so unauthorized users couldn't access [profiling information](#profiling).
As an alternative, you can serve internal API routes on a different listen address using the command-line flag `-httpInternalListenAddr=127.0.0.1:8426`{{% available_from "v1.111.0" %}}.
To enable TLS on the public listener while keeping the internal listener non-TLS, configure multiple listeners like this:
```
/path/to/vmauth -httpInternalListenAddr=,localhost:8426 -httpListenAddr=0.0.0.0:443, -tls=true,false -tlsCertFile=a-cert.crt -tlsKeyFile=a-key.key
```
As an alternative, you can serve internal API routes on a different listen address using the command-line flag `-httpInternalListenAddr=127.0.0.1:8426`. {{% available_from "v1.111.0" %}}
`vmauth` also supports restricting access by IP - see [these docs](#ip-filters). See also [concurrency limiting docs](#concurrency-limiting).

2
go.mod
View File

@@ -11,7 +11,7 @@ require (
github.com/VictoriaMetrics/easyproto v1.2.0
github.com/VictoriaMetrics/fastcache v1.13.3
github.com/VictoriaMetrics/metrics v1.43.2
github.com/VictoriaMetrics/metricsql v0.87.0
github.com/VictoriaMetrics/metricsql v0.86.1
github.com/aws/aws-sdk-go-v2 v1.41.5
github.com/aws/aws-sdk-go-v2/config v1.32.14
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.13

8
go.sum
View File

@@ -58,10 +58,14 @@ github.com/VictoriaMetrics/easyproto v1.2.0 h1:FJT9uNXA2isppFuJErbLqD306KoFlehl7
github.com/VictoriaMetrics/easyproto v1.2.0/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo3/GThPs2KH23mv710=
github.com/VictoriaMetrics/fastcache v1.13.3 h1:rBabE0iIxcqKEMCwUmwHZ9dgEqXerg8FRbRDUvC7OVc=
github.com/VictoriaMetrics/fastcache v1.13.3/go.mod h1:hHXhl4DA2fTL2HTZDJFXWgW0LNjo6B+4aj2Wmng3TjU=
github.com/VictoriaMetrics/metrics v1.43.1 h1:j3Ba4l2K1q3pkvzPqt6aSiQ2DBlAEj3VPVeBtpR3t/Y=
github.com/VictoriaMetrics/metrics v1.43.1/go.mod h1:xDM82ULLYCYdFRgQ2JBxi8Uf1+8En1So9YUwlGTOqTc=
github.com/VictoriaMetrics/metrics v1.43.2 h1:+8pIQEGwchKS5CYFyvv3LKvNXGi7baZ9hmIV4RHqibY=
github.com/VictoriaMetrics/metrics v1.43.2/go.mod h1:xDM82ULLYCYdFRgQ2JBxi8Uf1+8En1So9YUwlGTOqTc=
github.com/VictoriaMetrics/metricsql v0.87.0 h1:Koxh3GkB/Z0f3O0bEChVFxiE4YZoxYyn5TzmGJfSfaw=
github.com/VictoriaMetrics/metricsql v0.87.0/go.mod h1:d4EisFO6ONP/HIGDYTAtwrejJBBeKGQYiRl095bS4QQ=
github.com/VictoriaMetrics/metricsql v0.86.0 h1:IFD08amp+nkW6I+pB3+iyamewkIrbEojkQP4cmEbwkU=
github.com/VictoriaMetrics/metricsql v0.86.0/go.mod h1:d4EisFO6ONP/HIGDYTAtwrejJBBeKGQYiRl095bS4QQ=
github.com/VictoriaMetrics/metricsql v0.86.1 h1:GuNqbbIaWZ9eNa6dOCi6itG/fJ96TGOFV3KWLnAyC2o=
github.com/VictoriaMetrics/metricsql v0.86.1/go.mod h1:d4EisFO6ONP/HIGDYTAtwrejJBBeKGQYiRl095bS4QQ=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b h1:mimo19zliBX/vSQ6PWWSL9lK8qwHozUj03+zLoEB8O0=

View File

@@ -3,7 +3,6 @@ package cgroup
import (
"fmt"
"os"
"path"
"runtime"
"strconv"
"strings"
@@ -101,31 +100,17 @@ func getOnlineCPUCount() float64 {
return n
}
// See https://www.freedesktop.org/software/systemd/man/latest/systemd.slice.html
func getCPUQuotaV2(sysfsPrefix, cgroupPath string) (float64, error) {
subPath, err := readCgroupV2SubPath(cgroupPath)
func getCPUQuotaV2(sysPrefix, cgroupPath string) (float64, error) {
data, err := getFileContents("cpu.max", sysPrefix, cgroupPath, "")
if err != nil {
subPath = "/"
return 0, err
}
var minQuota float64 = -1
for {
// travers sub path hierarchy and use a minimal value for stat
data, err := os.ReadFile(path.Join(sysfsPrefix, subPath, "cpu.max"))
if err == nil {
quota, err := parseCPUMax(strings.TrimSpace(string(data)))
if err != nil {
return 0, fmt.Errorf("cannot parse cpu.max at %s: %w", subPath, err)
}
if quota > 0 && (minQuota < 0 || quota < minQuota) {
minQuota = quota
}
}
if subPath == "/" || subPath == "." {
break
}
subPath = path.Dir(subPath)
data = strings.TrimSpace(data)
n, err := parseCPUMax(data)
if err != nil {
return 0, fmt.Errorf("cannot parse cpu.max file contents: %w", err)
}
return minQuota, nil
return n, nil
}
// See https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html#cpu

View File

@@ -37,7 +37,4 @@ func TestGetCPUQuotaV2(t *testing.T) {
f("testdata/cgroup", "testdata/self/cgroupv2", 2)
f("testdata/cgroup/cpu_unset", "", -1)
f("testdata/cgroup/cpu_onlymax", "", 2)
// systemd slice
f("testdata/v2slice", "testdata/self/cgroupv2_slice", 2)
}

View File

@@ -1,12 +1,9 @@
package cgroup
import (
"fmt"
"os"
"path"
"runtime/debug"
"strconv"
"strings"
)
// GetGOGC returns GOGC value for the currently running process.
@@ -45,44 +42,15 @@ func GetMemoryLimit() int64 {
return n
}
n, err = getMemStatV2("memory.max")
if err != nil || n <= 0 {
if err != nil {
return 0
}
return n
}
func getMemStatV2(statName string) (int64, error) {
// See https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html#memory-interface-files
return getMemLimitV2("/sys/fs/cgroup", "/proc/self/cgroup", statName)
}
func getMemLimitV2(sysfsPrefix, cgroupPath, statName string) (int64, error) {
subPath, err := readCgroupV2SubPath(cgroupPath)
if err != nil {
subPath = "/"
}
var minLimit int64 = -1
for {
// travers sub path hierarchy and use a minimal value for stat
data, err := os.ReadFile(path.Join(sysfsPrefix, subPath, statName))
if err == nil {
s := strings.TrimSpace(string(data))
if s != "max" {
n, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0, fmt.Errorf("cannot parse %s at %s: %w", statName, subPath, err)
}
if n > 0 && (minLimit < 0 || n < minLimit) {
minLimit = n
}
}
}
if subPath == "/" || subPath == "." {
break
}
subPath = path.Dir(subPath)
}
return minLimit, nil
// See https: //www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html#memory-interface-files
return getStatGeneric(statName, "/sys/fs/cgroup", "/proc/self/cgroup", "")
}
func getMemStat(statName string) (int64, error) {

View File

@@ -19,22 +19,6 @@ func TestGetHierarchicalMemoryLimitSuccess(t *testing.T) {
f("testdata/cgroup", "testdata/self/cgroup", 120)
}
func TestGetMemLimitV2(t *testing.T) {
f := func(sysPrefix, cgroupPath string, want int64) {
t.Helper()
got, err := getMemLimitV2(sysPrefix, cgroupPath, "memory.max")
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if got != want {
t.Fatalf("unexpected result, got: %d, want %d", got, want)
}
}
f("testdata/cgroup", "testdata/self/cgroupv2", 523372036854771712)
// systemd slice
f("testdata/v2slice", "testdata/self/cgroupv2_slice", 1073741824)
}
func TestGetHierarchicalMemoryLimitFailure(t *testing.T) {
f := func(sysPath, cgroupPath string) {
t.Helper()

View File

@@ -1 +0,0 @@
0::/vm.slice/vmagent.service

View File

@@ -1 +0,0 @@
max 100000

View File

@@ -1 +0,0 @@
max

View File

@@ -1 +0,0 @@
200000 100000

View File

@@ -1 +0,0 @@
1073741824

View File

@@ -1 +0,0 @@
max 100000

View File

@@ -1 +0,0 @@
max

View File

@@ -43,18 +43,6 @@ func getFileContents(statName, sysfsPrefix, cgroupPath, cgroupGrepLine string) (
return string(data), nil
}
// readCgroupV2SubPath reads cgroupv2 sub-path
// for example 0::/user.slice/user-1000.slice/session-5.scope
// See https://www.freedesktop.org/software/systemd/man/latest/systemd.slice.html
// and https://docs.oracle.com/en/operating-systems/oracle-linux/9/systemd/SystemdMngCgroupsV2.html#SystemdScopes
func readCgroupV2SubPath(cgroupPath string) (string, error) {
data, err := os.ReadFile(cgroupPath)
if err != nil {
return "", err
}
return grepFirstMatch(string(data), "", 2, ":")
}
// grepFirstMatch searches match line at data and returns item from it by index with given delimiter.
func grepFirstMatch(data string, match string, index int, delimiter string) (string, error) {
lines := strings.Split(string(data), "\n")

View File

@@ -128,10 +128,8 @@ func readProxyProto(r io.Reader) (net.Addr, error) {
if len(bb.B) < 36 {
return nil, fmt.Errorf("cannot read ipv6 address from proxy protocol block with the length %d bytes; expected at least 36 bytes", len(bb.B))
}
var ipv6Addr net.IP
ipv6Addr = append(ipv6Addr, bb.B[:16]...)
remoteAddr := &net.TCPAddr{
IP: ipv6Addr,
IP: bb.B[0:16],
Port: int(binary.BigEndian.Uint16(bb.B[32:34])),
}
return remoteAddr, nil

View File

@@ -107,28 +107,6 @@ func TestParseProxyProtocolFail(t *testing.T) {
0, 80, 0, 0})
}
func TestParseProxyProtocolIPv6DoesNotAliasPool(t *testing.T) {
header := func(last byte) *bytes.Buffer {
return bytes.NewBuffer([]byte{
0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x21, 0x00, 0x24,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, last,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1,
0, 80, 0, 0,
})
}
got, err := readProxyProto(header(1))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if _, err := readProxyProto(header(2)); err != nil {
t.Fatalf("unexpected error: %s", err)
}
want := &net.TCPAddr{IP: net.ParseIP("::1"), Port: 80}
if !reflect.DeepEqual(got, want) {
t.Fatalf("first addr mutated by pool reuse; got %v, want %v", got, want)
}
}
func TestProxyProtocolConnReadWriteSuccessful(t *testing.T) {
server, client := net.Pipe()
defer server.Close()

View File

@@ -1,11 +1,8 @@
package streamaggr
import (
"fmt"
"sync"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
@@ -95,8 +92,7 @@ type rateAggrValue struct {
isGreen bool
}
func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
ac := c.(*rateAggrConfig)
func (av *rateAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
var state *rateAggrStateValue
sv, ok := av.shared[key]
if ok {
@@ -110,7 +106,6 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
} else {
// counter reset
state.increase += sample.value
ac.counterResetsTotal.Inc()
}
} else {
sv = getRateAggrSharedValue(av.isGreen)
@@ -174,17 +169,14 @@ func (av *rateAggrValue) state() any {
return av.shared
}
func newRateAggrConfig(ms *metrics.Set, metricLabels string, isAvg bool) aggrConfig {
cfg := rateAggrConfig{
func newRateAggrConfig(isAvg bool) aggrConfig {
return &rateAggrConfig{
isAvg: isAvg,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
return &cfg
}
type rateAggrConfig struct {
isAvg bool
counterResetsTotal *metrics.Counter
isAvg bool
}
func (*rateAggrConfig) getValue(s any) aggrValue {

View File

@@ -613,8 +613,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
for i, output := range cfg.Outputs {
outputMetricLabels := fmt.Sprintf(`output=%q,name=%q,path=%q,url=%q,position="%d"`, output, name, path, alias, aggrID)
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
ac, err := newOutputConfig(output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
if err != nil {
return nil, err
}
@@ -724,7 +723,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
return a, nil
}
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
// check for duplicated output
if _, ok := outputsSeen[output]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
@@ -770,9 +769,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "histogram_bucket":
return newHistogramBucketAggrConfig(useSharedState), nil
case "increase":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, true, true), nil
case "increase_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, true, false), nil
case "last":
return newLastAggrConfig(), nil
case "max":
@@ -780,9 +779,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "min":
return newMinAggrConfig(), nil
case "rate_avg":
return newRateAggrConfig(ms, metricLabels, true), nil
return newRateAggrConfig(true), nil
case "rate_sum":
return newRateAggrConfig(ms, metricLabels, false), nil
return newRateAggrConfig(false), nil
case "stddev":
return newStddevAggrConfig(), nil
case "stdvar":
@@ -790,9 +789,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "sum_samples":
return newSumSamplesAggrConfig(), nil
case "total":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, false, true), nil
case "total_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, false, false), nil
case "unique_samples":
return newUniqueSamplesAggrConfig(), nil
default:

View File

@@ -1,11 +1,8 @@
package streamaggr
import (
"fmt"
"math"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
@@ -41,7 +38,6 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
}
}
lv.value = sample.value
@@ -78,15 +74,13 @@ func (av *totalAggrValue) state() any {
return av.shared
}
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
func newTotalAggrConfig(ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &totalAggrConfig{
return &totalAggrConfig{
keepFirstSample: keepFirstSample,
resetTotalOnFlush: resetTotalOnFlush,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
return cfg
}
type totalAggrConfig struct {
@@ -99,7 +93,6 @@ type totalAggrConfig struct {
// This allows avoiding an initial spike of the output values at startup when new time series
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
ignoreFirstSampleDeadline uint64
counterResetsTotal *metrics.Counter
}
func (*totalAggrConfig) getValue(s any) aggrValue {

View File

@@ -1933,11 +1933,11 @@ func (be *BinaryOpExpr) appendStringNoKeepMetricNames(dst []byte) []byte {
}
func (be *BinaryOpExpr) needLeftParens() bool {
return needBinaryOpArgParens(be.Left)
return needBinaryOpArgParens(be.Left, be.Op, false)
}
func (be *BinaryOpExpr) needRightParens() bool {
if needBinaryOpArgParens(be.Right) {
if needBinaryOpArgParens(be.Right, be.Op, true) {
return true
}
switch t := be.Right.(type) {
@@ -1974,10 +1974,26 @@ func (be *BinaryOpExpr) appendModifiers(dst []byte) []byte {
return dst
}
func needBinaryOpArgParens(arg Expr) bool {
func needBinaryOpArgParens(arg Expr, parentOp string, isRight bool) bool {
switch t := arg.(type) {
case *BinaryOpExpr:
return true
// Parens are required when the child op priority not equal to parent o one.
// For example, a + b / c - d should be a + (b / c) - d.
if binaryOpPriority(t.Op) != binaryOpPriority(parentOp) {
return true
}
// Same op: parens are only needed when the sub-expression is not a simple leaf chain.
if t.Op != parentOp {
if isRight && !isRightAssociativeBinaryOp(parentOp) {
return true
}
if !isRight && isRightAssociativeBinaryOp(parentOp) {
return true
}
}
return !isBinaryOpLeafSimple(t)
case *RollupExpr:
if be, ok := t.Expr.(*BinaryOpExpr); ok && be.KeepMetricNames {
return true
@@ -1988,6 +2004,26 @@ func needBinaryOpArgParens(arg Expr) bool {
}
}
func isBinaryOpLeafSimple(arg Expr) bool {
switch t := arg.(type) {
case *NumberExpr:
return true
case *MetricExpr:
metricName := t.getMetricName()
// Parens should be added if metric name equals to a reserved word, such as group_left
// For example, a + group_left should become a + (group_left). Otherwise, query won't be parsed.
return !isReservedBinaryOpIdent(metricName)
case *BinaryOpExpr:
if t.GroupModifier.Op != "" || t.KeepMetricNames || t.JoinModifier.Op != "" {
return false
}
return isBinaryOpLeafSimple(t.Left) && isBinaryOpLeafSimple(t.Right)
default:
return false
}
}
func isReservedBinaryOpIdent(s string) bool {
return isBinaryOpGroupModifier(s) || isBinaryOpJoinModifier(s) || isBinaryOpBoolModifier(s) || isPrefixModifier(s)
}

2
vendor/modules.txt vendored
View File

@@ -145,7 +145,7 @@ github.com/VictoriaMetrics/fastcache
# github.com/VictoriaMetrics/metrics v1.43.2
## explicit; go 1.24.0
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.87.0
# github.com/VictoriaMetrics/metricsql v0.86.1
## explicit; go 1.24.2
github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop