lib/storage: copy vmselect server from vmcluster to vmsingle

Work in progress: compiles but panics due to duplicate flags.

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
Artem Fetishev
2026-04-27 14:36:13 +02:00
parent 5f5a2109e8
commit f5d028d0e3
10 changed files with 2437 additions and 5 deletions

View File

@@ -12,8 +12,7 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/servers"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@@ -28,6 +27,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
"github.com/VictoriaMetrics/metrics"
)
var (
@@ -35,6 +36,7 @@ var (
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention. See also -retentionFilter")
futureRetention = flagutil.NewRetentionDuration("futureRetention", "2d", "Data with timestamps bigger than now+futureRetention is automatically deleted. "+
"The minimum futureRetention is 2 days. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention")
vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services")
snapshotAuthKey = flagutil.NewPassword("snapshotAuthKey", "authKey, which must be passed in query string to /snapshot* pages. It overrides -httpAuth.*")
forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*")
forceFlushAuthKey = flagutil.NewPassword("forceFlushAuthKey", "authKey, which must be passed in query string to /internal/force_flush pages. It overrides -httpAuth.*")
@@ -181,6 +183,12 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
})
metrics.RegisterSet(storageMetrics)
fs.RegisterPathFsMetrics(*DataPath)
var err error
vmselectSrv, err = servers.NewVMSelectServer(*vmselectAddr, strg)
if err != nil {
logger.Fatalf("cannot create a server with -vmselectAddr=%s: %s", *vmselectAddr, err)
}
}
var storageMetrics *metrics.Set
@@ -191,6 +199,8 @@ var storageMetrics *metrics.Set
// for proper graceful shutdown when Stop is called.
var Storage *storage.Storage
var vmselectSrv *vmselectapi.Server
// WG must be incremented before Storage call.
//
// Use syncwg instead of sync, since Add is called from concurrent goroutines.
@@ -329,6 +339,7 @@ func Stop() {
startTime := time.Now()
WG.WaitAndBlock()
stopStaleSnapshotsRemover()
vmselectSrv.MustStop()
Storage.MustClose()
logger.Infof("successfully closed the storage in %.3f seconds", time.Since(startTime).Seconds())

View File

@@ -0,0 +1,362 @@
package servers
import (
"flag"
"fmt"
"net/http"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
)
// Command-line flags controlling how vmselect requests are processed by the
// vmselect API server at vmstorage.
var (
	maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be scanned during every query. "+
		"This allows protecting against heavy queries, which select unexpectedly high number of series. When set to zero, the limit is automatically calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional). See also -search.max* command-line flags at vmselect")
	maxTagKeys = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search. "+
		"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
	maxTagValues = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search. "+
		"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
	maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
	maxConcurrentRequests        = flag.Int("search.maxConcurrentRequests", 2*cgroup.AvailableCPUs(), "The maximum number of concurrent vmselect requests "+
		"the vmstorage can process at -vmselectAddr. It shouldn't be high, since a single request usually saturates a CPU core, and many concurrently executed requests "+
		"may require high amounts of memory. See also -search.maxQueueDuration")
	maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the incoming vmselect request waits for execution "+
		"when -search.maxConcurrentRequests limit is reached")
	disableRPCCompression = flag.Bool("rpc.disableCompression", false, "Whether to disable compression of the data sent from vmstorage to vmselect. "+
		"This reduces CPU usage at the cost of higher network bandwidth usage")
	denyQueriesOutsideRetention = flag.Bool("denyQueriesOutsideRetention", false, "Whether to deny queries outside of the configured -retentionPeriod. "+
		"When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. "+
		"This may be useful when multiple data sources with distinct retentions are hidden behind query-tee")
)
var (
	// maxUniqueTimeseriesValue caches the effective -search.maxUniqueTimeseries
	// limit. It is initialized lazily by GetMaxUniqueTimeSeries.
	maxUniqueTimeseriesValue int
	// maxUniqueTimeseriesValueOnce guards the one-time initialization of
	// maxUniqueTimeseriesValue.
	maxUniqueTimeseriesValueOnce sync.Once
)
// NewVMSelectServer starts new server at the given addr, which serves vmselect requests from the given s.
func NewVMSelectServer(addr string, s *storage.Storage) (*vmselectapi.Server, error) {
	// Wrap the storage into the vmselectapi.API implementation and apply
	// the request limits configured via command-line flags.
	a := &vmstorageAPI{
		s: s,
	}
	requestLimits := vmselectapi.Limits{
		MaxLabelNames:                 *maxTagKeys,
		MaxLabelValues:                *maxTagValues,
		MaxTagValueSuffixes:           *maxTagValueSuffixesPerSearch,
		MaxConcurrentRequests:         *maxConcurrentRequests,
		MaxConcurrentRequestsFlagName: "search.maxConcurrentRequests",
		MaxQueueDuration:              *maxQueueDuration,
		MaxQueueDurationFlagName:      "search.maxQueueDuration",
	}
	return vmselectapi.NewServer(addr, a, requestLimits, *disableRPCCompression)
}
// vmstorageAPI implements vmselectapi.API on top of a storage.Storage instance.
type vmstorageAPI struct {
	// s is the underlying storage all API calls are served from.
	s *storage.Storage
}
// InitSearch opens a block iterator over the blocks matching sq within its
// time range. The returned iterator must be closed via MustClose.
func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
	// TODO(rtm0): Return empty result if sq.AccountID, sq.ProjectID do not
	// match tenantID from flag and sq is not multitenant.
	tr := sq.GetTimeRange()
	// Reject the query early if -denyQueriesOutsideRetention is set and the
	// time range falls outside the retention.
	if err := checkTimeRange(api.s, tr); err != nil {
		return nil, err
	}
	maxMetrics := getMaxMetrics(sq.MaxMetrics)
	tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
	if err != nil {
		return nil, err
	}
	if len(tfss) == 0 {
		return nil, fmt.Errorf("missing tag filters")
	}
	// The iterator comes from a pool; it owns bi.sr until MustClose is called.
	bi := getBlockIterator()
	bi.sr.Init(qt, api.s, tfss, tr, maxMetrics, deadline)
	if err := bi.sr.Error(); err != nil {
		// Return the iterator to the pool on init failure.
		bi.MustClose()
		return nil, err
	}
	return bi, nil
}
// SearchMetricNames returns the names of metrics matching sq.
func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
	// TODO(rtm0): Return empty result if sq.AccountID, sq.ProjectID do not
	// match tenantID from flag and sq is not multitenant.
	maxMetrics := sq.MaxMetrics
	if maxMetrics <= 0 {
		// No explicit limit in the query - fall back to maxUniqueTimeSeries.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
		maxMetrics = GetMaxUniqueTimeSeries()
	}
	tr := sq.GetTimeRange()
	filters, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
	if err != nil {
		return nil, err
	}
	if len(filters) == 0 {
		return nil, fmt.Errorf("missing tag filters")
	}
	return api.s.SearchMetricNames(qt, filters, tr, maxMetrics, deadline)
}
// LabelValues returns up to maxLabelValues values for the given labelName
// among the series matching sq.
func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
	// TODO(rtm0): Return empty result if sq.AccountID, sq.ProjectID do not
	// match tenantID from flag and sq is not multitenant.
	maxMetrics := sq.MaxMetrics
	if maxMetrics <= 0 {
		// No explicit limit in the query - fall back to maxUniqueTimeSeries.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
		maxMetrics = GetMaxUniqueTimeSeries()
	}
	tr := sq.GetTimeRange()
	filters, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
	if err != nil {
		return nil, err
	}
	return api.s.SearchLabelValues(qt, labelName, filters, tr, maxLabelValues, maxMetrics, deadline)
}
// TagValueSuffixes returns suffixes for the given tagKey/tagValuePrefix
// split by delimiter, for the Graphite /metrics/find API.
func (api *vmstorageAPI) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
	maxSuffixes int, deadline uint64) ([]string, error) {
	// TODO(rtm0): Return empty result if accountID, projectID do not match
	// tenantID from flag.
	suffixes, err := api.s.SearchTagValueSuffixes(qt, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
	if err != nil {
		return nil, err
	}
	if len(suffixes) < maxSuffixes {
		return suffixes, nil
	}
	// Hitting the limit means the result may be truncated - fail loudly
	// instead of silently returning a partial answer.
	return nil, fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d suffixes returned; "+
		"either narrow down the search or increase -search.maxTagValueSuffixesPerSearch command-line flag value", maxSuffixes)
}
// LabelNames returns up to maxLabelNames label names for the series matching sq.
func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
	// TODO(rtm0): Return empty result if sq.AccountID, sq.ProjectID do not
	// match tenantID from flag and sq is not multitenant.
	maxMetrics := sq.MaxMetrics
	if maxMetrics <= 0 {
		// No explicit limit in the query - fall back to maxUniqueTimeSeries.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
		maxMetrics = GetMaxUniqueTimeSeries()
	}
	tr := sq.GetTimeRange()
	filters, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
	if err != nil {
		return nil, err
	}
	return api.s.SearchLabelNames(qt, filters, tr, maxLabelNames, maxMetrics, deadline)
}
// SeriesCount returns the total number of series in the storage.
// accountID/projectID are currently ignored (single-tenant storage).
func (api *vmstorageAPI) SeriesCount(_ *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
	// TODO(rtm0): Return 0 if accountID, projectID do not match tenantID from
	// flag.
	return api.s.GetSeriesCount(deadline)
}
// Tenants returns the list of tenants stored in the storage.
// The single-node storage currently always reports the default "0:0" tenant.
func (api *vmstorageAPI) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
	// TODO(rtm0): Return the tenantID from flag.
	return []string{"0:0"}, nil
	// return api.s.SearchTenants(qt, tr, deadline)
}
// TSDBStatus returns TSDB status for the date of sq.MinTimestamp, optionally
// focused on focusLabel and limited to topN entries per section.
func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
	// TODO(rtm0): Return empty result if sq.AccountID, sq.ProjectID do not
	// match tenantID from flag and sq is not multitenant.
	maxMetrics := sq.MaxMetrics
	if maxMetrics <= 0 {
		// No explicit limit in the query - fall back to maxUniqueTimeSeries.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
		maxMetrics = GetMaxUniqueTimeSeries()
	}
	tr := sq.GetTimeRange()
	filters, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
	if err != nil {
		return nil, err
	}
	// The status is calculated per day; convert sq.MinTimestamp (msecs) to days.
	const msecsPerDay = 24 * 3600 * 1000
	date := uint64(sq.MinTimestamp) / msecsPerDay
	return api.s.GetTSDBStatus(qt, filters, date, focusLabel, topN, maxMetrics, deadline)
}
// DeleteSeries deletes the series matching sq and returns the number of
// deleted series.
func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
	// TODO(rtm0): Return empty result if sq.AccountID, sq.ProjectID do not
	// match tenantID from flag and sq is not multitenant.
	maxMetrics := sq.MaxMetrics
	if maxMetrics <= 0 {
		// No explicit limit in the query - fall back to maxUniqueTimeSeries.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857
		maxMetrics = GetMaxUniqueTimeSeries()
	}
	tr := sq.GetTimeRange()
	filters, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
	if err != nil {
		return 0, err
	}
	if len(filters) == 0 {
		// Refuse to delete everything when no filters are given.
		return 0, fmt.Errorf("missing tag filters")
	}
	return api.s.DeleteSeries(qt, filters, maxMetrics)
}
// RegisterMetricNames is a part of vmselectapi.API, which is not supported by
// the single-node vmselect server yet.
func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error {
	return fmt.Errorf("not implemented")
}
// GetMetricNamesUsageStats returns metric names usage statistics limited by
// limit/le and filtered by matchPattern.
func (api *vmstorageAPI) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, _ uint64) (metricnamestats.StatsResult, error) {
	// TODO(rtm0): Return empty result if tt do not match tenantID from flag.
	return api.s.GetMetricNamesStats(qt, limit, le, matchPattern), nil
}
// ResetMetricNamesUsageStats resets the metric names usage statistics tracker.
func (api *vmstorageAPI) ResetMetricNamesUsageStats(qt *querytracer.Tracer, _ uint64) error {
	api.s.ResetMetricNamesStats(qt)
	return nil
}
// setupTfss converts sq.TagFilterss into storage.TagFilters.
//
// Filters on the special __graphite__ pseudo-label are expanded into the
// matching Graphite paths via SearchGraphitePaths before being added.
func (api *vmstorageAPI) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) {
	tfss := make([]*storage.TagFilters, 0, len(sq.TagFilterss))
	for _, tagFilters := range sq.TagFilterss {
		tfs := storage.NewTagFilters()
		for i := range tagFilters {
			tf := &tagFilters[i]
			if string(tf.Key) == "__graphite__" {
				// Expand the Graphite query into concrete metric paths.
				query := tf.Value
				qtChild := qt.NewChild("searching for series matching __graphite__=%q", query)
				paths, err := api.s.SearchGraphitePaths(qtChild, tr, query, maxMetrics, deadline)
				qtChild.Donef("found %d series", len(paths))
				if err != nil {
					return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
				}
				if len(paths) >= maxMetrics {
					// Hitting the limit means paths may be truncated - fail instead
					// of returning a partial match set.
					return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+
						"either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes; "+
						"see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#resource-usage-limits", maxMetrics, query)
				}
				tfs.AddGraphiteQuery(query, paths, tf.IsNegative)
				continue
			}
			if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
				return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
			}
		}
		tfss = append(tfss, tfs)
	}
	return tfss, nil
}
// GetMetadataRecords returns up to limit metadata rows for the given metricName.
func (api *vmstorageAPI) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) {
	// TODO(rtm0): Return empty result if tt do not match tenantID from flag.
	return api.s.GetMetadataRows(qt, limit, metricName), nil
}
// blockIterator implements vmselectapi.BlockIterator
type blockIterator struct {
	// sr is the underlying storage search the iterator reads blocks from.
	sr storage.Search
	// mb is scratch space for the currently read metric block; keeping it in
	// the pooled iterator allows reusing its buffers across iterations.
	mb storage.MetricBlock
}

// blockIteratorsPool reuses blockIterator objects across searches.
var blockIteratorsPool sync.Pool
// MustClose closes the underlying search and returns bi to the pool.
func (bi *blockIterator) MustClose() {
	bi.sr.MustClose()
	// Drop references so pooled iterators don't retain metric data.
	bi.mb.MetricName = nil
	bi.mb.Block.Reset()
	blockIteratorsPool.Put(bi)
}
// getBlockIterator returns a blockIterator from the pool,
// allocating a fresh one when the pool is empty.
func getBlockIterator() *blockIterator {
	if v := blockIteratorsPool.Get(); v != nil {
		return v.(*blockIterator)
	}
	return &blockIterator{}
}
// NextBlock reads the next metric block, appends its marshaled form to dst
// and returns the result. It returns ok=false when no more blocks are left.
func (bi *blockIterator) NextBlock(dst []byte) ([]byte, bool) {
	if !bi.sr.NextMetricBlock() {
		return dst, false
	}
	// Work on a pointer to the pooled MetricBlock rather than a copy, so the
	// buffers grown inside bi.mb.Block are actually retained in bi.mb and
	// reused on subsequent NextBlock calls and after pool reuse. A value copy
	// (mb := bi.mb) would discard the grown buffers on every call.
	mb := &bi.mb
	mb.MetricName = bi.sr.MetricBlockRef.MetricName
	bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block)
	dst = mb.Marshal(dst[:0])
	return dst, true
}
// Error returns the error encountered by the underlying search, if any.
func (bi *blockIterator) Error() error {
	return bi.sr.Error()
}
// checkTimeRange returns a 503 error if querying the given tr is denied
// because it lies outside the configured retention and
// -denyQueriesOutsideRetention is set. It returns nil otherwise.
func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error {
	if !*denyQueriesOutsideRetention {
		return nil
	}
	retentionMsecs := s.RetentionMsecs()
	minAllowedTimestamp := int64(fasttime.UnixTimestamp()*1000) - retentionMsecs
	if tr.MinTimestamp > minAllowedTimestamp {
		// The whole time range is within the retention.
		return nil
	}
	return &httpserver.ErrorWithStatusCode{
		Err: fmt.Errorf("the given time range %s is outside the allowed retention %.3f days according to -denyQueriesOutsideRetention",
			&tr, float64(retentionMsecs)/(24*3600*1000)),
		StatusCode: http.StatusServiceUnavailable,
	}
}
// getMaxMetrics returns the effective per-query series limit for the given
// query-supplied limit, capped by -search.maxUniqueTimeseries.
func getMaxMetrics(searchQueryLimit int) int {
	if searchQueryLimit <= 0 {
		return GetMaxUniqueTimeSeries()
	}
	// searchQueryLimit cannot exceed `-search.maxUniqueTimeseries`
	if limit := *maxUniqueTimeseries; limit != 0 && searchQueryLimit > limit {
		return limit
	}
	return searchQueryLimit
}
// GetMaxUniqueTimeSeries returns `-search.maxUniqueTimeseries` or the auto-calculated value based on available resources.
// The calculation is split into calculateMaxUniqueTimeSeriesForResource for unit testing.
//
// The value is computed once and cached for subsequent calls.
func GetMaxUniqueTimeSeries() int {
	maxUniqueTimeseriesValueOnce.Do(func() {
		maxUniqueTimeseriesValue = *maxUniqueTimeseries
		if maxUniqueTimeseriesValue <= 0 {
			// The flag is unset - derive the limit from concurrency and memory.
			maxUniqueTimeseriesValue = calculateMaxUniqueTimeSeriesForResource(*maxConcurrentRequests, memory.Remaining())
		}
	})
	return maxUniqueTimeseriesValue
}
// calculateMaxUniqueTimeSeriesForResource calculate the max metrics limit calculated by available resources.
//
// The limit is proportional to the remaining memory and inversely proportional
// to the number of concurrent requests.
func calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, remainingMemory int) int {
	if maxConcurrentRequests <= 0 {
		// This line should NOT be reached unless the user has set an incorrect `search.maxConcurrentRequests`.
		// In such cases, fallback to unlimited.
		logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
		return 2e9
	}
	// Calculate the max metrics limit for a single request in the worst-case concurrent scenario.
	// The approximate size of 1 unique series that could occupy in the vmstorage is 200 bytes.
	mts := remainingMemory / 200 / maxConcurrentRequests
	logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
	return mts
}

View File

@@ -0,0 +1,52 @@
package servers
import (
"math"
"runtime"
"testing"
)
// TestCalculateMaxMetricsLimitByResource verifies the resource-based
// auto-calculation of -search.maxUniqueTimeseries.
func TestCalculateMaxMetricsLimitByResource(t *testing.T) {
	f := func(maxConcurrentRequest, remainingMemory, expect int) {
		t.Helper()
		maxMetricsLimit := calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequest, remainingMemory)
		if maxMetricsLimit != expect {
			t.Fatalf("unexpected max metrics limit: got %d, want %d", maxMetricsLimit, expect)
		}
	}
	// Skip when GOARCH=386: 32 GiB inputs overflow a 32-bit int.
	if runtime.GOARCH != "386" {
		// 8 CPU & 32 GiB
		f(16, int(math.Round(32*1024*1024*1024*0.4)), 4294967)
		// 4 CPU & 32 GiB
		f(8, int(math.Round(32*1024*1024*1024*0.4)), 8589934)
	}
	// 2 CPU & 4 GiB
	f(4, int(math.Round(4*1024*1024*1024*0.4)), 2147483)
	// other edge cases
	f(0, int(math.Round(4*1024*1024*1024*0.4)), 2e9)
	f(4, 0, 0)
}
// TestGetMaxMetrics verifies capping of the query-supplied series limit by
// -search.maxUniqueTimeseries.
func TestGetMaxMetrics(t *testing.T) {
	// Restore the flag value mutated by the test cases below.
	originalMaxUniqueTimeSeries := *maxUniqueTimeseries
	defer func() {
		*maxUniqueTimeseries = originalMaxUniqueTimeSeries
	}()
	f := func(searchQueryLimit, storageMaxUniqueTimeseries, expect int) {
		t.Helper()
		*maxUniqueTimeseries = storageMaxUniqueTimeseries
		maxMetrics := getMaxMetrics(searchQueryLimit)
		if maxMetrics != expect {
			t.Fatalf("unexpected max metrics: got %d, want %d", maxMetrics, expect)
		}
	}
	f(0, 1e6, 1e6)
	f(2e6, 0, 2e6)
	f(2e6, 1e6, 1e6)
}

View File

@@ -0,0 +1,140 @@
package handshake
import (
"bufio"
"fmt"
"io"
"net"
"os"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// bufferedWriter abstracts over bufio.Writer and zstd.Writer, both of which
// buffer writes and must be flushed explicitly.
type bufferedWriter interface {
	Write(p []byte) (int, error)
	Flush() error
}
// BufferedConn is a net.Conn with Flush support.
type BufferedConn struct {
	net.Conn

	// IsLegacy defines if BufferedConn operates in legacy mode
	// and doesn't support RPC protocol
	IsLegacy bool

	// br reads from Conn, optionally decompressing with zstd.
	br io.Reader
	// bw writes to Conn, optionally compressing with zstd.
	bw bufferedWriter

	// readDeadline and writeDeadline mirror the deadlines set on Conn so they
	// can be checked cheaply on every Read/Write call.
	readDeadline  time.Time
	writeDeadline time.Time
}
// bufferSize is the size of the read and write buffers wrapped around the underlying conn.
const bufferSize = 64 * 1024
// newBufferedConn returns buffered connection with the given compression level.
//
// compressionLevel > 0 enables zstd compression of outgoing data;
// isReadCompressed tells whether incoming data must be zstd-decompressed.
func newBufferedConn(c net.Conn, compressionLevel int, isReadCompressed bool) *BufferedConn {
	bc := &BufferedConn{Conn: c}
	if compressionLevel > 0 {
		bc.bw = zstd.NewWriterLevel(c, compressionLevel)
	} else {
		bc.bw = bufio.NewWriterSize(c, bufferSize)
	}
	if isReadCompressed {
		bc.br = zstd.NewReader(c)
	} else {
		bc.br = bufio.NewReaderSize(c, bufferSize)
	}
	return bc
}
// SetDeadline sets read and write deadlines for bc to t.
//
// Deadline is checked on each Read and Write call.
func (bc *BufferedConn) SetDeadline(t time.Time) error {
	// Keep local copies so Read/Write can check the deadline without touching
	// the underlying conn.
	bc.readDeadline = t
	bc.writeDeadline = t
	return bc.Conn.SetDeadline(t)
}

// SetReadDeadline sets read deadline for bc to t.
//
// Deadline is checked on each Read call.
func (bc *BufferedConn) SetReadDeadline(t time.Time) error {
	bc.readDeadline = t
	return bc.Conn.SetReadDeadline(t)
}

// SetWriteDeadline sets write deadline for bc to t.
//
// Deadline is checked on each Write call.
func (bc *BufferedConn) SetWriteDeadline(t time.Time) error {
	bc.writeDeadline = t
	return bc.Conn.SetWriteDeadline(t)
}
// Read reads up to len(p) from bc to p.
//
// It fails fast with os.ErrDeadlineExceeded if the read deadline has already
// passed, without touching the underlying conn.
func (bc *BufferedConn) Read(p []byte) (int, error) {
	startTime := fasttime.UnixTimestamp()
	if deadlineExceeded(bc.readDeadline, startTime) {
		return 0, os.ErrDeadlineExceeded
	}
	n, err := bc.br.Read(p)
	// io.EOF is a normal end-of-stream signal and must be passed through
	// unwrapped so callers can detect it.
	if err != nil && err != io.EOF {
		err = fmt.Errorf("cannot read data in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
	}
	return n, err
}
// Write writes p to bc.
//
// Do not forget to call Flush if needed.
//
// It fails fast with os.ErrDeadlineExceeded if the write deadline has already
// passed, without touching the underlying conn.
func (bc *BufferedConn) Write(p []byte) (int, error) {
	startTime := fasttime.UnixTimestamp()
	if deadlineExceeded(bc.writeDeadline, startTime) {
		return 0, os.ErrDeadlineExceeded
	}
	n, err := bc.bw.Write(p)
	if err != nil {
		err = fmt.Errorf("cannot write data in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err)
	}
	return n, err
}
func deadlineExceeded(deadline time.Time, currentTimestamp uint64) bool {
if deadline.IsZero() {
return false
}
return currentTimestamp > uint64(deadline.Unix())
}
// Close closes bc.
//
// After Close returns, bc must not be used.
func (bc *BufferedConn) Close() error {
	// Close the Conn at first. It is expected that all the required data
	// is already flushed to the Conn.
	err := bc.Conn.Close()
	bc.Conn = nil
	// Release zstd resources back to their pools if compression was enabled.
	if zr, ok := bc.br.(*zstd.Reader); ok {
		zr.Release()
	}
	bc.br = nil
	if zw, ok := bc.bw.(*zstd.Writer); ok {
		// Do not call zw.Close(), since we already closed the underlying conn.
		zw.Release()
	}
	bc.bw = nil
	bc.IsLegacy = false
	return err
}
// Flush flushes internal write buffers to the underlying conn.
func (bc *BufferedConn) Flush() error {
	return bc.bw.Flush()
}

318
lib/handshake/handshake.go Normal file
View File

@@ -0,0 +1,318 @@
package handshake
import (
"errors"
"flag"
"fmt"
"io"
"net"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// rpcHandshakeTimeout bounds the duration of the whole handshake sequence.
var rpcHandshakeTimeout = flag.Duration("rpc.handshakeTimeout", 5*time.Second, "Timeout for RPC handshake between vminsert/vmselect and vmstorage. Increase this value if transient handshake failures occur. See https://docs.victoriametrics.com/victoriametrics/troubleshooting/#cluster-instability section for more details.")

// Hello messages exchanged during the handshake. The suffix encodes the
// protocol version.
const (
	vminsertHelloLegacyVersion = "vminsert.02"
	vminsertHello              = "vminsert.03"
	vmselectHello              = "vmselect.01"

	// successResponse acknowledges each handshake step.
	successResponse = "ok"
)

// Func must perform handshake on the given c using the given compressionLevel.
//
// It must return BufferedConn wrapper for c on successful handshake.
type Func func(c net.Conn, compressionLevel int) (*BufferedConn, error)
// VMInsertClientWithDialer performs client-side handshake for vminsert protocol.
//
// it uses provided dial func to establish connection to the server.
// compressionLevel is a legacy option which defines the level used for compression of the data sent
// to the server.
// compressionLevel <= 0 means 'no compression'
func VMInsertClientWithDialer(dial func() (net.Conn, error), compressionLevel int) (*BufferedConn, error) {
	c, err := dial()
	if err != nil {
		return nil, fmt.Errorf("dial error: %w", err)
	}
	// Try the new RPC protocol first. compressionLevel is not passed here;
	// it only applies to the legacy fallback below.
	bc, err := vminsertClient(c, 0)
	if err == nil {
		return bc, nil
	}
	_ = c.Close()
	// Only a rejected hello indicates an old server worth falling back to;
	// any other error is returned as-is.
	if !strings.Contains(err.Error(), "cannot read success response after sending hello") {
		return nil, err
	}
	// try to fallback to the prev non-RPC API version
	// we cannot re-use exist connection, since vmstorage already closed it
	c, err = dial()
	if err != nil {
		return nil, fmt.Errorf("dial error: %w", err)
	}
	bc, err = genericClient(c, vminsertHelloLegacyVersion, compressionLevel)
	if err != nil {
		_ = c.Close()
		return nil, fmt.Errorf("legacy handshake error: %w", err)
	}
	bc.IsLegacy = true
	logger.Infof("server=%q doesn't support new RPC version, fallback to the legacy format", c.RemoteAddr())
	return bc, nil
}
// vminsertClient performs the client-side handshake using the current
// vminsert protocol version.
func vminsertClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
	return genericClient(c, vminsertHello, compressionLevel)
}

// VMInsertClientWithHello performs client-side handshake for vminsert protocol.
//
// should be used for testing only
func VMInsertClientWithHello(c net.Conn, helloMsg string, compressionLevel int) (*BufferedConn, error) {
	return genericClient(c, helloMsg, compressionLevel)
}
// VMInsertServer performs server-side handshake for vminsert protocol.
//
// compressionLevel is the level used for compression of the data sent
// to the client.
// compressionLevel <= 0 means 'no compression'
//
// The returned conn has IsLegacy set when the client spoke the previous
// (non-RPC) protocol version.
func VMInsertServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
	var isRPCSupported bool
	bc, err := genericServer(c, compressionLevel, func(c net.Conn) error {
		// Both hello versions have the same length, so a fixed-size read
		// covers either message.
		buf, err := readData(c, len(vminsertHello))
		if err != nil {
			if errors.Is(err, io.EOF) {
				// This is likely a TCP healthcheck, which must be ignored in order to prevent logs pollution.
				// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1762
				return errTCPHealthcheck
			}
			return fmt.Errorf("cannot read hello: %w", err)
		}
		isRPCSupported = string(buf) == vminsertHello
		if !isRPCSupported {
			// try to fallback to the previous protocol version
			if string(buf) != vminsertHelloLegacyVersion {
				return fmt.Errorf("unexpected message obtained; got %q; want %q", buf, vminsertHello)
			}
			logger.Infof("client=%q doesn't support new RPC version, fallback to the legacy format", c.RemoteAddr())
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	bc.IsLegacy = !isRPCSupported
	return bc, nil
}
// VMInsertServerWithLegacyHello performs server-side handshake for vminsert protocol
// with legacy hello message
//
// should be used for testing only
func VMInsertServerWithLegacyHello(c net.Conn, compressionLevel int) (*BufferedConn, error) {
	bc, err := genericServer(c, compressionLevel, func(c net.Conn) error {
		// Accept only the legacy hello.
		return readMessage(c, vminsertHelloLegacyVersion)
	})
	if err != nil {
		return nil, err
	}
	bc.IsLegacy = true
	return bc, nil
}
// VMSelectClient performs client-side handshake for vmselect protocol.
//
// compressionLevel is the level used for compression of the data sent
// to the server.
// compressionLevel <= 0 means 'no compression'
func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
	return genericClient(c, vmselectHello, compressionLevel)
}
// VMSelectServer performs server-side handshake for vmselect protocol.
//
// compressionLevel is the level used for compression of the data sent
// to the client.
// compressionLevel <= 0 means 'no compression'
func VMSelectServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
	return genericServer(c, compressionLevel, func(c net.Conn) error {
		err := readMessage(c, vmselectHello)
		if errors.Is(err, io.EOF) {
			// This is likely a TCP healthcheck, which must be ignored in order to prevent logs pollution.
			// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1762 and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10786
			return errTCPHealthcheck
		}
		return err
	})
}
// errTCPHealthcheck indicates that the connection was opened as part of a TCP health check
// and was closed immediately after being established.
//
// This is expected behavior and can be safely ignored.
var errTCPHealthcheck = fmt.Errorf("TCP health check connection safe to ignore")

// IsTCPHealthcheck determines whether the provided error is a TCP health check
func IsTCPHealthcheck(err error) bool {
	return errors.Is(err, errTCPHealthcheck)
}
// IsClientNetworkError determines whether the provided error is a client-side network error,
// such as io.EOF, io.ErrUnexpectedEOF, or a timeout.
// These errors typically occur when a client disconnects abruptly or fails during the handshake,
// and are generally non-actionable from the server point of view.
// This function helps distinguish such errors from critical ones during the handshake process
// and adjust logging accordingly.
//
// See: https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/pull/880
func IsClientNetworkError(err error) bool {
	if err == nil {
		return false
	}
	switch {
	case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
		return true
	case IsTimeoutNetworkError(err):
		return true
	}
	// Fall back to message matching for errors that don't wrap a typed cause.
	msg := err.Error()
	return strings.Contains(msg, "broken pipe") || strings.Contains(msg, "reset by peer")
}
// IsTimeoutNetworkError determines whether the provided error is a network error with a timeout.
func IsTimeoutNetworkError(err error) bool {
var ne net.Error
if errors.As(err, &ne) && ne.Timeout() {
return true
}
return false
}
// genericServer performs the server side of the handshake on c:
// it reads the client hello via readHelloMessage, acknowledges it, exchanges
// the isCompressed flags in both directions and returns a BufferedConn
// configured accordingly. The whole sequence is bounded by -rpc.handshakeTimeout.
func genericServer(c net.Conn, compressionLevel int, readHelloMessage func(c net.Conn) error) (*BufferedConn, error) {
	if err := c.SetDeadline(time.Now().Add(*rpcHandshakeTimeout)); err != nil {
		return nil, fmt.Errorf("cannot set deadline: %w", err)
	}
	if err := readHelloMessage(c); err != nil {
		return nil, fmt.Errorf("cannot read hello message : %w", err)
	}
	// This write acknowledges the hello message; the previous error text
	// wrongly referred to isCompressed (copy-paste from the ack below).
	if err := writeMessage(c, successResponse); err != nil {
		return nil, fmt.Errorf("cannot write success response on hello: %w", err)
	}
	// The client tells whether the data it sends is compressed.
	isRemoteCompressed, err := readIsCompressed(c)
	if err != nil {
		return nil, fmt.Errorf("cannot read isCompressed flag: %w", err)
	}
	if err := writeMessage(c, successResponse); err != nil {
		return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
	}
	// Tell the client whether the data we send is compressed.
	if err := writeIsCompressed(c, compressionLevel > 0); err != nil {
		return nil, fmt.Errorf("cannot write isCompressed flag: %w", err)
	}
	if err := readMessage(c, successResponse); err != nil {
		return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
	}
	// Remove the handshake deadline; callers manage deadlines from here on.
	if err := c.SetDeadline(time.Time{}); err != nil {
		return nil, fmt.Errorf("cannot reset deadline: %w", err)
	}
	bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
	return bc, nil
}
// genericClient performs the client side of the handshake on c:
// it sends the given hello msg, waits for the ack, exchanges the isCompressed
// flags in both directions and returns a BufferedConn configured accordingly.
// The whole sequence is bounded by -rpc.handshakeTimeout.
func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn, error) {
	if err := c.SetDeadline(time.Now().Add(*rpcHandshakeTimeout)); err != nil {
		return nil, fmt.Errorf("cannot set deadline: %w", err)
	}
	if err := writeMessage(c, msg); err != nil {
		return nil, fmt.Errorf("cannot write hello: %w", err)
	}
	if err := readMessage(c, successResponse); err != nil {
		return nil, fmt.Errorf("cannot read success response after sending hello: %w", err)
	}
	// Tell the server whether the data we send is compressed.
	if err := writeIsCompressed(c, compressionLevel > 0); err != nil {
		return nil, fmt.Errorf("cannot write isCompressed flag: %w", err)
	}
	if err := readMessage(c, successResponse); err != nil {
		return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
	}
	// The server tells whether the data it sends is compressed.
	isRemoteCompressed, err := readIsCompressed(c)
	if err != nil {
		return nil, fmt.Errorf("cannot read isCompressed flag: %w", err)
	}
	if err := writeMessage(c, successResponse); err != nil {
		return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
	}
	// Remove the handshake deadline; callers manage deadlines from here on.
	if err := c.SetDeadline(time.Time{}); err != nil {
		return nil, fmt.Errorf("cannot reset deadline: %w", err)
	}
	bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
	return bc, nil
}
// writeIsCompressed sends the single-byte isCompressed flag (1 or 0) to c.
func writeIsCompressed(c net.Conn, isCompressed bool) error {
	b := byte(0)
	if isCompressed {
		b = 1
	}
	return writeMessage(c, string([]byte{b}))
}
// readIsCompressed reads the single-byte isCompressed flag from c;
// any non-zero byte means "compressed".
func readIsCompressed(c net.Conn) (bool, error) {
	buf, err := readData(c, 1)
	if err != nil {
		return false, err
	}
	return buf[0] != 0, nil
}
// writeMessage writes msg to c, flushing it immediately when c buffers writes.
func writeMessage(c net.Conn, msg string) error {
	if _, err := io.WriteString(c, msg); err != nil {
		return fmt.Errorf("cannot write %q to server: %w", msg, err)
	}
	// Flush buffered conns so the peer actually receives the message
	// before we start waiting for its response.
	if fc, ok := c.(flusher); ok {
		if err := fc.Flush(); err != nil {
			return fmt.Errorf("cannot flush %q to server: %w", msg, err)
		}
	}
	return nil
}

// flusher matches conns with buffered writes, such as BufferedConn.
type flusher interface {
	Flush() error
}
// readMessage reads len(msg) bytes from c and verifies they equal msg.
func readMessage(c net.Conn, msg string) error {
	buf, err := readData(c, len(msg))
	if err != nil {
		return err
	}
	if string(buf) == msg {
		return nil
	}
	return fmt.Errorf("unexpected message obtained; got %q; want %q", buf, msg)
}
func readData(c net.Conn, dataLen int) ([]byte, error) {
data := make([]byte, dataLen)
if n, err := io.ReadFull(c, data); err != nil {
return nil, fmt.Errorf("cannot read message with size %d: %w; read only %d bytes", dataLen, err, n)
}
return data, nil
}

View File

@@ -0,0 +1,83 @@
package handshake
import (
"fmt"
"net"
"testing"
"time"
)
// TestVMInsertHandshake verifies the handshake between the vminsert client and the vminsert server.
func TestVMInsertHandshake(t *testing.T) {
	testHandshake(t, vminsertClient, VMInsertServer)
}
// TestVMSelectHandshake verifies the handshake between the vmselect client and the vmselect server.
func TestVMSelectHandshake(t *testing.T) {
	testHandshake(t, VMSelectClient, VMSelectServer)
}
// TestVMSelectServerTCPHealthcheck verifies that VMSelectServer recognizes a TCP
// connection closed before any handshake bytes are sent (a plain TCP healthcheck probe)
// and reports it as a healthcheck error.
func TestVMSelectServerTCPHealthcheck(t *testing.T) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("cannot start listener: %s", err)
	}
	clientConn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		t.Fatalf("cannot dial: %s", err)
	}
	// Close the client side immediately, imitating a TCP healthcheck probe.
	if err := clientConn.Close(); err != nil {
		t.Fatalf("cannot close client conn: %s", err)
	}
	serverConn, err := ln.Accept()
	if err != nil {
		t.Fatalf("cannot accept conn: %s", err)
	}
	if _, err := VMSelectServer(serverConn, 0); !IsTCPHealthcheck(err) {
		t.Fatalf("unexpected error; got %v; want TCP healthcheck error", err)
	}
}
// testHandshake runs a nested handshake between clientFunc and serverFunc over an
// in-memory full-duplex connection.
//
// One side of the pipe runs serverFunc and then clientFunc over the resulting
// buffered conn (compressionLevel=3); the other side runs clientFunc and then
// serverFunc (compressionLevel=0). Errors from the goroutine side are reported
// via ch; the main side fails the test directly.
func testHandshake(t *testing.T, clientFunc, serverFunc Func) {
	t.Helper()
	c, s := net.Pipe()
	// The peer runs in a separate goroutine, since both sides of the handshake
	// block on each other over the synchronous pipe.
	ch := make(chan error, 1)
	go func() {
		bcs, err := serverFunc(s, 3)
		if err != nil {
			ch <- fmt.Errorf("error on outer handshake: %w", err)
			return
		}
		bcc, err := clientFunc(bcs, 3)
		if err != nil {
			ch <- fmt.Errorf("error on inner handshake: %w", err)
			return
		}
		if bcc == nil {
			ch <- fmt.Errorf("expecting non-nil conn")
			return
		}
		ch <- nil
	}()
	bcc, err := clientFunc(c, 0)
	if err != nil {
		t.Fatalf("error on outer handshake: %s", err)
	}
	bcs, err := serverFunc(bcc, 0)
	if err != nil {
		t.Fatalf("error on inner handshake: %s", err)
	}
	if bcs == nil {
		t.Fatalf("expecting non-nil conn")
	}
	// Wait for the goroutine to finish, so its errors aren't silently dropped.
	select {
	case <-time.After(5 * time.Second):
		t.Fatalf("timeout")
	case err := <-ch:
		if err != nil {
			t.Fatalf("unexpected error on the server side: %s", err)
		}
	}
}

View File

@@ -90,6 +90,77 @@ type MetricBlockRef struct {
BlockRef *BlockRef
}
// MetricBlock is a time series block for a single metric.
//
// Use Marshal and Unmarshal for serializing MetricBlock.
type MetricBlock struct {
	// MetricName is metric name for the given Block.
	MetricName []byte
	// Block is a block for the given MetricName
	Block Block
}
// Marshal appends marshaled mb to dst and returns the result.
func (mb *MetricBlock) Marshal(dst []byte) []byte {
	dst = encoding.MarshalBytes(dst, mb.MetricName)
	dst = MarshalBlock(dst, &mb.Block)
	return dst
}
// CopyFrom copies src to mb.
//
// Existing mb buffers are reused in order to reduce memory allocations.
func (mb *MetricBlock) CopyFrom(src *MetricBlock) {
	mb.MetricName = append(mb.MetricName[:0], src.MetricName...)
	mb.Block.CopyFrom(&src.Block)
}
// MarshalBlock appends marshaled b to dst and returns the result.
//
// b.MarshalData must be called on b before calling MarshalBlock.
func MarshalBlock(dst []byte, b *Block) []byte {
	dst = b.bh.Marshal(dst)
	for _, data := range [][]byte{b.timestampsData, b.valuesData} {
		dst = encoding.MarshalBytes(dst, data)
	}
	return dst
}
// Unmarshal unmarshals MetricBlock from src and returns the remaining tail.
func (mb *MetricBlock) Unmarshal(src []byte) ([]byte, error) {
	mb.Block.Reset()
	mn, nSize := encoding.UnmarshalBytes(src)
	if nSize <= 0 {
		return src, fmt.Errorf("cannot unmarshal MetricName")
	}
	mb.MetricName = append(mb.MetricName[:0], mn...)
	return UnmarshalBlock(&mb.Block, src[nSize:])
}
// UnmarshalBlock unmarshals Block from src to dst and returns the remaining tail.
//
// dst.UnmarshalData isn't called on the block.
func UnmarshalBlock(dst *Block, src []byte) ([]byte, error) {
	tail, err := dst.bh.Unmarshal(src)
	if err != nil {
		return tail, fmt.Errorf("cannot unmarshal blockHeader: %w", err)
	}
	src = tail
	tds, nSize := encoding.UnmarshalBytes(src)
	if nSize <= 0 {
		return src, fmt.Errorf("cannot unmarshal timestampsData")
	}
	src = src[nSize:]
	dst.timestampsData = append(dst.timestampsData[:0], tds...)
	vd, nSize := encoding.UnmarshalBytes(src)
	if nSize <= 0 {
		// Return src instead of the stale tail (which still points right after the
		// block header), so callers see the exact position where unmarshaling failed.
		return src, fmt.Errorf("cannot unmarshal valuesData")
	}
	src = src[nSize:]
	dst.valuesData = append(dst.valuesData[:0], vd...)
	return src, nil
}
// Search is a search for time series.
type Search struct {
// MetricBlockRef is updated with each Search.NextMetricBlock call.
@@ -254,6 +325,17 @@ func (s *Search) NextMetricBlock() bool {
// SearchQuery is used for sending search queries from vmselect to vmstorage.
type SearchQuery struct {
AccountID uint32
ProjectID uint32
// TenantTokens and IsMultiTenant is artificial fields
// they're only exist at runtime and cannot be transferred
// via network calls for keeping communication protocol compatibility
// TODO:@f41gh7 introduce breaking change to the protocol later
// and use TenantTokens instead of AccountID and ProjectID
TenantTokens []TenantToken
IsMultiTenant bool
// The time range for searching time series
MinTimestamp int64
MaxTimestamp int64
@@ -290,6 +372,40 @@ func NewSearchQuery(start, end int64, tagFilterss [][]TagFilter, maxMetrics int)
}
}
// TenantToken represents a tenant (accountID, projectID) pair.
type TenantToken struct {
	// AccountID is the tenant account id.
	AccountID uint32
	// ProjectID is the tenant project id.
	ProjectID uint32
}
// String returns string representation of t.
//
// The result has the form {accountID=N, projectID=M}.
func (t *TenantToken) String() string {
	return fmt.Sprintf("{accountID=%d, projectID=%d}", t.AccountID, t.ProjectID)
}
// Marshal appends marshaled t to dst and returns the result.
//
// AccountID is marshaled first, followed by ProjectID.
func (t *TenantToken) Marshal(dst []byte) []byte {
	dst = encoding.MarshalUint32(dst, t.AccountID)
	return encoding.MarshalUint32(dst, t.ProjectID)
}
// NewMultiTenantSearchQuery creates new search query for the given args.
//
// The returned query is marked as multi-tenant and carries the given tenants in TenantTokens.
func NewMultiTenantSearchQuery(tenants []TenantToken, start, end int64, tagFilterss [][]TagFilter, maxMetrics int) *SearchQuery {
	if start < 0 {
		// Clamp negative start timestamps to zero.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5553
		start = 0
	}
	sq := &SearchQuery{
		MinTimestamp:  start,
		MaxTimestamp:  end,
		TagFilterss:   tagFilterss,
		MaxMetrics:    maxMetrics,
		TenantTokens:  tenants,
		IsMultiTenant: true,
	}
	return sq
}
// TagFilter represents a single tag filter from SearchQuery.
type TagFilter struct {
Key []byte
@@ -387,7 +503,15 @@ func (sq *SearchQuery) String() string {
}
start := TimestampToHumanReadableFormat(sq.MinTimestamp)
end := TimestampToHumanReadableFormat(sq.MaxTimestamp)
return fmt.Sprintf("filters=%s, timeRange=[%s..%s]", a, start, end)
if !sq.IsMultiTenant {
return fmt.Sprintf("accountID=%d, projectID=%d, filters=%s, timeRange=[%s..%s]", sq.AccountID, sq.ProjectID, a, start, end)
}
tts := make([]string, len(sq.TenantTokens))
for i, tt := range sq.TenantTokens {
tts[i] = tt.String()
}
return fmt.Sprintf("tenants=[%s], filters=%s, timeRange=[%s..%s]", strings.Join(tts, ","), a, start, end)
}
func tagFiltersToString(tfs []TagFilter) string {
@@ -398,8 +522,9 @@ func tagFiltersToString(tfs []TagFilter) string {
return "{" + strings.Join(a, ",") + "}"
}
// Marshal appends marshaled sq to dst and returns the result.
func (sq *SearchQuery) Marshal(dst []byte) []byte {
// MarshalWithoutTenant appends marshaled sq without AccountID/ProjectID to dst and returns the result.
// It is expected that TenantToken is already marshaled to dst.
func (sq *SearchQuery) MarshalWithoutTenant(dst []byte) []byte {
dst = encoding.MarshalVarInt64(dst, sq.MinTimestamp)
dst = encoding.MarshalVarInt64(dst, sq.MaxTimestamp)
dst = encoding.MarshalVarUint64(dst, uint64(len(sq.TagFilterss)))
@@ -409,11 +534,25 @@ func (sq *SearchQuery) Marshal(dst []byte) []byte {
dst = tagFilters[i].Marshal(dst)
}
}
dst = encoding.MarshalUint32(dst, uint32(sq.MaxMetrics))
return dst
}
// Unmarshal unmarshals sq from src and returns the tail.
func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) {
if len(src) < 4 {
return src, fmt.Errorf("cannot unmarshal AccountID: too short src len: %d; must be at least %d bytes", len(src), 4)
}
sq.AccountID = encoding.UnmarshalUint32(src)
src = src[4:]
if len(src) < 4 {
return src, fmt.Errorf("cannot unmarshal ProjectID: too short src len: %d; must be at least %d bytes", len(src), 4)
}
sq.ProjectID = encoding.UnmarshalUint32(src)
src = src[4:]
sq.TenantTokens = []TenantToken{{AccountID: sq.AccountID, ProjectID: sq.ProjectID}}
minTs, nSize := encoding.UnmarshalVarInt64(src)
if nSize <= 0 {
return src, fmt.Errorf("cannot unmarshal MinTimestamp from varint")
@@ -454,6 +593,12 @@ func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) {
sq.TagFilterss[i] = tagFilters
}
if len(src) < 4 {
return src, fmt.Errorf("cannot unmarshal MaxMetrics: too short src len: %d; must be at least %d bytes", len(src), 4)
}
sq.MaxMetrics = int(encoding.UnmarshalUint32(src))
src = src[4:]
return src, nil
}

View File

@@ -325,6 +325,11 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage {
return s
}
// RetentionMsecs returns the data retention configured for s, in milliseconds.
func (s *Storage) RetentionMsecs() int64 {
	return s.retentionMsecs
}
var maxTSIDCacheSize int
// SetTSIDCacheSize overrides the default size of storage/tsid cache

68
lib/vmselectapi/api.go Normal file
View File

@@ -0,0 +1,68 @@
package vmselectapi
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
)
// API must implement vmselect API.
type API interface {
	// InitSearch initializes series search for the given sq.
	//
	// The returned BlockIterator must be closed with MustClose to free up resources when it is no longer needed.
	InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (BlockIterator, error)

	// SearchMetricNames returns metric names matching the given sq.
	SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error)

	// LabelValues returns values for labelName label across series matching the given sq.
	LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error)

	// TagValueSuffixes returns tag value suffixes for the given args.
	TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error)

	// LabelNames returns label names for series matching the given sq.
	LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error)

	// SeriesCount returns the number of series for the given (accountID, projectID).
	SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error)

	// TSDBStatus returns tsdb status for the given sq.
	TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error)

	// DeleteSeries deletes series matching the given sq.
	DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error)

	// RegisterMetricNames registers the given mrs in the storage.
	RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error

	// Tenants returns list of tenants in the storage on the given tr.
	Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error)

	// GetMetricNamesUsageStats returns statistics for metric names
	GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (metricnamestats.StatsResult, error)

	// ResetMetricNamesUsageStats resets internal state of metric names tracker
	ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error

	// GetMetadataRecords returns metrics metadata.
	GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error)
}
// BlockIterator must iterate through series blocks found by VMSelect.InitSearch.
//
// MustClose must be called in order to free up allocated resources when BlockIterator is no longer needed.
type BlockIterator interface {
	// NextBlock marshals the next storage.MetricBlock into dst and returns the result.
	//
	// It returns true on success, false on error or if no blocks to read.
	// Use Error in order to distinguish the error case from the end of iteration.
	NextBlock(dst []byte) ([]byte, bool)
	// MustClose frees up resources allocated by BlockIterator.
	MustClose()
	// Error returns the last error that occurred in NextBlock when it returned false.
	Error() error
}

1248
lib/vmselectapi/server.go Normal file

File diff suppressed because it is too large Load Diff