Compare commits

..

5 Commits

Author SHA1 Message Date
Andrii Chubatiuk
90029442cb lib/streamaggr: use stale samples for increase, total and rate outputs state update
Before, with ignore_old_samples or enable_windows set, old samples were just dropped.
With this change, these samples are used to update per-series state in stateful aggregation outputs: rate, total, and increase, but do not contribute to the aggregated output.
This ensures the next in-interval sample computes the correct per-interval increase relative to the most recent pre-interval value, rather than a stale one from a previous flush cycle.
2026-06-10 15:00:20 +03:00
Aliaksandr Valialkin
04993f2187 Makefile: update golangci-lint from v2.9.0 to v2.12.2
See https://github.com/golangci/golangci-lint/releases/tag/v2.12.2

While at it, actualize .golangci.yml .
2026-06-10 12:13:01 +02:00
Yury Moladau
73a40a4178 app/vmui: update npm dependencies (#11084)
### Describe Your Changes

Updates npm dependencies and refreshes `package-lock.json`.

Signed-off-by: Yury Molodov <yurymolodov@gmail.com>
2026-06-10 11:59:05 +02:00
Andrii Chubatiuk
66f8ec81f3 lib/streamaggr: uncoditionally advance flush timer (#10808)
Due to possible time drift the flush time needs to be advanced unconditionally.
Otherwise, it is possible for flush time to remain the same during two consequitive
flushes and produce duplicated data points.

See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
2026-06-10 11:58:49 +02:00
Roman Khavronenko
66672f216b github/ci: use dedicated runners for critical pipelines (#11075)
* builds, tests and linter require extra speed and resources
* add comment on why apttest uses dedicated pull

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
2026-06-10 11:56:28 +02:00
33 changed files with 1093 additions and 1826 deletions

View File

@@ -33,7 +33,8 @@ jobs:
name: ${{ matrix.os }}-${{ matrix.arch }}
permissions:
contents: read
runs-on: ubuntu-latest
# Runs on dedicated runner with extra resources to increase build speed.
runs-on: 'vm-runner'
strategy:
fail-fast: false
matrix:

View File

@@ -30,7 +30,8 @@ jobs:
name: lint
permissions:
contents: read
runs-on: ubuntu-latest
# Runs on dedicated runner with extra resources since golangci-lint requires extra memory
runs-on: 'vm-runner'
steps:
- name: Code checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
@@ -64,7 +65,8 @@ jobs:
name: unit
permissions:
contents: read
runs-on: ubuntu-latest
# Runs on dedicated runner with extra resources to increase tests speed.
runs-on: 'vm-runner'
strategy:
matrix:
@@ -95,6 +97,7 @@ jobs:
name: apptest
permissions:
contents: read
# Runs on dedicated runner to isolate app tests from other tests.
runs-on: apptest
steps:

View File

@@ -3,27 +3,14 @@ linters:
settings:
errcheck:
exclude-functions:
- fmt.Fprintf
- fmt.Fprint
- (net/http.ResponseWriter).Write
exclusions:
generated: lax
presets:
- common-false-positives
- legacy
- std-error-handling
rules:
- linters:
- staticcheck
text: 'SA(4003|1019|5011):'
paths:
- third_party$
- builtin$
- examples$
formatters:
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
- ^app/vmui/

View File

@@ -17,7 +17,7 @@ EXTRA_GO_BUILD_TAGS ?=
GO_BUILDINFO = -X '$(PKG_PREFIX)/lib/buildinfo.Version=$(APP_NAME)-$(DATEINFO_TAG)-$(BUILDINFO_TAG)'
TAR_OWNERSHIP ?= --owner=1000 --group=1000
GOLANGCI_LINT_VERSION := 2.9.0
GOLANGCI_LINT_VERSION := 2.12.2
.PHONY: $(MAKECMDGOALS)
@@ -527,7 +527,7 @@ golangci-lint: install-golangci-lint
golangci-lint run --build-tags 'synctest'
install-golangci-lint:
which golangci-lint && (golangci-lint --version | grep -q $(GOLANGCI_LINT_VERSION)) || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v$(GOLANGCI_LINT_VERSION)
which golangci-lint && (golangci-lint --version | grep -q $(GOLANGCI_LINT_VERSION)) || curl -sSfL https://golangci-lint.run/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v$(GOLANGCI_LINT_VERSION)
remove-golangci-lint:
rm -rf `which golangci-lint`

View File

@@ -1,21 +0,0 @@
The `sa-tester` provides a stream aggregation config on the `/sa-config` endpoint for an external `vmagent` to read, writes configured mock series to `vmagent` for aggregation, and receives the output aggregation results from `vmagent`. It will print all the input and output samples in its logs, and generate a report with them.
See `app/test/sa_tester/config.yaml` for all supported options.
**Test steps:**
1. Start the `sa-tester` with the local config file `app/test/sa_tester/config.yaml`.
2. Start a `vmagent` instance using the version you want to test:
- 2.1. Use `<sa-tester-addr>/api/v1/write` as one of the `-remoteWrite.url` values. You can also add other `remoteWrite` URLs to `vmsingle` to check the aggregation results.
- 2.2. Use `-streamAggr.config=http://192.168.0.102:8880/sa-config` to make `vmagent` use the stream aggregation config from `sa-tester`.
3. Call `<sa-tester-addr>/start` when you're ready. The `sa-tester` will call the `vmagent` reload endpoint to ensure the stream aggregation config is up to date, and start writing the series to `vmagent` for aggregation.
4. Check results using the `sa-tester` logs, `<sa-tester-addr>/report` page or other remoteWrite destinations.
![alt text](image-1.png)
**Notes:**
1. You can safely rerun the test without restarting the `vmagent` instance if you changed the stream aggregation config (due to the reload call) or just wait for a while (due to the default staleness interval).
2. You can call `<sa-tester-addr>/reset` to update the `/sa-config` endpoint and `input_series` configs, then call `<sa-tester-addr>/start` to start a new test.
3. You can replace the above `vmagent` with `vmsingle` if you want to check results only in `vmsingle` instead of in the `sa-tester` logs.
4. Do not send extra metrics from vmagent to sa-tester, it will generate a wall of logs:)

View File

@@ -1,50 +0,0 @@
# Stream aggregation rules served on GET /sa-config.
# Configure vmagent with:
# -streamAggr.config=http://localhost:8080/sa-config
saRules:
- name: 'increase'
match: 'test'
interval: 10s
without: [instance]
outputs: [increase]
- name: 'increase-ignore-old'
match: 'test'
interval: 10s
ignore_old_samples: true
without: [env]
outputs: [increase]
- name: 'increase-prometheus'
match: 'test'
interval: 10s
without: [instance]
outputs: [increase_prometheus]
- name: 'increase-prometheus-ignore-old'
match: 'test'
interval: 10s
ignore_old_samples: true
without: [env]
outputs: [increase_prometheus]
# Address of the vmagent that loads SA rules and accepts raw samples.
# POST /start calls <vmagent_address>/-/reload then writes samples to it.
vmagent_address: "http://192.168.0.102:8420"
# Listen address for this tester's HTTP server.
listen_address: ":8880"
# --- Below are for generating input samples for testing. ---
# Time between consecutive sample slots.
# Currently, interval is global, you can have different intervals for different input series by insert null between values.
interval: 10s
input_series:
- series: 'test{env="prod", instance="a"}'
# One value per slot (1-indexed). null means slot is skipped but could be compensated by delays.
values: [null, null, 1001, 1002, 1003, null, null, null, 1026, 1027, 1028]
# delays can be used to mock delayed or out of order sample cases.
# delays: [originalSlot, sendAtSlot, value] (slots are 1-indexed)
# Sample is timestamped at T+(originalSlot-1)*interval, but delivered to vmagent at T+(sendAtSlot-1)*interval.
delays:
- [6, 9, 1004] # slot-6 sample (value=1004) sent together at slot 9
- [7, 9, 1005] # slot-7 sample (value=1005) sent together at slot 9

Binary file not shown.

Before

Width:  |  Height:  |  Size: 495 KiB

View File

@@ -1,627 +0,0 @@
// sa_tester is a tool for testing stream aggregation rules.
//
// It does two things:
// 1. Serves a stream aggregation config YAML on GET /sa-config so that
// vmagent can pull it with -streamAggr.config=http://host/sa-config.
// 2. On POST /start it calls vmagent's /-/reload (recording T=now),
// then writes synthetic samples to the same vmagent.
//
// Usage:
//
// go run ./app/test/sa_tester -config app/test/sa_tester/config.yaml
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"html"
"io"
"log"
"math/rand"
"net/http"
"os"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite/stream"
"gopkg.in/yaml.v2"
)
type AppConfig struct {
// SARules holds the stream aggregation rules. It is marshalled back to
// YAML and served verbatim on GET /sa-config.
SARules interface{} `yaml:"saRules"`
// Interval between consecutive sample slots, e.g. "10s".
Interval string `yaml:"interval"`
// InputSeries defines the time series to generate.
InputSeries []InputSeries `yaml:"input_series"`
// VmagentAddress is the vmagent that loads SA rules and accepts raw samples.
// POST /start calls <VmagentAddress>/-/reload, then writes samples to it.
// Default: http://localhost:8429
VmagentAddress string `yaml:"vmagent_address"`
// ListenAddress is the HTTP listen address for this tester.
// Default: :8080
ListenAddress string `yaml:"listen_address"`
}
// InputSeries describes how to generate one time series.
type InputSeries struct {
// Series is a Prometheus-style selector, e.g. 'test1{env="prod",instance="a"}'.
Series string `yaml:"series"`
// Values lists the sample values for consecutive slots (1-indexed).
// A null entry means the slot is skipped or handled by Delays.
Values []*float64 `yaml:"values"`
// Delays is a list of [originalSlot, sendAtSlot, value] triples (1-indexed).
// The sample is timestamped at T+(originalSlot-1)*interval
// but sent to vmagent at T+(sendAtSlot-1)*interval.
//
// Example: [4, 6, 4] means a sample with value=4 whose logical timestamp
// is slot 4 is actually delivered at slot 6 together with the slot-6 sample.
Delays [][]float64 `yaml:"delays"`
}
// scheduledSample is a single data point waiting to be sent.
type scheduledSample struct {
timestamp int64 // sample timestamp in milliseconds
value float64
}
// vmImportLine is one line of the VictoriaMetrics /api/v1/import NDJSON format.
type vmImportLine struct {
Metric map[string]string `json:"metric"`
Values []float64 `json:"values"`
Timestamps []int64 `json:"timestamps"`
}
// --- report data types -------------------------------------------------------
// sentDataPoint is one data point in the sent-series chart.
type sentDataPoint struct {
TsSec float64 `json:"x"` // sample unix timestamp in seconds
Value float64 `json:"y"`
SentAtSec float64 `json:"sentAt"` // wall-clock send time in seconds; equals TsSec if not delayed
Delayed bool `json:"delayed"`
}
// sentSeriesData holds all sent data points for one configured input series.
type sentSeriesData struct {
Name string `json:"name"`
Points []sentDataPoint `json:"points"`
}
// recvDataPoint is one sample received on /api/v1/write.
type recvDataPoint struct {
TsSec float64 `json:"x"` // sample unix timestamp in seconds
Value float64 `json:"y"`
}
// recvSeriesData holds all received samples for one metric series.
type recvSeriesData struct {
Name string `json:"name"`
Points []recvDataPoint `json:"points"`
}
var (
cfg *AppConfig
configPath string // path of the config file, stored at startup for hot-reload
saYAML []byte // SA config YAML to serve
mu sync.Mutex
started bool
reportMu sync.RWMutex
reportT time.Time
reportJitter time.Duration
reportSent []sentSeriesData
reportRecv = make(map[string]*recvSeriesData)
)
func main() {
configFile := flag.String("config", "config.yaml", "path to config YAML file")
flag.Parse()
configPath = *configFile
if err := loadConfig(); err != nil {
log.Fatalf("cannot load config: %v", err)
}
mux := http.NewServeMux()
mux.HandleFunc("/sa-config", handleSAConfig)
mux.HandleFunc("/start", handleStart)
mux.HandleFunc("/reset", handleReset)
mux.HandleFunc("/api/v1/write", handleRemoteWrite)
mux.HandleFunc("/report", handleReport)
log.Printf("HTTP server listening on %s", cfg.ListenAddress)
log.Printf("Endpoints:")
log.Printf(" GET /sa-config — serve SA rules YAML for vmagent")
log.Printf(" POST /start — call vmagent /-/reload then write samples")
log.Printf(" POST /reset — reload config, clear 'started' flag")
log.Printf(" POST /api/v1/write — receive Prometheus remote-write from vmagent SA output")
log.Printf(" GET /report — HTML report with sent/received series charts")
log.Fatalf("server stopped: %v", http.ListenAndServe(cfg.ListenAddress, mux))
}
// handleSAConfig serves the SA rules YAML so that vmagent can fetch it.
func handleSAConfig(w http.ResponseWriter, r *http.Request) {
log.Printf("[sa-config] request from %s", r.RemoteAddr)
w.Header().Set("Content-Type", "text/yaml; charset=utf-8")
if _, err := w.Write(saYAML); err != nil {
log.Printf("[sa-config] write error: %v", err)
}
}
// handleStart triggers vmagent reload and starts the sample writer goroutine.
func handleStart(w http.ResponseWriter, r *http.Request) {
mu.Lock()
if started {
mu.Unlock()
http.Error(w, "test already running; POST /reset to allow re-running", http.StatusConflict)
return
}
started = true
mu.Unlock()
interval, err := time.ParseDuration(cfg.Interval)
if err != nil {
http.Error(w, fmt.Sprintf("invalid interval %q: %v", cfg.Interval, err), http.StatusBadRequest)
mu.Lock()
started = false
mu.Unlock()
return
}
// Call vmagent /-/reload so it picks up the SA config from /sa-config.
reloadURL := cfg.VmagentAddress + "/-/reload"
log.Printf("[start] calling vmagent reload: POST %s", reloadURL)
reloadResp, err := http.Post(reloadURL, "application/json", nil) //nolint:noctx
if err != nil {
log.Printf("[start] reload request failed: %v", err)
http.Error(w, fmt.Sprintf("vmagent reload failed: %v", err), http.StatusBadGateway)
mu.Lock()
started = false
mu.Unlock()
return
}
reloadBody, _ := io.ReadAll(reloadResp.Body)
reloadResp.Body.Close()
log.Printf("[start] reload response: status=%d body=%q", reloadResp.StatusCode, reloadBody)
T := time.Now()
jitter := time.Duration(rand.Int63n(int64(interval / 2))) //nolint:gosec
log.Printf("[start] T=%s jitter=%v first-sample-at T+%v",
T.Format(time.RFC3339Nano), jitter, jitter)
reportMu.Lock()
reportT = T
reportJitter = jitter
reportSent = nil
reportRecv = make(map[string]*recvSeriesData)
reportMu.Unlock()
go runTest(T, interval, jitter)
fmt.Fprintf(w, "test started\nT=%s\njitter=%v\n", T.Format(time.RFC3339Nano), jitter)
}
// handleRemoteWrite accepts Prometheus remote-write requests (e.g. from vmagent's SA output)
// and logs each received time series in a human-readable format for result verification.
func handleRemoteWrite(w http.ResponseWriter, r *http.Request) {
isVMRemoteWrite := r.Header.Get("Content-Encoding") == "zstd"
err := stream.Parse(r.Body, isVMRemoteWrite, func(tss []prompb.TimeSeries, _ []prompb.MetricMetadata) error {
for i := range tss {
ts := &tss[i]
var sb strings.Builder
// Build metric name + labels string.
var metricName string
for _, lbl := range ts.Labels {
if lbl.Name == "__name__" {
metricName = lbl.Value
break
}
}
sb.WriteString(metricName)
sb.WriteByte('{')
first := true
for _, lbl := range ts.Labels {
if lbl.Name == "__name__" {
continue
}
if !first {
sb.WriteByte(',')
}
first = false
sb.WriteString(lbl.Name)
sb.WriteString(`="`)
sb.WriteString(lbl.Value)
sb.WriteByte('"')
}
sb.WriteByte('}')
metricStr := sb.String()
// Log each sample on its own line for easy reading.
for _, s := range ts.Samples {
t := time.UnixMilli(s.Timestamp)
log.Printf("[recv] %-60s value=%-12g ts= %v, ts_human=%s",
metricStr, s.Value, t.UnixMilli(), t.UTC().Format(time.RFC3339Nano))
}
// Record for /report.
reportMu.Lock()
if reportRecv == nil {
reportRecv = make(map[string]*recvSeriesData)
}
rd := reportRecv[metricStr]
if rd == nil {
rd = &recvSeriesData{Name: metricStr}
reportRecv[metricStr] = rd
}
for _, s := range ts.Samples {
rd.Points = append(rd.Points, recvDataPoint{
TsSec: float64(s.Timestamp) / 1000.0,
Value: s.Value,
})
}
reportMu.Unlock()
}
return nil
})
if err != nil {
log.Printf("[recv] parse error: %v", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusNoContent)
}
// handleReset re-reads the config file from disk, updates the SA config and
// input_series, and clears the started flag so the test can be triggered again.
func handleReset(w http.ResponseWriter, r *http.Request) {
mu.Lock()
if err := loadConfig(); err != nil {
mu.Unlock()
log.Printf("[reset] failed to reload config: %v", err)
http.Error(w, fmt.Sprintf("config reload failed: %v", err), http.StatusInternalServerError)
return
}
started = false
mu.Unlock()
reportMu.Lock()
reportSent = nil
reportRecv = make(map[string]*recvSeriesData)
reportT = time.Time{}
reportMu.Unlock()
log.Printf("[reset] config reloaded, started flag cleared")
fmt.Fprintln(w, "reset ok")
}
// loadConfig reads configPath from disk, parses it into cfg, and rebuilds saYAML.
// Callers that need thread safety must hold mu.
func loadConfig() error {
data, err := os.ReadFile(configPath)
if err != nil {
return fmt.Errorf("cannot read config file %q: %w", configPath, err)
}
newCfg := &AppConfig{
VmagentAddress: "http://localhost:8429",
ListenAddress: ":8080",
}
if err := yaml.Unmarshal(data, newCfg); err != nil {
return fmt.Errorf("cannot parse config: %w", err)
}
var newSAYAML []byte
if newCfg.SARules != nil {
newSAYAML, err = yaml.Marshal(newCfg.SARules)
if err != nil {
return fmt.Errorf("cannot re-marshal saRules to YAML: %w", err)
}
} else {
newSAYAML = []byte("[]\n")
}
cfg = newCfg
saYAML = newSAYAML
log.Printf("[config] loaded from %q", configPath)
log.Printf("[config] interval : %s", cfg.Interval)
log.Printf("[config] vmagent : %s", cfg.VmagentAddress)
log.Printf("[config] listen : %s", cfg.ListenAddress)
log.Printf("[config] input_series count: %d", len(cfg.InputSeries))
log.Printf("[config] SA config:\n---\n%s---", saYAML)
return nil
}
// --- label parsing -----------------------------------------------------------
// seriesRe matches "metricname" or "metricname{k="v",...}".
var seriesRe = regexp.MustCompile(`^([^{,\s]+?)(?:\{([^}]*)\})?$`)
var labelRe = regexp.MustCompile(`(\w+)="([^"]*)"`)
// parseLabels converts a Prometheus-style selector string into a flat label map.
func parseLabels(series string) (map[string]string, error) {
series = strings.TrimSpace(series)
m := seriesRe.FindStringSubmatch(series)
if m == nil {
return nil, fmt.Errorf("cannot parse series selector %q", series)
}
labels := map[string]string{"__name__": m[1]}
if m[2] != "" {
for _, pair := range labelRe.FindAllStringSubmatch(m[2], -1) {
labels[pair[1]] = pair[2]
}
}
return labels, nil
}
// --- schedule building -------------------------------------------------------
// buildSchedule constructs a map of sendAtMs → []scheduledSample for a series.
//
// Slot numbering is 1-indexed:
// - slot i has sample timestamp T+jitter+(i-1)*interval
// - non-null values[i-1] are sent at their own slot time
// - delay entry [orig, sendAt, val] sends a sample timestamped at slot orig
// but delivered to vmagent at slot sendAt
func buildSchedule(is InputSeries, T time.Time, interval, jitter time.Duration) (map[int64][]scheduledSample, error) {
type delayEntry struct {
sendAtSlot int
value float64
}
delayMap := make(map[int]delayEntry, len(is.Delays))
for _, d := range is.Delays {
if len(d) != 3 {
return nil, fmt.Errorf("each delay must be [originalSlot, sendAtSlot, value]; got %v", d)
}
delayMap[int(d[0])] = delayEntry{sendAtSlot: int(d[1]), value: d[2]}
}
schedule := make(map[int64][]scheduledSample)
// Regular (non-null) values — sent at their natural slot time.
for i, v := range is.Values {
if v == nil {
continue // null → slot is handled by delays or intentionally absent
}
sampleTime := T.Add(jitter + time.Duration(i)*interval)
sendAtMs := sampleTime.UnixMilli()
schedule[sendAtMs] = append(schedule[sendAtMs], scheduledSample{
timestamp: sampleTime.UnixMilli(),
value: *v,
})
}
// Delayed values — timestamped at originalSlot, sent at sendAtSlot.
for origSlot, de := range delayMap {
sampleTime := T.Add(jitter + time.Duration(origSlot-1)*interval)
sendAt := T.Add(jitter + time.Duration(de.sendAtSlot-1)*interval)
sendAtMs := sendAt.UnixMilli()
schedule[sendAtMs] = append(schedule[sendAtMs], scheduledSample{
timestamp: sampleTime.UnixMilli(),
value: de.value,
})
}
// Sort samples within each slot by timestamp for deterministic ordering.
for k, s := range schedule {
sort.Slice(s, func(i, j int) bool { return s[i].timestamp < s[j].timestamp })
schedule[k] = s
}
return schedule, nil
}
// --- test runner -------------------------------------------------------------
type seriesEvent struct {
seriesName string // original series selector string from config
labels map[string]string
samples []scheduledSample
}
func runTest(T time.Time, interval, jitter time.Duration) {
defer func() {
mu.Lock()
started = false
mu.Unlock()
log.Printf("[write] finished")
}()
// Collect all send events across every configured series.
allEvents := make(map[int64][]seriesEvent)
for _, is := range cfg.InputSeries {
labels, err := parseLabels(is.Series)
if err != nil {
log.Printf("[test] cannot parse series %q: %v — skipping", is.Series, err)
continue
}
schedule, err := buildSchedule(is, T, interval, jitter)
if err != nil {
log.Printf("[test] cannot build schedule for %q: %v — skipping", is.Series, err)
continue
}
log.Printf("[test] series %q: %d distinct send-time slots", is.Series, len(schedule))
for sendAtMs, samples := range schedule {
allEvents[sendAtMs] = append(allEvents[sendAtMs], seriesEvent{
seriesName: is.Series,
labels: labels,
samples: samples,
})
}
}
// Sort the unique send times into chronological order.
sendTimes := make([]int64, 0, len(allEvents))
for t := range allEvents {
sendTimes = append(sendTimes, t)
}
sort.Slice(sendTimes, func(i, j int) bool { return sendTimes[i] < sendTimes[j] })
log.Printf("[test] starting: %d distinct send times across %d series",
len(sendTimes), len(cfg.InputSeries))
for _, sendAtMs := range sendTimes {
sendAt := time.UnixMilli(sendAtMs)
if now := time.Now(); sendAt.After(now) {
sleep := sendAt.Sub(now)
time.Sleep(sleep)
}
for _, ev := range allEvents[sendAtMs] {
recordSent(ev.seriesName, ev.samples, sendAtMs)
if err := writeSamples(cfg.VmagentAddress, ev.labels, ev.samples); err != nil {
log.Printf("[test] write error for %v: %v", ev.labels, err)
}
}
}
}
// --- vmagent writer ----------------------------------------------------------
// writeSamples POSTs one NDJSON line to vmagent's /api/v1/import endpoint.
// All samples for the same metric series are batched into a single request.
func writeSamples(addr string, labels map[string]string, samples []scheduledSample) error {
values := make([]float64, len(samples))
timestamps := make([]int64, len(samples))
for i, s := range samples {
values[i] = s.value
timestamps[i] = s.timestamp
}
line := vmImportLine{
Metric: labels,
Values: values,
Timestamps: timestamps,
}
data, err := json.Marshal(line)
if err != nil {
return fmt.Errorf("marshal: %w", err)
}
data = append(data, '\n')
url := addr + "/api/v1/import"
log.Printf("[write] metric=%v values=%v timestamps_ms=%v",
labels, values, timestamps)
resp, err := http.Post(url, "application/json", bytes.NewReader(data)) //nolint:noctx
if err != nil {
return fmt.Errorf("POST %s: %w", url, err)
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
if resp.StatusCode/100 != 2 {
return fmt.Errorf("POST %s: status=%d body=%q", url, resp.StatusCode, body)
}
return nil
}
// --- report ------------------------------------------------------------------
// recordSent stores sent samples into the report data store.
// It must be called without holding reportMu.
func recordSent(seriesName string, samples []scheduledSample, sendAtMs int64) {
reportMu.Lock()
defer reportMu.Unlock()
var sd *sentSeriesData
for i := range reportSent {
if reportSent[i].Name == seriesName {
sd = &reportSent[i]
break
}
}
if sd == nil {
reportSent = append(reportSent, sentSeriesData{Name: seriesName})
sd = &reportSent[len(reportSent)-1]
}
for _, s := range samples {
sd.Points = append(sd.Points, sentDataPoint{
TsSec: float64(s.timestamp) / 1000.0,
Value: s.value,
SentAtSec: float64(sendAtMs) / 1000.0,
Delayed: sendAtMs != s.timestamp,
})
}
}
// handleReport renders and serves the HTML report page with sent/received charts.
func handleReport(w http.ResponseWriter, r *http.Request) {
// saYAML is read without mu, consistent with handleSAConfig.
saYAMLStr := string(saYAML)
reportMu.RLock()
sentSnap := make([]sentSeriesData, len(reportSent))
for i, sd := range reportSent {
pts := make([]sentDataPoint, len(sd.Points))
copy(pts, sd.Points)
sentSnap[i] = sentSeriesData{Name: sd.Name, Points: pts}
}
recvSnap := make([]recvSeriesData, 0, len(reportRecv))
for _, rd := range reportRecv {
pts := make([]recvDataPoint, len(rd.Points))
copy(pts, rd.Points)
recvSnap = append(recvSnap, recvSeriesData{Name: rd.Name, Points: pts})
}
reportMu.RUnlock()
sort.Slice(recvSnap, func(i, j int) bool { return recvSnap[i].Name < recvSnap[j].Name })
sentJSON, _ := json.Marshal(sentSnap)
recvJSON, _ := json.Marshal(recvSnap)
// Single canvas for all sent series combined.
var sentCanvas string
if len(sentSnap) == 0 {
sentCanvas = `<p class="no-data">No data yet — POST /start to run the test.</p>`
} else {
sentCanvas = `<div class="chart-wrap"><canvas id="sent-all"></canvas></div>`
}
var recvCharts strings.Builder
if len(recvSnap) == 0 {
recvCharts.WriteString(`<p class="no-data">No data received yet.</p>`)
} else {
for i, rd := range recvSnap {
fmt.Fprintf(&recvCharts,
"<h3>%s</h3><div class=\"chart-wrap\"><canvas id=\"recv-%d\"></canvas></div>\n",
html.EscapeString(rd.Name), i)
}
}
page := reportPageTemplate
page = strings.ReplaceAll(page, "__GENERATED_AT__", time.Now().UTC().Format(time.RFC3339))
page = strings.ReplaceAll(page, "__SA_YAML__", html.EscapeString(saYAMLStr))
page = strings.ReplaceAll(page, "__SENT_COUNT__", strconv.Itoa(len(sentSnap)))
page = strings.ReplaceAll(page, "__SENT_CANVAS__", sentCanvas)
page = strings.ReplaceAll(page, "__RECV_COUNT__", strconv.Itoa(len(recvSnap)))
page = strings.ReplaceAll(page, "__RECV_CHARTS__", recvCharts.String())
page = strings.ReplaceAll(page, "__SENT_JSON__", string(sentJSON))
page = strings.ReplaceAll(page, "__RECV_JSON__", string(recvJSON))
startTs := "0"
if !reportT.IsZero() {
startTs = fmt.Sprintf("%f", float64(reportT.UnixMilli())/1000.0)
}
page = strings.ReplaceAll(page, "__START_TS__", startTs)
w.Header().Set("Content-Type", "text/html; charset=utf-8")
fmt.Fprint(w, page)
}

View File

@@ -1,281 +0,0 @@
package main
// reportPageTemplate is the HTML skeleton for GET /report.
// All __TOKEN__ placeholders are replaced at render time by handleReport.
var reportPageTemplate = `<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8">
<title>SA Tester Report</title>
<style>
body{font-family:'Segoe UI',Arial,sans-serif;margin:0;padding:20px;background:#f0f2f5;color:#333}
h1{margin-bottom:4px}
.subtitle{color:#666;font-size:14px;margin-bottom:24px}
.card{background:#fff;padding:20px;margin:16px 0;border-radius:8px;box-shadow:0 1px 4px rgba(0,0,0,.12)}
h2{margin:0 0 12px;color:#444;font-size:18px;border-bottom:1px solid #eee;padding-bottom:8px}
h3{margin:4px 0 8px;font-size:13px;font-family:monospace;color:#555;word-break:break-all}
pre{background:#f7f7f7;padding:12px;border-radius:4px;overflow-x:auto;font-size:13px;margin:0}
.chart-wrap{position:relative;height:340px;margin-bottom:24px}
.no-data{color:#aaa;font-style:italic;font-size:14px}
a{color:#1a73e8}
</style>
</head>
<body>
<h1>SA Tester Report</h1>
<p class="subtitle">Generated: __GENERATED_AT__ &nbsp;|&nbsp; <a href="/report">Refresh</a></p>
<div class="card">
<h2>SA Config</h2>
<pre>__SA_YAML__</pre>
</div>
<div class="card">
<h2>Sent Series (__SENT_COUNT__ series)</h2>
__SENT_CANVAS__
</div>
<div class="card">
<h2>Received Series / SA Output (__RECV_COUNT__ series)</h2>
__RECV_CHARTS__
</div>
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.0/dist/chart.umd.min.js"></script>
<script>
(function(){
var SENT = __SENT_JSON__;
var RECV = __RECV_JSON__;
var PALETTE = [
'rgba(54,162,235,0.85)',
'rgba(255,99,132,0.85)',
'rgba(153,102,255,0.85)',
'rgba(255,205,86,0.9)',
'rgba(75,192,192,0.85)',
];
function fmtDate(d) {
return d.getUTCFullYear() + '-' +
(d.getUTCMonth()+1).toString().padStart(2,'0') + '-' +
d.getUTCDate().toString().padStart(2,'0');
}
function fmtTime(d) {
return d.getUTCHours().toString().padStart(2,'0') + ':' +
d.getUTCMinutes().toString().padStart(2,'0') + ':' +
d.getUTCSeconds().toString().padStart(2,'0');
}
// Tooltip: full datetime with milliseconds.
function fmtTs(sec) {
var d = new Date(sec * 1000);
return fmtDate(d) + ' ' + fmtTime(d) + '.' +
d.getUTCMilliseconds().toString().padStart(3,'0');
}
// Tick label: show date only when it changes (or on the first tick).
function fmtTick(v, idx, ticks) {
var d = new Date(v * 1000);
var dateStr = fmtDate(d);
var timeStr = fmtTime(d);
if (idx === 0) { return timeStr + ' ' + dateStr; }
var prev = new Date(ticks[idx - 1].value * 1000);
if (fmtDate(prev) !== dateStr) { return timeStr + ' ' + dateStr; }
return timeStr;
}
// Plugin: draw dashed delay-span lines directly on the canvas so they are
// owned by their series dataset and toggle with it.
var delaySpanPlugin = {
id: 'delaySpan',
afterDatasetsDraw: function(chart) {
var ctx2 = chart.ctx;
chart.data.datasets.forEach(function(ds, dsi) {
if (!chart.isDatasetVisible(dsi)) return;
var meta = chart.getDatasetMeta(dsi);
ds.data.forEach(function(pt, pi) {
if (!pt || !pt.delayed) return;
var el = meta.data[pi];
if (!el) return;
var xScale = chart.scales.x;
var yScale = chart.scales.y;
var x1 = el.x;
var x2 = xScale.getPixelForValue(pt.sentAt);
var y = el.y;
ctx2.save();
ctx2.beginPath();
ctx2.setLineDash([5, 4]);
ctx2.strokeStyle = 'rgba(255,159,64,0.7)';
ctx2.lineWidth = 2;
ctx2.moveTo(x1, y);
ctx2.lineTo(x2, y);
ctx2.stroke();
// Arrow head at sent-at end.
var dir = x2 > x1 ? 1 : -1;
ctx2.setLineDash([]);
ctx2.beginPath();
ctx2.moveTo(x2, y);
ctx2.lineTo(x2 - dir * 8, y - 5);
ctx2.lineTo(x2 - dir * 8, y + 5);
ctx2.closePath();
ctx2.fillStyle = 'rgba(255,159,64,0.7)';
ctx2.fill();
ctx2.restore();
});
});
},
};
// x-axis always starts from when /start was called.
var START_TS = __START_TS__; // unix seconds, injected by Go
// Compute global x range: min is pinned to START_TS; max comes from data.
var xMax = -Infinity;
SENT.forEach(function(s) {
s.points.forEach(function(p) {
if (!p) return;
if (p.x > xMax) xMax = p.x;
if (p.sentAt !== undefined && p.sentAt > xMax) xMax = p.sentAt;
});
});
RECV.forEach(function(s) {
s.points.forEach(function(p) {
if (!p) return;
if (p.x > xMax) xMax = p.x;
});
});
// Add a 5 % right-side margin; left side is pinned exactly to START_TS.
var xPad = xMax === -Infinity ? 1 : (xMax - START_TS) * 0.05 || 1;
var xRange = { min: START_TS, max: (xMax === -Infinity ? START_TS + 60 : xMax + xPad) };
var sentEl = document.getElementById('sent-all');
if (sentEl && SENT.length > 0) {
var datasets = [];
SENT.forEach(function(s, si) {
var col = PALETTE[si % PALETTE.length];
var delayCol = 'rgba(255,159,64,0.9)';
var pts = s.points.filter(function(p){ return p !== null; });
if (pts.length === 0) return;
// Attach full metadata (delayed, sentAt) to each chart point so the
// plugin and tooltip can read it without separate datasets.
datasets.push({
type: 'scatter', label: s.name,
data: pts.map(function(p){
return {x: p.x, y: p.y, delayed: p.delayed, sentAt: p.sentAt};
}),
backgroundColor: pts.map(function(p){ return p.delayed ? delayCol : col; }),
pointStyle: pts.map(function(p){ return p.delayed ? 'triangle' : 'circle'; }),
pointRadius: pts.map(function(p){ return p.delayed ? 9 : 7; }),
});
});
new Chart(sentEl, {
type: 'scatter',
data: { datasets: datasets },
plugins: [delaySpanPlugin],
options: {
responsive: true, maintainAspectRatio: false,
plugins: {
legend: {
position: 'bottom',
onClick: function(e, legendItem, legend) {
var chart = legend.chart;
var idx = legendItem.datasetIndex;
var meta = chart.getDatasetMeta(idx);
var allOthersHidden = chart.data.datasets.every(function(_, i) {
return i === idx || !chart.isDatasetVisible(i);
});
if (allOthersHidden && !meta.hidden) {
// Already solo — restore all.
chart.data.datasets.forEach(function(_, i) { chart.show(i); });
} else {
// Solo this series.
chart.data.datasets.forEach(function(_, i) { chart.hide(i); });
chart.show(idx);
}
},
},
tooltip: { callbacks: { label: function(ctx){
var p = ctx.raw;
if (!p) return '';
var msg = 'value=' + p.y + ' ts=' + fmtTs(p.x);
if (p.delayed) {
var delaySec = Math.round((p.sentAt - p.x) * 10) / 10;
msg += ' \u2192 sent at ' + fmtTs(p.sentAt) + ' (+' + delaySec + 's)';
}
return msg;
}}},
},
scales: {
x: Object.assign({ type: 'linear',
title: { display: true, text: 'Unix timestamp (seconds UTC)' },
ticks: { callback: function(v, idx, ticks) { return fmtTick(v, idx, ticks); }, maxRotation: 35, minRotation: 35 } },
xRange),
y: { title: { display: true, text: 'value' } },
},
},
});
}
// Plugin: highlight all received points that share the hovered timestamp.
var sameTimestampPlugin = {
id: 'sameTimestamp',
afterDraw: function(chart) {
if (!chart._hoveredTs) return;
var ctx2 = chart.ctx;
var ts = chart._hoveredTs;
chart.data.datasets.forEach(function(ds, dsi) {
var meta = chart.getDatasetMeta(dsi);
ds.data.forEach(function(pt, pi) {
if (!pt || pt.x !== ts) return;
var el = meta.data[pi];
if (!el) return;
ctx2.save();
ctx2.beginPath();
ctx2.arc(el.x, el.y, (el.options.radius || 6) + 5, 0, 2 * Math.PI);
ctx2.strokeStyle = 'rgba(75,192,75,0.9)';
ctx2.lineWidth = 2;
ctx2.stroke();
ctx2.restore();
});
});
},
};
RECV.forEach(function(s, i) {
var el = document.getElementById('recv-' + i);
if (!el) return;
var pts = s.points
.filter(function(p){ return p !== null && p !== undefined && typeof p.x === 'number'; })
.sort(function(a, b){ return a.x - b.x; });
var recvChart = new Chart(el, {
type: 'scatter',
data: { datasets: [{
label: s.name, data: pts,
backgroundColor: 'rgba(75,192,75,0.85)',
pointRadius: 6, pointStyle: 'circle',
}]},
plugins: [sameTimestampPlugin],
options: {
responsive: true, maintainAspectRatio: false,
plugins: {
legend: { display: false },
tooltip: { callbacks: { label: function(ctx){
var p = ctx.raw;
if (p === null || p === undefined) return '';
return 'value=' + p.y + ' ts=' + fmtTs(p.x);
}}},
},
scales: {
x: Object.assign({ type: 'linear',
title: { display: true, text: 'Unix timestamp (seconds UTC)' },
ticks: { callback: function(v, idx, ticks) { return fmtTick(v, idx, ticks); }, maxRotation: 35, minRotation: 35 } },
xRange),
y: { title: { display: true, text: 'value' } },
},
onHover: function(evt, active) {
var ts = (active && active.length > 0)
? recvChart.data.datasets[active[0].datasetIndex].data[active[0].index].x
: null;
if (recvChart._hoveredTs !== ts) {
recvChart._hoveredTs = ts;
recvChart.draw();
}
},
},
});
});
})();
</script>
</body>
</html>`

View File

@@ -173,9 +173,9 @@ func (r *Rule) String() string {
if r.Alert != "" {
ruleType = "alerting"
}
b := strings.Builder{}
b.WriteString(fmt.Sprintf("%s rule %q", ruleType, r.Name()))
b.WriteString(fmt.Sprintf("; expr: %q", r.Expr))
var b strings.Builder
fmt.Fprintf(&b, "%s rule %q", ruleType, r.Name())
fmt.Fprintf(&b, "; expr: %q", r.Expr)
kv := sortMap(r.Labels)
for i := range kv {

File diff suppressed because it is too large Load Diff

View File

@@ -21,16 +21,16 @@
},
"dependencies": {
"classnames": "^2.5.1",
"dayjs": "^1.11.20",
"dayjs": "^1.11.21",
"lodash.debounce": "^4.0.8",
"marked": "^18.0.2",
"preact": "^10.29.1",
"qs": "^6.15.1",
"marked": "^18.0.5",
"preact": "^10.29.2",
"qs": "^6.15.2",
"react-input-mask": "^2.0.4",
"react-router-dom": "^7.14.1",
"react-router-dom": "^7.17.0",
"uplot": "^1.6.32",
"vite": "^8.0.8",
"web-vitals": "^5.2.0"
"vite": "^8.0.16",
"web-vitals": "^5.3.0"
},
"devDependencies": {
"@eslint/eslintrc": "^3.3.5",
@@ -39,24 +39,24 @@
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/preact": "^3.2.4",
"@types/lodash.debounce": "^4.0.9",
"@types/node": "^25.6.0",
"@types/qs": "^6.15.0",
"@types/react": "^19.2.14",
"@types/node": "^25.9.2",
"@types/qs": "^6.15.1",
"@types/react": "^19.2.17",
"@types/react-input-mask": "^3.0.6",
"@types/react-router-dom": "^5.3.3",
"@typescript-eslint/eslint-plugin": "^8.58.2",
"@typescript-eslint/parser": "^8.58.2",
"@typescript-eslint/eslint-plugin": "^8.61.0",
"@typescript-eslint/parser": "^8.61.0",
"cross-env": "^10.1.0",
"eslint": "^9.39.2",
"eslint-plugin-react": "^7.37.5",
"eslint-plugin-unused-imports": "^4.4.1",
"globals": "^17.5.0",
"http-proxy-middleware": "^3.0.5",
"jsdom": "^29.0.2",
"postcss": "^8.5.10",
"sass-embedded": "^1.99.0",
"typescript": "^6.0.2",
"vitest": "^4.1.4"
"globals": "^17.6.0",
"http-proxy-middleware": "^4.1.0",
"jsdom": "^29.1.1",
"postcss": "^8.5.15",
"sass-embedded": "^1.100.0",
"typescript": "^6.0.3",
"vitest": "^4.1.8"
},
"browserslist": {
"production": [

View File

@@ -26,6 +26,9 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix `increase` and `increase_prometheus` outputs producing inflated values when old samples update the baseline across interval boundaries with `ignore_old_samples: true` or `enable_windows: true`.
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
Released at 2026-06-08

View File

@@ -667,11 +667,11 @@ func TestScrapeWorkScrapeInternalStreamConcurrency(t *testing.T) {
}
generateScrape := func(n int) string {
w := strings.Builder{}
var w strings.Builder
for i := range n {
w.WriteString(fmt.Sprintf("fooooo_%d 1\n", i))
fmt.Fprintf(&w, "fooooo_%d 1\n", i)
if i%100 == 0 {
w.WriteString(fmt.Sprintf("# HELP fooooo_%d This is a test\n", i))
fmt.Fprintf(&w, "# HELP fooooo_%d This is a test\n", i)
}
}
return w.String()
@@ -1005,9 +1005,9 @@ func TestSendStaleSeries(t *testing.T) {
}
}
generateScrape := func(n int) string {
w := strings.Builder{}
var w strings.Builder
for i := range n {
w.WriteString(fmt.Sprintf("foo_%d 1\n", i))
fmt.Fprintf(&w, "foo_%d 1\n", i)
}
return w.String()
}

View File

@@ -6,6 +6,9 @@ type avgAggrValue struct {
}
func (av *avgAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.sum += sample.value
av.count++
}

View File

@@ -4,7 +4,10 @@ type countSamplesAggrValue struct {
count uint64
}
func (av *countSamplesAggrValue) pushSample(_ aggrConfig, _ *pushSample, _ string, _ int64) {
func (av *countSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.count++
}

View File

@@ -9,7 +9,10 @@ type countSeriesAggrValue struct {
samples map[uint64]struct{}
}
func (av *countSeriesAggrValue) pushSample(_ aggrConfig, _ *pushSample, key string, _ int64) {
func (av *countSeriesAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string, _ int64) {
if sample.stateOnly {
return
}
// Count unique hashes over the keys instead of unique key values.
// This reduces memory usage at the cost of possible hash collisions for distinct key values.
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))

View File

@@ -45,6 +45,7 @@ type dedupAggrShardNopad struct {
type dedupAggrSample struct {
value float64
timestamp int64
stateOnly bool
}
func newDedupAggr() *dedupAggr {
@@ -189,6 +190,7 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
s = &samplesBuf[len(samplesBuf)-1]
s.value = sample.value
s.timestamp = sample.timestamp
s.stateOnly = sample.stateOnly
key := bytesutil.InternString(sample.key)
state.m[key] = s
@@ -197,28 +199,33 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
state.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
continue
}
s.timestamp, s.value = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
var newWins bool
s.timestamp, s.value, newWins = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
if newWins {
s.stateOnly = sample.stateOnly
}
}
state.samplesBuf = samplesBuf
}
// deduplicateSamples returns deduplicated timestamp and value results.
// deduplicateSamples returns deduplicated timestamp and value results,
// along with a boolean indicating whether the new sample won.
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#deduplication
func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64) {
func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64, bool) {
if newT > oldT {
return newT, newV
return newT, newV, true
}
// if both samples have the same timestamp, choose the maximum value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333;
// always prefer a non-decimal.StaleNaN value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10196
if newT == oldT {
if decimal.IsStaleNaN(oldV) {
return newT, newV
return newT, newV, true
}
if newV > oldV {
return newT, newV
return newT, newV, true
}
}
return oldT, oldV
return oldT, oldV, false
}
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
@@ -250,6 +257,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
key: key,
value: s.value,
timestamp: s.timestamp,
stateOnly: s.stateOnly,
})
// Limit the number of samples per each flush in order to limit memory usage.

View File

@@ -24,8 +24,8 @@ func TestDedupAggrSerial(t *testing.T) {
}
da.pushSamples(samples, 0, false)
if n := da.sizeBytes(); n > 5_000_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 5_000_000 bytes", n)
if n := da.sizeBytes(); n > 6_000_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 6_000_000 bytes", n)
}
if n := da.itemsCount(); n != seriesCount {
t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)
@@ -81,7 +81,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
func TestDeduplicateSamples(t *testing.T) {
f := func(oldT, newT int64, oldV, newV float64, expectedT int64, expectedV float64) {
t.Helper()
dedupT, dedupV := deduplicateSamples(oldT, newT, oldV, newV)
dedupT, dedupV, _ := deduplicateSamples(oldT, newT, oldV, newV)
if dedupT != expectedT || dedupV != expectedV {
t.Fatalf("unexpected deduplicated result for oldT=%d, newT=%d, oldV=%f, newV=%f; got dedupT=%d, dedupV=%f; want dedupT=%d, dedupV=%f",
oldT, newT, oldV, newV, dedupT, dedupV, expectedT, expectedV)

View File

@@ -231,6 +231,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
logger.Warnf("deduplication couldn't be finished in the configured dedupInterval=%s; it took %.03fs; "+
"possible solutions: increase dedupInterval; reduce samples' ingestion rate", d.interval, duration.Seconds())
}
deadlineTime = deadlineTime.Add(d.interval)
for time.Now().After(deadlineTime) {
deadlineTime = deadlineTime.Add(d.interval)
}

View File

@@ -11,6 +11,9 @@ type histogramBucketAggrValue struct {
}
func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.h.Update(sample.value)
}

109
lib/streamaggr/increase.go Normal file
View File

@@ -0,0 +1,109 @@
package streamaggr
import (
"fmt"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
type increaseLastValue struct {
value float64
timestamp int64
deleteDeadline int64
}
type increaseAggrValueShared struct {
lastValues map[string]increaseLastValue
}
type increaseAggrValue struct {
total float64
shared *increaseAggrValueShared
}
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
ac := c.(*increaseAggrConfig)
currentTime := fasttime.UnixTimestamp()
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
lv, ok := av.shared.lastValues[key]
if ok || keepFirstSample {
if sample.timestamp < lv.timestamp {
// Skip out of order sample
return
}
if !sample.stateOnly {
if sample.value >= lv.value {
av.total += sample.value - lv.value
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
}
}
}
lv.value = sample.value
lv.timestamp = sample.timestamp
lv.deleteDeadline = deleteDeadline
key = bytesutil.InternString(key)
av.shared.lastValues[key] = lv
}
func (av *increaseAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*increaseAggrConfig)
suffix := ac.getSuffix()
total := av.total
av.total = 0
lvs := av.shared.lastValues
for lk, lv := range lvs {
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
delete(lvs, lk)
}
}
ctx.appendSeries(key, suffix, total)
}
func (av *increaseAggrValue) state() any {
return av.shared
}
func newIncreaseAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &increaseAggrConfig{
keepFirstSample: keepFirstSample,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
return cfg
}
type increaseAggrConfig struct {
keepFirstSample bool
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
ignoreFirstSampleDeadline uint64
counterResetsTotal *metrics.Counter
}
func (*increaseAggrConfig) getValue(s any) aggrValue {
var shared *increaseAggrValueShared
if s == nil {
shared = &increaseAggrValueShared{
lastValues: make(map[string]increaseLastValue),
}
} else {
shared = s.(*increaseAggrValueShared)
}
return &increaseAggrValue{
shared: shared,
}
}
func (ac *increaseAggrConfig) getSuffix() string {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}

View File

@@ -6,6 +6,9 @@ type lastAggrValue struct {
}
func (av *lastAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if sample.timestamp >= av.timestamp {
av.last = sample.value
av.timestamp = sample.timestamp

View File

@@ -6,6 +6,9 @@ type maxAggrValue struct {
}
func (av *maxAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if sample.value > av.max || !av.defined {
av.max = sample.value
}

View File

@@ -6,6 +6,9 @@ type minAggrValue struct {
}
func (av *minAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if sample.value < av.min || !av.defined {
av.min = sample.value
}

View File

@@ -13,6 +13,9 @@ type quantilesAggrValue struct {
}
func (av *quantilesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if av.h == nil {
av.h = histogram.GetFast()
}

View File

@@ -34,6 +34,7 @@ var rateAggrStateValuePool sync.Pool
func putRateAggrStateValue(v *rateAggrStateValue) {
v.timestamp = 0
v.lastTimestamp = 0
v.increase = 0
rateAggrStateValuePool.Put(v)
}
@@ -88,6 +89,10 @@ type rateAggrStateValue struct {
// increase stores cumulative increase for the current time series on the current aggregation interval
increase float64
timestamp int64
// lastTimestamp is the latest timestamp seen for this series including state-only samples.
// It is used for out-of-order detection, while timestamp (above) is only updated by
// non-state-only samples and is used for rate calculation.
lastTimestamp int64
}
type rateAggrValue struct {
@@ -101,16 +106,20 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
sv, ok := av.shared[key]
if ok {
state = sv.getState(av.isGreen)
if sample.timestamp < state.timestamp {
if sample.timestamp < state.lastTimestamp {
// Skip out of order sample
return
}
if sample.value >= sv.value {
state.increase += sample.value - sv.value
if !sample.stateOnly {
if sample.value >= sv.value {
state.increase += sample.value - sv.value
} else {
// counter reset
state.increase += sample.value
ac.counterResetsTotal.Inc()
}
} else {
// counter reset
state.increase += sample.value
ac.counterResetsTotal.Inc()
sv.prevTimestamp = sample.timestamp
}
} else {
sv = getRateAggrSharedValue(av.isGreen)
@@ -121,7 +130,10 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
}
sv.value = sample.value
sv.deleteDeadline = deleteDeadline
state.timestamp = sample.timestamp
state.lastTimestamp = sample.timestamp
if !sample.stateOnly {
state.timestamp = sample.timestamp
}
}
func (av *rateAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {

View File

@@ -12,6 +12,9 @@ type stdAggrValue struct {
}
func (av *stdAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.count++
avg := av.avg + (sample.value-av.avg)/av.count
av.q += (sample.value - av.avg) * (sample.value - avg)

View File

@@ -762,9 +762,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "histogram_bucket":
return newHistogramBucketAggrConfig(useSharedState), nil
case "increase":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "increase_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
case "last":
return newLastAggrConfig(), nil
case "max":
@@ -782,9 +782,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "sum_samples":
return newSumSamplesAggrConfig(), nil
case "total":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "total_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
case "unique_samples":
return newUniqueSamplesAggrConfig(), nil
default:
@@ -845,6 +845,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlu
} else {
a.flush(pf, flushTime, cs, false)
}
flushTime = flushTime.Add(a.interval)
for time.Now().After(flushTime) {
flushTime = flushTime.Add(a.interval)
}
@@ -1005,26 +1006,28 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
a.ignoredNaNSamples.Inc()
continue
}
if (ignoreOldSamples || enableWindows) && s.Timestamp < minDeadline {
// Skip old samples outside the current aggregation interval
stateOnly := (ignoreOldSamples || enableWindows) && s.Timestamp < minDeadline
if stateOnly {
a.ignoredOldSamples.Inc()
continue
}
lagMsec := nowMsec - s.Timestamp
if lagMsec > maxLagMsec {
maxLagMsec = lagMsec
} else {
lagMsec := nowMsec - s.Timestamp
if lagMsec > maxLagMsec {
maxLagMsec = lagMsec
}
}
if enableWindows && s.Timestamp <= cs.maxDeadline == cs.isGreen {
ctx.green = append(ctx.green, pushSample{
key: key,
value: s.Value,
timestamp: s.Timestamp,
stateOnly: stateOnly,
})
} else {
ctx.blue = append(ctx.blue, pushSample{
key: key,
value: s.Value,
timestamp: s.Timestamp,
stateOnly: stateOnly,
})
}
}
@@ -1098,6 +1101,10 @@ type pushSample struct {
key string
value float64
timestamp int64
// stateOnly marks samples older than minDeadline: update tracking state in stateful outputs
// (total, rate, increase) but do not contribute to the aggregation output.
stateOnly bool
}
func getPushCtx() *pushCtx {

View File

@@ -1,5 +1,3 @@
//go:build synctest
package streamaggr
import (
@@ -485,10 +483,8 @@ foo 3.3
`, ``, ``, ``, ``}, time.Minute, `foo:1m_count_series 1
foo:1m_count_series{bar="baz"} 1
foo:1m_sum_samples 0
foo:1m_sum_samples 0
foo:1m_sum_samples 4.3
foo:1m_sum_samples{bar="baz"} 0
foo:1m_sum_samples{bar="baz"} 0
foo:1m_sum_samples{bar="baz"} 2
foo:5m_by_bar_sum_samples 4.3
foo:5m_by_bar_sum_samples{bar="baz"} 2
@@ -692,23 +688,33 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.125
outputs: [rate_sum, rate_avg]
`, "11111")
// test rate_sum and rate_avg, when two aggregation intervals are empty
// test rate_sum and rate_avg, when two aggregation intervals are empty.
// abc=777 arrives slightly before the start of each interval (-10ms) but still
// updates prevTimestamp, so it contributes to rate_sum alongside abc=123 and abc=456.
f([]string{`
foo{abc="123", cde="1"} 2
foo{abc="456", cde="1"} 8
foo{abc="777", cde="1"} 9 -10
foo{abc="123", cde="1"} 1
foo{abc="123", cde="1"} 2 1
foo{abc="456", cde="1"} 7
foo{abc="456", cde="1"} 8 1
foo{abc="777", cde="1"} 8
foo{abc="777", cde="1"} 9 1
`, ``, ``, `
foo{abc="123", cde="1"} 20
foo{abc="123", cde="1"} 19
foo{abc="123", cde="1"} 20 1
foo{abc="456", cde="1"} 26
foo{abc="777", cde="1"} 27 -10
`}, time.Minute, `foo:1m_by_cde_rate_avg{cde="1"} 0.1
foo:1m_by_cde_rate_sum{cde="1"} 0.2
foo{abc="456", cde="1"} 27 1
foo{abc="777", cde="1"} 27
foo{abc="777", cde="1"} 28 1
`}, time.Minute, `foo:1m_by_cde_rate_avg{cde="1"} 1
foo:1m_by_cde_rate_avg{cde="1"} 1
foo:1m_by_cde_rate_sum{cde="1"} 3
foo:1m_by_cde_rate_sum{cde="1"} 3
`, `
- interval: 1m
by: [cde]
outputs: [rate_sum, rate_avg]
enable_windows: true
`, "111111")
`, "111111111111")
// rate_sum and rate_avg with duplicated events
f([]string{`
@@ -803,4 +809,55 @@ foo:1m_sum_samples{baz="qwe"} 10
dedup_interval: 30s
outputs: [sum_samples]
`, "11111111")
// total with ignore_old_samples: an old sample (30s before the interval boundary) must
// update the state reference without contributing to the interval total, so the subsequent
// current-interval sample (250) computes increase 250-150=100 instead of 250-100=150.
// Cumulative total: 100 (interval1) + 100 (interval2) = 200.
f([]string{`
foo 100
`, `
foo 150 -30
foo 250
`}, time.Minute, `foo:1m_total 100
foo:1m_total 200
`, `
- interval: 1m
outputs: [total]
ignore_old_samples: true
ignore_first_sample_interval: 0s
`, "111")
// increase with ignore_old_samples: same correctness check for increase output.
// Per-interval: 100 (first sample from 0) and 100 (250-150=100 thanks to stateOnly update).
f([]string{`
foo 100
`, `
foo 150 -30
foo 250
`}, time.Minute, `foo:1m_increase 100
foo:1m_increase 100
`, `
- interval: 1m
outputs: [increase]
ignore_old_samples: true
ignore_first_sample_interval: 0s
`, "111")
// rate with ignore_old_samples: out-of-order stateOnly samples must not overwrite sv.value,
// and the winning stateOnly sample's timestamp is used as the denominator start.
// foo 120 -40 (ts=T0+20s) is rejected as OOO after foo 150 -30 (ts=T0+30s),
// so the baseline is 150 at T0+30s, giving rate=(200-150)/30 ≈ 1.667.
f([]string{`
foo 100
`, `
foo 150 -30
foo 120 -40
foo 200
`}, time.Minute, `foo:1m_rate_sum 1.6666666666666667
`, `
- interval: 1m
outputs: [rate_sum]
ignore_old_samples: true
`, "1111")
}

View File

@@ -5,6 +5,9 @@ type sumSamplesAggrValue struct {
}
func (av *sumSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.sum += sample.value
}

View File

@@ -36,12 +36,14 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
// Skip out of order sample
return
}
if sample.value >= lv.value {
av.total += sample.value - lv.value
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
if !sample.stateOnly {
if sample.value >= lv.value {
av.total += sample.value - lv.value
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
}
}
}
lv.value = sample.value
@@ -54,7 +56,6 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*totalAggrConfig)
suffix := ac.getSuffix()
// check for stale entries
total := av.shared.total + av.total
av.total = 0
lvs := av.shared.lastValues
@@ -63,9 +64,7 @@ func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast
delete(lvs, lk)
}
}
if ac.resetTotalOnFlush {
av.shared.total = 0
} else if math.Abs(total) >= (1 << 53) {
if math.Abs(total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
av.shared.total = 0
} else {
@@ -78,11 +77,10 @@ func (av *totalAggrValue) state() any {
return av.shared
}
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &totalAggrConfig{
keepFirstSample: keepFirstSample,
resetTotalOnFlush: resetTotalOnFlush,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
@@ -90,8 +88,6 @@ func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleI
}
type totalAggrConfig struct {
resetTotalOnFlush bool
// Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool
@@ -117,12 +113,6 @@ func (*totalAggrConfig) getValue(s any) aggrValue {
}
func (ac *totalAggrConfig) getSuffix() string {
if ac.resetTotalOnFlush {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}
if ac.keepFirstSample {
return "total"
}

View File

@@ -5,6 +5,9 @@ type uniqueSamplesAggrValue struct {
}
func (av *uniqueSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if _, ok := av.samples[sample.value]; !ok {
av.samples[sample.value] = struct{}{}
}