Compare commits

...

4 Commits

Author SHA1 Message Date
hagen1778
cc0cc49234 rm debug info
Signed-off-by: hagen1778 <roman@victoriametrics.com>
2023-10-02 14:31:15 +02:00
hagen1778
cde8ca7166 streamaggregation: update config reload logic
* make config update thread safe
* port update logic to vminsert

Signed-off-by: hagen1778 <roman@victoriametrics.com>
2023-10-02 14:31:15 +02:00
hagen1778
e706cf950e lib/streamaggr: clarify comments
Signed-off-by: hagen1778 <roman@victoriametrics.com>
2023-10-02 14:31:14 +02:00
hagen1778
1f7ab35f38 vmagent: restart only updated streamaggr configs
Before the change, if vmagent detected any change in streaming configs
it would stop/restart all current configs. This could negatively impact
aggregation state for all stream configs, no matter if they were updated or not.

After the change, vmagent stops/restarts only those stream configs which were
actually updated on config reload.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4810
Signed-off-by: hagen1778 <roman@victoriametrics.com>
2023-10-02 14:31:14 +02:00
6 changed files with 398 additions and 21 deletions

View File

@@ -759,16 +759,29 @@ func (rwctx *remoteWriteCtx) reinitStreamAggr() {
logger.Errorf("cannot reload stream aggregation config from -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s", sasFile, err)
return
}
if !sasNew.Equal(sas) {
sasOld := rwctx.sas.Swap(sasNew)
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q", sasFile)
} else {
defer func() {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
}()
if sasNew.Equal(sas) {
sasNew.MustStop()
logger.Infof("the config at -remoteWrite.streamAggr.config=%q wasn't changed", sasFile)
return
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
if sas == nil {
sas = &streamaggr.Aggregators{}
}
sasOldLen := sas.Len()
updated := sas.UpdateWith(sasNew)
rwctx.sas.Store(sas)
// no need to stop sas or sasNew as their *aggregator should have been
// stopped in UpdateWith method.
logger.Infof("successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q. "+
"Total aggregation configs %d (was %d); updated %d.", sasFile, sas.Len(), sasOldLen, updated)
}
var tssPool = &sync.Pool{

View File

@@ -86,33 +86,50 @@ func InitStreamAggr() {
case <-saCfgReloaderStopCh:
return
}
reloadStreamAggrConfig()
reloadStreamAggrConfig(false)
}
}()
}
func reloadStreamAggrConfig() {
func reloadStreamAggrConfig(testMode bool) {
logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig)
saCfgReloads.Inc()
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval)
pushFn := pushAggregateSeries
if testMode {
pushFn = func(tss []prompbmarshal.TimeSeries) {}
}
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushFn, *streamAggrDedupInterval)
if err != nil {
saCfgSuccess.Set(0)
saCfgReloadErr.Inc()
logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err)
return
}
defer func() {
saCfgSuccess.Set(1)
saCfgTimestamp.Set(fasttime.UnixTimestamp())
}()
sas := sasGlobal.Load()
if !sasNew.Equal(sas) {
sasOld := sasGlobal.Swap(sasNew)
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
} else {
logger.Infof("nothing changed in -streamAggr.config=%q", *streamAggrConfig)
if sasNew.Equal(sas) {
sasNew.MustStop()
logger.Infof("the config at -streamAggr.config=%q wasn't changed", *streamAggrConfig)
return
}
saCfgSuccess.Set(1)
saCfgTimestamp.Set(fasttime.UnixTimestamp())
if sas == nil {
sas = &streamaggr.Aggregators{}
}
sasOldLen := sas.Len()
updated := sas.UpdateWith(sasNew)
sasGlobal.Store(sas)
// no need to stop sasOld or sasNew as their *aggregator should have been
// stopped in UpdateWith method.
logger.Infof("successfully reloaded stream aggregation configs at -streamAggr.config=%q. "+
"Total aggregation configs %d (was %d); updated %d.", *streamAggrConfig, sas.Len(), sasOldLen, updated)
}
// MustStopStreamAggr stops stream aggregators.

View File

@@ -0,0 +1,126 @@
package common
import (
"bytes"
"fmt"
"math/rand"
"os"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
)
// TestReloadStreamAggrConfigReload exercises reloadStreamAggrConfig concurrently
// with Push calls on the globally stored aggregators.
//
// The test makes no assertions on aggregation results: it is meant to be run
// with the -race flag so the race detector can report unsynchronized access
// during config reloads.
func TestReloadStreamAggrConfigReload(t *testing.T) {
	tssInput := mustParsePromMetrics(`foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8`)

	// The configs differ in number, order and outputs of aggregation rules,
	// so random switching between them covers add, remove and reorder cases.
	streamAggrConfigs := []string{`
- interval: 1m
  outputs: [max]
`, `
- interval: 1m
  outputs: [max]
- interval: 2m
  outputs: [max]
- interval: 3m
  outputs: [max]
`, `
- interval: 3m
  outputs: [max]
- interval: 2m
  outputs: [max]
- interval: 1m
  outputs: [max]
`, `
- interval: 1m
  outputs: [last]
- interval: 1m
  outputs: [max]
`}

	f, err := os.CreateTemp("", "")
	if err != nil {
		t.Fatal(err)
	}
	configPath := f.Name()
	defer func() {
		*streamAggrConfig = ""
		_ = os.Remove(configPath)
	}()
	// Only the path is needed below; close the handle so the descriptor isn't leaked.
	if err := f.Close(); err != nil {
		t.Fatalf("cannot close temporary config file %q: %s", configPath, err)
	}
	*streamAggrConfig = configPath

	logger.SetOutputForTests(&bytes.Buffer{})

	writeToFile(t, configPath, streamAggrConfigs[0])
	reloadStreamAggrConfig(true)

	stopCh := make(chan struct{})
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		r := rand.New(rand.NewSource(1))
		for {
			select {
			case <-stopCh:
				return
			default:
			}
			n := r.Intn(len(streamAggrConfigs))
			// t.Fatalf must be called only from the goroutine running the test,
			// so the file is written directly here instead of via writeToFile.
			if err := os.WriteFile(configPath, []byte(streamAggrConfigs[n]), 0644); err != nil {
				t.Errorf("unexpected err: %s", err)
				return
			}
			reloadStreamAggrConfig(true)
			time.Sleep(time.Millisecond * 50)
		}
	}()

	for i := 0; i < 1e4; i++ {
		sasGlobal.Load().Push(tssInput, nil)
	}

	// Wait for the reloader to exit before the deferred cleanup resets
	// streamAggrConfig; otherwise the cleanup itself races with a reload.
	close(stopCh)
	<-doneCh
}
// writeToFile writes b to the given file with 0644 permissions
// and aborts the calling test if the write returns an error.
func writeToFile(t *testing.T, file, b string) {
	t.Helper()
	if writeErr := os.WriteFile(file, []byte(b), 0644); writeErr != nil {
		t.Fatalf("unexpected err: %s", writeErr)
	}
}
// mustParsePromMetrics parses s as Prometheus text exposition data and converts
// every parsed row into a single-sample time series.
//
// The metric name is stored in the "__name__" label, followed by the row tags
// in their parsed order. The function panics if the parser reports an error
// through the error logger callback.
func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
	var parsed prometheus.Rows
	parsed.UnmarshalWithErrLogger(s, func(msg string) {
		panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", msg))
	})

	var result []prompbmarshal.TimeSeries
	// All the samples live in one pre-sized buffer; every time series
	// references its own one-element window of that buffer.
	sampleBuf := make([]prompbmarshal.Sample, 0, len(parsed.Rows))
	for i := range parsed.Rows {
		r := &parsed.Rows[i]

		lbls := make([]prompbmarshal.Label, 1+len(r.Tags))
		lbls[0] = prompbmarshal.Label{
			Name:  "__name__",
			Value: r.Metric,
		}
		for j := range r.Tags {
			lbls[j+1] = prompbmarshal.Label{
				Name:  r.Tags[j].Key,
				Value: r.Tags[j].Value,
			}
		}

		sampleBuf = append(sampleBuf, prompbmarshal.Sample{
			Value:     r.Value,
			Timestamp: r.Timestamp,
		})
		result = append(result, prompbmarshal.TimeSeries{
			Labels:  lbls,
			Samples: sampleBuf[i : i+1],
		})
	}
	return result
}

View File

@@ -35,6 +35,7 @@ The sandbox cluster installation is running under the constant load generated by
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not log `unexpected EOF` when reading incoming metrics, since this error is expected and is handled during metrics' parsing. This reduces the amounts of noisy logs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4817).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): retry failed write request on the closed connection immediately, without waiting for backoff. This should improve data delivery speed and reduce amount of error logs emitted by vmagent when using idle connections. See related [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4139).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce load on Kubernetes control plane during initial service discovery. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4855) for details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): restart only updated stream aggregation configs on [config reload](https://docs.victoriametrics.com/vmagent.html#configuration-update). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4810).
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): reduce the maximum recovery time at `vmselect` and `vminsert` when some of `vmstorage` nodes become unavailable because of networking issues from 60 seconds to 3 seconds by default. The recovery time can be tuned at `vmselect` and `vminsert` nodes with `-vmstorageUserTimeout` command-line flag if needed. Thanks to @wjordan for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4423).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add Prometheus data support to the "Explore cardinality" page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4320) for details.
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): make the warning message more noticeable for text fields. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4848).

View File

@@ -134,7 +134,8 @@ type Config struct {
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
type Aggregators struct {
as []*aggregator
asMu sync.RWMutex
as []*aggregator
// configData contains marshaled configs passed to NewAggregators().
// It is used in Equal() for comparing Aggregators.
@@ -175,14 +176,90 @@ func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Durati
}, nil
}
// Len reports how many aggregators a currently holds.
//
// A nil receiver is treated as empty, so Len returns 0 for it.
func (a *Aggregators) Len() int {
	if a == nil {
		return 0
	}
	a.asMu.RLock()
	n := len(a.as)
	a.asMu.RUnlock()
	return n
}
// MustStop stops every aggregator held by a and then drops them from a.
//
// It is a no-op for a nil receiver.
func (a *Aggregators) MustStop() {
	if a == nil {
		return
	}
	a.asMu.Lock()
	defer a.asMu.Unlock()
	for i := range a.as {
		a.as[i].MustStop()
	}
	a.as = nil
}
// UpdateWith replaces the list of aggregators in a with the list described by b,
// re-using the already running aggregators from a where possible.
//
// Aggregators are matched by their configData:
//
//   - an aggregator from a with an identical counterpart in b is kept as is,
//     and the counterpart from b is stopped;
//   - an aggregator from a without a counterpart in b is stopped and dropped;
//   - the remaining aggregators from b are moved into a.
//
// Every aggregator from b is paired with at most one aggregator from a,
// so duplicate configs on either side are matched one-to-one.
//
// UpdateWith returns the number of aggregators moved from b into a.
//
// All the aggregators which are no longer used are stopped by UpdateWith, and b
// is left empty, so no further MustStop calls on a or b are needed because of
// the update. If b is nil, all the aggregators in a are stopped.
//
// a must be non-nil when b is non-nil. b is modified without taking its lock,
// so it must not be used by other goroutines during the call.
func (a *Aggregators) UpdateWith(b *Aggregators) int {
	if b == nil {
		a.MustStop()
		return 0
	}

	a.asMu.Lock()
	defer a.asMu.Unlock()

	var updatedAs []*aggregator

	// Keep the aggregators from a which are also present in b.
	for _, oldAs := range a.as {
		matched := false
		for j, newAs := range b.as {
			if newAs == nil {
				// Already paired with a previous aggregator from a.
				continue
			}
			if string(oldAs.configData) != string(newAs.configData) {
				continue
			}
			// a already runs this aggregator, so keep it untouched
			// and stop the duplicate from b instead.
			newAs.MustStop()
			b.as[j] = nil
			updatedAs = append(updatedAs, oldAs)
			matched = true
			// Stop at the first match: pairing oldAs with more than one entry
			// from b would put the same aggregator into the list several times.
			break
		}
		if !matched {
			// The aggregator from a isn't present in b anymore.
			oldAs.MustStop()
		}
	}

	// By this point b contains only the aggregators missing in a,
	// so move them to the final list.
	var newAsTotal int
	for _, newAs := range b.as {
		if newAs == nil {
			continue
		}
		updatedAs = append(updatedAs, newAs)
		newAsTotal++
	}

	// Every old aggregator has been either stopped or added to updatedAs.
	a.as = updatedAs
	// Ownership of the remaining aggregators has moved to a. Drop the list
	// from b, so it doesn't keep nil entries and a later b.MustStop() is a no-op.
	b.as = nil
	return newAsTotal
}
// Equal returns true if a and b are initialized from identical configs.
@@ -196,11 +273,11 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
// Push pushes tss to a.
//
// Push sets matchIdxs[idx] to 1 if the corresponding tss[idx] was used in aggregations.
// Otherwise matchIdxs[idx] is set to 0.
// Otherwise, matchIdxs[idx] is set to 0.
//
// Push returns matchIdxs with len equal to len(tss).
// It re-uses the matchIdxs if it has enough capacity to hold len(tss) items.
// Otherwise it allocates new matchIdxs.
// Otherwise, it allocates new matchIdxs.
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []byte {
matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss))
for i := 0; i < len(matchIdxs); i++ {
@@ -208,9 +285,11 @@ func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []b
}
if a != nil {
a.asMu.RLock()
for _, aggr := range a.as {
aggr.Push(tss, matchIdxs)
}
a.asMu.RUnlock()
}
return matchIdxs
}
@@ -244,6 +323,8 @@ type aggregator struct {
wg sync.WaitGroup
stopCh chan struct{}
configData []byte
}
type aggrState interface {
@@ -261,6 +342,11 @@ type PushFunc func(tss []prompbmarshal.TimeSeries)
//
// The returned aggregator must be stopped when no longer needed by calling MustStop().
func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) (*aggregator, error) {
configData, err := json.Marshal(cfg)
if err != nil {
logger.Panicf("BUG: cannot marshal the provided config: %s", err)
}
// check cfg.Interval
interval, err := time.ParseDuration(cfg.Interval)
if err != nil {
@@ -398,6 +484,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
suffix: suffix,
stopCh: make(chan struct{}),
configData: configData,
}
if dedupAggr != nil {

View File

@@ -814,3 +814,135 @@ func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
}
return tss
}
// TestAggregators_UpdateWith verifies that UpdateWith keeps the aggregators
// whose config didn't change (the same pointers stay in the list) and reports
// the number of aggregators taken from the new config.
func TestAggregators_UpdateWith(t *testing.T) {
	f := func(oldConfig, newConfig string, expUpdates int) {
		t.Helper()
		pushFunc := func(tss []prompbmarshal.TimeSeries) {}

		oldAg, err := NewAggregatorsFromData([]byte(oldConfig), pushFunc, 0)
		if err != nil {
			t.Fatalf("unexpected err: %s", err)
		}
		if oldAg == nil {
			// UpdateWith with non-nil argument needs a non-nil receiver.
			oldAg = &Aggregators{}
		}
		// After UpdateWith all the live aggregators are owned by oldAg,
		// so stopping it is enough for releasing the test resources.
		defer oldAg.MustStop()

		newAg, err := NewAggregatorsFromData([]byte(newConfig), pushFunc, 0)
		if err != nil {
			t.Fatalf("unexpected err: %s", err)
		}

		// Remember the original aggregators in order to detect
		// which of them survive the update.
		oldPointers := append([]*aggregator(nil), oldAg.as...)

		n := oldAg.UpdateWith(newAg)
		if newAg == nil {
			// UpdateWith cannot nil its receiver; it can only empty it.
			if oldAg.Len() != 0 {
				t.Fatalf("expected no aggregators after updating with nil; got %d", oldAg.Len())
			}
			if n != 0 {
				t.Fatalf("unexpected value returned from UpdateWith(nil); got %d; want 0", n)
			}
			return
		}

		var unchangedPointers int
		for _, uag := range oldAg.as {
			for _, oag := range oldPointers {
				if uag == oag {
					unchangedPointers++
				}
			}
		}

		updates := len(oldAg.as) - unchangedPointers
		if updates != expUpdates {
			t.Fatalf("unexpected number of replaced aggregators; got %d; want %d", updates, expUpdates)
		}
		if n != expUpdates {
			t.Fatalf("unexpected value returned from UpdateWith; got %d; want %d", n, expUpdates)
		}
	}

	f("", "", 0)

	// identical configs
	f(`
- interval: 1m
  outputs: [last]
`, `
- interval: 1m
  outputs: [last]
`, 0)

	// the interval field change
	f(`
- interval: 1m
  outputs: [last]
`, `
- interval: 5m
  outputs: [last]
`, 1)

	// the output field change
	f(`
- interval: 1m
  outputs: [last]
`, `
- interval: 1m
  outputs: [min]
`, 1)

	// add one more config, old should remain unchanged
	f(`
- interval: 1m
  outputs: [last]
`, `
- interval: 1m
  outputs: [last]
- interval: 1m
  outputs: [max]
`, 1)

	// remove one config, remained config should be unchanged
	f(`
- interval: 1m
  outputs: [last]
- interval: 1m
  outputs: [max]
`, `
- interval: 1m
  outputs: [last]
`, 0)

	// update all three configs
	f(`
- interval: 1m
  outputs: [max]
- interval: 2m
  outputs: [max]
- interval: 3m
  outputs: [max]
`, `
- interval: 1m
  outputs: [last]
- interval: 2m
  outputs: [last]
- interval: 3m
  outputs: [last]
`, 3)

	// change order only
	f(`
- interval: 1m
  outputs: [max]
- interval: 2m
  outputs: [max]
- interval: 3m
  outputs: [max]
`, `
- interval: 3m
  outputs: [max]
- interval: 2m
  outputs: [max]
- interval: 1m
  outputs: [max]
`, 0)
}