diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 8592adef0e..90dacc96d2 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -949,6 +949,13 @@ func (tb *Table) mustMergeInmemoryParts(pws []*partWrapper) []*partWrapper { return pwsResult } +// mustMergeInmemoryPartsFinal merges the given in-memory part wrappers (pws) +// into a single new in-memory part wrapper. +// +// It panics if the input slice pws is empty (though the caller should prevent +// this). If the pws contains only one element, it is returned as is. Finally, +// when len(pws) > 1, the source pws are merged, and their ref count is +// decremented. func (tb *Table) mustMergeInmemoryPartsFinal(pws []*partWrapper) *partWrapper { if len(pws) == 0 { logger.Panicf("BUG: pws must contain at least a single item") @@ -969,7 +976,11 @@ func (tb *Table) mustMergeInmemoryPartsFinal(pws []*partWrapper) *partWrapper { } flushToDiskDeadline := getFlushToDiskDeadline(pws, tb.flushInterval) - return tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline) + pw := tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline) + for _, srcPW := range pws { + srcPW.decRef() + } + return pw } func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper { diff --git a/lib/mergeset/table_test.go b/lib/mergeset/table_test.go index 7231c11e2f..494933d69b 100644 --- a/lib/mergeset/table_test.go +++ b/lib/mergeset/table_test.go @@ -301,3 +301,56 @@ func testReopenTable(t *testing.T, path string, itemsCount int) { tb.MustClose() } } + +func TestTableMustMergeInmemoryPartsFinal_pwsRefCount(t *testing.T) { + path := t.Name() + fs.MustRemoveDir(path) + defer fs.MustRemoveDir(path) + + var isReadOnly atomic.Bool + tb := MustOpenTable(path, 0, nil, nil, &isReadOnly) + defer tb.MustClose() + + generatePartWrappers := func(n int) []*partWrapper { + pws := make([]*partWrapper, n) + for i := range n { + var ib inmemoryBlock + items := bytes.Repeat([]byte{byte(i)}, 1024) + ib.Add(items) + pw := tb.createInmemoryPart([]*inmemoryBlock{&ib}) + pws[i] = pw + } + return pws + } + + assertRefCount := func(pws []*partWrapper, want int32) { + t.Helper() + for _, pw := range pws { + if got := pw.refCount.Load(); got != want { + t.Fatalf("unexpected inmemory part wrapper ref count: got %d, want %d", got, want) + } + } + } + + var ( + pwsSrc []*partWrapper + pwFinal *partWrapper + ) + + // single source part wrapper + pwsSrc = generatePartWrappers(1) + assertRefCount(pwsSrc, 1) + pwFinal = tb.mustMergeInmemoryPartsFinal(pwsSrc) + if pwFinal != pwsSrc[0] { + t.Fatalf("mustMergeInmemoryPartsFinal must return the original wrapper for a single source part") + } + assertRefCount(pwsSrc, 1) + assertRefCount([]*partWrapper{pwFinal}, 1) + + // many source part wrappers + pwsSrc = generatePartWrappers(100) + assertRefCount(pwsSrc, 1) + pwFinal = tb.mustMergeInmemoryPartsFinal(pwsSrc) + assertRefCount(pwsSrc, 0) + assertRefCount([]*partWrapper{pwFinal}, 1) +} diff --git a/lib/storage/partition.go b/lib/storage/partition.go index de25cdcf66..7e7a3500e5 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -803,10 +803,17 @@ func (pt *partition) mustMergeInmemoryParts(pws []*partWrapper) []*partWrapper { return pwsResult } -// mustMergeInmemoryPartsFinal merges the given in-memory part wrappers (pws) into a single new in-memory part wrapper. -// It panics if the input slice pws is empty (though the caller should prevent this). -// Returns nil if the merge results in an empty part (e.g., due to retention filters removing all data). -// Otherwise, returns the wrapper for the merged part. +// mustMergeInmemoryPartsFinal merges the given in-memory part wrappers (pws) +// into a single new in-memory part wrapper. +// +// It panics if the input slice pws is empty (though the caller should prevent +// this). If the pws contains only one element, it is returned as is. Finally, +// when len(pws) > 1, the source pws are merged, and their ref count is +// decremented. +// +// Returns nil if the merge results in an empty part (e.g., due to retention +// filters removing all data). Otherwise, returns the wrapper for the merged +// part. func (pt *partition) mustMergeInmemoryPartsFinal(pws []*partWrapper) *partWrapper { if len(pws) == 0 { logger.Panicf("BUG: pws must contain at least a single item") @@ -853,6 +860,9 @@ func (pt *partition) mustMergeInmemoryPartsFinal(pws []*partWrapper) *partWrappe if err != nil { logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err) } + for _, pw := range pws { + pw.decRef() + } // The resulting part is empty, no need to create a part wrapper if ph.BlocksCount == 0 { diff --git a/lib/storage/partition_test.go b/lib/storage/partition_test.go index 5d730c4bf5..c9c317fe6d 100644 --- a/lib/storage/partition_test.go +++ b/lib/storage/partition_test.go @@ -183,10 +183,9 @@ func TestMergeInMemoryPartsEmptyResult(t *testing.T) { rows[i].PrecisionBits = 64 } - pws = append(pws, &partWrapper{ - mp: newTestInmemoryPart(rows), - p: &part{}, - }) + mp := newTestInmemoryPart(rows) + pw := newPartWrapperFromInmemoryPart(mp, time.Time{}) + pws = append(pws, pw) } pwsNew := pt.mustMergeInmemoryParts(pws) @@ -195,6 +194,68 @@ func TestMergeInMemoryPartsEmptyResult(t *testing.T) { } } +func TestMergeInMemoryPartsFinal_pwsRefCount(t *testing.T) { + defer testRemoveAll(t) + + generatePartWrappers := func(n int) []*partWrapper { + var pws []*partWrapper + for range n { + var rows []rawRow + for i := range 10 { + row := rawRow{ + TSID: TSID{MetricID: uint64(i)}, + Value: float64(i), + Timestamp: time.Now().UnixMilli() + int64(i), + PrecisionBits: 64, + } + rows = append(rows, row) + } + var mp inmemoryPart + mp.InitFromRows(rows) + pw := newPartWrapperFromInmemoryPart(&mp, time.Time{}) + pws = append(pws, pw) + } + return pws + } + + assertRefCount := func(pws []*partWrapper, want int32) { + t.Helper() + for _, pw := range pws { + if got := pw.refCount.Load(); got != want { + t.Fatalf("unexpected inmemory part wrapper ref count: got %d, want %d", got, want) + } + } + } + + s := MustOpenStorage(t.Name(), OpenOptions{}) + defer s.MustClose() + ptw := s.tb.MustGetPartition(time.Now().UnixMilli()) + defer s.tb.PutPartition(ptw) + pt := ptw.pt + + var ( + pwsSrc []*partWrapper + pwFinal *partWrapper + ) + + // single source part wrapper + pwsSrc = generatePartWrappers(1) + assertRefCount(pwsSrc, 1) + pwFinal = pt.mustMergeInmemoryPartsFinal(pwsSrc) + if pwFinal != pwsSrc[0] { + t.Fatalf("mustMergeInmemoryPartsFinal must return the original wrapper for a single source part") + } + assertRefCount(pwsSrc, 1) + assertRefCount([]*partWrapper{pwFinal}, 1) + + // many source part wrappers + pwsSrc = generatePartWrappers(100) + assertRefCount(pwsSrc, 1) + pwFinal = pt.mustMergeInmemoryPartsFinal(pwsSrc) + assertRefCount(pwsSrc, 0) + assertRefCount([]*partWrapper{pwFinal}, 1) +} + func testCreatePartition(t *testing.T, timestamp int64, s *Storage) *partition { t.Helper() small := filepath.Join(t.Name(), smallDirname)