Compare commits

..

1 Commits

Author SHA1 Message Date
dependabot[bot]
200c744b6e build(deps): bump github/codeql-action from 4.35.3 to 4.35.5
Bumps [github/codeql-action](https://github.com/github/codeql-action) from 4.35.3 to 4.35.5.
- [Release notes](https://github.com/github/codeql-action/releases)
- [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md)
- [Commits](e46ed2cbd0...9e0d7b8d25)

---
updated-dependencies:
- dependency-name: github/codeql-action
  dependency-version: 4.35.5
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-06-05 04:13:19 +00:00
21 changed files with 4902 additions and 938 deletions

View File

@@ -52,14 +52,14 @@ jobs:
restore-keys: go-artifacts-${{ runner.os }}-codeql-analyze-${{ steps.go.outputs.go-version }}-
- name: Initialize CodeQL
uses: github/codeql-action/init@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
uses: github/codeql-action/init@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4.35.5
with:
languages: go
- name: Autobuild
uses: github/codeql-action/autobuild@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
uses: github/codeql-action/autobuild@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4.35.5
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
uses: github/codeql-action/analyze@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4.35.5
with:
category: 'language:go'

View File

@@ -36,12 +36,6 @@ var (
"By default there are no limits on samples ingestion rate.")
)
// custom api help links [["/api","doc"]] without http.pathPrefix.
var customAPIPathList = [][]string{
{"/graph/explore", "explore metrics grafana page"},
{"/graph/d/prometheus-advanced/advanced-data-exploration", "PMM grafana dashboard"},
}
func main() {
// VictoriaMetrics is optimized for reduced memory allocations,
// so it can run with the reduced GOGC in order to reduce the used memory,
@@ -141,10 +135,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
{"api/v1/status/active_queries", "active queries"},
{"-/reload", "reload configuration"},
})
for _, p := range customAPIPathList {
p, doc := p[0], p[1]
fmt.Fprintf(w, "<a href=%q>%s</a> - %s<br/>", p, p, doc)
}
return true
}
if vminsert.RequestHandler(w, r) {

View File

@@ -17,7 +17,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/promdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -325,21 +324,14 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
}
var (
rowsReadPerSeries = metrics.NewHistogram(`vm_rows_read_per_series`)
rowsReadPerQuery = metrics.NewHistogram(`vm_rows_read_per_query`)
seriesReadPerQuery = metrics.NewHistogram(`vm_series_read_per_query`)
seriesPrometheusReadPerQuery = metrics.NewHistogram(`vm_promdb_series_read_per_query`)
rowsReadPerSeries = metrics.NewHistogram(`vm_rows_read_per_series`)
rowsReadPerQuery = metrics.NewHistogram(`vm_rows_read_per_query`)
seriesReadPerQuery = metrics.NewHistogram(`vm_series_read_per_query`)
)
type packedTimeseries struct {
metricName string
brs []blockRef
pd *promData
}
type promData struct {
values []float64
timestamps []int64
}
type unpackWork struct {
@@ -443,21 +435,9 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
putSortBlocksHeap(sbh)
return err
}
if pts.pd != nil {
// Add data from Prometheus to dst.
// It usually has smaller timestamps than the data from sbs, so put it first.
dst.Values = append(dst.Values, pts.pd.values...)
dst.Timestamps = append(dst.Timestamps, pts.pd.timestamps...)
}
dedupInterval := storage.GetDedupInterval(tr.MinTimestamp)
dedupInterval := storage.GetDedupInterval()
mergeSortBlocks(dst, sbh, dedupInterval)
putSortBlocksHeap(sbh)
if pts.pd != nil {
if !sort.IsSorted(dst) {
sort.Sort(dst)
}
pts.pd = nil
}
return nil
}
@@ -566,27 +546,6 @@ func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbf *tmpBlocksFile, tr s
return dst, firstErr
}
// sort.Interface implementation for Result
// Len implements sort.Interface
func (r *Result) Len() int {
return len(r.Timestamps)
}
// Less implements sort.Interface
func (r *Result) Less(i, j int) bool {
timestamps := r.Timestamps
return timestamps[i] < timestamps[j]
}
// Swap implements sort.Interface
func (r *Result) Swap(i, j int) {
timestamps := r.Timestamps
values := r.Values
timestamps[i], timestamps[j] = timestamps[j], timestamps[i]
values[i], values[j] = values[j], values[i]
}
func getSortBlock() *sortBlock {
v := sbPool.Get()
if v == nil {
@@ -826,15 +785,6 @@ func LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames i
if err != nil {
return nil, fmt.Errorf("error during labels search on time range: %w", err)
}
// Merge labels obtained from Prometheus storage.
promLabels, err := promdb.GetLabelNamesOnTimeRange(tr, deadline)
if err != nil {
return nil, fmt.Errorf("cannot obtain labels from Prometheus storage: %w", err)
}
qt.Printf("get %d label names from Prometheus storage", len(promLabels))
labels = mergeStrings(labels, promLabels)
// Sort labels like Prometheus does
sort.Strings(labels)
qt.Printf("sort %d labels", len(labels))
@@ -901,44 +851,14 @@ func LabelValues(qt *querytracer.Tracer, labelName string, sq *storage.SearchQue
}
labelValues, err := vmstorage.SearchLabelValues(qt, labelName, tfss, tr, maxLabelValues, sq.MaxMetrics, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("error during label values search on time range: %w", err)
return nil, fmt.Errorf("error during label values search on time range for labelName=%q: %w", labelName, err)
}
// Merge label values obtained from Prometheus storage.
promLabelValues, err := promdb.GetLabelValuesOnTimeRange(labelName, tr, deadline)
if err != nil {
return nil, fmt.Errorf("cannot obtain label values on time range for %q from Prometheus storage: %w", labelName, err)
}
qt.Printf("get %d label values from Prometheus storage", len(promLabelValues))
labelValues = mergeStrings(labelValues, promLabelValues)
// Sort labelValues like Prometheus does
sort.Strings(labelValues)
qt.Printf("sort %d label values", len(labelValues))
return labelValues, nil
}
func mergeStrings(a, b []string) []string {
if len(a) == 0 {
return b
}
if len(b) == 0 {
return a
}
m := make(map[string]struct{}, len(a)+len(b))
for _, s := range a {
m[s] = struct{}{}
}
for _, s := range b {
m[s] = struct{}{}
}
result := make([]string, 0, len(m))
for s := range m {
result = append(result, s)
}
return result
}
// GetMetricsMetadata returns time series metric names metadata for the given args
func GetMetricsMetadata(qt *querytracer.Tracer, limit int, metricName string) ([]*metricsmetadata.Row, error) {
qt = qt.NewChild("get metrics metadata: limit=%d, metric_name=%q", limit, metricName)
@@ -1340,26 +1260,6 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadlin
}
qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(m), blocksRead, samples, tbf.Len())
// Fetch data from promdb.
pm := make(map[string]*promData)
err = promdb.VisitSeries(sq, deadline, func(metricName []byte, values []float64, timestamps []int64) {
pd := pm[string(metricName)]
if pd == nil {
if _, ok := m[string(metricName)]; !ok {
orderedMetricNames = append(orderedMetricNames, string(metricName))
}
pd = &promData{}
pm[string(metricName)] = pd
}
pd.values = append(pd.values, values...)
pd.timestamps = append(pd.timestamps, timestamps...)
})
if err != nil {
putTmpBlocksFile(tbf)
putStorageSearch(sr)
return nil, fmt.Errorf("error when searching in Prometheus data: %w", err)
}
seriesPrometheusReadPerQuery.Update(float64(len(pm)))
var rss Results
rss.tr = tr
rss.deadline = deadline
@@ -1368,7 +1268,6 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadlin
pts[i] = packedTimeseries{
metricName: metricName,
brs: brssPool[m[metricName]].brs,
pd: pm[metricName],
}
}
rss.packedTimeseries = pts

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -37,11 +37,11 @@
<meta property="og:title" content="UI for VictoriaMetrics">
<meta property="og:url" content="https://victoriametrics.com/">
<meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data">
<script type="module" crossorigin src="./assets/index-CoGukb-x.js"></script>
<script type="module" crossorigin src="./assets/index-U3iNn2Tx.js"></script>
<link rel="modulepreload" crossorigin href="./assets/rolldown-runtime-COnpUsM8.js">
<link rel="modulepreload" crossorigin href="./assets/vendor-C8Kwp93_.js">
<link rel="stylesheet" crossorigin href="./assets/vendor-CnsZ1jie.css">
<link rel="stylesheet" crossorigin href="./assets/index-BBUnmLOr.css">
<link rel="stylesheet" crossorigin href="./assets/index-BL7jEFBa.css">
</head>
<body>
<noscript>You need to enable JavaScript to run this app.</noscript>

View File

@@ -14,7 +14,6 @@ import (
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/promdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@@ -111,9 +110,6 @@ var (
metadataStorageSize = flagutil.NewBytes("storage.maxMetadataStorageSize", 0, "Overrides max size for metrics metadata entries in-memory storage. "+
"If set to 0 or a negative value, defaults to 1% of allowed memory.")
downsamplingPeriods = flagutil.NewArrayString("downsampling.period", "Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs "+
"to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details")
)
func DataPath() string {
@@ -126,11 +122,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
logger.Fatalf("invalid `-precisionBits`: %s", err)
}
err := storage.SetDownsamplingPeriods(*downsamplingPeriods, *minScrapeInterval)
if err != nil {
logger.Fatalf("cannot parse -downsampling.period: %s", err)
}
storage.SetDedupInterval(*minScrapeInterval)
storage.SetDataFlushInterval(*inmemoryDataFlushInterval)
storage.LegacySetRetentionTimezoneOffset(*retentionTimezoneOffset)
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N)
@@ -188,11 +180,10 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
// register storage metrics
storageMetrics = metrics.NewSet()
storageMetrics.RegisterMetricsWriter(func(w io.Writer) {
writeStorageMetrics(w, Storage)
writeStorageMetrics(w, strg)
})
metrics.RegisterSet(storageMetrics)
promdb.Init(retentionPeriod.Milliseconds())
WG = syncwg.WaitGroup{}
resetResponseCacheIfNeeded = resetCacheIfNeeded
Storage = strg
@@ -343,7 +334,6 @@ func Stop() {
logger.Infof("gracefully closing the storage at %s", *storageDataPath)
startTime := time.Now()
WG.WaitAndBlock()
promdb.MustClose()
stopStaleSnapshotsRemover()
Storage.MustClose()
logger.Infof("successfully closed the storage in %.3f seconds", time.Since(startTime).Seconds())

View File

@@ -1,268 +0,0 @@
package promdb
import (
"context"
"flag"
"fmt"
"log/slog"
"time"
"github.com/oklog/ulid/v2"
"github.com/prometheus/prometheus/model/labels"
promstorage "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
var prometheusDataPath = flag.String("prometheusDataPath", "", "Optional path to readonly historical Prometheus data")
var prometheusRetentionMsecs int64
// Init must be called after flag.Parse and before using the package.
//
// See also MustClose.
func Init(retentionMsecs int64) {
if promDB != nil {
logger.Fatalf("BUG: promdb.Init is called multiple times without promdb.MustClose call")
}
prometheusRetentionMsecs = retentionMsecs
if *prometheusDataPath == "" {
return
}
l := slog.New(slog.Default().Handler())
opts := tsdb.DefaultOptions()
opts.RetentionDuration = retentionMsecs
// Set max block duration to 10% of retention period or 31 days
// according to https://prometheus.io/docs/prometheus/latest/storage/#compaction
maxBlockDuration := int64((31 * 24 * time.Hour) / time.Millisecond)
if maxBlockDuration > retentionMsecs/10 {
maxBlockDuration = retentionMsecs / 10
}
if maxBlockDuration < opts.MinBlockDuration {
maxBlockDuration = opts.MinBlockDuration
}
opts.MaxBlockDuration = maxBlockDuration
// Custom delete function is needed, because Prometheus by default doesn't delete
// blocks outside the retention if no new blocks are created with samples with the current timestamps.
// See https://github.com/prometheus/prometheus/blob/997bb7134fcfd7279f250e183e78681e48a56aff/tsdb/db.go#L1116
opts.BlocksToDelete = func(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
m := make(map[ulid.ULID]struct{})
minRetentionTime := time.Now().Unix()*1000 - retentionMsecs
for _, block := range blocks {
meta := block.Meta()
// delete block marked for deletion by compaction code.
if meta.Compaction.Deletable {
m[meta.ULID] = struct{}{}
continue
}
if block.MaxTime() < minRetentionTime {
m[meta.ULID] = struct{}{}
}
}
return m
}
pdb, err := tsdb.Open(*prometheusDataPath, l, nil, opts, nil)
if err != nil {
logger.Panicf("FATAL: cannot open Prometheus data at -prometheusDataPath=%q: %s", *prometheusDataPath, err)
}
promDB = pdb
logger.Infof("successfully opened historical Prometheus data at -prometheusDataPath=%q with retentionMsecs=%d", *prometheusDataPath, retentionMsecs)
}
// MustClose must be called on graceful shutdown.
//
// Package functionality cannot be used after this call.
func MustClose() {
if *prometheusDataPath == "" {
return
}
if promDB == nil {
logger.Panicf("BUG: promdb.MustClose is called without promdb.Init call")
}
if err := promDB.Close(); err != nil {
logger.Panicf("FATAL: cannot close promDB: %s", err)
}
promDB = nil
logger.Infof("successfully closed historical Prometheus data at -prometheusDataPath=%q", *prometheusDataPath)
}
var promDB *tsdb.DB
// GetLabelNamesOnTimeRange returns label names.
func GetLabelNamesOnTimeRange(tr storage.TimeRange, deadline searchutil.Deadline) ([]string, error) {
if *prometheusDataPath == "" {
return nil, nil
}
d := time.Unix(int64(deadline.Deadline()), 0)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
q, err := promDB.Querier(tr.MinTimestamp, tr.MaxTimestamp)
if err != nil {
return nil, err
}
defer mustCloseQuerier(q)
names, _, err := q.LabelNames(ctx, nil)
// Make full copy of names, since they cannot be used after q is closed.
names = copyStringsWithMemory(names)
return names, err
}
// GetLabelValuesOnTimeRange returns values for the given labelName on the given tr.
func GetLabelValuesOnTimeRange(labelName string, tr storage.TimeRange, deadline searchutil.Deadline) ([]string, error) {
if *prometheusDataPath == "" {
return nil, nil
}
d := time.Unix(int64(deadline.Deadline()), 0)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
q, err := promDB.Querier(tr.MinTimestamp, tr.MaxTimestamp)
if err != nil {
return nil, err
}
defer mustCloseQuerier(q)
values, _, err := q.LabelValues(ctx, labelName, nil)
// Make full copy of values, since they cannot be used after q is closed.
values = copyStringsWithMemory(values)
return values, err
}
func copyStringsWithMemory(a []string) []string {
result := make([]string, len(a))
for i, s := range a {
result[i] = string(append([]byte{}, s...))
}
return result
}
// SeriesVisitor is called by VisitSeries for each matching time series.
//
// The caller shouldn't hold references to metricName, values and timestamps after returning.
type SeriesVisitor func(metricName []byte, values []float64, timestamps []int64)
// VisitSeries calls f for each series found in the pdb.
func VisitSeries(sq *storage.SearchQuery, deadline searchutil.Deadline, f SeriesVisitor) error {
if *prometheusDataPath == "" {
return nil
}
d := time.Unix(int64(deadline.Deadline()), 0)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
minTime, maxTime := getSearchTimeRange(sq)
q, err := promDB.Querier(minTime, maxTime)
if err != nil {
return err
}
defer mustCloseQuerier(q)
var seriesSet []promstorage.SeriesSet
for _, tf := range sq.TagFilterss {
ms, err := convertTagFiltersToMatchers(tf)
if err != nil {
return fmt.Errorf("cannot convert tag filters to matchers: %w", err)
}
s := q.Select(ctx, false, nil, ms...)
seriesSet = append(seriesSet, s)
}
ss := promstorage.NewMergeSeriesSet(seriesSet, 0, promstorage.ChainedSeriesMerge)
var (
mn storage.MetricName
metricName []byte
values []float64
timestamps []int64
)
var it chunkenc.Iterator
for ss.Next() {
s := ss.At()
convertPromLabelsToMetricName(&mn, s.Labels())
metricName = mn.SortAndMarshal(metricName[:0])
values = values[:0]
timestamps = timestamps[:0]
it = s.Iterator(it)
for {
typ := it.Next()
if typ == chunkenc.ValNone {
break
}
if typ != chunkenc.ValFloat {
// Skip unsupported values
continue
}
ts, v := it.At()
values = append(values, v)
timestamps = append(timestamps, ts)
}
if err := it.Err(); err != nil {
return fmt.Errorf("error when iterating Prometheus series: %w", err)
}
f(metricName, values, timestamps)
}
return ss.Err()
}
func getSearchTimeRange(sq *storage.SearchQuery) (int64, int64) {
maxTime := sq.MaxTimestamp
minTime := sq.MinTimestamp
minRetentionTime := time.Now().Unix()*1000 - prometheusRetentionMsecs
if maxTime < minRetentionTime {
maxTime = minRetentionTime
}
if minTime < minRetentionTime {
minTime = minRetentionTime
}
return minTime, maxTime
}
func convertPromLabelsToMetricName(dst *storage.MetricName, ls labels.Labels) {
dst.Reset()
ls.Range(func(label labels.Label) {
if label.Name == "__name__" {
dst.MetricGroup = append(dst.MetricGroup[:0], label.Value...)
} else {
dst.AddTag(label.Name, label.Value)
}
})
}
func convertTagFiltersToMatchers(tfs []storage.TagFilter) ([]*labels.Matcher, error) {
ms := make([]*labels.Matcher, 0, len(tfs))
for _, tf := range tfs {
var mt labels.MatchType
if tf.IsNegative {
if tf.IsRegexp {
mt = labels.MatchNotRegexp
} else {
mt = labels.MatchNotEqual
}
} else {
if tf.IsRegexp {
mt = labels.MatchRegexp
} else {
mt = labels.MatchEqual
}
}
key := string(tf.Key)
if key == "" {
key = "__name__"
}
value := string(tf.Value)
m, err := labels.NewMatcher(mt, key, value)
if err != nil {
return nil, err
}
ms = append(ms, m)
}
return ms, nil
}
func mustCloseQuerier(q promstorage.Querier) {
if err := q.Close(); err != nil {
logger.Panicf("FATAL: cannot close querier: %s", err)
}
}

View File

@@ -26,13 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
Release candidate
* SECURITY: upgrade Go builder from Go1.26.3 to Go1.26.4. See [the list of issues addressed in Go1.26.4](https://github.com/golang/go/issues?q=milestone%3AGo1.26.4%20label%3ACherryPickApproved).
* FEATURE: [enterprise](https://docs.victoriametrics.com/enterprise/) [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add the new metrics `vm_downsampling_partitions_scheduled_rows` and `vm_retention_filters_partitions_scheduled_rows` for measuring background historical data merge completion time. See [#10960](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10960)
* FEATURE: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): support `match[]=<label_selector>` query parameters in `/api/v1/rules` and `/api/v1/alerts` APIs to return only the rules that have configured labels satisfying the provided label selectors. See [11020](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11020).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `-opentelemetry.promoteAllResourceAttributes` and `-opentelemetry.promoteScopeMetadata` command-line flags to allow managing label promotion for resource attributes and OTel scope metadata. See [OpenTelemetry](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/) docs and [#10931](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10931).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) : introduce `vmagent_remotewrite_kafka_outbuf_latency_seconds` and `vmagent_remotewrite_kafka_rtt_seconds` metrics for [kafka integration](https://docs.victoriametrics.com/victoriametrics/integrations/kafka/). The metrics could help identify throughput bottlenecks. See [#10730](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10730).

4
go.mod
View File

@@ -25,7 +25,6 @@ require (
github.com/googleapis/gax-go/v2 v2.22.0
github.com/influxdata/influxdb v1.12.4
github.com/klauspost/compress v1.18.5
github.com/oklog/ulid/v2 v2.1.1
github.com/prometheus/prometheus v0.311.3
github.com/urfave/cli/v2 v2.27.7
github.com/valyala/fastjson v1.6.10
@@ -110,6 +109,7 @@ require (
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/oklog/ulid/v2 v2.1.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.150.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.150.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.150.0 // indirect
@@ -155,7 +155,7 @@ require (
go.uber.org/zap v1.27.1 // indirect
go.yaml.in/yaml/v2 v2.4.4 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.52.0 // indirect
golang.org/x/crypto v0.51.0 // indirect
golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/term v0.43.0 // indirect

4
go.sum
View File

@@ -509,8 +509,8 @@ go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.52.0 h1:RMs7fP2rXdep0CftQlK8Uf+kibLm7qkCcradZWYz988=
golang.org/x/crypto v0.52.0/go.mod h1:1QgfPxDqh0T2M/elOJtp9RvuR95kVjir0e6/BvEmGbc=
golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f h1:W3F4c+6OLc6H2lb//N1q4WpJkhzJCK5J6kUi1NTVXfM=
golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f/go.mod h1:J1xhfL/vlindoeF/aINzNzt2Bket5bjo9sdOYzOsU80=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=

View File

@@ -163,8 +163,7 @@ func (b *Block) deduplicateSamplesDuringMerge() {
// Nothing to dedup.
return
}
maxTimestamp := srcTimestamps[len(srcTimestamps)-1]
dedupInterval := GetDedupInterval(maxTimestamp)
dedupInterval := GetDedupInterval()
if dedupInterval <= 0 {
// Deduplication is disabled.
return

View File

@@ -1,11 +1,29 @@
package storage
import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
)
// SetDedupInterval sets the deduplication interval, which is applied to raw samples during data ingestion and querying.
//
// De-duplication is disabled if dedupInterval is 0.
//
// This function must be called before initializing the storage.
func SetDedupInterval(dedupInterval time.Duration) {
globalDedupInterval = dedupInterval.Milliseconds()
}
// GetDedupInterval returns the dedup interval in milliseconds, which has been set via SetDedupInterval.
func GetDedupInterval() int64 {
return globalDedupInterval
}
var globalDedupInterval int64
func isDedupEnabled() bool {
return len(downsamplingPeriods) > 0
return globalDedupInterval > 0
}
// DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in milliseconds.

View File

@@ -1,123 +0,0 @@
package storage
import (
"fmt"
"sort"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/metricsql"
)
// SetDownsamplingPeriods configures downsampling.
//
// The function must be called before opening or creating any storage.
func SetDownsamplingPeriods(periods []string, dedupInterval time.Duration) error {
dsps, err := parseDownsamplingPeriods(periods)
if err != nil {
return err
}
dedupIntervalMs := dedupInterval.Milliseconds()
if dedupIntervalMs > 0 {
if len(dsps) > 0 && dsps[len(dsps)-1].Offset == 0 {
return fmt.Errorf("-dedup.minScrapeInterval=%s cannot be used if -downsampling.period=%s contains zero offset", dedupInterval, periods)
}
// Deduplication is a special case of downsampling with zero offset.
dsps = append(dsps, DownsamplingPeriod{
Offset: 0,
Interval: dedupIntervalMs,
})
}
downsamplingPeriods = dsps
return nil
}
// DownsamplingPeriod describes downsampling period
type DownsamplingPeriod struct {
// Offset in milliseconds from the current time when the downsampling with the given interval must be applied
Offset int64
// Interval for downsampling - only a single sample is left per each interval
Interval int64
}
// String implements interface
func (dsp DownsamplingPeriod) String() string {
offset := time.Duration(dsp.Offset) * time.Millisecond
interval := time.Duration(dsp.Interval) * time.Millisecond
return fmt.Sprintf("%s:%s", offset, interval)
}
func (dsp *DownsamplingPeriod) parse(s string) error {
idx := strings.Index(s, ":")
if idx <= 0 {
return fmt.Errorf("incorrect format for downsampling period: %s, want `offset:interval` format", s)
}
offsetStr, intervalStr := s[:idx], s[idx+1:]
interval, err := metricsql.DurationValue(intervalStr, 0)
if err != nil {
return fmt.Errorf("incorrect interval: %s format for downsampling interval: %s err: %w", intervalStr, s, err)
}
offset, err := metricsql.DurationValue(offsetStr, 0)
if err != nil {
return fmt.Errorf("incorrect duration: %s format for downsampling offset: %s err: %w", offsetStr, s, err)
}
dsp.Interval = interval
dsp.Offset = offset
// sanity check
if offset > 0 && interval > offset {
return fmt.Errorf("downsampling interval=%d cannot exceed offset=%d", dsp.Interval, dsp.Offset)
}
return nil
}
var downsamplingPeriods []DownsamplingPeriod
// GetDedupInterval returns dedup interval, which must be applied to samples with the given timestamp.
func GetDedupInterval(timestamp int64) int64 {
dsp := getDownsamplingPeriod(timestamp)
return dsp.Interval
}
// getDownsamplingPeriod returns downsampling period, which must be used for the given timestamp
func getDownsamplingPeriod(timestamp int64) DownsamplingPeriod {
offset := int64(fasttime.UnixTimestamp())*1000 - timestamp
for _, dsp := range downsamplingPeriods {
if offset >= dsp.Offset {
return dsp
}
}
return DownsamplingPeriod{}
}
func parseDownsamplingPeriods(periods []string) ([]DownsamplingPeriod, error) {
if len(periods) == 0 {
return nil, nil
}
var dsps []DownsamplingPeriod
for _, period := range periods {
var dsp DownsamplingPeriod
if err := dsp.parse(period); err != nil {
return nil, fmt.Errorf("cannot parse downsampling period %q: %w", period, err)
}
dsps = append(dsps, dsp)
}
sort.Slice(dsps, func(i, j int) bool {
return dsps[i].Offset > dsps[j].Offset
})
dspPrev := dsps[0]
// sanity checks.
for _, dsp := range dsps[1:] {
if dspPrev.Interval <= dsp.Interval {
return nil, fmt.Errorf("prev downsampling interval %d must be bigger than the next interval %d", dspPrev.Interval, dsp.Interval)
}
if dspPrev.Offset == dsp.Offset {
return nil, fmt.Errorf("duplicate downsampling offset: %d", dsp.Offset)
}
if dspPrev.Interval%dsp.Interval != 0 {
return nil, fmt.Errorf("downsamping intervals must be multiples; prev: %d, current: %d", dspPrev.Interval, dsp.Interval)
}
dspPrev = dsp
}
return dsps, nil
}

View File

@@ -1,62 +0,0 @@
package storage
import (
"strings"
"testing"
)
func TestParseDownsamplingPeriodsFailure(t *testing.T) {
f := func(name string, src []string) {
t.Helper()
t.Run(name, func(t *testing.T) {
if _, err := parseDownsamplingPeriods(src); err == nil {
t.Fatalf("want fail for input: %s", strings.Join(src, ","))
}
})
}
f("empty duration", []string{"15d"})
f("empty interval", []string{":1m"})
f("incorrect duration decrease", []string{"30d:15h", "60d:1h"})
f("duplicate offset", []string{"30d:15h", "30d:1h"})
f("duplicate interval", []string{"60d:1h", "30d:1h"})
f("not multiple intervals", []string{"90d:12h", "60:9h", "30d:7h"})
}
func TestParseDownsamplingPeriodsSuccess(t *testing.T) {
f := func(name string, src []string, expected []DownsamplingPeriod) {
t.Helper()
t.Run(name, func(t *testing.T) {
dsps, err := parseDownsamplingPeriods(src)
if err != nil {
t.Fatalf("cannot parse downsampling configuration for: %s, err: %s", strings.Join(src, ","), err)
}
assertDownsamplingPeriods(t, expected, dsps)
})
}
f("one period", []string{"30d:1m"}, []DownsamplingPeriod{
{Offset: 30 * 24 * 3600 * 1000, Interval: 60 * 1000},
})
f("three periods", []string{"15d:30s", "30d:1m", "60d:15m"}, []DownsamplingPeriod{
{Offset: 60 * 24 * 3600 * 1000, Interval: 15 * 60 * 1000},
{Offset: 30 * 24 * 3600 * 1000, Interval: 60 * 1000},
{Offset: 15 * 24 * 3600 * 1000, Interval: 30 * 1000},
})
f("with the same divider periods", []string{"15d:1m", "30d:7m", "60d:14m", "90d:28m"}, []DownsamplingPeriod{
{Offset: 90 * 24 * 3600 * 1000, Interval: 28 * 60 * 1000},
{Offset: 60 * 24 * 3600 * 1000, Interval: 14 * 60 * 1000},
{Offset: 30 * 24 * 3600 * 1000, Interval: 7 * 60 * 1000},
{Offset: 15 * 24 * 3600 * 1000, Interval: 60 * 1000},
})
}
func assertDownsamplingPeriods(t *testing.T, want, got []DownsamplingPeriod) {
t.Helper()
if len(want) != len(got) {
t.Fatalf("len mismatch, want: %d, got: %d", len(want), len(got))
}
for i := 0; i < len(want); i++ {
if want[i] != got[i] {
t.Fatalf("want period: %s, got period: %s, idx: %d", want[i], got[i], i)
}
}
}

View File

@@ -400,12 +400,6 @@ func (mn *MetricName) String() string {
return fmt.Sprintf("%s{%s}", mnCopy.MetricGroup, tagsStr)
}
// SortAndMarshal sorts mn tags and then marshals them to dst.
func (mn *MetricName) SortAndMarshal(dst []byte) []byte {
mn.sortTags()
return mn.Marshal(dst)
}
// Marshal appends marshaled mn to dst and returns the result.
//
// mn.sortTags must be called before calling this function

View File

@@ -1374,7 +1374,7 @@ func (pt *partition) releasePartsToMerge(pws []*partWrapper) {
}
func (pt *partition) isFinalDedupNeeded() bool {
dedupInterval := GetDedupInterval(pt.tr.MaxTimestamp)
dedupInterval := GetDedupInterval()
pws := pt.GetParts(nil, false)
minDedupInterval := getMinDedupInterval(pws)
@@ -1610,7 +1610,7 @@ func (pt *partition) mergePartsInternal(dstPartPath string, bsw *blockStreamWrit
return nil, fmt.Errorf("cannot merge %d parts to %s: %w", len(bsrs), dstPartPath, err)
}
if dstPartPath != "" {
ph.MinDedupInterval = GetDedupInterval(ph.MaxTimestamp)
ph.MinDedupInterval = GetDedupInterval()
ph.MustWriteMetadata(dstPartPath)
}
return &ph, nil

View File

@@ -20,7 +20,7 @@ func chacha20Poly1305Open(dst []byte, key []uint32, src, ad []byte) bool
func chacha20Poly1305Seal(dst []byte, key []uint32, src, ad []byte)
var (
useAVX2 = cpu.X86.HasSSSE3 && cpu.X86.HasAVX2 && cpu.X86.HasBMI2
useAVX2 = cpu.X86.HasAVX2 && cpu.X86.HasBMI2
)
// setupState writes a ChaCha20 input matrix to state. See
@@ -47,7 +47,7 @@ func setupState(state *[16]uint32, key *[32]byte, nonce []byte) {
}
func (c *chacha20poly1305) seal(dst, nonce, plaintext, additionalData []byte) []byte {
if !useAVX2 {
if !cpu.X86.HasSSSE3 {
return c.sealGeneric(dst, nonce, plaintext, additionalData)
}
@@ -66,7 +66,7 @@ func (c *chacha20poly1305) seal(dst, nonce, plaintext, additionalData []byte) []
}
func (c *chacha20poly1305) open(dst, nonce, ciphertext, additionalData []byte) ([]byte, error) {
if !useAVX2 {
if !cpu.X86.HasSSSE3 {
return c.openGeneric(dst, nonce, ciphertext, additionalData)
}

File diff suppressed because it is too large Load Diff

2
vendor/modules.txt vendored
View File

@@ -851,7 +851,7 @@ go.yaml.in/yaml/v2
# go.yaml.in/yaml/v3 v3.0.4
## explicit; go 1.16
go.yaml.in/yaml/v3
# golang.org/x/crypto v0.52.0
# golang.org/x/crypto v0.51.0
## explicit; go 1.25.0
golang.org/x/crypto/chacha20
golang.org/x/crypto/chacha20poly1305