mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-19 09:46:57 +03:00
Compare commits
1 Commits
debug-grou
...
gh-9452
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9d862c82f5 |
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -434,7 +435,21 @@ func (pt *partition) AddRows(rows []rawRow) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if pt.s.enableDailyPartitioning {
|
||||
// sort rows in order to properly group it by date later
|
||||
slices.SortFunc(rows, func(a, b rawRow) int {
|
||||
if a.Timestamp < b.Timestamp {
|
||||
return -1
|
||||
}
|
||||
if a.Timestamp > b.Timestamp {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
|
||||
pt.rawRows.addRowsGroupByDate(pt, rows)
|
||||
return
|
||||
}
|
||||
pt.rawRows.addRows(pt, rows)
|
||||
}
|
||||
|
||||
@@ -468,6 +483,18 @@ func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) {
|
||||
}
|
||||
}
|
||||
|
||||
func (rrss *rawRowsShards) addRowsGroupByDate(pt *partition, rows []rawRow) {
|
||||
shards := rrss.shards
|
||||
shardsLen := uint32(len(shards))
|
||||
for len(rows) > 0 {
|
||||
n := rrss.shardIdx.Add(1)
|
||||
idx := n % shardsLen
|
||||
tailRows, rowsToFlush := shards[idx].addRowsGroupByDate(rows)
|
||||
rrss.addRowsToFlush(pt, rowsToFlush)
|
||||
rows = tailRows
|
||||
}
|
||||
}
|
||||
|
||||
func (rrss *rawRowsShards) addRowsToFlush(pt *partition, rowsToFlush []rawRow) {
|
||||
if len(rowsToFlush) == 0 {
|
||||
return
|
||||
@@ -555,6 +582,69 @@ func (rrs *rawRowsShard) addRows(rows []rawRow) ([]rawRow, []rawRow) {
|
||||
return rows, rowsToFlush
|
||||
}
|
||||
|
||||
func (rrs *rawRowsShard) addRowsGroupByDate(rows []rawRow) ([]rawRow, []rawRow) {
|
||||
var rowsToFlush []rawRow
|
||||
|
||||
rrs.mu.Lock()
|
||||
defer rrs.mu.Unlock()
|
||||
if cap(rrs.rows) == 0 {
|
||||
rrs.rows = newRawRows()
|
||||
}
|
||||
if len(rrs.rows) == 0 {
|
||||
rrs.updateFlushDeadline()
|
||||
}
|
||||
|
||||
firstRowsDate := rows[0].Timestamp / msecPerDay
|
||||
|
||||
// check if already buffered data belong to the same date
|
||||
if len(rrs.rows) > 0 && rrs.rows[len(rrs.rows)-1].Timestamp/msecPerDay != firstRowsDate {
|
||||
// flush buffered rows
|
||||
rowsToFlush = rrs.rows
|
||||
rrs.rows = newRawRows()
|
||||
rrs.updateFlushDeadline()
|
||||
return rows, rowsToFlush
|
||||
}
|
||||
|
||||
// check if all rows belong to the same date
|
||||
// rows must be sorted in ascending order
|
||||
if firstRowsDate < rows[len(rows)-1].Timestamp/msecPerDay {
|
||||
nextDateTimestamp := (firstRowsDate + 1) * msecPerDay
|
||||
i := sort.Search(len(rows), func(i int) bool {
|
||||
return rows[i].Timestamp >= nextDateTimestamp
|
||||
})
|
||||
|
||||
if isDebug {
|
||||
switch i {
|
||||
case 0:
|
||||
logger.Panicf("BUG: index: %d cannot be equal to the first row timestamp: %d", i, rows[0].Timestamp)
|
||||
case len(rows) - 1:
|
||||
logger.Panicf("BUG: index: %d cannot be equal to the last row timestamp: %d", i, rows[len(rows)-1].Timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
n := copy(rrs.rows[len(rrs.rows):cap(rrs.rows)], rows[:i])
|
||||
rrs.rows = rrs.rows[:len(rrs.rows)+n]
|
||||
rows = rows[n:]
|
||||
rowsToFlush = rrs.rows
|
||||
rrs.rows = newRawRows()
|
||||
rrs.updateFlushDeadline()
|
||||
return rows, rowsToFlush
|
||||
}
|
||||
n := copy(rrs.rows[len(rrs.rows):cap(rrs.rows)], rows)
|
||||
rrs.rows = rrs.rows[:len(rrs.rows)+n]
|
||||
rows = rows[n:]
|
||||
if len(rows) > 0 {
|
||||
rowsToFlush = rrs.rows
|
||||
rrs.rows = newRawRows()
|
||||
rrs.updateFlushDeadline()
|
||||
n = copy(rrs.rows[:cap(rrs.rows)], rows)
|
||||
rrs.rows = rrs.rows[:n]
|
||||
rows = rows[n:]
|
||||
}
|
||||
|
||||
return rows, rowsToFlush
|
||||
}
|
||||
|
||||
func newRawRows() []rawRow {
|
||||
return make([]rawRow, 0, maxRawRowsPerShard)
|
||||
}
|
||||
@@ -576,7 +666,6 @@ func (pt *partition) flushRowssToInmemoryParts(rowss [][]rawRow) {
|
||||
<-inmemoryPartsConcurrencyCh
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
pw := pt.createInmemoryPart(rowsChunk)
|
||||
if pw != nil {
|
||||
pwsLock.Lock()
|
||||
@@ -588,6 +677,16 @@ func (pt *partition) flushRowssToInmemoryParts(rowss [][]rawRow) {
|
||||
wg.Wait()
|
||||
putWaitGroup(wg)
|
||||
|
||||
if pt.s.enableDailyPartitioning {
|
||||
// daily partitioned parts may belong to the differents parts
|
||||
// so it cannot be merged into single part
|
||||
pt.partsLock.Lock()
|
||||
pt.inmemoryParts = append(pt.inmemoryParts, pws...)
|
||||
pt.startInmemoryPartsMergerLocked()
|
||||
pt.partsLock.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
// Merge pws into a single in-memory part.
|
||||
maxPartSize := getMaxInmemoryPartSize()
|
||||
for len(pws) > 1 {
|
||||
@@ -627,7 +726,12 @@ func (pt *partition) inmemoryPartsMerger() {
|
||||
return
|
||||
}
|
||||
maxOutBytes := pt.getMaxBigPartSize()
|
||||
|
||||
if pt.s.enableDailyPartitioning {
|
||||
if !pt.tryMergeDailyPartitionedParts(partInmemory, inmemoryPartsConcurrencyCh, maxOutBytes) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
pt.partsLock.Lock()
|
||||
pws := getPartsToMerge(pt.inmemoryParts, maxOutBytes)
|
||||
pt.partsLock.Unlock()
|
||||
@@ -660,7 +764,12 @@ func (pt *partition) smallPartsMerger() {
|
||||
return
|
||||
}
|
||||
maxOutBytes := pt.getMaxBigPartSize()
|
||||
|
||||
if pt.s.enableDailyPartitioning {
|
||||
if !pt.tryMergeDailyPartitionedParts(partSmall, smallPartsConcurrencyCh, maxOutBytes) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
pt.partsLock.Lock()
|
||||
pws := getPartsToMerge(pt.smallParts, maxOutBytes)
|
||||
pt.partsLock.Unlock()
|
||||
@@ -693,7 +802,12 @@ func (pt *partition) bigPartsMerger() {
|
||||
return
|
||||
}
|
||||
maxOutBytes := pt.getMaxBigPartSize()
|
||||
|
||||
if pt.s.enableDailyPartitioning {
|
||||
if !pt.tryMergeDailyPartitionedParts(partBig, bigPartsConcurrencyCh, maxOutBytes) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
pt.partsLock.Lock()
|
||||
pws := getPartsToMerge(pt.bigParts, maxOutBytes)
|
||||
pt.partsLock.Unlock()
|
||||
@@ -735,6 +849,9 @@ func putWaitGroup(wg *sync.WaitGroup) {
|
||||
var wgPool sync.Pool
|
||||
|
||||
func (pt *partition) mustMergeInmemoryParts(pws []*partWrapper) []*partWrapper {
|
||||
if pt.s.enableDailyPartitioning {
|
||||
logger.Fatalf("BUG: mustMergeInmemoryParts cannot be called for daily partitioning scheme")
|
||||
}
|
||||
var pwsResult []*partWrapper
|
||||
var pwsResultLock sync.Mutex
|
||||
wg := getWaitGroup()
|
||||
@@ -1124,6 +1241,16 @@ func (pt *partition) flushInmemoryPartsToFiles(isFinal bool) {
|
||||
}
|
||||
pt.partsLock.Unlock()
|
||||
|
||||
if pt.s.enableDailyPartitioning {
|
||||
pwss := groupPartsByDate(pws)
|
||||
for _, pws := range pwss {
|
||||
if err := pt.mergePartsToFiles(pws, nil, inmemoryPartsConcurrencyCh, false); err != nil {
|
||||
logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
|
||||
}
|
||||
}
|
||||
return
|
||||
|
||||
}
|
||||
if err := pt.mergePartsToFiles(pws, nil, inmemoryPartsConcurrencyCh, false); err != nil {
|
||||
logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
|
||||
}
|
||||
@@ -1142,13 +1269,13 @@ func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
|
||||
}
|
||||
|
||||
for i := range rrss.shards {
|
||||
dst = rrss.shards[i].appendRawRowsToFlush(dst, currentTimeMs, isFinal)
|
||||
dst = rrss.shards[i].appendRawRowsToFlush(dst, currentTimeMs, isFinal, pt.s.enableDailyPartitioning)
|
||||
}
|
||||
|
||||
pt.flushRowssToInmemoryParts(dst)
|
||||
}
|
||||
|
||||
func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int64, isFinal bool) [][]rawRow {
|
||||
func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int64, isFinal bool, useDailyPartitioning bool) [][]rawRow {
|
||||
flushDeadlineMs := rrs.flushDeadlineMs.Load()
|
||||
if !isFinal && currentTimeMs < flushDeadlineMs {
|
||||
// Fast path - nothing to flush
|
||||
@@ -1157,7 +1284,12 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int6
|
||||
|
||||
// Slow path - move rrs.rows to dst.
|
||||
rrs.mu.Lock()
|
||||
dst = appendRawRowss(dst, rrs.rows)
|
||||
if useDailyPartitioning {
|
||||
dst = appendRawRowssGroupByDate(dst, rrs.rows)
|
||||
} else {
|
||||
dst = appendRawRowss(dst, rrs.rows)
|
||||
}
|
||||
//dst = appendRawRowss(dst, rrs.rows)
|
||||
rrs.rows = rrs.rows[:0]
|
||||
rrs.mu.Unlock()
|
||||
|
||||
@@ -1189,6 +1321,41 @@ func appendRawRowss(dst [][]rawRow, src []rawRow) [][]rawRow {
|
||||
return dst
|
||||
}
|
||||
|
||||
func appendRawRowssGroupByDate(dst [][]rawRow, src []rawRow) [][]rawRow {
|
||||
if len(src) == 0 {
|
||||
return dst
|
||||
}
|
||||
if len(dst) == 0 {
|
||||
dst = append(dst, newRawRows())
|
||||
}
|
||||
firstRowsDate := src[0].Timestamp / msecPerDay
|
||||
if firstRowsDate < src[len(src)-1].Timestamp/msecPerDay {
|
||||
logger.Panicf("BUG: src must belong to the same date, firstRowsDate: %d last row date: %d", firstRowsDate, src[len(src)-1].Timestamp/msecPerDay)
|
||||
}
|
||||
|
||||
prows := &dst[len(dst)-1]
|
||||
// check if buffered rows belong to the same date
|
||||
if len(*prows) > 0 {
|
||||
lastProwsDate := (*prows)[len(*prows)-1].Timestamp / msecPerDay
|
||||
if lastProwsDate != firstRowsDate {
|
||||
dst = append(dst, newRawRows())
|
||||
}
|
||||
prows = &dst[len(dst)-1]
|
||||
}
|
||||
|
||||
n := copy((*prows)[len(*prows):cap(*prows)], src)
|
||||
*prows = (*prows)[:len(*prows)+n]
|
||||
src = src[n:]
|
||||
for len(src) > 0 {
|
||||
rows := newRawRows()
|
||||
n := copy(rows[:cap(rows)], src)
|
||||
rows = rows[:len(rows)+n]
|
||||
src = src[n:]
|
||||
dst = append(dst, rows)
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func (pt *partition) mergePartsToFiles(pws []*partWrapper, stopCh <-chan struct{}, concurrencyCh chan struct{}, useSparseCache bool) error {
|
||||
pwsLen := len(pws)
|
||||
|
||||
@@ -1242,6 +1409,15 @@ func (pt *partition) ForceMergeAllParts(stopCh <-chan struct{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if pt.s.enableDailyPartitioning {
|
||||
pwss := groupPartsByDate(pws)
|
||||
for _, pws := range pwss {
|
||||
if err := pt.mergePartsToFiles(pws, stopCh, bigPartsConcurrencyCh, true); err != nil {
|
||||
return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// If len(pws) == 1, then the merge must run anyway.
|
||||
// This allows applying the configured retention, removing the deleted series
|
||||
// and performing de-duplication if needed.
|
||||
@@ -1496,11 +1672,17 @@ func getFlushToDiskDeadline(pws []*partWrapper) time.Time {
|
||||
type partType int
|
||||
|
||||
var (
|
||||
partInmemory = partType(0)
|
||||
partSmall = partType(1)
|
||||
partBig = partType(2)
|
||||
partInmemory = partType(0)
|
||||
partSmall = partType(1)
|
||||
partBig = partType(2)
|
||||
partTypesString = [3]string{0: "in-memory", 1: "small", 2: "big"}
|
||||
)
|
||||
|
||||
// String implements stringer interface
|
||||
func (p partType) String() string {
|
||||
return partTypesString[p]
|
||||
}
|
||||
|
||||
func (pt *partition) getDstPartType(pws []*partWrapper, isFinal bool) partType {
|
||||
dstPartSize := getPartsSize(pws)
|
||||
if dstPartSize > pt.getMaxSmallPartSize() {
|
||||
@@ -2108,3 +2290,98 @@ func mustReadPartNamesFromDir(srcDir string) []string {
|
||||
func isSpecialDir(name string) bool {
|
||||
return name == "tmp" || name == "txn" || name == snapshotsDirname
|
||||
}
|
||||
|
||||
func groupPartsByDate(src []*partWrapper) [][]*partWrapper {
|
||||
if len(src) == 0 {
|
||||
return nil
|
||||
}
|
||||
// allocate new slice to cover 30 days + 1 day reserved for migration
|
||||
dst := make([][]*partWrapper, 0, 31)
|
||||
|
||||
// reserve 0 index as a destination for parts that covers more than 1 day
|
||||
// it's usual case for migration from previous versions
|
||||
dst = dst[:1]
|
||||
sort.Slice(src, func(i, j int) bool { return src[i].p.ph.MinTimestamp < src[j].p.ph.MinTimestamp })
|
||||
var prevDate int64
|
||||
var prevIdx int
|
||||
for _, ptw := range src {
|
||||
currDate := ptw.p.ph.MinTimestamp / msecPerDay
|
||||
maxDate := ptw.p.ph.MaxTimestamp / msecPerDay
|
||||
if maxDate-currDate >= 1 {
|
||||
dst[0] = append(dst[0], ptw)
|
||||
continue
|
||||
}
|
||||
if currDate == prevDate {
|
||||
dst[prevIdx] = append(dst[prevIdx], ptw)
|
||||
continue
|
||||
}
|
||||
prevIdx++
|
||||
prevDate = currDate
|
||||
dst = append(dst, []*partWrapper{ptw})
|
||||
}
|
||||
if len(dst[0]) == 0 {
|
||||
dst = dst[1:]
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
// tryMergeDailyPartitionedParts handles merging when daily partitioning is enabled.
|
||||
// Returns false if no further merging should continue.
|
||||
func (pt *partition) tryMergeDailyPartitionedParts(partsType partType, concurrencyCh chan struct{}, maxOutBytes uint64) bool {
|
||||
pt.partsLock.Lock()
|
||||
|
||||
var pws []*partWrapper
|
||||
var partPath string
|
||||
switch partsType {
|
||||
case partBig:
|
||||
partPath = pt.bigPartsPath
|
||||
pws = pt.bigParts
|
||||
case partSmall:
|
||||
partPath = pt.smallPartsPath
|
||||
pws = pt.smallParts
|
||||
case partInmemory:
|
||||
partPath = "in-memory"
|
||||
pws = pt.inmemoryParts
|
||||
default:
|
||||
logger.Fatalf("BUG: unexpected partsType: %d", partsType)
|
||||
}
|
||||
pwss := groupPartsByDate(pws)
|
||||
var cnt int
|
||||
for _, pws := range pwss {
|
||||
pws = getPartsToMerge(pws, maxOutBytes)
|
||||
if len(pws) == 0 {
|
||||
continue
|
||||
}
|
||||
pwss[cnt] = pws
|
||||
cnt++
|
||||
}
|
||||
pwss = pwss[:cnt]
|
||||
pt.partsLock.Unlock()
|
||||
|
||||
if len(pwss) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
for idx, pws := range pwss {
|
||||
concurrencyCh <- struct{}{}
|
||||
err := pt.mergeParts(pws, pt.stopCh, false, false)
|
||||
<-concurrencyCh
|
||||
if err != nil {
|
||||
if errors.Is(err, errForciblyStopped) {
|
||||
if idx+1 < len(pwss) {
|
||||
// properly release unmerged parts at current iteration
|
||||
// so it could be flushed at MustClose call
|
||||
for _, pwsToRelease := range pwss[idx+1:] {
|
||||
pt.releasePartsToMerge(pwsToRelease)
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing to do - finish the merger.
|
||||
return false
|
||||
}
|
||||
// Unexpected error.
|
||||
logger.Panicf("FATAL: unrecoverable error when merging %s parts at %q: %s", partsType, partPath, err)
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -323,3 +323,102 @@ func TestMustOpenPartition_smallAndBigPartsPathsAreNotTheSame(t *testing.T) {
|
||||
_ = mustOpenPartition(smallPartsPath, bigPartsPath, s)
|
||||
|
||||
}
|
||||
|
||||
func TestGroupPartsByDate(t *testing.T) {
|
||||
f := func(pws []*partWrapper, expected [][]*partWrapper) {
|
||||
t.Helper()
|
||||
got := groupPartsByDate(pws)
|
||||
|
||||
if len(expected) != len(got) {
|
||||
t.Fatalf("groupPartsByDate: unexpected number of day groups: expected=%d, got=%d", len(expected), len(got))
|
||||
}
|
||||
|
||||
cmpPws := func(idx int, a, b []*partWrapper) {
|
||||
t.Helper()
|
||||
if len(a) != len(b) {
|
||||
t.Fatalf("group[%d]: unexpected number of parts: expected=%d, got=%d", idx, len(a), len(b))
|
||||
}
|
||||
for i := range a {
|
||||
if a[i].p.ph.MinTimestamp != b[i].p.ph.MinTimestamp {
|
||||
t.Fatalf("group[%d] part[%d]: MinTimestamp mismatch: expected=%d, got=%d",
|
||||
idx, i, a[i].p.ph.MinTimestamp, b[i].p.ph.MinTimestamp)
|
||||
}
|
||||
if a[i].p.ph.MaxTimestamp != b[i].p.ph.MaxTimestamp {
|
||||
t.Fatalf("group[%d] part[%d]: MaxTimestamp mismatch: expected=%d, got=%d",
|
||||
idx, i, a[i].p.ph.MaxTimestamp, b[i].p.ph.MaxTimestamp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for i := range expected {
|
||||
cmpPws(i, expected[i], got[i])
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// empty
|
||||
f(nil, nil)
|
||||
src := []*partWrapper{
|
||||
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: msecPerDay}}},
|
||||
}
|
||||
expected := [][]*partWrapper{
|
||||
{
|
||||
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: msecPerDay}}},
|
||||
},
|
||||
}
|
||||
f(src, expected)
|
||||
|
||||
// group by a single day
|
||||
src = []*partWrapper{
|
||||
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: msecPerDay - 1024}}},
|
||||
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: msecPerDay - 512}}},
|
||||
}
|
||||
expected = [][]*partWrapper{
|
||||
{
|
||||
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: msecPerDay - 1024}}},
|
||||
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: msecPerDay - 512}}},
|
||||
},
|
||||
}
|
||||
f(src, expected)
|
||||
|
||||
// group into 2 days
|
||||
src = []*partWrapper{
|
||||
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 1024}}},
|
||||
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 512}}},
|
||||
{p: &part{ph: partHeader{MinTimestamp: 2 * msecPerDay, MaxTimestamp: 2*msecPerDay + 1024}}},
|
||||
}
|
||||
expected = [][]*partWrapper{
|
||||
{
|
||||
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 1024}}},
|
||||
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 512}}},
|
||||
},
|
||||
{
|
||||
{p: &part{ph: partHeader{MinTimestamp: 2 * msecPerDay, MaxTimestamp: 2*msecPerDay + 1024}}},
|
||||
},
|
||||
}
|
||||
f(src, expected)
|
||||
|
||||
// group int 2 days + extra day for migration
|
||||
src = []*partWrapper{
|
||||
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 1024}}},
|
||||
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 512}}},
|
||||
{p: &part{ph: partHeader{MinTimestamp: 2 * msecPerDay, MaxTimestamp: 2*msecPerDay + 1024}}},
|
||||
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: 2 * msecPerDay}}},
|
||||
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: 5 * msecPerDay}}},
|
||||
}
|
||||
expected = [][]*partWrapper{
|
||||
{
|
||||
{p: &part{ph: partHeader{MinTimestamp: 0, MaxTimestamp: 2 * msecPerDay}}},
|
||||
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: 5 * msecPerDay}}},
|
||||
},
|
||||
{
|
||||
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 1024}}},
|
||||
{p: &part{ph: partHeader{MinTimestamp: msecPerDay, MaxTimestamp: msecPerDay + 512}}},
|
||||
},
|
||||
{
|
||||
{p: &part{ph: partHeader{MinTimestamp: 2 * msecPerDay, MaxTimestamp: 2*msecPerDay + 1024}}},
|
||||
},
|
||||
}
|
||||
f(src, expected)
|
||||
|
||||
}
|
||||
|
||||
@@ -91,6 +91,8 @@ type Storage struct {
|
||||
|
||||
disablePerDayIndex bool
|
||||
|
||||
enableDailyPartitioning bool
|
||||
|
||||
tb *table
|
||||
|
||||
// Series cardinality limiters.
|
||||
@@ -204,12 +206,17 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage {
|
||||
if idbPrefillStart <= 0 {
|
||||
idbPrefillStart = time.Hour
|
||||
}
|
||||
var enableDailyPartitioning bool
|
||||
if retention < 30*24*time.Hour {
|
||||
enableDailyPartitioning = true
|
||||
}
|
||||
s := &Storage{
|
||||
path: path,
|
||||
cachePath: filepath.Join(path, cacheDirname),
|
||||
retentionMsecs: retention.Milliseconds(),
|
||||
stopCh: make(chan struct{}),
|
||||
idbPrefillStartSeconds: idbPrefillStart.Milliseconds() / 1000,
|
||||
path: path,
|
||||
cachePath: filepath.Join(path, cacheDirname),
|
||||
retentionMsecs: retention.Milliseconds(),
|
||||
stopCh: make(chan struct{}),
|
||||
idbPrefillStartSeconds: idbPrefillStart.Milliseconds() / 1000,
|
||||
enableDailyPartitioning: enableDailyPartitioning,
|
||||
}
|
||||
s.logNewSeries.Store(opts.LogNewSeries)
|
||||
|
||||
|
||||
@@ -1694,7 +1694,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
|
||||
|
||||
// Try opening the storage from snapshot.
|
||||
snapshotPath := filepath.Join(s.path, snapshotsDirname, snapshotName)
|
||||
s1 := MustOpenStorage(snapshotPath, OpenOptions{})
|
||||
s1 := MustOpenStorage(snapshotPath, OpenOptions{Retention: time.Hour * 24 * 10})
|
||||
|
||||
// Verify the snapshot contains rows
|
||||
var m1 Metrics
|
||||
@@ -1709,17 +1709,20 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
|
||||
if err := s1.ForceMergePartitions(""); err != nil {
|
||||
return fmt.Errorf("error when force merging partitions: %w", err)
|
||||
}
|
||||
ptws := s1.tb.GetPartitions(nil)
|
||||
for _, ptw := range ptws {
|
||||
pws := ptw.pt.GetParts(nil, true)
|
||||
numParts := len(pws)
|
||||
ptw.pt.PutParts(pws)
|
||||
if numParts > 1 {
|
||||
s1.tb.PutPartitions(ptws)
|
||||
return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want at most 1", ptw.pt.name, numParts)
|
||||
if !s.enableDailyPartitioning {
|
||||
ptws := s1.tb.GetPartitions(nil)
|
||||
for _, ptw := range ptws {
|
||||
pws := ptw.pt.GetParts(nil, true)
|
||||
numParts := len(pws)
|
||||
ptw.pt.PutParts(pws)
|
||||
if numParts > 1 {
|
||||
s1.tb.PutPartitions(ptws)
|
||||
return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want at most 1", ptw.pt.name, numParts)
|
||||
}
|
||||
}
|
||||
s1.tb.PutPartitions(ptws)
|
||||
|
||||
}
|
||||
s1.tb.PutPartitions(ptws)
|
||||
|
||||
s1.MustClose()
|
||||
|
||||
@@ -4679,3 +4682,37 @@ func assertIndexDBIsNotNil(t *testing.T, idb *indexDB) {
|
||||
t.Fatalf("unexpected idb: got nil, want non-nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageAddRowsDailyPartitioning(t *testing.T) {
|
||||
rng := rand.New(rand.NewSource(1))
|
||||
path := "TestStorageAddRowsDailyPartitioning"
|
||||
opts := OpenOptions{
|
||||
Retention: 10 * 24 * time.Hour,
|
||||
MaxHourlySeries: 1e5,
|
||||
MaxDailySeries: 1e5,
|
||||
}
|
||||
s := MustOpenStorage(path, opts)
|
||||
if err := testStorageAddRows(rng, s); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
assertPartRowsBelongToTheSameDate := func(pws []*partWrapper) {
|
||||
t.Helper()
|
||||
for _, pw := range pws {
|
||||
minDate := pw.p.ph.MinTimestamp / msecPerDay
|
||||
maxDate := pw.p.ph.MaxTimestamp / msecPerDay
|
||||
if maxDate-minDate > 0 {
|
||||
t.Fatalf("part path: %s rows must be belong to the same date, minDate: %d maxDate: %d", pw.p.path, minDate, maxDate)
|
||||
}
|
||||
}
|
||||
}
|
||||
partitions := s.tb.GetPartitions(nil)
|
||||
for _, p := range partitions {
|
||||
p.pt.partsLock.Lock()
|
||||
assertPartRowsBelongToTheSameDate(p.pt.smallParts)
|
||||
assertPartRowsBelongToTheSameDate(p.pt.bigParts)
|
||||
p.pt.partsLock.Unlock()
|
||||
}
|
||||
s.tb.PutPartitions(partitions)
|
||||
s.MustClose()
|
||||
fs.MustRemoveDir(path)
|
||||
}
|
||||
|
||||
@@ -612,3 +612,49 @@ func benchmarkDirSize(path string) int64 {
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
func BenchmarkStorageAddRowsDailyPartitioning(b *testing.B) {
|
||||
defer fs.MustRemoveDir(b.Name())
|
||||
|
||||
f := func(b *testing.B, numRows int) {
|
||||
b.Helper()
|
||||
|
||||
s := MustOpenStorage(b.Name(), OpenOptions{Retention: 10 * 24 * time.Hour})
|
||||
defer s.MustClose()
|
||||
|
||||
var globalOffset atomic.Uint64
|
||||
|
||||
globalOffset.Store(uint64(time.Now().UnixMilli() - msecPerDay*2))
|
||||
|
||||
b.SetBytes(int64(numRows))
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
mrs := make([]MetricRow, numRows)
|
||||
var mn MetricName
|
||||
mn.MetricGroup = []byte("rps")
|
||||
mn.Tags = []Tag{
|
||||
{[]byte("job"), []byte("webservice")},
|
||||
{[]byte("instance"), []byte("1.2.3.4")},
|
||||
}
|
||||
for pb.Next() {
|
||||
offset := int(globalOffset.Add(uint64(numRows)))
|
||||
for i := 0; i < numRows; i++ {
|
||||
mr := &mrs[i]
|
||||
mr.MetricNameRaw = mn.marshalRaw(mr.MetricNameRaw[:0])
|
||||
mr.Timestamp = int64(offset + i)
|
||||
mr.Value = float64(offset + i)
|
||||
}
|
||||
s.AddRows(mrs, defaultPrecisionBits)
|
||||
}
|
||||
})
|
||||
b.StopTimer()
|
||||
s.DebugFlush()
|
||||
}
|
||||
|
||||
for _, numRows := range []int{1, 10, 100, 1000, 10000} {
|
||||
b.Run(fmt.Sprintf("%d", numRows), func(b *testing.B) {
|
||||
f(b, numRows)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user