mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-02 08:32:46 +03:00
Compare commits
3 Commits
master
...
vmsingle-v
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f589b25ab6 | ||
|
|
e7bf06fa4e | ||
|
|
fb04f6aefc |
15
Makefile
15
Makefile
@@ -471,7 +471,7 @@ test-full-386:
|
||||
|
||||
apptest:
|
||||
$(MAKE) victoria-metrics-race vmagent-race vmalert-race vmauth-race vmctl-race vmbackup-race vmrestore-race
|
||||
go test ./apptest/... -skip="^Test(Cluster|Legacy).*"
|
||||
go test ./apptest/... -skip="^Test(Cluster|Mixed|Legacy).*"
|
||||
|
||||
apptest-legacy: victoria-metrics-race vmbackup-race vmrestore-race
|
||||
OS=$$(uname | tr '[:upper:]' '[:lower:]'); \
|
||||
@@ -489,6 +489,19 @@ apptest-legacy: victoria-metrics-race vmbackup-race vmrestore-race
|
||||
VMSTORAGE_V1_132_0_PATH=$${DIR}/vmstorage-prod \
|
||||
go test ./apptest/tests -run="^TestLegacySingle.*"
|
||||
|
||||
apptest-clusternative-vmsingle: victoria-metrics-race
|
||||
OS=$$(uname | tr '[:upper:]' '[:lower:]'); \
|
||||
ARCH=$$(uname -m | tr '[:upper:]' '[:lower:]' | sed 's/x86_64/amd64/'); \
|
||||
VERSION=v1.144.0; \
|
||||
VMCLUSTER=victoria-metrics-$${OS}-$${ARCH}-$${VERSION}-cluster.tar.gz; \
|
||||
URL=https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/$${VERSION}; \
|
||||
DIR=/tmp/$${VERSION}; \
|
||||
test -d $${DIR} || (mkdir $${DIR} && \
|
||||
curl --output-dir /tmp -LO $${URL}/$${VMCLUSTER} && tar xzf /tmp/$${VMCLUSTER} -C $${DIR} \
|
||||
); \
|
||||
VMSELECT_PATH=$${DIR}/vmselect-prod \
|
||||
go test ./apptest/tests -run="^TestMixed.*"
|
||||
|
||||
benchmark:
|
||||
go test -run=NO_TESTS -bench=. ./lib/...
|
||||
go test -run=NO_TESTS -bench=. ./app/...
|
||||
|
||||
@@ -12,8 +12,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/servers"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
@@ -28,6 +27,8 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -36,6 +37,7 @@ var (
|
||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention. See also -retentionFilter")
|
||||
futureRetention = flagutil.NewRetentionDuration("futureRetention", "2d", "Data with timestamps bigger than now+futureRetention is automatically deleted. "+
|
||||
"The minimum futureRetention is 2 days. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention")
|
||||
vmselectAddr = flag.String("vmselectAddr", "", "TCP address to accept connections from vmselect services")
|
||||
snapshotAuthKey = flagutil.NewPassword("snapshotAuthKey", "authKey, which must be passed in query string to /snapshot* pages. It overrides -httpAuth.*")
|
||||
forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*")
|
||||
forceFlushAuthKey = flagutil.NewPassword("forceFlushAuthKey", "authKey, which must be passed in query string to /internal/force_flush pages. It overrides -httpAuth.*")
|
||||
@@ -184,6 +186,14 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
|
||||
})
|
||||
metrics.RegisterSet(storageMetrics)
|
||||
|
||||
if *vmselectAddr != "" {
|
||||
var err error
|
||||
vmselectSrv, err = servers.NewVMSelectServer(*vmselectAddr, strg)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot create a server with -vmselectAddr=%s: %s", *vmselectAddr, err)
|
||||
}
|
||||
}
|
||||
|
||||
WG = syncwg.WaitGroup{}
|
||||
resetResponseCacheIfNeeded = resetCacheIfNeeded
|
||||
Storage = strg
|
||||
@@ -197,6 +207,8 @@ var storageMetrics *metrics.Set
|
||||
// for proper graceful shutdown when Stop is called.
|
||||
var Storage *storage.Storage
|
||||
|
||||
var vmselectSrv *vmselectapi.Server
|
||||
|
||||
// WG must be incremented before Storage call.
|
||||
//
|
||||
// Use syncwg instead of sync, since Add is called from concurrent goroutines.
|
||||
@@ -335,6 +347,9 @@ func Stop() {
|
||||
startTime := time.Now()
|
||||
WG.WaitAndBlock()
|
||||
stopStaleSnapshotsRemover()
|
||||
if vmselectSrv != nil {
|
||||
vmselectSrv.MustStop()
|
||||
}
|
||||
Storage.MustClose()
|
||||
logger.Infof("successfully closed the storage in %.3f seconds", time.Since(startTime).Seconds())
|
||||
|
||||
|
||||
319
app/vmstorage/servers/vmselect.go
Normal file
319
app/vmstorage/servers/vmselect.go
Normal file
@@ -0,0 +1,319 @@
|
||||
package servers
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
|
||||
)
|
||||
|
||||
var (
|
||||
maxUniqueTimeseries = flag.Int("clusternative.maxUniqueTimeseries", 0, "The maximum number of unique time series, "+
|
||||
"which can be scanned during every query by the vmselect PRC server. This allows protecting against heavy queries, "+
|
||||
"which select unexpectedly high number of series. When set to zero, the limit is automatically calculated based on "+
|
||||
" -clusternative.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional). "+
|
||||
"See also -clusternative.max* command-line flags at vmselect")
|
||||
maxTagKeys = flag.Int("clusternative.maxTagKeys", 100e3, "The maximum number of tag keys returned per search. "+
|
||||
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
|
||||
maxTagValues = flag.Int("clusternative.maxTagValues", 100e3, "The maximum number of tag values returned per search. "+
|
||||
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
|
||||
maxTagValueSuffixesPerSearch = flag.Int("clusternative.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
|
||||
maxConcurrentRequests = flag.Int("clusternative.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent vmselect requests "+
|
||||
"the vmstorage can process at -vmselectAddr. It shouldn't be high, since a single request usually saturates a CPU core, and many concurrently executed requests "+
|
||||
"may require high amounts of memory. See also -search.maxQueueDuration")
|
||||
maxQueueDuration = flag.Duration("clusternative.maxQueueDuration", 10*time.Second, "The maximum time the incoming vmselect request waits for execution "+
|
||||
"when -search.maxConcurrentRequests limit is reached")
|
||||
|
||||
disableRPCCompression = flag.Bool("rpc.disableCompression", false, "Whether to disable compression of the data sent from vmstorage to vmselect. "+
|
||||
"This reduces CPU usage at the cost of higher network bandwidth usage")
|
||||
)
|
||||
|
||||
func getDefaultMaxConcurrentRequests() int {
|
||||
// A single request can saturate all the CPU cores, so there is no sense
|
||||
// in allowing higher number of concurrent requests - they will just contend
|
||||
// for unavailable CPU time.
|
||||
n := min(cgroup.AvailableCPUs()*2, 16)
|
||||
return n
|
||||
}
|
||||
|
||||
var (
|
||||
maxUniqueTimeseriesValue int
|
||||
maxUniqueTimeseriesValueOnce sync.Once
|
||||
)
|
||||
|
||||
// NewVMSelectServer starts new server at the given addr, which serves vmselect requests from the given s.
|
||||
func NewVMSelectServer(addr string, s *storage.Storage) (*vmselectapi.Server, error) {
|
||||
api := newVMSingleAPI(s)
|
||||
limits := vmselectapi.Limits{
|
||||
MaxLabelNames: *maxTagKeys,
|
||||
MaxLabelValues: *maxTagValues,
|
||||
MaxTagValueSuffixes: *maxTagValueSuffixesPerSearch,
|
||||
MaxConcurrentRequests: *maxConcurrentRequests,
|
||||
MaxConcurrentRequestsFlagName: "search.maxConcurrentRequests",
|
||||
MaxQueueDuration: *maxQueueDuration,
|
||||
MaxQueueDurationFlagName: "search.maxQueueDuration",
|
||||
}
|
||||
return vmselectapi.NewServer(addr, api, limits, *disableRPCCompression)
|
||||
}
|
||||
|
||||
// vmstorageAPI impelements vmselectapi.API
|
||||
type vmstorageAPI struct {
|
||||
s *storage.Storage
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
|
||||
tr := sq.GetTimeRange()
|
||||
maxMetrics := getMaxMetrics(sq.MaxMetrics)
|
||||
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(tfss) == 0 {
|
||||
return nil, fmt.Errorf("missing tag filters")
|
||||
}
|
||||
bi := getBlockIterator()
|
||||
bi.sr.Init(qt, api.s, tfss, tr, maxMetrics, deadline)
|
||||
if err := bi.sr.Error(); err != nil {
|
||||
bi.MustClose()
|
||||
return nil, err
|
||||
}
|
||||
return bi, nil
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
|
||||
tr := sq.GetTimeRange()
|
||||
maxMetrics := sq.MaxMetrics
|
||||
if maxMetrics <= 0 {
|
||||
// fallback to maxUniqueTimeSeries if no limit is provided,
|
||||
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
|
||||
maxMetrics = GetMaxUniqueTimeSeries()
|
||||
}
|
||||
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(tfss) == 0 {
|
||||
return nil, fmt.Errorf("missing tag filters")
|
||||
}
|
||||
return api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
|
||||
tr := sq.GetTimeRange()
|
||||
maxMetrics := sq.MaxMetrics
|
||||
if maxMetrics <= 0 {
|
||||
// fallback to maxUniqueTimeSeries if no limit is provided,
|
||||
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
|
||||
maxMetrics = GetMaxUniqueTimeSeries()
|
||||
}
|
||||
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return api.s.SearchLabelValues(qt, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline)
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) TagValueSuffixes(qt *querytracer.Tracer, _, _ uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
|
||||
maxSuffixes int, deadline uint64) ([]string, error) {
|
||||
suffixes, err := api.s.SearchTagValueSuffixes(qt, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(suffixes) >= maxSuffixes {
|
||||
return nil, fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d suffixes returned; "+
|
||||
"either narrow down the search or increase -search.maxTagValueSuffixesPerSearch command-line flag value", maxSuffixes)
|
||||
}
|
||||
return suffixes, nil
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
|
||||
tr := sq.GetTimeRange()
|
||||
maxMetrics := sq.MaxMetrics
|
||||
if maxMetrics <= 0 {
|
||||
// fallback to maxUniqueTimeSeries if no limit is provided,
|
||||
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
|
||||
maxMetrics = GetMaxUniqueTimeSeries()
|
||||
}
|
||||
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return api.s.SearchLabelNames(qt, tfss, tr, maxLabelNames, maxMetrics, deadline)
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) SeriesCount(_ *querytracer.Tracer, _, _ uint32, deadline uint64) (uint64, error) {
|
||||
return api.s.GetSeriesCount(deadline)
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
|
||||
tr := sq.GetTimeRange()
|
||||
maxMetrics := sq.MaxMetrics
|
||||
if maxMetrics <= 0 {
|
||||
// fallback to maxUniqueTimeSeries if no limit is provided,
|
||||
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
|
||||
maxMetrics = GetMaxUniqueTimeSeries()
|
||||
}
|
||||
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
date := uint64(sq.MinTimestamp) / (24 * 3600 * 1000)
|
||||
return api.s.GetTSDBStatus(qt, tfss, date, focusLabel, topN, maxMetrics, deadline)
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
|
||||
tr := sq.GetTimeRange()
|
||||
maxMetrics := sq.MaxMetrics
|
||||
if maxMetrics <= 0 {
|
||||
// fallback to maxUniqueTimeSeries if no limit is provided,
|
||||
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
|
||||
maxMetrics = GetMaxUniqueTimeSeries()
|
||||
}
|
||||
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if len(tfss) == 0 {
|
||||
return 0, fmt.Errorf("missing tag filters")
|
||||
}
|
||||
return api.s.DeleteSeries(qt, tfss, maxMetrics)
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error {
|
||||
api.s.RegisterMetricNames(qt, mrs)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) GetMetricNamesUsageStats(qt *querytracer.Tracer, _ *storage.TenantToken, limit, le int, matchPattern string, _ uint64) (metricnamestats.StatsResult, error) {
|
||||
return api.s.GetMetricNamesStats(qt, limit, le, matchPattern), nil
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) ResetMetricNamesUsageStats(qt *querytracer.Tracer, _ uint64) error {
|
||||
api.s.ResetMetricNamesStats(qt)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) {
|
||||
tfss := make([]*storage.TagFilters, 0, len(sq.TagFilterss))
|
||||
for _, tagFilters := range sq.TagFilterss {
|
||||
tfs := storage.NewTagFilters()
|
||||
for i := range tagFilters {
|
||||
tf := &tagFilters[i]
|
||||
if string(tf.Key) == "__graphite__" {
|
||||
query := tf.Value
|
||||
qtChild := qt.NewChild("searching for series matching __graphite__=%q", query)
|
||||
paths, err := api.s.SearchGraphitePaths(qtChild, tr, query, maxMetrics, deadline)
|
||||
qtChild.Donef("found %d series", len(paths))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
|
||||
}
|
||||
if len(paths) >= maxMetrics {
|
||||
return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+
|
||||
"either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes; "+
|
||||
"see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#resource-usage-limits", maxMetrics, query)
|
||||
}
|
||||
tfs.AddGraphiteQuery(query, paths, tf.IsNegative)
|
||||
continue
|
||||
}
|
||||
if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
|
||||
return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
|
||||
}
|
||||
}
|
||||
tfss = append(tfss, tfs)
|
||||
}
|
||||
return tfss, nil
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) GetMetadataRecords(qt *querytracer.Tracer, _ *storage.TenantToken, limit int, metricName string, _ uint64) ([]*metricsmetadata.Row, error) {
|
||||
return api.s.GetMetadataRows(qt, limit, metricName), nil
|
||||
}
|
||||
|
||||
// blockIterator implements vmselectapi.BlockIterator
|
||||
type blockIterator struct {
|
||||
sr storage.Search
|
||||
mb storage.MetricBlock
|
||||
}
|
||||
|
||||
var blockIteratorsPool sync.Pool
|
||||
|
||||
func (bi *blockIterator) MustClose() {
|
||||
bi.sr.MustClose()
|
||||
bi.mb.MetricName = nil
|
||||
bi.mb.Block.Reset()
|
||||
blockIteratorsPool.Put(bi)
|
||||
}
|
||||
|
||||
func getBlockIterator() *blockIterator {
|
||||
v := blockIteratorsPool.Get()
|
||||
if v == nil {
|
||||
v = &blockIterator{}
|
||||
}
|
||||
return v.(*blockIterator)
|
||||
}
|
||||
|
||||
func (bi *blockIterator) NextBlock(dst []byte) ([]byte, bool) {
|
||||
if !bi.sr.NextMetricBlock() {
|
||||
return dst, false
|
||||
}
|
||||
mb := bi.mb
|
||||
mb.MetricName = bi.sr.MetricBlockRef.MetricName
|
||||
bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block)
|
||||
dst = marshalMetricBlock(dst[:0], &mb)
|
||||
return dst, true
|
||||
}
|
||||
|
||||
func (bi *blockIterator) Error() error {
|
||||
return bi.sr.Error()
|
||||
}
|
||||
|
||||
func getMaxMetrics(searchQueryLimit int) int {
|
||||
if searchQueryLimit <= 0 {
|
||||
return GetMaxUniqueTimeSeries()
|
||||
}
|
||||
// searchQueryLimit cannot exceed `-search.maxUniqueTimeseries`
|
||||
if *maxUniqueTimeseries != 0 && searchQueryLimit > *maxUniqueTimeseries {
|
||||
searchQueryLimit = *maxUniqueTimeseries
|
||||
}
|
||||
return searchQueryLimit
|
||||
}
|
||||
|
||||
// GetMaxUniqueTimeSeries returns `-search.maxUniqueTimeseries` or the auto-calculated value based on available resources.
|
||||
// The calculation is split into calculateMaxUniqueTimeSeriesForResource for unit testing.
|
||||
func GetMaxUniqueTimeSeries() int {
|
||||
maxUniqueTimeseriesValueOnce.Do(func() {
|
||||
maxUniqueTimeseriesValue = *maxUniqueTimeseries
|
||||
if maxUniqueTimeseriesValue <= 0 {
|
||||
maxUniqueTimeseriesValue = calculateMaxUniqueTimeSeriesForResource(*maxConcurrentRequests, memory.Remaining())
|
||||
}
|
||||
})
|
||||
return maxUniqueTimeseriesValue
|
||||
}
|
||||
|
||||
// calculateMaxUniqueTimeSeriesForResource calculate the max metrics limit calculated by available resources.
|
||||
func calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, remainingMemory int) int {
|
||||
if maxConcurrentRequests <= 0 {
|
||||
// This line should NOT be reached unless the user has set an incorrect `search.maxConcurrentRequests`.
|
||||
// In such cases, fallback to unlimited.
|
||||
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
|
||||
return 2e9
|
||||
}
|
||||
|
||||
// Calculate the max metrics limit for a single request in the worst-case concurrent scenario.
|
||||
// The approximate size of 1 unique series that could occupy in the vmstorage is 200 bytes.
|
||||
mts := remainingMemory / 200 / maxConcurrentRequests
|
||||
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
|
||||
return mts
|
||||
}
|
||||
185
app/vmstorage/servers/vmselect_single_node.go
Normal file
185
app/vmstorage/servers/vmselect_single_node.go
Normal file
@@ -0,0 +1,185 @@
|
||||
package servers
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
|
||||
)
|
||||
|
||||
var (
|
||||
accountID = flag.Uint64("clusternative.accountID", 0, "The accountID of the stored data")
|
||||
projectID = flag.Uint64("clusternative.projectID", 0, "The projectID of the stored data")
|
||||
)
|
||||
|
||||
const (
|
||||
maxAccountID = uint64(math.MaxUint32)
|
||||
maxProjectID = uint64(math.MaxUint32)
|
||||
)
|
||||
|
||||
func newVMSingleAPI(s *storage.Storage) *vmsingleAPI {
|
||||
if *accountID > maxAccountID {
|
||||
logger.Fatalf("-clusternative.accountID must to be in the range [0, %d], got %d", maxAccountID, *accountID)
|
||||
}
|
||||
if *projectID > maxProjectID {
|
||||
logger.Fatalf("-clusternative.projectID must to be in the range [0, %d], got %d", maxProjectID, *projectID)
|
||||
}
|
||||
api := &vmsingleAPI{
|
||||
s: &vmstorageAPI{s: s},
|
||||
accountID: uint32(*accountID),
|
||||
projectID: uint32(*projectID),
|
||||
}
|
||||
return api
|
||||
}
|
||||
|
||||
// vmsingleAPI impelements vmselectapi.API for single node.
|
||||
type vmsingleAPI struct {
|
||||
s *vmstorageAPI
|
||||
accountID uint32
|
||||
projectID uint32
|
||||
}
|
||||
|
||||
// marshalMetricBlock serializes a metric block in the format expected by
|
||||
// vmselect.
|
||||
//
|
||||
// vmselect expects metric names and data blocks to have the tenantID but
|
||||
// vmsingle does not have it. Therefore the tenantID needs to be included to
|
||||
// every metric name and block.
|
||||
func marshalMetricBlock(dst []byte, src *storage.MetricBlock) []byte {
|
||||
// Marshal metric name.
|
||||
dst = encoding.MarshalVarUint64(dst, uint64(len(src.MetricName))+8)
|
||||
dst = encoding.MarshalUint32(dst, uint32(*accountID))
|
||||
dst = encoding.MarshalUint32(dst, uint32(*projectID))
|
||||
dst = append(dst, src.MetricName...)
|
||||
|
||||
// Marshal data block.
|
||||
dst = encoding.MarshalUint32(dst, uint32(*accountID))
|
||||
dst = encoding.MarshalUint32(dst, uint32(*projectID))
|
||||
dst = storage.MarshalBlock(dst, &src.Block)
|
||||
|
||||
return dst
|
||||
}
|
||||
|
||||
// emptyBlockIterator is an implementation of vmselectapi.BlockIterator that
|
||||
// always returns no data.
|
||||
type emptyBlockIterator struct{}
|
||||
|
||||
func (*emptyBlockIterator) MustClose() {}
|
||||
|
||||
func (*emptyBlockIterator) NextBlock(dst []byte) ([]byte, bool) {
|
||||
return dst, false
|
||||
}
|
||||
|
||||
func (*emptyBlockIterator) Error() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var emptyBI = &emptyBlockIterator{}
|
||||
|
||||
func (api *vmsingleAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
|
||||
if !sq.IsMultiTenant && (sq.AccountID != api.accountID || sq.ProjectID != api.projectID) {
|
||||
return emptyBI, nil
|
||||
}
|
||||
return api.s.InitSearch(qt, sq, deadline)
|
||||
}
|
||||
|
||||
func (api *vmsingleAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
|
||||
if !sq.IsMultiTenant && (sq.AccountID != api.accountID || sq.ProjectID != api.projectID) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
metricNames, err := api.s.SearchMetricNames(qt, sq, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// vmselect expects metric names to have the tenantID but vmsingle does not
|
||||
// have it. Therefore the tenantID needs to be appended to every metric
|
||||
// name.
|
||||
dst := make([]byte, 0, 8)
|
||||
dst = encoding.MarshalUint32(dst, sq.AccountID)
|
||||
dst = encoding.MarshalUint32(dst, sq.ProjectID)
|
||||
tenantID := string(dst)
|
||||
|
||||
for i, metricName := range metricNames {
|
||||
metricNames[i] = tenantID + metricName
|
||||
}
|
||||
return metricNames, nil
|
||||
}
|
||||
|
||||
func (api *vmsingleAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
|
||||
if !sq.IsMultiTenant && (sq.AccountID != api.accountID || sq.ProjectID != api.projectID) {
|
||||
return nil, nil
|
||||
}
|
||||
return api.s.LabelValues(qt, sq, labelName, maxLabelValues, deadline)
|
||||
}
|
||||
|
||||
func (api *vmsingleAPI) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
|
||||
maxSuffixes int, deadline uint64) ([]string, error) {
|
||||
if accountID != api.accountID || projectID != api.projectID {
|
||||
return nil, nil
|
||||
}
|
||||
return api.s.TagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
|
||||
}
|
||||
|
||||
func (api *vmsingleAPI) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
|
||||
if !sq.IsMultiTenant && (sq.AccountID != api.accountID || sq.ProjectID != api.projectID) {
|
||||
return nil, nil
|
||||
}
|
||||
return api.s.LabelNames(qt, sq, maxLabelNames, deadline)
|
||||
}
|
||||
|
||||
func (api *vmsingleAPI) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
|
||||
if accountID != api.accountID || projectID != api.projectID {
|
||||
return 0, nil
|
||||
}
|
||||
return api.s.SeriesCount(qt, accountID, projectID, deadline)
|
||||
}
|
||||
|
||||
func (api *vmsingleAPI) Tenants(_ *querytracer.Tracer, _ storage.TimeRange, _ uint64) ([]string, error) {
|
||||
tenantID := fmt.Sprintf("%d:%d", api.accountID, api.projectID)
|
||||
return []string{tenantID}, nil
|
||||
}
|
||||
|
||||
func (api *vmsingleAPI) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
|
||||
if !sq.IsMultiTenant && (sq.AccountID != api.accountID || sq.ProjectID != api.projectID) {
|
||||
return &storage.TSDBStatus{}, nil
|
||||
}
|
||||
return api.s.TSDBStatus(qt, sq, focusLabel, topN, deadline)
|
||||
}
|
||||
|
||||
func (api *vmsingleAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
|
||||
if !sq.IsMultiTenant && (sq.AccountID != api.accountID || sq.ProjectID != api.projectID) {
|
||||
return 0, nil
|
||||
}
|
||||
return api.s.DeleteSeries(qt, sq, deadline)
|
||||
}
|
||||
|
||||
func (api *vmsingleAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error {
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (api *vmsingleAPI) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (metricnamestats.StatsResult, error) {
|
||||
if tt != nil && (tt.AccountID != api.accountID || tt.ProjectID != api.projectID) {
|
||||
return metricnamestats.StatsResult{}, nil
|
||||
}
|
||||
return api.s.GetMetricNamesUsageStats(qt, tt, limit, le, matchPattern, deadline)
|
||||
}
|
||||
|
||||
func (api *vmsingleAPI) ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error {
|
||||
return api.s.ResetMetricNamesUsageStats(qt, deadline)
|
||||
}
|
||||
|
||||
func (api *vmsingleAPI) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) {
|
||||
if tt != nil && (tt.AccountID != api.accountID || tt.ProjectID != api.projectID) {
|
||||
return nil, nil
|
||||
}
|
||||
return api.s.GetMetadataRecords(qt, tt, limit, metricName, deadline)
|
||||
}
|
||||
52
app/vmstorage/servers/vmselect_test.go
Normal file
52
app/vmstorage/servers/vmselect_test.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package servers
|
||||
|
||||
import (
|
||||
"math"
|
||||
"strconv"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCalculateMaxMetricsLimitByResource(t *testing.T) {
|
||||
f := func(maxConcurrentRequest, remainingMemory, expect int) {
|
||||
t.Helper()
|
||||
maxMetricsLimit := calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequest, remainingMemory)
|
||||
if maxMetricsLimit != expect {
|
||||
t.Fatalf("unexpected max metrics limit: got %d, want %d", maxMetricsLimit, expect)
|
||||
}
|
||||
}
|
||||
|
||||
// 64-bit architectures support memory sizes > 4GB.
|
||||
if strconv.IntSize == 64 {
|
||||
// 8 CPU & 32 GiB
|
||||
f(16, int(math.Round(32*1024*1024*1024*0.4)), 4294967)
|
||||
// 4 CPU & 32 GiB
|
||||
f(8, int(math.Round(32*1024*1024*1024*0.4)), 8589934)
|
||||
}
|
||||
|
||||
// 2 CPU & 4 GiB
|
||||
f(4, int(math.Round(4*1024*1024*1024*0.4)), 2147483)
|
||||
|
||||
// other edge cases
|
||||
f(0, int(math.Round(4*1024*1024*1024*0.4)), 2e9)
|
||||
f(4, 0, 0)
|
||||
|
||||
}
|
||||
|
||||
func TestGetMaxMetrics(t *testing.T) {
|
||||
originalMaxUniqueTimeSeries := *maxUniqueTimeseries
|
||||
defer func() {
|
||||
*maxUniqueTimeseries = originalMaxUniqueTimeSeries
|
||||
}()
|
||||
f := func(searchQueryLimit, storageMaxUniqueTimeseries, expect int) {
|
||||
t.Helper()
|
||||
*maxUniqueTimeseries = storageMaxUniqueTimeseries
|
||||
maxMetrics := getMaxMetrics(searchQueryLimit)
|
||||
if maxMetrics != expect {
|
||||
t.Fatalf("unexpected max metrics: got %d, want %d", maxMetrics, expect)
|
||||
}
|
||||
}
|
||||
|
||||
f(0, 1e6, 1e6)
|
||||
f(2e6, 0, 2e6)
|
||||
f(2e6, 1e6, 1e6)
|
||||
}
|
||||
358
apptest/testdata.go
Normal file
358
apptest/testdata.go
Normal file
@@ -0,0 +1,358 @@
|
||||
package apptest
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
)
|
||||
|
||||
type TestData struct {
|
||||
Samples []string
|
||||
Step int64
|
||||
WantSeries []map[string]string
|
||||
WantLabels []string
|
||||
WantLabelValues []string
|
||||
WantQueryResults []*QueryResult
|
||||
WantMetadata map[string][]MetadataEntry
|
||||
WantMetricNamesStats []MetricNamesStatsRecord
|
||||
}
|
||||
|
||||
func GenerateTestData(prefix string, numMetrics, start, end int64) TestData {
|
||||
d := TestData{
|
||||
Samples: []string{},
|
||||
Step: (end - start) / numMetrics,
|
||||
WantSeries: make([]map[string]string, numMetrics),
|
||||
WantLabels: make([]string, numMetrics),
|
||||
WantLabelValues: make([]string, numMetrics),
|
||||
WantQueryResults: make([]*QueryResult, numMetrics),
|
||||
WantMetadata: make(map[string][]MetadataEntry),
|
||||
WantMetricNamesStats: make([]MetricNamesStatsRecord, numMetrics),
|
||||
}
|
||||
for i := range numMetrics {
|
||||
metricName := fmt.Sprintf("%s_%04d", prefix, i)
|
||||
metricHelp := fmt.Sprintf("# HELP %s some help message", metricName)
|
||||
metricType := fmt.Sprintf("# TYPE %s gauge", metricName)
|
||||
labelName := fmt.Sprintf("label_%04d", i)
|
||||
labelValue := fmt.Sprintf("value_%04d", i)
|
||||
value := i
|
||||
timestamp := start + i*d.Step
|
||||
sample := fmt.Sprintf(`%s{%s="value", label="%s"} %d %d`, metricName, labelName, labelValue, value, timestamp)
|
||||
|
||||
d.Samples = append(d.Samples, metricHelp, metricType, sample)
|
||||
d.WantSeries[i] = map[string]string{
|
||||
"__name__": metricName,
|
||||
labelName: "value",
|
||||
"label": labelValue,
|
||||
}
|
||||
d.WantLabels[i] = labelName
|
||||
d.WantLabelValues[i] = labelValue
|
||||
d.WantQueryResults[i] = &QueryResult{
|
||||
Metric: map[string]string{
|
||||
"__name__": metricName,
|
||||
labelName: "value",
|
||||
"label": labelValue,
|
||||
},
|
||||
Samples: []*Sample{{Timestamp: timestamp, Value: float64(value)}},
|
||||
}
|
||||
d.WantMetadata[metricName] = []MetadataEntry{{Help: "some help message", Type: "gauge"}}
|
||||
d.WantMetricNamesStats[i].MetricName = metricName
|
||||
}
|
||||
d.WantLabels = append(d.WantLabels, "__name__", "label")
|
||||
slices.Sort(d.WantLabels)
|
||||
return d
|
||||
}
|
||||
|
||||
// AssertSeries retrieves metric names from the storage and compares the result
|
||||
// with the expected one.
|
||||
func AssertSeries(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end int64, want []map[string]string) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/series response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1Series(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
}).Sort()
|
||||
},
|
||||
Want: &PrometheusAPIV1SeriesResponse{
|
||||
Status: "success",
|
||||
Data: want,
|
||||
},
|
||||
Retries: 1000,
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AssertSeriesCount retrieves series count and compares it with expected one.
|
||||
func AssertSeriesCount(tc *TestCase, app PrometheusQuerier, tenantID string, start, end int64, want uint64) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/series/count response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1SeriesCount(tc.T(), QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
})
|
||||
},
|
||||
Want: &PrometheusAPIV1SeriesCountResponse{
|
||||
Status: "success",
|
||||
Data: []uint64{want},
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AssertLabels retrieves label names from the storage and compares the result
|
||||
// with the expected one.
|
||||
func AssertLabels(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end int64, want []string) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/labels response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
res := app.PrometheusAPIV1Labels(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
})
|
||||
slices.Sort(res.Data)
|
||||
return res
|
||||
},
|
||||
Want: &PrometheusAPIV1LabelsResponse{
|
||||
Status: "success",
|
||||
Data: want,
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AssertLabelValues retrieves values for the label whose name is labelName for
|
||||
// the series whose name mathes metricNameRE, compares the result with the
|
||||
// expected one.
|
||||
func AssertLabelValues(tc *TestCase, app PrometheusQuerier, metricNameRE, labelName, tenantID string, start, end int64, want []string) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/labels/.../values response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
res := app.PrometheusAPIV1LabelValues(tc.T(), labelName, query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
})
|
||||
slices.Sort(res.Data)
|
||||
return res
|
||||
},
|
||||
Want: &PrometheusAPIV1LabelValuesResponse{
|
||||
Status: "success",
|
||||
Data: want,
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
// AssertQueryResults sends a data query to storage and compares the query
|
||||
// result with the expected one.
|
||||
func AssertQueryResults(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, start, end, step int64, want []*QueryResult) {
|
||||
tc.T().Helper()
|
||||
|
||||
query := fmt.Sprintf(`{__name__=~"%s"}`, metricNameRE)
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/query_range response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1QueryRange(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
Start: fmt.Sprintf("%d", start),
|
||||
End: fmt.Sprintf("%d", end),
|
||||
Step: fmt.Sprintf("%dms", step),
|
||||
MaxLookback: fmt.Sprintf("%dms", step-1),
|
||||
NoCache: "1",
|
||||
})
|
||||
},
|
||||
Want: &PrometheusAPIV1QueryResponse{
|
||||
Status: "success",
|
||||
Data: &QueryData{
|
||||
ResultType: "matrix",
|
||||
Result: want,
|
||||
},
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
func AssertMetadata(tc *TestCase, app PrometheusQuerier, metricName, tenantID string, want map[string][]MetadataEntry) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/metadata response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1Metadata(tc.T(), metricName, 0, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: &PrometheusAPIV1Metadata{
|
||||
Status: "success",
|
||||
Data: want,
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
|
||||
func AssertMetricNamesStats(tc *TestCase, app PrometheusQuerier, metricNameRE, tenantID string, want []MetricNamesStatsRecord) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /prometheus/api/v1/status/metric_names_stats response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.PrometheusAPIV1StatusMetricNamesStats(tc.T(), "", "", metricNameRE, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: MetricNamesStatsResponse{
|
||||
Records: want,
|
||||
},
|
||||
FailNow: true,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// GraphiteTestData holds the data samples in Graphite Pickle format, distance
|
||||
// between samples in milliseconds and expected responses for various Graphite
|
||||
// API endpoints.
|
||||
type GraphiteTestData struct {
|
||||
Samples []string
|
||||
Step int64
|
||||
WantMetricsIndex []string
|
||||
WantMetricsFind []GraphiteMetric
|
||||
WantMetricsExpand []string
|
||||
WantRenderedTargets []GraphiteRenderedTarget
|
||||
}
|
||||
|
||||
// GenerateGraphiteTestData generates Graphite test data.
|
||||
func GenerateGraphiteTestData(prefix string, numMetrics, start, end int64) GraphiteTestData {
|
||||
d := GraphiteTestData{
|
||||
Samples: make([]string, numMetrics),
|
||||
Step: (end - start) / numMetrics,
|
||||
WantMetricsIndex: make([]string, numMetrics),
|
||||
WantMetricsFind: make([]GraphiteMetric, numMetrics),
|
||||
WantMetricsExpand: make([]string, numMetrics),
|
||||
WantRenderedTargets: make([]GraphiteRenderedTarget, numMetrics),
|
||||
}
|
||||
|
||||
datapoints := make([][2]float64, numMetrics)
|
||||
for i := range numMetrics {
|
||||
timestamp := (start + i*d.Step) / 1000
|
||||
datapoints[i][1] = float64(timestamp)
|
||||
}
|
||||
|
||||
for i := range numMetrics {
|
||||
suffix := fmt.Sprintf("%04d", i)
|
||||
metricName := fmt.Sprintf("%s.%s", prefix, suffix)
|
||||
value := i
|
||||
timestamp := (start + i*d.Step) / 1000
|
||||
sample := fmt.Sprintf(`%s %d %d`, metricName, value, timestamp)
|
||||
|
||||
d.Samples[i] = sample
|
||||
d.WantMetricsIndex[i] = metricName
|
||||
d.WantMetricsFind[i].Id = metricName
|
||||
d.WantMetricsFind[i].Text = suffix
|
||||
d.WantMetricsFind[i].Leaf = 1
|
||||
d.WantMetricsExpand[i] = metricName
|
||||
d.WantRenderedTargets[i].Target = metricName
|
||||
d.WantRenderedTargets[i].Datapoints = slices.Clone(datapoints)
|
||||
d.WantRenderedTargets[i].Datapoints[i][0] = float64(value)
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// AssertGraphiteMetricsIndex retrieves all metrics by sending a request to
|
||||
// /graphite/metrics/index.json and compares the result with the expected one.
|
||||
func AssertGraphiteMetricsIndex(tc *TestCase, app PrometheusQuerier, tenantID string, want []string) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /graphite/metrics/index.json response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.GraphiteMetricsIndex(tc.T(), QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: want,
|
||||
Retries: 30,
|
||||
FailNow: true,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// AssertGraphiteMetricsFind finds metric names by sending a request to
|
||||
// /graphite/metrics/find and compares the result with the expected one.
|
||||
func AssertGraphiteMetricsFind(tc *TestCase, app PrometheusQuerier, query, tenantID string, want []GraphiteMetric) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /graphite/metrics/find response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.GraphiteMetricsFind(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: want,
|
||||
FailNow: true,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// AssertGraphiteMetricsFind expands metric names by sending a request to
|
||||
// /graphite/metrics/expand and compares the result with the expected one.
|
||||
func AssertGraphiteMetricsExpand(tc *TestCase, app PrometheusQuerier, query, tenantID string, want []string) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /graphite/metrics/expand response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.GraphiteMetricsExpand(tc.T(), query, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
})
|
||||
},
|
||||
Want: want,
|
||||
FailNow: true,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// AssertGraphiteRender retieves metric raw data by sending a request to
|
||||
// /graphite/render and compares the result with the expected one.
|
||||
func AssertGraphiteRender(tc *TestCase, app PrometheusQuerier, target, tenantID string, from, until, step int64, want []GraphiteRenderedTarget) {
|
||||
tc.T().Helper()
|
||||
|
||||
tc.Assert(&AssertOptions{
|
||||
Msg: "unexpected /graphite/render response",
|
||||
Got: func() any {
|
||||
tc.T().Helper()
|
||||
return app.GraphiteRender(tc.T(), target, QueryOpts{
|
||||
Tenant: tenantID,
|
||||
From: fmt.Sprintf("%d", from/1000),
|
||||
Until: fmt.Sprintf("%d", until/1000),
|
||||
StorageStep: fmt.Sprintf("%dms", step),
|
||||
})
|
||||
},
|
||||
Want: want,
|
||||
FailNow: true,
|
||||
})
|
||||
}
|
||||
216
apptest/tests/mixed_test.go
Normal file
216
apptest/tests/mixed_test.go
Normal file
@@ -0,0 +1,216 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
)
|
||||
|
||||
func TestMixedPrometheusQueries(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
const (
|
||||
accountID1 = 12
|
||||
projectID1 = 34
|
||||
accountID2 = 56
|
||||
projectID2 = 78
|
||||
numMetrics = 10
|
||||
)
|
||||
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
|
||||
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
|
||||
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
data := apptest.GenerateTestData("metric", numMetrics, start, end)
|
||||
emptySeries := []map[string]string{}
|
||||
emptyLabels := []string{}
|
||||
emptyLabelValues := []string{}
|
||||
emptyQueryResults := []*apptest.QueryResult{}
|
||||
emptyMetadata := map[string][]apptest.MetadataEntry{}
|
||||
emptyMetricNamesStats := []apptest.MetricNamesStatsRecord{}
|
||||
|
||||
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
|
||||
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
|
||||
"-retentionPeriod=100y",
|
||||
fmt.Sprintf("-clusternative.accountID=%d", accountID1),
|
||||
fmt.Sprintf("-clusternative.projectID=%d", projectID1),
|
||||
})
|
||||
vmselect := tc.MustStartVmselect("vmselect", []string{
|
||||
"-storageNode=" + vmsingle.VmselectAddr(),
|
||||
})
|
||||
|
||||
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data.Samples, apptest.QueryOpts{})
|
||||
vmsingle.ForceFlush(t)
|
||||
|
||||
// Ensure vmsingle returns data.
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data.WantSeries)
|
||||
apptest.AssertSeriesCount(tc, vmsingle, "", start, end, numMetrics)
|
||||
apptest.AssertLabels(tc, vmsingle, "metric.*", "", start, end, data.WantLabels)
|
||||
apptest.AssertLabelValues(tc, vmsingle, "metric.*", "label", "", start, end, data.WantLabelValues)
|
||||
apptest.AssertQueryResults(tc, vmsingle, "metric.*", "", start, end, data.Step, data.WantQueryResults)
|
||||
apptest.AssertMetadata(tc, vmsingle, "", "", data.WantMetadata)
|
||||
for i := range data.WantMetricNamesStats {
|
||||
data.WantMetricNamesStats[i].QueryRequestsCount = 1
|
||||
}
|
||||
apptest.AssertMetricNamesStats(tc, vmsingle, "", "", data.WantMetricNamesStats)
|
||||
|
||||
// Check that current vmsingle tenant (configured via flags) is tenant1.
|
||||
gotAdminTenantsResponse := vmselect.APIV1AdminTenants(t, apptest.QueryOpts{})
|
||||
wantAdminTenantsResponse := &apptest.AdminTenantsResponse{
|
||||
Status: "success",
|
||||
Data: []string{tenantID1},
|
||||
}
|
||||
if diff := cmp.Diff(wantAdminTenantsResponse, gotAdminTenantsResponse); diff != "" {
|
||||
t.Fatalf("unexpected tenants (-want, +got):\n%s", diff)
|
||||
}
|
||||
|
||||
// Ensure vmselect returns data for tenant1.
|
||||
apptest.AssertSeries(tc, vmselect, "metric.*", tenantID1, start, end, data.WantSeries)
|
||||
apptest.AssertSeriesCount(tc, vmselect, tenantID1, start, end, numMetrics)
|
||||
apptest.AssertLabels(tc, vmselect, "metric.*", tenantID1, start, end, data.WantLabels)
|
||||
apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", tenantID1, start, end, data.WantLabelValues)
|
||||
apptest.AssertQueryResults(tc, vmselect, "metric.*", tenantID1, start, end, data.Step, data.WantQueryResults)
|
||||
apptest.AssertMetadata(tc, vmselect, "", tenantID1, data.WantMetadata)
|
||||
for i := range data.WantMetricNamesStats {
|
||||
data.WantMetricNamesStats[i].QueryRequestsCount = 2
|
||||
}
|
||||
apptest.AssertMetricNamesStats(tc, vmselect, "", tenantID1, data.WantMetricNamesStats)
|
||||
|
||||
// Ensure vmselect does not return any data for tenant2.
|
||||
apptest.AssertSeries(tc, vmselect, "metric.*", tenantID2, start, end, emptySeries)
|
||||
apptest.AssertSeriesCount(tc, vmselect, tenantID2, start, end, 0)
|
||||
apptest.AssertLabels(tc, vmselect, "metric.*", tenantID2, start, end, emptyLabels)
|
||||
apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", tenantID2, start, end, emptyLabelValues)
|
||||
apptest.AssertQueryResults(tc, vmselect, "metric.*", tenantID2, start, end, data.Step, emptyQueryResults)
|
||||
apptest.AssertMetadata(tc, vmselect, "", tenantID2, emptyMetadata)
|
||||
apptest.AssertMetricNamesStats(tc, vmselect, "", tenantID2, emptyMetricNamesStats)
|
||||
|
||||
// Ensure vmselect returns data for multitenant.
|
||||
for _, v := range data.WantSeries {
|
||||
v["vm_account_id"] = strconv.Itoa(accountID1)
|
||||
v["vm_project_id"] = strconv.Itoa(projectID1)
|
||||
}
|
||||
apptest.AssertSeries(tc, vmselect, "metric.*", "multitenant", start, end, data.WantSeries)
|
||||
data.WantLabels = append(data.WantLabels, "vm_account_id", "vm_project_id")
|
||||
apptest.AssertLabels(tc, vmselect, "metric.*", "multitenant", start, end, data.WantLabels)
|
||||
apptest.AssertLabelValues(tc, vmselect, "metric.*", "label", "multitenant", start, end, data.WantLabelValues)
|
||||
for _, v := range data.WantQueryResults {
|
||||
v.Metric["vm_account_id"] = strconv.Itoa(accountID1)
|
||||
v.Metric["vm_project_id"] = strconv.Itoa(projectID1)
|
||||
}
|
||||
apptest.AssertQueryResults(tc, vmselect, "metric.*", "multitenant", start, end, data.Step, data.WantQueryResults)
|
||||
apptest.AssertMetadata(tc, vmselect, "", "multitenant", data.WantMetadata)
|
||||
for i := range data.WantMetricNamesStats {
|
||||
data.WantMetricNamesStats[i].QueryRequestsCount = 3
|
||||
}
|
||||
apptest.AssertMetricNamesStats(tc, vmselect, "", "multitenant", data.WantMetricNamesStats)
|
||||
}
|
||||
|
||||
func TestMixedDeleteSeries(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
const (
|
||||
accountID1 = 12
|
||||
projectID1 = 34
|
||||
accountID2 = 56
|
||||
projectID2 = 78
|
||||
numMetrics = 10
|
||||
)
|
||||
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
|
||||
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
|
||||
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
data1 := apptest.GenerateTestData("metric1", numMetrics, start, end)
|
||||
data2 := apptest.GenerateTestData("metric2", numMetrics, start, end)
|
||||
emptySeries := []map[string]string{}
|
||||
|
||||
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
|
||||
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
|
||||
"-retentionPeriod=100y",
|
||||
fmt.Sprintf("-clusternative.accountID=%d", accountID1),
|
||||
fmt.Sprintf("-clusternative.projectID=%d", projectID1),
|
||||
})
|
||||
vmselect := tc.MustStartVmselect("vmselect", []string{
|
||||
"-storageNode=" + vmsingle.VmselectAddr(),
|
||||
})
|
||||
|
||||
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data1.Samples, apptest.QueryOpts{})
|
||||
vmsingle.PrometheusAPIV1ImportPrometheus(tc.T(), data2.Samples, apptest.QueryOpts{})
|
||||
vmsingle.ForceFlush(t)
|
||||
|
||||
wantSeries12 := slices.Concat(data1.WantSeries, data2.WantSeries)
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, wantSeries12)
|
||||
|
||||
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric1.*"}`, apptest.QueryOpts{
|
||||
Tenant: tenantID1,
|
||||
})
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data2.WantSeries)
|
||||
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric2.*"}`, apptest.QueryOpts{
|
||||
Tenant: tenantID2,
|
||||
})
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, data2.WantSeries)
|
||||
vmselect.PrometheusAPIV1AdminTSDBDeleteSeries(tc.T(), `{__name__=~"metric2.*"}`, apptest.QueryOpts{
|
||||
Tenant: "multitenant",
|
||||
})
|
||||
apptest.AssertSeries(tc, vmsingle, "metric.*", "", start, end, emptySeries)
|
||||
}
|
||||
|
||||
func TestMixedGraphiteQueries(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
const (
|
||||
accountID1 = 12
|
||||
projectID1 = 34
|
||||
accountID2 = 56
|
||||
projectID2 = 78
|
||||
numMetrics = 10
|
||||
)
|
||||
tenantID1 := fmt.Sprintf("%d:%d", accountID1, projectID1)
|
||||
tenantID2 := fmt.Sprintf("%d:%d", accountID2, projectID2)
|
||||
start := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
end := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
data := apptest.GenerateGraphiteTestData("metric", numMetrics, start, end)
|
||||
emptyMetricsIndex := []string{}
|
||||
emptyMetricsFind := []apptest.GraphiteMetric{}
|
||||
emptyMetricsExpand := []string{}
|
||||
emptyRenderedTargets := []apptest.GraphiteRenderedTarget{}
|
||||
|
||||
vmsingle := tc.MustStartVmsingle("vmsingle", []string{
|
||||
"-storageDataPath=" + filepath.Join(tc.Dir(), "vmsingle"),
|
||||
"-retentionPeriod=100y",
|
||||
fmt.Sprintf("-clusternative.accountID=%d", accountID1),
|
||||
fmt.Sprintf("-clusternative.projectID=%d", projectID1),
|
||||
})
|
||||
vmselect := tc.MustStartVmselect("vmselect", []string{
|
||||
"-storageNode=" + vmsingle.VmselectAddr(),
|
||||
})
|
||||
|
||||
vmsingle.GraphiteWrite(tc.T(), data.Samples, apptest.QueryOpts{})
|
||||
vmsingle.ForceFlush(t)
|
||||
|
||||
// Ensure vmsingle returns data.
|
||||
apptest.AssertGraphiteMetricsIndex(tc, vmsingle, "", data.WantMetricsIndex)
|
||||
apptest.AssertGraphiteMetricsFind(tc, vmsingle, "metric.*", "", data.WantMetricsFind)
|
||||
apptest.AssertGraphiteMetricsExpand(tc, vmsingle, "metric.*", "", data.WantMetricsExpand)
|
||||
apptest.AssertGraphiteRender(tc, vmsingle, "metric.*", "", start, end, data.Step, data.WantRenderedTargets)
|
||||
|
||||
// Ensure vmselect returns data for tenant1.
|
||||
apptest.AssertGraphiteMetricsIndex(tc, vmselect, tenantID1, data.WantMetricsIndex)
|
||||
apptest.AssertGraphiteMetricsFind(tc, vmselect, "metric.*", tenantID1, data.WantMetricsFind)
|
||||
apptest.AssertGraphiteMetricsExpand(tc, vmselect, "metric.*", tenantID1, data.WantMetricsExpand)
|
||||
apptest.AssertGraphiteRender(tc, vmselect, "metric.*", tenantID1, start, end, data.Step, data.WantRenderedTargets)
|
||||
|
||||
// Ensure vmselect does not return any data for tenant2.
|
||||
apptest.AssertGraphiteMetricsIndex(tc, vmselect, tenantID2, emptyMetricsIndex)
|
||||
apptest.AssertGraphiteMetricsFind(tc, vmselect, "metric.*", tenantID2, emptyMetricsFind)
|
||||
apptest.AssertGraphiteMetricsExpand(tc, vmselect, "metric.*", tenantID2, emptyMetricsExpand)
|
||||
apptest.AssertGraphiteRender(tc, vmselect, "metric.*", tenantID2, start, end, data.Step, emptyRenderedTargets)
|
||||
}
|
||||
@@ -25,12 +25,14 @@ func StartVmsingle(instance string, flags []string, cli *Client, output io.Write
|
||||
"-httpListenAddr": "127.0.0.1:0",
|
||||
"-graphiteListenAddr": "127.0.0.1:0",
|
||||
"-opentsdbListenAddr": "127.0.0.1:0",
|
||||
"-vmselectAddr": "127.0.0.1:0",
|
||||
},
|
||||
extractREs: []*regexp.Regexp{
|
||||
storageDataPathRE,
|
||||
httpListenAddrRE,
|
||||
graphiteListenAddrRE,
|
||||
openTSDBListenAddrRE,
|
||||
vmselectAddrRE,
|
||||
},
|
||||
output: output,
|
||||
})
|
||||
@@ -43,6 +45,7 @@ func StartVmsingle(instance string, flags []string, cli *Client, output io.Write
|
||||
httpListenAddr: stderrExtracts[1],
|
||||
graphiteListenAddr: stderrExtracts[2],
|
||||
openTSDBListenAddr: stderrExtracts[3],
|
||||
vmselectAddr: stderrExtracts[4],
|
||||
}), nil
|
||||
}
|
||||
|
||||
@@ -51,6 +54,7 @@ type vmsingleRuntimeValues struct {
|
||||
httpListenAddr string
|
||||
graphiteListenAddr string
|
||||
openTSDBListenAddr string
|
||||
vmselectAddr string
|
||||
}
|
||||
|
||||
func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
|
||||
@@ -85,6 +89,7 @@ func newVmsingle(app *app, cli *Client, rt vmsingleRuntimeValues) *Vmsingle {
|
||||
},
|
||||
storageDataPath: rt.storageDataPath,
|
||||
httpListenAddr: rt.httpListenAddr,
|
||||
vmselectAddr: rt.vmselectAddr,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,6 +104,7 @@ type Vmsingle struct {
|
||||
|
||||
storageDataPath string
|
||||
httpListenAddr string
|
||||
vmselectAddr string
|
||||
}
|
||||
|
||||
// HTTPAddr returns the address at which the vminsert process is
|
||||
@@ -107,6 +113,12 @@ func (app *Vmsingle) HTTPAddr() string {
|
||||
return app.httpListenAddr
|
||||
}
|
||||
|
||||
// VmselectAddr returns the address at which the vmsingle process is listening
|
||||
// for vmselect connections.
|
||||
func (app *Vmsingle) VmselectAddr() string {
|
||||
return app.vmselectAddr
|
||||
}
|
||||
|
||||
// String returns the string representation of the vmsingle app state.
|
||||
func (app *Vmsingle) String() string {
|
||||
return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q}", []any{
|
||||
|
||||
140
lib/handshake/buffered_conn.go
Normal file
140
lib/handshake/buffered_conn.go
Normal file
@@ -0,0 +1,140 @@
|
||||
package handshake
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
type bufferedWriter interface {
|
||||
Write(p []byte) (int, error)
|
||||
Flush() error
|
||||
}
|
||||
|
||||
// BufferedConn is a net.Conn with Flush suport.
|
||||
type BufferedConn struct {
|
||||
net.Conn
|
||||
|
||||
// IsLegacy defines if BufferedConn operates in legacy mode
|
||||
// and doesn't support RPC protocol
|
||||
IsLegacy bool
|
||||
|
||||
br io.Reader
|
||||
bw bufferedWriter
|
||||
|
||||
readDeadline time.Time
|
||||
writeDeadline time.Time
|
||||
}
|
||||
|
||||
const bufferSize = 64 * 1024
|
||||
|
||||
// newBufferedConn returns buffered connection with the given compression level.
|
||||
func newBufferedConn(c net.Conn, compressionLevel int, isReadCompressed bool) *BufferedConn {
|
||||
bc := &BufferedConn{
|
||||
Conn: c,
|
||||
}
|
||||
if compressionLevel <= 0 {
|
||||
bc.bw = bufio.NewWriterSize(c, bufferSize)
|
||||
} else {
|
||||
bc.bw = zstd.NewWriterLevel(c, compressionLevel)
|
||||
}
|
||||
if !isReadCompressed {
|
||||
bc.br = bufio.NewReaderSize(c, bufferSize)
|
||||
} else {
|
||||
bc.br = zstd.NewReader(c)
|
||||
}
|
||||
return bc
|
||||
}
|
||||
|
||||
// SetDeadline sets read and write deadlines for bc to t.
|
||||
//
|
||||
// Deadline is checked on each Read and Write call.
|
||||
func (bc *BufferedConn) SetDeadline(t time.Time) error {
|
||||
bc.readDeadline = t
|
||||
bc.writeDeadline = t
|
||||
return bc.Conn.SetDeadline(t)
|
||||
}
|
||||
|
||||
// SetReadDeadline sets read deadline for bc to t.
|
||||
//
|
||||
// Deadline is checked on each Read call.
|
||||
func (bc *BufferedConn) SetReadDeadline(t time.Time) error {
|
||||
bc.readDeadline = t
|
||||
return bc.Conn.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
// SetWriteDeadline sets write deadline for bc to t.
|
||||
//
|
||||
// Deadline is checked on each Write call.
|
||||
func (bc *BufferedConn) SetWriteDeadline(t time.Time) error {
|
||||
bc.writeDeadline = t
|
||||
return bc.Conn.SetWriteDeadline(t)
|
||||
}
|
||||
|
||||
// Read reads up to len(p) from bc to p.
|
||||
func (bc *BufferedConn) Read(p []byte) (int, error) {
|
||||
startTime := fasttime.UnixTimestamp()
|
||||
if deadlineExceeded(bc.readDeadline, startTime) {
|
||||
return 0, os.ErrDeadlineExceeded
|
||||
}
|
||||
n, err := bc.br.Read(p)
|
||||
if err != nil && err != io.EOF {
|
||||
err = fmt.Errorf("cannot read data in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Write writes p to bc.
|
||||
//
|
||||
// Do not forget to call Flush if needed.
|
||||
func (bc *BufferedConn) Write(p []byte) (int, error) {
|
||||
startTime := fasttime.UnixTimestamp()
|
||||
if deadlineExceeded(bc.writeDeadline, startTime) {
|
||||
return 0, os.ErrDeadlineExceeded
|
||||
}
|
||||
n, err := bc.bw.Write(p)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("cannot write data in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func deadlineExceeded(deadline time.Time, currentTimestamp uint64) bool {
|
||||
if deadline.IsZero() {
|
||||
return false
|
||||
}
|
||||
return currentTimestamp > uint64(deadline.Unix())
|
||||
}
|
||||
|
||||
// Close closes bc.
|
||||
func (bc *BufferedConn) Close() error {
|
||||
// Close the Conn at first. It is expected that all the required data
|
||||
// is already flushed to the Conn.
|
||||
err := bc.Conn.Close()
|
||||
bc.Conn = nil
|
||||
|
||||
if zr, ok := bc.br.(*zstd.Reader); ok {
|
||||
zr.Release()
|
||||
}
|
||||
bc.br = nil
|
||||
|
||||
if zw, ok := bc.bw.(*zstd.Writer); ok {
|
||||
// Do not call zw.Close(), since we already closed the underlying conn.
|
||||
zw.Release()
|
||||
}
|
||||
bc.bw = nil
|
||||
|
||||
bc.IsLegacy = false
|
||||
return err
|
||||
}
|
||||
|
||||
// Flush flushes internal write buffers to the underlying conn.
|
||||
func (bc *BufferedConn) Flush() error {
|
||||
return bc.bw.Flush()
|
||||
}
|
||||
318
lib/handshake/handshake.go
Normal file
318
lib/handshake/handshake.go
Normal file
@@ -0,0 +1,318 @@
|
||||
package handshake
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
||||
var rpcHandshakeTimeout = flag.Duration("rpc.handshakeTimeout", 5*time.Second, "Timeout for RPC handshake between vminsert/vmselect and vmstorage. Increase this value if transient handshake failures occur. See https://docs.victoriametrics.com/victoriametrics/troubleshooting/#cluster-instability section for more details.")
|
||||
|
||||
const (
|
||||
vminsertHelloLegacyVersion = "vminsert.02"
|
||||
vminsertHello = "vminsert.03"
|
||||
vmselectHello = "vmselect.01"
|
||||
|
||||
successResponse = "ok"
|
||||
)
|
||||
|
||||
// Func must perform handshake on the given c using the given compressionLevel.
|
||||
//
|
||||
// It must return BufferedConn wrapper for c on successful handshake.
|
||||
type Func func(c net.Conn, compressionLevel int) (*BufferedConn, error)
|
||||
|
||||
// VMInsertClientWithDialer performs client-side handshake for vminsert protocol.
|
||||
//
|
||||
// it uses provided dial func to establish connection to the server.
|
||||
// compressionLevel is a legacy option which defines the level used for compression of the data sent
|
||||
// to the server.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMInsertClientWithDialer(dial func() (net.Conn, error), compressionLevel int) (*BufferedConn, error) {
|
||||
c, err := dial()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dial error: %w", err)
|
||||
}
|
||||
bc, err := vminsertClient(c, 0)
|
||||
if err == nil {
|
||||
return bc, nil
|
||||
}
|
||||
_ = c.Close()
|
||||
if !strings.Contains(err.Error(), "cannot read success response after sending hello") {
|
||||
return nil, err
|
||||
}
|
||||
// try to fallback to the prev non-RPC API version
|
||||
// we cannot re-use exist connection, since vmstorage already closed it
|
||||
c, err = dial()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dial error: %w", err)
|
||||
}
|
||||
bc, err = genericClient(c, vminsertHelloLegacyVersion, compressionLevel)
|
||||
if err != nil {
|
||||
_ = c.Close()
|
||||
return nil, fmt.Errorf("legacy handshake error: %w", err)
|
||||
}
|
||||
bc.IsLegacy = true
|
||||
logger.Infof("server=%q doesn't support new RPC version, fallback to the legacy format", c.RemoteAddr())
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
func vminsertClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
return genericClient(c, vminsertHello, compressionLevel)
|
||||
}
|
||||
|
||||
// VMInsertClientWithHello performs client-side handshake for vminsert protocol.
|
||||
//
|
||||
// should be used for testing only
|
||||
func VMInsertClientWithHello(c net.Conn, helloMsg string, compressionLevel int) (*BufferedConn, error) {
|
||||
return genericClient(c, helloMsg, compressionLevel)
|
||||
}
|
||||
|
||||
// VMInsertServer performs server-side handshake for vminsert protocol.
|
||||
//
|
||||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the client.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMInsertServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
|
||||
var isRPCSupported bool
|
||||
bc, err := genericServer(c, compressionLevel, func(c net.Conn) error {
|
||||
buf, err := readData(c, len(vminsertHello))
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
// This is likely a TCP healthcheck, which must be ignored in order to prevent logs pollution.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1762
|
||||
return errTCPHealthcheck
|
||||
}
|
||||
return fmt.Errorf("cannot read hello: %w", err)
|
||||
}
|
||||
isRPCSupported = string(buf) == vminsertHello
|
||||
if !isRPCSupported {
|
||||
// try to fallback to the previous protocol version
|
||||
if string(buf) != vminsertHelloLegacyVersion {
|
||||
return fmt.Errorf("unexpected message obtained; got %q; want %q", buf, vminsertHello)
|
||||
}
|
||||
logger.Infof("client=%q doesn't support new RPC version, fallback to the legacy format", c.RemoteAddr())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bc.IsLegacy = !isRPCSupported
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
// VMInsertServerWithLegacyHello performs server-side handshake for vminsert protocol
|
||||
// with legacy hello message
|
||||
//
|
||||
// should be used for testing only
|
||||
func VMInsertServerWithLegacyHello(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
|
||||
bc, err := genericServer(c, compressionLevel, func(c net.Conn) error {
|
||||
return readMessage(c, vminsertHelloLegacyVersion)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bc.IsLegacy = true
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
// VMSelectClient performs client-side handshake for vmselect protocol.
|
||||
//
|
||||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the server.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
return genericClient(c, vmselectHello, compressionLevel)
|
||||
}
|
||||
|
||||
// VMSelectServer performs server-side handshake for vmselect protocol.
|
||||
//
|
||||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the client.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMSelectServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
return genericServer(c, compressionLevel, func(c net.Conn) error {
|
||||
err := readMessage(c, vmselectHello)
|
||||
if errors.Is(err, io.EOF) {
|
||||
// This is likely a TCP healthcheck, which must be ignored in order to prevent logs pollution.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1762 and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10786
|
||||
return errTCPHealthcheck
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// errTCPHealthcheck indicates that the connection was opened as part of a TCP health check
|
||||
// and was closed immediately after being established.
|
||||
//
|
||||
// This is expected behavior and can be safely ignored.
|
||||
var errTCPHealthcheck = fmt.Errorf("TCP health check connection – safe to ignore")
|
||||
|
||||
// IsTCPHealthcheck determines whether the provided error is a TCP health check
|
||||
func IsTCPHealthcheck(err error) bool {
|
||||
return errors.Is(err, errTCPHealthcheck)
|
||||
}
|
||||
|
||||
// IsClientNetworkError determines whether the provided error is a client-side network error,
|
||||
// such as io.EOF, io.ErrUnexpectedEOF, or a timeout.
|
||||
// These errors typically occur when a client disconnects abruptly or fails during the handshake,
|
||||
// and are generally non-actionable from the server point of view.
|
||||
// This function helps distinguish such errors from critical ones during the handshake process
|
||||
// and adjust logging accordingly.
|
||||
//
|
||||
// See: https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/pull/880
|
||||
func IsClientNetworkError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
return true
|
||||
}
|
||||
|
||||
if IsTimeoutNetworkError(err) {
|
||||
return true
|
||||
}
|
||||
|
||||
if errMsg := err.Error(); strings.Contains(errMsg, "broken pipe") || strings.Contains(errMsg, "reset by peer") {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// IsTimeoutNetworkError determines whether the provided error is a network error with a timeout.
|
||||
func IsTimeoutNetworkError(err error) bool {
|
||||
var ne net.Error
|
||||
if errors.As(err, &ne) && ne.Timeout() {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func genericServer(c net.Conn, compressionLevel int, readHelloMessage func(c net.Conn) error) (*BufferedConn, error) {
|
||||
if err := c.SetDeadline(time.Now().Add(*rpcHandshakeTimeout)); err != nil {
|
||||
return nil, fmt.Errorf("cannot set deadline: %w", err)
|
||||
}
|
||||
|
||||
if err := readHelloMessage(c); err != nil {
|
||||
return nil, fmt.Errorf("cannot read hello message : %w", err)
|
||||
}
|
||||
if err := writeMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
|
||||
}
|
||||
isRemoteCompressed, err := readIsCompressed(c)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read isCompressed flag: %w", err)
|
||||
}
|
||||
if err := writeMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
|
||||
}
|
||||
if err := writeIsCompressed(c, compressionLevel > 0); err != nil {
|
||||
return nil, fmt.Errorf("cannot write isCompressed flag: %w", err)
|
||||
}
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
|
||||
}
|
||||
|
||||
if err := c.SetDeadline(time.Time{}); err != nil {
|
||||
return nil, fmt.Errorf("cannot reset deadline: %w", err)
|
||||
}
|
||||
|
||||
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn, error) {
|
||||
if err := c.SetDeadline(time.Now().Add(*rpcHandshakeTimeout)); err != nil {
|
||||
return nil, fmt.Errorf("cannot set deadline: %w", err)
|
||||
}
|
||||
|
||||
if err := writeMessage(c, msg); err != nil {
|
||||
return nil, fmt.Errorf("cannot write hello: %w", err)
|
||||
}
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot read success response after sending hello: %w", err)
|
||||
}
|
||||
if err := writeIsCompressed(c, compressionLevel > 0); err != nil {
|
||||
return nil, fmt.Errorf("cannot write isCompressed flag: %w", err)
|
||||
}
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
|
||||
}
|
||||
isRemoteCompressed, err := readIsCompressed(c)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read isCompressed flag: %w", err)
|
||||
}
|
||||
if err := writeMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
|
||||
}
|
||||
|
||||
if err := c.SetDeadline(time.Time{}); err != nil {
|
||||
return nil, fmt.Errorf("cannot reset deadline: %w", err)
|
||||
}
|
||||
|
||||
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
func writeIsCompressed(c net.Conn, isCompressed bool) error {
|
||||
var buf [1]byte
|
||||
if isCompressed {
|
||||
buf[0] = 1
|
||||
}
|
||||
return writeMessage(c, string(buf[:]))
|
||||
}
|
||||
|
||||
func readIsCompressed(c net.Conn) (bool, error) {
|
||||
buf, err := readData(c, 1)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
isCompressed := buf[0] != 0
|
||||
return isCompressed, nil
|
||||
}
|
||||
|
||||
func writeMessage(c net.Conn, msg string) error {
|
||||
if _, err := io.WriteString(c, msg); err != nil {
|
||||
return fmt.Errorf("cannot write %q to server: %w", msg, err)
|
||||
}
|
||||
if fc, ok := c.(flusher); ok {
|
||||
if err := fc.Flush(); err != nil {
|
||||
return fmt.Errorf("cannot flush %q to server: %w", msg, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type flusher interface {
|
||||
Flush() error
|
||||
}
|
||||
|
||||
func readMessage(c net.Conn, msg string) error {
|
||||
buf, err := readData(c, len(msg))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if string(buf) != msg {
|
||||
return fmt.Errorf("unexpected message obtained; got %q; want %q", buf, msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func readData(c net.Conn, dataLen int) ([]byte, error) {
|
||||
data := make([]byte, dataLen)
|
||||
if n, err := io.ReadFull(c, data); err != nil {
|
||||
return nil, fmt.Errorf("cannot read message with size %d: %w; read only %d bytes", dataLen, err, n)
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
83
lib/handshake/handshake_test.go
Normal file
83
lib/handshake/handshake_test.go
Normal file
@@ -0,0 +1,83 @@
|
||||
package handshake
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestVMInsertHandshake(t *testing.T) {
|
||||
testHandshake(t, vminsertClient, VMInsertServer)
|
||||
}
|
||||
|
||||
func TestVMSelectHandshake(t *testing.T) {
|
||||
testHandshake(t, VMSelectClient, VMSelectServer)
|
||||
}
|
||||
|
||||
func TestVMSelectServerTCPHealthcheck(t *testing.T) {
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("cannot start listener: %s", err)
|
||||
}
|
||||
|
||||
c, err := net.Dial("tcp", ln.Addr().String())
|
||||
if err != nil {
|
||||
t.Fatalf("cannot dial: %s", err)
|
||||
}
|
||||
if err := c.Close(); err != nil {
|
||||
t.Fatalf("cannot close client conn: %s", err)
|
||||
}
|
||||
s, err := ln.Accept()
|
||||
if err != nil {
|
||||
t.Fatalf("cannot accept conn: %s", err)
|
||||
}
|
||||
if _, err := VMSelectServer(s, 0); !IsTCPHealthcheck(err) {
|
||||
t.Fatalf("unexpected error; got %v; want TCP healthcheck error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func testHandshake(t *testing.T, clientFunc, serverFunc Func) {
|
||||
t.Helper()
|
||||
|
||||
c, s := net.Pipe()
|
||||
ch := make(chan error, 1)
|
||||
go func() {
|
||||
bcs, err := serverFunc(s, 3)
|
||||
if err != nil {
|
||||
ch <- fmt.Errorf("error on outer handshake: %w", err)
|
||||
return
|
||||
}
|
||||
bcc, err := clientFunc(bcs, 3)
|
||||
if err != nil {
|
||||
ch <- fmt.Errorf("error on inner handshake: %w", err)
|
||||
return
|
||||
}
|
||||
if bcc == nil {
|
||||
ch <- fmt.Errorf("expecting non-nil conn")
|
||||
return
|
||||
}
|
||||
ch <- nil
|
||||
}()
|
||||
|
||||
bcc, err := clientFunc(c, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("error on outer handshake: %s", err)
|
||||
}
|
||||
bcs, err := serverFunc(bcc, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("error on inner handshake: %s", err)
|
||||
}
|
||||
if bcs == nil {
|
||||
t.Fatalf("expecting non-nil conn")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error on the server side: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -90,6 +90,77 @@ type MetricBlockRef struct {
|
||||
BlockRef *BlockRef
|
||||
}
|
||||
|
||||
// MetricBlock is a time series block for a single metric.
|
||||
type MetricBlock struct {
|
||||
// MetricName is metric name for the given Block.
|
||||
MetricName []byte
|
||||
|
||||
// Block is a block for the given MetricName
|
||||
Block Block
|
||||
}
|
||||
|
||||
// Marshal marshals MetricBlock to dst
|
||||
func (mb *MetricBlock) Marshal(dst []byte) []byte {
|
||||
dst = encoding.MarshalBytes(dst, mb.MetricName)
|
||||
return MarshalBlock(dst, &mb.Block)
|
||||
}
|
||||
|
||||
// CopyFrom copies src to mb.
|
||||
func (mb *MetricBlock) CopyFrom(src *MetricBlock) {
|
||||
mb.MetricName = append(mb.MetricName[:0], src.MetricName...)
|
||||
mb.Block.CopyFrom(&src.Block)
|
||||
}
|
||||
|
||||
// MarshalBlock marshals b to dst.
|
||||
//
|
||||
// b.MarshalData must be called on b before calling MarshalBlock.
|
||||
func MarshalBlock(dst []byte, b *Block) []byte {
|
||||
dst = b.bh.Marshal(dst)
|
||||
dst = encoding.MarshalBytes(dst, b.timestampsData)
|
||||
dst = encoding.MarshalBytes(dst, b.valuesData)
|
||||
return dst
|
||||
}
|
||||
|
||||
// Unmarshal unmarshals MetricBlock from src
|
||||
func (mb *MetricBlock) Unmarshal(src []byte) ([]byte, error) {
|
||||
mb.Block.Reset()
|
||||
mn, nSize := encoding.UnmarshalBytes(src)
|
||||
if nSize <= 0 {
|
||||
return src, fmt.Errorf("cannot unmarshal MetricName")
|
||||
}
|
||||
src = src[nSize:]
|
||||
mb.MetricName = append(mb.MetricName[:0], mn...)
|
||||
|
||||
return UnmarshalBlock(&mb.Block, src)
|
||||
}
|
||||
|
||||
// UnmarshalBlock unmarshal Block from src to dst.
|
||||
//
|
||||
// dst.UnmarshalData isn't called on the block.
|
||||
func UnmarshalBlock(dst *Block, src []byte) ([]byte, error) {
|
||||
tail, err := dst.bh.Unmarshal(src)
|
||||
if err != nil {
|
||||
return tail, fmt.Errorf("cannot unmarshal blockHeader: %w", err)
|
||||
}
|
||||
src = tail
|
||||
|
||||
tds, nSize := encoding.UnmarshalBytes(src)
|
||||
if nSize <= 0 {
|
||||
return tail, fmt.Errorf("cannot unmarshal timestampsData")
|
||||
}
|
||||
src = src[nSize:]
|
||||
dst.timestampsData = append(dst.timestampsData[:0], tds...)
|
||||
|
||||
vd, nSize := encoding.UnmarshalBytes(src)
|
||||
if nSize <= 0 {
|
||||
return tail, fmt.Errorf("cannot unmarshal valuesData")
|
||||
}
|
||||
src = src[nSize:]
|
||||
dst.valuesData = append(dst.valuesData[:0], vd...)
|
||||
|
||||
return src, nil
|
||||
}
|
||||
|
||||
// Search is a search for time series.
|
||||
type Search struct {
|
||||
// MetricBlockRef is updated with each Search.NextMetricBlock call.
|
||||
@@ -254,6 +325,17 @@ func (s *Search) NextMetricBlock() bool {
|
||||
|
||||
// SearchQuery is used for sending search queries from vmselect to vmstorage.
|
||||
type SearchQuery struct {
|
||||
AccountID uint32
|
||||
ProjectID uint32
|
||||
|
||||
// TenantTokens and IsMultiTenant is artificial fields
|
||||
// they're only exist at runtime and cannot be transferred
|
||||
// via network calls for keeping communication protocol compatibility
|
||||
// TODO:@f41gh7 introduce breaking change to the protocol later
|
||||
// and use TenantTokens instead of AccountID and ProjectID
|
||||
TenantTokens []TenantToken
|
||||
IsMultiTenant bool
|
||||
|
||||
// The time range for searching time series
|
||||
MinTimestamp int64
|
||||
MaxTimestamp int64
|
||||
@@ -290,6 +372,40 @@ func NewSearchQuery(start, end int64, tagFilterss [][]TagFilter, maxMetrics int)
|
||||
}
|
||||
}
|
||||
|
||||
// TenantToken represents a tenant (accountID, projectID) pair.
|
||||
type TenantToken struct {
|
||||
AccountID uint32
|
||||
ProjectID uint32
|
||||
}
|
||||
|
||||
// String returns string representation of t.
|
||||
func (t *TenantToken) String() string {
|
||||
return fmt.Sprintf("{accountID=%d, projectID=%d}", t.AccountID, t.ProjectID)
|
||||
}
|
||||
|
||||
// Marshal appends marshaled t to dst and returns the result.
|
||||
func (t *TenantToken) Marshal(dst []byte) []byte {
|
||||
dst = encoding.MarshalUint32(dst, t.AccountID)
|
||||
dst = encoding.MarshalUint32(dst, t.ProjectID)
|
||||
return dst
|
||||
}
|
||||
|
||||
// NewMultiTenantSearchQuery creates new search query for the given args.
|
||||
func NewMultiTenantSearchQuery(tenants []TenantToken, start, end int64, tagFilterss [][]TagFilter, maxMetrics int) *SearchQuery {
|
||||
if start < 0 {
|
||||
// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5553
|
||||
start = 0
|
||||
}
|
||||
return &SearchQuery{
|
||||
TenantTokens: tenants,
|
||||
MinTimestamp: start,
|
||||
MaxTimestamp: end,
|
||||
TagFilterss: tagFilterss,
|
||||
MaxMetrics: maxMetrics,
|
||||
IsMultiTenant: true,
|
||||
}
|
||||
}
|
||||
|
||||
// TagFilter represents a single tag filter from SearchQuery.
|
||||
type TagFilter struct {
|
||||
Key []byte
|
||||
@@ -387,7 +503,15 @@ func (sq *SearchQuery) String() string {
|
||||
}
|
||||
start := TimestampToHumanReadableFormat(sq.MinTimestamp)
|
||||
end := TimestampToHumanReadableFormat(sq.MaxTimestamp)
|
||||
return fmt.Sprintf("filters=%s, timeRange=[%s..%s]", a, start, end)
|
||||
if !sq.IsMultiTenant {
|
||||
return fmt.Sprintf("accountID=%d, projectID=%d, filters=%s, timeRange=[%s..%s]", sq.AccountID, sq.ProjectID, a, start, end)
|
||||
}
|
||||
|
||||
tts := make([]string, len(sq.TenantTokens))
|
||||
for i, tt := range sq.TenantTokens {
|
||||
tts[i] = tt.String()
|
||||
}
|
||||
return fmt.Sprintf("tenants=[%s], filters=%s, timeRange=[%s..%s]", strings.Join(tts, ","), a, start, end)
|
||||
}
|
||||
|
||||
func tagFiltersToString(tfs []TagFilter) string {
|
||||
@@ -398,8 +522,9 @@ func tagFiltersToString(tfs []TagFilter) string {
|
||||
return "{" + strings.Join(a, ",") + "}"
|
||||
}
|
||||
|
||||
// Marshal appends marshaled sq to dst and returns the result.
|
||||
func (sq *SearchQuery) Marshal(dst []byte) []byte {
|
||||
// MarshalWithoutTenant appends marshaled sq without AccountID/ProjectID to dst and returns the result.
|
||||
// It is expected that TenantToken is already marshaled to dst.
|
||||
func (sq *SearchQuery) MarshalWithoutTenant(dst []byte) []byte {
|
||||
dst = encoding.MarshalVarInt64(dst, sq.MinTimestamp)
|
||||
dst = encoding.MarshalVarInt64(dst, sq.MaxTimestamp)
|
||||
dst = encoding.MarshalVarUint64(dst, uint64(len(sq.TagFilterss)))
|
||||
@@ -409,11 +534,25 @@ func (sq *SearchQuery) Marshal(dst []byte) []byte {
|
||||
dst = tagFilters[i].Marshal(dst)
|
||||
}
|
||||
}
|
||||
dst = encoding.MarshalUint32(dst, uint32(sq.MaxMetrics))
|
||||
return dst
|
||||
}
|
||||
|
||||
// Unmarshal unmarshals sq from src and returns the tail.
|
||||
func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) {
|
||||
if len(src) < 4 {
|
||||
return src, fmt.Errorf("cannot unmarshal AccountID: too short src len: %d; must be at least %d bytes", len(src), 4)
|
||||
}
|
||||
sq.AccountID = encoding.UnmarshalUint32(src)
|
||||
src = src[4:]
|
||||
|
||||
if len(src) < 4 {
|
||||
return src, fmt.Errorf("cannot unmarshal ProjectID: too short src len: %d; must be at least %d bytes", len(src), 4)
|
||||
}
|
||||
sq.ProjectID = encoding.UnmarshalUint32(src)
|
||||
src = src[4:]
|
||||
|
||||
sq.TenantTokens = []TenantToken{{AccountID: sq.AccountID, ProjectID: sq.ProjectID}}
|
||||
minTs, nSize := encoding.UnmarshalVarInt64(src)
|
||||
if nSize <= 0 {
|
||||
return src, fmt.Errorf("cannot unmarshal MinTimestamp from varint")
|
||||
@@ -454,6 +593,12 @@ func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) {
|
||||
sq.TagFilterss[i] = tagFilters
|
||||
}
|
||||
|
||||
if len(src) < 4 {
|
||||
return src, fmt.Errorf("cannot unmarshal MaxMetrics: too short src len: %d; must be at least %d bytes", len(src), 4)
|
||||
}
|
||||
sq.MaxMetrics = int(encoding.UnmarshalUint32(src))
|
||||
src = src[4:]
|
||||
|
||||
return src, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -32,7 +32,12 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
|
||||
// Skip nil sq1.
|
||||
continue
|
||||
}
|
||||
buf = sq1.Marshal(buf[:0])
|
||||
tt := TenantToken{
|
||||
AccountID: sq1.AccountID,
|
||||
ProjectID: sq1.ProjectID,
|
||||
}
|
||||
buf = tt.Marshal(buf[:0])
|
||||
buf = sq1.MarshalWithoutTenant(buf)
|
||||
|
||||
tail, err := sq2.Unmarshal(buf)
|
||||
if err != nil {
|
||||
@@ -41,6 +46,12 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
|
||||
if len(tail) > 0 {
|
||||
t.Fatalf("unexpected tail left after SearchQuery unmarshaling; tail (len=%d): %q", len(tail), tail)
|
||||
}
|
||||
if sq2.AccountID != sq1.AccountID {
|
||||
t.Fatalf("unexpected AccountID; got %d; want %d", sq2.AccountID, sq1.AccountID)
|
||||
}
|
||||
if sq2.ProjectID != sq1.ProjectID {
|
||||
t.Fatalf("unexpected ProjectID; got %d; want %d", sq2.ProjectID, sq1.ProjectID)
|
||||
}
|
||||
if sq1.MinTimestamp != sq2.MinTimestamp {
|
||||
t.Fatalf("unexpected MinTimestamp; got %d; want %d", sq2.MinTimestamp, sq1.MinTimestamp)
|
||||
}
|
||||
|
||||
@@ -328,6 +328,11 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage {
|
||||
return s
|
||||
}
|
||||
|
||||
// RetentionMsecs returns retentionMsecs for s.
|
||||
func (s *Storage) RetentionMsecs() int64 {
|
||||
return s.retentionMsecs
|
||||
}
|
||||
|
||||
var maxTSIDCacheSize int
|
||||
|
||||
// SetTSIDCacheSize overrides the default size of storage/tsid cache
|
||||
|
||||
68
lib/vmselectapi/api.go
Normal file
68
lib/vmselectapi/api.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package vmselectapi
|
||||
|
||||
import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
|
||||
)
|
||||
|
||||
// API must implement vmselect API.
|
||||
type API interface {
|
||||
// InitSearch initialize series search for the given sq.
|
||||
//
|
||||
// The returned BlockIterator must be closed with MustClose to free up resources when it is no longer needed.
|
||||
InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (BlockIterator, error)
|
||||
|
||||
// SearchMetricNames returns metric names matching the given sq.
|
||||
SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error)
|
||||
|
||||
// LabelValues returns values for labelName label acorss series matching the given sq.
|
||||
LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error)
|
||||
|
||||
// TagValueSuffixes returns tag value suffixes for the given args.
|
||||
TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error)
|
||||
|
||||
// LabelNames returns lable names for series matching the given sq.
|
||||
LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLableNames int, deadline uint64) ([]string, error)
|
||||
|
||||
// SeriesCount returns the number of series for the given (accountID, projectID).
|
||||
SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error)
|
||||
|
||||
// TSDBStatus returns tsdb status for the given sq.
|
||||
TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error)
|
||||
|
||||
// DeleteSeries deletes series matching the given sq.
|
||||
DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error)
|
||||
|
||||
// RegisterMetricNames registers the given mrs in the storage.
|
||||
RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error
|
||||
|
||||
// Tenants returns list of tenants in the storage on the given tr.
|
||||
Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error)
|
||||
|
||||
// GetMetricNamesUsageStats returns statistics for metric names
|
||||
GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (metricnamestats.StatsResult, error)
|
||||
|
||||
// ResetMetricNamesUsageStats resets internal state of metric names tracker
|
||||
ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error
|
||||
|
||||
// GetMetadataRecords returns metrics metadata.
|
||||
GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error)
|
||||
}
|
||||
|
||||
// BlockIterator must iterate through series blocks found by VMSelect.InitSearch.
|
||||
//
|
||||
// MustClose must be called in order to free up allocated resources when BlockIterator is no longer needed.
|
||||
type BlockIterator interface {
|
||||
// NextBlock marshals next storage.MetricBlock into dst.
|
||||
//
|
||||
// It returns true on success, false on error or if no blocks to read.
|
||||
NextBlock(dst []byte) ([]byte, bool)
|
||||
|
||||
// MustClose frees up resources allocated by BlockIterator.
|
||||
MustClose()
|
||||
|
||||
// Error returns the last error occurred in NextBlock(), which returns false.
|
||||
Error() error
|
||||
}
|
||||
1248
lib/vmselectapi/server.go
Normal file
1248
lib/vmselectapi/server.go
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user