Compare commits

..

6 Commits

Author SHA1 Message Date
Zhu Jiekun
b217fa9aa9 Apply suggestions from code review
Co-authored-by: Roman Khavronenko <hagen1778@gmail.com>
Signed-off-by: Zhu Jiekun <jiekun@victoriametrics.com>
2026-06-10 17:29:04 +08:00
Jiekun
eb45399841 apptest: rename integration test to apptest 2026-06-10 16:34:56 +08:00
Jiekun
fff6c00f82 integration test: improve binary check and doc 2026-06-10 16:15:37 +08:00
Jiekun
1ab6e8e34d doc: remove unnecessary commands in testing guide 2026-06-10 15:50:12 +08:00
Jiekun
264ef58178 integration test: fix typo in Makefile 2026-06-10 15:40:46 +08:00
Jiekun
54a9038cfc integration test: add testing doc in contributing guide. add integration test alias 2026-06-10 15:38:38 +08:00
30 changed files with 904 additions and 1097 deletions

View File

@@ -33,8 +33,7 @@ jobs:
name: ${{ matrix.os }}-${{ matrix.arch }}
permissions:
contents: read
# Runs on dedicated runner with extra resources to increase build speed.
runs-on: 'vm-runner'
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:

View File

@@ -30,8 +30,7 @@ jobs:
name: lint
permissions:
contents: read
# Runs on dedicated runner with extra resources since golangci-lint requires extra memory
runs-on: 'vm-runner'
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
@@ -65,8 +64,7 @@ jobs:
name: unit
permissions:
contents: read
# Runs on dedicated runner with extra resources to increase tests speed.
runs-on: 'vm-runner'
runs-on: ubuntu-latest
strategy:
matrix:
@@ -97,7 +95,6 @@ jobs:
name: apptest
permissions:
contents: read
# Runs on dedicated runner to isolate app tests from other tests.
runs-on: apptest
steps:

View File

@@ -3,14 +3,27 @@ linters:
settings:
errcheck:
exclude-functions:
- fmt.Fprintf
- fmt.Fprint
- (net/http.ResponseWriter).Write
exclusions:
generated: lax
presets:
- common-false-positives
- legacy
- std-error-handling
rules:
- linters:
- staticcheck
text: 'SA(4003|1019|5011):'
paths:
- ^app/vmui/
- third_party$
- builtin$
- examples$
formatters:
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$

View File

@@ -17,7 +17,7 @@ EXTRA_GO_BUILD_TAGS ?=
GO_BUILDINFO = -X '$(PKG_PREFIX)/lib/buildinfo.Version=$(APP_NAME)-$(DATEINFO_TAG)-$(BUILDINFO_TAG)'
TAR_OWNERSHIP ?= --owner=1000 --group=1000
GOLANGCI_LINT_VERSION := 2.12.2
GOLANGCI_LINT_VERSION := 2.9.0
.PHONY: $(MAKECMDGOALS)
@@ -471,7 +471,7 @@ test-full-386:
apptest:
$(MAKE) victoria-metrics-race vmagent-race vmalert-race vmauth-race vmctl-race vmbackup-race vmrestore-race
go test ./apptest/... -skip="^Test(Cluster|Legacy).*"
VM_APPTEST=1 go test ./apptest/... -skip="^Test(Cluster|Legacy).*"
apptest-legacy: victoria-metrics-race vmbackup-race vmrestore-race
OS=$$(uname | tr '[:upper:]' '[:lower:]'); \
@@ -527,7 +527,7 @@ golangci-lint: install-golangci-lint
golangci-lint run --build-tags 'synctest'
install-golangci-lint:
which golangci-lint && (golangci-lint --version | grep -q $(GOLANGCI_LINT_VERSION)) || curl -sSfL https://golangci-lint.run/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v$(GOLANGCI_LINT_VERSION)
which golangci-lint && (golangci-lint --version | grep -q $(GOLANGCI_LINT_VERSION)) || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v$(GOLANGCI_LINT_VERSION)
remove-golangci-lint:
rm -rf `which golangci-lint`

View File

@@ -173,9 +173,9 @@ func (r *Rule) String() string {
if r.Alert != "" {
ruleType = "alerting"
}
var b strings.Builder
fmt.Fprintf(&b, "%s rule %q", ruleType, r.Name())
fmt.Fprintf(&b, "; expr: %q", r.Expr)
b := strings.Builder{}
b.WriteString(fmt.Sprintf("%s rule %q", ruleType, r.Name()))
b.WriteString(fmt.Sprintf("; expr: %q", r.Expr))
kv := sortMap(r.Labels)
for i := range kv {

File diff suppressed because it is too large Load Diff

View File

@@ -21,16 +21,16 @@
},
"dependencies": {
"classnames": "^2.5.1",
"dayjs": "^1.11.21",
"dayjs": "^1.11.20",
"lodash.debounce": "^4.0.8",
"marked": "^18.0.5",
"preact": "^10.29.2",
"qs": "^6.15.2",
"marked": "^18.0.2",
"preact": "^10.29.1",
"qs": "^6.15.1",
"react-input-mask": "^2.0.4",
"react-router-dom": "^7.17.0",
"react-router-dom": "^7.14.1",
"uplot": "^1.6.32",
"vite": "^8.0.16",
"web-vitals": "^5.3.0"
"vite": "^8.0.8",
"web-vitals": "^5.2.0"
},
"devDependencies": {
"@eslint/eslintrc": "^3.3.5",
@@ -39,24 +39,24 @@
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/preact": "^3.2.4",
"@types/lodash.debounce": "^4.0.9",
"@types/node": "^25.9.2",
"@types/qs": "^6.15.1",
"@types/react": "^19.2.17",
"@types/node": "^25.6.0",
"@types/qs": "^6.15.0",
"@types/react": "^19.2.14",
"@types/react-input-mask": "^3.0.6",
"@types/react-router-dom": "^5.3.3",
"@typescript-eslint/eslint-plugin": "^8.61.0",
"@typescript-eslint/parser": "^8.61.0",
"@typescript-eslint/eslint-plugin": "^8.58.2",
"@typescript-eslint/parser": "^8.58.2",
"cross-env": "^10.1.0",
"eslint": "^9.39.2",
"eslint-plugin-react": "^7.37.5",
"eslint-plugin-unused-imports": "^4.4.1",
"globals": "^17.6.0",
"http-proxy-middleware": "^4.1.0",
"jsdom": "^29.1.1",
"postcss": "^8.5.15",
"sass-embedded": "^1.100.0",
"typescript": "^6.0.3",
"vitest": "^4.1.8"
"globals": "^17.5.0",
"http-proxy-middleware": "^3.0.5",
"jsdom": "^29.0.2",
"postcss": "^8.5.10",
"sass-embedded": "^1.99.0",
"typescript": "^6.0.2",
"vitest": "^4.1.4"
},
"browserslist": {
"production": [

View File

@@ -0,0 +1,40 @@
package tests
import (
"fmt"
"os"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
func TestMain(m *testing.M) {
// check if necessary binaries are there.
checkBinaryRequirement("../../bin/victoria-metrics-race")
checkBinaryRequirement("../../bin/vmagent-race")
checkBinaryRequirement("../../bin/vmauth-race")
checkBinaryRequirement("../../bin/vmctl-race")
checkBinaryRequirement("../../bin/vmbackup-race")
checkBinaryRequirement("../../bin/vmrestore-race")
// check if the test is run via make command
checkIntegrationTestEnv()
// start the integration test.
os.Exit(m.Run())
}
// checkBinaryRequirement panic if required binary not exist.
func checkBinaryRequirement(path string) {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
panic(fmt.Sprintf("apptest failed: %s not found. please run `make apptest` to execute apptest. check how different tests are executed: https://docs.victoriametrics.com/victoriametrics/contributing/#testing", path))
}
}
}
func checkIntegrationTestEnv() {
if os.Getenv("VM_APPTEST") == "" {
logger.Warnf("executing apptest with potential outdated binaries. it's recommended to execute via `make apptest` command. check this doc for more details: https://docs.victoriametrics.com/victoriametrics/contributing/#testing")
}
}

View File

@@ -68,8 +68,7 @@ Pull requests requirements:
1. A link to the issue(s) related to the change, if any. Use `Fixes [issue link]` if the PR resolves the issue, or `Related to [issue link]` for reference.
1. Tests proving that the change is effective. Tests are expected for non-trivial new functionality or non-trivial modifications.
Bug fixes must include tests unless a maintainer explicitly agrees otherwise.
See [this style guide](https://itnext.io/f-tests-as-a-replacement-for-table-driven-tests-in-go-8814a8b19e9e) for tests.
To run tests and code checks locally, execute commands `make test-full` and `make check-all`.
See [this style guide](https://itnext.io/f-tests-as-a-replacement-for-table-driven-tests-in-go-8814a8b19e9e) for tests. See [this section](#testing) for how to run tests.
1. Try to not extend the scope of the pull requests outside the issue, do not make unrelated changes.
1. Update [docs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/docs) if needed. For example, adding a new flag or changing behavior of existing flags or features
requires reflecting these changes in the documentation. For new features add `{{%/* available_from "#" */%}}` shortcode to the documentation.
@@ -126,3 +125,17 @@ Due to `KISS`, [cluster version of VictoriaMetrics](https://docs.victoriametrics
- Automatic cluster resizing, which may cost you a lot of money if improperly configured.
- Automatic discovering and addition of new nodes in the cluster, which may mix data between dev and prod clusters :)
- Automatic leader election, which may result in split brain disaster on network errors.
## Testing
We recommend running the following sequence of checks and tests before submitting a pull request:
```sh
# run static checks
make check-all
# run unit test
make test-full
# run integration tests
make apptest
```

View File

@@ -26,9 +26,6 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix `increase` and `increase_prometheus` outputs producing inflated values when old samples update the baseline across interval boundaries with `ignore_old_samples: true` or `enable_windows: true`.
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
Released at 2026-06-08

View File

@@ -667,11 +667,11 @@ func TestScrapeWorkScrapeInternalStreamConcurrency(t *testing.T) {
}
generateScrape := func(n int) string {
var w strings.Builder
w := strings.Builder{}
for i := range n {
fmt.Fprintf(&w, "fooooo_%d 1\n", i)
w.WriteString(fmt.Sprintf("fooooo_%d 1\n", i))
if i%100 == 0 {
fmt.Fprintf(&w, "# HELP fooooo_%d This is a test\n", i)
w.WriteString(fmt.Sprintf("# HELP fooooo_%d This is a test\n", i))
}
}
return w.String()
@@ -1005,9 +1005,9 @@ func TestSendStaleSeries(t *testing.T) {
}
}
generateScrape := func(n int) string {
var w strings.Builder
w := strings.Builder{}
for i := range n {
fmt.Fprintf(&w, "foo_%d 1\n", i)
w.WriteString(fmt.Sprintf("foo_%d 1\n", i))
}
return w.String()
}

View File

@@ -6,9 +6,6 @@ type avgAggrValue struct {
}
func (av *avgAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.sum += sample.value
av.count++
}

View File

@@ -4,10 +4,7 @@ type countSamplesAggrValue struct {
count uint64
}
func (av *countSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
func (av *countSamplesAggrValue) pushSample(_ aggrConfig, _ *pushSample, _ string, _ int64) {
av.count++
}

View File

@@ -9,10 +9,7 @@ type countSeriesAggrValue struct {
samples map[uint64]struct{}
}
func (av *countSeriesAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string, _ int64) {
if sample.stateOnly {
return
}
func (av *countSeriesAggrValue) pushSample(_ aggrConfig, _ *pushSample, key string, _ int64) {
// Count unique hashes over the keys instead of unique key values.
// This reduces memory usage at the cost of possible hash collisions for distinct key values.
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))

View File

@@ -45,7 +45,6 @@ type dedupAggrShardNopad struct {
type dedupAggrSample struct {
value float64
timestamp int64
stateOnly bool
}
func newDedupAggr() *dedupAggr {
@@ -190,7 +189,6 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
s = &samplesBuf[len(samplesBuf)-1]
s.value = sample.value
s.timestamp = sample.timestamp
s.stateOnly = sample.stateOnly
key := bytesutil.InternString(sample.key)
state.m[key] = s
@@ -199,33 +197,28 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
state.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
continue
}
var newWins bool
s.timestamp, s.value, newWins = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
if newWins {
s.stateOnly = sample.stateOnly
}
s.timestamp, s.value = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
}
state.samplesBuf = samplesBuf
}
// deduplicateSamples returns deduplicated timestamp and value results,
// along with a boolean indicating whether the new sample won.
// deduplicateSamples returns deduplicated timestamp and value results.
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#deduplication
func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64, bool) {
func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64) {
if newT > oldT {
return newT, newV, true
return newT, newV
}
// if both samples have the same timestamp, choose the maximum value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333;
// always prefer a non-decimal.StaleNaN value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10196
if newT == oldT {
if decimal.IsStaleNaN(oldV) {
return newT, newV, true
return newT, newV
}
if newV > oldV {
return newT, newV, true
return newT, newV
}
}
return oldT, oldV, false
return oldT, oldV
}
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
@@ -257,7 +250,6 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
key: key,
value: s.value,
timestamp: s.timestamp,
stateOnly: s.stateOnly,
})
// Limit the number of samples per each flush in order to limit memory usage.

View File

@@ -24,8 +24,8 @@ func TestDedupAggrSerial(t *testing.T) {
}
da.pushSamples(samples, 0, false)
if n := da.sizeBytes(); n > 6_000_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 6_000_000 bytes", n)
if n := da.sizeBytes(); n > 5_000_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 5_000_000 bytes", n)
}
if n := da.itemsCount(); n != seriesCount {
t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)
@@ -81,7 +81,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
func TestDeduplicateSamples(t *testing.T) {
f := func(oldT, newT int64, oldV, newV float64, expectedT int64, expectedV float64) {
t.Helper()
dedupT, dedupV, _ := deduplicateSamples(oldT, newT, oldV, newV)
dedupT, dedupV := deduplicateSamples(oldT, newT, oldV, newV)
if dedupT != expectedT || dedupV != expectedV {
t.Fatalf("unexpected deduplicated result for oldT=%d, newT=%d, oldV=%f, newV=%f; got dedupT=%d, dedupV=%f; want dedupT=%d, dedupV=%f",
oldT, newT, oldV, newV, dedupT, dedupV, expectedT, expectedV)

View File

@@ -231,7 +231,6 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
logger.Warnf("deduplication couldn't be finished in the configured dedupInterval=%s; it took %.03fs; "+
"possible solutions: increase dedupInterval; reduce samples' ingestion rate", d.interval, duration.Seconds())
}
deadlineTime = deadlineTime.Add(d.interval)
for time.Now().After(deadlineTime) {
deadlineTime = deadlineTime.Add(d.interval)
}

View File

@@ -11,9 +11,6 @@ type histogramBucketAggrValue struct {
}
func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.h.Update(sample.value)
}

View File

@@ -1,109 +0,0 @@
package streamaggr
import (
"fmt"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
type increaseLastValue struct {
value float64
timestamp int64
deleteDeadline int64
}
type increaseAggrValueShared struct {
lastValues map[string]increaseLastValue
}
type increaseAggrValue struct {
total float64
shared *increaseAggrValueShared
}
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
ac := c.(*increaseAggrConfig)
currentTime := fasttime.UnixTimestamp()
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
lv, ok := av.shared.lastValues[key]
if ok || keepFirstSample {
if sample.timestamp < lv.timestamp {
// Skip out of order sample
return
}
if !sample.stateOnly {
if sample.value >= lv.value {
av.total += sample.value - lv.value
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
}
}
}
lv.value = sample.value
lv.timestamp = sample.timestamp
lv.deleteDeadline = deleteDeadline
key = bytesutil.InternString(key)
av.shared.lastValues[key] = lv
}
func (av *increaseAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*increaseAggrConfig)
suffix := ac.getSuffix()
total := av.total
av.total = 0
lvs := av.shared.lastValues
for lk, lv := range lvs {
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
delete(lvs, lk)
}
}
ctx.appendSeries(key, suffix, total)
}
func (av *increaseAggrValue) state() any {
return av.shared
}
func newIncreaseAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &increaseAggrConfig{
keepFirstSample: keepFirstSample,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
return cfg
}
type increaseAggrConfig struct {
keepFirstSample bool
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
ignoreFirstSampleDeadline uint64
counterResetsTotal *metrics.Counter
}
func (*increaseAggrConfig) getValue(s any) aggrValue {
var shared *increaseAggrValueShared
if s == nil {
shared = &increaseAggrValueShared{
lastValues: make(map[string]increaseLastValue),
}
} else {
shared = s.(*increaseAggrValueShared)
}
return &increaseAggrValue{
shared: shared,
}
}
func (ac *increaseAggrConfig) getSuffix() string {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}

View File

@@ -6,9 +6,6 @@ type lastAggrValue struct {
}
func (av *lastAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if sample.timestamp >= av.timestamp {
av.last = sample.value
av.timestamp = sample.timestamp

View File

@@ -6,9 +6,6 @@ type maxAggrValue struct {
}
func (av *maxAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if sample.value > av.max || !av.defined {
av.max = sample.value
}

View File

@@ -6,9 +6,6 @@ type minAggrValue struct {
}
func (av *minAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if sample.value < av.min || !av.defined {
av.min = sample.value
}

View File

@@ -13,9 +13,6 @@ type quantilesAggrValue struct {
}
func (av *quantilesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if av.h == nil {
av.h = histogram.GetFast()
}

View File

@@ -34,7 +34,6 @@ var rateAggrStateValuePool sync.Pool
func putRateAggrStateValue(v *rateAggrStateValue) {
v.timestamp = 0
v.lastTimestamp = 0
v.increase = 0
rateAggrStateValuePool.Put(v)
}
@@ -89,10 +88,6 @@ type rateAggrStateValue struct {
// increase stores cumulative increase for the current time series on the current aggregation interval
increase float64
timestamp int64
// lastTimestamp is the latest timestamp seen for this series including state-only samples.
// It is used for out-of-order detection, while timestamp (above) is only updated by
// non-state-only samples and is used for rate calculation.
lastTimestamp int64
}
type rateAggrValue struct {
@@ -106,20 +101,16 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
sv, ok := av.shared[key]
if ok {
state = sv.getState(av.isGreen)
if sample.timestamp < state.lastTimestamp {
if sample.timestamp < state.timestamp {
// Skip out of order sample
return
}
if !sample.stateOnly {
if sample.value >= sv.value {
state.increase += sample.value - sv.value
} else {
// counter reset
state.increase += sample.value
ac.counterResetsTotal.Inc()
}
if sample.value >= sv.value {
state.increase += sample.value - sv.value
} else {
sv.prevTimestamp = sample.timestamp
// counter reset
state.increase += sample.value
ac.counterResetsTotal.Inc()
}
} else {
sv = getRateAggrSharedValue(av.isGreen)
@@ -130,10 +121,7 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
}
sv.value = sample.value
sv.deleteDeadline = deleteDeadline
state.lastTimestamp = sample.timestamp
if !sample.stateOnly {
state.timestamp = sample.timestamp
}
state.timestamp = sample.timestamp
}
func (av *rateAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {

View File

@@ -12,9 +12,6 @@ type stdAggrValue struct {
}
func (av *stdAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.count++
avg := av.avg + (sample.value-av.avg)/av.count
av.q += (sample.value - av.avg) * (sample.value - avg)

View File

@@ -762,9 +762,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "histogram_bucket":
return newHistogramBucketAggrConfig(useSharedState), nil
case "increase":
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
case "increase_prometheus":
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
case "last":
return newLastAggrConfig(), nil
case "max":
@@ -782,9 +782,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "sum_samples":
return newSumSamplesAggrConfig(), nil
case "total":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
case "total_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
case "unique_samples":
return newUniqueSamplesAggrConfig(), nil
default:
@@ -845,7 +845,6 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlu
} else {
a.flush(pf, flushTime, cs, false)
}
flushTime = flushTime.Add(a.interval)
for time.Now().After(flushTime) {
flushTime = flushTime.Add(a.interval)
}
@@ -1006,28 +1005,26 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
a.ignoredNaNSamples.Inc()
continue
}
stateOnly := (ignoreOldSamples || enableWindows) && s.Timestamp < minDeadline
if stateOnly {
if (ignoreOldSamples || enableWindows) && s.Timestamp < minDeadline {
// Skip old samples outside the current aggregation interval
a.ignoredOldSamples.Inc()
} else {
lagMsec := nowMsec - s.Timestamp
if lagMsec > maxLagMsec {
maxLagMsec = lagMsec
}
continue
}
lagMsec := nowMsec - s.Timestamp
if lagMsec > maxLagMsec {
maxLagMsec = lagMsec
}
if enableWindows && s.Timestamp <= cs.maxDeadline == cs.isGreen {
ctx.green = append(ctx.green, pushSample{
key: key,
value: s.Value,
timestamp: s.Timestamp,
stateOnly: stateOnly,
})
} else {
ctx.blue = append(ctx.blue, pushSample{
key: key,
value: s.Value,
timestamp: s.Timestamp,
stateOnly: stateOnly,
})
}
}
@@ -1101,10 +1098,6 @@ type pushSample struct {
key string
value float64
timestamp int64
// stateOnly marks samples older than minDeadline: update tracking state in stateful outputs
// (total, rate, increase) but do not contribute to the aggregation output.
stateOnly bool
}
func getPushCtx() *pushCtx {

View File

@@ -1,3 +1,5 @@
//go:build synctest
package streamaggr
import (
@@ -483,8 +485,10 @@ foo 3.3
`, ``, ``, ``, ``}, time.Minute, `foo:1m_count_series 1
foo:1m_count_series{bar="baz"} 1
foo:1m_sum_samples 0
foo:1m_sum_samples 0
foo:1m_sum_samples 4.3
foo:1m_sum_samples{bar="baz"} 0
foo:1m_sum_samples{bar="baz"} 0
foo:1m_sum_samples{bar="baz"} 2
foo:5m_by_bar_sum_samples 4.3
foo:5m_by_bar_sum_samples{bar="baz"} 2
@@ -688,33 +692,23 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.125
outputs: [rate_sum, rate_avg]
`, "11111")
// test rate_sum and rate_avg, when two aggregation intervals are empty.
// abc=777 arrives slightly before the start of each interval (-10ms) but still
// updates prevTimestamp, so it contributes to rate_sum alongside abc=123 and abc=456.
// test rate_sum and rate_avg, when two aggregation intervals are empty
f([]string{`
foo{abc="123", cde="1"} 1
foo{abc="123", cde="1"} 2 1
foo{abc="456", cde="1"} 7
foo{abc="456", cde="1"} 8 1
foo{abc="777", cde="1"} 8
foo{abc="777", cde="1"} 9 1
foo{abc="123", cde="1"} 2
foo{abc="456", cde="1"} 8
foo{abc="777", cde="1"} 9 -10
`, ``, ``, `
foo{abc="123", cde="1"} 19
foo{abc="123", cde="1"} 20 1
foo{abc="123", cde="1"} 20
foo{abc="456", cde="1"} 26
foo{abc="456", cde="1"} 27 1
foo{abc="777", cde="1"} 27
foo{abc="777", cde="1"} 28 1
`}, time.Minute, `foo:1m_by_cde_rate_avg{cde="1"} 1
foo:1m_by_cde_rate_avg{cde="1"} 1
foo:1m_by_cde_rate_sum{cde="1"} 3
foo:1m_by_cde_rate_sum{cde="1"} 3
foo{abc="777", cde="1"} 27 -10
`}, time.Minute, `foo:1m_by_cde_rate_avg{cde="1"} 0.1
foo:1m_by_cde_rate_sum{cde="1"} 0.2
`, `
- interval: 1m
by: [cde]
outputs: [rate_sum, rate_avg]
enable_windows: true
`, "111111111111")
`, "111111")
// rate_sum and rate_avg with duplicated events
f([]string{`
@@ -809,55 +803,4 @@ foo:1m_sum_samples{baz="qwe"} 10
dedup_interval: 30s
outputs: [sum_samples]
`, "11111111")
// total with ignore_old_samples: an old sample (30s before the interval boundary) must
// update the state reference without contributing to the interval total, so the subsequent
// current-interval sample (250) computes increase 250-150=100 instead of 250-100=150.
// Cumulative total: 100 (interval1) + 100 (interval2) = 200.
f([]string{`
foo 100
`, `
foo 150 -30
foo 250
`}, time.Minute, `foo:1m_total 100
foo:1m_total 200
`, `
- interval: 1m
outputs: [total]
ignore_old_samples: true
ignore_first_sample_interval: 0s
`, "111")
// increase with ignore_old_samples: same correctness check for increase output.
// Per-interval: 100 (first sample from 0) and 100 (250-150=100 thanks to stateOnly update).
f([]string{`
foo 100
`, `
foo 150 -30
foo 250
`}, time.Minute, `foo:1m_increase 100
foo:1m_increase 100
`, `
- interval: 1m
outputs: [increase]
ignore_old_samples: true
ignore_first_sample_interval: 0s
`, "111")
// rate with ignore_old_samples: out-of-order stateOnly samples must not overwrite sv.value,
// and the winning stateOnly sample's timestamp is used as the denominator start.
// foo 120 -40 (ts=T0+20s) is rejected as OOO after foo 150 -30 (ts=T0+30s),
// so the baseline is 150 at T0+30s, giving rate=(200-150)/30 ≈ 1.667.
f([]string{`
foo 100
`, `
foo 150 -30
foo 120 -40
foo 200
`}, time.Minute, `foo:1m_rate_sum 1.6666666666666667
`, `
- interval: 1m
outputs: [rate_sum]
ignore_old_samples: true
`, "1111")
}

View File

@@ -5,9 +5,6 @@ type sumSamplesAggrValue struct {
}
func (av *sumSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.sum += sample.value
}

View File

@@ -36,14 +36,12 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
// Skip out of order sample
return
}
if !sample.stateOnly {
if sample.value >= lv.value {
av.total += sample.value - lv.value
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
}
if sample.value >= lv.value {
av.total += sample.value - lv.value
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
}
}
lv.value = sample.value
@@ -56,6 +54,7 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*totalAggrConfig)
suffix := ac.getSuffix()
// check for stale entries
total := av.shared.total + av.total
av.total = 0
lvs := av.shared.lastValues
@@ -64,7 +63,9 @@ func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast
delete(lvs, lk)
}
}
if math.Abs(total) >= (1 << 53) {
if ac.resetTotalOnFlush {
av.shared.total = 0
} else if math.Abs(total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
av.shared.total = 0
} else {
@@ -77,10 +78,11 @@ func (av *totalAggrValue) state() any {
return av.shared
}
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &totalAggrConfig{
keepFirstSample: keepFirstSample,
resetTotalOnFlush: resetTotalOnFlush,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
@@ -88,6 +90,8 @@ func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleI
}
type totalAggrConfig struct {
resetTotalOnFlush bool
// Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool
@@ -113,6 +117,12 @@ func (*totalAggrConfig) getValue(s any) aggrValue {
}
func (ac *totalAggrConfig) getSuffix() string {
if ac.resetTotalOnFlush {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}
if ac.keepFirstSample {
return "total"
}

View File

@@ -5,9 +5,6 @@ type uniqueSamplesAggrValue struct {
}
func (av *uniqueSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if _, ok := av.samples[sample.value]; !ok {
av.samples[sample.value] = struct{}{}
}