mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 16:59:40 +03:00
Compare commits
1 Commits
timerpool-
...
detached
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
35bd7f37b1 |
2
Makefile
2
Makefile
@@ -526,7 +526,7 @@ test-full:
|
||||
test-full-386:
|
||||
GOEXPERIMENT=synctest GOARCH=386 go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
|
||||
|
||||
integration-test: victoria-metrics vmagent vmalert vmauth
|
||||
integration-test: victoria-metrics vmagent vmalert vmauth vmctl
|
||||
go test ./apptest/... -skip="^TestCluster.*"
|
||||
|
||||
benchmark:
|
||||
|
||||
@@ -1,215 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
|
||||
remote_read_integration "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
)
|
||||
|
||||
const (
|
||||
testSnapshot = "./testdata/snapshots/20250118T124506Z-59d1b952d7eaf547"
|
||||
blockData = "./testdata/snapshots/20250118T124506Z-59d1b952d7eaf547/01JHWQ445Y2P1TDYB05AEKD6MC"
|
||||
)
|
||||
|
||||
// This test simulates close process if user abort it
|
||||
func TestPrometheusProcessorRun(t *testing.T) {
|
||||
|
||||
f := func(startStr, endStr string, numOfSeries int, resultExpected []vm.TimeSeries) {
|
||||
t.Helper()
|
||||
|
||||
dst := remote_read_integration.NewRemoteWriteServer(t)
|
||||
|
||||
defer func() {
|
||||
dst.Close()
|
||||
}()
|
||||
|
||||
dst.Series(resultExpected)
|
||||
dst.ExpectedSeries(resultExpected)
|
||||
|
||||
if err := fillStorage(resultExpected); err != nil {
|
||||
t.Fatalf("cannot fill storage: %s", err)
|
||||
}
|
||||
|
||||
isSilent = true
|
||||
defer func() { isSilent = false }()
|
||||
|
||||
bf, err := backoff.New(1, 1.8, time.Second*2)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create backoff: %s", err)
|
||||
}
|
||||
|
||||
importerCfg := vm.Config{
|
||||
Addr: dst.URL(),
|
||||
Transport: nil,
|
||||
Concurrency: 1,
|
||||
Backoff: bf,
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
importer, err := vm.NewImporter(ctx, importerCfg)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create importer: %s", err)
|
||||
}
|
||||
defer importer.Close()
|
||||
|
||||
matchName := "__name__"
|
||||
matchValue := ".*"
|
||||
filter := prometheus.Filter{
|
||||
TimeMin: startStr,
|
||||
TimeMax: endStr,
|
||||
Label: matchName,
|
||||
LabelValue: matchValue,
|
||||
}
|
||||
|
||||
runner, err := prometheus.NewClient(prometheus.Config{
|
||||
Snapshot: testSnapshot,
|
||||
Filter: filter,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create prometheus client: %s", err)
|
||||
}
|
||||
p := &prometheusProcessor{
|
||||
cl: runner,
|
||||
im: importer,
|
||||
cc: 1,
|
||||
}
|
||||
|
||||
if err := p.run(); err != nil {
|
||||
t.Fatalf("run() error: %s", err)
|
||||
}
|
||||
|
||||
collectedTs := dst.GetCollectedTimeSeries()
|
||||
t.Logf("collected timeseries: %d; expected timeseries: %d", len(collectedTs), len(resultExpected))
|
||||
if len(collectedTs) != len(resultExpected) {
|
||||
t.Fatalf("unexpected number of collected time series; got %d; want %d", len(collectedTs), numOfSeries)
|
||||
}
|
||||
|
||||
deleted, err := deleteSeries(matchName, matchValue)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot delete series: %s", err)
|
||||
}
|
||||
if deleted != numOfSeries {
|
||||
t.Fatalf("unexpected number of deleted series; got %d; want %d", deleted, numOfSeries)
|
||||
}
|
||||
}
|
||||
|
||||
processFlags()
|
||||
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
|
||||
defer func() {
|
||||
vmstorage.Stop()
|
||||
if err := os.RemoveAll(storagePath); err != nil {
|
||||
log.Fatalf("cannot remove %q: %s", storagePath, err)
|
||||
}
|
||||
}()
|
||||
|
||||
barpool.Disable(true)
|
||||
defer func() {
|
||||
barpool.Disable(false)
|
||||
}()
|
||||
|
||||
b, err := tsdb.OpenBlock(nil, blockData, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open block: %s", err)
|
||||
}
|
||||
// timestamp is equal to minTime and maxTime from meta.json
|
||||
ss, err := readBlock(b, 1737204082361, 1737204302539)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot read block: %s", err)
|
||||
}
|
||||
|
||||
resultExpected, err := prepareExpectedData(ss)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot prepare expected data: %s", err)
|
||||
}
|
||||
|
||||
f("2025-01-18T12:40:00Z", "2025-01-18T12:46:00Z", 2792, resultExpected)
|
||||
}
|
||||
|
||||
func readBlock(b tsdb.BlockReader, timeMin int64, timeMax int64) (storage.SeriesSet, error) {
|
||||
minTime, maxTime := b.Meta().MinTime, b.Meta().MaxTime
|
||||
|
||||
if timeMin != 0 {
|
||||
minTime = timeMin
|
||||
}
|
||||
if timeMax != 0 {
|
||||
maxTime = timeMax
|
||||
}
|
||||
|
||||
q, err := tsdb.NewBlockQuerier(b, minTime, maxTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
matchName := "__name__"
|
||||
matchValue := ".*"
|
||||
ctx := context.Background()
|
||||
ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, matchName, matchValue))
|
||||
return ss, nil
|
||||
}
|
||||
|
||||
func prepareExpectedData(ss storage.SeriesSet) ([]vm.TimeSeries, error) {
|
||||
var expectedSeriesSet []vm.TimeSeries
|
||||
var it chunkenc.Iterator
|
||||
for ss.Next() {
|
||||
var name string
|
||||
var labelPairs []vm.LabelPair
|
||||
series := ss.At()
|
||||
|
||||
for _, label := range series.Labels() {
|
||||
if label.Name == "__name__" {
|
||||
name = label.Value
|
||||
continue
|
||||
}
|
||||
labelPairs = append(labelPairs, vm.LabelPair{
|
||||
Name: label.Name,
|
||||
Value: label.Value,
|
||||
})
|
||||
}
|
||||
if name == "" {
|
||||
return nil, fmt.Errorf("failed to find `__name__` label in labelset for block")
|
||||
}
|
||||
|
||||
var timestamps []int64
|
||||
var values []float64
|
||||
it = series.Iterator(it)
|
||||
for {
|
||||
typ := it.Next()
|
||||
if typ == chunkenc.ValNone {
|
||||
break
|
||||
}
|
||||
if typ != chunkenc.ValFloat {
|
||||
// Skip unsupported values
|
||||
continue
|
||||
}
|
||||
t, v := it.At()
|
||||
timestamps = append(timestamps, t)
|
||||
values = append(values, v)
|
||||
}
|
||||
if err := it.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ts := vm.TimeSeries{
|
||||
Name: name,
|
||||
LabelPairs: labelPairs,
|
||||
Timestamps: timestamps,
|
||||
Values: values,
|
||||
}
|
||||
expectedSeriesSet = append(expectedSeriesSet, ts)
|
||||
}
|
||||
return expectedSeriesSet, nil
|
||||
}
|
||||
@@ -31,6 +31,7 @@ type app struct {
|
||||
binary string
|
||||
flags []string
|
||||
process *os.Process
|
||||
wait bool
|
||||
}
|
||||
|
||||
// appOptions holds the optional configuration of an app, such as default flags
|
||||
@@ -38,6 +39,7 @@ type app struct {
|
||||
type appOptions struct {
|
||||
defaultFlags map[string]string
|
||||
extractREs []*regexp.Regexp
|
||||
wait bool
|
||||
}
|
||||
|
||||
// startApp starts an instance of an app using the app binary file path and
|
||||
@@ -73,6 +75,7 @@ func startApp(instance string, binary string, flags []string, opts *appOptions)
|
||||
binary: binary,
|
||||
flags: flags,
|
||||
process: cmd.Process,
|
||||
wait: opts.wait,
|
||||
}
|
||||
|
||||
go app.processOutput("stdout", stdout, app.writeToStderr)
|
||||
@@ -92,7 +95,11 @@ func startApp(instance string, binary string, flags []string, opts *appOptions)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return app, extracts, nil
|
||||
if app.wait {
|
||||
err = cmd.Wait()
|
||||
}
|
||||
|
||||
return app, extracts, err
|
||||
}
|
||||
|
||||
// setDefaultFlags adds flags with default values to `flags` if it does not
|
||||
@@ -112,9 +119,12 @@ func setDefaultFlags(flags []string, defaultFlags map[string]string) []string {
|
||||
return flags
|
||||
}
|
||||
|
||||
// stop sends the app process a SIGINT signal and waits until it terminates
|
||||
// Stop sends the app process a SIGINT signal and waits until it terminates
|
||||
// gracefully.
|
||||
func (app *app) Stop() {
|
||||
if app.wait {
|
||||
return
|
||||
}
|
||||
if err := app.process.Signal(os.Interrupt); err != nil {
|
||||
log.Fatalf("Could not send SIGINT signal to %s process: %v", app.instance, err)
|
||||
}
|
||||
|
||||
@@ -7,8 +7,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
)
|
||||
|
||||
// TestCase holds the state and defines clean-up procedure common for all test
|
||||
@@ -251,6 +252,18 @@ func (tc *TestCase) MustStartCluster(opts *ClusterOptions) PrometheusWriteQuerie
|
||||
return &Vmcluster{vminsert, vmselect, []*Vmstorage{vmstorage1, vmstorage2}}
|
||||
}
|
||||
|
||||
// MustStartVmctl is a test helper function that starts an instance of vmctl
|
||||
func (tc *TestCase) MustStartVmctl(instance string, flags []string) *Vmctl {
|
||||
tc.t.Helper()
|
||||
|
||||
app, err := StartVmctl(instance, flags)
|
||||
if err != nil {
|
||||
tc.t.Fatalf("Could not start %s: %v", instance, err)
|
||||
}
|
||||
tc.addApp(instance, app)
|
||||
return app
|
||||
}
|
||||
|
||||
func (tc *TestCase) addApp(instance string, app Stopper) {
|
||||
if _, alreadyStarted := tc.startedApps[instance]; alreadyStarted {
|
||||
tc.t.Fatalf("%s has already been started", instance)
|
||||
|
||||
270769
apptest/tests/testdata/prometheus/expected_response.json
vendored
Normal file
270769
apptest/tests/testdata/prometheus/expected_response.json
vendored
Normal file
File diff suppressed because it is too large
Load Diff
80
apptest/tests/vmctl_prometheus_migration_test.go
Normal file
80
apptest/tests/vmctl_prometheus_migration_test.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
||||
)
|
||||
|
||||
func TestVmctlPrometheusProtocolToVMSingle(t *testing.T) {
|
||||
os.RemoveAll(t.Name())
|
||||
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
cmpOpt := cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType")
|
||||
|
||||
vmsingleDst := tc.MustStartVmsingle("vmsingle", []string{
|
||||
"-storageDataPath=" + tc.Dir() + "/vmsingle",
|
||||
"-retentionPeriod=100y",
|
||||
})
|
||||
|
||||
// test for empty data request
|
||||
got := vmsingleDst.PrometheusAPIV1Query(t, `{__name__=~".*"}`, apptest.QueryOpts{
|
||||
Step: "5m",
|
||||
Time: "2025-01-18T12:45:00Z",
|
||||
})
|
||||
|
||||
want := apptest.NewPrometheusAPIV1QueryResponse(t, `{"data":{"result":[]}}`)
|
||||
if diff := cmp.Diff(want, got, cmpOpt); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
|
||||
vmAddr := fmt.Sprintf("http://%s/", vmsingleDst.HTTPAddr())
|
||||
testSnapshot := "./testdata/prometheus/snapshots/20250118T124506Z-59d1b952d7eaf547"
|
||||
_ = tc.MustStartVmctl("vmctl", []string{
|
||||
`prometheus`,
|
||||
`--prom-snapshot=` + testSnapshot,
|
||||
`--vm-addr=` + vmAddr,
|
||||
`--disable-progress-bar=true`,
|
||||
})
|
||||
|
||||
vmsingleDst.ForceFlush(t)
|
||||
|
||||
// open the expected series response file
|
||||
file, err := os.Open("./testdata/prometheus/expected_response.json")
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open expected series response file: %s", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
bytes, err := io.ReadAll(file)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot read expected series response file: %s", err)
|
||||
}
|
||||
|
||||
wantResponse := apptest.NewPrometheusAPIV1QueryResponse(t, string(bytes))
|
||||
wantResponse.Sort()
|
||||
|
||||
tc.Assert(&apptest.AssertOptions{
|
||||
Msg: `unexpected metrics stored on vmsingle via the prometheus protocol`,
|
||||
Got: func() any {
|
||||
exported := vmsingleDst.PrometheusAPIV1Export(t, `{__name__=~".*"}`, apptest.QueryOpts{
|
||||
Start: "2025-01-18T00:45:00Z",
|
||||
End: "2025-01-18T23:46:00Z",
|
||||
})
|
||||
exported.Sort()
|
||||
return exported
|
||||
},
|
||||
Want: &apptest.PrometheusAPIV1QueryResponse{Data: wantResponse.Data},
|
||||
CmpOpts: []cmp.Option{
|
||||
cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType"),
|
||||
},
|
||||
})
|
||||
}
|
||||
18
apptest/vmctl.go
Normal file
18
apptest/vmctl.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package apptest
|
||||
|
||||
// Vmctl holds the state of a vmctl app and provides vmctl-specific functions
|
||||
type Vmctl struct {
|
||||
*app
|
||||
}
|
||||
|
||||
// StartVmctl starts an instance of vmctl cli with the given flags
|
||||
func StartVmctl(instance string, flags []string) (*Vmctl, error) {
|
||||
app, _, err := startApp(instance, "../../bin/vmctl", flags, &appOptions{wait: true})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Vmctl{
|
||||
app: app,
|
||||
}, nil
|
||||
}
|
||||
Reference in New Issue
Block a user