mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
Basic apptest and changes to make it work
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
18
Makefile
18
Makefile
@@ -471,7 +471,23 @@ test-full-386:
|
||||
|
||||
apptest:
|
||||
$(MAKE) victoria-metrics-race vmagent-race vmalert-race vmauth-race vmctl-race vmbackup-race vmrestore-race
|
||||
go test ./apptest/... -skip="^Test(Cluster|Legacy).*"
|
||||
go test ./apptest/... -skip="^Test(Cluster|Mixed|Legacy).*"
|
||||
|
||||
apptest-mixed: victoria-metrics-race
|
||||
OS=$$(uname | tr '[:upper:]' '[:lower:]'); \
|
||||
ARCH=$$(uname -m | tr '[:upper:]' '[:lower:]' | sed 's/x86_64/amd64/'); \
|
||||
VERSION=v1.142.0; \
|
||||
VMSINGLE=victoria-metrics-$${OS}-$${ARCH}-$${VERSION}.tar.gz; \
|
||||
VMCLUSTER=victoria-metrics-$${OS}-$${ARCH}-$${VERSION}-cluster.tar.gz; \
|
||||
URL=https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/$${VERSION}; \
|
||||
DIR=/tmp/$${VERSION}; \
|
||||
test -d $${DIR} || (mkdir $${DIR} && \
|
||||
curl --output-dir /tmp -LO $${URL}/$${VMSINGLE} && tar xzf /tmp/$${VMSINGLE} -C $${DIR} && \
|
||||
curl --output-dir /tmp -LO $${URL}/$${VMCLUSTER} && tar xzf /tmp/$${VMCLUSTER} -C $${DIR} \
|
||||
); \
|
||||
VM_VMSINGLE_PATH=$${DIR}/victoria-metrics-prod \
|
||||
VM_VMSELECT_PATH=$${DIR}/vmselect-prod \
|
||||
go test ./apptest/tests -run="^TestMixed.*"
|
||||
|
||||
apptest-legacy: victoria-metrics-race vmbackup-race vmrestore-race
|
||||
OS=$$(uname | tr '[:upper:]' '[:lower:]'); \
|
||||
|
||||
@@ -4,10 +4,12 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
@@ -89,6 +91,11 @@ func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQu
|
||||
bi.MustClose()
|
||||
return nil, err
|
||||
}
|
||||
// Initialize block iterator with the tenantID which will be added to the
|
||||
// metric name of every block. See bi.NextBlock().
|
||||
bi.tenantID = make([]byte, 0, 8)
|
||||
bi.tenantID = encoding.MarshalUint32(bi.tenantID, sq.AccountID)
|
||||
bi.tenantID = encoding.MarshalUint32(bi.tenantID, sq.ProjectID)
|
||||
return bi, nil
|
||||
}
|
||||
|
||||
@@ -110,7 +117,24 @@ func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.S
|
||||
if len(tfss) == 0 {
|
||||
return nil, fmt.Errorf("missing tag filters")
|
||||
}
|
||||
return api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
|
||||
|
||||
metricNames, err := api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// vmselect expects metric names to have the tenantID but vmsingle does not
|
||||
// have it. Therefore the tenantID needs to be appended to every metric
|
||||
// name.
|
||||
dst := make([]byte, 0, 8)
|
||||
dst = encoding.MarshalUint32(dst, sq.AccountID)
|
||||
dst = encoding.MarshalUint32(dst, sq.ProjectID)
|
||||
tenantID := string(dst)
|
||||
|
||||
for i, metricName := range metricNames {
|
||||
metricNames[i] = tenantID + metricName
|
||||
}
|
||||
return metricNames, nil
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
|
||||
@@ -269,8 +293,9 @@ func (api *vmstorageAPI) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.
|
||||
|
||||
// blockIterator implements vmselectapi.BlockIterator
|
||||
type blockIterator struct {
|
||||
sr storage.Search
|
||||
mb storage.MetricBlock
|
||||
sr storage.Search
|
||||
mb storage.MetricBlock
|
||||
tenantID []byte
|
||||
}
|
||||
|
||||
var blockIteratorsPool sync.Pool
|
||||
@@ -279,6 +304,7 @@ func (bi *blockIterator) MustClose() {
|
||||
bi.sr.MustClose()
|
||||
bi.mb.MetricName = nil
|
||||
bi.mb.Block.Reset()
|
||||
bi.tenantID = nil
|
||||
blockIteratorsPool.Put(bi)
|
||||
}
|
||||
|
||||
@@ -295,9 +321,16 @@ func (bi *blockIterator) NextBlock(dst []byte) ([]byte, bool) {
|
||||
return dst, false
|
||||
}
|
||||
mb := bi.mb
|
||||
mb.MetricName = bi.sr.MetricBlockRef.MetricName
|
||||
|
||||
// vmselect expects metric names to have the tenantID but vmsingle does not
|
||||
// have it. Therefore the tenantID needs to be included to every metric
|
||||
// name and block.
|
||||
mb.MetricName = slices.Concat(bi.tenantID, bi.sr.MetricBlockRef.MetricName)
|
||||
bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block)
|
||||
dst = mb.Marshal(dst[:0])
|
||||
dst = encoding.MarshalBytes(dst, mb.MetricName)
|
||||
dst = append(dst, bi.tenantID...)
|
||||
dst = storage.MarshalBlock(dst, &mb.Block)
|
||||
|
||||
return dst, true
|
||||
}
|
||||
|
||||
|
||||
@@ -92,9 +92,6 @@ func (tc *TestCase) MustStartDefaultVmsingle() *Vmsingle {
|
||||
// fails to start.
|
||||
func (tc *TestCase) MustStartVmsingle(instance string, flags []string) *Vmsingle {
|
||||
tc.t.Helper()
|
||||
// TODO(rtm0): Move to defaultFlags in vmsingle.go. Currently does not work
|
||||
// because legacy vmsingle does not have this flag.
|
||||
flags = append(flags, "-vmselectAddr=127.0.0.1:0")
|
||||
return tc.MustStartVmsingleAt(instance, "../../bin/victoria-metrics-race", flags)
|
||||
}
|
||||
|
||||
@@ -111,9 +108,23 @@ func (tc *TestCase) MustStartVmsingleAt(instance, binary string, flags []string)
|
||||
return app
|
||||
}
|
||||
|
||||
// MustStartLegacyVmsingleAt is a test helper function that starts an instance
|
||||
// of vmsingle v1.132.0 (last version before pt-index) and fails the test if the
|
||||
// app fails to start.
|
||||
func (tc *TestCase) MustStartLegacyVmsingleAt(instance, binary string, flags []string) *Vmsingle {
|
||||
tc.t.Helper()
|
||||
|
||||
app, err := StartLegacyVmsingleAt(instance, binary, flags, tc.cli, tc.output)
|
||||
if err != nil {
|
||||
tc.t.Fatalf("Could not start %s: %v", instance, err)
|
||||
}
|
||||
tc.addApp(instance, app)
|
||||
return app
|
||||
}
|
||||
|
||||
// MustStartVmstorage is a test helper function that starts an instance of
|
||||
// vmstorage located at ../../bin/vmstorage-race and fails the test if the app fails
|
||||
// to start.
|
||||
// vmstorage located at ../../bin/vmstorage-race and fails the test if the app
|
||||
// fails to start.
|
||||
func (tc *TestCase) MustStartVmstorage(instance string, flags []string) *Vmstorage {
|
||||
tc.t.Helper()
|
||||
return tc.MustStartVmstorageAt(instance, "../../bin/vmstorage-race", flags)
|
||||
@@ -121,7 +132,7 @@ func (tc *TestCase) MustStartVmstorage(instance string, flags []string) *Vmstora
|
||||
|
||||
// MustStartVmstorageAt is a test helper function that starts an instance of
|
||||
// vmstorage and fails the test if the app fails to start.
|
||||
func (tc *TestCase) MustStartVmstorageAt(instance string, binary string, flags []string) *Vmstorage {
|
||||
func (tc *TestCase) MustStartVmstorageAt(instance, binary string, flags []string) *Vmstorage {
|
||||
tc.t.Helper()
|
||||
|
||||
app, err := StartVmstorageAt(instance, binary, flags, tc.cli, tc.output)
|
||||
@@ -133,11 +144,19 @@ func (tc *TestCase) MustStartVmstorageAt(instance string, binary string, flags [
|
||||
}
|
||||
|
||||
// MustStartVmselect is a test helper function that starts an instance of
|
||||
// vmselect and fails the test if the app fails to start.
|
||||
// vmselect located at ../../bin/vmselect-race and fails the test if the app
|
||||
// fails to start.
|
||||
func (tc *TestCase) MustStartVmselect(instance string, flags []string) *Vmselect {
|
||||
tc.t.Helper()
|
||||
return tc.MustStartVmselectAt(instance, "../../bin/vmselect-race", flags)
|
||||
}
|
||||
|
||||
app, err := StartVmselect(instance, flags, tc.cli, tc.output)
|
||||
// MustStartVmselectAt is a test helper function that starts an instance of
|
||||
// vmselect and fails the test if the app fails to start.
|
||||
func (tc *TestCase) MustStartVmselectAt(instance, binary string, flags []string) *Vmselect {
|
||||
tc.t.Helper()
|
||||
|
||||
app, err := StartVmselectAt(instance, binary, flags, tc.cli, tc.output)
|
||||
if err != nil {
|
||||
tc.t.Fatalf("Could not start %s: %v", instance, err)
|
||||
}
|
||||
|
||||
89
apptest/testdata.go
Normal file
89
apptest/testdata.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package apptest
|
||||
|
||||
import "fmt"
|
||||
|
||||
type TestData struct {
|
||||
Samples []string
|
||||
Step int64
|
||||
WantSeries []map[string]string
|
||||
WantQueryResults []*QueryResult
|
||||
}
|
||||
|
||||
func GenerateTestData(prefix string, numMetrics, start, end int64) TestData {
|
||||
samples := make([]string, numMetrics)
|
||||
step := (end - start) / numMetrics
|
||||
wantSeries := make([]map[string]string, numMetrics)
|
||||
wantQueryResults := make([]*QueryResult, numMetrics)
|
||||
for i := range numMetrics {
|
||||
metricName := fmt.Sprintf("%s_%04d", prefix, i)
|
||||
labelName := fmt.Sprintf("label_%04d", i)
|
||||
labelValue := fmt.Sprintf("value_%04d", i)
|
||||
value := i
|
||||
timestamp := start + i*step
|
||||
samples[i] = fmt.Sprintf(`%s{%s="value", label="%s"} %d %d`, metricName, labelName, labelValue, value, timestamp)
|
||||
wantSeries[i] = map[string]string{
|
||||
"__name__": metricName,
|
||||
labelName: "value",
|
||||
"label": labelValue,
|
||||
}
|
||||
wantQueryResults[i] = &QueryResult{
|
||||
Metric: map[string]string{
|
||||
"__name__": metricName,
|
||||
labelName: "value",
|
||||
"label": labelValue,
|
||||
},
|
||||
Samples: []*Sample{{Timestamp: timestamp, Value: float64(value)}},
|
||||
}
|
||||
}
|
||||
return TestData{samples, step, wantSeries, wantQueryResults}
|
||||
}
|
||||
|
||||
// AssertSeries retrieves metric names from the storage and compares the result
|
||||
// with the expected one.
|
||||
func AssertSeries(tc *TestCase, app PrometheusQuerier, metricNameRE string, start, end int64, want []map[string]string) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /api/v1/series response",
|
||||
Got: func() any {
|
||||
return app.PrometheusAPIV1Series(tc.T(), query, QueryOpts{
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
}).Sort()
|
||||
},
|
||||
Want: &PrometheusAPIV1SeriesResponse{
|
||||
Status: "success",
|
||||
Data: want,
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AssertQueryResults sends a data query to storage and compares the query
|
||||
// result with the expected one.
|
||||
func AssertQueryResults(tc *TestCase, app PrometheusQuerier, metricNameRE string, start, end, step int64, want []*QueryResult) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /api/v1/query_range response",
|
||||
Got: func() any {
|
||||
return app.PrometheusAPIV1QueryRange(tc.T(), query, QueryOpts{
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
Step: fmt.Sprintf("%dms", step),
|
||||
MaxLookback: fmt.Sprintf("%dms", step-1),
|
||||
NoCache: "1",
|
||||
})
|
||||
},
|
||||
Want: &PrometheusAPIV1QueryResponse{
|
||||
Status: "success",
|
||||
Data: &QueryData{
|
||||
ResultType: "matrix",
|
||||
Result: want,
|
||||
},
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
@@ -31,7 +31,7 @@ func TestLegacySingleDeleteSeries(t *testing.T) {
|
||||
|
||||
opts := testLegacyDeleteSeriesOpts{
|
||||
startLegacySUT: func() at.PrometheusWriteQuerier {
|
||||
return tc.MustStartVmsingleAt("vmsingle-legacy", legacyVmsinglePath, []string{
|
||||
return tc.MustStartLegacyVmsingleAt("vmsingle-legacy", legacyVmsinglePath, []string{
|
||||
"-storageDataPath=" + storageDataPath,
|
||||
"-retentionPeriod=100y",
|
||||
"-search.maxStalenessInterval=1m",
|
||||
@@ -255,7 +255,7 @@ func TestLegacySingleBackupRestore(t *testing.T) {
|
||||
|
||||
opts := testLegacyBackupRestoreOpts{
|
||||
startLegacySUT: func() at.PrometheusWriteQuerier {
|
||||
return tc.MustStartVmsingleAt("vmsingle-legacy", legacyVmsinglePath, []string{
|
||||
return tc.MustStartLegacyVmsingleAt("vmsingle-legacy", legacyVmsinglePath, []string{
|
||||
"-storageDataPath=" + storageDataPath,
|
||||
"-retentionPeriod=100y",
|
||||
"-search.disableCache=true",
|
||||
@@ -583,7 +583,7 @@ func TestLegacySingleDowngrade(t *testing.T) {
|
||||
|
||||
opts := testLegacyDowngradeOpts{
|
||||
startLegacySUT: func() at.PrometheusWriteQuerier {
|
||||
return tc.MustStartVmsingleAt("vmsingle-legacy", legacyVmsinglePath, []string{
|
||||
return tc.MustStartLegacyVmsingleAt("vmsingle-legacy", legacyVmsinglePath, []string{
|
||||
"-storageDataPath=" + storageDataPath,
|
||||
"-retentionPeriod=100y",
|
||||
"-search.disableCache=true",
|
||||
|
||||
40
apptest/tests/mixed_test.go
Normal file
40
apptest/tests/mixed_test.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
||||
)
|
||||
|
||||
var (
|
||||
vmselectPath = os.Getenv("VM_VMSELECT_PATH")
|
||||
)
|
||||
|
||||
func TestMixedDataRetrieval(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
const numMetrics = 1000
|
||||
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
data := apptest.GenerateTestData("metric", numMetrics, start, end)
|
||||
|
||||
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
|
||||
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
|
||||
"-retentionPeriod=100y",
|
||||
})
|
||||
vmselect := tc.MustStartVmselectAt("vmselect", vmselectPath, []string{
|
||||
"-storageNode=" + vmsingle.VmselectAddr(),
|
||||
})
|
||||
|
||||
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data.Samples, apptest.QueryOpts{})
|
||||
vmsingle.ForceFlush(t)
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", start, end, data.WantSeries)
|
||||
apptest.AssertQueryResults(tc, vmsingle, "metric.*", start, end, data.Step, data.WantQueryResults)
|
||||
|
||||
apptest.AssertSeries(tc, vmselect, "metric.*", start, end, data.WantSeries)
|
||||
apptest.AssertQueryResults(tc, vmselect, "metric.*", start, end, data.Step, data.WantQueryResults)
|
||||
}
|
||||
@@ -21,8 +21,8 @@ type Vmselect struct {
|
||||
// StartVmselect starts an instance of vmselect with the given flags. It also
|
||||
// sets the default flags and populates the app instance state with runtime
|
||||
// values extracted from the application log (such as httpListenAddr)
|
||||
func StartVmselect(instance string, flags []string, cli *Client, output io.Writer) (*Vmselect, error) {
|
||||
app, stderrExtracts, err := startApp(instance, "../../bin/vmselect-race", flags, &appOptions{
|
||||
func StartVmselectAt(instance, binary string, flags []string, cli *Client, output io.Writer) (*Vmselect, error) {
|
||||
app, stderrExtracts, err := startApp(instance, binary, flags, &appOptions{
|
||||
defaultFlags: map[string]string{
|
||||
"-httpListenAddr": "127.0.0.1:0",
|
||||
"-clusternativeListenAddr": "127.0.0.1:0",
|
||||
|
||||
@@ -20,12 +20,74 @@ type Vmsingle struct {
|
||||
|
||||
storageDataPath string
|
||||
httpListenAddr string
|
||||
vmselectAddr string
|
||||
}
|
||||
|
||||
// StartVmsingleAt starts an instance of vmsingle with the given flags. It also
|
||||
// sets the default flags and populates the app instance state with runtime
|
||||
// values extracted from the application log (such as httpListenAddr).
|
||||
func StartVmsingleAt(instance, binary string, flags []string, cli *Client, output io.Writer) (*Vmsingle, error) {
|
||||
app, stderrExtracts, err := startApp(instance, binary, flags, &appOptions{
|
||||
defaultFlags: map[string]string{
|
||||
"-storageDataPath": fmt.Sprintf("%s/%s-%d", os.TempDir(), instance, time.Now().UnixNano()),
|
||||
"-httpListenAddr": "127.0.0.1:0",
|
||||
"-graphiteListenAddr": ":0",
|
||||
"-opentsdbListenAddr": "127.0.0.1:0",
|
||||
"-vmselectAddr": "127.0.0.1:0",
|
||||
},
|
||||
extractREs: []*regexp.Regexp{
|
||||
storageDataPathRE,
|
||||
httpListenAddrRE,
|
||||
graphiteListenAddrRE,
|
||||
openTSDBListenAddrRE,
|
||||
vmselectAddrRE,
|
||||
},
|
||||
output: output,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Vmsingle{
|
||||
app: app,
|
||||
metricsClient: newMetricsClient(cli, stderrExtracts[1]),
|
||||
vmstorageClient: &vmstorageClient{
|
||||
vmstorageCli: cli,
|
||||
httpListenAddr: stderrExtracts[1],
|
||||
},
|
||||
vmselectClient: &vmselectClient{
|
||||
vmselectCli: cli,
|
||||
url: func(op, path string, opts QueryOpts) string {
|
||||
return fmt.Sprintf("http://%s/%s", stderrExtracts[1], path)
|
||||
},
|
||||
metricNamesStatsResetURL: fmt.Sprintf("http://%s/api/v1/admin/status/metric_names_stats/reset", stderrExtracts[1]),
|
||||
tenantsURL: "vmsingle-does-not-serve-tenants",
|
||||
},
|
||||
vminsertClient: &vminsertClient{
|
||||
vminsertCli: cli,
|
||||
url: func(_, path string, _ QueryOpts) string {
|
||||
return fmt.Sprintf("http://%s/%s", stderrExtracts[1], path)
|
||||
},
|
||||
openTSDBURL: func(_, path string, _ QueryOpts) string {
|
||||
return fmt.Sprintf("http://%s/%s", stderrExtracts[3], path)
|
||||
},
|
||||
graphiteListenAddr: stderrExtracts[2],
|
||||
sendBlocking: func(t *testing.T, _ int, send func()) {
|
||||
t.Helper()
|
||||
send()
|
||||
},
|
||||
},
|
||||
storageDataPath: stderrExtracts[0],
|
||||
httpListenAddr: stderrExtracts[1],
|
||||
vmselectAddr: stderrExtracts[4],
|
||||
}, nil
|
||||
}
|
||||
|
||||
// StartLegacyVmsingleAt starts an instance of vmsingle v1.132.0 (last version
|
||||
// before pt-index) with the given flags. It also sets the default flags and
|
||||
// populates the app instance state with runtime values extracted from the
|
||||
// application log (such as httpListenAddr).
|
||||
func StartLegacyVmsingleAt(instance, binary string, flags []string, cli *Client, output io.Writer) (*Vmsingle, error) {
|
||||
app, stderrExtracts, err := startApp(instance, binary, flags, &appOptions{
|
||||
defaultFlags: map[string]string{
|
||||
"-storageDataPath": fmt.Sprintf("%s/%s-%d", os.TempDir(), instance, time.Now().UnixNano()),
|
||||
@@ -85,6 +147,12 @@ func (app *Vmsingle) HTTPAddr() string {
|
||||
return app.httpListenAddr
|
||||
}
|
||||
|
||||
// VmselectAddr returns the address at which the vmsingle process is listening
|
||||
// for vmselect connections.
|
||||
func (app *Vmsingle) VmselectAddr() string {
|
||||
return app.vmselectAddr
|
||||
}
|
||||
|
||||
// String returns the string representation of the vmsingle app state.
|
||||
func (app *Vmsingle) String() string {
|
||||
return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q}", []any{
|
||||
|
||||
Reference in New Issue
Block a user