Compare commits

...

7 Commits

Author SHA1 Message Date
Artem Fetishev
df8403252a fix ai code review remarks
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-11 19:22:11 +02:00
Artem Fetishev
cc99bf1d14 document methods
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-11 18:56:00 +02:00
Artem Fetishev
20dd40a108 document marshalMetricBlock and make it a method of the type to remove dependency on flags
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-11 17:57:11 +02:00
Artem Fetishev
0c5ad51bff remove variables and use constants
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-11 17:09:22 +02:00
Artem Fetishev
470b383ad3 remove unused RetentionMsecs() method from Storage
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-11 16:41:19 +02:00
Artem Fetishev
ef69c8c58c rebase build targets and test files
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-11 16:41:19 +02:00
Artem Fetishev
cf52a45553 vmsingle: add the support of vmselect RPC
Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2026-06-11 16:41:13 +02:00
15 changed files with 2728 additions and 12 deletions

View File

@@ -471,8 +471,9 @@ test-full-386:
apptest:
$(MAKE) victoria-metrics-race vmagent-race vmalert-race vmauth-race vmctl-race vmbackup-race vmrestore-race
go test ./apptest/... -skip="^Test(Cluster|Legacy).*"
go test ./apptest/... -skip="^Test(Cluster|Mixed|Legacy).*"
# App tests for legacy indexDB
apptest-legacy: victoria-metrics-race vmbackup-race vmrestore-race
OS=$$(uname | tr '[:upper:]' '[:lower:]'); \
ARCH=$$(uname -m | tr '[:upper:]' '[:lower:]' | sed 's/x86_64/amd64/'); \
@@ -489,6 +490,20 @@ apptest-legacy: victoria-metrics-race vmbackup-race vmrestore-race
VMSTORAGE_V1_132_0_PATH=$${DIR}/vmstorage-prod \
go test ./apptest/tests -run="^TestLegacySingle.*"
# App tests for mixed setups where vmsingle and vmcluster coexist.
apptest-mixed: victoria-metrics-race
OS=$$(uname | tr '[:upper:]' '[:lower:]'); \
ARCH=$$(uname -m | tr '[:upper:]' '[:lower:]' | sed 's/x86_64/amd64/'); \
VERSION=v1.145.0; \
VMCLUSTER=victoria-metrics-$${OS}-$${ARCH}-$${VERSION}-cluster.tar.gz; \
URL=https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/$${VERSION}; \
DIR=/tmp/$${VERSION}; \
test -d $${DIR} || (mkdir $${DIR} && \
curl --output-dir /tmp -LO $${URL}/$${VMCLUSTER} && tar xzf /tmp/$${VMCLUSTER} -C $${DIR} \
); \
VMSELECT_PATH=$${DIR}/vmselect-prod \
go test ./apptest/tests -run="^TestMixed.*"
benchmark:
go test -run=NO_TESTS -bench=. ./lib/...
go test -run=NO_TESTS -bench=. ./app/...

View File

@@ -89,7 +89,7 @@ func main() {
}
logger.Infof("starting VictoriaMetrics at %q...", listenAddrs)
startTime := time.Now()
vmstorage.Init(*vmselectMaxConcurrentRequests, promql.ResetRollupResultCacheIfNeeded)
vmstorage.Init(*vmselectMaxConcurrentRequests, *vmselectMaxQueueDuration, promql.ResetRollupResultCacheIfNeeded)
vmselect.Init(*vmselectMaxConcurrentRequests, *vmselectMaxQueueDuration)
vminsertcommon.StartIngestionRateLimiter(*maxIngestionRate)
vminsert.Init()

View File

@@ -282,7 +282,8 @@ func processFlags() {
func setUp() {
const maxConcurrentRequests = 4
vmstorage.Init(maxConcurrentRequests, promql.ResetRollupResultCacheIfNeeded)
maxQueueDuration := 5 * time.Second
vmstorage.Init(maxConcurrentRequests, maxQueueDuration, promql.ResetRollupResultCacheIfNeeded)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
readyCheckFunc := func() bool {

View File

@@ -30,6 +30,9 @@ var (
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention. See also -retentionFilter")
futureRetention = flagutil.NewRetentionDuration("futureRetention", "2d", "Data with timestamps bigger than now+futureRetention is automatically deleted. "+
"The minimum futureRetention is 2 days. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention")
vmselectAddr = flag.String("vmselectAddr", "", "TCP address to accept connections from vmselect services")
vmselectDisableRPCCompression = flag.Bool("rpc.disableCompression", false, "Whether to disable compression of the data sent from vmstorage to vmselect. "+
"This reduces CPU usage at the cost of higher network bandwidth usage")
snapshotAuthKey = flagutil.NewPassword("snapshotAuthKey", "authKey, which must be passed in query string to /snapshot* pages. It overrides -httpAuth.*")
forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*")
forceFlushAuthKey = flagutil.NewPassword("forceFlushAuthKey", "authKey, which must be passed in query string to /internal/force_flush pages. It overrides -httpAuth.*")
@@ -108,7 +111,7 @@ func DataPath() string {
}
// Init initializes vmstorage.
func Init(vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []storage.MetricRow)) {
func Init(vmselectMaxConcurrentRequests int, vmselectMaxQueueDuration time.Duration, resetCacheIfNeeded func(mrs []storage.MetricRow)) {
storage.SetDedupInterval(*minScrapeInterval)
storage.SetDataFlushInterval(*inmemoryDataFlushInterval)
storage.LegacySetRetentionTimezoneOffset(*retentionTimezoneOffset)
@@ -169,6 +172,21 @@ func Init(vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []stora
storageMetrics.RegisterMetricsWriter(vmStorage.writeStorageMetrics)
metrics.RegisterSet(storageMetrics)
if *vmselectAddr != "" {
var err error
limits := vmselectapi.Limits{
MaxConcurrentRequests: vmselectMaxConcurrentRequests,
MaxConcurrentRequestsFlagName: "search.maxConcurrentRequests",
MaxQueueDuration: vmselectMaxQueueDuration,
MaxQueueDurationFlagName: "search.maxQueueDuration",
}
api := newVMStorageWithTenantID(vmStorage)
vmselectSrv, err = vmselectapi.NewServer(*vmselectAddr, api, limits, *vmselectDisableRPCCompression)
if err != nil {
logger.Fatalf("cannot create a server with -vmselectAddr=%s: %s", *vmselectAddr, err)
}
}
VMInsertAPI = vmStorage
VMSelectAPI = vmStorage
GetSearch = vmStorage.GetSearch
@@ -191,6 +209,8 @@ var (
// TODO(@rtm0): Remove this dependency from vmalert-tool unit tests.
DebugFlush func()
vmselectSrv *vmselectapi.Server
)
// Stop stops the vmstorage
@@ -201,6 +221,10 @@ func Stop() {
logger.Infof("gracefully closing the storage at %s", *storageDataPath)
startTime := time.Now()
if vmselectSrv != nil {
vmselectSrv.MustStop()
}
vmStorage.Stop()
logger.Infof("successfully closed the storage in %.3f seconds", time.Since(startTime).Seconds())

View File

@@ -164,6 +164,10 @@ func (vms *VMStorage) IsReadOnly() bool {
}
func (vms *VMStorage) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
return vms.initSearch(qt, sq, nil, deadline)
}
func (vms *VMStorage) initSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, marshal marshalFunc, deadline uint64) (vmselectapi.BlockIterator, error) {
vms.wg.Add(1)
tr := sq.GetTimeRange()
@@ -178,6 +182,7 @@ func (vms *VMStorage) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery
return nil, fmt.Errorf("missing tag filters")
}
bi := getBlockIterator()
bi.marshal = marshal
bi.wgDone = vms.wg.Done
bi.sr.Init(qt, vms.s, tfss, tr, maxMetrics, deadline)
if err := bi.sr.Error(); err != nil {
@@ -198,11 +203,14 @@ func (vms *VMStorage) getMaxMetrics(searchQueryLimit int) int {
return searchQueryLimit
}
type marshalFunc func(dst []byte, src *storage.MetricBlock) []byte
// blockIterator implements vmselectapi.BlockIterator
type blockIterator struct {
sr storage.Search
mb storage.MetricBlock
wgDone func()
sr storage.Search
mb storage.MetricBlock
marshal marshalFunc
wgDone func()
}
var blockIteratorsPool sync.Pool
@@ -231,7 +239,11 @@ func (bi *blockIterator) NextBlock(dst []byte) ([]byte, bool) {
mb := bi.mb
mb.MetricName = bi.sr.MetricBlockRef.MetricName
bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block)
dst = mb.Marshal(dst[:0])
if bi.marshal != nil {
dst = bi.marshal(dst[:0], &mb)
} else {
dst = mb.Marshal(dst[:0])
}
return dst, true
}

View File

@@ -0,0 +1,264 @@
package vmstorage
import (
"flag"
"fmt"
"math"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
var (
accountID = flag.Uint64("accountID", 0, "The accountID of the stored data")
projectID = flag.Uint64("projectID", 0, "The projectID of the stored data")
)
func newVMStorageWithTenantID(vms *VMStorage) *VMStorageWithTenantID {
if *accountID > math.MaxUint32 {
logger.Fatalf("-clusternative.accountID must to be in the range [0, %d], got %d", uint32(math.MaxUint32), *accountID)
}
if *projectID > math.MaxUint32 {
logger.Fatalf("-clusternative.projectID must to be in the range [0, %d], got %d", uint32(math.MaxUint32), *projectID)
}
return &VMStorageWithTenantID{
vms: vms,
accountID: uint32(*accountID),
projectID: uint32(*projectID),
}
}
// VMStorageWithTenantID is a thin wrapper around VMStorage type that overrides
// its methods to properly serve requests coming from a vmselect (require
// tenantID).
//
// A new instance of this type should be created using
// newVMStorageWithTenantID(). The created instance does not require closing.
// The instance also does not take ownership of vms and it is the responsibility
// of the caller to close vms.
type VMStorageWithTenantID struct {
vms *VMStorage
accountID uint32
projectID uint32
}
// InitSearch initializes a storage search for a request initiated by a
// vmselect.
//
// The search is initialized iff the search query is either multitenant or its
// accountID and projectID match -accountID and -projectID flag values.
// Otherwise, the method returns an interator that will return no data.
//
// The method also overrides the data format of the data returned by the
// iterator by prepending accountID and projectID bytes to the metric name and
// the data block (a format used in vmcluster).
func (vmst *VMStorageWithTenantID) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
return emptyBI, nil
}
return vmst.vms.initSearch(qt, sq, vmst.marshalMetricBlock, deadline)
}
var emptyBI = &emptyBlockIterator{}
// emptyBlockIterator is an implementation of vmselectapi.BlockIterator that
// always returns no data.
type emptyBlockIterator struct{}
func (*emptyBlockIterator) MustClose() {}
func (*emptyBlockIterator) NextBlock(dst []byte) ([]byte, bool) {
return dst, false
}
func (*emptyBlockIterator) Error() error {
return nil
}
// marshalMetricBlock serializes a metric block in the format expected by
// vmselect.
//
// vmselect expects metric names and data blocks to have the tenantID but
// vmsingle does not have it. Therefore the tenantID needs to be included to
// every metric name and block.
func (vmst *VMStorageWithTenantID) marshalMetricBlock(dst []byte, src *storage.MetricBlock) []byte {
// Marshal metric name:
// 1. Marshal metric name length + accountID length + projectID length (in
// bytes).
// 2. append accountID and projectID bytes
// 3. Finally append metric name bytes
dst = encoding.MarshalVarUint64(dst, uint64(len(src.MetricName))+8)
dst = encoding.MarshalUint32(dst, vmst.accountID)
dst = encoding.MarshalUint32(dst, vmst.projectID)
dst = append(dst, src.MetricName...)
// Marshal data block.
dst = encoding.MarshalUint32(dst, vmst.accountID)
dst = encoding.MarshalUint32(dst, vmst.projectID)
dst = storage.MarshalBlock(dst, &src.Block)
return dst
}
// SearchMetricNames searches the storage for metric names that match the query.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, the method will return an
// empty result.
//
// Found metric names are prepended with accountID and projectID bytes (a format
// used in vmcluster).
func (vmst *VMStorageWithTenantID) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
return nil, nil
}
metricNames, err := vmst.vms.SearchMetricNames(qt, sq, deadline)
if err != nil {
return nil, err
}
// vmselect expects metric names to have the tenantID but vmsingle does not
// have it. Therefore the tenantID needs to be appended to every metric
// name.
dst := make([]byte, 0, 8)
dst = encoding.MarshalUint32(dst, vmst.accountID)
dst = encoding.MarshalUint32(dst, vmst.projectID)
tenantID := string(dst)
for i, metricName := range metricNames {
metricNames[i] = tenantID + metricName
}
return metricNames, nil
}
// LabelValues searches the storage for values that match the query and
// correspond to a label whose name is `labelName`. The returned result
// will contain not more than `maxLabelValues`.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, the method will return an
// empty result.
func (vmst *VMStorageWithTenantID) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
return nil, nil
}
return vmst.vms.LabelValues(qt, sq, labelName, maxLabelValues, deadline)
}
// TagValueSuffixes searches the storage for Graphite tag value suffixes. The
// returned result will contain not more than `maxSuffixes`.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, the method will return an
// empty result.
func (vmst *VMStorageWithTenantID) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error) {
if accountID != vmst.accountID || projectID != vmst.projectID {
return nil, nil
}
return vmst.vms.TagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
}
// LabelNames searches the storage for label names that match the query.
// The returned result will contain not more than `maxLabelNames`.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, the method will return an
// empty result.
func (vmst *VMStorageWithTenantID) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
return nil, nil
}
return vmst.vms.LabelNames(qt, sq, maxLabelNames, deadline)
}
// SeriesCount returns the total number of metrics stored in the database.
//
// The method may return inflated numbers. How inflated the count depends
// on the churn rate and the retention period. For example, if a metric lasts
// for 2 months, it will be counted twice.
//
// The method also counts the deleted metrics.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, the method will return 0.
func (vmst *VMStorageWithTenantID) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
if accountID != vmst.accountID || projectID != vmst.projectID {
return 0, nil
}
return vmst.vms.SeriesCount(qt, accountID, projectID, deadline)
}
// Tenants returns just one tenant consisting of the -accountID and -projectID
// flag values.
func (vmst *VMStorageWithTenantID) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
tenantID := fmt.Sprintf("%d:%d", vmst.accountID, vmst.projectID)
return []string{tenantID}, nil
}
// TSDBStatus retrieves the status for metrics that match to the search query.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, the method will return empty
// status.
func (vmst *VMStorageWithTenantID) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
return &storage.TSDBStatus{}, nil
}
return vmst.vms.TSDBStatus(qt, sq, focusLabel, topN, deadline)
}
// DeleteSeries marks as deleted metrics that match the search query.
// The method returns the number of deleted metrics.
//
// If the query is not multitenant or the query accountID and projectID do not
// match the -accoutID and -projectID flag values, no metrics will be deleted
// and the method will return 0.
func (vmst *VMStorageWithTenantID) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
if !sq.IsMultiTenant && (sq.AccountID != vmst.accountID || sq.ProjectID != vmst.projectID) {
return 0, nil
}
return vmst.vms.DeleteSeries(qt, sq, deadline)
}
// RegisterMetricNames registers metric names in the index, the sample values
// and timestamps are ignored.
func (vmst *VMStorageWithTenantID) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error {
return vmst.vms.RegisterMetricNames(qt, mrs, deadline)
}
// GetMetricNamesUsageStats retrieves the usage stats for metrics whose name
// matches the pattern.
//
// If the request is not multitenant or the request accountID and projectID do
// not match the -accoutID and -projectID flag values, no metrics will be
// deleted and the method will return 0.
func (vmst *VMStorageWithTenantID) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (metricnamestats.StatsResult, error) {
if tt != nil && (tt.AccountID != vmst.accountID || tt.ProjectID != vmst.projectID) {
return metricnamestats.StatsResult{}, nil
}
return vmst.vms.GetMetricNamesUsageStats(qt, tt, limit, le, matchPattern, deadline)
}
// ResetMetricNamesUsageStats resets the metric name usage stats.
func (vmst *VMStorageWithTenantID) ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error {
return vmst.vms.ResetMetricNamesUsageStats(qt, deadline)
}
// GetMetadataRecords retrieves the metadata for the metricName.
//
// If the request is not multitenant or the request accountID and projectID do
// not match the -accoutID and -projectID flag values, no metrics will be
// deleted and the method will return 0.
func (vmst *VMStorageWithTenantID) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) {
if tt != nil && (tt.AccountID != vmst.accountID || tt.ProjectID != vmst.projectID) {
return nil, nil
}
return vmst.vms.GetMetadataRecords(qt, tt, limit, metricName, deadline)
}

358
apptest/testdata.go Normal file
View File

@@ -0,0 +1,358 @@
package apptest
import (
"fmt"
"slices"
)
type TestData struct {
Samples []string
Step int64
WantSeries []map[string]string
WantLabels []string
WantLabelValues []string
WantQueryResults []*QueryResult
WantMetadata map[string][]MetadataEntry
WantMetricNamesStats []MetricNamesStatsRecord
}
func GenerateTestData(prefix string, numMetrics, start, end int64) TestData {
d := TestData{
Samples: []string{},
Step: (end - start) / numMetrics,
WantSeries: make([]map[string]string, numMetrics),
WantLabels: make([]string, numMetrics),
WantLabelValues: make([]string, numMetrics),
WantQueryResults: make([]*QueryResult, numMetrics),
WantMetadata: make(map[string][]MetadataEntry),
WantMetricNamesStats: make([]MetricNamesStatsRecord, numMetrics),
}
for i := range numMetrics {
metricName := fmt.Sprintf("%s_%04d", prefix, i)
metricHelp := fmt.Sprintf("# HELP %s some help message", metricName)
metricType := fmt.Sprintf("# TYPE %s gauge", metricName)
labelName := fmt.Sprintf("label_%04d", i)
labelValue := fmt.Sprintf("value_%04d", i)
value := i
timestamp := start + i*d.Step
sample := fmt.Sprintf(`%s{%s="value", label="%s"} %d %d`, metricName, labelName, labelValue, value, timestamp)
d.Samples = append(d.Samples, metricHelp, metricType, sample)
d.WantSeries[i] = map[string]string{
"__name__": metricName,
labelName: "value",
"label": labelValue,
}
d.WantLabels[i] = labelName
d.WantLabelValues[i] = labelValue
d.WantQueryResults[i] = &QueryResult{
Metric: map[string]string{
"__name__": metricName,
labelName: "value",
"label": labelValue,
},
Samples: []*Sample{{Timestamp: timestamp, Value: float64(value)}},
}
d.WantMetadata[metricName] = []MetadataEntry{{Help: "some help message", Type: "gauge"}}
d.WantMetricNamesStats[i].MetricName = metricName
}
d.WantLabels = append(d.WantLabels, "__name__", "label")
slices.Sort(d.WantLabels)
return d
}
// AssertSeries retrieves metric names from the storage and compares the result
// with the expected one.
func AssertSeries(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end int64, want []map[string]string) {
tc.T().Helper()
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/series response",
Got: func() any {
tc.T().Helper()
return app.PrometheusAPIV1Series(tc.T(), query, QueryOpts{
Tenant: tenantID,
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
}).Sort()
},
Want: &PrometheusAPIV1SeriesResponse{
Status: "success",
Data: want,
},
Retries: 1000,
FailNow: true,
})
}
// AssertSeriesCount retrieves series count and compares it with expected one.
func AssertSeriesCount(tc *TestCase, app PrometheusQuerier, tenantID string, start, end int64, want uint64) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/series/count response",
Got: func() any {
tc.T().Helper()
return app.PrometheusAPIV1SeriesCount(tc.T(), QueryOpts{
Tenant: tenantID,
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
})
},
Want: &PrometheusAPIV1SeriesCountResponse{
Status: "success",
Data: []uint64{want},
},
FailNow: true,
})
}
// AssertLabels retrieves label names from the storage and compares the result
// with the expected one.
func AssertLabels(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end int64, want []string) {
tc.T().Helper()
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/labels response",
Got: func() any {
tc.T().Helper()
res := app.PrometheusAPIV1Labels(tc.T(), query, QueryOpts{
Tenant: tenantID,
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
})
slices.Sort(res.Data)
return res
},
Want: &PrometheusAPIV1LabelsResponse{
Status: "success",
Data: want,
},
FailNow: true,
})
}
// AssertLabelValues retrieves values for the label whose name is labelName for
// the series whose name mathes metricNameRE, compares the result with the
// expected one.
func AssertLabelValues(tc *TestCase, app PrometheusQuerier, metricNameRE, labelName, tenantID string, start, end int64, want []string) {
tc.T().Helper()
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/labels/.../values response",
Got: func() any {
tc.T().Helper()
res := app.PrometheusAPIV1LabelValues(tc.T(), labelName, query, QueryOpts{
Tenant: tenantID,
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
})
slices.Sort(res.Data)
return res
},
Want: &PrometheusAPIV1LabelValuesResponse{
Status: "success",
Data: want,
},
FailNow: true,
})
}
// AssertQueryResults sends a data query to storage and compares the query
// result with the expected one.
func AssertQueryResults(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end, step int64, want []*QueryResult) {
tc.T().Helper()
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/query_range response",
Got: func() any {
tc.T().Helper()
return app.PrometheusAPIV1QueryRange(tc.T(), query, QueryOpts{
Tenant: tenantID,
Start: fmt.Sprintf("%d", start),
End: fmt.Sprintf("%d", end),
Step: fmt.Sprintf("%dms", step),
MaxLookback: fmt.Sprintf("%dms", step-1),
NoCache: "1",
})
},
Want: &PrometheusAPIV1QueryResponse{
Status: "success",
Data: &QueryData{
ResultType: "matrix",
Result: want,
},
},
FailNow: true,
})
}
func AssertMetadata(tc *TestCase, app PrometheusQuerier, metricName, tenantID string, want map[string][]MetadataEntry) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/metadata response",
Got: func() any {
tc.T().Helper()
return app.PrometheusAPIV1Metadata(tc.T(), metricName, 0, QueryOpts{
Tenant: tenantID,
})
},
Want: &PrometheusAPIV1Metadata{
Status: "success",
Data: want,
},
FailNow: true,
})
}
func AssertMetricNamesStats(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, want []MetricNamesStatsRecord) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /prometheus/api/v1/status/metric_names_stats response",
Got: func() any {
tc.T().Helper()
return app.PrometheusAPIV1StatusMetricNamesStats(tc.T(), "", "", metricNameRE, QueryOpts{
Tenant: tenantID,
})
},
Want: MetricNamesStatsResponse{
Records: want,
},
FailNow: true,
})
}
// GraphiteTestData holds the data samples in Graphite Pickle format, distance
// between samples in milliseconds and expected responses for various Graphite
// API endpoints.
type GraphiteTestData struct {
Samples []string
Step int64
WantMetricsIndex []string
WantMetricsFind []GraphiteMetric
WantMetricsExpand []string
WantRenderedTargets []GraphiteRenderedTarget
}
// GenerateGraphiteTestData generates Graphite test data.
func GenerateGraphiteTestData(prefix string, numMetrics, start, end int64) GraphiteTestData {
d := GraphiteTestData{
Samples: make([]string, numMetrics),
Step: (end - start) / numMetrics,
WantMetricsIndex: make([]string, numMetrics),
WantMetricsFind: make([]GraphiteMetric, numMetrics),
WantMetricsExpand: make([]string, numMetrics),
WantRenderedTargets: make([]GraphiteRenderedTarget, numMetrics),
}
datapoints := make([][2]float64, numMetrics)
for i := range numMetrics {
timestamp := (start + i*d.Step) / 1000
datapoints[i][1] = float64(timestamp)
}
for i := range numMetrics {
suffix := fmt.Sprintf("%04d", i)
metricName := fmt.Sprintf("%s.%s", prefix, suffix)
value := i
timestamp := (start + i*d.Step) / 1000
sample := fmt.Sprintf(`%s %d %d`, metricName, value, timestamp)
d.Samples[i] = sample
d.WantMetricsIndex[i] = metricName
d.WantMetricsFind[i].Id = metricName
d.WantMetricsFind[i].Text = suffix
d.WantMetricsFind[i].Leaf = 1
d.WantMetricsExpand[i] = metricName
d.WantRenderedTargets[i].Target = metricName
d.WantRenderedTargets[i].Datapoints = slices.Clone(datapoints)
d.WantRenderedTargets[i].Datapoints[i][0] = float64(value)
}
return d
}
// AssertGraphiteMetricsIndex retrieves all metrics by sending a request to
// /graphite/metrics/index.json and compares the result with the expected one.
func AssertGraphiteMetricsIndex(tc *TestCase, app PrometheusQuerier, tenantID string, want []string) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /graphite/metrics/index.json response",
Got: func() any {
tc.T().Helper()
return app.GraphiteMetricsIndex(tc.T(), QueryOpts{
Tenant: tenantID,
})
},
Want: want,
Retries: 30,
FailNow: true,
})
}
// AssertGraphiteMetricsFind finds metric names by sending a request to
// /graphite/metrics/find and compares the result with the expected one.
func AssertGraphiteMetricsFind(tc *TestCase, app PrometheusQuerier, query, tenantID string, want []GraphiteMetric) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /graphite/metrics/find response",
Got: func() any {
tc.T().Helper()
return app.GraphiteMetricsFind(tc.T(), query, QueryOpts{
Tenant: tenantID,
})
},
Want: want,
FailNow: true,
})
}
// AssertGraphiteMetricsFind expands metric names by sending a request to
// /graphite/metrics/expand and compares the result with the expected one.
func AssertGraphiteMetricsExpand(tc *TestCase, app PrometheusQuerier, query, tenantID string, want []string) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /graphite/metrics/expand response",
Got: func() any {
tc.T().Helper()
return app.GraphiteMetricsExpand(tc.T(), query, QueryOpts{
Tenant: tenantID,
})
},
Want: want,
FailNow: true,
})
}
// AssertGraphiteRender retieves metric raw data by sending a request to
// /graphite/render and compares the result with the expected one.
func AssertGraphiteRender(tc *TestCase, app PrometheusQuerier, target, tenantID string, from, until, step int64, want []GraphiteRenderedTarget) {
tc.T().Helper()
tc.Assert(&AssertOptions{
Msg: "unexpected /graphite/render response",
Got: func() any {
tc.T().Helper()
return app.GraphiteRender(tc.T(), target, QueryOpts{
Tenant: tenantID,
From: fmt.Sprintf("%d", from/1000),
Until: fmt.Sprintf("%d", until/1000),
StorageStep: fmt.Sprintf("%dms", step),
})
},
Want: want,
FailNow: true,
})
}

View File

@@ -0,0 +1,216 @@
package tests
import (
"fmt"
"path/filepath"
"slices"
"strconv"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
"github.com/google/go-cmp/cmp"
)
func TestMixedPrometheusQueries(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
const (
accountID1 = 12
projectID1 = 34
accountID2 = 56
projectID2 = 78
numMetrics = 10
)
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
data := apptest.GenerateTestData("metric", numMetrics, start, end)
emptySeries := []map[string]string{}
emptyLabels := []string{}
emptyLabelValues := []string{}
emptyQueryResults := []*apptest.QueryResult{}
emptyMetadata := map[string][]apptest.MetadataEntry{}
emptyMetricNamesStats := []apptest.MetricNamesStatsRecord{}
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
"-retentionPeriod=100y",
fmt.Sprintf("-accountID=%d", accountID1),
fmt.Sprintf("-projectID=%d", projectID1),
})
vmselect := tc.MustStartVmselect("vmselect", []string{
"-storageNode=" + vmsingle.VmselectAddr(),
})
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data.Samples, apptest.QueryOpts{})
vmsingle.ForceFlush(t)
// Ensure vmsingle returns data.
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data.WantSeries)
apptest.AssertSeriesCount(tc, vmsingle, "", start, end, numMetrics)
apptest.AssertLabels(tc, vmsingle, "metric.*", "", start, end, data.WantLabels)
apptest.AssertLabelValues(tc, vmsingle, "metric.*", "label", "", start, end, data.WantLabelValues)
apptest.AssertQueryResults(tc, vmsingle, "metric.*", "", start, end, data.Step, data.WantQueryResults)
apptest.AssertMetadata(tc, vmsingle, "", "", data.WantMetadata)
for i := range data.WantMetricNamesStats {
data.WantMetricNamesStats[i].QueryRequestsCount = 1
}
apptest.AssertMetricNamesStats(tc, vmsingle, "", "", data.WantMetricNamesStats)
// Check that current vmsingle tenant (configured via flags) is tenant1.
gotAdminTenantsResponse := vmselect.APIV1AdminTenants(t, apptest.QueryOpts{})
wantAdminTenantsResponse := &apptest.AdminTenantsResponse{
Status: "success",
Data: []string{tenantID1},
}
if diff := cmp.Diff(wantAdminTenantsResponse, gotAdminTenantsResponse); diff != "" {
t.Fatalf("unexpected tenants (-want, +got):\n%s", diff)
}
// Ensure vmselect returns data for tenant1.
apptest.AssertSeries(tc, vmselect, "metric.*", tenantID1, start, end, data.WantSeries)
apptest.AssertSeriesCount(tc, vmselect, tenantID1, start, end, numMetrics)
apptest.AssertLabels(tc, vmselect, "metric.*", tenantID1, start, end, data.WantLabels)
apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", tenantID1, start, end, data.WantLabelValues)
apptest.AssertQueryResults(tc, vmselect, "metric.*", tenantID1, start, end, data.Step, data.WantQueryResults)
apptest.AssertMetadata(tc, vmselect, "", tenantID1, data.WantMetadata)
for i := range data.WantMetricNamesStats {
data.WantMetricNamesStats[i].QueryRequestsCount = 2
}
apptest.AssertMetricNamesStats(tc, vmselect, "", tenantID1, data.WantMetricNamesStats)
// Ensure vmselect does not return any data for tenant2.
apptest.AssertSeries(tc, vmselect, "metric.*", tenantID2, start, end, emptySeries)
apptest.AssertSeriesCount(tc, vmselect, tenantID2, start, end, 0)
apptest.AssertLabels(tc, vmselect, "metric.*", tenantID2, start, end, emptyLabels)
apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", tenantID2, start, end, emptyLabelValues)
apptest.AssertQueryResults(tc, vmselect, "metric.*", tenantID2, start, end, data.Step, emptyQueryResults)
apptest.AssertMetadata(tc, vmselect, "", tenantID2, emptyMetadata)
apptest.AssertMetricNamesStats(tc, vmselect, "", tenantID2, emptyMetricNamesStats)
// Ensure vmselect returns data for multitenant.
for _, v := range data.WantSeries {
v["vm_account_id"] = strconv.Itoa(accountID1)
v["vm_project_id"] = strconv.Itoa(projectID1)
}
apptest.AssertSeries(tc, vmselect, "metric.*", "multitenant", start, end, data.WantSeries)
data.WantLabels = append(data.WantLabels, "vm_account_id", "vm_project_id")
apptest.AssertLabels(tc, vmselect, "metric.*", "multitenant", start, end, data.WantLabels)
apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", "multitenant", start, end, data.WantLabelValues)
for _, v := range data.WantQueryResults {
v.Metric["vm_account_id"] = strconv.Itoa(accountID1)
v.Metric["vm_project_id"] = strconv.Itoa(projectID1)
}
apptest.AssertQueryResults(tc, vmselect, "metric.*", "multitenant", start, end, data.Step, data.WantQueryResults)
apptest.AssertMetadata(tc, vmselect, "", "multitenant", data.WantMetadata)
for i := range data.WantMetricNamesStats {
data.WantMetricNamesStats[i].QueryRequestsCount = 3
}
apptest.AssertMetricNamesStats(tc, vmselect, "", "multitenant", data.WantMetricNamesStats)
}
func TestMixedDeleteSeries(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
const (
accountID1 = 12
projectID1 = 34
accountID2 = 56
projectID2 = 78
numMetrics = 10
)
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
data1 := apptest.GenerateTestData("metric1", numMetrics, start, end)
data2 := apptest.GenerateTestData("metric2", numMetrics, start, end)
emptySeries := []map[string]string{}
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
"-retentionPeriod=100y",
fmt.Sprintf("-accountID=%d", accountID1),
fmt.Sprintf("-projectID=%d", projectID1),
})
vmselect := tc.MustStartVmselect("vmselect", []string{
"-storageNode=" + vmsingle.VmselectAddr(),
})
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data1.Samples, apptest.QueryOpts{})
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data2.Samples, apptest.QueryOpts{})
vmsingle.ForceFlush(t)
wantSeries12 := slices.Concat(data1.WantSeries, data2.WantSeries)
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, wantSeries12)
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric1.*"}`, apptest.QueryOpts{
Tenant: tenantID1,
})
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data2.WantSeries)
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric2.*"}`, apptest.QueryOpts{
Tenant: tenantID2,
})
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data2.WantSeries)
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric2.*"}`, apptest.QueryOpts{
Tenant: "multitenant",
})
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, emptySeries)
}
func TestMixedGraphiteQueries(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
const (
accountID1 = 12
projectID1 = 34
accountID2 = 56
projectID2 = 78
numMetrics = 10
)
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
data := apptest.GenerateGraphiteTestData("metric", numMetrics, start, end)
emptyMetricsIndex := []string{}
emptyMetricsFind := []apptest.GraphiteMetric{}
emptyMetricsExpand := []string{}
emptyRenderedTargets := []apptest.GraphiteRenderedTarget{}
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
"-retentionPeriod=100y",
fmt.Sprintf("-accountID=%d", accountID1),
fmt.Sprintf("-projectID=%d", projectID1),
})
vmselect := tc.MustStartVmselect("vmselect", []string{
"-storageNode=" + vmsingle.VmselectAddr(),
})
vmsingle.GraphiteWrite(tc.T(), data.Samples, apptest.QueryOpts{})
vmsingle.ForceFlush(t)
// Ensure vmsingle returns data.
apptest.AssertGraphiteMetricsIndex(tc, vmsingle, "", data.WantMetricsIndex)
apptest.AssertGraphiteMetricsFind(tc, vmsingle, "metric.*", "", data.WantMetricsFind)
apptest.AssertGraphiteMetricsExpand(tc, vmsingle, "metric.*", "", data.WantMetricsExpand)
apptest.AssertGraphiteRender(tc, vmsingle, "metric.*", "", start, end, data.Step, data.WantRenderedTargets)
// Ensure vmselect returns data for tenant1.
apptest.AssertGraphiteMetricsIndex(tc, vmselect, tenantID1, data.WantMetricsIndex)
apptest.AssertGraphiteMetricsFind(tc, vmselect, "metric.*", tenantID1, data.WantMetricsFind)
apptest.AssertGraphiteMetricsExpand(tc, vmselect, "metric.*", tenantID1, data.WantMetricsExpand)
apptest.AssertGraphiteRender(tc, vmselect, "metric.*", tenantID1, start, end, data.Step, data.WantRenderedTargets)
// Ensure vmselect does not return any data for tenant2.
apptest.AssertGraphiteMetricsIndex(tc, vmselect, tenantID2, emptyMetricsIndex)
apptest.AssertGraphiteMetricsFind(tc, vmselect, "metric.*", tenantID2, emptyMetricsFind)
apptest.AssertGraphiteMetricsExpand(tc, vmselect, "metric.*", tenantID2, emptyMetricsExpand)
apptest.AssertGraphiteRender(tc, vmselect, "metric.*", tenantID2, start, end, data.Step, emptyRenderedTargets)
}

View File

@@ -25,12 +25,14 @@ func StartVmsingle(instance string, flags []string, cli *Client, output io.Write
"-httpListenAddr": "127.0.0.1:0",
"-graphiteListenAddr": "127.0.0.1:0",
"-opentsdbListenAddr": "127.0.0.1:0",
"-vmselectAddr": "127.0.0.1:0",
},
extractREs: []*regexp.Regexp{
storageDataPathRE,
httpListenAddrRE,
graphiteListenAddrRE,
openTSDBListenAddrRE,
vmselectAddrRE,
},
output: output,
})
@@ -43,6 +45,7 @@ func StartVmsingle(instance string, flags []string, cli *Client, output io.Write
httpListenAddr: stderrExtracts[1],
graphiteListenAddr: stderrExtracts[2],
openTSDBListenAddr: stderrExtracts[3],
vmselectAddr: stderrExtracts[4],
}), nil
}
@@ -51,6 +54,7 @@ type vmsingleRuntimeValues struct {
httpListenAddr string
graphiteListenAddr string
openTSDBListenAddr string
vmselectAddr string
}
func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
@@ -85,6 +89,7 @@ func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
},
storageDataPath: rt.storageDataPath,
httpListenAddr: rt.httpListenAddr,
vmselectAddr: rt.vmselectAddr,
}
}
@@ -99,6 +104,7 @@ type Vmsingle struct {
storageDataPath string
httpListenAddr string
vmselectAddr string
}
// HTTPAddr returns the address at which the vminsert process is
@@ -107,6 +113,12 @@ func (app *Vmsingle) HTTPAddr() string {
return app.httpListenAddr
}
// VmselectAddr returns the address at which the vmsingle process is listening
// for vmselect connections.
func (app *Vmsingle) VmselectAddr() string {
return app.vmselectAddr
}
// String returns the string representation of the vmsingle app state.
func (app *Vmsingle) String() string {
return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q}", []any{

View File

@@ -0,0 +1,140 @@
package handshake
import (
"bufio"
"fmt"
"io"
"net"
"os"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
type bufferedWriter interface {
Write(p []byte) (int, error)
Flush() error
}
// BufferedConn is a net.Conn with Flush suport.
type BufferedConn struct {
net.Conn
// IsLegacy defines if BufferedConn operates in legacy mode
// and doesn't support RPC protocol
IsLegacy bool
br io.Reader
bw bufferedWriter
readDeadline time.Time
writeDeadline time.Time
}
const bufferSize = 64 * 1024
// newBufferedConn returns buffered connection with the given compression level.
func newBufferedConn(c net.Conn, compressionLevel int, isReadCompressed bool) *BufferedConn {
bc := &BufferedConn{
Conn: c,
}
if compressionLevel <= 0 {
bc.bw = bufio.NewWriterSize(c, bufferSize)
} else {
bc.bw = zstd.NewWriterLevel(c, compressionLevel)
}
if !isReadCompressed {
bc.br = bufio.NewReaderSize(c, bufferSize)
} else {
bc.br = zstd.NewReader(c)
}
return bc
}
// SetDeadline sets read and write deadlines for bc to t.
//
// Deadline is checked on each Read and Write call.
func (bc *BufferedConn) SetDeadline(t time.Time) error {
bc.readDeadline = t
bc.writeDeadline = t
return bc.Conn.SetDeadline(t)
}
// SetReadDeadline sets read deadline for bc to t.
//
// Deadline is checked on each Read call.
func (bc *BufferedConn) SetReadDeadline(t time.Time) error {
bc.readDeadline = t
return bc.Conn.SetReadDeadline(t)
}
// SetWriteDeadline sets write deadline for bc to t.
//
// Deadline is checked on each Write call.
func (bc *BufferedConn) SetWriteDeadline(t time.Time) error {
bc.writeDeadline = t
return bc.Conn.SetWriteDeadline(t)
}
// Read reads up to len(p) from bc to p.
func (bc *BufferedConn) Read(p []byte) (int, error) {
startTime := fasttime.UnixTimestamp()
if deadlineExceeded(bc.readDeadline, startTime) {
return 0, os.ErrDeadlineExceeded
}
n, err := bc.br.Read(p)
if err != nil && err != io.EOF {
err = fmt.Errorf("cannot read data in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
}
return n, err
}
// Write writes p to bc.
//
// Do not forget to call Flush if needed.
func (bc *BufferedConn) Write(p []byte) (int, error) {
startTime := fasttime.UnixTimestamp()
if deadlineExceeded(bc.writeDeadline, startTime) {
return 0, os.ErrDeadlineExceeded
}
n, err := bc.bw.Write(p)
if err != nil {
err = fmt.Errorf("cannot write data in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
}
return n, err
}
func deadlineExceeded(deadline time.Time, currentTimestamp uint64) bool {
if deadline.IsZero() {
return false
}
return currentTimestamp > uint64(deadline.Unix())
}
// Close closes bc.
func (bc *BufferedConn) Close() error {
// Close the Conn at first. It is expected that all the required data
// is already flushed to the Conn.
err := bc.Conn.Close()
bc.Conn = nil
if zr, ok := bc.br.(*zstd.Reader); ok {
zr.Release()
}
bc.br = nil
if zw, ok := bc.bw.(*zstd.Writer); ok {
// Do not call zw.Close(), since we already closed the underlying conn.
zw.Release()
}
bc.bw = nil
bc.IsLegacy = false
return err
}
// Flush flushes internal write buffers to the underlying conn.
func (bc *BufferedConn) Flush() error {
return bc.bw.Flush()
}

318
lib/handshake/handshake.go Normal file
View File

@@ -0,0 +1,318 @@
package handshake
import (
"errors"
"flag"
"fmt"
"io"
"net"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
var rpcHandshakeTimeout = flag.Duration("rpc.handshakeTimeout", 5*time.Second, "Timeout for RPC handshake between vminsert/vmselect and vmstorage. Increase this value if transient handshake failures occur. See https://docs.victoriametrics.com/victoriametrics/troubleshooting/#cluster-instability section for more details.")
const (
vminsertHelloLegacyVersion = "vminsert.02"
vminsertHello = "vminsert.03"
vmselectHello = "vmselect.01"
successResponse = "ok"
)
// Func must perform handshake on the given c using the given compressionLevel.
//
// It must return BufferedConn wrapper for c on successful handshake.
type Func func(c net.Conn, compressionLevel int) (*BufferedConn, error)
// VMInsertClientWithDialer performs client-side handshake for vminsert protocol.
//
// it uses provided dial func to establish connection to the server.
// compressionLevel is a legacy option which defines the level used for compression of the data sent
// to the server.
// compressionLevel <= 0 means 'no compression'
func VMInsertClientWithDialer(dial func() (net.Conn, error), compressionLevel int) (*BufferedConn, error) {
c, err := dial()
if err != nil {
return nil, fmt.Errorf("dial error: %w", err)
}
bc, err := vminsertClient(c, 0)
if err == nil {
return bc, nil
}
_ = c.Close()
if !strings.Contains(err.Error(), "cannot read success response after sending hello") {
return nil, err
}
// try to fallback to the prev non-RPC API version
// we cannot re-use exist connection, since vmstorage already closed it
c, err = dial()
if err != nil {
return nil, fmt.Errorf("dial error: %w", err)
}
bc, err = genericClient(c, vminsertHelloLegacyVersion, compressionLevel)
if err != nil {
_ = c.Close()
return nil, fmt.Errorf("legacy handshake error: %w", err)
}
bc.IsLegacy = true
logger.Infof("server=%q doesn't support new RPC version, fallback to the legacy format", c.RemoteAddr())
return bc, nil
}
func vminsertClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
return genericClient(c, vminsertHello, compressionLevel)
}
// VMInsertClientWithHello performs client-side handshake for vminsert protocol.
//
// should be used for testing only
func VMInsertClientWithHello(c net.Conn, helloMsg string, compressionLevel int) (*BufferedConn, error) {
return genericClient(c, helloMsg, compressionLevel)
}
// VMInsertServer performs server-side handshake for vminsert protocol.
//
// compressionLevel is the level used for compression of the data sent
// to the client.
// compressionLevel <= 0 means 'no compression'
func VMInsertServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
var isRPCSupported bool
bc, err := genericServer(c, compressionLevel, func(c net.Conn) error {
buf, err := readData(c, len(vminsertHello))
if err != nil {
if errors.Is(err, io.EOF) {
// This is likely a TCP healthcheck, which must be ignored in order to prevent logs pollution.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1762
return errTCPHealthcheck
}
return fmt.Errorf("cannot read hello: %w", err)
}
isRPCSupported = string(buf) == vminsertHello
if !isRPCSupported {
// try to fallback to the previous protocol version
if string(buf) != vminsertHelloLegacyVersion {
return fmt.Errorf("unexpected message obtained; got %q; want %q", buf, vminsertHello)
}
logger.Infof("client=%q doesn't support new RPC version, fallback to the legacy format", c.RemoteAddr())
}
return nil
})
if err != nil {
return nil, err
}
bc.IsLegacy = !isRPCSupported
return bc, nil
}
// VMInsertServerWithLegacyHello performs server-side handshake for vminsert protocol
// with legacy hello message
//
// should be used for testing only
func VMInsertServerWithLegacyHello(c net.Conn, compressionLevel int) (*BufferedConn, error) {
bc, err := genericServer(c, compressionLevel, func(c net.Conn) error {
return readMessage(c, vminsertHelloLegacyVersion)
})
if err != nil {
return nil, err
}
bc.IsLegacy = true
return bc, nil
}
// VMSelectClient performs client-side handshake for vmselect protocol.
//
// compressionLevel is the level used for compression of the data sent
// to the server.
// compressionLevel <= 0 means 'no compression'
func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
return genericClient(c, vmselectHello, compressionLevel)
}
// VMSelectServer performs server-side handshake for vmselect protocol.
//
// compressionLevel is the level used for compression of the data sent
// to the client.
// compressionLevel <= 0 means 'no compression'
func VMSelectServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
return genericServer(c, compressionLevel, func(c net.Conn) error {
err := readMessage(c, vmselectHello)
if errors.Is(err, io.EOF) {
// This is likely a TCP healthcheck, which must be ignored in order to prevent logs pollution.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1762 and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10786
return errTCPHealthcheck
}
return err
})
}
// errTCPHealthcheck indicates that the connection was opened as part of a TCP health check
// and was closed immediately after being established.
//
// This is expected behavior and can be safely ignored.
var errTCPHealthcheck = fmt.Errorf("TCP health check connection safe to ignore")
// IsTCPHealthcheck determines whether the provided error is a TCP health check
func IsTCPHealthcheck(err error) bool {
return errors.Is(err, errTCPHealthcheck)
}
// IsClientNetworkError determines whether the provided error is a client-side network error,
// such as io.EOF, io.ErrUnexpectedEOF, or a timeout.
// These errors typically occur when a client disconnects abruptly or fails during the handshake,
// and are generally non-actionable from the server point of view.
// This function helps distinguish such errors from critical ones during the handshake process
// and adjust logging accordingly.
//
// See: https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/pull/880
func IsClientNetworkError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return true
}
if IsTimeoutNetworkError(err) {
return true
}
if errMsg := err.Error(); strings.Contains(errMsg, "broken pipe") || strings.Contains(errMsg, "reset by peer") {
return true
}
return false
}
// IsTimeoutNetworkError determines whether the provided error is a network error with a timeout.
func IsTimeoutNetworkError(err error) bool {
var ne net.Error
if errors.As(err, &ne) && ne.Timeout() {
return true
}
return false
}
func genericServer(c net.Conn, compressionLevel int, readHelloMessage func(c net.Conn) error) (*BufferedConn, error) {
if err := c.SetDeadline(time.Now().Add(*rpcHandshakeTimeout)); err != nil {
return nil, fmt.Errorf("cannot set deadline: %w", err)
}
if err := readHelloMessage(c); err != nil {
return nil, fmt.Errorf("cannot read hello message : %w", err)
}
if err := writeMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
}
isRemoteCompressed, err := readIsCompressed(c)
if err != nil {
return nil, fmt.Errorf("cannot read isCompressed flag: %w", err)
}
if err := writeMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
}
if err := writeIsCompressed(c, compressionLevel > 0); err != nil {
return nil, fmt.Errorf("cannot write isCompressed flag: %w", err)
}
if err := readMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
}
if err := c.SetDeadline(time.Time{}); err != nil {
return nil, fmt.Errorf("cannot reset deadline: %w", err)
}
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
return bc, nil
}
func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn, error) {
if err := c.SetDeadline(time.Now().Add(*rpcHandshakeTimeout)); err != nil {
return nil, fmt.Errorf("cannot set deadline: %w", err)
}
if err := writeMessage(c, msg); err != nil {
return nil, fmt.Errorf("cannot write hello: %w", err)
}
if err := readMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot read success response after sending hello: %w", err)
}
if err := writeIsCompressed(c, compressionLevel > 0); err != nil {
return nil, fmt.Errorf("cannot write isCompressed flag: %w", err)
}
if err := readMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
}
isRemoteCompressed, err := readIsCompressed(c)
if err != nil {
return nil, fmt.Errorf("cannot read isCompressed flag: %w", err)
}
if err := writeMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
}
if err := c.SetDeadline(time.Time{}); err != nil {
return nil, fmt.Errorf("cannot reset deadline: %w", err)
}
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
return bc, nil
}
func writeIsCompressed(c net.Conn, isCompressed bool) error {
var buf [1]byte
if isCompressed {
buf[0] = 1
}
return writeMessage(c, string(buf[:]))
}
func readIsCompressed(c net.Conn) (bool, error) {
buf, err := readData(c, 1)
if err != nil {
return false, err
}
isCompressed := buf[0] != 0
return isCompressed, nil
}
func writeMessage(c net.Conn, msg string) error {
if _, err := io.WriteString(c, msg); err != nil {
return fmt.Errorf("cannot write %q to server: %w", msg, err)
}
if fc, ok := c.(flusher); ok {
if err := fc.Flush(); err != nil {
return fmt.Errorf("cannot flush %q to server: %w", msg, err)
}
}
return nil
}
type flusher interface {
Flush() error
}
func readMessage(c net.Conn, msg string) error {
buf, err := readData(c, len(msg))
if err != nil {
return err
}
if string(buf) != msg {
return fmt.Errorf("unexpected message obtained; got %q; want %q", buf, msg)
}
return nil
}
func readData(c net.Conn, dataLen int) ([]byte, error) {
data := make([]byte, dataLen)
if n, err := io.ReadFull(c, data); err != nil {
return nil, fmt.Errorf("cannot read message with size %d: %w; read only %d bytes", dataLen, err, n)
}
return data, nil
}

View File

@@ -0,0 +1,83 @@
package handshake
import (
"fmt"
"net"
"testing"
"time"
)
func TestVMInsertHandshake(t *testing.T) {
testHandshake(t, vminsertClient, VMInsertServer)
}
func TestVMSelectHandshake(t *testing.T) {
testHandshake(t, VMSelectClient, VMSelectServer)
}
func TestVMSelectServerTCPHealthcheck(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("cannot start listener: %s", err)
}
c, err := net.Dial("tcp", ln.Addr().String())
if err != nil {
t.Fatalf("cannot dial: %s", err)
}
if err := c.Close(); err != nil {
t.Fatalf("cannot close client conn: %s", err)
}
s, err := ln.Accept()
if err != nil {
t.Fatalf("cannot accept conn: %s", err)
}
if _, err := VMSelectServer(s, 0); !IsTCPHealthcheck(err) {
t.Fatalf("unexpected error; got %v; want TCP healthcheck error", err)
}
}
func testHandshake(t *testing.T, clientFunc, serverFunc Func) {
t.Helper()
c, s := net.Pipe()
ch := make(chan error, 1)
go func() {
bcs, err := serverFunc(s, 3)
if err != nil {
ch <- fmt.Errorf("error on outer handshake: %w", err)
return
}
bcc, err := clientFunc(bcs, 3)
if err != nil {
ch <- fmt.Errorf("error on inner handshake: %w", err)
return
}
if bcc == nil {
ch <- fmt.Errorf("expecting non-nil conn")
return
}
ch <- nil
}()
bcc, err := clientFunc(c, 0)
if err != nil {
t.Fatalf("error on outer handshake: %s", err)
}
bcs, err := serverFunc(bcc, 0)
if err != nil {
t.Fatalf("error on inner handshake: %s", err)
}
if bcs == nil {
t.Fatalf("expecting non-nil conn")
}
select {
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error on the server side: %s", err)
}
}
}

View File

@@ -325,6 +325,17 @@ func (s *Search) NextMetricBlock() bool {
// SearchQuery is used for sending search queries from vmselect to vmstorage.
type SearchQuery struct {
AccountID uint32
ProjectID uint32
// TenantTokens and IsMultiTenant is artificial fields
// they're only exist at runtime and cannot be transferred
// via network calls for keeping communication protocol compatibility
// TODO:@f41gh7 introduce breaking change to the protocol later
// and use TenantTokens instead of AccountID and ProjectID
TenantTokens []TenantToken
IsMultiTenant bool
// The time range for searching time series
MinTimestamp int64
MaxTimestamp int64
@@ -476,7 +487,15 @@ func (sq *SearchQuery) String() string {
}
start := TimestampToHumanReadableFormat(sq.MinTimestamp)
end := TimestampToHumanReadableFormat(sq.MaxTimestamp)
return fmt.Sprintf("filters=%s, timeRange=[%s..%s]", a, start, end)
if !sq.IsMultiTenant {
return fmt.Sprintf("accountID=%d, projectID=%d, filters=%s, timeRange=[%s..%s]", sq.AccountID, sq.ProjectID, a, start, end)
}
tts := make([]string, len(sq.TenantTokens))
for i, tt := range sq.TenantTokens {
tts[i] = tt.String()
}
return fmt.Sprintf("tenants=[%s], filters=%s, timeRange=[%s..%s]", strings.Join(tts, ","), a, start, end)
}
func tagFiltersToString(tfs []TagFilter) string {
@@ -487,8 +506,9 @@ func tagFiltersToString(tfs []TagFilter) string {
return "{" + strings.Join(a, ",") + "}"
}
// Marshal appends marshaled sq to dst and returns the result.
func (sq *SearchQuery) Marshal(dst []byte) []byte {
// MarshalWithoutTenant appends marshaled sq without AccountID/ProjectID to dst and returns the result.
// It is expected that TenantToken is already marshaled to dst.
func (sq *SearchQuery) MarshalWithoutTenant(dst []byte) []byte {
dst = encoding.MarshalVarInt64(dst, sq.MinTimestamp)
dst = encoding.MarshalVarInt64(dst, sq.MaxTimestamp)
dst = encoding.MarshalVarUint64(dst, uint64(len(sq.TagFilterss)))
@@ -498,11 +518,25 @@ func (sq *SearchQuery) Marshal(dst []byte) []byte {
dst = tagFilters[i].Marshal(dst)
}
}
dst = encoding.MarshalUint32(dst, uint32(sq.MaxMetrics))
return dst
}
// Unmarshal unmarshals sq from src and returns the tail.
func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) {
if len(src) < 4 {
return src, fmt.Errorf("cannot unmarshal AccountID: too short src len: %d; must be at least %d bytes", len(src), 4)
}
sq.AccountID = encoding.UnmarshalUint32(src)
src = src[4:]
if len(src) < 4 {
return src, fmt.Errorf("cannot unmarshal ProjectID: too short src len: %d; must be at least %d bytes", len(src), 4)
}
sq.ProjectID = encoding.UnmarshalUint32(src)
src = src[4:]
sq.TenantTokens = []TenantToken{{AccountID: sq.AccountID, ProjectID: sq.ProjectID}}
minTs, nSize := encoding.UnmarshalVarInt64(src)
if nSize <= 0 {
return src, fmt.Errorf("cannot unmarshal MinTimestamp from varint")
@@ -543,6 +577,12 @@ func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) {
sq.TagFilterss[i] = tagFilters
}
if len(src) < 4 {
return src, fmt.Errorf("cannot unmarshal MaxMetrics: too short src len: %d; must be at least %d bytes", len(src), 4)
}
sq.MaxMetrics = int(encoding.UnmarshalUint32(src))
src = src[4:]
return src, nil
}

View File

@@ -32,7 +32,12 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
// Skip nil sq1.
continue
}
buf = sq1.Marshal(buf[:0])
tt := TenantToken{
AccountID: sq1.AccountID,
ProjectID: sq1.ProjectID,
}
buf = tt.Marshal(buf[:0])
buf = sq1.MarshalWithoutTenant(buf)
tail, err := sq2.Unmarshal(buf)
if err != nil {
@@ -41,6 +46,12 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
if len(tail) > 0 {
t.Fatalf("unexpected tail left after SearchQuery unmarshaling; tail (len=%d): %q", len(tail), tail)
}
if sq2.AccountID != sq1.AccountID {
t.Fatalf("unexpected AccountID; got %d; want %d", sq2.AccountID, sq1.AccountID)
}
if sq2.ProjectID != sq1.ProjectID {
t.Fatalf("unexpected ProjectID; got %d; want %d", sq2.ProjectID, sq1.ProjectID)
}
if sq1.MinTimestamp != sq2.MinTimestamp {
t.Fatalf("unexpected MinTimestamp; got %d; want %d", sq2.MinTimestamp, sq1.MinTimestamp)
}

1222
lib/vmselectapi/server.go Normal file

File diff suppressed because it is too large Load Diff