Compare commits

...

2 Commits

Author SHA1 Message Date
Max Kotliar
d0625bb77d wip 2025-08-27 23:31:37 +03:00
Max Kotliar
4163f18250 lib/configwatcher: Introduce a library responsible for configs reloading 2025-08-27 23:28:02 +03:00
3 changed files with 172 additions and 40 deletions

View File

@@ -10,6 +10,7 @@ import (
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/configwatcher"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
@@ -112,6 +113,7 @@ func main() {
flag.Usage = usage
envflag.Parse()
remotewrite.InitSecretFlags()
configwatcher.Init()
buildinfo.Init()
logger.Init()
timeserieslimits.Init(*maxLabelsPerTimeseries, *maxLabelNameLen, *maxLabelValueLen)
@@ -199,6 +201,7 @@ func main() {
}
protoparserutil.StopUnmarshalWorkers()
remotewrite.Stop()
configwatcher.Stop()
logger.Infof("successfully stopped vmagent in %.3f seconds", time.Since(startTime).Seconds())
}

View File

@@ -0,0 +1,121 @@
package configwatcher
import (
"flag"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
)
// TBD: migrate /-/reload handler to the package
// TBD: print registered configs in 10s after the start
// TBD: print what reload methods enabled
// handler binds a config-reload callback to the name of the command-line
// flag whose config it re-reads. The flag name serves as the registration
// key used by UnregisterHandler.
type handler struct {
	flag    string // flag name identifying the watched config (registration key)
	handler func() // callback invoked to re-read the config
}
// configCheckInterval defines how often the registered check-interval
// handlers are invoked. Zero (the default) disables periodic checking;
// SIGHUP-triggered reloading keeps working regardless.
var configCheckInterval = flag.Duration("configCheckInterval", 0, "Interval for periodic re-reading of registered config files; "+
	"by default periodic checking is disabled, while configs are still re-read on SIGHUP signal")

// mux protects signalHandlers and checkIntervalHandlers. It is also held
// while the watcher goroutine invokes the handlers.
var (
	signalHandlers        []handler
	checkIntervalHandlers []handler
	mux                   sync.Mutex
)
// RegisterHandler registers handlerFn to be called both on SIGHUP signal and
// on every -configCheckInterval tick. flag is the name of the command-line
// flag for the config that handlerFn re-reads; pass the same name to
// UnregisterHandler in order to remove the handler later.
func RegisterHandler(flag string, handlerFn func()) {
	RegisterSignalHandler(flag, handlerFn)
	RegisterCheckIntervalHandler(flag, handlerFn)
}
func RegisterSignalHandler(flag string, handlerFn func()) {
mux.Lock()
defer mux.Unlock()
signalHandlers = append(signalHandlers, handler{
flag: flag,
handler: handlerFn,
})
}
func RegisterCheckIntervalHandler(flag string, handlerFn func()) {
mux.Lock()
defer mux.Unlock()
checkIntervalHandlers = append(checkIntervalHandlers, handler{
flag: flag,
handler: handlerFn,
})
}
func UnregisterHandler(flag string) {
mux.Lock()
defer mux.Unlock()
newCheckIntervalHandlers := make([]handler, 0, len(checkIntervalHandlers))
for _, h := range checkIntervalHandlers {
if h.flag != flag {
newCheckIntervalHandlers = append(newCheckIntervalHandlers, h)
}
}
newSignalHandlers := make([]handler, 0, len(signalHandlers))
for _, h := range signalHandlers {
if h.flag != flag {
newSignalHandlers = append(newSignalHandlers, h)
}
}
checkIntervalHandlers = newCheckIntervalHandlers
}
// stopChan signals the watcher goroutine started by Init to exit.
var stopChan chan struct{}

// Init starts the background goroutine that invokes the registered handlers:
// signal handlers on SIGHUP, and check-interval handlers on a periodic ticker
// when -configCheckInterval is positive. Init must be called once at startup,
// before any call to Stop.
//
// NOTE(review): the goroutine samples -configCheckInterval only once at
// startup, so EnableCheckInterval calls made after Init has run do not affect
// the ticker — confirm call ordering at the call sites.
func Init() {
	stopChan = make(chan struct{})
	go func() {
		sighupCh := procutil.NewSighupChan()

		// Read the interval under mux: EnableCheckInterval mutates the flag
		// value under the same lock, so an unguarded read here is a data race.
		mux.Lock()
		interval := *configCheckInterval
		mux.Unlock()

		// tickerCh stays nil when periodic checking is disabled; a receive
		// from a nil channel blocks forever, so the select below simply never
		// fires that case.
		var tickerCh <-chan time.Time
		if interval > 0 {
			ticker := time.NewTicker(interval)
			tickerCh = ticker.C
			defer ticker.Stop()
		}
		for {
			select {
			case <-sighupCh:
				mux.Lock()
				for _, h := range signalHandlers {
					h.handler()
				}
				mux.Unlock()
			case <-tickerCh:
				mux.Lock()
				for _, h := range checkIntervalHandlers {
					h.handler()
				}
				mux.Unlock()
			case <-stopChan:
				return
			}
		}
	}()
}
// EnableCheckInterval raises the periodic config-check interval to dur when
// dur is larger than the current -configCheckInterval value. It exists for
// backward compatibility with the per-component *.configCheckInterval flags.
//
// NOTE(review): the watcher goroutine started by Init reads the interval only
// once at startup, so calls made after Init may have no effect on the running
// ticker — verify that callers invoke this before Init.
func EnableCheckInterval(dur time.Duration) {
	mux.Lock()
	defer mux.Unlock()
	if dur > *configCheckInterval {
		*configCheckInterval = dur
	}
}
// Stop terminates the background watcher goroutine started by Init.
// It must be called at most once and only after Init; otherwise close
// panics on a nil or already-closed channel.
// (The original comment "stops Prometheus scraper" was a copy-paste error —
// this package watches configs, not scrape targets.)
func Stop() {
	close(stopChan)
}

View File

@@ -9,12 +9,12 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/configwatcher"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/azure"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
@@ -113,7 +113,7 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompb.Writ
// Register SIGHUP handler for config reload before loadConfig.
// This guarantees that the config will be re-read if the signal arrives just after loadConfig.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
sighupCh := procutil.NewSighupChan()
//sighupCh := procutil.NewSighupChan()
logger.Infof("reading scrape configs from %q", configFile)
cfg, err := loadConfig(configFile)
@@ -152,61 +152,69 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompb.Writ
scs.add("yandexcloud_sd_configs", *yandexcloud.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getYandexCloudSDScrapeWork(swsPrev) })
scs.add("static_configs", 0, func(cfg *Config, _ []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() })
var tickerCh <-chan time.Time
scs.updateConfig(cfg)
if *configCheckInterval > 0 {
ticker := time.NewTicker(*configCheckInterval)
tickerCh = ticker.C
defer ticker.Stop()
}
for {
scs.updateConfig(cfg)
waitForChans:
select {
case <-sighupCh:
logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile)
cfgNew, err := loadConfig(configFile)
if err != nil {
configReloadErrors.Inc()
configSuccess.Set(0)
logger.Errorf("cannot read %q on SIGHUP: %s; continuing with the previous config", configFile, err)
goto waitForChans
}
configSuccess.Set(1)
if !cfgNew.mustRestart(cfg) {
logger.Infof("nothing changed in %q", configFile)
goto waitForChans
}
cfg = cfgNew
marshaledData = cfg.marshal()
configData.Store(&marshaledData)
configReloads.Inc()
configTimestamp.Set(fasttime.UnixTimestamp())
case <-tickerCh:
// TBD print notice that deprecated -promscrape.configCheckInterval is used
configwatcher.EnableCheckInterval(*configCheckInterval)
configwatcher.RegisterCheckIntervalHandler("-promscrape.config", func() {
cfgNew, err := loadConfig(configFile)
if err != nil {
configReloadErrors.Inc()
configSuccess.Set(0)
logger.Errorf("cannot read %q: %s; continuing with the previous config", configFile, err)
goto waitForChans
return
}
configSuccess.Set(1)
if !cfgNew.mustRestart(cfg) {
goto waitForChans
return
}
cfg = cfgNew
marshaledData = cfg.marshal()
configData.Store(&marshaledData)
configReloads.Inc()
configTimestamp.Set(fasttime.UnixTimestamp())
case <-globalStopCh:
cfg.mustStop()
logger.Infof("stopping Prometheus scrapers")
startTime := time.Now()
scs.stop()
logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
scs.updateConfig(cfg)
})
}
configwatcher.RegisterSignalHandler("-promscrape.config", func() {
logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile)
cfgNew, err := loadConfig(configFile)
if err != nil {
configReloadErrors.Inc()
configSuccess.Set(0)
logger.Errorf("cannot read %q on SIGHUP: %s; continuing with the previous config", configFile, err)
return
}
}
configSuccess.Set(1)
if !cfgNew.mustRestart(cfg) {
logger.Infof("nothing changed in %q", configFile)
return
}
cfg = cfgNew
marshaledData = cfg.marshal()
configData.Store(&marshaledData)
configReloads.Inc()
configTimestamp.Set(fasttime.UnixTimestamp())
scs.updateConfig(cfg)
})
go func() {
<-globalStopCh
configwatcher.UnregisterHandler("-promscrape.config")
cfg.mustStop()
logger.Infof("stopping Prometheus scrapers")
startTime := time.Now()
scs.stop()
logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
return
}()
}
var (