mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-05 18:13:06 +03:00
Compare commits
4 Commits
dependabot
...
streamaggr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ad29bd69ee | ||
|
|
aca33307a9 | ||
|
|
129b2236ef | ||
|
|
c8685741b3 |
@@ -231,15 +231,23 @@ func Init() {
|
||||
// Start config reloader.
|
||||
configReloaderWG.Add(1)
|
||||
go func() {
|
||||
var streamAggrConfigReloaderCh <-chan time.Time
|
||||
if *streamAggrConfigCheckInterval > 0 {
|
||||
ticker := time.NewTicker(*streamAggrConfigCheckInterval)
|
||||
streamAggrConfigReloaderCh = ticker.C
|
||||
defer ticker.Stop()
|
||||
}
|
||||
defer configReloaderWG.Done()
|
||||
for {
|
||||
select {
|
||||
case <-sighupCh:
|
||||
reloadRelabelConfigs()
|
||||
reloadStreamAggrConfigs()
|
||||
case <-streamAggrConfigReloaderCh:
|
||||
reloadStreamAggrConfigs()
|
||||
case <-configReloaderStopCh:
|
||||
return
|
||||
}
|
||||
reloadRelabelConfigs()
|
||||
reloadStreamAggrConfigs()
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -376,11 +384,9 @@ func Stop() {
|
||||
close(configReloaderStopCh)
|
||||
configReloaderWG.Wait()
|
||||
|
||||
sasGlobal.Load().MustStop()
|
||||
if deduplicatorGlobal != nil {
|
||||
deduplicatorGlobal.MustStop()
|
||||
deduplicatorGlobal = nil
|
||||
}
|
||||
sasGlobal.Load().MustStop(nil)
|
||||
deduplicatorGlobal.MustStop()
|
||||
deduplicatorGlobal = nil
|
||||
|
||||
for _, rwctx := range rwctxs {
|
||||
rwctx.MustStop()
|
||||
@@ -850,12 +856,10 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
// sas and deduplicator must be stopped before rwctx is closed
|
||||
// because they can write pending series to rwctx.pss if there are any
|
||||
sas := rwctx.sas.Swap(nil)
|
||||
sas.MustStop()
|
||||
sas.MustStop(nil)
|
||||
|
||||
if rwctx.deduplicator != nil {
|
||||
rwctx.deduplicator.MustStop()
|
||||
rwctx.deduplicator = nil
|
||||
}
|
||||
rwctx.deduplicator.MustStop()
|
||||
rwctx.deduplicator = nil
|
||||
|
||||
for _, ps := range rwctx.pss {
|
||||
ps.MustStop()
|
||||
|
||||
@@ -4,12 +4,10 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -17,6 +15,8 @@ var (
|
||||
streamAggrGlobalConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
|
||||
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
|
||||
"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
|
||||
streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking changes in -streamAggr.config "+
|
||||
"and -remoteWrite.streamAggr.config")
|
||||
streamAggrGlobalKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation "+
|
||||
"with -streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
|
||||
"are written to remote storages write. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
|
||||
@@ -79,47 +79,32 @@ func CheckStreamAggrConfigs() error {
|
||||
}
|
||||
|
||||
func reloadStreamAggrConfigs() {
|
||||
reloadStreamAggrConfig(-1, pushToRemoteStoragesDropFailed)
|
||||
for idx, rwctx := range rwctxs {
|
||||
reloadStreamAggrConfig(idx, rwctx.pushInternalTrackDropped)
|
||||
reloadStreamAggrConfig(-1)
|
||||
for idx := range rwctxs {
|
||||
reloadStreamAggrConfig(idx)
|
||||
}
|
||||
}
|
||||
|
||||
func reloadStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) {
|
||||
path, opts := getStreamAggrOpts(idx)
|
||||
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
|
||||
|
||||
sasNew, err := newStreamAggrConfigWithOpts(pushFunc, path, opts)
|
||||
if err != nil {
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc()
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0)
|
||||
logger.Errorf("cannot reload stream aggregation config at %q; continue using the previously loaded config; error: %s", path, err)
|
||||
return
|
||||
}
|
||||
|
||||
func reloadStreamAggrConfig(idx int) {
|
||||
path, _ := getStreamAggrOpts(idx)
|
||||
var sas *streamaggr.Aggregators
|
||||
var flag string
|
||||
if idx < 0 {
|
||||
flag = "-streamAggr.config"
|
||||
sas = sasGlobal.Load()
|
||||
} else {
|
||||
flag = "-remoteWrite.streamAggr.config"
|
||||
sas = rwctxs[idx].sas.Load()
|
||||
}
|
||||
|
||||
if !sasNew.Equal(sas) {
|
||||
var sasOld *streamaggr.Aggregators
|
||||
if idx < 0 {
|
||||
sasOld = sasGlobal.Swap(sasNew)
|
||||
} else {
|
||||
sasOld = rwctxs[idx].sas.Swap(sasNew)
|
||||
}
|
||||
sasOld.MustStop()
|
||||
logger.Infof("successfully reloaded stream aggregation configs at %q", path)
|
||||
} else {
|
||||
sasNew.MustStop()
|
||||
logger.Infof("successfully reloaded stream aggregation configs at %q", path)
|
||||
if sas == nil {
|
||||
return
|
||||
}
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
|
||||
logger.Infof("reloading stream aggregation configs pointed by %s=%q", flag, path)
|
||||
if err := sas.Reload(); err != nil {
|
||||
logger.Errorf("cannot reload %s=%q; continue using the previously loaded config; error: %s", flag, path, err)
|
||||
return
|
||||
}
|
||||
logger.Infof("successfully reloaded stream aggregation config %s=%q", flag, path)
|
||||
}
|
||||
|
||||
func getStreamAggrOpts(idx int) (string, streamaggr.Options) {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
@@ -15,13 +16,14 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
streamAggrConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
|
||||
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
|
||||
"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
|
||||
streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking changes in -streamAggr.config "+
|
||||
"and -remoteWrite.streamAggr.config")
|
||||
streamAggrKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation with -streamAggr.config. "+
|
||||
"By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+
|
||||
"See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
|
||||
@@ -42,11 +44,6 @@ var (
|
||||
saCfgReloaderStopCh chan struct{}
|
||||
saCfgReloaderWG sync.WaitGroup
|
||||
|
||||
saCfgReloads = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`)
|
||||
saCfgReloadErr = metrics.NewCounter(`vminsert_streamagg_config_reloads_errors_total`)
|
||||
saCfgSuccess = metrics.NewGauge(`vminsert_streamagg_config_last_reload_successful`, nil)
|
||||
saCfgTimestamp = metrics.NewCounter(`vminsert_streamagg_config_last_reload_success_timestamp_seconds`)
|
||||
|
||||
sasGlobal atomic.Pointer[streamaggr.Aggregators]
|
||||
deduplicator *streamaggr.Deduplicator
|
||||
)
|
||||
@@ -68,7 +65,7 @@ func CheckStreamAggrConfig() error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err)
|
||||
}
|
||||
sas.MustStop()
|
||||
sas.MustStop(nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -101,16 +98,21 @@ func InitStreamAggr() {
|
||||
}
|
||||
|
||||
sasGlobal.Store(sas)
|
||||
saCfgSuccess.Set(1)
|
||||
saCfgTimestamp.Set(fasttime.UnixTimestamp())
|
||||
|
||||
// Start config reloader.
|
||||
saCfgReloaderWG.Add(1)
|
||||
go func() {
|
||||
var streamAggrConfigReloaderCh <-chan time.Time
|
||||
if *streamAggrConfigCheckInterval > 0 {
|
||||
ticker := time.NewTicker(*streamAggrConfigCheckInterval)
|
||||
streamAggrConfigReloaderCh = ticker.C
|
||||
defer ticker.Stop()
|
||||
}
|
||||
defer saCfgReloaderWG.Done()
|
||||
for {
|
||||
select {
|
||||
case <-sighupCh:
|
||||
case <-streamAggrConfigReloaderCh:
|
||||
case <-saCfgReloaderStopCh:
|
||||
return
|
||||
}
|
||||
@@ -121,33 +123,12 @@ func InitStreamAggr() {
|
||||
|
||||
func reloadStreamAggrConfig() {
|
||||
logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig)
|
||||
saCfgReloads.Inc()
|
||||
|
||||
opts := streamaggr.Options{
|
||||
DedupInterval: *streamAggrDedupInterval,
|
||||
DropInputLabels: *streamAggrDropInputLabels,
|
||||
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
|
||||
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
|
||||
Alias: "global",
|
||||
}
|
||||
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
|
||||
if err != nil {
|
||||
saCfgSuccess.Set(0)
|
||||
saCfgReloadErr.Inc()
|
||||
sas := sasGlobal.Load()
|
||||
if err := sas.Reload(); err != nil {
|
||||
logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err)
|
||||
return
|
||||
}
|
||||
sas := sasGlobal.Load()
|
||||
if !sasNew.Equal(sas) {
|
||||
sasOld := sasGlobal.Swap(sasNew)
|
||||
sasOld.MustStop()
|
||||
logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
|
||||
} else {
|
||||
logger.Infof("nothing changed in -streamAggr.config=%q", *streamAggrConfig)
|
||||
sasNew.MustStop()
|
||||
}
|
||||
saCfgSuccess.Set(1)
|
||||
saCfgTimestamp.Set(fasttime.UnixTimestamp())
|
||||
logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
|
||||
}
|
||||
|
||||
// MustStopStreamAggr stops stream aggregators.
|
||||
@@ -156,12 +137,10 @@ func MustStopStreamAggr() {
|
||||
saCfgReloaderWG.Wait()
|
||||
|
||||
sas := sasGlobal.Swap(nil)
|
||||
sas.MustStop()
|
||||
sas.MustStop(nil)
|
||||
|
||||
if deduplicator != nil {
|
||||
deduplicator.MustStop()
|
||||
deduplicator = nil
|
||||
}
|
||||
deduplicator.MustStop()
|
||||
deduplicator = nil
|
||||
}
|
||||
|
||||
type streamAggrCtx struct {
|
||||
|
||||
2
go.mod
2
go.mod
@@ -8,7 +8,7 @@ require (
|
||||
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2
|
||||
github.com/VictoriaMetrics/easyproto v0.1.4
|
||||
github.com/VictoriaMetrics/fastcache v1.12.2
|
||||
github.com/VictoriaMetrics/metrics v1.34.0
|
||||
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0
|
||||
github.com/VictoriaMetrics/metricsql v0.76.0
|
||||
github.com/aws/aws-sdk-go-v2 v1.30.1
|
||||
github.com/aws/aws-sdk-go-v2/config v1.27.23
|
||||
|
||||
4
go.sum
4
go.sum
@@ -73,6 +73,10 @@ github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjC
|
||||
github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI=
|
||||
github.com/VictoriaMetrics/metrics v1.34.0 h1:0i8k/gdOJdSoZB4Z9pikVnVQXfhcIvnG7M7h2WaQW2w=
|
||||
github.com/VictoriaMetrics/metrics v1.34.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
|
||||
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080040-3f62e95de24e h1:IgNJoIYb2IhknxOLEAAG0ktj0f1609jpgmXjpPVrJ7s=
|
||||
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080040-3f62e95de24e/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
|
||||
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0 h1:qP+3SX4eslXLPmsJpGjnMv+9UbmyrSj/Yf5CqPm6bLE=
|
||||
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
|
||||
github.com/VictoriaMetrics/metricsql v0.76.0 h1:hl7vqJqyH2d8zKImzalkFrkFiD5q4ACF8gl3s86DqKA=
|
||||
github.com/VictoriaMetrics/metricsql v0.76.0/go.mod h1:1g4hdCwlbJZ851PU9VN65xy9Rdlzupo6fx3SNZ8Z64U=
|
||||
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// avgAggrState calculates output=avg, e.g. the average value over input samples.
|
||||
@@ -12,18 +11,23 @@ type avgAggrState struct {
|
||||
m sync.Map
|
||||
}
|
||||
|
||||
type avgState struct {
|
||||
sum float64
|
||||
count float64
|
||||
}
|
||||
|
||||
type avgStateValue struct {
|
||||
mu sync.Mutex
|
||||
sum float64
|
||||
count int64
|
||||
deleted bool
|
||||
mu sync.Mutex
|
||||
state [aggrStateSize]avgState
|
||||
deleted bool
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
func newAvgAggrState() *avgAggrState {
|
||||
return &avgAggrState{}
|
||||
}
|
||||
|
||||
func (as *avgAggrState) pushSamples(samples []pushSample) {
|
||||
func (as *avgAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
outputKey := getOutputKey(s.key)
|
||||
@@ -32,25 +36,21 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
|
||||
v, ok := as.m.Load(outputKey)
|
||||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
v = &avgStateValue{
|
||||
sum: s.value,
|
||||
count: 1,
|
||||
}
|
||||
v = &avgStateValue{}
|
||||
outputKey = bytesutil.InternString(outputKey)
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
if !loaded {
|
||||
// The entry has been successfully stored
|
||||
continue
|
||||
if loaded {
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
// Update the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
sv := v.(*avgStateValue)
|
||||
sv.mu.Lock()
|
||||
deleted := sv.deleted
|
||||
if !deleted {
|
||||
sv.sum += s.value
|
||||
sv.count++
|
||||
sv.state[idx].sum += s.value
|
||||
sv.state[idx].count++
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
if deleted {
|
||||
@@ -61,26 +61,28 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
|
||||
func (as *avgAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
m := &as.m
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
if resetState {
|
||||
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
||||
m.Delete(k)
|
||||
}
|
||||
|
||||
sv := v.(*avgStateValue)
|
||||
sv.mu.Lock()
|
||||
avg := sv.sum / float64(sv.count)
|
||||
if resetState {
|
||||
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
|
||||
sv.deleted = true
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "avg", currentTimeMsec, avg)
|
||||
// check for stale entries
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
sv.mu.Unlock()
|
||||
m.Delete(k)
|
||||
return true
|
||||
}
|
||||
state := sv.state[idx]
|
||||
sv.state[idx] = avgState{}
|
||||
sv.mu.Unlock()
|
||||
if state.count > 0 {
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "avg", flushTimestamp, state.sum/state.count)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// countSamplesAggrState calculates output=count_samples, e.g. the count of input samples.
|
||||
@@ -13,16 +12,17 @@ type countSamplesAggrState struct {
|
||||
}
|
||||
|
||||
type countSamplesStateValue struct {
|
||||
mu sync.Mutex
|
||||
n uint64
|
||||
deleted bool
|
||||
mu sync.Mutex
|
||||
state [aggrStateSize]uint64
|
||||
deleted bool
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
func newCountSamplesAggrState() *countSamplesAggrState {
|
||||
return &countSamplesAggrState{}
|
||||
}
|
||||
|
||||
func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
|
||||
func (as *countSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
outputKey := getOutputKey(s.key)
|
||||
@@ -31,23 +31,20 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
|
||||
v, ok := as.m.Load(outputKey)
|
||||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
v = &countSamplesStateValue{
|
||||
n: 1,
|
||||
}
|
||||
v = &countSamplesStateValue{}
|
||||
outputKey = bytesutil.InternString(outputKey)
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
if !loaded {
|
||||
// The new entry has been successfully created.
|
||||
continue
|
||||
if loaded {
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
sv := v.(*countSamplesStateValue)
|
||||
sv.mu.Lock()
|
||||
deleted := sv.deleted
|
||||
if !deleted {
|
||||
sv.n++
|
||||
sv.state[idx]++
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
if deleted {
|
||||
@@ -58,26 +55,28 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
|
||||
func (as *countSamplesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
m := &as.m
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
if resetState {
|
||||
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
||||
m.Delete(k)
|
||||
}
|
||||
|
||||
sv := v.(*countSamplesStateValue)
|
||||
sv.mu.Lock()
|
||||
n := sv.n
|
||||
if resetState {
|
||||
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
|
||||
sv.deleted = true
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "count_samples", currentTimeMsec, float64(n))
|
||||
// check for stale entries
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
sv.mu.Unlock()
|
||||
m.Delete(k)
|
||||
return true
|
||||
}
|
||||
state := sv.state[idx]
|
||||
sv.state[idx] = 0
|
||||
sv.mu.Unlock()
|
||||
if state > 0 {
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "count_samples", flushTimestamp, float64(state))
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
@@ -14,16 +13,17 @@ type countSeriesAggrState struct {
|
||||
}
|
||||
|
||||
type countSeriesStateValue struct {
|
||||
mu sync.Mutex
|
||||
m map[uint64]struct{}
|
||||
deleted bool
|
||||
mu sync.Mutex
|
||||
state [aggrStateSize]map[uint64]struct{}
|
||||
deleted bool
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
func newCountSeriesAggrState() *countSeriesAggrState {
|
||||
return &countSeriesAggrState{}
|
||||
}
|
||||
|
||||
func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
|
||||
func (as *countSeriesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
inputKey, outputKey := getInputOutputKey(s.key)
|
||||
@@ -36,27 +36,26 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
|
||||
v, ok := as.m.Load(outputKey)
|
||||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
v = &countSeriesStateValue{
|
||||
m: map[uint64]struct{}{
|
||||
h: {},
|
||||
},
|
||||
csv := &countSeriesStateValue{}
|
||||
for ic := range csv.state {
|
||||
csv.state[ic] = make(map[uint64]struct{})
|
||||
}
|
||||
v = csv
|
||||
outputKey = bytesutil.InternString(outputKey)
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
if !loaded {
|
||||
// The entry has been added to the map.
|
||||
continue
|
||||
if loaded {
|
||||
// Update the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
// Update the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
sv := v.(*countSeriesStateValue)
|
||||
sv.mu.Lock()
|
||||
deleted := sv.deleted
|
||||
if !deleted {
|
||||
if _, ok := sv.m[h]; !ok {
|
||||
sv.m[h] = struct{}{}
|
||||
if _, ok := sv.state[idx][h]; !ok {
|
||||
sv.state[idx][h] = struct{}{}
|
||||
}
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
if deleted {
|
||||
@@ -67,26 +66,28 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
|
||||
func (as *countSeriesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
m := &as.m
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
if resetState {
|
||||
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
||||
m.Delete(k)
|
||||
}
|
||||
|
||||
sv := v.(*countSeriesStateValue)
|
||||
sv.mu.Lock()
|
||||
n := len(sv.m)
|
||||
if resetState {
|
||||
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
|
||||
sv.deleted = true
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "count_series", currentTimeMsec, float64(n))
|
||||
// check for stale entries
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
sv.mu.Unlock()
|
||||
m.Delete(k)
|
||||
return true
|
||||
}
|
||||
state := len(sv.state[idx])
|
||||
sv.state[idx] = make(map[uint64]struct{})
|
||||
sv.mu.Unlock()
|
||||
if state > 0 {
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "count_series", flushTimestamp, float64(state))
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
@@ -14,7 +14,8 @@ import (
|
||||
const dedupAggrShardsCount = 128
|
||||
|
||||
type dedupAggr struct {
|
||||
shards []dedupAggrShard
|
||||
shards []dedupAggrShard
|
||||
currentIdx atomic.Int32
|
||||
}
|
||||
|
||||
type dedupAggrShard struct {
|
||||
@@ -25,16 +26,18 @@ type dedupAggrShard struct {
|
||||
_ [128 - unsafe.Sizeof(dedupAggrShardNopad{})%128]byte
|
||||
}
|
||||
|
||||
type dedupAggrShardNopad struct {
|
||||
mu sync.Mutex
|
||||
m map[string]*dedupAggrSample
|
||||
|
||||
type dedupAggrState struct {
|
||||
m map[string]*dedupAggrSample
|
||||
samplesBuf []dedupAggrSample
|
||||
|
||||
sizeBytes atomic.Uint64
|
||||
itemsCount atomic.Uint64
|
||||
}
|
||||
|
||||
type dedupAggrShardNopad struct {
|
||||
mu sync.RWMutex
|
||||
state [aggrStateSize]*dedupAggrState
|
||||
}
|
||||
|
||||
type dedupAggrSample struct {
|
||||
value float64
|
||||
timestamp int64
|
||||
@@ -49,21 +52,27 @@ func newDedupAggr() *dedupAggr {
|
||||
|
||||
func (da *dedupAggr) sizeBytes() uint64 {
|
||||
n := uint64(unsafe.Sizeof(*da))
|
||||
currentIdx := da.currentIdx.Load()
|
||||
for i := range da.shards {
|
||||
n += da.shards[i].sizeBytes.Load()
|
||||
if da.shards[i].state[currentIdx] != nil {
|
||||
n += da.shards[i].state[currentIdx].sizeBytes.Load()
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (da *dedupAggr) itemsCount() uint64 {
|
||||
n := uint64(0)
|
||||
currentIdx := da.currentIdx.Load()
|
||||
for i := range da.shards {
|
||||
n += da.shards[i].itemsCount.Load()
|
||||
if da.shards[i].state[currentIdx] != nil {
|
||||
n += da.shards[i].state[currentIdx].itemsCount.Load()
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (da *dedupAggr) pushSamples(samples []pushSample) {
|
||||
func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, dedupIdx int) {
|
||||
pss := getPerShardSamples()
|
||||
shards := pss.shards
|
||||
for _, sample := range samples {
|
||||
@@ -75,7 +84,7 @@ func (da *dedupAggr) pushSamples(samples []pushSample) {
|
||||
if len(shardSamples) == 0 {
|
||||
continue
|
||||
}
|
||||
da.shards[i].pushSamples(shardSamples)
|
||||
da.shards[i].pushSamples(shardSamples, dedupIdx)
|
||||
}
|
||||
putPerShardSamples(pss)
|
||||
}
|
||||
@@ -104,7 +113,7 @@ func (ctx *dedupFlushCtx) reset() {
|
||||
ctx.samples = ctx.samples[:0]
|
||||
}
|
||||
|
||||
func (da *dedupAggr) flush(f func(samples []pushSample)) {
|
||||
func (da *dedupAggr) flush(f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) {
|
||||
var wg sync.WaitGroup
|
||||
for i := range da.shards {
|
||||
flushConcurrencyCh <- struct{}{}
|
||||
@@ -116,10 +125,11 @@ func (da *dedupAggr) flush(f func(samples []pushSample)) {
|
||||
}()
|
||||
|
||||
ctx := getDedupFlushCtx()
|
||||
shard.flush(ctx, f)
|
||||
shard.flush(ctx, f, deleteDeadline, dedupIdx, flushIdx)
|
||||
putDedupFlushCtx(ctx)
|
||||
}(&da.shards[i])
|
||||
}
|
||||
da.currentIdx.Store((da.currentIdx.Load() + 1) % aggrStateSize)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
@@ -154,18 +164,20 @@ func putPerShardSamples(pss *perShardSamples) {
|
||||
|
||||
var perShardSamplesPool sync.Pool
|
||||
|
||||
func (das *dedupAggrShard) pushSamples(samples []pushSample) {
|
||||
func (das *dedupAggrShard) pushSamples(samples []pushSample, dedupIdx int) {
|
||||
das.mu.Lock()
|
||||
defer das.mu.Unlock()
|
||||
|
||||
m := das.m
|
||||
if m == nil {
|
||||
m = make(map[string]*dedupAggrSample, len(samples))
|
||||
das.m = m
|
||||
state := das.state[dedupIdx]
|
||||
if state == nil {
|
||||
state = &dedupAggrState{
|
||||
m: make(map[string]*dedupAggrSample, len(samples)),
|
||||
}
|
||||
das.state[dedupIdx] = state
|
||||
}
|
||||
samplesBuf := das.samplesBuf
|
||||
samplesBuf := state.samplesBuf
|
||||
for _, sample := range samples {
|
||||
s, ok := m[sample.key]
|
||||
s, ok := state.m[sample.key]
|
||||
if !ok {
|
||||
samplesBuf = slicesutil.SetLength(samplesBuf, len(samplesBuf)+1)
|
||||
s = &samplesBuf[len(samplesBuf)-1]
|
||||
@@ -173,10 +185,10 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
|
||||
s.timestamp = sample.timestamp
|
||||
|
||||
key := bytesutil.InternString(sample.key)
|
||||
m[key] = s
|
||||
state.m[key] = s
|
||||
|
||||
das.itemsCount.Add(1)
|
||||
das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
|
||||
das.state[dedupIdx].itemsCount.Add(1)
|
||||
das.state[dedupIdx].sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
|
||||
continue
|
||||
}
|
||||
// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
|
||||
@@ -185,18 +197,20 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
|
||||
s.timestamp = sample.timestamp
|
||||
}
|
||||
}
|
||||
das.samplesBuf = samplesBuf
|
||||
das.state[dedupIdx].samplesBuf = samplesBuf
|
||||
}
|
||||
|
||||
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) {
|
||||
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) {
|
||||
das.mu.Lock()
|
||||
|
||||
m := das.m
|
||||
if len(m) > 0 {
|
||||
das.m = make(map[string]*dedupAggrSample, len(m))
|
||||
das.sizeBytes.Store(0)
|
||||
das.itemsCount.Store(0)
|
||||
das.samplesBuf = make([]dedupAggrSample, 0, len(das.samplesBuf))
|
||||
var m map[string]*dedupAggrSample
|
||||
state := das.state[dedupIdx]
|
||||
if state != nil && len(state.m) > 0 {
|
||||
m = state.m
|
||||
das.state[dedupIdx].m = make(map[string]*dedupAggrSample, len(state.m))
|
||||
das.state[dedupIdx].samplesBuf = make([]dedupAggrSample, 0, len(das.state[dedupIdx].samplesBuf))
|
||||
das.state[dedupIdx].sizeBytes.Store(0)
|
||||
das.state[dedupIdx].itemsCount.Store(0)
|
||||
}
|
||||
|
||||
das.mu.Unlock()
|
||||
@@ -215,11 +229,11 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
|
||||
|
||||
// Limit the number of samples per each flush in order to limit memory usage.
|
||||
if len(dstSamples) >= 10_000 {
|
||||
f(dstSamples)
|
||||
f(dstSamples, deleteDeadline, flushIdx)
|
||||
clear(dstSamples)
|
||||
dstSamples = dstSamples[:0]
|
||||
}
|
||||
}
|
||||
f(dstSamples)
|
||||
f(dstSamples, deleteDeadline, flushIdx)
|
||||
ctx.samples = dstSamples
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestDedupAggrSerial(t *testing.T) {
|
||||
@@ -20,7 +21,7 @@ func TestDedupAggrSerial(t *testing.T) {
|
||||
sample.value = float64(i + j)
|
||||
expectedSamplesMap[sample.key] = *sample
|
||||
}
|
||||
da.pushSamples(samples)
|
||||
da.pushSamples(samples, 0, 0)
|
||||
}
|
||||
|
||||
if n := da.sizeBytes(); n > 5_000_000 {
|
||||
@@ -32,14 +33,16 @@ func TestDedupAggrSerial(t *testing.T) {
|
||||
|
||||
flushedSamplesMap := make(map[string]pushSample)
|
||||
var mu sync.Mutex
|
||||
flushSamples := func(samples []pushSample) {
|
||||
flushSamples := func(samples []pushSample, _ int64, _ int) {
|
||||
mu.Lock()
|
||||
for _, sample := range samples {
|
||||
flushedSamplesMap[sample.key] = sample
|
||||
}
|
||||
mu.Unlock()
|
||||
}
|
||||
da.flush(flushSamples)
|
||||
|
||||
flushTimestamp := time.Now().UnixMilli()
|
||||
da.flush(flushSamples, flushTimestamp, 0, 0)
|
||||
|
||||
if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) {
|
||||
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap)
|
||||
@@ -70,7 +73,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
|
||||
sample.key = fmt.Sprintf("key_%d", j)
|
||||
sample.value = float64(i + j)
|
||||
}
|
||||
da.pushSamples(samples)
|
||||
da.pushSamples(samples, 0, 0)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
for i := 0; i < loops; i++ {
|
||||
da.pushSamples(benchSamples)
|
||||
da.pushSamples(benchSamples, 0, 0)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
@@ -17,7 +17,8 @@ import (
|
||||
type Deduplicator struct {
|
||||
da *dedupAggr
|
||||
|
||||
dropLabels []string
|
||||
dropLabels []string
|
||||
dedupInterval int64
|
||||
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
@@ -38,8 +39,9 @@ type Deduplicator struct {
|
||||
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
|
||||
func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator {
|
||||
d := &Deduplicator{
|
||||
da: newDedupAggr(),
|
||||
dropLabels: dropLabels,
|
||||
da: newDedupAggr(),
|
||||
dropLabels: dropLabels,
|
||||
dedupInterval: dedupInterval.Milliseconds(),
|
||||
|
||||
stopCh: make(chan struct{}),
|
||||
ms: metrics.NewSet(),
|
||||
@@ -71,6 +73,9 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels
|
||||
|
||||
// MustStop stops d.
|
||||
func (d *Deduplicator) MustStop() {
|
||||
if d == nil {
|
||||
return
|
||||
}
|
||||
metrics.UnregisterSet(d.ms)
|
||||
d.ms = nil
|
||||
|
||||
@@ -86,6 +91,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
|
||||
buf := ctx.buf
|
||||
|
||||
dropLabels := d.dropLabels
|
||||
aggrIntervals := int64(aggrStateSize)
|
||||
for _, ts := range tss {
|
||||
if len(dropLabels) > 0 {
|
||||
labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels)
|
||||
@@ -101,7 +107,9 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
|
||||
buf = lc.Compress(buf, labels.Labels)
|
||||
key := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
for _, s := range ts.Samples {
|
||||
pss = append(pss, pushSample{
|
||||
flushIntervals := s.Timestamp/d.dedupInterval + 1
|
||||
idx := int(flushIntervals % aggrIntervals)
|
||||
pss[idx] = append(pss[idx], pushSample{
|
||||
key: key,
|
||||
value: s.Value,
|
||||
timestamp: s.Timestamp,
|
||||
@@ -109,7 +117,9 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
|
||||
}
|
||||
}
|
||||
|
||||
d.da.pushSamples(pss)
|
||||
for idx, ps := range pss {
|
||||
d.da.pushSamples(ps, 0, idx)
|
||||
}
|
||||
|
||||
ctx.pss = pss
|
||||
ctx.buf = buf
|
||||
@@ -132,17 +142,18 @@ func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration
|
||||
select {
|
||||
case <-d.stopCh:
|
||||
return
|
||||
case <-t.C:
|
||||
d.flush(pushFunc, dedupInterval)
|
||||
case t := <-t.C:
|
||||
flushTime := t.Truncate(dedupInterval).Add(dedupInterval)
|
||||
flushTimestamp := flushTime.UnixMilli()
|
||||
flushIntervals := int(flushTimestamp / int64(dedupInterval/time.Millisecond))
|
||||
flushIdx := flushIntervals % aggrStateSize
|
||||
d.flush(pushFunc, dedupInterval, flushTime, flushIdx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
|
||||
startTime := time.Now()
|
||||
|
||||
timestamp := startTime.UnixMilli()
|
||||
d.da.flush(func(pss []pushSample) {
|
||||
func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration, flushTime time.Time, flushIdx int) {
|
||||
d.da.flush(func(pss []pushSample, _ int64, _ int) {
|
||||
ctx := getDeduplicatorFlushCtx()
|
||||
|
||||
tss := ctx.tss
|
||||
@@ -155,7 +166,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
|
||||
samplesLen := len(samples)
|
||||
samples = append(samples, prompbmarshal.Sample{
|
||||
Value: ps.value,
|
||||
Timestamp: timestamp,
|
||||
Timestamp: ps.timestamp,
|
||||
})
|
||||
|
||||
tss = append(tss, prompbmarshal.TimeSeries{
|
||||
@@ -169,9 +180,9 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
|
||||
ctx.labels = labels
|
||||
ctx.samples = samples
|
||||
putDeduplicatorFlushCtx(ctx)
|
||||
})
|
||||
}, flushTime.UnixMilli(), flushIdx, flushIdx)
|
||||
|
||||
duration := time.Since(startTime)
|
||||
duration := time.Since(flushTime)
|
||||
d.dedupFlushDuration.Update(duration.Seconds())
|
||||
if duration > dedupInterval {
|
||||
d.dedupFlushTimeouts.Inc()
|
||||
@@ -182,14 +193,15 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
|
||||
}
|
||||
|
||||
type deduplicatorPushCtx struct {
|
||||
pss []pushSample
|
||||
pss [aggrStateSize][]pushSample
|
||||
labels promutils.Labels
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func (ctx *deduplicatorPushCtx) reset() {
|
||||
clear(ctx.pss)
|
||||
ctx.pss = ctx.pss[:0]
|
||||
for i, sc := range ctx.pss {
|
||||
ctx.pss[i] = sc[:0]
|
||||
}
|
||||
|
||||
ctx.labels.Reset()
|
||||
|
||||
|
||||
@@ -21,20 +21,26 @@ func TestDeduplicator(t *testing.T) {
|
||||
tss := prompbmarshal.MustParsePromMetrics(`
|
||||
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 123
|
||||
bar{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 34.54
|
||||
x 8943 1000
|
||||
x 8943 1
|
||||
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -34.34
|
||||
x 90984 900
|
||||
x 433 1000
|
||||
x 90984
|
||||
x 433 1
|
||||
asfjkldsf{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 12322
|
||||
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 894
|
||||
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3
|
||||
`, offsetMsecs)
|
||||
|
||||
d := NewDeduplicator(pushFunc, time.Hour, []string{"node", "instance"}, "global")
|
||||
dedupInterval := time.Hour
|
||||
d := NewDeduplicator(pushFunc, dedupInterval, []string{"node", "instance"}, "global")
|
||||
for i := 0; i < 10; i++ {
|
||||
d.Push(tss)
|
||||
}
|
||||
d.flush(pushFunc, time.Hour)
|
||||
|
||||
flushTime := time.Now()
|
||||
flushIntervals := flushTime.UnixMilli()/dedupInterval.Milliseconds() + 1
|
||||
idx := int(flushIntervals % int64(aggrStateSize))
|
||||
|
||||
d.flush(pushFunc, time.Hour, time.Now(), idx)
|
||||
d.MustStop()
|
||||
|
||||
result := timeSeriessToString(tssResult)
|
||||
|
||||
@@ -1,39 +1,30 @@
|
||||
package streamaggr
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// histogramBucketAggrState calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples.
|
||||
type histogramBucketAggrState struct {
|
||||
m sync.Map
|
||||
|
||||
stalenessSecs uint64
|
||||
}
|
||||
|
||||
type histogramBucketStateValue struct {
|
||||
mu sync.Mutex
|
||||
h metrics.Histogram
|
||||
deleteDeadline uint64
|
||||
state [aggrStateSize]metrics.Histogram
|
||||
total metrics.Histogram
|
||||
deleted bool
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBucketAggrState {
|
||||
stalenessSecs := roundDurationToSecs(stalenessInterval)
|
||||
return &histogramBucketAggrState{
|
||||
stalenessSecs: stalenessSecs,
|
||||
}
|
||||
func newHistogramBucketAggrState() *histogramBucketAggrState {
|
||||
return &histogramBucketAggrState{}
|
||||
}
|
||||
|
||||
func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
deleteDeadline := currentTime + as.stalenessSecs
|
||||
func (as *histogramBucketAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
outputKey := getOutputKey(s.key)
|
||||
@@ -54,7 +45,7 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
|
||||
sv.mu.Lock()
|
||||
deleted := sv.deleted
|
||||
if !deleted {
|
||||
sv.h.Update(s.value)
|
||||
sv.state[idx].Update(s.value)
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
@@ -66,54 +57,32 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *histogramBucketAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) {
|
||||
func (as *histogramBucketAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
m := &as.m
|
||||
var staleOutputSamples int
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
sv := v.(*histogramBucketStateValue)
|
||||
|
||||
sv.mu.Lock()
|
||||
deleted := currentTime > sv.deleteDeadline
|
||||
|
||||
// check for stale entries
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
staleOutputSamples++
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
|
||||
if deleted {
|
||||
sv.mu.Unlock()
|
||||
m.Delete(k)
|
||||
return true
|
||||
}
|
||||
sv.total.Merge(&sv.state[idx])
|
||||
total := &sv.total
|
||||
sv.state[idx] = metrics.Histogram{}
|
||||
sv.mu.Unlock()
|
||||
key := k.(string)
|
||||
total.VisitNonZeroBuckets(func(vmrange string, count uint64) {
|
||||
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", flushTimestamp, float64(count), "vmrange", vmrange)
|
||||
})
|
||||
return true
|
||||
})
|
||||
ctx.a.staleOutputSamples["histogram_bucket"].Add(staleOutputSamples)
|
||||
}
|
||||
|
||||
func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
currentTimeMsec := int64(currentTime) * 1000
|
||||
|
||||
as.removeOldEntries(ctx, currentTime)
|
||||
|
||||
m := &as.m
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
sv := v.(*histogramBucketStateValue)
|
||||
sv.mu.Lock()
|
||||
if !sv.deleted {
|
||||
key := k.(string)
|
||||
sv.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
|
||||
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", currentTimeMsec, float64(count), "vmrange", vmrange)
|
||||
})
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func roundDurationToSecs(d time.Duration) uint64 {
|
||||
if d < 0 {
|
||||
return 0
|
||||
}
|
||||
secs := d.Seconds()
|
||||
return uint64(math.Ceil(secs))
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// lastAggrState calculates output=last, e.g. the last value over input samples.
|
||||
@@ -13,17 +12,22 @@ type lastAggrState struct {
|
||||
}
|
||||
|
||||
type lastStateValue struct {
|
||||
mu sync.Mutex
|
||||
mu sync.Mutex
|
||||
state [aggrStateSize]lastState
|
||||
deleted bool
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
type lastState struct {
|
||||
last float64
|
||||
timestamp int64
|
||||
deleted bool
|
||||
}
|
||||
|
||||
func newLastAggrState() *lastAggrState {
|
||||
return &lastAggrState{}
|
||||
}
|
||||
|
||||
func (as *lastAggrState) pushSamples(samples []pushSample) {
|
||||
func (as *lastAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
outputKey := getOutputKey(s.key)
|
||||
@@ -32,27 +36,23 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
|
||||
v, ok := as.m.Load(outputKey)
|
||||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
v = &lastStateValue{
|
||||
last: s.value,
|
||||
timestamp: s.timestamp,
|
||||
}
|
||||
v = &lastStateValue{}
|
||||
outputKey = bytesutil.InternString(outputKey)
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
if !loaded {
|
||||
// The new entry has been successfully created.
|
||||
continue
|
||||
if loaded {
|
||||
// Update the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
sv := v.(*lastStateValue)
|
||||
sv.mu.Lock()
|
||||
deleted := sv.deleted
|
||||
if !deleted {
|
||||
if s.timestamp >= sv.timestamp {
|
||||
sv.last = s.value
|
||||
sv.timestamp = s.timestamp
|
||||
if s.timestamp >= sv.state[idx].timestamp {
|
||||
sv.state[idx].last = s.value
|
||||
sv.state[idx].timestamp = s.timestamp
|
||||
}
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
if deleted {
|
||||
@@ -63,26 +63,28 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
|
||||
func (as *lastAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
m := &as.m
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
if resetState {
|
||||
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
||||
m.Delete(k)
|
||||
}
|
||||
|
||||
sv := v.(*lastStateValue)
|
||||
sv.mu.Lock()
|
||||
last := sv.last
|
||||
if resetState {
|
||||
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
|
||||
sv.deleted = true
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "last", currentTimeMsec, last)
|
||||
// check for stale entries
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
sv.mu.Unlock()
|
||||
m.Delete(k)
|
||||
return true
|
||||
}
|
||||
state := sv.state[idx]
|
||||
sv.state[idx] = lastState{}
|
||||
sv.mu.Unlock()
|
||||
if state.timestamp > 0 {
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "last", flushTimestamp, state.last)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// maxAggrState calculates output=max, e.g. the maximum value over input samples.
|
||||
@@ -13,16 +12,22 @@ type maxAggrState struct {
|
||||
}
|
||||
|
||||
type maxStateValue struct {
|
||||
mu sync.Mutex
|
||||
max float64
|
||||
deleted bool
|
||||
mu sync.Mutex
|
||||
state [aggrStateSize]maxState
|
||||
deleted bool
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
type maxState struct {
|
||||
max float64
|
||||
exists bool
|
||||
}
|
||||
|
||||
func newMaxAggrState() *maxAggrState {
|
||||
return &maxAggrState{}
|
||||
}
|
||||
|
||||
func (as *maxAggrState) pushSamples(samples []pushSample) {
|
||||
func (as *maxAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
outputKey := getOutputKey(s.key)
|
||||
@@ -31,25 +36,26 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
|
||||
v, ok := as.m.Load(outputKey)
|
||||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
v = &maxStateValue{
|
||||
max: s.value,
|
||||
}
|
||||
v = &maxStateValue{}
|
||||
outputKey = bytesutil.InternString(outputKey)
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
if !loaded {
|
||||
// The new entry has been successfully created.
|
||||
continue
|
||||
if loaded {
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
sv := v.(*maxStateValue)
|
||||
sv.mu.Lock()
|
||||
deleted := sv.deleted
|
||||
if !deleted {
|
||||
if s.value > sv.max {
|
||||
sv.max = s.value
|
||||
state := &sv.state[idx]
|
||||
if !state.exists {
|
||||
state.max = s.value
|
||||
state.exists = true
|
||||
} else if s.value > state.max {
|
||||
state.max = s.value
|
||||
}
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
if deleted {
|
||||
@@ -60,26 +66,28 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
|
||||
func (as *maxAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
m := &as.m
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
if resetState {
|
||||
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
||||
m.Delete(k)
|
||||
}
|
||||
|
||||
sv := v.(*maxStateValue)
|
||||
sv.mu.Lock()
|
||||
max := sv.max
|
||||
if resetState {
|
||||
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
|
||||
sv.deleted = true
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "max", currentTimeMsec, max)
|
||||
// check for stale entries
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
sv.mu.Unlock()
|
||||
m.Delete(k)
|
||||
return true
|
||||
}
|
||||
state := sv.state[idx]
|
||||
sv.state[idx] = maxState{}
|
||||
sv.mu.Unlock()
|
||||
if state.exists {
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "max", flushTimestamp, state.max)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// minAggrState calculates output=min, e.g. the minimum value over input samples.
|
||||
@@ -13,16 +12,22 @@ type minAggrState struct {
|
||||
}
|
||||
|
||||
type minStateValue struct {
|
||||
mu sync.Mutex
|
||||
min float64
|
||||
deleted bool
|
||||
mu sync.Mutex
|
||||
state [aggrStateSize]minState
|
||||
deleted bool
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
type minState struct {
|
||||
min float64
|
||||
exists bool
|
||||
}
|
||||
|
||||
func newMinAggrState() *minAggrState {
|
||||
return &minAggrState{}
|
||||
}
|
||||
|
||||
func (as *minAggrState) pushSamples(samples []pushSample) {
|
||||
func (as *minAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
outputKey := getOutputKey(s.key)
|
||||
@@ -31,25 +36,26 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
|
||||
v, ok := as.m.Load(outputKey)
|
||||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
v = &minStateValue{
|
||||
min: s.value,
|
||||
}
|
||||
v = &minStateValue{}
|
||||
outputKey = bytesutil.InternString(outputKey)
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
if !loaded {
|
||||
// The new entry has been successfully created.
|
||||
continue
|
||||
if loaded {
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
sv := v.(*minStateValue)
|
||||
sv.mu.Lock()
|
||||
deleted := sv.deleted
|
||||
if !deleted {
|
||||
if s.value < sv.min {
|
||||
sv.min = s.value
|
||||
state := &sv.state[idx]
|
||||
if !state.exists {
|
||||
state.min = s.value
|
||||
state.exists = true
|
||||
} else if s.value < state.min {
|
||||
state.min = s.value
|
||||
}
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
if deleted {
|
||||
@@ -60,25 +66,28 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
|
||||
func (as *minAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
m := &as.m
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
if resetState {
|
||||
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
||||
m.Delete(k)
|
||||
}
|
||||
|
||||
sv := v.(*minStateValue)
|
||||
sv.mu.Lock()
|
||||
min := sv.min
|
||||
if resetState {
|
||||
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
|
||||
sv.deleted = true
|
||||
|
||||
// check for stale entries
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
sv.mu.Unlock()
|
||||
m.Delete(k)
|
||||
return true
|
||||
}
|
||||
state := sv.state[idx]
|
||||
sv.state[idx] = minState{}
|
||||
sv.mu.Unlock()
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "min", currentTimeMsec, min)
|
||||
if state.exists {
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "min", flushTimestamp, state.min)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
@@ -5,21 +5,20 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/valyala/histogram"
|
||||
)
|
||||
|
||||
// quantilesAggrState calculates output=quantiles, e.g. the the given quantiles over the input samples.
|
||||
type quantilesAggrState struct {
|
||||
m sync.Map
|
||||
|
||||
m sync.Map
|
||||
phis []float64
|
||||
}
|
||||
|
||||
type quantilesStateValue struct {
|
||||
mu sync.Mutex
|
||||
h *histogram.Fast
|
||||
deleted bool
|
||||
mu sync.Mutex
|
||||
state [aggrStateSize]*histogram.Fast
|
||||
deleted bool
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
func newQuantilesAggrState(phis []float64) *quantilesAggrState {
|
||||
@@ -28,7 +27,7 @@ func newQuantilesAggrState(phis []float64) *quantilesAggrState {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *quantilesAggrState) pushSamples(samples []pushSample) {
|
||||
func (as *quantilesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
outputKey := getOutputKey(s.key)
|
||||
@@ -37,15 +36,11 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
|
||||
v, ok := as.m.Load(outputKey)
|
||||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
h := histogram.GetFast()
|
||||
v = &quantilesStateValue{
|
||||
h: h,
|
||||
}
|
||||
v = &quantilesStateValue{}
|
||||
outputKey = bytesutil.InternString(outputKey)
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
if loaded {
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
histogram.PutFast(h)
|
||||
v = vNew
|
||||
}
|
||||
}
|
||||
@@ -53,7 +48,11 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
|
||||
sv.mu.Lock()
|
||||
deleted := sv.deleted
|
||||
if !deleted {
|
||||
sv.h.Update(s.value)
|
||||
if sv.state[idx] == nil {
|
||||
sv.state[idx] = histogram.GetFast()
|
||||
}
|
||||
sv.state[idx].Update(s.value)
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
if deleted {
|
||||
@@ -64,33 +63,39 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
|
||||
func (as *quantilesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
m := &as.m
|
||||
phis := as.phis
|
||||
var quantiles []float64
|
||||
var b []byte
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
if resetState {
|
||||
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
||||
m.Delete(k)
|
||||
}
|
||||
|
||||
sv := v.(*quantilesStateValue)
|
||||
sv.mu.Lock()
|
||||
quantiles = sv.h.Quantiles(quantiles[:0], phis)
|
||||
histogram.PutFast(sv.h)
|
||||
if resetState {
|
||||
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
|
||||
sv.deleted = true
|
||||
|
||||
// check for stale entries
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
sv.mu.Unlock()
|
||||
m.Delete(k)
|
||||
return true
|
||||
}
|
||||
state := sv.state[idx]
|
||||
quantiles = quantiles[:0]
|
||||
if state != nil {
|
||||
quantiles = state.Quantiles(quantiles[:0], phis)
|
||||
histogram.PutFast(state)
|
||||
state.Reset()
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
|
||||
key := k.(string)
|
||||
for i, quantile := range quantiles {
|
||||
b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
|
||||
phiStr := bytesutil.InternBytes(b)
|
||||
ctx.appendSeriesWithExtraLabel(key, "quantiles", currentTimeMsec, quantile, "quantile", phiStr)
|
||||
if len(quantiles) > 0 {
|
||||
key := k.(string)
|
||||
for i, quantile := range quantiles {
|
||||
b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
|
||||
phiStr := bytesutil.InternBytes(b)
|
||||
ctx.appendSeriesWithExtraLabel(key, "quantiles", flushTimestamp, quantile, "quantile", phiStr)
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
@@ -2,53 +2,52 @@ package streamaggr
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// rateAggrState calculates output=rate, e.g. the counter per-second change.
|
||||
type rateAggrState struct {
|
||||
m sync.Map
|
||||
|
||||
m sync.Map
|
||||
suffix string
|
||||
|
||||
// Time series state is dropped if no new samples are received during stalenessSecs.
|
||||
stalenessSecs uint64
|
||||
}
|
||||
|
||||
type rateStateValue struct {
|
||||
mu sync.Mutex
|
||||
lastValues map[string]rateLastValueState
|
||||
deleteDeadline uint64
|
||||
state map[string]rateState
|
||||
deleted bool
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
type rateState struct {
|
||||
lastValues [aggrStateSize]rateLastValueState
|
||||
// prevTimestamp stores timestamp of the last registered value
|
||||
// in the previous aggregation interval
|
||||
prevTimestamp int64
|
||||
|
||||
// prevValue stores last registered value
|
||||
// in the previous aggregation interval
|
||||
prevValue float64
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
type rateLastValueState struct {
|
||||
value float64
|
||||
timestamp int64
|
||||
deleteDeadline uint64
|
||||
firstValue float64
|
||||
value float64
|
||||
timestamp int64
|
||||
|
||||
// total stores cumulative difference between registered values
|
||||
// in the aggregation interval
|
||||
total float64
|
||||
// prevTimestamp stores timestamp of the last registered value
|
||||
// in the previous aggregation interval
|
||||
prevTimestamp int64
|
||||
}
|
||||
|
||||
func newRateAggrState(stalenessInterval time.Duration, suffix string) *rateAggrState {
|
||||
stalenessSecs := roundDurationToSecs(stalenessInterval)
|
||||
func newRateAggrState(suffix string) *rateAggrState {
|
||||
return &rateAggrState{
|
||||
suffix: suffix,
|
||||
stalenessSecs: stalenessSecs,
|
||||
suffix: suffix,
|
||||
}
|
||||
}
|
||||
|
||||
func (as *rateAggrState) pushSamples(samples []pushSample) {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
deleteDeadline := currentTime + as.stalenessSecs
|
||||
func (as *rateAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
inputKey, outputKey := getInputOutputKey(s.key)
|
||||
@@ -57,9 +56,10 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
|
||||
v, ok := as.m.Load(outputKey)
|
||||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
v = &rateStateValue{
|
||||
lastValues: make(map[string]rateLastValueState),
|
||||
rsv := &rateStateValue{
|
||||
state: make(map[string]rateState),
|
||||
}
|
||||
v = rsv
|
||||
outputKey = bytesutil.InternString(outputKey)
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
if loaded {
|
||||
@@ -71,15 +71,17 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
|
||||
sv.mu.Lock()
|
||||
deleted := sv.deleted
|
||||
if !deleted {
|
||||
lv, ok := sv.lastValues[inputKey]
|
||||
if ok {
|
||||
state, ok := sv.state[inputKey]
|
||||
lv := state.lastValues[idx]
|
||||
if ok && lv.timestamp > 0 {
|
||||
if s.timestamp < lv.timestamp {
|
||||
// Skip out of order sample
|
||||
sv.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
if lv.prevTimestamp == 0 {
|
||||
lv.prevTimestamp = lv.timestamp
|
||||
if state.prevTimestamp == 0 {
|
||||
state.prevTimestamp = lv.timestamp
|
||||
state.prevValue = lv.value
|
||||
}
|
||||
if s.value >= lv.value {
|
||||
lv.total += s.value - lv.value
|
||||
@@ -87,13 +89,15 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
|
||||
// counter reset
|
||||
lv.total += s.value
|
||||
}
|
||||
} else if state.prevTimestamp > 0 {
|
||||
lv.firstValue = s.value
|
||||
}
|
||||
lv.value = s.value
|
||||
lv.timestamp = s.timestamp
|
||||
lv.deleteDeadline = deleteDeadline
|
||||
|
||||
state.lastValues[idx] = lv
|
||||
state.deleteDeadline = deleteDeadline
|
||||
inputKey = bytesutil.InternString(inputKey)
|
||||
sv.lastValues[inputKey] = lv
|
||||
sv.state[inputKey] = state
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
@@ -105,18 +109,15 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
currentTimeMsec := int64(currentTime) * 1000
|
||||
func (as *rateAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
var staleOutputSamples, staleInputSamples int
|
||||
|
||||
m := &as.m
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
sv := v.(*rateStateValue)
|
||||
sv.mu.Lock()
|
||||
|
||||
// check for stale entries
|
||||
deleted := currentTime > sv.deleteDeadline
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
@@ -126,26 +127,33 @@ func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
|
||||
return true
|
||||
}
|
||||
|
||||
// Delete outdated entries in sv.lastValues
|
||||
// Delete outdated entries in state
|
||||
var rate float64
|
||||
lvs := sv.lastValues
|
||||
for k1, v1 := range lvs {
|
||||
if currentTime > v1.deleteDeadline {
|
||||
delete(lvs, k1)
|
||||
var totalItems int
|
||||
for k1, state := range sv.state {
|
||||
if flushTimestamp > state.deleteDeadline {
|
||||
delete(sv.state, k1)
|
||||
staleInputSamples++
|
||||
continue
|
||||
}
|
||||
rateInterval := v1.timestamp - v1.prevTimestamp
|
||||
if v1.prevTimestamp > 0 && rateInterval > 0 {
|
||||
v1 := state.lastValues[idx]
|
||||
rateInterval := v1.timestamp - state.prevTimestamp
|
||||
if rateInterval > 0 && state.prevTimestamp > 0 {
|
||||
if v1.firstValue >= state.prevValue {
|
||||
v1.total += v1.firstValue - state.prevValue
|
||||
} else {
|
||||
v1.total += v1.firstValue
|
||||
}
|
||||
|
||||
// calculate rate only if value was seen at least twice with different timestamps
|
||||
rate += v1.total * 1000 / float64(rateInterval)
|
||||
v1.prevTimestamp = v1.timestamp
|
||||
v1.total = 0
|
||||
lvs[k1] = v1
|
||||
rate += (v1.total) * 1000 / float64(rateInterval)
|
||||
state.prevTimestamp = v1.timestamp
|
||||
state.prevValue = v1.value
|
||||
totalItems++
|
||||
}
|
||||
state.lastValues[idx] = rateLastValueState{}
|
||||
sv.state[k1] = state
|
||||
}
|
||||
// capture m length after deleted items were removed
|
||||
totalItems := len(lvs)
|
||||
sv.mu.Unlock()
|
||||
|
||||
if as.suffix == "rate_avg" && totalItems > 0 {
|
||||
@@ -153,7 +161,7 @@ func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
|
||||
}
|
||||
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, as.suffix, currentTimeMsec, rate)
|
||||
ctx.appendSeries(key, as.suffix, flushTimestamp, rate)
|
||||
return true
|
||||
})
|
||||
ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples)
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// stddevAggrState calculates output=stddev, e.g. the average value over input samples.
|
||||
@@ -14,18 +13,23 @@ type stddevAggrState struct {
|
||||
}
|
||||
|
||||
type stddevStateValue struct {
|
||||
mu sync.Mutex
|
||||
count float64
|
||||
avg float64
|
||||
q float64
|
||||
deleted bool
|
||||
mu sync.Mutex
|
||||
state [aggrStateSize]stddevState
|
||||
deleted bool
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
type stddevState struct {
|
||||
count float64
|
||||
avg float64
|
||||
q float64
|
||||
}
|
||||
|
||||
func newStddevAggrState() *stddevAggrState {
|
||||
return &stddevAggrState{}
|
||||
}
|
||||
|
||||
func (as *stddevAggrState) pushSamples(samples []pushSample) {
|
||||
func (as *stddevAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
outputKey := getOutputKey(s.key)
|
||||
@@ -47,10 +51,12 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
|
||||
deleted := sv.deleted
|
||||
if !deleted {
|
||||
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
|
||||
sv.count++
|
||||
avg := sv.avg + (s.value-sv.avg)/sv.count
|
||||
sv.q += (s.value - sv.avg) * (s.value - avg)
|
||||
sv.avg = avg
|
||||
state := &sv.state[idx]
|
||||
state.count++
|
||||
avg := state.avg + (s.value-state.avg)/state.count
|
||||
state.q += (s.value - state.avg) * (s.value - avg)
|
||||
state.avg = avg
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
if deleted {
|
||||
@@ -61,26 +67,28 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
|
||||
func (as *stddevAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
m := &as.m
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
if resetState {
|
||||
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
||||
m.Delete(k)
|
||||
}
|
||||
|
||||
sv := v.(*stddevStateValue)
|
||||
sv.mu.Lock()
|
||||
stddev := math.Sqrt(sv.q / sv.count)
|
||||
if resetState {
|
||||
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
|
||||
sv.deleted = true
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "stddev", currentTimeMsec, stddev)
|
||||
// check for stale entries
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
sv.mu.Unlock()
|
||||
m.Delete(k)
|
||||
return true
|
||||
}
|
||||
state := sv.state[idx]
|
||||
sv.state[idx] = stddevState{}
|
||||
sv.mu.Unlock()
|
||||
if state.count > 0 {
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "stddev", flushTimestamp, math.Sqrt(state.q/state.count))
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// stdvarAggrState calculates output=stdvar, e.g. the average value over input samples.
|
||||
@@ -13,18 +12,23 @@ type stdvarAggrState struct {
|
||||
}
|
||||
|
||||
type stdvarStateValue struct {
|
||||
mu sync.Mutex
|
||||
count float64
|
||||
avg float64
|
||||
q float64
|
||||
deleted bool
|
||||
mu sync.Mutex
|
||||
state [aggrStateSize]stdvarState
|
||||
deleted bool
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
type stdvarState struct {
|
||||
count float64
|
||||
avg float64
|
||||
q float64
|
||||
}
|
||||
|
||||
func newStdvarAggrState() *stdvarAggrState {
|
||||
return &stdvarAggrState{}
|
||||
}
|
||||
|
||||
func (as *stdvarAggrState) pushSamples(samples []pushSample) {
|
||||
func (as *stdvarAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
outputKey := getOutputKey(s.key)
|
||||
@@ -46,10 +50,12 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
|
||||
deleted := sv.deleted
|
||||
if !deleted {
|
||||
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
|
||||
sv.count++
|
||||
avg := sv.avg + (s.value-sv.avg)/sv.count
|
||||
sv.q += (s.value - sv.avg) * (s.value - avg)
|
||||
sv.avg = avg
|
||||
state := &sv.state[idx]
|
||||
state.count++
|
||||
avg := state.avg + (s.value-state.avg)/state.count
|
||||
state.q += (s.value - state.avg) * (s.value - avg)
|
||||
state.avg = avg
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
if deleted {
|
||||
@@ -60,26 +66,28 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
|
||||
func (as *stdvarAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
m := &as.m
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
if resetState {
|
||||
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
||||
m.Delete(k)
|
||||
}
|
||||
|
||||
sv := v.(*stdvarStateValue)
|
||||
sv.mu.Lock()
|
||||
stdvar := sv.q / sv.count
|
||||
if resetState {
|
||||
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
|
||||
sv.deleted = true
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "stdvar", currentTimeMsec, stdvar)
|
||||
// check for stale entries
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
sv.mu.Unlock()
|
||||
m.Delete(k)
|
||||
return true
|
||||
}
|
||||
state := sv.state[idx]
|
||||
sv.state[idx] = stdvarState{}
|
||||
sv.mu.Unlock()
|
||||
if state.count > 0 {
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "stdvar", flushTimestamp, state.q/state.count)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
@@ -23,9 +24,13 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/valyala/histogram"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
// count of aggregation intervals for states
|
||||
const aggrStateSize = 2
|
||||
|
||||
var supportedOutputs = []string{
|
||||
"rate_sum",
|
||||
"rate_avg",
|
||||
@@ -69,21 +74,28 @@ var (
|
||||
//
|
||||
// The returned Aggregators must be stopped with MustStop() when no longer needed.
|
||||
func LoadFromFile(path string, pushFunc PushFunc, opts Options) (*Aggregators, error) {
|
||||
data, err := fscore.ReadFileOrHTTP(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot load aggregators: %w", err)
|
||||
a := &Aggregators{
|
||||
path: path,
|
||||
pushFunc: pushFunc,
|
||||
opts: opts,
|
||||
}
|
||||
data, err = envtemplate.ReplaceBytes(data)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err)
|
||||
if err := a.load(); err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", a.path, err)
|
||||
}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
as, err := LoadFromData(data, pushFunc, opts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err)
|
||||
// Reload reads config file and updates aggregators if there're any changes
|
||||
func (a *Aggregators) Reload() error {
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_total{path=%q}`, a.path)).Inc()
|
||||
if err := a.load(); err != nil {
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_errors_total{path=%q}`, a.path)).Inc()
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(0)
|
||||
return fmt.Errorf("cannot load stream aggregation config %q: %w", a.path, err)
|
||||
}
|
||||
|
||||
return as, nil
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(1)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, a.path)).Set(fasttime.UnixTimestamp())
|
||||
return nil
|
||||
}
|
||||
|
||||
// Options contains optional settings for the Aggregators.
|
||||
@@ -243,45 +255,83 @@ type Config struct {
|
||||
|
||||
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data.
|
||||
type Aggregators struct {
|
||||
as []*aggregator
|
||||
|
||||
// configData contains marshaled configs.
|
||||
// It is used in Equal() for comparing Aggregators.
|
||||
configData []byte
|
||||
|
||||
ms *metrics.Set
|
||||
as []*aggregator
|
||||
ms *metrics.Set
|
||||
path string
|
||||
pushFunc PushFunc
|
||||
opts Options
|
||||
}
|
||||
|
||||
// LoadFromData loads aggregators from data.
|
||||
func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, error) {
|
||||
func (a *Aggregators) load() error {
|
||||
data, err := fscore.ReadFileOrHTTP(a.path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot load aggregators: %w", err)
|
||||
}
|
||||
data, err = envtemplate.ReplaceBytes(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot expand environment variables in %q: %w", a.path, err)
|
||||
}
|
||||
if err = a.loadAggregatorsFromData(data); err != nil {
|
||||
return fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", a.path, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Aggregators) getAggregator(aggr *aggregator) *aggregator {
|
||||
idx := slices.IndexFunc(a.as, func(ac *aggregator) bool {
|
||||
return ac.configData == aggr.configData
|
||||
})
|
||||
if idx >= 0 {
|
||||
return a.as[idx]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Aggregators) loadAggregatorsFromData(data []byte) error {
|
||||
var cfgs []*Config
|
||||
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
|
||||
return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
|
||||
return fmt.Errorf("cannot parse stream aggregation config: %w", err)
|
||||
}
|
||||
|
||||
ms := metrics.NewSet()
|
||||
as := make([]*aggregator, len(cfgs))
|
||||
var unchanged, ac []*aggregator
|
||||
var ignoreAggrConfigs []string
|
||||
for i, cfg := range cfgs {
|
||||
opts.aggrID = i + 1
|
||||
a, err := newAggregator(cfg, pushFunc, ms, opts)
|
||||
a.opts.aggrID = i + 1
|
||||
aggr, err := newAggregator(cfg, a.pushFunc, ms, a.opts)
|
||||
if err != nil {
|
||||
// Stop already initialized aggregators before returning the error.
|
||||
for _, a := range as[:i] {
|
||||
a.MustStop()
|
||||
for _, c := range ac[:i] {
|
||||
c.MustStop()
|
||||
}
|
||||
return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
|
||||
return fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
|
||||
}
|
||||
as[i] = a
|
||||
}
|
||||
configData, err := json.Marshal(cfgs)
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
|
||||
if oldAggr := a.getAggregator(aggr); oldAggr != nil {
|
||||
aggr.MustStop()
|
||||
if !slices.Contains(ignoreAggrConfigs, oldAggr.configData) {
|
||||
unchanged = append(unchanged, oldAggr)
|
||||
ignoreAggrConfigs = append(ignoreAggrConfigs, oldAggr.configData)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if slices.ContainsFunc(ac, func(x *aggregator) bool {
|
||||
return x.configData == aggr.configData
|
||||
}) {
|
||||
aggr.MustStop()
|
||||
continue
|
||||
}
|
||||
ac = append(ac, aggr)
|
||||
}
|
||||
|
||||
metricLabels := fmt.Sprintf("url=%q", opts.Alias)
|
||||
metricLabels := fmt.Sprintf("url=%q", a.opts.Alias)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_stopped_total{path=%q}`, a.path)).Add(len(a.as) - len(ignoreAggrConfigs))
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_created_total{path=%q}`, a.path)).Add(len(ac))
|
||||
a.MustStop(ignoreAggrConfigs)
|
||||
a.as = slices.Concat(unchanged, ac)
|
||||
|
||||
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
|
||||
n := uint64(0)
|
||||
for _, aggr := range as {
|
||||
for _, aggr := range a.as {
|
||||
if aggr.da != nil {
|
||||
n += aggr.da.sizeBytes()
|
||||
}
|
||||
@@ -290,7 +340,7 @@ func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, e
|
||||
})
|
||||
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
|
||||
n := uint64(0)
|
||||
for _, aggr := range as {
|
||||
for _, aggr := range a.as {
|
||||
if aggr.da != nil {
|
||||
n += aggr.da.itemsCount()
|
||||
}
|
||||
@@ -299,11 +349,8 @@ func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, e
|
||||
})
|
||||
|
||||
metrics.RegisterSet(ms)
|
||||
return &Aggregators{
|
||||
as: as,
|
||||
configData: configData,
|
||||
ms: ms,
|
||||
}, nil
|
||||
a.ms = ms
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsEnabled returns true if Aggregators has at least one configured aggregator
|
||||
@@ -318,7 +365,7 @@ func (a *Aggregators) IsEnabled() bool {
|
||||
}
|
||||
|
||||
// MustStop stops a.
|
||||
func (a *Aggregators) MustStop() {
|
||||
func (a *Aggregators) MustStop(ignoreAggrConfigs []string) {
|
||||
if a == nil {
|
||||
return
|
||||
}
|
||||
@@ -327,7 +374,9 @@ func (a *Aggregators) MustStop() {
|
||||
a.ms = nil
|
||||
|
||||
for _, aggr := range a.as {
|
||||
aggr.MustStop()
|
||||
if ignoreAggrConfigs == nil || !slices.Contains(ignoreAggrConfigs, aggr.configData) {
|
||||
aggr.MustStop()
|
||||
}
|
||||
}
|
||||
a.as = nil
|
||||
}
|
||||
@@ -337,7 +386,16 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
|
||||
if a == nil || b == nil {
|
||||
return a == nil && b == nil
|
||||
}
|
||||
return string(a.configData) == string(b.configData)
|
||||
return slices.Compare(a.configData(), b.configData()) == 0
|
||||
}
|
||||
|
||||
func (a *Aggregators) configData() []string {
|
||||
result := make([]string, len(a.as))
|
||||
for i := range result {
|
||||
result[i] = a.as[i].configData
|
||||
}
|
||||
slices.Sort(result)
|
||||
return result
|
||||
}
|
||||
|
||||
// Push pushes tss to a.
|
||||
@@ -370,15 +428,18 @@ type aggregator struct {
|
||||
|
||||
dropInputLabels []string
|
||||
|
||||
inputRelabeling *promrelabel.ParsedConfigs
|
||||
outputRelabeling *promrelabel.ParsedConfigs
|
||||
inputRelabeling *promrelabel.ParsedConfigs
|
||||
outputRelabeling *promrelabel.ParsedConfigs
|
||||
stalenessInterval time.Duration
|
||||
|
||||
keepMetricNames bool
|
||||
ignoreOldSamples bool
|
||||
configData string
|
||||
|
||||
by []string
|
||||
without []string
|
||||
aggregateOnlyByTime bool
|
||||
tickInterval int64
|
||||
|
||||
// da is set to non-nil if input samples must be de-duplicated
|
||||
da *dedupAggr
|
||||
@@ -389,6 +450,10 @@ type aggregator struct {
|
||||
// minTimestamp is used for ignoring old samples when ignoreOldSamples is set
|
||||
minTimestamp atomic.Int64
|
||||
|
||||
// time to wait after interval end before flush
|
||||
flushAfter *histogram.Fast
|
||||
muFlushAfter sync.Mutex
|
||||
|
||||
// suffix contains a suffix, which should be added to aggregate metric names
|
||||
//
|
||||
// It contains the interval, labels in (by, without), plus output name.
|
||||
@@ -414,17 +479,15 @@ type aggregator struct {
|
||||
}
|
||||
|
||||
type aggrState interface {
|
||||
// pushSamples must push samples to the aggrState.
|
||||
//
|
||||
// samples[].key must be cloned by aggrState, since it may change after returning from pushSamples.
|
||||
pushSamples(samples []pushSample)
|
||||
|
||||
flushState(ctx *flushCtx, resetState bool)
|
||||
pushSamples(samples []pushSample, deleteDeadline int64, idx int)
|
||||
flushState(ctx *flushCtx, flushTimestamp int64, idx int)
|
||||
}
|
||||
|
||||
// PushFunc is called by Aggregators when it needs to push its state to metrics storage
|
||||
type PushFunc func(tss []prompbmarshal.TimeSeries)
|
||||
|
||||
type aggrPushFunc func(samples []pushSample, deleteDeadline int64, idx int)
|
||||
|
||||
// newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
|
||||
//
|
||||
// opts can contain additional options. If opts is nil, then default options are used.
|
||||
@@ -475,6 +538,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
|
||||
dropInputLabels := opts.DropInputLabels
|
||||
if v := cfg.DropInputLabels; v != nil {
|
||||
dropInputLabels = *v
|
||||
} else {
|
||||
cfg.DropInputLabels = &dropInputLabels
|
||||
}
|
||||
|
||||
// initialize input_relabel_configs and output_relabel_configs
|
||||
@@ -502,6 +567,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
|
||||
keepMetricNames := opts.KeepMetricNames
|
||||
if v := cfg.KeepMetricNames; v != nil {
|
||||
keepMetricNames = *v
|
||||
} else {
|
||||
cfg.KeepMetricNames = &keepMetricNames
|
||||
}
|
||||
if keepMetricNames {
|
||||
if len(cfg.Outputs) != 1 {
|
||||
@@ -516,12 +583,16 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
|
||||
ignoreOldSamples := opts.IgnoreOldSamples
|
||||
if v := cfg.IgnoreOldSamples; v != nil {
|
||||
ignoreOldSamples = *v
|
||||
} else {
|
||||
cfg.IgnoreOldSamples = &ignoreOldSamples
|
||||
}
|
||||
|
||||
// check cfg.IgnoreFirstIntervals
|
||||
ignoreFirstIntervals := opts.IgnoreFirstIntervals
|
||||
if v := cfg.IgnoreFirstIntervals; v != nil {
|
||||
ignoreFirstIntervals = *v
|
||||
} else {
|
||||
cfg.IgnoreFirstIntervals = &ignoreFirstIntervals
|
||||
}
|
||||
|
||||
// initialize outputs list
|
||||
@@ -529,6 +600,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
|
||||
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
|
||||
"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
|
||||
}
|
||||
slices.Sort(cfg.Outputs)
|
||||
cfg.Outputs = slices.Compact(cfg.Outputs)
|
||||
aggrStates := make([]aggrState, len(cfg.Outputs))
|
||||
for i, output := range cfg.Outputs {
|
||||
if strings.HasPrefix(output, "quantiles(") {
|
||||
@@ -557,17 +630,17 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
|
||||
}
|
||||
switch output {
|
||||
case "total":
|
||||
aggrStates[i] = newTotalAggrState(stalenessInterval, false, true)
|
||||
aggrStates[i] = newTotalAggrState(false, true)
|
||||
case "total_prometheus":
|
||||
aggrStates[i] = newTotalAggrState(stalenessInterval, false, false)
|
||||
aggrStates[i] = newTotalAggrState(false, false)
|
||||
case "increase":
|
||||
aggrStates[i] = newTotalAggrState(stalenessInterval, true, true)
|
||||
aggrStates[i] = newTotalAggrState(true, true)
|
||||
case "increase_prometheus":
|
||||
aggrStates[i] = newTotalAggrState(stalenessInterval, true, false)
|
||||
aggrStates[i] = newTotalAggrState(true, false)
|
||||
case "rate_sum":
|
||||
aggrStates[i] = newRateAggrState(stalenessInterval, "rate_sum")
|
||||
aggrStates[i] = newRateAggrState("rate_sum")
|
||||
case "rate_avg":
|
||||
aggrStates[i] = newRateAggrState(stalenessInterval, "rate_avg")
|
||||
aggrStates[i] = newRateAggrState("rate_avg")
|
||||
case "count_series":
|
||||
aggrStates[i] = newCountSeriesAggrState()
|
||||
case "count_samples":
|
||||
@@ -589,7 +662,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
|
||||
case "stdvar":
|
||||
aggrStates[i] = newStdvarAggrState()
|
||||
case "histogram_bucket":
|
||||
aggrStates[i] = newHistogramBucketAggrState(stalenessInterval)
|
||||
aggrStates[i] = newHistogramBucketAggrState()
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+
|
||||
"see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
|
||||
@@ -626,18 +699,21 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
|
||||
inputRelabeling: inputRelabeling,
|
||||
outputRelabeling: outputRelabeling,
|
||||
|
||||
keepMetricNames: keepMetricNames,
|
||||
ignoreOldSamples: ignoreOldSamples,
|
||||
keepMetricNames: keepMetricNames,
|
||||
ignoreOldSamples: ignoreOldSamples,
|
||||
stalenessInterval: stalenessInterval,
|
||||
|
||||
by: by,
|
||||
without: without,
|
||||
aggregateOnlyByTime: aggregateOnlyByTime,
|
||||
tickInterval: interval.Milliseconds(),
|
||||
|
||||
aggrStates: aggrStates,
|
||||
|
||||
suffix: suffix,
|
||||
|
||||
stopCh: make(chan struct{}),
|
||||
stopCh: make(chan struct{}),
|
||||
flushAfter: histogram.NewFast(),
|
||||
|
||||
flushDuration: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_flush_duration_seconds{outputs=%q, %s}`, outputs, metricLabels)),
|
||||
dedupFlushDuration: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{outputs=%q, %s}`, outputs, metricLabels)),
|
||||
@@ -663,21 +739,38 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
|
||||
}
|
||||
if dedupInterval > 0 {
|
||||
a.da = newDedupAggr()
|
||||
a.tickInterval = dedupInterval.Milliseconds()
|
||||
}
|
||||
|
||||
alignFlushToInterval := !opts.NoAlignFlushToInterval
|
||||
noAlignFlushToInterval := opts.NoAlignFlushToInterval
|
||||
if v := cfg.NoAlignFlushToInterval; v != nil {
|
||||
alignFlushToInterval = !*v
|
||||
noAlignFlushToInterval = *v
|
||||
} else {
|
||||
cfg.NoAlignFlushToInterval = &noAlignFlushToInterval
|
||||
}
|
||||
|
||||
skipIncompleteFlush := !opts.FlushOnShutdown
|
||||
flushOnShutdown := opts.FlushOnShutdown
|
||||
if v := cfg.FlushOnShutdown; v != nil {
|
||||
skipIncompleteFlush = !*v
|
||||
flushOnShutdown = *v
|
||||
} else {
|
||||
cfg.FlushOnShutdown = &flushOnShutdown
|
||||
}
|
||||
|
||||
configData, err := json.Marshal(&cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to marshal config: %w", err)
|
||||
}
|
||||
a.configData = string(configData)
|
||||
|
||||
minTime := time.Now()
|
||||
if !flushOnShutdown && !noAlignFlushToInterval {
|
||||
minTime = minTime.Truncate(interval).Add(interval)
|
||||
}
|
||||
a.minTimestamp.Store(minTime.UnixMilli())
|
||||
|
||||
a.wg.Add(1)
|
||||
go func() {
|
||||
a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval, ignoreFirstIntervals)
|
||||
a.runFlusher(pushFunc, !noAlignFlushToInterval, !flushOnShutdown, interval, dedupInterval, ignoreFirstIntervals)
|
||||
a.wg.Done()
|
||||
}()
|
||||
|
||||
@@ -709,75 +802,89 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
|
||||
}
|
||||
}
|
||||
|
||||
if dedupInterval <= 0 {
|
||||
alignedSleep(interval)
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
flushDeadline := time.Now().Truncate(interval).Add(interval)
|
||||
tickInterval := time.Duration(a.tickInterval) * time.Millisecond
|
||||
alignedSleep(tickInterval)
|
||||
|
||||
if alignFlushToInterval && skipIncompleteFlush {
|
||||
a.flush(nil, interval, true)
|
||||
ignoreFirstIntervals--
|
||||
var dedupIdx, flushIdx int
|
||||
|
||||
t := time.NewTicker(tickInterval)
|
||||
defer t.Stop()
|
||||
|
||||
isSkippedFirstFlush := false
|
||||
for tickerWait(t) {
|
||||
ct := time.Now()
|
||||
|
||||
dedupTime := ct.Truncate(tickInterval)
|
||||
if a.ignoreOldSamples {
|
||||
dedupIdx, flushIdx = getAggrIdxs(dedupInterval, interval, dedupTime, flushDeadline)
|
||||
}
|
||||
pf := pushFunc
|
||||
|
||||
// Calculate delay
|
||||
a.muFlushAfter.Lock()
|
||||
flushAfterMsec := a.flushAfter.Quantile(0.95)
|
||||
a.flushAfter.Reset()
|
||||
a.muFlushAfter.Unlock()
|
||||
flushAfter := time.Duration(flushAfterMsec) * time.Millisecond
|
||||
|
||||
if flushAfter > tickInterval {
|
||||
logger.Warnf("metrics ingestion lag (%v) is more than tick interval (%v). "+
|
||||
"gaps are expected in aggregations", flushAfter, tickInterval)
|
||||
pf = nil
|
||||
} else {
|
||||
time.Sleep(flushAfter)
|
||||
}
|
||||
|
||||
for tickerWait(t) {
|
||||
if ignoreFirstIntervals > 0 {
|
||||
a.flush(nil, interval, true)
|
||||
a.dedupFlush(dedupInterval, dedupTime.UnixMilli(), dedupIdx, flushIdx)
|
||||
|
||||
if ct.After(flushDeadline) {
|
||||
// It is time to flush the aggregated state
|
||||
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
|
||||
a.flush(nil, interval, flushDeadline.UnixMilli(), flushIdx)
|
||||
isSkippedFirstFlush = true
|
||||
} else if ignoreFirstIntervals > 0 {
|
||||
a.flush(nil, interval, flushDeadline.UnixMilli(), flushIdx)
|
||||
ignoreFirstIntervals--
|
||||
} else {
|
||||
a.flush(pushFunc, interval, true)
|
||||
a.flush(pf, interval, flushDeadline.UnixMilli(), flushIdx)
|
||||
}
|
||||
|
||||
if alignFlushToInterval {
|
||||
select {
|
||||
case <-t.C:
|
||||
default:
|
||||
}
|
||||
for ct.After(flushDeadline) {
|
||||
flushDeadline = flushDeadline.Add(interval)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
alignedSleep(dedupInterval)
|
||||
t := time.NewTicker(dedupInterval)
|
||||
defer t.Stop()
|
||||
|
||||
flushDeadline := time.Now().Add(interval)
|
||||
isSkippedFirstFlush := false
|
||||
for tickerWait(t) {
|
||||
a.dedupFlush(dedupInterval)
|
||||
|
||||
ct := time.Now()
|
||||
if ct.After(flushDeadline) {
|
||||
// It is time to flush the aggregated state
|
||||
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
|
||||
a.flush(nil, interval, true)
|
||||
ignoreFirstIntervals--
|
||||
isSkippedFirstFlush = true
|
||||
} else if ignoreFirstIntervals > 0 {
|
||||
a.flush(nil, interval, true)
|
||||
ignoreFirstIntervals--
|
||||
} else {
|
||||
a.flush(pushFunc, interval, true)
|
||||
}
|
||||
for ct.After(flushDeadline) {
|
||||
flushDeadline = flushDeadline.Add(interval)
|
||||
}
|
||||
}
|
||||
|
||||
if alignFlushToInterval {
|
||||
select {
|
||||
case <-t.C:
|
||||
default:
|
||||
}
|
||||
if alignFlushToInterval {
|
||||
select {
|
||||
case <-t.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
|
||||
a.dedupFlush(dedupInterval)
|
||||
a.flush(pushFunc, interval, true)
|
||||
dedupTime := time.Now().Truncate(tickInterval).Add(tickInterval)
|
||||
if a.ignoreOldSamples {
|
||||
dedupIdx, flushIdx = getAggrIdxs(dedupInterval, interval, dedupTime, flushDeadline)
|
||||
}
|
||||
a.dedupFlush(dedupInterval, flushDeadline.UnixMilli(), dedupIdx, flushIdx)
|
||||
a.flush(pushFunc, interval, flushDeadline.UnixMilli(), flushIdx)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
|
||||
func getAggrIdxs(dedupInterval, interval time.Duration, dedupTime, flushTime time.Time) (int, int) {
|
||||
flushIdx := getStateIdx(interval.Milliseconds(), flushTime.Add(-interval).UnixMilli())
|
||||
dedupIdx := flushIdx
|
||||
if dedupInterval > 0 {
|
||||
dedupIdx = getStateIdx(dedupInterval.Milliseconds(), dedupTime.Add(-dedupInterval).UnixMilli())
|
||||
}
|
||||
return dedupIdx, flushIdx
|
||||
}
|
||||
|
||||
func getStateIdx(interval int64, ts int64) int {
|
||||
return int(ts/interval) % aggrStateSize
|
||||
}
|
||||
|
||||
func (a *aggregator) dedupFlush(dedupInterval time.Duration, deleteDeadline int64, dedupIdx, flushIdx int) {
|
||||
if dedupInterval <= 0 {
|
||||
// The de-duplication is disabled.
|
||||
return
|
||||
@@ -785,7 +892,7 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
a.da.flush(a.pushSamples)
|
||||
a.da.flush(a.pushSamples, deleteDeadline, dedupIdx, flushIdx)
|
||||
|
||||
d := time.Since(startTime)
|
||||
a.dedupFlushDuration.Update(d.Seconds())
|
||||
@@ -797,14 +904,9 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState bool) {
|
||||
func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, flushTimestamp int64, idx int) {
|
||||
startTime := time.Now()
|
||||
|
||||
// Update minTimestamp before flushing samples to the storage,
|
||||
// since the flush durtion can be quite long.
|
||||
// This should prevent from dropping samples with old timestamps when the flush takes long time.
|
||||
a.minTimestamp.Store(startTime.UnixMilli() - 5_000)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, as := range a.aggrStates {
|
||||
flushConcurrencyCh <- struct{}{}
|
||||
@@ -816,7 +918,7 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
|
||||
}()
|
||||
|
||||
ctx := getFlushCtx(a, pushFunc)
|
||||
as.flushState(ctx, resetState)
|
||||
as.flushState(ctx, flushTimestamp, idx)
|
||||
ctx.flushSeries()
|
||||
ctx.resetSeries()
|
||||
putFlushCtx(ctx)
|
||||
@@ -826,6 +928,7 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
|
||||
|
||||
d := time.Since(startTime)
|
||||
a.flushDuration.Update(d.Seconds())
|
||||
a.minTimestamp.Store(flushTimestamp)
|
||||
if d > interval {
|
||||
a.flushTimeouts.Inc()
|
||||
logger.Warnf("stream aggregation couldn't be finished in the configured interval=%s; it took %.03fs; "+
|
||||
@@ -855,11 +958,16 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
|
||||
labels := &ctx.labels
|
||||
inputLabels := &ctx.inputLabels
|
||||
outputLabels := &ctx.outputLabels
|
||||
currentTime := time.Now()
|
||||
currentTimestamp := currentTime.UnixMilli()
|
||||
deleteDeadline := currentTime.Add(a.stalenessInterval)
|
||||
deleteDeadlineMilli := deleteDeadline.UnixMilli()
|
||||
|
||||
dropLabels := a.dropInputLabels
|
||||
ignoreOldSamples := a.ignoreOldSamples
|
||||
minTimestamp := a.minTimestamp.Load()
|
||||
var maxLag int64
|
||||
var flushIdx int
|
||||
for idx, ts := range tss {
|
||||
if !a.match.Match(ts.Labels) {
|
||||
continue
|
||||
@@ -892,37 +1000,47 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
|
||||
// do not intern key because number of unique keys could be too high
|
||||
key := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
for _, sample := range ts.Samples {
|
||||
a.muFlushAfter.Lock()
|
||||
a.flushAfter.Update(float64(currentTimestamp - sample.Timestamp))
|
||||
a.muFlushAfter.Unlock()
|
||||
if math.IsNaN(sample.Value) {
|
||||
a.ignoredNanSamples.Inc()
|
||||
// Skip NaN values
|
||||
a.ignoredNanSamples.Inc()
|
||||
continue
|
||||
}
|
||||
if ignoreOldSamples && sample.Timestamp < minTimestamp {
|
||||
a.ignoredOldSamples.Inc()
|
||||
// Skip old samples outside the current aggregation interval
|
||||
a.ignoredOldSamples.Inc()
|
||||
continue
|
||||
}
|
||||
if maxLag < now-sample.Timestamp {
|
||||
maxLag = now - sample.Timestamp
|
||||
}
|
||||
samples = append(samples, pushSample{
|
||||
if ignoreOldSamples {
|
||||
flushIdx = getStateIdx(a.tickInterval, sample.Timestamp)
|
||||
}
|
||||
samples[flushIdx] = append(samples[flushIdx], pushSample{
|
||||
key: key,
|
||||
value: sample.Value,
|
||||
timestamp: sample.Timestamp,
|
||||
})
|
||||
}
|
||||
}
|
||||
if len(samples) > 0 {
|
||||
a.matchedSamples.Add(len(samples))
|
||||
a.samplesLag.Update(float64(maxLag) / 1_000)
|
||||
}
|
||||
|
||||
ctx.samples = samples
|
||||
ctx.buf = buf
|
||||
|
||||
pushSamples := a.pushSamples
|
||||
if a.da != nil {
|
||||
a.da.pushSamples(samples)
|
||||
} else {
|
||||
a.pushSamples(samples)
|
||||
pushSamples = a.da.pushSamples
|
||||
}
|
||||
|
||||
for idx, s := range samples {
|
||||
if len(s) > 0 {
|
||||
a.samplesLag.Update(float64(maxLag) / 1_000)
|
||||
a.matchedSamples.Add(len(s))
|
||||
pushSamples(s, deleteDeadlineMilli, idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -963,14 +1081,14 @@ func getInputOutputKey(key string) (string, string) {
|
||||
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
|
||||
}
|
||||
|
||||
func (a *aggregator) pushSamples(samples []pushSample) {
|
||||
func (a *aggregator) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for _, as := range a.aggrStates {
|
||||
as.pushSamples(samples)
|
||||
as.pushSamples(samples, deleteDeadline, idx)
|
||||
}
|
||||
}
|
||||
|
||||
type pushCtx struct {
|
||||
samples []pushSample
|
||||
samples [aggrStateSize][]pushSample
|
||||
labels promutils.Labels
|
||||
inputLabels promutils.Labels
|
||||
outputLabels promutils.Labels
|
||||
@@ -978,8 +1096,9 @@ type pushCtx struct {
|
||||
}
|
||||
|
||||
func (ctx *pushCtx) reset() {
|
||||
clear(ctx.samples)
|
||||
ctx.samples = ctx.samples[:0]
|
||||
for i := range ctx.samples {
|
||||
ctx.samples[i] = ctx.samples[i][:0]
|
||||
}
|
||||
|
||||
ctx.labels.Reset()
|
||||
ctx.inputLabels.Reset()
|
||||
|
||||
@@ -19,12 +19,11 @@ func TestAggregatorsFailure(t *testing.T) {
|
||||
pushFunc := func(_ []prompbmarshal.TimeSeries) {
|
||||
panic(fmt.Errorf("pushFunc shouldn't be called"))
|
||||
}
|
||||
a, err := LoadFromData([]byte(config), pushFunc, Options{})
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
a := &Aggregators{
|
||||
pushFunc: pushFunc,
|
||||
}
|
||||
if a != nil {
|
||||
t.Fatalf("expecting nil a")
|
||||
if err := a.loadAggregatorsFromData([]byte(config)); err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -157,12 +156,17 @@ func TestAggregatorsEqual(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
|
||||
aa, err := LoadFromData([]byte(a), pushFunc, Options{})
|
||||
if err != nil {
|
||||
aa := &Aggregators{
|
||||
pushFunc: pushFunc,
|
||||
}
|
||||
if err := aa.loadAggregatorsFromData([]byte(a)); err != nil {
|
||||
t.Fatalf("cannot initialize aggregators: %s", err)
|
||||
}
|
||||
ab, err := LoadFromData([]byte(b), pushFunc, Options{})
|
||||
if err != nil {
|
||||
|
||||
ab := &Aggregators{
|
||||
pushFunc: pushFunc,
|
||||
}
|
||||
if err := ab.loadAggregatorsFromData([]byte(b)); err != nil {
|
||||
t.Fatalf("cannot initialize aggregators: %s", err)
|
||||
}
|
||||
result := aa.Equal(ab)
|
||||
@@ -220,12 +224,14 @@ func TestAggregatorsSuccess(t *testing.T) {
|
||||
tssOutput = appendClonedTimeseries(tssOutput, tss)
|
||||
tssOutputLock.Unlock()
|
||||
}
|
||||
opts := Options{
|
||||
FlushOnShutdown: true,
|
||||
NoAlignFlushToInterval: true,
|
||||
a := &Aggregators{
|
||||
opts: Options{
|
||||
FlushOnShutdown: true,
|
||||
NoAlignFlushToInterval: true,
|
||||
},
|
||||
pushFunc: pushFunc,
|
||||
}
|
||||
a, err := LoadFromData([]byte(config), pushFunc, opts)
|
||||
if err != nil {
|
||||
if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
|
||||
t.Fatalf("cannot initialize aggregators: %s", err)
|
||||
}
|
||||
|
||||
@@ -233,7 +239,7 @@ func TestAggregatorsSuccess(t *testing.T) {
|
||||
offsetMsecs := time.Now().UnixMilli()
|
||||
tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs)
|
||||
matchIdxs := a.Push(tssInput, nil)
|
||||
a.MustStop()
|
||||
a.MustStop(nil)
|
||||
|
||||
// Verify matchIdxs equals to matchIdxsExpected
|
||||
matchIdxsStr := ""
|
||||
@@ -262,7 +268,7 @@ func TestAggregatorsSuccess(t *testing.T) {
|
||||
outputs: [count_samples, sum_samples, count_series, last]
|
||||
`, `
|
||||
foo{abc="123"} 4
|
||||
bar 5 100
|
||||
bar 5 11
|
||||
bar 34 10
|
||||
foo{abc="123"} 8.5
|
||||
foo{abc="456",de="fg"} 8
|
||||
@@ -507,8 +513,8 @@ foo:1m_by_abc_sum_samples{abc="456"} 8
|
||||
`, `
|
||||
foo 123
|
||||
bar{baz="qwe"} 4.34
|
||||
`, `bar:1m_total{baz="qwe"} 0
|
||||
foo:1m_total 0
|
||||
`, `bar:1m_total{baz="qwe"} 4.34
|
||||
foo:1m_total 123
|
||||
`, "11")
|
||||
|
||||
// total_prometheus output for non-repeated series
|
||||
@@ -529,16 +535,16 @@ foo:1m_total_prometheus 0
|
||||
`, `
|
||||
foo 123
|
||||
bar{baz="qwe"} 1.31
|
||||
bar{baz="qwe"} 4.34 1000
|
||||
bar{baz="qwe"} 4.34 1
|
||||
bar{baz="qwe"} 2
|
||||
foo{baz="qwe"} -5
|
||||
bar{baz="qwer"} 343
|
||||
bar{baz="qwer"} 344
|
||||
foo{baz="qwe"} 10
|
||||
`, `bar:1m_total{baz="qwe"} 3.03
|
||||
bar:1m_total{baz="qwer"} 1
|
||||
foo:1m_total 0
|
||||
foo:1m_total{baz="qwe"} 15
|
||||
`, `bar:1m_total{baz="qwe"} 4.34
|
||||
bar:1m_total{baz="qwer"} 344
|
||||
foo:1m_total 123
|
||||
foo:1m_total{baz="qwe"} 10
|
||||
`, "11111111")
|
||||
|
||||
// total_prometheus output for repeated series
|
||||
@@ -574,8 +580,8 @@ foo{baz="qwe"} -5
|
||||
bar{baz="qwer"} 343
|
||||
bar{baz="qwer"} 344
|
||||
foo{baz="qwe"} 10
|
||||
`, `bar:1m_total 6.02
|
||||
foo:1m_total 15
|
||||
`, `bar:1m_total 350.34
|
||||
foo:1m_total 133
|
||||
`, "11111111")
|
||||
|
||||
// total_prometheus output for repeated series with group by __name__
|
||||
@@ -603,8 +609,8 @@ foo:1m_total_prometheus 15
|
||||
`, `
|
||||
foo 123
|
||||
bar{baz="qwe"} 4.34
|
||||
`, `bar:1m_increase{baz="qwe"} 0
|
||||
foo:1m_increase 0
|
||||
`, `bar:1m_increase{baz="qwe"} 4.34
|
||||
foo:1m_increase 123
|
||||
`, "11")
|
||||
|
||||
// increase_prometheus output for non-repeated series
|
||||
@@ -631,10 +637,10 @@ foo{baz="qwe"} -5
|
||||
bar{baz="qwer"} 343
|
||||
bar{baz="qwer"} 344
|
||||
foo{baz="qwe"} 10
|
||||
`, `bar:1m_increase{baz="qwe"} 5.02
|
||||
bar:1m_increase{baz="qwer"} 1
|
||||
foo:1m_increase 0
|
||||
foo:1m_increase{baz="qwe"} 15
|
||||
`, `bar:1m_increase{baz="qwe"} 6.34
|
||||
bar:1m_increase{baz="qwer"} 344
|
||||
foo:1m_increase 123
|
||||
foo:1m_increase{baz="qwe"} 10
|
||||
`, "11111111")
|
||||
|
||||
// increase_prometheus output for repeated series
|
||||
@@ -917,12 +923,14 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
|
||||
}
|
||||
tssOutputLock.Unlock()
|
||||
}
|
||||
opts := Options{
|
||||
DedupInterval: 30 * time.Second,
|
||||
FlushOnShutdown: true,
|
||||
a := &Aggregators{
|
||||
opts: Options{
|
||||
DedupInterval: 30 * time.Second,
|
||||
FlushOnShutdown: true,
|
||||
},
|
||||
pushFunc: pushFunc,
|
||||
}
|
||||
a, err := LoadFromData([]byte(config), pushFunc, opts)
|
||||
if err != nil {
|
||||
if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
|
||||
t.Fatalf("cannot initialize aggregators: %s", err)
|
||||
}
|
||||
|
||||
@@ -930,7 +938,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
|
||||
offsetMsecs := time.Now().UnixMilli()
|
||||
tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs)
|
||||
matchIdxs := a.Push(tssInput, nil)
|
||||
a.MustStop()
|
||||
a.MustStop(nil)
|
||||
|
||||
// Verify matchIdxs equals to matchIdxsExpected
|
||||
matchIdxsStr := ""
|
||||
|
||||
@@ -47,7 +47,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
|
||||
}
|
||||
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
|
||||
a := newBenchAggregators(outputs, pushFunc)
|
||||
defer a.MustStop()
|
||||
defer a.MustStop(nil)
|
||||
_ = a.Push(benchSeries, nil)
|
||||
|
||||
b.ResetTimer()
|
||||
@@ -55,7 +55,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
|
||||
b.SetBytes(int64(len(benchSeries) * len(outputs)))
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, aggr := range a.as {
|
||||
aggr.flush(pushFunc, time.Hour, false)
|
||||
aggr.flush(pushFunc, time.Hour, time.Now().UnixMilli(), 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -63,7 +63,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
|
||||
func benchmarkAggregatorsPush(b *testing.B, output string) {
|
||||
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
|
||||
a := newBenchAggregators([]string{output}, pushFunc)
|
||||
defer a.MustStop()
|
||||
defer a.MustStop(nil)
|
||||
|
||||
const loops = 100
|
||||
|
||||
@@ -92,8 +92,10 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
|
||||
outputs: [%s]
|
||||
`, strings.Join(outputsQuoted, ","))
|
||||
|
||||
a, err := LoadFromData([]byte(config), pushFunc, Options{})
|
||||
if err != nil {
|
||||
a := &Aggregators{
|
||||
pushFunc: pushFunc,
|
||||
}
|
||||
if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
|
||||
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
|
||||
}
|
||||
return a
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// sumSamplesAggrState calculates output=sum_samples, e.g. the sum over input samples.
|
||||
@@ -13,16 +12,22 @@ type sumSamplesAggrState struct {
|
||||
}
|
||||
|
||||
type sumSamplesStateValue struct {
|
||||
mu sync.Mutex
|
||||
sum float64
|
||||
deleted bool
|
||||
mu sync.Mutex
|
||||
state [aggrStateSize]sumState
|
||||
deleted bool
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
type sumState struct {
|
||||
sum float64
|
||||
exists bool
|
||||
}
|
||||
|
||||
func newSumSamplesAggrState() *sumSamplesAggrState {
|
||||
return &sumSamplesAggrState{}
|
||||
}
|
||||
|
||||
func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
|
||||
func (as *sumSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
outputKey := getOutputKey(s.key)
|
||||
@@ -31,23 +36,21 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
|
||||
v, ok := as.m.Load(outputKey)
|
||||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
v = &sumSamplesStateValue{
|
||||
sum: s.value,
|
||||
}
|
||||
v = &sumSamplesStateValue{}
|
||||
outputKey = bytesutil.InternString(outputKey)
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
if !loaded {
|
||||
// The new entry has been successfully created.
|
||||
continue
|
||||
if loaded {
|
||||
// Update the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
sv := v.(*sumSamplesStateValue)
|
||||
sv.mu.Lock()
|
||||
deleted := sv.deleted
|
||||
if !deleted {
|
||||
sv.sum += s.value
|
||||
sv.state[idx].sum += s.value
|
||||
sv.state[idx].exists = true
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
if deleted {
|
||||
@@ -58,26 +61,28 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
|
||||
func (as *sumSamplesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
m := &as.m
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
if resetState {
|
||||
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
||||
m.Delete(k)
|
||||
}
|
||||
|
||||
sv := v.(*sumSamplesStateValue)
|
||||
sv.mu.Lock()
|
||||
sum := sv.sum
|
||||
if resetState {
|
||||
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
|
||||
sv.deleted = true
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "sum_samples", currentTimeMsec, sum)
|
||||
// check for stale entries
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
sv.mu.Unlock()
|
||||
m.Delete(k)
|
||||
return true
|
||||
}
|
||||
state := sv.state[idx]
|
||||
sv.state[idx] = sumState{}
|
||||
sv.mu.Unlock()
|
||||
if state.exists {
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "sum_samples", flushTimestamp, state.sum)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3,10 +3,8 @@ package streamaggr
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// totalAggrState calculates output=total, e.g. the summary counter over input counters.
|
||||
@@ -20,36 +18,28 @@ type totalAggrState struct {
|
||||
|
||||
// Whether to take into account the first sample in new time series when calculating the output value.
|
||||
keepFirstSample bool
|
||||
|
||||
// Time series state is dropped if no new samples are received during stalenessSecs.
|
||||
//
|
||||
// Aslo, the first sample per each new series is ignored during stalenessSecs even if keepFirstSample is set.
|
||||
// see ignoreFirstSampleDeadline for more details.
|
||||
stalenessSecs uint64
|
||||
|
||||
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
|
||||
// This allows avoiding an initial spike of the output values at startup when new time series
|
||||
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
|
||||
ignoreFirstSampleDeadline uint64
|
||||
}
|
||||
|
||||
type totalStateValue struct {
|
||||
mu sync.Mutex
|
||||
lastValues map[string]totalLastValueState
|
||||
total float64
|
||||
deleteDeadline uint64
|
||||
shared totalState
|
||||
state [aggrStateSize]float64
|
||||
deleteDeadline int64
|
||||
deleted bool
|
||||
}
|
||||
|
||||
type totalState struct {
|
||||
total float64
|
||||
lastValues map[string]totalLastValueState
|
||||
}
|
||||
|
||||
type totalLastValueState struct {
|
||||
value float64
|
||||
timestamp int64
|
||||
deleteDeadline uint64
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
|
||||
stalenessSecs := roundDurationToSecs(stalenessInterval)
|
||||
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + stalenessSecs
|
||||
func newTotalAggrState(resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
|
||||
suffix := "total"
|
||||
if resetTotalOnFlush {
|
||||
suffix = "increase"
|
||||
@@ -58,18 +48,14 @@ func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepF
|
||||
suffix += "_prometheus"
|
||||
}
|
||||
return &totalAggrState{
|
||||
suffix: suffix,
|
||||
resetTotalOnFlush: resetTotalOnFlush,
|
||||
keepFirstSample: keepFirstSample,
|
||||
stalenessSecs: stalenessSecs,
|
||||
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
|
||||
suffix: suffix,
|
||||
resetTotalOnFlush: resetTotalOnFlush,
|
||||
keepFirstSample: keepFirstSample,
|
||||
}
|
||||
}
|
||||
|
||||
func (as *totalAggrState) pushSamples(samples []pushSample) {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
deleteDeadline := currentTime + as.stalenessSecs
|
||||
keepFirstSample := as.keepFirstSample && currentTime > as.ignoreFirstSampleDeadline
|
||||
func (as *totalAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
var deleted bool
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
inputKey, outputKey := getInputOutputKey(s.key)
|
||||
@@ -79,7 +65,9 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
|
||||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
v = &totalStateValue{
|
||||
lastValues: make(map[string]totalLastValueState),
|
||||
shared: totalState{
|
||||
lastValues: make(map[string]totalLastValueState),
|
||||
},
|
||||
}
|
||||
outputKey = bytesutil.InternString(outputKey)
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
@@ -90,10 +78,10 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
sv := v.(*totalStateValue)
|
||||
sv.mu.Lock()
|
||||
deleted := sv.deleted
|
||||
deleted = sv.deleted
|
||||
if !deleted {
|
||||
lv, ok := sv.lastValues[inputKey]
|
||||
if ok || keepFirstSample {
|
||||
lv, ok := sv.shared.lastValues[inputKey]
|
||||
if ok || as.keepFirstSample {
|
||||
if s.timestamp < lv.timestamp {
|
||||
// Skip out of order sample
|
||||
sv.mu.Unlock()
|
||||
@@ -101,10 +89,10 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
|
||||
if s.value >= lv.value {
|
||||
sv.total += s.value - lv.value
|
||||
sv.state[idx] += s.value - lv.value
|
||||
} else {
|
||||
// counter reset
|
||||
sv.total += s.value
|
||||
sv.state[idx] += s.value
|
||||
}
|
||||
}
|
||||
lv.value = s.value
|
||||
@@ -112,7 +100,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
|
||||
lv.deleteDeadline = deleteDeadline
|
||||
|
||||
inputKey = bytesutil.InternString(inputKey)
|
||||
sv.lastValues[inputKey] = lv
|
||||
sv.shared.lastValues[inputKey] = lv
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
@@ -124,64 +112,44 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) {
|
||||
func (as *totalAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
var total float64
|
||||
m := &as.m
|
||||
var staleInputSamples, staleOutputSamples int
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
sv := v.(*totalStateValue)
|
||||
|
||||
sv.mu.Lock()
|
||||
deleted := currentTime > sv.deleteDeadline
|
||||
// check for stale entries
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
staleOutputSamples++
|
||||
} else {
|
||||
// Delete outdated entries in sv.lastValues
|
||||
m := sv.lastValues
|
||||
for k1, v1 := range m {
|
||||
if currentTime > v1.deleteDeadline {
|
||||
delete(m, k1)
|
||||
staleInputSamples++
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
m.Delete(k)
|
||||
return true
|
||||
}
|
||||
total = sv.shared.total + sv.state[idx]
|
||||
for k1, v1 := range sv.shared.lastValues {
|
||||
if flushTimestamp > v1.deleteDeadline {
|
||||
delete(sv.shared.lastValues, k1)
|
||||
}
|
||||
}
|
||||
sv.state[idx] = 0
|
||||
if !as.resetTotalOnFlush {
|
||||
if math.Abs(total) >= (1 << 53) {
|
||||
// It is time to reset the entry, since it starts losing float64 precision
|
||||
sv.shared.total = 0
|
||||
} else {
|
||||
sv.shared.total = total
|
||||
}
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
|
||||
if deleted {
|
||||
m.Delete(k)
|
||||
}
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, as.suffix, flushTimestamp, total)
|
||||
return true
|
||||
})
|
||||
ctx.a.staleInputSamples[as.suffix].Add(staleInputSamples)
|
||||
ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples)
|
||||
}
|
||||
|
||||
func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
currentTimeMsec := int64(currentTime) * 1000
|
||||
|
||||
as.removeOldEntries(ctx, currentTime)
|
||||
|
||||
m := &as.m
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
sv := v.(*totalStateValue)
|
||||
sv.mu.Lock()
|
||||
total := sv.total
|
||||
if resetState {
|
||||
if as.resetTotalOnFlush {
|
||||
sv.total = 0
|
||||
} else if math.Abs(sv.total) >= (1 << 53) {
|
||||
// It is time to reset the entry, since it starts losing float64 precision
|
||||
sv.total = 0
|
||||
}
|
||||
}
|
||||
deleted := sv.deleted
|
||||
sv.mu.Unlock()
|
||||
if !deleted {
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, as.suffix, currentTimeMsec, total)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// uniqueSamplesAggrState calculates output=unique_samples, e.g. the number of unique sample values.
|
||||
@@ -13,16 +12,17 @@ type uniqueSamplesAggrState struct {
|
||||
}
|
||||
|
||||
type uniqueSamplesStateValue struct {
|
||||
mu sync.Mutex
|
||||
m map[float64]struct{}
|
||||
deleted bool
|
||||
mu sync.Mutex
|
||||
state [aggrStateSize]map[float64]struct{}
|
||||
deleted bool
|
||||
deleteDeadline int64
|
||||
}
|
||||
|
||||
func newUniqueSamplesAggrState() *uniqueSamplesAggrState {
|
||||
return &uniqueSamplesAggrState{}
|
||||
}
|
||||
|
||||
func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
|
||||
func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
|
||||
for i := range samples {
|
||||
s := &samples[i]
|
||||
outputKey := getOutputKey(s.key)
|
||||
@@ -31,27 +31,26 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
|
||||
v, ok := as.m.Load(outputKey)
|
||||
if !ok {
|
||||
// The entry is missing in the map. Try creating it.
|
||||
v = &uniqueSamplesStateValue{
|
||||
m: map[float64]struct{}{
|
||||
s.value: {},
|
||||
},
|
||||
usv := &uniqueSamplesStateValue{}
|
||||
for iu := range usv.state {
|
||||
usv.state[iu] = make(map[float64]struct{})
|
||||
}
|
||||
v = usv
|
||||
outputKey = bytesutil.InternString(outputKey)
|
||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||
if !loaded {
|
||||
// The new entry has been successfully created.
|
||||
continue
|
||||
if loaded {
|
||||
// Update the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
// Use the entry created by a concurrent goroutine.
|
||||
v = vNew
|
||||
}
|
||||
sv := v.(*uniqueSamplesStateValue)
|
||||
sv.mu.Lock()
|
||||
deleted := sv.deleted
|
||||
if !deleted {
|
||||
if _, ok := sv.m[s.value]; !ok {
|
||||
sv.m[s.value] = struct{}{}
|
||||
if _, ok := sv.state[idx][s.value]; !ok {
|
||||
sv.state[idx][s.value] = struct{}{}
|
||||
}
|
||||
sv.deleteDeadline = deleteDeadline
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
if deleted {
|
||||
@@ -62,26 +61,26 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
|
||||
}
|
||||
}
|
||||
|
||||
func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
|
||||
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
|
||||
func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
|
||||
m := &as.m
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
if resetState {
|
||||
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
||||
m.Delete(k)
|
||||
}
|
||||
|
||||
sv := v.(*uniqueSamplesStateValue)
|
||||
sv.mu.Lock()
|
||||
n := len(sv.m)
|
||||
if resetState {
|
||||
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
|
||||
sv.deleted = true
|
||||
}
|
||||
sv.mu.Unlock()
|
||||
|
||||
// check for stale entries
|
||||
deleted := flushTimestamp > sv.deleteDeadline
|
||||
if deleted {
|
||||
// Mark the current entry as deleted
|
||||
sv.deleted = deleted
|
||||
sv.mu.Unlock()
|
||||
m.Delete(k)
|
||||
return true
|
||||
}
|
||||
state := len(sv.state[idx])
|
||||
sv.state[idx] = make(map[float64]struct{})
|
||||
sv.mu.Unlock()
|
||||
key := k.(string)
|
||||
ctx.appendSeries(key, "unique_samples", currentTimeMsec, float64(n))
|
||||
ctx.appendSeries(key, "unique_samples", flushTimestamp, float64(state))
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/metrics/histogram.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/metrics/histogram.go
generated
vendored
@@ -130,7 +130,7 @@ func (h *Histogram) Merge(b *Histogram) {
|
||||
h.decimalBuckets[i] = &b
|
||||
}
|
||||
for j := range db {
|
||||
h.decimalBuckets[i][j] = db[j]
|
||||
h.decimalBuckets[i][j] += db[j]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@@ -115,7 +115,7 @@ github.com/VictoriaMetrics/easyproto
|
||||
# github.com/VictoriaMetrics/fastcache v1.12.2
|
||||
## explicit; go 1.13
|
||||
github.com/VictoriaMetrics/fastcache
|
||||
# github.com/VictoriaMetrics/metrics v1.34.0
|
||||
# github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0
|
||||
## explicit; go 1.17
|
||||
github.com/VictoriaMetrics/metrics
|
||||
# github.com/VictoriaMetrics/metricsql v0.76.0
|
||||
|
||||
Reference in New Issue
Block a user