mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-19 01:36:27 +03:00
Compare commits
4 Commits
logsql-ski
...
vmagent-re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cc0cc49234 | ||
|
|
cde8ca7166 | ||
|
|
e706cf950e | ||
|
|
1f7ab35f38 |
@@ -759,16 +759,29 @@ func (rwctx *remoteWriteCtx) reinitStreamAggr() {
|
||||
logger.Errorf("cannot reload stream aggregation config from -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s", sasFile, err)
|
||||
return
|
||||
}
|
||||
if !sasNew.Equal(sas) {
|
||||
sasOld := rwctx.sas.Swap(sasNew)
|
||||
sasOld.MustStop()
|
||||
logger.Infof("successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q", sasFile)
|
||||
} else {
|
||||
|
||||
defer func() {
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
|
||||
}()
|
||||
|
||||
if sasNew.Equal(sas) {
|
||||
sasNew.MustStop()
|
||||
logger.Infof("the config at -remoteWrite.streamAggr.config=%q wasn't changed", sasFile)
|
||||
return
|
||||
}
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
|
||||
|
||||
if sas == nil {
|
||||
sas = &streamaggr.Aggregators{}
|
||||
}
|
||||
sasOldLen := sas.Len()
|
||||
updated := sas.UpdateWith(sasNew)
|
||||
rwctx.sas.Store(sas)
|
||||
|
||||
// no need to stop sas or sasNew as their *aggregator should have been
|
||||
// stopped in UpdateWith method.
|
||||
logger.Infof("successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q. "+
|
||||
"Total aggregation configs %d (was %d); updated %d.", sasFile, sas.Len(), sasOldLen, updated)
|
||||
}
|
||||
|
||||
var tssPool = &sync.Pool{
|
||||
|
||||
@@ -86,33 +86,50 @@ func InitStreamAggr() {
|
||||
case <-saCfgReloaderStopCh:
|
||||
return
|
||||
}
|
||||
reloadStreamAggrConfig()
|
||||
reloadStreamAggrConfig(false)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func reloadStreamAggrConfig() {
|
||||
func reloadStreamAggrConfig(testMode bool) {
|
||||
logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig)
|
||||
saCfgReloads.Inc()
|
||||
|
||||
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval)
|
||||
pushFn := pushAggregateSeries
|
||||
if testMode {
|
||||
pushFn = func(tss []prompbmarshal.TimeSeries) {}
|
||||
}
|
||||
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushFn, *streamAggrDedupInterval)
|
||||
if err != nil {
|
||||
saCfgSuccess.Set(0)
|
||||
saCfgReloadErr.Inc()
|
||||
logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err)
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
saCfgSuccess.Set(1)
|
||||
saCfgTimestamp.Set(fasttime.UnixTimestamp())
|
||||
}()
|
||||
|
||||
sas := sasGlobal.Load()
|
||||
if !sasNew.Equal(sas) {
|
||||
sasOld := sasGlobal.Swap(sasNew)
|
||||
sasOld.MustStop()
|
||||
logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
|
||||
} else {
|
||||
logger.Infof("nothing changed in -streamAggr.config=%q", *streamAggrConfig)
|
||||
if sasNew.Equal(sas) {
|
||||
sasNew.MustStop()
|
||||
logger.Infof("the config at -streamAggr.config=%q wasn't changed", *streamAggrConfig)
|
||||
return
|
||||
}
|
||||
saCfgSuccess.Set(1)
|
||||
saCfgTimestamp.Set(fasttime.UnixTimestamp())
|
||||
|
||||
if sas == nil {
|
||||
sas = &streamaggr.Aggregators{}
|
||||
}
|
||||
sasOldLen := sas.Len()
|
||||
updated := sas.UpdateWith(sasNew)
|
||||
sasGlobal.Store(sas)
|
||||
|
||||
// no need to stop sasOld or sasNew as their *aggregator should have been
|
||||
// stopped in UpdateWith method.
|
||||
logger.Infof("successfully reloaded stream aggregation configs at -streamAggr.config=%q. "+
|
||||
"Total aggregation configs %d (was %d); updated %d.", *streamAggrConfig, sas.Len(), sasOldLen, updated)
|
||||
}
|
||||
|
||||
// MustStopStreamAggr stops stream aggregators.
|
||||
|
||||
126
app/vminsert/common/streamaggr_test.go
Normal file
126
app/vminsert/common/streamaggr_test.go
Normal file
@@ -0,0 +1,126 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||
)
|
||||
|
||||
// TestReloadStreamAggrConfigReload supposed to test concurrent
|
||||
// execution of stream configuration update.
|
||||
// Should be executed with -race flag
|
||||
func TestReloadStreamAggrConfigReload(t *testing.T) {
|
||||
tssInput := mustParsePromMetrics(`foo{abc="123"} 4
|
||||
bar 5
|
||||
foo{abc="123"} 8.5
|
||||
foo{abc="456",de="fg"} 8`)
|
||||
|
||||
streamAggrConfigs := []string{`
|
||||
- interval: 1m
|
||||
outputs: [max]
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [max]
|
||||
- interval: 2m
|
||||
outputs: [max]
|
||||
- interval: 3m
|
||||
outputs: [max]
|
||||
`, `
|
||||
- interval: 3m
|
||||
outputs: [max]
|
||||
- interval: 2m
|
||||
outputs: [max]
|
||||
- interval: 1m
|
||||
outputs: [max]
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [last]
|
||||
- interval: 1m
|
||||
outputs: [max]
|
||||
`}
|
||||
|
||||
f, err := os.CreateTemp("", "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
*streamAggrConfig = f.Name()
|
||||
defer func() {
|
||||
*streamAggrConfig = ""
|
||||
_ = os.Remove(f.Name())
|
||||
}()
|
||||
|
||||
logger.SetOutputForTests(&bytes.Buffer{})
|
||||
|
||||
writeToFile(t, f.Name(), streamAggrConfigs[0])
|
||||
reloadStreamAggrConfig(true)
|
||||
|
||||
syncCh := make(chan struct{})
|
||||
go func() {
|
||||
r := rand.New(rand.NewSource(1))
|
||||
for {
|
||||
select {
|
||||
case <-syncCh:
|
||||
return
|
||||
default:
|
||||
n := r.Intn(len(streamAggrConfigs))
|
||||
writeToFile(t, f.Name(), streamAggrConfigs[n])
|
||||
reloadStreamAggrConfig(true)
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for i := 0; i < 1e4; i++ {
|
||||
sasGlobal.Load().Push(tssInput, nil)
|
||||
}
|
||||
close(syncCh)
|
||||
}
|
||||
|
||||
// writeToFile replaces the contents of file with b.
//
// The calling test is failed immediately if the file cannot be written.
func writeToFile(t *testing.T, file, b string) {
	t.Helper()
	if err := os.WriteFile(file, []byte(b), 0644); err != nil {
		t.Fatalf("unexpected err: %s", err)
	}
}
|
||||
|
||||
func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
|
||||
var rows prometheus.Rows
|
||||
errLogger := func(s string) {
|
||||
panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
|
||||
}
|
||||
rows.UnmarshalWithErrLogger(s, errLogger)
|
||||
var tss []prompbmarshal.TimeSeries
|
||||
samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
|
||||
for _, row := range rows.Rows {
|
||||
labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: "__name__",
|
||||
Value: row.Metric,
|
||||
})
|
||||
for _, tag := range row.Tags {
|
||||
labels = append(labels, prompbmarshal.Label{
|
||||
Name: tag.Key,
|
||||
Value: tag.Value,
|
||||
})
|
||||
}
|
||||
samples = append(samples, prompbmarshal.Sample{
|
||||
Value: row.Value,
|
||||
Timestamp: row.Timestamp,
|
||||
})
|
||||
ts := prompbmarshal.TimeSeries{
|
||||
Labels: labels,
|
||||
Samples: samples[len(samples)-1:],
|
||||
}
|
||||
tss = append(tss, ts)
|
||||
}
|
||||
return tss
|
||||
}
|
||||
@@ -35,6 +35,7 @@ The sandbox cluster installation is running under the constant load generated by
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not log `unexpected EOF` when reading incoming metrics, since this error is expected and is handled during metrics' parsing. This reduces the amount of noisy logs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4817).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): retry failed write request on the closed connection immediately, without waiting for backoff. This should improve data delivery speed and reduce amount of error logs emitted by vmagent when using idle connections. See related [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4139).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce load on the Kubernetes control plane during initial service discovery. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4855) for details.
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): restart only updated stream aggregation configs on [config reload](https://docs.victoriametrics.com/vmagent.html#configuration-update). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4810).
|
||||
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): reduce the maximum recovery time at `vmselect` and `vminsert` when some of `vmstorage` nodes become unavailable because of networking issues from 60 seconds to 3 seconds by default. The recovery time can be tuned at `vmselect` and `vminsert` nodes with `-vmstorageUserTimeout` command-line flag if needed. Thanks to @wjordan for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4423).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add Prometheus data support to the "Explore cardinality" page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4320) for details.
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): make the warning message more noticeable for text fields. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4848).
|
||||
|
||||
@@ -134,7 +134,8 @@ type Config struct {
|
||||
|
||||
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
|
||||
type Aggregators struct {
|
||||
as []*aggregator
|
||||
asMu sync.RWMutex
|
||||
as []*aggregator
|
||||
|
||||
// configData contains marshaled configs passed to NewAggregators().
|
||||
// It is used in Equal() for comparing Aggregators.
|
||||
@@ -175,14 +176,90 @@ func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Durati
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Len returns number of aggregators
|
||||
func (a *Aggregators) Len() int {
|
||||
if a == nil {
|
||||
return 0
|
||||
}
|
||||
a.asMu.RLock()
|
||||
defer a.asMu.RUnlock()
|
||||
return len(a.as)
|
||||
}
|
||||
|
||||
// MustStop stops a.
|
||||
func (a *Aggregators) MustStop() {
|
||||
if a == nil {
|
||||
return
|
||||
}
|
||||
|
||||
a.asMu.Lock()
|
||||
for _, aggr := range a.as {
|
||||
aggr.MustStop()
|
||||
}
|
||||
a.as = nil
|
||||
a.asMu.Unlock()
|
||||
}
|
||||
|
||||
// UpdateWith updates the list of `aggregator` from `a` with `aggregator`
|
||||
// from `b`. UpdateWith keeps original objects from `a` if they have identical
|
||||
// match by `configData` field with objects from `b`.
|
||||
// UpdateWith returns number of new objects added to `a`.
|
||||
// Objects from `a` which were absent in `b` will be stopped.
|
||||
// Objects from `b` which had identical `configData` match with objects from `a` will be stopped.
|
||||
// UpdateWith stops `aggregator`s from `a` and `b` that won't be used anymore,
|
||||
// so no further calls to MustStop are needed.
|
||||
func (a *Aggregators) UpdateWith(b *Aggregators) int {
|
||||
if b == nil {
|
||||
a.MustStop()
|
||||
return 0
|
||||
}
|
||||
|
||||
a.asMu.Lock()
|
||||
defer a.asMu.Unlock()
|
||||
|
||||
var updatedAs []*aggregator
|
||||
// keep all aggregators present in a and b
|
||||
for i, oldAs := range a.as {
|
||||
matched := false
|
||||
for j, newAs := range b.as {
|
||||
if newAs == nil {
|
||||
continue
|
||||
}
|
||||
if string(oldAs.configData) == string(newAs.configData) {
|
||||
matched = true
|
||||
updatedAs = append(updatedAs, oldAs)
|
||||
// a already has this aggregator, so we keep it as is unchanged
|
||||
// and stop the aggregator from b instead
|
||||
newAs.MustStop()
|
||||
b.as[j] = nil
|
||||
}
|
||||
}
|
||||
if !matched {
|
||||
// aggregator from a isn't present in b, so we stop it and remove from the list
|
||||
oldAs.MustStop()
|
||||
a.as[i] = nil
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// by this point, b should contain only new aggregators,
|
||||
// so we simply add them to the final list.
|
||||
var newAsTotal int
|
||||
for j, newAs := range b.as {
|
||||
if newAs == nil {
|
||||
continue
|
||||
}
|
||||
updatedAs = append(updatedAs, newAs)
|
||||
b.as[j] = nil
|
||||
newAsTotal++
|
||||
}
|
||||
|
||||
// replace list of a aggregators with updated list.
|
||||
// all the old aggregators in a should have been
|
||||
// either stopped or added to updatedAs.
|
||||
a.as = updatedAs
|
||||
|
||||
return newAsTotal
|
||||
}
|
||||
|
||||
// Equal returns true if a and b are initialized from identical configs.
|
||||
@@ -196,11 +273,11 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
|
||||
// Push pushes tss to a.
|
||||
//
|
||||
// Push sets matchIdxs[idx] to 1 if the corresponding tss[idx] was used in aggregations.
|
||||
// Otherwise matchIdxs[idx] is set to 0.
|
||||
// Otherwise, matchIdxs[idx] is set to 0.
|
||||
//
|
||||
// Push returns matchIdxs with len equal to len(tss).
|
||||
// It re-uses the matchIdxs if it has enough capacity to hold len(tss) items.
|
||||
// Otherwise it allocates new matchIdxs.
|
||||
// Otherwise, it allocates new matchIdxs.
|
||||
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []byte {
|
||||
matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss))
|
||||
for i := 0; i < len(matchIdxs); i++ {
|
||||
@@ -208,9 +285,11 @@ func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []b
|
||||
}
|
||||
|
||||
if a != nil {
|
||||
a.asMu.RLock()
|
||||
for _, aggr := range a.as {
|
||||
aggr.Push(tss, matchIdxs)
|
||||
}
|
||||
a.asMu.RUnlock()
|
||||
}
|
||||
return matchIdxs
|
||||
}
|
||||
@@ -244,6 +323,8 @@ type aggregator struct {
|
||||
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
|
||||
configData []byte
|
||||
}
|
||||
|
||||
type aggrState interface {
|
||||
@@ -261,6 +342,11 @@ type PushFunc func(tss []prompbmarshal.TimeSeries)
|
||||
//
|
||||
// The returned aggregator must be stopped when no longer needed by calling MustStop().
|
||||
func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) (*aggregator, error) {
|
||||
configData, err := json.Marshal(cfg)
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: cannot marshal the provided config: %s", err)
|
||||
}
|
||||
|
||||
// check cfg.Interval
|
||||
interval, err := time.ParseDuration(cfg.Interval)
|
||||
if err != nil {
|
||||
@@ -398,6 +484,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
|
||||
suffix: suffix,
|
||||
|
||||
stopCh: make(chan struct{}),
|
||||
|
||||
configData: configData,
|
||||
}
|
||||
|
||||
if dedupAggr != nil {
|
||||
|
||||
@@ -814,3 +814,135 @@ func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
|
||||
}
|
||||
return tss
|
||||
}
|
||||
|
||||
func TestAggregators_UpdateWith(t *testing.T) {
|
||||
f := func(oldConfig, newConfig string, expUpdates int) {
|
||||
t.Helper()
|
||||
|
||||
pushFunc := func(tss []prompbmarshal.TimeSeries) {}
|
||||
oldAg, err := NewAggregatorsFromData([]byte(oldConfig), pushFunc, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %s", err)
|
||||
}
|
||||
newAg, err := NewAggregatorsFromData([]byte(newConfig), pushFunc, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %s", err)
|
||||
}
|
||||
|
||||
var oldPointers []*aggregator
|
||||
if oldAg != nil {
|
||||
oldPointers = append(oldPointers, oldAg.as...)
|
||||
}
|
||||
|
||||
n := oldAg.UpdateWith(newAg)
|
||||
if newAg == nil {
|
||||
if oldAg != nil {
|
||||
t.Fatalf("expected aggregations to be nil when updated with nil")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
var unchangedPointers int
|
||||
for _, uag := range oldAg.as {
|
||||
for _, oag := range oldPointers {
|
||||
if uag == oag {
|
||||
unchangedPointers++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
updates := len(oldAg.as) - unchangedPointers
|
||||
if updates != expUpdates {
|
||||
t.Fatalf("expected %d objects to change, only %d changed", expUpdates, updates)
|
||||
}
|
||||
if n != expUpdates {
|
||||
t.Fatalf("expected %d objects to change, only %d changed", expUpdates, n)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
f("", "", 0)
|
||||
|
||||
// identical configs
|
||||
f(`
|
||||
- interval: 1m
|
||||
outputs: [last]
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [last]
|
||||
`, 0)
|
||||
|
||||
// the interval field change
|
||||
f(`
|
||||
- interval: 1m
|
||||
outputs: [last]
|
||||
`, `
|
||||
- interval: 5m
|
||||
outputs: [last]
|
||||
`, 1)
|
||||
|
||||
// the output field change
|
||||
f(`
|
||||
- interval: 1m
|
||||
outputs: [last]
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [min]
|
||||
`, 1)
|
||||
|
||||
// add one more config, old should remain unchanged
|
||||
f(`
|
||||
- interval: 1m
|
||||
outputs: [last]
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [last]
|
||||
- interval: 1m
|
||||
outputs: [max]
|
||||
`, 1)
|
||||
|
||||
// remove one config, remained config should be unchanged
|
||||
f(`
|
||||
- interval: 1m
|
||||
outputs: [last]
|
||||
- interval: 1m
|
||||
outputs: [max]
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [last]
|
||||
`, 0)
|
||||
|
||||
// update all three configs
|
||||
f(`
|
||||
- interval: 1m
|
||||
outputs: [max]
|
||||
- interval: 2m
|
||||
outputs: [max]
|
||||
- interval: 3m
|
||||
outputs: [max]
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [last]
|
||||
- interval: 2m
|
||||
outputs: [last]
|
||||
- interval: 3m
|
||||
outputs: [last]
|
||||
`, 3)
|
||||
|
||||
// change order only
|
||||
f(`
|
||||
- interval: 1m
|
||||
outputs: [max]
|
||||
- interval: 2m
|
||||
outputs: [max]
|
||||
- interval: 3m
|
||||
outputs: [max]
|
||||
`, `
|
||||
- interval: 3m
|
||||
outputs: [max]
|
||||
- interval: 2m
|
||||
outputs: [max]
|
||||
- interval: 1m
|
||||
outputs: [max]
|
||||
`, 0)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user