Compare commits

...

24 Commits

Author SHA1 Message Date
f41gh7
a892b588ef Merge remote-tracking branch 'origin/cluster' into series-update-api 2023-12-05 18:47:15 +03:00
f41gh7
07cb5be348 app/vmselect: fixes an issue with slice reuse 2023-12-05 18:46:02 +03:00
Aliaksandr Valialkin
f2eaf4d4aa deployment/docker: update Alpine from 3.18.3 to 3.18.4
See https://alpinelinux.org/posts/Alpine-3.18.4-released.html

(cherry picked from commit 5c28923c11)
2023-10-23 15:18:20 +02:00
f41gh7
f5e663c00c app/vmselect: adds traces for series update API 2023-10-03 14:57:03 +02:00
f41gh7
07394fb847 Merge remote-tracking branch 'origin/cluster' into series-update-api 2023-10-03 14:48:49 +02:00
Aliaksandr Valialkin
9e2d77ea62 app/vmselect/netstorage: prevent from deadlock at unpackUpdateAddrs() for time series with the updated data 2023-08-23 15:36:13 +02:00
f41gh7
23e53bdb80 app/vmselect: moves series update logic to vmselect
it should simplify migration and keep good performance for vmstorage component
2023-08-23 15:36:13 +02:00
f41gh7
1ab593f807 app/vminsert: fixes merge conflicts 2023-08-23 15:36:13 +02:00
f41gh7
fbfd7415da cluster: adds /api/v1/update/series API
It allows modifying existing series values.
The user must write modified series to the vminsert API
/insert/0/prometheus/api/v1/update/series

vminsert will generate an id and add it to the series as the __generation_id
label.

Modified series are merged on the vmselect side.
Only the last series modification request for a given time range will be applied.
Modification requests can be exported with the following API request:
`curl localhost:8481/select/0/prometheus/api/v1/export -g -d
'reduce_mem_usage=true' -d 'match[]={__generation_id!=""}'`

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/844

adds guide

allow single datapoint modification

vmselectapi: prevent MetricBlockRef corruption

Modifying the MetricName byte slice may result in MetricBlockRef
corruption, since `ctx.mb.MetricName` is a pointer to
`MetricBlockRef.MetricName`.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

Revert "vmselectapi: prevent MetricBlockRef corruption"

This reverts commit cf36bfa1895885fcc7dc2673248ee56c78180ea0.

app/vmstorage/servers: properly copy MetricName into MetricBlock inside blockIterator.NextBlock

This should fix the issue at cf36bfa189

(cherry picked from commit 916f1ab86c)

app/vmselect: correctly update single datapoint at merge

app/vmselect: adds mutex for series update map
previously it was a sync API, but the function signature was changed for performance optimizations
2023-08-23 15:36:12 +02:00
Dmytro Kozlov
54a67d439c docs: cut 1.93.1-lts in changelog
Signed-off-by: hagen1778 <roman@victoriametrics.com>
2023-08-23 14:14:15 +02:00
Nikolay
8bc42baf19 lib/storage: properly calculate nextRotationTimestamp (#4874)
due to a typo, unix millis were used instead of unix seconds for the current timestamp
calculation
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4873

(cherry picked from commit c5aac34b68)
2023-08-23 14:14:15 +02:00
Aliaksandr Valialkin
90f4581a0e docs/stream-aggregation.md: typo fix after 54f522ac25 2023-08-17 15:28:37 +02:00
Aliaksandr Valialkin
e69580fe97 docs/stream-aggregation.md: clarify the usage of -remoteWrite.label after the fix at a27c2f3773
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247
2023-08-17 15:19:52 +02:00
Aliaksandr Valialkin
be5673c39d app/vmagent/remotewrite: follow-up after a27c2f3773
- Fix Prometheus-compatible naming after applying the relabeling if -usePromCompatibleNaming command-line flag is set.
  This should prevent from possible Prometheus-incompatible metric names and label names generated by the relabeling.
- Do not return anything from relabelCtx.appendExtraLabels() function, since it cannot change the number of time series
  passed to it. Append labels for the passed time series in-place.
- Remove promrelabel.FinalizeLabels() call after adding extra labels to time series, since this call has been already
  made at relabelCtx.applyRelabeling(). It is the user's responsibility if they pass labels with double underscore prefixes
  to -remoteWrite.label.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247
2023-08-17 14:47:28 +02:00
Alexander Marshalov
4d6875d81b vmagent: fixed premature release of the context (after #4247 / #4824) (#4849)
Follow-up after a27c2f3773

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247

Signed-off-by: Alexander Marshalov <_@marshalov.org>
2023-08-17 14:47:28 +02:00
Alexander Marshalov
e5e50504db fixed applying remoteWrite.label for pushed metrics (#4247) (#4824)
vmagent: properly add extra labels before sending data to remote storage

labels from `remoteWrite.label` are now added to sent metrics just before they
 are pushed to `remoteWrite.url` after all relabelings, including stream aggregation relabelings (#4247)

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247

Signed-off-by: Alexander Marshalov <_@marshalov.org>
Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
2023-08-17 14:47:28 +02:00
Aliaksandr Valialkin
cdf2eaf688 lib/envflag: do not allow unsupported form for boolean command-line flags in the form -boolFlag value
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4845
2023-08-17 14:15:48 +02:00
Aliaksandr Valialkin
eca318cc65 docs/CHANGELOG.md: mention that this is v1.93.x LTS release line 2023-08-17 13:57:41 +02:00
Aliaksandr Valialkin
5768ac0607 lib/promrelabel: stop emitting DEBUG log lines when parsing if expressions
These lines were accidentally left in the commit 62651570bb

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4635
2023-08-17 13:57:41 +02:00
Aliaksandr Valialkin
91b1700194 lib/promrelabel: properly replace : char with _ in metric names when -usePromCompatibleNaming command-line flag is set
This addresses https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113#issuecomment-1275077071 comment from @johnseekins
2023-08-17 13:52:53 +02:00
Roman Khavronenko
f71382332b vmbackup: correctly check if specified -dst belongs to specified -storageDataPath (#4841)
See this issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4837

Signed-off-by: hagen1778 <roman@victoriametrics.com>
2023-08-17 13:50:51 +02:00
Dmytro Kozlov
215e9dd724 app/vmctl: fix migration process if tenant has no data (#4799)
app/vmctl: don't interrupt migration process if tenant has no data

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: Alexander Marshalov <_@marshalov.org>
2023-08-17 13:49:07 +02:00
Aliaksandr Valialkin
14dada5da0 docs/CHANGELOG.md: document that v1.93.x is a new line of LTS releases 2023-08-12 15:30:23 -07:00
Aliaksandr Valialkin
7e9112da50 deployment/docker/Makefile: do not overwrite latest tag when pushing Docker images for LTS release
The `latest` tag is reserved for the latest release
2023-08-12 15:28:51 -07:00
13 changed files with 1025 additions and 15 deletions

View File

@@ -27,6 +27,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheusimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/seriesupdate"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
@@ -250,6 +251,14 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
}
w.WriteHeader(http.StatusNoContent)
return true
case "prometheus/api/v1/update/series":
// todo logging and errors.
if err := seriesupdate.InsertHandler(at, r); err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "prometheus/api/v1/import":
vmimportRequests.Inc()
if err := vmimport.InsertHandler(at, r); err != nil {

View File

@@ -0,0 +1,81 @@
package seriesupdate
import (
"net/http"
"strconv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="update_series"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="update_series"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="update_series"}`)
)
// generateUniqueGenerationID returns a locally unique generation ID based on the current unix nano timestamp.
func generateUniqueGenerationID() []byte {
nextID := time.Now().UnixNano()
return []byte(strconv.FormatInt(nextID, 10))
}
// InsertHandler processes `/api/v1/update/series` request.
func InsertHandler(at *auth.Token, req *http.Request) error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return stream.Parse(req.Body, isGzipped, func(rows []vmimport.Row) error {
return insertRows(at, rows)
})
}
func insertRows(at *auth.Token, rows []vmimport.Row) error {
ctx := netstorage.GetInsertCtx()
defer netstorage.PutInsertCtx(ctx)
ctx.Reset() // This line is required for initializing ctx internals.
rowsTotal := 0
generationID := generateUniqueGenerationID()
for i := range rows {
r := &rows[i]
rowsTotal += len(r.Values)
ctx.Labels = ctx.Labels[:0]
for j := range r.Tags {
tag := &r.Tags[j]
ctx.AddLabelBytes(tag.Key, tag.Value)
}
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
// There is no need for relabeling and extra_label adding here,
// since the modified series already passed this phase during ingestion,
// and applying it again may lead to unexpected results for the user.
ctx.AddLabelBytes([]byte(`__generation_id`), generationID)
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels)
values := r.Values
timestamps := r.Timestamps
if len(timestamps) != len(values) {
logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values))
}
atLocal := ctx.GetLocalAuthToken(at)
storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels)
for j, value := range values {
timestamp := timestamps[j]
if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
return err
}
}
}
rowsInserted.Add(rowsTotal)
rowsTenantInserted.Get(at).Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return ctx.FlushBufs()
}

View File

@@ -6,10 +6,12 @@ import (
"flag"
"fmt"
"io"
"math/bits"
"net"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -64,12 +66,19 @@ type Result struct {
// Values are sorted by Timestamps.
Values []float64
Timestamps []int64
valuesBuf []float64
timestampsBuf []int64
// statistic for series updates
updateRows int
}
func (r *Result) reset() {
r.MetricName.Reset()
r.Values = r.Values[:0]
r.Timestamps = r.Timestamps[:0]
r.updateRows = 0
}
// Results holds results returned from ProcessSearchQuery.
@@ -110,7 +119,8 @@ type timeseriesWork struct {
f func(rs *Result, workerID uint) error
err error
rowsProcessed int
rowsProcessed int
updateRowsProcessed int
}
func (tsw *timeseriesWork) reset() {
@@ -120,6 +130,7 @@ func (tsw *timeseriesWork) reset() {
tsw.f = nil
tsw.err = nil
tsw.rowsProcessed = 0
tsw.updateRowsProcessed = 0
}
func getTimeseriesWork() *timeseriesWork {
@@ -150,6 +161,7 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
atomic.StoreUint32(tsw.mustStop, 1)
return fmt.Errorf("error during time series unpacking: %w", err)
}
tsw.updateRowsProcessed = r.updateRows
tsw.rowsProcessed = len(r.Timestamps)
if len(r.Timestamps) > 0 {
if err := tsw.f(r, workerID); err != nil {
@@ -166,17 +178,23 @@ func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, wo
// Perform own work at first.
rowsProcessed := 0
seriesProcessed := 0
updateGenerationsProcessed := 0
updateRowsProcessed := 0
ch := workChs[workerID]
for tsw := range ch {
tsw.err = tsw.do(&tmpResult.rs, workerID)
rowsProcessed += tsw.rowsProcessed
updateGenerationsProcessed += len(tsw.pts.updateAddrs)
updateRowsProcessed += tsw.updateRowsProcessed
seriesProcessed++
}
qt.Printf("own work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
qt.Printf("own work processed: series=%d, samples=%d, update_generations=%d, update_rows=%d", seriesProcessed, rowsProcessed, updateGenerationsProcessed, updateRowsProcessed)
// Then help others with the remaining work.
rowsProcessed = 0
seriesProcessed = 0
updateGenerationsProcessed = 0
updateRowsProcessed = 0
for i := uint(1); i < uint(len(workChs)); i++ {
idx := (i + workerID) % uint(len(workChs))
ch := workChs[idx]
@@ -194,10 +212,12 @@ func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, wo
}
tsw.err = tsw.do(&tmpResult.rs, workerID)
rowsProcessed += tsw.rowsProcessed
updateGenerationsProcessed += len(tsw.pts.updateAddrs)
updateRowsProcessed += tsw.updateRowsProcessed
seriesProcessed++
}
}
qt.Printf("others work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
qt.Printf("others work processed: series=%d, samples=%d, update_generations=%d, update_rows=%d", seriesProcessed, rowsProcessed, updateGenerationsProcessed, updateRowsProcessed)
putTmpResult(tmpResult)
}
@@ -383,8 +403,139 @@ var (
)
type packedTimeseries struct {
metricName string
addrs []tmpBlockAddr
samples int
metricName string
addrs []tmpBlockAddr
updateAddrs sortedTimeseriesUpdateAddrs
}
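// sortedTimeseriesUpdateAddrs holds update block addresses grouped per __generation_id, ordered by increasing __generation_id.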
type sortedTimeseriesUpdateAddrs [][]tmpBlockAddr
// mergeResult merges the dst Result with the update Result.
// It may allocate memory.
// dst and update must be sorted by Timestamps; the merged result keeps the order by Timestamps.
func mergeResult(dst, update *Result) {
// nothing to merge if either dst or update is empty
if len(dst.Timestamps) == 0 || len(update.Timestamps) == 0 {
return
}
firstDstTs := dst.Timestamps[0]
lastDstTs := dst.Timestamps[len(dst.Timestamps)-1]
firstUpdateTs := update.Timestamps[0]
lastUpdateTs := update.Timestamps[len(update.Timestamps)-1]
// check lower bound
// [5,6,7] [1,2,3]
if lastUpdateTs <= firstDstTs {
// fast path
if lastUpdateTs == firstDstTs {
dst.Timestamps = dst.Timestamps[1:]
dst.Values = dst.Values[1:]
}
newLen := len(dst.Timestamps) + len(update.Timestamps)
dst.timestampsBuf = reuseMayAllocateInt64(dst.timestampsBuf, newLen)
dst.timestampsBuf = append(dst.timestampsBuf, update.Timestamps...)
dst.timestampsBuf = append(dst.timestampsBuf, dst.Timestamps...)
dst.valuesBuf = reuseMayAllocateFloat64(dst.valuesBuf, newLen)
dst.valuesBuf = append(dst.valuesBuf, update.Values...)
dst.valuesBuf = append(dst.valuesBuf, dst.Values...)
// swap buffers
dst.timestampsBuf, dst.Timestamps = dst.Timestamps, dst.timestampsBuf
dst.valuesBuf, dst.Values = dst.Values, dst.valuesBuf
return
}
// check upper bound
// fast path, memory allocation possible
// [1,2,3] [5,6,7]
if firstUpdateTs >= lastDstTs {
if firstUpdateTs == lastDstTs {
dst.Timestamps = dst.Timestamps[:len(dst.Timestamps)-1]
dst.Values = dst.Values[:len(dst.Values)-1]
}
dst.Timestamps = append(dst.Timestamps, update.Timestamps...)
dst.Values = append(dst.Values, update.Values...)
return
}
// changes inside given range
// [1,5,7] [2,3,6]
firstPos := position(dst.Timestamps, firstUpdateTs)
lastPos := position(dst.Timestamps, lastUpdateTs)
// corner case last timestamp overlaps
if lastPos != len(dst.Timestamps) && lastUpdateTs == dst.Timestamps[lastPos] {
lastPos++
}
headTs := dst.Timestamps[:firstPos]
tailTs := dst.Timestamps[lastPos:]
headVs := dst.Values[:firstPos]
tailValues := dst.Values[lastPos:]
newLen := len(headTs) + len(update.Timestamps) + len(tailTs)
dst.timestampsBuf = reuseMayAllocateInt64(dst.timestampsBuf, newLen)
dst.timestampsBuf = append(dst.timestampsBuf, headTs...)
dst.timestampsBuf = append(dst.timestampsBuf, update.Timestamps...)
dst.timestampsBuf = append(dst.timestampsBuf, tailTs...)
dst.valuesBuf = reuseMayAllocateFloat64(dst.valuesBuf, newLen)
dst.valuesBuf = append(dst.valuesBuf, headVs...)
dst.valuesBuf = append(dst.valuesBuf, update.Values...)
dst.valuesBuf = append(dst.valuesBuf, tailValues...)
// swap buffers
dst.timestampsBuf, dst.Timestamps = dst.Timestamps, dst.timestampsBuf
dst.valuesBuf, dst.Values = dst.Values, dst.valuesBuf
}
func reuseMayAllocateInt64(src []int64, n int) []int64 {
if n <= cap(src) {
return src[:0]
}
nNew := roundToNearestPow2(n)
bNew := make([]int64, nNew)
return bNew[:0]
}
func reuseMayAllocateFloat64(src []float64, n int) []float64 {
if n <= cap(src) {
return src[:0]
}
nNew := roundToNearestPow2(n)
bNew := make([]float64, nNew)
return bNew[:0]
}
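// roundToNearestPow2 returns the smallest power of two that is >= n.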
func roundToNearestPow2(n int) int {
pow2 := uint8(bits.Len(uint(n - 1)))
return 1 << pow2
}
// position returns the position of value in src using binary search.
// Copied and modified from sort.SearchInts.
func position(src []int64, value int64) int {
// fast path
if len(src) < 1 || src[0] > value {
return 0
}
// Define f(-1) == false and f(n) == true.
// Invariant: f(i-1) == false, f(j) == true.
i, j := int64(0), int64(len(src))
for i < j {
h := int64(uint(i+j) >> 1) // avoid overflow when computing h
// i ≤ h < j
// return a[i] >= x
if !(src[h] >= value) {
i = h + 1 // preserves f(i-1) == false
} else {
j = h // preserves f(j) == true
}
}
// i == j, f(i-1) == false, and f(j) (= f(i)) == true => answer is i.
return int(i)
}
type unpackWork struct {
@@ -490,10 +641,165 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora
}
dedupInterval := storage.GetDedupInterval()
mergeSortBlocks(dst, sbh, dedupInterval)
seriesUpdateSbss, err := pts.unpackUpdateAddrs(tbfs, tr)
if err != nil {
return fmt.Errorf("cannot unpack series updates: %w", err)
}
// apply updates
if len(seriesUpdateSbss) > 0 {
var updateDst Result
updateRows := 0
for _, seriesUpdateSbs := range seriesUpdateSbss {
updateDst.reset()
mergeSortBlocks(&updateDst, seriesUpdateSbs, dedupInterval)
updateRows += len(updateDst.Timestamps)
mergeResult(dst, &updateDst)
putSortBlocksHeap(seriesUpdateSbs)
}
dst.updateRows = updateRows
}
putSortBlocksHeap(sbh)
return nil
}
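// unpackUpdateAddrs unpacks the series update blocks referenced by pts.updateAddrs into one sortBlocksHeap per __generation_id.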
func (pts *packedTimeseries) unpackUpdateAddrs(tbfs []*tmpBlocksFile, tr storage.TimeRange) ([]*sortBlocksHeap, error) {
var upwsLen int
for i := range pts.updateAddrs {
upwsLen += len(pts.updateAddrs[i])
}
if upwsLen == 0 {
// Nothing to do
return nil, nil
}
initUnpackWork := func(upw *unpackWork, addr tmpBlockAddr) {
upw.tbfs = tbfs
upw.addr = addr
upw.tr = tr
}
dsts := make([]*sortBlocksHeap, 0, upwsLen)
samples := pts.samples
if gomaxprocs == 1 || upwsLen < 1000 {
// It is faster to unpack all the data in the current goroutine.
upw := getUnpackWork()
tmpBlock := getTmpStorageBlock()
var err error
for _, addrBlock := range pts.updateAddrs {
dst := getSortBlocksHeap()
for _, addr := range addrBlock {
initUnpackWork(upw, addr)
upw.unpack(tmpBlock)
if upw.err != nil {
return dsts, upw.err
}
samples += len(upw.sb.Timestamps)
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
putSortBlock(upw.sb)
err = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
"or reduce time range for the query", *maxSamplesPerSeries)
break
}
dst.sbs = append(dst.sbs, upw.sb)
upw.reset()
}
dsts = append(dsts, dst)
}
putTmpStorageBlock(tmpBlock)
putUnpackWork(upw)
return dsts, err
}
// Slow path - spin up multiple local workers for parallel data unpacking.
// Do not use global workers pool, since it increases inter-CPU memory ping-pong,
// which reduces the scalability on systems with many CPU cores.
// Prepare the work for workers.
upwss := make([][]*unpackWork, len(pts.updateAddrs))
var workItems int
for i, addrs := range pts.updateAddrs {
upws := make([]*unpackWork, len(addrs))
for j, addr := range addrs {
workItems++
upw := getUnpackWork()
initUnpackWork(upw, addr)
upws[j] = upw
}
upwss[i] = upws
}
// Prepare worker channels.
workers := upwsLen
if workers > gomaxprocs {
workers = gomaxprocs
}
if workers < 1 {
workers = 1
}
itemsPerWorker := (workItems + workers - 1) / workers
workChs := make([]chan *unpackWork, workers)
for i := range workChs {
workChs[i] = make(chan *unpackWork, itemsPerWorker)
}
// Spread work among worker channels.
i := 0
for _, upws := range upwss {
for _, upw := range upws {
idx := i % len(workChs)
i++
workChs[idx] <- upw
}
}
// Mark worker channels as closed.
for _, workCh := range workChs {
close(workCh)
}
// Start workers and wait until they finish the work.
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func(workerID uint) {
unpackWorker(workChs, workerID)
wg.Done()
}(uint(i))
}
wg.Wait()
// Collect results.
var firstErr error
for _, upws := range upwss {
sbh := getSortBlocksHeap()
for _, upw := range upws {
if upw.err != nil && firstErr == nil {
// Return the first error only, since other errors are likely the same.
firstErr = upw.err
}
if firstErr == nil {
sb := upw.sb
samples += len(sb.Timestamps)
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
putSortBlock(sb)
firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
"or reduce time range for the query", *maxSamplesPerSeries)
} else {
sbh.sbs = append(sbh.sbs, sb)
}
} else {
putSortBlock(upw.sb)
}
putUnpackWork(upw)
}
dsts = append(dsts, sbh)
}
if firstErr != nil {
for _, sbh := range dsts {
putSortBlocksHeap(sbh)
}
}
return dsts, nil
}
func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbfs []*tmpBlocksFile, tr storage.TimeRange) ([]*sortBlock, error) {
upwsLen := len(pts.addrs)
if upwsLen == 0 {
@@ -605,6 +911,7 @@ func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbfs []*tmpBlocksFile, t
}
putUnpackWork(upw)
}
pts.samples = samples
return dst, firstErr
}
@@ -1349,6 +1656,12 @@ type tmpBlocksFileWrapper struct {
tbfs []*tmpBlocksFile
ms []map[string]*blockAddrs
orderedMetricNamess [][]string
// mu protects seriesUpdatesByMetricName.
// It shouldn't cause CPU contention, since series updates are usually small.
mu sync.Mutex
// updates grouped by metric name and generation ID
seriesUpdatesByMetricName map[string]map[int64][]tmpBlockAddr
}
type blockAddrs struct {
@@ -1373,9 +1686,10 @@ func newTmpBlocksFileWrapper(sns []*storageNode) *tmpBlocksFileWrapper {
ms[i] = make(map[string]*blockAddrs)
}
return &tmpBlocksFileWrapper{
tbfs: tbfs,
ms: ms,
orderedMetricNamess: make([][]string, n),
tbfs: tbfs,
ms: ms,
seriesUpdatesByMetricName: make(map[string]map[int64][]tmpBlockAddr),
orderedMetricNamess: make([][]string, n),
}
}
@@ -1390,6 +1704,38 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock,
// Do not intern mb.MetricName, since it leads to increased memory usage.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3692
metricName := mb.MetricName
var generationID int64
mn := storage.GetMetricName()
defer storage.PutMetricName(mn)
if err := mn.Unmarshal(metricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName: %q %w", metricName, err)
}
generationIDTag := mn.RemoveTagWithResult(`__generation_id`)
if generationIDTag != nil {
generationID, err = strconv.ParseInt(string(generationIDTag.Value), 10, 64)
if err != nil {
return fmt.Errorf("cannot parse __generation_id label value: %s : %w", generationIDTag.Value, err)
}
metricName = mn.Marshal(metricName[:0])
}
// process data blocks with metric updates
// TODO profile it, probably it's better to replace mutex with per worker lock-free struct
if generationID > 0 {
tbfw.mu.Lock()
defer tbfw.mu.Unlock()
ups := tbfw.seriesUpdatesByMetricName[string(metricName)]
if ups == nil {
// fast path
tbfw.seriesUpdatesByMetricName[string(metricName)] = map[int64][]tmpBlockAddr{generationID: {addr}}
return nil
}
// todo memory optimization for metricNames, use interning?
addrs := tbfw.seriesUpdatesByMetricName[string(metricName)][generationID]
addrs = append(addrs, addr)
tbfw.seriesUpdatesByMetricName[string(metricName)][generationID] = addrs
return nil
}
m := tbfw.ms[workerID]
addrs := m[string(metricName)]
if addrs == nil {
@@ -1463,6 +1809,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
if err := mn.Unmarshal(mb.MetricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName: %w", err)
}
if err := f(mn, &mb.Block, tr, workerID); err != nil {
return err
}
@@ -1589,7 +1936,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
if err != nil {
return nil, false, fmt.Errorf("cannot finalize temporary blocks files: %w", err)
}
qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(addrsByMetricName), blocksRead.GetTotal(), samples.GetTotal(), bytesTotal)
qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d, unique_series_with_updates=%d", len(addrsByMetricName), blocksRead.GetTotal(), samples.GetTotal(), bytesTotal, len(tbfw.seriesUpdatesByMetricName))
var rss Results
rss.tr = tr
@@ -1597,9 +1944,25 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
rss.tbfs = tbfw.tbfs
pts := make([]packedTimeseries, len(orderedMetricNames))
for i, metricName := range orderedMetricNames {
seriesUpdateGenerations := tbfw.seriesUpdatesByMetricName[metricName]
var stua sortedTimeseriesUpdateAddrs
if len(seriesUpdateGenerations) > 0 {
stua = make(sortedTimeseriesUpdateAddrs, 0, len(seriesUpdateGenerations))
orderedGenerationIDs := make([]int64, 0, len(seriesUpdateGenerations))
for generationID := range seriesUpdateGenerations {
orderedGenerationIDs = append(orderedGenerationIDs, generationID)
}
sort.Slice(orderedGenerationIDs, func(i, j int) bool {
return orderedGenerationIDs[i] < orderedGenerationIDs[j]
})
for _, genID := range orderedGenerationIDs {
stua = append(stua, seriesUpdateGenerations[genID])
}
}
pts[i] = packedTimeseries{
metricName: metricName,
addrs: addrsByMetricName[metricName].addrs,
metricName: metricName,
addrs: addrsByMetricName[metricName].addrs,
updateAddrs: stua,
}
}
rss.packedTimeseries = pts

View File

@@ -5,6 +5,9 @@ import (
"reflect"
"runtime"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
func TestInitStopNodes(t *testing.T) {
@@ -47,7 +50,6 @@ func TestMergeSortBlocks(t *testing.T) {
t.Fatalf("unexpected timestamps;\ngot\n%v\nwant\n%v", result.Timestamps, expectedResult.Timestamps)
}
}
// Zero blocks
f(nil, 1, &Result{})
@@ -221,3 +223,265 @@ func TestMergeSortBlocks(t *testing.T) {
Values: []float64{7, 24, 26},
})
}
func TestMergeResult(t *testing.T) {
f := func(name string, dst, update, expect *Result) {
t.Helper()
t.Run(name, func(t *testing.T) {
mergeResult(dst, update)
if !reflect.DeepEqual(dst.Values, expect.Values) || !reflect.DeepEqual(dst.Timestamps, expect.Timestamps) {
t.Fatalf(" unexpected result \ngot: \n%v\nwant: \n%v", dst, expect)
}
})
}
f("append and replace",
&Result{Timestamps: []int64{1, 2}, Values: []float64{5.0, 6.0}},
&Result{Timestamps: []int64{2, 3}, Values: []float64{10.0, 30.0}},
&Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 10.0, 30.0}})
f("extend and replace overlap",
&Result{Timestamps: []int64{2, 3}, Values: []float64{10.0, 30.0}},
&Result{Timestamps: []int64{1, 2}, Values: []float64{5.0, 6.0}},
&Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 6.0, 30.0}})
f("extend and replace",
&Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 6.0, 7.0}},
&Result{Timestamps: []int64{0, 1, 2}, Values: []float64{10.0, 15.0, 30.0}},
&Result{Timestamps: []int64{0, 1, 2, 3}, Values: []float64{10.0, 15.0, 30.0, 7.0}})
f("update single point",
&Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 6.0, 7.0}},
&Result{Timestamps: []int64{15}, Values: []float64{35.0}},
&Result{Timestamps: []int64{1, 2, 3, 15}, Values: []float64{5.0, 6.0, 7.0, 35.0}})
f("append",
&Result{Timestamps: []int64{6, 7, 8}, Values: []float64{10.0, 15.0, 30.0}},
&Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 6.0, 7.0}},
&Result{Timestamps: []int64{1, 2, 3, 6, 7, 8}, Values: []float64{5, 6, 7, 10, 15, 30}})
f("extend",
&Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 6.0, 7.0}},
&Result{Timestamps: []int64{6, 7, 8}, Values: []float64{10.0, 15.0, 30.0}},
&Result{Timestamps: []int64{1, 2, 3, 6, 7, 8}, Values: []float64{5, 6, 7, 10, 15, 30}})
f("fast path",
&Result{},
&Result{Timestamps: []int64{1, 2, 3}},
&Result{})
f("merge at the middle",
&Result{Timestamps: []int64{1, 2, 5, 6, 10, 15}, Values: []float64{.1, .2, .3, .4, .5, .6}},
&Result{Timestamps: []int64{2, 6, 9, 10}, Values: []float64{1.1, 1.2, 1.3, 1.4}},
&Result{Timestamps: []int64{1, 2, 6, 9, 10, 15}, Values: []float64{.1, 1.1, 1.2, 1.3, 1.4, 0.6}})
f("merge and re-allocate",
&Result{
Timestamps: []int64{10, 20, 30, 50, 60, 90},
Values: []float64{1.1, 1.2, 1.3, 1.4, 1.5, 1.6},
},
&Result{
Timestamps: []int64{20, 30, 35, 45, 50, 55, 60},
Values: []float64{2.0, 2.3, 2.35, 2.45, 2.5, 2.55, 2.6},
},
&Result{
Timestamps: []int64{10, 20, 30, 35, 45, 50, 55, 60, 90},
Values: []float64{1.1, 2.0, 2.3, 2.35, 2.45, 2.50, 2.55, 2.6, 1.6},
})
f("update at the end",
&Result{
Timestamps: []int64{10, 20, 30, 40, 50, 60, 90},
Values: []float64{2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.9},
},
&Result{
Timestamps: []int64{50, 70, 100},
Values: []float64{0.0, 5.7, 5.1},
},
&Result{
Timestamps: []int64{10, 20, 30, 40, 50, 70, 100},
Values: []float64{2.1, 2.2, 2.3, 2.4, 0.0, 5.7, 5.1},
})
}
func TestPackedTimeseries_Unpack(t *testing.T) {
createBlock := func(ts []int64, vs []int64) *storage.Block {
tsid := &storage.TSID{
MetricID: 234211,
}
scale := int16(0)
precisionBits := uint8(8)
var b storage.Block
b.Init(tsid, ts, vs, scale, precisionBits)
_, _, _ = b.MarshalData(0, 0)
return &b
}
tr := storage.TimeRange{
MinTimestamp: 0,
MaxTimestamp: 1<<63 - 1,
}
var mn storage.MetricName
mn.MetricGroup = []byte("foobar")
metricName := string(mn.Marshal(nil))
type blockData struct {
timestamps []int64
values []int64
}
isValuesEqual := func(got, want []float64) bool {
equal := true
if len(got) != len(want) {
return false
}
for i, v := range want {
gotV := got[i]
if v == gotV {
continue
}
if decimal.IsStaleNaN(v) && decimal.IsStaleNaN(gotV) {
continue
}
equal = false
}
return equal
}
f := func(name string, dataBlocks []blockData, updateBlocks []blockData, wantResult *Result) {
t.Run(name, func(t *testing.T) {
pts := packedTimeseries{
metricName: metricName,
}
var dst Result
tbf := tmpBlocksFile{
buf: make([]byte, 0, 20*1024*1024),
}
for _, dataBlock := range dataBlocks {
bb := createBlock(dataBlock.timestamps, dataBlock.values)
addr, err := tbf.WriteBlockData(storage.MarshalBlock(nil, bb), 0)
if err != nil {
t.Fatalf("cannot write block: %s", err)
}
pts.addrs = append(pts.addrs, addr)
}
var updateAddrs []tmpBlockAddr
for _, updateBlock := range updateBlocks {
bb := createBlock(updateBlock.timestamps, updateBlock.values)
addr, err := tbf.WriteBlockData(storage.MarshalBlock(nil, bb), 0)
if err != nil {
t.Fatalf("cannot write update block: %s", err)
}
updateAddrs = append(updateAddrs, addr)
}
if len(updateAddrs) > 0 {
pts.updateAddrs = append(pts.updateAddrs, updateAddrs)
}
if err := pts.Unpack(&dst, []*tmpBlocksFile{&tbf}, tr); err != nil {
t.Fatalf("unexpected error at series unpack: %s", err)
}
if !reflect.DeepEqual(wantResult, &dst) && !isValuesEqual(wantResult.Values, dst.Values) {
t.Fatalf("unexpected result for unpack \nwant: \n%v\ngot: \n%v\n", wantResult, &dst)
}
})
}
f("2 blocks without updates",
[]blockData{
{
timestamps: []int64{10, 15, 30},
values: []int64{1, 2, 3},
},
{
timestamps: []int64{35, 40, 45},
values: []int64{4, 5, 6},
},
},
nil,
&Result{
MetricName: mn,
Values: []float64{1, 2, 3, 4, 5, 6},
Timestamps: []int64{10, 15, 30, 35, 40, 45},
})
f("2 blocks at the border of time range",
[]blockData{
{
timestamps: []int64{10, 15, 30},
values: []int64{1, 2, 3},
},
{
timestamps: []int64{35, 40, 45},
values: []int64{4, 5, 6},
},
},
[]blockData{
{
timestamps: []int64{10},
values: []int64{16},
},
},
&Result{
MetricName: mn,
Values: []float64{16, 2, 3, 4, 5, 6},
Timestamps: []int64{10, 15, 30, 35, 40, 45},
})
f("2 blocks with update",
[]blockData{
{
timestamps: []int64{10, 15, 30},
values: []int64{1, 2, 3},
},
{
timestamps: []int64{35, 40, 45},
values: []int64{4, 5, 6},
},
},
[]blockData{
{
timestamps: []int64{15, 30},
values: []int64{11, 12},
},
},
&Result{
MetricName: mn,
Values: []float64{1, 11, 12, 4, 5, 6},
Timestamps: []int64{10, 15, 30, 35, 40, 45},
})
f("2 blocks with 2 update blocks",
[]blockData{
{
timestamps: []int64{10, 15, 30},
values: []int64{1, 2, 3},
},
{
timestamps: []int64{35, 40, 65},
values: []int64{4, 5, 6},
},
},
[]blockData{
{
timestamps: []int64{15, 30},
values: []int64{11, 12},
},
{
timestamps: []int64{45, 55},
values: []int64{21, 22},
},
},
&Result{
MetricName: mn,
Values: []float64{1, 11, 12, 21, 22, 6},
Timestamps: []int64{10, 15, 30, 45, 55, 65},
})
}
func TestPosition(t *testing.T) {
f := func(src []int64, value, wantPosition int64) {
t.Helper()
gotPos := position(src, value)
if wantPosition != int64(gotPos) {
t.Fatalf("incorrect position: \ngot:\n%d\nwant: \n%d", gotPos, wantPosition)
}
if gotPos == len(src) {
_ = src[int64(gotPos)-1]
} else {
_ = src[int64(gotPos)]
}
}
f([]int64{1, 2, 3, 4}, 5, 4)
f([]int64{1, 2, 3, 4}, 0, 0)
f([]int64{1, 2, 3, 4}, 1, 0)
f([]int64{1, 2, 3, 4}, 4, 3)
f([]int64{1, 2, 3, 4}, 3, 2)
f([]int64{10, 20, 30, 40, 50, 60, 90}, 100, 7)
}

View File

@@ -2,6 +2,7 @@ package netstorage
import (
"fmt"
"reflect"
"testing"
)
@@ -105,3 +106,83 @@ func benchmarkMergeSortBlocks(b *testing.B, blocks []*sortBlock) {
}
})
}
func BenchmarkMergeResults(b *testing.B) {
b.ReportAllocs()
f := func(name string, dst, update, expect *Result) {
if len(dst.Timestamps) != len(dst.Values) {
b.Fatalf("bad input data, timestamps and values lens must match")
}
if len(update.Values) != len(update.Timestamps) {
b.Fatalf("bad input data, update timestamp and values must match")
}
toMerge := Result{
valuesBuf: make([]float64, 0, 256),
timestampsBuf: make([]int64, 0, 256),
}
b.Run(name, func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
toMerge.reset()
toMerge.Values = append(toMerge.Values, dst.Values...)
toMerge.Timestamps = append(toMerge.Timestamps, dst.Timestamps...)
mergeResult(&toMerge, update)
if !reflect.DeepEqual(toMerge.Timestamps, expect.Timestamps) || !reflect.DeepEqual(toMerge.Values, expect.Values) {
b.Fatalf("unexpected result, got \ntimestamps: \n%v\nvalues: \n%v\nwant: timestamps: \n%v\n values: \n%v", toMerge.Timestamps, toMerge.Values, expect.Timestamps, expect.Values)
}
}
})
}
f("update at the start",
&Result{
Timestamps: []int64{10, 20, 30, 40, 50, 60, 90},
Values: []float64{2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.9},
},
&Result{
Timestamps: []int64{0, 20, 40},
Values: []float64{0.0, 5.2, 5.4},
},
&Result{
Timestamps: []int64{0, 20, 40, 50, 60, 90},
Values: []float64{0.0, 5.2, 5.4, 2.5, 2.6, 2.9},
})
f("update at the end",
&Result{
Timestamps: []int64{10, 20, 30, 40, 50, 60, 90},
Values: []float64{2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.9},
},
&Result{
Timestamps: []int64{50, 70, 100},
Values: []float64{0.0, 5.7, 5.1},
},
&Result{
Timestamps: []int64{10, 20, 30, 40, 50, 70, 100},
Values: []float64{2.1, 2.2, 2.3, 2.4, 0.0, 5.7, 5.1},
})
f("update at the middle",
&Result{
Timestamps: []int64{10, 20, 30, 40, 50, 60, 90},
Values: []float64{2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.9},
},
&Result{
Timestamps: []int64{30, 40, 50, 60},
Values: []float64{5.3, 5.4, 5.5, 5.6},
},
&Result{
Timestamps: []int64{10, 20, 30, 40, 50, 60, 90},
Values: []float64{2.1, 2.2, 5.3, 5.4, 5.5, 5.6, 2.9},
})
f("merge and re-allocate",
&Result{
Timestamps: []int64{10, 20, 30, 50, 60, 90},
Values: []float64{1.1, 1.2, 1.3, 1.4, 1.5, 1.6},
},
&Result{
Timestamps: []int64{20, 30, 35, 45, 50, 55, 60},
Values: []float64{2.0, 2.3, 2.35, 2.45, 2.5, 2.55, 2.6},
},
&Result{
Timestamps: []int64{10, 20, 30, 35, 45, 50, 55, 60, 90},
Values: []float64{1.1, 2.0, 2.3, 2.35, 2.45, 2.50, 2.55, 2.6, 1.6},
})
}

View File

@@ -0,0 +1 @@
{"metric":{"__name__":"promhttp_metric_handler_requests_total","job":"node_exporter-6","up":"true"},"values":[131],"timestamps":[1676050756785]}

View File

@@ -80,7 +80,6 @@ publish-via-docker:
--build-arg root_image=$(ROOT_IMAGE) \
--build-arg APP_NAME=$(APP_NAME) \
--tag $(DOCKER_NAMESPACE)/$(APP_NAME):$(PKG_TAG)$(RACE) \
--tag $(DOCKER_NAMESPACE)/$(APP_NAME):$(LATEST_TAG)$(RACE) \
-o type=image \
--provenance=false \
-f app/$(APP_NAME)/multiarch/Dockerfile \

View File

@@ -26,7 +26,17 @@ Metrics of the latest version of VictoriaMetrics cluster are available for viewi
The sandbox cluster installation is running under the constant load generated by
[prometheus-benchmark](https://github.com/VictoriaMetrics/prometheus-benchmark) and used for testing latest releases.
## tip
## v1.93.x long-term support release (LTS)
## v1.93.1 long-term support release (LTS)
* BUGFIX: [storage](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html): properly set next retention time for indexDB. Previously it may enter into endless retention loop. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4873) for details.
* BUGFIX: do not allow starting VictoriaMetrics components with improperly set boolean command-line flags in the form `-boolFlagName value`, since this leads to silent incomplete flags' parsing. This form should be replaced with `-boolFlagName=value`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4845).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set labels from `-remoteWrite.label` command-line flag just before sending samples to the configured `-remoteWrite.url` according to [these docs](https://docs.victoriametrics.com/vmagent.html#adding-labels-to-metrics). Previously these labels were incorrectly set before [the relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) configured via `-remoteWrite.urlRelabelConfigs` and [the stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) configured via `-remoteWrite.streamAggr.config`, so these labels could be lost or incorrectly transformed before sending the samples to remote storage. The fix allows using `-remoteWrite.label` for identifying `vmagent` instances in [cluster mode](https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247) and [these docs](https://docs.victoriametrics.com/stream-aggregation.html#cluster-mode) for more details.
* BUGFIX: remove `DEBUG` logging when parsing `if` filters inside [relabeling rules](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) and when parsing `match` filters inside [stream aggregation rules](https://docs.victoriametrics.com/stream-aggregation.html).
* BUGFIX: properly replace `:` chars in label names with `_` when `-usePromCompatibleNaming` command-line flag is passed to `vmagent`, `vminsert` or single-node VictoriaMetrics. This addresses [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113#issuecomment-1275077071).
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): correctly check if specified `-dst` belongs to specified `-storageDataPath`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4837).
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): don't interrupt the migration process if no metrics were found for a specific tenant. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4796).
* SECURITY: upgrade base docker image (Alpine) from 3.18.4 to 3.18.5. See [alpine 3.18.5 release notes](https://www.alpinelinux.org/posts/Alpine-3.15.11-3.16.8-3.17.6-3.18.5-released.html).

View File

@@ -721,6 +721,48 @@ By default, `vminsert` tries to route all the samples for a single time series t
* when `vmstorage` nodes are temporarily unavailable (for instance, during their restart). Then new samples are re-routed to the remaining available `vmstorage` nodes;
* when `vmstorage` node has no enough capacity for processing incoming data stream. Then `vminsert` re-routes new samples to other `vmstorage` nodes.
## Alter/Update series
VictoriaMetrics supports data modification and updates with the following limitations:
- modified data cannot be changed with back-filling.
- modified data must be sent to the `vminsert` component.
- only the JSON line format is supported.
How it works:
* Export the series to be modified with `/prometheus/api/v1/export` from `vmselect`.
* Modify the timestamps and values as needed. You can delete, add or modify timestamps and values.
* Send the modified series to `vminsert`'s `/prometheus/api/v1/update/series` API with a POST request (see the example request after this list).
* `vminsert` adds a unique, monotonically increasing `__generation_id` label to each series during the update, so their samples can replace the original samples.
* At query time `vmselect` merges the original series with the series updates sequentially according to their `__generation_id`.
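A minimal example of such an update request, assuming the modified series are saved in JSON line format to a local file (the `modified_series.txt` name below is just an illustration):
```bash
# push the modified series to vminsert; vminsert assigns a new __generation_id automatically
curl http://localhost:8480/insert/0/prometheus/api/v1/update/series -T modified_series.txt
```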
How vmselect merges updates (a minimal code sketch of the merge rule follows the examples):
* example 1 - delete timestamps in a time range
original series timestamps: [10,20,30,40,50] values: [1,2,3,4,5]
modified series timestamps: [20,50] values: [2,5]
query result series timestamps: [10,20,50] values: [1,2,5]
* example 2 - modify values
original series timestamps: [10,20,30,40,50] values: [1,2,3,4,5]
modified series timestamps: [20,30,40] values: [22,33,44]
query result series timestamps: [10,20,30,40,50] values: [1,22,33,44,5]
* example 3 - modify timestamps
original series timestamps: [10,20,30,40,50] values: [1,2,3,4,5]
modified series timestamps: [0,5,10,15,20,25,30] values: [0.1,0.5,1,1.5,2,2.5,30]
query result series timestamps: [0,5,10,15,20,25,30,40,50] values: [0.1,0.5,1,1.5,2,2.5,30,4,5]
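The merge rule illustrated above can be summarized as: all original samples whose timestamps fall inside the update's `[first, last]` timestamp range are replaced by the update's samples, while samples outside that range are kept. A minimal Go sketch of this rule (an illustration only, not the actual vmselect implementation):
```go
package main

import "fmt"

// mergeSeries applies the update samples to the original series:
// original samples inside [updTs[0], updTs[len-1]] are dropped and
// replaced by the update samples; samples outside that range are kept.
func mergeSeries(dstTs []int64, dstVs []float64, updTs []int64, updVs []float64) ([]int64, []float64) {
	if len(updTs) == 0 {
		return dstTs, dstVs
	}
	first, last := updTs[0], updTs[len(updTs)-1]
	var ts []int64
	var vs []float64
	for i, t := range dstTs {
		if t < first {
			ts = append(ts, t)
			vs = append(vs, dstVs[i])
		}
	}
	ts = append(ts, updTs...)
	vs = append(vs, updVs...)
	for i, t := range dstTs {
		if t > last {
			ts = append(ts, t)
			vs = append(vs, dstVs[i])
		}
	}
	return ts, vs
}

func main() {
	// example 1 from above: delete timestamps in a time range
	ts, vs := mergeSeries([]int64{10, 20, 30, 40, 50}, []float64{1, 2, 3, 4, 5}, []int64{20, 50}, []float64{2, 5})
	fmt.Println(ts, vs) // [10 20 50] [1 2 5]
}
```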
How to check which series were modified:
* execute a metrics export request with the following query params:
* `reduce_mem_usage=true`
* `extra_filters={__generation_id!=""}`
Example request
```bash
curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total",type="datadog"}' -d 'reduce_mem_usage=true' -d 'extra_filters={__generation_id!=""}'
```
example output:
```json
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"datadog","instance":"localhost:8429","__generation_id":"1658153678907548000"},"values":[0,0,0,0,256,253,135,15],"timestamps":[1658153559703,1658153569703,1658153579703,1658153589703,1658153599703,1658153609703,1658153619703,1658153621703]}
```
There was a single series modification request with `__generation_id="1658153678907548000"`
## Backups

View File

@@ -0,0 +1,138 @@
# Series modification
## Scenario
VictoriaMetrics doesn't support direct data modification, since it uses immutable data structures and such operations may significantly reduce system performance.
The new series update API provides a workaround for this issue. The API allows overriding existing time series data points at query time during select requests.
The following operations are supported:
- add data points.
- remove data points.
- modify data points.
Note that this is a low-level feature; in future releases data modification may be performed via scripts, vmctl, or `VMUI`.
## Examples
### Setup env
It is expected that you have a configured VictoriaMetrics cluster, with the vminsert and vmselect components reachable from your computer.
I'll work with the following data set, which was exported with a call to the export API:
```text
curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total"}' -d 'start=1658164769' -d 'end=1658165291'
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdbhttp","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"vmimport","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"native","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"graphite","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"csvimport","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"promremotewrite","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"datadog","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
```
For better usability, the data can be exported to a file on disk and modified with a preferred text editor.
### Modify data points
#### change values
Let's say that during ingestion an error happened and the producer incorrectly ingested the value `0` at timestamp 1658164969982 for the `prometheus` and `influx` types:
```text
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]}
```
We have to modify these values to the correct `0` and send an update request to the `vminsert` API:
```text
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429"},"values":[0],"timestamps":[1658164969982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0],"timestamps":[1658164969982]}
```
Data points can also be updated over a time range if the exact timestamp is not known. For instance, the timestamp range [1658164969972,1658164969982,1658164969992] overwrites values for the given timestamps and drops any other timestamps within this time range.
Save the 2 series above into the file `incorrect_values_modification.txt` and execute the API request with the curl command:
```text
curl localhost:8480/insert/0/prometheus/api/v1/update/series -T incorrect_values_modification.txt
```
Check series modification output:
```text
curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total",type=~"prometheus|influx"}' -d 'start=1658164969' -d 'end=1658164989'
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429"},"values":[0,0,0],"timestamps":[1658164969982,1658164979982,1658164987982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0],"timestamps":[1658164969982,1658164979982,1658164987982]}
```
#### Add missing timestamps
Missing timestamps can be added in the same way: specify the needed timestamps and values at the correct array indexes.
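For instance, a hypothetical update that adds a single missing sample with value `0` at timestamp `1658165271982` (a timestamp not present in the exported data set above) to the `prometheus` series could look like this:
```text
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429"},"values":[0],"timestamps":[1658165271982]}
```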
### Delete data points at time range
The example data set covers the time range from `1658164969982` to `1658165261982`.
Data points inside a time range can be removed by omitting the timestamps that must be removed.
For example, if timestamps from `1658164999982` until `1658165099982` must be removed, skip all timestamps between them:
```text
# exclude variant
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdbhttp","instance":"localhost:8429"},"values":[0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165099982,1658165109982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165099982,1658165109982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"vmimport","instance":"localhost:8429"},"values":[0,0,0,0],"timestamps":[1658164989982,16581649999821658165099982,1658165109982]}
# include variant
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdbhttp","instance":"localhost:8429"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"vmimport","instance":"localhost:8429"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
```
Save one of the variants into the file `delete_datapoints_range.txt` and execute the following request to the API:
```text
curl localhost:8480/insert/0/prometheus/api/v1/update/series -T delete_datapoints_range.txt
```
Check output:
```text
curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total",type=~"influx|opentsdb"}' -d 'start=1658164989' -d 'end=1658165209'
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165207982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164989982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165207982]}
```
As you can see, the series with the `opentsdb` type has fewer data points than `influx`, since data was deleted in the given time range.
### Observing changes
Changes can be checked with an export API request using the special query params `reduce_mem_usage=true` and `extra_filters={__generation_id!=""}`.
Let's observe the changes from the previous steps:
```text
curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total"}' -d 'reduce_mem_usage=true' -d 'extra_filters={__generation_id!=""}'
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429","__generation_id":"1658166029893830000"},"values":[0,0],"timestamps":[1658164969982,1658164979982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429","__generation_id":"1658166029893830000"},"values":[0,0],"timestamps":[1658164969982,1658164979982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"vmimport","instance":"localhost:8429","__generation_id":"1658167040791371000"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429","__generation_id":"1658167040791371000"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdbhttp","instance":"localhost:8429","__generation_id":"1658167040791371000"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
```
### Rollback update operations
Changes can be undone with the metrics delete API; you have to specify the correct `__generation_id`.
For example, roll back the timestamps deletion:
```text
curl http://localhost:8481/delete/0/prometheus/api/v1/admin/tsdb/delete_series -g -d 'match={__generation_id="1658167040791371000"}'
```
Check that changes were rolled back:
```text
curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total",type=~"influx|opentsdb"}' -d 'start=1658164989' -d 'end=1658165209'
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165207982]}
{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165207982]}
```

View File

@@ -284,6 +284,28 @@ func (mn *MetricName) RemoveTag(tagKey string) {
}
}
// RemoveTagWithResult removes a tag with the given tagKey and returns the removed Tag, or nil if no such tag is found.
func (mn *MetricName) RemoveTagWithResult(tagKey string) *Tag {
if tagKey == "__name__" {
mn.ResetMetricGroup()
return nil
}
tags := mn.Tags
mn.Tags = mn.Tags[:0]
var foundTag *Tag
for i := range tags {
tag := &tags[i]
if string(tag.Key) != tagKey {
mn.AddTagBytes(tag.Key, tag.Value)
continue
}
var t Tag
t.copyFrom(tag)
foundTag = &t
}
return foundTag
}
// RemoveTagsIgnoring removes all the tags included in ignoringTags.
func (mn *MetricName) RemoveTagsIgnoring(ignoringTags []string) {
if len(ignoringTags) == 0 {

View File

@@ -57,7 +57,6 @@ type MetricBlockRef struct {
type MetricBlock struct {
// MetricName is metric name for the given Block.
MetricName []byte
// Block is a block for the given MetricName
Block Block
}

View File

@@ -1051,6 +1051,7 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
s.metricRowsRead.Add(ctx.mb.Block.RowsCount())
ctx.dataBuf = ctx.mb.Marshal(ctx.dataBuf[:0])
if err := ctx.writeDataBufBytes(); err != nil {
return fmt.Errorf("cannot send MetricBlock: %w", err)
}