diff --git a/Makefile b/Makefile index f0a993228b..04af05ca99 100644 --- a/Makefile +++ b/Makefile @@ -471,7 +471,23 @@ test-full-386: apptest: $(MAKE) victoria-metrics-race vmagent-race vmalert-race vmauth-race vmctl-race vmbackup-race vmrestore-race - go test ./apptest/... -skip="^Test(Cluster|Legacy).*" + go test ./apptest/... -skip="^Test(Cluster|Mixed|Legacy).*" + +apptest-mixed: victoria-metrics-race + OS=$$(uname | tr '[:upper:]' '[:lower:]'); \ + ARCH=$$(uname -m | tr '[:upper:]' '[:lower:]' | sed 's/x86_64/amd64/'); \ + VERSION=v1.142.0; \ + VMSINGLE=victoria-metrics-$${OS}-$${ARCH}-$${VERSION}.tar.gz; \ + VMCLUSTER=victoria-metrics-$${OS}-$${ARCH}-$${VERSION}-cluster.tar.gz; \ + URL=https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/$${VERSION}; \ + DIR=/tmp/$${VERSION}; \ + test -d $${DIR} || (mkdir $${DIR} && \ + curl --output-dir /tmp -LO $${URL}/$${VMSINGLE} && tar xzf /tmp/$${VMSINGLE} -C $${DIR} && \ + curl --output-dir /tmp -LO $${URL}/$${VMCLUSTER} && tar xzf /tmp/$${VMCLUSTER} -C $${DIR} \ + ); \ + VM_VMSINGLE_PATH=$${DIR}/victoria-metrics-prod \ + VM_VMSELECT_PATH=$${DIR}/vmselect-prod \ + go test ./apptest/tests -run="^TestMixed.*" apptest-legacy: victoria-metrics-race vmbackup-race vmrestore-race OS=$$(uname | tr '[:upper:]' '[:lower:]'); \ diff --git a/app/vmstorage/servers/vmselect.go b/app/vmstorage/servers/vmselect.go index 8401c8845a..038bbcc921 100644 --- a/app/vmstorage/servers/vmselect.go +++ b/app/vmstorage/servers/vmselect.go @@ -4,10 +4,12 @@ import ( "flag" "fmt" "net/http" + "slices" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -89,6 +91,11 @@ func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQu bi.MustClose() return nil, err } 
+ // Initialize block iterator with the tenantID which will be added to the + // metric name of every block. See bi.NextBlock(). + bi.tenantID = make([]byte, 0, 8) + bi.tenantID = encoding.MarshalUint32(bi.tenantID, sq.AccountID) + bi.tenantID = encoding.MarshalUint32(bi.tenantID, sq.ProjectID) return bi, nil } @@ -110,7 +117,24 @@ func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.S if len(tfss) == 0 { return nil, fmt.Errorf("missing tag filters") } - return api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline) + + metricNames, err := api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline) + if err != nil { + return nil, err + } + + // vmselect expects metric names to have the tenantID but vmsingle does not + // have it. Therefore the tenantID needs to be prepended to every metric + // name. + dst := make([]byte, 0, 8) + dst = encoding.MarshalUint32(dst, sq.AccountID) + dst = encoding.MarshalUint32(dst, sq.ProjectID) + tenantID := string(dst) + + for i, metricName := range metricNames { + metricNames[i] = tenantID + metricName + } + return metricNames, nil } func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) { @@ -269,8 +293,9 @@ func (api *vmstorageAPI) GetMetadataRecords(qt *querytracer.Tracer, tt *storage. 
// blockIterator implements vmselectapi.BlockIterator type blockIterator struct { - sr storage.Search - mb storage.MetricBlock + sr storage.Search + mb storage.MetricBlock + tenantID []byte } var blockIteratorsPool sync.Pool @@ -279,6 +304,7 @@ func (bi *blockIterator) MustClose() { bi.sr.MustClose() bi.mb.MetricName = nil bi.mb.Block.Reset() + bi.tenantID = nil blockIteratorsPool.Put(bi) } @@ -295,9 +321,16 @@ func (bi *blockIterator) NextBlock(dst []byte) ([]byte, bool) { return dst, false } mb := bi.mb - mb.MetricName = bi.sr.MetricBlockRef.MetricName + + // vmselect expects metric names to have the tenantID but vmsingle does not + // have it. Therefore the tenantID needs to be prepended to every metric + // name and to every marshaled block. + mb.MetricName = slices.Concat(bi.tenantID, bi.sr.MetricBlockRef.MetricName) bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block) - dst = mb.Marshal(dst[:0]) + dst = encoding.MarshalBytes(dst, mb.MetricName) + dst = append(dst, bi.tenantID...) + dst = storage.MarshalBlock(dst, &mb.Block) + return dst, true } diff --git a/apptest/testcase.go b/apptest/testcase.go index 463e07748d..1b4738a73e 100644 --- a/apptest/testcase.go +++ b/apptest/testcase.go @@ -92,9 +92,6 @@ func (tc *TestCase) MustStartDefaultVmsingle() *Vmsingle { // fails to start. func (tc *TestCase) MustStartVmsingle(instance string, flags []string) *Vmsingle { tc.t.Helper() - // TODO(rtm0): Move to defaultFlags in vmsingle.go. Currently does not work - // because legacy vmsingle does not have this flag. - flags = append(flags, "-vmselectAddr=127.0.0.1:0") return tc.MustStartVmsingleAt(instance, "../../bin/victoria-metrics-race", flags) } @@ -111,9 +108,23 @@ func (tc *TestCase) MustStartVmsingleAt(instance, binary string, flags []string) return app } +// MustStartLegacyVmsingleAt is a test helper function that starts an instance +// of vmsingle v1.132.0 (last version before pt-index) and fails the test if the +// app fails to start. 
+func (tc *TestCase) MustStartLegacyVmsingleAt(instance, binary string, flags []string) *Vmsingle { + tc.t.Helper() + + app, err := StartLegacyVmsingleAt(instance, binary, flags, tc.cli, tc.output) + if err != nil { + tc.t.Fatalf("Could not start %s: %v", instance, err) + } + tc.addApp(instance, app) + return app +} + // MustStartVmstorage is a test helper function that starts an instance of -// vmstorage located at ../../bin/vmstorage-race and fails the test if the app fails -// to start. +// vmstorage located at ../../bin/vmstorage-race and fails the test if the app +// fails to start. func (tc *TestCase) MustStartVmstorage(instance string, flags []string) *Vmstorage { tc.t.Helper() return tc.MustStartVmstorageAt(instance, "../../bin/vmstorage-race", flags) @@ -121,7 +132,7 @@ func (tc *TestCase) MustStartVmstorage(instance string, flags []string) *Vmstora // MustStartVmstorageAt is a test helper function that starts an instance of // vmstorage and fails the test if the app fails to start. -func (tc *TestCase) MustStartVmstorageAt(instance string, binary string, flags []string) *Vmstorage { +func (tc *TestCase) MustStartVmstorageAt(instance, binary string, flags []string) *Vmstorage { tc.t.Helper() app, err := StartVmstorageAt(instance, binary, flags, tc.cli, tc.output) @@ -133,11 +144,19 @@ func (tc *TestCase) MustStartVmstorageAt(instance string, binary string, flags [ } // MustStartVmselect is a test helper function that starts an instance of -// vmselect and fails the test if the app fails to start. +// vmselect located at ../../bin/vmselect-race and fails the test if the app +// fails to start. 
func (tc *TestCase) MustStartVmselect(instance string, flags []string) *Vmselect { tc.t.Helper() + return tc.MustStartVmselectAt(instance, "../../bin/vmselect-race", flags) +} - app, err := StartVmselect(instance, flags, tc.cli, tc.output) +// MustStartVmselectAt is a test helper function that starts an instance of +// vmselect and fails the test if the app fails to start. +func (tc *TestCase) MustStartVmselectAt(instance, binary string, flags []string) *Vmselect { + tc.t.Helper() + + app, err := StartVmselectAt(instance, binary, flags, tc.cli, tc.output) if err != nil { tc.t.Fatalf("Could not start %s: %v", instance, err) } diff --git a/apptest/testdata.go b/apptest/testdata.go new file mode 100644 index 0000000000..2dfab64cc8 --- /dev/null +++ b/apptest/testdata.go @@ -0,0 +1,89 @@ +package apptest + +import "fmt" + +type TestData struct { + Samples []string + Step int64 + WantSeries []map[string]string + WantQueryResults []*QueryResult +} + +func GenerateTestData(prefix string, numMetrics, start, end int64) TestData { + samples := make([]string, numMetrics) + step := (end - start) / numMetrics + wantSeries := make([]map[string]string, numMetrics) + wantQueryResults := make([]*QueryResult, numMetrics) + for i := range numMetrics { + metricName := fmt.Sprintf("%s_%04d", prefix, i) + labelName := fmt.Sprintf("label_%04d", i) + labelValue := fmt.Sprintf("value_%04d", i) + value := i + timestamp := start + i*step + samples[i] = fmt.Sprintf(`%s{%s="value", label="%s"} %d %d`, metricName, labelName, labelValue, value, timestamp) + wantSeries[i] = map[string]string{ + "__name__": metricName, + labelName: "value", + "label": labelValue, + } + wantQueryResults[i] = &QueryResult{ + Metric: map[string]string{ + "__name__": metricName, + labelName: "value", + "label": labelValue, + }, + Samples: []*Sample{{Timestamp: timestamp, Value: float64(value)}}, + } + } + return TestData{samples, step, wantSeries, wantQueryResults} +} + +// AssertSeries retrieves metric names from the 
storage and compares the result +// with the expected one. +func AssertSeries(tc *TestCase, app PrometheusQuerier, metricNameRE string, start, end int64, want []map[string]string) { + tc.T().Helper() + + query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE) + tc.Assert(&AssertOptions{ + Msg: "unexpected /api/v1/series response", + Got: func() any { + return app.PrometheusAPIV1Series(tc.T(), query, QueryOpts{ + Start: fmt.Sprintf("%d", start), + End: fmt.Sprintf("%d", end), + }).Sort() + }, + Want: &PrometheusAPIV1SeriesResponse{ + Status: "success", + Data: want, + }, + FailNow: true, + }) +} + +// AssertQueryResults sends a data query to storage and compares the query +// result with the expected one. +func AssertQueryResults(tc *TestCase, app PrometheusQuerier, metricNameRE string, start, end, step int64, want []*QueryResult) { + tc.T().Helper() + + query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE) + tc.Assert(&AssertOptions{ + Msg: "unexpected /api/v1/query_range response", + Got: func() any { + return app.PrometheusAPIV1QueryRange(tc.T(), query, QueryOpts{ + Start: fmt.Sprintf("%d", start), + End: fmt.Sprintf("%d", end), + Step: fmt.Sprintf("%dms", step), + MaxLookback: fmt.Sprintf("%dms", step-1), + NoCache: "1", + }) + }, + Want: &PrometheusAPIV1QueryResponse{ + Status: "success", + Data: &QueryData{ + ResultType: "matrix", + Result: want, + }, + }, + FailNow: true, + }) +} diff --git a/apptest/tests/legacy_indexdb_test.go b/apptest/tests/legacy_indexdb_test.go index 01a74c38b0..68eca5ca4b 100644 --- a/apptest/tests/legacy_indexdb_test.go +++ b/apptest/tests/legacy_indexdb_test.go @@ -31,7 +31,7 @@ func TestLegacySingleDeleteSeries(t *testing.T) { opts := testLegacyDeleteSeriesOpts{ startLegacySUT: func() at.PrometheusWriteQuerier { - return tc.MustStartVmsingleAt("vmsingle-legacy", legacyVmsinglePath, []string{ + return tc.MustStartLegacyVmsingleAt("vmsingle-legacy", legacyVmsinglePath, []string{ "-storageDataPath=" + storageDataPath, 
"-retentionPeriod=100y", "-search.maxStalenessInterval=1m", @@ -255,7 +255,7 @@ func TestLegacySingleBackupRestore(t *testing.T) { opts := testLegacyBackupRestoreOpts{ startLegacySUT: func() at.PrometheusWriteQuerier { - return tc.MustStartVmsingleAt("vmsingle-legacy", legacyVmsinglePath, []string{ + return tc.MustStartLegacyVmsingleAt("vmsingle-legacy", legacyVmsinglePath, []string{ "-storageDataPath=" + storageDataPath, "-retentionPeriod=100y", "-search.disableCache=true", @@ -583,7 +583,7 @@ func TestLegacySingleDowngrade(t *testing.T) { opts := testLegacyDowngradeOpts{ startLegacySUT: func() at.PrometheusWriteQuerier { - return tc.MustStartVmsingleAt("vmsingle-legacy", legacyVmsinglePath, []string{ + return tc.MustStartLegacyVmsingleAt("vmsingle-legacy", legacyVmsinglePath, []string{ "-storageDataPath=" + storageDataPath, "-retentionPeriod=100y", "-search.disableCache=true", diff --git a/apptest/tests/mixed_test.go b/apptest/tests/mixed_test.go new file mode 100644 index 0000000000..214ec633c8 --- /dev/null +++ b/apptest/tests/mixed_test.go @@ -0,0 +1,40 @@ +package tests + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/apptest" +) + +var ( + vmselectPath = os.Getenv("VM_VMSELECT_PATH") +) + +func TestMixedDataRetrieval(t *testing.T) { + tc := apptest.NewTestCase(t) + defer tc.Stop() + + const numMetrics = 1000 + start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli() + end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli() + data := apptest.GenerateTestData("metric", numMetrics, start, end) + + vmsingle := tc.MustStartVmsingle("vmsingle", []string{ + "-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"), + "-retentionPeriod=100y", + }) + vmselect := tc.MustStartVmselectAt("vmselect", vmselectPath, []string{ + "-storageNode=" + vmsingle.VmselectAddr(), + }) + + vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data.Samples, apptest.QueryOpts{}) + vmsingle.ForceFlush(t) + 
apptest.AssertSeries(tc, vmsingle, "metric.*", start, end, data.WantSeries) + apptest.AssertQueryResults(tc, vmsingle, "metric.*", start, end, data.Step, data.WantQueryResults) + + apptest.AssertSeries(tc, vmselect, "metric.*", start, end, data.WantSeries) + apptest.AssertQueryResults(tc, vmselect, "metric.*", start, end, data.Step, data.WantQueryResults) +} diff --git a/apptest/vmselect.go b/apptest/vmselect.go index 9054345fce..c181b991f8 100644 --- a/apptest/vmselect.go +++ b/apptest/vmselect.go @@ -21,8 +21,8 @@ type Vmselect struct { // StartVmselect starts an instance of vmselect with the given flags. It also // sets the default flags and populates the app instance state with runtime // values extracted from the application log (such as httpListenAddr) -func StartVmselect(instance string, flags []string, cli *Client, output io.Writer) (*Vmselect, error) { - app, stderrExtracts, err := startApp(instance, "../../bin/vmselect-race", flags, &appOptions{ +func StartVmselectAt(instance, binary string, flags []string, cli *Client, output io.Writer) (*Vmselect, error) { + app, stderrExtracts, err := startApp(instance, binary, flags, &appOptions{ defaultFlags: map[string]string{ "-httpListenAddr": "127.0.0.1:0", "-clusternativeListenAddr": "127.0.0.1:0", diff --git a/apptest/vmsingle.go b/apptest/vmsingle.go index 53c41aad18..cd2bdb3cf4 100644 --- a/apptest/vmsingle.go +++ b/apptest/vmsingle.go @@ -20,12 +20,74 @@ type Vmsingle struct { storageDataPath string httpListenAddr string + vmselectAddr string } // StartVmsingleAt starts an instance of vmsingle with the given flags. It also // sets the default flags and populates the app instance state with runtime // values extracted from the application log (such as httpListenAddr). 
func StartVmsingleAt(instance, binary string, flags []string, cli *Client, output io.Writer) (*Vmsingle, error) { + app, stderrExtracts, err := startApp(instance, binary, flags, &appOptions{ + defaultFlags: map[string]string{ + "-storageDataPath": fmt.Sprintf("%s/%s-%d", os.TempDir(), instance, time.Now().UnixNano()), + "-httpListenAddr": "127.0.0.1:0", + "-graphiteListenAddr": ":0", + "-opentsdbListenAddr": "127.0.0.1:0", + "-vmselectAddr": "127.0.0.1:0", + }, + extractREs: []*regexp.Regexp{ + storageDataPathRE, + httpListenAddrRE, + graphiteListenAddrRE, + openTSDBListenAddrRE, + vmselectAddrRE, + }, + output: output, + }) + if err != nil { + return nil, err + } + + return &Vmsingle{ + app: app, + metricsClient: newMetricsClient(cli, stderrExtracts[1]), + vmstorageClient: &vmstorageClient{ + vmstorageCli: cli, + httpListenAddr: stderrExtracts[1], + }, + vmselectClient: &vmselectClient{ + vmselectCli: cli, + url: func(op, path string, opts QueryOpts) string { + return fmt.Sprintf("http://%s/%s", stderrExtracts[1], path) + }, + metricNamesStatsResetURL: fmt.Sprintf("http://%s/api/v1/admin/status/metric_names_stats/reset", stderrExtracts[1]), + tenantsURL: "vmsingle-does-not-serve-tenants", + }, + vminsertClient: &vminsertClient{ + vminsertCli: cli, + url: func(_, path string, _ QueryOpts) string { + return fmt.Sprintf("http://%s/%s", stderrExtracts[1], path) + }, + openTSDBURL: func(_, path string, _ QueryOpts) string { + return fmt.Sprintf("http://%s/%s", stderrExtracts[3], path) + }, + graphiteListenAddr: stderrExtracts[2], + sendBlocking: func(t *testing.T, _ int, send func()) { + t.Helper() + send() + }, + }, + storageDataPath: stderrExtracts[0], + httpListenAddr: stderrExtracts[1], + vmselectAddr: stderrExtracts[4], + }, nil +} + +// StartLegacyVmsingleAt starts an instance of vmsingle v1.132.0 (last version +// before pt-index) with the given flags. 
It also sets the default flags and +// populates the app instance state with runtime values extracted from the +// application log (such as httpListenAddr). +func StartLegacyVmsingleAt(instance, binary string, flags []string, cli *Client, output io.Writer) (*Vmsingle, error) { app, stderrExtracts, err := startApp(instance, binary, flags, &appOptions{ defaultFlags: map[string]string{ "-storageDataPath": fmt.Sprintf("%s/%s-%d", os.TempDir(), instance, time.Now().UnixNano()), @@ -85,6 +147,12 @@ func (app *Vmsingle) HTTPAddr() string { return app.httpListenAddr } +// VmselectAddr returns the address at which the vmsingle process is listening +// for vmselect connections. +func (app *Vmsingle) VmselectAddr() string { + return app.vmselectAddr +} + // String returns the string representation of the vmsingle app state. func (app *Vmsingle) String() string { return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q}", []any{