mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-09 20:04:48 +03:00
Compare commits
2 Commits
ci-change-
...
vmsingle-v
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f287ae7b1e | ||
|
|
85904390d4 |
3
.github/workflows/build.yml
vendored
3
.github/workflows/build.yml
vendored
@@ -33,8 +33,7 @@ jobs:
|
||||
name: ${{ matrix.os }}-${{ matrix.arch }}
|
||||
permissions:
|
||||
contents: read
|
||||
# Runs on dedicated runner with extra resources to increase build speed.
|
||||
runs-on: 'vm-runner'
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
|
||||
7
.github/workflows/test.yml
vendored
7
.github/workflows/test.yml
vendored
@@ -30,8 +30,7 @@ jobs:
|
||||
name: lint
|
||||
permissions:
|
||||
contents: read
|
||||
# Runs on dedicated runner with extra resources since golangci-lint requires extra memory
|
||||
runs-on: 'vm-runner'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Code checkout
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
@@ -65,8 +64,7 @@ jobs:
|
||||
name: unit
|
||||
permissions:
|
||||
contents: read
|
||||
# Runs on dedicated runner with extra resources to increase tests speed.
|
||||
runs-on: 'vm-runner'
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
strategy:
|
||||
matrix:
|
||||
@@ -97,7 +95,6 @@ jobs:
|
||||
name: apptest
|
||||
permissions:
|
||||
contents: read
|
||||
# Runs on dedicated runner to isolate app tests from other tests.
|
||||
runs-on: apptest
|
||||
|
||||
steps:
|
||||
|
||||
@@ -10,8 +10,6 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
@@ -23,6 +21,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vminsertapi"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -153,7 +152,7 @@ func Init(vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []stora
|
||||
LogNewSeries: *logNewSeries,
|
||||
}
|
||||
strg := storage.MustOpenStorage(*storageDataPath, opts)
|
||||
vmStorage = newVMStorageSingleNode(strg, vmselectMaxConcurrentRequests, resetCacheIfNeeded)
|
||||
vmStorage = newVMStorage(strg, vmselectMaxConcurrentRequests, resetCacheIfNeeded)
|
||||
|
||||
var m storage.Metrics
|
||||
strg.UpdateMetrics(&m)
|
||||
@@ -175,15 +174,15 @@ func Init(vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []stora
|
||||
GetSearch = vmStorage.GetSearch
|
||||
PutSearch = vmStorage.PutSearch
|
||||
RequestHandler = vmStorage.requestHandler
|
||||
DebugFlush = vmStorage.vms.s.DebugFlush
|
||||
DebugFlush = vmStorage.s.DebugFlush
|
||||
}
|
||||
|
||||
var storageMetrics *metrics.Set
|
||||
|
||||
var (
|
||||
// vmStorageSingleNode is an instance of vmstorage used by vminsert and
|
||||
// vmStorage is an instance of vmstorage used by vminsert and
|
||||
// vmselect for writing and reading data.
|
||||
vmStorage *VMStorageSingleNode
|
||||
vmStorage *VMStorage
|
||||
VMInsertAPI vminsertapi.API
|
||||
VMSelectAPI vmselectapi.API
|
||||
GetSearch func(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (*storage.Search, int, error)
|
||||
@@ -209,15 +208,12 @@ func Stop() {
|
||||
logger.Infof("the vmstorage has been stopped")
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.requestHandler(w, r)
|
||||
}
|
||||
|
||||
// requestHandler is a storage request handler.
|
||||
// TODO(@rtm0): Move to a separate file, request_handler.go
|
||||
func (vms *VMStorage) requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
|
||||
path := r.URL.Path
|
||||
if path == "/internal/force_merge" {
|
||||
if !httpserver.CheckAuthFlag(w, r, forceMergeAuthKey) {
|
||||
@@ -361,15 +357,11 @@ var (
|
||||
snapshotsDeleteAllErrorsTotal = metrics.NewCounter(`vm_http_request_errors_total{path="/snapshot/delete_all"}`)
|
||||
)
|
||||
|
||||
// TODO(@rtm0): Move to metrics.go.
|
||||
func (vmssn *VMStorageSingleNode) writeStorageMetrics(w io.Writer) {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
vmssn.vms.writeStorageMetrics(w)
|
||||
}
|
||||
|
||||
// TODO(@rtm0): Move to metrics.go.
|
||||
func (vms *VMStorage) writeStorageMetrics(w io.Writer) {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
|
||||
strg := vms.s
|
||||
var m storage.Metrics
|
||||
strg.UpdateMetrics(&m)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package vmstorage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"sync"
|
||||
@@ -14,6 +15,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
|
||||
)
|
||||
@@ -34,7 +36,7 @@ var (
|
||||
// newVMStorage creates a new instance of of VMStorage.
|
||||
//
|
||||
// The created VMStorage instance takes ownership of s.
|
||||
func newVMStorage(s *storage.Storage, vmselectMaxConcurrentRequests int) *VMStorage {
|
||||
func newVMStorage(s *storage.Storage, vmselectMaxConcurrentRequests int, resetCacheIfNeeded func(mrs []storage.MetricRow)) *VMStorage {
|
||||
if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
|
||||
logger.Fatalf("invalid -precisionBits=%d: %s", *precisionBits, err)
|
||||
}
|
||||
@@ -49,6 +51,8 @@ func newVMStorage(s *storage.Storage, vmselectMaxConcurrentRequests int) *VMStor
|
||||
maxUniqueTimeseries: *maxUniqueTimeseries,
|
||||
maxUniqueTimeSeriesCalculated: maxUniqueTimeseriesCalculated,
|
||||
staleSnapshotsRemoverCh: make(chan struct{}),
|
||||
wg: syncwg.WaitGroup{},
|
||||
resetCacheIfNeeded: resetCacheIfNeeded,
|
||||
}
|
||||
vms.initStaleSnapshotsRemover()
|
||||
return vms
|
||||
@@ -78,6 +82,17 @@ type VMStorage struct {
|
||||
maxUniqueTimeSeriesCalculated int
|
||||
staleSnapshotsRemoverCh chan struct{}
|
||||
staleSnapshotsRemoverWG sync.WaitGroup
|
||||
|
||||
// wg is used to wrap every storage call into wg.Add(1) ... wg.Done()
|
||||
// for proper graceful shutdown when Stop is called.
|
||||
//
|
||||
// Use syncwg instead of sync, since Add is called from concurrent
|
||||
// goroutines.
|
||||
wg syncwg.WaitGroup
|
||||
|
||||
// resetCacheIfNeeded is a callback for automatic resetting of response
|
||||
// cache if needed.
|
||||
resetCacheIfNeeded func(mrs []storage.MetricRow)
|
||||
}
|
||||
|
||||
func (vms *VMStorage) initStaleSnapshotsRemover() {
|
||||
@@ -103,6 +118,7 @@ func (vms *VMStorage) initStaleSnapshotsRemover() {
|
||||
func (vms *VMStorage) Stop() {
|
||||
close(vms.staleSnapshotsRemoverCh)
|
||||
vms.staleSnapshotsRemoverWG.Wait()
|
||||
vms.wg.WaitAndBlock()
|
||||
vms.s.MustClose()
|
||||
}
|
||||
|
||||
@@ -111,6 +127,14 @@ func (vms *VMStorage) Stop() {
|
||||
// The caller should limit the number of concurrent calls to WriteRows() in
|
||||
// order to limit memory usage.
|
||||
func (vms *VMStorage) WriteRows(rows []storage.MetricRow) error {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
|
||||
if vms.s.IsReadOnly() {
|
||||
return errReadOnly
|
||||
}
|
||||
vms.resetCacheIfNeeded(rows)
|
||||
|
||||
vms.s.AddRows(rows, uint8(*precisionBits))
|
||||
return nil
|
||||
}
|
||||
@@ -120,26 +144,41 @@ func (vms *VMStorage) WriteRows(rows []storage.MetricRow) error {
|
||||
// The caller should limit the number of concurrent calls to WriteMetadata() in
|
||||
// order to limit memory usage.
|
||||
func (vms *VMStorage) WriteMetadata(rows []metricsmetadata.Row) error {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
|
||||
if vms.s.IsReadOnly() {
|
||||
return errReadOnly
|
||||
}
|
||||
vms.s.AddMetadataRows(rows)
|
||||
return nil
|
||||
}
|
||||
|
||||
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
|
||||
|
||||
// IsReadOnly returns true is the storage is in read-only mode.
|
||||
func (vms *VMStorage) IsReadOnly() bool {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
return vms.s.IsReadOnly()
|
||||
}
|
||||
|
||||
func (vms *VMStorage) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
|
||||
vms.wg.Add(1)
|
||||
|
||||
tr := sq.GetTimeRange()
|
||||
maxMetrics := vms.getMaxMetrics(sq.MaxMetrics)
|
||||
tfss, err := vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
||||
if err != nil {
|
||||
vms.wg.Done()
|
||||
return nil, err
|
||||
}
|
||||
if len(tfss) == 0 {
|
||||
vms.wg.Done()
|
||||
return nil, fmt.Errorf("missing tag filters")
|
||||
}
|
||||
bi := getBlockIterator()
|
||||
bi.wgDone = vms.wg.Done
|
||||
bi.sr.Init(qt, vms.s, tfss, tr, maxMetrics, deadline)
|
||||
if err := bi.sr.Error(); err != nil {
|
||||
bi.MustClose()
|
||||
@@ -161,8 +200,9 @@ func (vms *VMStorage) getMaxMetrics(searchQueryLimit int) int {
|
||||
|
||||
// blockIterator implements vmselectapi.BlockIterator
|
||||
type blockIterator struct {
|
||||
sr storage.Search
|
||||
mb storage.MetricBlock
|
||||
sr storage.Search
|
||||
mb storage.MetricBlock
|
||||
wgDone func()
|
||||
}
|
||||
|
||||
var blockIteratorsPool sync.Pool
|
||||
@@ -171,6 +211,8 @@ func (bi *blockIterator) MustClose() {
|
||||
bi.sr.MustClose()
|
||||
bi.mb.MetricName = nil
|
||||
bi.mb.Block.Reset()
|
||||
bi.wgDone()
|
||||
bi.wgDone = nil
|
||||
blockIteratorsPool.Put(bi)
|
||||
}
|
||||
|
||||
@@ -197,8 +239,63 @@ func (bi *blockIterator) Error() error {
|
||||
return bi.sr.Error()
|
||||
}
|
||||
|
||||
// GetSearch sets up an instance of storage search and returns it to the caller
|
||||
// along with the max series count that the search can return.
|
||||
//
|
||||
// This method is not part of the vmselectapi.API and must only be used by
|
||||
// vmsingle HTTP handlers.
|
||||
//
|
||||
// Callers of this method must call PutSearch() once the search instance is not
|
||||
// needed anymore.
|
||||
func (vms *VMStorage) GetSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (*storage.Search, int, error) {
|
||||
vms.wg.Add(1)
|
||||
|
||||
tr := sq.GetTimeRange()
|
||||
maxMetrics := vms.getMaxMetrics(sq.MaxMetrics)
|
||||
tfss, err := vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
||||
if err != nil {
|
||||
vms.wg.Done()
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
sr := getSearch()
|
||||
maxSeriesCount := sr.Init(qt, vms.s, tfss, tr, sq.MaxMetrics, deadline)
|
||||
return sr, maxSeriesCount, nil
|
||||
}
|
||||
|
||||
// PutSearch resets the search once it is not needed anymore and puts it aside
|
||||
// for future reuse.
|
||||
//
|
||||
// This method is not part of the vmselectapi.API and must only be used by
|
||||
// vmsingle HTTP handlers.
|
||||
//
|
||||
// The method must only be used on search instances that have been created with
|
||||
// GetSearch().
|
||||
func (vms *VMStorage) PutSearch(sr *storage.Search) {
|
||||
putSearch(sr)
|
||||
vms.wg.Done()
|
||||
}
|
||||
|
||||
func getSearch() *storage.Search {
|
||||
v := ssPool.Get()
|
||||
if v == nil {
|
||||
return &storage.Search{}
|
||||
}
|
||||
return v.(*storage.Search)
|
||||
}
|
||||
|
||||
func putSearch(sr *storage.Search) {
|
||||
sr.MustClose()
|
||||
ssPool.Put(sr)
|
||||
}
|
||||
|
||||
var ssPool sync.Pool
|
||||
|
||||
// SearchMetricNames returns metric names for the given tfss on the given tr.
|
||||
func (vms *VMStorage) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
|
||||
tr := sq.GetTimeRange()
|
||||
maxMetrics := sq.MaxMetrics
|
||||
if maxMetrics <= 0 {
|
||||
@@ -219,6 +316,9 @@ func (vms *VMStorage) SearchMetricNames(qt *querytracer.Tracer, sq *storage.Sear
|
||||
// SearchLabelValues searches for label values for the given labelName, tfss and
|
||||
// tr.
|
||||
func (vms *VMStorage) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
|
||||
tr := sq.GetTimeRange()
|
||||
if maxLabelValues <= 0 || maxLabelValues > *maxTagValues {
|
||||
maxLabelValues = *maxTagValues
|
||||
@@ -244,6 +344,9 @@ func (vms *VMStorage) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuer
|
||||
// similar APIs.
|
||||
func (vms *VMStorage) TagValueSuffixes(qt *querytracer.Tracer, _, _ uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
|
||||
maxSuffixes int, deadline uint64) ([]string, error) {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
|
||||
if maxSuffixes <= 0 || maxSuffixes > *maxTagValueSuffixesPerSearch {
|
||||
maxSuffixes = *maxTagValueSuffixesPerSearch
|
||||
}
|
||||
@@ -260,6 +363,9 @@ func (vms *VMStorage) TagValueSuffixes(qt *querytracer.Tracer, _, _ uint32, tr s
|
||||
|
||||
// SearchLabelNames searches for tag keys matching the given tfss on tr.
|
||||
func (vms *VMStorage) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
|
||||
tr := sq.GetTimeRange()
|
||||
if maxLabelNames <= 0 || maxLabelNames > *maxTagKeys {
|
||||
maxLabelNames = *maxTagKeys
|
||||
@@ -278,6 +384,8 @@ func (vms *VMStorage) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery
|
||||
}
|
||||
|
||||
func (vms *VMStorage) SeriesCount(_ *querytracer.Tracer, _, _ uint32, deadline uint64) (uint64, error) {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
return vms.s.GetSeriesCount(deadline)
|
||||
}
|
||||
|
||||
@@ -287,6 +395,9 @@ func (vms *VMStorage) Tenants(_ *querytracer.Tracer, _ storage.TimeRange, _ uint
|
||||
|
||||
// GetTSDBStatus returns TSDB status for given filters on the given date.
|
||||
func (vms *VMStorage) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
|
||||
tr := sq.GetTimeRange()
|
||||
maxMetrics := sq.MaxMetrics
|
||||
if maxMetrics <= 0 {
|
||||
@@ -306,6 +417,9 @@ func (vms *VMStorage) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery
|
||||
//
|
||||
// Returns the number of deleted series.
|
||||
func (vms *VMStorage) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
|
||||
tr := sq.GetTimeRange()
|
||||
maxMetrics := sq.MaxMetrics
|
||||
if maxMetrics <= 0 {
|
||||
@@ -324,17 +438,26 @@ func (vms *VMStorage) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQue
|
||||
}
|
||||
|
||||
func (vms *VMStorage) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
|
||||
vms.s.RegisterMetricNames(qt, mrs)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetMetricNamesUsageStats returns metric name usage stats.
|
||||
func (vms *VMStorage) GetMetricNamesUsageStats(qt *querytracer.Tracer, _ *storage.TenantToken, limit, le int, matchPattern string, _ uint64) (metricnamestats.StatsResult, error) {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
|
||||
return vms.s.GetMetricNamesStats(qt, limit, le, matchPattern), nil
|
||||
}
|
||||
|
||||
// ResetMetricNamesStats resets state for metric names usage tracker
|
||||
func (vms *VMStorage) ResetMetricNamesUsageStats(qt *querytracer.Tracer, _ uint64) error {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
|
||||
vms.s.ResetMetricNamesStats(qt)
|
||||
return nil
|
||||
}
|
||||
@@ -371,6 +494,8 @@ func (vms *VMStorage) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery,
|
||||
}
|
||||
|
||||
func (vms *VMStorage) GetMetadataRecords(qt *querytracer.Tracer, _ *storage.TenantToken, limit int, metricName string, _ uint64) ([]*metricsmetadata.Row, error) {
|
||||
vms.wg.Add(1)
|
||||
defer vms.wg.Done()
|
||||
return vms.s.GetMetadataRows(qt, limit, metricName), nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1,213 +0,0 @@
|
||||
package vmstorage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
|
||||
)
|
||||
|
||||
// newVMStorageSingleNode creates a new instance of of VMStorage for vmsingle.
|
||||
func newVMStorageSingleNode(s *storage.Storage, maxConcurrentRequests int, resetCacheIfNeeded func(mrs []storage.MetricRow)) *VMStorageSingleNode {
|
||||
vms := newVMStorage(s, maxConcurrentRequests)
|
||||
return &VMStorageSingleNode{
|
||||
vms: vms,
|
||||
wg: syncwg.WaitGroup{},
|
||||
resetCacheIfNeeded: resetCacheIfNeeded,
|
||||
}
|
||||
}
|
||||
|
||||
type VMStorageSingleNode struct {
|
||||
vms *VMStorage
|
||||
|
||||
// wg is used to wrap every storage call into wg.Add(1) ... wg.Done()
|
||||
// for proper graceful shutdown when Stop is called.
|
||||
//
|
||||
// Use syncwg instead of sync, since Add is called from concurrent
|
||||
// goroutines.
|
||||
wg syncwg.WaitGroup
|
||||
|
||||
// resetCacheIfNeeded is a callback for automatic resetting of response
|
||||
// cache if needed.
|
||||
resetCacheIfNeeded func(mrs []storage.MetricRow)
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) Stop() {
|
||||
vmssn.wg.WaitAndBlock()
|
||||
vmssn.vms.Stop()
|
||||
}
|
||||
|
||||
// WriteRows writes metric rows to the storage.
|
||||
//
|
||||
// Returns an error if the storage is in read-only mode.
|
||||
//
|
||||
// The caller should limit the number of concurrent calls to WriteRows() in
|
||||
// order to limit memory usage.
|
||||
func (vmssn *VMStorageSingleNode) WriteRows(rows []storage.MetricRow) error {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
|
||||
if vmssn.vms.IsReadOnly() {
|
||||
return errReadOnly
|
||||
}
|
||||
vmssn.resetCacheIfNeeded(rows)
|
||||
return vmssn.vms.WriteRows(rows)
|
||||
}
|
||||
|
||||
// WriteMetadata writes metrics metadata to storage.
|
||||
//
|
||||
// Returns an error if the storage is in read-only mode.
|
||||
//
|
||||
// The caller should limit the number of concurrent calls to WriteMetadata() in
|
||||
// order to limit memory usage.
|
||||
func (vmssn *VMStorageSingleNode) WriteMetadata(rows []metricsmetadata.Row) error {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
|
||||
if vmssn.vms.IsReadOnly() {
|
||||
return errReadOnly
|
||||
}
|
||||
return vmssn.vms.WriteMetadata(rows)
|
||||
}
|
||||
|
||||
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
|
||||
|
||||
func (vmssn *VMStorageSingleNode) IsReadOnly() bool {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.IsReadOnly()
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
|
||||
return nil, fmt.Errorf("not implemented in vmsingle")
|
||||
}
|
||||
|
||||
// GetSearch sets up an instance of storage search and returns it to the caller
|
||||
// along with the max series count that the search can return.
|
||||
//
|
||||
// This method is not part of the vmselectapi.API and must only be used by
|
||||
// vmsingle HTTP handlers.
|
||||
//
|
||||
// Callers of this method must call PutSearch() once the search instance is not
|
||||
// needed anymore.
|
||||
func (vmssn *VMStorageSingleNode) GetSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (*storage.Search, int, error) {
|
||||
vmssn.wg.Add(1)
|
||||
|
||||
tr := sq.GetTimeRange()
|
||||
maxMetrics := vmssn.vms.getMaxMetrics(sq.MaxMetrics)
|
||||
tfss, err := vmssn.vms.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
||||
if err != nil {
|
||||
vmssn.wg.Done()
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
sr := getSearch()
|
||||
maxSeriesCount := sr.Init(qt, vmssn.vms.s, tfss, tr, sq.MaxMetrics, deadline)
|
||||
return sr, maxSeriesCount, nil
|
||||
}
|
||||
|
||||
// PutSearch resets the search once it is not needed anymore and puts it aside
|
||||
// for future reuse.
|
||||
//
|
||||
// This method is not part of the vmselectapi.API and must only be used by
|
||||
// vmsingle HTTP handlers.
|
||||
//
|
||||
// The method must only be used on search instances that have been created with
|
||||
// GetSearch().
|
||||
func (vmssn *VMStorageSingleNode) PutSearch(sr *storage.Search) {
|
||||
putSearch(sr)
|
||||
vmssn.wg.Done()
|
||||
}
|
||||
|
||||
func getSearch() *storage.Search {
|
||||
v := ssPool.Get()
|
||||
if v == nil {
|
||||
return &storage.Search{}
|
||||
}
|
||||
return v.(*storage.Search)
|
||||
}
|
||||
|
||||
func putSearch(sr *storage.Search) {
|
||||
sr.MustClose()
|
||||
ssPool.Put(sr)
|
||||
}
|
||||
|
||||
var ssPool sync.Pool
|
||||
|
||||
func (vmssn *VMStorageSingleNode) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.SearchMetricNames(qt, sq, deadline)
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.LabelValues(qt, sq, labelName, maxLabelValues, deadline)
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error) {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.TagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.LabelNames(qt, sq, maxLabelNames, deadline)
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.SeriesCount(qt, accountID, projectID, deadline)
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.Tenants(qt, tr, deadline)
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.TSDBStatus(qt, sq, focusLabel, topN, deadline)
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.DeleteSeries(qt, sq, deadline)
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.RegisterMetricNames(qt, mrs, deadline)
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) GetMetricNamesUsageStats(qt *querytracer.Tracer, tt *storage.TenantToken, limit, le int, matchPattern string, deadline uint64) (metricnamestats.StatsResult, error) {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.GetMetricNamesUsageStats(qt, tt, limit, le, matchPattern, deadline)
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) ResetMetricNamesUsageStats(qt *querytracer.Tracer, deadline uint64) error {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.ResetMetricNamesUsageStats(qt, deadline)
|
||||
}
|
||||
|
||||
func (vmssn *VMStorageSingleNode) GetMetadataRecords(qt *querytracer.Tracer, tt *storage.TenantToken, limit int, metricName string, deadline uint64) ([]*metricsmetadata.Row, error) {
|
||||
vmssn.wg.Add(1)
|
||||
defer vmssn.wg.Done()
|
||||
return vmssn.vms.GetMetadataRecords(qt, tt, limit, metricName, deadline)
|
||||
}
|
||||
@@ -48,7 +48,7 @@ func TestGetMaxMetrics(t *testing.T) {
|
||||
t.Helper()
|
||||
*maxUniqueTimeseries = storageMaxUniqueTimeseries
|
||||
s := storage.MustOpenStorage(t.Name(), storage.OpenOptions{})
|
||||
vms := newVMStorage(s, maxConcurrentRequests)
|
||||
vms := newVMStorage(s, maxConcurrentRequests, func(mrs []storage.MetricRow) {})
|
||||
defer vms.Stop()
|
||||
maxMetrics := vms.getMaxMetrics(searchQueryLimit)
|
||||
if maxMetrics != expect {
|
||||
|
||||
Reference in New Issue
Block a user