Compare commits

...

1 Commits

Author SHA1 Message Date
Andrii Chubatiuk
18dfeaf2cf lib/promutil: added labelcompressor cleanup 2026-05-23 00:40:15 +03:00
5 changed files with 484 additions and 165 deletions

View File

@@ -3,32 +3,162 @@ package promutil
import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/puzpuzpuz/xsync/v4"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
// LabelsCompressor compresses []prompb.Label into short binary strings
// LabelsCompressor compresses []prompb.Label into short binary strings.
// Zero value is ready to use. Each Register call must be paired with Unregister.
type LabelsCompressor struct {
labelToIdx sync.Map
idxToLabel labelsMap
labelToIdxOnce sync.Once
labelToIdx *xsync.Map[string, uint32]
nextIdx atomic.Uint64
// generation is incremented after each rotate() call that removes labels.
// Callers can use it to detect when cached compressed keys become stale.
generation atomic.Uint64
totalSizeBytes atomic.Uint64
idxToLabel atomic.Pointer[[]prompb.Label]
// usedBitset tracks which indices were used in the current rotation period.
// Bits are set atomically from compressFast without mu; grown under mu.
usedBitset atomic.Pointer[[]uint64]
totalSizeBytes uint64
mu sync.Mutex
// freeIdxs holds index slots available for reuse.
freeIdxs []uint32
// pendingIdxs holds indices evicted from labelToIdx last rotate, not yet zeroed in idxToLabel.
pendingIdxs map[uint32]struct{}
// prevBitset is the usedBitset snapshot from the previous non-zero rotation.
// Requires absence from both prevBitset and usedBitset to evict a label,
// guarding against partial snapshots when rotate fires mid-compress loop.
// Only accessed in rotate, which runs in a single goroutine.
prevBitset []uint64
// registry holds staleness intervals of active callers; rotation period = max(registry).
registry []time.Duration
// tickerCh signals runRotate to re-evaluate the rotation period.
tickerCh chan struct{}
}
// idleRotationPeriod is the rotation period used when no callers are registered.
// It must be long enough that a sleeping goroutine does not consume measurable resources.
const idleRotationPeriod = time.Hour
func (lc *LabelsCompressor) labelToIdxMap() *xsync.Map[string, uint32] {
lc.labelToIdxOnce.Do(func() {
lc.labelToIdx = xsync.NewMap[string, uint32]()
})
return lc.labelToIdx
}
// Register adds stalenessInterval to the registry and starts the rotation goroutine on first call.
// Rotation period equals max(registry). Must be paired with Unregister.
func (lc *LabelsCompressor) Register(stalenessInterval time.Duration) {
lc.mu.Lock()
if lc.tickerCh == nil {
lc.tickerCh = make(chan struct{}, 1)
go lc.runRotate()
}
lc.registry = append(lc.registry, stalenessInterval)
tickerCh := lc.tickerCh
lc.mu.Unlock()
select {
case tickerCh <- struct{}{}:
default:
}
}
// Unregister removes stalenessInterval from the registry.
// The rotation goroutine keeps running at idleRotationPeriod when the registry is empty.
func (lc *LabelsCompressor) Unregister(stalenessInterval time.Duration) {
lc.mu.Lock()
for i, d := range lc.registry {
if d == stalenessInterval {
lc.registry = append(lc.registry[:i], lc.registry[i+1:]...)
break
}
}
tickerCh := lc.tickerCh
lc.mu.Unlock()
if tickerCh != nil {
select {
case tickerCh <- struct{}{}:
default:
}
}
}
func (lc *LabelsCompressor) maxRegisteredStaleness() time.Duration {
// must be called with lc.mu held
var max time.Duration
for _, d := range lc.registry {
if d > max {
max = d
}
}
if max == 0 {
return idleRotationPeriod
}
return max
}
func (lc *LabelsCompressor) runRotate() {
lc.mu.Lock()
period := lc.maxRegisteredStaleness()
lc.mu.Unlock()
timer := time.NewTimer(period)
defer timer.Stop()
for {
select {
case <-timer.C:
lc.rotate()
lc.mu.Lock()
period = lc.maxRegisteredStaleness()
lc.mu.Unlock()
timer.Reset(period)
case <-lc.tickerCh:
lc.mu.Lock()
newPeriod := lc.maxRegisteredStaleness()
lc.mu.Unlock()
if newPeriod != period {
period = newPeriod
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(period)
}
}
}
}
// SizeBytes returns the size of lc data in bytes
func (lc *LabelsCompressor) SizeBytes() uint64 {
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
lc.mu.Lock()
n := uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes
lc.mu.Unlock()
return n
}
// ItemsCount returns the number of items in lc
func (lc *LabelsCompressor) ItemsCount() uint64 {
return lc.nextIdx.Load()
return uint64(lc.labelToIdxMap().Size())
}
// Compress compresses labels, appends the compressed labels to dst and returns the result.
@@ -42,46 +172,124 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
a := encoding.GetUint64s(len(labels) + 1)
a.A[0] = uint64(len(labels))
lc.compress(a.A[1:], labels)
if lc.compressFast(a.A[1:], labels) {
dst = encoding.MarshalVarUint64s(dst, a.A)
encoding.PutUint64s(a)
return dst
}
lc.mu.Lock()
lc.compressSlow(a.A[1:], labels)
lc.mu.Unlock()
dst = encoding.MarshalVarUint64s(dst, a.A)
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
if len(labels) == 0 {
func (lc *LabelsCompressor) compressFast(dst []uint64, labels []prompb.Label) bool {
p := lc.usedBitset.Load()
var bits []uint64
if p != nil {
bits = *p
}
var keyBuf [256]byte
m := lc.labelToIdxMap()
for i, label := range labels {
// Build composite key name+'\x00'+value into the stack buffer.
totalLen := len(label.Name) + 1 + len(label.Value)
var key string
if totalLen <= len(keyBuf) {
n := copy(keyBuf[:], label.Name)
keyBuf[n] = '\x00'
copy(keyBuf[n+1:], label.Value)
key = bytesutil.ToUnsafeString(keyBuf[:totalLen])
} else {
key = makeLabelKey(label)
}
idx, ok := m.Load(key)
if !ok {
return false
}
dst[i] = uint64(idx)
word := idx >> 6
if int(word) < len(bits) {
mask := uint64(1) << (idx & 63)
if atomic.LoadUint64(&bits[word])&mask == 0 {
atomic.OrUint64(&bits[word], mask)
}
}
}
// usedBitset was swapped mid-loop; re-mark all indices in the new bitset.
if lc.usedBitset.Load() != p {
for _, idx64 := range dst {
lc.markUsed(uint32(idx64))
}
}
return true
}
func (lc *LabelsCompressor) compressSlow(dst []uint64, labels []prompb.Label) {
m := lc.labelToIdxMap()
for i, label := range labels {
key := makeLabelKey(label)
idx, loaded := m.LoadOrCompute(key, func() (uint32, bool) {
return lc.newIndex(cloneLabel(label)), false
})
if loaded {
lc.markUsed(idx)
}
dst[i] = uint64(idx)
}
}
// markUsed sets the bit for idx in usedBitset. Safe to call without mu.
func (lc *LabelsCompressor) markUsed(idx uint32) {
p := lc.usedBitset.Load()
if p == nil {
return
}
_ = dst[len(labels)-1]
for i, label := range labels {
v, ok := lc.labelToIdx.Load(label)
if !ok {
idx := lc.nextIdx.Add(1)
v = idx
labelCopy := cloneLabel(label)
// Must store idxToLabel entry before labelToIdx,
// so it can be found by possible concurrent goroutines.
//
// We might store duplicated entries for single label with different indexes,
// and it's fine, see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118.
lc.idxToLabel.Store(idx, labelCopy)
vNew, loaded := lc.labelToIdx.LoadOrStore(labelCopy, v)
if loaded {
// This label has been stored by a concurrent goroutine with different index,
// use it for key consistency in aggrState.
v = vNew
}
// Update lc.totalSizeBytes
labelSizeBytes := uint64(len(label.Name) + len(label.Value))
entrySizeBytes := labelSizeBytes + uint64(2*(unsafe.Sizeof(label)+unsafe.Sizeof(&label))+unsafe.Sizeof(v))
lc.totalSizeBytes.Add(entrySizeBytes)
bits := *p
word := idx >> 6
if int(word) < len(bits) {
mask := uint64(1) << (idx & 63)
if atomic.LoadUint64(&bits[word])&mask == 0 {
atomic.OrUint64(&bits[word], mask)
}
dst[i] = v.(uint64)
}
}
// growBitset extends usedBitset to cover idx. Must be called with lc.mu held.
func (lc *LabelsCompressor) growBitset(idx uint32) {
needed := int(idx>>6) + 1
p := lc.usedBitset.Load()
var bits []uint64
if p != nil {
bits = *p
}
if needed <= len(bits) {
return
}
newBits := make([]uint64, needed)
copy(newBits, bits)
lc.usedBitset.Store(&newBits)
}
func makeLabelKey(label prompb.Label) string {
buf := make([]byte, 0, len(label.Name)+1+len(label.Value))
buf = append(buf, label.Name...)
buf = append(buf, '\x00')
buf = append(buf, label.Value...)
return string(buf)
}
// Generation returns the current generation count.
// It increments whenever labels are evicted, invalidating any cached compressed keys.
func (lc *LabelsCompressor) Generation() uint64 {
return lc.generation.Load()
}
func cloneLabel(label prompb.Label) prompb.Label {
// pre-allocate memory for label name and value
n := len(label.Name) + len(label.Value)
@@ -98,6 +306,35 @@ func cloneLabel(label prompb.Label) prompb.Label {
}
}
// newIndex assigns a new index for cloned and updates idxToLabel atomically.
// Must be called with lc.mu held.
func (lc *LabelsCompressor) newIndex(cloned prompb.Label) uint32 {
var idx uint32
p := lc.idxToLabel.Load()
var newIdxToLabel []prompb.Label
if p != nil {
newIdxToLabel = *p
}
if len(lc.freeIdxs) > 0 {
idx = lc.freeIdxs[len(lc.freeIdxs)-1]
lc.freeIdxs = lc.freeIdxs[:len(lc.freeIdxs)-1]
next := make([]prompb.Label, len(newIdxToLabel))
copy(next, newIdxToLabel)
next[idx] = cloned
lc.idxToLabel.Store(&next)
} else {
idx = uint32(len(newIdxToLabel))
next := append(newIdxToLabel, cloned)
lc.idxToLabel.Store(&next)
}
lc.growBitset(idx)
lc.markUsed(idx)
lc.totalSizeBytes += uint64(len(cloned.Name)+len(cloned.Value)) + uint64(unsafe.Sizeof(cloned)) + 8
return idx
}
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
//
// It is safe calling Decompress from concurrent goroutines.
@@ -124,111 +361,135 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
if len(tail) > 0 {
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
}
dst = lc.decompress(dst, a.A)
p := lc.idxToLabel.Load()
if p == nil {
encoding.PutUint64s(a)
logger.Panicf("BUG: idxToLabel is nil in Decompress")
return dst
}
labels := *p
for _, idx := range a.A {
if int(idx) >= len(labels) {
encoding.PutUint64s(a)
logger.Panicf("BUG: missing label for idx=%d; idxToLabel len=%d", idx, len(labels))
}
dst = append(dst, labels[idx])
}
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
for _, idx := range src {
label, ok := lc.idxToLabel.Load(idx)
if !ok {
logger.Panicf("BUG: missing label for idx=%d", idx)
}
dst = append(dst, label)
// rotate evicts unused labels and recycles their slots.
// Phase 1 removes the label from labelToIdx (preventing new keys from referencing it).
// Phase 2, one rotation later, zeroes idxToLabel and returns the slot to freeIdxs,
// giving in-flight keys a full rotation period to drain via Decompress.
// Must not be called concurrently with itself.
func (lc *LabelsCompressor) rotate() {
// Snapshot and reset usedBitset under mu to prevent races with growBitset/newIndex.
// Also snapshot idxToLabel for the phase-1 scan.
lc.mu.Lock()
var currentBits []uint64
if p := lc.usedBitset.Load(); p != nil {
currentBits = make([]uint64, len(*p))
copy(currentBits, *p)
newBits := make([]uint64, len(*p))
lc.usedBitset.Store(&newBits)
}
return dst
}
// labelsMap maps uint64 key to prompb.Label
//
// uint64 keys must be packed close to 0. Otherwise the labelsMap structure will consume too much memory.
type labelsMap struct {
readOnly atomic.Pointer[[]*prompb.Label]
mutableLock sync.Mutex
mutable map[uint64]*prompb.Label
misses uint64
}
// Store stores label under the given idx.
//
// It is safe calling Store from concurrent goroutines.
func (lm *labelsMap) Store(idx uint64, label prompb.Label) {
lm.mutableLock.Lock()
if lm.mutable == nil {
lm.mutable = make(map[uint64]*prompb.Label)
}
lm.mutable[idx] = &label
lm.mutableLock.Unlock()
}
// Load returns the label for the given idx.
//
// Load returns false if lm doesn't contain label for the given idx.
//
// It is safe calling Load from concurrent goroutines.
//
// The performance of Load() scales linearly with CPU cores.
func (lm *labelsMap) Load(idx uint64) (prompb.Label, bool) {
if pReadOnly := lm.readOnly.Load(); pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
// Fast path - the label for the given idx has been found in lm.readOnly.
return *pLabel, true
// Skip if nothing was used; all-zero snapshots from rapid rotation would
// advance pendingIdxs and reclaim slots before in-flight keys drain.
anyUsed := false
for _, w := range currentBits {
if w != 0 {
anyUsed = true
break
}
}
// Slow path - search in lm.mutable.
return lm.loadSlow(idx)
}
func (lm *labelsMap) loadSlow(idx uint64) (prompb.Label, bool) {
lm.mutableLock.Lock()
// Try loading label from readOnly, since it could be updated while acquiring mutableLock.
pReadOnly := lm.readOnly.Load()
if pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
lm.mutableLock.Unlock()
return *pLabel, true
}
}
// The label for the idx wasn't found in readOnly. Search it in mutable.
lm.misses++
pLabel := lm.mutable[idx]
if pReadOnly == nil || lm.misses > uint64(len(*pReadOnly)) {
lm.moveMutableToReadOnlyLocked(pReadOnly)
lm.misses = 0
}
lm.mutableLock.Unlock()
if pLabel == nil {
return prompb.Label{}, false
}
return *pLabel, true
}
func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
if len(lm.mutable) == 0 {
// Nothing to move
if !anyUsed {
lc.mu.Unlock()
return
}
var labels []*prompb.Label
if pReadOnly != nil {
labels = append(labels, *pReadOnly...)
// Advance prevBitset; labels missing from a partial snapshot are still covered by it.
prevSnap := lc.prevBitset
lc.prevBitset = currentBits
pendingIdxs := lc.pendingIdxs
idxSnap := lc.idxToLabel.Load()
lc.mu.Unlock()
// A label is used if present in either the current or previous period snapshot.
isUsed := func(idx uint32) bool {
word := idx >> 6
inCurrent := int(word) < len(currentBits) && currentBits[word]>>(idx&63)&1 == 1
inPrev := int(word) < len(prevSnap) && prevSnap[word]>>(idx&63)&1 == 1
return inCurrent || inPrev
}
for idx, pLabel := range lm.mutable {
if idx < uint64(len(labels)) {
labels[idx] = pLabel
} else {
for idx > uint64(len(labels)) {
labels = append(labels, nil)
// Phase 2: reclaim slots evicted last rotation if still unused.
var toReclaim []uint32
var nextRelease map[uint32]struct{}
for idx := range pendingIdxs {
if isUsed(idx) {
if nextRelease == nil {
nextRelease = make(map[uint32]struct{})
}
labels = append(labels, pLabel)
nextRelease[idx] = struct{}{}
} else {
toReclaim = append(toReclaim, idx)
}
}
clear(lm.mutable)
lm.readOnly.Store(&labels)
// Phase 1: scan idxToLabel to avoid contention with concurrent compressFast lookups.
phase1Evicted := false
m := lc.labelToIdxMap()
if idxSnap != nil {
for i, label := range *idxSnap {
if label == (prompb.Label{}) {
continue // freed slot
}
idx := uint32(i)
if _, inPending := pendingIdxs[idx]; inPending {
continue // already queued for release
}
if isUsed(idx) {
continue
}
key := makeLabelKey(label)
m.Delete(key)
phase1Evicted = true
if nextRelease == nil {
nextRelease = make(map[uint32]struct{})
}
nextRelease[idx] = struct{}{}
}
}
if phase1Evicted {
lc.generation.Add(1)
}
lc.mu.Lock()
if len(toReclaim) > 0 {
p := lc.idxToLabel.Load()
var newIdxToLabel []prompb.Label
if p != nil {
newIdxToLabel = *p
}
next := make([]prompb.Label, len(newIdxToLabel))
copy(next, newIdxToLabel)
for _, idx := range toReclaim {
label := next[idx]
entrySize := uint64(len(label.Name)+len(label.Value)) + uint64(unsafe.Sizeof(label)) + 8
if lc.totalSizeBytes >= entrySize {
lc.totalSizeBytes -= entrySize
}
next[idx] = prompb.Label{}
lc.freeIdxs = append(lc.freeIdxs, idx)
}
lc.idxToLabel.Store(&next)
}
lc.pendingIdxs = nextRelease
lc.mu.Unlock()
}

View File

@@ -8,47 +8,96 @@ import (
)
func BenchmarkLabelsCompressorCompress(b *testing.B) {
var lc LabelsCompressor
series := newTestSeries(100, 10)
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var dst []byte
for pb.Next() {
dst = dst[:0]
for _, labels := range series {
dst = lc.Compress(dst, labels)
}
Sink.Add(uint64(len(dst)))
run := func(b *testing.B, withRotate bool) {
var lc LabelsCompressor
for _, labels := range series {
lc.Compress(nil, labels)
}
})
var rotations atomic.Int64
if withRotate {
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
default:
lc.rotate()
rotations.Add(1)
}
}
}()
defer close(done)
}
rotations.Store(0)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var dst []byte
for pb.Next() {
dst = dst[:0]
for _, labels := range series {
dst = lc.Compress(dst, labels)
}
Sink.Add(uint64(len(dst)))
}
})
if withRotate {
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
}
}
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
}
func BenchmarkLabelsCompressorDecompress(b *testing.B) {
var lc LabelsCompressor
series := newTestSeries(100, 10)
datas := make([][]byte, len(series))
var dst []byte
for i, labels := range series {
dstLen := len(dst)
dst = lc.Compress(dst, labels)
datas[i] = dst[dstLen:]
}
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var labels []prompb.Label
for pb.Next() {
for _, data := range datas {
labels = lc.Decompress(labels[:0], data)
}
Sink.Add(uint64(len(labels)))
run := func(b *testing.B, withRotate bool) {
var lc LabelsCompressor
datas := make([][]byte, len(series))
var buf []byte
for i, labels := range series {
bufLen := len(buf)
buf = lc.Compress(buf, labels)
datas[i] = buf[bufLen:]
}
})
var rotations atomic.Int64
if withRotate {
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
default:
lc.rotate()
rotations.Add(1)
}
}
}()
defer close(done)
}
rotations.Store(0)
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var labels []prompb.Label
for pb.Next() {
for _, data := range datas {
labels = lc.Decompress(labels[:0], data)
}
Sink.Add(uint64(len(labels)))
}
})
if withRotate {
b.ReportMetric(float64(rotations.Load())/float64(b.N), "rotations/op")
}
}
b.Run("no_rotate", func(b *testing.B) { run(b, false) })
b.Run("with_rotate", func(b *testing.B) { run(b, true) })
}
var Sink atomic.Uint64

View File

@@ -6,6 +6,7 @@ import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
)
func BenchmarkDedupAggr(b *testing.B) {
@@ -63,14 +64,15 @@ func newBenchSamples(count int) []pushSample {
}
labelsLen := len(labels)
samples := make([]pushSample, count)
var lc promutil.LabelsCompressor
var keyBuf []byte
for i := range samples {
sample := &samples[i]
labels = append(labels[:labelsLen], prompb.Label{
labels := append(labels[:labelsLen:labelsLen], prompb.Label{
Name: "app",
Value: fmt.Sprintf("instance-%d", i),
})
keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:])
keyBuf = lc.Compress(keyBuf[:0], labels)
sample.key = string(keyBuf)
sample.value = float64(i)
}

View File

@@ -77,6 +77,8 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
metrics.RegisterSet(ms)
lc.Register(2 * interval)
d.wg.Go(func() {
d.runFlusher(pushFunc)
})
@@ -86,6 +88,8 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
// MustStop stops d.
func (d *Deduplicator) MustStop() {
lc.Unregister(2 * d.interval)
metrics.UnregisterSet(d.ms, true)
d.ms = nil
@@ -203,7 +207,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
dstSamples := ctx.samples
for _, ps := range samples {
labelsLen := len(labels)
labels = decompressLabels(labels, ps.key)
labels = lc.Decompress(labels, bytesutil.ToUnsafeBytes(ps.key))
dstSamplesLen := len(dstSamples)
dstSamples = append(dstSamples, prompb.Sample{

View File

@@ -709,6 +709,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
a.cs.Store(cs)
lc.Register(a.stalenessInterval)
a.wg.Go(func() {
a.runFlusher(pushFunc, alignFlushToInterval, skipFlushOnShutdown, ignoreFirstIntervals)
})
@@ -943,6 +945,7 @@ var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
//
// The aggregator stops pushing the aggregated metrics after this call.
func (a *aggregator) MustStop() {
lc.Unregister(a.stalenessInterval)
close(a.stopCh)
a.wg.Wait()
}