all: consistently use sync.WaitGroup.Go() instead of sync.WaitGroup.Add(1) + sync.WaitGroup.Done()

This improves code readability a bit.
Aliaksandr Valialkin
2026-01-27 00:29:44 +01:00
parent 6bbc03ecf8
commit e35a9a366c
66 changed files with 335 additions and 633 deletions
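
For context: sync.WaitGroup.Go was added in Go 1.25. It calls Add(1), runs the given function in a new goroutine, and calls Done when that function returns. Below is a minimal sketch of the before/after shape applied throughout this commit; doWork is a hypothetical placeholder, not code from this repository:

package main

import "sync"

// doWork stands in for the real goroutine body.
func doWork() {}

func main() {
    var wg sync.WaitGroup

    // Before: manual counter bookkeeping around the goroutine.
    wg.Add(1)
    go func() {
        defer wg.Done()
        doWork()
    }()

    // After: wg.Go increments the counter, starts the goroutine
    // and calls Done once doWork returns.
    wg.Go(doWork)

    wg.Wait()
}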

View File

@@ -29,11 +29,9 @@ var selfScraperWG sync.WaitGroup
func startSelfScraper() {
selfScraperStopCh = make(chan struct{})
selfScraperWG.Add(1)
go func() {
defer selfScraperWG.Done()
selfScraperWG.Go(func() {
selfScraper(*selfScrapeInterval)
}()
})
}
func stopSelfScraper() {

View File

@@ -204,12 +204,8 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queues{url=%q}`, c.sanitizedURL), func() float64 {
return float64(concurrency)
})
for i := 0; i < concurrency; i++ {
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.runWorker()
}()
for range concurrency {
c.wg.Go(c.runWorker)
}
logger.Infof("initialized client for -remoteWrite.url=%q", c.sanitizedURL)
}
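
Where the goroutine body is a single no-argument call, wg.Go can take a method value directly, as c.wg.Go(c.runWorker) does above. A hedged sketch of that shape with hypothetical names:

package main

import "sync"

type client struct {
    wg sync.WaitGroup
}

// runWorker stands in for the real worker loop.
func (c *client) runWorker() {}

func (c *client) start(concurrency int) {
    for range concurrency { // integer range loops need Go 1.22+
        c.wg.Go(c.runWorker) // a no-arg method value satisfies func()
    }
}

func (c *client) stop() { c.wg.Wait() }

func main() {
    c := &client{}
    c.start(4)
    c.stop()
}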

View File

@@ -48,11 +48,7 @@ func newPendingSeries(fq *persistentqueue.FastQueue, isVMRemoteWrite *atomic.Boo
ps.wr.significantFigures = significantFigures
ps.wr.roundDigits = roundDigits
ps.stopCh = make(chan struct{})
ps.periodicFlusherWG.Add(1)
go func() {
defer ps.periodicFlusherWG.Done()
ps.periodicFlusher()
}()
ps.periodicFlusherWG.Go(ps.periodicFlusher)
return &ps
}

View File

@@ -208,9 +208,7 @@ func Init() {
dropDanglingQueues()
// Start config reloader.
configReloaderWG.Add(1)
go func() {
defer configReloaderWG.Done()
configReloaderWG.Go(func() {
for {
select {
case <-configReloaderStopCh:
@@ -220,7 +218,7 @@ func Init() {
reloadRelabelConfigs()
reloadStreamAggrConfigs()
}
}()
})
}
func dropDanglingQueues() {
@@ -540,11 +538,9 @@ func tryPushMetadataToRemoteStorages(rwctxs []*remoteWriteCtx, mms []prompb.Metr
// Push metadata to remote storage systems in parallel to reduce
// the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup
wg.Add(len(rwctxs))
var anyPushFailed atomic.Bool
for _, rwctx := range rwctxs {
go func(rwctx *remoteWriteCtx) {
defer wg.Done()
wg.Go(func() {
if !rwctx.tryPushMetadataInternal(mms) {
rwctx.pushFailures.Inc()
if forceDropSamplesOnFailure {
@@ -553,7 +549,7 @@ func tryPushMetadataToRemoteStorages(rwctxs []*remoteWriteCtx, mms []prompb.Metr
}
anyPushFailed.Store(true)
}
}(rwctx)
})
}
wg.Wait()
return !anyPushFailed.Load()
@@ -585,15 +581,13 @@ func tryPushTimeSeriesToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prom
// Push tssBlock to remote storage systems in parallel to reduce
// the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup
wg.Add(len(rwctxs))
var anyPushFailed atomic.Bool
for _, rwctx := range rwctxs {
go func(rwctx *remoteWriteCtx) {
defer wg.Done()
wg.Go(func() {
if !rwctx.TryPushTimeSeries(tssBlock, forceDropSamplesOnFailure) {
anyPushFailed.Store(true)
}
}(rwctx)
})
}
wg.Wait()
return !anyPushFailed.Load()
@@ -615,13 +609,11 @@ func tryShardingTimeSeriesAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock
if len(shard) == 0 {
continue
}
wg.Add(1)
go func(rwctx *remoteWriteCtx, tss []prompb.TimeSeries) {
defer wg.Done()
if !rwctx.TryPushTimeSeries(tss, forceDropSamplesOnFailure) {
wg.Go(func() {
if !rwctx.TryPushTimeSeries(shard, forceDropSamplesOnFailure) {
anyPushFailed.Store(true)
}
}(rwctx, shard)
})
}
wg.Wait()
return !anyPushFailed.Load()

View File

@@ -65,11 +65,9 @@ func TestManagerUpdateConcurrent(t *testing.T) {
const workers = 500
const iterations = 10
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(n int) {
defer wg.Done()
var wg sync.WaitGroup
for n := range workers {
wg.Go(func() {
r := rand.New(rand.NewSource(int64(n)))
for i := 0; i < iterations; i++ {
rnd := r.Intn(len(paths))
@@ -79,7 +77,7 @@ func TestManagerUpdateConcurrent(t *testing.T) {
}
_ = m.update(context.Background(), cfg, false)
}
}(i)
})
}
wg.Wait()
}
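
The old test code passed the loop index via go func(n int) { ... }(i) to avoid the classic loop-variable capture bug. Since Go 1.22 each loop iteration gets its own copy of the loop variable, so the closure passed to wg.Go can capture n directly, as the new code does. A small illustrative sketch (worker count and body are made up):

package main

import (
    "fmt"
    "sync"
)

func main() {
    const workers = 4
    var wg sync.WaitGroup
    for n := range workers { // each iteration gets a fresh n (Go 1.22+)
        wg.Go(func() {
            fmt.Println("worker", n) // captures this iteration's n, not the final value
        })
    }
    wg.Wait()
}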

View File

@@ -212,18 +212,16 @@ consul_sd_configs:
const workers = 500
const iterations = 10
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(n int) {
defer wg.Done()
var wg sync.WaitGroup
for n := range workers {
wg.Go(func() {
r := rand.New(rand.NewSource(int64(n)))
for i := 0; i < iterations; i++ {
rnd := r.Intn(len(paths))
_ = cw.reload(paths[rnd]) // update can fail and this is expected
_ = cw.notifiers()
}
}(i)
})
}
wg.Wait()
}

View File

@@ -65,17 +65,15 @@ func TestRule_stateConcurrent(_ *testing.T) {
r := &AlertingRule{state: &ruleState{entries: make([]StateEntry, 20)}}
const workers = 50
const iterations = 100
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
var wg sync.WaitGroup
for range workers {
wg.Go(func() {
for i := 0; i < iterations; i++ {
r.state.add(StateEntry{At: time.Now()})
r.state.getAll()
r.state.getLast()
}
}()
})
}
wg.Wait()
}

View File

@@ -373,12 +373,10 @@ func (bu *backendURL) isBroken() bool {
func (bu *backendURL) setBroken() {
if bu.broken.CompareAndSwap(false, true) {
bu.healthCheckWG.Add(1)
go func() {
defer bu.healthCheckWG.Done()
bu.healthCheckWG.Go(func() {
bu.runHealthCheck()
bu.broken.Store(false)
}()
})
}
}
@@ -743,11 +741,9 @@ func initAuthConfig() {
configTimestamp.Set(fasttime.UnixTimestamp())
stopCh = make(chan struct{})
authConfigWG.Add(1)
go func() {
defer authConfigWG.Done()
authConfigWG.Go(func() {
authConfigReloader(sighupCh)
}()
})
}
func stopAuthConfig() {

View File

@@ -63,10 +63,8 @@ func (ip *influxProcessor) run(ctx context.Context) error {
ip.im.ResetStats()
var wg sync.WaitGroup
wg.Add(ip.cc)
for i := 0; i < ip.cc; i++ {
go func() {
defer wg.Done()
for range ip.cc {
wg.Go(func() {
for s := range seriesCh {
if err := ip.do(s); err != nil {
errCh <- fmt.Errorf("request failed for %q.%q: %s", s.Measurement, s.Field, err)
@@ -74,7 +72,7 @@ func (ip *influxProcessor) run(ctx context.Context) error {
}
bar.Increment()
}
}()
})
}
// any error breaks the import

View File

@@ -89,10 +89,8 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
bar.Finish()
}(bar)
var wg sync.WaitGroup
wg.Add(op.otsdbcc)
for i := 0; i < op.otsdbcc; i++ {
go func() {
defer wg.Done()
for range op.otsdbcc {
wg.Go(func() {
for s := range seriesCh {
if err := op.do(s); err != nil {
errCh <- fmt.Errorf("couldn't retrieve series for %s : %s", metric, err)
@@ -100,7 +98,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
}
bar.Increment()
}
}()
})
}
/*
Loop through all series for this metric, processing all retentions and time ranges

View File

@@ -124,10 +124,8 @@ func (pp *prometheusProcessor) processBlocks(blocks []tsdb.BlockReader) error {
pp.im.ResetStats()
var wg sync.WaitGroup
wg.Add(pp.cc)
for i := 0; i < pp.cc; i++ {
go func() {
defer wg.Done()
for range pp.cc {
wg.Go(func() {
for br := range blockReadersCh {
if err := pp.do(br); err != nil {
errCh <- fmt.Errorf("read failed for block %q: %s", br.Meta().ULID, err)
@@ -135,7 +133,7 @@ func (pp *prometheusProcessor) processBlocks(blocks []tsdb.BlockReader) error {
}
bar.Increment()
}
}()
})
}
// any error breaks the import
for _, br := range blocks {

View File

@@ -66,10 +66,8 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
errCh := make(chan error)
var wg sync.WaitGroup
wg.Add(rrp.cc)
for i := 0; i < rrp.cc; i++ {
go func() {
defer wg.Done()
for range rrp.cc {
wg.Go(func() {
for r := range rangeC {
if err := rrp.do(ctx, r); err != nil {
errCh <- fmt.Errorf("request failed for: %s", err)
@@ -77,7 +75,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
}
bar.Increment()
}
}()
})
}
for _, r := range ranges {

View File

@@ -156,15 +156,13 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
cfg.BatchSize = 1e5
}
im.wg.Add(int(cfg.Concurrency))
for i := 0; i < int(cfg.Concurrency); i++ {
for i := range int(cfg.Concurrency) {
pbPrefix := fmt.Sprintf(`{{ green "VM worker %d:" }}`, i)
bar := barpool.AddWithTemplate(pbPrefix+pbTpl, 0)
go func(bar barpool.Bar) {
defer im.wg.Done()
im.wg.Go(func() {
im.startWorker(ctx, bar, cfg.BatchSize, cfg.SignificantFigures, cfg.RoundDigits)
}(bar)
})
}
im.ResetStats()
return im, nil

View File

@@ -249,9 +249,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
var wg sync.WaitGroup
for i := 0; i < p.cc; i++ {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
for f := range filterCh {
if !p.disablePerMetricRequests {
if err := p.do(ctx, f, srcURL, dstURL, nil); err != nil {
@@ -266,7 +264,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
}
}
}
}()
})
}
// any error breaks the import

View File

@@ -111,9 +111,7 @@ func InitStreamAggr() {
saCfgTimestamp.Set(fasttime.UnixTimestamp())
// Start config reloader.
saCfgReloaderWG.Add(1)
go func() {
defer saCfgReloaderWG.Done()
saCfgReloaderWG.Go(func() {
for {
select {
case <-sighupCh:
@@ -122,7 +120,7 @@ func InitStreamAggr() {
}
reloadStreamAggrConfig()
}
}()
})
}
func reloadStreamAggrConfig() {

View File

@@ -3896,7 +3896,6 @@ func nextSeriesConcurrentWrapper(nextSeries nextSeriesFunc, f func(s *series) (*
seriesCh := make(chan *series, goroutines)
errCh := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(goroutines)
go func() {
var err error
for {
@@ -3914,9 +3913,8 @@ func nextSeriesConcurrentWrapper(nextSeries nextSeriesFunc, f func(s *series) (*
close(errCh)
}()
var skipProcessing atomic.Bool
for i := 0; i < goroutines; i++ {
go func() {
defer wg.Done()
for range goroutines {
wg.Go(func() {
for s := range seriesCh {
if skipProcessing.Load() {
continue
@@ -3934,7 +3932,7 @@ func nextSeriesConcurrentWrapper(nextSeries nextSeriesFunc, f func(s *series) (*
}
}
}
}()
})
}
wrapper := func() (*series, error) {
r := <-resultCh

View File

@@ -296,14 +296,12 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
// Start workers and wait until they finish the work.
var wg sync.WaitGroup
for i := range workChs {
wg.Add(1)
qtChild := qt.NewChild("worker #%d", i)
go func(workerID uint) {
timeseriesWorker(qtChild, workChs, workerID)
for workerID := range workChs {
qtChild := qt.NewChild("worker #%d", workerID)
wg.Go(func() {
timeseriesWorker(qtChild, workChs, uint(workerID))
qtChild.Done()
wg.Done()
}(uint(i))
})
}
wg.Wait()
@@ -514,12 +512,10 @@ func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbf *tmpBlocksFile, tr s
// Start workers and wait until they finish the work.
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func(workerID uint) {
unpackWorker(workChs, workerID)
wg.Done()
}(uint(i))
for workerID := range workers {
wg.Go(func() {
unpackWorker(workChs, uint(workerID))
})
}
wg.Wait()
@@ -1020,12 +1016,10 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
mustStop atomic.Bool
)
var wg sync.WaitGroup
wg.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ {
go func(workerID uint) {
defer wg.Done()
for workerID := range gomaxprocs {
wg.Go(func() {
for xw := range workCh {
if err := f(&xw.mn, &xw.b, tr, workerID); err != nil {
if err := f(&xw.mn, &xw.b, tr, uint(workerID)); err != nil {
errGlobalLock.Lock()
if errGlobal == nil {
errGlobal = err
@@ -1036,7 +1030,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
xw.reset()
exportWorkPool.Put(xw)
}
}(uint(i))
})
}
// Feed workers with work

View File

@@ -103,15 +103,13 @@ func testIncrementalParallelAggr(iafc *incrementalAggrFuncContext, tssSrc, tssEx
workersCount := netstorage.MaxWorkers()
tsCh := make(chan *timeseries)
var wg sync.WaitGroup
wg.Add(workersCount)
for i := 0; i < workersCount; i++ {
go func(workerID uint) {
defer wg.Done()
for workerID := range workersCount {
wg.Go(func() {
for ts := range tsCh {
runtime.Gosched() // allow other goroutines performing the work
iafc.updateTimeseries(ts, workerID)
iafc.updateTimeseries(ts, uint(workerID))
}
}(uint(i))
})
}
for _, ts := range tssSrc {
tsCh <- ts

View File

@@ -477,22 +477,18 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec
var tssFirst []*timeseries
var errFirst error
qtFirst := qt.NewChild("expr1")
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
tssFirst, errFirst = evalExpr(qtFirst, ec, exprFirst)
qtFirst.Done()
}()
})
var tssSecond []*timeseries
var errSecond error
qtSecond := qt.NewChild("expr2")
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
tssSecond, errSecond = evalExpr(qtSecond, ec, exprSecond)
qtSecond.Done()
}()
})
wg.Wait()
if errFirst != nil {
@@ -710,17 +706,13 @@ func evalExprsInParallel(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.
qt.Printf("eval function args in parallel")
var wg sync.WaitGroup
for i, e := range es {
wg.Add(1)
qtChild := qt.NewChild("eval arg %d", i)
go func(e metricsql.Expr, i int) {
defer func() {
qtChild.Done()
wg.Done()
}()
wg.Go(func() {
defer qtChild.Done()
rv, err := evalExpr(qtChild, ec, e)
rvs[i] = rv
errs[i] = err
}(e, i)
})
}
wg.Wait()
for _, err := range errs {
@@ -1019,16 +1011,14 @@ func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, time
}
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(workerID uint) {
defer wg.Done()
for workerID := range workers {
wg.Go(func() {
var tmpValues []float64
var tmpTimestamps []int64
for ts := range workChs[workerID] {
tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps, workerID)
tmpValues, tmpTimestamps = f(ts, tmpValues, tmpTimestamps, uint(workerID))
}
}(uint(i))
})
}
wg.Wait()
}

View File

@@ -470,9 +470,7 @@ func initStaleSnapshotsRemover(strg *storage.Storage) {
return
}
snapshotsMaxAgeDur := snapshotsMaxAge.Duration()
staleSnapshotsRemoverWG.Add(1)
go func() {
defer staleSnapshotsRemoverWG.Done()
staleSnapshotsRemoverWG.Go(func() {
d := timeutil.AddJitterToDuration(time.Second * 11)
t := time.NewTicker(d)
defer t.Stop()
@@ -484,7 +482,7 @@ func initStaleSnapshotsRemover(strg *storage.Storage) {
}
strg.MustDeleteStaleSnapshots(snapshotsMaxAgeDur)
}
}()
})
}
func stopStaleSnapshotsRemover() {

View File

@@ -18,20 +18,18 @@ func TestSlice_NoInit(t *testing.T) {
}
var wg sync.WaitGroup
for workerID := uint(0); workerID < workersCount; workerID++ {
wg.Add(1)
go func(workerID uint) {
defer wg.Done()
for workerID := range workersCount {
wg.Go(func() {
for i := 0; i < loopsPerWorker; i++ {
bb := s.Get(workerID)
bb := s.Get(uint(workerID))
fmt.Fprintf(bb, "item %d at worker %d\n", i, workerID)
}
}(workerID)
})
}
wg.Wait()
bbs = s.All()
for workerID := uint(0); workerID < workersCount; workerID++ {
for workerID := range workersCount {
var bbExpected bytes.Buffer
for i := 0; i < loopsPerWorker; i++ {
fmt.Fprintf(&bbExpected, "item %d at worker %d\n", i, workerID)
@@ -61,20 +59,18 @@ func TestSlice_Init(t *testing.T) {
}
var wg sync.WaitGroup
for workerID := uint(0); workerID < workersCount; workerID++ {
wg.Add(1)
go func(workerID uint) {
defer wg.Done()
for workerID := range workersCount {
wg.Go(func() {
for i := 0; i < loopsPerWorker; i++ {
bb := s.Get(workerID)
bb := s.Get(uint(workerID))
fmt.Fprintf(bb, "item %d at worker %d\n", i, workerID)
}
}(workerID)
})
}
wg.Wait()
bbs = s.All()
for workerID := uint(0); workerID < workersCount; workerID++ {
for workerID := range workersCount {
bbExpected := bytes.NewBufferString(prefix)
for i := 0; i < loopsPerWorker; i++ {
fmt.Fprintf(bbExpected, "item %d at worker %d\n", i, workerID)

View File

@@ -73,10 +73,8 @@ func runParallelPerPathInternal(ctx context.Context, concurrency int, perPath ma
// Start workers
var wg sync.WaitGroup
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for range concurrency {
wg.Go(func() {
for parts := range workCh {
select {
case <-ctxLocal.Done():
@@ -85,7 +83,7 @@ func runParallelPerPathInternal(ctx context.Context, concurrency int, perPath ma
}
resultCh <- f(parts)
}
}()
})
}
// Feed workers with work.
@@ -126,10 +124,8 @@ func runParallelInternal(concurrency int, parts []common.Part, f func(p common.P
// Start workers
var wg sync.WaitGroup
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for range concurrency {
wg.Go(func() {
for p := range workCh {
select {
case <-stopCh:
@@ -138,7 +134,7 @@ func runParallelInternal(concurrency int, parts []common.Part, f func(p common.P
}
resultCh <- f(p)
}
}()
})
}
// Feed workers with work.

View File

@@ -29,11 +29,7 @@ func newBandwidthLimiter(perSecondLimit int) *bandwidthLimiter {
var mu sync.Mutex
bl.c = sync.NewCond(&mu)
bl.stopCh = make(chan struct{})
bl.wg.Add(1)
go func() {
defer bl.wg.Done()
bl.perSecondUpdater()
}()
bl.wg.Go(bl.perSecondUpdater)
return &bl
}

View File

@@ -133,12 +133,10 @@ func TestCacheConcurrentAccess(_ *testing.T) {
workers := 5
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(worker int) {
defer wg.Done()
for worker := range workers {
wg.Go(func() {
testCacheSetGet(c, worker)
}(i)
})
}
wg.Wait()
}

View File

@@ -24,9 +24,7 @@ func NewLimiter(maxItems int, refreshInterval time.Duration) *Limiter {
stopCh: make(chan struct{}),
}
l.v.Store(newLimiter(maxItems))
l.wg.Add(1)
go func() {
defer l.wg.Done()
l.wg.Go(func() {
t := time.NewTicker(refreshInterval)
defer t.Stop()
for {
@@ -37,7 +35,7 @@ func NewLimiter(maxItems int, refreshInterval time.Duration) *Limiter {
return
}
}
}()
})
return l
}

View File

@@ -39,16 +39,10 @@ func (pfc *ParallelFileCreator) Run() {
concurrencyCh := fsutil.GetConcurrencyCh()
for _, task := range pfc.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(dstPath string, wc *WriteCloser, nocache bool) {
defer func() {
wg.Done()
<-concurrencyCh
}()
*wc = MustCreate(dstPath, nocache)
}(task.dstPath, task.wc, task.nocache)
wg.Go(func() {
*task.wc = MustCreate(task.dstPath, task.nocache)
<-concurrencyCh
})
}
wg.Wait()
}
@@ -84,16 +78,10 @@ func (pfo *ParallelFileOpener) Run() {
concurrencyCh := fsutil.GetConcurrencyCh()
for _, task := range pfo.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(path string, rc *ReadCloser, nocache bool) {
defer func() {
wg.Done()
<-concurrencyCh
}()
*rc = MustOpen(path, nocache)
}(task.path, task.rc, task.nocache)
wg.Go(func() {
*task.rc = MustOpen(task.path, task.nocache)
<-concurrencyCh
})
}
wg.Wait()
}
@@ -127,23 +115,19 @@ func (psw *ParallelStreamWriter) Run() {
concurrencyCh := fsutil.GetConcurrencyCh()
for _, task := range psw.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(dstPath string, src io.WriterTo) {
defer func() {
wg.Done()
<-concurrencyCh
}()
f := MustCreate(dstPath, false)
if _, err := src.WriteTo(f); err != nil {
wg.Go(func() {
f := MustCreate(task.dstPath, false)
if _, err := task.src.WriteTo(f); err != nil {
f.MustClose()
// Do not call MustRemovePath(path), so the user could inspect
// the file contents during investigation of the issue.
logger.Panicf("FATAL: cannot write data to %q: %s", dstPath, err)
logger.Panicf("FATAL: cannot write data to %q: %s", task.dstPath, err)
}
f.MustClose()
}(task.dstPath, task.src)
<-concurrencyCh
})
}
wg.Wait()
}
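
The fs helpers above keep their concurrency limit alongside wg.Go: a token is taken from the limiter channel before the goroutine is started, and the goroutine returns the token as its last statement instead of in a deferred cleanup func. A rough sketch of that shape, with a hypothetical limit and task:

package main

import "sync"

// process stands in for the real per-task work.
func process(int) {}

func main() {
    tasks := []int{1, 2, 3, 4, 5}
    concurrencyCh := make(chan struct{}, 2) // allow at most 2 tasks in flight
    var wg sync.WaitGroup
    for _, t := range tasks {
        concurrencyCh <- struct{}{} // acquire a slot; blocks once the limit is reached
        wg.Go(func() {
            process(t)
            <-concurrencyCh // release the slot when the task is done
        })
    }
    wg.Wait()
}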

View File

@@ -59,16 +59,12 @@ func MustRemoveDir(dirPath string) {
dirEntryPath := filepath.Join(dirPath, name)
concurrencyCh <- struct{}{}
wg.Add(1)
go func(dirEntryPath string) {
defer func() {
wg.Done()
<-concurrencyCh
}()
wg.Go(func() {
if err := os.RemoveAll(dirEntryPath); err != nil {
logger.Panicf("FATAL: cannot remove %q: %s", dirEntryPath, err)
}
}(dirEntryPath)
<-concurrencyCh
})
}
wg.Wait()

View File

@@ -37,17 +37,13 @@ func (pro *ParallelReaderAtOpener) Run() {
concurrencyCh := fsutil.GetConcurrencyCh()
for _, task := range pro.tasks {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(path string, rc *MustReadAtCloser, fileSize *uint64) {
defer func() {
wg.Done()
<-concurrencyCh
}()
wg.Go(func() {
*task.rc = MustOpenReaderAt(task.path)
*task.fileSize = MustFileSize(task.path)
*rc = MustOpenReaderAt(path)
*fileSize = MustFileSize(path)
}(task.path, task.rc, task.fileSize)
<-concurrencyCh
})
}
wg.Wait()
}
@@ -66,14 +62,10 @@ func MustCloseParallel(cs []MustCloser) {
concurrencyCh := fsutil.GetConcurrencyCh()
for _, c := range cs {
concurrencyCh <- struct{}{}
wg.Add(1)
go func(c MustCloser) {
defer func() {
wg.Done()
<-concurrencyCh
}()
wg.Go(func() {
c.MustClose()
}(c)
<-concurrencyCh
})
}
wg.Wait()
}

View File

@@ -226,15 +226,13 @@ func Stop(addrs []string) error {
if addr == "" {
continue
}
wg.Add(1)
go func(addr string) {
wg.Go(func() {
if err := stop(addr); err != nil {
errGlobalLock.Lock()
errGlobal = err
errGlobalLock.Unlock()
}
wg.Done()
}(addr)
})
}
wg.Wait()

View File

@@ -62,18 +62,17 @@ func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reade
lnUDP: lnUDP,
}
s.cm.Init("graphite")
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.wg.Go(func() {
s.serveTCP(insertHandler)
logger.Infof("stopped TCP Graphite server at %q", addr)
}()
s.wg.Add(1)
go func() {
defer s.wg.Done()
})
s.wg.Go(func() {
s.serveUDP(insertHandler)
logger.Infof("stopped UDP Graphite server at %q", addr)
}()
})
return s
}
@@ -115,19 +114,17 @@ func (s *Server) serveTCP(insertHandler func(r io.Reader) error) {
_ = c.Close()
break
}
wg.Add(1)
go func() {
wg.Go(func() {
defer func() {
s.cm.Delete(c)
_ = c.Close()
wg.Done()
}()
writeRequestsTCP.Inc()
if err := insertHandler(c); err != nil {
writeErrorsTCP.Inc()
logger.Errorf("error in TCP Graphite conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
}
}()
})
}
wg.Wait()
}
@@ -136,9 +133,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup
for i := 0; i < gomaxprocs; i++ {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
var bb bytesutil.ByteBuffer
bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
for {
@@ -169,7 +164,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
continue
}
}
}()
})
}
wg.Wait()
}

View File

@@ -60,18 +60,17 @@ func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reade
lnUDP: lnUDP,
}
s.cm.Init("influx")
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.wg.Go(func() {
s.serveTCP(insertHandler)
logger.Infof("stopped TCP InfluxDB server at %q", addr)
}()
s.wg.Add(1)
go func() {
defer s.wg.Done()
})
s.wg.Go(func() {
s.serveUDP(insertHandler)
logger.Infof("stopped UDP InfluxDB server at %q", addr)
}()
})
return s
}
@@ -113,19 +112,17 @@ func (s *Server) serveTCP(insertHandler func(r io.Reader) error) {
_ = c.Close()
break
}
wg.Add(1)
go func() {
wg.Go(func() {
defer func() {
s.cm.Delete(c)
_ = c.Close()
wg.Done()
}()
writeRequestsTCP.Inc()
if err := insertHandler(c); err != nil {
writeErrorsTCP.Inc()
logger.Errorf("error in TCP InfluxDB conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
}
}()
})
}
wg.Wait()
}
@@ -134,9 +131,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup
for i := 0; i < gomaxprocs; i++ {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
var bb bytesutil.ByteBuffer
bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
for {
@@ -167,7 +162,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
continue
}
}
}()
})
}
wg.Wait()
}

View File

@@ -33,13 +33,11 @@ func newListenerSwitch(ln net.Listener) *listenerSwitch {
}
ls.telnetConnsCh = make(chan net.Conn)
ls.httpConnsCh = make(chan net.Conn)
ls.wg.Add(1)
go func() {
ls.wg.Go(func() {
ls.worker()
close(ls.telnetConnsCh)
close(ls.httpConnsCh)
ls.wg.Done()
}()
})
return ls
}
@@ -78,34 +76,31 @@ func (ls *listenerSwitch) worker() {
return
}
wg.Add(1)
go func(conn net.Conn) {
defer wg.Done()
if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
wg.Go(func() {
if err := c.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
logger.Errorf("listenerSwitch: cannot set read deadline for the underlying connection %q: %s", ls.ln.Addr(), err)
_ = conn.Close()
_ = c.Close()
return
}
// Block on reading the first byte. This determines the connection type and decides whether it goes to ls.telnetConnsCh or ls.httpConnsCh.
var buf [1]byte
if _, err := io.ReadFull(conn, buf[:]); err != nil {
if _, err := io.ReadFull(c, buf[:]); err != nil {
logger.Errorf("listenerSwitch: cannot read one byte from the underlying connection for %q: %s", ls.ln.Addr(), err)
_ = conn.Close()
_ = c.Close()
return
}
if err := conn.SetReadDeadline(time.Time{}); err != nil {
if err := c.SetReadDeadline(time.Time{}); err != nil {
logger.Errorf("listenerSwitch: cannot reset read deadline for the underlying connection %q: %s", ls.ln.Addr(), err)
_ = conn.Close()
_ = c.Close()
return
}
// It is expected that both listeners - http and telnet consume incoming connections as soon as possible,
// so the below code shouldn't block for extended periods of time.
pc := &peekedConn{
Conn: conn,
Conn: c,
firstChar: buf[0],
}
if buf[0] == 'p' {
@@ -115,7 +110,7 @@ func (ls *listenerSwitch) worker() {
// Assume the request starts with `POST`.
ls.httpConnsCh <- pc
}
}(c)
})
}
}

View File

@@ -70,24 +70,20 @@ func MustStart(addr string, useProxyProtocol bool, telnetInsertHandler func(r io
lnUDP: lnUDP,
}
s.cm.Init("opentsdb")
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.wg.Go(func() {
s.serveTelnet(lnTelnet, telnetInsertHandler)
logger.Infof("stopped TCP telnet OpenTSDB server at %q", addr)
}()
s.wg.Add(1)
go func() {
defer s.wg.Done()
})
s.wg.Go(func() {
httpServer.Wait()
// Do not log when httpServer is stopped, since this is logged by the server itself.
}()
s.wg.Add(1)
go func() {
defer s.wg.Done()
})
s.wg.Go(func() {
s.serveUDP(telnetInsertHandler)
logger.Infof("stopped UDP OpenTSDB server at %q", addr)
}()
})
return s
}
@@ -133,19 +129,17 @@ func (s *Server) serveTelnet(ln net.Listener, insertHandler func(r io.Reader) er
_ = c.Close()
break
}
wg.Add(1)
go func() {
wg.Go(func() {
defer func() {
s.cm.Delete(c)
_ = c.Close()
wg.Done()
}()
writeRequestsTCP.Inc()
if err := insertHandler(c); err != nil {
writeErrorsTCP.Inc()
logger.Errorf("error in TCP OpenTSDB conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
}
}()
})
}
wg.Wait()
}
@@ -154,9 +148,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup
for i := 0; i < gomaxprocs; i++ {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
var bb bytesutil.ByteBuffer
bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
for {
@@ -187,7 +179,7 @@ func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
continue
}
}
}()
})
}
wg.Wait()
}

View File

@@ -56,9 +56,7 @@ func MustServe(ln net.Listener, insertHandler func(r *http.Request) error) *Serv
s: hs,
ln: ln,
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.wg.Go(func() {
err := s.s.Serve(s.ln)
if err == http.ErrServerClosed {
return
@@ -66,7 +64,7 @@ func MustServe(ln net.Listener, insertHandler func(r *http.Request) error) *Serv
if err != nil {
logger.Fatalf("error serving HTTP OpenTSDB at %q: %s", s.ln.Addr(), err)
}
}()
})
return s
}

View File

@@ -128,12 +128,10 @@ func TestCacheConcurrentAccess(_ *testing.T) {
workers := 5
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(worker int) {
defer wg.Done()
for worker := range workers {
wg.Go(func() {
testCacheSetGet(c, worker)
}(i)
})
}
wg.Wait()
}

View File

@@ -401,11 +401,7 @@ func (tb *Table) startInmemoryPartsMergerLocked() {
return
default:
}
tb.wg.Add(1)
go func() {
tb.inmemoryPartsMerger()
tb.wg.Done()
}()
tb.wg.Go(tb.inmemoryPartsMerger)
}
func (tb *Table) startFilePartsMergers() {
@@ -422,27 +418,15 @@ func (tb *Table) startFilePartsMergerLocked() {
return
default:
}
tb.wg.Add(1)
go func() {
tb.filePartsMerger()
tb.wg.Done()
}()
tb.wg.Go(tb.filePartsMerger)
}
func (tb *Table) startPendingItemsFlusher() {
tb.wg.Add(1)
go func() {
tb.pendingItemsFlusher()
tb.wg.Done()
}()
tb.wg.Go(tb.pendingItemsFlusher)
}
func (tb *Table) startInmemoryPartsFlusher() {
tb.wg.Add(1)
go func() {
tb.inmemoryPartsFlusher()
tb.wg.Done()
}()
tb.wg.Go(tb.inmemoryPartsFlusher)
}
func (tb *Table) startFlushCallbackWorker() {
@@ -450,8 +434,7 @@ func (tb *Table) startFlushCallbackWorker() {
return
}
tb.wg.Add(1)
go func() {
tb.wg.Go(func() {
// call flushCallback once per 10 seconds in order to improve the effectiveness of caches,
// which are reset by the flushCallback.
d := timeutil.AddJitterToDuration(time.Second * 10)
@@ -461,7 +444,6 @@ func (tb *Table) startFlushCallbackWorker() {
case <-tb.stopCh:
tc.Stop()
tb.flushCallback()
tb.wg.Done()
return
case <-tc.C:
if tb.needFlushCallbackCall.CompareAndSwap(true, false) {
@@ -469,7 +451,7 @@ func (tb *Table) startFlushCallbackWorker() {
}
}
}
}()
})
}
var (
@@ -712,15 +694,10 @@ func (tb *Table) mergeInmemoryPartsToFiles(pws []*partWrapper) error {
wg := getWaitGroup()
for len(pws) > 0 {
pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws)
wg.Add(1)
inmemoryPartsConcurrencyCh <- struct{}{}
go func(pwsChunk []*partWrapper) {
defer func() {
<-inmemoryPartsConcurrencyCh
wg.Done()
}()
if err := tb.mergeParts(pwsChunk, nil, true); err != nil {
wg.Go(func() {
if err := tb.mergeParts(pwsToMerge, nil, true); err != nil {
// There is no need for errors.Is(err, errForciblyStopped) check here, since stopCh=nil is passed to mergeParts.
errGlobalLock.Lock()
if errGlobal == nil {
@@ -728,7 +705,9 @@ func (tb *Table) mergeInmemoryPartsToFiles(pws []*partWrapper) error {
}
errGlobalLock.Unlock()
}
}(pwsToMerge)
<-inmemoryPartsConcurrencyCh
})
pws = pwsRemaining
}
wg.Wait()
@@ -866,14 +845,12 @@ func (tb *Table) flushBlocksToInmemoryParts(ibs []*inmemoryBlock, isFinal bool)
if n > len(ibs) {
n = len(ibs)
}
wg.Add(1)
inmemoryPartsConcurrencyCh <- struct{}{}
go func(ibsChunk []*inmemoryBlock) {
defer func() {
<-inmemoryPartsConcurrencyCh
wg.Done()
}()
ibsChunk := ibs[:n]
ibs = ibs[n:]
wg.Go(func() {
if pw := tb.createInmemoryPart(ibsChunk); pw != nil {
pwsLock.Lock()
pws = append(pws, pw)
@@ -883,8 +860,9 @@ func (tb *Table) flushBlocksToInmemoryParts(ibs []*inmemoryBlock, isFinal bool)
for i := range ibsChunk {
ibsChunk[i] = nil
}
}(ibs[:n])
ibs = ibs[n:]
<-inmemoryPartsConcurrencyCh
})
}
wg.Wait()
putWaitGroup(wg)
@@ -961,20 +939,17 @@ func (tb *Table) mustMergeInmemoryParts(pws []*partWrapper) []*partWrapper {
wg := getWaitGroup()
for len(pws) > 0 {
pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws)
wg.Add(1)
inmemoryPartsConcurrencyCh <- struct{}{}
go func(pwsChunk []*partWrapper) {
defer func() {
<-inmemoryPartsConcurrencyCh
wg.Done()
}()
pw := tb.mustMergeInmemoryPartsFinal(pwsChunk)
wg.Go(func() {
pw := tb.mustMergeInmemoryPartsFinal(pwsToMerge)
pwsResultLock.Lock()
pwsResult = append(pwsResult, pw)
pwsResultLock.Unlock()
}(pwsToMerge)
<-inmemoryPartsConcurrencyCh
})
pws = pwsRemaining
}
wg.Wait()

View File

@@ -268,10 +268,8 @@ func testAddItemsConcurrent(tb *Table, itemsCount int) {
const goroutinesCount = 6
workCh := make(chan int, itemsCount)
var wg sync.WaitGroup
for i := 0; i < goroutinesCount; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
for n := range goroutinesCount {
wg.Go(func() {
r := rand.New(rand.NewSource(int64(n)))
for range workCh {
item := getRandomBytes(r)
@@ -280,9 +278,9 @@ func testAddItemsConcurrent(tb *Table, itemsCount int) {
}
tb.AddItems([][]byte{item})
}
}(i)
})
}
for i := 0; i < itemsCount; i++ {
for i := range itemsCount {
workCh <- i
}
close(workCh)

View File

@@ -210,10 +210,8 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
// Start readers
var readersWG sync.WaitGroup
for i := 0; i < 10; i++ {
readersWG.Add(1)
go func() {
defer readersWG.Done()
for range 10 {
readersWG.Go(func() {
for {
data, ok := fq.MustReadBlock(nil)
if !ok {
@@ -226,22 +224,20 @@ func TestFastQueueReadWriteConcurrent(t *testing.T) {
delete(blocksMap, string(data))
blocksMapLock.Unlock()
}
}()
})
}
// Start writers
blocksCh := make(chan string)
var writersWG sync.WaitGroup
for i := 0; i < 10; i++ {
writersWG.Add(1)
go func() {
defer writersWG.Done()
for range 10 {
writersWG.Go(func() {
for block := range blocksCh {
if !fq.TryWriteBlock([]byte(block)) {
panic(fmt.Errorf("TryWriteBlock must return true in this context"))
}
}
}()
})
}
// feed writers

View File

@@ -133,24 +133,20 @@ func enrichVirtualMachinesNetworkInterfaces(ac *apiConfig, vms []virtualMachine)
workCh := make(chan *virtualMachine, concurrency)
resultCh := make(chan error, concurrency)
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for range concurrency {
wg.Go(func() {
for vm := range workCh {
err := enrichVMNetworkInterfaces(ac, vm)
resultCh <- err
}
}()
})
}
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
for i := range vms {
workCh <- &vms[i]
}
close(workCh)
}()
})
var firstErr error
for range vms {
err := <-resultCh

View File

@@ -415,9 +415,9 @@ func (gw *groupWatcher) getScrapeWorkObjectsByAPIWatcherLocked(objectsByKey map[
limiterCh := make(chan struct{}, cgroup.AvailableCPUs())
for key, o := range objectsByKey {
labelss := o.getTargetLabels(gw)
wg.Add(1)
limiterCh <- struct{}{}
go func(key string, labelss []*promutil.Labels) {
wg.Go(func() {
for aw, e := range swosByAPIWatcher {
swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labelss)
e.mu.Lock()
@@ -425,9 +425,9 @@ func (gw *groupWatcher) getScrapeWorkObjectsByAPIWatcherLocked(objectsByKey map[
e.mu.Unlock()
}
putLabelssToPool(labelss)
wg.Done()
<-limiterCh
}(key, labelss)
})
}
wg.Wait()
return swosByAPIWatcher

View File

@@ -94,11 +94,9 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
client.Stop()
return nil, fmt.Errorf("cannot discover Kuma targets: %w", err)
}
cfg.wg.Add(1)
go func() {
defer cfg.wg.Done()
cfg.wg.Go(func() {
cfg.runTargetsWatcher(ctx)
}()
})
return cfg, nil
}

View File

@@ -68,11 +68,9 @@ func CheckConfig() error {
func Init(pushData func(at *auth.Token, wr *prompb.WriteRequest)) {
mustInitClusterMemberID()
globalStopChan = make(chan struct{})
scraperWG.Add(1)
go func() {
defer scraperWG.Done()
scraperWG.Go(func() {
runScraper(*promscrapeConfigFile, pushData, globalStopChan)
}()
})
}
// Stop stops Prometheus scraper.
@@ -245,11 +243,9 @@ func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrap
discoveryDuration: metrics.GetOrCreateHistogram(fmt.Sprintf("vm_promscrape_service_discovery_duration_seconds{type=%q}", name)),
}
scs.wg.Add(1)
go func() {
defer scs.wg.Done()
scs.wg.Go(func() {
scfg.run(scs.globalStopCh)
}()
})
scs.scfgs = append(scs.scfgs, scfg)
}
@@ -417,18 +413,16 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
}
sg.activeScrapers.Inc()
sg.scrapersStarted.Inc()
sg.wg.Add(1)
tsmGlobal.Register(&sc.sw)
go func() {
sg.wg.Go(func() {
defer func() {
sg.wg.Done()
close(sc.stoppedCh)
}()
sc.sw.run(sc.ctx.Done(), sg.globalStopCh)
tsmGlobal.Unregister(&sc.sw)
sg.activeScrapers.Dec()
sg.scrapersStopped.Inc()
}()
})
key := sw.key()
sg.m[key] = sc
additionsCount++

View File

@@ -72,9 +72,7 @@ func TestLabelsCompressorConcurrent(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
series := newTestSeries(100, 20)
for n, labels := range series {
sExpected := labelsToString(labels)
@@ -90,7 +88,7 @@ func TestLabelsCompressorConcurrent(t *testing.T) {
panic(fmt.Errorf("unexpected result on iteration %d; got %s; want %s", i, sResult, sExpected))
}
}
}()
})
}
wg.Wait()

View File

@@ -27,14 +27,12 @@ func StartUnmarshalWorkers() {
}
gomaxprocs := cgroup.AvailableCPUs()
unmarshalWorkCh = make(chan UnmarshalWork, gomaxprocs)
unmarshalWorkersWG.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ {
go func() {
defer unmarshalWorkersWG.Done()
for range gomaxprocs {
unmarshalWorkersWG.Go(func() {
for uw := range unmarshalWorkCh {
uw.Unmarshal()
}
}()
})
}
}

View File

@@ -80,20 +80,18 @@ func StopAndPush() {
extraLabels := strings.Join(*pushExtraLabel, ",")
for _, pu := range *pushURL {
// push to all destinations in parallel to speed up shutdown
wg.Add(1)
go func(pushURL string) {
wg.Go(func() {
ctxLocal, cancel := context.WithTimeout(context.Background(), *pushInterval)
opts := &metrics.PushOptions{
ExtraLabels: extraLabels,
Headers: *pushHeader,
DisableCompression: *disableCompression,
}
if err := metrics.PushMetricsExt(ctxLocal, pushURL, appmetrics.WritePrometheusMetrics, opts); err != nil {
logger.Errorf("failed to push metrics to %q: %s", pushURL, err)
if err := metrics.PushMetricsExt(ctxLocal, pu, appmetrics.WritePrometheusMetrics, opts); err != nil {
logger.Errorf("failed to push metrics to %q: %s", pu, err)
}
cancel()
wg.Done()
}(pu)
})
}
wg.Wait()
logger.Infof("pushing metrics on shutdown finished in %.3f seconds", time.Since(startTime).Seconds())

View File

@@ -153,13 +153,11 @@ func TestTraceConcurrent(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
child := qt.NewChild("child %d", i)
wg.Add(1)
go func() {
wg.Go(func() {
for j := 0; j < 100; j++ {
child.Printf("message %d", j)
}
wg.Done()
}()
})
}
qt.Done()
// Verify that it is safe to call qt.String() when child traces aren't done yet

View File

@@ -114,16 +114,14 @@ func TestDateMetricIDCacheIsConsistent(_ *testing.T) {
defer dmc.MustStop()
var wg sync.WaitGroup
for i := range concurrency {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
for id := uint64(i * numMetrics); id < uint64((i+1)*numMetrics); id++ {
dmc.Set(date, id)
if !dmc.Has(date, id) {
panic(fmt.Errorf("dmc.Has(metricID=%d): unexpected cache miss after adding the entry to cache", id))
}
}
}()
})
}
wg.Wait()
}

View File

@@ -540,13 +540,10 @@ func (is *indexSearch) searchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tr
lns := make(map[string]struct{})
qt = qt.NewChild("parallel search for label names: filters=%s, timeRange=%s", tfss, &tr)
for date := minDate; date <= maxDate; date++ {
wg.Add(1)
qtChild := qt.NewChild("search for label names: filters=%s, date=%s", tfss, dateToString(date))
go func(date uint64) {
defer func() {
qtChild.Done()
wg.Done()
}()
wg.Go(func() {
defer qtChild.Done()
isLocal := is.db.getIndexSearch(is.deadline)
lnsLocal, err := isLocal.searchLabelNamesWithFiltersOnDate(qtChild, tfss, date, maxLabelNames, maxMetrics)
is.db.putIndexSearch(isLocal)
@@ -565,7 +562,7 @@ func (is *indexSearch) searchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tr
for k := range lnsLocal {
lns[k] = struct{}{}
}
}(date)
})
}
wg.Wait()
putWaitGroup(wg)
@@ -801,13 +798,10 @@ func (is *indexSearch) searchLabelValuesOnTimeRange(qt *querytracer.Tracer, labe
lvs := make(map[string]struct{})
qt = qt.NewChild("parallel search for label values: labelName=%q, filters=%s, timeRange=%s", labelName, tfss, &tr)
for date := minDate; date <= maxDate; date++ {
wg.Add(1)
qtChild := qt.NewChild("search for label values: filters=%s, date=%s", tfss, dateToString(date))
go func(date uint64) {
defer func() {
qtChild.Done()
wg.Done()
}()
wg.Go(func() {
defer qtChild.Done()
isLocal := is.db.getIndexSearch(is.deadline)
lvsLocal, err := isLocal.searchLabelValuesOnDate(qtChild, labelName, tfss, date, maxLabelValues, maxMetrics)
is.db.putIndexSearch(isLocal)
@@ -826,7 +820,7 @@ func (is *indexSearch) searchLabelValuesOnTimeRange(qt *querytracer.Tracer, labe
for v := range lvsLocal {
lvs[v] = struct{}{}
}
}(date)
})
}
wg.Wait()
putWaitGroup(wg)
@@ -997,9 +991,8 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tr TimeRange, tagKey,
var mu sync.Mutex // protects tvss + errGlobal from concurrent access below.
tvss := make(map[string]struct{})
for minDate <= maxDate {
wg.Add(1)
go func(date uint64) {
defer wg.Done()
date := minDate
wg.Go(func() {
isLocal := is.db.getIndexSearch(is.deadline)
tvssLocal, err := isLocal.searchTagValueSuffixesForDate(date, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
is.db.putIndexSearch(isLocal)
@@ -1018,7 +1011,7 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tr TimeRange, tagKey,
for k := range tvssLocal {
tvss[k] = struct{}{}
}
}(minDate)
})
minDate++
}
wg.Wait()
@@ -2478,13 +2471,11 @@ func (is *indexSearch) updateMetricIDsForDateRange(qt *querytracer.Tracer, metri
var errGlobal error
var mu sync.Mutex // protects metricIDs + errGlobal vars from concurrent access below
for minDate <= maxDate {
qtChild := qt.NewChild("parallel thread for date=%s", dateToString(minDate))
wg.Add(1)
go func(date uint64) {
defer func() {
qtChild.Done()
wg.Done()
}()
date := minDate
qtChild := qt.NewChild("parallel thread for date=%s", dateToString(date))
wg.Go(func() {
defer qtChild.Done()
isLocal := is.db.getIndexSearch(is.deadline)
m, err := isLocal.getMetricIDsForDateAndFilters(qtChild, date, tfs, maxMetrics)
is.db.putIndexSearch(isLocal)
@@ -2501,7 +2492,7 @@ func (is *indexSearch) updateMetricIDsForDateRange(qt *querytracer.Tracer, metri
if metricIDs.Len() < maxMetrics {
metricIDs.UnionMayOwn(m)
}
}(minDate)
})
minDate++
}
wg.Wait()
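
In the hunks above the loop condition variable (minDate) is mutated by the loop body itself, so per-iteration variable semantics do not help and capturing it directly would race with the loop's minDate++. The new code therefore snapshots it into a local (date := minDate) before calling wg.Go. A minimal sketch of that pattern with made-up values:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    minDate, maxDate := uint64(100), uint64(103)
    for minDate <= maxDate {
        date := minDate // snapshot before the closure; minDate changes below
        wg.Go(func() {
            fmt.Println("processing date", date)
        })
        minDate++
    }
    wg.Wait()
}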

View File

@@ -1996,13 +1996,12 @@ func TestIndexSearchLegacyContainsTimeRange_Concurrent(t *testing.T) {
concurrency := int64(100)
var wg sync.WaitGroup
for i := range concurrency {
wg.Add(1)
go func(ts int64) {
ts := minTimestamp + msecPerDay*i
wg.Go(func() {
is := idb.getIndexSearch(noDeadline)
_ = is.legacyContainsTimeRange(TimeRange{ts, ts})
idb.putIndexSearch(is)
wg.Done()
}(minTimestamp + msecPerDay*i)
})
}
wg.Wait()

View File

@@ -38,24 +38,20 @@ func TestMetricIDCache_SetHas_Concurrent(t *testing.T) {
writeCh := make(chan uint64, concurrency)
readCh := make(chan uint64, concurrency)
for range concurrency {
writeWG.Add(1)
go func() {
writeWG.Go(func() {
for metricID := range writeCh {
c.Set(metricID)
readCh <- metricID
}
writeWG.Done()
}()
})
readWG.Add(1)
go func() {
readWG.Go(func() {
for metricID := range readCh {
if !c.Has(metricID) {
panic(fmt.Sprintf("metricID not found: %d", metricID))
}
}
readWG.Done()
}()
})
}
metricIDMin := uint64(time.Now().UnixNano())

View File

@@ -191,9 +191,7 @@ func TestMetricsTrackerConcurrent(t *testing.T) {
var wg sync.WaitGroup
for range concurrency {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
for _, op := range ops {
switch op.o {
case 'i':
@@ -202,7 +200,7 @@ func TestMetricsTrackerConcurrent(t *testing.T) {
umt.RegisterQueryRequest(0, 0, []byte(op.mg))
}
}
}()
})
}
wg.Wait()

View File

@@ -54,8 +54,7 @@ func NewStorage(maxSizeBytes int) *Storage {
maxSizeBytes: int64(maxShardBytes),
}
}
s.wg.Add(1)
go s.cleaner()
s.wg.Go(s.cleaner)
return s
}
@@ -184,7 +183,6 @@ func (s *Storage) UpdateMetrics(dst *MetadataStorageMetrics) {
}
func (s *Storage) cleaner() {
defer s.wg.Done()
d := timeutil.AddJitterToDuration(time.Minute)
ticker := time.NewTicker(d)
defer ticker.Stop()

View File

@@ -610,21 +610,18 @@ func (pt *partition) flushRowssToInmemoryParts(rowss [][]rawRow) {
pws := make([]*partWrapper, 0, len(rowss))
wg := getWaitGroup()
for _, rows := range rowss {
wg.Add(1)
inmemoryPartsConcurrencyCh <- struct{}{}
go func(rowsChunk []rawRow) {
defer func() {
<-inmemoryPartsConcurrencyCh
wg.Done()
}()
pw := pt.createInmemoryPart(rowsChunk)
wg.Go(func() {
pw := pt.createInmemoryPart(rows)
if pw != nil {
pwsLock.Lock()
pws = append(pws, pw)
pwsLock.Unlock()
}
}(rows)
<-inmemoryPartsConcurrencyCh
})
}
wg.Wait()
putWaitGroup(wg)
@@ -782,15 +779,14 @@ func (pt *partition) mustMergeInmemoryParts(pws []*partWrapper) []*partWrapper {
wg := getWaitGroup()
for len(pws) > 0 {
pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws)
wg.Add(1)
inmemoryPartsConcurrencyCh <- struct{}{}
go func(pwsChunk []*partWrapper) {
wg.Go(func() {
defer func() {
<-inmemoryPartsConcurrencyCh
wg.Done()
}()
pw := pt.mustMergeInmemoryPartsFinal(pwsChunk)
pw := pt.mustMergeInmemoryPartsFinal(pwsToMerge)
if pw == nil {
return
}
@@ -798,7 +794,7 @@ func (pt *partition) mustMergeInmemoryParts(pws []*partWrapper) []*partWrapper {
pwsResultLock.Lock()
pwsResult = append(pwsResult, pw)
pwsResultLock.Unlock()
}(pwsToMerge)
})
pws = pwsRemaining
}
wg.Wait()
@@ -1018,11 +1014,7 @@ func (pt *partition) startInmemoryPartsMergerLocked() {
return
default:
}
pt.wg.Add(1)
go func() {
pt.inmemoryPartsMerger()
pt.wg.Done()
}()
pt.wg.Go(pt.inmemoryPartsMerger)
}
func (pt *partition) startSmallPartsMergers() {
@@ -1039,11 +1031,7 @@ func (pt *partition) startSmallPartsMergerLocked() {
return
default:
}
pt.wg.Add(1)
go func() {
pt.smallPartsMerger()
pt.wg.Done()
}()
pt.wg.Go(pt.smallPartsMerger)
}
func (pt *partition) startBigPartsMergers() {
@@ -1060,35 +1048,19 @@ func (pt *partition) startBigPartsMergerLocked() {
return
default:
}
pt.wg.Add(1)
go func() {
pt.bigPartsMerger()
pt.wg.Done()
}()
pt.wg.Go(pt.bigPartsMerger)
}
func (pt *partition) startPendingRowsFlusher() {
pt.wg.Add(1)
go func() {
pt.pendingRowsFlusher()
pt.wg.Done()
}()
pt.wg.Go(pt.pendingRowsFlusher)
}
func (pt *partition) startInmemoryPartsFlusher() {
pt.wg.Add(1)
go func() {
pt.inmemoryPartsFlusher()
pt.wg.Done()
}()
pt.wg.Go(pt.inmemoryPartsFlusher)
}
func (pt *partition) startStalePartsRemover() {
pt.wg.Add(1)
go func() {
pt.stalePartsRemover()
pt.wg.Done()
}()
pt.wg.Go(pt.stalePartsRemover)
}
var (
@@ -1250,22 +1222,19 @@ func (pt *partition) mergePartsToFiles(pws []*partWrapper, stopCh <-chan struct{
wg := getWaitGroup()
for len(pws) > 0 {
pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws)
wg.Add(1)
concurrencyCh <- struct{}{}
go func(pwsChunk []*partWrapper) {
defer func() {
<-concurrencyCh
wg.Done()
}()
if err := pt.mergeParts(pwsChunk, stopCh, true, useSparseCache); err != nil && !errors.Is(err, errForciblyStopped) {
wg.Go(func() {
if err := pt.mergeParts(pwsToMerge, stopCh, true, useSparseCache); err != nil && !errors.Is(err, errForciblyStopped) {
errGlobalLock.Lock()
if errGlobal == nil {
errGlobal = err
}
errGlobalLock.Unlock()
}
}(pwsToMerge)
<-concurrencyCh
})
pws = pwsRemaining
}
wg.Wait()

View File

@@ -746,9 +746,7 @@ func (s *Storage) startFreeDiskSpaceWatcher() {
}
}
f()
s.freeDiskSpaceWatcherWG.Add(1)
go func() {
defer s.freeDiskSpaceWatcherWG.Done()
s.freeDiskSpaceWatcherWG.Go(func() {
d := timeutil.AddJitterToDuration(time.Second)
ticker := time.NewTicker(d)
defer ticker.Stop()
@@ -760,7 +758,7 @@ func (s *Storage) startFreeDiskSpaceWatcher() {
f()
}
}
}()
})
}
func (s *Storage) notifyReadWriteMode() {
@@ -769,19 +767,11 @@ func (s *Storage) notifyReadWriteMode() {
}
func (s *Storage) startCurrHourMetricIDsUpdater() {
s.currHourMetricIDsUpdaterWG.Add(1)
go func() {
s.currHourMetricIDsUpdater()
s.currHourMetricIDsUpdaterWG.Done()
}()
s.currHourMetricIDsUpdaterWG.Go(s.currHourMetricIDsUpdater)
}
func (s *Storage) startNextDayMetricIDsUpdater() {
s.nextDayMetricIDsUpdaterWG.Add(1)
go func() {
s.nextDayMetricIDsUpdater()
s.nextDayMetricIDsUpdaterWG.Done()
}()
s.nextDayMetricIDsUpdaterWG.Go(s.nextDayMetricIDsUpdater)
}
func (s *Storage) currHourMetricIDsUpdater() {
@@ -1119,12 +1109,10 @@ func searchAndMerge[T any](qt *querytracer.Tracer, s *Storage, tr TimeRange, sea
for i, idb := range idbs {
searchTR := s.adjustTimeRange(tr, idb.tr)
qtChild := qtSearch.NewChild("search indexDB %s: timeRange=%v", idb.name, &searchTR)
wg.Add(1)
go func(qt *querytracer.Tracer, i int, idb *indexDB, tr TimeRange) {
defer wg.Done()
defer qt.Done()
data[i], errs[i] = search(qt, idb, tr)
}(qtChild, i, idb, searchTR)
wg.Go(func() {
data[i], errs[i] = search(qtChild, idb, searchTR)
qtChild.Done()
})
}
wg.Wait()
qtSearch.Done()

View File

@@ -173,11 +173,7 @@ func (s *Storage) startLegacyRetentionWatcher() {
if !s.hasLegacyIndexDBs() {
return
}
s.legacyRetentionWatcherWG.Add(1)
go func() {
s.legacyRetentionWatcher()
s.legacyRetentionWatcherWG.Done()
}()
s.legacyRetentionWatcherWG.Go(s.legacyRetentionWatcher)
}
func (s *Storage) legacyRetentionWatcher() {

View File

@@ -1140,18 +1140,16 @@ func testLegacyRotateIndexDB(t *testing.T, mrs []MetricRow, op func(s *Storage))
var wg sync.WaitGroup
stop := make(chan struct{})
for range 100 {
wg.Add(1)
go func() {
wg.Go(func() {
for {
select {
case <-stop:
wg.Done()
return
default:
}
op(s)
}
}()
})
}
for range 10 {

View File

@@ -3706,13 +3706,11 @@ func testDoConcurrently(s *Storage, op func(s *Storage, mrs []MetricRow), concur
var wg sync.WaitGroup
mrsCh := make(chan []MetricRow)
for range concurrency {
wg.Add(1)
go func() {
wg.Go(func() {
for mrs := range mrsCh {
op(s, mrs)
}
wg.Done()
}()
})
}
n := 1

View File

@@ -421,11 +421,7 @@ func (tb *table) getMinMaxTimestamps() (int64, int64) {
}
func (tb *table) startRetentionWatcher() {
tb.retentionWatcherWG.Add(1)
go func() {
tb.retentionWatcher()
tb.retentionWatcherWG.Done()
}()
tb.retentionWatcherWG.Go(tb.retentionWatcher)
}
func (tb *table) retentionWatcher() {
@@ -469,11 +465,7 @@ func (tb *table) retentionWatcher() {
}
func (tb *table) startHistoricalMergeWatcher() {
tb.historicalMergeWatcherWG.Add(1)
go func() {
tb.historicalMergeWatcher()
tb.historicalMergeWatcherWG.Done()
}()
tb.historicalMergeWatcherWG.Go(tb.historicalMergeWatcher)
}
func (tb *table) historicalMergeWatcher() {
@@ -655,14 +647,9 @@ func mustOpenPartitions(smallPartitionsPath, bigPartitionsPath, indexDBPath stri
var wg sync.WaitGroup
concurrencyLimiterCh := make(chan struct{}, cgroup.AvailableCPUs())
for ptName := range ptNames {
wg.Add(1)
concurrencyLimiterCh <- struct{}{}
go func(ptName string) {
defer func() {
<-concurrencyLimiterCh
wg.Done()
}()
wg.Go(func() {
smallPartsPath := filepath.Join(smallPartitionsPath, ptName)
bigPartsPath := filepath.Join(bigPartitionsPath, ptName)
indexDBPartsPath := filepath.Join(indexDBPath, ptName)
@@ -671,7 +658,9 @@ func mustOpenPartitions(smallPartitionsPath, bigPartitionsPath, indexDBPath stri
ptsLock.Lock()
pts = append(pts, pt)
ptsLock.Unlock()
}(ptName)
<-concurrencyLimiterCh
})
}
wg.Wait()

View File

@@ -75,9 +75,8 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
timestamp.Store(uint64(startTimestamp))
var wg sync.WaitGroup
for k := 0; k < cgroup.AvailableCPUs(); k++ {
wg.Add(1)
go func(n int) {
for n := range cgroup.AvailableCPUs() {
wg.Go(func() {
rng := rand.New(rand.NewSource(int64(n)))
rows := make([]rawRow, rowsPerInsert)
value := float64(100)
@@ -94,8 +93,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
}
tb.MustAddRows(rows)
}
wg.Done()
}(k)
})
}
wg.Wait()

View File

@@ -72,15 +72,13 @@ func TestGetPartition_concurrent(t *testing.T) {
for ts := begin; ts < limit; ts += msecPerDay {
var wg sync.WaitGroup
for range 100 {
wg.Add(1)
go func() {
wg.Go(func() {
ptw := s.tb.MustGetPartition(ts)
s.tb.PutPartition(ptw)
ptw = s.tb.GetPartition(ts)
s.tb.PutPartition(ptw)
wg.Done()
}()
})
}
wg.Wait()
}

View File

@@ -124,18 +124,14 @@ func (ctx *dedupFlushCtx) reset() {
func (da *dedupAggr) flush(f aggrPushFunc, deleteDeadline int64, isGreen bool) {
var wg sync.WaitGroup
for i := range da.shards {
for shardIdx := range da.shards {
flushConcurrencyCh <- struct{}{}
wg.Add(1)
go func(shard *dedupAggrShard) {
defer func() {
<-flushConcurrencyCh
wg.Done()
}()
wg.Go(func() {
ctx := getDedupFlushCtx(deleteDeadline, isGreen)
shard.flush(ctx, f)
da.shards[shardIdx].flush(ctx, f)
putDedupFlushCtx(ctx)
}(&da.shards[i])
<-flushConcurrencyCh
})
}
wg.Wait()
}

View File

@@ -63,9 +63,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
for i := 0; i < 10; i++ {
samples := make([]pushSample, seriesCount)
for j := range samples {
@@ -75,7 +73,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
}
da.pushSamples(samples, 0, false)
}
}()
})
}
wg.Wait()
}

View File

@@ -80,11 +80,9 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
metrics.RegisterSet(ms)
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.wg.Go(func() {
d.runFlusher(pushFunc)
}()
})
return d
}

View File

@@ -374,14 +374,12 @@ func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []uint32) []uint32
var wg sync.WaitGroup
concurrencyChan := make(chan struct{}, cgroup.AvailableCPUs())
wg.Add(len(a.as))
for _, aggr := range a.as {
concurrencyChan <- struct{}{}
go func(aggr *aggregator) {
wg.Go(func() {
aggr.Push(tss, matchIdxs)
wg.Done()
<-concurrencyChan
}(aggr)
})
}
wg.Wait()
@@ -718,11 +716,9 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
a.cs.Store(cs)
a.wg.Add(1)
go func() {
a.wg.Go(func() {
a.runFlusher(pushFunc, alignFlushToInterval, skipFlushOnShutdown, ignoreFirstIntervals)
a.wg.Done()
}()
})
return a, nil
}

View File

@@ -16,13 +16,11 @@ func BenchmarkCounterMapGrowth(b *testing.B) {
cm := NewCounterMap("foobar")
var wg sync.WaitGroup
for range nProcs {
wg.Add(1)
go func() {
wg.Go(func() {
for i := range numTenants {
cm.Get(&auth.Token{AccountID: i, ProjectID: i}).Inc()
}
wg.Done()
}()
})
}
wg.Wait()
}

View File

@@ -152,23 +152,13 @@ func newCacheInternal(curr, prev *fastcache.Cache, mode, maxBytes int, expireDur
}
func (c *Cache) runWatchers(expireDuration time.Duration) {
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.wg.Go(func() {
c.expirationWatcher(expireDuration)
}()
})
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.prevCacheWatcher()
}()
c.wg.Go(c.prevCacheWatcher)
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.cacheSizeWatcher()
}()
c.wg.Go(c.cacheSizeWatcher)
}
func (c *Cache) expirationWatcher(expireDuration time.Duration) {