mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-29 05:25:26 +03:00
Compare commits
5 Commits
add-sa-tes
...
state-only
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
90029442cb | ||
|
|
04993f2187 | ||
|
|
73a40a4178 | ||
|
|
66f8ec81f3 | ||
|
|
66672f216b |
3
.github/workflows/build.yml
vendored
3
.github/workflows/build.yml
vendored
@@ -33,7 +33,8 @@ jobs:
|
||||
name: ${{ matrix.os }}-${{ matrix.arch }}
|
||||
permissions:
|
||||
contents: read
|
||||
runs-on: ubuntu-latest
|
||||
# Runs on dedicated runner with extra resources to increase build speed.
|
||||
runs-on: 'vm-runner'
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
|
||||
7
.github/workflows/test.yml
vendored
7
.github/workflows/test.yml
vendored
@@ -30,7 +30,8 @@ jobs:
|
||||
name: lint
|
||||
permissions:
|
||||
contents: read
|
||||
runs-on: ubuntu-latest
|
||||
# Runs on dedicated runner with extra resources since golangci-lint requires extra memory
|
||||
runs-on: 'vm-runner'
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
@@ -64,7 +65,8 @@ jobs:
|
||||
name: unit
|
||||
permissions:
|
||||
contents: read
|
||||
runs-on: ubuntu-latest
|
||||
# Runs on dedicated runner with extra resources to increase tests speed.
|
||||
runs-on: 'vm-runner'
|
||||
|
||||
strategy:
|
||||
matrix:
|
||||
@@ -95,6 +97,7 @@ jobs:
|
||||
name: apptest
|
||||
permissions:
|
||||
contents: read
|
||||
# Runs on dedicated runner to isolate app tests from other tests.
|
||||
runs-on: apptest
|
||||
|
||||
steps:
|
||||
|
||||
@@ -3,27 +3,14 @@ linters:
|
||||
settings:
|
||||
errcheck:
|
||||
exclude-functions:
|
||||
- fmt.Fprintf
|
||||
- fmt.Fprint
|
||||
- (net/http.ResponseWriter).Write
|
||||
exclusions:
|
||||
generated: lax
|
||||
presets:
|
||||
- common-false-positives
|
||||
- legacy
|
||||
- std-error-handling
|
||||
rules:
|
||||
- linters:
|
||||
- staticcheck
|
||||
text: 'SA(4003|1019|5011):'
|
||||
paths:
|
||||
- third_party$
|
||||
- builtin$
|
||||
- examples$
|
||||
formatters:
|
||||
exclusions:
|
||||
generated: lax
|
||||
paths:
|
||||
- third_party$
|
||||
- builtin$
|
||||
- examples$
|
||||
- ^app/vmui/
|
||||
|
||||
4
Makefile
4
Makefile
@@ -17,7 +17,7 @@ EXTRA_GO_BUILD_TAGS ?=
|
||||
GO_BUILDINFO = -X '$(PKG_PREFIX)/lib/buildinfo.Version=$(APP_NAME)-$(DATEINFO_TAG)-$(BUILDINFO_TAG)'
|
||||
TAR_OWNERSHIP ?= --owner=1000 --group=1000
|
||||
|
||||
GOLANGCI_LINT_VERSION := 2.9.0
|
||||
GOLANGCI_LINT_VERSION := 2.12.2
|
||||
|
||||
.PHONY: $(MAKECMDGOALS)
|
||||
|
||||
@@ -527,7 +527,7 @@ golangci-lint: install-golangci-lint
|
||||
golangci-lint run --build-tags 'synctest'
|
||||
|
||||
install-golangci-lint:
|
||||
which golangci-lint && (golangci-lint --version | grep -q $(GOLANGCI_LINT_VERSION)) || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v$(GOLANGCI_LINT_VERSION)
|
||||
which golangci-lint && (golangci-lint --version | grep -q $(GOLANGCI_LINT_VERSION)) || curl -sSfL https://golangci-lint.run/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v$(GOLANGCI_LINT_VERSION)
|
||||
|
||||
remove-golangci-lint:
|
||||
rm -rf `which golangci-lint`
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
The `sa-tester` provides a stream aggregation config on the `/sa-config` endpoint for an external `vmagent` to read, writes configured mock series to `vmagent` for aggregation, and receives the output aggregation results from `vmagent`. It will print all the input and output samples in its logs, and generate a report with them.
|
||||
|
||||
See `app/test/sa_tester/config.yaml` for all supported options.
|
||||
|
||||
**Test steps:**
|
||||
|
||||
1. Start the `sa-tester` with the local config file `app/test/sa_tester/config.yaml`.
|
||||
2. Start a `vmagent` instance using the version you want to test:
|
||||
- 2.1. Use `<sa-tester-addr>/api/v1/write` as one of the `-remoteWrite.url` values. You can also add other `remoteWrite` URLs to `vmsingle` to check the aggregation results.
|
||||
- 2.2. Use `-streamAggr.config=http://192.168.0.102:8880/sa-config` to make `vmagent` use the stream aggregation config from `sa-tester`.
|
||||
3. Call `<sa-tester-addr>/start` when you're ready. The `sa-tester` will call the `vmagent` reload endpoint to ensure the stream aggregation config is up to date, and start writing the series to `vmagent` for aggregation.
|
||||
4. Check results using the `sa-tester` logs, `<sa-tester-addr>/report` page or other remoteWrite destinations.
|
||||
|
||||

|
||||
|
||||
**Notes:**
|
||||
|
||||
1. You can safely rerun the test without restarting the `vmagent` instance if you changed the stream aggregation config (due to the reload call) or just wait for a while (due to the default staleness interval).
|
||||
2. You can call `<sa-tester-addr>/reset` to update the `/sa-config` endpoint and `input_series` configs, then call `<sa-tester-addr>/start` to start a new test.
|
||||
3. You can replace the above `vmagent` with `vmsingle` if you want to check results only in `vmsingle` instead of in the `sa-tester` logs.
|
||||
4. Do not send extra metrics from vmagent to sa-tester, it will generate a wall of logs:)
|
||||
@@ -1,50 +0,0 @@
|
||||
# Stream aggregation rules served on GET /sa-config.
|
||||
# Configure vmagent with:
|
||||
# -streamAggr.config=http://localhost:8080/sa-config
|
||||
saRules:
|
||||
- name: 'increase'
|
||||
match: 'test'
|
||||
interval: 10s
|
||||
without: [instance]
|
||||
outputs: [increase]
|
||||
- name: 'increase-ignore-old'
|
||||
match: 'test'
|
||||
interval: 10s
|
||||
ignore_old_samples: true
|
||||
without: [env]
|
||||
outputs: [increase]
|
||||
- name: 'increase-prometheus'
|
||||
match: 'test'
|
||||
interval: 10s
|
||||
without: [instance]
|
||||
outputs: [increase_prometheus]
|
||||
- name: 'increase-prometheus-ignore-old'
|
||||
match: 'test'
|
||||
interval: 10s
|
||||
ignore_old_samples: true
|
||||
without: [env]
|
||||
outputs: [increase_prometheus]
|
||||
|
||||
# Address of the vmagent that loads SA rules and accepts raw samples.
|
||||
# POST /start calls <vmagent_address>/-/reload then writes samples to it.
|
||||
vmagent_address: "http://192.168.0.102:8420"
|
||||
|
||||
# Listen address for this tester's HTTP server.
|
||||
listen_address: ":8880"
|
||||
|
||||
# --- Below are for generating input samples for testing. ---
|
||||
# Time between consecutive sample slots.
|
||||
# Currently, interval is global, you can have different intervals for different input series by insert null between values.
|
||||
interval: 10s
|
||||
|
||||
input_series:
|
||||
- series: 'test{env="prod", instance="a"}'
|
||||
# One value per slot (1-indexed). null means slot is skipped but could be compensated by delays.
|
||||
values: [null, null, 1001, 1002, 1003, null, null, null, 1026, 1027, 1028]
|
||||
# delays can be used to mock delayed or out of order sample cases.
|
||||
# delays: [originalSlot, sendAtSlot, value] (slots are 1-indexed)
|
||||
# Sample is timestamped at T+(originalSlot-1)*interval, but delivered to vmagent at T+(sendAtSlot-1)*interval.
|
||||
delays:
|
||||
- [6, 9, 1004] # slot-6 sample (value=1004) sent together at slot 9
|
||||
- [7, 9, 1005] # slot-7 sample (value=1005) sent together at slot 9
|
||||
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 495 KiB |
@@ -1,627 +0,0 @@
|
||||
// sa_tester is a tool for testing stream aggregation rules.
|
||||
//
|
||||
// It does two things:
|
||||
// 1. Serves a stream aggregation config YAML on GET /sa-config so that
|
||||
// vmagent can pull it with -streamAggr.config=http://host/sa-config.
|
||||
// 2. On POST /start it calls vmagent's /-/reload (recording T=now),
|
||||
// then writes synthetic samples to the same vmagent.
|
||||
//
|
||||
// Usage:
|
||||
//
|
||||
// go run ./app/test/sa_tester -config app/test/sa_tester/config.yaml
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"html"
|
||||
"io"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"os"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite/stream"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
type AppConfig struct {
|
||||
// SARules holds the stream aggregation rules. It is marshalled back to
|
||||
// YAML and served verbatim on GET /sa-config.
|
||||
SARules interface{} `yaml:"saRules"`
|
||||
|
||||
// Interval between consecutive sample slots, e.g. "10s".
|
||||
Interval string `yaml:"interval"`
|
||||
|
||||
// InputSeries defines the time series to generate.
|
||||
InputSeries []InputSeries `yaml:"input_series"`
|
||||
|
||||
// VmagentAddress is the vmagent that loads SA rules and accepts raw samples.
|
||||
// POST /start calls <VmagentAddress>/-/reload, then writes samples to it.
|
||||
// Default: http://localhost:8429
|
||||
VmagentAddress string `yaml:"vmagent_address"`
|
||||
|
||||
// ListenAddress is the HTTP listen address for this tester.
|
||||
// Default: :8080
|
||||
ListenAddress string `yaml:"listen_address"`
|
||||
}
|
||||
|
||||
// InputSeries describes how to generate one time series.
|
||||
type InputSeries struct {
|
||||
// Series is a Prometheus-style selector, e.g. 'test1{env="prod",instance="a"}'.
|
||||
Series string `yaml:"series"`
|
||||
|
||||
// Values lists the sample values for consecutive slots (1-indexed).
|
||||
// A null entry means the slot is skipped or handled by Delays.
|
||||
Values []*float64 `yaml:"values"`
|
||||
|
||||
// Delays is a list of [originalSlot, sendAtSlot, value] triples (1-indexed).
|
||||
// The sample is timestamped at T+(originalSlot-1)*interval
|
||||
// but sent to vmagent at T+(sendAtSlot-1)*interval.
|
||||
//
|
||||
// Example: [4, 6, 4] means a sample with value=4 whose logical timestamp
|
||||
// is slot 4 is actually delivered at slot 6 together with the slot-6 sample.
|
||||
Delays [][]float64 `yaml:"delays"`
|
||||
}
|
||||
|
||||
// scheduledSample is a single data point waiting to be sent.
|
||||
type scheduledSample struct {
|
||||
timestamp int64 // sample timestamp in milliseconds
|
||||
value float64
|
||||
}
|
||||
|
||||
// vmImportLine is one line of the VictoriaMetrics /api/v1/import NDJSON format.
|
||||
type vmImportLine struct {
|
||||
Metric map[string]string `json:"metric"`
|
||||
Values []float64 `json:"values"`
|
||||
Timestamps []int64 `json:"timestamps"`
|
||||
}
|
||||
|
||||
// --- report data types -------------------------------------------------------
|
||||
|
||||
// sentDataPoint is one data point in the sent-series chart.
|
||||
type sentDataPoint struct {
|
||||
TsSec float64 `json:"x"` // sample unix timestamp in seconds
|
||||
Value float64 `json:"y"`
|
||||
SentAtSec float64 `json:"sentAt"` // wall-clock send time in seconds; equals TsSec if not delayed
|
||||
Delayed bool `json:"delayed"`
|
||||
}
|
||||
|
||||
// sentSeriesData holds all sent data points for one configured input series.
|
||||
type sentSeriesData struct {
|
||||
Name string `json:"name"`
|
||||
Points []sentDataPoint `json:"points"`
|
||||
}
|
||||
|
||||
// recvDataPoint is one sample received on /api/v1/write.
|
||||
type recvDataPoint struct {
|
||||
TsSec float64 `json:"x"` // sample unix timestamp in seconds
|
||||
Value float64 `json:"y"`
|
||||
}
|
||||
|
||||
// recvSeriesData holds all received samples for one metric series.
|
||||
type recvSeriesData struct {
|
||||
Name string `json:"name"`
|
||||
Points []recvDataPoint `json:"points"`
|
||||
}
|
||||
|
||||
var (
|
||||
cfg *AppConfig
|
||||
configPath string // path of the config file, stored at startup for hot-reload
|
||||
saYAML []byte // SA config YAML to serve
|
||||
|
||||
mu sync.Mutex
|
||||
started bool
|
||||
|
||||
reportMu sync.RWMutex
|
||||
reportT time.Time
|
||||
reportJitter time.Duration
|
||||
reportSent []sentSeriesData
|
||||
reportRecv = make(map[string]*recvSeriesData)
|
||||
)
|
||||
|
||||
func main() {
|
||||
configFile := flag.String("config", "config.yaml", "path to config YAML file")
|
||||
flag.Parse()
|
||||
|
||||
configPath = *configFile
|
||||
if err := loadConfig(); err != nil {
|
||||
log.Fatalf("cannot load config: %v", err)
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/sa-config", handleSAConfig)
|
||||
mux.HandleFunc("/start", handleStart)
|
||||
mux.HandleFunc("/reset", handleReset)
|
||||
mux.HandleFunc("/api/v1/write", handleRemoteWrite)
|
||||
mux.HandleFunc("/report", handleReport)
|
||||
|
||||
log.Printf("HTTP server listening on %s", cfg.ListenAddress)
|
||||
log.Printf("Endpoints:")
|
||||
log.Printf(" GET /sa-config — serve SA rules YAML for vmagent")
|
||||
log.Printf(" POST /start — call vmagent /-/reload then write samples")
|
||||
log.Printf(" POST /reset — reload config, clear 'started' flag")
|
||||
log.Printf(" POST /api/v1/write — receive Prometheus remote-write from vmagent SA output")
|
||||
log.Printf(" GET /report — HTML report with sent/received series charts")
|
||||
log.Fatalf("server stopped: %v", http.ListenAndServe(cfg.ListenAddress, mux))
|
||||
}
|
||||
|
||||
// handleSAConfig serves the SA rules YAML so that vmagent can fetch it.
|
||||
func handleSAConfig(w http.ResponseWriter, r *http.Request) {
|
||||
log.Printf("[sa-config] request from %s", r.RemoteAddr)
|
||||
w.Header().Set("Content-Type", "text/yaml; charset=utf-8")
|
||||
if _, err := w.Write(saYAML); err != nil {
|
||||
log.Printf("[sa-config] write error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// handleStart triggers vmagent reload and starts the sample writer goroutine.
|
||||
func handleStart(w http.ResponseWriter, r *http.Request) {
|
||||
mu.Lock()
|
||||
if started {
|
||||
mu.Unlock()
|
||||
http.Error(w, "test already running; POST /reset to allow re-running", http.StatusConflict)
|
||||
return
|
||||
}
|
||||
started = true
|
||||
mu.Unlock()
|
||||
|
||||
interval, err := time.ParseDuration(cfg.Interval)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("invalid interval %q: %v", cfg.Interval, err), http.StatusBadRequest)
|
||||
mu.Lock()
|
||||
started = false
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Call vmagent /-/reload so it picks up the SA config from /sa-config.
|
||||
reloadURL := cfg.VmagentAddress + "/-/reload"
|
||||
log.Printf("[start] calling vmagent reload: POST %s", reloadURL)
|
||||
reloadResp, err := http.Post(reloadURL, "application/json", nil) //nolint:noctx
|
||||
if err != nil {
|
||||
log.Printf("[start] reload request failed: %v", err)
|
||||
http.Error(w, fmt.Sprintf("vmagent reload failed: %v", err), http.StatusBadGateway)
|
||||
mu.Lock()
|
||||
started = false
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
reloadBody, _ := io.ReadAll(reloadResp.Body)
|
||||
reloadResp.Body.Close()
|
||||
log.Printf("[start] reload response: status=%d body=%q", reloadResp.StatusCode, reloadBody)
|
||||
|
||||
T := time.Now()
|
||||
jitter := time.Duration(rand.Int63n(int64(interval / 2))) //nolint:gosec
|
||||
log.Printf("[start] T=%s jitter=%v first-sample-at T+%v",
|
||||
T.Format(time.RFC3339Nano), jitter, jitter)
|
||||
|
||||
reportMu.Lock()
|
||||
reportT = T
|
||||
reportJitter = jitter
|
||||
reportSent = nil
|
||||
reportRecv = make(map[string]*recvSeriesData)
|
||||
reportMu.Unlock()
|
||||
|
||||
go runTest(T, interval, jitter)
|
||||
|
||||
fmt.Fprintf(w, "test started\nT=%s\njitter=%v\n", T.Format(time.RFC3339Nano), jitter)
|
||||
}
|
||||
|
||||
// handleRemoteWrite accepts Prometheus remote-write requests (e.g. from vmagent's SA output)
|
||||
// and logs each received time series in a human-readable format for result verification.
|
||||
func handleRemoteWrite(w http.ResponseWriter, r *http.Request) {
|
||||
isVMRemoteWrite := r.Header.Get("Content-Encoding") == "zstd"
|
||||
err := stream.Parse(r.Body, isVMRemoteWrite, func(tss []prompb.TimeSeries, _ []prompb.MetricMetadata) error {
|
||||
for i := range tss {
|
||||
ts := &tss[i]
|
||||
var sb strings.Builder
|
||||
// Build metric name + labels string.
|
||||
var metricName string
|
||||
for _, lbl := range ts.Labels {
|
||||
if lbl.Name == "__name__" {
|
||||
metricName = lbl.Value
|
||||
break
|
||||
}
|
||||
}
|
||||
sb.WriteString(metricName)
|
||||
sb.WriteByte('{')
|
||||
first := true
|
||||
for _, lbl := range ts.Labels {
|
||||
if lbl.Name == "__name__" {
|
||||
continue
|
||||
}
|
||||
if !first {
|
||||
sb.WriteByte(',')
|
||||
}
|
||||
first = false
|
||||
sb.WriteString(lbl.Name)
|
||||
sb.WriteString(`="`)
|
||||
sb.WriteString(lbl.Value)
|
||||
sb.WriteByte('"')
|
||||
}
|
||||
sb.WriteByte('}')
|
||||
metricStr := sb.String()
|
||||
|
||||
// Log each sample on its own line for easy reading.
|
||||
for _, s := range ts.Samples {
|
||||
t := time.UnixMilli(s.Timestamp)
|
||||
log.Printf("[recv] %-60s value=%-12g ts= %v, ts_human=%s",
|
||||
metricStr, s.Value, t.UnixMilli(), t.UTC().Format(time.RFC3339Nano))
|
||||
}
|
||||
|
||||
// Record for /report.
|
||||
reportMu.Lock()
|
||||
if reportRecv == nil {
|
||||
reportRecv = make(map[string]*recvSeriesData)
|
||||
}
|
||||
rd := reportRecv[metricStr]
|
||||
if rd == nil {
|
||||
rd = &recvSeriesData{Name: metricStr}
|
||||
reportRecv[metricStr] = rd
|
||||
}
|
||||
for _, s := range ts.Samples {
|
||||
rd.Points = append(rd.Points, recvDataPoint{
|
||||
TsSec: float64(s.Timestamp) / 1000.0,
|
||||
Value: s.Value,
|
||||
})
|
||||
}
|
||||
reportMu.Unlock()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("[recv] parse error: %v", err)
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// handleReset re-reads the config file from disk, updates the SA config and
|
||||
// input_series, and clears the started flag so the test can be triggered again.
|
||||
func handleReset(w http.ResponseWriter, r *http.Request) {
|
||||
mu.Lock()
|
||||
if err := loadConfig(); err != nil {
|
||||
mu.Unlock()
|
||||
log.Printf("[reset] failed to reload config: %v", err)
|
||||
http.Error(w, fmt.Sprintf("config reload failed: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
started = false
|
||||
mu.Unlock()
|
||||
|
||||
reportMu.Lock()
|
||||
reportSent = nil
|
||||
reportRecv = make(map[string]*recvSeriesData)
|
||||
reportT = time.Time{}
|
||||
reportMu.Unlock()
|
||||
|
||||
log.Printf("[reset] config reloaded, started flag cleared")
|
||||
fmt.Fprintln(w, "reset ok")
|
||||
}
|
||||
|
||||
// loadConfig reads configPath from disk, parses it into cfg, and rebuilds saYAML.
|
||||
// Callers that need thread safety must hold mu.
|
||||
func loadConfig() error {
|
||||
data, err := os.ReadFile(configPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read config file %q: %w", configPath, err)
|
||||
}
|
||||
|
||||
newCfg := &AppConfig{
|
||||
VmagentAddress: "http://localhost:8429",
|
||||
ListenAddress: ":8080",
|
||||
}
|
||||
if err := yaml.Unmarshal(data, newCfg); err != nil {
|
||||
return fmt.Errorf("cannot parse config: %w", err)
|
||||
}
|
||||
|
||||
var newSAYAML []byte
|
||||
if newCfg.SARules != nil {
|
||||
newSAYAML, err = yaml.Marshal(newCfg.SARules)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot re-marshal saRules to YAML: %w", err)
|
||||
}
|
||||
} else {
|
||||
newSAYAML = []byte("[]\n")
|
||||
}
|
||||
|
||||
cfg = newCfg
|
||||
saYAML = newSAYAML
|
||||
|
||||
log.Printf("[config] loaded from %q", configPath)
|
||||
log.Printf("[config] interval : %s", cfg.Interval)
|
||||
log.Printf("[config] vmagent : %s", cfg.VmagentAddress)
|
||||
log.Printf("[config] listen : %s", cfg.ListenAddress)
|
||||
log.Printf("[config] input_series count: %d", len(cfg.InputSeries))
|
||||
log.Printf("[config] SA config:\n---\n%s---", saYAML)
|
||||
return nil
|
||||
}
|
||||
|
||||
// --- label parsing -----------------------------------------------------------
|
||||
|
||||
// seriesRe matches "metricname" or "metricname{k="v",...}".
|
||||
var seriesRe = regexp.MustCompile(`^([^{,\s]+?)(?:\{([^}]*)\})?$`)
|
||||
var labelRe = regexp.MustCompile(`(\w+)="([^"]*)"`)
|
||||
|
||||
// parseLabels converts a Prometheus-style selector string into a flat label map.
|
||||
func parseLabels(series string) (map[string]string, error) {
|
||||
series = strings.TrimSpace(series)
|
||||
m := seriesRe.FindStringSubmatch(series)
|
||||
if m == nil {
|
||||
return nil, fmt.Errorf("cannot parse series selector %q", series)
|
||||
}
|
||||
labels := map[string]string{"__name__": m[1]}
|
||||
if m[2] != "" {
|
||||
for _, pair := range labelRe.FindAllStringSubmatch(m[2], -1) {
|
||||
labels[pair[1]] = pair[2]
|
||||
}
|
||||
}
|
||||
return labels, nil
|
||||
}
|
||||
|
||||
// --- schedule building -------------------------------------------------------
|
||||
|
||||
// buildSchedule constructs a map of sendAtMs → []scheduledSample for a series.
|
||||
//
|
||||
// Slot numbering is 1-indexed:
|
||||
// - slot i has sample timestamp T+jitter+(i-1)*interval
|
||||
// - non-null values[i-1] are sent at their own slot time
|
||||
// - delay entry [orig, sendAt, val] sends a sample timestamped at slot orig
|
||||
// but delivered to vmagent at slot sendAt
|
||||
func buildSchedule(is InputSeries, T time.Time, interval, jitter time.Duration) (map[int64][]scheduledSample, error) {
|
||||
type delayEntry struct {
|
||||
sendAtSlot int
|
||||
value float64
|
||||
}
|
||||
delayMap := make(map[int]delayEntry, len(is.Delays))
|
||||
for _, d := range is.Delays {
|
||||
if len(d) != 3 {
|
||||
return nil, fmt.Errorf("each delay must be [originalSlot, sendAtSlot, value]; got %v", d)
|
||||
}
|
||||
delayMap[int(d[0])] = delayEntry{sendAtSlot: int(d[1]), value: d[2]}
|
||||
}
|
||||
|
||||
schedule := make(map[int64][]scheduledSample)
|
||||
|
||||
// Regular (non-null) values — sent at their natural slot time.
|
||||
for i, v := range is.Values {
|
||||
if v == nil {
|
||||
continue // null → slot is handled by delays or intentionally absent
|
||||
}
|
||||
sampleTime := T.Add(jitter + time.Duration(i)*interval)
|
||||
sendAtMs := sampleTime.UnixMilli()
|
||||
schedule[sendAtMs] = append(schedule[sendAtMs], scheduledSample{
|
||||
timestamp: sampleTime.UnixMilli(),
|
||||
value: *v,
|
||||
})
|
||||
}
|
||||
|
||||
// Delayed values — timestamped at originalSlot, sent at sendAtSlot.
|
||||
for origSlot, de := range delayMap {
|
||||
sampleTime := T.Add(jitter + time.Duration(origSlot-1)*interval)
|
||||
sendAt := T.Add(jitter + time.Duration(de.sendAtSlot-1)*interval)
|
||||
sendAtMs := sendAt.UnixMilli()
|
||||
schedule[sendAtMs] = append(schedule[sendAtMs], scheduledSample{
|
||||
timestamp: sampleTime.UnixMilli(),
|
||||
value: de.value,
|
||||
})
|
||||
}
|
||||
|
||||
// Sort samples within each slot by timestamp for deterministic ordering.
|
||||
for k, s := range schedule {
|
||||
sort.Slice(s, func(i, j int) bool { return s[i].timestamp < s[j].timestamp })
|
||||
schedule[k] = s
|
||||
}
|
||||
|
||||
return schedule, nil
|
||||
}
|
||||
|
||||
// --- test runner -------------------------------------------------------------
|
||||
|
||||
type seriesEvent struct {
|
||||
seriesName string // original series selector string from config
|
||||
labels map[string]string
|
||||
samples []scheduledSample
|
||||
}
|
||||
|
||||
func runTest(T time.Time, interval, jitter time.Duration) {
|
||||
defer func() {
|
||||
mu.Lock()
|
||||
started = false
|
||||
mu.Unlock()
|
||||
log.Printf("[write] finished")
|
||||
}()
|
||||
|
||||
// Collect all send events across every configured series.
|
||||
allEvents := make(map[int64][]seriesEvent)
|
||||
|
||||
for _, is := range cfg.InputSeries {
|
||||
labels, err := parseLabels(is.Series)
|
||||
if err != nil {
|
||||
log.Printf("[test] cannot parse series %q: %v — skipping", is.Series, err)
|
||||
continue
|
||||
}
|
||||
schedule, err := buildSchedule(is, T, interval, jitter)
|
||||
if err != nil {
|
||||
log.Printf("[test] cannot build schedule for %q: %v — skipping", is.Series, err)
|
||||
continue
|
||||
}
|
||||
log.Printf("[test] series %q: %d distinct send-time slots", is.Series, len(schedule))
|
||||
for sendAtMs, samples := range schedule {
|
||||
allEvents[sendAtMs] = append(allEvents[sendAtMs], seriesEvent{
|
||||
seriesName: is.Series,
|
||||
labels: labels,
|
||||
samples: samples,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Sort the unique send times into chronological order.
|
||||
sendTimes := make([]int64, 0, len(allEvents))
|
||||
for t := range allEvents {
|
||||
sendTimes = append(sendTimes, t)
|
||||
}
|
||||
sort.Slice(sendTimes, func(i, j int) bool { return sendTimes[i] < sendTimes[j] })
|
||||
|
||||
log.Printf("[test] starting: %d distinct send times across %d series",
|
||||
len(sendTimes), len(cfg.InputSeries))
|
||||
|
||||
for _, sendAtMs := range sendTimes {
|
||||
sendAt := time.UnixMilli(sendAtMs)
|
||||
if now := time.Now(); sendAt.After(now) {
|
||||
sleep := sendAt.Sub(now)
|
||||
time.Sleep(sleep)
|
||||
}
|
||||
|
||||
for _, ev := range allEvents[sendAtMs] {
|
||||
recordSent(ev.seriesName, ev.samples, sendAtMs)
|
||||
if err := writeSamples(cfg.VmagentAddress, ev.labels, ev.samples); err != nil {
|
||||
log.Printf("[test] write error for %v: %v", ev.labels, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- vmagent writer ----------------------------------------------------------
|
||||
|
||||
// writeSamples POSTs one NDJSON line to vmagent's /api/v1/import endpoint.
|
||||
// All samples for the same metric series are batched into a single request.
|
||||
func writeSamples(addr string, labels map[string]string, samples []scheduledSample) error {
|
||||
values := make([]float64, len(samples))
|
||||
timestamps := make([]int64, len(samples))
|
||||
for i, s := range samples {
|
||||
values[i] = s.value
|
||||
timestamps[i] = s.timestamp
|
||||
}
|
||||
|
||||
line := vmImportLine{
|
||||
Metric: labels,
|
||||
Values: values,
|
||||
Timestamps: timestamps,
|
||||
}
|
||||
data, err := json.Marshal(line)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal: %w", err)
|
||||
}
|
||||
data = append(data, '\n')
|
||||
|
||||
url := addr + "/api/v1/import"
|
||||
log.Printf("[write] metric=%v values=%v timestamps_ms=%v",
|
||||
labels, values, timestamps)
|
||||
|
||||
resp, err := http.Post(url, "application/json", bytes.NewReader(data)) //nolint:noctx
|
||||
if err != nil {
|
||||
return fmt.Errorf("POST %s: %w", url, err)
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return fmt.Errorf("POST %s: status=%d body=%q", url, resp.StatusCode, body)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// --- report ------------------------------------------------------------------
|
||||
|
||||
// recordSent stores sent samples into the report data store.
|
||||
// It must be called without holding reportMu.
|
||||
func recordSent(seriesName string, samples []scheduledSample, sendAtMs int64) {
|
||||
reportMu.Lock()
|
||||
defer reportMu.Unlock()
|
||||
|
||||
var sd *sentSeriesData
|
||||
for i := range reportSent {
|
||||
if reportSent[i].Name == seriesName {
|
||||
sd = &reportSent[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
if sd == nil {
|
||||
reportSent = append(reportSent, sentSeriesData{Name: seriesName})
|
||||
sd = &reportSent[len(reportSent)-1]
|
||||
}
|
||||
|
||||
for _, s := range samples {
|
||||
sd.Points = append(sd.Points, sentDataPoint{
|
||||
TsSec: float64(s.timestamp) / 1000.0,
|
||||
Value: s.value,
|
||||
SentAtSec: float64(sendAtMs) / 1000.0,
|
||||
Delayed: sendAtMs != s.timestamp,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// handleReport renders and serves the HTML report page with sent/received charts.
|
||||
func handleReport(w http.ResponseWriter, r *http.Request) {
|
||||
// saYAML is read without mu, consistent with handleSAConfig.
|
||||
saYAMLStr := string(saYAML)
|
||||
|
||||
reportMu.RLock()
|
||||
sentSnap := make([]sentSeriesData, len(reportSent))
|
||||
for i, sd := range reportSent {
|
||||
pts := make([]sentDataPoint, len(sd.Points))
|
||||
copy(pts, sd.Points)
|
||||
sentSnap[i] = sentSeriesData{Name: sd.Name, Points: pts}
|
||||
}
|
||||
recvSnap := make([]recvSeriesData, 0, len(reportRecv))
|
||||
for _, rd := range reportRecv {
|
||||
pts := make([]recvDataPoint, len(rd.Points))
|
||||
copy(pts, rd.Points)
|
||||
recvSnap = append(recvSnap, recvSeriesData{Name: rd.Name, Points: pts})
|
||||
}
|
||||
reportMu.RUnlock()
|
||||
|
||||
sort.Slice(recvSnap, func(i, j int) bool { return recvSnap[i].Name < recvSnap[j].Name })
|
||||
|
||||
sentJSON, _ := json.Marshal(sentSnap)
|
||||
recvJSON, _ := json.Marshal(recvSnap)
|
||||
|
||||
// Single canvas for all sent series combined.
|
||||
var sentCanvas string
|
||||
if len(sentSnap) == 0 {
|
||||
sentCanvas = `<p class="no-data">No data yet — POST /start to run the test.</p>`
|
||||
} else {
|
||||
sentCanvas = `<div class="chart-wrap"><canvas id="sent-all"></canvas></div>`
|
||||
}
|
||||
|
||||
var recvCharts strings.Builder
|
||||
if len(recvSnap) == 0 {
|
||||
recvCharts.WriteString(`<p class="no-data">No data received yet.</p>`)
|
||||
} else {
|
||||
for i, rd := range recvSnap {
|
||||
fmt.Fprintf(&recvCharts,
|
||||
"<h3>%s</h3><div class=\"chart-wrap\"><canvas id=\"recv-%d\"></canvas></div>\n",
|
||||
html.EscapeString(rd.Name), i)
|
||||
}
|
||||
}
|
||||
|
||||
page := reportPageTemplate
|
||||
page = strings.ReplaceAll(page, "__GENERATED_AT__", time.Now().UTC().Format(time.RFC3339))
|
||||
page = strings.ReplaceAll(page, "__SA_YAML__", html.EscapeString(saYAMLStr))
|
||||
page = strings.ReplaceAll(page, "__SENT_COUNT__", strconv.Itoa(len(sentSnap)))
|
||||
page = strings.ReplaceAll(page, "__SENT_CANVAS__", sentCanvas)
|
||||
page = strings.ReplaceAll(page, "__RECV_COUNT__", strconv.Itoa(len(recvSnap)))
|
||||
page = strings.ReplaceAll(page, "__RECV_CHARTS__", recvCharts.String())
|
||||
page = strings.ReplaceAll(page, "__SENT_JSON__", string(sentJSON))
|
||||
page = strings.ReplaceAll(page, "__RECV_JSON__", string(recvJSON))
|
||||
startTs := "0"
|
||||
if !reportT.IsZero() {
|
||||
startTs = fmt.Sprintf("%f", float64(reportT.UnixMilli())/1000.0)
|
||||
}
|
||||
page = strings.ReplaceAll(page, "__START_TS__", startTs)
|
||||
|
||||
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
||||
fmt.Fprint(w, page)
|
||||
}
|
||||
@@ -1,281 +0,0 @@
|
||||
package main
|
||||
|
||||
// reportPageTemplate is the HTML skeleton for GET /report.
|
||||
// All __TOKEN__ placeholders are replaced at render time by handleReport.
|
||||
var reportPageTemplate = `<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head><meta charset="UTF-8">
|
||||
<title>SA Tester Report</title>
|
||||
<style>
|
||||
body{font-family:'Segoe UI',Arial,sans-serif;margin:0;padding:20px;background:#f0f2f5;color:#333}
|
||||
h1{margin-bottom:4px}
|
||||
.subtitle{color:#666;font-size:14px;margin-bottom:24px}
|
||||
.card{background:#fff;padding:20px;margin:16px 0;border-radius:8px;box-shadow:0 1px 4px rgba(0,0,0,.12)}
|
||||
h2{margin:0 0 12px;color:#444;font-size:18px;border-bottom:1px solid #eee;padding-bottom:8px}
|
||||
h3{margin:4px 0 8px;font-size:13px;font-family:monospace;color:#555;word-break:break-all}
|
||||
pre{background:#f7f7f7;padding:12px;border-radius:4px;overflow-x:auto;font-size:13px;margin:0}
|
||||
.chart-wrap{position:relative;height:340px;margin-bottom:24px}
|
||||
.no-data{color:#aaa;font-style:italic;font-size:14px}
|
||||
a{color:#1a73e8}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>SA Tester Report</h1>
|
||||
<p class="subtitle">Generated: __GENERATED_AT__ | <a href="/report">Refresh</a></p>
|
||||
<div class="card">
|
||||
<h2>SA Config</h2>
|
||||
<pre>__SA_YAML__</pre>
|
||||
</div>
|
||||
<div class="card">
|
||||
<h2>Sent Series (__SENT_COUNT__ series)</h2>
|
||||
__SENT_CANVAS__
|
||||
</div>
|
||||
<div class="card">
|
||||
<h2>Received Series / SA Output (__RECV_COUNT__ series)</h2>
|
||||
__RECV_CHARTS__
|
||||
</div>
|
||||
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.0/dist/chart.umd.min.js"></script>
|
||||
<script>
|
||||
(function(){
|
||||
var SENT = __SENT_JSON__;
|
||||
var RECV = __RECV_JSON__;
|
||||
|
||||
var PALETTE = [
|
||||
'rgba(54,162,235,0.85)',
|
||||
'rgba(255,99,132,0.85)',
|
||||
'rgba(153,102,255,0.85)',
|
||||
'rgba(255,205,86,0.9)',
|
||||
'rgba(75,192,192,0.85)',
|
||||
];
|
||||
|
||||
function fmtDate(d) {
|
||||
return d.getUTCFullYear() + '-' +
|
||||
(d.getUTCMonth()+1).toString().padStart(2,'0') + '-' +
|
||||
d.getUTCDate().toString().padStart(2,'0');
|
||||
}
|
||||
function fmtTime(d) {
|
||||
return d.getUTCHours().toString().padStart(2,'0') + ':' +
|
||||
d.getUTCMinutes().toString().padStart(2,'0') + ':' +
|
||||
d.getUTCSeconds().toString().padStart(2,'0');
|
||||
}
|
||||
// Tooltip: full datetime with milliseconds.
|
||||
function fmtTs(sec) {
|
||||
var d = new Date(sec * 1000);
|
||||
return fmtDate(d) + ' ' + fmtTime(d) + '.' +
|
||||
d.getUTCMilliseconds().toString().padStart(3,'0');
|
||||
}
|
||||
// Tick label: show date only when it changes (or on the first tick).
|
||||
function fmtTick(v, idx, ticks) {
|
||||
var d = new Date(v * 1000);
|
||||
var dateStr = fmtDate(d);
|
||||
var timeStr = fmtTime(d);
|
||||
if (idx === 0) { return timeStr + ' ' + dateStr; }
|
||||
var prev = new Date(ticks[idx - 1].value * 1000);
|
||||
if (fmtDate(prev) !== dateStr) { return timeStr + ' ' + dateStr; }
|
||||
return timeStr;
|
||||
}
|
||||
|
||||
// Plugin: draw dashed delay-span lines directly on the canvas so they are
|
||||
// owned by their series dataset and toggle with it.
|
||||
var delaySpanPlugin = {
|
||||
id: 'delaySpan',
|
||||
afterDatasetsDraw: function(chart) {
|
||||
var ctx2 = chart.ctx;
|
||||
chart.data.datasets.forEach(function(ds, dsi) {
|
||||
if (!chart.isDatasetVisible(dsi)) return;
|
||||
var meta = chart.getDatasetMeta(dsi);
|
||||
ds.data.forEach(function(pt, pi) {
|
||||
if (!pt || !pt.delayed) return;
|
||||
var el = meta.data[pi];
|
||||
if (!el) return;
|
||||
var xScale = chart.scales.x;
|
||||
var yScale = chart.scales.y;
|
||||
var x1 = el.x;
|
||||
var x2 = xScale.getPixelForValue(pt.sentAt);
|
||||
var y = el.y;
|
||||
ctx2.save();
|
||||
ctx2.beginPath();
|
||||
ctx2.setLineDash([5, 4]);
|
||||
ctx2.strokeStyle = 'rgba(255,159,64,0.7)';
|
||||
ctx2.lineWidth = 2;
|
||||
ctx2.moveTo(x1, y);
|
||||
ctx2.lineTo(x2, y);
|
||||
ctx2.stroke();
|
||||
// Arrow head at sent-at end.
|
||||
var dir = x2 > x1 ? 1 : -1;
|
||||
ctx2.setLineDash([]);
|
||||
ctx2.beginPath();
|
||||
ctx2.moveTo(x2, y);
|
||||
ctx2.lineTo(x2 - dir * 8, y - 5);
|
||||
ctx2.lineTo(x2 - dir * 8, y + 5);
|
||||
ctx2.closePath();
|
||||
ctx2.fillStyle = 'rgba(255,159,64,0.7)';
|
||||
ctx2.fill();
|
||||
ctx2.restore();
|
||||
});
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
// x-axis always starts from when /start was called.
|
||||
var START_TS = __START_TS__; // unix seconds, injected by Go
|
||||
|
||||
// Compute global x range: min is pinned to START_TS; max comes from data.
|
||||
var xMax = -Infinity;
|
||||
SENT.forEach(function(s) {
|
||||
s.points.forEach(function(p) {
|
||||
if (!p) return;
|
||||
if (p.x > xMax) xMax = p.x;
|
||||
if (p.sentAt !== undefined && p.sentAt > xMax) xMax = p.sentAt;
|
||||
});
|
||||
});
|
||||
RECV.forEach(function(s) {
|
||||
s.points.forEach(function(p) {
|
||||
if (!p) return;
|
||||
if (p.x > xMax) xMax = p.x;
|
||||
});
|
||||
});
|
||||
// Add a 5 % right-side margin; left side is pinned exactly to START_TS.
|
||||
var xPad = xMax === -Infinity ? 1 : (xMax - START_TS) * 0.05 || 1;
|
||||
var xRange = { min: START_TS, max: (xMax === -Infinity ? START_TS + 60 : xMax + xPad) };
|
||||
|
||||
var sentEl = document.getElementById('sent-all');
|
||||
if (sentEl && SENT.length > 0) {
|
||||
var datasets = [];
|
||||
SENT.forEach(function(s, si) {
|
||||
var col = PALETTE[si % PALETTE.length];
|
||||
var delayCol = 'rgba(255,159,64,0.9)';
|
||||
var pts = s.points.filter(function(p){ return p !== null; });
|
||||
if (pts.length === 0) return;
|
||||
// Attach full metadata (delayed, sentAt) to each chart point so the
|
||||
// plugin and tooltip can read it without separate datasets.
|
||||
datasets.push({
|
||||
type: 'scatter', label: s.name,
|
||||
data: pts.map(function(p){
|
||||
return {x: p.x, y: p.y, delayed: p.delayed, sentAt: p.sentAt};
|
||||
}),
|
||||
backgroundColor: pts.map(function(p){ return p.delayed ? delayCol : col; }),
|
||||
pointStyle: pts.map(function(p){ return p.delayed ? 'triangle' : 'circle'; }),
|
||||
pointRadius: pts.map(function(p){ return p.delayed ? 9 : 7; }),
|
||||
});
|
||||
});
|
||||
new Chart(sentEl, {
|
||||
type: 'scatter',
|
||||
data: { datasets: datasets },
|
||||
plugins: [delaySpanPlugin],
|
||||
options: {
|
||||
responsive: true, maintainAspectRatio: false,
|
||||
plugins: {
|
||||
legend: {
|
||||
position: 'bottom',
|
||||
onClick: function(e, legendItem, legend) {
|
||||
var chart = legend.chart;
|
||||
var idx = legendItem.datasetIndex;
|
||||
var meta = chart.getDatasetMeta(idx);
|
||||
var allOthersHidden = chart.data.datasets.every(function(_, i) {
|
||||
return i === idx || !chart.isDatasetVisible(i);
|
||||
});
|
||||
if (allOthersHidden && !meta.hidden) {
|
||||
// Already solo — restore all.
|
||||
chart.data.datasets.forEach(function(_, i) { chart.show(i); });
|
||||
} else {
|
||||
// Solo this series.
|
||||
chart.data.datasets.forEach(function(_, i) { chart.hide(i); });
|
||||
chart.show(idx);
|
||||
}
|
||||
},
|
||||
},
|
||||
tooltip: { callbacks: { label: function(ctx){
|
||||
var p = ctx.raw;
|
||||
if (!p) return '';
|
||||
var msg = 'value=' + p.y + ' ts=' + fmtTs(p.x);
|
||||
if (p.delayed) {
|
||||
var delaySec = Math.round((p.sentAt - p.x) * 10) / 10;
|
||||
msg += ' \u2192 sent at ' + fmtTs(p.sentAt) + ' (+' + delaySec + 's)';
|
||||
}
|
||||
return msg;
|
||||
}}},
|
||||
},
|
||||
scales: {
|
||||
x: Object.assign({ type: 'linear',
|
||||
title: { display: true, text: 'Unix timestamp (seconds UTC)' },
|
||||
ticks: { callback: function(v, idx, ticks) { return fmtTick(v, idx, ticks); }, maxRotation: 35, minRotation: 35 } },
|
||||
xRange),
|
||||
y: { title: { display: true, text: 'value' } },
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// Plugin: highlight all received points that share the hovered timestamp.
|
||||
var sameTimestampPlugin = {
|
||||
id: 'sameTimestamp',
|
||||
afterDraw: function(chart) {
|
||||
if (!chart._hoveredTs) return;
|
||||
var ctx2 = chart.ctx;
|
||||
var ts = chart._hoveredTs;
|
||||
chart.data.datasets.forEach(function(ds, dsi) {
|
||||
var meta = chart.getDatasetMeta(dsi);
|
||||
ds.data.forEach(function(pt, pi) {
|
||||
if (!pt || pt.x !== ts) return;
|
||||
var el = meta.data[pi];
|
||||
if (!el) return;
|
||||
ctx2.save();
|
||||
ctx2.beginPath();
|
||||
ctx2.arc(el.x, el.y, (el.options.radius || 6) + 5, 0, 2 * Math.PI);
|
||||
ctx2.strokeStyle = 'rgba(75,192,75,0.9)';
|
||||
ctx2.lineWidth = 2;
|
||||
ctx2.stroke();
|
||||
ctx2.restore();
|
||||
});
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
RECV.forEach(function(s, i) {
|
||||
var el = document.getElementById('recv-' + i);
|
||||
if (!el) return;
|
||||
var pts = s.points
|
||||
.filter(function(p){ return p !== null && p !== undefined && typeof p.x === 'number'; })
|
||||
.sort(function(a, b){ return a.x - b.x; });
|
||||
var recvChart = new Chart(el, {
|
||||
type: 'scatter',
|
||||
data: { datasets: [{
|
||||
label: s.name, data: pts,
|
||||
backgroundColor: 'rgba(75,192,75,0.85)',
|
||||
pointRadius: 6, pointStyle: 'circle',
|
||||
}]},
|
||||
plugins: [sameTimestampPlugin],
|
||||
options: {
|
||||
responsive: true, maintainAspectRatio: false,
|
||||
plugins: {
|
||||
legend: { display: false },
|
||||
tooltip: { callbacks: { label: function(ctx){
|
||||
var p = ctx.raw;
|
||||
if (p === null || p === undefined) return '';
|
||||
return 'value=' + p.y + ' ts=' + fmtTs(p.x);
|
||||
}}},
|
||||
},
|
||||
scales: {
|
||||
x: Object.assign({ type: 'linear',
|
||||
title: { display: true, text: 'Unix timestamp (seconds UTC)' },
|
||||
ticks: { callback: function(v, idx, ticks) { return fmtTick(v, idx, ticks); }, maxRotation: 35, minRotation: 35 } },
|
||||
xRange),
|
||||
y: { title: { display: true, text: 'value' } },
|
||||
},
|
||||
onHover: function(evt, active) {
|
||||
var ts = (active && active.length > 0)
|
||||
? recvChart.data.datasets[active[0].datasetIndex].data[active[0].index].x
|
||||
: null;
|
||||
if (recvChart._hoveredTs !== ts) {
|
||||
recvChart._hoveredTs = ts;
|
||||
recvChart.draw();
|
||||
}
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
})();
|
||||
</script>
|
||||
</body>
|
||||
</html>`
|
||||
@@ -173,9 +173,9 @@ func (r *Rule) String() string {
|
||||
if r.Alert != "" {
|
||||
ruleType = "alerting"
|
||||
}
|
||||
b := strings.Builder{}
|
||||
b.WriteString(fmt.Sprintf("%s rule %q", ruleType, r.Name()))
|
||||
b.WriteString(fmt.Sprintf("; expr: %q", r.Expr))
|
||||
var b strings.Builder
|
||||
fmt.Fprintf(&b, "%s rule %q", ruleType, r.Name())
|
||||
fmt.Fprintf(&b, "; expr: %q", r.Expr)
|
||||
|
||||
kv := sortMap(r.Labels)
|
||||
for i := range kv {
|
||||
|
||||
1511
app/vmui/packages/vmui/package-lock.json
generated
1511
app/vmui/packages/vmui/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -21,16 +21,16 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"classnames": "^2.5.1",
|
||||
"dayjs": "^1.11.20",
|
||||
"dayjs": "^1.11.21",
|
||||
"lodash.debounce": "^4.0.8",
|
||||
"marked": "^18.0.2",
|
||||
"preact": "^10.29.1",
|
||||
"qs": "^6.15.1",
|
||||
"marked": "^18.0.5",
|
||||
"preact": "^10.29.2",
|
||||
"qs": "^6.15.2",
|
||||
"react-input-mask": "^2.0.4",
|
||||
"react-router-dom": "^7.14.1",
|
||||
"react-router-dom": "^7.17.0",
|
||||
"uplot": "^1.6.32",
|
||||
"vite": "^8.0.8",
|
||||
"web-vitals": "^5.2.0"
|
||||
"vite": "^8.0.16",
|
||||
"web-vitals": "^5.3.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@eslint/eslintrc": "^3.3.5",
|
||||
@@ -39,24 +39,24 @@
|
||||
"@testing-library/jest-dom": "^6.9.1",
|
||||
"@testing-library/preact": "^3.2.4",
|
||||
"@types/lodash.debounce": "^4.0.9",
|
||||
"@types/node": "^25.6.0",
|
||||
"@types/qs": "^6.15.0",
|
||||
"@types/react": "^19.2.14",
|
||||
"@types/node": "^25.9.2",
|
||||
"@types/qs": "^6.15.1",
|
||||
"@types/react": "^19.2.17",
|
||||
"@types/react-input-mask": "^3.0.6",
|
||||
"@types/react-router-dom": "^5.3.3",
|
||||
"@typescript-eslint/eslint-plugin": "^8.58.2",
|
||||
"@typescript-eslint/parser": "^8.58.2",
|
||||
"@typescript-eslint/eslint-plugin": "^8.61.0",
|
||||
"@typescript-eslint/parser": "^8.61.0",
|
||||
"cross-env": "^10.1.0",
|
||||
"eslint": "^9.39.2",
|
||||
"eslint-plugin-react": "^7.37.5",
|
||||
"eslint-plugin-unused-imports": "^4.4.1",
|
||||
"globals": "^17.5.0",
|
||||
"http-proxy-middleware": "^3.0.5",
|
||||
"jsdom": "^29.0.2",
|
||||
"postcss": "^8.5.10",
|
||||
"sass-embedded": "^1.99.0",
|
||||
"typescript": "^6.0.2",
|
||||
"vitest": "^4.1.4"
|
||||
"globals": "^17.6.0",
|
||||
"http-proxy-middleware": "^4.1.0",
|
||||
"jsdom": "^29.1.1",
|
||||
"postcss": "^8.5.15",
|
||||
"sass-embedded": "^1.100.0",
|
||||
"typescript": "^6.0.3",
|
||||
"vitest": "^4.1.8"
|
||||
},
|
||||
"browserslist": {
|
||||
"production": [
|
||||
|
||||
@@ -26,6 +26,9 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
## tip
|
||||
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix `increase` and `increase_prometheus` outputs producing inflated values when old samples update the baseline across interval boundaries with `ignore_old_samples: true` or `enable_windows: true`.
|
||||
|
||||
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
|
||||
|
||||
Released at 2026-06-08
|
||||
|
||||
@@ -667,11 +667,11 @@ func TestScrapeWorkScrapeInternalStreamConcurrency(t *testing.T) {
|
||||
}
|
||||
|
||||
generateScrape := func(n int) string {
|
||||
w := strings.Builder{}
|
||||
var w strings.Builder
|
||||
for i := range n {
|
||||
w.WriteString(fmt.Sprintf("fooooo_%d 1\n", i))
|
||||
fmt.Fprintf(&w, "fooooo_%d 1\n", i)
|
||||
if i%100 == 0 {
|
||||
w.WriteString(fmt.Sprintf("# HELP fooooo_%d This is a test\n", i))
|
||||
fmt.Fprintf(&w, "# HELP fooooo_%d This is a test\n", i)
|
||||
}
|
||||
}
|
||||
return w.String()
|
||||
@@ -1005,9 +1005,9 @@ func TestSendStaleSeries(t *testing.T) {
|
||||
}
|
||||
}
|
||||
generateScrape := func(n int) string {
|
||||
w := strings.Builder{}
|
||||
var w strings.Builder
|
||||
for i := range n {
|
||||
w.WriteString(fmt.Sprintf("foo_%d 1\n", i))
|
||||
fmt.Fprintf(&w, "foo_%d 1\n", i)
|
||||
}
|
||||
return w.String()
|
||||
}
|
||||
|
||||
@@ -6,6 +6,9 @@ type avgAggrValue struct {
|
||||
}
|
||||
|
||||
func (av *avgAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||
if sample.stateOnly {
|
||||
return
|
||||
}
|
||||
av.sum += sample.value
|
||||
av.count++
|
||||
}
|
||||
|
||||
@@ -4,7 +4,10 @@ type countSamplesAggrValue struct {
|
||||
count uint64
|
||||
}
|
||||
|
||||
func (av *countSamplesAggrValue) pushSample(_ aggrConfig, _ *pushSample, _ string, _ int64) {
|
||||
func (av *countSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||
if sample.stateOnly {
|
||||
return
|
||||
}
|
||||
av.count++
|
||||
}
|
||||
|
||||
|
||||
@@ -9,7 +9,10 @@ type countSeriesAggrValue struct {
|
||||
samples map[uint64]struct{}
|
||||
}
|
||||
|
||||
func (av *countSeriesAggrValue) pushSample(_ aggrConfig, _ *pushSample, key string, _ int64) {
|
||||
func (av *countSeriesAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string, _ int64) {
|
||||
if sample.stateOnly {
|
||||
return
|
||||
}
|
||||
// Count unique hashes over the keys instead of unique key values.
|
||||
// This reduces memory usage at the cost of possible hash collisions for distinct key values.
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
|
||||
|
||||
@@ -45,6 +45,7 @@ type dedupAggrShardNopad struct {
|
||||
type dedupAggrSample struct {
|
||||
value float64
|
||||
timestamp int64
|
||||
stateOnly bool
|
||||
}
|
||||
|
||||
func newDedupAggr() *dedupAggr {
|
||||
@@ -189,6 +190,7 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
|
||||
s = &samplesBuf[len(samplesBuf)-1]
|
||||
s.value = sample.value
|
||||
s.timestamp = sample.timestamp
|
||||
s.stateOnly = sample.stateOnly
|
||||
|
||||
key := bytesutil.InternString(sample.key)
|
||||
state.m[key] = s
|
||||
@@ -197,28 +199,33 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
|
||||
state.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
|
||||
continue
|
||||
}
|
||||
s.timestamp, s.value = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
|
||||
var newWins bool
|
||||
s.timestamp, s.value, newWins = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
|
||||
if newWins {
|
||||
s.stateOnly = sample.stateOnly
|
||||
}
|
||||
}
|
||||
state.samplesBuf = samplesBuf
|
||||
}
|
||||
|
||||
// deduplicateSamples returns deduplicated timestamp and value results.
|
||||
// deduplicateSamples returns deduplicated timestamp and value results,
|
||||
// along with a boolean indicating whether the new sample won.
|
||||
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#deduplication
|
||||
func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64) {
|
||||
func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64, bool) {
|
||||
if newT > oldT {
|
||||
return newT, newV
|
||||
return newT, newV, true
|
||||
}
|
||||
// if both samples have the same timestamp, choose the maximum value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333;
|
||||
// always prefer a non-decimal.StaleNaN value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10196
|
||||
if newT == oldT {
|
||||
if decimal.IsStaleNaN(oldV) {
|
||||
return newT, newV
|
||||
return newT, newV, true
|
||||
}
|
||||
if newV > oldV {
|
||||
return newT, newV
|
||||
return newT, newV, true
|
||||
}
|
||||
}
|
||||
return oldT, oldV
|
||||
return oldT, oldV, false
|
||||
}
|
||||
|
||||
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
|
||||
@@ -250,6 +257,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
|
||||
key: key,
|
||||
value: s.value,
|
||||
timestamp: s.timestamp,
|
||||
stateOnly: s.stateOnly,
|
||||
})
|
||||
|
||||
// Limit the number of samples per each flush in order to limit memory usage.
|
||||
|
||||
@@ -24,8 +24,8 @@ func TestDedupAggrSerial(t *testing.T) {
|
||||
}
|
||||
da.pushSamples(samples, 0, false)
|
||||
|
||||
if n := da.sizeBytes(); n > 5_000_000 {
|
||||
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 5_000_000 bytes", n)
|
||||
if n := da.sizeBytes(); n > 6_000_000 {
|
||||
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 6_000_000 bytes", n)
|
||||
}
|
||||
if n := da.itemsCount(); n != seriesCount {
|
||||
t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)
|
||||
@@ -81,7 +81,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
|
||||
func TestDeduplicateSamples(t *testing.T) {
|
||||
f := func(oldT, newT int64, oldV, newV float64, expectedT int64, expectedV float64) {
|
||||
t.Helper()
|
||||
dedupT, dedupV := deduplicateSamples(oldT, newT, oldV, newV)
|
||||
dedupT, dedupV, _ := deduplicateSamples(oldT, newT, oldV, newV)
|
||||
if dedupT != expectedT || dedupV != expectedV {
|
||||
t.Fatalf("unexpected deduplicated result for oldT=%d, newT=%d, oldV=%f, newV=%f; got dedupT=%d, dedupV=%f; want dedupT=%d, dedupV=%f",
|
||||
oldT, newT, oldV, newV, dedupT, dedupV, expectedT, expectedV)
|
||||
|
||||
@@ -231,6 +231,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
|
||||
logger.Warnf("deduplication couldn't be finished in the configured dedupInterval=%s; it took %.03fs; "+
|
||||
"possible solutions: increase dedupInterval; reduce samples' ingestion rate", d.interval, duration.Seconds())
|
||||
}
|
||||
deadlineTime = deadlineTime.Add(d.interval)
|
||||
for time.Now().After(deadlineTime) {
|
||||
deadlineTime = deadlineTime.Add(d.interval)
|
||||
}
|
||||
|
||||
@@ -11,6 +11,9 @@ type histogramBucketAggrValue struct {
|
||||
}
|
||||
|
||||
func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||
if sample.stateOnly {
|
||||
return
|
||||
}
|
||||
av.h.Update(sample.value)
|
||||
}
|
||||
|
||||
|
||||
109
lib/streamaggr/increase.go
Normal file
109
lib/streamaggr/increase.go
Normal file
@@ -0,0 +1,109 @@
|
||||
package streamaggr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
type increaseLastValue struct {
|
||||
value float64
|
||||
timestamp int64
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
type increaseAggrValueShared struct {
|
||||
lastValues map[string]increaseLastValue
|
||||
}
|
||||
|
||||
type increaseAggrValue struct {
|
||||
total float64
|
||||
shared *increaseAggrValueShared
|
||||
}
|
||||
|
||||
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
|
||||
ac := c.(*increaseAggrConfig)
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
|
||||
lv, ok := av.shared.lastValues[key]
|
||||
if ok || keepFirstSample {
|
||||
if sample.timestamp < lv.timestamp {
|
||||
// Skip out of order sample
|
||||
return
|
||||
}
|
||||
if !sample.stateOnly {
|
||||
if sample.value >= lv.value {
|
||||
av.total += sample.value - lv.value
|
||||
} else {
|
||||
// counter reset
|
||||
av.total += sample.value
|
||||
ac.counterResetsTotal.Inc()
|
||||
}
|
||||
}
|
||||
}
|
||||
lv.value = sample.value
|
||||
lv.timestamp = sample.timestamp
|
||||
lv.deleteDeadline = deleteDeadline
|
||||
key = bytesutil.InternString(key)
|
||||
av.shared.lastValues[key] = lv
|
||||
}
|
||||
|
||||
func (av *increaseAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
|
||||
ac := c.(*increaseAggrConfig)
|
||||
suffix := ac.getSuffix()
|
||||
total := av.total
|
||||
av.total = 0
|
||||
lvs := av.shared.lastValues
|
||||
for lk, lv := range lvs {
|
||||
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
|
||||
delete(lvs, lk)
|
||||
}
|
||||
}
|
||||
ctx.appendSeries(key, suffix, total)
|
||||
}
|
||||
|
||||
func (av *increaseAggrValue) state() any {
|
||||
return av.shared
|
||||
}
|
||||
|
||||
func newIncreaseAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
|
||||
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
|
||||
cfg := &increaseAggrConfig{
|
||||
keepFirstSample: keepFirstSample,
|
||||
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
|
||||
}
|
||||
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
||||
return cfg
|
||||
}
|
||||
|
||||
type increaseAggrConfig struct {
|
||||
keepFirstSample bool
|
||||
|
||||
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
|
||||
ignoreFirstSampleDeadline uint64
|
||||
counterResetsTotal *metrics.Counter
|
||||
}
|
||||
|
||||
func (*increaseAggrConfig) getValue(s any) aggrValue {
|
||||
var shared *increaseAggrValueShared
|
||||
if s == nil {
|
||||
shared = &increaseAggrValueShared{
|
||||
lastValues: make(map[string]increaseLastValue),
|
||||
}
|
||||
} else {
|
||||
shared = s.(*increaseAggrValueShared)
|
||||
}
|
||||
return &increaseAggrValue{
|
||||
shared: shared,
|
||||
}
|
||||
}
|
||||
|
||||
func (ac *increaseAggrConfig) getSuffix() string {
|
||||
if ac.keepFirstSample {
|
||||
return "increase"
|
||||
}
|
||||
return "increase_prometheus"
|
||||
}
|
||||
@@ -6,6 +6,9 @@ type lastAggrValue struct {
|
||||
}
|
||||
|
||||
func (av *lastAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||
if sample.stateOnly {
|
||||
return
|
||||
}
|
||||
if sample.timestamp >= av.timestamp {
|
||||
av.last = sample.value
|
||||
av.timestamp = sample.timestamp
|
||||
|
||||
@@ -6,6 +6,9 @@ type maxAggrValue struct {
|
||||
}
|
||||
|
||||
func (av *maxAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||
if sample.stateOnly {
|
||||
return
|
||||
}
|
||||
if sample.value > av.max || !av.defined {
|
||||
av.max = sample.value
|
||||
}
|
||||
|
||||
@@ -6,6 +6,9 @@ type minAggrValue struct {
|
||||
}
|
||||
|
||||
func (av *minAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||
if sample.stateOnly {
|
||||
return
|
||||
}
|
||||
if sample.value < av.min || !av.defined {
|
||||
av.min = sample.value
|
||||
}
|
||||
|
||||
@@ -13,6 +13,9 @@ type quantilesAggrValue struct {
|
||||
}
|
||||
|
||||
func (av *quantilesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||
if sample.stateOnly {
|
||||
return
|
||||
}
|
||||
if av.h == nil {
|
||||
av.h = histogram.GetFast()
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ var rateAggrStateValuePool sync.Pool
|
||||
|
||||
func putRateAggrStateValue(v *rateAggrStateValue) {
|
||||
v.timestamp = 0
|
||||
v.lastTimestamp = 0
|
||||
v.increase = 0
|
||||
rateAggrStateValuePool.Put(v)
|
||||
}
|
||||
@@ -88,6 +89,10 @@ type rateAggrStateValue struct {
|
||||
// increase stores cumulative increase for the current time series on the current aggregation interval
|
||||
increase float64
|
||||
timestamp int64
|
||||
// lastTimestamp is the latest timestamp seen for this series including state-only samples.
|
||||
// It is used for out-of-order detection, while timestamp (above) is only updated by
|
||||
// non-state-only samples and is used for rate calculation.
|
||||
lastTimestamp int64
|
||||
}
|
||||
|
||||
type rateAggrValue struct {
|
||||
@@ -101,16 +106,20 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
|
||||
sv, ok := av.shared[key]
|
||||
if ok {
|
||||
state = sv.getState(av.isGreen)
|
||||
if sample.timestamp < state.timestamp {
|
||||
if sample.timestamp < state.lastTimestamp {
|
||||
// Skip out of order sample
|
||||
return
|
||||
}
|
||||
if sample.value >= sv.value {
|
||||
state.increase += sample.value - sv.value
|
||||
if !sample.stateOnly {
|
||||
if sample.value >= sv.value {
|
||||
state.increase += sample.value - sv.value
|
||||
} else {
|
||||
// counter reset
|
||||
state.increase += sample.value
|
||||
ac.counterResetsTotal.Inc()
|
||||
}
|
||||
} else {
|
||||
// counter reset
|
||||
state.increase += sample.value
|
||||
ac.counterResetsTotal.Inc()
|
||||
sv.prevTimestamp = sample.timestamp
|
||||
}
|
||||
} else {
|
||||
sv = getRateAggrSharedValue(av.isGreen)
|
||||
@@ -121,7 +130,10 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
|
||||
}
|
||||
sv.value = sample.value
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
state.timestamp = sample.timestamp
|
||||
state.lastTimestamp = sample.timestamp
|
||||
if !sample.stateOnly {
|
||||
state.timestamp = sample.timestamp
|
||||
}
|
||||
}
|
||||
|
||||
func (av *rateAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
|
||||
|
||||
@@ -12,6 +12,9 @@ type stdAggrValue struct {
|
||||
}
|
||||
|
||||
func (av *stdAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||
if sample.stateOnly {
|
||||
return
|
||||
}
|
||||
av.count++
|
||||
avg := av.avg + (sample.value-av.avg)/av.count
|
||||
av.q += (sample.value - av.avg) * (sample.value - avg)
|
||||
|
||||
@@ -762,9 +762,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
|
||||
case "histogram_bucket":
|
||||
return newHistogramBucketAggrConfig(useSharedState), nil
|
||||
case "increase":
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
|
||||
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
|
||||
case "increase_prometheus":
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
|
||||
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
|
||||
case "last":
|
||||
return newLastAggrConfig(), nil
|
||||
case "max":
|
||||
@@ -782,9 +782,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
|
||||
case "sum_samples":
|
||||
return newSumSamplesAggrConfig(), nil
|
||||
case "total":
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
|
||||
case "total_prometheus":
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
|
||||
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
|
||||
case "unique_samples":
|
||||
return newUniqueSamplesAggrConfig(), nil
|
||||
default:
|
||||
@@ -845,6 +845,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlu
|
||||
} else {
|
||||
a.flush(pf, flushTime, cs, false)
|
||||
}
|
||||
flushTime = flushTime.Add(a.interval)
|
||||
for time.Now().After(flushTime) {
|
||||
flushTime = flushTime.Add(a.interval)
|
||||
}
|
||||
@@ -1005,26 +1006,28 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
a.ignoredNaNSamples.Inc()
|
||||
continue
|
||||
}
|
||||
if (ignoreOldSamples || enableWindows) && s.Timestamp < minDeadline {
|
||||
// Skip old samples outside the current aggregation interval
|
||||
stateOnly := (ignoreOldSamples || enableWindows) && s.Timestamp < minDeadline
|
||||
if stateOnly {
|
||||
a.ignoredOldSamples.Inc()
|
||||
continue
|
||||
}
|
||||
lagMsec := nowMsec - s.Timestamp
|
||||
if lagMsec > maxLagMsec {
|
||||
maxLagMsec = lagMsec
|
||||
} else {
|
||||
lagMsec := nowMsec - s.Timestamp
|
||||
if lagMsec > maxLagMsec {
|
||||
maxLagMsec = lagMsec
|
||||
}
|
||||
}
|
||||
if enableWindows && s.Timestamp <= cs.maxDeadline == cs.isGreen {
|
||||
ctx.green = append(ctx.green, pushSample{
|
||||
key: key,
|
||||
value: s.Value,
|
||||
timestamp: s.Timestamp,
|
||||
stateOnly: stateOnly,
|
||||
})
|
||||
} else {
|
||||
ctx.blue = append(ctx.blue, pushSample{
|
||||
key: key,
|
||||
value: s.Value,
|
||||
timestamp: s.Timestamp,
|
||||
stateOnly: stateOnly,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1098,6 +1101,10 @@ type pushSample struct {
|
||||
key string
|
||||
value float64
|
||||
timestamp int64
|
||||
|
||||
// stateOnly marks samples older than minDeadline: update tracking state in stateful outputs
|
||||
// (total, rate, increase) but do not contribute to the aggregation output.
|
||||
stateOnly bool
|
||||
}
|
||||
|
||||
func getPushCtx() *pushCtx {
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
//go:build synctest
|
||||
|
||||
package streamaggr
|
||||
|
||||
import (
|
||||
@@ -485,10 +483,8 @@ foo 3.3
|
||||
`, ``, ``, ``, ``}, time.Minute, `foo:1m_count_series 1
|
||||
foo:1m_count_series{bar="baz"} 1
|
||||
foo:1m_sum_samples 0
|
||||
foo:1m_sum_samples 0
|
||||
foo:1m_sum_samples 4.3
|
||||
foo:1m_sum_samples{bar="baz"} 0
|
||||
foo:1m_sum_samples{bar="baz"} 0
|
||||
foo:1m_sum_samples{bar="baz"} 2
|
||||
foo:5m_by_bar_sum_samples 4.3
|
||||
foo:5m_by_bar_sum_samples{bar="baz"} 2
|
||||
@@ -692,23 +688,33 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.125
|
||||
outputs: [rate_sum, rate_avg]
|
||||
`, "11111")
|
||||
|
||||
// test rate_sum and rate_avg, when two aggregation intervals are empty
|
||||
// test rate_sum and rate_avg, when two aggregation intervals are empty.
|
||||
// abc=777 arrives slightly before the start of each interval (-10ms) but still
|
||||
// updates prevTimestamp, so it contributes to rate_sum alongside abc=123 and abc=456.
|
||||
f([]string{`
|
||||
foo{abc="123", cde="1"} 2
|
||||
foo{abc="456", cde="1"} 8
|
||||
foo{abc="777", cde="1"} 9 -10
|
||||
foo{abc="123", cde="1"} 1
|
||||
foo{abc="123", cde="1"} 2 1
|
||||
foo{abc="456", cde="1"} 7
|
||||
foo{abc="456", cde="1"} 8 1
|
||||
foo{abc="777", cde="1"} 8
|
||||
foo{abc="777", cde="1"} 9 1
|
||||
`, ``, ``, `
|
||||
foo{abc="123", cde="1"} 20
|
||||
foo{abc="123", cde="1"} 19
|
||||
foo{abc="123", cde="1"} 20 1
|
||||
foo{abc="456", cde="1"} 26
|
||||
foo{abc="777", cde="1"} 27 -10
|
||||
`}, time.Minute, `foo:1m_by_cde_rate_avg{cde="1"} 0.1
|
||||
foo:1m_by_cde_rate_sum{cde="1"} 0.2
|
||||
foo{abc="456", cde="1"} 27 1
|
||||
foo{abc="777", cde="1"} 27
|
||||
foo{abc="777", cde="1"} 28 1
|
||||
`}, time.Minute, `foo:1m_by_cde_rate_avg{cde="1"} 1
|
||||
foo:1m_by_cde_rate_avg{cde="1"} 1
|
||||
foo:1m_by_cde_rate_sum{cde="1"} 3
|
||||
foo:1m_by_cde_rate_sum{cde="1"} 3
|
||||
`, `
|
||||
- interval: 1m
|
||||
by: [cde]
|
||||
outputs: [rate_sum, rate_avg]
|
||||
enable_windows: true
|
||||
`, "111111")
|
||||
`, "111111111111")
|
||||
|
||||
// rate_sum and rate_avg with duplicated events
|
||||
f([]string{`
|
||||
@@ -803,4 +809,55 @@ foo:1m_sum_samples{baz="qwe"} 10
|
||||
dedup_interval: 30s
|
||||
outputs: [sum_samples]
|
||||
`, "11111111")
|
||||
|
||||
// total with ignore_old_samples: an old sample (30s before the interval boundary) must
|
||||
// update the state reference without contributing to the interval total, so the subsequent
|
||||
// current-interval sample (250) computes increase 250-150=100 instead of 250-100=150.
|
||||
// Cumulative total: 100 (interval1) + 100 (interval2) = 200.
|
||||
f([]string{`
|
||||
foo 100
|
||||
`, `
|
||||
foo 150 -30
|
||||
foo 250
|
||||
`}, time.Minute, `foo:1m_total 100
|
||||
foo:1m_total 200
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [total]
|
||||
ignore_old_samples: true
|
||||
ignore_first_sample_interval: 0s
|
||||
`, "111")
|
||||
|
||||
// increase with ignore_old_samples: same correctness check for increase output.
|
||||
// Per-interval: 100 (first sample from 0) and 100 (250-150=100 thanks to stateOnly update).
|
||||
f([]string{`
|
||||
foo 100
|
||||
`, `
|
||||
foo 150 -30
|
||||
foo 250
|
||||
`}, time.Minute, `foo:1m_increase 100
|
||||
foo:1m_increase 100
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [increase]
|
||||
ignore_old_samples: true
|
||||
ignore_first_sample_interval: 0s
|
||||
`, "111")
|
||||
|
||||
// rate with ignore_old_samples: out-of-order stateOnly samples must not overwrite sv.value,
|
||||
// and the winning stateOnly sample's timestamp is used as the denominator start.
|
||||
// foo 120 -40 (ts=T0+20s) is rejected as OOO after foo 150 -30 (ts=T0+30s),
|
||||
// so the baseline is 150 at T0+30s, giving rate=(200-150)/30 ≈ 1.667.
|
||||
f([]string{`
|
||||
foo 100
|
||||
`, `
|
||||
foo 150 -30
|
||||
foo 120 -40
|
||||
foo 200
|
||||
`}, time.Minute, `foo:1m_rate_sum 1.6666666666666667
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [rate_sum]
|
||||
ignore_old_samples: true
|
||||
`, "1111")
|
||||
}
|
||||
|
||||
@@ -5,6 +5,9 @@ type sumSamplesAggrValue struct {
|
||||
}
|
||||
|
||||
func (av *sumSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||
if sample.stateOnly {
|
||||
return
|
||||
}
|
||||
av.sum += sample.value
|
||||
}
|
||||
|
||||
|
||||
@@ -36,12 +36,14 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
|
||||
// Skip out of order sample
|
||||
return
|
||||
}
|
||||
if sample.value >= lv.value {
|
||||
av.total += sample.value - lv.value
|
||||
} else {
|
||||
// counter reset
|
||||
av.total += sample.value
|
||||
ac.counterResetsTotal.Inc()
|
||||
if !sample.stateOnly {
|
||||
if sample.value >= lv.value {
|
||||
av.total += sample.value - lv.value
|
||||
} else {
|
||||
// counter reset
|
||||
av.total += sample.value
|
||||
ac.counterResetsTotal.Inc()
|
||||
}
|
||||
}
|
||||
}
|
||||
lv.value = sample.value
|
||||
@@ -54,7 +56,6 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
|
||||
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
|
||||
ac := c.(*totalAggrConfig)
|
||||
suffix := ac.getSuffix()
|
||||
// check for stale entries
|
||||
total := av.shared.total + av.total
|
||||
av.total = 0
|
||||
lvs := av.shared.lastValues
|
||||
@@ -63,9 +64,7 @@ func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast
|
||||
delete(lvs, lk)
|
||||
}
|
||||
}
|
||||
if ac.resetTotalOnFlush {
|
||||
av.shared.total = 0
|
||||
} else if math.Abs(total) >= (1 << 53) {
|
||||
if math.Abs(total) >= (1 << 53) {
|
||||
// It is time to reset the entry, since it starts losing float64 precision
|
||||
av.shared.total = 0
|
||||
} else {
|
||||
@@ -78,11 +77,10 @@ func (av *totalAggrValue) state() any {
|
||||
return av.shared
|
||||
}
|
||||
|
||||
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
|
||||
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
|
||||
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
|
||||
cfg := &totalAggrConfig{
|
||||
keepFirstSample: keepFirstSample,
|
||||
resetTotalOnFlush: resetTotalOnFlush,
|
||||
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
|
||||
}
|
||||
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
|
||||
@@ -90,8 +88,6 @@ func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleI
|
||||
}
|
||||
|
||||
type totalAggrConfig struct {
|
||||
resetTotalOnFlush bool
|
||||
|
||||
// Whether to take into account the first sample in new time series when calculating the output value.
|
||||
keepFirstSample bool
|
||||
|
||||
@@ -117,12 +113,6 @@ func (*totalAggrConfig) getValue(s any) aggrValue {
|
||||
}
|
||||
|
||||
func (ac *totalAggrConfig) getSuffix() string {
|
||||
if ac.resetTotalOnFlush {
|
||||
if ac.keepFirstSample {
|
||||
return "increase"
|
||||
}
|
||||
return "increase_prometheus"
|
||||
}
|
||||
if ac.keepFirstSample {
|
||||
return "total"
|
||||
}
|
||||
|
||||
@@ -5,6 +5,9 @@ type uniqueSamplesAggrValue struct {
|
||||
}
|
||||
|
||||
func (av *uniqueSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
||||
if sample.stateOnly {
|
||||
return
|
||||
}
|
||||
if _, ok := av.samples[sample.value]; !ok {
|
||||
av.samples[sample.value] = struct{}{}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user