mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-26 03:57:43 +03:00
Compare commits
1 Commits
dependabot
...
detached
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
35bd7f37b1 |
2
Makefile
2
Makefile
@@ -526,7 +526,7 @@ test-full:
|
|||||||
test-full-386:
|
test-full-386:
|
||||||
GOEXPERIMENT=synctest GOARCH=386 go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
|
GOEXPERIMENT=synctest GOARCH=386 go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
|
||||||
|
|
||||||
integration-test: victoria-metrics vmagent vmalert vmauth
|
integration-test: victoria-metrics vmagent vmalert vmauth vmctl
|
||||||
go test ./apptest/... -skip="^TestCluster.*"
|
go test ./apptest/... -skip="^TestCluster.*"
|
||||||
|
|
||||||
benchmark:
|
benchmark:
|
||||||
|
|||||||
@@ -1,215 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
|
||||||
"github.com/prometheus/prometheus/storage"
|
|
||||||
"github.com/prometheus/prometheus/tsdb"
|
|
||||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
|
|
||||||
remote_read_integration "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
testSnapshot = "./testdata/snapshots/20250118T124506Z-59d1b952d7eaf547"
|
|
||||||
blockData = "./testdata/snapshots/20250118T124506Z-59d1b952d7eaf547/01JHWQ445Y2P1TDYB05AEKD6MC"
|
|
||||||
)
|
|
||||||
|
|
||||||
// This test simulates close process if user abort it
|
|
||||||
func TestPrometheusProcessorRun(t *testing.T) {
|
|
||||||
|
|
||||||
f := func(startStr, endStr string, numOfSeries int, resultExpected []vm.TimeSeries) {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
dst := remote_read_integration.NewRemoteWriteServer(t)
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
dst.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
dst.Series(resultExpected)
|
|
||||||
dst.ExpectedSeries(resultExpected)
|
|
||||||
|
|
||||||
if err := fillStorage(resultExpected); err != nil {
|
|
||||||
t.Fatalf("cannot fill storage: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
isSilent = true
|
|
||||||
defer func() { isSilent = false }()
|
|
||||||
|
|
||||||
bf, err := backoff.New(1, 1.8, time.Second*2)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("cannot create backoff: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
importerCfg := vm.Config{
|
|
||||||
Addr: dst.URL(),
|
|
||||||
Transport: nil,
|
|
||||||
Concurrency: 1,
|
|
||||||
Backoff: bf,
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
importer, err := vm.NewImporter(ctx, importerCfg)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("cannot create importer: %s", err)
|
|
||||||
}
|
|
||||||
defer importer.Close()
|
|
||||||
|
|
||||||
matchName := "__name__"
|
|
||||||
matchValue := ".*"
|
|
||||||
filter := prometheus.Filter{
|
|
||||||
TimeMin: startStr,
|
|
||||||
TimeMax: endStr,
|
|
||||||
Label: matchName,
|
|
||||||
LabelValue: matchValue,
|
|
||||||
}
|
|
||||||
|
|
||||||
runner, err := prometheus.NewClient(prometheus.Config{
|
|
||||||
Snapshot: testSnapshot,
|
|
||||||
Filter: filter,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("cannot create prometheus client: %s", err)
|
|
||||||
}
|
|
||||||
p := &prometheusProcessor{
|
|
||||||
cl: runner,
|
|
||||||
im: importer,
|
|
||||||
cc: 1,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := p.run(); err != nil {
|
|
||||||
t.Fatalf("run() error: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
collectedTs := dst.GetCollectedTimeSeries()
|
|
||||||
t.Logf("collected timeseries: %d; expected timeseries: %d", len(collectedTs), len(resultExpected))
|
|
||||||
if len(collectedTs) != len(resultExpected) {
|
|
||||||
t.Fatalf("unexpected number of collected time series; got %d; want %d", len(collectedTs), numOfSeries)
|
|
||||||
}
|
|
||||||
|
|
||||||
deleted, err := deleteSeries(matchName, matchValue)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("cannot delete series: %s", err)
|
|
||||||
}
|
|
||||||
if deleted != numOfSeries {
|
|
||||||
t.Fatalf("unexpected number of deleted series; got %d; want %d", deleted, numOfSeries)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
processFlags()
|
|
||||||
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
|
|
||||||
defer func() {
|
|
||||||
vmstorage.Stop()
|
|
||||||
if err := os.RemoveAll(storagePath); err != nil {
|
|
||||||
log.Fatalf("cannot remove %q: %s", storagePath, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
barpool.Disable(true)
|
|
||||||
defer func() {
|
|
||||||
barpool.Disable(false)
|
|
||||||
}()
|
|
||||||
|
|
||||||
b, err := tsdb.OpenBlock(nil, blockData, nil, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("cannot open block: %s", err)
|
|
||||||
}
|
|
||||||
// timestamp is equal to minTime and maxTime from meta.json
|
|
||||||
ss, err := readBlock(b, 1737204082361, 1737204302539)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("cannot read block: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
resultExpected, err := prepareExpectedData(ss)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("cannot prepare expected data: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
f("2025-01-18T12:40:00Z", "2025-01-18T12:46:00Z", 2792, resultExpected)
|
|
||||||
}
|
|
||||||
|
|
||||||
func readBlock(b tsdb.BlockReader, timeMin int64, timeMax int64) (storage.SeriesSet, error) {
|
|
||||||
minTime, maxTime := b.Meta().MinTime, b.Meta().MaxTime
|
|
||||||
|
|
||||||
if timeMin != 0 {
|
|
||||||
minTime = timeMin
|
|
||||||
}
|
|
||||||
if timeMax != 0 {
|
|
||||||
maxTime = timeMax
|
|
||||||
}
|
|
||||||
|
|
||||||
q, err := tsdb.NewBlockQuerier(b, minTime, maxTime)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
matchName := "__name__"
|
|
||||||
matchValue := ".*"
|
|
||||||
ctx := context.Background()
|
|
||||||
ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, matchName, matchValue))
|
|
||||||
return ss, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func prepareExpectedData(ss storage.SeriesSet) ([]vm.TimeSeries, error) {
|
|
||||||
var expectedSeriesSet []vm.TimeSeries
|
|
||||||
var it chunkenc.Iterator
|
|
||||||
for ss.Next() {
|
|
||||||
var name string
|
|
||||||
var labelPairs []vm.LabelPair
|
|
||||||
series := ss.At()
|
|
||||||
|
|
||||||
for _, label := range series.Labels() {
|
|
||||||
if label.Name == "__name__" {
|
|
||||||
name = label.Value
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
labelPairs = append(labelPairs, vm.LabelPair{
|
|
||||||
Name: label.Name,
|
|
||||||
Value: label.Value,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if name == "" {
|
|
||||||
return nil, fmt.Errorf("failed to find `__name__` label in labelset for block")
|
|
||||||
}
|
|
||||||
|
|
||||||
var timestamps []int64
|
|
||||||
var values []float64
|
|
||||||
it = series.Iterator(it)
|
|
||||||
for {
|
|
||||||
typ := it.Next()
|
|
||||||
if typ == chunkenc.ValNone {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if typ != chunkenc.ValFloat {
|
|
||||||
// Skip unsupported values
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
t, v := it.At()
|
|
||||||
timestamps = append(timestamps, t)
|
|
||||||
values = append(values, v)
|
|
||||||
}
|
|
||||||
if err := it.Err(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ts := vm.TimeSeries{
|
|
||||||
Name: name,
|
|
||||||
LabelPairs: labelPairs,
|
|
||||||
Timestamps: timestamps,
|
|
||||||
Values: values,
|
|
||||||
}
|
|
||||||
expectedSeriesSet = append(expectedSeriesSet, ts)
|
|
||||||
}
|
|
||||||
return expectedSeriesSet, nil
|
|
||||||
}
|
|
||||||
@@ -31,6 +31,7 @@ type app struct {
|
|||||||
binary string
|
binary string
|
||||||
flags []string
|
flags []string
|
||||||
process *os.Process
|
process *os.Process
|
||||||
|
wait bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// appOptions holds the optional configuration of an app, such as default flags
|
// appOptions holds the optional configuration of an app, such as default flags
|
||||||
@@ -38,6 +39,7 @@ type app struct {
|
|||||||
type appOptions struct {
|
type appOptions struct {
|
||||||
defaultFlags map[string]string
|
defaultFlags map[string]string
|
||||||
extractREs []*regexp.Regexp
|
extractREs []*regexp.Regexp
|
||||||
|
wait bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// startApp starts an instance of an app using the app binary file path and
|
// startApp starts an instance of an app using the app binary file path and
|
||||||
@@ -73,6 +75,7 @@ func startApp(instance string, binary string, flags []string, opts *appOptions)
|
|||||||
binary: binary,
|
binary: binary,
|
||||||
flags: flags,
|
flags: flags,
|
||||||
process: cmd.Process,
|
process: cmd.Process,
|
||||||
|
wait: opts.wait,
|
||||||
}
|
}
|
||||||
|
|
||||||
go app.processOutput("stdout", stdout, app.writeToStderr)
|
go app.processOutput("stdout", stdout, app.writeToStderr)
|
||||||
@@ -92,7 +95,11 @@ func startApp(instance string, binary string, flags []string, opts *appOptions)
|
|||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return app, extracts, nil
|
if app.wait {
|
||||||
|
err = cmd.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
return app, extracts, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// setDefaultFlags adds flags with default values to `flags` if it does not
|
// setDefaultFlags adds flags with default values to `flags` if it does not
|
||||||
@@ -112,9 +119,12 @@ func setDefaultFlags(flags []string, defaultFlags map[string]string) []string {
|
|||||||
return flags
|
return flags
|
||||||
}
|
}
|
||||||
|
|
||||||
// stop sends the app process a SIGINT signal and waits until it terminates
|
// Stop sends the app process a SIGINT signal and waits until it terminates
|
||||||
// gracefully.
|
// gracefully.
|
||||||
func (app *app) Stop() {
|
func (app *app) Stop() {
|
||||||
|
if app.wait {
|
||||||
|
return
|
||||||
|
}
|
||||||
if err := app.process.Signal(os.Interrupt); err != nil {
|
if err := app.process.Signal(os.Interrupt); err != nil {
|
||||||
log.Fatalf("Could not send SIGINT signal to %s process: %v", app.instance, err)
|
log.Fatalf("Could not send SIGINT signal to %s process: %v", app.instance, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,8 +7,9 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestCase holds the state and defines clean-up procedure common for all test
|
// TestCase holds the state and defines clean-up procedure common for all test
|
||||||
@@ -251,6 +252,18 @@ func (tc *TestCase) MustStartCluster(opts *ClusterOptions) PrometheusWriteQuerie
|
|||||||
return &Vmcluster{vminsert, vmselect, []*Vmstorage{vmstorage1, vmstorage2}}
|
return &Vmcluster{vminsert, vmselect, []*Vmstorage{vmstorage1, vmstorage2}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MustStartVmctl is a test helper function that starts an instance of vmctl
|
||||||
|
func (tc *TestCase) MustStartVmctl(instance string, flags []string) *Vmctl {
|
||||||
|
tc.t.Helper()
|
||||||
|
|
||||||
|
app, err := StartVmctl(instance, flags)
|
||||||
|
if err != nil {
|
||||||
|
tc.t.Fatalf("Could not start %s: %v", instance, err)
|
||||||
|
}
|
||||||
|
tc.addApp(instance, app)
|
||||||
|
return app
|
||||||
|
}
|
||||||
|
|
||||||
func (tc *TestCase) addApp(instance string, app Stopper) {
|
func (tc *TestCase) addApp(instance string, app Stopper) {
|
||||||
if _, alreadyStarted := tc.startedApps[instance]; alreadyStarted {
|
if _, alreadyStarted := tc.startedApps[instance]; alreadyStarted {
|
||||||
tc.t.Fatalf("%s has already been started", instance)
|
tc.t.Fatalf("%s has already been started", instance)
|
||||||
|
|||||||
270769
apptest/tests/testdata/prometheus/expected_response.json
vendored
Normal file
270769
apptest/tests/testdata/prometheus/expected_response.json
vendored
Normal file
File diff suppressed because it is too large
Load Diff
80
apptest/tests/vmctl_prometheus_migration_test.go
Normal file
80
apptest/tests/vmctl_prometheus_migration_test.go
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
package tests
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/google/go-cmp/cmp"
|
||||||
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestVmctlPrometheusProtocolToVMSingle(t *testing.T) {
|
||||||
|
os.RemoveAll(t.Name())
|
||||||
|
|
||||||
|
tc := apptest.NewTestCase(t)
|
||||||
|
defer tc.Stop()
|
||||||
|
|
||||||
|
cmpOpt := cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType")
|
||||||
|
|
||||||
|
vmsingleDst := tc.MustStartVmsingle("vmsingle", []string{
|
||||||
|
"-storageDataPath=" + tc.Dir() + "/vmsingle",
|
||||||
|
"-retentionPeriod=100y",
|
||||||
|
})
|
||||||
|
|
||||||
|
// test for empty data request
|
||||||
|
got := vmsingleDst.PrometheusAPIV1Query(t, `{__name__=~".*"}`, apptest.QueryOpts{
|
||||||
|
Step: "5m",
|
||||||
|
Time: "2025-01-18T12:45:00Z",
|
||||||
|
})
|
||||||
|
|
||||||
|
want := apptest.NewPrometheusAPIV1QueryResponse(t, `{"data":{"result":[]}}`)
|
||||||
|
if diff := cmp.Diff(want, got, cmpOpt); diff != "" {
|
||||||
|
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||||
|
}
|
||||||
|
|
||||||
|
vmAddr := fmt.Sprintf("http://%s/", vmsingleDst.HTTPAddr())
|
||||||
|
testSnapshot := "./testdata/prometheus/snapshots/20250118T124506Z-59d1b952d7eaf547"
|
||||||
|
_ = tc.MustStartVmctl("vmctl", []string{
|
||||||
|
`prometheus`,
|
||||||
|
`--prom-snapshot=` + testSnapshot,
|
||||||
|
`--vm-addr=` + vmAddr,
|
||||||
|
`--disable-progress-bar=true`,
|
||||||
|
})
|
||||||
|
|
||||||
|
vmsingleDst.ForceFlush(t)
|
||||||
|
|
||||||
|
// open the expected series response file
|
||||||
|
file, err := os.Open("./testdata/prometheus/expected_response.json")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("cannot open expected series response file: %s", err)
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
bytes, err := io.ReadAll(file)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("cannot read expected series response file: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wantResponse := apptest.NewPrometheusAPIV1QueryResponse(t, string(bytes))
|
||||||
|
wantResponse.Sort()
|
||||||
|
|
||||||
|
tc.Assert(&apptest.AssertOptions{
|
||||||
|
Msg: `unexpected metrics stored on vmsingle via the prometheus protocol`,
|
||||||
|
Got: func() any {
|
||||||
|
exported := vmsingleDst.PrometheusAPIV1Export(t, `{__name__=~".*"}`, apptest.QueryOpts{
|
||||||
|
Start: "2025-01-18T00:45:00Z",
|
||||||
|
End: "2025-01-18T23:46:00Z",
|
||||||
|
})
|
||||||
|
exported.Sort()
|
||||||
|
return exported
|
||||||
|
},
|
||||||
|
Want: &apptest.PrometheusAPIV1QueryResponse{Data: wantResponse.Data},
|
||||||
|
CmpOpts: []cmp.Option{
|
||||||
|
cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType"),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
18
apptest/vmctl.go
Normal file
18
apptest/vmctl.go
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
package apptest
|
||||||
|
|
||||||
|
// Vmctl holds the state of a vmctl app and provides vmctl-specific functions
|
||||||
|
type Vmctl struct {
|
||||||
|
*app
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartVmctl starts an instance of vmctl cli with the given flags
|
||||||
|
func StartVmctl(instance string, flags []string) (*Vmctl, error) {
|
||||||
|
app, _, err := startApp(instance, "../../bin/vmctl", flags, &appOptions{wait: true})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Vmctl{
|
||||||
|
app: app,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user