mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
lib/storage, lib/mergeset: properly account inmemoryPart refCount
Previously inmemoryPart refCount was not properly decremented. Previous behavior: * createInmemoryPart called newPartWrapperFromInmemoryPart and returns a partWrapper with refCount=1 * multiple parts are merged in mustMergeInmemoryPartsFinal, which creates a new merged part * the source partWrappers are never decRef'd * Since refCount never reaches 0, putInmemoryPart and (*part).MustClose are never called This commit properly decrements refCount at mustMergeInmemoryPartsFinal. Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10086
This commit is contained in:
@@ -949,6 +949,13 @@ func (tb *Table) mustMergeInmemoryParts(pws []*partWrapper) []*partWrapper {
|
||||
return pwsResult
|
||||
}
|
||||
|
||||
// mustMergeInmemoryPartsFinal merges the given in-memory part wrappers (pws)
|
||||
// into a single new in-memory part wrapper.
|
||||
//
|
||||
// It panics if the input slice pws is empty (though the caller should prevent
|
||||
// this). If the pws contains only one element, it is returned as is. Finally,
|
||||
// when len(pws) > 1, the source pws are merged, and their ref count is
|
||||
// decremented.
|
||||
func (tb *Table) mustMergeInmemoryPartsFinal(pws []*partWrapper) *partWrapper {
|
||||
if len(pws) == 0 {
|
||||
logger.Panicf("BUG: pws must contain at least a single item")
|
||||
@@ -969,7 +976,11 @@ func (tb *Table) mustMergeInmemoryPartsFinal(pws []*partWrapper) *partWrapper {
|
||||
}
|
||||
|
||||
flushToDiskDeadline := getFlushToDiskDeadline(pws, tb.flushInterval)
|
||||
return tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline)
|
||||
pw := tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline)
|
||||
for _, srcPW := range pws {
|
||||
srcPW.decRef()
|
||||
}
|
||||
return pw
|
||||
}
|
||||
|
||||
func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
|
||||
|
||||
@@ -301,3 +301,56 @@ func testReopenTable(t *testing.T, path string, itemsCount int) {
|
||||
tb.MustClose()
|
||||
}
|
||||
}
|
||||
|
||||
func TestTableMustMergeInmemoryPartsFinal_pwsRefCount(t *testing.T) {
|
||||
path := t.Name()
|
||||
fs.MustRemoveDir(path)
|
||||
defer fs.MustRemoveDir(path)
|
||||
|
||||
var isReadOnly atomic.Bool
|
||||
tb := MustOpenTable(path, 0, nil, nil, &isReadOnly)
|
||||
defer tb.MustClose()
|
||||
|
||||
generatePartWrappers := func(n int) []*partWrapper {
|
||||
pws := make([]*partWrapper, n)
|
||||
for i := range n {
|
||||
var ib inmemoryBlock
|
||||
items := bytes.Repeat([]byte{byte(i)}, 1024)
|
||||
ib.Add(items)
|
||||
pw := tb.createInmemoryPart([]*inmemoryBlock{&ib})
|
||||
pws[i] = pw
|
||||
}
|
||||
return pws
|
||||
}
|
||||
|
||||
assertRefCount := func(pws []*partWrapper, want int32) {
|
||||
t.Helper()
|
||||
for _, pw := range pws {
|
||||
if got := pw.refCount.Load(); got != want {
|
||||
t.Fatalf("unexpected inmemory part wrapper ref count: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
pwsSrc []*partWrapper
|
||||
pwFinal *partWrapper
|
||||
)
|
||||
|
||||
// single source part wrapper
|
||||
pwsSrc = generatePartWrappers(1)
|
||||
assertRefCount(pwsSrc, 1)
|
||||
pwFinal = tb.mustMergeInmemoryPartsFinal(pwsSrc)
|
||||
if pwFinal != pwsSrc[0] {
|
||||
t.Fatalf("mustMergeInmemoryPartsFinal must return the original wrapper for a single source part")
|
||||
}
|
||||
assertRefCount(pwsSrc, 1)
|
||||
assertRefCount([]*partWrapper{pwFinal}, 1)
|
||||
|
||||
// many source part wrappers
|
||||
pwsSrc = generatePartWrappers(100)
|
||||
assertRefCount(pwsSrc, 1)
|
||||
pwFinal = tb.mustMergeInmemoryPartsFinal(pwsSrc)
|
||||
assertRefCount(pwsSrc, 0)
|
||||
assertRefCount([]*partWrapper{pwFinal}, 1)
|
||||
}
|
||||
|
||||
@@ -803,10 +803,17 @@ func (pt *partition) mustMergeInmemoryParts(pws []*partWrapper) []*partWrapper {
|
||||
return pwsResult
|
||||
}
|
||||
|
||||
// mustMergeInmemoryPartsFinal merges the given in-memory part wrappers (pws) into a single new in-memory part wrapper.
|
||||
// It panics if the input slice pws is empty (though the caller should prevent this).
|
||||
// Returns nil if the merge results in an empty part (e.g., due to retention filters removing all data).
|
||||
// Otherwise, returns the wrapper for the merged part.
|
||||
// mustMergeInmemoryPartsFinal merges the given in-memory part wrappers (pws)
|
||||
// into a single new in-memory part wrapper.
|
||||
//
|
||||
// It panics if the input slice pws is empty (though the caller should prevent
|
||||
// this). If the pws contains only one element, it is returned as is. Finally,
|
||||
// when len(pws) > 1, the source pws are merged, and their ref count is
|
||||
// decremented.
|
||||
//
|
||||
// Returns nil if the merge results in an empty part (e.g., due to retention
|
||||
// filters removing all data). Otherwise, returns the wrapper for the merged
|
||||
// part.
|
||||
func (pt *partition) mustMergeInmemoryPartsFinal(pws []*partWrapper) *partWrapper {
|
||||
if len(pws) == 0 {
|
||||
logger.Panicf("BUG: pws must contain at least a single item")
|
||||
@@ -853,6 +860,9 @@ func (pt *partition) mustMergeInmemoryPartsFinal(pws []*partWrapper) *partWrappe
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err)
|
||||
}
|
||||
for _, pw := range pws {
|
||||
pw.decRef()
|
||||
}
|
||||
|
||||
// The resulting part is empty, no need to create a part wrapper
|
||||
if ph.BlocksCount == 0 {
|
||||
|
||||
@@ -183,10 +183,9 @@ func TestMergeInMemoryPartsEmptyResult(t *testing.T) {
|
||||
rows[i].PrecisionBits = 64
|
||||
}
|
||||
|
||||
pws = append(pws, &partWrapper{
|
||||
mp: newTestInmemoryPart(rows),
|
||||
p: &part{},
|
||||
})
|
||||
mp := newTestInmemoryPart(rows)
|
||||
pw := newPartWrapperFromInmemoryPart(mp, time.Time{})
|
||||
pws = append(pws, pw)
|
||||
}
|
||||
|
||||
pwsNew := pt.mustMergeInmemoryParts(pws)
|
||||
@@ -195,6 +194,68 @@ func TestMergeInMemoryPartsEmptyResult(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMergeInMemoryPartsFinal_pwsRefCount(t *testing.T) {
|
||||
defer testRemoveAll(t)
|
||||
|
||||
generatePartWrappers := func(n int) []*partWrapper {
|
||||
var pws []*partWrapper
|
||||
for range n {
|
||||
var rows []rawRow
|
||||
for i := range 10 {
|
||||
row := rawRow{
|
||||
TSID: TSID{MetricID: uint64(i)},
|
||||
Value: float64(i),
|
||||
Timestamp: time.Now().UnixMilli() + int64(i),
|
||||
PrecisionBits: 64,
|
||||
}
|
||||
rows = append(rows, row)
|
||||
}
|
||||
var mp inmemoryPart
|
||||
mp.InitFromRows(rows)
|
||||
pw := newPartWrapperFromInmemoryPart(&mp, time.Time{})
|
||||
pws = append(pws, pw)
|
||||
}
|
||||
return pws
|
||||
}
|
||||
|
||||
assertRefCount := func(pws []*partWrapper, want int32) {
|
||||
t.Helper()
|
||||
for _, pw := range pws {
|
||||
if got := pw.refCount.Load(); got != want {
|
||||
t.Fatalf("unexpected inmemory part wrapper ref count: got %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s := MustOpenStorage(t.Name(), OpenOptions{})
|
||||
defer s.MustClose()
|
||||
ptw := s.tb.MustGetPartition(time.Now().UnixMilli())
|
||||
defer s.tb.PutPartition(ptw)
|
||||
pt := ptw.pt
|
||||
|
||||
var (
|
||||
pwsSrc []*partWrapper
|
||||
pwFinal *partWrapper
|
||||
)
|
||||
|
||||
// single source part wrapper
|
||||
pwsSrc = generatePartWrappers(1)
|
||||
assertRefCount(pwsSrc, 1)
|
||||
pwFinal = pt.mustMergeInmemoryPartsFinal(pwsSrc)
|
||||
if pwFinal != pwsSrc[0] {
|
||||
t.Fatalf("mustMergeInmemoryPartsFinal must return the original wrapper for a single source part")
|
||||
}
|
||||
assertRefCount(pwsSrc, 1)
|
||||
assertRefCount([]*partWrapper{pwFinal}, 1)
|
||||
|
||||
// many source part wrappers
|
||||
pwsSrc = generatePartWrappers(100)
|
||||
assertRefCount(pwsSrc, 1)
|
||||
pwFinal = pt.mustMergeInmemoryPartsFinal(pwsSrc)
|
||||
assertRefCount(pwsSrc, 0)
|
||||
assertRefCount([]*partWrapper{pwFinal}, 1)
|
||||
}
|
||||
|
||||
func testCreatePartition(t *testing.T, timestamp int64, s *Storage) *partition {
|
||||
t.Helper()
|
||||
small := filepath.Join(t.Name(), smallDirname)
|
||||
|
||||
Reference in New Issue
Block a user