Compare commits

...

1 Commits

Author SHA1 Message Date
f41gh7
9d862c82f5 lib/storage: implement daily partitioning for retention < 30 d 2025-08-22 13:51:22 +02:00
5 changed files with 491 additions and 25 deletions

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path/filepath"
"slices"
"sort"
"strings"
"sync"
@@ -434,7 +435,21 @@ func (pt *partition) AddRows(rows []rawRow) {
}
}
}
if pt.s.enableDailyPartitioning {
// sort rows in order to properly group it by date later
slices.SortFunc(rows, func(a, b rawRow) int {
if a.Timestamp < b.Timestamp {
return -1
}
if a.Timestamp > b.Timestamp {
return 1
}
return 0
})
pt.rawRows.addRowsGroupByDate(pt, rows)
return
}
pt.rawRows.addRows(pt, rows)
}
@@ -468,6 +483,18 @@ func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) {
}
}
func (rrss *rawRowsShards) addRowsGroupByDate(pt *partition, rows []rawRow) {
shards := rrss.shards
shardsLen := uint32(len(shards))
for len(rows) > 0 {
n := rrss.shardIdx.Add(1)
idx := n % shardsLen
tailRows, rowsToFlush := shards[idx].addRowsGroupByDate(rows)
rrss.addRowsToFlush(pt, rowsToFlush)
rows = tailRows
}
}
func (rrss *rawRowsShards) addRowsToFlush(pt *partition, rowsToFlush []rawRow) {
if len(rowsToFlush) == 0 {
return
@@ -555,6 +582,69 @@ func (rrs *rawRowsShard) addRows(rows []rawRow) ([]rawRow, []rawRow) {
return rows, rowsToFlush
}
func (rrs *rawRowsShard) addRowsGroupByDate(rows []rawRow) ([]rawRow, []rawRow) {
var rowsToFlush []rawRow
rrs.mu.Lock()
defer rrs.mu.Unlock()
if cap(rrs.rows) == 0 {
rrs.rows = newRawRows()
}
if len(rrs.rows) == 0 {
rrs.updateFlushDeadline()
}
firstRowsDate := rows[0].Timestamp / msecPerDay
// check if already buffered data belong to the same date
if len(rrs.rows) > 0 && rrs.rows[len(rrs.rows)-1].Timestamp/msecPerDay != firstRowsDate {
// flush buffered rows
rowsToFlush = rrs.rows
rrs.rows = newRawRows()
rrs.updateFlushDeadline()
return rows, rowsToFlush
}
// check if all rows belong to the same date
// rows must be sorted in ascending order
if firstRowsDate < rows[len(rows)-1].Timestamp/msecPerDay {
nextDateTimestamp := (firstRowsDate + 1) * msecPerDay
i := sort.Search(len(rows), func(i int) bool {
return rows[i].Timestamp >= nextDateTimestamp
})
if isDebug {
switch i {
case 0:
logger.Panicf("BUG: index: %d cannot be equal to the first row timestamp: %d", i, rows[0].Timestamp)
case len(rows) - 1:
logger.Panicf("BUG: index: %d cannot be equal to the last row timestamp: %d", i, rows[len(rows)-1].Timestamp)
}
}
n := copy(rrs.rows[len(rrs.rows):cap(rrs.rows)], rows[:i])
rrs.rows = rrs.rows[:len(rrs.rows)+n]
rows = rows[n:]
rowsToFlush = rrs.rows
rrs.rows = newRawRows()
rrs.updateFlushDeadline()
return rows, rowsToFlush
}
n := copy(rrs.rows[len(rrs.rows):cap(rrs.rows)], rows)
rrs.rows = rrs.rows[:len(rrs.rows)+n]
rows = rows[n:]
if len(rows) > 0 {
rowsToFlush = rrs.rows
rrs.rows = newRawRows()
rrs.updateFlushDeadline()
n = copy(rrs.rows[:cap(rrs.rows)], rows)
rrs.rows = rrs.rows[:n]
rows = rows[n:]
}
return rows, rowsToFlush
}
func newRawRows() []rawRow {
return make([]rawRow, 0, maxRawRowsPerShard)
}
@@ -576,7 +666,6 @@ func (pt *partition) flushRowssToInmemoryParts(rowss [][]rawRow) {
<-inmemoryPartsConcurrencyCh
wg.Done()
}()
pw := pt.createInmemoryPart(rowsChunk)
if pw != nil {
pwsLock.Lock()
@@ -588,6 +677,16 @@ func (pt *partition) flushRowssToInmemoryParts(rowss [][]rawRow) {
wg.Wait()
putWaitGroup(wg)
if pt.s.enableDailyPartitioning {
// daily partitioned parts may belong to the differents parts
// so it cannot be merged into single part
pt.partsLock.Lock()
pt.inmemoryParts = append(pt.inmemoryParts, pws...)
pt.startInmemoryPartsMergerLocked()
pt.partsLock.Unlock()
return
}
// Merge pws into a single in-memory part.
maxPartSize := getMaxInmemoryPartSize()
for len(pws) > 1 {
@@ -627,7 +726,12 @@ func (pt *partition) inmemoryPartsMerger() {
return
}
maxOutBytes := pt.getMaxBigPartSize()
if pt.s.enableDailyPartitioning {
if !pt.tryMergeDailyPartitionedParts(partInmemory, inmemoryPartsConcurrencyCh, maxOutBytes) {
return
}
continue
}
pt.partsLock.Lock()
pws := getPartsToMerge(pt.inmemoryParts, maxOutBytes)
pt.partsLock.Unlock()
@@ -660,7 +764,12 @@ func (pt *partition) smallPartsMerger() {
return
}
maxOutBytes := pt.getMaxBigPartSize()
if pt.s.enableDailyPartitioning {
if !pt.tryMergeDailyPartitionedParts(partSmall, smallPartsConcurrencyCh, maxOutBytes) {
return
}
continue
}
pt.partsLock.Lock()
pws := getPartsToMerge(pt.smallParts, maxOutBytes)
pt.partsLock.Unlock()
@@ -693,7 +802,12 @@ func (pt *partition) bigPartsMerger() {
return
}
maxOutBytes := pt.getMaxBigPartSize()
if pt.s.enableDailyPartitioning {
if !pt.tryMergeDailyPartitionedParts(partBig, bigPartsConcurrencyCh, maxOutBytes) {
return
}
continue
}
pt.partsLock.Lock()
pws := getPartsToMerge(pt.bigParts, maxOutBytes)
pt.partsLock.Unlock()
@@ -735,6 +849,9 @@ func putWaitGroup(wg *sync.WaitGroup) {
var wgPool sync.Pool
func (pt *partition) mustMergeInmemoryParts(pws []*partWrapper) []*partWrapper {
if pt.s.enableDailyPartitioning {
logger.Fatalf("BUG: mustMergeInmemoryParts cannot be called for daily partitioning scheme")
}
var pwsResult []*partWrapper
var pwsResultLock sync.Mutex
wg := getWaitGroup()
@@ -1124,6 +1241,16 @@ func (pt *partition) flushInmemoryPartsToFiles(isFinal bool) {
}
pt.partsLock.Unlock()
if pt.s.enableDailyPartitioning {
pwss := groupPartsByDate(pws)
for _, pws := range pwss {
if err := pt.mergePartsToFiles(pws, nil, inmemoryPartsConcurrencyCh, false); err != nil {
logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
}
}
return
}
if err := pt.mergePartsToFiles(pws, nil, inmemoryPartsConcurrencyCh, false); err != nil {
logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
}
@@ -1142,13 +1269,13 @@ func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
}
for i := range rrss.shards {
dst = rrss.shards[i].appendRawRowsToFlush(dst, currentTimeMs, isFinal)
dst = rrss.shards[i].appendRawRowsToFlush(dst, currentTimeMs, isFinal, pt.s.enableDailyPartitioning)
}
pt.flushRowssToInmemoryParts(dst)
}
func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int64, isFinal bool) [][]rawRow {
func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int64, isFinal bool, useDailyPartitioning bool) [][]rawRow {
flushDeadlineMs := rrs.flushDeadlineMs.Load()
if !isFinal && currentTimeMs < flushDeadlineMs {
// Fast path - nothing to flush
@@ -1157,7 +1284,12 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int6
// Slow path - move rrs.rows to dst.
rrs.mu.Lock()
dst = appendRawRowss(dst, rrs.rows)
if useDailyPartitioning {
dst = appendRawRowssGroupByDate(dst, rrs.rows)
} else {
dst = appendRawRowss(dst, rrs.rows)
}
//dst = appendRawRowss(dst, rrs.rows)
rrs.rows = rrs.rows[:0]
rrs.mu.Unlock()
@@ -1189,6 +1321,41 @@ func appendRawRowss(dst [][]rawRow, src []rawRow) [][]rawRow {
return dst
}
func appendRawRowssGroupByDate(dst [][]rawRow, src []rawRow) [][]rawRow {
if len(src) == 0 {
return dst
}
if len(dst) == 0 {
dst = append(dst, newRawRows())
}
firstRowsDate := src[0].Timestamp / msecPerDay
if firstRowsDate < src[len(src)-1].Timestamp/msecPerDay {
logger.Panicf("BUG: src must belong to the same date, firstRowsDate: %d last row date: %d", firstRowsDate, src[len(src)-1].Timestamp/msecPerDay)
}
prows := &dst[len(dst)-1]
// check if buffered rows belong to the same date
if len(*prows) > 0 {
lastProwsDate := (*prows)[len(*prows)-1].Timestamp / msecPerDay
if lastProwsDate != firstRowsDate {
dst = append(dst, newRawRows())
}
prows = &dst[len(dst)-1]
}
n := copy((*prows)[len(*prows):cap(*prows)], src)
*prows = (*prows)[:len(*prows)+n]
src = src[n:]
for len(src) > 0 {
rows := newRawRows()
n := copy(rows[:cap(rows)], src)
rows = rows[:len(rows)+n]
src = src[n:]
dst = append(dst, rows)
}
return dst
}
func (pt *partition) mergePartsToFiles(pws []*partWrapper, stopCh <-chan struct{}, concurrencyCh chan struct{}, useSparseCache bool) error {
pwsLen := len(pws)
@@ -1242,6 +1409,15 @@ func (pt *partition) ForceMergeAllParts(stopCh <-chan struct{}) error {
return nil
}
if pt.s.enableDailyPartitioning {
pwss := groupPartsByDate(pws)
for _, pws := range pwss {
if err := pt.mergePartsToFiles(pws, stopCh, bigPartsConcurrencyCh, true); err != nil {
return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err)
}
}
return nil
}
// If len(pws) == 1, then the merge must run anyway.
// This allows applying the configured retention, removing the deleted series
// and performing de-duplication if needed.
@@ -1496,11 +1672,17 @@ func getFlushToDiskDeadline(pws []*partWrapper) time.Time {
type partType int
var (
partInmemory = partType(0)
partSmall = partType(1)
partBig = partType(2)
partInmemory = partType(0)
partSmall = partType(1)
partBig = partType(2)
partTypesString = [3]string{0: "in-memory", 1: "small", 2: "big"}
)
// String implements stringer interface
func (p partType) String() string {
return partTypesString[p]
}
func (pt *partition) getDstPartType(pws []*partWrapper, isFinal bool) partType {
dstPartSize := getPartsSize(pws)
if dstPartSize > pt.getMaxSmallPartSize() {
@@ -2108,3 +2290,98 @@ func mustReadPartNamesFromDir(srcDir string) []string {
func isSpecialDir(name string) bool {
return name == "tmp" || name == "txn" || name == snapshotsDirname
}
func groupPartsByDate(src []*partWrapper) [][]*partWrapper {
if len(src) == 0 {
return nil
}
// allocate new slice to cover 30 days + 1 day reserved for migration
dst := make([][]*partWrapper, 0, 31)
// reserve 0 index as a destination for parts that covers more than 1 day
// it's usual case for migration from previous versions
dst = dst[:1]
sort.Slice(src, func(i, j int) bool { return src[i].p.ph.MinTimestamp < src[j].p.ph.MinTimestamp })
var prevDate int64
var prevIdx int
for _, ptw := range src {
currDate := ptw.p.ph.MinTimestamp / msecPerDay
maxDate := ptw.p.ph.MaxTimestamp / msecPerDay
if maxDate-currDate >= 1 {
dst[0] = append(dst[0], ptw)
continue
}
if currDate == prevDate {
dst[prevIdx] = append(dst[prevIdx], ptw)
continue
}
prevIdx++
prevDate = currDate
dst = append(dst, []*partWrapper{ptw})
}
if len(dst[0]) == 0 {
dst = dst[1:]
}
return dst
}
// tryMergeDailyPartitionedParts handles merging when daily partitioning is enabled.
// Returns false if no further merging should continue.
func (pt *partition) tryMergeDailyPartitionedParts(partsType partType, concurrencyCh chan struct{}, maxOutBytes uint64) bool {
pt.partsLock.Lock()
var pws []*partWrapper
var partPath string
switch partsType {
case partBig:
partPath = pt.bigPartsPath
pws = pt.bigParts
case partSmall:
partPath = pt.smallPartsPath
pws = pt.smallParts
case partInmemory:
partPath = "in-memory"
pws = pt.inmemoryParts
default:
logger.Fatalf("BUG: unexpected partsType: %d", partsType)
}
pwss := groupPartsByDate(pws)
var cnt int
for _, pws := range pwss {
pws = getPartsToMerge(pws, maxOutBytes)
if len(pws) == 0 {
continue
}
pwss[cnt] = pws
cnt++
}
pwss = pwss[:cnt]
pt.partsLock.Unlock()
if len(pwss) == 0 {
return false
}
for idx, pws := range pwss {
concurrencyCh <- struct{}{}
err := pt.mergeParts(pws, pt.stopCh, false, false)
<-concurrencyCh
if err != nil {
if errors.Is(err, errForciblyStopped) {
if idx+1 < len(pwss) {
// properly release unmerged parts at current iteration
// so it could be flushed at MustClose call
for _, pwsToRelease := range pwss[idx+1:] {
pt.releasePartsToMerge(pwsToRelease)
}
}
// Nothing to do - finish the merger.
return false
}
// Unexpected error.
logger.Panicf("FATAL: unrecoverable error when merging %s parts at %q: %s", partsType, partPath, err)
}
}
return true
}

View File

@@ -323,3 +323,102 @@ func TestMustOpenPartition_smallAndBigPartsPathsAreNotTheSame(t *testing.T) {
_ = mustOpenPartition(smallPartsPath, bigPartsPath, s)
}
func TestGroupPartsByDate(t *testing.T) {
f := func(pws []*partWrapper, expected [][]*partWrapper) {
t.Helper()
got := groupPartsByDate(pws)
if len(expected) != len(got) {
t.Fatalf("groupPartsByDate: unexpected number of day groups: expected=%d, got=%d", len(expected), len(got))
}
cmpPws := func(idx int, a, b []*partWrapper) {
t.Helper()
if len(a) != len(b) {
t.Fatalf("group[%d]: unexpected number of parts: expected=%d, got=%d", idx, len(a), len(b))
}
for i := range a {
if a[i].p.ph.MinTimestamp != b[i].p.ph.MinTimestamp {
t.Fatalf("group[%d] part[%d]: MinTimestamp mismatch: expected=%d, got=%d",
idx, i, a[i].p.ph.MinTimestamp, b[i].p.ph.MinTimestamp)
}
if a[i].p.ph.MaxTimestamp != b[i].p.ph.MaxTimestamp {
t.Fatalf("group[%d] part[%d]: MaxTimestamp mismatch: expected=%d, got=%d",
idx, i, a[i].p.ph.MaxTimestamp, b[i].p.ph.MaxTimestamp)
}
}
}
for i := range expected {
cmpPws(i, expected[i], got[i])
}
}
// empty
f(nil, nil)
src := []*partWrapper{
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: msecPerDay}}},
}
expected := [][]*partWrapper{
{
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: msecPerDay}}},
},
}
f(src, expected)
// group by a single day
src = []*partWrapper{
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: msecPerDay - 1024}}},
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: msecPerDay - 512}}},
}
expected = [][]*partWrapper{
{
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: msecPerDay - 1024}}},
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: msecPerDay - 512}}},
},
}
f(src, expected)
// group into 2 days
src = []*partWrapper{
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 1024}}},
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 512}}},
{p: &part{ph: partHeader{MinTimestamp: 2 * msecPerDay, MaxTimestamp: 2*msecPerDay + 1024}}},
}
expected = [][]*partWrapper{
{
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 1024}}},
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 512}}},
},
{
{p: &part{ph: partHeader{MinTimestamp: 2 * msecPerDay, MaxTimestamp: 2*msecPerDay + 1024}}},
},
}
f(src, expected)
// group int 2 days + extra day for migration
src = []*partWrapper{
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 1024}}},
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 512}}},
{p: &part{ph: partHeader{MinTimestamp: 2 * msecPerDay, MaxTimestamp: 2*msecPerDay + 1024}}},
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: 2 * msecPerDay}}},
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: 5 * msecPerDay}}},
}
expected = [][]*partWrapper{
{
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: 2 * msecPerDay}}},
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: 5 * msecPerDay}}},
},
{
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 1024}}},
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 512}}},
},
{
{p: &part{ph: partHeader{MinTimestamp: 2 * msecPerDay, MaxTimestamp: 2*msecPerDay + 1024}}},
},
}
f(src, expected)
}

View File

@@ -91,6 +91,8 @@ type Storage struct {
disablePerDayIndex bool
enableDailyPartitioning bool
tb *table
// Series cardinality limiters.
@@ -204,12 +206,17 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage {
if idbPrefillStart <= 0 {
idbPrefillStart = time.Hour
}
var enableDailyPartitioning bool
if retention < 30*24*time.Hour {
enableDailyPartitioning = true
}
s := &Storage{
path: path,
cachePath: filepath.Join(path, cacheDirname),
retentionMsecs: retention.Milliseconds(),
stopCh: make(chan struct{}),
idbPrefillStartSeconds: idbPrefillStart.Milliseconds() / 1000,
path: path,
cachePath: filepath.Join(path, cacheDirname),
retentionMsecs: retention.Milliseconds(),
stopCh: make(chan struct{}),
idbPrefillStartSeconds: idbPrefillStart.Milliseconds() / 1000,
enableDailyPartitioning: enableDailyPartitioning,
}
s.logNewSeries.Store(opts.LogNewSeries)

View File

@@ -1694,7 +1694,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
// Try opening the storage from snapshot.
snapshotPath := filepath.Join(s.path, snapshotsDirname, snapshotName)
s1 := MustOpenStorage(snapshotPath, OpenOptions{})
s1 := MustOpenStorage(snapshotPath, OpenOptions{Retention: time.Hour * 24 * 10})
// Verify the snapshot contains rows
var m1 Metrics
@@ -1709,17 +1709,20 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
if err := s1.ForceMergePartitions(""); err != nil {
return fmt.Errorf("error when force merging partitions: %w", err)
}
ptws := s1.tb.GetPartitions(nil)
for _, ptw := range ptws {
pws := ptw.pt.GetParts(nil, true)
numParts := len(pws)
ptw.pt.PutParts(pws)
if numParts > 1 {
s1.tb.PutPartitions(ptws)
return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want at most 1", ptw.pt.name, numParts)
if !s.enableDailyPartitioning {
ptws := s1.tb.GetPartitions(nil)
for _, ptw := range ptws {
pws := ptw.pt.GetParts(nil, true)
numParts := len(pws)
ptw.pt.PutParts(pws)
if numParts > 1 {
s1.tb.PutPartitions(ptws)
return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want at most 1", ptw.pt.name, numParts)
}
}
s1.tb.PutPartitions(ptws)
}
s1.tb.PutPartitions(ptws)
s1.MustClose()
@@ -4679,3 +4682,37 @@ func assertIndexDBIsNotNil(t *testing.T, idb *indexDB) {
t.Fatalf("unexpected idb: got nil, want non-nil")
}
}
func TestStorageAddRowsDailyPartitioning(t *testing.T) {
rng := rand.New(rand.NewSource(1))
path := "TestStorageAddRowsDailyPartitioning"
opts := OpenOptions{
Retention: 10 * 24 * time.Hour,
MaxHourlySeries: 1e5,
MaxDailySeries: 1e5,
}
s := MustOpenStorage(path, opts)
if err := testStorageAddRows(rng, s); err != nil {
t.Fatalf("unexpected error: %s", err)
}
assertPartRowsBelongToTheSameDate := func(pws []*partWrapper) {
t.Helper()
for _, pw := range pws {
minDate := pw.p.ph.MinTimestamp / msecPerDay
maxDate := pw.p.ph.MaxTimestamp / msecPerDay
if maxDate-minDate > 0 {
t.Fatalf("part path: %s rows must be belong to the same date, minDate: %d maxDate: %d", pw.p.path, minDate, maxDate)
}
}
}
partitions := s.tb.GetPartitions(nil)
for _, p := range partitions {
p.pt.partsLock.Lock()
assertPartRowsBelongToTheSameDate(p.pt.smallParts)
assertPartRowsBelongToTheSameDate(p.pt.bigParts)
p.pt.partsLock.Unlock()
}
s.tb.PutPartitions(partitions)
s.MustClose()
fs.MustRemoveDir(path)
}

View File

@@ -612,3 +612,49 @@ func benchmarkDirSize(path string) int64 {
}
return size
}
func BenchmarkStorageAddRowsDailyPartitioning(b *testing.B) {
defer fs.MustRemoveDir(b.Name())
f := func(b *testing.B, numRows int) {
b.Helper()
s := MustOpenStorage(b.Name(), OpenOptions{Retention: 10 * 24 * time.Hour})
defer s.MustClose()
var globalOffset atomic.Uint64
globalOffset.Store(uint64(time.Now().UnixMilli() - msecPerDay*2))
b.SetBytes(int64(numRows))
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
mrs := make([]MetricRow, numRows)
var mn MetricName
mn.MetricGroup = []byte("rps")
mn.Tags = []Tag{
{[]byte("job"), []byte("webservice")},
{[]byte("instance"), []byte("1.2.3.4")},
}
for pb.Next() {
offset := int(globalOffset.Add(uint64(numRows)))
for i := 0; i < numRows; i++ {
mr := &mrs[i]
mr.MetricNameRaw = mn.marshalRaw(mr.MetricNameRaw[:0])
mr.Timestamp = int64(offset + i)
mr.Value = float64(offset + i)
}
s.AddRows(mrs, defaultPrecisionBits)
}
})
b.StopTimer()
s.DebugFlush()
}
for _, numRows := range []int{1, 10, 100, 1000, 10000} {
b.Run(fmt.Sprintf("%d", numRows), func(b *testing.B) {
f(b, numRows)
})
}
}