mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Compare commits
2 Commits
sort-order
...
configwatc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0625bb77d | ||
|
|
4163f18250 |
@@ -10,6 +10,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/configwatcher"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
|
||||
@@ -112,6 +113,7 @@ func main() {
|
||||
flag.Usage = usage
|
||||
envflag.Parse()
|
||||
remotewrite.InitSecretFlags()
|
||||
configwatcher.Init()
|
||||
buildinfo.Init()
|
||||
logger.Init()
|
||||
timeserieslimits.Init(*maxLabelsPerTimeseries, *maxLabelNameLen, *maxLabelValueLen)
|
||||
@@ -199,6 +201,7 @@ func main() {
|
||||
}
|
||||
protoparserutil.StopUnmarshalWorkers()
|
||||
remotewrite.Stop()
|
||||
configwatcher.Stop()
|
||||
|
||||
logger.Infof("successfully stopped vmagent in %.3f seconds", time.Since(startTime).Seconds())
|
||||
}
|
||||
|
||||
121
lib/configwatcher/watcher.go
Normal file
121
lib/configwatcher/watcher.go
Normal file
@@ -0,0 +1,121 @@
|
||||
package configwatcher
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
)
|
||||
|
||||
// TBD: migrate /-/reload handler to the package
|
||||
// TBD: print registered configs in 10s after the start
|
||||
// TBD: print what reload methods enabled
|
||||
|
||||
type handler struct {
|
||||
flag string
|
||||
handler func()
|
||||
}
|
||||
|
||||
var configCheckInterval = flag.Duration("configCheckInterval", 0, "TBD")
|
||||
|
||||
var signalHandlers []handler
|
||||
|
||||
var checkIntervalHandlers []handler
|
||||
|
||||
var mux = sync.Mutex{}
|
||||
|
||||
func RegisterHandler(flag string, handlerFn func()) {
|
||||
RegisterSignalHandler(flag, handlerFn)
|
||||
RegisterCheckIntervalHandler(flag, handlerFn)
|
||||
}
|
||||
|
||||
func RegisterSignalHandler(flag string, handlerFn func()) {
|
||||
mux.Lock()
|
||||
defer mux.Unlock()
|
||||
|
||||
signalHandlers = append(signalHandlers, handler{
|
||||
flag: flag,
|
||||
handler: handlerFn,
|
||||
})
|
||||
}
|
||||
|
||||
func RegisterCheckIntervalHandler(flag string, handlerFn func()) {
|
||||
mux.Lock()
|
||||
defer mux.Unlock()
|
||||
|
||||
checkIntervalHandlers = append(checkIntervalHandlers, handler{
|
||||
flag: flag,
|
||||
handler: handlerFn,
|
||||
})
|
||||
}
|
||||
|
||||
func UnregisterHandler(flag string) {
|
||||
mux.Lock()
|
||||
defer mux.Unlock()
|
||||
|
||||
newCheckIntervalHandlers := make([]handler, 0, len(checkIntervalHandlers))
|
||||
for _, h := range checkIntervalHandlers {
|
||||
if h.flag != flag {
|
||||
newCheckIntervalHandlers = append(newCheckIntervalHandlers, h)
|
||||
}
|
||||
}
|
||||
newSignalHandlers := make([]handler, 0, len(signalHandlers))
|
||||
for _, h := range signalHandlers {
|
||||
if h.flag != flag {
|
||||
newSignalHandlers = append(newSignalHandlers, h)
|
||||
}
|
||||
}
|
||||
|
||||
checkIntervalHandlers = newCheckIntervalHandlers
|
||||
}
|
||||
|
||||
var stopChan chan struct{}
|
||||
|
||||
func Init() {
|
||||
stopChan = make(chan struct{})
|
||||
go func() {
|
||||
sighupCh := procutil.NewSighupChan()
|
||||
|
||||
var tickerCh <-chan time.Time
|
||||
if *configCheckInterval > 0 {
|
||||
ticker := time.NewTicker(*configCheckInterval)
|
||||
tickerCh = ticker.C
|
||||
defer ticker.Stop()
|
||||
}
|
||||
for {
|
||||
|
||||
select {
|
||||
case <-sighupCh:
|
||||
mux.Lock()
|
||||
for _, h := range signalHandlers {
|
||||
h.handler()
|
||||
}
|
||||
mux.Unlock()
|
||||
case <-tickerCh:
|
||||
mux.Lock()
|
||||
for _, h := range checkIntervalHandlers {
|
||||
h.handler()
|
||||
}
|
||||
mux.Unlock()
|
||||
case <-stopChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Method for BC
|
||||
func EnableCheckInterval(dur time.Duration) {
|
||||
mux.Lock()
|
||||
defer mux.Unlock()
|
||||
|
||||
if dur > *configCheckInterval {
|
||||
*configCheckInterval = dur
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops Prometheus scraper.
|
||||
func Stop() {
|
||||
close(stopChan)
|
||||
}
|
||||
@@ -9,12 +9,12 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/configwatcher"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/azure"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
|
||||
@@ -113,7 +113,7 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompb.Writ
|
||||
// Register SIGHUP handler for config reload before loadConfig.
|
||||
// This guarantees that the config will be re-read if the signal arrives just after loadConfig.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
||||
sighupCh := procutil.NewSighupChan()
|
||||
//sighupCh := procutil.NewSighupChan()
|
||||
|
||||
logger.Infof("reading scrape configs from %q", configFile)
|
||||
cfg, err := loadConfig(configFile)
|
||||
@@ -152,61 +152,69 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompb.Writ
|
||||
scs.add("yandexcloud_sd_configs", *yandexcloud.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getYandexCloudSDScrapeWork(swsPrev) })
|
||||
scs.add("static_configs", 0, func(cfg *Config, _ []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() })
|
||||
|
||||
var tickerCh <-chan time.Time
|
||||
scs.updateConfig(cfg)
|
||||
|
||||
if *configCheckInterval > 0 {
|
||||
ticker := time.NewTicker(*configCheckInterval)
|
||||
tickerCh = ticker.C
|
||||
defer ticker.Stop()
|
||||
}
|
||||
for {
|
||||
scs.updateConfig(cfg)
|
||||
waitForChans:
|
||||
select {
|
||||
case <-sighupCh:
|
||||
logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile)
|
||||
cfgNew, err := loadConfig(configFile)
|
||||
if err != nil {
|
||||
configReloadErrors.Inc()
|
||||
configSuccess.Set(0)
|
||||
logger.Errorf("cannot read %q on SIGHUP: %s; continuing with the previous config", configFile, err)
|
||||
goto waitForChans
|
||||
}
|
||||
configSuccess.Set(1)
|
||||
if !cfgNew.mustRestart(cfg) {
|
||||
logger.Infof("nothing changed in %q", configFile)
|
||||
goto waitForChans
|
||||
}
|
||||
cfg = cfgNew
|
||||
marshaledData = cfg.marshal()
|
||||
configData.Store(&marshaledData)
|
||||
configReloads.Inc()
|
||||
configTimestamp.Set(fasttime.UnixTimestamp())
|
||||
case <-tickerCh:
|
||||
// TBD print notice that deprecated -promscrape.configCheckInterval is used
|
||||
configwatcher.EnableCheckInterval(*configCheckInterval)
|
||||
|
||||
configwatcher.RegisterCheckIntervalHandler("-promscrape.config", func() {
|
||||
cfgNew, err := loadConfig(configFile)
|
||||
if err != nil {
|
||||
configReloadErrors.Inc()
|
||||
configSuccess.Set(0)
|
||||
logger.Errorf("cannot read %q: %s; continuing with the previous config", configFile, err)
|
||||
goto waitForChans
|
||||
return
|
||||
}
|
||||
configSuccess.Set(1)
|
||||
if !cfgNew.mustRestart(cfg) {
|
||||
goto waitForChans
|
||||
return
|
||||
}
|
||||
cfg = cfgNew
|
||||
marshaledData = cfg.marshal()
|
||||
configData.Store(&marshaledData)
|
||||
configReloads.Inc()
|
||||
configTimestamp.Set(fasttime.UnixTimestamp())
|
||||
case <-globalStopCh:
|
||||
cfg.mustStop()
|
||||
logger.Infof("stopping Prometheus scrapers")
|
||||
startTime := time.Now()
|
||||
scs.stop()
|
||||
logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
|
||||
|
||||
scs.updateConfig(cfg)
|
||||
})
|
||||
}
|
||||
|
||||
configwatcher.RegisterSignalHandler("-promscrape.config", func() {
|
||||
logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile)
|
||||
cfgNew, err := loadConfig(configFile)
|
||||
if err != nil {
|
||||
configReloadErrors.Inc()
|
||||
configSuccess.Set(0)
|
||||
logger.Errorf("cannot read %q on SIGHUP: %s; continuing with the previous config", configFile, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
configSuccess.Set(1)
|
||||
if !cfgNew.mustRestart(cfg) {
|
||||
logger.Infof("nothing changed in %q", configFile)
|
||||
return
|
||||
}
|
||||
cfg = cfgNew
|
||||
marshaledData = cfg.marshal()
|
||||
configData.Store(&marshaledData)
|
||||
configReloads.Inc()
|
||||
configTimestamp.Set(fasttime.UnixTimestamp())
|
||||
|
||||
scs.updateConfig(cfg)
|
||||
})
|
||||
|
||||
go func() {
|
||||
<-globalStopCh
|
||||
|
||||
configwatcher.UnregisterHandler("-promscrape.config")
|
||||
|
||||
cfg.mustStop()
|
||||
logger.Infof("stopping Prometheus scrapers")
|
||||
startTime := time.Now()
|
||||
scs.stop()
|
||||
logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
|
||||
return
|
||||
}()
|
||||
}
|
||||
|
||||
var (
|
||||
|
||||
Reference in New Issue
Block a user