Compare commits

...

4 Commits

Author SHA1 Message Date
AndrewChubatiuk
ad29bd69ee hot-reload enabled 2024-07-09 11:28:28 +03:00
AndrewChubatiuk
aca33307a9 lib/streamaggr: added aggregation windows 2024-07-09 11:28:27 +03:00
AndrewChubatiuk
129b2236ef review comments 2024-07-09 11:07:52 +03:00
AndrewChubatiuk
c8685741b3 lib/streamaggr: added aggregation windows 2024-07-09 11:07:52 +03:00
29 changed files with 961 additions and 834 deletions

View File

@@ -231,15 +231,23 @@ func Init() {
// Start config reloader.
configReloaderWG.Add(1)
go func() {
var streamAggrConfigReloaderCh <-chan time.Time
if *streamAggrConfigCheckInterval > 0 {
ticker := time.NewTicker(*streamAggrConfigCheckInterval)
streamAggrConfigReloaderCh = ticker.C
defer ticker.Stop()
}
defer configReloaderWG.Done()
for {
select {
case <-sighupCh:
reloadRelabelConfigs()
reloadStreamAggrConfigs()
case <-streamAggrConfigReloaderCh:
reloadStreamAggrConfigs()
case <-configReloaderStopCh:
return
}
reloadRelabelConfigs()
reloadStreamAggrConfigs()
}
}()
}
@@ -376,11 +384,9 @@ func Stop() {
close(configReloaderStopCh)
configReloaderWG.Wait()
sasGlobal.Load().MustStop()
if deduplicatorGlobal != nil {
deduplicatorGlobal.MustStop()
deduplicatorGlobal = nil
}
sasGlobal.Load().MustStop(nil)
deduplicatorGlobal.MustStop()
deduplicatorGlobal = nil
for _, rwctx := range rwctxs {
rwctx.MustStop()
@@ -850,12 +856,10 @@ func (rwctx *remoteWriteCtx) MustStop() {
// sas and deduplicator must be stopped before rwctx is closed
// because they can write pending series to rwctx.pss if there are any
sas := rwctx.sas.Swap(nil)
sas.MustStop()
sas.MustStop(nil)
if rwctx.deduplicator != nil {
rwctx.deduplicator.MustStop()
rwctx.deduplicator = nil
}
rwctx.deduplicator.MustStop()
rwctx.deduplicator = nil
for _, ps := range rwctx.pss {
ps.MustStop()

View File

@@ -4,12 +4,10 @@ import (
"flag"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics"
)
var (
@@ -17,6 +15,8 @@ var (
streamAggrGlobalConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking changes in -streamAggr.config "+
"and -remoteWrite.streamAggr.config")
streamAggrGlobalKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation "+
"with -streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to remote storages write. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
@@ -79,47 +79,32 @@ func CheckStreamAggrConfigs() error {
}
func reloadStreamAggrConfigs() {
reloadStreamAggrConfig(-1, pushToRemoteStoragesDropFailed)
for idx, rwctx := range rwctxs {
reloadStreamAggrConfig(idx, rwctx.pushInternalTrackDropped)
reloadStreamAggrConfig(-1)
for idx := range rwctxs {
reloadStreamAggrConfig(idx)
}
}
func reloadStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) {
path, opts := getStreamAggrOpts(idx)
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
sasNew, err := newStreamAggrConfigWithOpts(pushFunc, path, opts)
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0)
logger.Errorf("cannot reload stream aggregation config at %q; continue using the previously loaded config; error: %s", path, err)
return
}
func reloadStreamAggrConfig(idx int) {
path, _ := getStreamAggrOpts(idx)
var sas *streamaggr.Aggregators
var flag string
if idx < 0 {
flag = "-streamAggr.config"
sas = sasGlobal.Load()
} else {
flag = "-remoteWrite.streamAggr.config"
sas = rwctxs[idx].sas.Load()
}
if !sasNew.Equal(sas) {
var sasOld *streamaggr.Aggregators
if idx < 0 {
sasOld = sasGlobal.Swap(sasNew)
} else {
sasOld = rwctxs[idx].sas.Swap(sasNew)
}
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at %q", path)
} else {
sasNew.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at %q", path)
if sas == nil {
return
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
logger.Infof("reloading stream aggregation configs pointed by %s=%q", flag, path)
if err := sas.Reload(); err != nil {
logger.Errorf("cannot reload %s=%q; continue using the previously loaded config; error: %s", flag, path, err)
return
}
logger.Infof("successfully reloaded stream aggregation config %s=%q", flag, path)
}
func getStreamAggrOpts(idx int) (string, streamaggr.Options) {

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -15,13 +16,14 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics"
)
var (
streamAggrConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking changes in -streamAggr.config "+
"and -remoteWrite.streamAggr.config")
streamAggrKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation with -streamAggr.config. "+
"By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+
"See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
@@ -42,11 +44,6 @@ var (
saCfgReloaderStopCh chan struct{}
saCfgReloaderWG sync.WaitGroup
saCfgReloads = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`)
saCfgReloadErr = metrics.NewCounter(`vminsert_streamagg_config_reloads_errors_total`)
saCfgSuccess = metrics.NewGauge(`vminsert_streamagg_config_last_reload_successful`, nil)
saCfgTimestamp = metrics.NewCounter(`vminsert_streamagg_config_last_reload_success_timestamp_seconds`)
sasGlobal atomic.Pointer[streamaggr.Aggregators]
deduplicator *streamaggr.Deduplicator
)
@@ -68,7 +65,7 @@ func CheckStreamAggrConfig() error {
if err != nil {
return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err)
}
sas.MustStop()
sas.MustStop(nil)
return nil
}
@@ -101,16 +98,21 @@ func InitStreamAggr() {
}
sasGlobal.Store(sas)
saCfgSuccess.Set(1)
saCfgTimestamp.Set(fasttime.UnixTimestamp())
// Start config reloader.
saCfgReloaderWG.Add(1)
go func() {
var streamAggrConfigReloaderCh <-chan time.Time
if *streamAggrConfigCheckInterval > 0 {
ticker := time.NewTicker(*streamAggrConfigCheckInterval)
streamAggrConfigReloaderCh = ticker.C
defer ticker.Stop()
}
defer saCfgReloaderWG.Done()
for {
select {
case <-sighupCh:
case <-streamAggrConfigReloaderCh:
case <-saCfgReloaderStopCh:
return
}
@@ -121,33 +123,12 @@ func InitStreamAggr() {
func reloadStreamAggrConfig() {
logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig)
saCfgReloads.Inc()
opts := streamaggr.Options{
DedupInterval: *streamAggrDedupInterval,
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
Alias: "global",
}
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
if err != nil {
saCfgSuccess.Set(0)
saCfgReloadErr.Inc()
sas := sasGlobal.Load()
if err := sas.Reload(); err != nil {
logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err)
return
}
sas := sasGlobal.Load()
if !sasNew.Equal(sas) {
sasOld := sasGlobal.Swap(sasNew)
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
} else {
logger.Infof("nothing changed in -streamAggr.config=%q", *streamAggrConfig)
sasNew.MustStop()
}
saCfgSuccess.Set(1)
saCfgTimestamp.Set(fasttime.UnixTimestamp())
logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
}
// MustStopStreamAggr stops stream aggregators.
@@ -156,12 +137,10 @@ func MustStopStreamAggr() {
saCfgReloaderWG.Wait()
sas := sasGlobal.Swap(nil)
sas.MustStop()
sas.MustStop(nil)
if deduplicator != nil {
deduplicator.MustStop()
deduplicator = nil
}
deduplicator.MustStop()
deduplicator = nil
}
type streamAggrCtx struct {

2
go.mod
View File

@@ -8,7 +8,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2
github.com/VictoriaMetrics/easyproto v0.1.4
github.com/VictoriaMetrics/fastcache v1.12.2
github.com/VictoriaMetrics/metrics v1.34.0
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0
github.com/VictoriaMetrics/metricsql v0.76.0
github.com/aws/aws-sdk-go-v2 v1.30.1
github.com/aws/aws-sdk-go-v2/config v1.27.23

4
go.sum
View File

@@ -73,6 +73,10 @@ github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjC
github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI=
github.com/VictoriaMetrics/metrics v1.34.0 h1:0i8k/gdOJdSoZB4Z9pikVnVQXfhcIvnG7M7h2WaQW2w=
github.com/VictoriaMetrics/metrics v1.34.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080040-3f62e95de24e h1:IgNJoIYb2IhknxOLEAAG0ktj0f1609jpgmXjpPVrJ7s=
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080040-3f62e95de24e/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0 h1:qP+3SX4eslXLPmsJpGjnMv+9UbmyrSj/Yf5CqPm6bLE=
github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
github.com/VictoriaMetrics/metricsql v0.76.0 h1:hl7vqJqyH2d8zKImzalkFrkFiD5q4ACF8gl3s86DqKA=
github.com/VictoriaMetrics/metricsql v0.76.0/go.mod h1:1g4hdCwlbJZ851PU9VN65xy9Rdlzupo6fx3SNZ8Z64U=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=

View File

@@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// avgAggrState calculates output=avg, e.g. the average value over input samples.
@@ -12,18 +11,23 @@ type avgAggrState struct {
m sync.Map
}
type avgState struct {
sum float64
count float64
}
type avgStateValue struct {
mu sync.Mutex
sum float64
count int64
deleted bool
mu sync.Mutex
state [aggrStateSize]avgState
deleted bool
deleteDeadline int64
}
func newAvgAggrState() *avgAggrState {
return &avgAggrState{}
}
func (as *avgAggrState) pushSamples(samples []pushSample) {
func (as *avgAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@@ -32,25 +36,21 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &avgStateValue{
sum: s.value,
count: 1,
}
v = &avgStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The entry has been successfully stored
continue
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
// Update the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*avgStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.sum += s.value
sv.count++
sv.state[idx].sum += s.value
sv.state[idx].count++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@@ -61,26 +61,28 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
}
}
func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *avgAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*avgStateValue)
sv.mu.Lock()
avg := sv.sum / float64(sv.count)
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "avg", currentTimeMsec, avg)
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = avgState{}
sv.mu.Unlock()
if state.count > 0 {
key := k.(string)
ctx.appendSeries(key, "avg", flushTimestamp, state.sum/state.count)
}
return true
})
}

View File

@@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// countSamplesAggrState calculates output=count_samples, e.g. the count of input samples.
@@ -13,16 +12,17 @@ type countSamplesAggrState struct {
}
type countSamplesStateValue struct {
mu sync.Mutex
n uint64
deleted bool
mu sync.Mutex
state [aggrStateSize]uint64
deleted bool
deleteDeadline int64
}
func newCountSamplesAggrState() *countSamplesAggrState {
return &countSamplesAggrState{}
}
func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
func (as *countSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@@ -31,23 +31,20 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &countSamplesStateValue{
n: 1,
}
v = &countSamplesStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.n++
sv.state[idx]++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@@ -58,26 +55,28 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *countSamplesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
n := sv.n
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "count_samples", currentTimeMsec, float64(n))
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = 0
sv.mu.Unlock()
if state > 0 {
key := k.(string)
ctx.appendSeries(key, "count_samples", flushTimestamp, float64(state))
}
return true
})
}

View File

@@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/cespare/xxhash/v2"
)
@@ -14,16 +13,17 @@ type countSeriesAggrState struct {
}
type countSeriesStateValue struct {
mu sync.Mutex
m map[uint64]struct{}
deleted bool
mu sync.Mutex
state [aggrStateSize]map[uint64]struct{}
deleted bool
deleteDeadline int64
}
func newCountSeriesAggrState() *countSeriesAggrState {
return &countSeriesAggrState{}
}
func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
func (as *countSeriesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key)
@@ -36,27 +36,26 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &countSeriesStateValue{
m: map[uint64]struct{}{
h: {},
},
csv := &countSeriesStateValue{}
for ic := range csv.state {
csv.state[ic] = make(map[uint64]struct{})
}
v = csv
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The entry has been added to the map.
continue
if loaded {
// Update the entry created by a concurrent goroutine.
v = vNew
}
// Update the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if _, ok := sv.m[h]; !ok {
sv.m[h] = struct{}{}
if _, ok := sv.state[idx][h]; !ok {
sv.state[idx][h] = struct{}{}
}
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@@ -67,26 +66,28 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *countSeriesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
n := len(sv.m)
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "count_series", currentTimeMsec, float64(n))
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := len(sv.state[idx])
sv.state[idx] = make(map[uint64]struct{})
sv.mu.Unlock()
if state > 0 {
key := k.(string)
ctx.appendSeries(key, "count_series", flushTimestamp, float64(state))
}
return true
})
}

View File

@@ -14,7 +14,8 @@ import (
const dedupAggrShardsCount = 128
type dedupAggr struct {
shards []dedupAggrShard
shards []dedupAggrShard
currentIdx atomic.Int32
}
type dedupAggrShard struct {
@@ -25,16 +26,18 @@ type dedupAggrShard struct {
_ [128 - unsafe.Sizeof(dedupAggrShardNopad{})%128]byte
}
type dedupAggrShardNopad struct {
mu sync.Mutex
m map[string]*dedupAggrSample
type dedupAggrState struct {
m map[string]*dedupAggrSample
samplesBuf []dedupAggrSample
sizeBytes atomic.Uint64
itemsCount atomic.Uint64
}
type dedupAggrShardNopad struct {
mu sync.RWMutex
state [aggrStateSize]*dedupAggrState
}
type dedupAggrSample struct {
value float64
timestamp int64
@@ -49,21 +52,27 @@ func newDedupAggr() *dedupAggr {
func (da *dedupAggr) sizeBytes() uint64 {
n := uint64(unsafe.Sizeof(*da))
currentIdx := da.currentIdx.Load()
for i := range da.shards {
n += da.shards[i].sizeBytes.Load()
if da.shards[i].state[currentIdx] != nil {
n += da.shards[i].state[currentIdx].sizeBytes.Load()
}
}
return n
}
func (da *dedupAggr) itemsCount() uint64 {
n := uint64(0)
currentIdx := da.currentIdx.Load()
for i := range da.shards {
n += da.shards[i].itemsCount.Load()
if da.shards[i].state[currentIdx] != nil {
n += da.shards[i].state[currentIdx].itemsCount.Load()
}
}
return n
}
func (da *dedupAggr) pushSamples(samples []pushSample) {
func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, dedupIdx int) {
pss := getPerShardSamples()
shards := pss.shards
for _, sample := range samples {
@@ -75,7 +84,7 @@ func (da *dedupAggr) pushSamples(samples []pushSample) {
if len(shardSamples) == 0 {
continue
}
da.shards[i].pushSamples(shardSamples)
da.shards[i].pushSamples(shardSamples, dedupIdx)
}
putPerShardSamples(pss)
}
@@ -104,7 +113,7 @@ func (ctx *dedupFlushCtx) reset() {
ctx.samples = ctx.samples[:0]
}
func (da *dedupAggr) flush(f func(samples []pushSample)) {
func (da *dedupAggr) flush(f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) {
var wg sync.WaitGroup
for i := range da.shards {
flushConcurrencyCh <- struct{}{}
@@ -116,10 +125,11 @@ func (da *dedupAggr) flush(f func(samples []pushSample)) {
}()
ctx := getDedupFlushCtx()
shard.flush(ctx, f)
shard.flush(ctx, f, deleteDeadline, dedupIdx, flushIdx)
putDedupFlushCtx(ctx)
}(&da.shards[i])
}
da.currentIdx.Store((da.currentIdx.Load() + 1) % aggrStateSize)
wg.Wait()
}
@@ -154,18 +164,20 @@ func putPerShardSamples(pss *perShardSamples) {
var perShardSamplesPool sync.Pool
func (das *dedupAggrShard) pushSamples(samples []pushSample) {
func (das *dedupAggrShard) pushSamples(samples []pushSample, dedupIdx int) {
das.mu.Lock()
defer das.mu.Unlock()
m := das.m
if m == nil {
m = make(map[string]*dedupAggrSample, len(samples))
das.m = m
state := das.state[dedupIdx]
if state == nil {
state = &dedupAggrState{
m: make(map[string]*dedupAggrSample, len(samples)),
}
das.state[dedupIdx] = state
}
samplesBuf := das.samplesBuf
samplesBuf := state.samplesBuf
for _, sample := range samples {
s, ok := m[sample.key]
s, ok := state.m[sample.key]
if !ok {
samplesBuf = slicesutil.SetLength(samplesBuf, len(samplesBuf)+1)
s = &samplesBuf[len(samplesBuf)-1]
@@ -173,10 +185,10 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
s.timestamp = sample.timestamp
key := bytesutil.InternString(sample.key)
m[key] = s
state.m[key] = s
das.itemsCount.Add(1)
das.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
das.state[dedupIdx].itemsCount.Add(1)
das.state[dedupIdx].sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
continue
}
// Update the existing value according to logic described at https://docs.victoriametrics.com/#deduplication
@@ -185,18 +197,20 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
s.timestamp = sample.timestamp
}
}
das.samplesBuf = samplesBuf
das.state[dedupIdx].samplesBuf = samplesBuf
}
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) {
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc, deleteDeadline int64, dedupIdx, flushIdx int) {
das.mu.Lock()
m := das.m
if len(m) > 0 {
das.m = make(map[string]*dedupAggrSample, len(m))
das.sizeBytes.Store(0)
das.itemsCount.Store(0)
das.samplesBuf = make([]dedupAggrSample, 0, len(das.samplesBuf))
var m map[string]*dedupAggrSample
state := das.state[dedupIdx]
if state != nil && len(state.m) > 0 {
m = state.m
das.state[dedupIdx].m = make(map[string]*dedupAggrSample, len(state.m))
das.state[dedupIdx].samplesBuf = make([]dedupAggrSample, 0, len(das.state[dedupIdx].samplesBuf))
das.state[dedupIdx].sizeBytes.Store(0)
das.state[dedupIdx].itemsCount.Store(0)
}
das.mu.Unlock()
@@ -215,11 +229,11 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample
// Limit the number of samples per each flush in order to limit memory usage.
if len(dstSamples) >= 10_000 {
f(dstSamples)
f(dstSamples, deleteDeadline, flushIdx)
clear(dstSamples)
dstSamples = dstSamples[:0]
}
}
f(dstSamples)
f(dstSamples, deleteDeadline, flushIdx)
ctx.samples = dstSamples
}

View File

@@ -5,6 +5,7 @@ import (
"reflect"
"sync"
"testing"
"time"
)
func TestDedupAggrSerial(t *testing.T) {
@@ -20,7 +21,7 @@ func TestDedupAggrSerial(t *testing.T) {
sample.value = float64(i + j)
expectedSamplesMap[sample.key] = *sample
}
da.pushSamples(samples)
da.pushSamples(samples, 0, 0)
}
if n := da.sizeBytes(); n > 5_000_000 {
@@ -32,14 +33,16 @@ func TestDedupAggrSerial(t *testing.T) {
flushedSamplesMap := make(map[string]pushSample)
var mu sync.Mutex
flushSamples := func(samples []pushSample) {
flushSamples := func(samples []pushSample, _ int64, _ int) {
mu.Lock()
for _, sample := range samples {
flushedSamplesMap[sample.key] = sample
}
mu.Unlock()
}
da.flush(flushSamples)
flushTimestamp := time.Now().UnixMilli()
da.flush(flushSamples, flushTimestamp, 0, 0)
if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) {
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap)
@@ -70,7 +73,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
sample.key = fmt.Sprintf("key_%d", j)
sample.value = float64(i + j)
}
da.pushSamples(samples)
da.pushSamples(samples, 0, 0)
}
}()
}

View File

@@ -27,7 +27,7 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for i := 0; i < loops; i++ {
da.pushSamples(benchSamples)
da.pushSamples(benchSamples, 0, 0)
}
}
})

View File

@@ -17,7 +17,8 @@ import (
type Deduplicator struct {
da *dedupAggr
dropLabels []string
dropLabels []string
dedupInterval int64
wg sync.WaitGroup
stopCh chan struct{}
@@ -38,8 +39,9 @@ type Deduplicator struct {
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator {
d := &Deduplicator{
da: newDedupAggr(),
dropLabels: dropLabels,
da: newDedupAggr(),
dropLabels: dropLabels,
dedupInterval: dedupInterval.Milliseconds(),
stopCh: make(chan struct{}),
ms: metrics.NewSet(),
@@ -71,6 +73,9 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels
// MustStop stops d.
func (d *Deduplicator) MustStop() {
if d == nil {
return
}
metrics.UnregisterSet(d.ms)
d.ms = nil
@@ -86,6 +91,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
buf := ctx.buf
dropLabels := d.dropLabels
aggrIntervals := int64(aggrStateSize)
for _, ts := range tss {
if len(dropLabels) > 0 {
labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels)
@@ -101,7 +107,9 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
buf = lc.Compress(buf, labels.Labels)
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, s := range ts.Samples {
pss = append(pss, pushSample{
flushIntervals := s.Timestamp/d.dedupInterval + 1
idx := int(flushIntervals % aggrIntervals)
pss[idx] = append(pss[idx], pushSample{
key: key,
value: s.Value,
timestamp: s.Timestamp,
@@ -109,7 +117,9 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
}
}
d.da.pushSamples(pss)
for idx, ps := range pss {
d.da.pushSamples(ps, 0, idx)
}
ctx.pss = pss
ctx.buf = buf
@@ -132,17 +142,18 @@ func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration
select {
case <-d.stopCh:
return
case <-t.C:
d.flush(pushFunc, dedupInterval)
case t := <-t.C:
flushTime := t.Truncate(dedupInterval).Add(dedupInterval)
flushTimestamp := flushTime.UnixMilli()
flushIntervals := int(flushTimestamp / int64(dedupInterval/time.Millisecond))
flushIdx := flushIntervals % aggrStateSize
d.flush(pushFunc, dedupInterval, flushTime, flushIdx)
}
}
}
func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
startTime := time.Now()
timestamp := startTime.UnixMilli()
d.da.flush(func(pss []pushSample) {
func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration, flushTime time.Time, flushIdx int) {
d.da.flush(func(pss []pushSample, _ int64, _ int) {
ctx := getDeduplicatorFlushCtx()
tss := ctx.tss
@@ -155,7 +166,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
samplesLen := len(samples)
samples = append(samples, prompbmarshal.Sample{
Value: ps.value,
Timestamp: timestamp,
Timestamp: ps.timestamp,
})
tss = append(tss, prompbmarshal.TimeSeries{
@@ -169,9 +180,9 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
ctx.labels = labels
ctx.samples = samples
putDeduplicatorFlushCtx(ctx)
})
}, flushTime.UnixMilli(), flushIdx, flushIdx)
duration := time.Since(startTime)
duration := time.Since(flushTime)
d.dedupFlushDuration.Update(duration.Seconds())
if duration > dedupInterval {
d.dedupFlushTimeouts.Inc()
@@ -182,14 +193,15 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
}
type deduplicatorPushCtx struct {
pss []pushSample
pss [aggrStateSize][]pushSample
labels promutils.Labels
buf []byte
}
func (ctx *deduplicatorPushCtx) reset() {
clear(ctx.pss)
ctx.pss = ctx.pss[:0]
for i, sc := range ctx.pss {
ctx.pss[i] = sc[:0]
}
ctx.labels.Reset()

View File

@@ -21,20 +21,26 @@ func TestDeduplicator(t *testing.T) {
tss := prompbmarshal.MustParsePromMetrics(`
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 123
bar{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 34.54
x 8943 1000
x 8943 1
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -34.34
x 90984 900
x 433 1000
x 90984
x 433 1
asfjkldsf{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 12322
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 894
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3
`, offsetMsecs)
d := NewDeduplicator(pushFunc, time.Hour, []string{"node", "instance"}, "global")
dedupInterval := time.Hour
d := NewDeduplicator(pushFunc, dedupInterval, []string{"node", "instance"}, "global")
for i := 0; i < 10; i++ {
d.Push(tss)
}
d.flush(pushFunc, time.Hour)
flushTime := time.Now()
flushIntervals := flushTime.UnixMilli()/dedupInterval.Milliseconds() + 1
idx := int(flushIntervals % int64(aggrStateSize))
d.flush(pushFunc, time.Hour, time.Now(), idx)
d.MustStop()
result := timeSeriessToString(tssResult)

View File

@@ -1,39 +1,30 @@
package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/metrics"
)
// histogramBucketAggrState calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples.
type histogramBucketAggrState struct {
m sync.Map
stalenessSecs uint64
}
type histogramBucketStateValue struct {
mu sync.Mutex
h metrics.Histogram
deleteDeadline uint64
state [aggrStateSize]metrics.Histogram
total metrics.Histogram
deleted bool
deleteDeadline int64
}
func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBucketAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &histogramBucketAggrState{
stalenessSecs: stalenessSecs,
}
func newHistogramBucketAggrState() *histogramBucketAggrState {
return &histogramBucketAggrState{}
}
func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
func (as *histogramBucketAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@@ -54,7 +45,7 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.h.Update(s.value)
sv.state[idx].Update(s.value)
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
@@ -66,54 +57,32 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
}
}
func (as *histogramBucketAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) {
func (as *histogramBucketAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
var staleOutputSamples int
m.Range(func(k, v interface{}) bool {
sv := v.(*histogramBucketStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
staleOutputSamples++
}
sv.mu.Unlock()
if deleted {
sv.mu.Unlock()
m.Delete(k)
return true
}
sv.total.Merge(&sv.state[idx])
total := &sv.total
sv.state[idx] = metrics.Histogram{}
sv.mu.Unlock()
key := k.(string)
total.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", flushTimestamp, float64(count), "vmrange", vmrange)
})
return true
})
ctx.a.staleOutputSamples["histogram_bucket"].Add(staleOutputSamples)
}
func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(ctx, currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*histogramBucketStateValue)
sv.mu.Lock()
if !sv.deleted {
key := k.(string)
sv.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", currentTimeMsec, float64(count), "vmrange", vmrange)
})
}
sv.mu.Unlock()
return true
})
}
func roundDurationToSecs(d time.Duration) uint64 {
if d < 0 {
return 0
}
secs := d.Seconds()
return uint64(math.Ceil(secs))
}

View File

@@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// lastAggrState calculates output=last, e.g. the last value over input samples.
@@ -13,17 +12,22 @@ type lastAggrState struct {
}
type lastStateValue struct {
mu sync.Mutex
mu sync.Mutex
state [aggrStateSize]lastState
deleted bool
deleteDeadline int64
}
type lastState struct {
last float64
timestamp int64
deleted bool
}
func newLastAggrState() *lastAggrState {
return &lastAggrState{}
}
func (as *lastAggrState) pushSamples(samples []pushSample) {
func (as *lastAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@@ -32,27 +36,23 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &lastStateValue{
last: s.value,
timestamp: s.timestamp,
}
v = &lastStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
if loaded {
// Update the entry created by a concurrent goroutine.
v = vNew
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*lastStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if s.timestamp >= sv.timestamp {
sv.last = s.value
sv.timestamp = s.timestamp
if s.timestamp >= sv.state[idx].timestamp {
sv.state[idx].last = s.value
sv.state[idx].timestamp = s.timestamp
}
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@@ -63,26 +63,28 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
}
}
func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *lastAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*lastStateValue)
sv.mu.Lock()
last := sv.last
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "last", currentTimeMsec, last)
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = lastState{}
sv.mu.Unlock()
if state.timestamp > 0 {
key := k.(string)
ctx.appendSeries(key, "last", flushTimestamp, state.last)
}
return true
})
}

View File

@@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// maxAggrState calculates output=max, e.g. the maximum value over input samples.
@@ -13,16 +12,22 @@ type maxAggrState struct {
}
type maxStateValue struct {
mu sync.Mutex
max float64
deleted bool
mu sync.Mutex
state [aggrStateSize]maxState
deleted bool
deleteDeadline int64
}
type maxState struct {
max float64
exists bool
}
func newMaxAggrState() *maxAggrState {
return &maxAggrState{}
}
func (as *maxAggrState) pushSamples(samples []pushSample) {
func (as *maxAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@@ -31,25 +36,26 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &maxStateValue{
max: s.value,
}
v = &maxStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*maxStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if s.value > sv.max {
sv.max = s.value
state := &sv.state[idx]
if !state.exists {
state.max = s.value
state.exists = true
} else if s.value > state.max {
state.max = s.value
}
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@@ -60,26 +66,28 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
}
}
func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *maxAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*maxStateValue)
sv.mu.Lock()
max := sv.max
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "max", currentTimeMsec, max)
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = maxState{}
sv.mu.Unlock()
if state.exists {
key := k.(string)
ctx.appendSeries(key, "max", flushTimestamp, state.max)
}
return true
})
}

View File

@@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// minAggrState calculates output=min, e.g. the minimum value over input samples.
@@ -13,16 +12,22 @@ type minAggrState struct {
}
type minStateValue struct {
mu sync.Mutex
min float64
deleted bool
mu sync.Mutex
state [aggrStateSize]minState
deleted bool
deleteDeadline int64
}
type minState struct {
min float64
exists bool
}
func newMinAggrState() *minAggrState {
return &minAggrState{}
}
func (as *minAggrState) pushSamples(samples []pushSample) {
func (as *minAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@@ -31,25 +36,26 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &minStateValue{
min: s.value,
}
v = &minStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*minStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if s.value < sv.min {
sv.min = s.value
state := &sv.state[idx]
if !state.exists {
state.min = s.value
state.exists = true
} else if s.value < state.min {
state.min = s.value
}
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@@ -60,25 +66,28 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
}
}
func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *minAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*minStateValue)
sv.mu.Lock()
min := sv.min
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = minState{}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "min", currentTimeMsec, min)
if state.exists {
key := k.(string)
ctx.appendSeries(key, "min", flushTimestamp, state.min)
}
return true
})
}

View File

@@ -5,21 +5,20 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/valyala/histogram"
)
// quantilesAggrState calculates output=quantiles, e.g. the the given quantiles over the input samples.
type quantilesAggrState struct {
m sync.Map
m sync.Map
phis []float64
}
type quantilesStateValue struct {
mu sync.Mutex
h *histogram.Fast
deleted bool
mu sync.Mutex
state [aggrStateSize]*histogram.Fast
deleted bool
deleteDeadline int64
}
func newQuantilesAggrState(phis []float64) *quantilesAggrState {
@@ -28,7 +27,7 @@ func newQuantilesAggrState(phis []float64) *quantilesAggrState {
}
}
func (as *quantilesAggrState) pushSamples(samples []pushSample) {
func (as *quantilesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@@ -37,15 +36,11 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
h := histogram.GetFast()
v = &quantilesStateValue{
h: h,
}
v = &quantilesStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
histogram.PutFast(h)
v = vNew
}
}
@@ -53,7 +48,11 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.h.Update(s.value)
if sv.state[idx] == nil {
sv.state[idx] = histogram.GetFast()
}
sv.state[idx].Update(s.value)
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@@ -64,33 +63,39 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *quantilesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
phis := as.phis
var quantiles []float64
var b []byte
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*quantilesStateValue)
sv.mu.Lock()
quantiles = sv.h.Quantiles(quantiles[:0], phis)
histogram.PutFast(sv.h)
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
quantiles = quantiles[:0]
if state != nil {
quantiles = state.Quantiles(quantiles[:0], phis)
histogram.PutFast(state)
state.Reset()
}
sv.mu.Unlock()
key := k.(string)
for i, quantile := range quantiles {
b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
phiStr := bytesutil.InternBytes(b)
ctx.appendSeriesWithExtraLabel(key, "quantiles", currentTimeMsec, quantile, "quantile", phiStr)
if len(quantiles) > 0 {
key := k.(string)
for i, quantile := range quantiles {
b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
phiStr := bytesutil.InternBytes(b)
ctx.appendSeriesWithExtraLabel(key, "quantiles", flushTimestamp, quantile, "quantile", phiStr)
}
}
return true
})

View File

@@ -2,53 +2,52 @@ package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// rateAggrState calculates output=rate, e.g. the counter per-second change.
type rateAggrState struct {
m sync.Map
m sync.Map
suffix string
// Time series state is dropped if no new samples are received during stalenessSecs.
stalenessSecs uint64
}
type rateStateValue struct {
mu sync.Mutex
lastValues map[string]rateLastValueState
deleteDeadline uint64
state map[string]rateState
deleted bool
deleteDeadline int64
}
type rateState struct {
lastValues [aggrStateSize]rateLastValueState
// prevTimestamp stores timestamp of the last registered value
// in the previous aggregation interval
prevTimestamp int64
// prevValue stores last registered value
// in the previous aggregation interval
prevValue float64
deleteDeadline int64
}
type rateLastValueState struct {
value float64
timestamp int64
deleteDeadline uint64
firstValue float64
value float64
timestamp int64
// total stores cumulative difference between registered values
// in the aggregation interval
total float64
// prevTimestamp stores timestamp of the last registered value
// in the previous aggregation interval
prevTimestamp int64
}
func newRateAggrState(stalenessInterval time.Duration, suffix string) *rateAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval)
func newRateAggrState(suffix string) *rateAggrState {
return &rateAggrState{
suffix: suffix,
stalenessSecs: stalenessSecs,
suffix: suffix,
}
}
func (as *rateAggrState) pushSamples(samples []pushSample) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
func (as *rateAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key)
@@ -57,9 +56,10 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &rateStateValue{
lastValues: make(map[string]rateLastValueState),
rsv := &rateStateValue{
state: make(map[string]rateState),
}
v = rsv
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
@@ -71,15 +71,17 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if ok {
state, ok := sv.state[inputKey]
lv := state.lastValues[idx]
if ok && lv.timestamp > 0 {
if s.timestamp < lv.timestamp {
// Skip out of order sample
sv.mu.Unlock()
continue
}
if lv.prevTimestamp == 0 {
lv.prevTimestamp = lv.timestamp
if state.prevTimestamp == 0 {
state.prevTimestamp = lv.timestamp
state.prevValue = lv.value
}
if s.value >= lv.value {
lv.total += s.value - lv.value
@@ -87,13 +89,15 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
// counter reset
lv.total += s.value
}
} else if state.prevTimestamp > 0 {
lv.firstValue = s.value
}
lv.value = s.value
lv.timestamp = s.timestamp
lv.deleteDeadline = deleteDeadline
state.lastValues[idx] = lv
state.deleteDeadline = deleteDeadline
inputKey = bytesutil.InternString(inputKey)
sv.lastValues[inputKey] = lv
sv.state[inputKey] = state
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
@@ -105,18 +109,15 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
}
}
func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
func (as *rateAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
var staleOutputSamples, staleInputSamples int
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*rateStateValue)
sv.mu.Lock()
// check for stale entries
deleted := currentTime > sv.deleteDeadline
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
@@ -126,26 +127,33 @@ func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
return true
}
// Delete outdated entries in sv.lastValues
// Delete outdated entries in state
var rate float64
lvs := sv.lastValues
for k1, v1 := range lvs {
if currentTime > v1.deleteDeadline {
delete(lvs, k1)
var totalItems int
for k1, state := range sv.state {
if flushTimestamp > state.deleteDeadline {
delete(sv.state, k1)
staleInputSamples++
continue
}
rateInterval := v1.timestamp - v1.prevTimestamp
if v1.prevTimestamp > 0 && rateInterval > 0 {
v1 := state.lastValues[idx]
rateInterval := v1.timestamp - state.prevTimestamp
if rateInterval > 0 && state.prevTimestamp > 0 {
if v1.firstValue >= state.prevValue {
v1.total += v1.firstValue - state.prevValue
} else {
v1.total += v1.firstValue
}
// calculate rate only if value was seen at least twice with different timestamps
rate += v1.total * 1000 / float64(rateInterval)
v1.prevTimestamp = v1.timestamp
v1.total = 0
lvs[k1] = v1
rate += (v1.total) * 1000 / float64(rateInterval)
state.prevTimestamp = v1.timestamp
state.prevValue = v1.value
totalItems++
}
state.lastValues[idx] = rateLastValueState{}
sv.state[k1] = state
}
// capture m length after deleted items were removed
totalItems := len(lvs)
sv.mu.Unlock()
if as.suffix == "rate_avg" && totalItems > 0 {
@@ -153,7 +161,7 @@ func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
}
key := k.(string)
ctx.appendSeries(key, as.suffix, currentTimeMsec, rate)
ctx.appendSeries(key, as.suffix, flushTimestamp, rate)
return true
})
ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples)

View File

@@ -5,7 +5,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// stddevAggrState calculates output=stddev, e.g. the average value over input samples.
@@ -14,18 +13,23 @@ type stddevAggrState struct {
}
type stddevStateValue struct {
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
mu sync.Mutex
state [aggrStateSize]stddevState
deleted bool
deleteDeadline int64
}
type stddevState struct {
count float64
avg float64
q float64
}
func newStddevAggrState() *stddevAggrState {
return &stddevAggrState{}
}
func (as *stddevAggrState) pushSamples(samples []pushSample) {
func (as *stddevAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@@ -47,10 +51,12 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
deleted := sv.deleted
if !deleted {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
sv.count++
avg := sv.avg + (s.value-sv.avg)/sv.count
sv.q += (s.value - sv.avg) * (s.value - avg)
sv.avg = avg
state := &sv.state[idx]
state.count++
avg := state.avg + (s.value-state.avg)/state.count
state.q += (s.value - state.avg) * (s.value - avg)
state.avg = avg
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@@ -61,26 +67,28 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
}
}
func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *stddevAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*stddevStateValue)
sv.mu.Lock()
stddev := math.Sqrt(sv.q / sv.count)
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "stddev", currentTimeMsec, stddev)
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = stddevState{}
sv.mu.Unlock()
if state.count > 0 {
key := k.(string)
ctx.appendSeries(key, "stddev", flushTimestamp, math.Sqrt(state.q/state.count))
}
return true
})
}

View File

@@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// stdvarAggrState calculates output=stdvar, e.g. the average value over input samples.
@@ -13,18 +12,23 @@ type stdvarAggrState struct {
}
type stdvarStateValue struct {
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
mu sync.Mutex
state [aggrStateSize]stdvarState
deleted bool
deleteDeadline int64
}
type stdvarState struct {
count float64
avg float64
q float64
}
func newStdvarAggrState() *stdvarAggrState {
return &stdvarAggrState{}
}
func (as *stdvarAggrState) pushSamples(samples []pushSample) {
func (as *stdvarAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@@ -46,10 +50,12 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
deleted := sv.deleted
if !deleted {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
sv.count++
avg := sv.avg + (s.value-sv.avg)/sv.count
sv.q += (s.value - sv.avg) * (s.value - avg)
sv.avg = avg
state := &sv.state[idx]
state.count++
avg := state.avg + (s.value-state.avg)/state.count
state.q += (s.value - state.avg) * (s.value - avg)
state.avg = avg
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@@ -60,26 +66,28 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
}
}
func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *stdvarAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*stdvarStateValue)
sv.mu.Lock()
stdvar := sv.q / sv.count
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "stdvar", currentTimeMsec, stdvar)
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = stdvarState{}
sv.mu.Unlock()
if state.count > 0 {
key := k.(string)
ctx.appendSeries(key, "stdvar", flushTimestamp, state.q/state.count)
}
return true
})
}

View File

@@ -16,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@@ -23,9 +24,13 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/histogram"
"gopkg.in/yaml.v2"
)
// count of aggregation intervals for states
const aggrStateSize = 2
var supportedOutputs = []string{
"rate_sum",
"rate_avg",
@@ -69,21 +74,28 @@ var (
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func LoadFromFile(path string, pushFunc PushFunc, opts Options) (*Aggregators, error) {
data, err := fscore.ReadFileOrHTTP(path)
if err != nil {
return nil, fmt.Errorf("cannot load aggregators: %w", err)
a := &Aggregators{
path: path,
pushFunc: pushFunc,
opts: opts,
}
data, err = envtemplate.ReplaceBytes(data)
if err != nil {
return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err)
if err := a.load(); err != nil {
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", a.path, err)
}
return a, nil
}
as, err := LoadFromData(data, pushFunc, opts)
if err != nil {
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err)
// Reload reads config file and updates aggregators if there're any changes
func (a *Aggregators) Reload() error {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_total{path=%q}`, a.path)).Inc()
if err := a.load(); err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_errors_total{path=%q}`, a.path)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(0)
return fmt.Errorf("cannot load stream aggregation config %q: %w", a.path, err)
}
return as, nil
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, a.path)).Set(fasttime.UnixTimestamp())
return nil
}
// Options contains optional settings for the Aggregators.
@@ -243,45 +255,83 @@ type Config struct {
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data.
type Aggregators struct {
as []*aggregator
// configData contains marshaled configs.
// It is used in Equal() for comparing Aggregators.
configData []byte
ms *metrics.Set
as []*aggregator
ms *metrics.Set
path string
pushFunc PushFunc
opts Options
}
// LoadFromData loads aggregators from data.
func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, error) {
func (a *Aggregators) load() error {
data, err := fscore.ReadFileOrHTTP(a.path)
if err != nil {
return fmt.Errorf("cannot load aggregators: %w", err)
}
data, err = envtemplate.ReplaceBytes(data)
if err != nil {
return fmt.Errorf("cannot expand environment variables in %q: %w", a.path, err)
}
if err = a.loadAggregatorsFromData(data); err != nil {
return fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", a.path, err)
}
return nil
}
func (a *Aggregators) getAggregator(aggr *aggregator) *aggregator {
idx := slices.IndexFunc(a.as, func(ac *aggregator) bool {
return ac.configData == aggr.configData
})
if idx >= 0 {
return a.as[idx]
}
return nil
}
func (a *Aggregators) loadAggregatorsFromData(data []byte) error {
var cfgs []*Config
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
return fmt.Errorf("cannot parse stream aggregation config: %w", err)
}
ms := metrics.NewSet()
as := make([]*aggregator, len(cfgs))
var unchanged, ac []*aggregator
var ignoreAggrConfigs []string
for i, cfg := range cfgs {
opts.aggrID = i + 1
a, err := newAggregator(cfg, pushFunc, ms, opts)
a.opts.aggrID = i + 1
aggr, err := newAggregator(cfg, a.pushFunc, ms, a.opts)
if err != nil {
// Stop already initialized aggregators before returning the error.
for _, a := range as[:i] {
a.MustStop()
for _, c := range ac[:i] {
c.MustStop()
}
return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
return fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
}
as[i] = a
}
configData, err := json.Marshal(cfgs)
if err != nil {
logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
if oldAggr := a.getAggregator(aggr); oldAggr != nil {
aggr.MustStop()
if !slices.Contains(ignoreAggrConfigs, oldAggr.configData) {
unchanged = append(unchanged, oldAggr)
ignoreAggrConfigs = append(ignoreAggrConfigs, oldAggr.configData)
}
continue
}
if slices.ContainsFunc(ac, func(x *aggregator) bool {
return x.configData == aggr.configData
}) {
aggr.MustStop()
continue
}
ac = append(ac, aggr)
}
metricLabels := fmt.Sprintf("url=%q", opts.Alias)
metricLabels := fmt.Sprintf("url=%q", a.opts.Alias)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_stopped_total{path=%q}`, a.path)).Add(len(a.as) - len(ignoreAggrConfigs))
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_created_total{path=%q}`, a.path)).Add(len(ac))
a.MustStop(ignoreAggrConfigs)
a.as = slices.Concat(unchanged, ac)
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
n := uint64(0)
for _, aggr := range as {
for _, aggr := range a.as {
if aggr.da != nil {
n += aggr.da.sizeBytes()
}
@@ -290,7 +340,7 @@ func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, e
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
n := uint64(0)
for _, aggr := range as {
for _, aggr := range a.as {
if aggr.da != nil {
n += aggr.da.itemsCount()
}
@@ -299,11 +349,8 @@ func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, e
})
metrics.RegisterSet(ms)
return &Aggregators{
as: as,
configData: configData,
ms: ms,
}, nil
a.ms = ms
return nil
}
// IsEnabled returns true if Aggregators has at least one configured aggregator
@@ -318,7 +365,7 @@ func (a *Aggregators) IsEnabled() bool {
}
// MustStop stops a.
func (a *Aggregators) MustStop() {
func (a *Aggregators) MustStop(ignoreAggrConfigs []string) {
if a == nil {
return
}
@@ -327,7 +374,9 @@ func (a *Aggregators) MustStop() {
a.ms = nil
for _, aggr := range a.as {
aggr.MustStop()
if ignoreAggrConfigs == nil || !slices.Contains(ignoreAggrConfigs, aggr.configData) {
aggr.MustStop()
}
}
a.as = nil
}
@@ -337,7 +386,16 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
if a == nil || b == nil {
return a == nil && b == nil
}
return string(a.configData) == string(b.configData)
return slices.Compare(a.configData(), b.configData()) == 0
}
func (a *Aggregators) configData() []string {
result := make([]string, len(a.as))
for i := range result {
result[i] = a.as[i].configData
}
slices.Sort(result)
return result
}
// Push pushes tss to a.
@@ -370,15 +428,18 @@ type aggregator struct {
dropInputLabels []string
inputRelabeling *promrelabel.ParsedConfigs
outputRelabeling *promrelabel.ParsedConfigs
inputRelabeling *promrelabel.ParsedConfigs
outputRelabeling *promrelabel.ParsedConfigs
stalenessInterval time.Duration
keepMetricNames bool
ignoreOldSamples bool
configData string
by []string
without []string
aggregateOnlyByTime bool
tickInterval int64
// da is set to non-nil if input samples must be de-duplicated
da *dedupAggr
@@ -389,6 +450,10 @@ type aggregator struct {
// minTimestamp is used for ignoring old samples when ignoreOldSamples is set
minTimestamp atomic.Int64
// time to wait after interval end before flush
flushAfter *histogram.Fast
muFlushAfter sync.Mutex
// suffix contains a suffix, which should be added to aggregate metric names
//
// It contains the interval, labels in (by, without), plus output name.
@@ -414,17 +479,15 @@ type aggregator struct {
}
type aggrState interface {
// pushSamples must push samples to the aggrState.
//
// samples[].key must be cloned by aggrState, since it may change after returning from pushSamples.
pushSamples(samples []pushSample)
flushState(ctx *flushCtx, resetState bool)
pushSamples(samples []pushSample, deleteDeadline int64, idx int)
flushState(ctx *flushCtx, flushTimestamp int64, idx int)
}
// PushFunc is called by Aggregators when it needs to push its state to metrics storage
type PushFunc func(tss []prompbmarshal.TimeSeries)
type aggrPushFunc func(samples []pushSample, deleteDeadline int64, idx int)
// newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
//
// opts can contain additional options. If opts is nil, then default options are used.
@@ -475,6 +538,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
dropInputLabels := opts.DropInputLabels
if v := cfg.DropInputLabels; v != nil {
dropInputLabels = *v
} else {
cfg.DropInputLabels = &dropInputLabels
}
// initialize input_relabel_configs and output_relabel_configs
@@ -502,6 +567,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
keepMetricNames := opts.KeepMetricNames
if v := cfg.KeepMetricNames; v != nil {
keepMetricNames = *v
} else {
cfg.KeepMetricNames = &keepMetricNames
}
if keepMetricNames {
if len(cfg.Outputs) != 1 {
@@ -516,12 +583,16 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
ignoreOldSamples := opts.IgnoreOldSamples
if v := cfg.IgnoreOldSamples; v != nil {
ignoreOldSamples = *v
} else {
cfg.IgnoreOldSamples = &ignoreOldSamples
}
// check cfg.IgnoreFirstIntervals
ignoreFirstIntervals := opts.IgnoreFirstIntervals
if v := cfg.IgnoreFirstIntervals; v != nil {
ignoreFirstIntervals = *v
} else {
cfg.IgnoreFirstIntervals = &ignoreFirstIntervals
}
// initialize outputs list
@@ -529,6 +600,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
}
slices.Sort(cfg.Outputs)
cfg.Outputs = slices.Compact(cfg.Outputs)
aggrStates := make([]aggrState, len(cfg.Outputs))
for i, output := range cfg.Outputs {
if strings.HasPrefix(output, "quantiles(") {
@@ -557,17 +630,17 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
}
switch output {
case "total":
aggrStates[i] = newTotalAggrState(stalenessInterval, false, true)
aggrStates[i] = newTotalAggrState(false, true)
case "total_prometheus":
aggrStates[i] = newTotalAggrState(stalenessInterval, false, false)
aggrStates[i] = newTotalAggrState(false, false)
case "increase":
aggrStates[i] = newTotalAggrState(stalenessInterval, true, true)
aggrStates[i] = newTotalAggrState(true, true)
case "increase_prometheus":
aggrStates[i] = newTotalAggrState(stalenessInterval, true, false)
aggrStates[i] = newTotalAggrState(true, false)
case "rate_sum":
aggrStates[i] = newRateAggrState(stalenessInterval, "rate_sum")
aggrStates[i] = newRateAggrState("rate_sum")
case "rate_avg":
aggrStates[i] = newRateAggrState(stalenessInterval, "rate_avg")
aggrStates[i] = newRateAggrState("rate_avg")
case "count_series":
aggrStates[i] = newCountSeriesAggrState()
case "count_samples":
@@ -589,7 +662,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
case "stdvar":
aggrStates[i] = newStdvarAggrState()
case "histogram_bucket":
aggrStates[i] = newHistogramBucketAggrState(stalenessInterval)
aggrStates[i] = newHistogramBucketAggrState()
default:
return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+
"see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
@@ -626,18 +699,21 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
inputRelabeling: inputRelabeling,
outputRelabeling: outputRelabeling,
keepMetricNames: keepMetricNames,
ignoreOldSamples: ignoreOldSamples,
keepMetricNames: keepMetricNames,
ignoreOldSamples: ignoreOldSamples,
stalenessInterval: stalenessInterval,
by: by,
without: without,
aggregateOnlyByTime: aggregateOnlyByTime,
tickInterval: interval.Milliseconds(),
aggrStates: aggrStates,
suffix: suffix,
stopCh: make(chan struct{}),
stopCh: make(chan struct{}),
flushAfter: histogram.NewFast(),
flushDuration: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_flush_duration_seconds{outputs=%q, %s}`, outputs, metricLabels)),
dedupFlushDuration: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{outputs=%q, %s}`, outputs, metricLabels)),
@@ -663,21 +739,38 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
}
if dedupInterval > 0 {
a.da = newDedupAggr()
a.tickInterval = dedupInterval.Milliseconds()
}
alignFlushToInterval := !opts.NoAlignFlushToInterval
noAlignFlushToInterval := opts.NoAlignFlushToInterval
if v := cfg.NoAlignFlushToInterval; v != nil {
alignFlushToInterval = !*v
noAlignFlushToInterval = *v
} else {
cfg.NoAlignFlushToInterval = &noAlignFlushToInterval
}
skipIncompleteFlush := !opts.FlushOnShutdown
flushOnShutdown := opts.FlushOnShutdown
if v := cfg.FlushOnShutdown; v != nil {
skipIncompleteFlush = !*v
flushOnShutdown = *v
} else {
cfg.FlushOnShutdown = &flushOnShutdown
}
configData, err := json.Marshal(&cfg)
if err != nil {
return nil, fmt.Errorf("Failed to marshal config: %w", err)
}
a.configData = string(configData)
minTime := time.Now()
if !flushOnShutdown && !noAlignFlushToInterval {
minTime = minTime.Truncate(interval).Add(interval)
}
a.minTimestamp.Store(minTime.UnixMilli())
a.wg.Add(1)
go func() {
a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval, ignoreFirstIntervals)
a.runFlusher(pushFunc, !noAlignFlushToInterval, !flushOnShutdown, interval, dedupInterval, ignoreFirstIntervals)
a.wg.Done()
}()
@@ -709,75 +802,89 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
}
}
if dedupInterval <= 0 {
alignedSleep(interval)
t := time.NewTicker(interval)
defer t.Stop()
flushDeadline := time.Now().Truncate(interval).Add(interval)
tickInterval := time.Duration(a.tickInterval) * time.Millisecond
alignedSleep(tickInterval)
if alignFlushToInterval && skipIncompleteFlush {
a.flush(nil, interval, true)
ignoreFirstIntervals--
var dedupIdx, flushIdx int
t := time.NewTicker(tickInterval)
defer t.Stop()
isSkippedFirstFlush := false
for tickerWait(t) {
ct := time.Now()
dedupTime := ct.Truncate(tickInterval)
if a.ignoreOldSamples {
dedupIdx, flushIdx = getAggrIdxs(dedupInterval, interval, dedupTime, flushDeadline)
}
pf := pushFunc
// Calculate delay
a.muFlushAfter.Lock()
flushAfterMsec := a.flushAfter.Quantile(0.95)
a.flushAfter.Reset()
a.muFlushAfter.Unlock()
flushAfter := time.Duration(flushAfterMsec) * time.Millisecond
if flushAfter > tickInterval {
logger.Warnf("metrics ingestion lag (%v) is more than tick interval (%v). "+
"gaps are expected in aggregations", flushAfter, tickInterval)
pf = nil
} else {
time.Sleep(flushAfter)
}
for tickerWait(t) {
if ignoreFirstIntervals > 0 {
a.flush(nil, interval, true)
a.dedupFlush(dedupInterval, dedupTime.UnixMilli(), dedupIdx, flushIdx)
if ct.After(flushDeadline) {
// It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
a.flush(nil, interval, flushDeadline.UnixMilli(), flushIdx)
isSkippedFirstFlush = true
} else if ignoreFirstIntervals > 0 {
a.flush(nil, interval, flushDeadline.UnixMilli(), flushIdx)
ignoreFirstIntervals--
} else {
a.flush(pushFunc, interval, true)
a.flush(pf, interval, flushDeadline.UnixMilli(), flushIdx)
}
if alignFlushToInterval {
select {
case <-t.C:
default:
}
for ct.After(flushDeadline) {
flushDeadline = flushDeadline.Add(interval)
}
}
} else {
alignedSleep(dedupInterval)
t := time.NewTicker(dedupInterval)
defer t.Stop()
flushDeadline := time.Now().Add(interval)
isSkippedFirstFlush := false
for tickerWait(t) {
a.dedupFlush(dedupInterval)
ct := time.Now()
if ct.After(flushDeadline) {
// It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
a.flush(nil, interval, true)
ignoreFirstIntervals--
isSkippedFirstFlush = true
} else if ignoreFirstIntervals > 0 {
a.flush(nil, interval, true)
ignoreFirstIntervals--
} else {
a.flush(pushFunc, interval, true)
}
for ct.After(flushDeadline) {
flushDeadline = flushDeadline.Add(interval)
}
}
if alignFlushToInterval {
select {
case <-t.C:
default:
}
if alignFlushToInterval {
select {
case <-t.C:
default:
}
}
}
if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
a.dedupFlush(dedupInterval)
a.flush(pushFunc, interval, true)
dedupTime := time.Now().Truncate(tickInterval).Add(tickInterval)
if a.ignoreOldSamples {
dedupIdx, flushIdx = getAggrIdxs(dedupInterval, interval, dedupTime, flushDeadline)
}
a.dedupFlush(dedupInterval, flushDeadline.UnixMilli(), dedupIdx, flushIdx)
a.flush(pushFunc, interval, flushDeadline.UnixMilli(), flushIdx)
}
}
func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
func getAggrIdxs(dedupInterval, interval time.Duration, dedupTime, flushTime time.Time) (int, int) {
flushIdx := getStateIdx(interval.Milliseconds(), flushTime.Add(-interval).UnixMilli())
dedupIdx := flushIdx
if dedupInterval > 0 {
dedupIdx = getStateIdx(dedupInterval.Milliseconds(), dedupTime.Add(-dedupInterval).UnixMilli())
}
return dedupIdx, flushIdx
}
func getStateIdx(interval int64, ts int64) int {
return int(ts/interval) % aggrStateSize
}
func (a *aggregator) dedupFlush(dedupInterval time.Duration, deleteDeadline int64, dedupIdx, flushIdx int) {
if dedupInterval <= 0 {
// The de-duplication is disabled.
return
@@ -785,7 +892,7 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
startTime := time.Now()
a.da.flush(a.pushSamples)
a.da.flush(a.pushSamples, deleteDeadline, dedupIdx, flushIdx)
d := time.Since(startTime)
a.dedupFlushDuration.Update(d.Seconds())
@@ -797,14 +904,9 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
}
}
func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState bool) {
func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, flushTimestamp int64, idx int) {
startTime := time.Now()
// Update minTimestamp before flushing samples to the storage,
// since the flush durtion can be quite long.
// This should prevent from dropping samples with old timestamps when the flush takes long time.
a.minTimestamp.Store(startTime.UnixMilli() - 5_000)
var wg sync.WaitGroup
for _, as := range a.aggrStates {
flushConcurrencyCh <- struct{}{}
@@ -816,7 +918,7 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
}()
ctx := getFlushCtx(a, pushFunc)
as.flushState(ctx, resetState)
as.flushState(ctx, flushTimestamp, idx)
ctx.flushSeries()
ctx.resetSeries()
putFlushCtx(ctx)
@@ -826,6 +928,7 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
d := time.Since(startTime)
a.flushDuration.Update(d.Seconds())
a.minTimestamp.Store(flushTimestamp)
if d > interval {
a.flushTimeouts.Inc()
logger.Warnf("stream aggregation couldn't be finished in the configured interval=%s; it took %.03fs; "+
@@ -855,11 +958,16 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
labels := &ctx.labels
inputLabels := &ctx.inputLabels
outputLabels := &ctx.outputLabels
currentTime := time.Now()
currentTimestamp := currentTime.UnixMilli()
deleteDeadline := currentTime.Add(a.stalenessInterval)
deleteDeadlineMilli := deleteDeadline.UnixMilli()
dropLabels := a.dropInputLabels
ignoreOldSamples := a.ignoreOldSamples
minTimestamp := a.minTimestamp.Load()
var maxLag int64
var flushIdx int
for idx, ts := range tss {
if !a.match.Match(ts.Labels) {
continue
@@ -892,37 +1000,47 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
// do not intern key because number of unique keys could be too high
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, sample := range ts.Samples {
a.muFlushAfter.Lock()
a.flushAfter.Update(float64(currentTimestamp - sample.Timestamp))
a.muFlushAfter.Unlock()
if math.IsNaN(sample.Value) {
a.ignoredNanSamples.Inc()
// Skip NaN values
a.ignoredNanSamples.Inc()
continue
}
if ignoreOldSamples && sample.Timestamp < minTimestamp {
a.ignoredOldSamples.Inc()
// Skip old samples outside the current aggregation interval
a.ignoredOldSamples.Inc()
continue
}
if maxLag < now-sample.Timestamp {
maxLag = now - sample.Timestamp
}
samples = append(samples, pushSample{
if ignoreOldSamples {
flushIdx = getStateIdx(a.tickInterval, sample.Timestamp)
}
samples[flushIdx] = append(samples[flushIdx], pushSample{
key: key,
value: sample.Value,
timestamp: sample.Timestamp,
})
}
}
if len(samples) > 0 {
a.matchedSamples.Add(len(samples))
a.samplesLag.Update(float64(maxLag) / 1_000)
}
ctx.samples = samples
ctx.buf = buf
pushSamples := a.pushSamples
if a.da != nil {
a.da.pushSamples(samples)
} else {
a.pushSamples(samples)
pushSamples = a.da.pushSamples
}
for idx, s := range samples {
if len(s) > 0 {
a.samplesLag.Update(float64(maxLag) / 1_000)
a.matchedSamples.Add(len(s))
pushSamples(s, deleteDeadlineMilli, idx)
}
}
}
@@ -963,14 +1081,14 @@ func getInputOutputKey(key string) (string, string) {
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
}
func (a *aggregator) pushSamples(samples []pushSample) {
func (a *aggregator) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for _, as := range a.aggrStates {
as.pushSamples(samples)
as.pushSamples(samples, deleteDeadline, idx)
}
}
type pushCtx struct {
samples []pushSample
samples [aggrStateSize][]pushSample
labels promutils.Labels
inputLabels promutils.Labels
outputLabels promutils.Labels
@@ -978,8 +1096,9 @@ type pushCtx struct {
}
func (ctx *pushCtx) reset() {
clear(ctx.samples)
ctx.samples = ctx.samples[:0]
for i := range ctx.samples {
ctx.samples[i] = ctx.samples[i][:0]
}
ctx.labels.Reset()
ctx.inputLabels.Reset()

View File

@@ -19,12 +19,11 @@ func TestAggregatorsFailure(t *testing.T) {
pushFunc := func(_ []prompbmarshal.TimeSeries) {
panic(fmt.Errorf("pushFunc shouldn't be called"))
}
a, err := LoadFromData([]byte(config), pushFunc, Options{})
if err == nil {
t.Fatalf("expecting non-nil error")
a := &Aggregators{
pushFunc: pushFunc,
}
if a != nil {
t.Fatalf("expecting nil a")
if err := a.loadAggregatorsFromData([]byte(config)); err == nil {
t.Fatalf("expecting non-nil error")
}
}
@@ -157,12 +156,17 @@ func TestAggregatorsEqual(t *testing.T) {
t.Helper()
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
aa, err := LoadFromData([]byte(a), pushFunc, Options{})
if err != nil {
aa := &Aggregators{
pushFunc: pushFunc,
}
if err := aa.loadAggregatorsFromData([]byte(a)); err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
ab, err := LoadFromData([]byte(b), pushFunc, Options{})
if err != nil {
ab := &Aggregators{
pushFunc: pushFunc,
}
if err := ab.loadAggregatorsFromData([]byte(b)); err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
result := aa.Equal(ab)
@@ -220,12 +224,14 @@ func TestAggregatorsSuccess(t *testing.T) {
tssOutput = appendClonedTimeseries(tssOutput, tss)
tssOutputLock.Unlock()
}
opts := Options{
FlushOnShutdown: true,
NoAlignFlushToInterval: true,
a := &Aggregators{
opts: Options{
FlushOnShutdown: true,
NoAlignFlushToInterval: true,
},
pushFunc: pushFunc,
}
a, err := LoadFromData([]byte(config), pushFunc, opts)
if err != nil {
if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
@@ -233,7 +239,7 @@ func TestAggregatorsSuccess(t *testing.T) {
offsetMsecs := time.Now().UnixMilli()
tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs)
matchIdxs := a.Push(tssInput, nil)
a.MustStop()
a.MustStop(nil)
// Verify matchIdxs equals to matchIdxsExpected
matchIdxsStr := ""
@@ -262,7 +268,7 @@ func TestAggregatorsSuccess(t *testing.T) {
outputs: [count_samples, sum_samples, count_series, last]
`, `
foo{abc="123"} 4
bar 5 100
bar 5 11
bar 34 10
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
@@ -507,8 +513,8 @@ foo:1m_by_abc_sum_samples{abc="456"} 8
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_total{baz="qwe"} 0
foo:1m_total 0
`, `bar:1m_total{baz="qwe"} 4.34
foo:1m_total 123
`, "11")
// total_prometheus output for non-repeated series
@@ -529,16 +535,16 @@ foo:1m_total_prometheus 0
`, `
foo 123
bar{baz="qwe"} 1.31
bar{baz="qwe"} 4.34 1000
bar{baz="qwe"} 4.34 1
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total{baz="qwe"} 3.03
bar:1m_total{baz="qwer"} 1
foo:1m_total 0
foo:1m_total{baz="qwe"} 15
`, `bar:1m_total{baz="qwe"} 4.34
bar:1m_total{baz="qwer"} 344
foo:1m_total 123
foo:1m_total{baz="qwe"} 10
`, "11111111")
// total_prometheus output for repeated series
@@ -574,8 +580,8 @@ foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total 6.02
foo:1m_total 15
`, `bar:1m_total 350.34
foo:1m_total 133
`, "11111111")
// total_prometheus output for repeated series with group by __name__
@@ -603,8 +609,8 @@ foo:1m_total_prometheus 15
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_increase{baz="qwe"} 0
foo:1m_increase 0
`, `bar:1m_increase{baz="qwe"} 4.34
foo:1m_increase 123
`, "11")
// increase_prometheus output for non-repeated series
@@ -631,10 +637,10 @@ foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_increase{baz="qwe"} 5.02
bar:1m_increase{baz="qwer"} 1
foo:1m_increase 0
foo:1m_increase{baz="qwe"} 15
`, `bar:1m_increase{baz="qwe"} 6.34
bar:1m_increase{baz="qwer"} 344
foo:1m_increase 123
foo:1m_increase{baz="qwe"} 10
`, "11111111")
// increase_prometheus output for repeated series
@@ -917,12 +923,14 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
}
tssOutputLock.Unlock()
}
opts := Options{
DedupInterval: 30 * time.Second,
FlushOnShutdown: true,
a := &Aggregators{
opts: Options{
DedupInterval: 30 * time.Second,
FlushOnShutdown: true,
},
pushFunc: pushFunc,
}
a, err := LoadFromData([]byte(config), pushFunc, opts)
if err != nil {
if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
@@ -930,7 +938,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
offsetMsecs := time.Now().UnixMilli()
tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs)
matchIdxs := a.Push(tssInput, nil)
a.MustStop()
a.MustStop(nil)
// Verify matchIdxs equals to matchIdxsExpected
matchIdxsStr := ""

View File

@@ -47,7 +47,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
}
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
a := newBenchAggregators(outputs, pushFunc)
defer a.MustStop()
defer a.MustStop(nil)
_ = a.Push(benchSeries, nil)
b.ResetTimer()
@@ -55,7 +55,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
b.SetBytes(int64(len(benchSeries) * len(outputs)))
for i := 0; i < b.N; i++ {
for _, aggr := range a.as {
aggr.flush(pushFunc, time.Hour, false)
aggr.flush(pushFunc, time.Hour, time.Now().UnixMilli(), 0)
}
}
}
@@ -63,7 +63,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
func benchmarkAggregatorsPush(b *testing.B, output string) {
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
a := newBenchAggregators([]string{output}, pushFunc)
defer a.MustStop()
defer a.MustStop(nil)
const loops = 100
@@ -92,8 +92,10 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
outputs: [%s]
`, strings.Join(outputsQuoted, ","))
a, err := LoadFromData([]byte(config), pushFunc, Options{})
if err != nil {
a := &Aggregators{
pushFunc: pushFunc,
}
if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
}
return a

View File

@@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// sumSamplesAggrState calculates output=sum_samples, e.g. the sum over input samples.
@@ -13,16 +12,22 @@ type sumSamplesAggrState struct {
}
type sumSamplesStateValue struct {
mu sync.Mutex
sum float64
deleted bool
mu sync.Mutex
state [aggrStateSize]sumState
deleted bool
deleteDeadline int64
}
type sumState struct {
sum float64
exists bool
}
func newSumSamplesAggrState() *sumSamplesAggrState {
return &sumSamplesAggrState{}
}
func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
func (as *sumSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@@ -31,23 +36,21 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &sumSamplesStateValue{
sum: s.value,
}
v = &sumSamplesStateValue{}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
if loaded {
// Update the entry created by a concurrent goroutine.
v = vNew
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.sum += s.value
sv.state[idx].sum += s.value
sv.state[idx].exists = true
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@@ -58,26 +61,28 @@ func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *sumSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *sumSamplesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
sum := sv.sum
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "sum_samples", currentTimeMsec, sum)
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := sv.state[idx]
sv.state[idx] = sumState{}
sv.mu.Unlock()
if state.exists {
key := k.(string)
ctx.appendSeries(key, "sum_samples", flushTimestamp, state.sum)
}
return true
})
}

View File

@@ -3,10 +3,8 @@ package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// totalAggrState calculates output=total, e.g. the summary counter over input counters.
@@ -20,36 +18,28 @@ type totalAggrState struct {
// Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool
// Time series state is dropped if no new samples are received during stalenessSecs.
//
// Aslo, the first sample per each new series is ignored during stalenessSecs even if keepFirstSample is set.
// see ignoreFirstSampleDeadline for more details.
stalenessSecs uint64
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
// This allows avoiding an initial spike of the output values at startup when new time series
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
ignoreFirstSampleDeadline uint64
}
type totalStateValue struct {
mu sync.Mutex
lastValues map[string]totalLastValueState
total float64
deleteDeadline uint64
shared totalState
state [aggrStateSize]float64
deleteDeadline int64
deleted bool
}
type totalState struct {
total float64
lastValues map[string]totalLastValueState
}
type totalLastValueState struct {
value float64
timestamp int64
deleteDeadline uint64
deleteDeadline int64
}
func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval)
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + stalenessSecs
func newTotalAggrState(resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
suffix := "total"
if resetTotalOnFlush {
suffix = "increase"
@@ -58,18 +48,14 @@ func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepF
suffix += "_prometheus"
}
return &totalAggrState{
suffix: suffix,
resetTotalOnFlush: resetTotalOnFlush,
keepFirstSample: keepFirstSample,
stalenessSecs: stalenessSecs,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
suffix: suffix,
resetTotalOnFlush: resetTotalOnFlush,
keepFirstSample: keepFirstSample,
}
}
func (as *totalAggrState) pushSamples(samples []pushSample) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
keepFirstSample := as.keepFirstSample && currentTime > as.ignoreFirstSampleDeadline
func (as *totalAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
var deleted bool
for i := range samples {
s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key)
@@ -79,7 +65,9 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
if !ok {
// The entry is missing in the map. Try creating it.
v = &totalStateValue{
lastValues: make(map[string]totalLastValueState),
shared: totalState{
lastValues: make(map[string]totalLastValueState),
},
}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
@@ -90,10 +78,10 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
}
sv := v.(*totalStateValue)
sv.mu.Lock()
deleted := sv.deleted
deleted = sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if ok || keepFirstSample {
lv, ok := sv.shared.lastValues[inputKey]
if ok || as.keepFirstSample {
if s.timestamp < lv.timestamp {
// Skip out of order sample
sv.mu.Unlock()
@@ -101,10 +89,10 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
}
if s.value >= lv.value {
sv.total += s.value - lv.value
sv.state[idx] += s.value - lv.value
} else {
// counter reset
sv.total += s.value
sv.state[idx] += s.value
}
}
lv.value = s.value
@@ -112,7 +100,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
lv.deleteDeadline = deleteDeadline
inputKey = bytesutil.InternString(inputKey)
sv.lastValues[inputKey] = lv
sv.shared.lastValues[inputKey] = lv
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
@@ -124,64 +112,44 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
}
}
func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) {
func (as *totalAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
var total float64
m := &as.m
var staleInputSamples, staleOutputSamples int
m.Range(func(k, v interface{}) bool {
sv := v.(*totalStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
staleOutputSamples++
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
staleInputSamples++
}
sv.mu.Unlock()
m.Delete(k)
return true
}
total = sv.shared.total + sv.state[idx]
for k1, v1 := range sv.shared.lastValues {
if flushTimestamp > v1.deleteDeadline {
delete(sv.shared.lastValues, k1)
}
}
sv.state[idx] = 0
if !as.resetTotalOnFlush {
if math.Abs(total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.shared.total = 0
} else {
sv.shared.total = total
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
key := k.(string)
ctx.appendSeries(key, as.suffix, flushTimestamp, total)
return true
})
ctx.a.staleInputSamples[as.suffix].Add(staleInputSamples)
ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples)
}
func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(ctx, currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*totalStateValue)
sv.mu.Lock()
total := sv.total
if resetState {
if as.resetTotalOnFlush {
sv.total = 0
} else if math.Abs(sv.total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.total = 0
}
}
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, as.suffix, currentTimeMsec, total)
}
return true
})
}

View File

@@ -4,7 +4,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// uniqueSamplesAggrState calculates output=unique_samples, e.g. the number of unique sample values.
@@ -13,16 +12,17 @@ type uniqueSamplesAggrState struct {
}
type uniqueSamplesStateValue struct {
mu sync.Mutex
m map[float64]struct{}
deleted bool
mu sync.Mutex
state [aggrStateSize]map[float64]struct{}
deleted bool
deleteDeadline int64
}
func newUniqueSamplesAggrState() *uniqueSamplesAggrState {
return &uniqueSamplesAggrState{}
}
func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample, deleteDeadline int64, idx int) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
@@ -31,27 +31,26 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &uniqueSamplesStateValue{
m: map[float64]struct{}{
s.value: {},
},
usv := &uniqueSamplesStateValue{}
for iu := range usv.state {
usv.state[iu] = make(map[float64]struct{})
}
v = usv
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
if loaded {
// Update the entry created by a concurrent goroutine.
v = vNew
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*uniqueSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if _, ok := sv.m[s.value]; !ok {
sv.m[s.value] = struct{}{}
if _, ok := sv.state[idx][s.value]; !ok {
sv.state[idx][s.value] = struct{}{}
}
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@@ -62,26 +61,26 @@ func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
}
}
func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *uniqueSamplesAggrState) flushState(ctx *flushCtx, flushTimestamp int64, idx int) {
m := &as.m
m.Range(func(k, v interface{}) bool {
if resetState {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
}
sv := v.(*uniqueSamplesStateValue)
sv.mu.Lock()
n := len(sv.m)
if resetState {
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
}
sv.mu.Unlock()
// check for stale entries
deleted := flushTimestamp > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
sv.mu.Unlock()
m.Delete(k)
return true
}
state := len(sv.state[idx])
sv.state[idx] = make(map[float64]struct{})
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "unique_samples", currentTimeMsec, float64(n))
ctx.appendSeries(key, "unique_samples", flushTimestamp, float64(state))
return true
})
}

View File

@@ -130,7 +130,7 @@ func (h *Histogram) Merge(b *Histogram) {
h.decimalBuckets[i] = &b
}
for j := range db {
h.decimalBuckets[i][j] = db[j]
h.decimalBuckets[i][j] += db[j]
}
}
}

2
vendor/modules.txt vendored
View File

@@ -115,7 +115,7 @@ github.com/VictoriaMetrics/easyproto
# github.com/VictoriaMetrics/fastcache v1.12.2
## explicit; go 1.13
github.com/VictoriaMetrics/fastcache
# github.com/VictoriaMetrics/metrics v1.34.0
# github.com/VictoriaMetrics/metrics v1.34.1-0.20240709080436-925f863c7bd0
## explicit; go 1.17
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.76.0