mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 16:59:40 +03:00
Compare commits
24 Commits
query-debu
...
series-upd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a892b588ef | ||
|
|
07cb5be348 | ||
|
|
f2eaf4d4aa | ||
|
|
f5e663c00c | ||
|
|
07394fb847 | ||
|
|
9e2d77ea62 | ||
|
|
23e53bdb80 | ||
|
|
1ab593f807 | ||
|
|
fbfd7415da | ||
|
|
54a67d439c | ||
|
|
8bc42baf19 | ||
|
|
90f4581a0e | ||
|
|
e69580fe97 | ||
|
|
be5673c39d | ||
|
|
4d6875d81b | ||
|
|
e5e50504db | ||
|
|
cdf2eaf688 | ||
|
|
eca318cc65 | ||
|
|
5768ac0607 | ||
|
|
91b1700194 | ||
|
|
f71382332b | ||
|
|
215e9dd724 | ||
|
|
14dada5da0 | ||
|
|
7e9112da50 |
@@ -27,6 +27,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheusimport"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/seriesupdate"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
||||
@@ -250,6 +251,14 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "prometheus/api/v1/update/series":
|
||||
// todo logging and errors.
|
||||
if err := seriesupdate.InsertHandler(at, r); err != nil {
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "prometheus/api/v1/import":
|
||||
vmimportRequests.Inc()
|
||||
if err := vmimport.InsertHandler(at, r); err != nil {
|
||||
|
||||
81
app/vminsert/seriesupdate/request_handler.go
Normal file
81
app/vminsert/seriesupdate/request_handler.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package seriesupdate

import (
	"net/http"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport/stream"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
	"github.com/VictoriaMetrics/metrics"
)

var (
	rowsInserted       = metrics.NewCounter(`vm_rows_inserted_total{type="update_series"}`)
	rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="update_series"}`)
	rowsPerInsert      = metrics.NewHistogram(`vm_rows_per_insert{type="update_series"}`)
)

// lastGenerationID holds the most recently issued generation id.
var lastGenerationID int64

// generateUniqueGenerationID returns a process-local unique generationID.
//
// The id is derived from the current wall-clock nanoseconds, but is forced to
// be strictly increasing via an atomic CAS loop, so concurrent calls (or a
// coarse-resolution clock) cannot hand out duplicate ids.
func generateUniqueGenerationID() []byte {
	nextID := time.Now().UnixNano()
	for {
		prev := atomic.LoadInt64(&lastGenerationID)
		if nextID <= prev {
			// Clock did not advance past the previously issued id -
			// bump it to keep ids unique and monotonic.
			nextID = prev + 1
		}
		if atomic.CompareAndSwapInt64(&lastGenerationID, prev, nextID) {
			return []byte(strconv.FormatInt(nextID, 10))
		}
	}
}

// InsertHandler processes `/api/v1/update/series` request.
//
// The request body uses the vmimport line format; gzip encoding is honored.
func InsertHandler(at *auth.Token, req *http.Request) error {
	isGzipped := req.Header.Get("Content-Encoding") == "gzip"
	return stream.Parse(req.Body, isGzipped, func(rows []vmimport.Row) error {
		return insertRows(at, rows)
	})
}

// insertRows writes the given rows with a fresh `__generation_id` label, so
// the select path can distinguish the update from originally ingested data.
func insertRows(at *auth.Token, rows []vmimport.Row) error {
	ctx := netstorage.GetInsertCtx()
	defer netstorage.PutInsertCtx(ctx)

	ctx.Reset() // This line is required for initializing ctx internals.
	rowsTotal := 0
	generationID := generateUniqueGenerationID()
	for i := range rows {
		r := &rows[i]
		rowsTotal += len(r.Values)
		ctx.Labels = ctx.Labels[:0]
		for j := range r.Tags {
			tag := &r.Tags[j]
			ctx.AddLabelBytes(tag.Key, tag.Value)
		}
		if len(ctx.Labels) == 0 {
			// Skip metric without labels.
			continue
		}
		// there is no need in relabeling and extra_label adding
		// since modified series already passed this phase during ingestion,
		// and it may lead to unexpected result for user.
		ctx.AddLabelBytes([]byte(`__generation_id`), generationID)
		ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels)
		values := r.Values
		timestamps := r.Timestamps
		if len(timestamps) != len(values) {
			logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values))
		}
		atLocal := ctx.GetLocalAuthToken(at)
		storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels)
		for j, value := range values {
			timestamp := timestamps[j]
			if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
				return err
			}
		}
	}
	rowsInserted.Add(rowsTotal)
	rowsTenantInserted.Get(at).Add(rowsTotal)
	rowsPerInsert.Update(float64(rowsTotal))
	return ctx.FlushBufs()
}
|
||||
@@ -6,10 +6,12 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/bits"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -64,12 +66,19 @@ type Result struct {
|
||||
// Values are sorted by Timestamps.
|
||||
Values []float64
|
||||
Timestamps []int64
|
||||
|
||||
valuesBuf []float64
|
||||
timestampsBuf []int64
|
||||
|
||||
// statistic for series updates
|
||||
updateRows int
|
||||
}
|
||||
|
||||
func (r *Result) reset() {
|
||||
r.MetricName.Reset()
|
||||
r.Values = r.Values[:0]
|
||||
r.Timestamps = r.Timestamps[:0]
|
||||
r.updateRows = 0
|
||||
}
|
||||
|
||||
// Results holds results returned from ProcessSearchQuery.
|
||||
@@ -110,7 +119,8 @@ type timeseriesWork struct {
|
||||
f func(rs *Result, workerID uint) error
|
||||
err error
|
||||
|
||||
rowsProcessed int
|
||||
rowsProcessed int
|
||||
updateRowsProcessed int
|
||||
}
|
||||
|
||||
func (tsw *timeseriesWork) reset() {
|
||||
@@ -120,6 +130,7 @@ func (tsw *timeseriesWork) reset() {
|
||||
tsw.f = nil
|
||||
tsw.err = nil
|
||||
tsw.rowsProcessed = 0
|
||||
tsw.updateRowsProcessed = 0
|
||||
}
|
||||
|
||||
func getTimeseriesWork() *timeseriesWork {
|
||||
@@ -150,6 +161,7 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
|
||||
atomic.StoreUint32(tsw.mustStop, 1)
|
||||
return fmt.Errorf("error during time series unpacking: %w", err)
|
||||
}
|
||||
tsw.updateRowsProcessed = r.updateRows
|
||||
tsw.rowsProcessed = len(r.Timestamps)
|
||||
if len(r.Timestamps) > 0 {
|
||||
if err := tsw.f(r, workerID); err != nil {
|
||||
@@ -166,17 +178,23 @@ func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, wo
|
||||
// Perform own work at first.
|
||||
rowsProcessed := 0
|
||||
seriesProcessed := 0
|
||||
updateGenerationsProcessed := 0
|
||||
updateRowsProcessed := 0
|
||||
ch := workChs[workerID]
|
||||
for tsw := range ch {
|
||||
tsw.err = tsw.do(&tmpResult.rs, workerID)
|
||||
rowsProcessed += tsw.rowsProcessed
|
||||
updateGenerationsProcessed += len(tsw.pts.updateAddrs)
|
||||
updateRowsProcessed += tsw.updateRowsProcessed
|
||||
seriesProcessed++
|
||||
}
|
||||
qt.Printf("own work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
|
||||
qt.Printf("own work processed: series=%d, samples=%d, update_generations=%d, update_rows=%d", seriesProcessed, rowsProcessed, updateGenerationsProcessed, updateRowsProcessed)
|
||||
|
||||
// Then help others with the remaining work.
|
||||
rowsProcessed = 0
|
||||
seriesProcessed = 0
|
||||
updateGenerationsProcessed = 0
|
||||
updateRowsProcessed = 0
|
||||
for i := uint(1); i < uint(len(workChs)); i++ {
|
||||
idx := (i + workerID) % uint(len(workChs))
|
||||
ch := workChs[idx]
|
||||
@@ -194,10 +212,12 @@ func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, wo
|
||||
}
|
||||
tsw.err = tsw.do(&tmpResult.rs, workerID)
|
||||
rowsProcessed += tsw.rowsProcessed
|
||||
updateGenerationsProcessed += len(tsw.pts.updateAddrs)
|
||||
updateRowsProcessed += tsw.updateRowsProcessed
|
||||
seriesProcessed++
|
||||
}
|
||||
}
|
||||
qt.Printf("others work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
|
||||
qt.Printf("others work processed: series=%d, samples=%d, update_generations=%d, update_rows=%d", seriesProcessed, rowsProcessed, updateGenerationsProcessed, updateRowsProcessed)
|
||||
|
||||
putTmpResult(tmpResult)
|
||||
}
|
||||
@@ -383,8 +403,139 @@ var (
|
||||
)
|
||||
|
||||
type packedTimeseries struct {
|
||||
metricName string
|
||||
addrs []tmpBlockAddr
|
||||
samples int
|
||||
metricName string
|
||||
addrs []tmpBlockAddr
|
||||
updateAddrs sortedTimeseriesUpdateAddrs
|
||||
}
|
||||
|
||||
type sortedTimeseriesUpdateAddrs [][]tmpBlockAddr
|
||||
|
||||
// merges dst Result with update result
|
||||
// may allocate memory
|
||||
// dst and update must be sorted by Timestamps
|
||||
// keeps order by Timestamps
|
||||
func mergeResult(dst, update *Result) {
|
||||
// ignore timestamps with not enough points
|
||||
if len(dst.Timestamps) == 0 || len(update.Timestamps) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
firstDstTs := dst.Timestamps[0]
|
||||
lastDstTs := dst.Timestamps[len(dst.Timestamps)-1]
|
||||
firstUpdateTs := update.Timestamps[0]
|
||||
lastUpdateTs := update.Timestamps[len(update.Timestamps)-1]
|
||||
|
||||
// check lower bound
|
||||
// [5,6,7] [1,2,3]
|
||||
if lastUpdateTs <= firstDstTs {
|
||||
// fast path
|
||||
if lastUpdateTs == firstDstTs {
|
||||
dst.Timestamps = dst.Timestamps[1:]
|
||||
dst.Values = dst.Values[1:]
|
||||
}
|
||||
newLen := len(dst.Timestamps) + len(update.Timestamps)
|
||||
dst.timestampsBuf = reuseMayAllocateInt64(dst.timestampsBuf, newLen)
|
||||
dst.timestampsBuf = append(dst.timestampsBuf, update.Timestamps...)
|
||||
dst.timestampsBuf = append(dst.timestampsBuf, dst.Timestamps...)
|
||||
|
||||
dst.valuesBuf = reuseMayAllocateFloat64(dst.valuesBuf, newLen)
|
||||
dst.valuesBuf = append(dst.valuesBuf, update.Values...)
|
||||
dst.valuesBuf = append(dst.valuesBuf, dst.Values...)
|
||||
// swap buffers
|
||||
dst.timestampsBuf, dst.Timestamps = dst.Timestamps, dst.timestampsBuf
|
||||
dst.valuesBuf, dst.Values = dst.Values, dst.valuesBuf
|
||||
return
|
||||
}
|
||||
|
||||
// check upper bound
|
||||
// fast path, memory allocation possible
|
||||
// [1,2,3] [5,6,7]
|
||||
if firstUpdateTs >= lastDstTs {
|
||||
if firstUpdateTs == lastDstTs {
|
||||
dst.Timestamps = dst.Timestamps[:len(dst.Timestamps)-1]
|
||||
dst.Values = dst.Values[:len(dst.Values)-1]
|
||||
}
|
||||
dst.Timestamps = append(dst.Timestamps, update.Timestamps...)
|
||||
dst.Values = append(dst.Values, update.Values...)
|
||||
return
|
||||
}
|
||||
|
||||
// changes inside given range
|
||||
// [1,5,7] [2,3,6]
|
||||
firstPos := position(dst.Timestamps, firstUpdateTs)
|
||||
lastPos := position(dst.Timestamps, lastUpdateTs)
|
||||
// corner case last timestamp overlaps
|
||||
if lastPos != len(dst.Timestamps) && lastUpdateTs == dst.Timestamps[lastPos] {
|
||||
lastPos++
|
||||
}
|
||||
|
||||
headTs := dst.Timestamps[:firstPos]
|
||||
tailTs := dst.Timestamps[lastPos:]
|
||||
headVs := dst.Values[:firstPos]
|
||||
tailValues := dst.Values[lastPos:]
|
||||
|
||||
newLen := len(headTs) + len(update.Timestamps) + len(tailTs)
|
||||
dst.timestampsBuf = reuseMayAllocateInt64(dst.timestampsBuf, newLen)
|
||||
dst.timestampsBuf = append(dst.timestampsBuf, headTs...)
|
||||
dst.timestampsBuf = append(dst.timestampsBuf, update.Timestamps...)
|
||||
dst.timestampsBuf = append(dst.timestampsBuf, tailTs...)
|
||||
|
||||
dst.valuesBuf = reuseMayAllocateFloat64(dst.valuesBuf, newLen)
|
||||
dst.valuesBuf = append(dst.valuesBuf, headVs...)
|
||||
dst.valuesBuf = append(dst.valuesBuf, update.Values...)
|
||||
dst.valuesBuf = append(dst.valuesBuf, tailValues...)
|
||||
|
||||
// swap buffers
|
||||
dst.timestampsBuf, dst.Timestamps = dst.Timestamps, dst.timestampsBuf
|
||||
dst.valuesBuf, dst.Values = dst.Values, dst.valuesBuf
|
||||
}
|
||||
|
||||
// reuseMayAllocateInt64 returns an empty int64 slice with capacity >= n.
// The backing array of src is reused when it is already large enough;
// otherwise a fresh array, rounded up to the next power of two, is allocated.
func reuseMayAllocateInt64(src []int64, n int) []int64 {
	if cap(src) >= n {
		return src[:0]
	}
	return make([]int64, 0, roundToNearestPow2(n))
}

// reuseMayAllocateFloat64 returns an empty float64 slice with capacity >= n.
// See reuseMayAllocateInt64 for the reuse/allocation policy.
func reuseMayAllocateFloat64(src []float64, n int) []float64 {
	if cap(src) >= n {
		return src[:0]
	}
	return make([]float64, 0, roundToNearestPow2(n))
}

// roundToNearestPow2 returns the smallest power of two that is >= n.
func roundToNearestPow2(n int) int {
	// bits.Len(n-1) is the number of bits needed to represent n-1,
	// so 1<<bits.Len(n-1) is the next power of two >= n.
	return 1 << uint8(bits.Len(uint(n-1)))
}
|
||||
|
||||
// position returns the index of the first element in src that is >= value,
// or len(src) when every element is smaller.
//
// Binary search adapted from sort.SearchInts, with a fast path for the
// common case where value precedes the whole slice.
func position(src []int64, value int64) int {
	// Fast path: empty slice, or value sorts before the first element.
	if len(src) == 0 || src[0] > value {
		return 0
	}
	// Invariant: src[lo-1] < value and src[hi] >= value (with the
	// conventions src[-1] < value and src[len] >= value).
	lo, hi := 0, len(src)
	for lo < hi {
		mid := int(uint(lo+hi) >> 1) // avoids overflow when computing the midpoint
		if src[mid] < value {
			lo = mid + 1
		} else {
			hi = mid
		}
	}
	// lo == hi is the first index with src[lo] >= value.
	return lo
}
|
||||
|
||||
type unpackWork struct {
|
||||
@@ -490,10 +641,165 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora
|
||||
}
|
||||
dedupInterval := storage.GetDedupInterval()
|
||||
mergeSortBlocks(dst, sbh, dedupInterval)
|
||||
seriesUpdateSbss, err := pts.unpackUpdateAddrs(tbfs, tr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot unpack series updates: %w", err)
|
||||
}
|
||||
// apply updates
|
||||
if len(seriesUpdateSbss) > 0 {
|
||||
var updateDst Result
|
||||
updateRows := 0
|
||||
for _, seriesUpdateSbs := range seriesUpdateSbss {
|
||||
updateDst.reset()
|
||||
mergeSortBlocks(&updateDst, seriesUpdateSbs, dedupInterval)
|
||||
updateRows += len(updateDst.Timestamps)
|
||||
mergeResult(dst, &updateDst)
|
||||
putSortBlocksHeap(seriesUpdateSbs)
|
||||
}
|
||||
dst.updateRows = updateRows
|
||||
}
|
||||
putSortBlocksHeap(sbh)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pts *packedTimeseries) unpackUpdateAddrs(tbfs []*tmpBlocksFile, tr storage.TimeRange) ([]*sortBlocksHeap, error) {
|
||||
var upwsLen int
|
||||
for i := range pts.updateAddrs {
|
||||
upwsLen += len(pts.updateAddrs[i])
|
||||
}
|
||||
if upwsLen == 0 {
|
||||
// Nothing to do
|
||||
return nil, nil
|
||||
}
|
||||
initUnpackWork := func(upw *unpackWork, addr tmpBlockAddr) {
|
||||
upw.tbfs = tbfs
|
||||
upw.addr = addr
|
||||
upw.tr = tr
|
||||
}
|
||||
dsts := make([]*sortBlocksHeap, 0, upwsLen)
|
||||
samples := pts.samples
|
||||
if gomaxprocs == 1 || upwsLen < 1000 {
|
||||
// It is faster to unpack all the data in the current goroutine.
|
||||
upw := getUnpackWork()
|
||||
|
||||
tmpBlock := getTmpStorageBlock()
|
||||
var err error
|
||||
for _, addrBlock := range pts.updateAddrs {
|
||||
dst := getSortBlocksHeap()
|
||||
for _, addr := range addrBlock {
|
||||
initUnpackWork(upw, addr)
|
||||
upw.unpack(tmpBlock)
|
||||
if upw.err != nil {
|
||||
return dsts, upw.err
|
||||
}
|
||||
samples += len(upw.sb.Timestamps)
|
||||
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
|
||||
putSortBlock(upw.sb)
|
||||
err = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
|
||||
"or reduce time range for the query", *maxSamplesPerSeries)
|
||||
break
|
||||
}
|
||||
dst.sbs = append(dst.sbs, upw.sb)
|
||||
upw.reset()
|
||||
}
|
||||
dsts = append(dsts, dst)
|
||||
}
|
||||
putTmpStorageBlock(tmpBlock)
|
||||
putUnpackWork(upw)
|
||||
return dsts, err
|
||||
}
|
||||
// Slow path - spin up multiple local workers for parallel data unpacking.
|
||||
// Do not use global workers pool, since it increases inter-CPU memory ping-poing,
|
||||
// which reduces the scalability on systems with many CPU cores.
|
||||
|
||||
// Prepare the work for workers.
|
||||
upwss := make([][]*unpackWork, len(pts.updateAddrs))
|
||||
var workItems int
|
||||
for i, addrs := range pts.updateAddrs {
|
||||
upws := make([]*unpackWork, len(addrs))
|
||||
for j, addr := range addrs {
|
||||
workItems++
|
||||
upw := getUnpackWork()
|
||||
initUnpackWork(upw, addr)
|
||||
upws[j] = upw
|
||||
}
|
||||
|
||||
upwss[i] = upws
|
||||
}
|
||||
|
||||
// Prepare worker channels.
|
||||
workers := upwsLen
|
||||
if workers > gomaxprocs {
|
||||
workers = gomaxprocs
|
||||
}
|
||||
if workers < 1 {
|
||||
workers = 1
|
||||
}
|
||||
itemsPerWorker := (workItems + workers - 1) / workers
|
||||
workChs := make([]chan *unpackWork, workers)
|
||||
for i := range workChs {
|
||||
workChs[i] = make(chan *unpackWork, itemsPerWorker)
|
||||
}
|
||||
|
||||
// Spread work among worker channels.
|
||||
i := 0
|
||||
for _, upws := range upwss {
|
||||
for _, upw := range upws {
|
||||
idx := i % len(workChs)
|
||||
i++
|
||||
workChs[idx] <- upw
|
||||
}
|
||||
}
|
||||
// Mark worker channels as closed.
|
||||
for _, workCh := range workChs {
|
||||
close(workCh)
|
||||
}
|
||||
|
||||
// Start workers and wait until they finish the work.
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < workers; i++ {
|
||||
wg.Add(1)
|
||||
go func(workerID uint) {
|
||||
unpackWorker(workChs, workerID)
|
||||
wg.Done()
|
||||
}(uint(i))
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Collect results.
|
||||
var firstErr error
|
||||
for _, upws := range upwss {
|
||||
sbh := getSortBlocksHeap()
|
||||
for _, upw := range upws {
|
||||
if upw.err != nil && firstErr == nil {
|
||||
// Return the first error only, since other errors are likely the same.
|
||||
firstErr = upw.err
|
||||
}
|
||||
if firstErr == nil {
|
||||
sb := upw.sb
|
||||
samples += len(sb.Timestamps)
|
||||
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
|
||||
putSortBlock(sb)
|
||||
firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
|
||||
"or reduce time range for the query", *maxSamplesPerSeries)
|
||||
} else {
|
||||
sbh.sbs = append(sbh.sbs, sb)
|
||||
}
|
||||
} else {
|
||||
putSortBlock(upw.sb)
|
||||
}
|
||||
putUnpackWork(upw)
|
||||
}
|
||||
dsts = append(dsts, sbh)
|
||||
}
|
||||
if firstErr != nil {
|
||||
for _, sbh := range dsts {
|
||||
putSortBlocksHeap(sbh)
|
||||
}
|
||||
}
|
||||
return dsts, nil
|
||||
}
|
||||
|
||||
func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbfs []*tmpBlocksFile, tr storage.TimeRange) ([]*sortBlock, error) {
|
||||
upwsLen := len(pts.addrs)
|
||||
if upwsLen == 0 {
|
||||
@@ -605,6 +911,7 @@ func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbfs []*tmpBlocksFile, t
|
||||
}
|
||||
putUnpackWork(upw)
|
||||
}
|
||||
pts.samples = samples
|
||||
|
||||
return dst, firstErr
|
||||
}
|
||||
@@ -1349,6 +1656,12 @@ type tmpBlocksFileWrapper struct {
|
||||
tbfs []*tmpBlocksFile
|
||||
ms []map[string]*blockAddrs
|
||||
orderedMetricNamess [][]string
|
||||
// mu protects series updates
|
||||
// it shouldn't cause cpu contention
|
||||
// usually series updates are small
|
||||
mu sync.Mutex
|
||||
// updates grouped by metric name and generation ID
|
||||
seriesUpdatesByMetricName map[string]map[int64][]tmpBlockAddr
|
||||
}
|
||||
|
||||
type blockAddrs struct {
|
||||
@@ -1373,9 +1686,10 @@ func newTmpBlocksFileWrapper(sns []*storageNode) *tmpBlocksFileWrapper {
|
||||
ms[i] = make(map[string]*blockAddrs)
|
||||
}
|
||||
return &tmpBlocksFileWrapper{
|
||||
tbfs: tbfs,
|
||||
ms: ms,
|
||||
orderedMetricNamess: make([][]string, n),
|
||||
tbfs: tbfs,
|
||||
ms: ms,
|
||||
seriesUpdatesByMetricName: make(map[string]map[int64][]tmpBlockAddr),
|
||||
orderedMetricNamess: make([][]string, n),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1390,6 +1704,38 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock,
|
||||
// Do not intern mb.MetricName, since it leads to increased memory usage.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3692
|
||||
metricName := mb.MetricName
|
||||
var generationID int64
|
||||
|
||||
mn := storage.GetMetricName()
|
||||
defer storage.PutMetricName(mn)
|
||||
if err := mn.Unmarshal(metricName); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal metricName: %q %w", metricName, err)
|
||||
}
|
||||
generationIDTag := mn.RemoveTagWithResult(`__generation_id`)
|
||||
if generationIDTag != nil {
|
||||
generationID, err = strconv.ParseInt(string(generationIDTag.Value), 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse __generation_id label value: %s : %w", generationIDTag.Value, err)
|
||||
}
|
||||
metricName = mn.Marshal(metricName[:0])
|
||||
}
|
||||
// process data blocks with metric updates
|
||||
// TODO profile it, probably it's better to replace mutex with per worker lock-free struct
|
||||
if generationID > 0 {
|
||||
tbfw.mu.Lock()
|
||||
defer tbfw.mu.Unlock()
|
||||
ups := tbfw.seriesUpdatesByMetricName[string(metricName)]
|
||||
if ups == nil {
|
||||
// fast path
|
||||
tbfw.seriesUpdatesByMetricName[string(metricName)] = map[int64][]tmpBlockAddr{generationID: {addr}}
|
||||
return nil
|
||||
}
|
||||
// todo memory optimization for metricNames, use interning?
|
||||
addrs := tbfw.seriesUpdatesByMetricName[string(metricName)][generationID]
|
||||
addrs = append(addrs, addr)
|
||||
tbfw.seriesUpdatesByMetricName[string(metricName)][generationID] = addrs
|
||||
return nil
|
||||
}
|
||||
m := tbfw.ms[workerID]
|
||||
addrs := m[string(metricName)]
|
||||
if addrs == nil {
|
||||
@@ -1463,6 +1809,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
|
||||
if err := mn.Unmarshal(mb.MetricName); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal metricName: %w", err)
|
||||
}
|
||||
|
||||
if err := f(mn, &mb.Block, tr, workerID); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1589,7 +1936,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("cannot finalize temporary blocks files: %w", err)
|
||||
}
|
||||
qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(addrsByMetricName), blocksRead.GetTotal(), samples.GetTotal(), bytesTotal)
|
||||
qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d, unique_series_with_updates=%d", len(addrsByMetricName), blocksRead.GetTotal(), samples.GetTotal(), bytesTotal, len(tbfw.seriesUpdatesByMetricName))
|
||||
|
||||
var rss Results
|
||||
rss.tr = tr
|
||||
@@ -1597,9 +1944,25 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
|
||||
rss.tbfs = tbfw.tbfs
|
||||
pts := make([]packedTimeseries, len(orderedMetricNames))
|
||||
for i, metricName := range orderedMetricNames {
|
||||
seriesUpdateGenerations := tbfw.seriesUpdatesByMetricName[metricName]
|
||||
var stua sortedTimeseriesUpdateAddrs
|
||||
if len(seriesUpdateGenerations) > 0 {
|
||||
stua = make(sortedTimeseriesUpdateAddrs, 0, len(seriesUpdateGenerations))
|
||||
orderedGenerationIDs := make([]int64, 0, len(seriesUpdateGenerations))
|
||||
for generationID := range seriesUpdateGenerations {
|
||||
orderedGenerationIDs = append(orderedGenerationIDs, generationID)
|
||||
}
|
||||
sort.Slice(orderedGenerationIDs, func(i, j int) bool {
|
||||
return orderedGenerationIDs[i] < orderedGenerationIDs[j]
|
||||
})
|
||||
for _, genID := range orderedGenerationIDs {
|
||||
stua = append(stua, seriesUpdateGenerations[genID])
|
||||
}
|
||||
}
|
||||
pts[i] = packedTimeseries{
|
||||
metricName: metricName,
|
||||
addrs: addrsByMetricName[metricName].addrs,
|
||||
metricName: metricName,
|
||||
addrs: addrsByMetricName[metricName].addrs,
|
||||
updateAddrs: stua,
|
||||
}
|
||||
}
|
||||
rss.packedTimeseries = pts
|
||||
|
||||
@@ -5,6 +5,9 @@ import (
|
||||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
)
|
||||
|
||||
func TestInitStopNodes(t *testing.T) {
|
||||
@@ -47,7 +50,6 @@ func TestMergeSortBlocks(t *testing.T) {
|
||||
t.Fatalf("unexpected timestamps;\ngot\n%v\nwant\n%v", result.Timestamps, expectedResult.Timestamps)
|
||||
}
|
||||
}
|
||||
|
||||
// Zero blocks
|
||||
f(nil, 1, &Result{})
|
||||
|
||||
@@ -221,3 +223,265 @@ func TestMergeSortBlocks(t *testing.T) {
|
||||
Values: []float64{7, 24, 26},
|
||||
})
|
||||
}
|
||||
|
||||
func TestMergeResult(t *testing.T) {
|
||||
f := func(name string, dst, update, expect *Result) {
|
||||
t.Helper()
|
||||
t.Run(name, func(t *testing.T) {
|
||||
mergeResult(dst, update)
|
||||
if !reflect.DeepEqual(dst.Values, expect.Values) || !reflect.DeepEqual(dst.Timestamps, expect.Timestamps) {
|
||||
t.Fatalf(" unexpected result \ngot: \n%v\nwant: \n%v", dst, expect)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
f("append and replace",
|
||||
&Result{Timestamps: []int64{1, 2}, Values: []float64{5.0, 6.0}},
|
||||
&Result{Timestamps: []int64{2, 3}, Values: []float64{10.0, 30.0}},
|
||||
&Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 10.0, 30.0}})
|
||||
f("extend and replace overlap",
|
||||
&Result{Timestamps: []int64{2, 3}, Values: []float64{10.0, 30.0}},
|
||||
&Result{Timestamps: []int64{1, 2}, Values: []float64{5.0, 6.0}},
|
||||
&Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 6.0, 30.0}})
|
||||
|
||||
f("extend and replace",
|
||||
&Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 6.0, 7.0}},
|
||||
&Result{Timestamps: []int64{0, 1, 2}, Values: []float64{10.0, 15.0, 30.0}},
|
||||
&Result{Timestamps: []int64{0, 1, 2, 3}, Values: []float64{10.0, 15.0, 30.0, 7.0}})
|
||||
f("update single point",
|
||||
&Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 6.0, 7.0}},
|
||||
&Result{Timestamps: []int64{15}, Values: []float64{35.0}},
|
||||
&Result{Timestamps: []int64{1, 2, 3, 15}, Values: []float64{5.0, 6.0, 7.0, 35.0}})
|
||||
f("append",
|
||||
&Result{Timestamps: []int64{6, 7, 8}, Values: []float64{10.0, 15.0, 30.0}},
|
||||
&Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 6.0, 7.0}},
|
||||
&Result{Timestamps: []int64{1, 2, 3, 6, 7, 8}, Values: []float64{5, 6, 7, 10, 15, 30}})
|
||||
|
||||
f("extend",
|
||||
&Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 6.0, 7.0}},
|
||||
&Result{Timestamps: []int64{6, 7, 8}, Values: []float64{10.0, 15.0, 30.0}},
|
||||
&Result{Timestamps: []int64{1, 2, 3, 6, 7, 8}, Values: []float64{5, 6, 7, 10, 15, 30}})
|
||||
f("fast path",
|
||||
&Result{},
|
||||
&Result{Timestamps: []int64{1, 2, 3}},
|
||||
&Result{})
|
||||
f("merge at the middle",
|
||||
&Result{Timestamps: []int64{1, 2, 5, 6, 10, 15}, Values: []float64{.1, .2, .3, .4, .5, .6}},
|
||||
&Result{Timestamps: []int64{2, 6, 9, 10}, Values: []float64{1.1, 1.2, 1.3, 1.4}},
|
||||
&Result{Timestamps: []int64{1, 2, 6, 9, 10, 15}, Values: []float64{.1, 1.1, 1.2, 1.3, 1.4, 0.6}})
|
||||
|
||||
f("merge and re-allocate",
|
||||
&Result{
|
||||
Timestamps: []int64{10, 20, 30, 50, 60, 90},
|
||||
Values: []float64{1.1, 1.2, 1.3, 1.4, 1.5, 1.6},
|
||||
},
|
||||
&Result{
|
||||
Timestamps: []int64{20, 30, 35, 45, 50, 55, 60},
|
||||
Values: []float64{2.0, 2.3, 2.35, 2.45, 2.5, 2.55, 2.6},
|
||||
},
|
||||
&Result{
|
||||
Timestamps: []int64{10, 20, 30, 35, 45, 50, 55, 60, 90},
|
||||
Values: []float64{1.1, 2.0, 2.3, 2.35, 2.45, 2.50, 2.55, 2.6, 1.6},
|
||||
})
|
||||
f("update at the end",
|
||||
&Result{
|
||||
Timestamps: []int64{10, 20, 30, 40, 50, 60, 90},
|
||||
Values: []float64{2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.9},
|
||||
},
|
||||
&Result{
|
||||
Timestamps: []int64{50, 70, 100},
|
||||
Values: []float64{0.0, 5.7, 5.1},
|
||||
},
|
||||
&Result{
|
||||
Timestamps: []int64{10, 20, 30, 40, 50, 70, 100},
|
||||
Values: []float64{2.1, 2.2, 2.3, 2.4, 0.0, 5.7, 5.1},
|
||||
})
|
||||
}
|
||||
|
||||
func TestPackedTimeseries_Unpack(t *testing.T) {
|
||||
createBlock := func(ts []int64, vs []int64) *storage.Block {
|
||||
tsid := &storage.TSID{
|
||||
MetricID: 234211,
|
||||
}
|
||||
scale := int16(0)
|
||||
precisionBits := uint8(8)
|
||||
var b storage.Block
|
||||
b.Init(tsid, ts, vs, scale, precisionBits)
|
||||
_, _, _ = b.MarshalData(0, 0)
|
||||
return &b
|
||||
}
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: 0,
|
||||
MaxTimestamp: 1<<63 - 1,
|
||||
}
|
||||
var mn storage.MetricName
|
||||
mn.MetricGroup = []byte("foobar")
|
||||
metricName := string(mn.Marshal(nil))
|
||||
type blockData struct {
|
||||
timestamps []int64
|
||||
values []int64
|
||||
}
|
||||
isValuesEqual := func(got, want []float64) bool {
|
||||
equal := true
|
||||
if len(got) != len(want) {
|
||||
return false
|
||||
}
|
||||
for i, v := range want {
|
||||
gotV := got[i]
|
||||
if v == gotV {
|
||||
continue
|
||||
}
|
||||
if decimal.IsStaleNaN(v) && decimal.IsStaleNaN(gotV) {
|
||||
continue
|
||||
}
|
||||
equal = false
|
||||
}
|
||||
return equal
|
||||
}
|
||||
f := func(name string, dataBlocks []blockData, updateBlocks []blockData, wantResult *Result) {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
pts := packedTimeseries{
|
||||
metricName: metricName,
|
||||
}
|
||||
var dst Result
|
||||
tbf := tmpBlocksFile{
|
||||
buf: make([]byte, 0, 20*1024*1024),
|
||||
}
|
||||
for _, dataBlock := range dataBlocks {
|
||||
bb := createBlock(dataBlock.timestamps, dataBlock.values)
|
||||
addr, err := tbf.WriteBlockData(storage.MarshalBlock(nil, bb), 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot write block: %s", err)
|
||||
}
|
||||
pts.addrs = append(pts.addrs, addr)
|
||||
}
|
||||
var updateAddrs []tmpBlockAddr
|
||||
for _, updateBlock := range updateBlocks {
|
||||
bb := createBlock(updateBlock.timestamps, updateBlock.values)
|
||||
addr, err := tbf.WriteBlockData(storage.MarshalBlock(nil, bb), 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot write update block: %s", err)
|
||||
}
|
||||
updateAddrs = append(updateAddrs, addr)
|
||||
}
|
||||
if len(updateAddrs) > 0 {
|
||||
pts.updateAddrs = append(pts.updateAddrs, updateAddrs)
|
||||
}
|
||||
|
||||
if err := pts.Unpack(&dst, []*tmpBlocksFile{&tbf}, tr); err != nil {
|
||||
t.Fatalf("unexpected error at series unpack: %s", err)
|
||||
}
|
||||
if !reflect.DeepEqual(wantResult, &dst) && !isValuesEqual(wantResult.Values, dst.Values) {
|
||||
t.Fatalf("unexpected result for unpack \nwant: \n%v\ngot: \n%v\n", wantResult, &dst)
|
||||
}
|
||||
})
|
||||
}
|
||||
f("2 blocks without updates",
|
||||
[]blockData{
|
||||
{
|
||||
timestamps: []int64{10, 15, 30},
|
||||
values: []int64{1, 2, 3},
|
||||
},
|
||||
{
|
||||
timestamps: []int64{35, 40, 45},
|
||||
values: []int64{4, 5, 6},
|
||||
},
|
||||
},
|
||||
nil,
|
||||
&Result{
|
||||
MetricName: mn,
|
||||
Values: []float64{1, 2, 3, 4, 5, 6},
|
||||
Timestamps: []int64{10, 15, 30, 35, 40, 45},
|
||||
})
|
||||
f("2 blocks at the border of time range",
|
||||
[]blockData{
|
||||
{
|
||||
timestamps: []int64{10, 15, 30},
|
||||
values: []int64{1, 2, 3},
|
||||
},
|
||||
{
|
||||
timestamps: []int64{35, 40, 45},
|
||||
values: []int64{4, 5, 6},
|
||||
},
|
||||
},
|
||||
[]blockData{
|
||||
{
|
||||
timestamps: []int64{10},
|
||||
values: []int64{16},
|
||||
},
|
||||
},
|
||||
&Result{
|
||||
MetricName: mn,
|
||||
Values: []float64{16, 2, 3, 4, 5, 6},
|
||||
Timestamps: []int64{10, 15, 30, 35, 40, 45},
|
||||
})
|
||||
f("2 blocks with update",
|
||||
[]blockData{
|
||||
{
|
||||
timestamps: []int64{10, 15, 30},
|
||||
values: []int64{1, 2, 3},
|
||||
},
|
||||
{
|
||||
timestamps: []int64{35, 40, 45},
|
||||
values: []int64{4, 5, 6},
|
||||
},
|
||||
},
|
||||
[]blockData{
|
||||
{
|
||||
timestamps: []int64{15, 30},
|
||||
values: []int64{11, 12},
|
||||
},
|
||||
},
|
||||
&Result{
|
||||
MetricName: mn,
|
||||
Values: []float64{1, 11, 12, 4, 5, 6},
|
||||
Timestamps: []int64{10, 15, 30, 35, 40, 45},
|
||||
})
|
||||
f("2 blocks with 2 update blocks",
|
||||
[]blockData{
|
||||
{
|
||||
timestamps: []int64{10, 15, 30},
|
||||
values: []int64{1, 2, 3},
|
||||
},
|
||||
{
|
||||
timestamps: []int64{35, 40, 65},
|
||||
values: []int64{4, 5, 6},
|
||||
},
|
||||
},
|
||||
[]blockData{
|
||||
{
|
||||
timestamps: []int64{15, 30},
|
||||
values: []int64{11, 12},
|
||||
},
|
||||
{
|
||||
timestamps: []int64{45, 55},
|
||||
values: []int64{21, 22},
|
||||
},
|
||||
},
|
||||
&Result{
|
||||
MetricName: mn,
|
||||
Values: []float64{1, 11, 12, 21, 22, 6},
|
||||
Timestamps: []int64{10, 15, 30, 45, 55, 65},
|
||||
})
|
||||
}
|
||||
|
||||
func TestPosition(t *testing.T) {
|
||||
f := func(src []int64, value, wantPosition int64) {
|
||||
t.Helper()
|
||||
gotPos := position(src, value)
|
||||
if wantPosition != int64(gotPos) {
|
||||
t.Fatalf("incorrect position: \ngot:\n%d\nwant: \n%d", gotPos, wantPosition)
|
||||
}
|
||||
if gotPos == len(src) {
|
||||
_ = src[int64(gotPos)-1]
|
||||
} else {
|
||||
_ = src[int64(gotPos)]
|
||||
}
|
||||
}
|
||||
f([]int64{1, 2, 3, 4}, 5, 4)
|
||||
f([]int64{1, 2, 3, 4}, 0, 0)
|
||||
f([]int64{1, 2, 3, 4}, 1, 0)
|
||||
f([]int64{1, 2, 3, 4}, 4, 3)
|
||||
f([]int64{1, 2, 3, 4}, 3, 2)
|
||||
f([]int64{10, 20, 30, 40, 50, 60, 90}, 100, 7)
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package netstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@@ -105,3 +106,83 @@ func benchmarkMergeSortBlocks(b *testing.B, blocks []*sortBlock) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkMergeResults(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
f := func(name string, dst, update, expect *Result) {
|
||||
if len(dst.Timestamps) != len(dst.Values) {
|
||||
b.Fatalf("bad input data, timestamps and values lens must match")
|
||||
}
|
||||
if len(update.Values) != len(update.Timestamps) {
|
||||
b.Fatalf("bad input data, update timestamp and values must match")
|
||||
}
|
||||
toMerge := Result{
|
||||
valuesBuf: make([]float64, 0, 256),
|
||||
timestampsBuf: make([]int64, 0, 256),
|
||||
}
|
||||
b.Run(name, func(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
toMerge.reset()
|
||||
toMerge.Values = append(toMerge.Values, dst.Values...)
|
||||
toMerge.Timestamps = append(toMerge.Timestamps, dst.Timestamps...)
|
||||
mergeResult(&toMerge, update)
|
||||
if !reflect.DeepEqual(toMerge.Timestamps, expect.Timestamps) || !reflect.DeepEqual(toMerge.Values, expect.Values) {
|
||||
b.Fatalf("unexpected result, got \ntimestamps: \n%v\nvalues: \n%v\nwant: timestamps: \n%v\n values: \n%v", toMerge.Timestamps, toMerge.Values, expect.Timestamps, expect.Values)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
f("update at the start",
|
||||
&Result{
|
||||
Timestamps: []int64{10, 20, 30, 40, 50, 60, 90},
|
||||
Values: []float64{2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.9},
|
||||
},
|
||||
&Result{
|
||||
Timestamps: []int64{0, 20, 40},
|
||||
Values: []float64{0.0, 5.2, 5.4},
|
||||
},
|
||||
&Result{
|
||||
Timestamps: []int64{0, 20, 40, 50, 60, 90},
|
||||
Values: []float64{0.0, 5.2, 5.4, 2.5, 2.6, 2.9},
|
||||
})
|
||||
f("update at the end",
|
||||
&Result{
|
||||
Timestamps: []int64{10, 20, 30, 40, 50, 60, 90},
|
||||
Values: []float64{2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.9},
|
||||
},
|
||||
&Result{
|
||||
Timestamps: []int64{50, 70, 100},
|
||||
Values: []float64{0.0, 5.7, 5.1},
|
||||
},
|
||||
&Result{
|
||||
Timestamps: []int64{10, 20, 30, 40, 50, 70, 100},
|
||||
Values: []float64{2.1, 2.2, 2.3, 2.4, 0.0, 5.7, 5.1},
|
||||
})
|
||||
f("update at the middle",
|
||||
&Result{
|
||||
Timestamps: []int64{10, 20, 30, 40, 50, 60, 90},
|
||||
Values: []float64{2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.9},
|
||||
},
|
||||
&Result{
|
||||
Timestamps: []int64{30, 40, 50, 60},
|
||||
Values: []float64{5.3, 5.4, 5.5, 5.6},
|
||||
},
|
||||
&Result{
|
||||
Timestamps: []int64{10, 20, 30, 40, 50, 60, 90},
|
||||
Values: []float64{2.1, 2.2, 5.3, 5.4, 5.5, 5.6, 2.9},
|
||||
})
|
||||
f("merge and re-allocate",
|
||||
&Result{
|
||||
Timestamps: []int64{10, 20, 30, 50, 60, 90},
|
||||
Values: []float64{1.1, 1.2, 1.3, 1.4, 1.5, 1.6},
|
||||
},
|
||||
&Result{
|
||||
Timestamps: []int64{20, 30, 35, 45, 50, 55, 60},
|
||||
Values: []float64{2.0, 2.3, 2.35, 2.45, 2.5, 2.55, 2.6},
|
||||
},
|
||||
&Result{
|
||||
Timestamps: []int64{10, 20, 30, 35, 45, 50, 55, 60, 90},
|
||||
Values: []float64{1.1, 2.0, 2.3, 2.35, 2.45, 2.50, 2.55, 2.6, 1.6},
|
||||
})
|
||||
}
|
||||
|
||||
1
app/vmselect/netstorage/upds.json
Normal file
1
app/vmselect/netstorage/upds.json
Normal file
@@ -0,0 +1 @@
|
||||
{"metric":{"__name__":"promhttp_metric_handler_requests_total","job":"node_exporter-6","up":"true"},"values":[131],"timestamps":[1676050756785]}
|
||||
@@ -80,7 +80,6 @@ publish-via-docker:
|
||||
--build-arg root_image=$(ROOT_IMAGE) \
|
||||
--build-arg APP_NAME=$(APP_NAME) \
|
||||
--tag $(DOCKER_NAMESPACE)/$(APP_NAME):$(PKG_TAG)$(RACE) \
|
||||
--tag $(DOCKER_NAMESPACE)/$(APP_NAME):$(LATEST_TAG)$(RACE) \
|
||||
-o type=image \
|
||||
--provenance=false \
|
||||
-f app/$(APP_NAME)/multiarch/Dockerfile \
|
||||
|
||||
@@ -26,7 +26,17 @@ Metrics of the latest version of VictoriaMetrics cluster are available for viewi
|
||||
The sandbox cluster installation is running under the constant load generated by
|
||||
[prometheus-benchmark](https://github.com/VictoriaMetrics/prometheus-benchmark) and used for testing latest releases.
|
||||
|
||||
## tip
|
||||
## v1.93.x long-time support release (LTS)
|
||||
|
||||
## v1.93.1 long-time support release (LTS)
|
||||
|
||||
* BUGFIX: [storage](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html): properly set next retention time for indexDB. Previously it may enter into endless retention loop. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4873) for details.
|
||||
* BUGFIX: do not allow starting VictoriaMetrics components with improperly set boolean command-line flags in the form `-boolFlagName value`, since this leads to silent incomplete flags' parsing. This form should be replaced with `-boolFlagName=value`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4845).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set labels from `-remoteWrite.label` command-line flag just before sending samples to the configured `-remoteWrite.url` according to [these docs](https://docs.victoriametrics.com/vmagent.html#adding-labels-to-metrics). Previously these labels were incorrectly set before [the relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) configured via `-remoteWrite.urlRelabelConfigs` and [the stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) configured via `-remoteWrite.streamAggr.config`, so these labels could be lost or incorrectly transformed before sending the samples to remote storage. The fix allows using `-remoteWrite.label` for identifying `vmagent` instances in [cluster mode](https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247) and [these docs](https://docs.victoriametrics.com/stream-aggregation.html#cluster-mode) for more details.
|
||||
* BUGFIX: remove `DEBUG` logging when parsing `if` filters inside [relabeling rules](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) and when parsing `match` filters inside [stream aggregation rules](https://docs.victoriametrics.com/stream-aggregation.html).
|
||||
* BUGFIX: properly replace `:` chars in label names with `_` when `-usePromCompatibleNaming` command-line flag is passed to `vmagent`, `vminsert` or single-node VictoriaMetrics. This addresses [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113#issuecomment-1275077071).
|
||||
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): correctly check if specified `-dst` belongs to specified `-storageDataPath`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4837).
|
||||
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): don't interrupt the migration process if no metrics were found for a specific tenant. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4796).
|
||||
|
||||
* SECURITY: upgrade base docker image (Alpine) from 3.18.4 to 3.18.5. See [alpine 3.18.5 release notes](https://www.alpinelinux.org/posts/Alpine-3.15.11-3.16.8-3.17.6-3.18.5-released.html).
|
||||
|
||||
|
||||
@@ -721,6 +721,48 @@ By default, `vminsert` tries to route all the samples for a single time series t
|
||||
* when `vmstorage` nodes are temporarily unavailable (for instance, during their restart). Then new samples are re-routed to the remaining available `vmstorage` nodes;
|
||||
* when `vmstorage` node has no enough capacity for processing incoming data stream. Then `vminsert` re-routes new samples to other `vmstorage` nodes.
|
||||
|
||||
## Alter/Update series
|
||||
|
||||
VictoriaMetrics supports data modification and update with following limitations:
|
||||
- modified data cannot be changed with back-filling.
|
||||
- modified data must be sent to `vminsert` component.
|
||||
- only json-line format is supported.
|
||||
|
||||
How it works:
|
||||
* Export series for modification with `/api/v1/export/prometheus` from `vmselect`.
|
||||
* Modify values,timestamps with needed values. You can delete, add or modify timestamps and values.
|
||||
* Send modified series to the `vminsert`'s API `/prometheus/api/v1/update/series` with POST request.
|
||||
* `vminsert` adds unique monotonically increasing `__generation_id` label to each series during the update, so their samples could replace the original samples.
|
||||
* at query requests `vmselect` merges original series with series updates sequentially according to their `__generation_id`.
|
||||
|
||||
How vmselect merges updates:
|
||||
* example 1 - delete timestamps at time range
|
||||
original series timestamps: [10,20,30,40,50] values: [1,2,3,4,5]
|
||||
modified series timestamps: [20,50] values: [2,5]
|
||||
query result series timestamps: [10,20,50] values: [1,2,5]
|
||||
* example 2 - modify values
|
||||
origin series timestamps: [10,20,30,40,50] values: [1,2,3,4,5]
|
||||
modified series timestamps: [20,30,40] values: [22,33,44]
|
||||
query result series timestamps: [10,20,30,40,50] values: [1,22,33,44,5]
|
||||
* example 3 - modify timestamps
|
||||
origin series timestamps: [10,20,30,40,50] values: [1,2,3,4,5]
|
||||
modified series timestamps: [0,5,10,15,20,25,30] values: [0.1,0.5,1,1.5,2,2.5,30]
|
||||
query result series timestamps: [0,5,10,15,20,25,30,40,50] values: [0.1,0.5,1,1.5,2,2.5,30,4,5]
|
||||
|
||||
How to check which series were modified:
|
||||
* execute metrics export request with following params query params:
|
||||
* `reduce_mem_usage=true`
|
||||
* `extra_filter={__generation_id!=""}`
|
||||
Example request
|
||||
```bash
|
||||
curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total",type="datadog"}' -d 'reduce_mem_usage=true' -d 'extra_filters={__generation_id!=""}'
|
||||
```
|
||||
example output:
|
||||
```json
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"datadog","instance":"localhost:8429","__generation_id":"1658153678907548000"},"values":[0,0,0,0,256,253,135,15],"timestamps":[1658153559703,1658153569703,1658153579703,1658153589703,1658153599703,1658153609703,1658153619703,1658153621703]}
|
||||
```
|
||||
There was single series modify request with `__generation_id="1658153678907548000"`
|
||||
|
||||
|
||||
## Backups
|
||||
|
||||
|
||||
138
docs/guides/modify-series.md
Normal file
138
docs/guides/modify-series.md
Normal file
@@ -0,0 +1,138 @@
|
||||
# Series modification
|
||||
|
||||
## Scenario
|
||||
|
||||
VictoriaMetrics doesn't support direct data modification, since it uses immutable data structures and such operations may significantly reduce system performance.
|
||||
|
||||
The new series update API should provide a workaround for this issue. API allows overriding existing Timeseries data points at runtime during select requests.
|
||||
|
||||
The following operations are supported:
|
||||
- add data points.
|
||||
- remove data points.
|
||||
- modify data points.
|
||||
|
||||
|
||||
Note this is a low-level feature, data modification could be done with scripts, vmctl, or `VMUI` in future releases.
|
||||
|
||||
## Examples
|
||||
|
||||
|
||||
### Setup env
|
||||
|
||||
It's expected, that you have configured VictoriaMetrics cluster, vminsert, and vmselect components reachable from your computer.
|
||||
|
||||
I'll work with the following data set, which was exported with a call to the export API:
|
||||
```text
|
||||
curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total"}' -d 'start=1658164769' -d 'end=1658165291'
|
||||
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdbhttp","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"vmimport","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"native","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"graphite","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"csvimport","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"promremotewrite","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"datadog","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
|
||||
```
|
||||
|
||||
For better usability, it could be exported to a file on disk and modified via a preferred text editor.
|
||||
|
||||
|
||||
### Modify data points
|
||||
|
||||
#### change values
|
||||
|
||||
Let's say that during ingestion some error happened and the producer incorrectly ingested the value `0` at timestamp 1658164969982 for the `prometheus` and `influx` types:
|
||||
```text
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
|
||||
```
|
||||
|
||||
we have to modify these values to correct `0` and send update request to the `vminsert` API:
|
||||
```text
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429"},"values":[0],"timestamps":[1658164969982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0],"timestamps":[1658164969982]}
|
||||
```
|
||||
|
||||
data points could be also updated at time range if actual timestamp is not known. For instance, [1658164969972,1658164969982,1658164969992] timestamp range overwrite values for given timestamps and drops any timestamps at a given time range.
|
||||
|
||||
Save the 2 series above into the file `incorrect_values_modification.txt` and execute the API request with the curl command:
|
||||
```text
|
||||
curl localhost:8480/insert/0/prometheus/api/v1/update/series -T incorrect_values_modification.txt
|
||||
```
|
||||
|
||||
Check series modification output:
|
||||
```text
|
||||
curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total",type=~"prometheus|influx"}' -d 'start=1658164969' -d 'end=1658164989'
|
||||
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429"},"values":[0,0,0],"timestamps":[1658164969982,1658164979982,1658164987982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0],"timestamps":[1658164969982,1658164979982,1658164987982]}
|
||||
```
|
||||
|
||||
#### Add missing timestamps
|
||||
|
||||
Missing timestamps could be added in the same way, specify needed timestamps with needed values at correct array indexes.
|
||||
|
||||
### Delete data points at time range
|
||||
|
||||
For example data set we have following time range from `1658164969982` to `1658165261982`.
|
||||
Data points inside time range can be removed by skipping timestamps and time range, which must be removed.
|
||||
For example, if timestamps from `1658164999982` until `1658165099982` must be removed, skip all timestamps between it:
|
||||
```text
|
||||
# exclude variant
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdbhttp","instance":"localhost:8429"},"values":[0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165099982,1658165109982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165099982,1658165109982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"vmimport","instance":"localhost:8429"},"values":[0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165099982,1658165109982]}
|
||||
|
||||
# include variant
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdbhttp","instance":"localhost:8429"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"vmimport","instance":"localhost:8429"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
|
||||
```
|
||||
|
||||
Save one of the variants above into the file `delete_datapoints_range.txt` and execute the following request to the API:
|
||||
```text
|
||||
curl localhost:8480/insert/0/prometheus/api/v1/update/series -T delete_datapoints_range.txt
|
||||
```
|
||||
|
||||
Check output:
|
||||
```text
|
||||
curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total",type=~"influx|opentsdb"}' -d 'start=1658164989' -d 'end=1658165209'
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165207982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164989982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165207982]}
|
||||
```
|
||||
|
||||
As you see, series with `opentsdb` type has less data points than `influx`, since data was deleted at time range.
|
||||
|
||||
### Observing changes
|
||||
|
||||
Changes can be checked via an export API request with the special query params `reduce_mem_usage=true` and `extra_filters={__generation_id!=""}`.
|
||||
|
||||
Let's observe changes from previous steps:
|
||||
```text
|
||||
curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total"}' -d 'reduce_mem_usage=true' -d 'extra_filters={__generation_id!=""}'
|
||||
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429","__generation_id":"1658166029893830000"},"values":[0,0],"timestamps":[1658164969982,1658164979982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429","__generation_id":"1658166029893830000"},"values":[0,0],"timestamps":[1658164969982,1658164979982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"vmimport","instance":"localhost:8429","__generation_id":"1658167040791371000"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429","__generation_id":"1658167040791371000"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdbhttp","instance":"localhost:8429","__generation_id":"1658167040791371000"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
|
||||
```
|
||||
|
||||
|
||||
### Rollback update operations
|
||||
|
||||
Changes could be undone with metrics DELETE API, you have to specify correct `__generation_id`.
|
||||
For example, rollback timestamps delete:
|
||||
```text
|
||||
curl http://localhost:8481/delete/0/prometheus/api/v1/admin/tsdb/delete_series -g -d 'match={__generation_id="1658167040791371000"}'
|
||||
```
|
||||
|
||||
Check that changes were rolled back:
|
||||
```text
|
||||
curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total",type=~"influx|opentsdb"}' -d 'start=1658164989' -d 'end=1658165209'
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165207982]}
|
||||
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165207982]}
|
||||
```
|
||||
@@ -284,6 +284,28 @@ func (mn *MetricName) RemoveTag(tagKey string) {
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveTagWithResult removes a tag with the given tagKey and returns removed Tag
|
||||
func (mn *MetricName) RemoveTagWithResult(tagKey string) *Tag {
|
||||
if tagKey == "__name__" {
|
||||
mn.ResetMetricGroup()
|
||||
return nil
|
||||
}
|
||||
tags := mn.Tags
|
||||
mn.Tags = mn.Tags[:0]
|
||||
var foundTag *Tag
|
||||
for i := range tags {
|
||||
tag := &tags[i]
|
||||
if string(tag.Key) != tagKey {
|
||||
mn.AddTagBytes(tag.Key, tag.Value)
|
||||
continue
|
||||
}
|
||||
var t Tag
|
||||
t.copyFrom(tag)
|
||||
foundTag = &t
|
||||
}
|
||||
return foundTag
|
||||
}
|
||||
|
||||
// RemoveTagsIgnoring removes all the tags included in ignoringTags.
|
||||
func (mn *MetricName) RemoveTagsIgnoring(ignoringTags []string) {
|
||||
if len(ignoringTags) == 0 {
|
||||
|
||||
@@ -57,7 +57,6 @@ type MetricBlockRef struct {
|
||||
type MetricBlock struct {
|
||||
// MetricName is metric name for the given Block.
|
||||
MetricName []byte
|
||||
|
||||
// Block is a block for the given MetricName
|
||||
Block Block
|
||||
}
|
||||
|
||||
@@ -1051,6 +1051,7 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
|
||||
s.metricRowsRead.Add(ctx.mb.Block.RowsCount())
|
||||
|
||||
ctx.dataBuf = ctx.mb.Marshal(ctx.dataBuf[:0])
|
||||
|
||||
if err := ctx.writeDataBufBytes(); err != nil {
|
||||
return fmt.Errorf("cannot send MetricBlock: %w", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user