Compare commits

...

15 Commits

Author SHA1 Message Date
Alexander Marshalov
26144f6d87 Merge branch 'master' into streaming-aggregation-ui 2024-01-18 10:33:14 +01:00
Alexander Marshalov
1fa619170b Merge branch 'master' into streaming-aggregation-ui 2023-12-19 14:58:36 +01:00
Alexander Marshalov
cecee0e05f Merge branch 'master' into streaming-aggregation-ui 2023-12-18 18:12:03 +01:00
Alexander Marshalov
3cb5f88325 Merge branch 'master' into streaming-aggregation-ui 2023-12-13 08:01:36 +01:00
Alexander Marshalov
99384fce4f Fix linter comments 2023-12-12 16:59:31 +01:00
Alexander Marshalov
f7ef3d4aa1 Fix linter comments 2023-12-12 16:46:52 +01:00
Alexander Marshalov
92cadfff65 Fix for PR 2023-12-12 16:34:07 +01:00
Alexander Marshalov
8f946a927c Fix image format 2023-12-12 16:16:22 +01:00
Alexander Marshalov
075bb8748f Merge branch 'master' into streaming-aggregation-ui
# Conflicts:
#	app/vmagent/main.go
2023-12-12 16:05:00 +01:00
Alexander Marshalov
fb9a9d1463 Prepare for PR - refactoring 2023-12-12 16:02:12 +01:00
Alexander Marshalov
715bf3af82 Prepare for PR - refactoring 2023-12-11 19:53:28 +01:00
Alexander Marshalov
8725b5e049 Streaming aggregation branch clean 2023-11-22 15:11:13 +01:00
Alexander Marshalov
5bc3488538 Merge branch 'master' into streaming-aggregation 2023-10-26 16:54:09 +02:00
Alexander Marshalov
1cd6232537 WIP 2023-10-23 13:14:48 +02:00
Alexander Marshalov
ed1bef0e2d WIP 2023-10-18 14:48:49 +02:00
25 changed files with 1427 additions and 92 deletions

View File

@@ -119,6 +119,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
{"expand-with-exprs", "WITH expressions' tutorial"},
{"api/v1/targets", "advanced information about discovered targets in JSON format"},
{"config", "-promscrape.config contents"},
{"stream-agg", "streaming aggregation status"},
{"metrics", "available service metrics"},
{"flags", "command-line flags"},
{"api/v1/status/tsdb", "tsdb status page"},

View File

@@ -41,6 +41,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics"
)
@@ -229,6 +230,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
{"metric-relabel-debug", "debug metric relabeling"},
{"api/v1/targets", "advanced information about discovered targets in JSON format"},
{"config", "-promscrape.config contents"},
{"stream-agg", "streaming aggregation status"},
{"metrics", "available service metrics"},
{"flags", "command-line flags"},
{"-/reload", "reload configuration"},
@@ -444,6 +446,9 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
procutil.SelfSIGHUP()
w.WriteHeader(http.StatusOK)
return true
case "/stream-agg":
streamaggr.WriteHumanReadableState(w, r, remotewrite.GetAggregators())
return true
case "/ready":
if rdy := atomic.LoadInt32(&promscrape.PendingScrapeConfigs); rdy > 0 {
errMsg := fmt.Sprintf("waiting for scrapes to init, left: %d", rdy)

View File

@@ -946,3 +946,24 @@ func CheckStreamAggrConfigs() error {
}
return nil
}
// GetAggregators returns aggregators for all the configured remote writes.
func GetAggregators() map[string]*streamaggr.Aggregators {
	aggrs := make(map[string]*streamaggr.Aggregators)
	if len(*remoteWriteMultitenantURLs) == 0 {
		// Single-tenant mode: only the default remote write contexts exist.
		for rwNum, rw := range rwctxsDefault {
			aggrs[fmt.Sprintf("remote write %d", rwNum)] = rw.sas.Load()
		}
		return aggrs
	}
	// Multitenant mode: collect aggregators per (tenant, remote write) pair.
	// rwctxsMap may be mutated concurrently, so hold the lock while iterating.
	rwctxsMapLock.Lock()
	defer rwctxsMapLock.Unlock()
	for tenant, rwctxs := range rwctxsMap {
		for rwNum, rw := range rwctxs {
			key := fmt.Sprintf("rw %d for tenant %v:%v", rwNum, tenant.AccountID, tenant.ProjectID)
			aggrs[key] = rw.sas.Load()
		}
	}
	return aggrs
}

View File

@@ -211,3 +211,8 @@ func pushAggregateSeries(tss []prompbmarshal.TimeSeries) {
logger.Errorf("cannot flush aggregate series: %s", err)
}
}
// GetAggregators returns default aggregator for vmsingle.
func GetAggregators() map[string]*streamaggr.Aggregators {
	// vmsingle has a single global set of stream aggregators,
	// exposed under the fixed "default" key.
	aggrs := make(map[string]*streamaggr.Aggregators, 1)
	aggrs["default"] = sasGlobal.Load()
	return aggrs
}

View File

@@ -40,6 +40,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)
var (
@@ -338,6 +339,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
procutil.SelfSIGHUP()
w.WriteHeader(http.StatusNoContent)
return true
case "/stream-agg":
streamaggr.WriteHumanReadableState(w, r, vminsertCommon.GetAggregators())
return true
case "/ready":
if rdy := atomic.LoadInt32(&promscrape.PendingScrapeConfigs); rdy > 0 {
errMsg := fmt.Sprintf("waiting for scrape config to init targets, configs left: %d", rdy)

Binary file not shown.

After

Width:  |  Height:  |  Size: 79 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 185 KiB

View File

@@ -702,3 +702,16 @@ so every `vmagent` aggregates data into distinct set of time series. These time
For example, if `vmagent` instances run in Docker or Kubernetes, then you can refer `POD_NAME` or `HOSTNAME` environment variables
as a unique label value per each `vmagent`: `-remoteWrite.label='vmagent=%{HOSTNAME}'`. See [these docs](https://docs.victoriametrics.com/#environment-variables)
on how to refer environment variables in VictoriaMetrics components.
## Debugging
It is possible to see the current state of streaming aggregation via the `/stream-agg` page
of [vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
On this page you can see the list of active aggregations for each remote write:
<img alt="stream aggregation -> aggregations list" src="stream-aggregation-ui-1.webp">
By clicking on the output name you can see the list of time series produced by this output, along with their current state:
<img alt="stream aggregation -> aggregation state" src="stream-aggregation-ui-2.webp">

View File

@@ -2,13 +2,17 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// avgAggrState calculates output=avg, e.g. the average value over input samples.
type avgAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
lastPushTimestamp atomic.Uint64
}
type avgStateValue struct {
@@ -18,8 +22,10 @@ type avgStateValue struct {
deleted bool
}
func newAvgAggrState() *avgAggrState {
return &avgAggrState{}
func newAvgAggrState(interval time.Duration) *avgAggrState {
return &avgAggrState{
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *avgAggrState) pushSample(_, outputKey string, value float64) {
@@ -55,7 +61,9 @@ again:
}
func (as *avgAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
@@ -68,7 +76,35 @@ func (as *avgAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "avg", currentTimeMsec, avg)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, avg)
return true
})
as.lastPushTimestamp.Store(currentTime)
}
func (as *avgAggrState) getOutputName() string {
return "avg"
}
func (as *avgAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*avgStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.sum / float64(value.count),
samplesCount: uint64(value.count),
})
return true
})
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}

View File

@@ -2,13 +2,17 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// countSamplesAggrState calculates output=countSamples, e.g. the count of input samples.
type countSamplesAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
lastPushTimestamp atomic.Uint64
}
type countSamplesStateValue struct {
@@ -17,8 +21,10 @@ type countSamplesStateValue struct {
deleted bool
}
func newCountSamplesAggrState() *countSamplesAggrState {
return &countSamplesAggrState{}
func newCountSamplesAggrState(interval time.Duration) *countSamplesAggrState {
return &countSamplesAggrState{
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *countSamplesAggrState) pushSample(_, outputKey string, _ float64) {
@@ -52,7 +58,9 @@ again:
}
func (as *countSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
@@ -65,7 +73,35 @@ func (as *countSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "count_samples", currentTimeMsec, float64(n))
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, float64(n))
return true
})
as.lastPushTimestamp.Store(currentTime)
}
func (as *countSamplesAggrState) getOutputName() string {
return "count_samples"
}
func (as *countSamplesAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*countSamplesStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: float64(value.n),
samplesCount: value.n,
})
return true
})
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}

View File

@@ -2,24 +2,31 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// countSeriesAggrState calculates output=count_series, e.g. the number of unique series.
type countSeriesAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
lastPushTimestamp atomic.Uint64
}
type countSeriesStateValue struct {
mu sync.Mutex
countedSeries map[string]struct{}
n uint64
samplesCount uint64
deleted bool
}
func newCountSeriesAggrState() *countSeriesAggrState {
return &countSeriesAggrState{}
func newCountSeriesAggrState(interval time.Duration) *countSeriesAggrState {
return &countSeriesAggrState{
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *countSeriesAggrState) pushSample(inputKey, outputKey string, _ float64) {
@@ -49,6 +56,7 @@ again:
sv.countedSeries[inputKey] = struct{}{}
sv.n++
}
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
@@ -59,7 +67,9 @@ again:
}
func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
@@ -72,7 +82,36 @@ func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "count_series", currentTimeMsec, float64(n))
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, float64(n))
return true
})
as.lastPushTimestamp.Store(currentTime)
}
func (as *countSeriesAggrState) getOutputName() string {
return "count_series"
}
func (as *countSeriesAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*countSeriesStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: float64(value.n),
samplesCount: value.samplesCount,
})
return true
})
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}

View File

@@ -3,30 +3,34 @@ package streamaggr
import (
"math"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
)
// histogramBucketAggrState calculates output=histogramBucket, e.g. VictoriaMetrics histogram over input samples.
type histogramBucketAggrState struct {
m sync.Map
stalenessSecs uint64
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp atomic.Uint64
}
type histogramBucketStateValue struct {
mu sync.Mutex
h metrics.Histogram
samplesCount uint64
deleteDeadline uint64
deleted bool
}
func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBucketAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval)
func newHistogramBucketAggrState(interval time.Duration, stalenessInterval time.Duration) *histogramBucketAggrState {
return &histogramBucketAggrState{
stalenessSecs: stalenessSecs,
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
@@ -50,6 +54,7 @@ again:
deleted := sv.deleted
if !deleted {
sv.h.Update(value)
sv.samplesCount++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
@@ -93,12 +98,46 @@ func (as *histogramBucketAggrState) appendSeriesForFlush(ctx *flushCtx) {
if !sv.deleted {
key := k.(string)
sv.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", currentTimeMsec, float64(count), "vmrange", vmrange)
ctx.appendSeriesWithExtraLabel(key, as.getOutputName(), currentTimeMsec, float64(count), "vmrange", vmrange)
})
}
sv.mu.Unlock()
return true
})
as.lastPushTimestamp.Store(currentTime)
}
func (as *histogramBucketAggrState) getOutputName() string {
return "histogram_bucket"
}
func (as *histogramBucketAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
rmetrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*histogramBucketStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
value.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
rmetrics = append(rmetrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName(), prompbmarshal.Label{
Name: "vmrange",
Value: vmrange,
}),
currentValue: float64(count),
samplesCount: value.samplesCount,
})
})
return true
})
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: rmetrics,
}
}
func roundDurationToSecs(d time.Duration) uint64 {

View File

@@ -2,6 +2,7 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -13,12 +14,15 @@ type increaseAggrState struct {
ignoreInputDeadline uint64
stalenessSecs uint64
intervalSecs uint64
lastPushTimestamp atomic.Uint64
}
type increaseStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
@@ -30,6 +34,7 @@ func newIncreaseAggrState(interval time.Duration, stalenessInterval time.Duratio
return &increaseAggrState{
ignoreInputDeadline: currentTime + intervalSecs,
stalenessSecs: stalenessSecs,
intervalSecs: intervalSecs,
}
}
@@ -69,6 +74,7 @@ again:
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
@@ -122,8 +128,37 @@ func (as *increaseAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, "increase", currentTimeMsec, increase)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, increase)
}
return true
})
as.lastPushTimestamp.Store(currentTime)
}
func (as *increaseAggrState) getOutputName() string {
return "increase"
}
func (as *increaseAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*increaseStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
samplesCount: value.samplesCount,
})
return true
})
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}

View File

@@ -2,23 +2,30 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// lastAggrState calculates output=last, e.g. the last value over input samples.
type lastAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
lastPushTimestamp atomic.Uint64
}
type lastStateValue struct {
mu sync.Mutex
last float64
deleted bool
mu sync.Mutex
last float64
samplesCount uint64
deleted bool
}
func newLastAggrState() *lastAggrState {
return &lastAggrState{}
func newLastAggrState(interval time.Duration) *lastAggrState {
return &lastAggrState{
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *lastAggrState) pushSample(_, outputKey string, value float64) {
@@ -42,6 +49,7 @@ again:
deleted := sv.deleted
if !deleted {
sv.last = value
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
@@ -52,7 +60,9 @@ again:
}
func (as *lastAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
@@ -65,7 +75,35 @@ func (as *lastAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "last", currentTimeMsec, last)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, last)
return true
})
as.lastPushTimestamp.Store(currentTime)
}
func (as *lastAggrState) getOutputName() string {
return "last"
}
func (as *lastAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*lastStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.last,
samplesCount: value.samplesCount,
})
return true
})
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}

View File

@@ -2,23 +2,30 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// maxAggrState calculates output=max, e.g. the maximum value over input samples.
type maxAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
lastPushTimestamp atomic.Uint64
}
type maxStateValue struct {
mu sync.Mutex
max float64
deleted bool
mu sync.Mutex
max float64
samplesCount uint64
deleted bool
}
func newMaxAggrState() *maxAggrState {
return &maxAggrState{}
func newMaxAggrState(interval time.Duration) *maxAggrState {
return &maxAggrState{
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *maxAggrState) pushSample(_, outputKey string, value float64) {
@@ -44,6 +51,7 @@ again:
if value > sv.max {
sv.max = value
}
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
@@ -54,7 +62,9 @@ again:
}
func (as *maxAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
@@ -62,12 +72,41 @@ func (as *maxAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv := v.(*maxStateValue)
sv.mu.Lock()
max := sv.max
value := sv.max
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "max", currentTimeMsec, max)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, value)
return true
})
as.lastPushTimestamp.Store(currentTime)
}
func (as *maxAggrState) getOutputName() string {
return "max"
}
func (as *maxAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*maxStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.max,
samplesCount: value.samplesCount,
})
return true
})
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}

View File

@@ -2,23 +2,30 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// minAggrState calculates output=min, e.g. the minimum value over input samples.
type minAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
lastPushTimestamp atomic.Uint64
}
type minStateValue struct {
mu sync.Mutex
min float64
deleted bool
mu sync.Mutex
min float64
samplesCount uint64
deleted bool
}
func newMinAggrState() *minAggrState {
return &minAggrState{}
func newMinAggrState(interval time.Duration) *minAggrState {
return &minAggrState{
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *minAggrState) pushSample(_, outputKey string, value float64) {
@@ -43,6 +50,7 @@ again:
if !deleted {
if value < sv.min {
sv.min = value
sv.samplesCount++
}
}
sv.mu.Unlock()
@@ -54,7 +62,9 @@ again:
}
func (as *minAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
@@ -62,12 +72,40 @@ func (as *minAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv := v.(*minStateValue)
sv.mu.Lock()
min := sv.min
value := sv.min
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "min", currentTimeMsec, min)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, value)
return true
})
as.lastPushTimestamp.Store(currentTime)
}
func (as *minAggrState) getOutputName() string {
return "min"
}
func (as *minAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*minStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.min,
samplesCount: value.samplesCount,
})
return true
})
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}

View File

@@ -3,28 +3,34 @@ package streamaggr
import (
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/valyala/histogram"
)
// quantilesAggrState calculates output=quantiles, e.g. the the given quantiles over the input samples.
type quantilesAggrState struct {
m sync.Map
phis []float64
m sync.Map
phis []float64
intervalSecs uint64
lastPushTimestamp atomic.Uint64
}
type quantilesStateValue struct {
mu sync.Mutex
h *histogram.Fast
deleted bool
mu sync.Mutex
h *histogram.Fast
samplesCount uint64
deleted bool
}
func newQuantilesAggrState(phis []float64) *quantilesAggrState {
func newQuantilesAggrState(interval time.Duration, phis []float64) *quantilesAggrState {
return &quantilesAggrState{
phis: phis,
intervalSecs: roundDurationToSecs(interval),
phis: phis,
}
}
@@ -49,6 +55,7 @@ again:
deleted := sv.deleted
if !deleted {
sv.h.Update(value)
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
@@ -59,7 +66,9 @@ again:
}
func (as *quantilesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
m := &as.m
phis := as.phis
var quantiles []float64
@@ -80,8 +89,43 @@ func (as *quantilesAggrState) appendSeriesForFlush(ctx *flushCtx) {
for i, quantile := range quantiles {
b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
phiStr := bytesutil.InternBytes(b)
ctx.appendSeriesWithExtraLabel(key, "quantiles", currentTimeMsec, quantile, "quantile", phiStr)
ctx.appendSeriesWithExtraLabel(key, as.getOutputName(), currentTimeMsec, quantile, "quantile", phiStr)
}
return true
})
as.lastPushTimestamp.Store(currentTime)
}
func (as *quantilesAggrState) getOutputName() string {
return "quantiles"
}
func (as *quantilesAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
var b []byte
as.m.Range(func(k, v any) bool {
value := v.(*quantilesStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
for i, quantile := range value.h.Quantiles(make([]float64, 0), as.phis) {
b = strconv.AppendFloat(b[:0], as.phis[i], 'g', -1, 64)
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName(), prompbmarshal.Label{
Name: "quantile",
Value: bytesutil.InternBytes(b),
}),
currentValue: quantile,
samplesCount: value.samplesCount,
})
}
return true
})
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}

81
lib/streamaggr/state.go Normal file
View File

@@ -0,0 +1,81 @@
package streamaggr
import (
"fmt"
"net/http"
"net/url"
"strconv"
)
// WriteHumanReadableState writes human-readable state for all aggregations.
//
// The state to display is selected via optional query args:
//
//	rw     - the remote write name; defaults to an arbitrary entry of rws
//	agg    - the aggregation index inside the selected remote write
//	output - the aggregation output name; defaults to the first output
//	limit  - the maximum number of series to display (default 1000)
//	filter - a substring filter for the displayed series
//
// On invalid input it responds with 400/404 and a short plain-text message.
func WriteHumanReadableState(w http.ResponseWriter, r *http.Request, rws map[string]*Aggregators) {
	rwActive := r.FormValue("rw")
	if rwActive == "" {
		// No explicit remote write requested - pick an arbitrary one.
		for key := range rws {
			rwActive = key
			break
		}
	}
	rw, ok := rws[rwActive]
	if !ok {
		// WriteHeader must be called before the first body write;
		// otherwise an implicit 200 status is sent and the 404 is lost.
		w.WriteHeader(http.StatusNotFound)
		_, _ = fmt.Fprintf(w, "not found remoteWrite '%v'", rwActive)
		return
	}
	aggParam := r.FormValue("agg")
	if aggParam == "" {
		// No aggregation selected - render the aggregations overview page.
		WriteStreamAggHTML(w, rws, rwActive)
		return
	}
	aggNum, err := strconv.Atoi(aggParam)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = fmt.Fprintf(w, "incorrect parameter 'agg': %v", err)
		return
	}
	// Guard against negative indexes too - they pass the upper-bound
	// check but panic on rw.as[aggNum].
	if aggNum < 0 || aggNum >= len(rw.as) {
		w.WriteHeader(http.StatusNotFound)
		_, _ = fmt.Fprintf(w, "not found aggregation with num '%v'", aggNum)
		return
	}
	agg := rw.as[aggNum]
	// Select the output state; an empty `output` param selects the first one.
	var as aggrState
	output := r.FormValue("output")
	for _, a := range agg.aggrStates {
		if output == "" || a.getOutputName() == output {
			as = a
			break
		}
	}
	if as == nil {
		w.WriteHeader(http.StatusNotFound)
		_, _ = fmt.Fprintf(w, "not found output '%v'", output)
		return
	}
	limitNum := 1000
	limitParam := r.FormValue("limit")
	if limitParam != "" {
		limitNum, err = strconv.Atoi(limitParam)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			_, _ = fmt.Fprintf(w, "incorrect parameter 'limit': %v", err)
			return
		}
		// A negative limit would panic on the sr.metrics[:limit] slicing
		// inside the template.
		if limitNum < 0 {
			w.WriteHeader(http.StatusBadRequest)
			_, _ = fmt.Fprintf(w, "incorrect parameter 'limit': must be non-negative")
			return
		}
	}
	filter, err := url.QueryUnescape(r.FormValue("filter"))
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = fmt.Fprintf(w, "incorrect parameter 'filter': %v", err)
		return
	}
	WriteStreamAggOutputStateHTML(w, rwActive, aggNum, agg, as, limitNum, filter)
}

246
lib/streamaggr/state.qtpl Normal file
View File

@@ -0,0 +1,246 @@
{% import (
"fmt"
"sort"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/htmlcomponents"
) %}
{% code
const DEFAULT_LIMIT = 1000
%}
{% stripspace %}
{% comment %} StreamAggHTML renders the aggregations overview page: one tab per remote write, with a table of the active aggregations (match expression, by/without labels and output links) for the selected one. {% endcomment %}
{% func StreamAggHTML(rws map[string]*Aggregators, rwActive string) %}
<!DOCTYPE html>
<html lang="en">
<head>
{%= htmlcomponents.CommonHeader() %}
<title>Stream aggregation</title>
</head>
<body>
{%= htmlcomponents.Navbar() %}
<div class="container-fluid">
<div class="row">
<main class="col-12">
<h1>Aggregations</h1>
<hr />
<ul class="nav nav-tabs" id="rw-tab" role="tablist">
{% for rwKey := range rws %}
<li class="nav-item" role="presentation">
<button class="nav-link{%if rwKey==rwActive %}{% space %}active{%endif%}" type="button" role="tab"
onclick="location.href='?rw={%s rwKey %}'">
{%s rwKey %}
</button>
</li>
{% endfor %}
</ul>
<div class="tab-content">
<div class="tab-pane active" role="tabpanel">
<div id="aggregations" class="table-responsive">
<table class="table table-striped table-hover table-bordered table-sm">
<thead>
<tr>
<th scope="col" style="width: 5%">Num</th>
<th scope="col" style="width: 35%">Match</th>
<th scope="col" style="width: 10%">By</th>
<th scope="col" style="width: 10%">Without</th>
<th scope="col" style="width: 40%">Outputs</th>
</tr>
</thead>
<tbody>
{% code aggs := rws[rwActive] %}
{% for an, agg := range aggs.as %}
<tr>
<td>{%d an %}</td>
<td>
<code>{%s agg.match.String() %}</code>
</td>
<td class="labels">
{% for abn, ab := range agg.by %}
{% if abn > 0 %}
<span>, </span>
{% endif %}
<span class="badge bg-secondary">
{%s ab %}
</span>
{% endfor %}
</td>
<td class="labels">
{% for awn, aw := range agg.without %}
{% if awn > 0 %}
<span>, </span>
{% endif %}
<span class="badge bg-secondary">
{%s aw %}
</span>
{% endfor %}
</td>
<td class="labels">
{% for asn, as := range agg.aggrStates %}
{% if asn > 0 %}
<span>, </span>
{% endif %}
<a href="?rw={%s rwActive %}&agg={%d an %}&output={%s as.getOutputName() %}&limit={%d DEFAULT_LIMIT %}">
{%s as.getOutputName() %}
</a>
{% endfor %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
</main>
</div>
</div>
</body>
</html>
{% endfunc %}
{% comment %}
StreamAggOutputStateHTML renders the detailed state page for a single
aggregation output: remote-write target, aggregation config, push timing,
and the current per-series values filtered and truncated to the limit.
Fixes: the "back to aggregations" heading closed <h4> with </h3>, and the
tab <button> had no whitespace between role="tab" and onclick after
stripspace collapsed the newline between them.
{% endcomment %}
{% func StreamAggOutputStateHTML(rwActive string, aggNum int, agg *aggregator, as aggrState, limit int, filter string) %}
<!DOCTYPE html>
<html lang="en">
<head>
{%= htmlcomponents.CommonHeader() %}
<title>Stream aggregation</title>
</head>
<body>
{%= htmlcomponents.Navbar() %}
<div class="container-fluid">
<div class="row">
<main class="col-12">
{% code
sr := as.getStateRepresentation(agg.suffix)
if filter != "" {
filter = strings.ToLower(filter)
metrics := sr.metrics[:0]
for _, m := range sr.metrics {
if strings.Contains(strings.ToLower(m.metric), filter) {
metrics = append(metrics, m)
}
}
sr.metrics = metrics
}
sort.Slice(sr.metrics, func(i, j int) bool {
return sr.metrics[i].metric < sr.metrics[j].metric
})
if len(sr.metrics) > limit {
sr.metrics = sr.metrics[:limit]
}
%}
<h1>Aggregation state</h1>
<h4> [ <a href="?rw={%s rwActive %}">back to aggregations</a> ] </h4>
<hr />
<h6>
<div class="row container-sm">
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="remote-write" style="width: 200px">Remote write:</span>
<input type="text" class="form-control" aria-label="Remote write" aria-describedby="remote-write" value="{%s rwActive %}" readonly />
</div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="agg-num" style="width: 200px">Aggregation num:</span>
<input type="number" class="form-control" aria-label="Aggregation num" aria-describedby="agg-num" value="{%d aggNum %}" readonly />
</div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="match" style="width: 200px">Match:</span>
<input type="string" class="form-control" aria-label="Match" aria-describedby="match" value="{%s agg.match.String() %}" readonly />
</div>
{% if len(agg.by) > 0 %}
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="by" style="width: 200px">By:</span>
<input type="string" class="form-control" aria-label="By" aria-describedby="by" value="{%s strings.Join(agg.by, ", ") %}" readonly />
</div>
{% endif %}
{% if len(agg.without) > 0 %}
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="without" style="width: 200px">Without:</span>
<input type="string" class="form-control" aria-label="Without" aria-describedby="without" value="{%s strings.Join(agg.without, ", ") %}" readonly />
</div>
{% endif %}
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="interval" style="width: 200px">Interval (seconds):</span>
<input type="number" class="form-control" aria-label="Interval (seconds)" aria-describedby="interval" value="{%v sr.intervalSecs %}" readonly />
</div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="last-push-time" style="width: 200px">Last push time:</span>
<input type="string" class="form-control" aria-label="Last push time" aria-describedby="last-push-time" value="{% if sr.lastPushTimestamp == 0 %}-{% else %}{%s time.Unix(int64(sr.lastPushTimestamp), 0).Format(time.RFC3339) %}{% endif %}" readonly />
</div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="next-push-time" style="width: 200px">Next push time:</span>
<input type="string" class="form-control" aria-label="Next push time" aria-describedby="next-push-time" value="{% if sr.lastPushTimestamp == 0 %}{%s time.Unix(int64(agg.initialTime + sr.intervalSecs), 0).Format(time.RFC3339) %}{% else %}{%s time.Unix(int64(sr.lastPushTimestamp + sr.intervalSecs), 0).Format(time.RFC3339) %}{% endif %}" readonly />
</div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="limit-label" style="width: 200px">Items on the page:</span>
<input id="limit" type="number" class="form-control" aria-label="Limit" aria-describedby="limit-label" value="{%d limit %}" />
<button type="button" class="btn btn-outline-secondary" onclick="location.href='?rw={%s rwActive %}&agg={%d aggNum %}&output={%s as.getOutputName() %}&limit='+document.querySelector(`#limit`).value+'&filter='+encodeURIComponent(document.querySelector(`#filter`).value)">apply</button>
</div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="filter-label" style="width: 200px">Filter:</span>
<input id="filter" type="text" class="form-control" aria-label="Filter" aria-describedby="filter-label" value="{%s filter %}" />
<button type="button" class="btn btn-outline-secondary" onclick="location.href='?rw={%s rwActive %}&agg={%d aggNum %}&output={%s as.getOutputName() %}&limit={%d limit %}&filter='+encodeURIComponent(document.querySelector(`#filter`).value)">apply</button>
</div>
</div>
</h6>
<hr />
<ul class="nav nav-tabs" id="rw-tab" role="tablist">
{% for _, a := range agg.aggrStates %}
<li class="nav-item" role="presentation">
<button class="nav-link{%if a.getOutputName()==as.getOutputName() %}{% space %}active{%endif%}" type="button" role="tab" onclick="location.href='?rw={%s rwActive %}&agg={%d aggNum %}&output={%s a.getOutputName() %}&limit={%d limit %}'">
{%s a.getOutputName() %}
</button>
</li>
{% endfor %}
</ul>
<div class="tab-content">
<div class="tab-pane active" role="tabpanel">
<div id="aggregation-state" class="table-responsive">
<table class="table table-striped table-hover table-bordered table-sm">
<thead>
<tr>
<th scope="col">Metric</th>
<th scope="col">Current value</th>
<th scope="col">Samples count</th>
</tr>
</thead>
<tbody>
{% for _, asr := range sr.metrics %}
<tr>
<td>
<code>{%s asr.metric %}</code>
</td>
<td class="text-end">
{%f asr.currentValue %}
</td>
<td class="text-end">
{%s fmt.Sprintf("%v", asr.samplesCount) %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
</main>
</div>
</div>
</body>
</html>
{% endfunc %}
{% endstripspace %}

View File

@@ -0,0 +1,433 @@
// Code generated by qtc from "state.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line lib/streamaggr/state.qtpl:1
package streamaggr
//line lib/streamaggr/state.qtpl:1
import (
"fmt"
"sort"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/htmlcomponents"
)
//line lib/streamaggr/state.qtpl:10
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line lib/streamaggr/state.qtpl:10
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line lib/streamaggr/state.qtpl:11
const DEFAULT_LIMIT = 1000
// StreamStreamAggHTML streams the aggregations summary page to qw422016 for
// the given remote-write aggregators, with the rwActive tab highlighted.
//
// This function is generated by qtc from state.qtpl; the HTML fixes below
// (</th> instead of stray </a> in the table header, and a space between the
// role and onclick attributes) must also be applied to state.qtpl so they
// survive regeneration.
//line lib/streamaggr/state.qtpl:16
func StreamStreamAggHTML(qw422016 *qt422016.Writer, rws map[string]*Aggregators, rwActive string) {
//line lib/streamaggr/state.qtpl:16
	qw422016.N().S(`<!DOCTYPE html><html lang="en"><head>`)
//line lib/streamaggr/state.qtpl:20
	htmlcomponents.StreamCommonHeader(qw422016)
//line lib/streamaggr/state.qtpl:20
	qw422016.N().S(`<title>Stream aggregation</title></head><body>`)
//line lib/streamaggr/state.qtpl:24
	htmlcomponents.StreamNavbar(qw422016)
//line lib/streamaggr/state.qtpl:24
	qw422016.N().S(`<div class="container-fluid"><div class="row"><main class="col-12"><h1>Aggregations</h1><hr /><ul class="nav nav-tabs" id="rw-tab" role="tablist">`)
//line lib/streamaggr/state.qtpl:31
	for rwKey := range rws {
//line lib/streamaggr/state.qtpl:31
		qw422016.N().S(`<li class="nav-item" role="presentation"><button class="nav-link`)
//line lib/streamaggr/state.qtpl:33
		if rwKey == rwActive {
//line lib/streamaggr/state.qtpl:33
			qw422016.N().S(` `)
//line lib/streamaggr/state.qtpl:33
			qw422016.N().S(`active`)
//line lib/streamaggr/state.qtpl:33
		}
//line lib/streamaggr/state.qtpl:33
		qw422016.N().S(`" type="button" role="tab" onclick="location.href='?rw=`)
//line lib/streamaggr/state.qtpl:34
		qw422016.E().S(rwKey)
//line lib/streamaggr/state.qtpl:34
		qw422016.N().S(`'">`)
//line lib/streamaggr/state.qtpl:35
		qw422016.E().S(rwKey)
//line lib/streamaggr/state.qtpl:35
		qw422016.N().S(`</button></li>`)
//line lib/streamaggr/state.qtpl:38
	}
//line lib/streamaggr/state.qtpl:38
	qw422016.N().S(`</ul><div class="tab-content"><div class="tab-pane active" role="tabpanel"><div id="aggregations" class="table-responsive"><table class="table table-striped table-hover table-bordered table-sm"><thead><tr><th scope="col" style="width: 5%">Num</th><th scope="col" style="width: 35%">Match</th><th scope="col" style="width: 10%">By</th><th scope="col" style="width: 10%">Without</th><th scope="col" style="width: 40%">Outputs</th></tr></thead><tbody>`)
//line lib/streamaggr/state.qtpl:54
	aggs := rws[rwActive]
//line lib/streamaggr/state.qtpl:55
	for an, agg := range aggs.as {
//line lib/streamaggr/state.qtpl:55
		qw422016.N().S(`<tr><td>`)
//line lib/streamaggr/state.qtpl:57
		qw422016.N().D(an)
//line lib/streamaggr/state.qtpl:57
		qw422016.N().S(`</td><td><code>`)
//line lib/streamaggr/state.qtpl:59
		qw422016.E().S(agg.match.String())
//line lib/streamaggr/state.qtpl:59
		qw422016.N().S(`</code></td><td class="labels">`)
//line lib/streamaggr/state.qtpl:62
		for abn, ab := range agg.by {
//line lib/streamaggr/state.qtpl:63
			if abn > 0 {
//line lib/streamaggr/state.qtpl:63
				qw422016.N().S(`<span>, </span>`)
//line lib/streamaggr/state.qtpl:65
			}
//line lib/streamaggr/state.qtpl:65
			qw422016.N().S(`<span class="badge bg-secondary">`)
//line lib/streamaggr/state.qtpl:67
			qw422016.E().S(ab)
//line lib/streamaggr/state.qtpl:67
			qw422016.N().S(`</span>`)
//line lib/streamaggr/state.qtpl:69
		}
//line lib/streamaggr/state.qtpl:69
		qw422016.N().S(`</td><td class="labels">`)
//line lib/streamaggr/state.qtpl:72
		for awn, aw := range agg.without {
//line lib/streamaggr/state.qtpl:73
			if awn > 0 {
//line lib/streamaggr/state.qtpl:73
				qw422016.N().S(`<span>, </span>`)
//line lib/streamaggr/state.qtpl:75
			}
//line lib/streamaggr/state.qtpl:75
			qw422016.N().S(`<span class="badge bg-secondary">`)
//line lib/streamaggr/state.qtpl:77
			qw422016.E().S(aw)
//line lib/streamaggr/state.qtpl:77
			qw422016.N().S(`</span>`)
//line lib/streamaggr/state.qtpl:79
		}
//line lib/streamaggr/state.qtpl:79
		qw422016.N().S(`</td><td class="labels">`)
//line lib/streamaggr/state.qtpl:82
		for asn, as := range agg.aggrStates {
//line lib/streamaggr/state.qtpl:83
			if asn > 0 {
//line lib/streamaggr/state.qtpl:83
				qw422016.N().S(`<span>, </span>`)
//line lib/streamaggr/state.qtpl:85
			}
//line lib/streamaggr/state.qtpl:85
			qw422016.N().S(`<a href="?rw=`)
//line lib/streamaggr/state.qtpl:86
			qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:86
			qw422016.N().S(`&agg=`)
//line lib/streamaggr/state.qtpl:86
			qw422016.N().D(an)
//line lib/streamaggr/state.qtpl:86
			qw422016.N().S(`&output=`)
//line lib/streamaggr/state.qtpl:86
			qw422016.E().S(as.getOutputName())
//line lib/streamaggr/state.qtpl:86
			qw422016.N().S(`&limit=`)
//line lib/streamaggr/state.qtpl:86
			qw422016.N().D(DEFAULT_LIMIT)
//line lib/streamaggr/state.qtpl:86
			qw422016.N().S(`">`)
//line lib/streamaggr/state.qtpl:87
			qw422016.E().S(as.getOutputName())
//line lib/streamaggr/state.qtpl:87
			qw422016.N().S(`</a>`)
//line lib/streamaggr/state.qtpl:89
		}
//line lib/streamaggr/state.qtpl:89
		qw422016.N().S(`</td></tr>`)
//line lib/streamaggr/state.qtpl:92
	}
//line lib/streamaggr/state.qtpl:92
	qw422016.N().S(`</tbody></table></div></div></div></main></div></div></body></html>`)
//line lib/streamaggr/state.qtpl:103
}
// WriteStreamAggHTML writes the aggregations summary page to qq422016.
// Generated by qtc from state.qtpl; do not edit by hand.
//line lib/streamaggr/state.qtpl:103
func WriteStreamAggHTML(qq422016 qtio422016.Writer, rws map[string]*Aggregators, rwActive string) {
//line lib/streamaggr/state.qtpl:103
	qw422016 := qt422016.AcquireWriter(qq422016)
//line lib/streamaggr/state.qtpl:103
	StreamStreamAggHTML(qw422016, rws, rwActive)
//line lib/streamaggr/state.qtpl:103
	qt422016.ReleaseWriter(qw422016)
//line lib/streamaggr/state.qtpl:103
}
// StreamAggHTML returns the aggregations summary page as a string.
// Generated by qtc from state.qtpl; do not edit by hand.
//line lib/streamaggr/state.qtpl:103
func StreamAggHTML(rws map[string]*Aggregators, rwActive string) string {
//line lib/streamaggr/state.qtpl:103
	qb422016 := qt422016.AcquireByteBuffer()
//line lib/streamaggr/state.qtpl:103
	WriteStreamAggHTML(qb422016, rws, rwActive)
//line lib/streamaggr/state.qtpl:103
	qs422016 := string(qb422016.B)
//line lib/streamaggr/state.qtpl:103
	qt422016.ReleaseByteBuffer(qb422016)
//line lib/streamaggr/state.qtpl:103
	return qs422016
//line lib/streamaggr/state.qtpl:103
}
// StreamStreamAggOutputStateHTML streams the detailed state page for a single
// aggregation output (config, push timing, filtered per-series values) to
// qw422016.
//
// This function is generated by qtc from state.qtpl; the HTML fixes below
// (</h4> instead of </h3> for the heading, and a space between the role and
// onclick attributes) must also be applied to state.qtpl so they survive
// regeneration.
//line lib/streamaggr/state.qtpl:105
func StreamStreamAggOutputStateHTML(qw422016 *qt422016.Writer, rwActive string, aggNum int, agg *aggregator, as aggrState, limit int, filter string) {
//line lib/streamaggr/state.qtpl:105
	qw422016.N().S(`<!DOCTYPE html><html lang="en"><head>`)
//line lib/streamaggr/state.qtpl:109
	htmlcomponents.StreamCommonHeader(qw422016)
//line lib/streamaggr/state.qtpl:109
	qw422016.N().S(`<title>Stream aggregation</title></head><body>`)
//line lib/streamaggr/state.qtpl:113
	htmlcomponents.StreamNavbar(qw422016)
//line lib/streamaggr/state.qtpl:113
	qw422016.N().S(`<div class="container-fluid"><div class="row"><main class="col-12">`)
//line lib/streamaggr/state.qtpl:118
	sr := as.getStateRepresentation(agg.suffix)
	if filter != "" {
		filter = strings.ToLower(filter)
		metrics := sr.metrics[:0]
		for _, m := range sr.metrics {
			if strings.Contains(strings.ToLower(m.metric), filter) {
				metrics = append(metrics, m)
			}
		}
		sr.metrics = metrics
	}
	sort.Slice(sr.metrics, func(i, j int) bool {
		return sr.metrics[i].metric < sr.metrics[j].metric
	})
	if len(sr.metrics) > limit {
		sr.metrics = sr.metrics[:limit]
	}
//line lib/streamaggr/state.qtpl:135
	qw422016.N().S(`<h1>Aggregation state</h1><h4> [ <a href="?rw=`)
//line lib/streamaggr/state.qtpl:138
	qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:138
	qw422016.N().S(`">back to aggregations</a> ] </h4><hr /><h6><div class="row container-sm"><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="remote-write" style="width: 200px">Remote write:</span><input type="text" class="form-control" aria-label="Remote write" aria-describedby="remote-write" value="`)
//line lib/streamaggr/state.qtpl:144
	qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:144
	qw422016.N().S(`" readonly /></div><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="agg-num" style="width: 200px">Aggregation num:</span><input type="number" class="form-control" aria-label="Aggregation num" aria-describedby="agg-num" value="`)
//line lib/streamaggr/state.qtpl:149
	qw422016.N().D(aggNum)
//line lib/streamaggr/state.qtpl:149
	qw422016.N().S(`" readonly /></div><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="match" style="width: 200px">Match:</span><input type="string" class="form-control" aria-label="Match" aria-describedby="match" value="`)
//line lib/streamaggr/state.qtpl:154
	qw422016.E().S(agg.match.String())
//line lib/streamaggr/state.qtpl:154
	qw422016.N().S(`" readonly /></div>`)
//line lib/streamaggr/state.qtpl:157
	if len(agg.by) > 0 {
//line lib/streamaggr/state.qtpl:157
		qw422016.N().S(`<div class="input-group input-group-sm mb-1"><span class="input-group-text" id="by" style="width: 200px">By:</span><input type="string" class="form-control" aria-label="By" aria-describedby="by" value="`)
//line lib/streamaggr/state.qtpl:160
		qw422016.E().S(strings.Join(agg.by, ", "))
//line lib/streamaggr/state.qtpl:160
		qw422016.N().S(`" readonly /></div>`)
//line lib/streamaggr/state.qtpl:162
	}
//line lib/streamaggr/state.qtpl:163
	if len(agg.without) > 0 {
//line lib/streamaggr/state.qtpl:163
		qw422016.N().S(`<div class="input-group input-group-sm mb-1"><span class="input-group-text" id="without" style="width: 200px">Without:</span><input type="string" class="form-control" aria-label="Without" aria-describedby="without" value="`)
//line lib/streamaggr/state.qtpl:166
		qw422016.E().S(strings.Join(agg.without, ", "))
//line lib/streamaggr/state.qtpl:166
		qw422016.N().S(`" readonly /></div>`)
//line lib/streamaggr/state.qtpl:168
	}
//line lib/streamaggr/state.qtpl:168
	qw422016.N().S(`<div class="input-group input-group-sm mb-1"><span class="input-group-text" id="interval" style="width: 200px">Interval (seconds):</span><input type="number" class="form-control" aria-label="Interval (seconds)" aria-describedby="interval" value="`)
//line lib/streamaggr/state.qtpl:172
	qw422016.E().V(sr.intervalSecs)
//line lib/streamaggr/state.qtpl:172
	qw422016.N().S(`" readonly /></div><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="last-push-time" style="width: 200px">Last push time:</span><input type="string" class="form-control" aria-label="Last push time" aria-describedby="last-push-time" value="`)
//line lib/streamaggr/state.qtpl:177
	if sr.lastPushTimestamp == 0 {
//line lib/streamaggr/state.qtpl:177
		qw422016.N().S(`-`)
//line lib/streamaggr/state.qtpl:177
	} else {
//line lib/streamaggr/state.qtpl:177
		qw422016.E().S(time.Unix(int64(sr.lastPushTimestamp), 0).Format(time.RFC3339))
//line lib/streamaggr/state.qtpl:177
	}
//line lib/streamaggr/state.qtpl:177
	qw422016.N().S(`" readonly /></div><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="next-push-time" style="width: 200px">Next push time:</span><input type="string" class="form-control" aria-label="Next push time" aria-describedby="next-push-time" value="`)
//line lib/streamaggr/state.qtpl:182
	if sr.lastPushTimestamp == 0 {
//line lib/streamaggr/state.qtpl:182
		qw422016.E().S(time.Unix(int64(agg.initialTime+sr.intervalSecs), 0).Format(time.RFC3339))
//line lib/streamaggr/state.qtpl:182
	} else {
//line lib/streamaggr/state.qtpl:182
		qw422016.E().S(time.Unix(int64(sr.lastPushTimestamp+sr.intervalSecs), 0).Format(time.RFC3339))
//line lib/streamaggr/state.qtpl:182
	}
//line lib/streamaggr/state.qtpl:182
	qw422016.N().S(`" readonly /></div><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="limit-label" style="width: 200px">Items on the page:</span><input id="limit" type="number" class="form-control" aria-label="Limit" aria-describedby="limit-label" value="`)
//line lib/streamaggr/state.qtpl:187
	qw422016.N().D(limit)
//line lib/streamaggr/state.qtpl:187
	qw422016.N().S(`" /><button type="button" class="btn btn-outline-secondary" onclick="location.href='?rw=`)
//line lib/streamaggr/state.qtpl:188
	qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`&agg=`)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().D(aggNum)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`&output=`)
//line lib/streamaggr/state.qtpl:188
	qw422016.E().S(as.getOutputName())
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`&limit='+document.querySelector(`)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S("`")
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`#limit`)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S("`")
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`).value+'&filter='+encodeURIComponent(document.querySelector(`)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S("`")
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`#filter`)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S("`")
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`).value)">apply</button></div><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="filter-label" style="width: 200px">Filter:</span><input id="filter" type="text" class="form-control" aria-label="Filter" aria-describedby="filter-label" value="`)
//line lib/streamaggr/state.qtpl:193
	qw422016.E().S(filter)
//line lib/streamaggr/state.qtpl:193
	qw422016.N().S(`" /><button type="button" class="btn btn-outline-secondary" onclick="location.href='?rw=`)
//line lib/streamaggr/state.qtpl:194
	qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S(`&agg=`)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().D(aggNum)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S(`&output=`)
//line lib/streamaggr/state.qtpl:194
	qw422016.E().S(as.getOutputName())
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S(`&limit=`)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().D(limit)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S(`&filter='+encodeURIComponent(document.querySelector(`)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S("`")
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S(`#filter`)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S("`")
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S(`).value)">apply</button></div></div></h6><hr /><ul class="nav nav-tabs" id="rw-tab" role="tablist">`)
//line lib/streamaggr/state.qtpl:200
	for _, a := range agg.aggrStates {
//line lib/streamaggr/state.qtpl:200
		qw422016.N().S(`<li class="nav-item" role="presentation"><button class="nav-link`)
//line lib/streamaggr/state.qtpl:202
		if a.getOutputName() == as.getOutputName() {
//line lib/streamaggr/state.qtpl:202
			qw422016.N().S(` `)
//line lib/streamaggr/state.qtpl:202
			qw422016.N().S(`active`)
//line lib/streamaggr/state.qtpl:202
		}
//line lib/streamaggr/state.qtpl:202
		qw422016.N().S(`" type="button" role="tab" onclick="location.href='?rw=`)
//line lib/streamaggr/state.qtpl:203
		qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:203
		qw422016.N().S(`&agg=`)
//line lib/streamaggr/state.qtpl:203
		qw422016.N().D(aggNum)
//line lib/streamaggr/state.qtpl:203
		qw422016.N().S(`&output=`)
//line lib/streamaggr/state.qtpl:203
		qw422016.E().S(a.getOutputName())
//line lib/streamaggr/state.qtpl:203
		qw422016.N().S(`&limit=`)
//line lib/streamaggr/state.qtpl:203
		qw422016.N().D(limit)
//line lib/streamaggr/state.qtpl:203
		qw422016.N().S(`'">`)
//line lib/streamaggr/state.qtpl:204
		qw422016.E().S(a.getOutputName())
//line lib/streamaggr/state.qtpl:204
		qw422016.N().S(`</button></li>`)
//line lib/streamaggr/state.qtpl:207
	}
//line lib/streamaggr/state.qtpl:207
	qw422016.N().S(`</ul><div class="tab-content"><div class="tab-pane active" role="tabpanel"><div id="aggregation-state" class="table-responsive"><table class="table table-striped table-hover table-bordered table-sm"><thead><tr><th scope="col">Metric</th><th scope="col">Current value</th><th scope="col">Samples count</th></tr></thead><tbody>`)
//line lib/streamaggr/state.qtpl:221
	for _, asr := range sr.metrics {
//line lib/streamaggr/state.qtpl:221
		qw422016.N().S(`<tr><td><code>`)
//line lib/streamaggr/state.qtpl:224
		qw422016.E().S(asr.metric)
//line lib/streamaggr/state.qtpl:224
		qw422016.N().S(`</code></td><td class="text-end">`)
//line lib/streamaggr/state.qtpl:227
		qw422016.N().F(asr.currentValue)
//line lib/streamaggr/state.qtpl:227
		qw422016.N().S(`</td><td class="text-end">`)
//line lib/streamaggr/state.qtpl:230
		qw422016.E().S(fmt.Sprintf("%v", asr.samplesCount))
//line lib/streamaggr/state.qtpl:230
		qw422016.N().S(`</td></tr>`)
//line lib/streamaggr/state.qtpl:233
	}
//line lib/streamaggr/state.qtpl:233
	qw422016.N().S(`</tbody></table></div></div></div></main></div></div></body></html>`)
//line lib/streamaggr/state.qtpl:244
}
// WriteStreamAggOutputStateHTML writes the aggregation-output state page to qq422016.
// Generated by qtc from state.qtpl; do not edit by hand.
//line lib/streamaggr/state.qtpl:244
func WriteStreamAggOutputStateHTML(qq422016 qtio422016.Writer, rwActive string, aggNum int, agg *aggregator, as aggrState, limit int, filter string) {
//line lib/streamaggr/state.qtpl:244
	qw422016 := qt422016.AcquireWriter(qq422016)
//line lib/streamaggr/state.qtpl:244
	StreamStreamAggOutputStateHTML(qw422016, rwActive, aggNum, agg, as, limit, filter)
//line lib/streamaggr/state.qtpl:244
	qt422016.ReleaseWriter(qw422016)
//line lib/streamaggr/state.qtpl:244
}
// StreamAggOutputStateHTML returns the aggregation-output state page as a string.
// Generated by qtc from state.qtpl; do not edit by hand.
//line lib/streamaggr/state.qtpl:244
func StreamAggOutputStateHTML(rwActive string, aggNum int, agg *aggregator, as aggrState, limit int, filter string) string {
//line lib/streamaggr/state.qtpl:244
	qb422016 := qt422016.AcquireByteBuffer()
//line lib/streamaggr/state.qtpl:244
	WriteStreamAggOutputStateHTML(qb422016, rwActive, aggNum, agg, as, limit, filter)
//line lib/streamaggr/state.qtpl:244
	qs422016 := string(qb422016.B)
//line lib/streamaggr/state.qtpl:244
	qt422016.ReleaseByteBuffer(qb422016)
//line lib/streamaggr/state.qtpl:244
	return qs422016
//line lib/streamaggr/state.qtpl:244
}

View File

@@ -3,13 +3,17 @@ package streamaggr
import (
"math"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// stddevAggrState calculates output=stddev, e.g. the average value over input samples.
type stddevAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
lastPushTimestamp atomic.Uint64
}
type stddevStateValue struct {
@@ -20,8 +24,10 @@ type stddevStateValue struct {
deleted bool
}
func newStddevAggrState() *stddevAggrState {
return &stddevAggrState{}
func newStddevAggrState(interval time.Duration) *stddevAggrState {
return &stddevAggrState{
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *stddevAggrState) pushSample(_, outputKey string, value float64) {
@@ -55,7 +61,9 @@ again:
}
func (as *stddevAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
@@ -68,7 +76,36 @@ func (as *stddevAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "stddev", currentTimeMsec, stddev)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, stddev)
return true
})
as.lastPushTimestamp.Store(currentTime)
}
// getOutputName returns the aggregation output name this state produces;
// it is used as the metric-name suffix and in the /stream-agg UI links.
func (as *stddevAggrState) getOutputName() string {
	return "stddev"
}
// getStateRepresentation returns a snapshot of the current stddev state for
// display in the /stream-agg UI. Entries already marked deleted by a
// concurrent flush are skipped.
func (as *stddevAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
	metrics := make([]aggrStateRepresentationMetric, 0)
	as.m.Range(func(k, v any) bool {
		value := v.(*stddevStateValue)
		// The deferred unlock fires when this per-entry closure returns,
		// so the lock is held only for the duration of one entry.
		value.mu.Lock()
		defer value.mu.Unlock()
		if value.deleted {
			return true
		}
		metrics = append(metrics, aggrStateRepresentationMetric{
			metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
			// Welford-style running aggregate: stddev = sqrt(q/count).
			currentValue: math.Sqrt(value.q / value.count),
			samplesCount: uint64(value.count),
		})
		return true
	})
	return aggrStateRepresentation{
		intervalSecs:      as.intervalSecs,
		lastPushTimestamp: as.lastPushTimestamp.Load(),
		metrics:           metrics,
	}
}

View File

@@ -2,13 +2,17 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// stdvarAggrState calculates output=stdvar, e.g. the average value over input samples.
type stdvarAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
lastPushTimestamp atomic.Uint64
}
type stdvarStateValue struct {
@@ -19,8 +23,10 @@ type stdvarStateValue struct {
deleted bool
}
func newStdvarAggrState() *stdvarAggrState {
return &stdvarAggrState{}
func newStdvarAggrState(interval time.Duration) *stdvarAggrState {
return &stdvarAggrState{
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *stdvarAggrState) pushSample(_, outputKey string, value float64) {
@@ -54,7 +60,9 @@ again:
}
func (as *stdvarAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
@@ -67,7 +75,35 @@ func (as *stdvarAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "stdvar", currentTimeMsec, stdvar)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, stdvar)
return true
})
as.lastPushTimestamp.Store(currentTime)
}
// getOutputName returns the aggregation output name this state produces;
// it is used as the metric-name suffix and in the /stream-agg UI links.
func (as *stdvarAggrState) getOutputName() string {
	return "stdvar"
}
// getStateRepresentation returns a snapshot of the current stdvar state for
// display in the /stream-agg UI. Entries already marked deleted by a
// concurrent flush are skipped.
func (as *stdvarAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
	metrics := make([]aggrStateRepresentationMetric, 0)
	as.m.Range(func(k, v any) bool {
		value := v.(*stdvarStateValue)
		// The deferred unlock fires when this per-entry closure returns,
		// so the lock is held only for the duration of one entry.
		value.mu.Lock()
		defer value.mu.Unlock()
		if value.deleted {
			return true
		}
		metrics = append(metrics, aggrStateRepresentationMetric{
			metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
			// Running variance aggregate: stdvar = q/count.
			currentValue: value.q / value.count,
			samplesCount: uint64(value.count),
		})
		return true
	})
	return aggrStateRepresentation{
		intervalSecs:      as.intervalSecs,
		lastPushTimestamp: as.lastPushTimestamp.Load(),
		metrics:           metrics,
	}
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@@ -241,11 +242,27 @@ type aggregator struct {
wg sync.WaitGroup
stopCh chan struct{}
initialTime uint64
}
type aggrState interface {
pushSample(inputKey, outputKey string, value float64)
appendSeriesForFlush(ctx *flushCtx)
getOutputName() string
getStateRepresentation(suffix string) aggrStateRepresentation
}
type aggrStateRepresentation struct {
intervalSecs uint64
lastPushTimestamp uint64
metrics []aggrStateRepresentationMetric
}
type aggrStateRepresentationMetric struct {
metric string
currentValue float64
samplesCount uint64
}
// PushFunc is called by Aggregators when it needs to push its state to metrics storage
@@ -328,7 +345,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
}
phis[j] = phi
}
aggrStates[i] = newQuantilesAggrState(phis)
aggrStates[i] = newQuantilesAggrState(interval, phis)
continue
}
switch output {
@@ -337,25 +354,25 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
case "increase":
aggrStates[i] = newIncreaseAggrState(interval, stalenessInterval)
case "count_series":
aggrStates[i] = newCountSeriesAggrState()
aggrStates[i] = newCountSeriesAggrState(interval)
case "count_samples":
aggrStates[i] = newCountSamplesAggrState()
aggrStates[i] = newCountSamplesAggrState(interval)
case "sum_samples":
aggrStates[i] = newSumSamplesAggrState()
aggrStates[i] = newSumSamplesAggrState(interval)
case "last":
aggrStates[i] = newLastAggrState()
aggrStates[i] = newLastAggrState(interval)
case "min":
aggrStates[i] = newMinAggrState()
aggrStates[i] = newMinAggrState(interval)
case "max":
aggrStates[i] = newMaxAggrState()
aggrStates[i] = newMaxAggrState(interval)
case "avg":
aggrStates[i] = newAvgAggrState()
aggrStates[i] = newAvgAggrState(interval)
case "stddev":
aggrStates[i] = newStddevAggrState()
aggrStates[i] = newStddevAggrState(interval)
case "stdvar":
aggrStates[i] = newStdvarAggrState()
aggrStates[i] = newStdvarAggrState(interval)
case "histogram_bucket":
aggrStates[i] = newHistogramBucketAggrState(stalenessInterval)
aggrStates[i] = newHistogramBucketAggrState(interval, stalenessInterval)
default:
return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+
"see https://docs.victoriametrics.com/stream-aggregation.html", output, supportedOutputs)
@@ -374,7 +391,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
var dedupAggr *lastAggrState
if dedupInterval > 0 {
dedupAggr = newLastAggrState()
dedupAggr = newLastAggrState(interval)
}
// initialize the aggregator
@@ -395,6 +412,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
suffix: suffix,
stopCh: make(chan struct{}),
initialTime: fasttime.UnixTimestamp(),
}
if dedupAggr != nil {
@@ -829,3 +848,21 @@ func sortAndRemoveDuplicates(a []string) []string {
}
return dst
}
// getLabelsStringFromKey converts a marshaled label-set key into a
// human-readable, deterministically ordered label string of the form
// `{name="value",...}` for the /stream-agg UI. The aggregation suffix and
// output name are appended to the metric name, and any extraLabels are
// included before sorting.
func getLabelsStringFromKey(
	key string,
	suffix string,
	output string,
	extraLabels ...prompbmarshal.Label,
) string {
	var labels []prompbmarshal.Label
	// The key was produced by marshaling labels in this package, so the
	// unmarshal error is deliberately ignored; a malformed key yields an
	// empty label set rather than a failure on a diagnostics page.
	labels, _ = unmarshalLabelsFast(labels, []byte(key))
	labels = addMetricSuffix(labels, 0, suffix, output)
	labels = append(labels, extraLabels...)
	a := make([]string, len(labels))
	for i, label := range labels {
		a[i] = fmt.Sprintf("%s=%q", label.Name, label.Value)
	}
	// Sort for a stable rendering independent of map/label ordering.
	sort.Strings(a)
	return "{" + strings.Join(a, ",") + "}"
}

View File

@@ -2,23 +2,30 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// sumSamplesAggrState calculates output=sum_samples, e.g. the sum over input samples.
type sumSamplesAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
lastPushTimestamp atomic.Uint64
}
type sumSamplesStateValue struct {
mu sync.Mutex
sum float64
deleted bool
mu sync.Mutex
sum float64
samplesCount uint64
deleted bool
}
func newSumSamplesAggrState() *sumSamplesAggrState {
return &sumSamplesAggrState{}
func newSumSamplesAggrState(interval time.Duration) *sumSamplesAggrState {
return &sumSamplesAggrState{
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *sumSamplesAggrState) pushSample(_, outputKey string, value float64) {
@@ -42,6 +49,7 @@ again:
deleted := sv.deleted
if !deleted {
sv.sum += value
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
@@ -52,7 +60,9 @@ again:
}
func (as *sumSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
@@ -61,11 +71,40 @@ func (as *sumSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
sum := sv.sum
sv.sum = 0
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "sum_samples", currentTimeMsec, sum)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, sum)
return true
})
as.lastPushTimestamp.Store(currentTime)
}
// getOutputName returns the output name for this aggregation state,
// used as the metric-name suffix for the series it produces.
func (as *sumSamplesAggrState) getOutputName() string {
	return "sum_samples"
}
// getStateRepresentation returns a snapshot of the current aggregation state
// for status/UI reporting. Entries marked deleted by a concurrent flush are
// skipped.
func (as *sumSamplesAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
	metrics := make([]aggrStateRepresentationMetric, 0)
	as.m.Range(func(k, v any) bool {
		sv := v.(*sumSamplesStateValue)
		// Copy the fields under the lock, then release it before the
		// (comparatively expensive) label formatting below.
		sv.mu.Lock()
		skip := sv.deleted
		sum := sv.sum
		count := sv.samplesCount
		sv.mu.Unlock()
		if skip {
			return true
		}
		metrics = append(metrics, aggrStateRepresentationMetric{
			metric:       getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
			currentValue: sum,
			samplesCount: count,
		})
		return true
	})
	return aggrStateRepresentation{
		intervalSecs:      as.intervalSecs,
		lastPushTimestamp: as.lastPushTimestamp.Load(),
		metrics:           metrics,
	}
}

View File

@@ -3,6 +3,7 @@ package streamaggr
import (
"math"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -10,16 +11,18 @@ import (
// totalAggrState calculates output=total, e.g. the summary counter over input counters.
type totalAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
ignoreInputDeadline uint64
stalenessSecs uint64
lastPushTimestamp atomic.Uint64
}
type totalStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
@@ -36,6 +39,7 @@ func newTotalAggrState(interval time.Duration, stalenessInterval time.Duration)
return &totalAggrState{
ignoreInputDeadline: currentTime + intervalSecs,
stalenessSecs: stalenessSecs,
intervalSecs: intervalSecs,
}
}
@@ -75,6 +79,7 @@ again:
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
@@ -131,8 +136,36 @@ func (as *totalAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, "total", currentTimeMsec, total)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, total)
}
return true
})
as.lastPushTimestamp.Store(currentTime)
}
// getOutputName returns the output name for this aggregation state,
// used as the metric-name suffix for the series it produces.
func (as *totalAggrState) getOutputName() string {
	return "total"
}
// getStateRepresentation returns a snapshot of the current aggregation state
// for status/UI reporting. Entries marked deleted by a concurrent flush or
// staleness cleanup are skipped.
func (as *totalAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
	metrics := make([]aggrStateRepresentationMetric, 0)
	as.m.Range(func(k, v any) bool {
		sv := v.(*totalStateValue)
		// Copy the fields under the lock, then release it before the
		// (comparatively expensive) label formatting below.
		sv.mu.Lock()
		skip := sv.deleted
		total := sv.total
		count := sv.samplesCount
		sv.mu.Unlock()
		if skip {
			return true
		}
		metrics = append(metrics, aggrStateRepresentationMetric{
			metric:       getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
			currentValue: total,
			samplesCount: count,
		})
		return true
	})
	return aggrStateRepresentation{
		intervalSecs:      as.intervalSecs,
		lastPushTimestamp: as.lastPushTimestamp.Load(),
		metrics:           metrics,
	}
}