lib/pushmetrics: allow enabling push metrics via config

This is needed to allow using lib/pushmetrics from vmctl, which does not use Go's native flag package.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

app/vmctl: add metrics for the migrations

- add flags to allow setting up metrics push
- add metrics to track progress of the migration for all modes
- add metrics for generic backoff and limiter packages

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab
2026-01-12 19:42:00 +04:00
committed by Max Kotliar
parent f9895d7e5e
commit ad4562cd56
14 changed files with 305 additions and 7 deletions

View File

@@ -7,6 +7,8 @@ import (
"math"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -55,6 +57,7 @@ func (b *Backoff) Retry(ctx context.Context, cb retryableFunc) (uint64, error) {
return attempt, err // fail fast if not recoverable
}
attempt++
retriesTotal.Inc()
backoff := float64(b.minDuration) * math.Pow(b.factor, float64(i))
dur := time.Duration(backoff)
logger.Errorf("got error: %s on attempt: %d; will retry in %v", err, attempt, dur)
@@ -74,3 +77,7 @@ func (b *Backoff) Retry(ctx context.Context, cb retryableFunc) (uint64, error) {
}
return attempt, fmt.Errorf("execution failed after %d retry attempts", b.retries)
}
// retriesTotal counts retry attempts performed by Backoff.Retry across all operations.
var retriesTotal = metrics.NewCounter(`vmctl_backoff_retries_total`)

View File

@@ -14,6 +14,12 @@ const (
globalSilent = "s"
globalVerbose = "verbose"
globalDisableProgressBar = "disable-progress-bar"
globalPushMetricsURL = "pushmetrics.url"
globalPushMetricsInterval = "pushmetrics.interval"
globalPushExtraLabels = "pushmetrics.extraLabel"
globalPushHeaders = "pushmetrics.header"
globalPushDisableCompression = "pushmetrics.disableCompression"
)
var (
@@ -33,6 +39,29 @@ var (
Value: false,
Usage: "Whether to disable progress bar during the import.",
},
&cli.StringSliceFlag{
Name: globalPushMetricsURL,
Usage: "Optional URL to push metrics. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#push-metrics",
},
&cli.DurationFlag{
Name: globalPushMetricsInterval,
Value: 10 * time.Second,
Usage: "Interval for pushing metrics to every -pushmetrics.url",
},
&cli.StringSliceFlag{
Name: globalPushExtraLabels,
Usage: "Extra labels to add to pushed metrics. In case of collision, label value defined by flag will have priority. " +
"Flag can be set multiple times, to add few additional labels. " +
"For example, -pushmetrics.extraLabel='instance=\"foo\"' adds instance=\"foo\" label to all the metrics pushed to every -pushmetrics.url",
},
&cli.StringSliceFlag{
Name: globalPushHeaders,
Usage: "Optional HTTP headers to add to pushed metrics. Flag can be set multiple times, to add few additional headers.",
},
&cli.BoolFlag{
Name: globalPushDisableCompression,
Usage: "Whether to disable compression when pushing metrics.",
},
}
)

View File

@@ -7,6 +7,8 @@ import (
"log"
"sync"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
@@ -52,6 +54,7 @@ func (ip *influxProcessor) run(ctx context.Context) error {
return nil
}
influxSeriesTotal.Add(len(series))
bar := barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing series"), len(series))
if err := barpool.Start(); err != nil {
return err
@@ -67,9 +70,11 @@ func (ip *influxProcessor) run(ctx context.Context) error {
wg.Go(func() {
for s := range seriesCh {
if err := ip.do(s); err != nil {
influxErrorsTotal.Inc()
errCh <- fmt.Errorf("request failed for %q.%q: %s", s.Measurement, s.Field, err)
return
}
influxSeriesProcessed.Inc()
bar.Increment()
}
})
@@ -81,6 +86,7 @@ func (ip *influxProcessor) run(ctx context.Context) error {
case infErr := <-errCh:
return fmt.Errorf("influx error: %s", infErr)
case vmErr := <-ip.im.Errors():
influxErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose))
case seriesCh <- s:
}
@@ -93,6 +99,7 @@ func (ip *influxProcessor) run(ctx context.Context) error {
// drain import errors channel
for vmErr := range ip.im.Errors() {
if vmErr.Err != nil {
influxErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose))
}
}
@@ -167,3 +174,9 @@ func (ip *influxProcessor) do(s *influx.Series) error {
}
}
}
// influxSeriesTotal counts InfluxDB series discovered for migration.
var influxSeriesTotal = metrics.NewCounter(`vmctl_influx_migration_series_total`)

// influxSeriesProcessed counts series that were migrated successfully.
var influxSeriesProcessed = metrics.NewCounter(`vmctl_influx_migration_series_processed`)

// influxErrorsTotal counts errors encountered during the InfluxDB migration.
var influxErrorsTotal = metrics.NewCounter(`vmctl_influx_migration_errors_total`)

View File

@@ -4,6 +4,8 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
)
@@ -45,9 +47,16 @@ func (l *Limiter) Register(dataLen int) {
t := timerpool.Get(d)
<-t.C
timerpool.Put(t)
limiterThrottleEventsTotal.Inc()
}
l.budget += limit
l.deadline = time.Now().Add(time.Second)
}
l.budget -= int64(dataLen)
limiterBytesProcessed.Add(dataLen)
}
// limiterBytesProcessed counts bytes registered with the rate limiter.
var limiterBytesProcessed = metrics.NewCounter(`vmctl_limiter_bytes_processed_total`)

// limiterThrottleEventsTotal counts the times Register had to wait for the budget to refill.
var limiterThrottleEventsTotal = metrics.NewCounter(`vmctl_limiter_throttle_events_total`)

View File

@@ -2,6 +2,7 @@ package main
import (
"context"
"flag"
"fmt"
"log"
"net/http"
@@ -19,7 +20,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/opentsdb"
@@ -41,11 +44,20 @@ func main() {
ctx, cancelCtx := context.WithCancel(context.Background())
start := time.Now()
beforeFn := func(c *cli.Context) error {
flag.Parse()
logger.Init()
isSilent = c.Bool(globalSilent)
if c.Bool(globalDisableProgressBar) {
barpool.Disable(true)
}
netutil.EnableIPv6()
pushmetrics.InitWith(&pushmetrics.Config{
URLs: c.StringSlice(globalPushMetricsURL),
Interval: c.Duration(globalPushMetricsInterval),
ExtraLabels: c.StringSlice(globalPushExtraLabels),
DisableCompression: c.Bool(globalPushDisableCompression),
Headers: c.StringSlice(globalPushHeaders),
})
return nil
}
app := &cli.App{
@@ -451,6 +463,7 @@ func main() {
log.Fatalln(err)
}
log.Printf("Total time: %v", time.Since(start))
pushmetrics.StopAndPush()
}
func initConfigVM(c *cli.Context) (vm.Config, error) {

View File

@@ -8,6 +8,8 @@ import (
"net/http"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth"
)
@@ -36,12 +38,15 @@ type Response struct {
// Explore finds metric names by provided filter from api/v1/label/__name__/values
func (c *Client) Explore(ctx context.Context, f Filter, tenantID string, start, end time.Time) ([]string, error) {
startTime := time.Now()
exploreRequestsTotal.Inc()
url := fmt.Sprintf("%s/%s", c.Addr, nativeMetricNamesAddr)
if tenantID != "" {
url = fmt.Sprintf("%s/select/%s/prometheus/%s", c.Addr, tenantID, nativeMetricNamesAddr)
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
exploreRequestsErrorsTotal.Inc()
return nil, fmt.Errorf("cannot create request to %q: %s", url, err)
}
@@ -53,37 +58,53 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string, start,
resp, err := c.do(req, http.StatusOK)
if err != nil {
exploreRequestsErrorsTotal.Inc()
exploreDuration.UpdateDuration(startTime)
return nil, fmt.Errorf("series request failed: %s", err)
}
var response Response
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
exploreRequestsErrorsTotal.Inc()
exploreDuration.UpdateDuration(startTime)
return nil, fmt.Errorf("cannot decode series response: %s", err)
}
exploreDuration.UpdateDuration(startTime)
return response.MetricNames, resp.Body.Close()
}
// ImportPipe uses pipe reader in request to process data
func (c *Client) ImportPipe(ctx context.Context, dstURL string, pr *io.PipeReader) error {
startTime := time.Now()
importRequestsTotal.Inc()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, dstURL, pr)
if err != nil {
importRequestsErrorsTotal.Inc()
return fmt.Errorf("cannot create import request to %q: %s", c.Addr, err)
}
importResp, err := c.do(req, http.StatusNoContent)
if err != nil {
importRequestsErrorsTotal.Inc()
importDuration.UpdateDuration(startTime)
return fmt.Errorf("import request failed: %s", err)
}
if err := importResp.Body.Close(); err != nil {
importRequestsErrorsTotal.Inc()
importDuration.UpdateDuration(startTime)
return fmt.Errorf("cannot close import response body: %s", err)
}
importDuration.UpdateDuration(startTime)
return nil
}
// ExportPipe makes request by provided filter and return io.ReadCloser which can be used to get data
func (c *Client) ExportPipe(ctx context.Context, url string, f Filter) (io.ReadCloser, error) {
startTime := time.Now()
exportRequestsTotal.Inc()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
exportRequestsErrorsTotal.Inc()
return nil, fmt.Errorf("cannot create request to %q: %s", c.Addr, err)
}
@@ -102,8 +123,11 @@ func (c *Client) ExportPipe(ctx context.Context, url string, f Filter) (io.ReadC
resp, err := c.do(req, http.StatusOK)
if err != nil {
exportRequestsErrorsTotal.Inc()
exportDuration.UpdateDuration(startTime)
return nil, fmt.Errorf("export request failed: %w", err)
}
exportDuration.UpdateDuration(startTime)
return resp.Body, nil
}
@@ -162,3 +186,16 @@ func (c *Client) do(req *http.Request, expSC int) (*http.Response, error) {
}
return resp, err
}
var (
	// Per-type request counters for calls made by Client (import/export/explore).
	importRequestsTotal  = metrics.NewCounter(`vmctl_vm_native_requests_total{type="import"}`)
	exportRequestsTotal  = metrics.NewCounter(`vmctl_vm_native_requests_total{type="export"}`)
	exploreRequestsTotal = metrics.NewCounter(`vmctl_vm_native_requests_total{type="explore"}`)

	// Per-type error counters, incremented on request-creation, transport and decode failures.
	importRequestsErrorsTotal  = metrics.NewCounter(`vmctl_vm_native_request_errors_total{type="import"}`)
	exportRequestsErrorsTotal  = metrics.NewCounter(`vmctl_vm_native_request_errors_total{type="export"}`)
	exploreRequestsErrorsTotal = metrics.NewCounter(`vmctl_vm_native_request_errors_total{type="explore"}`)

	// Request duration histograms, updated via UpdateDuration on both success and failure paths.
	importDuration  = metrics.NewHistogram(`vmctl_vm_native_import_duration_seconds`)
	exportDuration  = metrics.NewHistogram(`vmctl_vm_native_export_duration_seconds`)
	exploreDuration = metrics.NewHistogram(`vmctl_vm_native_explore_duration_seconds`)
)

View File

@@ -7,6 +7,8 @@ import (
"sync"
"time"
vmetrics "github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/cheggaaa/pb/v3"
@@ -57,6 +59,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
if !prompt(ctx, question) {
return nil
}
op.im.ResetStats()
var startTime int64
if op.oc.HardTS != 0 {
@@ -84,6 +87,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
seriesCh := make(chan queryObj, op.otsdbcc)
errCh := make(chan error)
// we're going to make serieslist * queryRanges queries, so we should represent that in the progress bar
otsdbSeriesTotal.Add(len(serieslist) * queryRanges)
bar := pb.StartNew(len(serieslist) * queryRanges)
defer func(bar *pb.ProgressBar) {
bar.Finish()
@@ -93,9 +97,11 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
wg.Go(func() {
for s := range seriesCh {
if err := op.do(s); err != nil {
otsdbErrorsTotal.Inc()
errCh <- fmt.Errorf("couldn't retrieve series for %s : %s", metric, err)
return
}
otsdbSeriesProcessed.Inc()
bar.Increment()
}
})
@@ -115,6 +121,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
case otsdbErr := <-errCh:
return fmt.Errorf("opentsdb error: %s", otsdbErr)
case vmErr := <-op.im.Errors():
otsdbErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
case seriesCh <- queryObj{
Tr: tr, StartTime: startTime,
@@ -139,6 +146,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
op.im.Close()
for vmErr := range op.im.Errors() {
if vmErr.Err != nil {
otsdbErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
}
}
@@ -169,3 +177,9 @@ func (op *otsdbProcessor) do(s queryObj) error {
}
return op.im.Input(&ts)
}
// otsdbSeriesTotal counts the series*queryRanges work items planned for the OpenTSDB migration.
var otsdbSeriesTotal = vmetrics.NewCounter(`vmctl_opentsdb_migration_series_total`)

// otsdbSeriesProcessed counts work items that were migrated successfully.
var otsdbSeriesProcessed = vmetrics.NewCounter(`vmctl_opentsdb_migration_series_processed`)

// otsdbErrorsTotal counts errors encountered during the OpenTSDB migration.
var otsdbErrorsTotal = vmetrics.NewCounter(`vmctl_opentsdb_migration_errors_total`)

View File

@@ -11,6 +11,8 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
@@ -113,6 +115,7 @@ func (pp *prometheusProcessor) do(b tsdb.BlockReader) error {
}
func (pp *prometheusProcessor) processBlocks(blocks []tsdb.BlockReader) error {
promBlocksTotal.Add(len(blocks))
bar := barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing blocks"), len(blocks))
if err := barpool.Start(); err != nil {
return err
@@ -128,9 +131,11 @@ func (pp *prometheusProcessor) processBlocks(blocks []tsdb.BlockReader) error {
wg.Go(func() {
for br := range blockReadersCh {
if err := pp.do(br); err != nil {
promErrorsTotal.Inc()
errCh <- fmt.Errorf("read failed for block %q: %s", br.Meta().ULID, err)
return
}
promBlocksProcessed.Inc()
bar.Increment()
}
})
@@ -143,6 +148,7 @@ func (pp *prometheusProcessor) processBlocks(blocks []tsdb.BlockReader) error {
return fmt.Errorf("prometheus error: %s", promErr)
case vmErr := <-pp.im.Errors():
close(blockReadersCh)
promErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, pp.isVerbose))
case blockReadersCh <- br:
}
@@ -156,6 +162,7 @@ func (pp *prometheusProcessor) processBlocks(blocks []tsdb.BlockReader) error {
// drain import errors channel
for vmErr := range pp.im.Errors() {
if vmErr.Err != nil {
promErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, pp.isVerbose))
}
}
@@ -165,3 +172,9 @@ func (pp *prometheusProcessor) processBlocks(blocks []tsdb.BlockReader) error {
return nil
}
// promBlocksTotal counts TSDB blocks discovered for the Prometheus migration.
var promBlocksTotal = metrics.NewCounter(`vmctl_prometheus_migration_blocks_total`)

// promBlocksProcessed counts blocks that were migrated successfully.
var promBlocksProcessed = metrics.NewCounter(`vmctl_prometheus_migration_blocks_processed`)

// promErrorsTotal counts errors encountered during the Prometheus migration.
var promErrorsTotal = metrics.NewCounter(`vmctl_prometheus_migration_errors_total`)

View File

@@ -7,6 +7,8 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
@@ -51,6 +53,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
return nil
}
remoteReadRangesTotal.Add(len(ranges))
bar := barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing ranges"), len(ranges))
if err := barpool.Start(); err != nil {
return err
@@ -70,9 +73,11 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
wg.Go(func() {
for r := range rangeC {
if err := rrp.do(ctx, r); err != nil {
remoteReadErrorsTotal.Inc()
errCh <- fmt.Errorf("request failed for: %s", err)
return
}
remoteReadRangesProcessed.Inc()
bar.Increment()
}
})
@@ -83,6 +88,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
case infErr := <-errCh:
return fmt.Errorf("remote read error: %s", infErr)
case vmErr := <-rrp.dst.Errors():
remoteReadErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose))
case rangeC <- &remoteread.Filter{
StartTimestampMs: r[0].UnixMilli(),
@@ -98,6 +104,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
// drain import errors channel
for vmErr := range rrp.dst.Errors() {
if vmErr.Err != nil {
remoteReadErrorsTotal.Inc()
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose))
}
}
@@ -118,3 +125,9 @@ func (rrp *remoteReadProcessor) do(ctx context.Context, filter *remoteread.Filte
return nil
})
}
// remoteReadRangesTotal counts time ranges planned for the remote-read migration.
var remoteReadRangesTotal = metrics.NewCounter(`vmctl_remote_read_migration_ranges_total`)

// remoteReadRangesProcessed counts ranges that were migrated successfully.
var remoteReadRangesProcessed = metrics.NewCounter(`vmctl_remote_read_migration_ranges_processed`)

// remoteReadErrorsTotal counts errors encountered during the remote-read migration.
var remoteReadErrorsTotal = metrics.NewCounter(`vmctl_remote_read_migration_errors_total`)

View File

@@ -12,6 +12,8 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
@@ -80,6 +82,12 @@ type Importer struct {
s *stats
backoff *backoff.Backoff
importRequestsTotal *metrics.Counter
importRequestsErrorsTotal *metrics.Counter
importSamplesTotal *metrics.Counter
importBytesTotal *metrics.Counter
importDuration *metrics.Histogram
}
// ResetStats resets im stats.
@@ -147,6 +155,12 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
input: make(chan *TimeSeries, cfg.Concurrency*4),
errors: make(chan *ImportError, cfg.Concurrency),
backoff: cfg.Backoff,
importRequestsTotal: metrics.GetOrCreateCounter(`vmctl_importer_requests_total`),
importRequestsErrorsTotal: metrics.GetOrCreateCounter(`vmctl_importer_request_errors_total`),
importSamplesTotal: metrics.GetOrCreateCounter(`vmctl_importer_samples_total`),
importBytesTotal: metrics.GetOrCreateCounter(`vmctl_importer_bytes_total`),
importDuration: metrics.GetOrCreateHistogram(`vmctl_importer_request_duration_seconds`),
}
if err := im.Ping(); err != nil {
return nil, fmt.Errorf("ping to %q failed: %s", addr, err)
@@ -311,9 +325,13 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
return nil
}
startTime := time.Now()
im.importRequestsTotal.Inc()
pr, pw := io.Pipe()
req, err := http.NewRequest(http.MethodPost, im.importPath, pr)
if err != nil {
im.importRequestsErrorsTotal.Inc()
return fmt.Errorf("cannot create request to %q: %s", im.addr, err)
}
if im.user != "" {
@@ -333,6 +351,7 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
if im.compress {
zw, err := gzip.NewWriterLevel(w, 1)
if err != nil {
im.importRequestsErrorsTotal.Inc()
return fmt.Errorf("unexpected error when creating gzip writer: %s", err)
}
w = zw
@@ -344,29 +363,39 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
for _, ts := range tsBatch {
n, err := ts.write(bw)
if err != nil {
im.importRequestsErrorsTotal.Inc()
return fmt.Errorf("write err: %w", err)
}
totalBytes += n
totalSamples += len(ts.Values)
}
if err := bw.Flush(); err != nil {
im.importRequestsErrorsTotal.Inc()
return err
}
if closer, ok := w.(io.Closer); ok {
err := closer.Close()
if err != nil {
im.importRequestsErrorsTotal.Inc()
return err
}
}
if err := pw.Close(); err != nil {
im.importRequestsErrorsTotal.Inc()
return err
}
requestErr := <-errCh
if requestErr != nil {
im.importRequestsErrorsTotal.Inc()
im.importDuration.UpdateDuration(startTime)
return fmt.Errorf("import request error for %q: %w", im.addr, requestErr)
}
im.importSamplesTotal.Add(totalSamples)
im.importBytesTotal.Add(totalBytes)
im.importDuration.UpdateDuration(startTime)
im.s.Lock()
im.s.bytes += uint64(totalBytes)
im.s.samples += uint64(totalSamples)

View File

@@ -9,6 +9,8 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
@@ -82,13 +84,19 @@ func (p *vmNativeProcessor) run(ctx context.Context) error {
if !prompt(ctx, question) {
return nil
}
migrationTenantsTotal.Set(uint64(len(tenants)))
}
for _, tenantID := range tenants {
err := p.runBackfilling(ctx, tenantID, ranges)
if err != nil {
migrationErrorsTotal.Inc()
return fmt.Errorf("migration failed: %s", err)
}
if p.interCluster {
migrationTenantsProcessed.Inc()
}
}
log.Println("Import finished!")
@@ -156,6 +164,7 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU
p.s.bytes += uint64(written)
p.s.requests++
p.s.Unlock()
migrationBytesTransferredTotal.AddInt64(written)
if err := pw.Close(); err != nil {
return err
@@ -199,7 +208,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
var foundSeriesMsg string
var requestsToMake int
var metrics = map[string][][]time.Time{
var metricsMap = map[string][][]time.Time{
"": ranges,
}
@@ -211,11 +220,11 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
if !p.disablePerMetricRequests {
format = fmt.Sprintf(nativeWithBackoffTpl, barPrefix)
metrics, err = p.explore(ctx, p.src, tenantID, ranges)
metricsMap, err = p.explore(ctx, p.src, tenantID, ranges)
if err != nil {
return fmt.Errorf("failed to explore metric names: %s", err)
}
if len(metrics) == 0 {
if len(metricsMap) == 0 {
errMsg := "no metrics found"
if tenantID != "" {
errMsg = fmt.Sprintf("%s for tenant id: %s", errMsg, tenantID)
@@ -223,10 +232,14 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
log.Println(errMsg)
return nil
}
for _, m := range metrics {
for _, m := range metricsMap {
requestsToMake += len(m)
}
foundSeriesMsg = fmt.Sprintf("Found %d unique metric names to import. Total import/export requests to make %d", len(metrics), requestsToMake)
foundSeriesMsg = fmt.Sprintf("Found %d unique metric names to import. Total import/export requests to make %d", len(metricsMap), requestsToMake)
migrationMetricsTotal.Add(len(metricsMap))
} else {
requestsToMake = len(ranges)
}
if !p.interCluster {
@@ -240,6 +253,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
log.Print(foundSeriesMsg)
}
migrationRequestsPlanned.Add(requestsToMake)
bar := barpool.NewSingleProgress(format, requestsToMake)
bar.Start()
defer bar.Finish()
@@ -263,12 +277,13 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
return
}
}
migrationRequestsCompleted.Inc()
}
})
}
// any error breaks the import
for mName, mRanges := range metrics {
for mName, mRanges := range metricsMap {
match, err := buildMatchWithFilter(p.filter.Match, mName)
if err != nil {
logger.Errorf("failed to build filter %q for metric name %q: %s", p.filter.Match, mName, err)
@@ -288,6 +303,9 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
}:
}
}
if !p.disablePerMetricRequests {
migrationMetricsProcessed.Inc()
}
}
close(filterCh)
@@ -396,3 +414,18 @@ func buildMatchWithFilter(filter string, metricName string) (string, error) {
match := "{" + strings.Join(filters, " or ") + "}"
return match, nil
}
var (
	// Unique metric names discovered by explore and how many have been fully processed.
	// Only populated when per-metric requests are enabled.
	migrationMetricsTotal     = metrics.NewCounter(`vmctl_vm_native_migration_metrics_total`)
	migrationMetricsProcessed = metrics.NewCounter(`vmctl_vm_native_migration_metrics_processed`)

	// Export/import request pairs planned for the backfilling and how many completed.
	migrationRequestsPlanned   = metrics.NewCounter(`vmctl_vm_native_migration_requests_planned`)
	migrationRequestsCompleted = metrics.NewCounter(`vmctl_vm_native_migration_requests_completed`)

	// Failed per-tenant backfilling runs.
	migrationErrorsTotal = metrics.NewCounter(`vmctl_vm_native_migration_errors_total`)

	// Tenant progress; only populated for inter-cluster migrations.
	migrationTenantsTotal     = metrics.NewCounter(`vmctl_vm_native_migration_tenants_total`)
	migrationTenantsProcessed = metrics.NewCounter(`vmctl_vm_native_migration_tenants_processed`)

	// Bytes written to the destination during runSingle.
	migrationBytesTransferredTotal = metrics.NewCounter(`vmctl_vm_native_migration_bytes_transferred_total`)
)

View File

@@ -35,7 +35,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: All VictoriaMetrics components: add build version information to the home page for consistency with other projects. See [#10249](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10249).
* FEATURE: all VictoriaMetrics components: add flag `fs.disableMincore`, which allows to disable `mincore` syscall. See [#10327](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10327).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): expose topN average memory bytes consumption queries in `/api/v1/status/top_queries`. It can help users to find queries that consume a lot of memory and potentially cause OOM. See [#9330](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9330).
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add metrics for tracking the migration progress. See [vmctl - monitoring the migration process](https://docs.victoriametrics.com/victoriametrics/vmctl/#monitoring-the-migration-process) and [#10276](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10276).
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): stop backend health checks for URL prefixes defined in `url_map` during configuration reloads. Previously, stale backends kept being health-checked and produced repeated warning logs after reloads. See [#10334](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10334).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly return [/api/v1/status/tsdb](https://docs.victoriametrics.com/victoriametrics/#tsdb-stats) response for time range outside [partition index](https://docs.victoriametrics.com/victoriametrics/#indexdb). See [#10315](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10315).

View File

@@ -169,6 +169,71 @@ see `--vm-concurrency` flag.
Please note, you can also use [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/)
as a proxy between `vmctl` and destination with `-remoteWrite.rateLimit` flag enabled.
### Monitoring the migration process
`vmctl` can push internal metrics {{% available_from "#" %}} to remote storage for monitoring migration progress and performance.
This is especially useful for long-running migrations where you want to track progress, detect issues,
or build dashboards to visualize the migration status.
Example usage with VictoriaMetrics as the metrics destination:
```sh
./vmctl influx \
--influx-addr=http://localhost:8086 \
--influx-database=mydb \
--vm-addr=http://localhost:8428 \
--pushmetrics.url=http://localhost:8428/api/v1/import/prometheus \
--pushmetrics.extraLabel='job="vmctl"' \
--pushmetrics.extraLabel='instance="migration-1"'
```
#### Available metrics
The following metrics are exposed by `vmctl`:
General metrics (available for all migration modes):
| Metric | Description |
|--------|-------------|
| `vmctl_backoff_retries_total` | Total number of retry attempts across all operations |
| `vmctl_limiter_bytes_processed_total` | Total bytes processed through rate limiter (when `--vm-rate-limit` is set) |
| `vmctl_limiter_throttle_events_total` | Number of times rate limiting caused a pause |
Mode-specific metrics:
Each migration mode exposes its own set of metrics with the mode name embedded in the metric name:
| Mode | Metrics |
|------|---------|
| `influx` | `vmctl_influx_migration_series_total`, `vmctl_influx_migration_series_processed`, `vmctl_influx_migration_errors_total` |
| `prometheus` | `vmctl_prometheus_migration_blocks_total`, `vmctl_prometheus_migration_blocks_processed`, `vmctl_prometheus_migration_errors_total` |
| `opentsdb` | `vmctl_opentsdb_migration_series_total`, `vmctl_opentsdb_migration_series_processed`, `vmctl_opentsdb_migration_errors_total` |
| `remote-read` | `vmctl_remote_read_migration_ranges_total`, `vmctl_remote_read_migration_ranges_processed`, `vmctl_remote_read_migration_errors_total` |
| `vm-native` | `vmctl_vm_native_migration_metrics_total`, `vmctl_vm_native_migration_metrics_processed`, `vmctl_vm_native_migration_requests_planned`, `vmctl_vm_native_migration_requests_completed`, `vmctl_vm_native_migration_tenants_total`, `vmctl_vm_native_migration_tenants_processed`, `vmctl_vm_native_migration_bytes_transferred_total`, `vmctl_vm_native_migration_errors_total` |
#### Example PromQL queries
Monitor migration progress:
```promql
# Migration completion percentage for influx mode
vmctl_influx_migration_series_processed / vmctl_influx_migration_series_total * 100
# Migration completion percentage for vm-native mode
vmctl_vm_native_migration_metrics_processed / vmctl_vm_native_migration_metrics_total * 100
# Retry rate
rate(vmctl_backoff_retries_total[5m])
# Rate limiter throttling events per second
rate(vmctl_limiter_throttle_events_total[5m])
# Data transfer speed in bytes per second (when rate limiting is enabled)
rate(vmctl_limiter_bytes_processed_total[5m])
# Data transfer speed in MB per second for vm-native mode
rate(vmctl_vm_native_migration_bytes_transferred_total[5m]) / 1Mb
```
## Verifying exported blocks from VictoriaMetrics
In this mode, `vmctl` allows verifying correctness and integrity of data exported via

View File

@@ -50,6 +50,29 @@ func Init() {
}
}
// Config holds push-metrics settings that are normally supplied via
// command-line flags. It allows configuring pushmetrics programmatically.
type Config struct {
	// URLs is the list of destinations to push metrics to.
	URLs []string
	// Interval is the period between pushes.
	// Non-positive values keep the default push interval.
	Interval time.Duration
	// ExtraLabels are labels added to every pushed metric.
	ExtraLabels []string
	// Headers are extra HTTP headers to send with each push request.
	Headers []string
	// DisableCompression disables compression of pushed metrics.
	DisableCompression bool
}
// InitWith must be called after logger.Init.
// It supersedes the pushmetrics-related command-line flag values with the
// values from c and then starts the periodic metrics push via Init.
// This is needed for vmctl integration, see: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10375
func InitWith(c *Config) {
	*pushURL = c.URLs
	*pushHeader = c.Headers
	*pushExtraLabel = c.ExtraLabels
	*disableCompression = c.DisableCompression
	// A non-positive interval keeps the default value of the push interval flag.
	if c.Interval > 0 {
		*pushInterval = c.Interval
	}
	Init()
}
// Stop stops the periodic push of metrics.
// It is important to stop the push of metrics before disposing resources
// these metrics attached to. See related https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5548