Compare commits

...

1 Commits

Author SHA1 Message Date
dmitryk-dk
35bd7f37b1 apptest/vmctl: migrate vmctl test for the prometheus migration process
apptest/vmctl: wait until vmctl finish it work

apptest/vmctl: remove unneeded function

apptest/vmctl: fix linter

apptest/vmctl: add vmctl to build for the integration tests

apptest/vmctl: rename file

apptest/vmctl: fix comments

apptest/vmctl: added check of the wait discussed in the comments

apptest/vmctl: compare migrated data with expected response

apptest/vmctl: format expected_response.json
2025-06-02 16:37:02 +02:00
11 changed files with 270894 additions and 219 deletions

View File

@@ -526,7 +526,7 @@ test-full:
test-full-386:
GOEXPERIMENT=synctest GOARCH=386 go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
integration-test: victoria-metrics vmagent vmalert vmauth
integration-test: victoria-metrics vmagent vmalert vmauth vmctl
go test ./apptest/... -skip="^TestCluster.*"
benchmark:

View File

@@ -1,215 +0,0 @@
package main
import (
"context"
"fmt"
"log"
"os"
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
remote_read_integration "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
)
const (
testSnapshot = "./testdata/snapshots/20250118T124506Z-59d1b952d7eaf547"
blockData = "./testdata/snapshots/20250118T124506Z-59d1b952d7eaf547/01JHWQ445Y2P1TDYB05AEKD6MC"
)
// This test simulates close process if user abort it
func TestPrometheusProcessorRun(t *testing.T) {
f := func(startStr, endStr string, numOfSeries int, resultExpected []vm.TimeSeries) {
t.Helper()
dst := remote_read_integration.NewRemoteWriteServer(t)
defer func() {
dst.Close()
}()
dst.Series(resultExpected)
dst.ExpectedSeries(resultExpected)
if err := fillStorage(resultExpected); err != nil {
t.Fatalf("cannot fill storage: %s", err)
}
isSilent = true
defer func() { isSilent = false }()
bf, err := backoff.New(1, 1.8, time.Second*2)
if err != nil {
t.Fatalf("cannot create backoff: %s", err)
}
importerCfg := vm.Config{
Addr: dst.URL(),
Transport: nil,
Concurrency: 1,
Backoff: bf,
}
ctx := context.Background()
importer, err := vm.NewImporter(ctx, importerCfg)
if err != nil {
t.Fatalf("cannot create importer: %s", err)
}
defer importer.Close()
matchName := "__name__"
matchValue := ".*"
filter := prometheus.Filter{
TimeMin: startStr,
TimeMax: endStr,
Label: matchName,
LabelValue: matchValue,
}
runner, err := prometheus.NewClient(prometheus.Config{
Snapshot: testSnapshot,
Filter: filter,
})
if err != nil {
t.Fatalf("cannot create prometheus client: %s", err)
}
p := &prometheusProcessor{
cl: runner,
im: importer,
cc: 1,
}
if err := p.run(); err != nil {
t.Fatalf("run() error: %s", err)
}
collectedTs := dst.GetCollectedTimeSeries()
t.Logf("collected timeseries: %d; expected timeseries: %d", len(collectedTs), len(resultExpected))
if len(collectedTs) != len(resultExpected) {
t.Fatalf("unexpected number of collected time series; got %d; want %d", len(collectedTs), numOfSeries)
}
deleted, err := deleteSeries(matchName, matchValue)
if err != nil {
t.Fatalf("cannot delete series: %s", err)
}
if deleted != numOfSeries {
t.Fatalf("unexpected number of deleted series; got %d; want %d", deleted, numOfSeries)
}
}
processFlags()
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
defer func() {
vmstorage.Stop()
if err := os.RemoveAll(storagePath); err != nil {
log.Fatalf("cannot remove %q: %s", storagePath, err)
}
}()
barpool.Disable(true)
defer func() {
barpool.Disable(false)
}()
b, err := tsdb.OpenBlock(nil, blockData, nil, nil)
if err != nil {
t.Fatalf("cannot open block: %s", err)
}
// timestamp is equal to minTime and maxTime from meta.json
ss, err := readBlock(b, 1737204082361, 1737204302539)
if err != nil {
t.Fatalf("cannot read block: %s", err)
}
resultExpected, err := prepareExpectedData(ss)
if err != nil {
t.Fatalf("cannot prepare expected data: %s", err)
}
f("2025-01-18T12:40:00Z", "2025-01-18T12:46:00Z", 2792, resultExpected)
}
func readBlock(b tsdb.BlockReader, timeMin int64, timeMax int64) (storage.SeriesSet, error) {
minTime, maxTime := b.Meta().MinTime, b.Meta().MaxTime
if timeMin != 0 {
minTime = timeMin
}
if timeMax != 0 {
maxTime = timeMax
}
q, err := tsdb.NewBlockQuerier(b, minTime, maxTime)
if err != nil {
return nil, err
}
matchName := "__name__"
matchValue := ".*"
ctx := context.Background()
ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, matchName, matchValue))
return ss, nil
}
func prepareExpectedData(ss storage.SeriesSet) ([]vm.TimeSeries, error) {
var expectedSeriesSet []vm.TimeSeries
var it chunkenc.Iterator
for ss.Next() {
var name string
var labelPairs []vm.LabelPair
series := ss.At()
for _, label := range series.Labels() {
if label.Name == "__name__" {
name = label.Value
continue
}
labelPairs = append(labelPairs, vm.LabelPair{
Name: label.Name,
Value: label.Value,
})
}
if name == "" {
return nil, fmt.Errorf("failed to find `__name__` label in labelset for block")
}
var timestamps []int64
var values []float64
it = series.Iterator(it)
for {
typ := it.Next()
if typ == chunkenc.ValNone {
break
}
if typ != chunkenc.ValFloat {
// Skip unsupported values
continue
}
t, v := it.At()
timestamps = append(timestamps, t)
values = append(values, v)
}
if err := it.Err(); err != nil {
return nil, err
}
ts := vm.TimeSeries{
Name: name,
LabelPairs: labelPairs,
Timestamps: timestamps,
Values: values,
}
expectedSeriesSet = append(expectedSeriesSet, ts)
}
return expectedSeriesSet, nil
}

View File

@@ -31,6 +31,7 @@ type app struct {
binary string
flags []string
process *os.Process
wait bool
}
// appOptions holds the optional configuration of an app, such as default flags
@@ -38,6 +39,7 @@ type app struct {
type appOptions struct {
defaultFlags map[string]string
extractREs []*regexp.Regexp
wait bool
}
// startApp starts an instance of an app using the app binary file path and
@@ -73,6 +75,7 @@ func startApp(instance string, binary string, flags []string, opts *appOptions)
binary: binary,
flags: flags,
process: cmd.Process,
wait: opts.wait,
}
go app.processOutput("stdout", stdout, app.writeToStderr)
@@ -92,7 +95,11 @@ func startApp(instance string, binary string, flags []string, opts *appOptions)
return nil, nil, err
}
return app, extracts, nil
if app.wait {
err = cmd.Wait()
}
return app, extracts, err
}
// setDefaultFlags adds flags with default values to `flags` if it does not
@@ -112,9 +119,12 @@ func setDefaultFlags(flags []string, defaultFlags map[string]string) []string {
return flags
}
// stop sends the app process a SIGINT signal and waits until it terminates
// Stop sends the app process a SIGINT signal and waits until it terminates
// gracefully.
func (app *app) Stop() {
if app.wait {
return
}
if err := app.process.Signal(os.Interrupt); err != nil {
log.Fatalf("Could not send SIGINT signal to %s process: %v", app.instance, err)
}

View File

@@ -7,8 +7,9 @@ import (
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/google/go-cmp/cmp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
// TestCase holds the state and defines clean-up procedure common for all test
@@ -251,6 +252,18 @@ func (tc *TestCase) MustStartCluster(opts *ClusterOptions) PrometheusWriteQuerie
return &Vmcluster{vminsert, vmselect, []*Vmstorage{vmstorage1, vmstorage2}}
}
// MustStartVmctl is a test helper function that starts an instance of vmctl
func (tc *TestCase) MustStartVmctl(instance string, flags []string) *Vmctl {
tc.t.Helper()
app, err := StartVmctl(instance, flags)
if err != nil {
tc.t.Fatalf("Could not start %s: %v", instance, err)
}
tc.addApp(instance, app)
return app
}
func (tc *TestCase) addApp(instance string, app Stopper) {
if _, alreadyStarted := tc.startedApps[instance]; alreadyStarted {
tc.t.Fatalf("%s has already been started", instance)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,80 @@
package tests
import (
"fmt"
"io"
"os"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
)
func TestVmctlPrometheusProtocolToVMSingle(t *testing.T) {
os.RemoveAll(t.Name())
tc := apptest.NewTestCase(t)
defer tc.Stop()
cmpOpt := cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType")
vmsingleDst := tc.MustStartVmsingle("vmsingle", []string{
"-storageDataPath=" + tc.Dir() + "/vmsingle",
"-retentionPeriod=100y",
})
// test for empty data request
got := vmsingleDst.PrometheusAPIV1Query(t, `{__name__=~".*"}`, apptest.QueryOpts{
Step: "5m",
Time: "2025-01-18T12:45:00Z",
})
want := apptest.NewPrometheusAPIV1QueryResponse(t, `{"data":{"result":[]}}`)
if diff := cmp.Diff(want, got, cmpOpt); diff != "" {
t.Errorf("unexpected response (-want, +got):\n%s", diff)
}
vmAddr := fmt.Sprintf("http://%s/", vmsingleDst.HTTPAddr())
testSnapshot := "./testdata/prometheus/snapshots/20250118T124506Z-59d1b952d7eaf547"
_ = tc.MustStartVmctl("vmctl", []string{
`prometheus`,
`--prom-snapshot=` + testSnapshot,
`--vm-addr=` + vmAddr,
`--disable-progress-bar=true`,
})
vmsingleDst.ForceFlush(t)
// open the expected series response file
file, err := os.Open("./testdata/prometheus/expected_response.json")
if err != nil {
t.Fatalf("cannot open expected series response file: %s", err)
}
defer file.Close()
bytes, err := io.ReadAll(file)
if err != nil {
t.Fatalf("cannot read expected series response file: %s", err)
}
wantResponse := apptest.NewPrometheusAPIV1QueryResponse(t, string(bytes))
wantResponse.Sort()
tc.Assert(&apptest.AssertOptions{
Msg: `unexpected metrics stored on vmsingle via the prometheus protocol`,
Got: func() any {
exported := vmsingleDst.PrometheusAPIV1Export(t, `{__name__=~".*"}`, apptest.QueryOpts{
Start: "2025-01-18T00:45:00Z",
End: "2025-01-18T23:46:00Z",
})
exported.Sort()
return exported
},
Want: &apptest.PrometheusAPIV1QueryResponse{Data: wantResponse.Data},
CmpOpts: []cmp.Option{
cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType"),
},
})
}

18
apptest/vmctl.go Normal file
View File

@@ -0,0 +1,18 @@
package apptest
// Vmctl holds the state of a vmctl app and provides vmctl-specific functions
type Vmctl struct {
*app
}
// StartVmctl starts an instance of vmctl cli with the given flags
func StartVmctl(instance string, flags []string) (*Vmctl, error) {
app, _, err := startApp(instance, "../../bin/vmctl", flags, &appOptions{wait: true})
if err != nil {
return nil, err
}
return &Vmctl{
app: app,
}, nil
}