diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 8a0d12cc98..aa3c265cfe 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -12,8 +12,7 @@ import ( "sync" "time" - "github.com/VictoriaMetrics/metrics" - + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/servers" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -28,6 +27,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi" + "github.com/VictoriaMetrics/metrics" ) var ( @@ -35,6 +36,7 @@ var ( "See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention. See also -retentionFilter") futureRetention = flagutil.NewRetentionDuration("futureRetention", "2d", "Data with timestamps bigger than now+futureRetention is automatically deleted. "+ "The minimum futureRetention is 2 days. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention") + vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services") snapshotAuthKey = flagutil.NewPassword("snapshotAuthKey", "authKey, which must be passed in query string to /snapshot* pages. It overrides -httpAuth.*") forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*") forceFlushAuthKey = flagutil.NewPassword("forceFlushAuthKey", "authKey, which must be passed in query string to /internal/force_flush pages. 
It overrides -httpAuth.*") @@ -181,6 +183,12 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) { }) metrics.RegisterSet(storageMetrics) fs.RegisterPathFsMetrics(*DataPath) + + var err error + vmselectSrv, err = servers.NewVMSelectServer(*vmselectAddr, strg) + if err != nil { + logger.Fatalf("cannot create a server with -vmselectAddr=%s: %s", *vmselectAddr, err) + } } var storageMetrics *metrics.Set @@ -191,6 +199,8 @@ var storageMetrics *metrics.Set // for proper graceful shutdown when Stop is called. var Storage *storage.Storage +var vmselectSrv *vmselectapi.Server + // WG must be incremented before Storage call. // // Use syncwg instead of sync, since Add is called from concurrent goroutines. @@ -329,6 +339,7 @@ func Stop() { startTime := time.Now() WG.WaitAndBlock() stopStaleSnapshotsRemover() + vmselectSrv.MustStop() Storage.MustClose() logger.Infof("successfully closed the storage in %.3f seconds", time.Since(startTime).Seconds()) diff --git a/app/vmstorage/servers/vmselect.go b/app/vmstorage/servers/vmselect.go new file mode 100644 index 0000000000..80eeb89d36 --- /dev/null +++ b/app/vmstorage/servers/vmselect.go @@ -0,0 +1,362 @@ +package servers + +import ( + "flag" + "fmt" + "net/http" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi" +) + +var ( + maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, 
which can be scanned during every query. "+ + "This allows protecting against heavy queries, which select unexpectedly high number of series. When set to zero, the limit is automatically calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional). See also -search.max* command-line flags at vmselect") + maxTagKeys = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search. "+ + "See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration") + maxTagValues = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search. "+ + "See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration") + maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find") + maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", 2*cgroup.AvailableCPUs(), "The maximum number of concurrent vmselect requests "+ + "the vmstorage can process at -vmselectAddr. It shouldn't be high, since a single request usually saturates a CPU core, and many concurrently executed requests "+ + "may require high amounts of memory. See also -search.maxQueueDuration") + maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the incoming vmselect request waits for execution "+ + "when -search.maxConcurrentRequests limit is reached") + + disableRPCCompression = flag.Bool("rpc.disableCompression", false, "Whether to disable compression of the data sent from vmstorage to vmselect. "+ + "This reduces CPU usage at the cost of higher network bandwidth usage") + denyQueriesOutsideRetention = flag.Bool("denyQueriesOutsideRetention", false, "Whether to deny queries outside of the configured -retentionPeriod. 
"+ + "When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. "+ + "This may be useful when multiple data sources with distinct retentions are hidden behind query-tee") +) + +var ( + maxUniqueTimeseriesValue int + maxUniqueTimeseriesValueOnce sync.Once +) + +// NewVMSelectServer starts new server at the given addr, which serves vmselect requests from the given s. +func NewVMSelectServer(addr string, s *storage.Storage) (*vmselectapi.Server, error) { + api := &vmstorageAPI{ + s: s, + } + limits := vmselectapi.Limits{ + MaxLabelNames: *maxTagKeys, + MaxLabelValues: *maxTagValues, + MaxTagValueSuffixes: *maxTagValueSuffixesPerSearch, + MaxConcurrentRequests: *maxConcurrentRequests, + MaxConcurrentRequestsFlagName: "search.maxConcurrentRequests", + MaxQueueDuration: *maxQueueDuration, + MaxQueueDurationFlagName: "search.maxQueueDuration", + } + return vmselectapi.NewServer(addr, api, limits, *disableRPCCompression) +} + +// vmstorageAPI implements vmselectapi.API +type vmstorageAPI struct { + s *storage.Storage +} + +func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) { + // TODO(rtm0): Return empty result if sq.AccountID, sq.ProjectID do not + // match tenantID from flag and sq is not multitenant. 
+ + tr := sq.GetTimeRange() + if err := checkTimeRange(api.s, tr); err != nil { + return nil, err + } + maxMetrics := getMaxMetrics(sq.MaxMetrics) + tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline) + if err != nil { + return nil, err + } + if len(tfss) == 0 { + return nil, fmt.Errorf("missing tag filters") + } + bi := getBlockIterator() + bi.sr.Init(qt, api.s, tfss, tr, maxMetrics, deadline) + if err := bi.sr.Error(); err != nil { + bi.MustClose() + return nil, err + } + return bi, nil +} + +func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) { + // TODO(rtm0): Return empty result if sq.AccountID, sq.ProjectID do not + // match tenantID from flag and sq is not multitenant. + + tr := sq.GetTimeRange() + maxMetrics := sq.MaxMetrics + if maxMetrics <= 0 { + // fallback to maxUniqueTimeSeries if no limit is provided, + // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857 + maxMetrics = GetMaxUniqueTimeSeries() + } + tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline) + if err != nil { + return nil, err + } + if len(tfss) == 0 { + return nil, fmt.Errorf("missing tag filters") + } + return api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline) +} + +func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) { + // TODO(rtm0): Return empty result if sq.AccountID, sq.ProjectID do not + // match tenantID from flag and sq is not multitenant. 
+ + tr := sq.GetTimeRange() + maxMetrics := sq.MaxMetrics + if maxMetrics <= 0 { + // fallback to maxUniqueTimeSeries if no limit is provided, + // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857 + maxMetrics = GetMaxUniqueTimeSeries() + } + tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline) + if err != nil { + return nil, err + } + return api.s.SearchLabelValues(qt, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline) +} + +func (api *vmstorageAPI) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, + maxSuffixes int, deadline uint64) ([]string, error) { + // TODO(rtm0): Return empty result if accountID, projectID do not match + // tenantID from flag. + + suffixes, err := api.s.SearchTagValueSuffixes(qt, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline) + if err != nil { + return nil, err + } + if len(suffixes) >= maxSuffixes { + return nil, fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d suffixes returned; "+ + "either narrow down the search or increase -search.maxTagValueSuffixesPerSearch command-line flag value", maxSuffixes) + } + return suffixes, nil +} + +func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) { + // TODO(rtm0): Return empty result if sq.AccountID, sq.ProjectID do not + // match tenantID from flag and sq is not multitenant. 
+ + tr := sq.GetTimeRange() + maxMetrics := sq.MaxMetrics + if maxMetrics <= 0 { + // fallback to maxUniqueTimeSeries if no limit is provided, + // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857 + maxMetrics = GetMaxUniqueTimeSeries() + } + tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline) + if err != nil { + return nil, err + } + return api.s.SearchLabelNames(qt, tfss, tr, maxLabelNames, maxMetrics, deadline) +} + +func (api *vmstorageAPI) SeriesCount(_ *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) { + // TODO(rtm0): Return 0 if accountID, projectID do not match tenantID from + // flag. + return api.s.GetSeriesCount(deadline) +} + +func (api *vmstorageAPI) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) { + // TODO(rtm0): Return the tenantID from flag. + return []string{"0:0"}, nil + // return api.s.SearchTenants(qt, tr, deadline) +} + +func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) { + // TODO(rtm0): Return empty result if sq.AccountID, sq.ProjectID do not + // match tenantID from flag and sq is not multitenant. 
+ + tr := sq.GetTimeRange() + maxMetrics := sq.MaxMetrics + if maxMetrics <= 0 { + // fallback to maxUniqueTimeSeries if no limit is provided, + // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857 + maxMetrics = GetMaxUniqueTimeSeries() + } + tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline) + if err != nil { + return nil, err + } + date := uint64(sq.MinTimestamp) / (24 * 3600 * 1000) + return api.s.GetTSDBStatus(qt, tfss, date, focusLabel, topN, maxMetrics, deadline) +} + +func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) { + // TODO(rtm0): Return empty result if sq.AccountID, sq.ProjectID do not + // match tenantID from flag and sq is not multitenant. + + tr := sq.GetTimeRange() + maxMetrics := sq.MaxMetrics + if maxMetrics <= 0 { + // fallback to maxUniqueTimeSeries if no limit is provided, + // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7857 + maxMetrics = GetMaxUniqueTimeSeries() + } + tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline) + if err != nil { + return 0, err + } + if len(tfss) == 0 { + return 0, fmt.Errorf("missing tag filters") + } + return api.s.DeleteSeries(qt, tfss, maxMetrics) +} + +func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error { + return fmt.Errorf("not implemented") +} + +func (api *vmstorageAPI) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, _ uint64) (metricnamestats.StatsResult, error) { + // TODO(rtm0): Return empty result if tt do not match tenantID from flag. 
+ return api.s.GetMetricNamesStats(qt, limit, le, matchPattern), nil +} + +func (api *vmstorageAPI) ResetMetricNamesUsageStats(qt *querytracer.Tracer, _ uint64) error { + api.s.ResetMetricNamesStats(qt) + return nil +} + +func (api *vmstorageAPI) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) { + tfss := make([]*storage.TagFilters, 0, len(sq.TagFilterss)) + for _, tagFilters := range sq.TagFilterss { + tfs := storage.NewTagFilters() + for i := range tagFilters { + tf := &tagFilters[i] + if string(tf.Key) == "__graphite__" { + query := tf.Value + qtChild := qt.NewChild("searching for series matching __graphite__=%q", query) + paths, err := api.s.SearchGraphitePaths(qtChild, tr, query, maxMetrics, deadline) + qtChild.Donef("found %d series", len(paths)) + if err != nil { + return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err) + } + if len(paths) >= maxMetrics { + return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+ + "either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes; "+ + "see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#resource-usage-limits", maxMetrics, query) + } + tfs.AddGraphiteQuery(query, paths, tf.IsNegative) + continue + } + if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil { + return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err) + } + } + tfss = append(tfss, tfs) + } + return tfss, nil +} + +func (api *vmstorageAPI) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) { + // TODO(rtm0): Return empty result if tt do not match tenantID from flag. 
+ return api.s.GetMetadataRows(qt, limit, metricName), nil +} + +// blockIterator implements vmselectapi.BlockIterator +type blockIterator struct { + sr storage.Search + mb storage.MetricBlock +} + +var blockIteratorsPool sync.Pool + +func (bi *blockIterator) MustClose() { + bi.sr.MustClose() + bi.mb.MetricName = nil + bi.mb.Block.Reset() + blockIteratorsPool.Put(bi) +} + +func getBlockIterator() *blockIterator { + v := blockIteratorsPool.Get() + if v == nil { + v = &blockIterator{} + } + return v.(*blockIterator) +} + +func (bi *blockIterator) NextBlock(dst []byte) ([]byte, bool) { + if !bi.sr.NextMetricBlock() { + return dst, false + } + mb := bi.mb + mb.MetricName = bi.sr.MetricBlockRef.MetricName + bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block) + dst = mb.Marshal(dst[:0]) + return dst, true +} + +func (bi *blockIterator) Error() error { + return bi.sr.Error() +} + +// checkTimeRange returns true if the given tr is denied for querying. +func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error { + if !*denyQueriesOutsideRetention { + return nil + } + retentionMsecs := s.RetentionMsecs() + minAllowedTimestamp := int64(fasttime.UnixTimestamp()*1000) - retentionMsecs + if tr.MinTimestamp > minAllowedTimestamp { + return nil + } + return &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf("the given time range %s is outside the allowed retention %.3f days according to -denyQueriesOutsideRetention", + &tr, float64(retentionMsecs)/(24*3600*1000)), + StatusCode: http.StatusServiceUnavailable, + } +} + +func getMaxMetrics(searchQueryLimit int) int { + if searchQueryLimit <= 0 { + return GetMaxUniqueTimeSeries() + } + // searchQueryLimit cannot exceed `-search.maxUniqueTimeseries` + if *maxUniqueTimeseries != 0 && searchQueryLimit > *maxUniqueTimeseries { + searchQueryLimit = *maxUniqueTimeseries + } + return searchQueryLimit +} + +// GetMaxUniqueTimeSeries returns `-search.maxUniqueTimeseries` or the auto-calculated value based on available resources. 
+// The calculation is split into calculateMaxUniqueTimeSeriesForResource for unit testing. +func GetMaxUniqueTimeSeries() int { + maxUniqueTimeseriesValueOnce.Do(func() { + maxUniqueTimeseriesValue = *maxUniqueTimeseries + if maxUniqueTimeseriesValue <= 0 { + maxUniqueTimeseriesValue = calculateMaxUniqueTimeSeriesForResource(*maxConcurrentRequests, memory.Remaining()) + } + }) + return maxUniqueTimeseriesValue +} + +// calculateMaxUniqueTimeSeriesForResource calculate the max metrics limit calculated by available resources. +func calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, remainingMemory int) int { + if maxConcurrentRequests <= 0 { + // This line should NOT be reached unless the user has set an incorrect `search.maxConcurrentRequests`. + // In such cases, fallback to unlimited. + logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests) + return 2e9 + } + + // Calculate the max metrics limit for a single request in the worst-case concurrent scenario. + // The approximate size of 1 unique series that could occupy in the vmstorage is 200 bytes. + mts := remainingMemory / 200 / maxConcurrentRequests + logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. 
To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory) + return mts +} diff --git a/app/vmstorage/servers/vmselect_test.go b/app/vmstorage/servers/vmselect_test.go new file mode 100644 index 0000000000..845db08ebc --- /dev/null +++ b/app/vmstorage/servers/vmselect_test.go @@ -0,0 +1,52 @@ +package servers + +import ( + "math" + "runtime" + "testing" +) + +func TestCalculateMaxMetricsLimitByResource(t *testing.T) { + f := func(maxConcurrentRequest, remainingMemory, expect int) { + t.Helper() + maxMetricsLimit := calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequest, remainingMemory) + if maxMetricsLimit != expect { + t.Fatalf("unexpected max metrics limit: got %d, want %d", maxMetricsLimit, expect) + } + } + + // Skip when GOARCH=386 + if runtime.GOARCH != "386" { + // 8 CPU & 32 GiB + f(16, int(math.Round(32*1024*1024*1024*0.4)), 4294967) + // 4 CPU & 32 GiB + f(8, int(math.Round(32*1024*1024*1024*0.4)), 8589934) + } + + // 2 CPU & 4 GiB + f(4, int(math.Round(4*1024*1024*1024*0.4)), 2147483) + + // other edge cases + f(0, int(math.Round(4*1024*1024*1024*0.4)), 2e9) + f(4, 0, 0) + +} + +func TestGetMaxMetrics(t *testing.T) { + originalMaxUniqueTimeSeries := *maxUniqueTimeseries + defer func() { + *maxUniqueTimeseries = originalMaxUniqueTimeSeries + }() + f := func(searchQueryLimit, storageMaxUniqueTimeseries, expect int) { + t.Helper() + *maxUniqueTimeseries = storageMaxUniqueTimeseries + maxMetrics := getMaxMetrics(searchQueryLimit) + if maxMetrics != expect { + t.Fatalf("unexpected max metrics: got %d, want %d", maxMetrics, expect) + } + } + + f(0, 1e6, 1e6) + f(2e6, 0, 2e6) + f(2e6, 1e6, 1e6) +} diff --git a/lib/handshake/buffered_conn.go b/lib/handshake/buffered_conn.go new file mode 100644 index 0000000000..95600e6a09 --- /dev/null +++ b/lib/handshake/buffered_conn.go @@ -0,0 +1,140 @@ +package handshake + +import ( + "bufio" + "fmt" + "io" + "net" + "os" + 
"time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" +) + +type bufferedWriter interface { + Write(p []byte) (int, error) + Flush() error +} + +// BufferedConn is a net.Conn with Flush support. +type BufferedConn struct { + net.Conn + + // IsLegacy defines if BufferedConn operates in legacy mode + // and doesn't support RPC protocol + IsLegacy bool + + br io.Reader + bw bufferedWriter + + readDeadline time.Time + writeDeadline time.Time +} + +const bufferSize = 64 * 1024 + +// newBufferedConn returns buffered connection with the given compression level. +func newBufferedConn(c net.Conn, compressionLevel int, isReadCompressed bool) *BufferedConn { + bc := &BufferedConn{ + Conn: c, + } + if compressionLevel <= 0 { + bc.bw = bufio.NewWriterSize(c, bufferSize) + } else { + bc.bw = zstd.NewWriterLevel(c, compressionLevel) + } + if !isReadCompressed { + bc.br = bufio.NewReaderSize(c, bufferSize) + } else { + bc.br = zstd.NewReader(c) + } + return bc +} + +// SetDeadline sets read and write deadlines for bc to t. +// +// Deadline is checked on each Read and Write call. +func (bc *BufferedConn) SetDeadline(t time.Time) error { + bc.readDeadline = t + bc.writeDeadline = t + return bc.Conn.SetDeadline(t) +} + +// SetReadDeadline sets read deadline for bc to t. +// +// Deadline is checked on each Read call. +func (bc *BufferedConn) SetReadDeadline(t time.Time) error { + bc.readDeadline = t + return bc.Conn.SetReadDeadline(t) +} + +// SetWriteDeadline sets write deadline for bc to t. +// +// Deadline is checked on each Write call. +func (bc *BufferedConn) SetWriteDeadline(t time.Time) error { + bc.writeDeadline = t + return bc.Conn.SetWriteDeadline(t) +} + +// Read reads up to len(p) from bc to p. 
+func (bc *BufferedConn) Read(p []byte) (int, error) { + startTime := fasttime.UnixTimestamp() + if deadlineExceeded(bc.readDeadline, startTime) { + return 0, os.ErrDeadlineExceeded + } + n, err := bc.br.Read(p) + if err != nil && err != io.EOF { + err = fmt.Errorf("cannot read data in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) + } + return n, err +} + +// Write writes p to bc. +// +// Do not forget to call Flush if needed. +func (bc *BufferedConn) Write(p []byte) (int, error) { + startTime := fasttime.UnixTimestamp() + if deadlineExceeded(bc.writeDeadline, startTime) { + return 0, os.ErrDeadlineExceeded + } + n, err := bc.bw.Write(p) + if err != nil { + err = fmt.Errorf("cannot write data in %d seconds: %w", fasttime.UnixTimestamp()-startTime, err) + } + return n, err +} + +func deadlineExceeded(deadline time.Time, currentTimestamp uint64) bool { + if deadline.IsZero() { + return false + } + return currentTimestamp > uint64(deadline.Unix()) +} + +// Close closes bc. +func (bc *BufferedConn) Close() error { + // Close the Conn at first. It is expected that all the required data + // is already flushed to the Conn. + err := bc.Conn.Close() + bc.Conn = nil + + if zr, ok := bc.br.(*zstd.Reader); ok { + zr.Release() + } + bc.br = nil + + if zw, ok := bc.bw.(*zstd.Writer); ok { + // Do not call zw.Close(), since we already closed the underlying conn. + zw.Release() + } + bc.bw = nil + + bc.IsLegacy = false + return err +} + +// Flush flushes internal write buffers to the underlying conn. 
+func (bc *BufferedConn) Flush() error { + return bc.bw.Flush() +} diff --git a/lib/handshake/handshake.go b/lib/handshake/handshake.go new file mode 100644 index 0000000000..d216c6103a --- /dev/null +++ b/lib/handshake/handshake.go @@ -0,0 +1,318 @@ +package handshake + +import ( + "errors" + "flag" + "fmt" + "io" + "net" + "strings" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +var rpcHandshakeTimeout = flag.Duration("rpc.handshakeTimeout", 5*time.Second, "Timeout for RPC handshake between vminsert/vmselect and vmstorage. Increase this value if transient handshake failures occur. See https://docs.victoriametrics.com/victoriametrics/troubleshooting/#cluster-instability section for more details.") + +const ( + vminsertHelloLegacyVersion = "vminsert.02" + vminsertHello = "vminsert.03" + vmselectHello = "vmselect.01" + + successResponse = "ok" +) + +// Func must perform handshake on the given c using the given compressionLevel. +// +// It must return BufferedConn wrapper for c on successful handshake. +type Func func(c net.Conn, compressionLevel int) (*BufferedConn, error) + +// VMInsertClientWithDialer performs client-side handshake for vminsert protocol. +// +// it uses provided dial func to establish connection to the server. +// compressionLevel is a legacy option which defines the level used for compression of the data sent +// to the server. 
+// compressionLevel <= 0 means 'no compression' +func VMInsertClientWithDialer(dial func() (net.Conn, error), compressionLevel int) (*BufferedConn, error) { + c, err := dial() + if err != nil { + return nil, fmt.Errorf("dial error: %w", err) + } + bc, err := vminsertClient(c, 0) + if err == nil { + return bc, nil + } + _ = c.Close() + if !strings.Contains(err.Error(), "cannot read success response after sending hello") { + return nil, err + } + // try to fallback to the prev non-RPC API version + // we cannot re-use exist connection, since vmstorage already closed it + c, err = dial() + if err != nil { + return nil, fmt.Errorf("dial error: %w", err) + } + bc, err = genericClient(c, vminsertHelloLegacyVersion, compressionLevel) + if err != nil { + _ = c.Close() + return nil, fmt.Errorf("legacy handshake error: %w", err) + } + bc.IsLegacy = true + logger.Infof("server=%q doesn't support new RPC version, fallback to the legacy format", c.RemoteAddr()) + return bc, nil +} + +func vminsertClient(c net.Conn, compressionLevel int) (*BufferedConn, error) { + return genericClient(c, vminsertHello, compressionLevel) +} + +// VMInsertClientWithHello performs client-side handshake for vminsert protocol. +// +// should be used for testing only +func VMInsertClientWithHello(c net.Conn, helloMsg string, compressionLevel int) (*BufferedConn, error) { + return genericClient(c, helloMsg, compressionLevel) +} + +// VMInsertServer performs server-side handshake for vminsert protocol. +// +// compressionLevel is the level used for compression of the data sent +// to the client. 
+// compressionLevel <= 0 means 'no compression' +func VMInsertServer(c net.Conn, compressionLevel int) (*BufferedConn, error) { + + var isRPCSupported bool + bc, err := genericServer(c, compressionLevel, func(c net.Conn) error { + buf, err := readData(c, len(vminsertHello)) + if err != nil { + if errors.Is(err, io.EOF) { + // This is likely a TCP healthcheck, which must be ignored in order to prevent logs pollution. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1762 + return errTCPHealthcheck + } + return fmt.Errorf("cannot read hello: %w", err) + } + isRPCSupported = string(buf) == vminsertHello + if !isRPCSupported { + // try to fallback to the previous protocol version + if string(buf) != vminsertHelloLegacyVersion { + return fmt.Errorf("unexpected message obtained; got %q; want %q", buf, vminsertHello) + } + logger.Infof("client=%q doesn't support new RPC version, fallback to the legacy format", c.RemoteAddr()) + } + return nil + }) + if err != nil { + return nil, err + } + bc.IsLegacy = !isRPCSupported + return bc, nil +} + +// VMInsertServerWithLegacyHello performs server-side handshake for vminsert protocol +// with legacy hello message +// +// should be used for testing only +func VMInsertServerWithLegacyHello(c net.Conn, compressionLevel int) (*BufferedConn, error) { + + bc, err := genericServer(c, compressionLevel, func(c net.Conn) error { + return readMessage(c, vminsertHelloLegacyVersion) + }) + if err != nil { + return nil, err + } + bc.IsLegacy = true + return bc, nil +} + +// VMSelectClient performs client-side handshake for vmselect protocol. +// +// compressionLevel is the level used for compression of the data sent +// to the server. +// compressionLevel <= 0 means 'no compression' +func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, error) { + return genericClient(c, vmselectHello, compressionLevel) +} + +// VMSelectServer performs server-side handshake for vmselect protocol. 
+// +// compressionLevel is the level used for compression of the data sent +// to the client. +// compressionLevel <= 0 means 'no compression' +func VMSelectServer(c net.Conn, compressionLevel int) (*BufferedConn, error) { + return genericServer(c, compressionLevel, func(c net.Conn) error { + err := readMessage(c, vmselectHello) + if errors.Is(err, io.EOF) { + // This is likely a TCP healthcheck, which must be ignored in order to prevent logs pollution. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1762 and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10786 + return errTCPHealthcheck + } + return err + }) +} + +// errTCPHealthcheck indicates that the connection was opened as part of a TCP health check +// and was closed immediately after being established. +// +// This is expected behavior and can be safely ignored. +var errTCPHealthcheck = fmt.Errorf("TCP health check connection – safe to ignore") + +// IsTCPHealthcheck determines whether the provided error is a TCP health check +func IsTCPHealthcheck(err error) bool { + return errors.Is(err, errTCPHealthcheck) +} + +// IsClientNetworkError determines whether the provided error is a client-side network error, +// such as io.EOF, io.ErrUnexpectedEOF, or a timeout. +// These errors typically occur when a client disconnects abruptly or fails during the handshake, +// and are generally non-actionable from the server point of view. +// This function helps distinguish such errors from critical ones during the handshake process +// and adjust logging accordingly. 
+// +// See: https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/pull/880 +func IsClientNetworkError(err error) bool { + if err == nil { + return false + } + + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true + } + + if IsTimeoutNetworkError(err) { + return true + } + + if errMsg := err.Error(); strings.Contains(errMsg, "broken pipe") || strings.Contains(errMsg, "reset by peer") { + return true + } + + return false +} + +// IsTimeoutNetworkError determines whether the provided error is a network error with a timeout. +func IsTimeoutNetworkError(err error) bool { + var ne net.Error + if errors.As(err, &ne) && ne.Timeout() { + return true + } + + return false +} + +func genericServer(c net.Conn, compressionLevel int, readHelloMessage func(c net.Conn) error) (*BufferedConn, error) { + if err := c.SetDeadline(time.Now().Add(*rpcHandshakeTimeout)); err != nil { + return nil, fmt.Errorf("cannot set deadline: %w", err) + } + + if err := readHelloMessage(c); err != nil { + return nil, fmt.Errorf("cannot read hello message : %w", err) + } + if err := writeMessage(c, successResponse); err != nil { + return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err) + } + isRemoteCompressed, err := readIsCompressed(c) + if err != nil { + return nil, fmt.Errorf("cannot read isCompressed flag: %w", err) + } + if err := writeMessage(c, successResponse); err != nil { + return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err) + } + if err := writeIsCompressed(c, compressionLevel > 0); err != nil { + return nil, fmt.Errorf("cannot write isCompressed flag: %w", err) + } + if err := readMessage(c, successResponse); err != nil { + return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err) + } + + if err := c.SetDeadline(time.Time{}); err != nil { + return nil, fmt.Errorf("cannot reset deadline: %w", err) + } + + bc := newBufferedConn(c, compressionLevel, isRemoteCompressed) + return 
bc, nil +} + +func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn, error) { + if err := c.SetDeadline(time.Now().Add(*rpcHandshakeTimeout)); err != nil { + return nil, fmt.Errorf("cannot set deadline: %w", err) + } + + if err := writeMessage(c, msg); err != nil { + return nil, fmt.Errorf("cannot write hello: %w", err) + } + if err := readMessage(c, successResponse); err != nil { + return nil, fmt.Errorf("cannot read success response after sending hello: %w", err) + } + if err := writeIsCompressed(c, compressionLevel > 0); err != nil { + return nil, fmt.Errorf("cannot write isCompressed flag: %w", err) + } + if err := readMessage(c, successResponse); err != nil { + return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err) + } + isRemoteCompressed, err := readIsCompressed(c) + if err != nil { + return nil, fmt.Errorf("cannot read isCompressed flag: %w", err) + } + if err := writeMessage(c, successResponse); err != nil { + return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err) + } + + if err := c.SetDeadline(time.Time{}); err != nil { + return nil, fmt.Errorf("cannot reset deadline: %w", err) + } + + bc := newBufferedConn(c, compressionLevel, isRemoteCompressed) + return bc, nil +} + +func writeIsCompressed(c net.Conn, isCompressed bool) error { + var buf [1]byte + if isCompressed { + buf[0] = 1 + } + return writeMessage(c, string(buf[:])) +} + +func readIsCompressed(c net.Conn) (bool, error) { + buf, err := readData(c, 1) + if err != nil { + return false, err + } + isCompressed := buf[0] != 0 + return isCompressed, nil +} + +func writeMessage(c net.Conn, msg string) error { + if _, err := io.WriteString(c, msg); err != nil { + return fmt.Errorf("cannot write %q to server: %w", msg, err) + } + if fc, ok := c.(flusher); ok { + if err := fc.Flush(); err != nil { + return fmt.Errorf("cannot flush %q to server: %w", msg, err) + } + } + return nil +} + +type flusher interface { + Flush() error +} + 
+func readMessage(c net.Conn, msg string) error { + buf, err := readData(c, len(msg)) + if err != nil { + return err + } + if string(buf) != msg { + return fmt.Errorf("unexpected message obtained; got %q; want %q", buf, msg) + } + return nil +} + +func readData(c net.Conn, dataLen int) ([]byte, error) { + data := make([]byte, dataLen) + if n, err := io.ReadFull(c, data); err != nil { + return nil, fmt.Errorf("cannot read message with size %d: %w; read only %d bytes", dataLen, err, n) + } + return data, nil +} diff --git a/lib/handshake/handshake_test.go b/lib/handshake/handshake_test.go new file mode 100644 index 0000000000..6bf35a228f --- /dev/null +++ b/lib/handshake/handshake_test.go @@ -0,0 +1,83 @@ +package handshake + +import ( + "fmt" + "net" + "testing" + "time" +) + +func TestVMInsertHandshake(t *testing.T) { + testHandshake(t, vminsertClient, VMInsertServer) +} + +func TestVMSelectHandshake(t *testing.T) { + testHandshake(t, VMSelectClient, VMSelectServer) +} + +func TestVMSelectServerTCPHealthcheck(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("cannot start listener: %s", err) + } + + c, err := net.Dial("tcp", ln.Addr().String()) + if err != nil { + t.Fatalf("cannot dial: %s", err) + } + if err := c.Close(); err != nil { + t.Fatalf("cannot close client conn: %s", err) + } + s, err := ln.Accept() + if err != nil { + t.Fatalf("cannot accept conn: %s", err) + } + if _, err := VMSelectServer(s, 0); !IsTCPHealthcheck(err) { + t.Fatalf("unexpected error; got %v; want TCP healthcheck error", err) + } +} + +func testHandshake(t *testing.T, clientFunc, serverFunc Func) { + t.Helper() + + c, s := net.Pipe() + ch := make(chan error, 1) + go func() { + bcs, err := serverFunc(s, 3) + if err != nil { + ch <- fmt.Errorf("error on outer handshake: %w", err) + return + } + bcc, err := clientFunc(bcs, 3) + if err != nil { + ch <- fmt.Errorf("error on inner handshake: %w", err) + return + } + if bcc == nil { + ch <- 
fmt.Errorf("expecting non-nil conn") + return + } + ch <- nil + }() + + bcc, err := clientFunc(c, 0) + if err != nil { + t.Fatalf("error on outer handshake: %s", err) + } + bcs, err := serverFunc(bcc, 0) + if err != nil { + t.Fatalf("error on inner handshake: %s", err) + } + if bcs == nil { + t.Fatalf("expecting non-nil conn") + } + + select { + case <-time.After(5 * time.Second): + t.Fatalf("timeout") + case err := <-ch: + if err != nil { + t.Fatalf("unexpected error on the server side: %s", err) + } + } +} diff --git a/lib/storage/search.go b/lib/storage/search.go index aa4ae79f43..50d8d10a4e 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -90,6 +90,77 @@ type MetricBlockRef struct { BlockRef *BlockRef } +// MetricBlock is a time series block for a single metric. +type MetricBlock struct { + // MetricName is metric name for the given Block. + MetricName []byte + + // Block is a block for the given MetricName + Block Block +} + +// Marshal marshals MetricBlock to dst +func (mb *MetricBlock) Marshal(dst []byte) []byte { + dst = encoding.MarshalBytes(dst, mb.MetricName) + return MarshalBlock(dst, &mb.Block) +} + +// CopyFrom copies src to mb. +func (mb *MetricBlock) CopyFrom(src *MetricBlock) { + mb.MetricName = append(mb.MetricName[:0], src.MetricName...) + mb.Block.CopyFrom(&src.Block) +} + +// MarshalBlock marshals b to dst. +// +// b.MarshalData must be called on b before calling MarshalBlock. +func MarshalBlock(dst []byte, b *Block) []byte { + dst = b.bh.Marshal(dst) + dst = encoding.MarshalBytes(dst, b.timestampsData) + dst = encoding.MarshalBytes(dst, b.valuesData) + return dst +} + +// Unmarshal unmarshals MetricBlock from src +func (mb *MetricBlock) Unmarshal(src []byte) ([]byte, error) { + mb.Block.Reset() + mn, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal MetricName") + } + src = src[nSize:] + mb.MetricName = append(mb.MetricName[:0], mn...) 
+ + return UnmarshalBlock(&mb.Block, src) +} + +// UnmarshalBlock unmarshal Block from src to dst. +// +// dst.UnmarshalData isn't called on the block. +func UnmarshalBlock(dst *Block, src []byte) ([]byte, error) { + tail, err := dst.bh.Unmarshal(src) + if err != nil { + return tail, fmt.Errorf("cannot unmarshal blockHeader: %w", err) + } + src = tail + + tds, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return tail, fmt.Errorf("cannot unmarshal timestampsData") + } + src = src[nSize:] + dst.timestampsData = append(dst.timestampsData[:0], tds...) + + vd, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return tail, fmt.Errorf("cannot unmarshal valuesData") + } + src = src[nSize:] + dst.valuesData = append(dst.valuesData[:0], vd...) + + return src, nil +} + // Search is a search for time series. type Search struct { // MetricBlockRef is updated with each Search.NextMetricBlock call. @@ -254,6 +325,17 @@ func (s *Search) NextMetricBlock() bool { // SearchQuery is used for sending search queries from vmselect to vmstorage. type SearchQuery struct { + AccountID uint32 + ProjectID uint32 + + // TenantTokens and IsMultiTenant is artificial fields + // they're only exist at runtime and cannot be transferred + // via network calls for keeping communication protocol compatibility + // TODO:@f41gh7 introduce breaking change to the protocol later + // and use TenantTokens instead of AccountID and ProjectID + TenantTokens []TenantToken + IsMultiTenant bool + // The time range for searching time series MinTimestamp int64 MaxTimestamp int64 @@ -290,6 +372,40 @@ func NewSearchQuery(start, end int64, tagFilterss [][]TagFilter, maxMetrics int) } } +// TenantToken represents a tenant (accountID, projectID) pair. +type TenantToken struct { + AccountID uint32 + ProjectID uint32 +} + +// String returns string representation of t. 
+func (t *TenantToken) String() string { + return fmt.Sprintf("{accountID=%d, projectID=%d}", t.AccountID, t.ProjectID) +} + +// Marshal appends marshaled t to dst and returns the result. +func (t *TenantToken) Marshal(dst []byte) []byte { + dst = encoding.MarshalUint32(dst, t.AccountID) + dst = encoding.MarshalUint32(dst, t.ProjectID) + return dst +} + +// NewMultiTenantSearchQuery creates new search query for the given args. +func NewMultiTenantSearchQuery(tenants []TenantToken, start, end int64, tagFilterss [][]TagFilter, maxMetrics int) *SearchQuery { + if start < 0 { + // This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5553 + start = 0 + } + return &SearchQuery{ + TenantTokens: tenants, + MinTimestamp: start, + MaxTimestamp: end, + TagFilterss: tagFilterss, + MaxMetrics: maxMetrics, + IsMultiTenant: true, + } +} + // TagFilter represents a single tag filter from SearchQuery. type TagFilter struct { Key []byte @@ -387,7 +503,15 @@ func (sq *SearchQuery) String() string { } start := TimestampToHumanReadableFormat(sq.MinTimestamp) end := TimestampToHumanReadableFormat(sq.MaxTimestamp) - return fmt.Sprintf("filters=%s, timeRange=[%s..%s]", a, start, end) + if !sq.IsMultiTenant { + return fmt.Sprintf("accountID=%d, projectID=%d, filters=%s, timeRange=[%s..%s]", sq.AccountID, sq.ProjectID, a, start, end) + } + + tts := make([]string, len(sq.TenantTokens)) + for i, tt := range sq.TenantTokens { + tts[i] = tt.String() + } + return fmt.Sprintf("tenants=[%s], filters=%s, timeRange=[%s..%s]", strings.Join(tts, ","), a, start, end) } func tagFiltersToString(tfs []TagFilter) string { @@ -398,8 +522,9 @@ func tagFiltersToString(tfs []TagFilter) string { return "{" + strings.Join(a, ",") + "}" } -// Marshal appends marshaled sq to dst and returns the result. -func (sq *SearchQuery) Marshal(dst []byte) []byte { +// MarshalWithoutTenant appends marshaled sq without AccountID/ProjectID to dst and returns the result. 
+// It is expected that TenantToken is already marshaled to dst. +func (sq *SearchQuery) MarshalWithoutTenant(dst []byte) []byte { dst = encoding.MarshalVarInt64(dst, sq.MinTimestamp) dst = encoding.MarshalVarInt64(dst, sq.MaxTimestamp) dst = encoding.MarshalVarUint64(dst, uint64(len(sq.TagFilterss))) @@ -409,11 +534,25 @@ func (sq *SearchQuery) Marshal(dst []byte) []byte { dst = tagFilters[i].Marshal(dst) } } + dst = encoding.MarshalUint32(dst, uint32(sq.MaxMetrics)) return dst } // Unmarshal unmarshals sq from src and returns the tail. func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) { + if len(src) < 4 { + return src, fmt.Errorf("cannot unmarshal AccountID: too short src len: %d; must be at least %d bytes", len(src), 4) + } + sq.AccountID = encoding.UnmarshalUint32(src) + src = src[4:] + + if len(src) < 4 { + return src, fmt.Errorf("cannot unmarshal ProjectID: too short src len: %d; must be at least %d bytes", len(src), 4) + } + sq.ProjectID = encoding.UnmarshalUint32(src) + src = src[4:] + + sq.TenantTokens = []TenantToken{{AccountID: sq.AccountID, ProjectID: sq.ProjectID}} minTs, nSize := encoding.UnmarshalVarInt64(src) if nSize <= 0 { return src, fmt.Errorf("cannot unmarshal MinTimestamp from varint") @@ -454,6 +593,12 @@ func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) { sq.TagFilterss[i] = tagFilters } + if len(src) < 4 { + return src, fmt.Errorf("cannot unmarshal MaxMetrics: too short src len: %d; must be at least %d bytes", len(src), 4) + } + sq.MaxMetrics = int(encoding.UnmarshalUint32(src)) + src = src[4:] + return src, nil } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 277e18c355..27603c2c67 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -325,6 +325,11 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage { return s } +// RetentionMsecs returns retentionMsecs for s. 
+func (s *Storage) RetentionMsecs() int64 {
+ return s.retentionMsecs
+}
+
 var maxTSIDCacheSize int
 
 // SetTSIDCacheSize overrides the default size of storage/tsid cache
diff --git a/lib/vmselectapi/api.go b/lib/vmselectapi/api.go
new file mode 100644
index 0000000000..a1209694ae
--- /dev/null
+++ b/lib/vmselectapi/api.go
@@ -0,0 +1,68 @@
+package vmselectapi
+
+import (
+ "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
+ "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
+ "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
+ "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
+)
+
+// API must implement vmselect API.
+type API interface {
+ // InitSearch initializes series search for the given sq.
+ //
+ // The returned BlockIterator must be closed with MustClose to free up resources when it is no longer needed.
+ InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (BlockIterator, error)
+
+ // SearchMetricNames returns metric names matching the given sq.
+ SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error)
+
+ // LabelValues returns values for labelName label across series matching the given sq.
+ LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error)
+
+ // TagValueSuffixes returns tag value suffixes for the given args.
+ TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error)
+
+ // LabelNames returns label names for series matching the given sq.
+ LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error)
+
+ // SeriesCount returns the number of series for the given (accountID, projectID). 
+ SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) + + // TSDBStatus returns tsdb status for the given sq. + TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) + + // DeleteSeries deletes series matching the given sq. + DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) + + // RegisterMetricNames registers the given mrs in the storage. + RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error + + // Tenants returns list of tenants in the storage on the given tr. + Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) + + // GetMetricNamesUsageStats returns statistics for metric names + GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (metricnamestats.StatsResult, error) + + // ResetMetricNamesUsageStats resets internal state of metric names tracker + ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error + + // GetMetadataRecords returns metrics metadata. + GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) +} + +// BlockIterator must iterate through series blocks found by VMSelect.InitSearch. +// +// MustClose must be called in order to free up allocated resources when BlockIterator is no longer needed. +type BlockIterator interface { + // NextBlock marshals next storage.MetricBlock into dst. + // + // It returns true on success, false on error or if no blocks to read. + NextBlock(dst []byte) ([]byte, bool) + + // MustClose frees up resources allocated by BlockIterator. + MustClose() + + // Error returns the last error occurred in NextBlock(), which returns false. 
+ Error() error +} diff --git a/lib/vmselectapi/server.go b/lib/vmselectapi/server.go new file mode 100644 index 0000000000..ca5b255c1a --- /dev/null +++ b/lib/vmselectapi/server.go @@ -0,0 +1,1248 @@ +package vmselectapi + +import ( + "errors" + "fmt" + "io" + "net" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/VictoriaMetrics/metrics" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" +) + +// Server processes vmselect requests. +type Server struct { + // api contains the implementation of the server API for vmselect requests. + api API + + // limits contains various limits for the Server. + limits Limits + + // disableResponseCompression controls whether vmselect server must compress responses. + disableResponseCompression bool + + // ln is the listener for incoming connections to the server. + ln net.Listener + + // The channel for limiting the number of concurrently executed requests. + concurrencyLimitCh chan struct{} + + // connsMap is a map of currently established connections to the server. + // It is used for closing the connections when MustStop() is called. + connsMap ingestserver.ConnsMap + + // wg is used for waiting for worker goroutines to stop when MustStop() is called. 
+ wg sync.WaitGroup + + // stopFlag is set to true when the server needs to stop. + stopFlag atomic.Bool + + concurrencyLimitReached *metrics.Counter + concurrencyLimitTimeout *metrics.Counter + + vmselectConns *metrics.Counter + vmselectConnErrors *metrics.Counter + + registerMetricNamesRequests *metrics.Counter + deleteSeriesRequests *metrics.Counter + labelNamesRequests *metrics.Counter + labelValuesRequests *metrics.Counter + tagValueSuffixesRequests *metrics.Counter + seriesCountRequests *metrics.Counter + tsdbStatusRequests *metrics.Counter + searchMetricNamesRequests *metrics.Counter + searchRequests *metrics.Counter + tenantsRequests *metrics.Counter + searchMetadataRequests *metrics.Counter + + metricBlocksRead *metrics.Counter +} + +// Limits contains various limits for Server. +type Limits struct { + // MaxLabelNames is the maximum label names, which may be returned from labelNames request. + MaxLabelNames int + + // MaxLabelValues is the maximum label values, which may be returned from labelValues request. + MaxLabelValues int + + // MaxTagValueSuffixes is the maximum number of entries, which can be returned from tagValueSuffixes request. + MaxTagValueSuffixes int + + // MaxConcurrentRequests is the maximum number of concurrent requests a server can process. + // + // The remaining requests wait for up to MaxQueueDuration for their execution. + MaxConcurrentRequests int + + // MaxConcurrentRequestsFlagName is the name for the flag containing the MaxConcurrentRequests value. + MaxConcurrentRequestsFlagName string + + // MaxQueueDuration is the maximum duration to wait if MaxConcurrentRequests are executed. + MaxQueueDuration time.Duration + + // MaxQueueDurationFlagName is the name for the flag containing the MaxQueueDuration value. + MaxQueueDurationFlagName string +} + +// NewServer starts new Server at the given addr, which serves the given api with the given limits. 
+// +// If disableResponseCompression is set to true, then the returned server doesn't compress responses. +func NewServer(addr string, api API, limits Limits, disableResponseCompression bool) (*Server, error) { + ln, err := netutil.NewTCPListener("vmselect", addr, false, nil) + if err != nil { + return nil, fmt.Errorf("unable to listen vmselectAddr %s: %w", addr, err) + } + concurrencyLimitCh := make(chan struct{}, limits.MaxConcurrentRequests) + _ = metrics.NewGauge(`vm_vmselect_concurrent_requests_capacity`, func() float64 { + return float64(cap(concurrencyLimitCh)) + }) + _ = metrics.NewGauge(`vm_vmselect_concurrent_requests_current`, func() float64 { + return float64(len(concurrencyLimitCh)) + }) + s := &Server{ + api: api, + limits: limits, + disableResponseCompression: disableResponseCompression, + ln: ln, + + concurrencyLimitCh: concurrencyLimitCh, + + concurrencyLimitReached: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_concurrent_requests_limit_reached_total{addr=%q}`, addr)), + concurrencyLimitTimeout: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_concurrent_requests_limit_timeout_total{addr=%q}`, addr)), + + vmselectConns: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_conns{addr=%q}`, addr)), + vmselectConnErrors: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_conn_errors_total{addr=%q}`, addr)), + + registerMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="registerMetricNames",addr=%q}`, addr)), + deleteSeriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="deleteSeries",addr=%q}`, addr)), + labelNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="labelNames",addr=%q}`, addr)), + labelValuesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="labelValues",addr=%q}`, addr)), + tagValueSuffixesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="tagValueSuffixes",addr=%q}`, addr)), + 
seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="seriesCount",addr=%q}`, addr)),
+ tsdbStatusRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="tsdbStatus",addr=%q}`, addr)),
+ searchMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="searchMetricNames",addr=%q}`, addr)),
+ searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="search",addr=%q}`, addr)),
+ tenantsRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="tenants",addr=%q}`, addr)),
+ searchMetadataRequests: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_rpc_requests_total{action="searchMetadata",addr=%q}`, addr)),
+
+ metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_vmselect_metric_blocks_read_total{addr=%q}`, addr)),
+ }
+
+ s.connsMap.Init("vmselect")
+ s.wg.Go(s.run)
+ return s, nil
+}
+
+func (s *Server) run() {
+ logger.Infof("accepting vmselect conns at %s", s.ln.Addr())
+ for {
+ c, err := s.ln.Accept()
+ if err != nil {
+ if pe, ok := err.(net.Error); ok && pe.Temporary() {
+ continue
+ }
+ if s.isStopping() {
+ return
+ }
+ logger.Panicf("FATAL: cannot process vmselect conns at %s: %s", s.ln.Addr(), err)
+ }
+ // Do not log connection accept from vmselect, since this can generate too many lines
+ // in the log because vmselect tends to re-establish idle connections.
+
+ if !s.connsMap.Add(c) {
+ // The server is closed.
+ _ = c.Close()
+ return
+ }
+ s.vmselectConns.Inc()
+ s.wg.Go(func() {
+ defer func() {
+ s.connsMap.Delete(c)
+ s.vmselectConns.Dec()
+ }()
+
+ // Compress responses to vmselect even if they already contain compressed blocks.
+ // Responses contain uncompressed metric names, which should compress well
+ // when the response contains high number of time series.
+ // Additionally, recently added metric blocks are usually uncompressed, so the compression
+ // should save network bandwidth. 
+ compressionLevel := 1
+ if s.disableResponseCompression {
+ compressionLevel = 0
+ }
+ bc, err := handshake.VMSelectServer(c, compressionLevel)
+ if err != nil {
+ if s.isStopping() {
+ // c is closed inside Server.MustStop
+ return
+ }
+ if handshake.IsTimeoutNetworkError(err) {
+ logger.Warnf("cannot complete vmselect handshake due to network timeout error with client %q: %s. "+
+ "If errors are transient and infrequent increase -rpc.handshakeTimeout and -vmstorageDialTimeout on client and server side. Check vmselect logs for errors", c.RemoteAddr(), err)
+ } else if handshake.IsClientNetworkError(err) {
+ logger.Warnf("cannot complete vmselect handshake due to network error with client %q: %s. "+
+ "Check vmselect logs for errors", c.RemoteAddr(), err)
+ } else if !handshake.IsTCPHealthcheck(err) {
+ logger.Errorf("cannot perform vmselect handshake with client %q: %s", c.RemoteAddr(), err)
+ }
+
+ _ = c.Close()
+ return
+ }
+
+ defer func() {
+ _ = bc.Close()
+ }()
+ if err := s.processConn(bc); err != nil {
+ if s.isStopping() {
+ return
+ }
+ s.vmselectConnErrors.Inc()
+ logger.Errorf("cannot process vmselect conn %s: %s", c.RemoteAddr(), err)
+ }
+ })
+ }
+}
+
+// MustStop gracefully stops s, so it no longer touches s.api after returning.
+func (s *Server) MustStop() {
+ // Mark the server as stopping.
+ s.setIsStopping()
+
+ // Stop accepting new connections from vmselect.
+ if err := s.ln.Close(); err != nil {
+ logger.Panicf("FATAL: cannot close vmselect listener: %s", err)
+ }
+
+ // Close existing connections from vmselect, so the goroutines
+ // processing these connections are finished.
+ s.connsMap.CloseAll(0)
+
+ // Wait until all the goroutines processing vmselect conns are finished. 
+ s.wg.Wait() +} + +func (s *Server) setIsStopping() { + s.stopFlag.Store(true) +} + +func (s *Server) isStopping() bool { + return s.stopFlag.Load() +} + +func (s *Server) processConn(bc *handshake.BufferedConn) error { + ctx := &vmselectRequestCtx{ + bc: bc, + sizeBuf: make([]byte, 8), + } + for { + if err := s.processRequest(ctx); err != nil { + if isExpectedError(err) { + return nil + } + if errors.Is(err, storage.ErrDeadlineExceeded) { + return fmt.Errorf("cannot process vmselect request in %d seconds: %w", ctx.timeout, err) + } + return fmt.Errorf("cannot process vmselect request: %w", err) + } + if err := bc.Flush(); err != nil { + return fmt.Errorf("cannot flush compressed buffers: %w", err) + } + } +} + +func isExpectedError(err error) bool { + if err == io.EOF { + // Remote client gracefully closed the connection. + return true + } + if errors.Is(err, net.ErrClosed) { + return true + } + errStr := err.Error() + if strings.Contains(errStr, "broken pipe") || strings.Contains(errStr, "connection reset by peer") { + // The connection has been interrupted abruptly. + // It could happen due to unexpected network glitch or because connection was + // interrupted by remote client. In both cases, remote client will notice + // connection breach and handle it on its own. No need in mirroring the error here. + return true + } + return false +} + +type vmselectRequestCtx struct { + bc *handshake.BufferedConn + sizeBuf []byte + dataBuf []byte + + qt *querytracer.Tracer + sq storage.SearchQuery + + // timeout in seconds for the current request + timeout uint64 + + // deadline in unix timestamp seconds for the current request. 
+ deadline uint64 +} + +func (ctx *vmselectRequestCtx) readTimeRange() (storage.TimeRange, error) { + var tr storage.TimeRange + minTimestamp, err := ctx.readUint64() + if err != nil { + return tr, fmt.Errorf("cannot read minTimestamp: %w", err) + } + maxTimestamp, err := ctx.readUint64() + if err != nil { + return tr, fmt.Errorf("cannot read maxTimestamp: %w", err) + } + tr.MinTimestamp = int64(minTimestamp) + tr.MaxTimestamp = int64(maxTimestamp) + return tr, nil +} + +func (ctx *vmselectRequestCtx) readLimit() (int, error) { + n, err := ctx.readUint32() + if err != nil { + return 0, fmt.Errorf("cannot read limit: %w", err) + } + if n > 1<<31-1 { + n = 1<<31 - 1 + } + return int(n), nil +} + +func (ctx *vmselectRequestCtx) readUint32() (uint32, error) { + ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 4) + if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { + if err == io.EOF { + return 0, err + } + return 0, fmt.Errorf("cannot read uint32: %w", err) + } + n := encoding.UnmarshalUint32(ctx.sizeBuf) + return n, nil +} + +func (ctx *vmselectRequestCtx) readUint64() (uint64, error) { + ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 8) + if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { + if err == io.EOF { + return 0, err + } + return 0, fmt.Errorf("cannot read uint64: %w", err) + } + n := encoding.UnmarshalUint64(ctx.sizeBuf) + return n, nil +} + +func (ctx *vmselectRequestCtx) readInt64() (int64, error) { + ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 8) + if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { + if err == io.EOF { + return 0, err + } + return 0, fmt.Errorf("cannot read int64: %w", err) + } + n := encoding.UnmarshalInt64(ctx.sizeBuf) + return n, nil +} + +func (ctx *vmselectRequestCtx) readAccountIDProjectID() (uint32, uint32, error) { + accountID, err := ctx.readUint32() + if err != nil { + return 0, 0, fmt.Errorf("cannot read accountID: %w", err) + } + projectID, err 
:= ctx.readUint32() + if err != nil { + return 0, 0, fmt.Errorf("cannot read projectID: %w", err) + } + return accountID, projectID, nil +} + +// maxSearchQuerySize is the maximum size of SearchQuery packet in bytes. +// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5154#issuecomment-1757216612 +const maxSearchQuerySize = 5 * 1024 * 1024 + +func (ctx *vmselectRequestCtx) readSearchQuery() error { + if err := ctx.readDataBufBytes(maxSearchQuerySize); err != nil { + return fmt.Errorf("cannot read searchQuery: %w", err) + } + tail, err := ctx.sq.Unmarshal(ctx.dataBuf) + if err != nil { + return fmt.Errorf("cannot unmarshal SearchQuery: %w", err) + } + if len(tail) > 0 { + return fmt.Errorf("unexpected non-zero tail left after unmarshaling SearchQuery: (len=%d) %q", len(tail), tail) + } + return nil +} + +func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error { + ctx.sizeBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.sizeBuf, 8) + if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { + if err == io.EOF { + return err + } + return fmt.Errorf("cannot read data size: %w", err) + } + dataSize := encoding.UnmarshalUint64(ctx.sizeBuf) + if dataSize > uint64(maxDataSize) { + return fmt.Errorf("too big data size: %d; it mustn't exceed %d bytes", dataSize, maxDataSize) + } + ctx.dataBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, int(dataSize)) + if dataSize == 0 { + return nil + } + if n, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil { + return fmt.Errorf("cannot read data with size %d: %w; read only %d bytes", dataSize, err, n) + } + return nil +} + +func (ctx *vmselectRequestCtx) readBool() (bool, error) { + ctx.dataBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, 1) + if _, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil { + if err == io.EOF { + return false, err + } + return false, fmt.Errorf("cannot read bool: %w", err) + } + v := ctx.dataBuf[0] != 0 + return v, nil +} + +func (ctx *vmselectRequestCtx) 
readByte() (byte, error) { + ctx.dataBuf = bytesutil.ResizeNoCopyMayOverallocate(ctx.dataBuf, 1) + if _, err := io.ReadFull(ctx.bc, ctx.dataBuf); err != nil { + if err == io.EOF { + return 0, err + } + return 0, fmt.Errorf("cannot read byte: %w", err) + } + b := ctx.dataBuf[0] + return b, nil +} + +func (ctx *vmselectRequestCtx) writeDataBufBytes() error { + if err := ctx.writeUint64(uint64(len(ctx.dataBuf))); err != nil { + return fmt.Errorf("cannot write data size: %w", err) + } + if len(ctx.dataBuf) == 0 { + return nil + } + if _, err := ctx.bc.Write(ctx.dataBuf); err != nil { + return fmt.Errorf("cannot write data with size %d: %w", len(ctx.dataBuf), err) + } + return nil +} + +// maxErrorMessageSize is the maximum size of error message to send to clients. +const maxErrorMessageSize = 64 * 1024 + +func (ctx *vmselectRequestCtx) writeErrorMessage(err error) error { + if errors.Is(err, storage.ErrDeadlineExceeded) { + err = fmt.Errorf("cannot execute request in %d seconds: %w", ctx.timeout, err) + } + errMsg := err.Error() + if len(errMsg) > maxErrorMessageSize { + // Trim too long error message. + errMsg = errMsg[:maxErrorMessageSize] + } + if err := ctx.writeString(errMsg); err != nil { + return fmt.Errorf("cannot send error message %q to client: %w", errMsg, err) + } + return nil +} + +func (ctx *vmselectRequestCtx) writeString(s string) error { + ctx.dataBuf = append(ctx.dataBuf[:0], s...) + return ctx.writeDataBufBytes() +} + +func (ctx *vmselectRequestCtx) writeUint64(n uint64) error { + ctx.sizeBuf = encoding.MarshalUint64(ctx.sizeBuf[:0], n) + if _, err := ctx.bc.Write(ctx.sizeBuf); err != nil { + return fmt.Errorf("cannot write uint64 %d: %w", n, err) + } + return nil +} + +const maxRPCNameSize = 128 + +func (s *Server) processRequest(ctx *vmselectRequestCtx) error { + // Read rpcName + // Do not set deadline on reading rpcName, since it may take a + // lot of time for idle connection. 
+ if err := ctx.readDataBufBytes(maxRPCNameSize); err != nil { + if err == io.EOF { + // Remote client gracefully closed the connection. + return err + } + return fmt.Errorf("cannot read rpcName: %w", err) + } + rpcName := string(ctx.dataBuf) + + // Initialize query tracing. + traceEnabled, err := ctx.readBool() + if err != nil { + return fmt.Errorf("cannot read traceEnabled: %w", err) + } + ctx.qt = querytracer.New(traceEnabled, "rpc call %s() at vmstorage", rpcName) + + // Limit the time required for reading request args. + if err := ctx.bc.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { + return fmt.Errorf("cannot set read deadline for reading request args: %w", err) + } + defer func() { + _ = ctx.bc.SetReadDeadline(time.Time{}) + }() + + // Read the timeout for request execution. + timeout, err := ctx.readUint32() + if err != nil { + return fmt.Errorf("cannot read timeout for the request %q: %w", rpcName, err) + } + ctx.timeout = uint64(timeout) + ctx.deadline = fasttime.UnixTimestamp() + uint64(timeout) + + // Process the rpcName call. + if err := s.processRPC(ctx, rpcName); err != nil { + return fmt.Errorf("cannot execute %q: %w", rpcName, err) + } + + // Finish query trace. 
+ ctx.qt.Done() + traceJSON := ctx.qt.ToJSON() + if err := ctx.writeString(traceJSON); err != nil { + return fmt.Errorf("cannot send trace with length %d bytes to vmselect: %w", len(traceJSON), err) + } + return nil +} + +func (s *Server) beginConcurrentRequest(ctx *vmselectRequestCtx) error { + select { + case s.concurrencyLimitCh <- struct{}{}: + return nil + default: + d := min(time.Duration(ctx.timeout)*time.Second, s.limits.MaxQueueDuration) + t := timerpool.Get(d) + s.concurrencyLimitReached.Inc() + select { + case s.concurrencyLimitCh <- struct{}{}: + timerpool.Put(t) + ctx.qt.Printf("wait in queue because -%s=%d concurrent requests are executed", s.limits.MaxConcurrentRequestsFlagName, s.limits.MaxConcurrentRequests) + return nil + case <-t.C: + timerpool.Put(t) + s.concurrencyLimitTimeout.Inc() + return fmt.Errorf("couldn't start executing the request in %.3f seconds, since -%s=%d concurrent requests "+ + "are already executed. Possible solutions: to reduce the query load; to add more compute resources to the server; "+ + "to increase -%s=%d; to increase -%s", + d.Seconds(), s.limits.MaxConcurrentRequestsFlagName, s.limits.MaxConcurrentRequests, + s.limits.MaxQueueDurationFlagName, s.limits.MaxQueueDuration, s.limits.MaxConcurrentRequestsFlagName) + } + } +} + +func (s *Server) endConcurrentRequest() { + <-s.concurrencyLimitCh +} + +func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error { + switch rpcName { + case "search_v7": + return s.processSearch(ctx) + case "searchMetricNames_v3": + return s.processSearchMetricNames(ctx) + case "labelValues_v5": + return s.processLabelValues(ctx) + case "tagValueSuffixes_v4": + return s.processTagValueSuffixes(ctx) + case "labelNames_v5": + return s.processLabelNames(ctx) + case "seriesCount_v4": + return s.processSeriesCount(ctx) + case "tsdbStatus_v6": + return s.processTSDBStatus(ctx) + case "deleteSeries_v5": + return s.processDeleteSeries(ctx) + case "registerMetricNames_v3": + return 
s.processRegisterMetricNames(ctx) + case "tenants_v1": + return s.processTenants(ctx) + case "metricNamesUsageStats_v1": + return s.processMetricNamesUsageStats(ctx) + case "resetMetricNamesStats_v1": + return s.processResetMetricUsageStats(ctx) + case "searchMetadata_v1": + return s.processSearchMetadata(ctx) + default: + return fmt.Errorf("unsupported rpcName: %q", rpcName) + } +} + +const maxMetricNameRawSize = 1024 * 1024 +const maxMetricNamesPerRequest = 1024 * 1024 + +func (s *Server) processRegisterMetricNames(ctx *vmselectRequestCtx) error { + s.registerMetricNamesRequests.Inc() + + // Read request + metricsCount, err := ctx.readUint64() + if err != nil { + return fmt.Errorf("cannot read metricsCount: %w", err) + } + if metricsCount > maxMetricNamesPerRequest { + return fmt.Errorf("too many metric names in a single request; got %d; mustn't exceed %d", metricsCount, maxMetricNamesPerRequest) + } + mrs := make([]storage.MetricRow, metricsCount) + for i := range int(metricsCount) { + if err := ctx.readDataBufBytes(maxMetricNameRawSize); err != nil { + return fmt.Errorf("cannot read metricNameRaw: %w", err) + } + mr := &mrs[i] + mr.MetricNameRaw = append(mr.MetricNameRaw[:0], ctx.dataBuf...) + n, err := ctx.readUint64() + if err != nil { + return fmt.Errorf("cannot read timestamp: %w", err) + } + mr.Timestamp = int64(n) + } + + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + + // Register metric names from mrs. + if err := s.api.RegisterMetricNames(ctx.qt, mrs, ctx.deadline); err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. 
+ if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + return nil +} + +func (s *Server) processDeleteSeries(ctx *vmselectRequestCtx) error { + s.deleteSeriesRequests.Inc() + + // Read request + if err := ctx.readSearchQuery(); err != nil { + return err + } + + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + + // Execute the request. + deletedCount, err := s.api.DeleteSeries(ctx.qt, &ctx.sq, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + // Send deletedCount to vmselect. + if err := ctx.writeUint64(uint64(deletedCount)); err != nil { + return fmt.Errorf("cannot send deletedCount=%d: %w", deletedCount, err) + } + return nil +} + +func (s *Server) processLabelNames(ctx *vmselectRequestCtx) error { + s.labelNamesRequests.Inc() + + // Read request + if err := ctx.readSearchQuery(); err != nil { + return err + } + maxLabelNames, err := ctx.readLimit() + if err != nil { + return fmt.Errorf("cannot read maxLabelNames: %w", err) + } + if maxLabelNames <= 0 || maxLabelNames > s.limits.MaxLabelNames { + maxLabelNames = s.limits.MaxLabelNames + } + + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + + // Execute the request + labelNames, err := s.api.LabelNames(ctx.qt, &ctx.sq, maxLabelNames, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. 
+ if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send labelNames to vmselect + for _, labelName := range labelNames { + if len(labelName) == 0 { + // Skip empty label names, since they may break RPC communication with vmselect + continue + } + if err := ctx.writeString(labelName); err != nil { + return fmt.Errorf("cannot write label name %q: %w", labelName, err) + } + } + // Send 'end of response' marker + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send 'end of response' marker") + } + return nil +} + +const maxLabelValueSize = 16 * 1024 + +func (s *Server) processLabelValues(ctx *vmselectRequestCtx) error { + s.labelValuesRequests.Inc() + + // Read request + if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil { + return fmt.Errorf("cannot read labelName: %w", err) + } + labelName := string(ctx.dataBuf) + if err := ctx.readSearchQuery(); err != nil { + return err + } + maxLabelValues, err := ctx.readLimit() + if err != nil { + return fmt.Errorf("cannot read maxLabelValues: %w", err) + } + if maxLabelValues <= 0 || maxLabelValues > s.limits.MaxLabelValues { + maxLabelValues = s.limits.MaxLabelValues + } + + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + + // Execute the request + labelValues, err := s.api.LabelValues(ctx.qt, &ctx.sq, labelName, maxLabelValues, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. 
+ if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send labelValues to vmselect + for _, labelValue := range labelValues { + if len(labelValue) == 0 { + // Skip empty label values, since they may break RPC communication with vmselect + continue + } + if err := ctx.writeString(labelValue); err != nil { + return fmt.Errorf("cannot write labelValue %q: %w", labelValue, err) + } + } + // Send 'end of label values' marker + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send 'end of response' marker") + } + return nil +} + +func (s *Server) processTagValueSuffixes(ctx *vmselectRequestCtx) error { + s.tagValueSuffixesRequests.Inc() + + // read request + accountID, projectID, err := ctx.readAccountIDProjectID() + if err != nil { + return err + } + tr, err := ctx.readTimeRange() + if err != nil { + return err + } + if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil { + return fmt.Errorf("cannot read tagKey: %w", err) + } + tagKey := string(ctx.dataBuf) + if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil { + return fmt.Errorf("cannot read tagValuePrefix: %w", err) + } + tagValuePrefix := string(ctx.dataBuf) + delimiter, err := ctx.readByte() + if err != nil { + return fmt.Errorf("cannot read delimiter: %w", err) + } + maxSuffixes, err := ctx.readLimit() + if err != nil { + return fmt.Errorf("cannot read maxTagValueSuffixes: %d", err) + } + if maxSuffixes <= 0 || maxSuffixes > s.limits.MaxTagValueSuffixes { + maxSuffixes = s.limits.MaxTagValueSuffixes + } + + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + + // Execute the request + suffixes, err := s.api.TagValueSuffixes(ctx.qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + if len(suffixes) >= s.limits.MaxTagValueSuffixes { + 
err := fmt.Errorf("more than %d tag value suffixes found "+ + "for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s; "+ + "either narrow down the query or increase -search.max* command-line flag value; see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#resource-usage-limits", + s.limits.MaxTagValueSuffixes, tagKey, tagValuePrefix, delimiter, tr.String()) + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send suffixes to vmselect. + // Suffixes may contain empty string, so prepend suffixes with suffixCount. + if err := ctx.writeUint64(uint64(len(suffixes))); err != nil { + return fmt.Errorf("cannot write suffixesCount: %w", err) + } + for i, suffix := range suffixes { + if err := ctx.writeString(suffix); err != nil { + return fmt.Errorf("cannot write suffix #%d: %w", i+1, err) + } + } + return nil +} + +func (s *Server) processSeriesCount(ctx *vmselectRequestCtx) error { + s.seriesCountRequests.Inc() + + // Read request + accountID, projectID, err := ctx.readAccountIDProjectID() + if err != nil { + return err + } + + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + + // Execute the request + n, err := s.api.SeriesCount(ctx.qt, accountID, projectID, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send series count to vmselect. 
+ if err := ctx.writeUint64(n); err != nil { + return fmt.Errorf("cannot write series count to vmselect: %w", err) + } + return nil +} + +func (s *Server) processTSDBStatus(ctx *vmselectRequestCtx) error { + s.tsdbStatusRequests.Inc() + + // Read request + if err := ctx.readSearchQuery(); err != nil { + return err + } + if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil { + return fmt.Errorf("cannot read focusLabel: %w", err) + } + focusLabel := string(ctx.dataBuf) + topN, err := ctx.readUint32() + if err != nil { + return fmt.Errorf("cannot read topN: %w", err) + } + + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + + // Execute the request + status, err := s.api.TSDBStatus(ctx.qt, &ctx.sq, focusLabel, int(topN), ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send status to vmselect. + return writeTSDBStatus(ctx, status) +} + +func (s *Server) processTenants(ctx *vmselectRequestCtx) error { + s.tenantsRequests.Inc() + + // Read request + tr, err := ctx.readTimeRange() + if err != nil { + return err + } + + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + + // Execute the request + tenants, err := s.api.Tenants(ctx.qt, tr, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. 
+ if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send tenants to vmselect + for _, tenant := range tenants { + if len(tenant) == 0 { + logger.Panicf("BUG: unexpected empty tenant name") + } + if err := ctx.writeString(tenant); err != nil { + return fmt.Errorf("cannot write tenant %q: %w", tenant, err) + } + } + // Send 'end of response' marker + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send 'end of response' marker") + } + return nil +} + +func writeTSDBStatus(ctx *vmselectRequestCtx, status *storage.TSDBStatus) error { + if err := ctx.writeUint64(status.TotalSeries); err != nil { + return fmt.Errorf("cannot write totalSeries to vmselect: %w", err) + } + if err := ctx.writeUint64(status.TotalLabelValuePairs); err != nil { + return fmt.Errorf("cannot write totalLabelValuePairs to vmselect: %w", err) + } + if err := writeTopHeapEntries(ctx, status.SeriesCountByMetricName); err != nil { + return fmt.Errorf("cannot write seriesCountByMetricName to vmselect: %w", err) + } + if err := writeTopHeapEntries(ctx, status.SeriesCountByLabelName); err != nil { + return fmt.Errorf("cannot write seriesCountByLabelName to vmselect: %w", err) + } + if err := writeTopHeapEntries(ctx, status.SeriesCountByFocusLabelValue); err != nil { + return fmt.Errorf("cannot write seriesCountByFocusLabelValue to vmselect: %w", err) + } + if err := writeTopHeapEntries(ctx, status.SeriesCountByLabelValuePair); err != nil { + return fmt.Errorf("cannot write seriesCountByLabelValuePair to vmselect: %w", err) + } + if err := writeTopHeapEntries(ctx, status.LabelValueCountByLabelName); err != nil { + return fmt.Errorf("cannot write labelValueCountByLabelName to vmselect: %w", err) + } + if err := writeMetricNameStatRecords(ctx, status.SeriesQueryStatsByMetricName); err != nil { + return fmt.Errorf("cannot write SeriesMetricNamesStats: %w", err) + } + return nil +} + +func writeTopHeapEntries(ctx 
*vmselectRequestCtx, a []storage.TopHeapEntry) error { + if err := ctx.writeUint64(uint64(len(a))); err != nil { + return fmt.Errorf("cannot write topHeapEntries size: %w", err) + } + for _, e := range a { + if err := ctx.writeString(e.Name); err != nil { + return fmt.Errorf("cannot write topHeapEntry name: %w", err) + } + if err := ctx.writeUint64(e.Count); err != nil { + return fmt.Errorf("cannot write topHeapEntry count: %w", err) + } + } + return nil +} + +func (s *Server) processSearchMetricNames(ctx *vmselectRequestCtx) error { + s.searchMetricNamesRequests.Inc() + + // Read request. + if err := ctx.readSearchQuery(); err != nil { + return err + } + + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + + // Execute request. + metricNames, err := s.api.SearchMetricNames(ctx.qt, &ctx.sq, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send response. + metricNamesCount := len(metricNames) + if err := ctx.writeUint64(uint64(metricNamesCount)); err != nil { + return fmt.Errorf("cannot send metricNamesCount: %w", err) + } + for i, metricName := range metricNames { + if err := ctx.writeString(metricName); err != nil { + return fmt.Errorf("cannot send metricName #%d: %w", i+1, err) + } + } + ctx.qt.Printf("sent %d series to vmselect", len(metricNames)) + return nil +} + +func (s *Server) processSearch(ctx *vmselectRequestCtx) error { + s.searchRequests.Inc() + + // Read request. + if err := ctx.readSearchQuery(); err != nil { + return err + } + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + + // Initiaialize the search. 
+ bi, err := s.api.InitSearch(ctx.qt, &ctx.sq, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + defer bi.MustClose() + + // Send empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send found blocks to vmselect. + blocksRead := 0 + var ok bool + for { + ctx.dataBuf, ok = bi.NextBlock(ctx.dataBuf[:0]) + if !ok { + break + } + blocksRead++ + s.metricBlocksRead.Inc() + if err := ctx.writeDataBufBytes(); err != nil { + return fmt.Errorf("cannot send MetricBlock: %w", err) + } + } + + if err := bi.Error(); err != nil { + return fmt.Errorf("search error: %w", err) + } + ctx.qt.Printf("sent %d blocks to vmselect", blocksRead) + + // Send 'end of response' marker + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send 'end of response' marker") + } + return nil +} + +func (s *Server) processMetricNamesUsageStats(ctx *vmselectRequestCtx) error { + // Read request. 
+ hasTenant, err := ctx.readBool() + if err != nil { + return fmt.Errorf("cannot read hasTenant: %w", err) + } + var at *storage.TenantToken + if hasTenant { + accountID, err := ctx.readUint32() + if err != nil { + return fmt.Errorf("cannot read accountID: %w", err) + } + projectID, err := ctx.readUint32() + if err != nil { + return fmt.Errorf("cannot read projectID: %w", err) + } + at = &storage.TenantToken{ + AccountID: accountID, + ProjectID: projectID, + } + } + limit, err := ctx.readLimit() + if err != nil { + return fmt.Errorf("cannot read limit: %w", err) + } + le, err := ctx.readInt64() + if err != nil { + return fmt.Errorf("cannot read le: %w", err) + } + if err := ctx.readDataBufBytes(256); err != nil { + return fmt.Errorf("cannot read matchPattern: %w", err) + } + matchPattern := string(ctx.dataBuf) + + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + + result, err := s.api.GetMetricNamesUsageStats(ctx.qt, at, limit, int(le), matchPattern, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. 
+ if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + if err := ctx.writeUint64(result.CollectedSinceTs); err != nil { + return fmt.Errorf("cannot write CollectedSinceTs: %w", err) + } + if err := ctx.writeUint64(result.TotalRecords); err != nil { + return fmt.Errorf("cannot write TotalRecords: %w", err) + } + if err := ctx.writeUint64(result.CurrentSizeBytes); err != nil { + return fmt.Errorf("cannot write CurrentSizeBytes: %w", err) + } + if err := ctx.writeUint64(result.MaxSizeBytes); err != nil { + return fmt.Errorf("cannot write MaxSizeBytes: %w", err) + } + if err := writeMetricNameStatRecords(ctx, result.Records); err != nil { + return fmt.Errorf("cannot write Records: %w", err) + } + return nil +} + +func writeMetricNameStatRecords(ctx *vmselectRequestCtx, records []metricnamestats.StatRecord) error { + if err := ctx.writeUint64(uint64(len(records))); err != nil { + return fmt.Errorf("cannot write MetricNamesStatsRecord size: %w", err) + } + for _, r := range records { + if err := ctx.writeString(r.MetricName); err != nil { + return fmt.Errorf("cannot write MetricName=%q record: %w", r.MetricName, err) + } + if err := ctx.writeUint64(r.LastRequestTs); err != nil { + return fmt.Errorf("cannot write record LastRequestTs: %w", err) + } + if err := ctx.writeUint64(r.RequestsCount); err != nil { + return fmt.Errorf("cannot write record RequestCount: %w", err) + } + } + return nil +} + +func (s *Server) processResetMetricUsageStats(ctx *vmselectRequestCtx) error { + + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + if err := s.api.ResetMetricNamesUsageStats(ctx.qt, ctx.deadline); err != nil { + return fmt.Errorf("cannot reset state of the metric names usage tracker: %w", err) + } + return nil +} + +func (s *Server) processSearchMetadata(ctx *vmselectRequestCtx) error { + s.searchMetadataRequests.Inc() + + // Read 
request. + hasTenant, err := ctx.readBool() + if err != nil { + return fmt.Errorf("cannot read hasTenant: %w", err) + } + var at *storage.TenantToken + if hasTenant { + accountID, err := ctx.readUint32() + if err != nil { + return fmt.Errorf("cannot read accountID: %w", err) + } + projectID, err := ctx.readUint32() + if err != nil { + return fmt.Errorf("cannot read projectID: %w", err) + } + at = &storage.TenantToken{ + AccountID: accountID, + ProjectID: projectID, + } + } + limit, err := ctx.readLimit() + if err != nil { + return fmt.Errorf("cannot read limit: %w", err) + } + if err := ctx.readDataBufBytes(1024); err != nil { + return fmt.Errorf("cannot read metric name: %w", err) + } + metricName := string(ctx.dataBuf) + + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + + result, err := s.api.GetMetadataRecords(ctx.qt, at, limit, metricName, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + if err := writeMetadataRows(ctx, result); err != nil { + return fmt.Errorf("cannot write metadata rows: %w", err) + } + return nil +} + +func writeMetadataRows(ctx *vmselectRequestCtx, records []*metricsmetadata.Row) error { + if err := ctx.writeUint64(uint64(len(records))); err != nil { + return fmt.Errorf("cannot write metadata rows count: %w", err) + } + for _, r := range records { + ctx.dataBuf = r.MarshalTo(ctx.dataBuf[:0]) + if err := ctx.writeDataBufBytes(); err != nil { + return fmt.Errorf("cannot write metadata rows: %w", err) + } + } + + return nil +}