mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-08 19:33:35 +03:00
Compare commits
2 Commits
dependabot
...
configwatc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0625bb77d | ||
|
|
4163f18250 |
@@ -10,6 +10,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/configwatcher"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
|
||||||
@@ -112,6 +113,7 @@ func main() {
|
|||||||
flag.Usage = usage
|
flag.Usage = usage
|
||||||
envflag.Parse()
|
envflag.Parse()
|
||||||
remotewrite.InitSecretFlags()
|
remotewrite.InitSecretFlags()
|
||||||
|
configwatcher.Init()
|
||||||
buildinfo.Init()
|
buildinfo.Init()
|
||||||
logger.Init()
|
logger.Init()
|
||||||
timeserieslimits.Init(*maxLabelsPerTimeseries, *maxLabelNameLen, *maxLabelValueLen)
|
timeserieslimits.Init(*maxLabelsPerTimeseries, *maxLabelNameLen, *maxLabelValueLen)
|
||||||
@@ -199,6 +201,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
protoparserutil.StopUnmarshalWorkers()
|
protoparserutil.StopUnmarshalWorkers()
|
||||||
remotewrite.Stop()
|
remotewrite.Stop()
|
||||||
|
configwatcher.Stop()
|
||||||
|
|
||||||
logger.Infof("successfully stopped vmagent in %.3f seconds", time.Since(startTime).Seconds())
|
logger.Infof("successfully stopped vmagent in %.3f seconds", time.Since(startTime).Seconds())
|
||||||
}
|
}
|
||||||
|
|||||||
121
lib/configwatcher/watcher.go
Normal file
121
lib/configwatcher/watcher.go
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
package configwatcher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TBD: migrate /-/reload handler to the package
|
||||||
|
// TBD: print registered configs in 10s after the start
|
||||||
|
// TBD: print what reload methods enabled
|
||||||
|
|
||||||
|
type handler struct {
|
||||||
|
flag string
|
||||||
|
handler func()
|
||||||
|
}
|
||||||
|
|
||||||
|
var configCheckInterval = flag.Duration("configCheckInterval", 0, "TBD")
|
||||||
|
|
||||||
|
var signalHandlers []handler
|
||||||
|
|
||||||
|
var checkIntervalHandlers []handler
|
||||||
|
|
||||||
|
var mux = sync.Mutex{}
|
||||||
|
|
||||||
|
func RegisterHandler(flag string, handlerFn func()) {
|
||||||
|
RegisterSignalHandler(flag, handlerFn)
|
||||||
|
RegisterCheckIntervalHandler(flag, handlerFn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func RegisterSignalHandler(flag string, handlerFn func()) {
|
||||||
|
mux.Lock()
|
||||||
|
defer mux.Unlock()
|
||||||
|
|
||||||
|
signalHandlers = append(signalHandlers, handler{
|
||||||
|
flag: flag,
|
||||||
|
handler: handlerFn,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func RegisterCheckIntervalHandler(flag string, handlerFn func()) {
|
||||||
|
mux.Lock()
|
||||||
|
defer mux.Unlock()
|
||||||
|
|
||||||
|
checkIntervalHandlers = append(checkIntervalHandlers, handler{
|
||||||
|
flag: flag,
|
||||||
|
handler: handlerFn,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func UnregisterHandler(flag string) {
|
||||||
|
mux.Lock()
|
||||||
|
defer mux.Unlock()
|
||||||
|
|
||||||
|
newCheckIntervalHandlers := make([]handler, 0, len(checkIntervalHandlers))
|
||||||
|
for _, h := range checkIntervalHandlers {
|
||||||
|
if h.flag != flag {
|
||||||
|
newCheckIntervalHandlers = append(newCheckIntervalHandlers, h)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
newSignalHandlers := make([]handler, 0, len(signalHandlers))
|
||||||
|
for _, h := range signalHandlers {
|
||||||
|
if h.flag != flag {
|
||||||
|
newSignalHandlers = append(newSignalHandlers, h)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
checkIntervalHandlers = newCheckIntervalHandlers
|
||||||
|
}
|
||||||
|
|
||||||
|
var stopChan chan struct{}
|
||||||
|
|
||||||
|
func Init() {
|
||||||
|
stopChan = make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
sighupCh := procutil.NewSighupChan()
|
||||||
|
|
||||||
|
var tickerCh <-chan time.Time
|
||||||
|
if *configCheckInterval > 0 {
|
||||||
|
ticker := time.NewTicker(*configCheckInterval)
|
||||||
|
tickerCh = ticker.C
|
||||||
|
defer ticker.Stop()
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-sighupCh:
|
||||||
|
mux.Lock()
|
||||||
|
for _, h := range signalHandlers {
|
||||||
|
h.handler()
|
||||||
|
}
|
||||||
|
mux.Unlock()
|
||||||
|
case <-tickerCh:
|
||||||
|
mux.Lock()
|
||||||
|
for _, h := range checkIntervalHandlers {
|
||||||
|
h.handler()
|
||||||
|
}
|
||||||
|
mux.Unlock()
|
||||||
|
case <-stopChan:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Method for BC
|
||||||
|
func EnableCheckInterval(dur time.Duration) {
|
||||||
|
mux.Lock()
|
||||||
|
defer mux.Unlock()
|
||||||
|
|
||||||
|
if dur > *configCheckInterval {
|
||||||
|
*configCheckInterval = dur
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops Prometheus scraper.
|
||||||
|
func Stop() {
|
||||||
|
close(stopChan)
|
||||||
|
}
|
||||||
@@ -9,12 +9,12 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/configwatcher"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/azure"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/azure"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
|
||||||
@@ -113,7 +113,7 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompb.Writ
|
|||||||
// Register SIGHUP handler for config reload before loadConfig.
|
// Register SIGHUP handler for config reload before loadConfig.
|
||||||
// This guarantees that the config will be re-read if the signal arrives just after loadConfig.
|
// This guarantees that the config will be re-read if the signal arrives just after loadConfig.
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
||||||
sighupCh := procutil.NewSighupChan()
|
//sighupCh := procutil.NewSighupChan()
|
||||||
|
|
||||||
logger.Infof("reading scrape configs from %q", configFile)
|
logger.Infof("reading scrape configs from %q", configFile)
|
||||||
cfg, err := loadConfig(configFile)
|
cfg, err := loadConfig(configFile)
|
||||||
@@ -152,61 +152,69 @@ func runScraper(configFile string, pushData func(at *auth.Token, wr *prompb.Writ
|
|||||||
scs.add("yandexcloud_sd_configs", *yandexcloud.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getYandexCloudSDScrapeWork(swsPrev) })
|
scs.add("yandexcloud_sd_configs", *yandexcloud.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getYandexCloudSDScrapeWork(swsPrev) })
|
||||||
scs.add("static_configs", 0, func(cfg *Config, _ []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() })
|
scs.add("static_configs", 0, func(cfg *Config, _ []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() })
|
||||||
|
|
||||||
var tickerCh <-chan time.Time
|
scs.updateConfig(cfg)
|
||||||
|
|
||||||
if *configCheckInterval > 0 {
|
if *configCheckInterval > 0 {
|
||||||
ticker := time.NewTicker(*configCheckInterval)
|
// TBD print notice that deprecated -promscrape.configCheckInterval is used
|
||||||
tickerCh = ticker.C
|
configwatcher.EnableCheckInterval(*configCheckInterval)
|
||||||
defer ticker.Stop()
|
|
||||||
}
|
configwatcher.RegisterCheckIntervalHandler("-promscrape.config", func() {
|
||||||
for {
|
|
||||||
scs.updateConfig(cfg)
|
|
||||||
waitForChans:
|
|
||||||
select {
|
|
||||||
case <-sighupCh:
|
|
||||||
logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile)
|
|
||||||
cfgNew, err := loadConfig(configFile)
|
|
||||||
if err != nil {
|
|
||||||
configReloadErrors.Inc()
|
|
||||||
configSuccess.Set(0)
|
|
||||||
logger.Errorf("cannot read %q on SIGHUP: %s; continuing with the previous config", configFile, err)
|
|
||||||
goto waitForChans
|
|
||||||
}
|
|
||||||
configSuccess.Set(1)
|
|
||||||
if !cfgNew.mustRestart(cfg) {
|
|
||||||
logger.Infof("nothing changed in %q", configFile)
|
|
||||||
goto waitForChans
|
|
||||||
}
|
|
||||||
cfg = cfgNew
|
|
||||||
marshaledData = cfg.marshal()
|
|
||||||
configData.Store(&marshaledData)
|
|
||||||
configReloads.Inc()
|
|
||||||
configTimestamp.Set(fasttime.UnixTimestamp())
|
|
||||||
case <-tickerCh:
|
|
||||||
cfgNew, err := loadConfig(configFile)
|
cfgNew, err := loadConfig(configFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
configReloadErrors.Inc()
|
configReloadErrors.Inc()
|
||||||
configSuccess.Set(0)
|
configSuccess.Set(0)
|
||||||
logger.Errorf("cannot read %q: %s; continuing with the previous config", configFile, err)
|
logger.Errorf("cannot read %q: %s; continuing with the previous config", configFile, err)
|
||||||
goto waitForChans
|
return
|
||||||
}
|
}
|
||||||
configSuccess.Set(1)
|
configSuccess.Set(1)
|
||||||
if !cfgNew.mustRestart(cfg) {
|
if !cfgNew.mustRestart(cfg) {
|
||||||
goto waitForChans
|
return
|
||||||
}
|
}
|
||||||
cfg = cfgNew
|
cfg = cfgNew
|
||||||
marshaledData = cfg.marshal()
|
marshaledData = cfg.marshal()
|
||||||
configData.Store(&marshaledData)
|
configData.Store(&marshaledData)
|
||||||
configReloads.Inc()
|
configReloads.Inc()
|
||||||
configTimestamp.Set(fasttime.UnixTimestamp())
|
configTimestamp.Set(fasttime.UnixTimestamp())
|
||||||
case <-globalStopCh:
|
|
||||||
cfg.mustStop()
|
scs.updateConfig(cfg)
|
||||||
logger.Infof("stopping Prometheus scrapers")
|
})
|
||||||
startTime := time.Now()
|
}
|
||||||
scs.stop()
|
|
||||||
logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
|
configwatcher.RegisterSignalHandler("-promscrape.config", func() {
|
||||||
|
logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile)
|
||||||
|
cfgNew, err := loadConfig(configFile)
|
||||||
|
if err != nil {
|
||||||
|
configReloadErrors.Inc()
|
||||||
|
configSuccess.Set(0)
|
||||||
|
logger.Errorf("cannot read %q on SIGHUP: %s; continuing with the previous config", configFile, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
configSuccess.Set(1)
|
||||||
|
if !cfgNew.mustRestart(cfg) {
|
||||||
|
logger.Infof("nothing changed in %q", configFile)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cfg = cfgNew
|
||||||
|
marshaledData = cfg.marshal()
|
||||||
|
configData.Store(&marshaledData)
|
||||||
|
configReloads.Inc()
|
||||||
|
configTimestamp.Set(fasttime.UnixTimestamp())
|
||||||
|
|
||||||
|
scs.updateConfig(cfg)
|
||||||
|
})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-globalStopCh
|
||||||
|
|
||||||
|
configwatcher.UnregisterHandler("-promscrape.config")
|
||||||
|
|
||||||
|
cfg.mustStop()
|
||||||
|
logger.Infof("stopping Prometheus scrapers")
|
||||||
|
startTime := time.Now()
|
||||||
|
scs.stop()
|
||||||
|
logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
|
||||||
|
return
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|||||||
Reference in New Issue
Block a user