Compare commits

...

2 Commits

Author      SHA1        Message                                            Date
dmitryk-dk  a07f4078de  app/vmctl: remove unused channel                   2025-03-17 17:40:25 +01:00
dmitryk-dk  de96a937dd  app/vmctl: propagate context to avoid data races   2025-03-17 17:27:56 +01:00
7 changed files with 46 additions and 35 deletions
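Note: read together, the two commits replace the Importer's internal close channel with a caller-supplied context.Context. Every processor's run and do now thread ctx down to Importer.Input, which, per the commit message, avoids data races between in-flight sends and shutdown. A minimal sketch of the resulting pattern, with illustrative names rather than vmctl's actual types:

package main

import (
	"context"
	"fmt"
)

// A send guarded by the caller's context instead of an
// importer-owned close channel.
func send(ctx context.Context, input chan<- int, v int) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // canceled or deadline exceeded
	case input <- v:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	input := make(chan int, 1)
	fmt.Println(send(ctx, input, 1)) // <nil>: buffer has room
	cancel()
	fmt.Println(send(ctx, input, 2)) // context canceled: buffer full, ctx done
}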

View File

@@ -1,6 +1,7 @@
package main

import (
+	"context"
	"fmt"
	"io"
	"log"
@@ -37,7 +38,7 @@ func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator st
	}
}

-func (ip *influxProcessor) run() error {
+func (ip *influxProcessor) run(ctx context.Context) error {
	series, err := ip.ic.Explore()
	if err != nil {
		return fmt.Errorf("explore query failed: %s", err)
@@ -67,7 +68,7 @@ func (ip *influxProcessor) run() error {
	go func() {
		defer wg.Done()
		for s := range seriesCh {
-			if err := ip.do(s); err != nil {
+			if err := ip.do(ctx, s); err != nil {
				errCh <- fmt.Errorf("request failed for %q.%q: %s", s.Measurement, s.Field, err)
				return
			}
@@ -110,7 +111,7 @@ const dbLabel = "db"
const nameLabel = "__name__"
const valueField = "value"

-func (ip *influxProcessor) do(s *influx.Series) error {
+func (ip *influxProcessor) do(ctx context.Context, s *influx.Series) error {
	cr, err := ip.ic.FetchDataPoints(s)
	if err != nil {
		return fmt.Errorf("failed to fetch datapoints: %s", err)
@@ -163,7 +164,7 @@ func (ip *influxProcessor) do(s *influx.Series) error {
			Timestamps: time,
			Values:     values,
		}
-		if err := ip.im.Input(&ts); err != nil {
+		if err := ip.im.Input(ctx, &ts); err != nil {
			return err
		}
	}
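Note: the same fan-out shape repeats in the influx, opentsdb and prometheus processors: run spawns a fixed pool of workers over a series channel, and after this change every worker closes over the run ctx. A self-contained sketch of that shape, under assumed names (runPool, jobs and do are not vmctl identifiers):

package main

import (
	"context"
	"fmt"
	"sync"
)

// Every worker gets the same ctx, so one cancellation
// stops the whole pool.
func runPool(ctx context.Context, jobs <-chan int, workers int, do func(context.Context, int) error) error {
	var wg sync.WaitGroup
	errCh := make(chan error, workers)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				if err := do(ctx, j); err != nil {
					errCh <- err // buffered: never blocks a worker
					return
				}
			}
		}()
	}
	wg.Wait()
	close(errCh)
	return <-errCh // nil when no worker failed
}

func main() {
	jobs := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		jobs <- i
	}
	close(jobs)
	err := runPool(context.Background(), jobs, 2, func(ctx context.Context, j int) error {
		fmt.Println("job", j)
		return nil
	})
	fmt.Println("pool err:", err)
}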

View File

@@ -97,7 +97,7 @@ func main() {
			}
			otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency), c.Bool(globalVerbose))
-			return otsdbProcessor.run()
+			return otsdbProcessor.run(ctx)
		},
	},
	{
@@ -158,7 +158,7 @@ func main() {
				c.Bool(influxSkipDatabaseLabel),
				c.Bool(influxPrometheusMode),
				c.Bool(globalVerbose))
-			return processor.run()
+			return processor.run(ctx)
		},
	},
	{
@@ -261,7 +261,7 @@ func main() {
				cc:        c.Int(promConcurrency),
				isVerbose: c.Bool(globalVerbose),
			}
-			return pp.run()
+			return pp.run(ctx)
		},
	},
	{
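Note: main.go already has a ctx in scope for every cli action; these edits merely forward it into each processor's run. One common way such a root context is derived, shown here as an assumption rather than vmctl's exact wiring, is signal.NotifyContext:

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// ctx is canceled on Ctrl-C or SIGTERM; stop() releases the signal hook.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Each subcommand action would receive this ctx, mirroring how the
	// edits above pass ctx into processor.run(ctx).
	if err := run(ctx); err != nil {
		log.Fatal(err)
	}
}

func run(ctx context.Context) error {
	<-ctx.Done() // block until a signal arrives
	return ctx.Err()
}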

View File

@@ -1,14 +1,16 @@
package main

import (
+	"context"
	"fmt"
	"log"
	"sync"
	"time"

-	"github.com/cheggaaa/pb/v3"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/opentsdb"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
+	"github.com/cheggaaa/pb/v3"
)
type otsdbProcessor struct {
@@ -37,7 +39,7 @@ func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int, verbos
	}
}

-func (op *otsdbProcessor) run() error {
+func (op *otsdbProcessor) run(ctx context.Context) error {
	log.Println("Loading all metrics from OpenTSDB for filters: ", op.oc.Filters)
	var metrics []string
	for _, filter := range op.oc.Filters {
@@ -93,7 +95,7 @@ func (op *otsdbProcessor) run() error {
	go func() {
		defer wg.Done()
		for s := range seriesCh {
-			if err := op.do(s); err != nil {
+			if err := op.do(ctx, s); err != nil {
				errCh <- fmt.Errorf("couldn't retrieve series for %s : %s", metric, err)
				return
			}
@@ -148,7 +150,7 @@ func (op *otsdbProcessor) run() error {
	return nil
}

-func (op *otsdbProcessor) do(s queryObj) error {
+func (op *otsdbProcessor) do(ctx context.Context, s queryObj) error {
	start := s.StartTime - s.Tr.Start
	end := s.StartTime - s.Tr.End
	data, err := op.oc.GetData(s.Series, s.Rt, start, end, op.oc.MsecsTime)
@@ -168,5 +170,5 @@ func (op *otsdbProcessor) do(s queryObj) error {
		Timestamps: data.Timestamps,
		Values:     data.Values,
	}
-	return op.im.Input(&ts)
+	return op.im.Input(ctx, &ts)
}
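Note: the new parameter follows the usual Go convention: context.Context is the first argument of run and do rather than a field on the processor struct. A tiny hedged illustration (processor and the int job are stand-ins, not vmctl types):

package main

import "context"

type processor struct{} // stand-in type

// ctx travels as the first parameter; checking ctx.Err() up front lets a
// long-running method fail fast once the caller has canceled.
func (p *processor) do(ctx context.Context, job int) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	_ = job // real work would happen here
	return nil
}

func main() {
	p := &processor{}
	_ = p.do(context.Background(), 1)
}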

View File

@@ -1,6 +1,7 @@
package main

import (
+	"context"
	"fmt"
	"log"
	"sync"
@@ -30,7 +31,7 @@ type prometheusProcessor struct {
	isVerbose bool
}

-func (pp *prometheusProcessor) run() error {
+func (pp *prometheusProcessor) run(ctx context.Context) error {
	blocks, err := pp.cl.Explore()
	if err != nil {
		return fmt.Errorf("explore failed: %s", err)
@@ -59,7 +60,7 @@ func (pp *prometheusProcessor) run() error {
	go func() {
		defer wg.Done()
		for br := range blockReadersCh {
-			if err := pp.do(br); err != nil {
+			if err := pp.do(ctx, br); err != nil {
				errCh <- fmt.Errorf("read failed for block %q: %s", br.Meta().ULID, err)
				return
			}
@@ -100,7 +101,7 @@ func (pp *prometheusProcessor) run() error {
	return nil
}

-func (pp *prometheusProcessor) do(b tsdb.BlockReader) error {
+func (pp *prometheusProcessor) do(ctx context.Context, b tsdb.BlockReader) error {
	ss, err := pp.cl.Read(b)
	if err != nil {
		return fmt.Errorf("failed to read block: %s", err)
@@ -150,7 +151,7 @@ func (pp *prometheusProcessor) do(b tsdb.BlockReader) error {
			Timestamps: timestamps,
			Values:     values,
		}
-		if err := pp.im.Input(&ts); err != nil {
+		if err := pp.im.Input(ctx, &ts); err != nil {
			return err
		}
	}
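Note: Importer.Input blocks whenever the buffered input channel (cfg.Concurrency*4 slots) is full, so without a ctx.Done() branch a canceled migration could leave producer goroutines blocked forever once workers stop reading. A small runnable demonstration of that failure mode, using illustrative names:

package main

import (
	"context"
	"fmt"
	"time"
)

// With no reader on the channel, the send below would block forever
// without the ctx.Done() branch.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	input := make(chan int) // unbuffered, and nobody reads it
	select {
	case input <- 1:
		fmt.Println("sent")
	case <-ctx.Done():
		fmt.Println("gave up:", ctx.Err()) // gave up: context deadline exceeded
	}
}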

View File

@@ -160,7 +160,7 @@ func TestPrometheusProcessorRun(t *testing.T) {
			go tt.fields.closer(importer)
		}

-		if err := pp.run(); (err != nil) != tt.wantErr {
+		if err := pp.run(context.Background()); (err != nil) != tt.wantErr {
			t.Fatalf("run() error = %v, wantErr %v", err, tt.wantErr)
		}
	})
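Note: the test only needs context.Background() because it never exercises cancellation. A hypothetical companion test, not part of this commit, could pin down the canceled path:

package main

import (
	"context"
	"testing"
)

// A canceled context must surface ctx.Err() instead of
// blocking on an unready channel.
func TestInputCanceledContext(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	input := make(chan int) // unbuffered and unread: a send can never proceed
	select {
	case input <- 1:
		t.Fatal("send should not succeed")
	case <-ctx.Done():
		if ctx.Err() != context.Canceled {
			t.Fatalf("unexpected err: %v", ctx.Err())
		}
	}
}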

View File

@@ -112,7 +112,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
func (rrp *remoteReadProcessor) do(ctx context.Context, filter *remoteread.Filter) error {
	return rrp.src.Read(ctx, filter, func(series *vm.TimeSeries) error {
-		if err := rrp.dst.Input(series); err != nil {
+		if err := rrp.dst.Input(ctx, series); err != nil {
			return fmt.Errorf(
				"failed to read data for time range start: %d, end: %d, %s",
				filter.StartTimestampMs, filter.EndTimestampMs, err)

View File

@@ -69,7 +69,6 @@ type Importer struct {
	user     string
	password string

-	close    chan struct{}
	input    chan *TimeSeries
	errors   chan *ImportError
@@ -143,7 +142,6 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
		user:     cfg.User,
		password: cfg.Password,
		rl:       limiter.NewLimiter(cfg.RateLimit),
-		close:    make(chan struct{}),
		input:    make(chan *TimeSeries, cfg.Concurrency*4),
		errors:   make(chan *ImportError, cfg.Concurrency),
		backoff:  cfg.Backoff,
@@ -189,10 +187,10 @@ func (im *Importer) Errors() chan *ImportError { return im.errors }
// Input returns a channel for sending timeseries
// that need to be imported
-func (im *Importer) Input(ts *TimeSeries) error {
+func (im *Importer) Input(ctx context.Context, ts *TimeSeries) error {
	select {
-	case <-im.close:
-		return fmt.Errorf("importer is closed")
+	case <-ctx.Done():
+		return ctx.Err()
	case im.input <- ts:
		return nil
	case err := <-im.errors:
@@ -207,7 +205,6 @@ func (im *Importer) Input(ts *TimeSeries) error {
// and waits until they are finished
func (im *Importer) Close() {
	im.once.Do(func() {
-		close(im.close)
		close(im.input)
		im.wg.Wait()
		close(im.errors)
@@ -220,24 +217,34 @@ func (im *Importer) startWorker(ctx context.Context, bar barpool.Bar, batchSize,
	var waitForBatch time.Time
	for {
		select {
-		case <-im.close:
-			exitErr := &ImportError{
-				Batch: batch,
-			}
-			retryableFunc := func() error { return im.Import(batch) }
-			_, err := im.backoff.Retry(ctx, retryableFunc)
-			if err != nil {
-				exitErr.Err = err
-			}
-			im.errors <- exitErr
-			return
+		case <-ctx.Done():
+			for ts := range im.input {
+				ts = roundTimeseriesValue(ts, significantFigures, roundDigits)
+				batch = append(batch, ts)
+			}
+			exitErr := &ImportError{
+				Batch: batch,
+			}
+			retryableFunc := func() error { return im.Import(batch) }
+			_, err := im.backoff.Retry(ctx, retryableFunc)
+			if err != nil {
+				exitErr.Err = err
+			}
+			im.errors <- exitErr
+			return
		case ts, ok := <-im.input:
			if !ok {
-				continue
+				// drain all batches before exit
+				exitErr := &ImportError{
+					Batch: batch,
+				}
+				retryableFunc := func() error { return im.Import(batch) }
+				_, err := im.backoff.Retry(ctx, retryableFunc)
+				if err != nil {
+					exitErr.Err = err
+				}
+				im.errors <- exitErr
+				return
			}
			// init waitForBatch when first
			// value was received
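Note: with the close channel gone, shutdown ordering becomes: Close() closes input, a worker that sees ok == false flushes its final batch into errors, and ctx cancellation additionally drains whatever senders already queued. A condensed, self-contained sketch of those two exit paths (types and names are illustrative, not vmctl's):

package main

import (
	"context"
	"fmt"
)

// Cancellation drains queued items into the batch before one final
// flush; a closed input channel just flushes the last batch.
func worker(ctx context.Context, input chan int, flush func([]int)) {
	var batch []int
	for {
		select {
		case <-ctx.Done():
			for v := range input { // blocks until the producer closes input
				batch = append(batch, v)
			}
			flush(batch)
			return
		case v, ok := <-input:
			if !ok {
				flush(batch) // drain the last batch before exit
				return
			}
			batch = append(batch, v)
		}
	}
}

func main() {
	input := make(chan int, 2)
	input <- 1
	input <- 2
	close(input)
	worker(context.Background(), input, func(b []int) { fmt.Println("flushed", b) })
}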