mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Compare commits
2 Commits
detached
...
vmctl-prop
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a07f4078de | ||
|
|
de96a937dd |
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
@@ -37,7 +38,7 @@ func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator st
|
||||
}
|
||||
}
|
||||
|
||||
func (ip *influxProcessor) run() error {
|
||||
func (ip *influxProcessor) run(ctx context.Context) error {
|
||||
series, err := ip.ic.Explore()
|
||||
if err != nil {
|
||||
return fmt.Errorf("explore query failed: %s", err)
|
||||
@@ -67,7 +68,7 @@ func (ip *influxProcessor) run() error {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for s := range seriesCh {
|
||||
if err := ip.do(s); err != nil {
|
||||
if err := ip.do(ctx, s); err != nil {
|
||||
errCh <- fmt.Errorf("request failed for %q.%q: %s", s.Measurement, s.Field, err)
|
||||
return
|
||||
}
|
||||
@@ -110,7 +111,7 @@ const dbLabel = "db"
|
||||
const nameLabel = "__name__"
|
||||
const valueField = "value"
|
||||
|
||||
func (ip *influxProcessor) do(s *influx.Series) error {
|
||||
func (ip *influxProcessor) do(ctx context.Context, s *influx.Series) error {
|
||||
cr, err := ip.ic.FetchDataPoints(s)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch datapoints: %s", err)
|
||||
@@ -163,7 +164,7 @@ func (ip *influxProcessor) do(s *influx.Series) error {
|
||||
Timestamps: time,
|
||||
Values: values,
|
||||
}
|
||||
if err := ip.im.Input(&ts); err != nil {
|
||||
if err := ip.im.Input(ctx, &ts); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,7 +97,7 @@ func main() {
|
||||
}
|
||||
|
||||
otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency), c.Bool(globalVerbose))
|
||||
return otsdbProcessor.run()
|
||||
return otsdbProcessor.run(ctx)
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -158,7 +158,7 @@ func main() {
|
||||
c.Bool(influxSkipDatabaseLabel),
|
||||
c.Bool(influxPrometheusMode),
|
||||
c.Bool(globalVerbose))
|
||||
return processor.run()
|
||||
return processor.run(ctx)
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -261,7 +261,7 @@ func main() {
|
||||
cc: c.Int(promConcurrency),
|
||||
isVerbose: c.Bool(globalVerbose),
|
||||
}
|
||||
return pp.run()
|
||||
return pp.run(ctx)
|
||||
},
|
||||
},
|
||||
{
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cheggaaa/pb/v3"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/opentsdb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
||||
"github.com/cheggaaa/pb/v3"
|
||||
)
|
||||
|
||||
type otsdbProcessor struct {
|
||||
@@ -37,7 +39,7 @@ func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int, verbos
|
||||
}
|
||||
}
|
||||
|
||||
func (op *otsdbProcessor) run() error {
|
||||
func (op *otsdbProcessor) run(ctx context.Context) error {
|
||||
log.Println("Loading all metrics from OpenTSDB for filters: ", op.oc.Filters)
|
||||
var metrics []string
|
||||
for _, filter := range op.oc.Filters {
|
||||
@@ -93,7 +95,7 @@ func (op *otsdbProcessor) run() error {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for s := range seriesCh {
|
||||
if err := op.do(s); err != nil {
|
||||
if err := op.do(ctx, s); err != nil {
|
||||
errCh <- fmt.Errorf("couldn't retrieve series for %s : %s", metric, err)
|
||||
return
|
||||
}
|
||||
@@ -148,7 +150,7 @@ func (op *otsdbProcessor) run() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (op *otsdbProcessor) do(s queryObj) error {
|
||||
func (op *otsdbProcessor) do(ctx context.Context, s queryObj) error {
|
||||
start := s.StartTime - s.Tr.Start
|
||||
end := s.StartTime - s.Tr.End
|
||||
data, err := op.oc.GetData(s.Series, s.Rt, start, end, op.oc.MsecsTime)
|
||||
@@ -168,5 +170,5 @@ func (op *otsdbProcessor) do(s queryObj) error {
|
||||
Timestamps: data.Timestamps,
|
||||
Values: data.Values,
|
||||
}
|
||||
return op.im.Input(&ts)
|
||||
return op.im.Input(ctx, &ts)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
@@ -30,7 +31,7 @@ type prometheusProcessor struct {
|
||||
isVerbose bool
|
||||
}
|
||||
|
||||
func (pp *prometheusProcessor) run() error {
|
||||
func (pp *prometheusProcessor) run(ctx context.Context) error {
|
||||
blocks, err := pp.cl.Explore()
|
||||
if err != nil {
|
||||
return fmt.Errorf("explore failed: %s", err)
|
||||
@@ -59,7 +60,7 @@ func (pp *prometheusProcessor) run() error {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for br := range blockReadersCh {
|
||||
if err := pp.do(br); err != nil {
|
||||
if err := pp.do(ctx, br); err != nil {
|
||||
errCh <- fmt.Errorf("read failed for block %q: %s", br.Meta().ULID, err)
|
||||
return
|
||||
}
|
||||
@@ -100,7 +101,7 @@ func (pp *prometheusProcessor) run() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pp *prometheusProcessor) do(b tsdb.BlockReader) error {
|
||||
func (pp *prometheusProcessor) do(ctx context.Context, b tsdb.BlockReader) error {
|
||||
ss, err := pp.cl.Read(b)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read block: %s", err)
|
||||
@@ -150,7 +151,7 @@ func (pp *prometheusProcessor) do(b tsdb.BlockReader) error {
|
||||
Timestamps: timestamps,
|
||||
Values: values,
|
||||
}
|
||||
if err := pp.im.Input(&ts); err != nil {
|
||||
if err := pp.im.Input(ctx, &ts); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -160,7 +160,7 @@ func TestPrometheusProcessorRun(t *testing.T) {
|
||||
go tt.fields.closer(importer)
|
||||
}
|
||||
|
||||
if err := pp.run(); (err != nil) != tt.wantErr {
|
||||
if err := pp.run(context.Background()); (err != nil) != tt.wantErr {
|
||||
t.Fatalf("run() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
|
||||
@@ -112,7 +112,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
|
||||
|
||||
func (rrp *remoteReadProcessor) do(ctx context.Context, filter *remoteread.Filter) error {
|
||||
return rrp.src.Read(ctx, filter, func(series *vm.TimeSeries) error {
|
||||
if err := rrp.dst.Input(series); err != nil {
|
||||
if err := rrp.dst.Input(ctx, series); err != nil {
|
||||
return fmt.Errorf(
|
||||
"failed to read data for time range start: %d, end: %d, %s",
|
||||
filter.StartTimestampMs, filter.EndTimestampMs, err)
|
||||
|
||||
@@ -69,7 +69,6 @@ type Importer struct {
|
||||
user string
|
||||
password string
|
||||
|
||||
close chan struct{}
|
||||
input chan *TimeSeries
|
||||
errors chan *ImportError
|
||||
|
||||
@@ -143,7 +142,6 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
|
||||
user: cfg.User,
|
||||
password: cfg.Password,
|
||||
rl: limiter.NewLimiter(cfg.RateLimit),
|
||||
close: make(chan struct{}),
|
||||
input: make(chan *TimeSeries, cfg.Concurrency*4),
|
||||
errors: make(chan *ImportError, cfg.Concurrency),
|
||||
backoff: cfg.Backoff,
|
||||
@@ -189,10 +187,10 @@ func (im *Importer) Errors() chan *ImportError { return im.errors }
|
||||
|
||||
// Input returns a channel for sending timeseries
|
||||
// that need to be imported
|
||||
func (im *Importer) Input(ts *TimeSeries) error {
|
||||
func (im *Importer) Input(ctx context.Context, ts *TimeSeries) error {
|
||||
select {
|
||||
case <-im.close:
|
||||
return fmt.Errorf("importer is closed")
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case im.input <- ts:
|
||||
return nil
|
||||
case err := <-im.errors:
|
||||
@@ -207,7 +205,6 @@ func (im *Importer) Input(ts *TimeSeries) error {
|
||||
// and waits until they are finished
|
||||
func (im *Importer) Close() {
|
||||
im.once.Do(func() {
|
||||
close(im.close)
|
||||
close(im.input)
|
||||
im.wg.Wait()
|
||||
close(im.errors)
|
||||
@@ -220,24 +217,34 @@ func (im *Importer) startWorker(ctx context.Context, bar barpool.Bar, batchSize,
|
||||
var waitForBatch time.Time
|
||||
for {
|
||||
select {
|
||||
case <-im.close:
|
||||
case <-ctx.Done():
|
||||
for ts := range im.input {
|
||||
ts = roundTimeseriesValue(ts, significantFigures, roundDigits)
|
||||
batch = append(batch, ts)
|
||||
exitErr := &ImportError{
|
||||
Batch: batch,
|
||||
}
|
||||
retryableFunc := func() error { return im.Import(batch) }
|
||||
_, err := im.backoff.Retry(ctx, retryableFunc)
|
||||
if err != nil {
|
||||
exitErr.Err = err
|
||||
}
|
||||
im.errors <- exitErr
|
||||
}
|
||||
exitErr := &ImportError{
|
||||
Batch: batch,
|
||||
}
|
||||
retryableFunc := func() error { return im.Import(batch) }
|
||||
_, err := im.backoff.Retry(ctx, retryableFunc)
|
||||
if err != nil {
|
||||
exitErr.Err = err
|
||||
}
|
||||
im.errors <- exitErr
|
||||
return
|
||||
case ts, ok := <-im.input:
|
||||
if !ok {
|
||||
continue
|
||||
// drain all batches before exit
|
||||
exitErr := &ImportError{
|
||||
Batch: batch,
|
||||
}
|
||||
retryableFunc := func() error { return im.Import(batch) }
|
||||
_, err := im.backoff.Retry(ctx, retryableFunc)
|
||||
if err != nil {
|
||||
exitErr.Err = err
|
||||
}
|
||||
im.errors <- exitErr
|
||||
return
|
||||
}
|
||||
// init waitForBatch when first
|
||||
// value was received
|
||||
|
||||
Reference in New Issue
Block a user