mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
167 lines
4.9 KiB
Go
167 lines
4.9 KiB
Go
package tests
|
|
|
|
import (
|
|
"fmt"
|
|
"path/filepath"
|
|
"testing"
|
|
|
|
"github.com/google/go-cmp/cmp"
|
|
"github.com/google/go-cmp/cmp/cmpopts"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
)
|
|
|
|
func TestSingleToSingleVmctlNativeProtocol(t *testing.T) {
|
|
fs.MustRemoveDir(t.Name())
|
|
|
|
tc := apptest.NewTestCase(t)
|
|
defer tc.Stop()
|
|
|
|
vmsingleSrc := tc.MustStartVmsingle("vmsingle_src", []string{
|
|
"-storageDataPath=" + tc.Dir() + "/vmsingle_src",
|
|
"-retentionPeriod=100y",
|
|
})
|
|
// we need a separate vmsingle for the destination to avoid conflicts
|
|
vmsingleDst := tc.MustStartVmsingle("vmsingle_dst", []string{
|
|
"-storageDataPath=" + tc.Dir() + "/vmsingle_dst",
|
|
"-retentionPeriod=100y",
|
|
})
|
|
|
|
vmSrcAddr := fmt.Sprintf("http://%s/", vmsingleSrc.HTTPAddr())
|
|
vmDstAddr := fmt.Sprintf("http://%s/", vmsingleDst.HTTPAddr())
|
|
|
|
flags := []string{
|
|
`vm-native`,
|
|
`--vm-native-src-addr=` + vmSrcAddr,
|
|
`--vm-native-dst-addr=` + vmDstAddr,
|
|
`--vm-native-filter-match={__name__=~".*"}`,
|
|
`--vm-native-filter-time-start=2025-05-30T16:39:00Z`,
|
|
`--disable-progress-bar=true`,
|
|
}
|
|
|
|
testVmctlNativeProtocol(tc, vmsingleSrc, vmsingleDst, flags)
|
|
}
|
|
|
|
func TestClusterTenantsToTenantsVmctlNativeProtocol(t *testing.T) {
|
|
fs.MustRemoveDir(t.Name())
|
|
|
|
tc := apptest.NewTestCase(t)
|
|
defer tc.Stop()
|
|
|
|
clusterSrc := tc.MustStartCluster(&apptest.ClusterOptions{
|
|
Vmstorage1Instance: "vmstorageSrc1",
|
|
Vmstorage1Flags: []string{
|
|
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmstorage1-src"),
|
|
"-retentionPeriod=100y",
|
|
},
|
|
Vmstorage2Instance: "vmstorageSrc2",
|
|
Vmstorage2Flags: []string{
|
|
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmstorage2-src"),
|
|
"-retentionPeriod=100y",
|
|
},
|
|
VminsertInstance: "vminsertSrc",
|
|
VmselectInstance: "vmselectSrc",
|
|
})
|
|
clusterDst := tc.MustStartCluster(&apptest.ClusterOptions{
|
|
Vmstorage1Instance: "vmstorageDst1",
|
|
Vmstorage1Flags: []string{
|
|
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmstorage1-dst"),
|
|
"-retentionPeriod=100y",
|
|
},
|
|
Vmstorage2Instance: "vmstorageDst2",
|
|
Vmstorage2Flags: []string{
|
|
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmstorage2-dst"),
|
|
"-retentionPeriod=100y",
|
|
},
|
|
VminsertInstance: "vminsertDst",
|
|
VmselectInstance: "vmselectDst",
|
|
})
|
|
|
|
vmSrcAddr := fmt.Sprintf("http://%s/", clusterSrc.Vmselect.HTTPAddr())
|
|
vmDstAddr := fmt.Sprintf("http://%s/", clusterDst.Vminsert.HTTPAddr())
|
|
|
|
flags := []string{
|
|
`vm-native`,
|
|
`--vm-native-src-addr=` + vmSrcAddr,
|
|
`--vm-native-dst-addr=` + vmDstAddr,
|
|
`--vm-native-filter-match={__name__=~".*"}`,
|
|
`--vm-native-filter-time-start=2025-05-30T16:39:00Z`,
|
|
`--disable-progress-bar=true`,
|
|
`--vm-intercluster`,
|
|
}
|
|
|
|
testVmctlNativeProtocol(tc, clusterSrc, clusterDst, flags)
|
|
}
|
|
|
|
func testVmctlNativeProtocol(tc *apptest.TestCase, srcSut apptest.PrometheusWriteQuerier, dstSut apptest.PrometheusWriteQuerier, vmctlFlags []string) {
|
|
t := tc.T()
|
|
t.Helper()
|
|
|
|
cmpOpt := cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType")
|
|
|
|
// test for empty data request in the source
|
|
got := srcSut.PrometheusAPIV1Query(t, `{__name__=~".*"}`, apptest.QueryOpts{
|
|
Step: "5m",
|
|
Time: "2025-05-30T12:45:00Z",
|
|
})
|
|
|
|
want := apptest.NewPrometheusAPIV1QueryResponse(t, `{"data":{"result":[]}}`)
|
|
if diff := cmp.Diff(want, got, cmpOpt); diff != "" {
|
|
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
|
}
|
|
|
|
// Prepare the source vmsingle with some data
|
|
// Insert some data.
|
|
const numSamples = 1000
|
|
const ingestTimestamp = " 1748623176000" // 2025-05-30T16:39:36Z
|
|
|
|
expectedQueryData := apptest.QueryData{
|
|
ResultType: "matrix",
|
|
Result: make([]*apptest.QueryResult, 0, numSamples),
|
|
}
|
|
dataSet := make([]string, numSamples)
|
|
for i := range numSamples {
|
|
metricsName := fmt.Sprintf("metric_%03d", i)
|
|
metrics := map[string]string{"__name__": metricsName}
|
|
sample := &apptest.Sample{Value: float64(i), Timestamp: 1748623176000}
|
|
expectedQueryData.Result = append(expectedQueryData.Result, &apptest.QueryResult{
|
|
Metric: metrics,
|
|
Samples: []*apptest.Sample{sample},
|
|
})
|
|
|
|
dataSet[i] = fmt.Sprintf("%s %d %s", metricsName, i, ingestTimestamp)
|
|
}
|
|
|
|
wantResponse := apptest.PrometheusAPIV1QueryResponse{
|
|
Status: "success",
|
|
Data: &expectedQueryData,
|
|
}
|
|
|
|
wantResponse.Sort()
|
|
|
|
srcSut.PrometheusAPIV1ImportPrometheus(t, dataSet, apptest.QueryOpts{})
|
|
srcSut.ForceFlush(t)
|
|
|
|
tc.MustStartVmctl("vmctl", vmctlFlags)
|
|
|
|
dstSut.ForceFlush(t)
|
|
|
|
tc.Assert(&apptest.AssertOptions{
|
|
Retries: 300,
|
|
Msg: `unexpected metrics stored on vmsingle via the native protocol`,
|
|
Got: func() any {
|
|
exported := dstSut.PrometheusAPIV1Export(t, `{__name__=~".*"}`, apptest.QueryOpts{
|
|
Start: "2025-05-30T16:39:36Z",
|
|
End: "2025-05-30T16:39:37Z",
|
|
})
|
|
exported.Sort()
|
|
return exported.Data.Result
|
|
},
|
|
Want: wantResponse.Data.Result,
|
|
CmpOpts: []cmp.Option{
|
|
cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType"),
|
|
},
|
|
})
|
|
}
|