mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
lib/mergeset: do not cache inmemoryBlock with single item
The indexDB mergeset has an edge case for a single-item inmemoryBlock: such blocks are stored in-memory at the block header's firstItem, so there is no need to perform on-disk read operations or to store a copy of the block in the cache. Caching may also result in incorrect search results, since an inmemoryBlock with a single item always has a zero index block offset, which causes collisions if it's cached together with the next index block of the part. Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10239 Probably fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10063
This commit is contained in:
@@ -336,6 +336,19 @@ func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefi
|
||||
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD
|
||||
}
|
||||
|
||||
func (ib *inmemoryBlock) unmarshalSingleItem(commonPrefix, firstItem []byte, mt marshalType) {
|
||||
if mt != marshalTypePlain {
|
||||
logger.Panicf("BUG: single item block must be always encoded with TypePlain")
|
||||
}
|
||||
ib.commonPrefix = append(ib.commonPrefix[:0], commonPrefix...)
|
||||
ib.items = slicesutil.SetLength(ib.items, 1)
|
||||
ib.data = bytesutil.ResizeNoCopyNoOverallocate(ib.data, len(firstItem))
|
||||
ib.data = append(ib.data[:0], firstItem...)
|
||||
item := &ib.items[0]
|
||||
item.Start = 0
|
||||
item.End = uint32(len(ib.data))
|
||||
}
|
||||
|
||||
// UnmarshalData decodes itemsCount items from sb and firstItem and stores them to ib.
|
||||
func (ib *inmemoryBlock) UnmarshalData(sb *storageBlock, firstItem, commonPrefix []byte, itemsCount uint32, mt marshalType) error {
|
||||
ib.Reset()
|
||||
|
||||
@@ -331,6 +331,14 @@ func (ps *partSearch) getInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error)
|
||||
if ps.sparse {
|
||||
cache = ibSparseCache
|
||||
}
|
||||
if bh.itemsCount == 1 {
|
||||
// special case for single item
|
||||
// there is no need to cache it, since firstItem is always stored in-memory
|
||||
ib := ps.tmpIB
|
||||
ib.Reset()
|
||||
ib.unmarshalSingleItem(bh.commonPrefix, bh.firstItem, bh.marshalType)
|
||||
return ib, nil
|
||||
}
|
||||
ibKey := blockcache.Key{
|
||||
Part: ps.p,
|
||||
Offset: bh.itemsBlockOffset,
|
||||
|
||||
@@ -163,3 +163,80 @@ func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []stri
|
||||
p := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
|
||||
return p, items, nil
|
||||
}
|
||||
|
||||
func TestGetInmemoryBlockWithZeroSizeBlock(t *testing.T) {
|
||||
var ph partHeader
|
||||
var ip inmemoryPart
|
||||
var bsw blockStreamWriter
|
||||
bsw.MustInitFromInmemoryPart(&ip, -3)
|
||||
|
||||
buildBlock := func(items ...string) inmemoryBlock {
|
||||
var ib inmemoryBlock
|
||||
for _, item := range items {
|
||||
if !ib.Add([]byte(item)) {
|
||||
t.Fatalf("cannot add item %q", item)
|
||||
}
|
||||
}
|
||||
ib.SortItems()
|
||||
return ib
|
||||
}
|
||||
|
||||
writeBlock := func(ib inmemoryBlock) {
|
||||
if len(ib.items) == 0 {
|
||||
t.Fatalf("block must contain items")
|
||||
}
|
||||
data := ib.data
|
||||
ph.itemsCount += uint64(len(ib.items))
|
||||
if ph.blocksCount == 0 {
|
||||
ph.firstItem = append(ph.firstItem[:0], ib.items[0].Bytes(data)...)
|
||||
}
|
||||
ph.lastItem = append(ph.lastItem[:0], ib.items[len(ib.items)-1].Bytes(data)...)
|
||||
ph.blocksCount++
|
||||
bsw.WriteBlock(&ib)
|
||||
}
|
||||
|
||||
writeBlock(buildBlock("a"))
|
||||
writeBlock(buildBlock("b0", "b1"))
|
||||
bsw.MustClose()
|
||||
|
||||
p := newPart(&ph, "test", ip.size(), ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
|
||||
defer p.MustClose()
|
||||
|
||||
var ps partSearch
|
||||
ps.Init(p, false)
|
||||
ps.mrs = p.mrs
|
||||
if err := ps.nextBHS(); err != nil {
|
||||
t.Fatalf("cannot read block headers: %s", err)
|
||||
}
|
||||
if len(ps.bhs) != 2 {
|
||||
t.Fatalf("unexpected block headers count: %d", len(ps.bhs))
|
||||
}
|
||||
if ps.bhs[0].itemsBlockOffset != ps.bhs[1].itemsBlockOffset {
|
||||
t.Fatalf("blocks must share itemsBlockOffset for the test: %d vs %d", ps.bhs[0].itemsBlockOffset, ps.bhs[1].itemsBlockOffset)
|
||||
}
|
||||
if ps.bhs[0].itemsBlockSize != 0 {
|
||||
t.Fatalf("the first block must have zero itemsBlockSize; got %d", ps.bhs[0].itemsBlockSize)
|
||||
}
|
||||
|
||||
// iterate 4 times in order to place block into the cache
|
||||
// storage caches it after 2 missed requests according to the flag blockcache.missesBeforeCaching=2
|
||||
for i := range 4 {
|
||||
if _, err := ps.getInmemoryBlock(&ps.bhs[1]); err != nil {
|
||||
t.Fatalf("cannot load non-empty block at iteration %d: %s", i, err)
|
||||
}
|
||||
}
|
||||
assertBlockAt := func(bhIdx int, wantFirstItemValue string) {
|
||||
block, err := ps.getInmemoryBlock(&ps.bhs[bhIdx])
|
||||
if err != nil {
|
||||
t.Fatalf("cannot block=%d : %s", bhIdx, err)
|
||||
}
|
||||
if len(block.items) != int(ps.bhs[bhIdx].itemsCount) {
|
||||
t.Fatalf("unexpected items count in block=%d; got %d; want %d", bhIdx, len(block.items), ps.bhs[bhIdx].itemsCount)
|
||||
}
|
||||
if got := string(block.items[0].Bytes(block.data)); got != wantFirstItemValue {
|
||||
t.Fatalf("unexpected item in block=%d; got %q; want %q", bhIdx, got, wantFirstItemValue)
|
||||
}
|
||||
}
|
||||
assertBlockAt(0, "a")
|
||||
assertBlockAt(1, "b0")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user