lib/{mergeset,storage,logstorage}: use chunked buffer instead of bytesutil.ByteBuffer as a storage for in-memory parts

This commit adds lib/chunkedbuffer.Buffer - an in-memory chunked buffer
optimized for random access via MustReadAt() function.
It is better than bytesutil.ByteBuffer for storing large volumes of data,
since it stores the data in chunks of a fixed size (4KiB at the moment)
instead of using a contiguous memory region. This has the following benefits over bytesutil.ByteBuffer:

- reduced memory fragmentation
- reduced memory re-allocations when new data is written to the buffer
- reduced memory usage, since the allocated chunks can be re-used
  by other Buffer instances after Buffer.Reset() call

Performance tests show up to 2x memory usage reduction for VictoriaLogs
when ingesting logs with a big number of fields (aka wide events) at high ingestion rates.
This commit is contained in:
Aliaksandr Valialkin
2025-03-15 20:17:33 +01:00
parent 73aae546e0
commit 13ff9a8ebd
8 changed files with 488 additions and 169 deletions

View File

@@ -6,21 +6,9 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
var (
// Verify ByteBuffer implements the given interfaces.
_ io.Writer = &ByteBuffer{}
_ fs.MustReadAtCloser = &ByteBuffer{}
_ io.ReaderFrom = &ByteBuffer{}
// Verify reader implement filestream.ReadCloser interface.
_ filestream.ReadCloser = &reader{}
)
// ByteBuffer implements a simple byte buffer.
type ByteBuffer struct {
// B is the underlying byte slice.
@@ -43,19 +31,6 @@ func (bb *ByteBuffer) Write(p []byte) (int, error) {
return len(p), nil
}
// MustReadAt reads len(p) bytes starting from the given offset.
//
// It panics if offset is negative, if offset exceeds the buffer length,
// or if fewer than len(p) bytes are available starting at offset.
func (bb *ByteBuffer) MustReadAt(p []byte, offset int64) {
	if offset < 0 {
		logger.Panicf("BUG: cannot read at negative offset=%d", offset)
	}
	if offset > int64(len(bb.B)) {
		logger.Panicf("BUG: too big offset=%d; cannot exceed len(bb.B)=%d", offset, len(bb.B))
	}
	// copy returns fewer than len(p) bytes only when the buffer tail is too short.
	if n := copy(p, bb.B[offset:]); n < len(p) {
		logger.Panicf("BUG: EOF occurred after reading %d bytes out of %d bytes at offset %d", n, len(p), offset)
	}
}
// ReadFrom reads all the data from r to bb until EOF.
func (bb *ByteBuffer) ReadFrom(r io.Reader) (int64, error) {
b := bb.B
@@ -83,11 +58,6 @@ func (bb *ByteBuffer) ReadFrom(r io.Reader) (int64, error) {
}
}
// MustClose closes bb for subsequent re-use.
//
// It is intentionally a no-op: callers may keep reading from bb after MustClose,
// so the buffer contents must remain accessible.
func (bb *ByteBuffer) MustClose() {
	// Do nothing, since certain code rely on bb reading after MustClose call.
}
// NewReader returns new reader for the given bb.
func (bb *ByteBuffer) NewReader() filestream.ReadCloser {
return &reader{

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"io"
"testing"
"time"
)
func TestByteBuffer(t *testing.T) {
@@ -217,88 +216,3 @@ func TestByteBufferRead(t *testing.T) {
t.Fatalf("unexpected rCopy.readOffset; got %d; want %d", rCopy.readOffset, n2)
}
}
// TestByteBufferMustReadAt verifies that MustReadAt panics on invalid offsets
// and returns the expected bytes for valid ones.
func TestByteBufferMustReadAt(t *testing.T) {
	testStr := "foobar baz"

	var bb ByteBuffer
	bb.B = append(bb.B, testStr...)

	// Try reading at negative offset - must panic.
	p := make([]byte, 1)
	func() {
		defer func() {
			if r := recover(); r == nil {
				t.Fatalf("expecting non-nil error when reading at negative offset")
			}
		}()
		bb.MustReadAt(p, -1)
	}()

	// Try reading past the end of buffer - must panic.
	func() {
		defer func() {
			if r := recover(); r == nil {
				t.Fatalf("expecting non-nil error when reading past the end of buffer")
			}
		}()
		bb.MustReadAt(p, int64(len(testStr))+1)
	}()

	// Try reading the first byte
	n := len(p)
	bb.MustReadAt(p, 0)
	if string(p) != testStr[:n] {
		t.Fatalf("unexpected value read: %q; want %q", p, testStr[:n])
	}

	// Try reading the last byte
	bb.MustReadAt(p, int64(len(testStr))-1)
	if string(p) != testStr[len(testStr)-1:] {
		t.Fatalf("unexpected value read: %q; want %q", p, testStr[len(testStr)-1:])
	}

	// Try reading non-full p - fewer bytes remain in bb than len(p), so MustReadAt must panic.
	func() {
		defer func() {
			if r := recover(); r == nil {
				t.Fatalf("expecting non-nil error when reading non-full p")
			}
		}()
		p := make([]byte, 10)
		bb.MustReadAt(p, int64(len(testStr))-3)
	}()

	// Try reading multiple bytes from the middle
	p = make([]byte, 3)
	bb.MustReadAt(p, 2)
	if string(p) != testStr[2:2+len(p)] {
		t.Fatalf("unexpected value read: %q; want %q", p, testStr[2:2+len(p)])
	}
}
// TestByteBufferMustReadAtParallel verifies that concurrent MustReadAt calls
// on a shared, read-only ByteBuffer are safe (run with -race to catch data races).
func TestByteBufferMustReadAtParallel(t *testing.T) {
	ch := make(chan error, 10)
	var bb ByteBuffer
	bb.B = []byte("foo bar baz adsf adsf dsakjlkjlkj2l34324")
	for i := 0; i < cap(ch); i++ {
		go func() {
			p := make([]byte, 3)
			for i := 0; i < len(bb.B)-len(p); i++ {
				bb.MustReadAt(p, int64(i))
			}
			// MustReadAt panics on failure, so only nil is ever sent here;
			// the channel is used purely as a completion signal.
			ch <- nil
		}()
	}
	for i := 0; i < cap(ch); i++ {
		select {
		case err := <-ch:
			if err != nil {
				t.Fatalf("unexpected error: %s", err)
			}
		case <-time.After(3 * time.Second):
			t.Fatalf("timeout")
		}
	}
}

View File

@@ -0,0 +1,188 @@
package chunkedbuffer
import (
"fmt"
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
)
const chunkSize = 4 * 1024
// Buffer provides in-memory buffer optimized for storing big bytes volumes.
//
// It stores the data in chunks of fixed size. This reduces memory fragmentation
// and memory waste compared to contiguous slices of bytes.
type Buffer struct {
	// chunks holds the fixed-size chunks with the buffer data.
	chunks []*[chunkSize]byte

	// offset is the write position inside the last chunk in chunks.
	offset int
}
// Reset resets the cb, so it can be re-used for writing new data into it.
//
// Reset frees up memory chunks allocated for cb, so they could be re-used by other Buffer instances.
func (cb *Buffer) Reset() {
	chunks := cb.chunks
	for i := range chunks {
		// Return the chunk to the pool and drop the reference,
		// so the slice backing array doesn't retain pooled chunks.
		putChunk(chunks[i])
		chunks[i] = nil
	}
	cb.chunks = chunks[:0]
	cb.offset = 0
}
// SizeBytes returns the number of bytes occupied by the cb.
//
// This is the amount of memory allocated for cb (a multiple of chunkSize),
// which may slightly exceed the number of bytes actually written to it.
func (cb *Buffer) SizeBytes() int {
	return len(cb.chunks) * chunkSize
}
// MustWrite writes p to cb.
//
// It never fails; new chunks are allocated (or taken from the pool) as needed.
func (cb *Buffer) MustWrite(p []byte) {
	for len(p) > 0 {
		// Start a new chunk when there are no chunks yet
		// or the last chunk is completely filled.
		if cb.offset == chunkSize || len(cb.chunks) == 0 {
			cb.chunks = append(cb.chunks, getChunk())
			cb.offset = 0
		}
		last := cb.chunks[len(cb.chunks)-1]
		n := copy(last[cb.offset:], p)
		p = p[n:]
		cb.offset += n
	}
}
// Write implements io.Writer interface for cb.
//
// It always writes all of p and never returns an error.
func (cb *Buffer) Write(p []byte) (int, error) {
	cb.MustWrite(p)
	return len(p), nil
}
// MustReadAt reads len(p) bytes from cb at the offset off.
//
// It panics if off is negative or if the requested range extends past the data
// written to cb, mirroring the behavior of bytesutil.ByteBuffer.MustReadAt.
// Without these checks a read past the written length but within the allocated
// (pooled) last chunk would silently return stale bytes.
func (cb *Buffer) MustReadAt(p []byte, off int64) {
	if len(p) == 0 {
		return
	}
	if off < 0 {
		panic(fmt.Errorf("BUG: cannot read at negative offset=%d", off))
	}
	// size is the total number of bytes written to cb.
	size := int64(0)
	if len(cb.chunks) > 0 {
		size = int64(len(cb.chunks)-1)*chunkSize + int64(cb.offset)
	}
	if off+int64(len(p)) > size {
		panic(fmt.Errorf("BUG: cannot read %d bytes at offset %d, since cb contains only %d bytes", len(p), off, size))
	}
	chunkIdx := off / chunkSize
	offset := off % chunkSize
	// Read the tail of the first affected chunk, then whole chunks until p is filled.
	chunk := cb.chunks[chunkIdx]
	n := copy(p, chunk[offset:])
	p = p[n:]
	for len(p) > 0 {
		chunkIdx++
		chunk := cb.chunks[chunkIdx]
		n := copy(p, chunk[:])
		p = p[n:]
	}
}
// WriteTo writes cb data to w.
//
// It returns the number of bytes written together with the first error encountered.
// A short write without an error from w is reported as an error,
// since it violates the io.Writer contract.
func (cb *Buffer) WriteTo(w io.Writer) (int64, error) {
	var written int64
	for i, chunk := range cb.chunks {
		data := chunk[:]
		if i == len(cb.chunks)-1 {
			// The last chunk may be partially filled.
			data = chunk[:cb.offset]
		}
		n, err := w.Write(data)
		written += int64(n)
		if err != nil {
			return written, err
		}
		if n != len(data) {
			return written, fmt.Errorf("unexpected number of bytes written; got %d; want %d", n, len(data))
		}
	}
	return written, nil
}
// Path returns cb path.
//
// The path is a synthetic in-memory identifier derived from the cb pointer address;
// it doesn't point to a file on disk.
func (cb *Buffer) Path() string {
	return fmt.Sprintf("Buffer/%p/mem", cb)
}
// MustClose closes cb for subsequent re-use.
//
// It is intentionally a no-op, since certain code relies on reading from cb
// after the MustClose call.
func (cb *Buffer) MustClose() {
	// Do nothing, since certain code rely on cb reading after MustClose call.
}
// NewReader returns a reader for reading the data stored in cb.
//
// NOTE(review): the reader accesses cb state directly, so cb shouldn't be
// reset while the returned reader is in use.
func (cb *Buffer) NewReader() filestream.ReadCloser {
	return &reader{
		cb: cb,
	}
}
// reader implements filestream.ReadCloser for sequential reads from a Buffer.
type reader struct {
	// cb is the underlying buffer to read from.
	cb *Buffer

	// offset is the current read position inside cb.
	offset int
}
// Path returns the path of the underlying Buffer.
func (r *reader) Path() string {
	return r.cb.Path()
}
// Read implements io.Reader for r.
//
// It reads from at most one chunk per call and returns io.EOF together with
// the last bytes read once the written data is exhausted.
func (r *reader) Read(p []byte) (int, error) {
	chunks := r.cb.chunks
	chunkIdx := r.offset / chunkSize
	offset := r.offset % chunkSize
	if chunkIdx == len(chunks) {
		// All the data has been read already.
		if offset != 0 {
			panic(fmt.Errorf("BUG: offset must be 0; got %d", offset))
		}
		return 0, io.EOF
	}
	chunk := chunks[chunkIdx]
	isLastChunk := chunkIdx == len(chunks)-1
	end := chunkSize
	if isLastChunk {
		// The last chunk is filled only up to r.cb.offset.
		end = r.cb.offset
	}
	n := copy(p, chunk[offset:end])
	r.offset += n
	if isLastChunk && offset+n == end {
		return n, io.EOF
	}
	return n, nil
}
// MustClose closes r, so it can no longer be used for reading.
func (r *reader) MustClose() {
	// Release the reference to the underlying buffer and reset the read position.
	r.cb = nil
	r.offset = 0
}
// getChunk returns a chunk from chunkPool, allocating a fresh one when the pool is empty.
func getChunk() *[chunkSize]byte {
	if v := chunkPool.Get(); v != nil {
		return v.(*[chunkSize]byte)
	}
	return new([chunkSize]byte)
}
// putChunk returns the given chunk to chunkPool for re-use by getChunk.
func putChunk(chunk *[chunkSize]byte) {
	chunkPool.Put(chunk)
}

// chunkPool is a pool of chunks shared across Buffer instances.
var chunkPool sync.Pool

View File

@@ -0,0 +1,230 @@
package chunkedbuffer
import (
"bytes"
"fmt"
"io"
"testing"
)
// TestBuffer exercises the full Buffer lifecycle: Reset, MustWrite, SizeBytes,
// MustReadAt, NewReader, Path, WriteTo and MustClose, across multiple re-uses.
func TestBuffer(t *testing.T) {
	var cb Buffer
	for i := 0; i < 10; i++ {
		cb.Reset()

		// Write data to chunked buffer
		totalSize := 0
		for j := 1; j < 1000; j++ {
			b := make([]byte, j)
			for k := range b {
				b[k] = byte(k)
			}
			cb.MustWrite(b)
			totalSize += len(b)
		}
		// SizeBytes reports allocated memory, so it may exceed the written length.
		size := cb.SizeBytes()
		if size < totalSize {
			t.Fatalf("too small SizeBytes; got %d; want at least %d", size, totalSize)
		}

		// Read the data from chunked buffer via MustReadAt.
		off := 0
		for j := 1; j < 1000; j++ {
			b := make([]byte, j)
			cb.MustReadAt(b, int64(off))
			off += j

			// Verify the data is read correctly
			for k := range b {
				if b[k] != byte(k) {
					t.Fatalf("unexpected byte read; got %d; want %d", b[k], k)
				}
			}
		}

		// Read the data from chunked buffer via NewReader.
		r := cb.NewReader()
		var bb bytes.Buffer
		n, err := bb.ReadFrom(r)
		if err != nil {
			t.Fatalf("error when reading data from chunked buffer: %s", err)
		}
		if n != int64(off) {
			t.Fatalf("unexpected amounts of data read from chunked buffer; got %d; want %d", n, off)
		}

		// Verify that reader path is equivalent to cb path
		cbPath := cb.Path()
		rPath := r.Path()
		if cbPath != rPath {
			t.Fatalf("unexpected path; got %q; want %q", rPath, cbPath)
		}
		r.MustClose()

		// Verify the read data
		off = 0
		data := bb.Bytes()
		for j := 1; j < 1000; j++ {
			b := data[off : off+j]
			off += j

			// Verify the data is read correctly
			for k := range b {
				if b[k] != byte(k) {
					t.Fatalf("unexpected byte read; got %d; want %d", b[k], k)
				}
			}
		}

		// Copy the data to another chunked buffer via WriteTo.
		var cb2 Buffer
		n, err = cb.WriteTo(&cb2)
		if err != nil {
			t.Fatalf("error when writing data to another chunked buffer: %s", err)
		}
		if n != int64(off) {
			t.Fatalf("unexpected amounts of data written to chunked buffer; got %d; want %d", n, off)
		}

		// Verify that the data at cb is equivalent to the data at cb2
		var bb2 bytes.Buffer
		r2 := cb2.NewReader()
		n, err = bb2.ReadFrom(r2)
		if err != nil {
			t.Fatalf("cannot read data from chunked buffer: %s", err)
		}
		if n != int64(off) {
			t.Fatalf("unexpected amounts of data read from the chunked buffer; got %d; want %d", n, off)
		}
		data2 := bb2.Bytes()
		if !bytes.Equal(data, data2) {
			t.Fatalf("unexpected data at the second chunked buffer\ngot\n%q\nwant\n%q", data2, data)
		}

		// Verify MustClose at chunked buffer
		cb2.MustClose()
	}
}
// TestBuffer_MustReadAtZeroData verifies that a zero-length read
// from an empty buffer doesn't panic.
func TestBuffer_MustReadAtZeroData(_ *testing.T) {
	var cb Buffer
	var p []byte
	cb.MustReadAt(p, 0)
}
// TestBuffer_ReaderZeroData verifies that a reader over an empty buffer
// returns io.EOF without any data.
func TestBuffer_ReaderZeroData(t *testing.T) {
	var cb Buffer
	r := cb.NewReader()
	data, err := io.ReadAll(r)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if len(data) > 0 {
		t.Fatalf("unexpected data read with len=%d; data=%q", len(data), data)
	}
}
// TestBuffer_ReaderSingleChunk verifies sequential reads from a buffer
// holding less than a single chunk of data.
func TestBuffer_ReaderSingleChunk(t *testing.T) {
	var cb Buffer
	fmt.Fprintf(&cb, "foo bar baz")

	r := cb.NewReader()
	b := make([]byte, 4)
	if _, err := io.ReadFull(r, b); err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if string(b) != "foo " {
		t.Fatalf("unexpected data read; got %q; want %q", b, "foo ")
	}
	if _, err := io.ReadFull(r, b); err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if string(b) != "bar " {
		t.Fatalf("unexpected data read; got %q; want %q", b, "bar ")
	}
	data, err := io.ReadAll(r)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if string(data) != "baz" {
		// Report data (the bytes actually read) instead of the stale contents of b.
		t.Fatalf("unexpected data read; got %q; want %q", data, "baz")
	}
}
// TestBuffer_WriteToZeroData verifies that WriteTo on an empty buffer
// writes nothing and returns no error.
func TestBuffer_WriteToZeroData(t *testing.T) {
	var cb Buffer
	var dst bytes.Buffer
	n, err := cb.WriteTo(&dst)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if n != 0 {
		t.Fatalf("unexpected data written from cb with len=%d", n)
	}
	if dstLen := dst.Len(); dstLen != 0 {
		t.Fatalf("unexpected data written to bb with len=%d; data=%q", dstLen, dst.Bytes())
	}
}
// TestBuffer_WriteToBrokenWriter verifies WriteTo error handling
// with misbehaving destination writers.
func TestBuffer_WriteToBrokenWriter(t *testing.T) {
	var cb Buffer
	fmt.Fprintf(&cb, "foo bar baz")

	// A writer, which fails on the very first Write call.
	w := &faultyWriter{}
	n, err := cb.WriteTo(w)
	if err == nil {
		t.Fatalf("expecting non-nil error")
	}
	if n != 0 {
		t.Fatalf("expecting zero bytes written; got %d bytes", n)
	}

	// A writer, which fails after accepting 5 bytes.
	w = &faultyWriter{
		bytesToAccept: 5,
	}
	n, err = cb.WriteTo(w)
	if err == nil {
		t.Fatalf("expecting non-nil error")
	}
	if n != int64(w.bytesToAccept) {
		t.Fatalf("unexpected number of bytes written; got %d; want %d", n, w.bytesToAccept)
	}

	// A writer, which reports a short write without an error.
	// WriteTo must detect this io.Writer contract violation and return an error.
	w = &faultyWriter{
		returnInvalidBytesRead: true,
	}
	n, err = cb.WriteTo(w)
	if err == nil {
		t.Fatalf("expecting non-nil error")
	}
	if n != 0 {
		t.Fatalf("unexpected number of bytes written; got %d; want %d", n, 0)
	}
}
// faultyWriter is an io.Writer, which accepts at most bytesToAccept bytes
// before returning an error. It is used for testing error handling in Buffer.WriteTo.
type faultyWriter struct {
	// bytesToAccept is the number of bytes to accept before returning an error.
	bytesToAccept int

	// returnInvalidBytesRead instructs Write to report zero bytes written
	// without an error, which violates the io.Writer contract.
	returnInvalidBytesRead bool

	// bytesWritten is the number of bytes accepted so far.
	// (Renamed from bytesRead - this is a writer, so it counts written bytes.)
	bytesWritten int
}

// Write implements io.Writer for fw.
func (fw *faultyWriter) Write(p []byte) (int, error) {
	if fw.returnInvalidBytesRead {
		// Violate the io.Writer contract: report a short write without an error.
		return 0, nil
	}
	if fw.bytesWritten+len(p) > fw.bytesToAccept {
		// Accept only the remaining quota, then fail.
		n := fw.bytesToAccept - fw.bytesWritten
		fw.bytesWritten = fw.bytesToAccept
		return n, fmt.Errorf("some error")
	}
	fw.bytesWritten += len(p)
	return len(p), nil
}

View File

@@ -23,6 +23,24 @@ func MustSyncPath(path string) {
mustSyncPath(path)
}
// MustWriteStreamSync writes src contents to the file at path and then calls fsync on the created file.
// The fsync guarantees that the written data survives hardware reset after successful call.
//
// This function may leave the file at the path in inconsistent state on app crash
// in the middle of the write.
// Use MustWriteAtomic if the file at the path must be either written in full
// or not written at all on app crash in the middle of the write.
func MustWriteStreamSync(path string, src io.WriterTo) {
	f := filestream.MustCreate(path, false)
	_, err := src.WriteTo(f)
	// Close the file in both the success and the error case.
	f.MustClose()
	if err != nil {
		// Do not call MustRemoveAll(path), so the user could inspect
		// the file contents during investigation of the issue.
		logger.Panicf("FATAL: cannot write data to %q: %s", path, err)
	}
}
// MustWriteSync writes data to the file at path and then calls fsync on the created file.
//
// The fsync guarantees that the written data survives hardware reset after successful call.
@@ -39,7 +57,6 @@ func MustWriteSync(path string, data []byte) {
// the file contents during investigation of the issue.
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(data), path, err)
}
// Sync and close the file.
f.MustClose()
}

View File

@@ -5,7 +5,7 @@ import (
"sort"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
@@ -14,21 +14,21 @@ type inmemoryPart struct {
// ph contains partHeader information for the given in-memory part.
ph partHeader
columnNames bytesutil.ByteBuffer
columnIdxs bytesutil.ByteBuffer
metaindex bytesutil.ByteBuffer
index bytesutil.ByteBuffer
columnsHeaderIndex bytesutil.ByteBuffer
columnsHeader bytesutil.ByteBuffer
timestamps bytesutil.ByteBuffer
columnNames chunkedbuffer.Buffer
columnIdxs chunkedbuffer.Buffer
metaindex chunkedbuffer.Buffer
index chunkedbuffer.Buffer
columnsHeaderIndex chunkedbuffer.Buffer
columnsHeader chunkedbuffer.Buffer
timestamps chunkedbuffer.Buffer
messageBloomValues bloomValuesBuffer
fieldBloomValues bloomValuesBuffer
}
type bloomValuesBuffer struct {
bloom bytesutil.ByteBuffer
values bytesutil.ByteBuffer
bloom chunkedbuffer.Buffer
values chunkedbuffer.Buffer
}
func (b *bloomValuesBuffer) reset() {
@@ -119,22 +119,22 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
fs.MustWriteSync(columnNamesPath, mp.columnNames.B)
fs.MustWriteSync(columnIdxsPath, mp.columnIdxs.B)
fs.MustWriteSync(metaindexPath, mp.metaindex.B)
fs.MustWriteSync(indexPath, mp.index.B)
fs.MustWriteSync(columnsHeaderIndexPath, mp.columnsHeaderIndex.B)
fs.MustWriteSync(columnsHeaderPath, mp.columnsHeader.B)
fs.MustWriteSync(timestampsPath, mp.timestamps.B)
fs.MustWriteStreamSync(columnNamesPath, &mp.columnNames)
fs.MustWriteStreamSync(columnIdxsPath, &mp.columnIdxs)
fs.MustWriteStreamSync(metaindexPath, &mp.metaindex)
fs.MustWriteStreamSync(indexPath, &mp.index)
fs.MustWriteStreamSync(columnsHeaderIndexPath, &mp.columnsHeaderIndex)
fs.MustWriteStreamSync(columnsHeaderPath, &mp.columnsHeader)
fs.MustWriteStreamSync(timestampsPath, &mp.timestamps)
fs.MustWriteSync(messageBloomFilterPath, mp.messageBloomValues.bloom.B)
fs.MustWriteSync(messageValuesPath, mp.messageBloomValues.values.B)
fs.MustWriteStreamSync(messageBloomFilterPath, &mp.messageBloomValues.bloom)
fs.MustWriteStreamSync(messageValuesPath, &mp.messageBloomValues.values)
bloomPath := getBloomFilePath(path, 0)
fs.MustWriteSync(bloomPath, mp.fieldBloomValues.bloom.B)
fs.MustWriteStreamSync(bloomPath, &mp.fieldBloomValues.bloom)
valuesPath := getValuesFilePath(path, 0)
fs.MustWriteSync(valuesPath, mp.fieldBloomValues.values.B)
fs.MustWriteStreamSync(valuesPath, &mp.fieldBloomValues.values)
mp.ph.mustWriteMetadata(path)

View File

@@ -4,6 +4,7 @@ import (
"path/filepath"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -14,10 +15,10 @@ type inmemoryPart struct {
bh blockHeader
mr metaindexRow
metaindexData bytesutil.ByteBuffer
indexData bytesutil.ByteBuffer
itemsData bytesutil.ByteBuffer
lensData bytesutil.ByteBuffer
metaindexData chunkedbuffer.Buffer
indexData chunkedbuffer.Buffer
itemsData chunkedbuffer.Buffer
lensData chunkedbuffer.Buffer
}
func (mp *inmemoryPart) Reset() {
@@ -36,16 +37,16 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
fs.MustMkdirFailIfExist(path)
metaindexPath := filepath.Join(path, metaindexFilename)
fs.MustWriteSync(metaindexPath, mp.metaindexData.B)
fs.MustWriteStreamSync(metaindexPath, &mp.metaindexData)
indexPath := filepath.Join(path, indexFilename)
fs.MustWriteSync(indexPath, mp.indexData.B)
fs.MustWriteStreamSync(indexPath, &mp.indexData)
itemsPath := filepath.Join(path, itemsFilename)
fs.MustWriteSync(itemsPath, mp.itemsData.B)
fs.MustWriteStreamSync(itemsPath, &mp.itemsData)
lensPath := filepath.Join(path, lensFilename)
fs.MustWriteSync(lensPath, mp.lensData.B)
fs.MustWriteStreamSync(lensPath, &mp.lensData)
mp.ph.MustWriteMetadata(path)
@@ -57,12 +58,7 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
mp.Reset()
// Re-use mp.itemsData and mp.lensData in sb.
// This eliminates copying itemsData and lensData from sb to mp later.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2247
sb := &storageBlock{}
sb.itemsData = mp.itemsData.B[:0]
sb.lensData = mp.lensData.B[:0]
// Use the minimum possible compressLevel for compressing inmemoryPart,
// since it will be merged into file part soon.
@@ -75,13 +71,13 @@ func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...)
mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
mp.itemsData.B = sb.itemsData
mp.itemsData.MustWrite(sb.itemsData)
mp.bh.itemsBlockOffset = 0
mp.bh.itemsBlockSize = uint32(len(mp.itemsData.B))
mp.bh.itemsBlockSize = uint32(len(sb.itemsData))
mp.lensData.B = sb.lensData
mp.lensData.MustWrite(sb.lensData)
mp.bh.lensBlockOffset = 0
mp.bh.lensBlockSize = uint32(len(mp.lensData.B))
mp.bh.lensBlockSize = uint32(len(sb.lensData))
bb := inmemoryPartBytePool.Get()
bb.B = mp.bh.Marshal(bb.B[:0])
@@ -89,14 +85,18 @@ func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
// marshaled blockHeader can exceed indexBlockSize when firstItem and commonPrefix sizes are close to indexBlockSize
logger.Panicf("BUG: too big index block: %d bytes; mustn't exceed %d bytes", len(bb.B), 3*maxIndexBlockSize)
}
mp.indexData.B = encoding.CompressZSTDLevel(mp.indexData.B[:0], bb.B, compressLevel)
bbLen := len(bb.B)
bb.B = encoding.CompressZSTDLevel(bb.B, bb.B, compressLevel)
mp.indexData.MustWrite(bb.B[bbLen:])
mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...)
mp.mr.blockHeadersCount = 1
mp.mr.indexBlockOffset = 0
mp.mr.indexBlockSize = uint32(len(mp.indexData.B))
mp.mr.indexBlockSize = uint32(len(bb.B[bbLen:]))
bb.B = mp.mr.Marshal(bb.B[:0])
mp.metaindexData.B = encoding.CompressZSTDLevel(mp.metaindexData.B[:0], bb.B, compressLevel)
bbLen = len(bb.B)
bb.B = encoding.CompressZSTDLevel(bb.B, bb.B, compressLevel)
mp.metaindexData.MustWrite(bb.B[bbLen:])
inmemoryPartBytePool.Put(bb)
}
@@ -111,5 +111,5 @@ func (mp *inmemoryPart) NewPart() *part {
}
func (mp *inmemoryPart) size() uint64 {
return uint64(cap(mp.metaindexData.B) + cap(mp.indexData.B) + cap(mp.itemsData.B) + cap(mp.lensData.B))
return uint64(mp.metaindexData.SizeBytes() + mp.indexData.SizeBytes() + mp.itemsData.SizeBytes() + mp.lensData.SizeBytes())
}

View File

@@ -3,8 +3,8 @@ package storage
import (
"path/filepath"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -14,10 +14,10 @@ import (
type inmemoryPart struct {
ph partHeader
timestampsData bytesutil.ByteBuffer
valuesData bytesutil.ByteBuffer
indexData bytesutil.ByteBuffer
metaindexData bytesutil.ByteBuffer
timestampsData chunkedbuffer.Buffer
valuesData chunkedbuffer.Buffer
indexData chunkedbuffer.Buffer
metaindexData chunkedbuffer.Buffer
creationTime uint64
}
@@ -39,16 +39,16 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
fs.MustMkdirFailIfExist(path)
timestampsPath := filepath.Join(path, timestampsFilename)
fs.MustWriteSync(timestampsPath, mp.timestampsData.B)
fs.MustWriteStreamSync(timestampsPath, &mp.timestampsData)
valuesPath := filepath.Join(path, valuesFilename)
fs.MustWriteSync(valuesPath, mp.valuesData.B)
fs.MustWriteStreamSync(valuesPath, &mp.valuesData)
indexPath := filepath.Join(path, indexFilename)
fs.MustWriteSync(indexPath, mp.indexData.B)
fs.MustWriteStreamSync(indexPath, &mp.indexData)
metaindexPath := filepath.Join(path, metaindexFilename)
fs.MustWriteSync(metaindexPath, mp.metaindexData.B)
fs.MustWriteStreamSync(metaindexPath, &mp.metaindexData)
mp.ph.MustWriteMetadata(path)
@@ -79,7 +79,7 @@ func (mp *inmemoryPart) NewPart() *part {
}
func (mp *inmemoryPart) size() uint64 {
return uint64(cap(mp.timestampsData.B) + cap(mp.valuesData.B) + cap(mp.indexData.B) + cap(mp.metaindexData.B))
return uint64(mp.timestampsData.SizeBytes() + mp.valuesData.SizeBytes() + mp.indexData.SizeBytes() + mp.metaindexData.SizeBytes())
}
func getInmemoryPart() *inmemoryPart {