app/{vmstorage,vmselect,vminsert}: introduce metrics metadata storage

This commit adds the storage layer and cluster RPC methods for metrics metadata.

 Key concepts:
* vmstorage persists metadata in-memory only.
* vmstorage evicts metadata records older than 1 hour.
* vmstorage stores only the last metadata value for each time series
  metric name.
* vminsert opens an additional TCP connection to the vmstorage for
  metadata write requests.
* vmselect doesn't support `limit_per_metric_name`.

This feature is optional and must be enabled via the `-enableMetadata` flag provided to vminsert/vmsingle.

Fixes github.com/VictoriaMetrics/VictoriaMetrics/issues/2974
This commit is contained in:
f41gh7
2025-11-14 09:21:08 +01:00
parent 847cd1e336
commit 5a587f2006
29 changed files with 2051 additions and 217 deletions

View File

@@ -11,9 +11,11 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits"
)
@@ -50,8 +52,9 @@ var (
type InsertCtx struct {
Labels sortedLabels
mrs []storage.MetricRow
metricNamesBuf []byte
mrs []storage.MetricRow
mms []metricsmetadata.Row
metricNameBuf []byte
relabelCtx relabel.Ctx
streamAggrCtx streamAggrCtx
@@ -73,8 +76,13 @@ func (ctx *InsertCtx) Reset(rowsLen int) {
}
mrs = slicesutil.SetLength(mrs, rowsLen)
ctx.mrs = mrs[:0]
mms := ctx.mms
for i := range mms {
cleanMetricMetadata(&mms[i])
}
ctx.mms = mms[:0]
ctx.metricNamesBuf = ctx.metricNamesBuf[:0]
ctx.metricNameBuf = ctx.metricNameBuf[:0]
ctx.relabelCtx.Reset()
ctx.streamAggrCtx.Reset()
ctx.skipStreamAggr = false
@@ -84,11 +92,20 @@ func cleanMetricRow(mr *storage.MetricRow) {
mr.MetricNameRaw = nil
}
// cleanMetricMetadata zeroes all fields of mm so that the byte slices it
// references (obtained via bytesutil.ToUnsafeBytes from request data) can be
// garbage-collected before the Row is reused from the pooled InsertCtx.
func cleanMetricMetadata(mm *metricsmetadata.Row) {
mm.MetricFamilyName = nil
mm.Unit = nil
mm.Help = nil
mm.Type = 0
mm.ProjectID = 0
mm.AccountID = 0
}
func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label) []byte {
start := len(ctx.metricNamesBuf)
ctx.metricNamesBuf = append(ctx.metricNamesBuf, prefix...)
ctx.metricNamesBuf = storage.MarshalMetricNameRaw(ctx.metricNamesBuf, labels)
metricNameRaw := ctx.metricNamesBuf[start:]
start := len(ctx.metricNameBuf)
ctx.metricNameBuf = append(ctx.metricNameBuf, prefix...)
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf, labels)
metricNameRaw := ctx.metricNameBuf[start:]
return metricNameRaw[:len(metricNameRaw):len(metricNameRaw)]
}
@@ -143,7 +160,7 @@ func (ctx *InsertCtx) addRow(metricNameRaw []byte, timestamp int64, value float6
mr.MetricNameRaw = metricNameRaw
mr.Timestamp = timestamp
mr.Value = value
if len(ctx.metricNamesBuf) > 16*1024*1024 {
if len(ctx.metricNameBuf) > 16*1024*1024 {
if err := ctx.FlushBufs(); err != nil {
return err
}
@@ -151,6 +168,55 @@ func (ctx *InsertCtx) addRow(metricNameRaw []byte, timestamp int64, value float6
return nil
}
// WriteMetadata writes the given prometheus protobuf metadata into the storage.
//
// The converted rows reference mmpbs' string data via unsafe byte conversion,
// so mmpbs must stay alive until vmstorage.AddMetadataRows returns.
//
// Returns an ErrorWithStatusCode with http.StatusServiceUnavailable when the
// storage rejects the write (e.g. read-only mode).
func (ctx *InsertCtx) WriteMetadata(mmpbs []prompb.MetricMetadata) error {
	if len(mmpbs) == 0 {
		return nil
	}
	mms := slicesutil.SetLength(ctx.mms, len(mmpbs))
	// Store the resized buffer back on ctx so its capacity is reused across
	// calls and Reset() can clean the unsafe byte references held by the rows.
	ctx.mms = mms
	for i := range mmpbs {
		mmpb := &mmpbs[i]
		mm := &mms[i]
		mm.MetricFamilyName = bytesutil.ToUnsafeBytes(mmpb.MetricFamilyName)
		mm.Help = bytesutil.ToUnsafeBytes(mmpb.Help)
		mm.Type = mmpb.Type
		mm.Unit = bytesutil.ToUnsafeBytes(mmpb.Unit)
	}
	if err := vmstorage.AddMetadataRows(mms); err != nil {
		return &httpserver.ErrorWithStatusCode{
			Err:        fmt.Errorf("cannot store metrics metadata: %w", err),
			StatusCode: http.StatusServiceUnavailable,
		}
	}
	return nil
}
// WritePromMetadata writes the given prometheus text-format metric metadata into the storage.
//
// The converted rows reference mmps' string data via unsafe byte conversion,
// so mmps must stay alive until vmstorage.AddMetadataRows returns.
//
// Returns an ErrorWithStatusCode with http.StatusServiceUnavailable when the
// storage rejects the write (e.g. read-only mode).
func (ctx *InsertCtx) WritePromMetadata(mmps []prometheus.Metadata) error {
	if len(mmps) == 0 {
		return nil
	}
	mms := slicesutil.SetLength(ctx.mms, len(mmps))
	// Store the resized buffer back on ctx so its capacity is reused across
	// calls and Reset() can clean the unsafe byte references held by the rows.
	ctx.mms = mms
	for i := range mmps {
		mmp := &mmps[i]
		mm := &mms[i]
		mm.MetricFamilyName = bytesutil.ToUnsafeBytes(mmp.Metric)
		mm.Help = bytesutil.ToUnsafeBytes(mmp.Help)
		mm.Type = mmp.Type
		// Prometheus text exposition format carries no unit; clear any value
		// left over from a previous use of the shared mms buffer.
		mm.Unit = nil
	}
	if err := vmstorage.AddMetadataRows(mms); err != nil {
		return &httpserver.ErrorWithStatusCode{
			Err:        fmt.Errorf("cannot store prometheus metrics metadata: %w", err),
			StatusCode: http.StatusServiceUnavailable,
		}
	}
	return nil
}
// AddLabelBytes adds (name, value) label to ctx.Labels.
//
// name and value must exist until ctx.Labels is used.

View File

@@ -6,6 +6,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prommetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/firehose"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
@@ -14,8 +15,9 @@ import (
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="opentelemetry"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="opentelemetry"}`)
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="opentelemetry"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="opentelemetry"}`)
metadataInserted = metrics.NewCounter(`vm_metadata_rows_inserted_total{type="opentelemetry"}`)
)
// InsertHandler processes opentelemetry metrics.
@@ -33,12 +35,12 @@ func InsertHandler(req *http.Request) error {
return fmt.Errorf("json encoding isn't supported for opentelemetry format. Use protobuf encoding")
}
}
return stream.ParseStream(req.Body, encoding, processBody, func(tss []prompb.TimeSeries, _ []prompb.MetricMetadata) error {
return insertRows(tss, extraLabels)
return stream.ParseStream(req.Body, encoding, processBody, func(tss []prompb.TimeSeries, mms []prompb.MetricMetadata) error {
return insertRows(tss, mms, extraLabels)
})
}
func insertRows(tss []prompb.TimeSeries, extraLabels []prompb.Label) error {
func insertRows(tss []prompb.TimeSeries, mms []prompb.MetricMetadata, extraLabels []prompb.Label) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)
@@ -75,5 +77,14 @@ func insertRows(tss []prompb.TimeSeries, extraLabels []prompb.Label) error {
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return ctx.FlushBufs()
if err := ctx.FlushBufs(); err != nil {
return fmt.Errorf("cannot flush metric bufs: %w", err)
}
if prommetadata.IsEnabled() {
if err := ctx.WriteMetadata(mms); err != nil {
return err
}
metadataInserted.Add(len(mms))
}
return nil
}

View File

@@ -1,6 +1,7 @@
package prometheusimport
import (
"fmt"
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
@@ -15,8 +16,9 @@ import (
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="prometheus"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="prometheus"}`)
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="prometheus"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="prometheus"}`)
metadataInserted = metrics.NewCounter(`vm_metadata_rows_inserted_total{type="prometheus"}`)
)
// InsertHandler processes `/api/v1/import/prometheus` request.
@@ -30,14 +32,14 @@ func InsertHandler(req *http.Request) error {
return err
}
encoding := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, defaultTimestamp, encoding, true, prommetadata.IsEnabled(), func(rows []prometheus.Row, _ []prometheus.Metadata) error {
return insertRows(rows, extraLabels)
return stream.Parse(req.Body, defaultTimestamp, encoding, true, prommetadata.IsEnabled(), func(rows []prometheus.Row, mms []prometheus.Metadata) error {
return insertRows(rows, mms, extraLabels)
}, func(s string) {
httpserver.LogError(req, s)
})
}
func insertRows(rows []prometheus.Row, extraLabels []prompb.Label) error {
func insertRows(rows []prometheus.Row, mms []prometheus.Metadata, extraLabels []prompb.Label) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)
@@ -64,5 +66,15 @@ func insertRows(rows []prometheus.Row, extraLabels []prompb.Label) error {
}
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return ctx.FlushBufs()
if err := ctx.FlushBufs(); err != nil {
return fmt.Errorf("cannot flush metric bufs: %w", err)
}
if prommetadata.IsEnabled() {
if err := ctx.WritePromMetadata(mms); err != nil {
return err
}
metadataInserted.Add(len(mms))
}
return nil
}

View File

@@ -1,10 +1,12 @@
package promremotewrite
import (
"fmt"
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prommetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
@@ -12,8 +14,9 @@ import (
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="promremotewrite"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="promremotewrite"}`)
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="promremotewrite"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="promremotewrite"}`)
metadataInserted = metrics.NewCounter(`vm_metadata_rows_inserted_total{type="promremotewrite"}`)
)
// InsertHandler processes remote write for prometheus.
@@ -23,12 +26,12 @@ func InsertHandler(req *http.Request) error {
return err
}
isVMRemoteWrite := req.Header.Get("Content-Encoding") == "zstd"
return stream.Parse(req.Body, isVMRemoteWrite, func(tss []prompb.TimeSeries, _ []prompb.MetricMetadata) error {
return insertRows(tss, extraLabels)
return stream.Parse(req.Body, isVMRemoteWrite, func(tss []prompb.TimeSeries, mms []prompb.MetricMetadata) error {
return insertRows(tss, mms, extraLabels)
})
}
func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompb.Label) error {
func insertRows(timeseries []prompb.TimeSeries, mms []prompb.MetricMetadata, extraLabels []prompb.Label) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)
@@ -68,5 +71,15 @@ func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompb.Label) erro
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return ctx.FlushBufs()
if err := ctx.FlushBufs(); err != nil {
return fmt.Errorf("cannot flush metric bufs: %w", err)
}
if prommetadata.IsEnabled() {
if err := ctx.WriteMetadata(mms); err != nil {
return err
}
metadataInserted.Add(len(mms))
}
return nil
}

View File

@@ -421,6 +421,16 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
}
w.WriteHeader(http.StatusNoContent)
return true
case "/api/v1/metadata":
// Serve metric metadata, see https://prometheus.io/docs/prometheus/latest/querying/api/#querying-metric-metadata
metadataRequests.Inc()
if err := prometheus.MetadataHandler(qt, startTime, w, r); err != nil {
metadataErrors.Inc()
httpserver.SendPrometheusError(w, r, err)
return true
}
return true
default:
return false
}
@@ -574,12 +584,6 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"status":"success","data":{"notifiers":[]}}`)
return true
case "/api/v1/metadata":
// Return dumb placeholder for https://prometheus.io/docs/prometheus/latest/querying/api/#querying-metric-metadata
metadataRequests.Inc()
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, "%s", `{"status":"success","data":{}}`)
return true
case "/api/v1/status/buildinfo":
buildInfoRequests.Inc()
w.Header().Set("Content-Type", "application/json")
@@ -708,7 +712,9 @@ var (
alertsRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/alerts"}`)
notifiersRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/notifiers"}`)
metadataRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/metadata"}`)
metadataRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/metadata"}`)
metadataErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/metadata"}`)
buildInfoRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/buildinfo"}`)
queryExemplarsRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/query_exemplars"}`)

View File

@@ -20,6 +20,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
)
var (
@@ -865,6 +866,23 @@ func LabelValues(qt *querytracer.Tracer, labelName string, sq *storage.SearchQue
return labelValues, nil
}
// GetMetricsMetadata returns time series metric names metadata for the given args
//
// metricName optionally filters metadata by metric name; limit > 0 caps the
// number of returned rows, limit <= 0 means "no limit".
// The error return is currently always nil; kept for handler-interface symmetry.
func GetMetricsMetadata(qt *querytracer.Tracer, limit int, metricName string) ([]*metricsmetadata.Row, error) {
qt = qt.NewChild("get metrics metadata: limit=%d, metric_name=%q", limit, metricName)
defer qt.Done()
metadata := vmstorage.Storage.GetMetadataRows(qt, limit, metricName)
// Sort by metric family name so responses are deterministic across calls.
sort.Slice(metadata, func(i, j int) bool {
return string(metadata[i].MetricFamilyName) < string(metadata[j].MetricFamilyName)
})
// Truncate only after sorting so that the kept subset is deterministic too.
if limit > 0 && len(metadata) >= limit {
metadata = metadata[:limit]
}
return metadata, nil
}
// GraphiteTagValues returns tag values for the given tagName until the given deadline.
func GraphiteTagValues(qt *querytracer.Tracer, tagName, filter string, limit int, deadline searchutil.Deadline) ([]string, error) {
qt = qt.NewChild("get graphite tag values for tagName=%s, filter=%s, limit=%d", tagName, filter, limit)

View File

@@ -0,0 +1,36 @@
{% import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
) %}
{% stripspace %}
MetadataResponse generates response for /api/v1/metadata
See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-metric-metadata
{% func MetadataResponse( result []*metricsmetadata.Row, qt *querytracer.Tracer) %}
{
"status":"success",
"data": {
{% code
mapItems := len(result)
currentItem := 0
%}
{% for _, row := range result %}
"{%s string(row.MetricFamilyName) %}": [
{
"type": {%q= prompb.MetricMetadataTypeToString(row.Type) %},
{% if len(row.Unit) > 0 -%}
"unit": {%q= string(row.Unit) %},
{% endif -%}
"help": {%q= string(row.Help) %}
}
]
{% if currentItem != mapItems-1 %},{% endif %}
{% code currentItem++ %}
{% endfor %}
}
{%= dumpQueryTrace(qt) %}
}
{% endfunc %}
{% endstripspace %}

View File

@@ -0,0 +1,109 @@
// Code generated by qtc from "metadata_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmselect/prometheus/metadata_response.qtpl:1
package prometheus
//line app/vmselect/prometheus/metadata_response.qtpl:1
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
)
// MetadataResponse generates response for /api/v1/metadataSee https://prometheus.io/docs/prometheus/latest/querying/api/#querying-metric-metadata
//line app/vmselect/prometheus/metadata_response.qtpl:10
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/prometheus/metadata_response.qtpl:10
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/prometheus/metadata_response.qtpl:10
func StreamMetadataResponse(qw422016 *qt422016.Writer, result []*metricsmetadata.Row, qt *querytracer.Tracer) {
//line app/vmselect/prometheus/metadata_response.qtpl:10
qw422016.N().S(`{"status":"success","data": {`)
//line app/vmselect/prometheus/metadata_response.qtpl:15
mapItems := len(result)
currentItem := 0
//line app/vmselect/prometheus/metadata_response.qtpl:18
for _, row := range result {
//line app/vmselect/prometheus/metadata_response.qtpl:18
qw422016.N().S(`"`)
//line app/vmselect/prometheus/metadata_response.qtpl:19
qw422016.E().S(string(row.MetricFamilyName))
//line app/vmselect/prometheus/metadata_response.qtpl:19
qw422016.N().S(`": [{"type":`)
//line app/vmselect/prometheus/metadata_response.qtpl:21
qw422016.N().Q(prompb.MetricMetadataTypeToString(row.Type))
//line app/vmselect/prometheus/metadata_response.qtpl:21
qw422016.N().S(`,`)
//line app/vmselect/prometheus/metadata_response.qtpl:22
if len(row.Unit) > 0 {
//line app/vmselect/prometheus/metadata_response.qtpl:22
qw422016.N().S(`"unit":`)
//line app/vmselect/prometheus/metadata_response.qtpl:23
qw422016.N().Q(string(row.Unit))
//line app/vmselect/prometheus/metadata_response.qtpl:23
qw422016.N().S(`,`)
//line app/vmselect/prometheus/metadata_response.qtpl:24
}
//line app/vmselect/prometheus/metadata_response.qtpl:24
qw422016.N().S(`"help":`)
//line app/vmselect/prometheus/metadata_response.qtpl:25
qw422016.N().Q(string(row.Help))
//line app/vmselect/prometheus/metadata_response.qtpl:25
qw422016.N().S(`}]`)
//line app/vmselect/prometheus/metadata_response.qtpl:28
if currentItem != mapItems-1 {
//line app/vmselect/prometheus/metadata_response.qtpl:28
qw422016.N().S(`,`)
//line app/vmselect/prometheus/metadata_response.qtpl:28
}
//line app/vmselect/prometheus/metadata_response.qtpl:29
currentItem++
//line app/vmselect/prometheus/metadata_response.qtpl:30
}
//line app/vmselect/prometheus/metadata_response.qtpl:30
qw422016.N().S(`}`)
//line app/vmselect/prometheus/metadata_response.qtpl:32
streamdumpQueryTrace(qw422016, qt)
//line app/vmselect/prometheus/metadata_response.qtpl:32
qw422016.N().S(`}`)
//line app/vmselect/prometheus/metadata_response.qtpl:34
}
//line app/vmselect/prometheus/metadata_response.qtpl:34
func WriteMetadataResponse(qq422016 qtio422016.Writer, result []*metricsmetadata.Row, qt *querytracer.Tracer) {
//line app/vmselect/prometheus/metadata_response.qtpl:34
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/metadata_response.qtpl:34
StreamMetadataResponse(qw422016, result, qt)
//line app/vmselect/prometheus/metadata_response.qtpl:34
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/metadata_response.qtpl:34
}
//line app/vmselect/prometheus/metadata_response.qtpl:34
func MetadataResponse(result []*metricsmetadata.Row, qt *querytracer.Tracer) string {
//line app/vmselect/prometheus/metadata_response.qtpl:34
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/metadata_response.qtpl:34
WriteMetadataResponse(qb422016, result, qt)
//line app/vmselect/prometheus/metadata_response.qtpl:34
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/metadata_response.qtpl:34
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/metadata_response.qtpl:34
return qs422016
//line app/vmselect/prometheus/metadata_response.qtpl:34
}

View File

@@ -639,6 +639,37 @@ func LabelsHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW
return nil
}
// MetadataHandler processes /api/v1/metadata request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-metric-metadata
//
// NOTE(review): startTime is accepted for signature consistency with sibling
// handlers but is unused here — confirm whether request-duration metrics are intended.
func MetadataHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWriter, r *http.Request) error {
// Optional `limit` query arg caps the number of returned metric families.
limit, err := httputil.GetInt(r, "limit")
if err != nil {
return err
}
// Negative limits are treated as "no limit".
if limit < 0 {
limit = 0
}
// Optional `metric` query arg filters metadata by metric name.
metricName := r.FormValue("metric")
metadata, err := netstorage.GetMetricsMetadata(qt, limit, metricName)
if err != nil {
return fmt.Errorf("cannot get metadata: %w", err)
}
// Finish the trace before rendering: the generated response embeds the
// trace dump, so qt must be complete when WriteMetadataResponse runs.
qt.Done()
w.Header().Set("Content-Type", "application/json")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteMetadataResponse(bw, metadata, qt)
if err := bw.Flush(); err != nil {
return fmt.Errorf("cannot send metadata response to remote client: %w", err)
}
return nil
}
var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels"}`)
// SeriesCountHandler processes /api/v1/series/count request.

View File

@@ -22,6 +22,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
@@ -90,6 +91,9 @@ var (
"In most cases, this value should not be changed. The maximum allowed value is 23h.")
logNewSeriesAuthKey = flagutil.NewPassword("logNewSeriesAuthKey", "authKey, which must be passed in query string to /internal/log_new_series. It overrides -httpAuth.*")
metadataStorageSize = flagutil.NewBytes("storage.maxMetadataStorageSize", 0, "Overrides max size for metrics metadata entries in-memory storage. "+
"If set to 0 or a negative value, defaults to 1% of allowed memory.")
)
// CheckTimeRange returns true if the given tr is denied for querying.
@@ -120,6 +124,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
storage.SetTagFiltersCacheSize(cacheSizeIndexDBTagFilters.IntN())
storage.SetMetricNamesStatsCacheSize(cacheSizeMetricNamesStats.IntN())
storage.SetMetricNameCacheSize(cacheSizeStorageMetricName.IntN())
storage.SetMetadataStorageSize(metadataStorageSize.IntN())
mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.IntN())
mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.IntN())
mergeset.SetDataBlocksSparseCacheSize(cacheSizeIndexDBDataBlocksSparse.IntN())
@@ -194,6 +199,19 @@ func AddRows(mrs []storage.MetricRow) error {
return nil
}
// AddMetadataRows adds mms to the storage.
//
// The caller should limit the number of concurrent calls to AddMetadataRows() in order to limit memory usage.
func AddMetadataRows(mms []metricsmetadata.Row) error {
// Reject writes while the storage is in read-only mode
// (see -storage.minFreeDiskSpaceBytes).
if Storage.IsReadOnly() {
return errReadOnly
}
// Track the in-flight write via WG so shutdown can wait for it to finish.
WG.Add(1)
Storage.AddMetadataRows(mms)
WG.Done()
return nil
}
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
// RegisterMetricNames registers all the metrics from mrs in the storage.
@@ -689,6 +707,11 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteGaugeUint64(w, `vm_downsampling_partitions_scheduled`, tm.ScheduledDownsamplingPartitions)
metrics.WriteGaugeUint64(w, `vm_downsampling_partitions_scheduled_size_bytes`, tm.ScheduledDownsamplingPartitionsSize)
metrics.WriteGaugeUint64(w, `vm_metrics_metadata_storage_items`, m.MetadataStorageItemsCurrent)
metrics.WriteCounterUint64(w, `vm_metrics_metadata_storage_size_bytes`, m.MetadataStorageCurrentSizeBytes)
metrics.WriteCounterUint64(w, `vm_metrics_metadata_storage_max_size_bytes`, m.MetadataStorageMaxSizeBytes)
}
func jsonResponseError(w http.ResponseWriter, err error) {

View File

@@ -25,6 +25,7 @@ type PrometheusQuerier interface {
PrometheusAPIV1Labels(t *testing.T, query string, opts QueryOpts) *PrometheusAPIV1LabelsResponse
PrometheusAPIV1LabelValues(t *testing.T, labelName, query string, opts QueryOpts) *PrometheusAPIV1LabelValuesResponse
PrometheusAPIV1ExportNative(t *testing.T, query string, opts QueryOpts) []byte
PrometheusAPIV1Metadata(t *testing.T, metric string, limit int, opts QueryOpts) *PrometheusAPIV1Metadata
APIV1AdminTSDBDeleteSeries(t *testing.T, matchQuery string, opts QueryOpts)
@@ -37,7 +38,7 @@ type PrometheusQuerier interface {
// Writer contains methods for writing new data
type Writer interface {
// Prometheus APIs
PrometheusAPIV1Write(t *testing.T, records []prompb.TimeSeries, opts QueryOpts)
PrometheusAPIV1Write(t *testing.T, wr prompb.WriteRequest, opts QueryOpts)
PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts)
PrometheusAPIV1ImportCSV(t *testing.T, records []string, opts QueryOpts)
PrometheusAPIV1ImportNative(t *testing.T, data []byte, opts QueryOpts)
@@ -350,6 +351,33 @@ func NewPrometheusAPIV1LabelValuesResponse(t *testing.T, s string) *PrometheusAP
return res
}
// PrometheusAPIV1Metadata is an inmemory representation of the
// /prometheus/api/v1/metadata response.
type PrometheusAPIV1Metadata struct {
Status string
IsPartial bool
Data map[string][]MetadataEntry
Trace *Trace
}
type MetadataEntry struct {
Type string
Help string
Unit string
}
// NewPrometheusAPIV1Metadata is a test helper function that creates a new
// instance of PrometheusAPIV1Metadata by unmarshalling a json string.
func NewPrometheusAPIV1Metadata(t *testing.T, s string) *PrometheusAPIV1Metadata {
t.Helper()
res := &PrometheusAPIV1Metadata{}
if err := json.Unmarshal([]byte(s), res); err != nil {
t.Fatalf("could not unmarshal series response data:\n%s\n err: %v", string(s), err)
}
return res
}
// Trace provides the description and the duration of some unit of work that has
// been performed during the request processing.
type Trace struct {

View File

@@ -99,37 +99,39 @@ func testDeduplication(tc *apptest.TestCase, sut apptest.PrometheusWriteQuerier,
ts3 := start.Add(3 * time.Second).UnixMilli()
ts5 := start.Add(5 * time.Second).UnixMilli()
ts10 := start.Add(10 * time.Second).UnixMilli()
data := []prompb.TimeSeries{
{
Labels: []prompb.Label{{Name: "__name__", Value: "metric1"}},
Samples: []prompb.Sample{
{Timestamp: ts1, Value: 3},
{Timestamp: ts3, Value: 10},
{Timestamp: ts5, Value: 5},
data := prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{{Name: "__name__", Value: "metric1"}},
Samples: []prompb.Sample{
{Timestamp: ts1, Value: 3},
{Timestamp: ts3, Value: 10},
{Timestamp: ts5, Value: 5},
},
},
},
{
Labels: []prompb.Label{{Name: "__name__", Value: "metric2"}},
Samples: []prompb.Sample{
{Timestamp: ts1, Value: 3},
{Timestamp: ts3, Value: decimal.StaleNaN},
{Timestamp: ts5, Value: 5},
{
Labels: []prompb.Label{{Name: "__name__", Value: "metric2"}},
Samples: []prompb.Sample{
{Timestamp: ts1, Value: 3},
{Timestamp: ts3, Value: decimal.StaleNaN},
{Timestamp: ts5, Value: 5},
},
},
},
{
Labels: []prompb.Label{{Name: "__name__", Value: "metric3"}},
Samples: []prompb.Sample{
{Timestamp: ts10, Value: 30},
{Timestamp: ts10, Value: 100},
{Timestamp: ts10, Value: 50},
{
Labels: []prompb.Label{{Name: "__name__", Value: "metric3"}},
Samples: []prompb.Sample{
{Timestamp: ts10, Value: 30},
{Timestamp: ts10, Value: 100},
{Timestamp: ts10, Value: 50},
},
},
},
{
Labels: []prompb.Label{{Name: "__name__", Value: "metric4"}},
Samples: []prompb.Sample{
{Timestamp: ts10, Value: 30},
{Timestamp: ts10, Value: decimal.StaleNaN},
{Timestamp: ts10, Value: 50},
{
Labels: []prompb.Label{{Name: "__name__", Value: "metric4"}},
Samples: []prompb.Sample{
{Timestamp: ts10, Value: 30},
{Timestamp: ts10, Value: decimal.StaleNaN},
{Timestamp: ts10, Value: 50},
},
},
},
}

View File

@@ -158,7 +158,11 @@ func TestSingleIngestionProtocols(t *testing.T) {
// prometheus text exposition format
sut.PrometheusAPIV1ImportPrometheus(t, []string{
`importprometheus_series 10 1707123456700`, // 2024-02-05T08:57:36.700Z
`# HELP importprometheus_series some help message`,
`# TYPE importprometheus_series gauge`,
`importprometheus_series 10 1707123456700`, // 2024-02-05T08:57:36.700Z
`# HELP importprometheus_series2 some help message second one`,
`# TYPE importprometheus_series2 gauge`,
`importprometheus_series2{label="foo",label1="value1"} 20 1707123456800`, // 2024-02-05T08:57:36.800Z
}, apptest.QueryOpts{
ExtraLabels: []string{"el1=elv1", "el2=elv2"},
@@ -187,42 +191,58 @@ func TestSingleIngestionProtocols(t *testing.T) {
})
// prometheus remote write format
pbData := []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "prometheusrw_series",
pbData := prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "prometheusrw_series",
},
},
Samples: []prompb.Sample{
{
Value: 10,
Timestamp: 1707123456700, // 2024-02-05T08:57:36.700Z
},
},
},
Samples: []prompb.Sample{
{
Value: 10,
Timestamp: 1707123456700, // 2024-02-05T08:57:36.700Z
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "prometheusrw_series2",
},
{
Name: "label",
Value: "foo2",
},
{
Name: "label1",
Value: "value1",
},
},
Samples: []prompb.Sample{
{
Value: 20,
Timestamp: 1707123456800, // 2024-02-05T08:57:36.800Z
},
},
},
},
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "prometheusrw_series2",
},
{
Name: "label",
Value: "foo2",
},
{
Name: "label1",
Value: "value1",
},
Metadata: []prompb.MetricMetadata{
{
Type: 1,
MetricFamilyName: "prometheusrw_series",
Help: "some help",
Unit: "",
},
Samples: []prompb.Sample{
{
Value: 20,
Timestamp: 1707123456800, // 2024-02-05T08:57:36.800Z
},
{
Type: 1,
MetricFamilyName: "prometheusrw_series2",
Help: "some help2",
Unit: "",
},
},
}
@@ -245,7 +265,6 @@ func TestSingleIngestionProtocols(t *testing.T) {
{Timestamp: 1707123456800, Value: 20}, // 2024-02-05T08:57:36.700Z
},
})
}
func TestClusterIngestionProtocols(t *testing.T) {
@@ -297,7 +316,11 @@ func TestClusterIngestionProtocols(t *testing.T) {
// prometheus text exposition format
vminsert.PrometheusAPIV1ImportPrometheus(t, []string{
`importprometheus_series 10 1707123456700`, // 2024-02-05T08:57:36.700Z
`# HELP importprometheus_series some help message`,
`# TYPE importprometheus_series gauge`,
`importprometheus_series 10 1707123456700`, // 2024-02-05T08:57:36.700Z
`# HELP importprometheus_series2 some help message second one`,
`# TYPE importprometheus_series2 gauge`,
`importprometheus_series2{label="foo",label1="value1"} 20 1707123456800`, // 2024-02-05T08:57:36.800Z
}, apptest.QueryOpts{
ExtraLabels: []string{"el1=elv1", "el2=elv2"},
@@ -434,42 +457,58 @@ func TestClusterIngestionProtocols(t *testing.T) {
})
// prometheus remote write format
pbData := []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "prometheusrw_series",
pbData := prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "prometheusrw_series",
},
},
Samples: []prompb.Sample{
{
Value: 10,
Timestamp: 1707123456700, // 2024-02-05T08:57:36.700Z
},
},
},
Samples: []prompb.Sample{
{
Value: 10,
Timestamp: 1707123456700, // 2024-02-05T08:57:36.700Z
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "prometheusrw_series2",
},
{
Name: "label",
Value: "foo2",
},
{
Name: "label1",
Value: "value1",
},
},
Samples: []prompb.Sample{
{
Value: 20,
Timestamp: 1707123456800, // 2024-02-05T08:57:36.800Z
},
},
},
},
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "prometheusrw_series2",
},
{
Name: "label",
Value: "foo2",
},
{
Name: "label1",
Value: "value1",
},
Metadata: []prompb.MetricMetadata{
{
Type: 1,
MetricFamilyName: "prometheusrw_series",
Help: "some help",
Unit: "",
},
Samples: []prompb.Sample{
{
Value: 20,
Timestamp: 1707123456800, // 2024-02-05T08:57:36.800Z
},
{
Type: 1,
MetricFamilyName: "prometheusrw_series2",
Help: "some help2",
Unit: "",
},
},
}

View File

@@ -0,0 +1,132 @@
package tests
import (
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
// TestClusterMetricsMetadata verifies end-to-end metrics metadata ingestion
// and querying in a cluster setup: two vmstorage nodes, two first-level
// vminsert nodes, one second-level (global) vminsert chained through
// clusternative, and a vmselect for querying /api/v1/metadata.
func TestClusterMetricsMetadata(t *testing.T) {
	fs.MustRemoveDir(t.Name())
	tc := apptest.NewTestCase(t)
	defer tc.Stop()
	vmstorage1 := tc.MustStartVmstorage("vmstorage-1", []string{
		"-storageDataPath=" + tc.Dir() + "/vmstorage-1",
		"-retentionPeriod=100y",
	})
	vmstorage2 := tc.MustStartVmstorage("vmstorage-2", []string{
		"-storageDataPath=" + tc.Dir() + "/vmstorage-2",
		"-retentionPeriod=100y",
	})
	// Metadata ingestion is opt-in; every vminsert must run with -enableMetadata.
	vminsert1 := tc.MustStartVminsert("vminsert1", []string{
		fmt.Sprintf("-storageNode=%s,%s", vmstorage1.VminsertAddr(), vmstorage2.VminsertAddr()),
		"-enableMetadata",
	})
	vminsert2 := tc.MustStartVminsert("vminsert-2", []string{
		fmt.Sprintf("-storageNode=%s,%s", vmstorage1.VminsertAddr(), vmstorage2.VminsertAddr()),
		"-enableMetadata",
	})
	// vminsert-global forwards to the other vminserts via their clusternative
	// listeners, exercising metadata forwarding through the extra hop.
	vminsertGlobal := tc.MustStartVminsert("vminsert-global", []string{
		fmt.Sprintf("-storageNode=%s,%s", vminsert1.ClusternativeListenAddr(), vminsert2.ClusternativeListenAddr()),
		"-enableMetadata",
	})
	vmselect := tc.MustStartVmselect("vmselect", []string{
		fmt.Sprintf("-storageNode=%s,%s", vmstorage1.VmselectAddr(), vmstorage2.VmselectAddr()),
	})
	// verify empty stats
	resp := vmselect.PrometheusAPIV1Metadata(t, "", 0, apptest.QueryOpts{Tenant: "0:0"})
	if len(resp.Data) != 0 {
		t.Fatalf("unexpected resp Records: %d, want: %d", len(resp.Data), 0)
	}
	const ingestTimestamp = 1707123456700
	// Text exposition format: metadata arrives via # HELP / # TYPE comments.
	prometheusTextDataSet := []string{
		`# HELP metric_name_1 some help message`,
		`# TYPE metric_name_1 gauge`,
		`metric_name_1{label="foo"} 10`,
		`metric_name_1{label="bar"} 10`,
		`metric_name_1{label="baz"} 10`,
		`# HELP metric_name_2 some help message`,
		`# TYPE metric_name_2 counter`,
		`metric_name_2{label="baz"} 20`,
		`# HELP metric_name_3 some help message`,
		`# TYPE metric_name_3 gauge`,
		`metric_name_3{label="baz"} 30`,
	}
	// Remote-write format: metadata travels in the dedicated Metadata field.
	prometheusRemoteWriteDataSet := prompb.WriteRequest{
		Timeseries: []prompb.TimeSeries{
			{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_4"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
			{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_5"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
			{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_6"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
		},
		Metadata: []prompb.MetricMetadata{
			{MetricFamilyName: "metric_name_4", Help: "some help message", Type: uint32(prompb.MetricMetadataSUMMARY)},
			{MetricFamilyName: "metric_name_5", Help: "some help message", Type: uint32(prompb.MetricMetadataSUMMARY)},
			{MetricFamilyName: "metric_name_6", Help: "some help message", Type: uint32(prompb.MetricMetadataSTATESET)},
		},
	}
	// assertMetadataIngestOn ingests both data sets through the given vminsert
	// under the given tenant and verifies that /api/v1/metadata returns the
	// merged metadata from both ingestion protocols.
	assertMetadataIngestOn := func(t *testing.T, vminsert *apptest.Vminsert, tenantID string) {
		t.Helper()
		vminsert.PrometheusAPIV1ImportPrometheus(t, prometheusTextDataSet, apptest.QueryOpts{Tenant: tenantID})
		vminsert.PrometheusAPIV1Write(t, prometheusRemoteWriteDataSet, apptest.QueryOpts{Tenant: tenantID})
		vmstorage1.ForceFlush(t)
		vmstorage2.ForceFlush(t)
		expected := &apptest.PrometheusAPIV1Metadata{
			Status: "success",
			Data: map[string][]apptest.MetadataEntry{
				"metric_name_1": {{Help: "some help message", Type: "gauge"}},
				"metric_name_2": {{Help: "some help message", Type: "counter"}},
				"metric_name_3": {{Help: "some help message", Type: "gauge"}},
				"metric_name_4": {{Help: "some help message", Type: "summary"}},
				"metric_name_5": {{Help: "some help message", Type: "summary"}},
				"metric_name_6": {{Help: "some help message", Type: "stateset"}},
			},
		}
		gotStats := vmselect.PrometheusAPIV1Metadata(t, "", 0, apptest.QueryOpts{Tenant: tenantID})
		if diff := cmp.Diff(expected, gotStats); diff != "" {
			t.Errorf("unexpected response (-want, +got):\n%s", diff)
		}
	}
	// Distinct tenants per vminsert keep the assertions independent.
	assertMetadataIngestOn(t, vminsert1, "2:2")
	assertMetadataIngestOn(t, vminsert2, "3:3")
	assertMetadataIngestOn(t, vminsertGlobal, "5:5")
	// check query metric name filter
	tc.Assert(&apptest.AssertOptions{
		Msg: "unexpected /api/v1/metadata response",
		Got: func() any {
			return vmselect.PrometheusAPIV1Metadata(t, "metric_name_4", 0, apptest.QueryOpts{Tenant: "multitenant"})
		},
		Want: &apptest.PrometheusAPIV1Metadata{
			Status: "success",
			Data: map[string][]apptest.MetadataEntry{
				"metric_name_4": {{Help: "some help message", Type: "summary"}},
			},
		},
	})
	// check query limit filter
	tc.Assert(&apptest.AssertOptions{
		Msg: "unexpected /api/v1/metadata response",
		Got: func() any {
			return vmselect.PrometheusAPIV1Metadata(t, "", 3, apptest.QueryOpts{Tenant: "5:5"})
		},
		Want: &apptest.PrometheusAPIV1Metadata{
			Status: "success",
			Data: map[string][]apptest.MetadataEntry{
				"metric_name_1": {{Help: "some help message", Type: "gauge"}},
				"metric_name_2": {{Help: "some help message", Type: "counter"}},
				"metric_name_3": {{Help: "some help message", Type: "gauge"}},
			},
		},
	})
}

View File

@@ -47,14 +47,16 @@ func TestClusterInstantQuery(t *testing.T) {
}
func testInstantQueryWithUTFNames(t *testing.T, sut apptest.PrometheusWriteQuerier) {
data := []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "3fooµ¥"},
{Name: "3👋tfにちは", Value: "漢©®€£"},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: millis("2024-01-01T00:01:00Z")},
data := prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "3fooµ¥"},
{Name: "3👋tfにちは", Value: "漢©®€£"},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: millis("2024-01-01T00:01:00Z")},
},
},
},
}
@@ -89,23 +91,25 @@ func testInstantQueryWithUTFNames(t *testing.T, sut apptest.PrometheusWriteQueri
fn(`{"3👋tfにちは"="漢©®€£"}`)
}
var staleNaNsData = func() []prompb.TimeSeries {
return []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric",
var staleNaNsData = func() prompb.WriteRequest {
return prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "metric",
},
},
},
Samples: []prompb.Sample{
{
Value: 1,
Timestamp: millis("2024-01-01T00:01:00Z"),
},
{
Value: decimal.StaleNaN,
Timestamp: millis("2024-01-01T00:02:00Z"),
Samples: []prompb.Sample{
{
Value: 1,
Timestamp: millis("2024-01-01T00:01:00Z"),
},
{
Value: decimal.StaleNaN,
Timestamp: millis("2024-01-01T00:02:00Z"),
},
},
},
},
@@ -185,21 +189,23 @@ func testInstantQueryDoesNotReturnStaleNaNs(t *testing.T, sut apptest.Prometheus
// However, conversion of math.NaN to int64 could behave differently depending on platform and Go version.
// Hence, this test could succeed for some platforms even if fix is rolled back.
func testQueryRangeWithAtModifier(t *testing.T, sut apptest.PrometheusWriteQuerier) {
data := []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "up"},
data := prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "up"},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: millis("2025-01-01T00:01:00Z")},
},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: millis("2025-01-01T00:01:00Z")},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "metricNaN"},
},
Samples: []prompb.Sample{
{Value: decimal.StaleNaN, Timestamp: millis("2025-01-01T00:01:00Z")},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "metricNaN"},
},
Samples: []prompb.Sample{
{Value: decimal.StaleNaN, Timestamp: millis("2025-01-01T00:01:00Z")},
},
},
},
}

View File

@@ -139,41 +139,43 @@ func TestSingleIngestionWithRelabeling(t *testing.T) {
},
})
pbData := []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "prometheusrw_series",
pbData := prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "prometheusrw_series",
},
{
Name: "label",
Value: "foo2",
},
},
{
Name: "label",
Value: "foo2",
},
},
Samples: []prompb.Sample{
{
Value: 10,
Timestamp: 1707123456700, // 2024-02-05T08:57:36.700Z
Samples: []prompb.Sample{
{
Value: 10,
Timestamp: 1707123456700, // 2024-02-05T08:57:36.700Z
},
},
},
},
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "must_drop_series",
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "must_drop_series",
},
{
Name: "label",
Value: "foo2",
},
},
{
Name: "label",
Value: "foo2",
},
},
Samples: []prompb.Sample{
{
Value: 20,
Timestamp: 1707123456800, // 2024-02-05T08:57:36.800Z
Samples: []prompb.Sample{
{
Value: 20,
Timestamp: 1707123456800, // 2024-02-05T08:57:36.800Z
},
},
},
},

View File

@@ -973,7 +973,7 @@ func testGroupSkipSlowReplicas(tc *apptest.TestCase, opts *testGroupReplicationO
// The data is replicated across N groups of M nodes. Replication factor is
// globalRF. There is no replication across the nodes within each group or
//it is unknown it there is one.
// it is unknown if there is one.
//
// Max number of nodes to skip is M*(globalRF-1). This corresponds to the
// case when N-globalRF+1 groups have received the response from all of

View File

@@ -11,6 +11,7 @@ import (
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prommetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
@@ -58,10 +59,11 @@ func StartVminsert(instance string, flags []string, cli *Client, output io.Write
app, stderrExtracts, err := startApp(instance, "../../bin/vminsert", flags, &appOptions{
defaultFlags: map[string]string{
"-httpListenAddr": "127.0.0.1:0",
"-clusternativeListenAddr": "127.0.0.1:0",
"-graphiteListenAddr": ":0",
"-opentsdbListenAddr": "127.0.0.1:0",
"-httpListenAddr": "127.0.0.1:0",
"-clusternativeListenAddr": "127.0.0.1:0",
"-graphiteListenAddr": ":0",
"-opentsdbListenAddr": "127.0.0.1:0",
"-clusternative.vminsertConnsShutdownDuration": "1ms",
},
extractREs: extractREs,
output: output,
@@ -200,13 +202,16 @@ func (app *Vminsert) OpenTSDBAPIPut(t *testing.T, records []string, opts QueryOp
// PrometheusAPIV1Write is a test helper function that inserts a
// collection of records in Prometheus remote-write format by sending a HTTP
// POST request to /prometheus/api/v1/write vminsert endpoint.
func (app *Vminsert) PrometheusAPIV1Write(t *testing.T, records []prompb.TimeSeries, opts QueryOpts) {
func (app *Vminsert) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteRequest, opts QueryOpts) {
t.Helper()
url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/write", app.httpListenAddr, opts.getTenant())
wr := prompb.WriteRequest{Timeseries: records}
data := snappy.Encode(nil, wr.MarshalProtobuf(nil))
app.sendBlocking(t, len(records), func() {
recordsCount := len(wr.Timeseries)
if prommetadata.IsEnabled() {
recordsCount += len(wr.Metadata)
}
app.sendBlocking(t, recordsCount, func() {
_, statusCode := app.cli.Post(t, url, "application/x-protobuf", data)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
@@ -230,7 +235,19 @@ func (app *Vminsert) PrometheusAPIV1ImportPrometheus(t *testing.T, records []str
url += "?" + uvs
}
data := []byte(strings.Join(records, "\n"))
app.sendBlocking(t, len(records), func() {
var recordsCount int
var metadataRecords int
for _, record := range records {
if strings.HasPrefix(record, "#") {
metadataRecords++
continue
}
recordsCount++
}
if prommetadata.IsEnabled() {
recordsCount += metadataRecords
}
app.sendBlocking(t, recordsCount, func() {
_, statusCode := app.cli.Post(t, url, "text/plain", data)
if statusCode != http.StatusNoContent {
t.Fatalf("unexpected status code: got %d, want %d", statusCode, http.StatusNoContent)
@@ -267,7 +284,8 @@ func (app *Vminsert) sendBlocking(t *testing.T, numRecordsToSend int, send func(
)
wantRowsSentCount := app.rpcRowsSentTotal(t) + numRecordsToSend
for range retries {
if app.rpcRowsSentTotal(t) >= wantRowsSentCount {
d := app.rpcRowsSentTotal(t)
if d >= wantRowsSentCount {
return
}
time.Sleep(period)

View File

@@ -6,6 +6,7 @@ import (
"io"
"net/http"
"regexp"
"strconv"
"testing"
)
@@ -186,6 +187,20 @@ func (app *Vmselect) PrometheusAPIV1LabelValues(t *testing.T, labelName, matchQu
return NewPrometheusAPIV1LabelValuesResponse(t, res)
}
// PrometheusAPIV1Metadata sends a query to a /prometheus/api/v1/metadata endpoint
// and returns the results.
//
// metric optionally restricts the response to a single metric family name;
// both metric and limit are forwarded to the endpoint as-is (an empty metric
// and a zero limit are still sent as form values).
func (app *Vmselect) PrometheusAPIV1Metadata(t *testing.T, metric string, limit int, opts QueryOpts) *PrometheusAPIV1Metadata {
	t.Helper()

	values := opts.asURLValues()
	values.Add("metric", metric)
	values.Add("limit", strconv.Itoa(limit))
	// The tenant is part of the URL path for vmselect, not a form value.
	queryURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/metadata", app.httpListenAddr, opts.getTenant())
	res, _ := app.cli.PostForm(t, queryURL, values)
	return NewPrometheusAPIV1Metadata(t, res)
}
// APIV1AdminTSDBDeleteSeries deletes the series that match the query by sending
// a request to /api/v1/admin/tsdb/delete_series.
//

View File

@@ -7,6 +7,7 @@ import (
"net/http"
"os"
"regexp"
"strconv"
"strings"
"testing"
"time"
@@ -211,10 +212,9 @@ func (app *Vmsingle) OpenTSDBAPIPut(t *testing.T, records []string, opts QueryOp
// PrometheusAPIV1Write is a test helper function that inserts a
// collection of records in Prometheus remote-write format by sending a HTTP
// POST request to /prometheus/api/v1/write vmsingle endpoint.
func (app *Vmsingle) PrometheusAPIV1Write(t *testing.T, records []prompb.TimeSeries, _ QueryOpts) {
func (app *Vmsingle) PrometheusAPIV1Write(t *testing.T, wr prompb.WriteRequest, _ QueryOpts) {
t.Helper()
wr := prompb.WriteRequest{Timeseries: records}
data := snappy.Encode(nil, wr.MarshalProtobuf(nil))
_, statusCode := app.cli.Post(t, app.prometheusAPIV1WriteURL, "application/x-protobuf", data)
if statusCode != http.StatusNoContent {
@@ -364,6 +364,20 @@ func (app *Vmsingle) PrometheusAPIV1LabelValues(t *testing.T, labelName, matchQu
return NewPrometheusAPIV1LabelValuesResponse(t, res)
}
// PrometheusAPIV1Metadata sends a query to a /prometheus/api/v1/metadata endpoint
// and returns the results.
//
// metric optionally restricts the response to a single metric family name;
// both metric and limit are forwarded to the endpoint as-is (an empty metric
// and a zero limit are still sent as form values).
func (app *Vmsingle) PrometheusAPIV1Metadata(t *testing.T, metric string, limit int, opts QueryOpts) *PrometheusAPIV1Metadata {
	t.Helper()

	values := opts.asURLValues()
	values.Add("metric", metric)
	values.Add("limit", strconv.Itoa(limit))
	queryURL := fmt.Sprintf("http://%s/prometheus/api/v1/metadata", app.httpListenAddr)
	res, _ := app.cli.PostForm(t, queryURL, values)
	return NewPrometheusAPIV1Metadata(t, res)
}
// APIV1AdminTSDBDeleteSeries deletes the series that match the query by sending
// a request to /api/v1/admin/tsdb/delete_series.
//

View File

@@ -30,6 +30,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add ability to set `attach_metadata.namespace=true` option for all the [`kubernetes_sd_configs`](https://docs.victoriametrics.com/victoriametrics/sd_configs/#kubernetes_sd_configs) defined at [`-promscrape.config`](https://docs.victoriametrics.com/victoriametrics/vmagent/#quick-start), or via `-promscrape.kubernetes.attachNamespaceMetadataAll` command-line flag. This allows attaching namespace labels and annotations to discovered targets for `pod`, `service`, `endpoints`, `endpointslice`, and `ingress` roles. See [#9880](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9880) for more details. Thank you, @clementnuss, for the contribution.
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): print the error message as value if [templating](https://docs.victoriametrics.com/victoriametrics/vmalert/#templating) fails in alerting rule label or annotation values, and continue generating alerts. Previously, a templating error prevented alerts. See [#9853](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9853).
* FEATURE: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): add `-enableMetadata` command-line flag to enable metrics metadata ingestion. See [#2974](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2974) and the following [doc](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#metrics-metadata).
* FEATURE: `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): add in-memory storage for storing and querying metrics metadata. See [#2974](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2974) and the following [doc](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#metrics-metadata).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add option to always show all points on the chart.
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): improve overall chart rendering performance. See [#9699](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9699).
* FEATURE: [vmui relabeling playground](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#relabeling): relax the validation for the labels text area. It now accepts labels without curly braces (e.g. `__name__=metric_name, label1=value1`). The regression was introduced in [#8770](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8770). See [#9900](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9900) for details.

View File

@@ -48,9 +48,22 @@ func (cm *ConnsMap) Delete(c net.Conn) {
// If shutdownDuration <= 0, then all the connections are closed simultaneously.
func (cm *ConnsMap) CloseAll(shutdownDuration time.Duration) {
cm.mu.Lock()
conns := make([]net.Conn, 0, len(cm.m))
conns := make([]remoteConns, 0, len(cm.m))
connsByIP := make(map[string]int, len(cm.m))
// group remote connection by IP address
// it's needed to properly close multiple opened connections
// from the same instance at once
for c := range cm.m {
conns = append(conns, c)
remoteIP, _, _ := net.SplitHostPort(c.RemoteAddr().String())
idx, ok := connsByIP[remoteIP]
if !ok {
connsByIP[remoteIP] = len(conns)
conns = append(conns, remoteConns{remoteIP: remoteIP, clientName: cm.clientName})
idx = len(conns) - 1
}
rcs := &conns[idx]
rcs.conns = append(rcs.conns, c)
delete(cm.m, c)
}
cm.isClosed = true
@@ -59,7 +72,7 @@ func (cm *ConnsMap) CloseAll(shutdownDuration time.Duration) {
if shutdownDuration <= 0 {
// Close all the connections at once.
for _, c := range conns {
_ = c.Close()
c.closeAll()
}
return
}
@@ -68,27 +81,36 @@ func (cm *ConnsMap) CloseAll(shutdownDuration time.Duration) {
}
if len(conns) == 1 {
// Simple case - just close a single connection and that's it!
_ = conns[0].Close()
conns[0].closeAll()
return
}
// Sort conns in order to make the order of closing connections deterministic across clients.
// This should reduce resource usage spikes at clients during rolling restarts.
sort.Slice(conns, func(i, j int) bool {
return conns[i].RemoteAddr().String() < conns[j].RemoteAddr().String()
return conns[i].remoteIP < conns[j].remoteIP
})
shutdownInterval := shutdownDuration / time.Duration(len(conns)-1)
startTime := time.Now()
logger.Infof("closing %d %s connections with %dms interval between them", len(conns), cm.clientName, shutdownInterval.Milliseconds())
remoteAddr := conns[0].RemoteAddr().String()
_ = conns[0].Close()
logger.Infof("closed %s connection %s", cm.clientName, remoteAddr)
for _, c := range conns[1:] {
for _, c := range conns {
c.closeAll()
time.Sleep(shutdownInterval)
remoteAddr := c.RemoteAddr().String()
_ = c.Close()
logger.Infof("closed %s connection %s", cm.clientName, remoteAddr)
}
logger.Infof("closed %d %s connections in %s", len(conns), cm.clientName, time.Since(startTime))
}
// remoteConns groups the open connections that share a single remote IP
// address, so that all connections from the same instance can be closed
// together during graceful shutdown.
type remoteConns struct {
	// clientName is used only for log messages.
	clientName string
	// remoteIP is the shared remote IP of every connection in conns.
	remoteIP string
	conns    []net.Conn
}

// closeAll closes every grouped connection and logs each close.
// Close errors are deliberately ignored: during shutdown the peer may
// already have dropped the connection.
func (rcs *remoteConns) closeAll() {
	for _, c := range rcs.conns {
		remoteAddr := c.RemoteAddr().String()
		_ = c.Close()
		logger.Infof("closed %s connection %s", rcs.clientName, remoteAddr)
	}
}

View File

@@ -378,6 +378,41 @@ func (mm *MetricMetadata) unmarshalProtobuf(src []byte) (err error) {
return nil
}
// MetricMetadataTypeToString maps given uint32 value to the human read-able string
//
// The numeric values follow the Prometheus remote-write MetricType enum:
// https://github.com/prometheus/prometheus/blob/c5282933765ec322a0664d0a0268f8276e83b156/prompb/types.proto#L22
func MetricMetadataTypeToString(mt uint32) string {
	// Slice index == enum value; see the proto definition linked above.
	names := [...]string{
		"unknown",         // UNKNOWN = 0
		"counter",         // COUNTER = 1
		"gauge",           // GAUGE = 2
		"histogram",       // HISTOGRAM = 3
		"gauge histogram", // GAUGEHISTOGRAM = 4
		"summary",         // SUMMARY = 5
		"info",            // INFO = 6
		"stateset",        // STATESET = 7
	}
	if mt < uint32(len(names)) {
		return names[mt]
	}
	// Report out-of-range values verbatim to aid debugging.
	return fmt.Sprintf("unknown(%d)", mt)
}
// IsEmpty checks if the WriteRequest has data to push.
func (m *WriteRequest) IsEmpty() bool {
return m == nil || (len(m.Timeseries) == 0 && len(m.Metadata) == 0)

View File

@@ -0,0 +1,130 @@
package metricsmetadata
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)
// Row represents time series metadata record
type Row struct {
	// lastWriteTime is the unix timestamp (in seconds) of the most recent
	// write for this metric family; used for expiration and eviction order.
	lastWriteTime uint64
	// heapIdx is this row's position inside the bucket's lastWriteHeap;
	// it is maintained by the heap's Swap/Push implementations.
	heapIdx int
	// MetricFamilyName is the metric family name the metadata belongs to.
	MetricFamilyName []byte
	// Help is the HELP text for the metric family.
	Help []byte
	// Unit is the unit string for the metric family; may be empty.
	Unit []byte
	// AccountID and ProjectID identify the tenant owning this record.
	AccountID uint32
	ProjectID uint32
	// Type is the metric type as a numeric enum value
	// (the prompb remote-write MetricType values).
	Type uint32
}
// MarshalTo serializes Row into provided buffer and returns result
//
// Wire layout: accountID(4) projectID(4) type(4), followed by three
// length-prefixed byte strings (uint16 length + payload):
// metricFamilyName, help, unit. lastWriteTime and heapIdx are not serialized.
//
// NOTE(review): marshalBytesFast truncates lengths to uint16, so fields are
// assumed to be shorter than 64KiB — confirm callers enforce this.
func (mr *Row) MarshalTo(dst []byte) []byte {
	dstLen := len(dst)
	// tenant information (accountID and projectID)
	dstSize := dstLen + 8
	// 2 bytes per string + 4 bytes for type
	dstSize += 10
	dstSize += len(mr.MetricFamilyName) + len(mr.Help) + len(mr.Unit)
	// Reserve the full size in one allocation, then re-slice back to the
	// original length so the Marshal* appends below fill the reserved space.
	dst = bytesutil.ResizeWithCopyMayOverallocate(dst, dstSize)[:dstLen]
	dst = encoding.MarshalUint32(dst, mr.AccountID)
	dst = encoding.MarshalUint32(dst, mr.ProjectID)
	dst = encoding.MarshalUint32(dst, mr.Type)
	dst = marshalBytesFast(dst, mr.MetricFamilyName)
	dst = marshalBytesFast(dst, mr.Help)
	dst = marshalBytesFast(dst, mr.Unit)
	return dst
}
// Unmarshal parses Row from provided buffer and returns tail buffer
//
// The format must match Row.MarshalTo:
// accountID(4) projectID(4) type(4) + three (uint16 length + payload) strings.
// The returned byte slices alias data, so data must not be modified while
// the parsed Row is in use.
func (mr *Row) Unmarshal(data []byte) ([]byte, error) {
	// accountID + projectID + type + metricFamilyName + help + unit
	// 4 + 4 + 4 + 2 + len(metricFamilyName) + 2 + len(help) + 2 + len(unit)
	if len(data) < 18 {
		return data, fmt.Errorf("data too short for unmarshaling metadata; got %d bytes; want at least 18 bytes", len(data))
	}
	mr.AccountID = encoding.UnmarshalUint32(data)
	mr.ProjectID = encoding.UnmarshalUint32(data[4:])
	data = data[8:]
	mr.Type = encoding.UnmarshalUint32(data)
	data = data[4:]
	nextString := func() ([]byte, error) {
		// Re-verify the 2-byte size prefix before every read: a non-empty
		// earlier string may have consumed the bytes covered by the initial
		// 18-byte check, and reading the prefix from a shorter slice would
		// panic instead of returning an error on truncated input.
		if len(data) < 2 {
			return nil, fmt.Errorf("data too short for string size prefix; got %d bytes; want at least 2 bytes", len(data))
		}
		size := encoding.UnmarshalUint16(data)
		data = data[2:]
		if len(data) < int(size) {
			return nil, fmt.Errorf("string data too short; got %d bytes; want %d bytes", len(data), size)
		}
		val := data[:size]
		data = data[size:]
		return val, nil
	}
	var err error
	mr.MetricFamilyName, err = nextString()
	if err != nil {
		return data, fmt.Errorf("cannot unmarshal metric family name: %w", err)
	}
	mr.Help, err = nextString()
	if err != nil {
		return data, fmt.Errorf("cannot unmarshal help: %w", err)
	}
	mr.Unit, err = nextString()
	if err != nil {
		return data, fmt.Errorf("cannot unmarshal unit: %w", err)
	}
	return data, nil
}
// Reset resets Row
//
// Byte-slice fields are truncated to zero length but keep their backing
// arrays, so a reused Row avoids re-allocating them.
func (mr *Row) Reset() {
	mr.MetricFamilyName = mr.MetricFamilyName[:0]
	mr.Help = mr.Help[:0]
	mr.Unit = mr.Unit[:0]
	mr.AccountID = 0
	mr.ProjectID = 0
	mr.Type = 0
	mr.lastWriteTime = 0
	mr.heapIdx = 0
}
// String implements Stringer interface
//
// Intended for debugging and log output; %q quoting keeps empty fields and
// unusual bytes visible.
func (mr *Row) String() string {
	return fmt.Sprintf("AccountID: %d, ProjectID: %d, Type: %d, MetricFamilyName: %q, Help: %q, Unit: %q",
		mr.AccountID, mr.ProjectID, mr.Type, mr.MetricFamilyName, mr.Help, mr.Unit)
}
// UnmarshalRows parses Rows from provided buffer according to the maxRows
//
// returns parsed Rows and tails buffer if maxRows value was reached
func UnmarshalRows(dst []Row, src []byte, maxRows int) ([]Row, []byte, error) {
	for maxRows > 0 && len(src) > 0 {
		// Grow dst by one element, reusing spare capacity when available
		// so repeated calls do not re-allocate previously parsed rows.
		if len(dst) == cap(dst) {
			dst = append(dst, Row{})
		} else {
			dst = dst[:len(dst)+1]
		}
		row := &dst[len(dst)-1]
		rest, err := row.Unmarshal(src)
		if err != nil {
			return dst, rest, err
		}
		src = rest
		maxRows--
	}
	return dst, src, nil
}
// marshalBytesFast appends s to dst as a uint16 length prefix followed by
// the raw bytes, and returns the extended buffer.
//
// NOTE(review): len(s) is silently truncated to uint16; an input of 64KiB or
// more would produce a corrupted record. Confirm that callers keep metric
// names/help/unit below this limit.
func marshalBytesFast(dst []byte, s []byte) []byte {
	dst = encoding.MarshalUint16(dst, uint16(len(s)))
	dst = append(dst, s...)
	return dst
}

View File

@@ -0,0 +1,422 @@
package metricsmetadata
import (
"bytes"
"container/heap"
"sort"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
)
const (
	// bucketsCount is the number of buckets for the storage.
	bucketsCount = 8

	// size of buffer to be used for cloning metric name and help
	metricNameHelpBufSize = 4 * 1024

	// size of rows buffer to be used for cloning Row
	rowsBufSize = 512

	// metadataExpireDuration is how long a metadata row is kept after its
	// last write before the background cleaner evicts it.
	metadataExpireDuration = time.Hour
)
var bbPool bytesutil.ByteBufferPool
// Storage for metrics metadata
type Storage struct {
	// buckets shard rows by xxhash(MetricFamilyName) to reduce lock contention.
	buckets [bucketsCount]*bucket
	// maxSizeBytes is the total memory budget, split evenly across buckets.
	maxSizeBytes int
	// cleanerStopCh signals the background cleaner goroutine to exit.
	cleanerStopCh chan struct{}
	// wg waits for the cleaner goroutine in MustClose.
	wg sync.WaitGroup
}
// NewStorage returns new initialized Storage.
//
// maxSizeBytes is the total in-memory budget; it is divided evenly across
// the internal buckets. A background cleaner goroutine is started and must
// be stopped via MustClose.
func NewStorage(maxSizeBytes int) *Storage {
	s := &Storage{
		maxSizeBytes:  maxSizeBytes,
		cleanerStopCh: make(chan struct{}),
	}
	perBucketBytes := int64(maxSizeBytes / bucketsCount)
	for i := range s.buckets {
		s.buckets[i] = &bucket{
			perTenantStorage: make(map[uint64]map[string]*Row),
			maxSizeBytes:     perBucketBytes,
		}
	}
	s.wg.Add(1)
	go s.cleaner()
	return s
}
// MustClose closes the storage and waits for all background tasks to finish.
//
// It must be called at most once: a second call would close an already
// closed channel and panic.
func (s *Storage) MustClose() {
	close(s.cleanerStopCh)
	s.wg.Wait()
}
// Add adds rows to the Storage.
//
// Each row is routed to a bucket by the xxhash of its metric family name,
// so later lookups by metric name touch a single bucket.
func (s *Storage) Add(rows []Row) {
	if len(rows) == 0 {
		return
	}
	now := fasttime.UnixTimestamp()
	// Index the slice instead of ranging by value: Row is a large struct and
	// bucket.add only needs a pointer that stays valid for the call duration.
	// The previous implementation also checked out a bbPool buffer here
	// without ever using it; that dead pool churn has been removed.
	for i := range rows {
		mr := &rows[i]
		bucketIdx := xxhash.Sum64(mr.MetricFamilyName) % bucketsCount
		s.buckets[bucketIdx].add(mr, now)
	}
}
// GetForTenant returns rows for the given tenant, limit and optional metricName
//
// can only be used for cluster version
//
// The returned *Row values point into the live storage. Help/Unit/Type of a
// stored row are never mutated in place (updates allocate a replacement row),
// but lastWriteTime is updated under the bucket lock on each write.
// NOTE(review): sortRows below reads lastWriteTime after the lock is
// released — confirm this race is acceptable.
func (s *Storage) GetForTenant(accountID, projectID uint32, limit int, metricName string) []*Row {
	tenantID := encodeTenantID(accountID, projectID)
	if len(metricName) > 0 {
		// Fast path: a single metric name maps to exactly one bucket.
		return s.getRowForTenantIDByMetricName(tenantID, metricName)
	}
	// totalItems counts rows across all tenants, so this may over-allocate;
	// it is only a capacity hint.
	totalItems := s.totalItems()
	dst := make([]*Row, 0, totalItems)
	for _, b := range s.buckets {
		b.mu.Lock()
		ts, ok := b.perTenantStorage[tenantID]
		if !ok {
			b.mu.Unlock()
			continue
		}
		for _, v := range ts {
			dst = append(dst, v)
		}
		b.mu.Unlock()
	}
	// Sort by last write time (ties broken by metric name) for deterministic
	// output before applying the limit.
	sortRows(dst)
	if limit > 0 && len(dst) > limit {
		dst = dst[:limit]
	}
	return dst
}
// getRowForTenantIDByMetricName returns the single metadata row stored for
// the given tenant and metric name, or nil if there is none.
func (s *Storage) getRowForTenantIDByMetricName(tenantID uint64, metricName string) []*Row {
	// The bucket is fully determined by the metric name hash.
	b := s.buckets[xxhash.Sum64([]byte(metricName))%bucketsCount]
	var row *Row
	b.mu.Lock()
	if ts := b.perTenantStorage[tenantID]; ts != nil {
		row = ts[metricName]
	}
	b.mu.Unlock()
	if row == nil {
		return nil
	}
	return []*Row{row}
}
// Get returns rows for the given limit and optional metricName
func (s *Storage) Get(limit int, metricName string) []*Row {
	if metricName != "" {
		// A metric name maps to a single bucket; use the direct lookup path.
		return s.getRowsByMetricName(metricName)
	}
	// totalItems is only a capacity hint; buckets may change concurrently.
	dst := make([]*Row, 0, s.totalItems())
	for _, b := range s.buckets {
		b.mu.Lock()
		// The heap slice references every row in the bucket, regardless of tenant.
		dst = append(dst, b.lwh...)
		b.mu.Unlock()
	}
	sortRows(dst)
	if limit > 0 && limit < len(dst) {
		dst = dst[:limit]
	}
	return dst
}
// getRowsByMetricName returns the rows stored under metricName across all
// tenants, sorted by sortRows.
func (s *Storage) getRowsByMetricName(metricName string) []*Row {
	b := s.buckets[xxhash.Sum64([]byte(metricName))%bucketsCount]
	var rows []*Row
	b.mu.Lock()
	// Each tenant holds at most one row per metric name.
	for _, ts := range b.perTenantStorage {
		if row, ok := ts[metricName]; ok && row != nil {
			rows = append(rows, row)
		}
	}
	b.mu.Unlock()
	sortRows(rows)
	return rows
}
// MetadataStorageMetrics contains metrics for the storage.
type MetadataStorageMetrics struct {
	// ItemsCurrent is the number of metadata rows currently stored.
	ItemsCurrent uint64
	// CurrentSizeBytes is the estimated memory occupied by stored rows.
	CurrentSizeBytes uint64
	// MaxSizeBytes is the configured memory budget of the storage.
	MaxSizeBytes uint64
}
// UpdateMetrics updates dst with metrics storage metrics.
//
// Per-bucket counters are accumulated into dst; MaxSizeBytes is assigned.
func (s *Storage) UpdateMetrics(dst *MetadataStorageMetrics) {
	for _, b := range s.buckets {
		dst.ItemsCurrent += uint64(b.itemsCurrent.Load())
		dst.CurrentSizeBytes += uint64(b.itemsTotalSize.Load())
	}
	dst.MaxSizeBytes = uint64(s.maxSizeBytes)
}
// cleaner periodically drops metadata rows that have not been written for
// metadataExpireDuration. It runs until MustClose closes cleanerStopCh.
func (s *Storage) cleaner() {
	defer s.wg.Done()
	// Jitter the tick period so multiple instances do not clean in lockstep.
	d := timeutil.AddJitterToDuration(time.Minute)
	ticker := time.NewTicker(d)
	defer ticker.Stop()
	for {
		select {
		case <-s.cleanerStopCh:
			return
		case <-ticker.C:
			s.cleanByTimeout()
		}
	}
}
// cleanByTimeout drops expired rows from every bucket.
func (s *Storage) cleanByTimeout() {
	for _, b := range s.buckets {
		b.cleanByTimeout()
	}
}
// totalItems returns the current number of stored rows across all buckets.
// It reads atomic counters and does not take any bucket lock.
func (s *Storage) totalItems() int {
	n := 0
	for _, b := range s.buckets {
		n += int(b.itemsCurrent.Load())
	}
	return n
}
type bucket struct {
	// maxSizeBytes is this bucket's share of the storage memory budget.
	maxSizeBytes int64
	// itemsCurrent and itemsTotalSize are atomics so Storage can read them
	// for metrics and capacity hints without taking mu.
	itemsCurrent   atomic.Int64
	itemsTotalSize atomic.Int64

	// mu protects fields below
	mu sync.Mutex
	// perTenantStorage maps encodeTenantID(accountID, projectID) ->
	// metric family name -> row.
	perTenantStorage map[uint64]map[string]*Row
	// The heap for removing the oldest used entries from metricsMetadataStorage.
	lwh lastWriteHeap
	// metricNamesBuf is an append-only arena for cloned metric names and
	// help strings; see cloneMetricNameHelpLocked.
	metricNamesBuf []byte
	// rowsBuff is a chunked arena of Row values; see cloneRowLocked.
	rowsBuff []Row
}
// cloneRowLocked returns a copy of src that is owned by the bucket.
//
// Rows are carved out of rowsBuff in chunks of rowsBufSize so that many rows
// share a single allocation. b.mu must be held by the caller.
func (b *bucket) cloneRowLocked(src *Row) *Row {
	if len(b.rowsBuff) >= cap(b.rowsBuff) {
		// allocate a new slice instead of reallocating existing
		// it saves memory and reduces GC pressure
		b.rowsBuff = make([]Row, 0, rowsBufSize)
	}
	b.rowsBuff = b.rowsBuff[:len(b.rowsBuff)+1]
	mrDst := &b.rowsBuff[len(b.rowsBuff)-1]
	// allocate metricName and help in one go
	mrDst.MetricFamilyName, mrDst.Help = b.cloneMetricNameHelpLocked(src.MetricFamilyName, src.Help)
	mrDst.ProjectID = src.ProjectID
	mrDst.AccountID = src.AccountID
	// Units come from a small fixed vocabulary, so they are interned globally
	// instead of being copied per row.
	mrDst.Unit = internUnit(src.Unit)
	mrDst.Type = src.Type
	return mrDst
}
// cloneMetricNameHelpLocked uses the same idea as strings.Clone.
// But instead of direct []byte allocation for each cloned string,
// it allocates metricNamesBuf, copies provided metricName and help into it
// and uses string *byte references for it via subslice.
//
// allocating metricName and help as a single buffer allows GC to free memory for
// row in the same time
//
// b.mu must be held by the caller.
// NOTE(review): the returned name subslice has spare capacity overlapping the
// help bytes; callers must never append to either returned slice.
func (b *bucket) cloneMetricNameHelpLocked(metricName, help []byte) ([]byte, []byte) {
	if len(metricName) > metricNameHelpBufSize {
		// metricName is too large for default buffer
		// directly allocate it on heap as strings.Clone does
		// (local b deliberately shadows the receiver here).
		b := make([]byte, len(metricName)+len(help))
		copy(b, metricName)
		copy(b[len(metricName):], help)
		return b[:len(metricName)], b[len(metricName):]
	}
	idx := len(b.metricNamesBuf)
	n := len(metricName) + len(b.metricNamesBuf) + len(help)
	if n > cap(b.metricNamesBuf) {
		// allocate a new slice instead of reallocating existing
		// it saves memory and reduces GC pressure
		b.metricNamesBuf = make([]byte, 0, metricNameHelpBufSize)
		idx = 0
	}
	// Both appends happen before the subslices are taken, so a growth
	// (possible when help alone exceeds the fresh buffer) cannot invalidate
	// the returned slices.
	b.metricNamesBuf = append(b.metricNamesBuf, metricName...)
	b.metricNamesBuf = append(b.metricNamesBuf, help...)
	return b.metricNamesBuf[idx : idx+len(metricName)], b.metricNamesBuf[idx+len(metricName):]
}
// add inserts mr into the bucket or refreshes/replaces the existing row for
// the same tenant and metric family, stamping it with lastIngestion.
func (b *bucket) add(mr *Row, lastIngestion uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	tenantID := encodeTenantID(mr.AccountID, mr.ProjectID)
	storage, ok := b.perTenantStorage[tenantID]
	if !ok {
		storage = make(map[string]*Row, rowsBufSize)
		b.perTenantStorage[tenantID] = storage
	}
	if existMR, ok := storage[string(mr.MetricFamilyName)]; ok {
		if !bytes.Equal(existMR.Help, mr.Help) || !bytes.Equal(existMR.Unit, mr.Unit) || existMR.Type != mr.Type {
			// in case of metadata update, allocate the new row instead of mutation
			// since it could be referenced by get request
			// and it could lead to data race
			mrDst := b.cloneRowLocked(mr)
			// The replacement takes over the old row's heap slot and map key.
			mrDst.heapIdx = existMR.heapIdx
			storage[bytesutil.ToUnsafeString(mrDst.MetricFamilyName)] = mrDst
			b.lwh[mrDst.heapIdx] = mrDst
			b.itemsTotalSize.Add(rowSize(mrDst) - rowSize(existMR))
			existMR = mrDst
		}
		// Refresh the write time and restore the heap invariant for the
		// (possibly replaced) row.
		existMR.lastWriteTime = lastIngestion
		heap.Fix(&b.lwh, existMR.heapIdx)
		return
	}
	mrDst := b.cloneRowLocked(mr)
	mrDst.heapIdx = len(b.lwh)
	mrDst.lastWriteTime = lastIngestion
	heap.Push(&b.lwh, mrDst)
	b.itemsCurrent.Add(1)
	b.itemsTotalSize.Add(rowSize(mrDst))
	// The map key aliases the row's own MetricFamilyName bytes to avoid an
	// extra string allocation.
	storage[bytesutil.ToUnsafeString(mrDst.MetricFamilyName)] = mrDst
	if b.itemsTotalSize.Load() > b.maxSizeBytes {
		// NOTE(review): only a single row is evicted per insert, so the bucket
		// may stay above maxSizeBytes after adding a large row — confirm this
		// is intended.
		b.removeLeastRecentlyWrittenItemLocked()
	}
}
// cleanByTimeout removes rows whose last write is older than
// metadataExpireDuration.
func (b *bucket) cleanByTimeout() {
	// Delete items written more than metadataExpireDuration ago.
	deadline := fasttime.UnixTimestamp() - uint64(metadataExpireDuration/time.Second)
	b.mu.Lock()
	defer b.mu.Unlock()
	// The heap root is always the least recently written row, so eviction
	// can stop at the first row newer than the deadline.
	for len(b.lwh) > 0 && b.lwh[0].lastWriteTime <= deadline {
		b.removeLeastRecentlyWrittenItemLocked()
	}
}
// removeLeastRecentlyWrittenItemLocked evicts the heap root — the row with
// the oldest lastWriteTime — from both the heap and the tenant map,
// adjusting the size counters. b.mu must be held and the heap non-empty.
func (b *bucket) removeLeastRecentlyWrittenItemLocked() {
	oldest := b.lwh[0]
	b.itemsTotalSize.Add(-rowSize(oldest))
	b.itemsCurrent.Add(-1)
	delete(b.perTenantStorage[encodeTenantID(oldest.AccountID, oldest.ProjectID)], string(oldest.MetricFamilyName))
	heap.Pop(&b.lwh)
}
const (
	// perItemOverhead approximates the fixed per-row memory cost:
	// the Row struct itself plus an estimated map bookkeeping cost.
	perItemOverhead = int64(int(unsafe.Sizeof(Row{})) + 24) // 24 bytes for map overhead
)

// rowSize estimates the memory occupied by r; used to enforce the
// per-bucket size budget.
func rowSize(r *Row) int64 {
	return perItemOverhead + int64(len(r.MetricFamilyName)+len(r.Help)+len(r.Unit))
}
// sortRows orders rows by lastWriteTime ascending, breaking ties by metric
// family name so the result is deterministic.
func sortRows(rows []*Row) {
	sort.Slice(rows, func(i, j int) bool {
		a, b := rows[i], rows[j]
		if a.lastWriteTime != b.lastWriteTime {
			return a.lastWriteTime < b.lastWriteTime
		}
		return string(a.MetricFamilyName) < string(b.MetricFamilyName)
	})
}
// lastWriteHeap implements heap.Interface
// as a min-heap over Row.lastWriteTime: the least recently written row sits
// at index 0. Each Row tracks its position via heapIdx so it can be fixed
// in place after updates.
type lastWriteHeap []*Row
// Len returns the number of rows tracked by the heap.
func (lwh *lastWriteHeap) Len() int {
	h := *lwh
	return len(h)
}
// Swap exchanges the rows at i and j and keeps their heapIdx fields in sync
// with their new positions.
func (lwh *lastWriteHeap) Swap(i, j int) {
	h := *lwh
	h[i], h[j] = h[j], h[i]
	h[i].heapIdx = i
	h[j].heapIdx = j
}
// Less reports whether the row at i was written earlier than the row at j.
func (lwh *lastWriteHeap) Less(i, j int) bool {
	h := *lwh
	a, b := h[i], h[j]
	return a.lastWriteTime < b.lastWriteTime
}
// Push appends a new row to the heap tail and records its heap position.
func (lwh *lastWriteHeap) Push(x any) {
	r := x.(*Row)
	r.heapIdx = len(*lwh)
	*lwh = append(*lwh, r)
}
// Pop removes and returns the last element of the heap slice.
func (lwh *lastWriteHeap) Pop() any {
	h := *lwh
	last := len(h) - 1
	r := h[last]
	// Drop the reference so Go GC can free the memory occupied by the removed Row.
	h[last] = nil
	*lwh = h[:last]
	return r
}
// encodeTenantID packs (accountID, projectID) into a single uint64 map key:
// accountID occupies the high 32 bits, projectID the low 32 bits.
func encodeTenantID(accountID, projectID uint32) uint64 {
	id := uint64(accountID) << 32
	return id | uint64(projectID)
}
var unitInternStorage sync.Map

// internUnit returns a canonical copy of unit, so that repeated units share
// a single allocation.
// units are statically defined and cannot have high cardinality
func internUnit(unit []byte) []byte {
	if v, ok := unitInternStorage.Load(string(unit)); ok {
		return v.([]byte)
	}
	b := make([]byte, len(unit))
	copy(b, unit)
	// LoadOrStore instead of Store: two goroutines may both miss the Load
	// above for the same unit; a plain Store would let them intern two
	// different canonical slices for the same key.
	v, _ := unitInternStorage.LoadOrStore(string(b), b)
	return v.([]byte)
}

View File

@@ -0,0 +1,80 @@
//go:build goexperiment.synctest
package metricsmetadata
import (
"testing"
"testing/synctest"
"time"
"github.com/google/go-cmp/cmp"
)
func TestWriteEviction(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
s := NewStorage(256 * bucketsCount)
defer s.MustClose()
rows := []Row{
{MetricFamilyName: []byte("metric_name_1"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
{MetricFamilyName: []byte("metric_name_2"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
{MetricFamilyName: []byte("metric_name_3"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
{MetricFamilyName: []byte("metric_name_4"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
}
s.Add(rows)
got := s.Get(-1, "")
expected := []*Row{
{MetricFamilyName: []byte("metric_name_2"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
{MetricFamilyName: []byte("metric_name_1"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
{MetricFamilyName: []byte("metric_name_4"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
}
sortRows(expected)
if diff := cmp.Diff(got, expected, rowCmpOpts); len(diff) > 0 {
t.Errorf("unexpected rows (-want, +got):\n%s", diff)
}
// evict all previous records by max storage size
rows = []Row{
{MetricFamilyName: []byte("metric_name_6"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
{MetricFamilyName: []byte("metric_name_7"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
{MetricFamilyName: []byte("metric_name_9"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
{MetricFamilyName: []byte("metric_name_10"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
{MetricFamilyName: []byte("metric_name_11"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
}
s.Add(rows)
got = s.Get(-1, "")
expected = []*Row{
{MetricFamilyName: []byte("metric_name_6"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
{MetricFamilyName: []byte("metric_name_7"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
{MetricFamilyName: []byte("metric_name_9"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
{MetricFamilyName: []byte("metric_name_10"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
{MetricFamilyName: []byte("metric_name_11"), Help: []byte("some useless help message"), Unit: []byte("seconds")},
}
sortRows(expected)
if diff := cmp.Diff(got, expected, rowCmpOpts); len(diff) > 0 {
t.Errorf("unexpected rows (-want, +got):\n%s", diff)
}
// evict all records based on expire duration
time.Sleep(metadataExpireDuration + time.Hour)
synctest.Wait()
got = s.Get(-1, "")
expected = expected[:0]
if diff := cmp.Diff(got, expected, rowCmpOpts); len(diff) > 0 {
t.Errorf("unexpected rows (-want, +got):\n%s", diff)
}
var sm MetadataStorageMetrics
s.UpdateMetrics(&sm)
if sm.CurrentSizeBytes != 0 {
t.Fatalf("unexpected size: %d want 0", sm.CurrentSizeBytes)
}
if sm.ItemsCurrent != 0 {
t.Fatalf("unexpected items count: %d want 0", sm.ItemsCurrent)
}
})
}

View File

@@ -0,0 +1,394 @@
package metricsmetadata
import (
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)
// rowCmpOpts ignores internal bookkeeping fields (heap position and
// last-write timestamp) when comparing rows, so tests only assert on the
// metadata payload itself.
var rowCmpOpts = cmpopts.IgnoreFields(Row{}, "lastWriteTime", "heapIdx")
// TestStorageWrite covers inserts, metadata updates (Help, Unit, Type) for an
// existing metric family name, and storing the same metric family name under
// multiple tenants.
func TestStorageWrite(t *testing.T) {
	s := NewStorage(4096)
	defer s.MustClose()
	// f ingests toIngest, scrambles the caller-owned buffers to detect whether
	// the storage retained references to them, and compares the Get result
	// against expected (order-insensitive via sortRows).
	f := func(toIngest []Row, expected []*Row) {
		t.Helper()
		s.Add(toIngest)
		// replace row values with dummy data
		// in order to check possible memory corruption
		//
		// NOTE(review): `row` is a copy of the slice element, so `row.Type = 0`
		// mutates only the copy; and append reallocates whenever
		// len(dummyValue) exceeds the field's capacity, in which case the
		// original backing bytes are NOT overwritten. Overwriting bytes by
		// index on toIngest[i] would exercise the intended corruption check —
		// confirm intent.
		dummyValue := []byte(`redacted`)
		for _, row := range toIngest {
			row.Help = append(row.Help[:0], dummyValue...)
			row.MetricFamilyName = append(row.MetricFamilyName[:0], dummyValue...)
			row.Unit = append(row.Unit[:0], dummyValue...)
			row.Type = 0
		}
		got := s.Get(0, "")
		sortRows(expected)
		// NOTE(review): args are (got, expected), but cmp.Diff(x, y) reports
		// "-" for x and "+" for y, so the "(-want, +got)" legend is inverted.
		if diff := cmp.Diff(got, expected, rowCmpOpts); len(diff) > 0 {
			t.Errorf("unexpected rows (-want, +got):\n%s", diff)
		}
	}
	// initial insert of two rows for tenant 1:1
	rows := []Row{
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Unit:             []byte("seconds"),
			Help:             []byte("help1"),
			AccountID:        1,
			ProjectID:        1,
		},
		{
			MetricFamilyName: []byte("metric2"),
			Type:             2,
			Unit:             []byte("bytes"),
			Help:             []byte("help2"),
			AccountID:        1,
			ProjectID:        1,
		},
	}
	expected := []*Row{
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Unit:             []byte("seconds"),
			Help:             []byte("help1"),
			AccountID:        1,
			ProjectID:        1,
		},
		{
			MetricFamilyName: []byte("metric2"),
			Type:             2,
			Unit:             []byte("bytes"),
			Help:             []byte("help2"),
			AccountID:        1,
			ProjectID:        1,
		},
	}
	f(rows, expected)
	// update Help
	rowToUpdate := []Row{
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Unit:             []byte("seconds"),
			Help:             []byte("UseLessHelp2"),
			AccountID:        1,
			ProjectID:        1,
		},
	}
	expected = []*Row{
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Unit:             []byte("seconds"),
			Help:             []byte("UseLessHelp2"),
			AccountID:        1,
			ProjectID:        1,
		},
		{
			MetricFamilyName: []byte("metric2"),
			Type:             2,
			Unit:             []byte("bytes"),
			Help:             []byte("help2"),
			AccountID:        1,
			ProjectID:        1,
		},
	}
	f(rowToUpdate, expected)
	// update Unit and Type
	rowToUpdate = []Row{
		{
			MetricFamilyName: []byte("metric2"),
			Type:             5,
			Unit:             []byte("meters"),
			Help:             []byte("help2"),
			AccountID:        1,
			ProjectID:        1,
		},
	}
	expected = []*Row{
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Unit:             []byte("seconds"),
			Help:             []byte("UseLessHelp2"),
			AccountID:        1,
			ProjectID:        1,
		},
		{
			MetricFamilyName: []byte("metric2"),
			Type:             5,
			Unit:             []byte("meters"),
			Help:             []byte("help2"),
			AccountID:        1,
			ProjectID:        1,
		},
	}
	f(rowToUpdate, expected)
	// add the same metric name to other tenants
	rowToAdd := []Row{
		{
			MetricFamilyName: []byte("metric2"),
			Type:             5,
			Unit:             []byte("meters"),
			Help:             []byte("help2"),
			AccountID:        15,
			ProjectID:        0,
		},
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Unit:             []byte("seconds"),
			Help:             []byte("UseLessHelp2"),
			AccountID:        0,
			ProjectID:        0,
		},
	}
	expected = []*Row{
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Unit:             []byte("seconds"),
			Help:             []byte("UseLessHelp2"),
			AccountID:        1,
			ProjectID:        1,
		},
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Unit:             []byte("seconds"),
			Help:             []byte("UseLessHelp2"),
			AccountID:        0,
			ProjectID:        0,
		},
		{
			MetricFamilyName: []byte("metric2"),
			Type:             5,
			Unit:             []byte("meters"),
			Help:             []byte("help2"),
			AccountID:        1,
			ProjectID:        1,
		},
		{
			MetricFamilyName: []byte("metric2"),
			Type:             5,
			Unit:             []byte("meters"),
			Help:             []byte("help2"),
			AccountID:        15,
			ProjectID:        0,
		},
	}
	f(rowToAdd, expected)
}
// TestStorageRead covers the read paths: Get across all tenants, filtering by
// metric name, limit handling, nonexistent names, and per-tenant reads via
// GetForTenant.
func TestStorageRead(t *testing.T) {
	s := NewStorage(4096)
	defer s.MustClose()
	// Add test data
	rows := []Row{
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Help:             []byte("uselesshelp1"),
			Unit:             []byte("seconds1"),
			AccountID:        0,
			ProjectID:        0,
		},
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Help:             []byte("uselesshelp1"),
			Unit:             []byte("seconds1"),
			AccountID:        1,
			ProjectID:        1,
		},
		{
			MetricFamilyName: []byte("metric2"),
			Type:             2,
			Help:             []byte("uselesshelp2"),
			Unit:             []byte("seconds2"),
			AccountID:        1,
			ProjectID:        1,
		},
		{
			MetricFamilyName: []byte("metric2"),
			Type:             2,
			Help:             []byte("uselesshelp2"),
			Unit:             []byte("seconds2"),
			AccountID:        3,
			ProjectID:        15,
		},
		{
			MetricFamilyName: []byte("metric3"),
			Unit:             []byte("unknown"),
			Help:             []byte("help3"),
			Type:             1,
			AccountID:        2,
			ProjectID:        1,
		},
	}
	s.Add(rows)
	// f runs the given read callback and compares its result with expected
	// (order-insensitive via sortRows).
	f := func(get func() []*Row, expected []*Row) {
		t.Helper()
		got := get()
		sortRows(expected)
		// NOTE(review): args are (got, expected), but cmp.Diff(x, y) reports
		// "-" for x and "+" for y, so the "(-want, +got)" legend is inverted.
		if diff := cmp.Diff(got, expected, rowCmpOpts); len(diff) > 0 {
			t.Errorf("unexpected rows get result (-want, +got):\n%s", diff)
		}
	}
	// no filters: all rows of all tenants
	get := func() []*Row {
		return s.Get(0, "")
	}
	expected := []*Row{
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Help:             []byte("uselesshelp1"),
			Unit:             []byte("seconds1"),
			AccountID:        0,
			ProjectID:        0,
		},
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Help:             []byte("uselesshelp1"),
			Unit:             []byte("seconds1"),
			AccountID:        1,
			ProjectID:        1,
		},
		{
			MetricFamilyName: []byte("metric2"),
			Type:             2,
			Help:             []byte("uselesshelp2"),
			Unit:             []byte("seconds2"),
			AccountID:        1,
			ProjectID:        1,
		},
		{
			MetricFamilyName: []byte("metric2"),
			Type:             2,
			Help:             []byte("uselesshelp2"),
			Unit:             []byte("seconds2"),
			AccountID:        3,
			ProjectID:        15,
		},
		{
			MetricFamilyName: []byte("metric3"),
			Unit:             []byte("unknown"),
			Help:             []byte("help3"),
			Type:             1,
			AccountID:        2,
			ProjectID:        1,
		},
	}
	f(get, expected)
	// with metric name
	get = func() []*Row {
		return s.Get(0, "metric3")
	}
	expected = []*Row{
		{
			MetricFamilyName: []byte("metric3"),
			Unit:             []byte("unknown"),
			Help:             []byte("help3"),
			Type:             1,
			AccountID:        2,
			ProjectID:        1,
		},
	}
	f(get, expected)
	// with metric name different tenant
	get = func() []*Row {
		return s.Get(0, "metric1")
	}
	expected = []*Row{
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Help:             []byte("uselesshelp1"),
			Unit:             []byte("seconds1"),
			AccountID:        0,
			ProjectID:        0,
		},
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Help:             []byte("uselesshelp1"),
			Unit:             []byte("seconds1"),
			AccountID:        1,
			ProjectID:        1,
		},
	}
	f(get, expected)
	// nonexistent metric name
	get = func() []*Row {
		return s.Get(0, "nonexistent")
	}
	expected = nil
	f(get, expected)
	// with limit
	get = func() []*Row {
		return s.Get(1, "")
	}
	expected = []*Row{
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Help:             []byte("uselesshelp1"),
			Unit:             []byte("seconds1"),
			AccountID:        0,
			ProjectID:        0,
		},
	}
	f(get, expected)
	// for specific tenant
	get = func() []*Row {
		return s.GetForTenant(2, 1, 0, "")
	}
	expected = []*Row{
		{
			MetricFamilyName: []byte("metric3"),
			Unit:             []byte("unknown"),
			Help:             []byte("help3"),
			Type:             1,
			AccountID:        2,
			ProjectID:        1,
		},
	}
	f(get, expected)
	// metric name at tenant
	get = func() []*Row {
		return s.GetForTenant(0, 0, 0, "metric1")
	}
	expected = []*Row{
		{
			MetricFamilyName: []byte("metric1"),
			Type:             1,
			Help:             []byte("uselesshelp1"),
			Unit:             []byte("seconds1"),
			AccountID:        0,
			ProjectID:        0,
		},
	}
	f(get, expected)
}

View File

@@ -0,0 +1,118 @@
package metricsmetadata
import (
"fmt"
"testing"
)
// BenchmarkStorageWrite measures concurrent single-tenant Add throughput with
// the storage sized large enough that no eviction takes place.
func BenchmarkStorageWrite(b *testing.B) {
	parallelisms := []int{10, 100}
	rowCounts := []int{10e3, 100e3}
	for _, par := range parallelisms {
		for _, n := range rowCounts {
			rows := getRows(0, 0, n)
			name := fmt.Sprintf("singletenant/parallel=%d,rows=%d,no_eviction=true", par, n)
			b.Run(name, func(b *testing.B) {
				// allocate storage without eviction
				s := NewStorage(n * int(perItemOverhead) * bucketsCount)
				defer s.MustClose()
				b.SetParallelism(par)
				b.ReportAllocs()
				b.SetBytes(int64(len(rows)))
				b.RunParallel(func(pb *testing.PB) {
					for pb.Next() {
						s.Add(rows)
					}
				})
			})
		}
	}
}
// BenchmarkStorageWriteMultitenant measures concurrent Add throughput when
// the same row counts are spread across several tenants.
func BenchmarkStorageWriteMultitenant(b *testing.B) {
	tenants := [][2]uint32{{1, 2}, {0, 0}, {3, 3}}
	for _, n := range []int{10e3, 100e3} {
		var rows []Row
		for _, tenant := range tenants {
			rows = append(rows, getRows(tenant[0], tenant[1], n)...)
		}
		b.Run(fmt.Sprintf("multitenant/parallel=10,rows=%d,", n), func(b *testing.B) {
			// allocate storage without eviction
			s := NewStorage(n * len(tenants) * int(perItemOverhead) * bucketsCount)
			defer s.MustClose()
			b.SetParallelism(10)
			b.ReportAllocs()
			b.SetBytes(int64(len(rows)))
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					s.Add(rows)
				}
			})
		})
	}
}
// BenchmarkStorageRead measures concurrent Get throughput for various limits
// over a pre-populated single-tenant storage.
func BenchmarkStorageRead(b *testing.B) {
	s := NewStorage(512 * 1024)
	defer s.MustClose()
	s.Add(getRows(0, 0, 10e3))
	for _, limit := range []int{-1, 100, 20e3} {
		b.Run(fmt.Sprintf("limit=%d", limit), func(b *testing.B) {
			b.ReportAllocs()
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					_ = s.Get(limit, "")
				}
			})
		})
	}
}
// BenchmarkStorageReadMultitenant measures concurrent GetForTenant throughput,
// with each worker goroutine cycling round-robin through the tenants.
func BenchmarkStorageReadMultitenant(b *testing.B) {
	tenants := [][2]uint32{{0, 0}, {1, 1}, {2, 2}}
	var rows []Row
	for _, tenant := range tenants {
		rows = append(rows, getRows(tenant[0], tenant[1], 10e3)...)
	}
	s := NewStorage(10e3 * int(perItemOverhead) * len(tenants))
	defer s.MustClose()
	s.Add(rows)
	for _, limit := range []int{-1, 100, 20e3} {
		b.Run(fmt.Sprintf("limit=%d", limit), func(b *testing.B) {
			b.ReportAllocs()
			b.RunParallel(func(pb *testing.PB) {
				next := 0
				for pb.Next() {
					tenant := tenants[next%len(tenants)]
					next++
					_ = s.GetForTenant(tenant[0], tenant[1], limit, "")
				}
			})
		})
	}
}
// getRows generates n deterministic metadata rows for the given tenant; the
// metric family name encodes both the index and n so different row counts
// produce disjoint names.
func getRows(accountID, projectID uint32, n int) []Row {
	rows := make([]Row, 0, n)
	for i := 0; i < n; i++ {
		rows = append(rows, Row{
			AccountID:        accountID,
			ProjectID:        projectID,
			MetricFamilyName: []byte(fmt.Sprintf("metric_%d_%d", i, n)),
			Type:             uint32(i % 3),
			Help:             []byte("help text for metric"),
			Unit:             []byte("seconds"),
		})
	}
	return rows
}

View File

@@ -25,6 +25,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot/snapshotutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricnamestats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/metricsmetadata"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
@@ -177,6 +178,8 @@ type Storage struct {
// logNewSeriesUntil is the timestamp until which new series will be logged. We will log new series when logNewSeries is true or logNewSeriesUntil is greater than the current time.
logNewSeriesUntil atomic.Uint64
metadataStorage *metricsmetadata.Storage
}
// OpenOptions optional args for MustOpenStorage
@@ -275,6 +278,8 @@ func MustOpenStorage(path string, opts OpenOptions) *Storage {
}
}
s.metadataStorage = metricsmetadata.NewStorage(getMetadataStorageSize())
// Load metadata
metadataDir := filepath.Join(path, metadataDirname)
isEmptyDB := !fs.IsPathExist(filepath.Join(path, indexdbDirname))
@@ -374,6 +379,20 @@ func getMetricNamesCacheSize() int {
return maxMetricNameCacheSize
}
var maxMetadataStorageSize int

// SetMetadataStorageSize overrides the default size of the metadata store
func SetMetadataStorageSize(size int) {
	maxMetadataStorageSize = size
}

// getMetadataStorageSize returns the configured metadata store size in bytes,
// defaulting to 1% of the allowed process memory when unset or non-positive.
func getMetadataStorageSize() int {
	if size := maxMetadataStorageSize; size > 0 {
		return size
	}
	return memory.Allowed() / 100
}
func (s *Storage) getDeletedMetricIDs() *uint64set.Set {
return s.deletedMetricIDs.Load()
}
@@ -650,6 +669,10 @@ type Metrics struct {
IndexDBMetrics IndexDBMetrics
TableMetrics TableMetrics
MetadataStorageItemsCurrent uint64
MetadataStorageCurrentSizeBytes uint64
MetadataStorageMaxSizeBytes uint64
}
// Reset resets m.
@@ -741,6 +764,12 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.MetricNamesUsageTrackerSize = tm.CurrentItemsCount
m.MetricNamesUsageTrackerSizeMaxBytes = tm.MaxSizeBytes
var mr metricsmetadata.MetadataStorageMetrics
s.metadataStorage.UpdateMetrics(&mr)
m.MetadataStorageItemsCurrent = uint64(mr.ItemsCurrent)
m.MetadataStorageCurrentSizeBytes = mr.CurrentSizeBytes
m.MetadataStorageMaxSizeBytes = mr.MaxSizeBytes
d := s.nextRetentionSeconds()
if d < 0 {
d = 0
@@ -1003,6 +1032,9 @@ func (s *Storage) MustClose() {
s.mustSaveNextDayMetricIDs(nextDayMetricIDs)
s.metricsTracker.MustClose()
s.metadataStorage.MustClose()
// Release lock file.
fs.MustClose(s.flockF)
s.flockF = nil
@@ -2935,3 +2967,21 @@ func (s *Storage) GetMetricNamesStats(_ *querytracer.Tracer, limit, le int, matc
func (s *Storage) ResetMetricNamesStats(_ *querytracer.Tracer) {
s.metricsTracker.Reset(s.tsidCache.Reset)
}
// GetMetadataRows returns time series metric names metadata for the given args
//
// limit and metricName filtering semantics are handled by the underlying
// metadata storage; the query tracer records the search and its result size.
func (s *Storage) GetMetadataRows(qt *querytracer.Tracer, limit int, metricName string) []*metricsmetadata.Row {
	qt = qt.NewChild("search metrics metadata rows limit=%d,metricName=%q", limit, metricName)
	res := s.metadataStorage.Get(limit, metricName)
	qt.Printf("found %d metadata rows", len(res))
	qt.Done()
	return res
}
// AddMetadataRows writes time series metric names metadata into storage
//
// The rows are forwarded to the in-memory metadata storage; see
// lib/storage/metricsmetadata for the retention and eviction rules.
func (s *Storage) AddMetadataRows(rows []metricsmetadata.Row) {
	s.metadataStorage.Add(rows)
}