|
|
|
|
@@ -3,32 +3,162 @@ package promutil
|
|
|
|
|
import (
|
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
|
"unsafe"
|
|
|
|
|
|
|
|
|
|
"github.com/puzpuzpuz/xsync/v4"
|
|
|
|
|
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// LabelsCompressor compresses []prompb.Label into short binary strings
|
|
|
|
|
// LabelsCompressor compresses []prompb.Label into short binary strings.
|
|
|
|
|
// Zero value is ready to use. Each Register call must be paired with Unregister.
|
|
|
|
|
type LabelsCompressor struct {
|
|
|
|
|
labelToIdx sync.Map
|
|
|
|
|
idxToLabel labelsMap
|
|
|
|
|
labelToIdxOnce sync.Once
|
|
|
|
|
labelToIdx *xsync.Map[string, uint32]
|
|
|
|
|
|
|
|
|
|
nextIdx atomic.Uint64
|
|
|
|
|
// generation is incremented after each rotate() call that removes labels.
|
|
|
|
|
// Callers can use it to detect when cached compressed keys become stale.
|
|
|
|
|
generation atomic.Uint64
|
|
|
|
|
|
|
|
|
|
totalSizeBytes atomic.Uint64
|
|
|
|
|
idxToLabel atomic.Pointer[[]prompb.Label]
|
|
|
|
|
|
|
|
|
|
// usedBitset tracks which indices were used in the current rotation period.
|
|
|
|
|
// Bits are set atomically from compressFast without mu; grown under mu.
|
|
|
|
|
usedBitset atomic.Pointer[[]uint64]
|
|
|
|
|
|
|
|
|
|
totalSizeBytes uint64
|
|
|
|
|
mu sync.Mutex
|
|
|
|
|
|
|
|
|
|
// freeIdxs holds index slots available for reuse.
|
|
|
|
|
freeIdxs []uint32
|
|
|
|
|
|
|
|
|
|
// pendingIdxs holds indices evicted from labelToIdx last rotate, not yet zeroed in idxToLabel.
|
|
|
|
|
pendingIdxs map[uint32]struct{}
|
|
|
|
|
|
|
|
|
|
// prevBitset is the usedBitset snapshot from the previous non-zero rotation.
|
|
|
|
|
// Requires absence from both prevBitset and usedBitset to evict a label,
|
|
|
|
|
// guarding against partial snapshots when rotate fires mid-compress loop.
|
|
|
|
|
// Only accessed in rotate, which runs in a single goroutine.
|
|
|
|
|
prevBitset []uint64
|
|
|
|
|
|
|
|
|
|
// registry holds staleness intervals of active callers; rotation period = max(registry).
|
|
|
|
|
registry []time.Duration
|
|
|
|
|
|
|
|
|
|
// tickerCh signals runRotate to re-evaluate the rotation period.
|
|
|
|
|
tickerCh chan struct{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// idleRotationPeriod is the rotation period used when no callers are registered.
|
|
|
|
|
// It must be long enough that a sleeping goroutine does not consume measurable resources.
|
|
|
|
|
const idleRotationPeriod = time.Hour
|
|
|
|
|
|
|
|
|
|
func (lc *LabelsCompressor) labelToIdxMap() *xsync.Map[string, uint32] {
|
|
|
|
|
lc.labelToIdxOnce.Do(func() {
|
|
|
|
|
lc.labelToIdx = xsync.NewMap[string, uint32]()
|
|
|
|
|
})
|
|
|
|
|
return lc.labelToIdx
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Register adds stalenessInterval to the registry and starts the rotation goroutine on first call.
|
|
|
|
|
// Rotation period equals max(registry). Must be paired with Unregister.
|
|
|
|
|
func (lc *LabelsCompressor) Register(stalenessInterval time.Duration) {
|
|
|
|
|
lc.mu.Lock()
|
|
|
|
|
if lc.tickerCh == nil {
|
|
|
|
|
lc.tickerCh = make(chan struct{}, 1)
|
|
|
|
|
go lc.runRotate()
|
|
|
|
|
}
|
|
|
|
|
lc.registry = append(lc.registry, stalenessInterval)
|
|
|
|
|
tickerCh := lc.tickerCh
|
|
|
|
|
lc.mu.Unlock()
|
|
|
|
|
select {
|
|
|
|
|
case tickerCh <- struct{}{}:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Unregister removes stalenessInterval from the registry.
|
|
|
|
|
// The rotation goroutine keeps running at idleRotationPeriod when the registry is empty.
|
|
|
|
|
func (lc *LabelsCompressor) Unregister(stalenessInterval time.Duration) {
|
|
|
|
|
lc.mu.Lock()
|
|
|
|
|
for i, d := range lc.registry {
|
|
|
|
|
if d == stalenessInterval {
|
|
|
|
|
lc.registry = append(lc.registry[:i], lc.registry[i+1:]...)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
tickerCh := lc.tickerCh
|
|
|
|
|
lc.mu.Unlock()
|
|
|
|
|
if tickerCh != nil {
|
|
|
|
|
select {
|
|
|
|
|
case tickerCh <- struct{}{}:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (lc *LabelsCompressor) maxRegisteredStaleness() time.Duration {
|
|
|
|
|
// must be called with lc.mu held
|
|
|
|
|
var max time.Duration
|
|
|
|
|
for _, d := range lc.registry {
|
|
|
|
|
if d > max {
|
|
|
|
|
max = d
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if max == 0 {
|
|
|
|
|
return idleRotationPeriod
|
|
|
|
|
}
|
|
|
|
|
return max
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (lc *LabelsCompressor) runRotate() {
|
|
|
|
|
lc.mu.Lock()
|
|
|
|
|
period := lc.maxRegisteredStaleness()
|
|
|
|
|
lc.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
timer := time.NewTimer(period)
|
|
|
|
|
defer timer.Stop()
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-timer.C:
|
|
|
|
|
lc.rotate()
|
|
|
|
|
lc.mu.Lock()
|
|
|
|
|
period = lc.maxRegisteredStaleness()
|
|
|
|
|
lc.mu.Unlock()
|
|
|
|
|
timer.Reset(period)
|
|
|
|
|
case <-lc.tickerCh:
|
|
|
|
|
lc.mu.Lock()
|
|
|
|
|
newPeriod := lc.maxRegisteredStaleness()
|
|
|
|
|
lc.mu.Unlock()
|
|
|
|
|
if newPeriod != period {
|
|
|
|
|
period = newPeriod
|
|
|
|
|
if !timer.Stop() {
|
|
|
|
|
select {
|
|
|
|
|
case <-timer.C:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
timer.Reset(period)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SizeBytes returns the size of lc data in bytes
|
|
|
|
|
func (lc *LabelsCompressor) SizeBytes() uint64 {
|
|
|
|
|
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
|
|
|
|
|
lc.mu.Lock()
|
|
|
|
|
n := uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes
|
|
|
|
|
lc.mu.Unlock()
|
|
|
|
|
return n
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ItemsCount returns the number of items in lc
|
|
|
|
|
func (lc *LabelsCompressor) ItemsCount() uint64 {
|
|
|
|
|
return lc.nextIdx.Load()
|
|
|
|
|
return uint64(lc.labelToIdxMap().Size())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Compress compresses labels, appends the compressed labels to dst and returns the result.
|
|
|
|
|
@@ -42,46 +172,124 @@ func (lc *LabelsCompressor) Compress(dst []byte, labels []prompb.Label) []byte {
|
|
|
|
|
|
|
|
|
|
a := encoding.GetUint64s(len(labels) + 1)
|
|
|
|
|
a.A[0] = uint64(len(labels))
|
|
|
|
|
lc.compress(a.A[1:], labels)
|
|
|
|
|
|
|
|
|
|
if lc.compressFast(a.A[1:], labels) {
|
|
|
|
|
dst = encoding.MarshalVarUint64s(dst, a.A)
|
|
|
|
|
encoding.PutUint64s(a)
|
|
|
|
|
return dst
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lc.mu.Lock()
|
|
|
|
|
lc.compressSlow(a.A[1:], labels)
|
|
|
|
|
lc.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
dst = encoding.MarshalVarUint64s(dst, a.A)
|
|
|
|
|
encoding.PutUint64s(a)
|
|
|
|
|
return dst
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompb.Label) {
|
|
|
|
|
if len(labels) == 0 {
|
|
|
|
|
func (lc *LabelsCompressor) compressFast(dst []uint64, labels []prompb.Label) bool {
|
|
|
|
|
p := lc.usedBitset.Load()
|
|
|
|
|
var bits []uint64
|
|
|
|
|
if p != nil {
|
|
|
|
|
bits = *p
|
|
|
|
|
}
|
|
|
|
|
var keyBuf [256]byte
|
|
|
|
|
m := lc.labelToIdxMap()
|
|
|
|
|
for i, label := range labels {
|
|
|
|
|
// Build composite key name+'\x00'+value into the stack buffer.
|
|
|
|
|
totalLen := len(label.Name) + 1 + len(label.Value)
|
|
|
|
|
var key string
|
|
|
|
|
if totalLen <= len(keyBuf) {
|
|
|
|
|
n := copy(keyBuf[:], label.Name)
|
|
|
|
|
keyBuf[n] = '\x00'
|
|
|
|
|
copy(keyBuf[n+1:], label.Value)
|
|
|
|
|
key = bytesutil.ToUnsafeString(keyBuf[:totalLen])
|
|
|
|
|
} else {
|
|
|
|
|
key = makeLabelKey(label)
|
|
|
|
|
}
|
|
|
|
|
idx, ok := m.Load(key)
|
|
|
|
|
if !ok {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
dst[i] = uint64(idx)
|
|
|
|
|
word := idx >> 6
|
|
|
|
|
if int(word) < len(bits) {
|
|
|
|
|
mask := uint64(1) << (idx & 63)
|
|
|
|
|
if atomic.LoadUint64(&bits[word])&mask == 0 {
|
|
|
|
|
atomic.OrUint64(&bits[word], mask)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// usedBitset was swapped mid-loop; re-mark all indices in the new bitset.
|
|
|
|
|
if lc.usedBitset.Load() != p {
|
|
|
|
|
for _, idx64 := range dst {
|
|
|
|
|
lc.markUsed(uint32(idx64))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (lc *LabelsCompressor) compressSlow(dst []uint64, labels []prompb.Label) {
|
|
|
|
|
m := lc.labelToIdxMap()
|
|
|
|
|
for i, label := range labels {
|
|
|
|
|
key := makeLabelKey(label)
|
|
|
|
|
idx, loaded := m.LoadOrCompute(key, func() (uint32, bool) {
|
|
|
|
|
return lc.newIndex(cloneLabel(label)), false
|
|
|
|
|
})
|
|
|
|
|
if loaded {
|
|
|
|
|
lc.markUsed(idx)
|
|
|
|
|
}
|
|
|
|
|
dst[i] = uint64(idx)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// markUsed sets the bit for idx in usedBitset. Safe to call without mu.
|
|
|
|
|
func (lc *LabelsCompressor) markUsed(idx uint32) {
|
|
|
|
|
p := lc.usedBitset.Load()
|
|
|
|
|
if p == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
_ = dst[len(labels)-1]
|
|
|
|
|
for i, label := range labels {
|
|
|
|
|
v, ok := lc.labelToIdx.Load(label)
|
|
|
|
|
if !ok {
|
|
|
|
|
idx := lc.nextIdx.Add(1)
|
|
|
|
|
v = idx
|
|
|
|
|
labelCopy := cloneLabel(label)
|
|
|
|
|
|
|
|
|
|
// Must store idxToLabel entry before labelToIdx,
|
|
|
|
|
// so it can be found by possible concurrent goroutines.
|
|
|
|
|
//
|
|
|
|
|
// We might store duplicated entries for single label with different indexes,
|
|
|
|
|
// and it's fine, see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7118.
|
|
|
|
|
lc.idxToLabel.Store(idx, labelCopy)
|
|
|
|
|
vNew, loaded := lc.labelToIdx.LoadOrStore(labelCopy, v)
|
|
|
|
|
if loaded {
|
|
|
|
|
// This label has been stored by a concurrent goroutine with different index,
|
|
|
|
|
// use it for key consistency in aggrState.
|
|
|
|
|
v = vNew
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update lc.totalSizeBytes
|
|
|
|
|
labelSizeBytes := uint64(len(label.Name) + len(label.Value))
|
|
|
|
|
entrySizeBytes := labelSizeBytes + uint64(2*(unsafe.Sizeof(label)+unsafe.Sizeof(&label))+unsafe.Sizeof(v))
|
|
|
|
|
lc.totalSizeBytes.Add(entrySizeBytes)
|
|
|
|
|
bits := *p
|
|
|
|
|
word := idx >> 6
|
|
|
|
|
if int(word) < len(bits) {
|
|
|
|
|
mask := uint64(1) << (idx & 63)
|
|
|
|
|
if atomic.LoadUint64(&bits[word])&mask == 0 {
|
|
|
|
|
atomic.OrUint64(&bits[word], mask)
|
|
|
|
|
}
|
|
|
|
|
dst[i] = v.(uint64)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// growBitset extends usedBitset to cover idx. Must be called with lc.mu held.
|
|
|
|
|
func (lc *LabelsCompressor) growBitset(idx uint32) {
|
|
|
|
|
needed := int(idx>>6) + 1
|
|
|
|
|
p := lc.usedBitset.Load()
|
|
|
|
|
var bits []uint64
|
|
|
|
|
if p != nil {
|
|
|
|
|
bits = *p
|
|
|
|
|
}
|
|
|
|
|
if needed <= len(bits) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
newBits := make([]uint64, needed)
|
|
|
|
|
copy(newBits, bits)
|
|
|
|
|
lc.usedBitset.Store(&newBits)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func makeLabelKey(label prompb.Label) string {
|
|
|
|
|
buf := make([]byte, 0, len(label.Name)+1+len(label.Value))
|
|
|
|
|
buf = append(buf, label.Name...)
|
|
|
|
|
buf = append(buf, '\x00')
|
|
|
|
|
buf = append(buf, label.Value...)
|
|
|
|
|
return string(buf)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Generation returns the current generation count.
|
|
|
|
|
// It increments whenever labels are evicted, invalidating any cached compressed keys.
|
|
|
|
|
func (lc *LabelsCompressor) Generation() uint64 {
|
|
|
|
|
return lc.generation.Load()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func cloneLabel(label prompb.Label) prompb.Label {
|
|
|
|
|
// pre-allocate memory for label name and value
|
|
|
|
|
n := len(label.Name) + len(label.Value)
|
|
|
|
|
@@ -98,6 +306,35 @@ func cloneLabel(label prompb.Label) prompb.Label {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// newIndex assigns a new index for cloned and updates idxToLabel atomically.
|
|
|
|
|
// Must be called with lc.mu held.
|
|
|
|
|
func (lc *LabelsCompressor) newIndex(cloned prompb.Label) uint32 {
|
|
|
|
|
var idx uint32
|
|
|
|
|
p := lc.idxToLabel.Load()
|
|
|
|
|
var newIdxToLabel []prompb.Label
|
|
|
|
|
if p != nil {
|
|
|
|
|
newIdxToLabel = *p
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(lc.freeIdxs) > 0 {
|
|
|
|
|
idx = lc.freeIdxs[len(lc.freeIdxs)-1]
|
|
|
|
|
lc.freeIdxs = lc.freeIdxs[:len(lc.freeIdxs)-1]
|
|
|
|
|
next := make([]prompb.Label, len(newIdxToLabel))
|
|
|
|
|
copy(next, newIdxToLabel)
|
|
|
|
|
next[idx] = cloned
|
|
|
|
|
lc.idxToLabel.Store(&next)
|
|
|
|
|
} else {
|
|
|
|
|
idx = uint32(len(newIdxToLabel))
|
|
|
|
|
next := append(newIdxToLabel, cloned)
|
|
|
|
|
lc.idxToLabel.Store(&next)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lc.growBitset(idx)
|
|
|
|
|
lc.markUsed(idx)
|
|
|
|
|
lc.totalSizeBytes += uint64(len(cloned.Name)+len(cloned.Value)) + uint64(unsafe.Sizeof(cloned)) + 8
|
|
|
|
|
return idx
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Decompress decompresses src into []prompb.Label, appends it to dst and returns the result.
|
|
|
|
|
//
|
|
|
|
|
// It is safe calling Decompress from concurrent goroutines.
|
|
|
|
|
@@ -124,111 +361,135 @@ func (lc *LabelsCompressor) Decompress(dst []prompb.Label, src []byte) []prompb.
|
|
|
|
|
if len(tail) > 0 {
|
|
|
|
|
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
|
|
|
|
|
}
|
|
|
|
|
dst = lc.decompress(dst, a.A)
|
|
|
|
|
|
|
|
|
|
p := lc.idxToLabel.Load()
|
|
|
|
|
if p == nil {
|
|
|
|
|
encoding.PutUint64s(a)
|
|
|
|
|
logger.Panicf("BUG: idxToLabel is nil in Decompress")
|
|
|
|
|
return dst
|
|
|
|
|
}
|
|
|
|
|
labels := *p
|
|
|
|
|
for _, idx := range a.A {
|
|
|
|
|
if int(idx) >= len(labels) {
|
|
|
|
|
encoding.PutUint64s(a)
|
|
|
|
|
logger.Panicf("BUG: missing label for idx=%d; idxToLabel len=%d", idx, len(labels))
|
|
|
|
|
}
|
|
|
|
|
dst = append(dst, labels[idx])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
encoding.PutUint64s(a)
|
|
|
|
|
return dst
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (lc *LabelsCompressor) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
|
|
|
|
|
for _, idx := range src {
|
|
|
|
|
label, ok := lc.idxToLabel.Load(idx)
|
|
|
|
|
if !ok {
|
|
|
|
|
logger.Panicf("BUG: missing label for idx=%d", idx)
|
|
|
|
|
}
|
|
|
|
|
dst = append(dst, label)
|
|
|
|
|
// rotate evicts unused labels and recycles their slots.
|
|
|
|
|
// Phase 1 removes the label from labelToIdx (preventing new keys from referencing it).
|
|
|
|
|
// Phase 2, one rotation later, zeroes idxToLabel and returns the slot to freeIdxs,
|
|
|
|
|
// giving in-flight keys a full rotation period to drain via Decompress.
|
|
|
|
|
// Must not be called concurrently with itself.
|
|
|
|
|
func (lc *LabelsCompressor) rotate() {
|
|
|
|
|
// Snapshot and reset usedBitset under mu to prevent races with growBitset/newIndex.
|
|
|
|
|
// Also snapshot idxToLabel for the phase-1 scan.
|
|
|
|
|
lc.mu.Lock()
|
|
|
|
|
var currentBits []uint64
|
|
|
|
|
if p := lc.usedBitset.Load(); p != nil {
|
|
|
|
|
currentBits = make([]uint64, len(*p))
|
|
|
|
|
copy(currentBits, *p)
|
|
|
|
|
newBits := make([]uint64, len(*p))
|
|
|
|
|
lc.usedBitset.Store(&newBits)
|
|
|
|
|
}
|
|
|
|
|
return dst
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// labelsMap maps uint64 key to prompb.Label
|
|
|
|
|
//
|
|
|
|
|
// uint64 keys must be packed close to 0. Otherwise the labelsMap structure will consume too much memory.
|
|
|
|
|
type labelsMap struct {
|
|
|
|
|
readOnly atomic.Pointer[[]*prompb.Label]
|
|
|
|
|
|
|
|
|
|
mutableLock sync.Mutex
|
|
|
|
|
mutable map[uint64]*prompb.Label
|
|
|
|
|
misses uint64
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Store stores label under the given idx.
|
|
|
|
|
//
|
|
|
|
|
// It is safe calling Store from concurrent goroutines.
|
|
|
|
|
func (lm *labelsMap) Store(idx uint64, label prompb.Label) {
|
|
|
|
|
lm.mutableLock.Lock()
|
|
|
|
|
if lm.mutable == nil {
|
|
|
|
|
lm.mutable = make(map[uint64]*prompb.Label)
|
|
|
|
|
}
|
|
|
|
|
lm.mutable[idx] = &label
|
|
|
|
|
lm.mutableLock.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Load returns the label for the given idx.
|
|
|
|
|
//
|
|
|
|
|
// Load returns false if lm doesn't contain label for the given idx.
|
|
|
|
|
//
|
|
|
|
|
// It is safe calling Load from concurrent goroutines.
|
|
|
|
|
//
|
|
|
|
|
// The performance of Load() scales linearly with CPU cores.
|
|
|
|
|
func (lm *labelsMap) Load(idx uint64) (prompb.Label, bool) {
|
|
|
|
|
if pReadOnly := lm.readOnly.Load(); pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
|
|
|
|
|
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
|
|
|
|
|
// Fast path - the label for the given idx has been found in lm.readOnly.
|
|
|
|
|
return *pLabel, true
|
|
|
|
|
// Skip if nothing was used; all-zero snapshots from rapid rotation would
|
|
|
|
|
// advance pendingIdxs and reclaim slots before in-flight keys drain.
|
|
|
|
|
anyUsed := false
|
|
|
|
|
for _, w := range currentBits {
|
|
|
|
|
if w != 0 {
|
|
|
|
|
anyUsed = true
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Slow path - search in lm.mutable.
|
|
|
|
|
return lm.loadSlow(idx)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (lm *labelsMap) loadSlow(idx uint64) (prompb.Label, bool) {
|
|
|
|
|
lm.mutableLock.Lock()
|
|
|
|
|
|
|
|
|
|
// Try loading label from readOnly, since it could be updated while acquiring mutableLock.
|
|
|
|
|
pReadOnly := lm.readOnly.Load()
|
|
|
|
|
if pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
|
|
|
|
|
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
|
|
|
|
|
lm.mutableLock.Unlock()
|
|
|
|
|
return *pLabel, true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// The label for the idx wasn't found in readOnly. Search it in mutable.
|
|
|
|
|
lm.misses++
|
|
|
|
|
pLabel := lm.mutable[idx]
|
|
|
|
|
if pReadOnly == nil || lm.misses > uint64(len(*pReadOnly)) {
|
|
|
|
|
lm.moveMutableToReadOnlyLocked(pReadOnly)
|
|
|
|
|
lm.misses = 0
|
|
|
|
|
}
|
|
|
|
|
lm.mutableLock.Unlock()
|
|
|
|
|
|
|
|
|
|
if pLabel == nil {
|
|
|
|
|
return prompb.Label{}, false
|
|
|
|
|
}
|
|
|
|
|
return *pLabel, true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompb.Label) {
|
|
|
|
|
if len(lm.mutable) == 0 {
|
|
|
|
|
// Nothing to move
|
|
|
|
|
if !anyUsed {
|
|
|
|
|
lc.mu.Unlock()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var labels []*prompb.Label
|
|
|
|
|
if pReadOnly != nil {
|
|
|
|
|
labels = append(labels, *pReadOnly...)
|
|
|
|
|
// Advance prevBitset; labels missing from a partial snapshot are still covered by it.
|
|
|
|
|
prevSnap := lc.prevBitset
|
|
|
|
|
lc.prevBitset = currentBits
|
|
|
|
|
pendingIdxs := lc.pendingIdxs
|
|
|
|
|
idxSnap := lc.idxToLabel.Load()
|
|
|
|
|
lc.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
// A label is used if present in either the current or previous period snapshot.
|
|
|
|
|
isUsed := func(idx uint32) bool {
|
|
|
|
|
word := idx >> 6
|
|
|
|
|
inCurrent := int(word) < len(currentBits) && currentBits[word]>>(idx&63)&1 == 1
|
|
|
|
|
inPrev := int(word) < len(prevSnap) && prevSnap[word]>>(idx&63)&1 == 1
|
|
|
|
|
return inCurrent || inPrev
|
|
|
|
|
}
|
|
|
|
|
for idx, pLabel := range lm.mutable {
|
|
|
|
|
if idx < uint64(len(labels)) {
|
|
|
|
|
labels[idx] = pLabel
|
|
|
|
|
} else {
|
|
|
|
|
for idx > uint64(len(labels)) {
|
|
|
|
|
labels = append(labels, nil)
|
|
|
|
|
|
|
|
|
|
// Phase 2: reclaim slots evicted last rotation if still unused.
|
|
|
|
|
var toReclaim []uint32
|
|
|
|
|
var nextRelease map[uint32]struct{}
|
|
|
|
|
for idx := range pendingIdxs {
|
|
|
|
|
if isUsed(idx) {
|
|
|
|
|
if nextRelease == nil {
|
|
|
|
|
nextRelease = make(map[uint32]struct{})
|
|
|
|
|
}
|
|
|
|
|
labels = append(labels, pLabel)
|
|
|
|
|
nextRelease[idx] = struct{}{}
|
|
|
|
|
} else {
|
|
|
|
|
toReclaim = append(toReclaim, idx)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
clear(lm.mutable)
|
|
|
|
|
lm.readOnly.Store(&labels)
|
|
|
|
|
|
|
|
|
|
// Phase 1: scan idxToLabel to avoid contention with concurrent compressFast lookups.
|
|
|
|
|
phase1Evicted := false
|
|
|
|
|
m := lc.labelToIdxMap()
|
|
|
|
|
if idxSnap != nil {
|
|
|
|
|
for i, label := range *idxSnap {
|
|
|
|
|
if label == (prompb.Label{}) {
|
|
|
|
|
continue // freed slot
|
|
|
|
|
}
|
|
|
|
|
idx := uint32(i)
|
|
|
|
|
if _, inPending := pendingIdxs[idx]; inPending {
|
|
|
|
|
continue // already queued for release
|
|
|
|
|
}
|
|
|
|
|
if isUsed(idx) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
key := makeLabelKey(label)
|
|
|
|
|
m.Delete(key)
|
|
|
|
|
phase1Evicted = true
|
|
|
|
|
if nextRelease == nil {
|
|
|
|
|
nextRelease = make(map[uint32]struct{})
|
|
|
|
|
}
|
|
|
|
|
nextRelease[idx] = struct{}{}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if phase1Evicted {
|
|
|
|
|
lc.generation.Add(1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lc.mu.Lock()
|
|
|
|
|
if len(toReclaim) > 0 {
|
|
|
|
|
p := lc.idxToLabel.Load()
|
|
|
|
|
var newIdxToLabel []prompb.Label
|
|
|
|
|
if p != nil {
|
|
|
|
|
newIdxToLabel = *p
|
|
|
|
|
}
|
|
|
|
|
next := make([]prompb.Label, len(newIdxToLabel))
|
|
|
|
|
copy(next, newIdxToLabel)
|
|
|
|
|
for _, idx := range toReclaim {
|
|
|
|
|
label := next[idx]
|
|
|
|
|
entrySize := uint64(len(label.Name)+len(label.Value)) + uint64(unsafe.Sizeof(label)) + 8
|
|
|
|
|
if lc.totalSizeBytes >= entrySize {
|
|
|
|
|
lc.totalSizeBytes -= entrySize
|
|
|
|
|
}
|
|
|
|
|
next[idx] = prompb.Label{}
|
|
|
|
|
lc.freeIdxs = append(lc.freeIdxs, idx)
|
|
|
|
|
}
|
|
|
|
|
lc.idxToLabel.Store(&next)
|
|
|
|
|
}
|
|
|
|
|
lc.pendingIdxs = nextRelease
|
|
|
|
|
lc.mu.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|