Compare commits

..

8 Commits

Author SHA1 Message Date
Max Kotliar
4841008fd6 app/vmestimator: add cluster mode with -storageNode flag
Selector nodes query each configured storage node's
/clusternative/snapshot in parallel, merge HyperLogLog sketches, and
expose consolidated cardinality_estimate metrics at
-cardinalityMetrics.exposeAt (default /metrics).

Also:
- fixed gob "duplicate type received" error
- snapshots converted from map alias to struct with sync.Mutex for safe
concurrent add() calls from parallel storage-node goroutines.
- loadConfig now returns []*estimator directly and accepts an empty path
when -storageNode is set (selector-only mode with no local estimators).
2026-06-22 10:48:02 +03:00
Max Kotliar
dcd2fba50a upd doc 2026-06-17 16:13:37 +03:00
Max Kotliar
24ae4758f2 count global inserted ts 2026-06-17 15:44:48 +03:00
Max Kotliar
b94dd42126 fix potential race in cardinality metrics writer 2026-06-17 15:44:48 +03:00
Max Kotliar
54768be72d fix potential name\value slice reuse in protoparser 2026-06-17 15:44:47 +03:00
Max Kotliar
fe1f2b7c2a rename metrics cestimator_ -> vmestimator 2026-06-17 15:24:31 +03:00
Max Kotliar
4f27d60563 fix golangci issues; fix groupValuesKey slice reuse bug; fix group reject bug 2026-06-17 15:16:48 +03:00
Max Kotliar
3d4e8b59fd app/vmestimator: Introduce a standalone cardinality estimator service
Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10848
2026-06-17 14:20:43 +03:00
113 changed files with 5902 additions and 1428 deletions

0
.codex Normal file
View File

View File

@@ -54,14 +54,14 @@ jobs:
restore-keys: go-artifacts-${{ runner.os }}-codeql-analyze-${{ steps.go.outputs.go-version }}-
- name: Initialize CodeQL
uses: github/codeql-action/init@87557b9c84dde89fdd9b10e88954ac2f4248e463 # v4.36.1
uses: github/codeql-action/init@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
with:
languages: go
- name: Autobuild
uses: github/codeql-action/autobuild@87557b9c84dde89fdd9b10e88954ac2f4248e463 # v4.36.1
uses: github/codeql-action/autobuild@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@87557b9c84dde89fdd9b10e88954ac2f4248e463 # v4.36.1
uses: github/codeql-action/analyze@e46ed2cbd01164d986452f91f178727624ae40d7 # v4.35.3
with:
category: 'language:go'

View File

@@ -187,7 +187,7 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
return c
}
func (c *client) init(argIdx int, sanitizedURL string) {
func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
limitReached := metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rate_limit_reached_total{url=%q}`, c.sanitizedURL))
if bytesPerSec := rateLimit.GetOptionalArg(argIdx); bytesPerSec > 0 {
logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", bytesPerSec, sanitizedURL)
@@ -204,20 +204,11 @@ func (c *client) init(argIdx int, sanitizedURL string) {
c.packetsDropped = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_packets_dropped_total{url=%q}`, c.sanitizedURL))
c.retriesCount = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.sanitizedURL))
c.sendDuration = metrics.GetOrCreateFloatCounter(fmt.Sprintf(`vmagent_remotewrite_send_duration_seconds_total{url=%q}`, c.sanitizedURL))
workers := queues.GetOptionalArg(argIdx)
if workers <= 0 {
workers = 1
}
inmemoryWorkers := inmemoryQueues.GetOptionalArg(argIdx)
for range inmemoryWorkers {
c.wg.Go(func() {
c.runWorker(c.fq.MustReadInMemoryBlockBlocking)
})
}
for range workers {
c.wg.Go(func() {
c.runWorker(c.fq.MustReadBlock)
})
metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queues{url=%q}`, c.sanitizedURL), func() float64 {
return float64(concurrency)
})
for range concurrency {
c.wg.Go(c.runWorker)
}
logger.Infof("initialized client for -remoteWrite.url=%q", c.sanitizedURL)
}
@@ -311,12 +302,12 @@ func getAWSAPIConfig(argIdx int) (*awsapi.Config, error) {
return cfg, nil
}
func (c *client) runWorker(readBlock func(dst []byte) ([]byte, bool)) {
func (c *client) runWorker() {
var ok bool
var block []byte
ch := make(chan bool, 1)
for {
block, ok = readBlock(block[:0])
block, ok = c.fq.MustReadBlock(block[:0])
if !ok {
return
}

View File

@@ -66,9 +66,6 @@ var (
queues = flagutil.NewArrayInt("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
"isn't enough for sending high volume of collected data to remote storage. "+
"Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage")
inmemoryQueues = flagutil.NewArrayInt("remoteWrite.inmemoryQueues", 0, "The number of additional workers per each -remoteWrite.url, which send only recently ingested data from the in-memory queue, "+
"while the file-based queue at -remoteWrite.tmpDataPath is drained by workers configured via -remoteWrite.queues. "+
"This reduces delivery lag for fresh samples when the file-based queue contains a backlog accumulated during remote storage outages.")
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
"It is hidden by default, since it can contain sensitive info such as auth key")
maxPendingBytesPerURL = flagutil.NewArrayBytes("remoteWrite.maxDiskUsagePerURL", 0, "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath "+
@@ -909,8 +906,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
}
isPQDisabled := disableOnDiskQueue.GetOptionalArg(argIdx)
inmemoryQueueSize := inmemoryQueues.GetOptionalArg(argIdx)
queuesSize := queues.GetOptionalArg(argIdx) + inmemoryQueueSize
queuesSize := queues.GetOptionalArg(argIdx)
if queuesSize > maxQueues {
queuesSize = maxQueues
} else if queuesSize <= 0 {
@@ -927,13 +923,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
if maxInmemoryBlocks < 2 {
maxInmemoryBlocks = 2
}
fqOpts := persistentqueue.OpenFastQueueOpts{
MaxInmemoryBlocks: maxInmemoryBlocks,
MaxPendingBytes: maxPendingBytes,
IsPQDisabled: isPQDisabled,
PrioritizeInmemoryData: inmemoryQueueSize > 0,
}
fq := persistentqueue.MustOpenFastQueueWithOpts(queuePath, sanitizedURL, fqOpts)
fq := persistentqueue.MustOpenFastQueue(queuePath, sanitizedURL, maxInmemoryBlocks, maxPendingBytes, isPQDisabled)
_ = metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{path=%q, url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(fq.GetPendingBytes())
})
@@ -946,9 +936,6 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
}
return 0
})
metrics.GetOrCreateGauge(fmt.Sprintf(`vmagent_remotewrite_queues{url=%q}`, sanitizedURL), func() float64 {
return float64(queuesSize)
})
var c *client
switch remoteWriteURL.Scheme {
@@ -957,7 +944,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
default:
logger.Fatalf("unsupported scheme: %s for remoteWriteURL: %s, want `http`, `https`", remoteWriteURL.Scheme, sanitizedURL)
}
c.init(argIdx, sanitizedURL)
c.init(argIdx, queuesSize, sanitizedURL)
// Initialize pss
sf := significantFigures.GetOptionalArg(argIdx)

View File

@@ -1,3 +1,3 @@
See vmctl docs [here](https://docs.victoriametrics.com/victoriametrics/vmctl/).
vmctl docs can be edited at [docs/vmctl.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/vmctl/vmctl.md).
vmctl docs can be edited at [docs/vmctl.md](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/docs/victoriametrics/vmctl.md).

View File

@@ -259,7 +259,7 @@ func (cr *ChunkedResponse) Next() ([]int64, []float64, error) {
fieldValues, ok := r.values[cr.field]
if !ok {
return nil, nil, fmt.Errorf("response doesn't contain field %q", cr.field)
return nil, nil, fmt.Errorf("response doesn't contain filed %q", cr.field)
}
values := make([]float64, len(fieldValues))
for i, fv := range fieldValues {

View File

@@ -563,11 +563,11 @@ func main() {
}()
err = app.Run(os.Args)
pushmetrics.StopAndPush()
if err != nil {
log.Fatalln(err)
}
log.Printf("Total time: %v", time.Since(start))
pushmetrics.StopAndPush()
}
func initConfigVM(c *cli.Context) (vm.Config, error) {

View File

@@ -405,16 +405,7 @@ func buildMatchWithFilter(filter string, metricName string) (string, error) {
if len(tf.Key) == 0 {
continue
}
switch {
case tf.IsNegative && tf.IsRegexp:
a = append(a, fmt.Sprintf("%s!~%q", tf.Key, tf.Value))
case tf.IsNegative:
a = append(a, fmt.Sprintf("%s!=%q", tf.Key, tf.Value))
case tf.IsRegexp:
a = append(a, fmt.Sprintf("%s=~%q", tf.Key, tf.Value))
default:
a = append(a, fmt.Sprintf("%s=%q", tf.Key, tf.Value))
}
a = append(a, tf.String())
}
a = append(a, nameFilter)
filters = append(filters, strings.Join(a, ","))

View File

@@ -0,0 +1,89 @@
package main
import (
"bytes"
"flag"
"fmt"
"io"
"net/http"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
// Package-level state of the cardinality metrics writer: self-monitoring
// counters, the cached payload and the flags controlling caching and exposure.
var (
// cardinalityMetricsWrites is incremented once per writeCardinalityMetrics call.
cardinalityMetricsWrites = metrics.NewCounter(`vmestimator_write_cardinality_metrics_total`)
// cardinalityMetricsWriteDuration accumulates the wall-clock time, in seconds,
// spent inside writeCardinalityMetrics.
cardinalityMetricsWriteDuration = metrics.NewFloatCounter(`vmestimator_write_cardinality_metrics_duration_seconds_total`)
// cardinalityMetricsWriteBytes accumulates the size of the payloads handed to the writer.
cardinalityMetricsWriteBytes = metrics.NewCounter(`vmestimator_write_cardinality_metrics_size_bytes_total`)
// cardinalityCacheMu guards cardinalityMetricsCacheAt and cardinalityMetricsCache.
cardinalityCacheMu sync.Mutex
// cardinalityMetricsCacheAt is the time when cardinalityMetricsCache was last rebuilt.
cardinalityMetricsCacheAt time.Time
// cardinalityMetricsCache holds the most recently rendered cardinality metrics.
cardinalityMetricsCache []byte
cardinalityMetricsCacheTTL = flag.Duration("cardinalityMetrics.cacheTTL", time.Second*30, "Duration for caching cardinality metrics response")
cardinalityMetricsExposeAt = flag.String(`cardinalityMetrics.exposeAt`, `/metrics`, "HTTP path for exposing cardinality metrics. "+
"If set to the default /metrics, cardinality metrics are merged with regular metrics and exposed together. "+
"If set to a different path, only cardinality metrics are exposed at that endpoint. "+
"If set to an empty value, cardinality metrics are not exposed via HTTP at all.")
)
// writeCardinalityMetrics writes cardinality metrics to w.
//
// The payload consists of the metrics of the local estimators es followed by the
// metrics merged from snapshots fetched from storageNodeURLs. It is rebuilt when
// the cached copy is older than -cardinalityMetrics.cacheTTL (or when the TTL is
// zero); otherwise the cached copy is served.
//
// Failures to fetch snapshots or to write the payload are logged, not returned.
func writeCardinalityMetrics(w io.Writer, es []*estimator, storageNodeURLs []string) {
	startTime := time.Now()

	cardinalityCacheMu.Lock()
	ttl := *cardinalityMetricsCacheTTL
	if ttl == 0 || time.Since(cardinalityMetricsCacheAt) >= ttl {
		// Reuse the backing array of the previous payload for the new one.
		rendered := bytes.NewBuffer(cardinalityMetricsCache[:0])
		for i := range es {
			es[i].writeMetrics(rendered)
		}

		if len(storageNodeURLs) != 0 {
			merged := newSnapshots()
			var fetchers sync.WaitGroup
			for _, nodeURL := range storageNodeURLs {
				fetchers.Add(1)
				go func(node string) {
					defer fetchers.Done()
					err := fetchAndMergeSnapshots(node, merged.add)
					if err != nil {
						logger.Errorf("fetch snapshots from %s: %s", node, err)
					}
				}(nodeURL)
			}
			fetchers.Wait()

			if err := merged.writeMetrics(rendered); err != nil {
				logger.Errorf("write cardinality metrics: %s", err)
			}
		}

		cardinalityMetricsCache = rendered.Bytes()
		cardinalityMetricsCacheAt = time.Now()
	}
	// Copy the payload, so it can be written to w after the lock is released.
	payload := make([]byte, len(cardinalityMetricsCache))
	copy(payload, cardinalityMetricsCache)
	cardinalityCacheMu.Unlock()

	_, err := w.Write(payload)
	if err != nil {
		logger.Warnf("writing cardinality metrics: %s", err)
	}

	cardinalityMetricsWrites.Inc()
	cardinalityMetricsWriteDuration.Add(time.Since(startTime).Seconds())
	cardinalityMetricsWriteBytes.Add(len(payload))
}
// snapshotHTTPClient is used for fetching snapshots from storage nodes.
//
// http.DefaultClient has no timeout. Snapshots are fetched while the cardinality
// metrics cache lock is held, so a single unresponsive storage node would block
// every subsequent metrics request. The timeout bounds the whole exchange,
// including reading the response body.
var snapshotHTTPClient = &http.Client{Timeout: 30 * time.Second}

// fetchAndMergeSnapshots requests <storageNodeURL>/clusternative/snapshot and
// passes the response body to decodeSnapshots together with cb.
//
// It returns an error if the request fails, if the storage node responds with
// a non-200 status code, or if decodeSnapshots returns an error.
func fetchAndMergeSnapshots(storageNodeURL string, cb func(s *snapshot)) error {
	url := fmt.Sprintf("%s/clusternative/snapshot", storageNodeURL)
	resp, err := snapshotHTTPClient.Get(url) //nolint:noctx
	if err != nil {
		return fmt.Errorf("http get %s: %w", url, err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code %d from %s", resp.StatusCode, url)
	}
	return decodeSnapshots(resp.Body, cb)
}

60
app/vmestimator/config.go Normal file
View File

@@ -0,0 +1,60 @@
package main
import (
"fmt"
"os"
"sort"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"gopkg.in/yaml.v2"
)
// Config is the root of the YAML configuration file parsed by loadConfig.
type Config struct {
// Streams lists estimator definitions; loadConfig creates one estimator per entry.
Streams []EstimatorConfig `yaml:"streams"`
}
// EstimatorConfig describes a single estimator.
//
// Zero values are replaced with defaults by newEstimator.
type EstimatorConfig struct {
// GroupBy lists label names to group time series by.
// An empty list results in a single global estimate.
GroupBy []string `yaml:"group_by"`
// GroupLimit is the maximum number of tracked groups.
// Non-positive values are replaced with 10000.
GroupLimit int `yaml:"group_limit"`
// Labels are extra labels added to every exposed cardinality_estimate metric.
Labels map[string]string `yaml:"labels"`
// Interval is exposed in the `interval` label; sketches are rotated every Interval/2.
// Zero is replaced with 5 minutes.
Interval time.Duration `yaml:"interval"`
// Buckets is the number of independently locked shards.
// Non-positive values are replaced with min(64, 2*available CPUs).
Buckets int `yaml:"buckets"`
// HLLPrecision is the HyperLogLog precision; loadConfig accepts 0 or values in [4, 18].
// Zero is replaced with 14.
HLLPrecision uint8 `yaml:"hll_precision"`
// HLLSparse is passed to hyperloglog.NewSketch as the sparse argument.
// Nil is replaced with true.
HLLSparse *bool `yaml:"hll_sparse"`
}
// loadConfig reads the YAML config at path and creates an estimator per configured stream.
//
// An empty path is accepted only when at least one -storageNode is set; nil
// estimators are returned in this case. Unknown YAML fields and hll_precision
// values outside [4, 18] (other than 0) are rejected.
func loadConfig(path string) ([]*estimator, error) {
	if path == "" {
		if len(*storageNodes) > 0 {
			// Selector-only mode: there are no local estimators.
			return nil, nil
		}
		return nil, fmt.Errorf("no -config flag specified")
	}

	raw, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("cannot read config file %q: %w", path, err)
	}

	var cfg Config
	if err := yaml.UnmarshalStrict(raw, &cfg); err != nil {
		return nil, fmt.Errorf("cannot parse config file %q: %w", path, err)
	}

	// Validate all the streams before creating any estimator.
	for i := range cfg.Streams {
		stream := &cfg.Streams[i]
		sort.Strings(stream.GroupBy)
		if p := stream.HLLPrecision; p != 0 && (p < 4 || p > 18) {
			return nil, fmt.Errorf("invalid precision %d: must be in range [4, 18]", p)
		}
	}

	estimators := make([]*estimator, 0, len(cfg.Streams))
	for _, streamCfg := range cfg.Streams {
		e, err := newEstimator(streamCfg)
		if err != nil {
			logger.Fatalf("cannot create estimator: %v", err)
		}
		estimators = append(estimators, e)
	}
	return estimators, nil
}

View File

@@ -0,0 +1,560 @@
package main
import (
"encoding/gob"
"fmt"
"io"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
"github.com/axiomhq/hyperloglog"
"github.com/dgryski/go-metro"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmestimator/protoparser"
)
// estimator estimates the cardinality of inserted time series, either globally
// or per group of label values, using HyperLogLog sketches sharded over buckets.
type estimator struct {
// groupBy lists label names to group by; empty means a single global estimate.
groupBy []string
// groupByKeysLabel is groupBy joined with commas, or "__global__" when groupBy is empty.
groupByKeysLabel string
// groupLimit is the maximum number of tracked groups.
groupLimit int64
// groupSize counts tracked groups; it is shared with all the buckets.
groupSize atomic.Int64
// groupRejectedMu guards groupRejectedSketch and groupRejectedSketchPrev.
groupRejectedMu sync.Mutex
// groupRejectedSketch estimates the number of groups rejected because of groupLimit.
groupRejectedSketch *hyperloglog.Sketch
// groupRejectedSketchPrev holds rejected groups from before the last rotation.
groupRejectedSketchPrev *hyperloglog.Sketch
// buckets are independently locked shards selected by hash.
buckets []*estimatorBucket
// metricsSet holds self-monitoring metrics of this estimator.
metricsSet *metrics.Set
// insertTotal counts time series passed to buckets by insertMany.
insertTotal *metrics.Counter
// stopCh is closed by stop() in order to terminate runRotation.
stopCh chan struct{}
}
// newEstimator creates an estimator for cfg, starts its rotation goroutine and
// registers its self-monitoring metrics.
//
// Unset cfg fields get defaults: Interval=5m, GroupLimit=10000,
// Buckets=min(64, 2*available CPUs), HLLPrecision=14, HLLSparse=true.
//
// The returned estimator must be stopped with stop() when no longer needed.
func newEstimator(cfg EstimatorConfig) (*estimator, error) {
	if cfg.Interval == 0 {
		cfg.Interval = 5 * time.Minute
	}
	if cfg.GroupLimit <= 0 {
		cfg.GroupLimit = 10000
	}
	if cfg.Buckets <= 0 {
		cfg.Buckets = min(64, 2*cgroup.AvailableCPUs())
	}
	if cfg.HLLPrecision == 0 {
		cfg.HLLPrecision = 14
	}
	if cfg.HLLSparse == nil {
		cfg.HLLSparse = new(true)
	}

	// Build the common metric prefix; extra labels go in sorted order,
	// so the output is deterministic.
	var prefix strings.Builder
	fmt.Fprintf(&prefix, "cardinality_estimate{interval=%q", cfg.Interval)
	labelNames := make([]string, 0, len(cfg.Labels))
	for name := range cfg.Labels {
		labelNames = append(labelNames, name)
	}
	sort.Strings(labelNames)
	for _, name := range labelNames {
		fmt.Fprintf(&prefix, ",%s=%q", name, cfg.Labels[name])
	}
	metricPrefix := prefix.String()

	groupByKeysLabel := "__global__"
	if len(cfg.GroupBy) > 0 {
		groupByKeysLabel = strings.Join(cfg.GroupBy, `,`)
	}

	e := &estimator{
		groupBy:                 cfg.GroupBy,
		groupByKeysLabel:        groupByKeysLabel,
		groupLimit:              int64(cfg.GroupLimit),
		groupRejectedSketch:     mustNewGroupRejectSketch(),
		groupRejectedSketchPrev: mustNewGroupRejectSketch(),
		buckets:                 make([]*estimatorBucket, cfg.Buckets),
		metricsSet:              metrics.NewSet(),
		stopCh:                  make(chan struct{}),
	}

	insertTotalName := fmt.Sprintf(`vmestimator_estimator_insert_total{group_by_keys=%q}`, e.groupByKeysLabel)
	e.insertTotal = e.metricsSet.NewCounter(insertTotalName)

	rejectedSizeName := fmt.Sprintf(`vmestimator_estimator_group_rejected_size{group_by_keys=%q}`, e.groupByKeysLabel)
	e.metricsSet.NewGauge(rejectedSizeName, func() float64 {
		e.groupRejectedMu.Lock()
		defer e.groupRejectedMu.Unlock()
		return float64(e.groupRejectedSketch.Estimate())
	})

	grouped := len(cfg.GroupBy) > 0
	for i := range e.buckets {
		eb := &estimatorBucket{
			groupBy:             cfg.GroupBy,
			extraLabels:         cfg.Labels,
			interval:            cfg.Interval,
			metricPrefix:        metricPrefix,
			groupByKeysLabel:    groupByKeysLabel,
			groupLimit:          int64(cfg.GroupLimit),
			groupSize:           &e.groupSize,
			groupRejectedMu:     &e.groupRejectedMu,
			groupRejectedSketch: e.groupRejectedSketch,
			precision:           cfg.HLLPrecision,
			sparse:              *cfg.HLLSparse,
		}
		if grouped {
			eb.groups = make(map[string]groupSketch)
			eb.prevGroups = make(map[string]groupSketch)

			e.metricsSet.NewGauge(fmt.Sprintf(`vmestimator_estimator_group_size{group_by_keys=%q,bucket="%d"}`, eb.groupByKeysLabel, i), func() float64 {
				return float64(eb.groupSize.Load())
			})
			e.metricsSet.NewGauge(fmt.Sprintf(`vmestimator_estimator_group_limit{group_by_keys=%q,bucket="%d"}`, eb.groupByKeysLabel, i), func() float64 {
				return float64(eb.groupLimit)
			})
		} else {
			eb.sketch = eb.newSketch()
		}
		e.buckets[i] = eb
	}

	go e.runRotation(cfg.Interval)
	metrics.RegisterSet(e.metricsSet)

	return e, nil
}
// stop terminates the rotation goroutine and unregisters all the metrics in e.metricsSet.
//
// It closes e.stopCh, so it must be called at most once per estimator.
func (e *estimator) stop() {
close(e.stopCh)
e.metricsSet.UnregisterAllMetrics()
}
// groupValuesPool holds reusable scratch buffers for building group keys.
var groupValuesPool = sync.Pool{}

// getGroupValuesKeySlice returns an empty scratch buffer from groupValuesPool.
//
// A new buffer with zero length and 128 bytes of capacity is allocated when
// the pool is empty. Return the buffer with putGroupValuesSlice when done.
func getGroupValuesKeySlice() *[]byte {
	if v := groupValuesPool.Get(); v != nil {
		return v.(*[]byte)
	}
	// The buffer must start empty, the same as recycled buffers do.
	// make([]byte, 128) would return a buffer pre-filled with 128 zero bytes.
	buf := make([]byte, 0, 128)
	return &buf
}

// putGroupValuesSlice truncates key to zero length and returns it to groupValuesPool.
//
// A nil key is ignored.
func putGroupValuesSlice(key *[]byte) {
	if key == nil {
		return
	}
	*key = (*key)[:0]
	groupValuesPool.Put(key)
}
// insertMany registers tss in the estimator.
//
// Without group-by labels every series goes to the bucket selected by its
// fingerprint. With group-by labels the group key is built from the values of
// the group-by labels joined with commas (a missing label contributes an empty
// value), and the bucket is selected by the hash of this key. Series carrying
// none of the group-by labels are skipped.
func (e *estimator) insertMany(tss []protoparser.TimeSerie) {
	bucketsNum := uint64(len(e.buckets))

	keyBufP := getGroupValuesKeySlice()
	keyBuf := *keyBufP
	defer func() {
		// Store the possibly re-allocated buffer back before returning it to the pool.
		*keyBufP = keyBuf
		putGroupValuesSlice(keyBufP)
	}()

	if len(e.groupBy) == 0 {
		for _, ts := range tss {
			e.buckets[int(ts.Fingerprint%bucketsNum)].insert(ts, "", nil)
		}
		e.insertTotal.Add(len(tss))
		return
	}

	groupValues := make([]string, len(e.groupBy))
	inserted := 0
	for _, ts := range tss {
		keyBuf = keyBuf[:0]
		clear(groupValues)

		matched := false
		for i, labelName := range e.groupBy {
			if i > 0 {
				keyBuf = append(keyBuf, ',')
			}
			for _, label := range ts.GroupLabels {
				if label.Name != labelName {
					continue
				}
				matched = true
				keyBuf = append(keyBuf, label.Value...)
				groupValues[i] = label.Value
				break
			}
		}
		if !matched {
			// The time series does not contribute to this groupBy.
			continue
		}

		bucketIdx := int(hash(keyBuf) % bucketsNum)
		e.buckets[bucketIdx].insert(ts, bytesutil.ToUnsafeString(keyBuf), groupValues)
		inserted++
	}
	e.insertTotal.Add(inserted)
}
// reset drops the data collected by the estimator: the group counter, the
// current and previous data of every bucket and both rejected-groups sketches.
func (e *estimator) reset() {
	e.groupSize.Store(0)
	for _, b := range e.buckets {
		b.reset()
	}

	e.groupRejectedMu.Lock()
	defer e.groupRejectedMu.Unlock()
	e.groupRejectedSketch.Reset()
	// Buckets drop both current and previous data on reset, so the previous
	// rejected-groups sketch must be dropped too. Otherwise writeMetrics may
	// continue adding stale rejected groups to the reported number of groups.
	e.groupRejectedSketchPrev.Reset()
}
// writeMetrics writes cardinality_estimate metrics for e to w.
//
// Without group-by labels a single "__global__" line is written with the
// estimate merged over all the buckets. With group-by labels one line per
// group is written by every bucket, followed by a "__group__" line with the
// number of groups. When the number of groups reaches 80% of groupLimit, the
// estimated number of rejected groups is added to it.
//
// Write errors are logged, not returned.
func (e *estimator) writeMetrics(w io.Writer) {
	first := e.buckets[0]

	if len(e.groupBy) == 0 {
		total := first.newSketch()
		for i := range e.buckets {
			e.buckets[i].writeNoGroupMetric(total)
		}

		line := appendGlobalMetric(make([]byte, 0, 1024), first.metricPrefix)
		line = strconv.AppendUint(line, total.Estimate(), 10)
		line = append(line, '\n')
		if _, err := w.Write(line); err != nil {
			logger.Errorf("writing metrics failed: %s; written cardinality metrics might be incomplete or invalid", err)
		}
		return
	}

	line := appendGroupByKeysAndValuesPrefix(make([]byte, 0, 16384), first.metricPrefix, first.groupByKeysLabel)
	prefixLen := len(line)
	scratch := first.newSketch()
	for i := range e.buckets {
		line = e.buckets[i].writeGroupMetrics(w, scratch, line[:prefixLen])
	}

	groupsTotal := e.groupSize.Load()
	if nearLimit := int64(float64(e.groupLimit) * 0.8); groupsTotal >= nearLimit {
		rejected := mustNewGroupRejectSketch()
		e.groupRejectedMu.Lock()
		if err := rejected.Merge(e.groupRejectedSketch); err != nil {
			logger.Fatalf("BUG: groupRejectedSketch merge failed: %s", err)
		}
		if err := rejected.Merge(e.groupRejectedSketchPrev); err != nil {
			logger.Fatalf("BUG: groupRejectedSketchPrev merge failed: %s", err)
		}
		e.groupRejectedMu.Unlock()
		groupsTotal += int64(rejected.Estimate())
	}

	line = appendGroupMetric(line[:0], first.metricPrefix, first.groupByKeysLabel)
	line = strconv.AppendInt(line, groupsTotal, 10)
	line = append(line, '\n')
	if _, err := w.Write(line); err != nil {
		logger.Errorf("writing metrics failed: %s; written cardinality metrics might be incomplete or invalid", err)
	}
}
// runRotation calls e.rotate every interval/2 until e.stopCh is closed.
func (e *estimator) runRotation(interval time.Duration) {
	ticker := time.NewTicker(interval / 2)
	defer ticker.Stop()

	for {
		select {
		case <-e.stopCh:
			return
		case <-ticker.C:
			e.rotate()
		}
	}
}
// rotate moves the current data of every bucket to the previous slot and
// starts collecting fresh data. The group counter is re-initialized by the
// buckets from the number of groups they moved to the previous slot.
//
// The rejected-groups sketch is rotated too: its contents are copied into
// groupRejectedSketchPrev and the current sketch is emptied.
func (e *estimator) rotate() {
	e.groupSize.Store(0)

	var wg sync.WaitGroup
	for i := range e.buckets {
		wg.Go(e.buckets[i].rotate)
	}
	wg.Wait()

	e.groupRejectedMu.Lock()
	defer e.groupRejectedMu.Unlock()
	// Buckets hold a copy of the e.groupRejectedSketch pointer taken in newEstimator,
	// so the pointer must never change. Swapping the current and the previous
	// sketches would leave buckets inserting into the "previous" one, and the
	// inserted data would be wiped on the next rotation. Copy the contents instead.
	e.groupRejectedSketchPrev = e.groupRejectedSketch.Clone()
	e.groupRejectedSketch.Reset()
}
// writeSnapshot encodes the state of e into enc.
//
// Without group-by labels a single snapshot is encoded. With group-by labels
// one snapshot per bucket is encoded; the snapshot of the first bucket also
// carries a clone of the rejected-groups sketch.
func (e *estimator) writeSnapshot(enc *gob.Encoder) error {
	if len(e.groupBy) == 0 {
		snap := newSnapshot()
		if err := enc.Encode(convertNoGroupToSnapshot(e, snap)); err != nil {
			return fmt.Errorf("encode snapshot: %w", err)
		}
		return nil
	}

	first := e.buckets[0]
	prefix := appendGroupByKeysAndValuesPrefix(make([]byte, 0, 16384), first.metricPrefix, first.groupByKeysLabel)

	// A single snapshot object is reused across buckets.
	snap := newSnapshot()
	for i, eb := range e.buckets {
		snap.reset()
		if i == 0 {
			// The rejected-groups sketch is shared by all the buckets,
			// so it is attached only once.
			eb.groupRejectedMu.Lock()
			if sk := eb.groupRejectedSketch; sk != nil {
				snap.GroupRejectedSketch = sk.Clone()
			}
			eb.groupRejectedMu.Unlock()
		}
		if err := enc.Encode(convertGroupBucketToSnapshot(eb, snap, prefix)); err != nil {
			return fmt.Errorf("encode snapshot: %w", err)
		}
	}
	return nil
}
// estimatorBucket is a single independently locked shard of an estimator.
type estimatorBucket struct {
// mu guards sketch, prevSketch, groups and prevGroups.
mu sync.Mutex
// groupBy lists label names to group by; empty means the bucket keeps a single sketch.
groupBy []string
// groupLimit is the maximum number of tracked groups.
groupLimit int64
// extraLabels are the extra labels from the estimator config.
extraLabels map[string]string
// interval is the interval from the estimator config.
interval time.Duration
// metricPrefix is the beginning of every exposed metric line,
// e.g. `cardinality_estimate{interval="5m0s"`.
metricPrefix string
// groupByKeysLabel is groupBy joined with commas, or "__global__" when groupBy is empty.
groupByKeysLabel string
// precision and sparse are passed to hyperloglog.NewSketch for new sketches.
precision uint8
sparse bool
// sketch and prevSketch are used when groupBy is empty.
// prevSketch is nil until the first rotate() call.
sketch *hyperloglog.Sketch
prevSketch *hyperloglog.Sketch
// groupSize points to the group counter of the owning estimator.
groupSize *atomic.Int64
// groups and prevGroups map group keys to per-group sketches; used when groupBy is non-empty.
groups map[string]groupSketch
prevGroups map[string]groupSketch
// groupRejectedMu and groupRejectedSketch are set from the owning estimator in newEstimator.
groupRejectedMu *sync.Mutex
groupRejectedSketch *hyperloglog.Sketch
}
// String returns a human-readable summary of the bucket settings:
// interval, group-by labels and extra labels.
func (eb *estimatorBucket) String() string {
return fmt.Sprintf(
"interval: %s; group_by: %v; extra_labels: %v", eb.interval, eb.groupBy, eb.extraLabels)
}
// reset drops both the current and the previous data of the bucket.
func (eb *estimatorBucket) reset() {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	if len(eb.groupBy) == 0 {
		// prevSketch is nil until the first rotate() call,
		// so it must be checked before use.
		if eb.prevSketch != nil {
			eb.prevSketch.Reset()
		}
		eb.sketch.Reset()
		return
	}
	eb.groups = make(map[string]groupSketch)
	eb.prevGroups = make(map[string]groupSketch)
}
// rotate moves the current data of the bucket to the previous slot and starts
// collecting fresh data.
//
// For grouped buckets the number of groups moved to the previous slot is added
// to the shared group counter.
func (eb *estimatorBucket) rotate() {
	eb.mu.Lock()
	if len(eb.groupBy) == 0 {
		eb.prevSketch = eb.sketch
		eb.sketch = eb.newSketch()
		eb.mu.Unlock()
		return
	}
	eb.prevGroups = eb.groups
	// Read the number of rotated groups under the lock: reset() may replace
	// prevGroups concurrently once the lock is released.
	rotated := len(eb.prevGroups)
	eb.groups = make(map[string]groupSketch, rotated)
	eb.mu.Unlock()

	eb.groupSize.Add(int64(rotated))
}
// insert registers ts in the bucket.
//
// Without group-by labels the fingerprint of ts is added to the bucket sketch;
// groupValuesKey and groupValues are ignored in this case.
//
// With group-by labels the fingerprint is added to the sketch of the group
// identified by groupValuesKey. groupValues must contain a value per eb.groupBy
// entry; they are used for building metric labels of a newly created group.
// A group, which exists neither in the current nor in the previous slot, is
// rejected when the shared group counter has reached groupLimit; its key is
// recorded in the rejected-groups sketch instead.
//
// groupValuesKey may point to a buffer owned by the caller; it is cloned
// before being stored.
func (eb *estimatorBucket) insert(ts protoparser.TimeSerie, groupValuesKey string, groupValues []string) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	if len(eb.groupBy) == 0 {
		eb.sketch.InsertHash(ts.Fingerprint)
		return
	}

	gsk, ok := eb.groups[groupValuesKey]
	if !ok {
		if _, ok := eb.prevGroups[groupValuesKey]; !ok {
			// A brand-new group: it counts against the limit.
			groupSize := eb.groupSize.Load()
			if groupSize+1 > eb.groupLimit {
				eb.groupRejectedMu.Lock()
				eb.groupRejectedSketch.InsertHash(hash([]byte(groupValuesKey)))
				eb.groupRejectedMu.Unlock()
				return
			}
			eb.groupSize.Add(1)
		}

		// Pre-render the per-group part of the metric line:
		// "<key>",by_<label>="<value>",...}
		formatBuf := make([]byte, 0, 1024)
		formatBuf = strconv.AppendQuote(formatBuf, groupValuesKey)
		for i := range groupValues {
			formatBuf = append(formatBuf, ',')
			if eb.groupBy[i] == `__name__` {
				formatBuf = append(formatBuf, `by__name__`...)
			} else {
				formatBuf = append(formatBuf, `by_`...)
				formatBuf = append(formatBuf, eb.groupBy[i]...)
			}
			formatBuf = append(formatBuf, '=')
			formatBuf = strconv.AppendQuote(formatBuf, groupValues[i])
		}
		formatBuf = append(formatBuf, `} `...)

		gsk = groupSketch{
			// Copy the labels into an exactly sized string. Referencing formatBuf
			// via an unsafe conversion would keep the whole 1KiB buffer alive
			// for every tracked group.
			groupValueLabels: string(formatBuf),
			Sketch:           eb.newSketch(),
		}
		eb.groups[strings.Clone(groupValuesKey)] = gsk
	}
	gsk.InsertHash(ts.Fingerprint)
}
// writeNoGroupMetric merges the current and the previous sketches of the bucket into res.
//
// Despite the name it writes nothing on its own: the caller is expected
// to format the merged estimate.
func (eb *estimatorBucket) writeNoGroupMetric(res *hyperloglog.Sketch) {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.mergeSketches(eb.sketch, eb.prevSketch, res)
}
// writeGroupMetrics writes a metric line per group tracked by the bucket to w.
//
// formatBuf must contain the common line prefix; the per-group labels and the
// estimate are appended to it for every group. res is a scratch sketch, which
// is reset and used for merging the current and the previous sketch of every
// group. Groups present only in the previous slot are written too.
//
// The returned buffer is truncated to the prefix, so it can be passed to the
// next bucket. Write errors are logged, not returned.
func (eb *estimatorBucket) writeGroupMetrics(w io.Writer, res *hyperloglog.Sketch, formatBuf []byte) []byte {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	prefixLen := len(formatBuf)

	// writeLine emits a single metric line for the group with the given labels.
	writeLine := func(labels string, cur, prev *hyperloglog.Sketch) {
		res.Reset()
		formatBuf = append(formatBuf[:prefixLen], labels...)
		eb.mergeSketches(cur, prev, res)
		formatBuf = strconv.AppendUint(formatBuf, res.Estimate(), 10)
		formatBuf = append(formatBuf, '\n')
		if _, err := w.Write(formatBuf); err != nil {
			logger.Errorf("writing metrics failed: %s; written cardinality metrics might be incomplete or invalid", err)
		}
	}

	for key, cur := range eb.groups {
		writeLine(cur.groupValueLabels, cur.Sketch, eb.prevGroups[key].Sketch)
	}
	for key, prev := range eb.prevGroups {
		if _, ok := eb.groups[key]; ok {
			// Already written by the loop above.
			continue
		}
		writeLine(prev.groupValueLabels, nil, prev.Sketch)
	}
	return formatBuf[:prefixLen]
}
// mergeSketches merges cur and, when it is non-nil, prev into res.
//
// It panics if any merge fails.
func (eb *estimatorBucket) mergeSketches(cur, prev, res *hyperloglog.Sketch) {
	if err := res.Merge(cur); err != nil {
		panic(err)
	}
	if prev == nil {
		return
	}
	if err := res.Merge(prev); err != nil {
		panic(err)
	}
}
// newSketch returns a new sketch created with the precision and the sparse mode of the bucket.
func (eb *estimatorBucket) newSketch() *hyperloglog.Sketch {
return mustNewSketch(eb.precision, eb.sparse)
}
// groupSketch is a sketch of a single group together with its pre-rendered metric labels.
type groupSketch struct {
// groupValueLabels is the per-group tail of the metric line built in estimatorBucket.insert:
// the quoted group key, the by_<label>="<value>" pairs and the closing `} `.
groupValueLabels string
*hyperloglog.Sketch
}
// mustNewGroupRejectSketch returns a new sparse sketch with precision 10,
// which is used for estimating the number of rejected groups.
func mustNewGroupRejectSketch() *hyperloglog.Sketch {
return mustNewSketch(10, true)
}
// mustNewSketch returns a new HyperLogLog sketch with the given precision and sparse mode.
//
// It panics if hyperloglog.NewSketch returns an error.
func mustNewSketch(precision uint8, sparse bool) *hyperloglog.Sketch {
sk, err := hyperloglog.NewSketch(precision, sparse)
if err != nil {
panic(fmt.Sprintf("cannot create HLL sketch with precision=%d and sparse=%v: %s", precision, sparse, err))
}
return sk
}
// hash returns the 64-bit metro hash of v computed with the fixed seed 1337.
func hash(v []byte) uint64 {
return metro.Hash64(v, 1337)
}
// appendGlobalMetric appends the name and the labels of the global metric to buf
// and returns the extended buffer. For the metricPrefix
// 'cardinality_estimate{interval="5m"' the appended text is:
//
//	cardinality_estimate{interval="5m",group_by_keys="__global__"}
//
// followed by a single space, so the value can be appended right after it.
func appendGlobalMetric(buf []byte, metricPrefix string) []byte {
	return append(append(buf, metricPrefix...), `,group_by_keys="__global__"} `...)
}
// appendGroupMetric appends the name and the labels of the groups-count metric
// to buf and returns the extended buffer. For the metricPrefix
// 'cardinality_estimate{interval="5m"' and groupByKeysLabel "fooKey,barKey"
// the appended text is:
//
//	cardinality_estimate{interval="5m",group_by_keys="__group__",group_by_values="fooKey,barKey"}
//
// followed by a single space, so the value can be appended right after it.
func appendGroupMetric(buf []byte, metricPrefix, groupByKeysLabel string) []byte {
	parts := [...]string{
		metricPrefix,
		`,group_by_keys="__group__",group_by_values="`,
		groupByKeysLabel,
		`"} `,
	}
	for _, part := range parts {
		buf = append(buf, part...)
	}
	return buf
}
// appendGroupByKeysAndValuesPrefix appends the common prefix of per-group
// metrics to buf and returns the extended buffer. For the metricPrefix
// 'cardinality_estimate{interval="5m"' and groupByKeysLabel "fooKey,barKey"
// the appended text is:
//
//	cardinality_estimate{interval="5m",group_by_keys="fooKey,barKey",group_by_values=
//
// The quoted group_by_values value is expected to be appended by the caller.
func appendGroupByKeysAndValuesPrefix(buf []byte, metricPrefix, groupByKeysLabel string) []byte {
	parts := [...]string{
		metricPrefix,
		`,group_by_keys="`,
		groupByKeysLabel,
		`",group_by_values=`,
	}
	for _, part := range parts {
		buf = append(buf, part...)
	}
	return buf
}

View File

@@ -0,0 +1,274 @@
package main
import (
"fmt"
"io"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmestimator/protoparser"
)
func BenchmarkEstimator_WriteMetrics(b *testing.B) {
b.Run("NoGroup/NoPrev", func(b *testing.B) {
e, err := newEstimator(EstimatorConfig{Interval: time.Hour})
if err != nil {
b.Fatalf("newEstimator: %v", err)
}
defer e.stop()
insertSeriesIntoEstimator(e, 5_000, 0)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
e.writeMetrics(io.Discard)
}
})
b.Run("NoGroup/WithPrev", func(b *testing.B) {
e, err := newEstimator(EstimatorConfig{Interval: time.Hour})
if err != nil {
b.Fatalf("newEstimator: %v", err)
}
defer e.stop()
insertSeriesIntoEstimator(e, 5_000, 0)
for _, eb := range e.buckets {
eb.rotate()
}
insertSeriesIntoEstimator(e, 5_000, 0)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
e.writeMetrics(io.Discard)
}
})
b.Run("Group100/NoPrev", func(b *testing.B) {
e, err := newEstimator(EstimatorConfig{
GroupBy: []string{"groupLabel"},
Interval: time.Hour,
})
if err != nil {
b.Fatalf("newEstimator: %v", err)
}
defer e.stop()
insertSeriesIntoEstimator(e, 5_000, 100)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
e.writeMetrics(io.Discard)
}
})
b.Run("Group100/WithPrev", func(b *testing.B) {
e, err := newEstimator(EstimatorConfig{
GroupBy: []string{"groupLabel"},
Interval: time.Hour,
})
if err != nil {
b.Fatalf("newEstimator: %v", err)
}
defer e.stop()
insertSeriesIntoEstimator(e, 5_000, 100)
for _, eb := range e.buckets {
eb.rotate()
}
insertSeriesIntoEstimator(e, 5_000, 100)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
e.writeMetrics(io.Discard)
}
})
b.Run("Group10k/NoPrev", func(b *testing.B) {
e, err := newEstimator(EstimatorConfig{
GroupBy: []string{"groupLabel"},
Interval: time.Hour,
})
if err != nil {
b.Fatalf("newEstimator: %v", err)
}
defer e.stop()
insertSeriesIntoEstimator(e, 50_000, 10_000)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
e.writeMetrics(io.Discard)
}
})
b.Run("Group10k/WithPrev", func(b *testing.B) {
e, err := newEstimator(EstimatorConfig{
GroupBy: []string{"groupLabel"},
Interval: time.Hour,
})
if err != nil {
b.Fatalf("newEstimator: %v", err)
}
defer e.stop()
insertSeriesIntoEstimator(e, 50_000, 10_000)
for _, eb := range e.buckets {
eb.rotate()
}
insertSeriesIntoEstimator(e, 50_000, 10_000)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
e.writeMetrics(io.Discard)
}
})
}
func BenchmarkEstimator_InsertManyParallel(b *testing.B) {
b.Run("NoGroup", func(b *testing.B) {
e, err := newEstimator(EstimatorConfig{Interval: time.Hour})
if err != nil {
b.Fatalf("newEstimator: %v", err)
}
defer e.stop()
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
var i uint64
for pb.Next() {
e.insertMany([]protoparser.TimeSerie{{Fingerprint: i}})
i++
}
})
})
b.Run("Group100", func(b *testing.B) {
e, err := newEstimator(EstimatorConfig{
GroupBy: []string{"groupLabel"},
Interval: time.Hour,
})
if err != nil {
b.Fatalf("newEstimator: %v", err)
}
defer e.stop()
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
var i uint64
for pb.Next() {
e.insertMany([]protoparser.TimeSerie{{
GroupLabels: []protoparser.Label{{Name: "groupLabel", Value: fmt.Sprintf("%d", i%100)}},
Fingerprint: i,
}})
i++
}
})
})
b.Run("Group10k", func(b *testing.B) {
e, err := newEstimator(EstimatorConfig{
GroupBy: []string{"groupLabel"},
Interval: time.Hour,
})
if err != nil {
b.Fatalf("newEstimator: %v", err)
}
defer e.stop()
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
var i uint64
for pb.Next() {
e.insertMany([]protoparser.TimeSerie{{
GroupLabels: []protoparser.Label{{Name: "groupLabel", Value: fmt.Sprintf("%d", i%10_000)}},
Fingerprint: i,
}})
i++
}
})
})
b.Run("Group100k", func(b *testing.B) {
e, err := newEstimator(EstimatorConfig{
GroupBy: []string{"groupLabel"},
Interval: time.Hour,
})
if err != nil {
b.Fatalf("newEstimator: %v", err)
}
defer e.stop()
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
var i uint64
for pb.Next() {
e.insertMany([]protoparser.TimeSerie{{
GroupLabels: []protoparser.Label{{Name: "groupLabel", Value: fmt.Sprintf("%d", i%100_000)}},
Fingerprint: i,
}})
i++
}
})
})
}
// BenchmarkEstimator_InsertRotateCycle measures repeated insert→rotate cycles
// of the global (no-group) estimator in two HLL regimes:
//   - SparseHLL: 1 000 series per cycle (the sketch is expected to stay in sparse mode)
//   - NormalHLL: 30 000 series per cycle (the sketch is expected to convert to dense mode)
func BenchmarkEstimator_InsertRotateCycle(b *testing.B) {
	cases := []struct {
		name      string
		numSeries int
	}{
		{name: "SparseHLL", numSeries: 1_000},
		{name: "NormalHLL", numSeries: 30_000},
	}

	for _, bc := range cases {
		b.Run(bc.name, func(b *testing.B) {
			e, err := newEstimator(EstimatorConfig{Interval: time.Hour})
			if err != nil {
				b.Fatalf("newEstimator: %v", err)
			}
			defer e.stop()

			b.ResetTimer()
			b.ReportAllocs()
			for i := 0; i < b.N; i++ {
				insertSeriesIntoEstimator(e, bc.numSeries, 0)
				e.rotate()
			}
		})
	}
}
// insertSeriesIntoEstimator inserts numSeries time series into e, one per insertMany call.
//
// The fingerprint of the i-th series is the hash of "foobarbaz<i>".
// When groupsNum > 0, every series gets a "groupLabel" label with the value i%groupsNum.
func insertSeriesIntoEstimator(e *estimator, numSeries, groupsNum int) {
	for i := 0; i < numSeries; i++ {
		ts := protoparser.TimeSerie{
			Fingerprint: hash(fmt.Appendf(nil, "foobarbaz%d", i)),
		}
		if groupsNum > 0 {
			ts.GroupLabels = []protoparser.Label{{
				Name:  "groupLabel",
				Value: fmt.Sprintf("%d", i%groupsNum),
			}}
		}
		e.insertMany([]protoparser.TimeSerie{ts})
	}
}

View File

@@ -0,0 +1,595 @@
package main
import (
"bytes"
"encoding/binary"
"fmt"
"sort"
"strings"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmestimator/protoparser"
)
// TestGlobalEstimate checks the cardinality_estimate line written by an
// estimator configured without GroupBy, across several insert/rotate scenarios.
//
// For larger cardinalities the expected values are the exact estimates the
// sketch yields for the generated fingerprints, not the true cardinality
// (e.g. 5000 inserted series are expected to be reported as 4998).
func TestGlobalEstimate(t *testing.T) {
// genCard returns a generator that inserts `cardinality` series into the
// estimator. Fingerprints are hashes of the little-endian loop index followed
// by seed bytes, so different seeds produce different series.
genCard := func(cardinality int, seed string) func(e *estimator) {
return func(e *estimator) {
var tss []protoparser.TimeSerie
fpBuf := make([]byte, 8, 8+len(seed))
for i := 0; i < cardinality; i++ {
binary.LittleEndian.PutUint64(fpBuf[:8], uint64(i))
// NOTE(review): fpBuf is never truncated back to 8 bytes, so with a non-empty
// seed it grows by len(seed) on every iteration and the hashed input gets longer
// each time. Fingerprints stay distinct per i, but the expected values below
// depend on these exact fingerprints, so changing this requires regenerating them.
fpBuf = append(fpBuf, seed...)
tss = append(tss, protoparser.TimeSerie{
Fingerprint: hash(fpBuf[:]),
})
// Flush the pending batch whenever i is a multiple of 10 (including i == 0).
if i%10 == 0 {
e.insertMany(tss)
tss = tss[:0]
}
}
// Insert whatever is left after the last flush.
if len(tss) > 0 {
e.insertMany(tss)
}
}
}
// f runs gen against a fresh estimator (10 minute interval, 5 buckets), verifies
// that no bucket holds any grouping state and compares the writeMetrics output
// with expMetric.
f := func(gen func(e *estimator), expMetric string) {
t.Helper()
cfg := EstimatorConfig{
Interval: time.Minute * 10,
Buckets: 5,
}
e, err := newEstimator(cfg)
if err != nil {
t.Fatalf("failed to create new estimator: %v", err)
}
defer e.stop()
gen(e)
if len(e.buckets) != cfg.Buckets {
t.Fatalf("expected buckets length to be %d but got %d", cfg.Buckets, len(e.buckets))
}
for i, eb := range e.buckets {
if len(eb.groupBy) > 0 {
t.Fatalf("expected bucket %d groupBy length to be 0 but got %d", i, len(eb.groupBy))
}
if eb.groups != nil {
t.Fatalf("expected bucket %d groups length to be 0 but got %d", i, len(eb.groups))
}
if eb.groupSize.Load() != 0 {
t.Fatalf("expected bucket %d groupSize to be 0 but got %d", i, eb.groupSize.Load())
}
}
buf := bytes.NewBuffer(nil)
e.writeMetrics(buf)
assertMetricsSame(t, "", expMetric, buf.String())
}
// insert only, no rotation
f(genCard(0, ""), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genCard(1, ""), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 1`)
f(genCard(10, ""), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 10`)
f(genCard(100, ""), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 100`)
f(genCard(1000, ""), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 1000`)
f(genCard(5000, ""), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 4998`)
f(genCard(10000, ""), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 9920`)
f(genCard(100000, ""), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 99658`)
f(genCard(500000, ""), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 496552`)
// insert, then rotate once: the expected estimates are the same as without rotation
genRotateOnce := func(cardinality int) func(e *estimator) {
return func(e *estimator) {
genCard(cardinality, "")(e)
e.rotate()
}
}
f(genRotateOnce(0), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genRotateOnce(1), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 1`)
f(genRotateOnce(10), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 10`)
f(genRotateOnce(100), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 100`)
f(genRotateOnce(1000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 1000`)
f(genRotateOnce(5000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 4998`)
f(genRotateOnce(10000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 9920`)
f(genRotateOnce(100000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 99658`)
f(genRotateOnce(500000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 496552`)
// insert cardinality/2 series, rotate, insert the same cardinality/2 series again:
// the repeated series must not be counted twice
genInsertRotateInsertSameOnce := func(cardinality int) func(e *estimator) {
return func(e *estimator) {
genCard(cardinality/2, "")(e)
e.rotate()
genCard(cardinality/2, "")(e)
}
}
f(genInsertRotateInsertSameOnce(0), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genInsertRotateInsertSameOnce(1), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genInsertRotateInsertSameOnce(10), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 5`)
f(genInsertRotateInsertSameOnce(100), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 50`)
f(genInsertRotateInsertSameOnce(1000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 500`)
f(genInsertRotateInsertSameOnce(5000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 2499`)
f(genInsertRotateInsertSameOnce(10000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 4998`)
f(genInsertRotateInsertSameOnce(100000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 49529`)
f(genInsertRotateInsertSameOnce(200000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 99658`)
// insert cardinality/2 series, rotate, insert cardinality/2 different series:
// both halves contribute to the estimate
genInsertRotateInsertOnce := func(cardinality int) func(e *estimator) {
return func(e *estimator) {
genCard(cardinality/2, "one")(e)
e.rotate()
genCard(cardinality/2, "two")(e)
}
}
f(genInsertRotateInsertOnce(0), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genInsertRotateInsertOnce(1), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genInsertRotateInsertOnce(10), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 10`)
f(genInsertRotateInsertOnce(100), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 100`)
f(genInsertRotateInsertOnce(1000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 1000`)
f(genInsertRotateInsertOnce(5000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 5000`)
f(genInsertRotateInsertOnce(10000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 10058`)
f(genInsertRotateInsertOnce(100000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 99543`)
f(genInsertRotateInsertOnce(200000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 198814`)
// insert, then rotate twice without new inserts: the estimate is expected to drop to 0
genRotateTwoTimes := func(cardinality int) func(e *estimator) {
return func(e *estimator) {
genCard(cardinality, "")(e)
e.rotate()
e.rotate()
}
}
f(genRotateTwoTimes(0), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genRotateTwoTimes(1), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genRotateTwoTimes(10), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genRotateTwoTimes(100), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genRotateTwoTimes(1000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genRotateTwoTimes(5000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genRotateTwoTimes(10000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genRotateTwoTimes(100000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
f(genRotateTwoTimes(500000), `cardinality_estimate{interval="10m0s",group_by_keys="__global__"} 0`)
}
// TestGroupEstimate checks the writeMetrics output of estimators configured
// with GroupBy: one group_by_keys="__group__" line (whose value matches the
// number of distinct groups in these cases) plus one line per group holding
// that group's cardinality estimate.
func TestGroupEstimate(t *testing.T) {
// genCard returns a generator that inserts the cartesian product of
// fooCard x barCard x bazCard label values in a single insertMany call.
// Every series carries __name__="the_metric_name". A label whose card is 0 is
// omitted entirely (its loop still runs once because of max(1, card)).
// Label values are seed followed by the loop index; the fingerprint hashes the
// rendered labels followed by seed.
genCard := func(fooCard, barCard, bazCard int, seed string) func(e *estimator) {
return func(e *estimator) {
var tss []protoparser.TimeSerie
for fooI := 0; fooI < max(1, fooCard); fooI++ {
for barI := 0; barI < max(1, barCard); barI++ {
for bazI := 0; bazI < max(1, bazCard); bazI++ {
ts := protoparser.TimeSerie{}
ts.GroupLabels = append(ts.GroupLabels, protoparser.Label{Name: "__name__", Value: "the_metric_name"})
if fooCard > 0 {
ts.GroupLabels = append(ts.GroupLabels, protoparser.Label{Name: "foo", Value: fmt.Sprintf("%s%d", seed, fooI)})
}
if barCard > 0 {
ts.GroupLabels = append(ts.GroupLabels, protoparser.Label{Name: "bar", Value: fmt.Sprintf("%s%d", seed, barI)})
}
if bazCard > 0 {
ts.GroupLabels = append(ts.GroupLabels, protoparser.Label{Name: "baz", Value: fmt.Sprintf("%s%d", seed, bazI)})
}
// Build the fingerprint input as "name=value," for every label, then the seed.
var fpBuf []byte
for _, l := range ts.GroupLabels {
fpBuf = append(fpBuf, l.Name...)
fpBuf = append(fpBuf, '=')
fpBuf = append(fpBuf, l.Value...)
fpBuf = append(fpBuf, ',')
}
fpBuf = append(fpBuf, seed...)
ts.Fingerprint = hash(fpBuf)
tss = append(tss, ts)
}
}
}
e.insertMany(tss)
}
}
// f builds an estimator grouped by groupBy (10 minute interval, 5 buckets),
// runs gen, asserts that every bucket has nil sketch and prevSketch, and then
// compares the writeMetrics output with expMetrics.
f := func(groupBy []string, gen func(e *estimator), expMetrics string) {
t.Helper()
cfg := EstimatorConfig{
Interval: time.Minute * 10,
GroupBy: groupBy,
Buckets: 5,
}
e, err := newEstimator(cfg)
if err != nil {
t.Fatalf("failed to create new estimator: %v", err)
}
defer e.stop()
gen(e)
if len(e.buckets) != cfg.Buckets {
t.Fatalf("expected buckets length to be %d but got %d", cfg.Buckets, len(e.buckets))
}
for i, eb := range e.buckets {
if eb.sketch != nil {
t.Fatalf("expected bucket %d sketch to be nil", i)
}
if eb.prevSketch != nil {
t.Fatalf("expected bucket %d prevSketch to be nil", i)
}
}
buf := bytes.NewBuffer(nil)
e.writeMetrics(buf)
assertMetricsSame(t, "", expMetrics, buf.String())
}
// group by metric name
f([]string{"__name__"}, genCard(10, 10, 10, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="__name__"} 1
cardinality_estimate{interval="10m0s",group_by_keys="__name__",group_by_values="the_metric_name",by__name__="the_metric_name"} 1000`,
)
// time series lacking a group-by label do not contribute to any group
f([]string{"foo"}, genCard(0, 10, 10, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 0`,
)
f([]string{"foo", "bar"}, genCard(0, 0, 10, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo,bar"} 0`,
)
// group by one label
f([]string{"foo"}, genCard(1, 1, 0, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 1`,
)
f([]string{"foo"}, genCard(1, 2, 0, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 2`,
)
f([]string{"foo"}, genCard(1, 10, 0, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 10`,
)
f([]string{"foo"}, genCard(1, 100, 0, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 100`,
)
f([]string{"foo"}, genCard(1, 1000, 0, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 1000`,
)
f([]string{"foo"}, genCard(1, 10000, 0, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 9957`,
)
f([]string{"foo"}, genCard(1, 50000, 0, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 50387`,
)
f([]string{"foo"}, genCard(1, 1, 1, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 1`,
)
f([]string{"foo"}, genCard(1, 2, 2, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 4`,
)
f([]string{"foo"}, genCard(1, 10, 10, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 100`,
)
f([]string{"foo"}, genCard(1, 100, 100, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 9954`,
)
f([]string{"foo"}, genCard(1, 1000, 1000, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 1013124`,
)
// group by one label, rotate
genCardRotate := func(fooCard, barCard, bazCard int, seed string) func(e *estimator) {
return func(e *estimator) {
genCard(fooCard, barCard, bazCard, seed)(e)
e.rotate()
}
}
f([]string{"foo"}, genCardRotate(1, 10, 10, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 100`,
)
f([]string{"foo"}, genCardRotate(1, 1000, 1000, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 1013124`,
)
// group by one label, rotate, insert same
genCardRotateInsertSame := func(barCard, bazCard int) func(e *estimator) {
return func(e *estimator) {
genCard(1, barCard, bazCard, "")(e)
e.rotate()
genCard(1, barCard, bazCard, "")(e)
}
}
f([]string{"foo"}, genCardRotateInsertSame(10, 10), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 100`,
)
f([]string{"foo"}, genCardRotateInsertSame(1000, 1000), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="0",by_foo="0"} 1013124`,
)
// group by one label, rotate, insert diff
genCardRotateInsertDiff := func(barCard, bazCard int) func(e *estimator) {
return func(e *estimator) {
genCard(1, barCard, bazCard, "one")(e)
e.rotate()
genCard(1, barCard, bazCard, "two")(e)
}
}
f([]string{"foo"}, genCardRotateInsertDiff(10, 10), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 2
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="one0",by_foo="one0"} 100
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="two0",by_foo="two0"} 100`,
)
f([]string{"foo"}, genCardRotateInsertDiff(1000, 1000), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 2
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="one0",by_foo="one0"} 995153
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="two0",by_foo="two0"} 992158`,
)
// group by one label, rotate twice: no groups are expected to remain
genCardRotateTwice := func(barCard, bazCard int) func(e *estimator) {
return func(e *estimator) {
genCard(1, barCard, bazCard, "one")(e)
e.rotate()
e.rotate()
}
}
f([]string{"foo"}, genCardRotateTwice(10, 10), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 0`,
)
f([]string{"foo"}, genCardRotateTwice(1000, 1000), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 0`,
)
// group by two labels
f([]string{"foo", "bar"}, genCard(1, 1, 1000, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo,bar"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="0,0",by_foo="0",by_bar="0"} 1000`,
)
f([]string{"foo", "bar"}, genCard(2, 1, 1000, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo,bar"} 2
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="0,0",by_foo="0",by_bar="0"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="1,0",by_foo="1",by_bar="0"} 1000`,
)
f([]string{"foo", "bar"}, genCard(2, 2, 1000, ""), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo,bar"} 4
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="0,0",by_foo="0",by_bar="0"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="0,1",by_foo="0",by_bar="1"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="1,0",by_foo="1",by_bar="0"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="1,1",by_foo="1",by_bar="1"} 1000`,
)
// group by two labels, rotate
genCardTwoLabelsRotate := func() func(e *estimator) {
return func(e *estimator) {
genCard(2, 2, 1000, "")(e)
e.rotate()
}
}
f([]string{"foo", "bar"}, genCardTwoLabelsRotate(), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo,bar"} 4
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="0,0",by_foo="0",by_bar="0"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="0,1",by_foo="0",by_bar="1"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="1,0",by_foo="1",by_bar="0"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="1,1",by_foo="1",by_bar="1"} 1000`,
)
// group by two labels, rotate, insert same
genCardTwoLabelsRotateInsertSame := func() func(e *estimator) {
return func(e *estimator) {
genCard(2, 2, 1000, "")(e)
e.rotate()
genCard(2, 2, 1000, "")(e)
}
}
f([]string{"foo", "bar"}, genCardTwoLabelsRotateInsertSame(), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo,bar"} 4
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="0,0",by_foo="0",by_bar="0"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="0,1",by_foo="0",by_bar="1"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="1,0",by_foo="1",by_bar="0"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="1,1",by_foo="1",by_bar="1"} 1000`,
)
// group by two labels, rotate, insert diff
genCardTwoLabelsRotateInsertDiff := func() func(e *estimator) {
return func(e *estimator) {
genCard(2, 2, 1000, "one")(e)
e.rotate()
genCard(2, 2, 1000, "two")(e)
}
}
f(
[]string{"foo", "bar"}, genCardTwoLabelsRotateInsertDiff(), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo,bar"} 8
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="one0,one0",by_foo="one0",by_bar="one0"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="one0,one1",by_foo="one0",by_bar="one1"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="one1,one0",by_foo="one1",by_bar="one0"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="one1,one1",by_foo="one1",by_bar="one1"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="two0,two0",by_foo="two0",by_bar="two0"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="two0,two1",by_foo="two0",by_bar="two1"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="two1,two0",by_foo="two1",by_bar="two0"} 1000
cardinality_estimate{interval="10m0s",group_by_keys="foo,bar",group_by_values="two1,two1",by_foo="two1",by_bar="two1"} 1000`,
)
// group by two labels, rotate twice: no groups are expected to remain
genCardTwoLabelsRotateTwice := func() func(e *estimator) {
return func(e *estimator) {
genCard(2, 2, 1000, "one")(e)
e.rotate()
e.rotate()
}
}
f([]string{"foo", "bar"}, genCardTwoLabelsRotateTwice(), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo,bar"} 0`,
)
// quote values: label values with special characters must be properly escaped
// genSpecialCard inserts a single series whose only group label is foo=fooVal.
genSpecialCard := func(fooVal string) func(e *estimator) {
return func(e *estimator) {
e.insertMany([]protoparser.TimeSerie{
{
GroupLabels: []protoparser.Label{{Name: "foo", Value: fooVal}},
Fingerprint: hash([]byte("foo=" + fooVal + ",")),
},
})
}
}
// double quote in value
f([]string{"foo"}, genSpecialCard(`a"b`), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="a\"b",by_foo="a\"b"} 1`,
)
// backslash in value
f([]string{"foo"}, genSpecialCard(`a\b`), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="a\\b",by_foo="a\\b"} 1`,
)
// newline in value
f([]string{"foo"}, genSpecialCard("a\nb"), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="a\nb",by_foo="a\nb"} 1`,
)
// tab in value
f([]string{"foo"}, genSpecialCard("a\tb"), `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="a\tb",by_foo="a\tb"} 1`,
)
}
// TestGroupEstimateGroupLimit checks that GroupLimit caps the number of
// per-group lines written by writeMetrics and that groups beyond the limit are
// accounted for in buckets[0].groupRejectedSketch.
func TestGroupEstimateGroupLimit(t *testing.T) {
// makeTS returns a series with a single foo=fooVal group label; the fingerprint
// is the hash of the rendered label.
makeTS := func(fooVal string) protoparser.TimeSerie {
return protoparser.TimeSerie{
GroupLabels: []protoparser.Label{{Name: "foo", Value: fooVal}},
Fingerprint: hash([]byte("foo=" + fooVal + ",")),
}
}
// f builds an estimator grouped by "foo" with the given groupLimit (10 minute
// interval, 3 buckets), runs gen, compares the writeMetrics output with
// expMetrics and then compares the estimate of buckets[0].groupRejectedSketch
// (0 when the sketch is nil) with expRejected.
f := func(groupLimit int, gen func(e *estimator), expRejected int, expMetrics string) {
t.Helper()
cfg := EstimatorConfig{
Interval: time.Minute * 10,
GroupBy: []string{"foo"},
GroupLimit: groupLimit,
Buckets: 3,
}
e, err := newEstimator(cfg)
if err != nil {
t.Fatalf("failed to create new estimator: %v", err)
}
defer e.stop()
gen(e)
buf := bytes.NewBuffer(nil)
e.writeMetrics(buf)
assertMetricsSame(t, "", expMetrics, buf.String())
var actRejected int
if e.buckets[0].groupRejectedSketch != nil {
actRejected = int(e.buckets[0].groupRejectedSketch.Estimate())
}
if expRejected != actRejected {
t.Fatalf("rejected expected: %d; got: %d", expRejected, actRejected)
}
}
// all groups accepted
f(3, func(e *estimator) {
e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("b"), makeTS("c")})
}, 0, `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 3
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="a",by_foo="a"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="b",by_foo="b"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="c",by_foo="c"} 1`,
)
// 2 groups only accepted; the __group__ line is still expected to report 3,
// i.e. it includes the rejected group
f(2, func(e *estimator) {
e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("b"), makeTS("c")})
}, 1, `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 3
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="a",by_foo="a"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="b",by_foo="b"} 1`,
)
// one group only accepted
f(1, func(e *estimator) {
e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("b"), makeTS("c")})
}, 2, `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 3
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="a",by_foo="a"} 1`,
)
// after rotate: groups in prevGroups bypass the limit; new groups are still checked
f(2, func(e *estimator) {
// fills limit
e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("b")})
e.rotate()
// "a" bypasses, "c" rejected
e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("c")})
}, 1, `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 3
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="a",by_foo="a"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="b",by_foo="b"} 1`,
)
// after rotate: new group accepted when remaining capacity allows
f(3, func(e *estimator) {
// 2 groups, limit=3
e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("b")})
e.rotate()
// "a" bypasses, "c" accepted (2+1=3 <= 3)
e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("c")})
}, 0, `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 3
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="a",by_foo="a"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="b",by_foo="b"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="c",by_foo="c"} 1`,
)
// reject 100: 103 distinct groups with limit 3 keep the first three (a0, a1, a2)
f(3, func(e *estimator) {
var tss []protoparser.TimeSerie
for i := 0; i < 103; i++ {
tss = append(tss, makeTS(fmt.Sprintf("a%d", i)))
}
e.insertMany(tss)
}, 100, `
cardinality_estimate{interval="10m0s",group_by_keys="__group__",group_by_values="foo"} 103
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="a0",by_foo="a0"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="a1",by_foo="a1"} 1
cardinality_estimate{interval="10m0s",group_by_keys="foo",group_by_values="a2",by_foo="a2"} 1`,
)
}
// assertMetricsSame fails the test unless exp and act consist of the same
// lines. Line order is ignored, as is leading/trailing whitespace around the
// whole text; whitespace inside individual lines is compared as is.
// msg is printed as the first line of the failure message.
func assertMetricsSame(t *testing.T, msg, exp, act string) {
	t.Helper()

	// canonical trims the text, sorts its lines and joins them back together.
	canonical := func(text string) string {
		lines := strings.Split(strings.TrimSpace(text), "\n")
		sort.Strings(lines)
		return strings.TrimSpace(strings.Join(lines, "\n"))
	}

	want, got := canonical(exp), canonical(act)
	if want == got {
		return
	}
	t.Fatalf("%s\nexpected:\n%s\n\ngot:\n%s", msg, want, got)
}

135
app/vmestimator/main.go Normal file
View File

@@ -0,0 +1,135 @@
package main
import (
"bufio"
"encoding/gob"
"flag"
"io"
"net/http"
"os"
"strings"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmestimator/protoparser"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
)
// Command-line flags and request counters used by main.
var (
// httpListenAddrs holds the -httpListenAddr values; main falls back to ":8490" when none is given.
httpListenAddrs = flagutil.NewArrayString("httpListenAddr", "TCP address to listen for incoming HTTP requests")
// configPath is the -config value handed to loadConfig.
configPath = flag.String("config", "", "Path to YAML configuration file")
// storageNodes holds the -storageNode values; they are passed to writeCardinalityMetrics.
storageNodes = flagutil.NewArrayString("storageNode", "HTTP URLs of remote vmestimator nodes to query for cardinality snapshots, e.g. http://vmestimator-2:8490")
// prometheusWriteRequests counts requests routed to the /api/v1/write handler.
prometheusWriteRequests = metrics.NewCounter(`vmestimator_http_requests_total{path="/api/v1/write", protocol="promremotewrite"}`)
)
// main is the vmestimator entry point. It initializes flags, build info and
// logging, loads the estimators via loadConfig, serves the HTTP API in a
// background goroutine and blocks until procutil.WaitForSigterm returns.
// It then stops the HTTP server and every estimator.
func main() {
flag.CommandLine.SetOutput(os.Stdout)
envflag.Parse()
buildinfo.Init()
logger.Init()
es, err := loadConfig(*configPath)
if err != nil {
logger.Fatalf("cannot load config: %v", err)
}
// When cardinality metrics share the /metrics path they are emitted through a
// registered metrics writer; any other path is served by the request handler below.
if *cardinalityMetricsExposeAt == `/metrics` {
metrics.RegisterMetricsWriter(func(w io.Writer) {
writeCardinalityMetrics(w, es, *storageNodes)
})
}
// Collect the distinct groupBy labels of all estimators; the parser receives
// this list for every write request. Map iteration leaves the order unspecified.
groupLabelsMap := make(map[string]struct{})
for _, e := range es {
for _, l := range e.groupBy {
groupLabelsMap[l] = struct{}{}
}
}
groupLabels := make([]string, 0, len(groupLabelsMap))
for k := range groupLabelsMap {
groupLabels = append(groupLabels, k)
}
listenAddrs := *httpListenAddrs
if len(listenAddrs) == 0 {
listenAddrs = []string{":8490"}
}
logger.Infof("starting vmestimator at %q", listenAddrs)
startTime := time.Now()
// The handler returns true for every route it recognizes and false for anything else.
go httpserver.Serve(listenAddrs, func(w http.ResponseWriter, r *http.Request) bool {
// Dedicated cardinality metrics endpoint; an empty path never matches and
// "/metrics" is already covered by the registered metrics writer.
cmPath := *cardinalityMetricsExposeAt
if cmPath != "/metrics" && cmPath != "" && r.URL.Path == cmPath {
w.WriteHeader(http.StatusOK)
writeCardinalityMetrics(w, es, *storageNodes)
return true
}
// Routes are matched both with and without the "/cardinality" prefix.
path, _ := strings.CutPrefix(r.URL.Path, `/cardinality`)
switch path {
case "/api/v1/write":
prometheusWriteRequests.Inc()
// Every parsed batch is inserted into each configured estimator.
err := protoparser.Parse(r.Body, groupLabels, func(tss []protoparser.TimeSerie) {
for _, e := range es {
e.insertMany(tss)
}
})
if err != nil {
httpserver.Errorf(w, r, "error parsing remote write request: %s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "/clusternative/query", "/clusternative/snapshot":
// Snapshots of all estimators are gob-encoded into one buffered response stream.
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
bw := bufio.NewWriterSize(w, 64*1024)
enc := gob.NewEncoder(bw)
// Status 200 has already been written, so failures below can only be logged.
// NOTE(review): after a failed writeSnapshot the loop still encodes the remaining
// estimators - confirm the reader tolerates a stream with a failed entry.
for _, e := range es {
if err := e.writeSnapshot(enc); err != nil {
logger.Errorf("write snapshot binary: %s", err)
}
}
if err := bw.Flush(); err != nil {
logger.Errorf("flush snapshot binary: %s", err)
}
return true
case "/reset":
// NOTE(review): no HTTP method or auth check is visible here; any request to
// this path resets all estimators - confirm this is intended.
for _, e := range es {
e.reset()
}
w.WriteHeader(http.StatusOK)
return true
}
return false
}, httpserver.ServeOptions{})
logger.Infof("started vmestimator in %.3f seconds", time.Since(startTime).Seconds())
pushmetrics.Init()
sig := procutil.WaitForSigterm()
logger.Infof("received signal %s", sig)
// Shutdown order: stop metrics pushing, then the HTTP server, then the estimators.
pushmetrics.Stop()
logger.Infof("gracefully shutting down webservice at %q", listenAddrs)
if err := httpserver.Stop(listenAddrs); err != nil {
logger.Errorf("cannot stop http server: %s", err)
}
for _, e := range es {
e.stop()
}
logger.Infof("shutting down vmestimator")
}

View File

@@ -0,0 +1,78 @@
package protoparser
import (
"fmt"
"io"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"
)
// maxInsertRequestSize is the -maxInsertRequestSize flag (default 32 MiB).
// It is handed to the request reader and to both decompressors, and the
// unpacked body length is additionally checked against it in parseRequestBody.
var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request")
// Parse reads a Prometheus remote_write message from r and invokes callback
// with the time series decoded from it. groupLabels is forwarded unchanged to
// the protobuf unmarshaler.
//
// callback shouldn't hold tss after returning.
//
// On failure the read-errors counter is incremented and the returned error
// wraps the cause together with the elapsed time in seconds.
func Parse(r io.Reader, groupLabels []string, callback func(tss []TimeSerie)) error {
	startedAt := fasttime.UnixTimestamp()
	readCalls.Inc()

	// handleBody receives the request body obtained by ReadUncompressedData.
	handleBody := func(body []byte) error {
		return parseRequestBody(body, groupLabels, callback)
	}
	if err := protoparserutil.ReadUncompressedData(r, "", maxInsertRequestSize, handleBody); err != nil {
		readErrors.Inc()
		elapsed := fasttime.UnixTimestamp() - startedAt
		return fmt.Errorf("cannot read prometheus remote_write data from client in %d seconds: %w", elapsed, err)
	}
	return nil
}
// parseRequestBody decompresses one remote_write request body and passes the
// decoded time series to callback.
//
// data is decoded as zstd when encoding.IsZstd reports so and as snappy
// otherwise. The unpacked payload is rejected when it is larger than
// -maxInsertRequestSize. Every batch handed to callback is also added to the
// rows-read counter.
func parseRequestBody(data []byte, groupLabels []string, callback func(tss []TimeSerie)) error {
	// Synchronously process the request in order to properly return errors to Parse caller,
	// so it could properly return HTTP 503 status code in response.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
	buf := bodyBufferPool.Get()
	defer bodyBufferPool.Put(buf)

	var err error
	switch {
	case encoding.IsZstd(data):
		buf.B, err = encoding.DecompressZSTDLimited(buf.B[:0], data, maxInsertRequestSize.IntN())
		if err != nil {
			return fmt.Errorf("cannot decompress zstd-encoded request with length %d: %w", len(data), err)
		}
	default:
		buf.B, err = snappy.Decode(buf.B, data, maxInsertRequestSize.IntN())
		if err != nil {
			return fmt.Errorf("cannot decompress snappy-encoded request with length %d: %w", len(data), err)
		}
	}

	if unpackedLen := int64(len(buf.B)); unpackedLen > maxInsertRequestSize.N {
		return fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", maxInsertRequestSize.N, len(buf.B))
	}

	unmarshaler := getWriteRequestUnmarshaler()
	defer putWriteRequestUnmarshaler(unmarshaler)

	// onSeries accounts for the parsed rows before forwarding them to the caller.
	onSeries := func(tss []TimeSerie) {
		rowsRead.Add(len(tss))
		callback(tss)
	}
	if err = unmarshaler.UnmarshalProtobuf(buf.B, groupLabels, onSeries); err != nil {
		unmarshalErrors.Inc()
		return fmt.Errorf("cannot unmarshal prompb.WriteRequest with size %d bytes: %w", len(buf.B), err)
	}
	return nil
}
// bodyBufferPool provides the buffers parseRequestBody decompresses request bodies into.
var bodyBufferPool bytesutil.ByteBufferPool
// Counters maintained by Parse and parseRequestBody.
var (
// readCalls is incremented on every Parse call.
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="promremotewrite"}`)
// readErrors is incremented when Parse returns an error.
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="promremotewrite"}`)
// rowsRead is increased by the size of every batch passed to the callback.
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="promremotewrite"}`)
// unmarshalErrors is incremented when the protobuf payload cannot be unmarshaled.
unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="promremotewrite"}`)
)

View File

@@ -0,0 +1,67 @@
package protoparser
import (
"bytes"
"fmt"
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/golang/snappy"
)
// BenchmarkParse measures the end-to-end cost of Parse on a snappy-encoded
// WriteRequest holding 5000 series with 20 labels of 20 bytes each.
func BenchmarkParse(b *testing.B) {
	groupLabels := []string{"foo", "bar", "baz", "__name__", "job", "groupLabel"}
	payload := buildSnappyEncodedWriteRequest(5000, 20, 20, 3)

	var cnt int

	b.ResetTimer()
	b.ReportAllocs()
	b.SetBytes(int64(len(payload)))
	for b.Loop() {
		if err := Parse(bytes.NewReader(payload), groupLabels, func(tss []TimeSerie) {
			cnt += len(tss)
		}); err != nil {
			b.Fatalf("stream.Parse: %v", err)
		}
	}
}
// buildSnappyEncodedWriteRequest returns a snappy-compressed protobuf WriteRequest
// containing numSeries time series. Every series carries numLabels generated labels
// whose values embed labelSize filler bytes, plus a "groupLabel" label whose value is
// the series index modulo groupsNum. groupsNum must be non-zero.
func buildSnappyEncodedWriteRequest(numSeries, numLabels, labelSize, groupsNum int) []byte {
	filler := strings.Repeat("x", labelSize)
	series := make([]prompb.TimeSeries, numSeries)
	for seriesIdx := range series {
		lbls := make([]prompb.Label, numLabels)
		for labelIdx := range lbls {
			lbls[labelIdx] = prompb.Label{
				Name:  fmt.Sprintf("label%02d", labelIdx),
				Value: fmt.Sprintf("val%05d_%s", seriesIdx, filler),
			}
		}
		groupLabel := prompb.Label{
			Name:  "groupLabel",
			Value: fmt.Sprintf("%d", seriesIdx%groupsNum),
		}
		series[seriesIdx] = prompb.TimeSeries{
			Labels:  append(lbls, groupLabel),
			Samples: []prompb.Sample{{Value: 1, Timestamp: 1000}},
		}
	}
	req := &prompb.WriteRequest{Timeseries: series}
	encoded := req.MarshalProtobuf(nil)
	return snappy.Encode(nil, encoded)
}

View File

@@ -0,0 +1,170 @@
package protoparser
import (
"fmt"
"slices"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/easyproto"
"github.com/cespare/xxhash/v2"
)
// TimeSerie is the reduced representation of a single remote_write time series
// produced by the unmarshaler.
type TimeSerie struct {
	// GroupLabels holds only those labels of the series whose names were listed
	// in groupLabels, in the order they appear in the encoded message.
	GroupLabels []Label
	// Fingerprint is the xxhash digest over the raw encoded label messages of the series.
	Fingerprint uint64
}
// Label is a single name/value pair of a time series.
//
// NOTE(review): the unmarshaler fills Name and Value via bytesutil.ToUnsafeString,
// so they presumably alias the request buffer and must not outlive it — confirm.
type Label struct {
	Name  string
	Value string
}
// getWriteRequestUnmarshaler returns an unmarshaler from wruPool,
// allocating a pre-sized one when the pool is empty.
//
// Return it to the pool with putWriteRequestUnmarshaler when done.
func getWriteRequestUnmarshaler() *writeRequestUnmarshaler {
	if pooled := wruPool.Get(); pooled != nil {
		return pooled.(*writeRequestUnmarshaler)
	}
	wru := &writeRequestUnmarshaler{}
	wru.tss = make([]TimeSerie, 0, 1024)
	wru.labelsPool = make([]Label, 0, 4096)
	wru.d = xxhash.New()
	return wru
}
// putWriteRequestUnmarshaler resets wru and returns it to wruPool.
//
// wru must not be used after this call.
func putWriteRequestUnmarshaler(wru *writeRequestUnmarshaler) {
	wru.Reset()
	wruPool.Put(wru)
}

// wruPool holds reusable *writeRequestUnmarshaler instances.
var wruPool sync.Pool
// writeRequestUnmarshaler is a reusable unmarshaler for WriteRequest protobuf messages.
//
// It keeps internal buffers for parsed series and their group labels in order
// to reduce memory allocations.
// See UnmarshalProtobuf for details on how to use it.
type writeRequestUnmarshaler struct {
	// tss is the reusable batch of parsed series passed to the callback.
	tss []TimeSerie
	// labelsPool is the backing storage for TimeSerie.GroupLabels of the current batch.
	labelsPool []Label
	// d is the digest used for calculating TimeSerie.Fingerprint.
	d *xxhash.Digest
}
// Reset resets wru, so it could be re-used.
//
// The underlying buffers are truncated to zero length while keeping their capacity.
func (wru *writeRequestUnmarshaler) Reset() {
	wru.tss = wru.tss[:0]
	wru.labelsPool = wru.labelsPool[:0]
	wru.d.Reset()
}
// UnmarshalProtobuf parses the protobuf-encoded WriteRequest in src and passes
// the parsed series to callback in batches.
//
// A batch is flushed to callback whenever the internal tss buffer is full,
// and once more at the end if any series remain. The tss slice and the labels
// it references are reused right after callback returns, so callback must not
// hold them. Only labels with names from groupLabels are stored in
// TimeSerie.GroupLabels. Fields other than timeseries (field 1) are skipped.
//
// On error, series parsed since the last flush are not passed to callback.
func (wru *writeRequestUnmarshaler) UnmarshalProtobuf(src []byte, groupLabels []string, callback func(tss []TimeSerie)) error {
	wru.Reset()
	var err error
	tss := wru.tss
	// message WriteRequest {
	//    repeated TimeSeries timeseries = 1;
	//    reserved 2;
	//    repeated Metadata metadata = 3;
	// }
	labelsPool := wru.labelsPool
	var fc easyproto.FieldContext
	for len(src) > 0 {
		// Flush the batch before tss would need to grow, so its backing array is never reallocated.
		if len(tss) >= cap(tss) {
			callback(tss)
			tss = tss[:0]
			labelsPool = labelsPool[:0]
		}
		src, err = fc.NextField(src)
		if err != nil {
			return fmt.Errorf("cannot read the next field: %w", err)
		}
		switch fc.FieldNum {
		case 1:
			data, ok := fc.MessageData()
			if !ok {
				return fmt.Errorf("cannot read timeseries data")
			}
			// Extend tss by one entry in place; the entry is fully overwritten by unmarshalProtobuf on success.
			tss = tss[:len(tss)+1]
			ts := &tss[len(tss)-1]
			// The digest is shared between series, so it is reset before every series.
			d := wru.d
			d.Reset()
			labelsPool, err = ts.unmarshalProtobuf(data, groupLabels, labelsPool, d)
			if err != nil {
				return fmt.Errorf("cannot unmarshal timeseries: %w", err)
			}
		}
	}
	// Flush the remaining series.
	if len(tss) > 0 {
		callback(tss)
		tss = tss[:0]
		labelsPool = labelsPool[:0]
	}
	// Keep the (possibly grown) buffers for the next call.
	wru.tss = tss[:0]
	wru.labelsPool = labelsPool
	wru.d.Reset()
	return nil
}
// unmarshalProtobuf parses the protobuf-encoded TimeSeries message in src into ts.
//
// Every encoded label message is written to d, and labels whose names are listed
// in groupLabels are appended to labelsPool. On success ts.GroupLabels is set to
// the labels appended by this call and ts.Fingerprint to the resulting digest sum.
// Samples (field 2) are skipped.
//
// It returns the possibly extended labelsPool. d must be reset by the caller
// before the call.
func (ts *TimeSerie) unmarshalProtobuf(src []byte, groupLabels []string, labelsPool []Label, d *xxhash.Digest) ([]Label, error) {
	// message TimeSeries {
	//   repeated Label labels   = 1;
	//   repeated Sample samples = 2;
	// }
	// Remember where the labels of this series start inside labelsPool.
	labelsPoolLen := len(labelsPool)
	var fc easyproto.FieldContext
	var lfc easyproto.FieldContext
	for len(src) > 0 {
		var err error
		src, err = fc.NextField(src)
		if err != nil {
			return labelsPool, fmt.Errorf("cannot read the next field: %w", err)
		}
		switch fc.FieldNum {
		case 1:
			data, ok := fc.MessageData()
			if !ok {
				return labelsPool, fmt.Errorf("cannot read label data")
			}
			// Label message: field 1 is the name, field 2 is the value.
			var nameBytes, valueBytes []byte
			ldata := data
			for len(ldata) > 0 {
				ldata, err = lfc.NextField(ldata)
				if err != nil {
					return labelsPool, fmt.Errorf("cannot read label field: %w", err)
				}
				switch lfc.FieldNum {
				case 1:
					nameBytes, ok = lfc.Bytes()
					if !ok {
						return labelsPool, fmt.Errorf("cannot read label name")
					}
				case 2:
					valueBytes, ok = lfc.Bytes()
					if !ok {
						return labelsPool, fmt.Errorf("cannot read label value")
					}
				}
			}
			// The fingerprint covers the raw encoded bytes of every label, not only the group labels.
			_, _ = d.Write(data)
			name := bytesutil.ToUnsafeString(nameBytes)
			if slices.Contains(groupLabels, name) {
				labelsPool = append(labelsPool, Label{
					Name:  name,
					Value: bytesutil.ToUnsafeString(valueBytes),
				})
			}
		}
	}
	ts.GroupLabels = labelsPool[labelsPoolLen:]
	ts.Fingerprint = d.Sum64()
	return labelsPool, nil
}

View File

@@ -0,0 +1,86 @@
package protoparser
import (
"fmt"
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
// BenchmarkWriteRequest_UnmarshalProtobuf measures UnmarshalProtobuf on
// WriteRequest messages of various shapes (series count, labels per series,
// label value size).
func BenchmarkWriteRequest_UnmarshalProtobuf(b *testing.B) {
	// Shared scratch buffer the encoded request of every sub-benchmark is built into.
	scratch := make([]byte, 0, 21_000_000)

	type benchCase struct {
		rows, labels, labelSize, groupBy int
	}
	cases := []benchCase{
		{5_000, 0, 0, 3},
		{5_000, 1, 20, 3},
		{1_000, 20, 20, 3},
		{5_000, 20, 20, 3},
		{10_000, 20, 20, 3},
		{20_000, 20, 20, 3},
		// long label values
		{1_000, 20, 2000, 3},
		// many labels
		{1_000, 2000, 100, 3},
	}

	for _, bc := range cases {
		bName := fmt.Sprintf("Rows=%d/Labels=%d/LabelSize=%d/GroupBy=%d", bc.rows, bc.labels, bc.labelSize, bc.groupBy)
		b.Run(bName, func(b *testing.B) {
			encoded := buildEncodedWriteRequest(scratch, bc.rows, bc.labels, bc.labelSize, bc.groupBy)
			groupLabels := []string{"foo", "bar", "baz", "__name__", "job", "groupLabel"}
			wru := getWriteRequestUnmarshaler()
			cnt := 0

			b.ResetTimer()
			b.ReportAllocs()
			b.SetBytes(int64(len(encoded)))
			for b.Loop() {
				wru.Reset()
				err := wru.UnmarshalProtobuf(encoded, groupLabels, func(tss []TimeSerie) {
					cnt += len(tss)
				})
				if err != nil {
					b.Fatalf("unexpected error: %s", err)
				}
			}
		})
	}
}
// buildEncodedWriteRequest builds a protobuf-encoded (uncompressed) WriteRequest
// with numSeries time series, each having numLabels labels whose values embed
// labelSize filler bytes, plus a "groupLabel" label set to the series index modulo groupsNum.
//
// The message is marshaled into dst[:0], so the storage of dst is reused when it is big enough.
func buildEncodedWriteRequest(dst []byte, numSeries, numLabels, labelSize, groupsNum int) []byte {
	labelValue := strings.Repeat("x", labelSize)
	tss := make([]prompb.TimeSeries, numSeries)
	for i := range tss {
		labels := make([]prompb.Label, numLabels)
		for j := range labels {
			labels[j] = prompb.Label{
				Name:  fmt.Sprintf("label%02d", j),
				Value: fmt.Sprintf("val%05d_%s", i, labelValue),
			}
		}
		labels = append(labels, prompb.Label{
			Name:  "groupLabel",
			Value: fmt.Sprintf("%d", i%groupsNum),
		})
		tss[i] = prompb.TimeSeries{
			Labels:  labels,
			Samples: []prompb.Sample{{Value: 1, Timestamp: 1000}},
		}
	}
	wr := &prompb.WriteRequest{Timeseries: tss}
	return wr.MarshalProtobuf(dst[:0])
}

237
app/vmestimator/snapshot.go Normal file
View File

@@ -0,0 +1,237 @@
package main
import (
"encoding/gob"
"io"
"strconv"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/axiomhq/hyperloglog"
)
// snapshots is a mutex-protected collection of merged snapshots
// keyed by snapshot.GroupByKeysLabel.
type snapshots struct {
	// mu protects m.
	mu sync.Mutex
	// m maps GroupByKeysLabel to the snapshot merged from all the added snapshots with this label.
	m map[string]*snapshot
}
// newSnapshots returns an empty, ready-to-use snapshots collection.
func newSnapshots() *snapshots {
	ss := &snapshots{}
	ss.m = map[string]*snapshot{}
	return ss
}
// add merges newS into the snapshot stored under newS.GroupByKeysLabel,
// creating that snapshot first if it doesn't exist yet.
//
// newS itself isn't retained, so the caller may reuse it after add returns.
func (ss *snapshots) add(newS *snapshot) {
	ss.mu.Lock()
	defer ss.mu.Unlock()

	dst, ok := ss.m[newS.GroupByKeysLabel]
	if !ok {
		dst = newSnapshot()
		ss.m[newS.GroupByKeysLabel] = dst
	}
	dst.merge(newS)
}
// writeMetrics writes the metrics of every stored snapshot to w.
//
// It stops at the first failed snapshot and returns its error.
func (ss *snapshots) writeMetrics(w io.Writer) error {
	ss.mu.Lock()
	defer ss.mu.Unlock()

	for key := range ss.m {
		err := ss.m[key].writeMetrics(w)
		if err != nil {
			return err
		}
	}
	return nil
}
// snapshot is a point-in-time copy of the sketches of a single estimator.
//
// It is transferred as a gob-encoded value (see decodeSnapshots).
type snapshot struct {
	// MetricPrefix is the metric prefix copied from the estimator bucket.
	MetricPrefix string
	// GroupByKeysLabel is copied from the estimator bucket; it is the key under
	// which snapshots are merged together.
	GroupByKeysLabel string
	// GroupRejectedSketch is the sketch of rejected groups; it is nil when there is none.
	GroupRejectedSketch *hyperloglog.Sketch
	// GroupBy holds the estimator's group-by label names; it is empty for estimators without grouping.
	GroupBy []string

	// prom string metric => hll
	Sketches map[string]*hyperloglog.Sketch
}
// newSnapshot returns an empty snapshot with an initialized Sketches map.
func newSnapshot() *snapshot {
	s := new(snapshot)
	s.Sketches = map[string]*hyperloglog.Sketch{}
	return s
}
// decodeSnapshots reads a stream of gob-encoded snapshot values from r and
// calls cb for every decoded snapshot until the stream ends.
//
// The same snapshot value is reset and reused for every decoded item,
// so cb must not hold s after returning.
func decodeSnapshots(r io.Reader, cb func(s *snapshot)) error {
	dec := gob.NewDecoder(r)
	scratch := newSnapshot()
	for {
		scratch.reset()
		err := dec.Decode(scratch)
		if err == io.EOF {
			// Clean end of the stream.
			return nil
		}
		if err != nil {
			return err
		}
		cb(scratch)
	}
}
// merge merges other into s.
//
// Sketches present in both snapshots are merged; sketches missing in s are
// cloned, so s never shares sketch state with other. MetricPrefix,
// GroupByKeysLabel and GroupBy are taken from other.
//
// It panics if s already has a GroupByKeysLabel which differs from the one in other.
func (s *snapshot) merge(other *snapshot) {
	if s.GroupByKeysLabel != "" && s.GroupByKeysLabel != other.GroupByKeysLabel {
		logger.Panicf("BUG: merge snapshots must have the same groupByKeysLabel; s: %s; other: %s", s.GroupByKeysLabel, other.GroupByKeysLabel)
	}

	for name, otherSK := range other.Sketches {
		existing, ok := s.Sketches[name]
		if !ok {
			s.Sketches[name] = otherSK.Clone()
			continue
		}
		// Do not silently drop merge failures: the resulting estimate would be wrong.
		if err := existing.Merge(otherSK); err != nil {
			logger.Errorf("cannot merge sketch for %q: %s", name, err)
		}
	}

	s.MetricPrefix = other.MetricPrefix
	s.GroupByKeysLabel = other.GroupByKeysLabel
	// Overwrite instead of appending; otherwise GroupBy accumulates duplicate
	// keys on every merge of a snapshot with the same GroupByKeysLabel.
	s.GroupBy = append(s.GroupBy[:0], other.GroupBy...)

	if other.GroupRejectedSketch == nil {
		return
	}
	if s.GroupRejectedSketch == nil {
		s.GroupRejectedSketch = other.GroupRejectedSketch.Clone()
		return
	}
	if err := s.GroupRejectedSketch.Merge(other.GroupRejectedSketch); err != nil {
		logger.Errorf("cannot merge group rejected sketch for %q: %s", s.GroupByKeysLabel, err)
	}
}
// writeMetrics writes metrics to w.
// w must be a buffered writer.
//
// One line is written per sketch: the sketch name followed by its estimate.
// When s has group-by labels, an additional group metric is written whose value
// is the number of sketches plus the estimate of rejected groups.
//
// The first write error is returned; the written metrics may be incomplete in this case.
func (s *snapshot) writeMetrics(w io.Writer) error {
	// numBuf is reused across sketches in order to avoid an allocation per line.
	numBuf := make([]byte, 0, 32)
	for name, sketch := range s.Sketches {
		if _, err := w.Write(bytesutil.ToUnsafeBytes(name)); err != nil {
			return err
		}
		numBuf = strconv.AppendUint(numBuf[:0], sketch.Estimate(), 10)
		numBuf = append(numBuf, '\n')
		if _, err := w.Write(numBuf); err != nil {
			return err
		}
	}

	if len(s.GroupBy) == 0 {
		return nil
	}

	groupSize := int64(len(s.Sketches))
	if s.GroupRejectedSketch != nil {
		groupSize += int64(s.GroupRejectedSketch.Estimate())
	}
	formatBuf := make([]byte, 0, 1024)
	formatBuf = appendGroupMetric(formatBuf, s.MetricPrefix, s.GroupByKeysLabel)
	formatBuf = strconv.AppendInt(formatBuf, groupSize, 10)
	formatBuf = append(formatBuf, "\n"...)
	// Return the error like every other write above instead of logging it and reporting success.
	if _, err := w.Write(formatBuf); err != nil {
		return err
	}
	return nil
}
// reset clears s, so it can be reused for decoding or building the next snapshot.
//
// The Sketches map and the GroupBy slice keep their allocated storage.
func (s *snapshot) reset() {
	clear(s.Sketches)
	s.GroupBy = s.GroupBy[:0]
	s.MetricPrefix, s.GroupByKeysLabel = "", ""
	s.GroupRejectedSketch = nil
}
// convertNoGroupToSnapshot builds a snapshot for the estimator e, which must
// have no groupBy labels; it panics otherwise.
//
// All the buckets of e contribute to a single sketch stored under the global
// metric name. The result is written into s after resetting it; a new snapshot
// is allocated when s is nil.
func convertNoGroupToSnapshot(e *estimator, s *snapshot) *snapshot {
	if len(e.groupBy) != 0 {
		panic("BUG: do not use this function for estimator with non-empty groupBy")
	}
	if s == nil {
		s = newSnapshot()
	}
	s.reset()

	first := e.buckets[0]
	merged := first.newSketch()
	for i := range e.buckets {
		e.buckets[i].writeNoGroupMetric(merged)
	}

	metricName := appendGlobalMetric(make([]byte, 0, 1024), first.metricPrefix)
	s.Sketches[string(metricName)] = merged

	s.MetricPrefix = first.metricPrefix
	s.GroupByKeysLabel = first.groupByKeysLabel
	s.GroupBy = append(s.GroupBy[:0], first.groupBy...)
	return s
}
// convertGroupToSnapshot builds a snapshot for the estimator e, which must
// have groupBy labels; it panics otherwise.
//
// The per-group sketches of all the buckets of e are collected into s after
// resetting it; a new snapshot is allocated when s is nil.
func convertGroupToSnapshot(e *estimator, s *snapshot) *snapshot {
	if len(e.groupBy) == 0 {
		panic("BUG: do not use this function for estimator with empty groupBy")
	}
	eb0 := e.buckets[0]
	// formatBuf holds the metric name prefix shared by all the groups; per-group labels are appended to it later.
	formatBuf := make([]byte, 0, 16384)
	formatBuf = appendGroupByKeysAndValuesPrefix(formatBuf, eb0.metricPrefix, eb0.groupByKeysLabel)
	if s == nil {
		s = newSnapshot()
	}
	s.reset()
	for _, eb := range e.buckets {
		// NOTE(review): a non-nil groupRejectedSketch of every bucket replaces (rather than
		// merges into) s.GroupRejectedSketch; presumably all buckets share the same
		// rejected sketch — confirm against estimatorBucket.
		eb.groupRejectedMu.Lock()
		if eb.groupRejectedSketch != nil {
			s.GroupRejectedSketch = eb.groupRejectedSketch.Clone()
		}
		eb.groupRejectedMu.Unlock()
		s = convertGroupBucketToSnapshot(eb, s, formatBuf)
	}
	return s
}
// convertGroupBucketToSnapshot adds the per-group sketches of the bucket eb to s.
//
// formatBuf must contain the metric name prefix; the labels of every group are
// appended to it in order to build the key in s.Sketches. For every group the
// current and the previous sketches are merged into a scratch sketch, and a
// clone of the result is stored in s. Groups present only in eb.prevGroups are
// added as well.
//
// It panics if eb has no groupBy labels. eb.mu is held for the whole call.
func convertGroupBucketToSnapshot(eb *estimatorBucket, s *snapshot, formatBuf []byte) *snapshot {
	if len(eb.groupBy) == 0 {
		panic("BUG: do not use this function for estimator with empty groupBy")
	}
	prefixLen := len(formatBuf)
	// resSK is a scratch sketch reused for every group.
	resSK := eb.newSketch()
	eb.mu.Lock()
	defer eb.mu.Unlock()
	for valuesKey, gsk := range eb.groups {
		resSK.Reset()
		formatBuf = append(formatBuf[:prefixLen], gsk.groupValueLabels...)
		// NOTE(review): valuesKey may be missing in eb.prevGroups; presumably the zero
		// map value makes the .Sketch access safe — confirm against the map's value type.
		eb.mergeSketches(gsk.Sketch, eb.prevGroups[valuesKey].Sketch, resSK)
		s.Sketches[string(formatBuf)] = resSK.Clone()
	}
	// Add groups which exist only in the previous interval.
	for valuesKey := range eb.prevGroups {
		if _, ok := eb.groups[valuesKey]; ok {
			continue
		}
		resSK.Reset()
		formatBuf = formatBuf[:prefixLen]
		gsk := eb.prevGroups[valuesKey]
		formatBuf = append(formatBuf, gsk.groupValueLabels...)
		eb.mergeSketches(nil, eb.prevGroups[valuesKey].Sketch, resSK)
		s.Sketches[string(formatBuf)] = resSK.Clone()
	}
	s.GroupByKeysLabel = eb.groupByKeysLabel
	s.MetricPrefix = eb.metricPrefix
	s.GroupBy = append(s.GroupBy[:0], eb.groupBy...)
	return s
}

View File

@@ -0,0 +1,471 @@
package main
import (
"bytes"
"encoding/binary"
"encoding/gob"
"fmt"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmestimator/protoparser"
)
// TestGlobalSnapshot verifies that, for estimators without groupBy labels,
// metrics written from a snapshot match the metrics written by the estimator
// itself, both for a directly converted snapshot and for a snapshot which went
// through gob encoding and decoding.
func TestGlobalSnapshot(t *testing.T) {
	// genCard returns a generator which inserts `cardinality` series with
	// distinct fingerprints derived from the series index and seed.
	genCard := func(cardinality int, seed string) func(e *estimator) {
		return func(e *estimator) {
			var tss []protoparser.TimeSerie
			fpBuf := make([]byte, 8, 8+len(seed))
			for i := 0; i < cardinality; i++ {
				binary.LittleEndian.PutUint64(fpBuf[:8], uint64(i))
				// Truncate to the 8-byte index before appending the seed;
				// otherwise the buffer grows on every iteration and hashing becomes quadratic.
				fpBuf = append(fpBuf[:8], seed...)
				tss = append(tss, protoparser.TimeSerie{
					Fingerprint: hash(fpBuf),
				})
				if i%10 == 0 {
					e.insertMany(tss)
					tss = tss[:0]
				}
			}
			if len(tss) > 0 {
				e.insertMany(tss)
			}
		}
	}

	f := func(gen func(e *estimator)) {
		t.Helper()

		cfg := EstimatorConfig{
			Interval: time.Minute * 10,
			Buckets:  5,
		}
		e, err := newEstimator(cfg)
		if err != nil {
			t.Fatalf("failed to create new estimator: %v", err)
		}
		defer e.stop()

		gen(e)

		if len(e.buckets) != cfg.Buckets {
			t.Fatalf("expected buckets length to be %d but got %d", cfg.Buckets, len(e.buckets))
		}
		for i, eb := range e.buckets {
			if len(eb.groupBy) > 0 {
				t.Fatalf("expected bucket %d groupBy length to be 0 but got %d", i, len(eb.groupBy))
			}
			if eb.groups != nil {
				t.Fatalf("expected bucket %d groups length to be 0 but got %d", i, len(eb.groups))
			}
			if eb.groupSize.Load() != 0 {
				t.Fatalf("expected bucket %d groupSize to be 0 but got %d", i, eb.groupSize.Load())
			}
		}

		buf := bytes.NewBuffer(nil)
		e.writeMetrics(buf)
		expMetric := buf.String()

		buf.Reset()
		if err := convertNoGroupToSnapshot(e, nil).writeMetrics(buf); err != nil {
			t.Fatalf("convertNoGroupToSnapshot: %v", err)
		}
		assertMetricsSame(t, "convertNoGroupToSnapshot", expMetric, buf.String())

		// test encode/decode snapshot produce same result
		buf.Reset()
		if err := e.writeSnapshot(gob.NewEncoder(buf)); err != nil {
			t.Fatalf("writeSnapshot: %v", err)
		}
		ss := newSnapshots()
		if err := decodeSnapshots(buf, func(s *snapshot) {
			ss.add(s)
		}); err != nil {
			t.Fatalf("decodeSnapshot: %v", err)
		}
		buf.Reset()
		if err := ss.writeMetrics(buf); err != nil {
			t.Fatalf("writeMetrics: %v", err)
		}
		assertMetricsSame(t, "encode/decode", expMetric, buf.String())
	}

	f(func(e *estimator) {})

	// no previous
	f(genCard(0, ""))
	f(genCard(100, ""))
	f(genCard(10000, ""))
	f(genCard(100000, ""))

	// rotate once
	genRotateOnce := func(cardinality int) func(e *estimator) {
		return func(e *estimator) {
			genCard(cardinality, "")(e)
			e.rotate()
		}
	}
	f(genRotateOnce(0))
	f(genRotateOnce(100))
	f(genRotateOnce(10000))
	f(genRotateOnce(100000))

	// insert, rotate, insert the same series again
	genInsertRotateInsertSameOnce := func(cardinality int) func(e *estimator) {
		return func(e *estimator) {
			genCard(cardinality/2, "")(e)
			e.rotate()
			genCard(cardinality/2, "")(e)
		}
	}
	f(genInsertRotateInsertSameOnce(0))
	f(genInsertRotateInsertSameOnce(100))
	f(genInsertRotateInsertSameOnce(10000))
	f(genInsertRotateInsertSameOnce(100000))

	// insert, rotate, insert different series
	genInsertRotateInsertOnce := func(cardinality int) func(e *estimator) {
		return func(e *estimator) {
			genCard(cardinality/2, "one")(e)
			e.rotate()
			genCard(cardinality/2, "two")(e)
		}
	}
	f(genInsertRotateInsertOnce(0))
	f(genInsertRotateInsertOnce(100))
	f(genInsertRotateInsertOnce(10000))
	f(genInsertRotateInsertOnce(100000))

	// insert, rotate twice
	genRotateTwoTimes := func(cardinality int) func(e *estimator) {
		return func(e *estimator) {
			genCard(cardinality, "")(e)
			e.rotate()
			e.rotate()
		}
	}
	f(genRotateTwoTimes(0))
	f(genRotateTwoTimes(100))
	f(genRotateTwoTimes(10000))
	f(genRotateTwoTimes(100000))
}
// TestGroupSnapshot verifies that, for estimators with groupBy labels, metrics
// written from a snapshot match the metrics written by the estimator itself,
// both for a directly converted snapshot and for a snapshot which went through
// gob encoding and decoding.
func TestGroupSnapshot(t *testing.T) {
	// genCard returns a generator which inserts one series per combination of
	// foo/bar/baz label values. A zero cardinality omits the corresponding label.
	genCard := func(fooCard, barCard, bazCard int, seed string) func(e *estimator) {
		return func(e *estimator) {
			var tss []protoparser.TimeSerie
			for fooI := 0; fooI < max(1, fooCard); fooI++ {
				for barI := 0; barI < max(1, barCard); barI++ {
					for bazI := 0; bazI < max(1, bazCard); bazI++ {
						ts := protoparser.TimeSerie{}
						ts.GroupLabels = append(ts.GroupLabels, protoparser.Label{Name: "__name__", Value: "the_metric_name"})
						if fooCard > 0 {
							ts.GroupLabels = append(ts.GroupLabels, protoparser.Label{Name: "foo", Value: fmt.Sprintf("%s%d", seed, fooI)})
						}
						if barCard > 0 {
							ts.GroupLabels = append(ts.GroupLabels, protoparser.Label{Name: "bar", Value: fmt.Sprintf("%s%d", seed, barI)})
						}
						if bazCard > 0 {
							ts.GroupLabels = append(ts.GroupLabels, protoparser.Label{Name: "baz", Value: fmt.Sprintf("%s%d", seed, bazI)})
						}

						// The fingerprint is derived from all the labels of the series plus the seed.
						var fpBuf []byte
						for _, l := range ts.GroupLabels {
							fpBuf = append(fpBuf, l.Name...)
							fpBuf = append(fpBuf, '=')
							fpBuf = append(fpBuf, l.Value...)
							fpBuf = append(fpBuf, ',')
						}
						fpBuf = append(fpBuf, seed...)
						ts.Fingerprint = hash(fpBuf)

						tss = append(tss, ts)
					}
				}
			}

			e.insertMany(tss)
		}
	}

	f := func(groupBy []string, gen func(e *estimator)) {
		t.Helper()

		cfg := EstimatorConfig{
			Interval: time.Minute * 10,
			GroupBy:  groupBy,
			Buckets:  5,
		}
		e, err := newEstimator(cfg)
		if err != nil {
			t.Fatalf("failed to create new estimator: %v", err)
		}
		defer e.stop()

		gen(e)

		if len(e.buckets) != cfg.Buckets {
			t.Fatalf("expected buckets length to be %d but got %d", cfg.Buckets, len(e.buckets))
		}
		for i, eb := range e.buckets {
			if eb.sketch != nil {
				t.Fatalf("expected bucket %d sketch to be nil", i)
			}
			if eb.prevSketch != nil {
				t.Fatalf("expected bucket %d prevSketch to be nil", i)
			}
		}

		buf := bytes.NewBuffer(nil)
		e.writeMetrics(buf)
		expMetrics := buf.String()

		buf.Reset()
		if err := convertGroupToSnapshot(e, nil).writeMetrics(buf); err != nil {
			t.Fatalf("failed to write metrics: %v", err)
		}
		assertMetricsSame(t, "convertGroupToSnapshot", expMetrics, buf.String())

		// test encode/decode snapshot produce same result
		buf.Reset()
		if err := e.writeSnapshot(gob.NewEncoder(buf)); err != nil {
			t.Fatalf("writeSnapshot: %v", err)
		}
		ss := newSnapshots()
		if err := decodeSnapshots(buf, func(s *snapshot) {
			ss.add(s)
		}); err != nil {
			t.Fatalf("decodeSnapshot: %v", err)
		}
		buf.Reset()
		if err := ss.writeMetrics(buf); err != nil {
			t.Fatalf("writeMetrics: %v", err)
		}
		assertMetricsSame(t, "encode/decode", expMetrics, buf.String())
	}

	f([]string{"__name__"}, func(e *estimator) {})

	// group by metric name
	f([]string{"__name__"}, genCard(10, 10, 10, ""))

	// time series does not contribute to a group
	f([]string{"foo"}, genCard(0, 10, 10, ""))

	// group by one label
	f([]string{"foo"}, genCard(1, 100, 0, ""))
	f([]string{"foo"}, genCard(1, 10000, 0, ""))
	f([]string{"foo"}, genCard(1, 1000, 1000, ""))

	// group by one label, rotate
	genCardRotate := func(fooCard, barCard, bazCard int, seed string) func(e *estimator) {
		return func(e *estimator) {
			genCard(fooCard, barCard, bazCard, seed)(e)
			e.rotate()
		}
	}
	f([]string{"foo"}, genCardRotate(1, 10, 10, ""))
	f([]string{"foo"}, genCardRotate(1, 1000, 1000, ""))

	// group by one label, rotate, insert same
	genCardRotateInsertSame := func(barCard, bazCard int) func(e *estimator) {
		return func(e *estimator) {
			genCard(1, barCard, bazCard, "")(e)
			e.rotate()
			genCard(1, barCard, bazCard, "")(e)
		}
	}
	f([]string{"foo"}, genCardRotateInsertSame(10, 10))
	f([]string{"foo"}, genCardRotateInsertSame(1000, 1000))

	// group by one label, rotate, insert diff
	genCardRotateInsertDiff := func(barCard, bazCard int) func(e *estimator) {
		return func(e *estimator) {
			genCard(1, barCard, bazCard, "one")(e)
			e.rotate()
			genCard(1, barCard, bazCard, "two")(e)
		}
	}
	f([]string{"foo"}, genCardRotateInsertDiff(10, 10))
	f([]string{"foo"}, genCardRotateInsertDiff(1000, 1000))

	// group by one label, rotate twice
	genCardRotateTwice := func(barCard, bazCard int) func(e *estimator) {
		return func(e *estimator) {
			genCard(1, barCard, bazCard, "one")(e)
			e.rotate()
			e.rotate()
		}
	}
	f([]string{"foo"}, genCardRotateTwice(10, 10))
	f([]string{"foo"}, genCardRotateTwice(1000, 1000))

	// group by two labels
	f([]string{"foo", "bar"}, genCard(1, 1, 1000, ""))
	f([]string{"foo", "bar"}, genCard(2, 1, 1000, ""))
	f([]string{"foo", "bar"}, genCard(2, 2, 1000, ""))

	// group by two labels, rotate
	genCardTwoLabelsRotate := func() func(e *estimator) {
		return func(e *estimator) {
			genCard(2, 2, 1000, "")(e)
			e.rotate()
		}
	}
	f([]string{"foo", "bar"}, genCardTwoLabelsRotate())

	// group by two labels, rotate, insert same
	genCardTwoLabelsRotateInsertSame := func() func(e *estimator) {
		return func(e *estimator) {
			genCard(2, 2, 1000, "")(e)
			e.rotate()
			genCard(2, 2, 1000, "")(e)
		}
	}
	f([]string{"foo", "bar"}, genCardTwoLabelsRotateInsertSame())

	// group by two labels, rotate, insert diff
	genCardTwoLabelsRotateInsertDiff := func() func(e *estimator) {
		return func(e *estimator) {
			genCard(2, 2, 1000, "one")(e)
			e.rotate()
			genCard(2, 2, 1000, "two")(e)
		}
	}
	f([]string{"foo", "bar"}, genCardTwoLabelsRotateInsertDiff())

	// group by two labels, rotate twice
	genCardTwoLabelsRotateTwice := func() func(e *estimator) {
		return func(e *estimator) {
			genCard(2, 2, 1000, "one")(e)
			e.rotate()
			e.rotate()
		}
	}
	f([]string{"foo", "bar"}, genCardTwoLabelsRotateTwice())

	// quote values: label values with special characters must be properly escaped
	genSpecialCard := func(fooVal string) func(e *estimator) {
		return func(e *estimator) {
			e.insertMany([]protoparser.TimeSerie{
				{
					GroupLabels: []protoparser.Label{{Name: "foo", Value: fooVal}},
					Fingerprint: hash([]byte("foo=" + fooVal + ",")),
				},
			})
		}
	}
	// double quote in value
	f([]string{"foo"}, genSpecialCard(`a"b`))
	// backslash in value
	f([]string{"foo"}, genSpecialCard(`a\b`))
	// newline in value
	f([]string{"foo"}, genSpecialCard("a\nb"))
	// tab in value
	f([]string{"foo"}, genSpecialCard("a\tb"))
}
// TestGroupSnapshotGroupLimit verifies that snapshots of an estimator with
// GroupLimit set produce the same metrics as the estimator itself and carry
// the expected estimate of rejected groups.
func TestGroupSnapshotGroupLimit(t *testing.T) {
	// makeTS returns a series with a single group label foo=fooVal.
	makeTS := func(fooVal string) protoparser.TimeSerie {
		return protoparser.TimeSerie{
			GroupLabels: []protoparser.Label{{Name: "foo", Value: fooVal}},
			Fingerprint: hash([]byte("foo=" + fooVal + ",")),
		}
	}

	// f runs gen against a new estimator with the given groupLimit and checks
	// the snapshot metrics and the rejected groups estimate against expRejected.
	f := func(groupLimit int, gen func(e *estimator), expRejected int) {
		t.Helper()

		cfg := EstimatorConfig{
			Interval:   time.Minute * 10,
			GroupBy:    []string{"foo"},
			GroupLimit: groupLimit,
			Buckets:    3,
		}
		e, err := newEstimator(cfg)
		if err != nil {
			t.Fatalf("failed to create new estimator: %v", err)
		}
		defer e.stop()

		gen(e)

		buf := bytes.NewBuffer(nil)
		e.writeMetrics(buf)
		expMetrics := buf.String()

		buf.Reset()
		s := convertGroupToSnapshot(e, nil)
		if err := s.writeMetrics(buf); err != nil {
			t.Fatalf("failed to write metrics: %v", err)
		}
		assertMetricsSame(t, "convertGroupToSnapshot", expMetrics, buf.String())

		// A nil rejected sketch means no groups were rejected.
		var actRejected int
		if s.GroupRejectedSketch != nil {
			actRejected = int(s.GroupRejectedSketch.Estimate())
		}
		if expRejected != actRejected {
			t.Fatalf("rejected expected: %d; got: %d", expRejected, actRejected)
		}

		// test encode/decode snapshot produce same result
		buf.Reset()
		if err := e.writeSnapshot(gob.NewEncoder(buf)); err != nil {
			t.Fatalf("writeSnapshot: %v", err)
		}
		ss := newSnapshots()
		if err := decodeSnapshots(buf, func(s *snapshot) {
			ss.add(s)
		}); err != nil {
			t.Fatalf("decodeSnapshot: %v", err)
		}
		buf.Reset()
		if err := ss.writeMetrics(buf); err != nil {
			t.Fatalf("writeMetrics: %v", err)
		}
		assertMetricsSame(t, "encode/decode", expMetrics, buf.String())
	}

	// all groups accepted
	f(3, func(e *estimator) {
		e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("b"), makeTS("c")})
	}, 0)

	// 2 groups only accepted
	f(2, func(e *estimator) {
		e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("b"), makeTS("c")})
	}, 1)

	// one group only accepted
	f(1, func(e *estimator) {
		e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("b"), makeTS("c")})
	}, 2)

	// after rotate: groups in prevGroups bypass the limit; new groups are still checked
	f(2, func(e *estimator) {
		// fills limit
		e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("b")})
		e.rotate()
		// "a" bypasses, "c" rejected
		e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("c")})
	}, 1)

	// after rotate: new group accepted when remaining capacity allows
	f(3, func(e *estimator) {
		// 2 groups, limit=3
		e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("b")})
		e.rotate()
		// "a" bypasses, "c" accepted (2+1=3 <= 3)
		e.insertMany([]protoparser.TimeSerie{makeTS("a"), makeTS("c")})
	}, 0)

	// reject 100
	f(3, func(e *estimator) {
		var tss []protoparser.TimeSerie
		for i := 0; i < 103; i++ {
			tss = append(tss, makeTS(fmt.Sprintf("a%d", i)))
		}
		e.insertMany(tss)
	}, 100)
}

View File

@@ -15,7 +15,7 @@ See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-metric-m
currentItem := 0
%}
{% for _, row := range result %}
{%q= string(row.MetricFamilyName) %}: [
"{%s string(row.MetricFamilyName) %}": [
{
"type": {%q= row.Type.String() %},
{% if len(row.Unit) > 0 -%}

View File

@@ -35,10 +35,12 @@ func StreamMetadataResponse(qw422016 *qt422016.Writer, result []*metricsmetadata
//line app/vmselect/prometheus/metadata_response.qtpl:17
for _, row := range result {
//line app/vmselect/prometheus/metadata_response.qtpl:17
qw422016.N().S(`"`)
//line app/vmselect/prometheus/metadata_response.qtpl:18
qw422016.N().Q(string(row.MetricFamilyName))
qw422016.E().S(string(row.MetricFamilyName))
//line app/vmselect/prometheus/metadata_response.qtpl:18
qw422016.N().S(`: [{"type":`)
qw422016.N().S(`": [{"type":`)
//line app/vmselect/prometheus/metadata_response.qtpl:20
qw422016.N().Q(row.Type.String())
//line app/vmselect/prometheus/metadata_response.qtpl:20

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1 @@
var e=Object.create,t=Object.defineProperty,n=Object.getOwnPropertyDescriptor,r=Object.getOwnPropertyNames,i=Object.getPrototypeOf,a=Object.prototype.hasOwnProperty,o=(e,t)=>()=>(e&&(t=e(e=0)),t),s=(e,t)=>()=>(t||e((t={exports:{}}).exports,t),t.exports),c=(e,n)=>{let r={};for(var i in e)t(r,i,{get:e[i],enumerable:!0});return n||t(r,Symbol.toStringTag,{value:`Module`}),r},l=(e,i,o,s)=>{if(i&&typeof i==`object`||typeof i==`function`)for(var c=r(i),l=0,u=c.length,d;l<u;l++)d=c[l],!a.call(e,d)&&d!==o&&t(e,d,{get:(e=>i[e]).bind(null,d),enumerable:!(s=n(i,d))||s.enumerable});return e},u=(n,r,a)=>(a=n==null?{}:e(i(n)),l(r||!n||!n.__esModule?t(a,`default`,{value:n,enumerable:!0}):a,n)),d=e=>a.call(e,`module.exports`)?e[`module.exports`]:l(t({},`__esModule`,{value:!0}),e);export{u as a,d as i,o as n,c as r,s as t};

View File

@@ -1 +0,0 @@
var e=Object.create,t=Object.defineProperty,n=Object.getOwnPropertyDescriptor,r=Object.getOwnPropertyNames,i=Object.getPrototypeOf,a=Object.prototype.hasOwnProperty,o=(e,t)=>()=>(e&&(t=e(e=0)),t),s=(e,t)=>()=>(t||(e((t={exports:{}}).exports,t),e=null),t.exports),c=(e,n)=>{let r={};for(var i in e)t(r,i,{get:e[i],enumerable:!0});return n||t(r,Symbol.toStringTag,{value:`Module`}),r},l=(e,i,o,s)=>{if(i&&typeof i==`object`||typeof i==`function`)for(var c=r(i),l=0,u=c.length,d;l<u;l++)d=c[l],!a.call(e,d)&&d!==o&&t(e,d,{get:(e=>i[e]).bind(null,d),enumerable:!(s=n(i,d))||s.enumerable});return e},u=(n,r,a)=>(a=n==null?{}:e(i(n)),l(r||!n||!n.__esModule?t(a,`default`,{value:n,enumerable:!0}):a,n)),d=e=>a.call(e,`module.exports`)?e[`module.exports`]:l(t({},`__esModule`,{value:!0}),e);export{u as a,d as i,o as n,c as r,s as t};

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -37,9 +37,9 @@
<meta property="og:title" content="UI for VictoriaMetrics">
<meta property="og:url" content="https://victoriametrics.com/">
<meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data">
<script type="module" crossorigin src="./assets/index-CusQvJzs.js"></script>
<link rel="modulepreload" crossorigin href="./assets/rolldown-runtime-Cyuzqnbw.js">
<link rel="modulepreload" crossorigin href="./assets/vendor-B83wxFqK.js">
<script type="module" crossorigin src="./assets/index-CoGukb-x.js"></script>
<link rel="modulepreload" crossorigin href="./assets/rolldown-runtime-COnpUsM8.js">
<link rel="modulepreload" crossorigin href="./assets/vendor-C8Kwp93_.js">
<link rel="stylesheet" crossorigin href="./assets/vendor-CnsZ1jie.css">
<link rel="stylesheet" crossorigin href="./assets/index-BBUnmLOr.css">
</head>

View File

@@ -45,13 +45,11 @@ func TestSingleMetricsMetadata(t *testing.T) {
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_4"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_5"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_6"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
{Labels: []prompb.Label{{Name: "__name__", Value: `metric_name_7_!@"_suffix`}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
},
Metadata: []prompb.MetricMetadata{
{MetricFamilyName: "metric_name_4", Help: "some help message", Type: prompb.MetricTypeSummary},
{MetricFamilyName: "metric_name_5", Help: "some help message", Type: prompb.MetricTypeSummary},
{MetricFamilyName: "metric_name_6", Help: "some help message", Type: prompb.MetricTypeStateset},
{MetricFamilyName: `metric_name_7_!@"_suffix`, Help: "some help message", Type: prompb.MetricTypeStateset},
},
}
@@ -61,13 +59,12 @@ func TestSingleMetricsMetadata(t *testing.T) {
expected := &apptest.PrometheusAPIV1Metadata{
Status: "success",
Data: map[string][]apptest.MetadataEntry{
"metric_name_1": {{Help: "some help message", Type: "gauge"}},
"metric_name_2": {{Help: "some help message", Type: "counter"}},
"metric_name_3": {{Help: "some help message", Type: "gauge"}},
"metric_name_4": {{Help: "some help message", Type: "summary"}},
"metric_name_5": {{Help: "some help message", Type: "summary"}},
"metric_name_6": {{Help: "some help message", Type: "stateset"}},
`metric_name_7_!@"_suffix`: {{Help: "some help message", Type: "stateset"}},
"metric_name_1": {{Help: "some help message", Type: "gauge"}},
"metric_name_2": {{Help: "some help message", Type: "counter"}},
"metric_name_3": {{Help: "some help message", Type: "gauge"}},
"metric_name_4": {{Help: "some help message", Type: "summary"}},
"metric_name_5": {{Help: "some help message", Type: "summary"}},
"metric_name_6": {{Help: "some help message", Type: "stateset"}},
},
}
gotStats := sut.PrometheusAPIV1Metadata(t, "", 0, apptest.QueryOpts{})
@@ -157,13 +154,11 @@ func TestClusterMetricsMetadata(t *testing.T) {
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_4"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_5"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
{Labels: []prompb.Label{{Name: "__name__", Value: "metric_name_6"}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
{Labels: []prompb.Label{{Name: "__name__", Value: `metric_name_7_!@"_suffix`}}, Samples: []prompb.Sample{{Value: 40, Timestamp: ingestTimestamp}}},
},
Metadata: []prompb.MetricMetadata{
{MetricFamilyName: "metric_name_4", Help: "some help message", Type: prompb.MetricTypeSummary},
{MetricFamilyName: "metric_name_5", Help: "some help message", Type: prompb.MetricTypeSummary},
{MetricFamilyName: "metric_name_6", Help: "some help message", Type: prompb.MetricTypeStateset},
{MetricFamilyName: `metric_name_7_!@"_suffix`, Help: "some help message", Type: prompb.MetricTypeStateset},
},
}
@@ -176,13 +171,12 @@ func TestClusterMetricsMetadata(t *testing.T) {
expected := &apptest.PrometheusAPIV1Metadata{
Status: "success",
Data: map[string][]apptest.MetadataEntry{
"metric_name_1": {{Help: "some help message", Type: "gauge"}},
"metric_name_2": {{Help: "some help message", Type: "counter"}},
"metric_name_3": {{Help: "some help message", Type: "gauge"}},
"metric_name_4": {{Help: "some help message", Type: "summary"}},
"metric_name_5": {{Help: "some help message", Type: "summary"}},
"metric_name_6": {{Help: "some help message", Type: "stateset"}},
`metric_name_7_!@"_suffix`: {{Help: "some help message", Type: "stateset"}},
"metric_name_1": {{Help: "some help message", Type: "gauge"}},
"metric_name_2": {{Help: "some help message", Type: "counter"}},
"metric_name_3": {{Help: "some help message", Type: "gauge"}},
"metric_name_4": {{Help: "some help message", Type: "summary"}},
"metric_name_5": {{Help: "some help message", Type: "summary"}},
"metric_name_6": {{Help: "some help message", Type: "stateset"}},
},
}
gotStats := vmselect.PrometheusAPIV1Metadata(t, "", 0, apptest.QueryOpts{Tenant: tenantID})

View File

@@ -8,7 +8,6 @@ import (
"net/http/httptest"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -333,11 +332,13 @@ func TestSingleVMAgentDropOnOverload(t *testing.T) {
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 1 && vmagent.RemoteWriteRequests(t, url2) == 1
},
)
// Send 2 more requests, the first RW endpoint should receive everything, the second should add them to the queue
// since worker is busy with the first request.
for i := range 2 {
@@ -640,116 +641,3 @@ func TestSingleVMAgentMultitenancy(t *testing.T) {
t.Fatalf("expected vmagent_tenant_inserted_rows_total to have value 1 for accountID=5, projectID=0")
}
}
// TestSingleVMAgentPriorizeRecentData starts a vmagent with two remote write
// endpoints, the on-disk queue disabled and -remoteWrite.inmemoryQueues=1.
// The first endpoint always answers 204, while the second answers 503 until
// mustRW2ReturnError is cleared. The test sends samples one by one and checks,
// step by step, the per-URL request counters, the number of pending in-memory
// blocks and the dropped samples counter, and finally verifies that the pending
// in-memory blocks for the second URL drain to 0 once it starts answering 204.
func TestSingleVMAgentPriorizeRecentData(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
// First remote write endpoint: always replies with 204 No Content.
remoteWriteSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer remoteWriteSrv.Close()
// Second remote write endpoint: replies with 503 Service Unavailable while
// mustRW2ReturnError is true and with 204 No Content afterwards.
var mustRW2ReturnError atomic.Bool
mustRW2ReturnError.Store(true)
remoteWriteSrv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if mustRW2ReturnError.Load() {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusNoContent)
}))
defer remoteWriteSrv2.Close()
vmagent := tc.MustStartDefaultRWVmagent("vmagent", []string{
fmt.Sprintf(`-remoteWrite.url=%s/api/v1/write`, remoteWriteSrv.URL),
fmt.Sprintf(`-remoteWrite.url=%s/api/v1/write`, remoteWriteSrv2.URL),
"-remoteWrite.disableOnDiskQueue=true",
// use only 1 worker to get a full queue faster
"-remoteWrite.queues=1",
"-remoteWrite.flushInterval=1ms",
"-remoteWrite.inmemoryQueues=1",
// fastqueue size is roughly memory.Allowed() / len(urls) / *maxRowsPerBlock / 100
// Use very large maxRowsPerBlock to get fastqueue of minimal length(2).
// See initRemoteWriteCtxs function in remotewrite.go for details.
"-remoteWrite.maxRowsPerBlock=1000000000",
"-remoteWrite.tmpDataPath=" + tc.Dir() + "/vmagent",
// Delay retry logic to avoid race conditions with waitFor assertions.
// It improves the test stability on resource-constrained runners.
"-remoteWrite.retryMinInterval=3s",
"-remoteWrite.retryMaxTime=3s",
})
// Polling budget for waitFor: 20 attempts with 200ms pauses, i.e. about 4s in total.
const (
retries = 20
period = 200 * time.Millisecond
)
// waitFor calls f up to `retries` times, sleeping `period` after every
// unsuccessful attempt, and fails the test if f never returns true.
waitFor := func(f func() bool) {
t.Helper()
for range retries {
if f() {
return
}
time.Sleep(period)
}
t.Fatalf("timed out waiting for retry #%d", retries)
}
// Real remote write URLs are hidden in metrics
// NOTE(review): presumably "N:secret-url" is how the N-th -remoteWrite.url is
// labelled in vmagent metrics; confirm against the apptest helpers.
url1 := "1:secret-url"
url2 := "2:secret-url"
// Wait until first request got flushed to remote write server
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 1 && vmagent.RemoteWriteRequests(t, url2) == 1
},
)
// Wait until second request got flushed to remote write server
// since there are 2 independent queues (general and in-memory) with minimal capacity of 1
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 2 && vmagent.RemoteWriteRequests(t, url2) == 2
},
)
// Send 2 more requests, the first RW endpoint should receive everything, the second should add them to the queue
// since worker is busy with the first request.
for i := range 2 {
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
// After the i-th extra sample url1 must report 3+i requests and url2 must
// report 1+i pending in-memory blocks.
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 3+i && vmagent.RemoteWritePendingInmemoryBlocks(t, url2) == 1+i
},
)
}
// Send one more request.
// url1 is expected to reach 5 requests, while url2 must report dropped samples.
vmagent.APIV1ImportPrometheusNoWaitFlush(t, []string{
"foo_bar 1 1652169600000", // 2022-05-10T08:00:00Z
}, apptest.QueryOpts{})
waitFor(
func() bool {
return vmagent.RemoteWriteRequests(t, url1) == 5 && vmagent.RemoteWriteSamplesDropped(t, url2) > 0
},
)
// Let the second endpoint start accepting requests.
mustRW2ReturnError.Store(false)
// ensure that inmemory data correctly flushed to the remote write
waitFor(
func() bool {
return vmagent.RemoteWritePendingInmemoryBlocks(t, url2) == 0
},
)
}

View File

@@ -6201,7 +6201,7 @@
"type": "victoriametrics-metrics-datasource",
"uid": "$ds"
},
"description": "The rate of dropped samples during aggregation. \nStream aggregation will drop samples with NaN values, too old timestamps or samples identified as duplicates during deduplication. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"fieldConfig": {
"defaults": {
"color": {
@@ -6282,14 +6282,14 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(rate({__name__=~\"vm_streamaggr_ignored_samples_total|vm_streamaggr_dedup_dropped_samples_total\", job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
"expr": "sum(rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Dropped samples ($instance)",
"title": "Ignored samples ($instance)",
"type": "timeseries"
},
{

View File

@@ -6200,7 +6200,7 @@
"type": "prometheus",
"uid": "$ds"
},
"description": "The rate of dropped samples during aggregation. \nStream aggregation will drop samples with NaN values, too old timestamps or samples identified as duplicates during deduplication. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples ",
"fieldConfig": {
"defaults": {
"color": {
@@ -6281,14 +6281,14 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(rate({__name__=~\"vm_streamaggr_ignored_samples_total|vm_streamaggr_dedup_dropped_samples_total\", job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
"expr": "sum(rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0) without (instance, pod)",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Dropped samples ($instance)",
"title": "Ignored samples ($instance)",
"type": "timeseries"
},
{

View File

@@ -3,7 +3,7 @@ services:
# It scrapes targets defined in --promscrape.config
# and forwards them to --remoteWrite.url
vmagent:
image: victoriametrics/vmagent:v1.146.0
image: victoriametrics/vmagent:v1.145.0
depends_on:
- "vmauth"
ports:
@@ -42,14 +42,14 @@ services:
# vmstorage shards. Each shard receives 1/N of all metrics sent to vminserts,
# where N is number of vmstorages (2 in this case).
vmstorage-1:
image: victoriametrics/vmstorage:v1.146.0-cluster
image: victoriametrics/vmstorage:v1.145.0-cluster
volumes:
- strgdata-1:/storage
command:
- "--storageDataPath=/storage"
restart: always
vmstorage-2:
image: victoriametrics/vmstorage:v1.146.0-cluster
image: victoriametrics/vmstorage:v1.145.0-cluster
volumes:
- strgdata-2:/storage
command:
@@ -59,7 +59,7 @@ services:
# vminsert is ingestion frontend. It receives metrics pushed by vmagent,
# pre-process them and distributes across configured vmstorage shards.
vminsert-1:
image: victoriametrics/vminsert:v1.146.0-cluster
image: victoriametrics/vminsert:v1.145.0-cluster
depends_on:
- "vmstorage-1"
- "vmstorage-2"
@@ -68,7 +68,7 @@ services:
- "--storageNode=vmstorage-2:8400"
restart: always
vminsert-2:
image: victoriametrics/vminsert:v1.146.0-cluster
image: victoriametrics/vminsert:v1.145.0-cluster
depends_on:
- "vmstorage-1"
- "vmstorage-2"
@@ -80,7 +80,7 @@ services:
# vmselect is a query frontend. It serves read queries in MetricsQL or PromQL.
# vmselect collects results from configured `--storageNode` shards.
vmselect-1:
image: victoriametrics/vmselect:v1.146.0-cluster
image: victoriametrics/vmselect:v1.145.0-cluster
depends_on:
- "vmstorage-1"
- "vmstorage-2"
@@ -90,7 +90,7 @@ services:
- "--vmalert.proxyURL=http://vmalert:8880"
restart: always
vmselect-2:
image: victoriametrics/vmselect:v1.146.0-cluster
image: victoriametrics/vmselect:v1.145.0-cluster
depends_on:
- "vmstorage-1"
- "vmstorage-2"
@@ -105,7 +105,7 @@ services:
# read requests from Grafana, vmui, vmalert among vmselects.
# It can be used as an authentication proxy.
vmauth:
image: victoriametrics/vmauth:v1.146.0
image: victoriametrics/vmauth:v1.145.0
depends_on:
- "vmselect-1"
- "vmselect-2"
@@ -119,7 +119,7 @@ services:
# vmalert executes alerting and recording rules
vmalert:
image: victoriametrics/vmalert:v1.146.0
image: victoriametrics/vmalert:v1.145.0
depends_on:
- "vmauth"
ports:

View File

@@ -3,7 +3,7 @@ services:
# It scrapes targets defined in --promscrape.config
# and forwards them to --remoteWrite.url
vmagent:
image: victoriametrics/vmagent:v1.146.0
image: victoriametrics/vmagent:v1.145.0
depends_on:
- "victoriametrics"
ports:
@@ -18,7 +18,7 @@ services:
# VictoriaMetrics instance, a single process responsible for
# storing metrics and serving read requests.
victoriametrics:
image: victoriametrics/victoria-metrics:v1.146.0
image: victoriametrics/victoria-metrics:v1.145.0
ports:
- 8428:8428
- 8089:8089
@@ -59,7 +59,7 @@ services:
# vmalert executes alerting and recording rules
vmalert:
image: victoriametrics/vmalert:v1.146.0
image: victoriametrics/vmalert:v1.145.0
depends_on:
- "victoriametrics"
- "alertmanager"

View File

@@ -1,6 +1,6 @@
services:
vmagent:
image: victoriametrics/vmagent:v1.146.0
image: victoriametrics/vmagent:v1.145.0
depends_on:
- "victoriametrics"
ports:
@@ -14,7 +14,7 @@ services:
restart: always
victoriametrics:
image: victoriametrics/victoria-metrics:v1.146.0
image: victoriametrics/victoria-metrics:v1.145.0
ports:
- 8428:8428
volumes:
@@ -40,7 +40,7 @@ services:
restart: always
vmalert:
image: victoriametrics/vmalert:v1.146.0
image: victoriametrics/vmalert:v1.145.0
depends_on:
- "victoriametrics"
ports:
@@ -59,7 +59,7 @@ services:
- '--external.alert.source=explore?orgId=1&left=["now-1h","now","VictoriaMetrics",{"expr": },{"mode":"Metrics"},{"ui":[true,true,true,"none"]}]'
restart: always
vmanomaly:
image: victoriametrics/vmanomaly:v1.29.6
image: victoriametrics/vmanomaly:v1.29.5
depends_on:
- "victoriametrics"
ports:

View File

@@ -32,17 +32,6 @@ docs-image:
--platform $(DOCKER_PLATFORM) \
vmdocs
docs-check-links: docs-image
rm -rf vmdocs/public
docker run \
--rm \
--platform $(DOCKER_PLATFORM) \
-v ./vmdocs:/opt/docs \
$(shell for d in ./docs/*/; do printf ' -v %s:/opt/docs/content/%s' "$${d}" "$$(basename $${d})"; done) \
--entrypoint /bin/sh \
vmdocs-docker-package \
-c "yarn install && hugo --minify && yarn run check-links"
docs-debug: docs docs-image
docker run \
--rm \

View File

@@ -14,13 +14,6 @@ aliases:
---
Please find the changelog for VictoriaMetrics Anomaly Detection below.
## v1.29.6
Released: 2026-06-17
- BUGFIX: Fixed `VLogsReader` startup and query execution when `tenant_id` is omitted or provided in short account-only form such as `"0"`. Omitted or empty tenant IDs are treated as single-node/no-tenant mode, and account-only tenant IDs are expanded to `accountID:0` before adding VictoriaLogs `AccountID`/`ProjectID` params or VM tenant labels.
- BUGFIX: Hardened [`OnlineMADModel`](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-mad) anomaly scoring for perfectly constant time series (all values identical). The model now keeps a small deterministic prediction interval when the learned MAD is zero, so values deviating from an unknown constant baseline can produce `anomaly_score > 1` (previously, all anomaly scores were `0`).
## v1.29.5
Released: 2026-06-11

View File

@@ -423,7 +423,7 @@ services:
# ...
vmanomaly:
container_name: vmanomaly
image: victoriametrics/vmanomaly:v1.29.6
image: victoriametrics/vmanomaly:v1.29.5
# ...
restart: always
volumes:
@@ -641,7 +641,7 @@ options:
Here's an example of using the config splitter to divide configurations based on the `extra_filters` argument from the reader section:
```sh
docker pull victoriametrics/vmanomaly:v1.29.6 && docker image tag victoriametrics/vmanomaly:v1.29.6 vmanomaly
docker pull victoriametrics/vmanomaly:v1.29.5 && docker image tag victoriametrics/vmanomaly:v1.29.5 vmanomaly
```
```sh

View File

@@ -45,7 +45,7 @@ There are 2 types of compatibility to consider when migrating in stateful mode:
| Group start | Group end | Compatibility | Notes |
|---------|--------- |------------|-------|
| [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) | [v1.29.6](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1296) | Fully Compatible | - |
| [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) | [v1.29.5](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1295) | Fully Compatible | - |
| [v1.28.7](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1287) | [v1.29.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1290) | Partially compatible* | Dumped models of class [prophet](https://docs.victoriametrics.com/anomaly-detection/components/models/#prophet) and [seasonal quantile](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-seasonal-quantile) have problems with loading to [v1.29.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1290) due to dropped `pytz` library. **Upgrading directly from v1.28.7 to [v1.29.1](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1291) with a fix is suggested** |
| [v1.26.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1262) | [v1.28.7](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1287) | Fully Compatible | [v1.28.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1280) introduced [rolling](https://docs.victoriametrics.com/anomaly-detection/components/models/#rolling-models) model class drop in favor of [online](https://docs.victoriametrics.com/anomaly-detection/components/models/#online-models) models (`rolling_quantile` and `std` models), however, it does not impact compatibility, as artifacts were not produced by default for rolling models. Also, offline `mad` and `zscore` models are redirecting to their respective online counterparts since [v1.28.4](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1284). |
| [v1.25.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1253) | [v1.26.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1270) | Partially Compatible* | [v1.25.3](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1253) introduced `forecast_at` argument for base [univariate](https://docs.victoriametrics.com/anomaly-detection/components/models/#univariate-models) and `Prophet` [models](https://docs.victoriametrics.com/anomaly-detection/components/models/#prophet), however, itself remains backward-reversible from newer states like [v1.26.2](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1262), [v1.27.0](https://docs.victoriametrics.com/anomaly-detection/changelog/#v1270). (All models except `isolation_forest_multivariate` class will be dropped) |

View File

@@ -132,7 +132,7 @@ Below are the steps to get `vmanomaly` up and running inside a Docker container:
1. Pull Docker image:
```sh
docker pull victoriametrics/vmanomaly:v1.29.6
docker pull victoriametrics/vmanomaly:v1.29.5
```
2. Create the license file with your license key.
@@ -152,7 +152,7 @@ docker run -it \
-v ./license:/license \
-v ./config.yaml:/config.yaml \
-p 8490:8490 \
victoriametrics/vmanomaly:v1.29.6 \
victoriametrics/vmanomaly:v1.29.5 \
/config.yaml \
--licenseFile=/license \
--loggerLevel=INFO \
@@ -169,7 +169,7 @@ docker run -it \
-e VMANOMALY_DATA_DUMPS_DIR=/tmp/vmanomaly/data \
-e VMANOMALY_MODEL_DUMPS_DIR=/tmp/vmanomaly/models \
-p 8490:8490 \
victoriametrics/vmanomaly:v1.29.6 \
victoriametrics/vmanomaly:v1.29.5 \
/config.yaml \
--licenseFile=/license \
--loggerLevel=INFO \
@@ -182,7 +182,7 @@ services:
# ...
vmanomaly:
container_name: vmanomaly
image: victoriametrics/vmanomaly:v1.29.6
image: victoriametrics/vmanomaly:v1.29.5
# ...
restart: always
volumes:

View File

@@ -315,7 +315,7 @@ docker run -it --rm \
-e VMANOMALY_MCP_SERVER_URL=http://mcp-vmanomaly:8081/mcp \
-p 8080:8080 \
-p 8490:8490 \
victoriametrics/vmanomaly:v1.29.6 \
victoriametrics/vmanomaly:v1.29.5 \
vmanomaly_config.yaml
```

View File

@@ -1265,7 +1265,7 @@ monitoring:
Let's pull the docker image for `vmanomaly`:
```sh
docker pull victoriametrics/vmanomaly:v1.29.6
docker pull victoriametrics/vmanomaly:v1.29.5
```
Now we can run the docker container putting as volumes both config and model file:
@@ -1279,7 +1279,7 @@ docker run -it \
-v $(PWD)/license:/license \
-v $(PWD)/custom_model.py:/vmanomaly/model/custom.py \
-v $(PWD)/custom.yaml:/config.yaml \
victoriametrics/vmanomaly:v1.29.6 /config.yaml \
victoriametrics/vmanomaly:v1.29.5 /config.yaml \
--licenseFile=/license
--watch
```

View File

@@ -10,9 +10,9 @@ sitemap:
- To use *vmanomaly*, part of the enterprise package, a license key is required. Obtain your key [here](https://victoriametrics.com/products/enterprise/trial/) for this tutorial or for enterprise use.
- In the tutorial, we'll be using the following VictoriaMetrics components:
- [VictoriaMetrics Single-Node](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) (v1.146.0)
- [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/) (v1.146.0)
- [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) (v1.146.0)
- [VictoriaMetrics Single-Node](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) (v1.145.0)
- [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/) (v1.145.0)
- [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) (v1.145.0)
- [Grafana](https://grafana.com/) (v12.2.0)
- [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/)
- [Node exporter](https://github.com/prometheus/node_exporter#node-exporter) (v1.9.1) and [Alertmanager](https://prometheus.io/docs/alerting/latest/alertmanager/) (v0.28.1)
@@ -323,7 +323,7 @@ Let's wrap it all up together into the `docker-compose.yml` file.
services:
vmagent:
container_name: vmagent
image: victoriametrics/vmagent:v1.146.0
image: victoriametrics/vmagent:v1.145.0
depends_on:
- "victoriametrics"
ports:
@@ -340,7 +340,7 @@ services:
victoriametrics:
container_name: victoriametrics
image: victoriametrics/victoria-metrics:v1.146.0
image: victoriametrics/victoria-metrics:v1.145.0
ports:
- 8428:8428
volumes:
@@ -373,7 +373,7 @@ services:
vmalert:
container_name: vmalert
image: victoriametrics/vmalert:v1.146.0
image: victoriametrics/vmalert:v1.145.0
depends_on:
- "victoriametrics"
ports:
@@ -395,7 +395,7 @@ services:
restart: always
vmanomaly:
container_name: vmanomaly
image: victoriametrics/vmanomaly:v1.29.6
image: victoriametrics/vmanomaly:v1.29.5
depends_on:
- "victoriametrics"
ports:

View File

@@ -240,23 +240,23 @@ vmagent will write data into VictoriaMetrics single-node and cluster (with tenan
# compose.yaml
services:
vmsingle:
image: victoriametrics/victoria-metrics:v1.146.0
image: victoriametrics/victoria-metrics:v1.145.0
vmstorage:
image: victoriametrics/vmstorage:v1.146.0-cluster
image: victoriametrics/vmstorage:v1.145.0-cluster
vminsert:
image: victoriametrics/vminsert:v1.146.0-cluster
image: victoriametrics/vminsert:v1.145.0-cluster
command:
- -storageNode=vmstorage:8400
vmselect:
image: victoriametrics/vmselect:v1.146.0-cluster
image: victoriametrics/vmselect:v1.145.0-cluster
command:
- -storageNode=vmstorage:8401
vmagent:
image: victoriametrics/vmagent:v1.146.0
image: victoriametrics/vmagent:v1.145.0
volumes:
- ./scrape.yaml:/etc/vmagent/config.yaml
command:
@@ -308,7 +308,7 @@ Now add the vmauth service to `compose.yaml`:
# compose.yaml
services:
vmauth:
image: docker.io/victoriametrics/vmauth:v1.146.0
image: docker.io/victoriametrics/vmauth:v1.145.0
ports:
- 8427:8427
volumes:

View File

@@ -155,15 +155,15 @@ These services will store and query the metrics scraped by vmagent.
# compose.yaml
services:
vmstorage:
image: victoriametrics/vmstorage:v1.146.0-cluster
image: victoriametrics/vmstorage:v1.145.0-cluster
vminsert:
image: victoriametrics/vminsert:v1.146.0-cluster
image: victoriametrics/vminsert:v1.145.0-cluster
command:
- -storageNode=vmstorage:8400
vmselect:
image: victoriametrics/vmselect:v1.146.0-cluster
image: victoriametrics/vmselect:v1.145.0-cluster
command:
- -storageNode=vmstorage:8401
ports:
@@ -196,7 +196,7 @@ Add the vmauth service to `compose.yaml`:
# compose.yaml
services:
vmauth:
image: victoriametrics/vmauth:v1.146.0-enterprise
image: victoriametrics/vmauth:v1.145.0-enterprise
ports:
- 8427:8427
volumes:
@@ -251,7 +251,7 @@ Add the vmagent service to `compose.yaml` with OAuth2 configuration:
# compose.yaml
services:
vmagent:
image: victoriametrics/vmagent:v1.146.0
image: victoriametrics/vmagent:v1.145.0
volumes:
- ./scrape.yaml:/etc/vmagent/config.yaml
command:

View File

@@ -107,7 +107,7 @@ The final piece is the Docker Compose file. This ties all the services together
# compose.yml
services:
victoriametrics:
image: victoriametrics/victoria-metrics:v1.146.0
image: victoriametrics/victoria-metrics:v1.145.0
command:
- "--storageDataPath=/victoria-metrics-data"
- "--selfScrapeInterval=10s"
@@ -128,7 +128,7 @@ services:
- ./alertmanager.yml:/etc/alertmanager/alertmanager.yml:ro
vmalert:
image: victoriametrics/vmalert:v1.146.0
image: victoriametrics/vmalert:v1.145.0
depends_on:
- victoriametrics
- alertmanager

View File

@@ -19,7 +19,6 @@ See also [case studies](https://docs.victoriametrics.com/victoriametrics/casestu
* [Datanami: Why Roblox Picked VictoriaMetrics for Observability Data Overhaul](https://www.hpcwire.com/bigdatawire/2023/05/30/why-roblox-picked-victoriametrics-for-observability-data-overhaul/)
* [Cloudflare: Introducing notifications for HTTP Traffic Anomalies](https://blog.cloudflare.com/introducing-http-traffic-anomalies-notifications/)
* [Grammarly: Better, Faster, Cheaper: How Grammarly Improved Monitoring by Over 10x with VictoriaMetrics](https://www.grammarly.com/blog/engineering/monitoring-with-victoriametrics/)
* [Xata: How we rebuilt PostgreSQL branch metrics on VictoriaMetrics, per cell](https://xata.io/blog/how-we-rebuilt-postgresql-branch-metrics-on-victoriametrics-per-cell)
* [CERN: CMS monitoring R&D: Real-time monitoring and alerts](https://indico.cern.ch/event/877333/contributions/3696707/attachments/1972189/3281133/CMS_mon_RD_for_opInt.pdf)
* [CERN: The CMS monitoring infrastructure and applications](https://arxiv.org/pdf/2007.03630.pdf)
* [Forbes: The (Almost) Infinitely Scalable Open Source Monitoring Dream](https://www.forbes.com/sites/adrianbridgwater/2022/08/16/the-almost-infinitely-scalable-open-source-monitoring-dream/)

View File

@@ -61,9 +61,9 @@ Download the newest available [VictoriaMetrics release](https://docs.victoriamet
from [DockerHub](https://hub.docker.com/r/victoriametrics/victoria-metrics) or [Quay](https://quay.io/repository/victoriametrics/victoria-metrics?tab=tags):
```sh
docker pull victoriametrics/victoria-metrics:v1.146.0
docker pull victoriametrics/victoria-metrics:v1.145.0
docker run -it --rm -v `pwd`/victoria-metrics-data:/victoria-metrics-data -p 8428:8428 \
victoriametrics/victoria-metrics:v1.146.0 --selfScrapeInterval=5s -storageDataPath=victoria-metrics-data
victoriametrics/victoria-metrics:v1.145.0 --selfScrapeInterval=5s -storageDataPath=victoria-metrics-data
```
_For Enterprise images, see [this link](https://docs.victoriametrics.com/victoriametrics/enterprise/#docker-images)._

View File

@@ -26,32 +26,16 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
## [v1.146.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.146.0)
Released at 2026-06-22
* FEATURE: all VictoriaMetrics components: add `-http.header.disableServerHostname` command-line flag for disabling the `X-Server-Hostname` HTTP response header. See [#11067](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11067). Thanks to @zasdaym for contribution.
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): expose `vm_streamaggr_dedup_dropped_samples_total` to allow tracking dropped old samples during [deduplication](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): use the aggregation rule interval as the default [staleness_interval](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#staleness) instead of `2*interval`, to reduce spikes when there are gaps between received samples. See [#11102](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11102).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add a new flag `-remoteWrite.inmemoryQueues` to prioritize recently ingested data over historical data stored at file-based [persistent queue](https://docs.victoriametrics.com/victoriametrics/vmagent/#on-disk-persistence-and-data-processing-order). See [#8833](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8833)
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-promscrape.cluster.shardByLabels` command-line flag for selecting target labels used for sharding scrape targets among `vmagent` instances in cluster mode. See [#11044](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11044).
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
* BUGFIX: [enterprise](https://docs.victoriametrics.com/enterprise/) [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly expose metric `vm_retention_filters_partitions_scheduled_rows`. See [#11138](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11138)
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix potential corruption of remote-write metadata `Unit` values. See [#11120](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11120). Thanks to @fxrlv for the contribution.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/),[vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/),[vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): fix rare unbounded shutdown delay when config reload takes longer than `-configCheckInterval`. See [#11107](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11107). Thanks to @PleasingFungus for contribution.
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): do not fail backup list if directory is absent while using `fs://` destination to align with other protocols. See [6c3c548d](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/6c3c548ddb0385b749e731f52276f130e2a4e4a8).
* BUGFIX: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): push metrics to configured `-pushmetrics.url` on shutdown when migration fails. Previously, metrics were not pushed if vmctl exited with an error. See [#11081](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11081). Thanks to @zasdaym for contribution.
* BUGFIX: [vmrestore](https://docs.victoriametrics.com/victoriametrics/vmrestore/): disallow restoring parts outside the configured `-storageDataPath` directory. See [710c920d](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/710c920d6083327042a309e449fae4383617d817).
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): correctly apply long tenant filters. Previously, such filters could be truncated, causing tenants to be matched incorrectly. See [#11096](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11096). Thanks to @fxrlv for the contribution.
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix corrupted metrics metadata when a response contains multiple rows. See [#11115](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11115). Thanks to @fxrlv for the contribution.
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): don't cache empty responses for tenant IDs discovery during [multitenant queries](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenant-reads). This problem was visible during integration tests when multitenant queries were executed before the first ingestion happened. See [#10982](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10982)
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly escape `metricFamilyName` in the metrics metadata response. See [#11129](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11129). Thanks to @fxrlv for the contribution.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix potential corruption of remote-write metadata `Unit` values. See [#11120](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11120). Thanks to @fxrlv for the contribution.
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): do not fail backup list if directory is absent while using `fs://` destination to align with other protocols. See [6c3c548](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/6c3c548ddb0385b749e731f52276f130e2a4e4a8)
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): prevent more cases of panic during directory deletion on `NFS`-based mounts. See [#11060](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11060).
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
@@ -292,25 +276,6 @@ It enables back `Discovered targets` debug UI by default.
* BUGFIX: `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly apply `extra_filters[]` filter when querying `vm_account_id` or `vm_project_id` labels via [multitenant](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy) request for `/api/v1/label/…/values` API. Before, `extra_filters` was ignored. See [#10503](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10503).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): revert the use of rollup result cache for [instant queries](https://docs.victoriametrics.com/keyConcepts.html#instant-query) that contain [`rate`](https://docs.victoriametrics.com/MetricsQL.html#rate) function with a lookbehind window larger than `-search.minWindowForInstantRollupOptimization`. The cache usage was removed since [v1.132.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.132.0). See [#10098](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10098#issuecomment-3895011084) for more details.
## [v1.136.12](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.136.12)
Released at 2026-06-19
**v1.136.x is a line of [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/). It contains important up-to-date bugfixes for [VictoriaMetrics enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/).
All these fixes are also included in [the latest community release](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/latest).
The v1.136.x line will be supported for at least 12 months since [v1.136.0](https://docs.victoriametrics.com/victoriametrics/changelog/#v11360) release**
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix potential corruption of remote-write metadata `Unit` values. See [#11120](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11120). Thanks to @fxrlv for the contribution.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/), [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/), [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): fix rare unbounded shutdown delay when config reload takes longer than `-configCheckInterval`. See [#11107](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11107). Thanks to @PleasingFungus for the contribution.
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): do not fail backup list if directory is absent while using `fs://` destination to align with other protocols. See [6c3c548d](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/6c3c548ddb0385b749e731f52276f130e2a4e4a8)
* BUGFIX: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): push metrics to configured `-pushmetrics.url` on shutdown when migration fails. Previously, metrics were not pushed if vmctl exited with an error. See [#11081](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11081). Thanks to @zasdaym for the contribution.
* BUGFIX: [vmrestore](https://docs.victoriametrics.com/victoriametrics/vmrestore/): disallow restoring parts outside the configured `-storageDataPath` directory. See [710c920d](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/710c920d6083327042a309e449fae4383617d817).
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix corrupted metrics metadata when a response contains multiple rows. See [#11115](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11115). Thanks to @fxrlv for the contribution.
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly escape `metricFamilyName` in the metrics metadata response. See [#11129](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11129). Thanks to @fxrlv for the contribution.
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): correctly apply long tenant filters. Previously, such filters could be truncated, causing tenants to be matched incorrectly. See [#11096](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11096). Thanks to @fxrlv for the contribution.
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): prevent more cases of panic during directory deletion on `NFS`-based mounts. See [#11060](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11060).
## [v1.136.11](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.136.11)
Released at 2026-06-05
@@ -672,20 +637,6 @@ See changes [here](https://docs.victoriametrics.com/victoriametrics/changelog/ch
See changes [here](https://docs.victoriametrics.com/victoriametrics/changelog/changelog_2025/#v11230)
## [v1.122.25](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.122.25)
Released at 2026-06-19
**v1.122.x is a line of [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/). It contains important up-to-date bugfixes for [VictoriaMetrics enterprise](https://docs.victoriametrics.com/victoriametrics/enterprise/).
All these fixes are also included in [the latest community release](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/latest).
The v1.122.x line will be supported for at least 12 months since [v1.122.0](https://docs.victoriametrics.com/victoriametrics/changelog/#v11220) release**
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/), [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/), [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): fix rare unbounded shutdown delay when config reload takes longer than `-configCheckInterval`. See [#11107](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11107). Thanks to @PleasingFungus for the contribution.
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix corrupted metrics metadata when a response contains multiple rows. See [#11115](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11115). Thanks to @fxrlv for the contribution.
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): do not fail backup list if directory is absent while using `fs://` destination to align with other protocols. See [6c3c548d](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/6c3c548ddb0385b749e731f52276f130e2a4e4a8)
* BUGFIX: [vmrestore](https://docs.victoriametrics.com/victoriametrics/vmrestore/): disallow restoring parts outside the configured `-storageDataPath` directory. See [710c920d](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/710c920d6083327042a309e449fae4383617d817).
## [v1.122.24](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.122.24)
Released at 2026-06-05

View File

@@ -121,7 +121,7 @@ It is allowed to run Enterprise components in [cases listed here](https://docs.v
Binary releases of Enterprise components are available at [the releases page for VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/latest),
[the releases page for VictoriaLogs](https://github.com/VictoriaMetrics/VictoriaLogs/releases/latest)
and [the releases page for VictoriaTraces](https://github.com/VictoriaMetrics/VictoriaTraces/releases/latest).
Enterprise binaries and packages have `enterprise` suffix in their names. For example, `victoria-metrics-linux-amd64-v1.146.0-enterprise.tar.gz`.
Enterprise binaries and packages have `enterprise` suffix in their names. For example, `victoria-metrics-linux-amd64-v1.145.0-enterprise.tar.gz`.
In order to run binary release of Enterprise component, please download the `*-enterprise.tar.gz` archive for your OS and architecture
from the corresponding releases page and unpack it. Then run the unpacked binary.
@@ -139,8 +139,8 @@ For example, the following command runs VictoriaMetrics Enterprise binary with t
obtained at [this page](https://victoriametrics.com/products/enterprise/trial/):
```sh
wget https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v1.146.0/victoria-metrics-linux-amd64-v1.146.0-enterprise.tar.gz
tar -xzf victoria-metrics-linux-amd64-v1.146.0-enterprise.tar.gz
wget https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v1.145.0/victoria-metrics-linux-amd64-v1.145.0-enterprise.tar.gz
tar -xzf victoria-metrics-linux-amd64-v1.145.0-enterprise.tar.gz
./victoria-metrics-prod -license=BASE64_ENCODED_LICENSE_KEY
```
@@ -155,7 +155,7 @@ Alternatively, VictoriaMetrics Enterprise license can be stored in the file and
It is allowed to run Enterprise components in [cases listed here](https://docs.victoriametrics.com/victoriametrics/enterprise/#valid-cases-for-victoriametrics-enterprise).
Docker images for Enterprise components are available at [VictoriaMetrics Docker Hub](https://hub.docker.com/u/victoriametrics) and [VictoriaMetrics Quay](https://quay.io/organization/victoriametrics).
Enterprise docker images have `enterprise` suffix in their names. For example, `victoriametrics/victoria-metrics:v1.146.0-enterprise`.
Enterprise docker images have `enterprise` suffix in their names. For example, `victoriametrics/victoria-metrics:v1.145.0-enterprise`.
In order to run Docker image of VictoriaMetrics Enterprise component, it is required to provide the license key via the command-line
flag as described in the [binary-releases](https://docs.victoriametrics.com/victoriametrics/enterprise/#binary-releases) section.
@@ -165,13 +165,13 @@ Enterprise license key can be obtained at [this page](https://victoriametrics.co
For example, the following command runs VictoriaMetrics Enterprise Docker image with the specified license key:
```sh
docker run --name=victoria-metrics victoriametrics/victoria-metrics:v1.146.0-enterprise -license=BASE64_ENCODED_LICENSE_KEY
docker run --name=victoria-metrics victoriametrics/victoria-metrics:v1.145.0-enterprise -license=BASE64_ENCODED_LICENSE_KEY
```
Alternatively, the license code can be stored in the file and then referred via `-licenseFile` command-line flag:
```sh
docker run --name=victoria-metrics -v /vm-license:/vm-license victoriametrics/victoria-metrics:v1.146.0-enterprise -licenseFile=/path/to/vm-license
docker run --name=victoria-metrics -v /vm-license:/vm-license victoriametrics/victoria-metrics:v1.145.0-enterprise -licenseFile=/path/to/vm-license
```
Example docker-compose configuration:
@@ -181,7 +181,7 @@ version: "3.5"
services:
victoriametrics:
container_name: victoriametrics
image: victoriametrics/victoria-metrics:v1.146.0
image: victoriametrics/victoria-metrics:v1.145.0
ports:
- 8428:8428
volumes:
@@ -213,7 +213,7 @@ is used to provide the license key in plain-text:
```yaml
server:
image:
tag: v1.146.0-enterprise
tag: v1.145.0-enterprise
license:
key: {BASE64_ENCODED_LICENSE_KEY}
@@ -224,7 +224,7 @@ In order to provide the license key via existing secret, the following values fi
```yaml
server:
image:
tag: v1.146.0-enterprise
tag: v1.145.0-enterprise
license:
secret:
@@ -274,7 +274,7 @@ spec:
license:
key: {BASE64_ENCODED_LICENSE_KEY}
image:
tag: v1.146.0-enterprise
tag: v1.145.0-enterprise
```
In order to provide the license key via an existing secret, the following custom resource is used:
@@ -291,7 +291,7 @@ spec:
name: vm-license
key: license
image:
tag: v1.146.0-enterprise
tag: v1.145.0-enterprise
```
Example secret with license key:
@@ -342,7 +342,7 @@ Builds are available for amd64 and arm64 architectures.
Example archive:
`victoria-metrics-linux-amd64-v1.146.0-enterprise.tar.gz`
`victoria-metrics-linux-amd64-v1.145.0-enterprise.tar.gz`
Includes:
@@ -351,7 +351,7 @@ Includes:
Example Docker image:
`victoriametrics/victoria-metrics:v1.146.0-enterprise-fips` uses the FIPS-compatible binary and is based on the `scratch` image.
`victoriametrics/victoria-metrics:v1.145.0-enterprise-fips` uses the FIPS-compatible binary and is based on the `scratch` image.
## What Happens to Licensed Components When a License Expires

View File

@@ -16,7 +16,7 @@ aliases:
1. The main goal - **to help users and [clients](https://docs.victoriametrics.com/victoriametrics/enterprise/) using VictoriaMetrics products in the most efficient way**.
1. Fixing bugs in the essential functionality of VictoriaMetrics components. Small usability bugs are usually the most annoying,
so they **must be fixed first**. Bugs, which affect a small number of users at some rare edge cases, can be fixed later.
1. Improving [public docs for VictoriaMetrics products](https://docs.victoriametrics.com),
1. Improving [public docs for VictoriaMetrics products](https://docs.victoriametrics.com),
so users could find answers to their questions via Google or any other AI-powered web search without the need
to ask these questions at our [support channels](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#community-and-contributions).
1. Simplifying usage of VictoriaMetrics products without breaking backwards compatibility, so users could regularly

View File

@@ -35,8 +35,8 @@ scrape_configs:
After you created the `scrape.yaml` file, download and unpack [single-node VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) to the same directory:
```sh
wget https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v1.146.0/victoria-metrics-linux-amd64-v1.146.0.tar.gz
tar xzf victoria-metrics-linux-amd64-v1.146.0.tar.gz
wget https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v1.145.0/victoria-metrics-linux-amd64-v1.145.0.tar.gz
tar xzf victoria-metrics-linux-amd64-v1.145.0.tar.gz
```
Then start VictoriaMetrics and instruct it to scrape targets defined in `scrape.yaml` and save scraped metrics
@@ -150,8 +150,8 @@ Then start [single-node VictoriaMetrics](https://docs.victoriametrics.com/victor
```yaml
# Download and unpack single-node VictoriaMetrics
wget https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v1.146.0/victoria-metrics-linux-amd64-v1.146.0.tar.gz
tar xzf victoria-metrics-linux-amd64-v1.146.0.tar.gz
wget https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v1.145.0/victoria-metrics-linux-amd64-v1.145.0.tar.gz
tar xzf victoria-metrics-linux-amd64-v1.145.0.tar.gz
# Run single-node VictoriaMetrics with the given scrape.yaml
./victoria-metrics-prod -promscrape.config=scrape.yaml

View File

@@ -76,7 +76,7 @@ It is better to substitute the slow recording rule with the following [stream ag
outputs: [rate_sum]
```
> It is recommended to set the `interval` field to a value at least 2 times the matched metrics collection interval.
> Field `interval` should be set to a value at least several times higher than the matched metrics collection interval.
This stream aggregation generates `http_request_duration_seconds_bucket:1m_without_instance_rate_sum` output series according to [output metric naming](#output-metric-names).
Then these series can be used in [alerting rules](https://docs.victoriametrics.com/victoriametrics/vmalert/#alerting-rules):
@@ -396,7 +396,7 @@ before sending them to the configured `-remoteWrite.url`. The deduplication can
Labels can be dropped before deduplication is applied. See [these docs](#dropping-unneeded-labels).
Stream aggregation deduplication is applied before aggregation rules, so duplicate samples are dropped before aggregation. The dropped old samples can be tracked with the `vm_streamaggr_dedup_dropped_samples_total` metric.
Stream aggregation deduplication is applied before aggregation rules, so duplicate samples are dropped before aggregation.
# Relabeling
@@ -444,9 +444,7 @@ outside the current [aggregation interval](https://docs.victoriametrics.com/vict
- To enable [aggregation windows](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#aggregation-windows).
- To enable [deduplication](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication).
The dropped old samples can be tracked with the `vm_streamaggr_ignored_samples_total{reason="too_old"}` and `vm_streamaggr_dedup_dropped_samples_total` metrics.
The dropped old samples can be tracked with the `vm_streamaggr_ignored_samples_total{reason="too_old"}` metric.
## Ignore aggregation intervals on start
@@ -644,9 +642,9 @@ See also [why you shouldn't put an aggregator behind a load balancer](https://do
# Troubleshooting
- [Unexpected spikes for `total` or `increase` outputs](#data-delay-and-staleness).
- [Unexpected spikes for `total` or `increase` outputs](#staleness).
- [Excessively large values for `total*`, `increase*`, and `rate*` outputs](#counter-resets).
- [Lower than expected values for `total_prometheus` and `increase_prometheus` outputs](#data-delay-and-staleness).
- [Lower than expected values for `total_prometheus` and `increase_prometheus` outputs](#staleness).
- [High memory usage and CPU usage](#high-resource-usage).
- [Unexpected results in vmagent cluster mode](#cluster-mode).
- [Inaccurate aggregation results for histograms](#aggregation-windows)
@@ -679,19 +677,11 @@ the following settings:
If counter-specific outputs, such as `total*`, `rate*`, and `increase*`, produce values that are significantly higher than anticipated, then check the `vm_streamaggr_counter_resets_total` metric. This metric increments each time when [counter reset event](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#counter) happens and could be caused by duplication or collision of raw samples. If you observe duplication or collision, try solving this problem by either fixing the source of these metrics or by [deduplicating](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication) these samples before aggregation.
## Data delay and staleness {#staleness}
## Staleness
Stream aggregation processes input samples in a streaming manner and flushes results once per specified `interval`. Because of this, aggregation results can be heavily affected by data delays (see `vm_streamaggr_samples_lag_seconds_bucket` metric).
In particular:
1. Stream aggregation won't produce results if input samples are delayed for multiple aggregation intervals, causing gaps in the output.
2. Delayed and out-of-order samples can inflate or skew correctness of aggregation results.
Dropping delayed samples can result in missed observations in the results, while keeping delayed samples may inflate the results. It is up to the user to decide what they prefer in the produced results:
1. If you prefer consistency in aggregation results and do not want delayed data to affect the next aggregation window, drop all potentially delayed samples via [ignore_old_samples](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#ignoring-old-samples).
2. If you prefer to have the accumulated changes from delayed data reflected in aggregation windows after the delay, increase `staleness_interval` in the [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config).
This is especially important for outputs that track the last seen per-series values in order to properly calculate output values:
The following outputs track the last seen per-series values in order to properly calculate output values:
- [histogram_bucket](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#histogram_bucket)
- [increase](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase)
- [increase_prometheus](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase_prometheus)
- [rate_avg](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#rate_avg)
@@ -699,19 +689,21 @@ This is especially important for outputs that track the last seen per-series val
- [total](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#total)
- [total_prometheus](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#total_prometheus)
For these outputs, the last seen per-series value is dropped if no new samples are received for the given time series during consecutive aggregation intervals specified in the [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config) via `interval` option.
The last seen per-series value is dropped if no new samples are received for the given time series during two consecutive aggregation
intervals specified in the [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config) via the `interval` option.
If a new sample for the existing time series is received after that, then it is treated as the first sample for a new time series.
This may lead to the following issues when data is delayed:
This may lead to the following issues:
- [total](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#total) and [increase](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase) may produce unexpected spikes, since they assume that a new time series starts from `0`.
- [total_prometheus](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#total_prometheus) and [increase_prometheus](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase_prometheus) may produce lower than expected results, if you expect to see the accumulated changes reflected after the delay, since they ignore the first sample in a new time series.
- Lower than expected results for [total_prometheus](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#total_prometheus) and [increase_prometheus](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase_prometheus) outputs,
since they ignore the first sample in a new time series.
- Unexpected spikes for [total](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#total) and [increase](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#increase) outputs, since they assume that new time series start from 0.
These issues can be improved in the following ways:
These issues can be fixed in the following ways:
- By increasing the `interval` option at [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config), so it covers the expected
delays in data ingestion pipelines. It is recommended to set `interval` to at least 2× the scrape or push interval of the input. Set it to a higher value if the input pipeline is prone to large delays.
- By increasing the `staleness_interval` option in the [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config), so it covers the expected
delays in data ingestion pipelines. By default, the `staleness_interval` is equal to `interval`.
delays in data ingestion pipelines.
- By specifying the `staleness_interval` option at [stream aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#stream-aggregation-config), so it covers the expected
delays in data ingestion pipelines. By default, the `staleness_interval` is equal to `2 x interval`.
## High resource usage

View File

@@ -66,8 +66,6 @@ specified individually per each `-remoteWrite.url`:
# interval is the interval for the aggregation.
# The aggregated stats are sent to remote storage once per interval.
# It is recommended to set `interval` to at least 2× the scrape or push interval of the input.
# Set it to a higher value if the input pipeline is prone to large delays.
#
interval: 1m
@@ -96,7 +94,7 @@ specified individually per each `-remoteWrite.url`:
# - total_prometheus
# See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#staleness for more details.
#
# staleness_interval: 1m
# staleness_interval: 2m
# ignore_first_sample_interval specifies the interval after which the agent begins sending samples.
# By default, it is set to the staleness interval, and it helps reduce the initial sample load after an agent restart.
@@ -293,6 +291,9 @@ The results of `histogram_bucket` is equal to the following [MetricsQL](https://
sum(histogram_over_time(some_histogram_bucket[interval])) by (vmrange)
```
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#staleness) option.
See also:
- [quantiles](#quantiles)
- [avg](#avg)
@@ -506,19 +507,6 @@ See also:
- [count_samples](#count_samples)
- [count_series](#count_series)
### `sum_samples_total`
`sum_samples_total` sums input delta values into a cumulative [counter](https://docs.victoriametrics.com/victoriametrics/keyconcepts/index.html#counter) and outputs the result at the given `interval`.
`sum_samples_total` makes sense only for aggregating delta values from clients such as [StatsD counter](https://github.com/statsd/statsd/blob/master/docs/metric_types.md#counting).
The result of `sum_samples_total` is roughly equal to the following [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/) query:
```metricsql
sum(running_sum(some_delta_values))
```
> Note: The aggregator will forget the cumulative counter if it has not seen input samples for `staleness_interval` (set to `interval` by default) per output result, so the output counter will start from `0` the next time it sees the input again. Increase the `staleness_interval` option if you want to extend the window to tolerate bigger gaps.
### total
`total` generates output [counter](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#counter) by summing the input counters over the given `interval`.

View File

@@ -275,10 +275,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/
Optional name of the cluster. If multiple vmagent clusters scrape the same targets, then each cluster must have unique name in order to properly de-duplicate samples received from these clusters. See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info
-promscrape.cluster.replicationFactor int
The number of members in the cluster, which scrape the same targets. If the replication factor is greater than 1, then the deduplication must be enabled at remote storage side. See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info (default 1)
-promscrape.cluster.shardByLabels array
Optional list of target labels, which will be used for sharding targets among cluster members if -promscrape.cluster.membersCount is greater than 1. If none of the specified labels are found in a target, then all the target labels will be used for sharding. See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info
Supports an array of values separated by comma or specified via multiple flags.
Each array item can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-promscrape.config string
Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. The path can point to local file and to http url. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-scrape-prometheus-exporters-such-as-node-exporter for details
-promscrape.config.dryRun
@@ -490,13 +486,13 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/
-search.maxTSDBStatusTopNSeries int
The maximum value of 'topN' argument that can be passed to /api/v1/status/tsdb API. This option allows limiting memory usage. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#tsdb-stats (default 1000)
-search.maxTagKeys int
The maximum number of tag keys returned per search. See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration (default 100000)
The maximum number of tag keys returned from /api/v1/labels . See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration (default 100000)
-search.maxTagValueSuffixesPerSearch int
The maximum number of tag value suffixes returned from /metrics/find (default 100000)
-search.maxTagValues int
The maximum number of tag values returned per search. See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration (default 100000)
The maximum number of tag values returned from /api/v1/label/<label_name>/values . See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration (default 100000)
-search.maxUniqueTimeseries int
The maximum number of unique time series, which can be scanned during every query. This allows protecting against heavy queries, which select unexpectedly high number of series. When set to zero, the limit is automatically calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional). See also -search.max* command-line flags at vmselect
The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage. When set to zero, the limit is automatically calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional).
-search.maxWorkersPerQuery int
The maximum number of CPU cores a single query can use. The default value should work good for most cases. The flag can be set to lower values for improving performance of big number of concurrently executed queries. The flag can be set to bigger values for improving performance of heavy queries, which scan big number of time series (>10K) and/or big number of samples (>100M). There is no sense in setting this flag to values bigger than the number of CPU cores available on the system (default netstorage.defaultMaxWorkersPerQuery())
-search.minStalenessInterval duration

View File

@@ -797,12 +797,6 @@ For example, the following commands spread scrape targets among a cluster of two
The `-promscrape.cluster.memberNum` can be set to a StatefulSet pod name when `vmagent` runs in Kubernetes.
The pod name must end with a number in the range `0 ... promscrape.cluster.membersCount-1`. For example, `-promscrape.cluster.memberNum=vmagent-0`.
By default, targets are sharded among `vmagent` instances by all target labels after relabeling.
Use `-promscrape.cluster.shardByLabels` {{% available_from "v1.146.0" %}} to shard targets by specified labels instead.
For example, with `-promscrape.cluster.shardByLabels=service`, the targets with the same `service` label value will be scraped by the same `vmagent` instance,
which is useful when performing stream aggregation that requires all metrics with the same `service` label value to be processed on the same `vmagent` instance.
If none of the specified labels are present in the target labels, then all target labels will be used for sharding.
By default, each scrape target is scraped only by a single `vmagent` instance in the cluster. If there is a need for replicating scrape targets among multiple `vmagent` instances,
then `-promscrape.cluster.replicationFactor` command-line flag must be set to the desired number of replicas. For example, the following commands
start a cluster of three `vmagent` instances, where two `vmagent` instances scrape each target:
@@ -934,29 +928,6 @@ vmagent will generate the following persistent queue folders:
2_0AAFDF53E314A72A
```
### On-disk persistence and data processing order
By default, vmagent processes data in FIFO order. If data has been written to the on-disk queue,
it must be flushed to the remote storage before newly ingested data can be forwarded there.
During long outages, vmagent may accumulate large amounts of data in the file-based queue,
which can introduce a significant lag between the moment data is collected by vmagent and the
moment it becomes visible at the remote storage.
This behavior can be changed with the `-remoteWrite.inmemoryQueues` {{% available_from "v1.146.0" %}} command-line flag.
When set to a non-zero value, vmagent starts the given number of additional workers,
which send only recently ingested data from the in-memory queue, while the workers configured via `-remoteWrite.queues` drain the file-based backlog concurrently.
This reduces the delivery lag for fresh samples after remote storage outages or slowdowns. The flag can be set individually per each `-remoteWrite.url`.
Note that these workers are started in addition to the workers configured via `-remoteWrite.queues`, so the total number of concurrent connections to
the remote storage becomes the sum of both flags. Take this into account if the remote storage limits the number of concurrent requests.
This flag has the following possible limitations:
* Samples may arrive at the remote storage out of order, since recent data can be delivered before the older backlogged data.
Do not use this option if the remote storage doesn't accept out-of-order samples.
* Recent data isn't guaranteed to take the fast path: if the in-memory queue is full,
newly ingested data is still written to the file-based queue and is delivered in FIFO order by the generic workers.
### Disabling On-disk persistence
There are cases when it is better to disable on-disk persistence for pending data on the `vmagent` side:

View File

@@ -240,10 +240,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
Optional name of the cluster. If multiple vmagent clusters scrape the same targets, then each cluster must have unique name in order to properly de-duplicate samples received from these clusters. See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info
-promscrape.cluster.replicationFactor int
The number of members in the cluster, which scrape the same targets. If the replication factor is greater than 1, then the deduplication must be enabled at remote storage side. See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info (default 1)
-promscrape.cluster.shardByLabels array
Optional list of target labels, which will be used for sharding targets among cluster members if -promscrape.cluster.membersCount is greater than 1. If none of the specified labels are found in a target, then all the target labels will be used for sharding. See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info
Supports an array of values separated by comma or specified via multiple flags.
Each array item can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-promscrape.config string
Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. The path can point to local file and to http url. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-scrape-prometheus-exporters-such-as-node-exporter for details
-promscrape.config.dryRun
@@ -439,10 +435,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
Optional HTTP headers to send with each request to the corresponding -remoteWrite.url. For example, -remoteWrite.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteWrite.url. Multiple headers must be delimited by '^^': -remoteWrite.headers='header1:value1^^header2:value2'
Supports an array of values separated by comma or specified via multiple flags.
Each array item can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-remoteWrite.inmemoryQueues array
The number of additional workers per each -remoteWrite.url, which send only recently ingested data from the in-memory queue, while the file-based queue at -remoteWrite.tmpDataPath is drained by workers configured via -remoteWrite.queues. This reduces delivery lag for fresh samples when the file-based queue contains a backlog accumulated during remote storage outages. (default 0)
Supports array of values separated by comma or specified via multiple flags.
Empty values are set to default value.
-remoteWrite.keepDanglingQueues
Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on.
-remoteWrite.label array

View File

@@ -34,9 +34,9 @@ vmctl command-line tool is available as:
Download and unpack vmctl:
```sh
wget https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v1.146.0/vmutils-darwin-arm64-v1.146.0.tar.gz
wget https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v1.145.0/vmutils-darwin-arm64-v1.145.0.tar.gz
tar xzf vmutils-darwin-arm64-v1.146.0.tar.gz
tar xzf vmutils-darwin-arm64-v1.145.0.tar.gz
```
Once binary is unpacked, see the full list of supported modes by running the following command:

View File

@@ -46,13 +46,9 @@ OPTIONS:
Should be the same as --httpListenAddr value for single-node version or vminsert component.
When importing into the clustered version do not forget to set additionally --vm-account-id flag.
Please note, that vmctl performs initial readiness check for the given address by checking /health endpoint. (default: "http://localhost:8428")
--vm-user value VictoriaMetrics username for basic auth [$VM_USERNAME]
--vm-password value VictoriaMetrics password for basic auth [$VM_PASSWORD]
--vm-headers value Optional HTTP headers to send with each request to the corresponding destination address.
For example, --vm-headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding destination address.
Multiple headers must be delimited by '^^': --vm-headers='header1:value1^^header2:value2'
--vm-bearer-token value Optional bearer auth token to use for the corresponding --vm-addr
--vm-account-id value AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant).
--vm-user value VictoriaMetrics username for basic auth [$VM_USERNAME]
--vm-password value VictoriaMetrics password for basic auth [$VM_PASSWORD]
--vm-account-id value AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant).
AccountID is required when importing into the clustered version of VictoriaMetrics.
It is possible to set it as accountID:projectID, where projectID is also arbitrary 32-bit integer.
If projectID isn't set, then it equals to 0

View File

@@ -42,13 +42,9 @@ OPTIONS:
Should be the same as --httpListenAddr value for single-node version or vminsert component.
When importing into the clustered version do not forget to set additionally --vm-account-id flag.
Please note, that vmctl performs initial readiness check for the given address by checking /health endpoint. (default: "http://localhost:8428")
--vm-user value VictoriaMetrics username for basic auth [$VM_USERNAME]
--vm-password value VictoriaMetrics password for basic auth [$VM_PASSWORD]
--vm-headers value Optional HTTP headers to send with each request to the corresponding destination address.
For example, --vm-headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding destination address.
Multiple headers must be delimited by '^^': --vm-headers='header1:value1^^header2:value2'
--vm-bearer-token value Optional bearer auth token to use for the corresponding --vm-addr
--vm-account-id value AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant).
--vm-user value VictoriaMetrics username for basic auth [$VM_USERNAME]
--vm-password value VictoriaMetrics password for basic auth [$VM_PASSWORD]
--vm-account-id value AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant).
AccountID is required when importing into the clustered version of VictoriaMetrics.
It is possible to set it as accountID:projectID, where projectID is also arbitrary 32-bit integer.
If projectID isn't set, then it equals to 0

View File

@@ -41,13 +41,9 @@ OPTIONS:
Should be the same as --httpListenAddr value for single-node version or vminsert component.
When importing into the clustered version do not forget to set additionally --vm-account-id flag.
Please note, that vmctl performs initial readiness check for the given address by checking /health endpoint. (default: "http://localhost:8428")
--vm-user value VictoriaMetrics username for basic auth [$VM_USERNAME]
--vm-password value VictoriaMetrics password for basic auth [$VM_PASSWORD]
--vm-headers value Optional HTTP headers to send with each request to the corresponding destination address.
For example, --vm-headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding destination address.
Multiple headers must be delimited by '^^': --vm-headers='header1:value1^^header2:value2'
--vm-bearer-token value Optional bearer auth token to use for the corresponding --vm-addr
--vm-account-id value AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant).
--vm-user value VictoriaMetrics username for basic auth [$VM_USERNAME]
--vm-password value VictoriaMetrics password for basic auth [$VM_PASSWORD]
--vm-account-id value AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant).
AccountID is required when importing into the clustered version of VictoriaMetrics.
It is possible to set it as accountID:projectID, where projectID is also arbitrary 32-bit integer.
If projectID isn't set, then it equals to 0

View File

@@ -34,13 +34,9 @@ OPTIONS:
Should be the same as --httpListenAddr value for single-node version or vminsert component.
When importing into the clustered version do not forget to set additionally --vm-account-id flag.
Please note, that vmctl performs initial readiness check for the given address by checking /health endpoint. (default: "http://localhost:8428")
--vm-user value VictoriaMetrics username for basic auth [$VM_USERNAME]
--vm-password value VictoriaMetrics password for basic auth [$VM_PASSWORD]
--vm-headers value Optional HTTP headers to send with each request to the corresponding destination address.
For example, --vm-headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding destination address.
Multiple headers must be delimited by '^^': --vm-headers='header1:value1^^header2:value2'
--vm-bearer-token value Optional bearer auth token to use for the corresponding --vm-addr
--vm-account-id value AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant).
--vm-user value VictoriaMetrics username for basic auth [$VM_USERNAME]
--vm-password value VictoriaMetrics password for basic auth [$VM_PASSWORD]
--vm-account-id value AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant).
AccountID is required when importing into the clustered version of VictoriaMetrics.
It is possible to set it as accountID:projectID, where projectID is also arbitrary 32-bit integer.
If projectID isn't set, then it equals to 0

View File

@@ -49,13 +49,9 @@ OPTIONS:
Should be the same as --httpListenAddr value for single-node version or vminsert component.
When importing into the clustered version do not forget to set additionally --vm-account-id flag.
Please note, that vmctl performs initial readiness check for the given address by checking /health endpoint. (default: "http://localhost:8428")
--vm-user value VictoriaMetrics username for basic auth [$VM_USERNAME]
--vm-password value VictoriaMetrics password for basic auth [$VM_PASSWORD]
--vm-headers value Optional HTTP headers to send with each request to the corresponding destination address.
For example, --vm-headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding destination address.
Multiple headers must be delimited by '^^': --vm-headers='header1:value1^^header2:value2'
--vm-bearer-token value Optional bearer auth token to use for the corresponding --vm-addr
--vm-account-id value AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant).
--vm-user value VictoriaMetrics username for basic auth [$VM_USERNAME]
--vm-password value VictoriaMetrics password for basic auth [$VM_PASSWORD]
--vm-account-id value AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant).
AccountID is required when importing into the clustered version of VictoriaMetrics.
It is possible to set it as accountID:projectID, where projectID is also arbitrary 32-bit integer.
If projectID isn't set, then it equals to 0

View File

@@ -34,13 +34,9 @@ OPTIONS:
Should be the same as --httpListenAddr value for single-node version or vminsert component.
When importing into the clustered version do not forget to set additionally --vm-account-id flag.
Please note, that vmctl performs initial readiness check for the given address by checking /health endpoint. (default: "http://localhost:8428")
--vm-user value VictoriaMetrics username for basic auth [$VM_USERNAME]
--vm-password value VictoriaMetrics password for basic auth [$VM_PASSWORD]
--vm-headers value Optional HTTP headers to send with each request to the corresponding destination address.
For example, --vm-headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding destination address.
Multiple headers must be delimited by '^^': --vm-headers='header1:value1^^header2:value2'
--vm-bearer-token value Optional bearer auth token to use for the corresponding --vm-addr
--vm-account-id value AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant).
--vm-user value VictoriaMetrics username for basic auth [$VM_USERNAME]
--vm-password value VictoriaMetrics password for basic auth [$VM_PASSWORD]
--vm-account-id value AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant).
AccountID is required when importing into the clustered version of VictoriaMetrics.
It is possible to set it as accountID:projectID, where projectID is also arbitrary 32-bit integer.
If projectID isn't set, then it equals to 0

View File

@@ -0,0 +1,125 @@
---
weight: 3
menu:
docs:
parent: victoriametrics
weight: 13
title: vmestimator
tags:
- metrics
- cardinality
aliases:
- /vmestimator.html
- /vmestimator/index.html
- /vmestimator/
---
`vmestimator` is a cardinality estimator that receives Prometheus remote write streams
and exposes approximate time series cardinality as metrics (TODO: support remote write).
It is useful for tracking how many unique time series are flowing through the system: in total, per metric name, or broken down by specific labels.
## How it works
Running:
```
go run ./app/vmestimator/... -config=streams.yaml -httpListenAddr=:8490
```
Configuration:
```yaml
streams:
# Track total cardinality with no grouping.
- interval: '1h'
# Track cardinality grouped by metric name.
- interval: '1h'
group_by: ["__name__"]
# Track cardinality grouped by job label.
- interval: '1m'
group_by: ["job"]
# Track cardinality grouped by tenant info
- group_by: ["vm_account_id", "vm_project_id"]
# Track cardinality of jobs, with extra labels on the output metrics.
- group_by: ["job"]
labels:
region: 'eu-central-1'
env: 'production'
```
Fields:
- `group_by` (optional): list of label names to split cardinality by; each distinct combination gets its own estimate
- `group_limit` (optional): maximum number of distinct groups to track; excess groups are counted in a rejected sketch but not individually; defaults to `10000`
- `buckets` (optional): number of internal shards for parallel ingestion; defaults to `min(64, 2*availableCPUs)`
- `labels` (optional): extra labels attached to all output metrics for this estimator
- `interval` (optional): how often to rotate (reset) counters; defaults to `5m`
- `hll_precision` (optional): HyperLogLog precision, must be in range `[4, 18]`; higher values yield more accurate estimates at the cost of more memory; defaults to `14`
- `hll_sparse` (optional): whether to use sparse HyperLogLog representation, which reduces memory for low-cardinality groups; defaults to `true`
## Metrics
By default, cardinality estimates are merged with regular metrics and exposed at `/metrics`.
This behavior is controlled by the following flags:
- `-cardinalityMetrics.cacheTTL` (default `30s`): how long to cache the cardinality metrics response before recomputing it
The HTTP endpoint is controlled by the `-cardinalityMetrics.exposeAt` flag:
- `-cardinalityMetrics.exposeAt=/metrics` (default): cardinality metrics merged with regular metrics at `/metrics`
- `-cardinalityMetrics.exposeAt=/cardinality/metrics`: only cardinality metrics exposed at that path
- `-cardinalityMetrics.exposeAt=`: cardinality metrics not exposed via HTTP
All metrics include the `interval` and `group_by_keys` labels; metrics produced with grouping also include `group_by_values`. Extra labels from the `labels` config field are inserted between `interval` and `group_by_keys` (sorted alphabetically).
**Without grouping** (`group_by_keys` is `__global__` and `group_by_values` is not set):
```
cardinality_estimate{interval="1h0m0s",group_by_keys="__global__"} 142300
```
**With grouping** — one summary line (total distinct group count) plus one line per distinct label value combination. Each per-group line also includes individual `by_{key}="{val}"` labels for each group key:
```
cardinality_estimate{interval="5m0s",group_by_keys="__group__",group_by_values="instance,job"} 2
cardinality_estimate{interval="5m0s",group_by_keys="instance,job",group_by_values="host1:9090,prometheus",by_instance="host1:9090",by_job="prometheus"} 312
cardinality_estimate{interval="5m0s",group_by_keys="instance,job",group_by_values="host2:9100,node",by_instance="host2:9100",by_job="node"} 87
```
**With extra labels:**
```
cardinality_estimate{interval="5m0s",env="production",region="eu-central-1",group_by_keys="job",group_by_values="prometheus",by_job="prometheus"} 312
```
## Cluster
`vmestimator` can be run as a cluster for high availability or when CPU per instance becomes a limiting factor.
In this mode instances are split into two roles: **storages** that receive writes, and **selectors** that read from storages and expose the merged result.
**Storage nodes** — receive Prometheus remote write and serve snapshots:
```
vmestimator -config=streams.yaml -httpListenAddr=:8491 -cardinalityMetrics.exposeAt=/cardinality/metrics
vmestimator -config=streams.yaml -httpListenAddr=:8492 -cardinalityMetrics.exposeAt=/cardinality/metrics
vmestimator -config=streams.yaml -httpListenAddr=:8493 -cardinalityMetrics.exposeAt=/cardinality/metrics
```
Setting `-cardinalityMetrics.exposeAt=/cardinality/metrics` keeps cardinality estimates off the default `/metrics` path. This way `/metrics` on a storage node returns only its own operational metrics, while `/cardinality/metrics` gives you the storage's local cardinality estimates if you need to inspect or debug a specific node.
**Selector nodes** — query all storage nodes, merge HyperLogLog sketches, and expose consolidated cardinality estimates:
```
vmestimator -storageNode=http://vmestimator-storage-1:8491 \
-storageNode=http://vmestimator-storage-2:8492 \
-storageNode=http://vmestimator-storage-3:8493 \
-httpListenAddr=:8490
```
When `-storageNode` flags are provided and no `-config` is specified, the selector runs without local estimators and only merges remote data.
## Operational metrics
When grouping is enabled, vmestimator exposes per-bucket operational metrics at `/metrics`:
- `vmestimator_estimator_group_size{group_by_keys, bucket}` — number of active groups in this bucket after the last rotation
- `vmestimator_estimator_group_rejected_size{group_by_keys}` — estimated number of distinct group values rejected since the last rotation because `group_limit` was reached
- `vmestimator_estimator_group_limit{group_by_keys, bucket}` — configured `group_limit` for this bucket

9
go.mod
View File

@@ -2,6 +2,8 @@ module github.com/VictoriaMetrics/VictoriaMetrics
go 1.26.4
replace github.com/axiomhq/hyperloglog => github.com/makasim/hyperloglog v0.0.10-reuse-memory
require (
cloud.google.com/go/storage v1.62.3
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.22.0
@@ -10,15 +12,17 @@ require (
github.com/VictoriaMetrics/VictoriaLogs v1.121.1-0.20260616132739-c901a1e31cb3
github.com/VictoriaMetrics/easyproto v1.2.0
github.com/VictoriaMetrics/fastcache v1.13.3
github.com/VictoriaMetrics/metrics v1.44.0
github.com/VictoriaMetrics/metricsql v0.87.2
github.com/VictoriaMetrics/metrics v1.43.2
github.com/VictoriaMetrics/metricsql v0.87.1
github.com/aws/aws-sdk-go-v2 v1.42.0
github.com/aws/aws-sdk-go-v2/config v1.32.25
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.27
github.com/aws/aws-sdk-go-v2/service/s3 v1.103.3
github.com/axiomhq/hyperloglog v0.0.0-00010101000000-000000000000
github.com/bmatcuk/doublestar/v4 v4.10.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/cheggaaa/pb/v3 v3.1.7
github.com/dgryski/go-metro v0.0.0-20250106013310-edb8663e5e33
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v1.0.0
github.com/google/go-cmp v0.7.0
@@ -97,6 +101,7 @@ require (
github.com/hashicorp/go-version v1.9.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kamstrup/intmap v0.5.2 // indirect
github.com/knadh/koanf/maps v0.1.2 // indirect
github.com/knadh/koanf/providers/confmap v1.0.0 // indirect
github.com/knadh/koanf/v2 v2.3.5 // indirect

14
go.sum
View File

@@ -58,10 +58,10 @@ github.com/VictoriaMetrics/easyproto v1.2.0 h1:FJT9uNXA2isppFuJErbLqD306KoFlehl7
github.com/VictoriaMetrics/easyproto v1.2.0/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo3/GThPs2KH23mv710=
github.com/VictoriaMetrics/fastcache v1.13.3 h1:rBabE0iIxcqKEMCwUmwHZ9dgEqXerg8FRbRDUvC7OVc=
github.com/VictoriaMetrics/fastcache v1.13.3/go.mod h1:hHXhl4DA2fTL2HTZDJFXWgW0LNjo6B+4aj2Wmng3TjU=
github.com/VictoriaMetrics/metrics v1.44.0 h1:Fr8yqQSV+ZfYaDD/anqk1E8e9YPgfleSleJmAI0M0Tw=
github.com/VictoriaMetrics/metrics v1.44.0/go.mod h1:xDM82ULLYCYdFRgQ2JBxi8Uf1+8En1So9YUwlGTOqTc=
github.com/VictoriaMetrics/metricsql v0.87.2 h1:7OsrcDBWREWKqqpnFyIUEOM4FNv2qHvCoww2GYz3Tc0=
github.com/VictoriaMetrics/metricsql v0.87.2/go.mod h1:d4EisFO6ONP/HIGDYTAtwrejJBBeKGQYiRl095bS4QQ=
github.com/VictoriaMetrics/metrics v1.43.2 h1:+8pIQEGwchKS5CYFyvv3LKvNXGi7baZ9hmIV4RHqibY=
github.com/VictoriaMetrics/metrics v1.43.2/go.mod h1:xDM82ULLYCYdFRgQ2JBxi8Uf1+8En1So9YUwlGTOqTc=
github.com/VictoriaMetrics/metricsql v0.87.1 h1:GdIblCDgXsrBJcBSDtFT8SLK7P+QHijdQmcr4L/f0Go=
github.com/VictoriaMetrics/metricsql v0.87.1/go.mod h1:d4EisFO6ONP/HIGDYTAtwrejJBBeKGQYiRl095bS4QQ=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b h1:mimo19zliBX/vSQ6PWWSL9lK8qwHozUj03+zLoEB8O0=
@@ -150,6 +150,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA=
github.com/dgryski/go-metro v0.0.0-20250106013310-edb8663e5e33 h1:ucRHb6/lvW/+mTEIGbvhcYU3S8+uSNkuMjx/qZFfhtM=
github.com/dgryski/go-metro v0.0.0-20250106013310-edb8663e5e33/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/digitalocean/godo v1.193.0 h1:CSbbUl5LufT75KPNvex3vDnBYjY2RfJWs7T3Ac7dHpA=
github.com/digitalocean/godo v1.193.0/go.mod h1:xQsWpVCCbkDrWisHA72hPzPlnC+4W5w/McZY5ij9uvU=
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
@@ -292,6 +294,8 @@ github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2E
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kamstrup/intmap v0.5.2 h1:qnwBm1mh4XAnW9W9Ue9tZtTff8pS6+s6iKF6JRIV2Dk=
github.com/kamstrup/intmap v0.5.2/go.mod h1:gWUVWHKzWj8xpJVFf5GC0O26bWmv3GqdnIX/LMT6Aq4=
github.com/keybase/go-keychain v0.0.1 h1:way+bWYa6lDppZoZcgMbYsvC7GxljxrskdNInRtuthU=
github.com/keybase/go-keychain v0.0.1/go.mod h1:PdEILRW3i9D8JcdM+FmY6RwkHGnhHxXwkPPMeUgOK1k=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
@@ -314,6 +318,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/linode/linodego v1.69.1 h1:f45N2MHR/oece2/ktTTCYmrlfse4//k3NgwcF5zbGZ0=
github.com/linode/linodego v1.69.1/go.mod h1:Fha0NYsQSx5VZK1HQNJY/z/dIxxkFp+vb5veawbmAUw=
github.com/makasim/hyperloglog v0.0.10-reuse-memory h1:tqMXSDlkVujI/aGYUm6uwt4lRUQcne22MOLcJBgLAGc=
github.com/makasim/hyperloglog v0.0.10-reuse-memory/go.mod h1:YjX/dQqCR/7QYX0g8mu8UZAjpIenz1FKM71UEsjFoTo=
github.com/mattn/go-colorable v0.1.15 h1:+u9SLTRGnXv73cEsnsmoZBom+dMU88B2M0aDcWy0/jY=
github.com/mattn/go-colorable v0.1.15/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
github.com/mattn/go-isatty v0.0.22 h1:j8l17JJ9i6VGPUFUYoTUKPSgKe/83EYU2zBC7YNKMw4=

View File

@@ -91,11 +91,6 @@ func (r *Restore) Run(ctx context.Context) error {
if err != nil {
return fmt.Errorf("cannot list src parts: %w", err)
}
for _, srcPart := range srcParts {
if !srcPart.IsLocalPathInsideDir(r.Dst.Dir) {
return fmt.Errorf("part file %s would be written outside storage directory %s", srcPart.Path, r.Dst.Dir)
}
}
logger.Infof("obtaining list of parts at %s", dst)
dstParts, err := dst.ListParts()
if err != nil {

View File

@@ -120,17 +120,6 @@ func (p *Part) ParseFromRemotePath(remotePath string) bool {
return true
}
// IsLocalPathInsideDir reports whether the local path of the part, as computed
// by p.LocalPath for the cleaned dir, is located under dir.
//
// dir is normalized with filepath.Clean before the comparison. The filesystem
// root "/" is treated as containing every path. For any other dir the local
// path must start with dir followed by the path separator, so a sibling
// directory which merely shares a name prefix with dir is not accepted.
//
// NOTE(review): rejecting `../` traversal relies on p.LocalPath returning
// an already cleaned path (presumably built via filepath.Join) — confirm.
func (p *Part) IsLocalPathInsideDir(dir string) bool {
	root := filepath.Clean(dir)
	if root == "/" {
		return true
	}
	requiredPrefix := root + string(filepath.Separator)
	localPath := p.LocalPath(root)
	return strings.HasPrefix(localPath, requiredPrefix)
}
// MaxPartSize is the maximum size for each part.
//
// The MaxPartSize reduces bandwidth usage during retries on network errors

View File

@@ -1,54 +0,0 @@
package common
import (
"testing"
)
// TestIsLocalPathInsideDir checks Part.IsLocalPathInsideDir against a table of
// (dir, path) pairs covering regular paths, dir normalization and traversal attempts.
func TestIsLocalPathInsideDir(t *testing.T) {
	testCases := []struct {
		dir      string
		path     string
		expected bool
	}{
		// normal path inside dir
		{"/data/storage", "parts/segment1/data.bin", true},

		// dir with trailing slash is normalized
		{"/data/storage/", "parts/segment1/data.bin", true},

		// deeply nested path
		{"/data/storage", "a/b/c/d/e/file.dat", true},

		// traversal that stays inside dir
		{"/data/storage", "foo/../bar/file.dat", true},

		// root dir allows any path
		{"/", "any/path/here", true},

		// root dir allows traversal attempts since nothing is outside /
		{"/", "../outside/marker.txt", true},

		// path with leading slash is treated as relative by filepath.Join and stays inside dir
		{"/data/storage", "/outside/marker.txt", true},

		// dir with .. components is normalized; path inside resolved dir
		{"/data/storage/../foo", "parts/file.dat", true},

		// dir with .. components is normalized; traversal outside resolved dir
		{"/data/storage/../foo", "../storage/evil.txt", false},

		// simple traversal
		{"/data/storage", "../outside/marker.txt", false},

		// traversal with trailing slash in dir
		{"/data/storage/", "../outside/marker.txt", false},

		// deep traversal
		{"/data/storage", "a/../../outside/marker.txt", false},

		// sibling directory whose name shares a prefix with dir
		{"/data/storage", "../storagefoo/evil.txt", false},
	}

	// Cases run in declaration order; the first mismatch aborts the test.
	for _, tc := range testCases {
		part := Part{Path: tc.path}
		got := part.IsLocalPathInsideDir(tc.dir)
		if got != tc.expected {
			t.Fatalf("IsLocalPathInsideDir(%q, %q): got %v, want %v", tc.dir, tc.path, got, tc.expected)
		}
	}
}

View File

@@ -129,10 +129,6 @@ func (fs *FS) NewReadCloser(p common.Part) (io.ReadCloser, error) {
// On platforms with preallocation, writes go to a .tmp file that must be
// finalized with FinalizeFile.
func (fs *FS) NewDirectWriteCloser(p common.Part) (io.WriteCloser, error) {
if !p.IsLocalPathInsideDir(fs.Dir) {
logger.Fatalf("BUG: part file %s would be written outside storage directory %s", p.Path, fs.Dir)
}
path := fs.writePath(p)
if err := fs.mkdirAll(path); err != nil {
return nil, err

View File

@@ -26,8 +26,6 @@ type FastQueue struct {
// isPQDisabled is set to true when pq is disabled.
isPQDisabled bool
prioritizeInMemoryData bool
// pq is file-based queue
pq *queue
@@ -41,31 +39,6 @@ type FastQueue struct {
stopDeadline uint64
}
// OpenFastQueueOpts defines options for opening a FastQueue
// via MustOpenFastQueueWithOpts.
type OpenFastQueueOpts struct {
// MaxInmemoryBlocks defines the number of blocks to hold in memory before falling back to file-based persistence.
// It is used as the capacity of the in-memory blocks channel.
MaxInmemoryBlocks int
// MaxPendingBytes limits the size of the file-based part of the queue.
// If MaxPendingBytes is 0, then the queue size is unlimited.
// The oldest data is dropped when the queue
// reaches MaxPendingBytes.
MaxPendingBytes int64
// IsPQDisabled disables the file-based queue when set to true.
// If it is set to true, then write requests that exceed in-memory buffer capacity are rejected.
// The in-memory queue part can be stored on disk during graceful shutdown.
IsPQDisabled bool
// PrioritizeInmemoryData instructs FastQueue to write data into the in-memory queue
// even if the file-based queue is not empty.
// This is useful when data order doesn't matter and getting the most recent data
// as fast as possible is more important.
PrioritizeInmemoryData bool
}
// MustOpenFastQueueWithOpts opens persistent queue at the given path with given opts
func MustOpenFastQueueWithOpts(path, name string, opts OpenFastQueueOpts) *FastQueue {
return mustOpenFastQueue(path, name, opts)
}
// MustOpenFastQueue opens persistent queue at the given path.
//
// It holds up to maxInmemoryBlocks in memory before falling back to file-based persistence.
@@ -76,22 +49,11 @@ func MustOpenFastQueueWithOpts(path, name string, opts OpenFastQueueOpts) *FastQ
// If isPQDisabled is set to true, then write requests that exceed in-memory buffer capacity are rejected.
// The in-memory part of the queue can be stored on disk during graceful shutdown.
func MustOpenFastQueue(path, name string, maxInmemoryBlocks int, maxPendingBytes int64, isPQDisabled bool) *FastQueue {
opts := OpenFastQueueOpts{
MaxInmemoryBlocks: maxInmemoryBlocks,
MaxPendingBytes: maxPendingBytes,
IsPQDisabled: isPQDisabled,
}
return mustOpenFastQueue(path, name, opts)
}
func mustOpenFastQueue(path, name string, opts OpenFastQueueOpts) *FastQueue {
maxPendingBytes := opts.MaxPendingBytes
isPQDisabled := opts.IsPQDisabled
pq := mustOpen(path, name, maxPendingBytes)
fq := &FastQueue{
pq: pq,
isPQDisabled: isPQDisabled,
prioritizeInMemoryData: opts.PrioritizeInmemoryData,
ch: make(chan *bytesutil.ByteBuffer, opts.MaxInmemoryBlocks),
pq: pq,
isPQDisabled: isPQDisabled,
ch: make(chan *bytesutil.ByteBuffer, maxInmemoryBlocks),
}
fq.cond.L = &fq.mu
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()
@@ -119,7 +81,7 @@ func mustOpenFastQueue(path, name string, opts OpenFastQueueOpts) *FastQueue {
if isPQDisabled {
persistenceStatus = "disabled"
}
logger.Infof("opened fast queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes, persistence is %s", path, opts.MaxInmemoryBlocks, pendingBytes, persistenceStatus)
logger.Infof("opened fast queue at %q with maxInmemoryBlocks=%d, it contains %d pending bytes, persistence is %s", path, maxInmemoryBlocks, pendingBytes, persistenceStatus)
return fq
}
@@ -135,7 +97,7 @@ func (fq *FastQueue) IsWriteBlocked() bool {
}
fq.mu.Lock()
defer fq.mu.Unlock()
return len(fq.ch) == cap(fq.ch) || (fq.pq.GetPendingBytes() > 0 && !fq.prioritizeInMemoryData)
return len(fq.ch) == cap(fq.ch) || fq.pq.GetPendingBytes() > 0
}
// UnblockAllReaders unblocks all the readers.
@@ -231,24 +193,19 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
defer fq.mu.Unlock()
isPQWriteAllowed := !fq.isPQDisabled || ignoreDisabledPQ
if !isPQWriteAllowed && fq.pq.GetPendingBytes() > 0 {
// fast path: there is pending data at file-based queue,
// it must be drained before in-memory queue could be used.
// File-based queue could be non-empty after vmagent restart
// and vmagent couldn't flush in-memory queue during shutdown.
return false
}
if !fq.prioritizeInMemoryData {
fq.flushInmemoryBlocksToFileIfNeededLocked()
if n := fq.pq.GetPendingBytes(); n > 0 {
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
// So put the block to file-based queue.
if len(fq.ch) > 0 {
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
}
fq.pq.MustWriteBlock(block)
return true
fq.flushInmemoryBlocksToFileIfNeededLocked()
if n := fq.pq.GetPendingBytes(); n > 0 {
// The file-based queue isn't drained yet. This means that in-memory queue cannot be used yet.
// So put the block to file-based queue.
if len(fq.ch) > 0 {
logger.Panicf("BUG: the in-memory queue must be empty when the file-based queue is non-empty; it contains %d pending bytes", n)
}
if !isPQWriteAllowed {
return false
}
fq.pq.MustWriteBlock(block)
return true
}
if len(fq.ch) == cap(fq.ch) {
// There is no space left in the in-memory queue. Put the data to file-based queue.
@@ -259,7 +216,7 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
fq.pq.MustWriteBlock(block)
return true
}
// Fast path - put the block to in-memory queue.
bb := blockBufPool.Get()
bb.B = append(bb.B[:0], block...)
fq.ch <- bb
@@ -272,7 +229,7 @@ func (fq *FastQueue) tryWriteBlock(block []byte, ignoreDisabledPQ bool) bool {
}
// MustReadBlock reads the next block from fq into dst and returns it.
// It first reads from the file-based queue, then checks in-memory queue.
// It first reads from the in-memory queue, then checks file-based queue.
// It blocks until a block is available or the stop deadline is exceeded, in which case it returns (dst, false).
func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
fq.mu.Lock()
@@ -282,15 +239,16 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
return dst, false
}
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
}
if n := fq.pq.GetPendingBytes(); n > 0 {
data, ok := fq.pq.MustReadBlockNonblocking(dst)
if ok {
return data, true
}
dst = data
}
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
continue
}
if fq.stopDeadline > 0 {
return dst, false
@@ -301,27 +259,6 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
}
}
// MustReadInMemoryBlockBlocking reads the next block from the in-memory queue into dst and returns it.
// It blocks until a block is available or the stop deadline is exceeded, in which case it returns (dst, false).
func (fq *FastQueue) MustReadInMemoryBlockBlocking(dst []byte) ([]byte, bool) {
fq.mu.Lock()
defer fq.mu.Unlock()
for {
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
return dst, false
}
if len(fq.ch) > 0 {
return fq.mustReadInMemoryBlockLocked(dst), true
}
if fq.stopDeadline > 0 {
return dst, false
}
// There are no blocks. Wait for new block.
fq.cond.Wait()
}
}
// MustReadInMemoryBlock reads the next block from the in-memory queue into dst and returns it.
// It returns (dst, true) if a block was available, or (nil, false) if the in-memory queue is empty.
// It does not block waiting for new blocks.
@@ -340,6 +277,9 @@ func (fq *FastQueue) mustReadInMemoryBlockLocked(dst []byte) []byte {
if len(fq.ch) == 0 {
logger.Panicf("BUG: the function must not be called when in-memory queue is empty. Caller should verify the queue len upfront")
}
if n := fq.pq.GetPendingBytes(); n > 0 {
logger.Panicf("BUG: the file-based queue must be empty when the in-memory queue is non-empty; it contains %d pending bytes", n)
}
bb := <-fq.ch
fq.pendingInmemoryBytes -= uint64(len(bb.B))
fq.lastInmemoryBlockReadTime = fasttime.UnixTimestamp()

View File

@@ -364,64 +364,3 @@ func TestFastQueueWriteReadWithIgnoreDisabledPQ(t *testing.T) {
fq.MustClose()
fs.MustRemoveDir(path)
}
func TestFastQueueWriteReadWithPrioritizeInmemory(t *testing.T) {
path := "fast-queue-write-read-inmemory-disabled-pq-force-write"
fs.MustRemoveDir(path)
capacity := 20
opts := OpenFastQueueOpts{
MaxInmemoryBlocks: capacity,
PrioritizeInmemoryData: true,
}
fq := MustOpenFastQueueWithOpts(path, "foobar", opts)
if n := fq.GetInmemoryQueueLen(); n != 0 {
t.Fatalf("unexpected non-zero inmemory queue size: %d", n)
}
var blocks []string
for i := range capacity {
block := fmt.Sprintf("block %d", i)
if !fq.TryWriteBlock([]byte(block)) {
t.Fatalf("TryWriteBlock must return true in this context")
}
blocks = append(blocks, block)
}
if n := fq.GetInmemoryQueueLen(); n != capacity {
t.Fatalf("unexpected non-zero inmemory queue size: %d: %d", n, capacity)
}
for i := range capacity {
block := fmt.Sprintf("block %d-%d", i, i)
if !fq.TryWriteBlock([]byte(block)) {
t.Fatalf("TryWriteBlock must return true in this context")
}
blocks = append(blocks, block)
}
// when the in-memory capacity is exceeded, the last element is written into the file-based queue
if n := fq.GetInmemoryQueueLen(); n != capacity-1 {
t.Fatalf("unexpected non-zero inmemory queue size: %d: %d", n, capacity)
}
// make sure that recently ingested elements are returned first
for idx := capacity + 1; idx < capacity*2; idx++ {
buf, ok := fq.MustReadInMemoryBlockBlocking(nil)
if !ok {
t.Fatalf("unexpected ok=false")
}
if string(buf) != blocks[idx] {
t.Fatalf("unexpected block read; got %q; want %q: %d", buf, blocks[idx], idx)
}
}
blocks = blocks[:capacity+1]
for _, block := range blocks {
buf, ok := fq.MustReadBlock(nil)
if !ok {
t.Fatalf("unexpected ok=false")
}
if string(buf) != block {
t.Fatalf("unexpected block read; got %q; want %q", buf, block)
}
}
fq.MustClose()
fs.MustRemoveDir(path)
}

View File

@@ -50,17 +50,6 @@ func (ie *IfExpression) Parse(s string) error {
return nil
}
// ParseFromMetricExpr parses if from given MetricExpr
func (ie *IfExpression) ParseFromMetricExpr(me *metricsql.MetricExpr) error {
var ieLocal ifExpression
if err := ieLocal.parseFromMetricExpr(me); err != nil {
return err
}
ie.ies = []*ifExpression{&ieLocal}
return nil
}
// UnmarshalJSON unmarshals ie from JSON data.
func (ie *IfExpression) UnmarshalJSON(data []byte) error {
var v any
@@ -193,16 +182,6 @@ func (ie *ifExpression) Parse(s string) error {
return nil
}
func (ie *ifExpression) parseFromMetricExpr(me *metricsql.MetricExpr) error {
lfss, err := metricExprToLabelFilterss(me)
if err != nil {
return fmt.Errorf("cannot parse series selector: %w", err)
}
ie.s = string(me.AppendString(nil))
ie.lfss = lfss
return nil
}
// UnmarshalJSON unmarshals ie from JSON data.
func (ie *ifExpression) UnmarshalJSON(data []byte) error {
var s string

View File

@@ -76,9 +76,6 @@ var (
"Every %d occurrence in the template is substituted with -promscrape.cluster.memberNum at urls to vmagent instances responsible for scraping the given target "+
"at /service-discovery page. For example -promscrape.cluster.memberURLTemplate='http://vmagent-%d:8429/targets'. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more details")
clusterShardByLabels = flagutil.NewArrayString("promscrape.cluster.shardByLabels", "Optional list of target labels, which will be used for sharding targets among cluster members "+
"if -promscrape.cluster.membersCount is greater than 1. If none of the specified labels are found in a target, then all the target labels will be used for sharding. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info")
clusterReplicationFactor = flag.Int("promscrape.cluster.replicationFactor", 1, "The number of members in the cluster, which scrape the same targets. "+
"If the replication factor is greater than 1, then the deduplication must be enabled at remote storage side. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info")
@@ -89,10 +86,7 @@ var (
"Bigger uncompressed responses are rejected. See also max_scrape_size option at https://docs.victoriametrics.com/victoriametrics/sd_configs/#scrape_configs")
)
var (
clusterMemberID int
clusterShardByLabelsSorted []string
)
var clusterMemberID int
func mustInitClusterMemberID() {
s := *clusterMemberNum
@@ -116,15 +110,6 @@ func mustInitClusterMemberID() {
clusterMemberID = n
}
func initClusterShardByLabels() {
if len(*clusterShardByLabels) == 0 {
clusterShardByLabelsSorted = nil
return
}
clusterShardByLabelsSorted = slices.Clone(*clusterShardByLabels)
slices.Sort(clusterShardByLabelsSorted)
}
// Config represents essential parts from Prometheus config defined at https://prometheus.io/docs/prometheus/latest/configuration/configuration/
type Config struct {
Global GlobalConfig `yaml:"global,omitempty"`
@@ -1153,28 +1138,12 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
}
func appendScrapeWorkKey(dst []byte, labels *promutil.Labels) []byte {
originalDstLen := len(dst)
for _, targetLabelName := range clusterShardByLabelsSorted {
for _, label := range labels.GetLabels() {
if label.Name == targetLabelName {
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
dst = append(dst, label.Name...)
dst = append(dst, '=')
dst = append(dst, label.Value...)
dst = append(dst, ',')
break
}
}
}
// Use all labels to compute the key if `promscrape.cluster.shardByLabels` is not configured
if len(dst) == originalDstLen {
for _, label := range labels.GetLabels() {
dst = append(dst, label.Name...)
dst = append(dst, '=')
dst = append(dst, label.Value...)
dst = append(dst, ',')
}
return dst
for _, label := range labels.GetLabels() {
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
dst = append(dst, label.Name...)
dst = append(dst, '=')
dst = append(dst, label.Value...)
dst = append(dst, ',')
}
return dst
}

View File

@@ -148,77 +148,6 @@ func TestGetClusterMemberNumsForScrapeWork(t *testing.T) {
f("foo", 3, 2, []int{2, 0})
}
func TestAppendScrapeWorkKeyShardByLabels(t *testing.T) {
f := func(labels map[string]string, shardByLabels []string, expectedKey string) {
t.Helper()
originValue := *clusterShardByLabels
*clusterShardByLabels = shardByLabels
defer func() {
*clusterShardByLabels = originValue
}()
initClusterShardByLabels()
outputKey := string(appendScrapeWorkKey(nil, promutil.NewLabelsFromMap(labels)))
if expectedKey != outputKey {
t.Fatalf("unexpected sharding key:%q for target labels:%v with shardByLabels=%q, expect: %q",
outputKey, labels, shardByLabels, expectedKey)
}
}
// didn't specify -promscrape.cluster.shardByLabels, so all labels will be used for sharding
f(
map[string]string{
"a": "aa",
"b": "bb",
"c": "cc",
"d": "dd"},
[]string{},
"a=aa,b=bb,c=cc,d=dd,",
)
// match all labels in -promscrape.cluster.shardByLabels, so label "a" and "c" will be used for sharding
f(
map[string]string{
"a": "aa",
"b": "bb",
"c": "cc",
"d": "dd"},
[]string{"a", "c"},
"a=aa,c=cc,",
)
// match all labels in -promscrape.cluster.shardByLabels, so label "a" and "c" will be used for sharding even if they're not in order in -promscrape.cluster.shardByLabels.
f(
map[string]string{
"a": "aa",
"b": "bb",
"c": "cc",
"d": "dd"},
[]string{"c", "a"},
"a=aa,c=cc,",
)
// match part of labels in -promscrape.cluster.shardByLabels, label "a" and "c" will be used for sharding
f(
map[string]string{
"a": "aa",
"c": "cc",
"d": "dd"},
[]string{"a", "b", "c"},
"a=aa,c=cc,",
)
// none of labels in -promscrape.cluster.shardByLabels is matched, so all labels will be used for sharding
f(
map[string]string{
"d": "dd",
"e": "ee"},
[]string{"a", "b", "c"},
"d=dd,e=ee,",
)
}
func TestLoadStaticConfigs(t *testing.T) {
scs, err := loadStaticConfigs("testdata/file_sd.json")
if err != nil {

View File

@@ -66,7 +66,6 @@ func CheckConfig() error {
// Scraped data is passed to pushData.
func Init(pushData func(at *auth.Token, wr *prompb.WriteRequest)) {
mustInitClusterMemberID()
initClusterShardByLabels()
globalStopChan = make(chan struct{})
scraperWG.Go(func() {
runScraper(*promscrapeConfigFile, pushData, globalStopChan)

View File

@@ -1,7 +1,6 @@
package streamaggr
import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
@@ -18,10 +17,9 @@ import (
const dedupAggrShardsCount = 128
type dedupAggr struct {
shards []dedupAggrShard
flushDuration *metrics.Histogram
flushTimeouts *metrics.Counter
droppedSamples *metrics.Counter
shards []dedupAggrShard
flushDuration *metrics.Histogram
flushTimeouts *metrics.Counter
}
type dedupAggrShard struct {
@@ -49,20 +47,10 @@ type dedupAggrSample struct {
timestamp int64
}
func newDedupAggr(ms *metrics.Set, metricLabels string) *dedupAggr {
var d dedupAggr
d.shards = make([]dedupAggrShard, dedupAggrShardsCount)
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
return float64(d.sizeBytes())
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
return float64(d.itemsCount())
})
d.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
d.droppedSamples = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_dropped_samples_total{%s}`, metricLabels))
return &d
func newDedupAggr() *dedupAggr {
return &dedupAggr{
shards: make([]dedupAggrShard, dedupAggrShardsCount),
}
}
func (da *dedupAggr) sizeBytes() uint64 {
@@ -99,8 +87,7 @@ func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, isGreen bool) {
if len(shardSamples) == 0 {
continue
}
deduplicatedSamples := da.shards[i].pushSamples(shardSamples, isGreen)
da.droppedSamples.Add(deduplicatedSamples)
da.shards[i].pushSamples(shardSamples, isGreen)
}
putPerShardSamples(pss)
}
@@ -180,9 +167,8 @@ func putPerShardSamples(pss *perShardSamples) {
var perShardSamplesPool sync.Pool
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) int {
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
var state *dedupAggrState
var deduplicatedSamples int
if isGreen {
state = &das.green
@@ -212,10 +198,8 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) int {
continue
}
s.timestamp, s.value = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
deduplicatedSamples++
}
state.samplesBuf = samplesBuf
return deduplicatedSamples
}
// deduplicateSamples returns deduplicated timestamp and value results.

View File

@@ -7,13 +7,11 @@ import (
"testing"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
)
func TestDedupAggrSerial(t *testing.T) {
da := newDedupAggr(metrics.NewSet(), "")
da := newDedupAggr()
const seriesCount = 100_000
expectedSamplesMap := make(map[string]pushSample)
@@ -61,7 +59,7 @@ func TestDedupAggrSerial(t *testing.T) {
func TestDedupAggrConcurrent(_ *testing.T) {
const concurrency = 5
const seriesCount = 10_000
da := newDedupAggr(metrics.NewSet(), "")
da := newDedupAggr()
var wg sync.WaitGroup
for range concurrency {

View File

@@ -5,8 +5,6 @@ import (
"sync/atomic"
"testing"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
@@ -25,7 +23,7 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
const loops = 2
benchSamples := newBenchSamples(samplesPerPush)
da := newDedupAggr(metrics.NewSet(), "")
da := newDedupAggr()
b.ResetTimer()
b.ReportAllocs()

View File

@@ -44,6 +44,7 @@ type Deduplicator struct {
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Duration, dropLabels []string, alias string) *Deduplicator {
d := &Deduplicator{
da: newDedupAggr(),
dropLabels: dropLabels,
interval: interval,
enableWindows: enableWindows,
@@ -63,7 +64,16 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
ms := d.ms
metricLabels := fmt.Sprintf(`name="dedup",url=%q`, alias)
d.da = newDedupAggr(ms, metricLabels)
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
return float64(d.da.sizeBytes())
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
return float64(d.da.itemsCount())
})
d.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
metrics.RegisterSet(ms)
@@ -110,7 +120,6 @@ func (d *Deduplicator) Push(tss []prompb.TimeSeries) {
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, s := range ts.Samples {
if d.enableWindows && minDeadline > s.Timestamp {
d.da.droppedSamples.Inc()
continue
} else if d.enableWindows && s.Timestamp <= cs.maxDeadline == cs.isGreen {
ctx.green = append(ctx.green, pushSample{

View File

@@ -31,16 +31,12 @@ type increaseAggrValue struct {
}
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
if av.total == nil {
av.total = new(float64)
}
ac := c.(*increaseAggrConfig)
currentTime := fasttime.UnixTimestamp()
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
lv, ok := av.shared[key]
// The last value is stale, reset it.
if ok && lv.deleteDeadline < int64(currentTime)*1000 {
ok = false
if av.total == nil {
av.total = new(float64)
}
if ok {
if sample.timestamp < lv.timestamp {

View File

@@ -7,7 +7,6 @@ import (
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
var rateAggrSharedValuePool sync.Pool
@@ -100,12 +99,6 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
ac := c.(*rateAggrConfig)
var state *rateAggrStateValue
sv, ok := av.shared[key]
// The last value is stale, reset it.
if ok && sv.deleteDeadline < int64(fasttime.UnixTimestamp())*1000 {
delete(av.shared, key)
putRateAggrSharedValue(sv)
ok = false
}
if ok {
state = sv.getState(av.isGreen)
if sample.timestamp < state.timestamp {

View File

@@ -43,7 +43,6 @@ var supportedOutputs = []string{
"stddev",
"stdvar",
"sum_samples",
"sum_samples_total",
"total",
"total_prometheus",
"unique_samples",
@@ -173,12 +172,12 @@ type Config struct {
DedupInterval string `yaml:"dedup_interval,omitempty"`
// StalenessInterval is the interval after which the series state will be reset if no samples have been sent during it.
// The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus, rate_avg and rate_sum.
// The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus and histogram_bucket.
StalenessInterval string `yaml:"staleness_interval,omitempty"`
// IgnoreFirstSampleInterval specifies the interval after which the agent begins sending samples.
// By default, it is set to the staleness interval, and it helps reduce the initial sample load after an agent restart.
// This parameter is relevant only for the following outputs: total, total_prometheus, increase and increase_prometheus.
// This parameter is relevant only for the following outputs: total, total_prometheus, increase, increase_prometheus, and histogram_bucket.
IgnoreFirstSampleInterval string `yaml:"ignore_first_sample_interval,omitempty"`
// Outputs is a list of output aggregate functions to produce.
@@ -502,9 +501,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
return nil, fmt.Errorf("interval=%s must be a multiple of dedup_interval=%s", interval, dedupInterval)
}
// set the default staleness interval as the aggregation interval, to be consistent with query lookbehind window in metricsQL,
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11102
stalenessInterval := interval
// check cfg.StalenessInterval
stalenessInterval := interval * 2
if cfg.StalenessInterval != "" {
stalenessInterval, err = time.ParseDuration(cfg.StalenessInterval)
if err != nil {
@@ -670,7 +668,18 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
if dedupInterval > 0 {
a.da = newDedupAggr(ms, metricLabels)
a.da = newDedupAggr()
a.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
a.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
n := a.da.sizeBytes()
return float64(n)
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
n := a.da.itemsCount()
return float64(n)
})
}
alignFlushToInterval := !opts.NoAlignFlushToInterval
@@ -771,9 +780,7 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "stdvar":
return newStdvarAggrConfig(), nil
case "sum_samples":
return newSumSamplesAggrConfig(true), nil
case "sum_samples_total":
return newSumSamplesAggrConfig(false), nil
return newSumSamplesAggrConfig(), nil
case "total":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "total_prometheus":

View File

@@ -1,5 +1,3 @@
//go:build synctest
package streamaggr
import (
@@ -477,125 +475,26 @@ foo:1m_increase_prometheus{baz="qwe"} 15
outputs: [increase_prometheus]
`, "11111111")
// increase, increase_prometheus, total, total_prometheus outputs with different staleness intervals
f([]string{`
foo 5
bar 200
`, `
foo 10
bar 201
`, ``, `
foo 7
bar 205
`}, time.Minute, `bar:1m_increase 200
bar:1m_increase 1
bar:1m_increase 205
bar:1m_increase_prometheus 0
bar:1m_increase_prometheus 1
bar:1m_increase_prometheus 0
bar:1m_total 200
bar:1m_total 201
bar:1m_total 205
bar:1m_total_prometheus 0
bar:1m_total_prometheus 1
bar:1m_total_prometheus 0
bar:1m_without_non_existing_label_increase 0
bar:1m_without_non_existing_label_increase 1
bar:1m_without_non_existing_label_increase 4
bar:1m_without_non_existing_label_increase_prometheus 0
bar:1m_without_non_existing_label_increase_prometheus 1
bar:1m_without_non_existing_label_increase_prometheus 4
bar:1m_without_non_existing_label_total 0
bar:1m_without_non_existing_label_total 1
bar:1m_without_non_existing_label_total 1
bar:1m_without_non_existing_label_total 5
bar:1m_without_non_existing_label_total_prometheus 0
bar:1m_without_non_existing_label_total_prometheus 1
bar:1m_without_non_existing_label_total_prometheus 1
bar:1m_without_non_existing_label_total_prometheus 5
foo:1m_increase 5
foo:1m_increase 5
foo:1m_increase 7
foo:1m_increase_prometheus 0
foo:1m_increase_prometheus 5
foo:1m_increase_prometheus 0
foo:1m_total 5
foo:1m_total 10
foo:1m_total 7
foo:1m_total_prometheus 0
foo:1m_total_prometheus 5
foo:1m_total_prometheus 0
foo:1m_without_non_existing_label_increase 0
foo:1m_without_non_existing_label_increase 5
foo:1m_without_non_existing_label_increase 7
foo:1m_without_non_existing_label_increase_prometheus 0
foo:1m_without_non_existing_label_increase_prometheus 5
foo:1m_without_non_existing_label_increase_prometheus 7
foo:1m_without_non_existing_label_total 0
foo:1m_without_non_existing_label_total 5
foo:1m_without_non_existing_label_total 5
foo:1m_without_non_existing_label_total 12
foo:1m_without_non_existing_label_total_prometheus 0
foo:1m_without_non_existing_label_total_prometheus 5
foo:1m_without_non_existing_label_total_prometheus 5
foo:1m_without_non_existing_label_total_prometheus 12
`, `
- interval: 1m
ignore_first_sample_interval: 0s
outputs: [increase, increase_prometheus, total, total_prometheus]
- interval: 1m
staleness_interval: 2m
without: [non_existing_label]
outputs: [increase, increase_prometheus, total, total_prometheus]
`, "111111")
// sum_samples and sum_samples_total outputs with different staleness intervals
// multiple aggregate configs
f([]string{`
foo 1
foo 2 1
foo{bar="baz"} 2
`, `
foo 4
`, ``, ``, `
foo 6
`, ``, ``}, time.Minute, `foo:1m_sum_samples 3
foo:1m_sum_samples 4
foo:1m_sum_samples 6
foo:1m_sum_samples_total 3
foo:1m_sum_samples_total 7
foo:1m_sum_samples_total 6
foo:1m_sum_samples_total{bar="baz"} 2
foo 3.3
`, ``, ``, ``, ``}, time.Minute, `foo:1m_count_series 1
foo:1m_count_series{bar="baz"} 1
foo:1m_sum_samples 0
foo:1m_sum_samples 4.3
foo:1m_sum_samples{bar="baz"} 0
foo:1m_sum_samples{bar="baz"} 2
foo:1m_without_non-existing-label_sum_samples 3
foo:1m_without_non-existing-label_sum_samples 4
foo:1m_without_non-existing-label_sum_samples 0
foo:1m_without_non-existing-label_sum_samples 6
foo:1m_without_non-existing-label_sum_samples 0
foo:1m_without_non-existing-label_sum_samples_total 3
foo:1m_without_non-existing-label_sum_samples_total 7
foo:1m_without_non-existing-label_sum_samples_total 7
foo:1m_without_non-existing-label_sum_samples_total 6
foo:1m_without_non-existing-label_sum_samples_total 6
foo:1m_without_non-existing-label_sum_samples_total{bar="baz"} 2
foo:1m_without_non-existing-label_sum_samples_total{bar="baz"} 2
foo:1m_without_non-existing-label_sum_samples{bar="baz"} 2
foo:1m_without_non-existing-label_sum_samples{bar="baz"} 0
foo:5m_by_bar_sum_samples 13
foo:5m_by_bar_sum_samples_total 13
foo:5m_by_bar_sum_samples_total{bar="baz"} 2
foo:5m_by_bar_sum_samples 4.3
foo:5m_by_bar_sum_samples{bar="baz"} 2
`, `
- interval: 1m
staleness_interval: 1m
outputs: [ sum_samples, sum_samples_total]
- interval: 1m
staleness_interval: 2m
without: [non-existing-label]
outputs: [ sum_samples, sum_samples_total]
outputs: [count_series, sum_samples]
- interval: 5m
by: [bar]
outputs: [sum_samples, sum_samples_total]
`, "11111")
outputs: [sum_samples]
`, "111")
// min and max outputs
f([]string{`
@@ -789,39 +688,30 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.125
outputs: [rate_sum, rate_avg]
`, "11111")
// test rate_sum and rate_avg with different staleness intervals
// test rate_sum and rate_avg, when two aggregation intervals are empty
f([]string{`
foo{abc="123", cde="1"} 1
foo{abc="123", cde="1"} 2 1
foo{abc="456", cde="1"} 3
foo{abc="456", cde="1"} 4 1
foo{abc="777", cde="1"} 5
foo{abc="777", cde="1"} 6 1
`, ``, `
foo{abc="123", cde="1"} 121
foo{abc="123", cde="1"} 122 1
foo{abc="456", cde="1"} 123
foo{abc="456", cde="1"} 124 1
foo{abc="777", cde="1"} 125
foo{abc="777", cde="1"} 126 1
foo{abc="456", cde="1"} 7
foo{abc="456", cde="1"} 8 1
foo{abc="777", cde="1"} 8
foo{abc="777", cde="1"} 9 1
`, ``, ``, `
foo{abc="123", cde="1"} 19
foo{abc="123", cde="1"} 20 1
foo{abc="456", cde="1"} 26
foo{abc="456", cde="1"} 27 1
foo{abc="777", cde="1"} 27
foo{abc="777", cde="1"} 28 1
`}, time.Minute, `foo:1m_by_cde_rate_avg{cde="1"} 1
foo:1m_by_cde_rate_avg{cde="1"} 1
foo:1m_by_cde_rate_sum{cde="1"} 3
foo:1m_by_cde_rate_sum{cde="1"} 3
foo:1m_without_abc_rate_avg{cde="1"} 1
foo:1m_without_abc_rate_avg{cde="1"} 1
foo:1m_without_abc_rate_sum{cde="1"} 3
foo:1m_without_abc_rate_sum{cde="1"} 3
`, `
- interval: 1m
by: [cde]
outputs: [rate_sum, rate_avg]
enable_windows: true
- interval: 1m
staleness_interval: 2m
without: [abc]
outputs: [rate_sum, rate_avg]
enable_windows: true
`, "111111111111")
// rate_sum and rate_avg with duplicated events

View File

@@ -252,15 +252,11 @@ func TestAggregatorsEqual(t *testing.T) {
}
func timeSeriessToString(tss []prompb.TimeSeries) string {
sorted := make([]prompb.TimeSeries, len(tss))
copy(sorted, tss)
sort.SliceStable(sorted, func(i, j int) bool {
return promrelabel.LabelsToString(sorted[i].Labels) < promrelabel.LabelsToString(sorted[j].Labels)
})
a := make([]string, len(sorted))
for i, ts := range sorted {
a := make([]string, len(tss))
for i, ts := range tss {
a[i] = timeSeriesToString(ts)
}
sort.Strings(a)
return strings.Join(a, "")
}

View File

@@ -27,7 +27,6 @@ var benchOutputs = []string{
"stddev",
"stdvar",
"sum_samples",
"sum_samples_total",
"total",
"total_prometheus",
"unique_samples",

View File

@@ -1,44 +1,27 @@
package streamaggr
import (
"math"
)
type sumSamplesAggrValue struct {
sum float64
}
func (av *sumSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if math.Abs(av.sum) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
av.sum = 0
}
av.sum += sample.value
}
func (av *sumSamplesAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
ac := c.(*sumSamplesAggrConfig)
if ac.resetTotalOnFlush {
ctx.appendSeries(key, "sum_samples", av.sum)
av.sum = 0
return
}
ctx.appendSeries(key, "sum_samples_total", av.sum)
func (av *sumSamplesAggrValue) flush(_ aggrConfig, ctx *flushCtx, key string, _ bool) {
ctx.appendSeries(key, "sum_samples", av.sum)
av.sum = 0
}
func (*sumSamplesAggrValue) state() any {
return nil
}
func newSumSamplesAggrConfig(resetTotalOnFlush bool) aggrConfig {
return &sumSamplesAggrConfig{
resetTotalOnFlush: resetTotalOnFlush,
}
func newSumSamplesAggrConfig() aggrConfig {
return &sumSamplesAggrConfig{}
}
type sumSamplesAggrConfig struct {
resetTotalOnFlush bool
}
type sumSamplesAggrConfig struct{}
func (*sumSamplesAggrConfig) getValue(_ any) aggrValue {
return &sumSamplesAggrValue{}

View File

@@ -31,11 +31,7 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
currentTime := fasttime.UnixTimestamp()
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
lv, ok := av.shared.lastValues[key]
// The last value is stale, reset it.
if ok && lv.deleteDeadline < int64(currentTime)*1000 {
ok = false
}
if ok {
if ok || keepFirstSample {
if sample.timestamp < lv.timestamp {
// Skip out of order sample
return
@@ -47,8 +43,6 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
av.total += sample.value
ac.counterResetsTotal.Inc()
}
} else if keepFirstSample {
av.total += sample.value
}
lv.value = sample.value
lv.timestamp = sample.timestamp

View File

@@ -7,7 +7,6 @@ import (
"io/ioutil"
"log"
"os"
"path"
"strconv"
"strings"
"sync/atomic"
@@ -405,113 +404,16 @@ func readPSITotals(cgroupPath, statsName string) (uint64, uint64, error) {
}
func getCgroupV2Path() string {
cgroupData, err := os.ReadFile("/proc/self/cgroup")
data, err := ioutil.ReadFile("/proc/self/cgroup")
if err != nil {
return ""
}
// Read /proc/self/mountinfo with a timeout. Generating the mountinfo contents
// can block in the kernel when a backing filesystem (e.g. a hung NFS or FUSE
// mount) is unresponsive. Since this runs at program init via psiMetricsStart,
// a blocking read would hang startup, so fall back to disabling PSI metrics instead.
mountinfoData, _ := readFileWithTimeout("/proc/self/mountinfo", time.Second)
return getCgroupV2PathInternal(string(cgroupData), mountinfoData)
}
// readFileWithTimeout reads the file at path, returning ("", false) if the read
// doesn't complete within timeout.
//
// A timed-out read leaks the reading goroutine until the read eventually unblocks
// (if ever). This is an acceptable safeguard against a read of a pseudo-file such
// as /proc/self/mountinfo hanging on an unresponsive mount.
func readFileWithTimeout(path string, timeout time.Duration) (string, bool) {
type result struct {
data []byte
err error
}
// The channel is buffered so the goroutine can always send and exit,
// even after this function has returned on timeout.
ch := make(chan result, 1)
go func() {
data, err := os.ReadFile(path)
ch <- result{data: data, err: err}
}()
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case r := <-ch:
if r.err != nil {
return "", false
}
return string(r.data), true
case <-timer.C:
return "", false
}
}
func getCgroupV2PathInternal(cgroupData, mountinfoData string) string {
rel := getCgroupV2RelativePath(cgroupData)
if rel == "" {
// The process doesn't run under cgroup v2.
tmp := strings.SplitN(string(data), "::", 2)
if len(tmp) != 2 {
return ""
}
path := "/sys/fs/cgroup" + strings.TrimSpace(tmp[1])
// Determine the actual cgroup v2 mountpoint instead of assuming /sys/fs/cgroup.
// On systems with a hybrid cgroup hierarchy the unified cgroup v2 is mounted
// at a different location such as /sys/fs/cgroup/unified.
// See https://github.com/VictoriaMetrics/metrics/issues/127
mountpoint := getCgroupV2Mountpoint(mountinfoData)
if mountpoint == "" {
// fallback to assumed path
mountpoint = "/sys/fs/cgroup"
}
cgroupPath := path.Join(mountpoint, rel)
// Drop trailing slash if it exists. This prevents from '//' in the constructed paths by the caller.
return strings.TrimSuffix(cgroupPath, "/")
}
// getCgroupV2RelativePath returns the cgroup v2 path of the process relative to
// the cgroup v2 mountpoint, or an empty string if the process doesn't run under cgroup v2.
//
// The cgroup v2 entry in /proc/self/cgroup has an empty controllers field, e.g. "0::/the/path".
// See https://man7.org/linux/man-pages/man7/cgroups.7.html
func getCgroupV2RelativePath(cgroupData string) string {
for _, line := range strings.Split(cgroupData, "\n") {
// Each line has the form "hierarchy-ID:controller-list:cgroup-path".
// The cgroup v2 line has an empty hierarchy-ID and controller-list, i.e. it starts with "0::".
tmp := strings.SplitN(line, "::", 2)
if len(tmp) == 2 && strings.HasPrefix(line, "0::") {
return strings.TrimSpace(tmp[1])
}
}
return ""
}
// getCgroupV2Mountpoint returns the mountpoint of the cgroup v2 (unified) hierarchy
// parsed from the contents of /proc/self/mountinfo, or an empty string if cgroup v2 isn't mounted.
func getCgroupV2Mountpoint(mountinfoData string) string {
for _, line := range strings.Split(mountinfoData, "\n") {
if !strings.Contains(line, "cgroup2") {
// fast path
continue
}
// mountinfo lines have the form:
// 36 35 98:0 / /sys/fs/cgroup/unified rw,... - cgroup2 cgroup2 rw,...
// The optional fields preceding the filesystem type are terminated by " - ".
// See https://man7.org/linux/man-pages/man5/proc_pid_mountinfo.5.html
tmp := strings.SplitN(line, " - ", 2)
if len(tmp) != 2 {
continue
}
after := strings.Fields(tmp[1])
if len(after) < 1 || after[0] != "cgroup2" {
continue
}
before := strings.Fields(tmp[0])
if len(before) < 5 {
continue
}
// before[4] is the mount point.
return before[4]
}
return ""
// Drop the trailing slash if it exists. This prevents '//' in the paths constructed by the caller.
return strings.TrimSuffix(path, "/")
}

View File

@@ -74,6 +74,7 @@ var transformFuncs = map[string]bool{
"rand": true,
"rand_exponential": true,
"rand_normal": true,
"range": true,
"range_avg": true,
"range_first": true,
"range_last": true,

16
vendor/github.com/axiomhq/hyperloglog/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,16 @@
# Binaries for programs and plugins
*.exe
*.dll
*.so
*.dylib
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/
/vendor

41
vendor/github.com/axiomhq/hyperloglog/Contributing.md generated vendored Normal file
View File

@@ -0,0 +1,41 @@
## How to Contribute
👍🎉 First of all, thank you for your interest in Axiom-node! We'd love to accept your patches and contributions! 🎉👍
This project accepts contributions. In order to contribute, you should pay attention to a few guidelines:
## Reporting Issues
Bugs, feature requests, and development-related questions should be directed to our [GitHub issue tracker](https://github.com/axiomhq/hyperloglog/issues).
When reporting a bug, please try and provide as much context as possible such as your operating system, Go version and anything else that might be relevant to the bug. For feature requests, please explain what you're trying to do and how the requested feature would help you do that.
## Setup
[Fork](https://github.com/axiomhq/hyperloglog.git), then clone this repository:
```
git clone https://github.com/axiomhq/hyperloglog.git
cd hyperloglog
cd demo
go run hyperloglog_demo.go
```
## Submitting Modifications
1. It's generally best to start by opening a new issue describing the bug or feature you're intending to fix. Even if you think it's relatively minor, it's helpful to know what people are working on. Mention in the initial issue that you are planning to work on that bug or feature so that it can be assigned to you.
2. Follow the normal process of [forking](https://docs.github.com/en/free-pro-team@latest/github/getting-started-with-github/fork-a-repo) the project, and setup a new branch to work in. It's important that each group of changes be done in separate branches in order to ensure that a pull request only includes the commits related to that bug or feature.
3. Go makes it very simple to ensure properly formatted code, so always run `go fmt` on your code before committing it.
4. Do your best to have [well-formatted commit messages](https://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html)
for each change. This provides consistency throughout the project and ensures that commit messages are able to be formatted properly by various git tools.
5. Finally, push the commits to your fork and submit a [pull request](https://docs.github.com/en/free-pro-team@latest/github/collaborating-with-issues-and-pull-requests/creating-a-pull-request)
### Once you've filed the PR:
- One or more maintainers will use GitHub's review feature to review your PR.
- If the maintainer asks for any changes, edit your changes, push, and ask for another review.
- If the maintainer decides to suggest some improvements or alternatives, modify and make improvements. Once your changes are approved, one of the project maintainers will merge them.

19
vendor/github.com/axiomhq/hyperloglog/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,19 @@
Copyright (c) 2021, Axiom, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

51
vendor/github.com/axiomhq/hyperloglog/README.md generated vendored Normal file
View File

@@ -0,0 +1,51 @@
# HyperLogLog - an algorithm for approximating the number of distinct elements
[![GoDoc](https://godoc.org/github.com/axiomhq/hyperloglog?status.svg)](https://godoc.org/github.com/axiomhq/hyperloglog) [![Go Report Card](https://goreportcard.com/badge/github.com/axiomhq/hyperloglog)](https://goreportcard.com/report/github.com/axiomhq/hyperloglog) [![CircleCI](https://circleci.com/gh/axiomhq/hyperloglog/tree/master.svg?style=svg)](https://circleci.com/gh/axiomhq/hyperloglog/tree/master)
An improved version of [HyperLogLog](https://en.wikipedia.org/wiki/HyperLogLog) for the count-distinct problem, approximating the number of distinct elements in a multiset. This implementation offers enhanced performance, flexibility, and simplicity while maintaining accuracy.
## Note on Implementation History
The initial version of this work (tagged as v0.1.0) was based on ["Better with fewer bits: Improving the performance of cardinality estimation of large data streams - Qingjun Xiao, You Zhou, Shigang Chen"](https://www.cise.ufl.edu/~sgchen/Publications/XZC17.pdf). However, the current implementation has evolved significantly from this original basis, notably moving away from the tailcut method.
## Current Implementation
The current implementation is based on the LogLog-Beta algorithm, as described in:
["LogLog-Beta and More: A New Algorithm for Cardinality Estimation Based on LogLog Counting"](https://arxiv.org/pdf/1612.02284) by Jason Qin, Denys Kim, and Yumei Tung (2016).
Key features of the current implementation:
* **Metro hash** used instead of xxhash
* **Sparse representation** for lower cardinalities (like HyperLogLog++)
* **LogLog-Beta** for dynamic bias correction across all cardinalities
* **8-bit registers** for convenience and simplified implementation
* **Order-independent insertions and merging** for consistent results regardless of data input order
* **Removal of tailcut method** for a more straightforward approach
* **Flexible precision** allowing for 2^4 to 2^18 registers
This implementation is now more straightforward, efficient, and flexible, while remaining backwards compatible with previous versions. It provides a balance between precision, memory usage, speed, and ease of use.
## Precision and Memory Usage
This implementation allows for creating HyperLogLog sketches with arbitrary precision between 2^4 and 2^18 registers. The memory usage scales with the number of registers:
* Minimum (2^4 registers): 16 bytes
* Default (2^14 registers): 16 KB
* Maximum (2^18 registers): 256 KB
Users can choose the precision that best fits their use case, balancing memory usage against estimation accuracy.
## Note
A big thank you to Prof. Shigang Chen and his team at the University of Florida who are actively conducting research around "Big Network Data".
## Contributing
Kindly check our [contributing guide](https://github.com/axiomhq/hyperloglog/blob/main/Contributing.md) on how to propose bug fixes and improvements and how to submit pull requests to the project.
## License
&copy; Axiom, Inc., 2024
Distributed under MIT License (`The MIT License`).
See [LICENSE](LICENSE) for more information.

273
vendor/github.com/axiomhq/hyperloglog/beta.go generated vendored Normal file
View File

@@ -0,0 +1,273 @@
package hyperloglog
import (
"fmt"
"math"
)
// betaMap maps a sketch precision p (4 through 18) to the bias-correction
// polynomial for that precision. beta performs its lookups in this table.
var betaMap = map[uint8]func(float64) float64{
4: beta4,
5: beta5,
6: beta6,
7: beta7,
8: beta8,
9: beta9,
10: beta10,
11: beta11,
12: beta12,
13: beta13,
14: beta14,
15: beta15,
16: beta16,
17: beta17,
18: beta18,
}
// beta evaluates the bias-correction polynomial registered in betaMap for
// precision p at ez and returns the result.
//
// It panics with "invalid precision <p>" when betaMap has no entry for p.
func beta(p uint8, ez float64) float64 {
	if correction, found := betaMap[p]; found {
		return correction(ez)
	}
	panic(fmt.Sprintf("invalid precision %d", p))
}
/*
beta4 evaluates the bias-correction polynomial for precision p=4.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=4
[-0.582581413904517,-1.935300357560050,11.079323758035073,-22.131357446444323,22.505391846630037,-12.000723834917984,3.220579408194167,-0.342225302271235]
*/
func beta4(ez float64) float64 {
zl := math.Log(ez + 1)
return -0.582581413904517*ez +
-1.935300357560050*zl +
11.079323758035073*math.Pow(zl, 2) +
-22.131357446444323*math.Pow(zl, 3) +
22.505391846630037*math.Pow(zl, 4) +
-12.000723834917984*math.Pow(zl, 5) +
3.220579408194167*math.Pow(zl, 6) +
-0.342225302271235*math.Pow(zl, 7)
}
/*
beta5 evaluates the bias-correction polynomial for precision p=5.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=5
[-0.7518999460733967,-0.9590030077748760,5.5997371322141607,-8.2097636999765520,6.5091254894472037,-2.6830293734323729,0.5612891113138221,-0.0463331622196545]
*/
func beta5(ez float64) float64 {
zl := math.Log(ez + 1)
return -0.7518999460733967*ez +
-0.9590030077748760*zl +
5.5997371322141607*math.Pow(zl, 2) +
-8.2097636999765520*math.Pow(zl, 3) +
6.5091254894472037*math.Pow(zl, 4) +
-2.6830293734323729*math.Pow(zl, 5) +
0.5612891113138221*math.Pow(zl, 6) +
-0.0463331622196545*math.Pow(zl, 7)
}
/*
beta6 evaluates the bias-correction polynomial for precision p=6.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=6
[29.8257900969619634,-31.3287083337725925,-10.5942523036582283,-11.5720125689099618,3.8188754373907492,-2.4160130328530811,0.4542208940970826,-0.0575155452020420]
*/
func beta6(ez float64) float64 {
zl := math.Log(ez + 1)
return 29.8257900969619634*ez +
-31.3287083337725925*zl +
-10.5942523036582283*math.Pow(zl, 2) +
-11.5720125689099618*math.Pow(zl, 3) +
3.8188754373907492*math.Pow(zl, 4) +
-2.4160130328530811*math.Pow(zl, 5) +
0.4542208940970826*math.Pow(zl, 6) +
-0.0575155452020420*math.Pow(zl, 7)
}
/*
beta7 evaluates the bias-correction polynomial for precision p=7.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=7
[2.8102921290820060,-3.9780498518175995,1.3162680041351582,-3.9252486335805901,2.0080835753946471,-0.7527151937556955,0.1265569894242751,-0.0109946438726240]
*/
func beta7(ez float64) float64 {
zl := math.Log(ez + 1)
return 2.8102921290820060*ez +
-3.9780498518175995*zl +
1.3162680041351582*math.Pow(zl, 2) +
-3.9252486335805901*math.Pow(zl, 3) +
2.0080835753946471*math.Pow(zl, 4) +
-0.7527151937556955*math.Pow(zl, 5) +
0.1265569894242751*math.Pow(zl, 6) +
-0.0109946438726240*math.Pow(zl, 7)
}
/*
beta8 evaluates the bias-correction polynomial for precision p=8.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=8
[1.00633544887550519,-2.00580666405112407,1.64369749366514117,-2.70560809940566172,1.39209980244222598,-0.46470374272183190,0.07384282377269775,-0.00578554885254223]
*/
func beta8(ez float64) float64 {
zl := math.Log(ez + 1)
return 1.00633544887550519*ez +
-2.00580666405112407*zl +
1.64369749366514117*math.Pow(zl, 2) +
-2.70560809940566172*math.Pow(zl, 3) +
1.39209980244222598*math.Pow(zl, 4) +
-0.46470374272183190*math.Pow(zl, 5) +
0.07384282377269775*math.Pow(zl, 6) +
-0.00578554885254223*math.Pow(zl, 7)
}
/*
beta9 evaluates the bias-correction polynomial for precision p=9.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=9
[-0.09415657458167959,-0.78130975924550528,1.71514946750712460,-1.73711250406516338,0.86441508489048924,-0.23819027465047218,0.03343448400269076,-0.00207858528178157]
*/
func beta9(ez float64) float64 {
zl := math.Log(ez + 1)
return -0.09415657458167959*ez +
-0.78130975924550528*zl +
1.71514946750712460*math.Pow(zl, 2) +
-1.73711250406516338*math.Pow(zl, 3) +
0.86441508489048924*math.Pow(zl, 4) +
-0.23819027465047218*math.Pow(zl, 5) +
0.03343448400269076*math.Pow(zl, 6) +
-0.00207858528178157*math.Pow(zl, 7)
}
/*
beta10 evaluates the bias-correction polynomial for precision p=10.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=10
[-0.25935400670790054,-0.52598301999805808,1.48933034925876839,-1.29642714084993571,0.62284756217221615,-0.15672326770251041,0.02054415903878563,-0.00112488483925502]
*/
func beta10(ez float64) float64 {
zl := math.Log(ez + 1)
return -0.25935400670790054*ez +
-0.52598301999805808*zl +
1.48933034925876839*math.Pow(zl, 2) +
-1.29642714084993571*math.Pow(zl, 3) +
0.62284756217221615*math.Pow(zl, 4) +
-0.15672326770251041*math.Pow(zl, 5) +
0.02054415903878563*math.Pow(zl, 6) +
-0.00112488483925502*math.Pow(zl, 7)
}
/*
beta11 evaluates the bias-correction polynomial for precision p=11.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=11
[-4.32325553856025e-01,-1.08450736399632e-01,6.09156550741120e-01,-1.65687801845180e-02,-7.95829341087617e-02,4.71830602102918e-02,-7.81372902346934e-03,5.84268708489995e-04]
*/
func beta11(ez float64) float64 {
zl := math.Log(ez + 1)
return -0.432325553856025*ez +
-0.108450736399632*zl +
0.609156550741120*math.Pow(zl, 2) +
-0.0165687801845180*math.Pow(zl, 3) +
-0.0795829341087617*math.Pow(zl, 4) +
0.0471830602102918*math.Pow(zl, 5) +
-0.00781372902346934*math.Pow(zl, 6) +
0.000584268708489995*math.Pow(zl, 7)
}
/*
beta12 evaluates the bias-correction polynomial for precision p=12.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=12
[-3.84979202588598e-01,1.83162233114364e-01,1.30396688841854e-01,7.04838927629266e-02,-8.95893971464453e-03,1.13010036741605e-02,-1.94285569591290e-03,2.25435774024964e-04]

NOTE(review): the zl^4 coefficient in the code (-0.0089589397146453) has one
digit fewer than the value listed above (-8.95893971464453e-03); confirm
which of the two is intended.
*/
func beta12(ez float64) float64 {
zl := math.Log(ez + 1)
return -0.384979202588598*ez +
0.183162233114364*zl +
0.130396688841854*math.Pow(zl, 2) +
0.0704838927629266*math.Pow(zl, 3) +
-0.0089589397146453*math.Pow(zl, 4) +
0.0113010036741605*math.Pow(zl, 5) +
-0.00194285569591290*math.Pow(zl, 6) +
0.000225435774024964*math.Pow(zl, 7)
}
/*
beta13 evaluates the bias-correction polynomial for precision p=13.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=13
[-0.41655270946462997,-0.22146677040685156,0.38862131236999947,0.45340979746062371,-0.36264738324476375,0.12304650053558529,-0.01701540384555510,0.00102750367080838]
*/
func beta13(ez float64) float64 {
zl := math.Log(ez + 1)
return -0.41655270946462997*ez +
-0.22146677040685156*zl +
0.38862131236999947*math.Pow(zl, 2) +
0.45340979746062371*math.Pow(zl, 3) +
-0.36264738324476375*math.Pow(zl, 4) +
0.12304650053558529*math.Pow(zl, 5) +
-0.01701540384555510*math.Pow(zl, 6) +
0.00102750367080838*math.Pow(zl, 7)
}
/*
beta14 evaluates the bias-correction polynomial for precision p=14.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=14
[-3.71009760230692e-01,9.78811941207509e-03,1.85796293324165e-01,2.03015527328432e-01,-1.16710521803686e-01,4.31106699492820e-02,-5.99583540511831e-03,4.49704299509437e-04]
*/
func beta14(ez float64) float64 {
zl := math.Log(ez + 1)
return -0.371009760230692*ez +
0.00978811941207509*zl +
0.185796293324165*math.Pow(zl, 2) +
0.203015527328432*math.Pow(zl, 3) +
-0.116710521803686*math.Pow(zl, 4) +
0.0431106699492820*math.Pow(zl, 5) +
-0.00599583540511831*math.Pow(zl, 6) +
0.000449704299509437*math.Pow(zl, 7)
}
/*
beta15 evaluates the bias-correction polynomial for precision p=15.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=15
[-0.38215145543875273,-0.89069400536090837,0.37602335774678869,0.99335977440682377,-0.65577441638318956,0.18332342129703610,-0.02241529633062872,0.00121399789330194]
*/
func beta15(ez float64) float64 {
zl := math.Log(ez + 1)
return -0.38215145543875273*ez +
-0.89069400536090837*zl +
0.37602335774678869*math.Pow(zl, 2) +
0.99335977440682377*math.Pow(zl, 3) +
-0.65577441638318956*math.Pow(zl, 4) +
0.18332342129703610*math.Pow(zl, 5) +
-0.02241529633062872*math.Pow(zl, 6) +
0.00121399789330194*math.Pow(zl, 7)
}
/*
beta16 evaluates the bias-correction polynomial for precision p=16.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=16
[-0.37331876643753059,-1.41704077448122989,0.40729184796612533,1.56152033906584164,-0.99242233534286128,0.26064681399483092,-0.03053811369682807,0.00155770210179105]
*/
func beta16(ez float64) float64 {
zl := math.Log(ez + 1)
return -0.37331876643753059*ez +
-1.41704077448122989*zl +
0.40729184796612533*math.Pow(zl, 2) +
1.56152033906584164*math.Pow(zl, 3) +
-0.99242233534286128*math.Pow(zl, 4) +
0.26064681399483092*math.Pow(zl, 5) +
-0.03053811369682807*math.Pow(zl, 6) +
0.00155770210179105*math.Pow(zl, 7)
}
/*
beta17 evaluates the bias-correction polynomial for precision p=17.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=17
[-0.36775502299404605,0.53831422351377967,0.76970289278767923,0.55002583586450560,-0.74575588261146941,0.25711835785821952,-0.03437902606864149,0.00185949146371616]
*/
func beta17(ez float64) float64 {
zl := math.Log(ez + 1)
return -0.36775502299404605*ez +
0.53831422351377967*zl +
0.76970289278767923*math.Pow(zl, 2) +
0.55002583586450560*math.Pow(zl, 3) +
-0.74575588261146941*math.Pow(zl, 4) +
0.25711835785821952*math.Pow(zl, 5) +
-0.03437902606864149*math.Pow(zl, 6) +
0.00185949146371616*math.Pow(zl, 7)
}
/*
beta18 evaluates the bias-correction polynomial for precision p=18.
The coefficients listed below multiply, in order, ez, zl, zl^2, ..., zl^7,
where zl = ln(ez + 1).

p=18
[-0.36479623325960542,0.99730412328635032,1.55354386230081221,1.25932677198028919,-1.53325948209110163,0.47801042200056593,-0.05951025172951174,0.00291076804642205]
*/
func beta18(ez float64) float64 {
zl := math.Log(ez + 1)
return -0.36479623325960542*ez +
0.99730412328635032*zl +
1.55354386230081221*math.Pow(zl, 2) +
1.25932677198028919*math.Pow(zl, 3) +
-1.53325948209110163*math.Pow(zl, 4) +
0.47801042200056593*math.Pow(zl, 5) +
-0.05951025172951174*math.Pow(zl, 6) +
0.00291076804642205*math.Pow(zl, 7)
}

176
vendor/github.com/axiomhq/hyperloglog/compressed.go generated vendored Normal file
View File

@@ -0,0 +1,176 @@
package hyperloglog
import (
"encoding/binary"
"slices"
)
// Original author of this file is github.com/clarkduvall/hyperloglog
// iterator walks the values stored in a compressedList from first to last.
type iterator struct {
	i    int             // offset into v.b of the next encoded value
	last uint32          // previously decoded value; the next delta is added to it
	v    *compressedList // list being traversed
}

// Next decodes the value at the current position, moves past it and returns it.
func (iter *iterator) Next() uint32 {
	value, next := iter.Peek()
	iter.Advance(value, next)
	return value
}

// Peek decodes the value at the current position without moving. It returns
// the value together with the offset that follows its encoding.
func (iter *iterator) Peek() (uint32, int) {
	value, next := iter.v.decode(iter.i, iter.last)
	return value, next
}

// Advance repositions the iterator at offset i and records last as the most
// recently decoded value. It is meant to be fed the results of Peek.
func (iter *iterator) Advance(last uint32, i int) {
	iter.i, iter.last = i, last
}

// HasNext reports whether encoded bytes remain after the current position.
func (iter iterator) HasNext() bool {
	return iter.v.Len() > iter.i
}
// compressedList stores uint32 values as variable-length-encoded differences
// between each appended value and the one appended before it.
type compressedList struct {
// count is the number of values appended so far.
count uint32
// last is the most recently appended value.
last uint32
// b holds the encoded differences.
b variableLengthList
}
// Clone returns an independent copy of v whose byte buffer does not share
// storage with the original. A nil receiver yields nil.
func (v *compressedList) Clone() *compressedList {
	if v == nil {
		return nil
	}
	buf := make(variableLengthList, len(v.b))
	copy(buf, v.b)
	return &compressedList{
		count: v.count,
		last:  v.last,
		b:     buf,
	}
}
// reset empties the list while keeping the capacity of its byte buffer.
// Calling it on a nil receiver is a no-op.
func (v *compressedList) reset() {
	if v != nil {
		v.count, v.last = 0, 0
		v.b = v.b[:0]
	}
}
// AppendBinary appends the serialized list to data and returns the extended
// slice. The layout is: count (big-endian uint32), last (big-endian uint32),
// followed by the serialized byte buffer as written by
// variableLengthList.AppendBinary.
func (v *compressedList) AppendBinary(data []byte) ([]byte, error) {
	// Fixed-size header: number of items, then the last item.
	header := [8]byte{
		byte(v.count >> 24), byte(v.count >> 16), byte(v.count >> 8), byte(v.count),
		byte(v.last >> 24), byte(v.last >> 16), byte(v.last >> 8), byte(v.last),
	}
	data = append(slices.Grow(data, len(header)), header[:]...)

	// The variable-length payload follows the header.
	return v.b.AppendBinary(data)
}
// UnmarshalBinary restores the list from the layout written by AppendBinary:
// a big-endian uint32 count, a big-endian uint32 last value, a big-endian
// uint32 payload length, and then the payload bytes. Bytes following the
// payload are ignored.
//
// It returns ErrorTooShort when data is shorter than the 12-byte header or
// shorter than the payload length announced in the header. The receiver is
// modified only on success, and the payload is copied, so data may be reused
// by the caller afterwards.
func (v *compressedList) UnmarshalBinary(data []byte) error {
	const headerSize = 12
	if len(data) < headerSize {
		return ErrorTooShort
	}

	count := binary.BigEndian.Uint32(data[0:4])
	last := binary.BigEndian.Uint32(data[4:8])
	size := binary.BigEndian.Uint32(data[8:12])
	payload := data[headerSize:]

	// Validate the announced length before allocating: the header is
	// untrusted, and an oversized value must not trigger a multi-gigabyte
	// allocation. The comparison is done in 64 bits so that neither side
	// is truncated.
	if uint64(len(payload)) < uint64(size) {
		return ErrorTooShort
	}

	buf := make([]uint8, size)
	copy(buf, payload)

	v.count, v.last, v.b = count, last, buf
	return nil
}
// newCompressedList returns an empty list whose byte buffer has room for
// capacity bytes.
func newCompressedList(capacity int) *compressedList {
	return &compressedList{
		b: make(variableLengthList, 0, capacity),
	}
}
// Len returns the length in bytes of the encoded buffer. This is not the
// number of stored values, which is tracked in count.
func (v *compressedList) Len() int {
return len(v.b)
}
// decode reads the encoded difference starting at offset i and adds it to
// last. It returns the resulting value and the offset after the encoding.
func (v *compressedList) decode(i int, last uint32) (uint32, int) {
	delta, next := v.b.decode(i)
	return last + delta, next
}
// Append adds x to the list, stored as its difference from the previously
// appended value, and records x as the new last value.
func (v *compressedList) Append(x uint32) {
	delta := x - v.last
	v.b = v.b.Append(delta)
	v.last = x
	v.count++
}
// Iter returns an iterator positioned at the start of the list.
func (v *compressedList) Iter() iterator {
	return iterator{v: v}
}
// variableLengthList is a byte buffer of uint32 values in a base-128
// encoding: each byte carries seven payload bits, least-significant group
// first, and a set high bit means that more bytes follow.
type variableLengthList []uint8

// AppendBinary appends the serialized list to data: the byte length of the
// list as a big-endian 32-bit integer, followed by the raw bytes.
func (v variableLengthList) AppendBinary(data []byte) ([]byte, error) {
	n := len(v)

	// Reserve room for the 4-byte length prefix plus every byte of the list.
	data = slices.Grow(data, 4+n)

	// Only the low 32 bits of the length are written.
	data = append(data, byte(n>>24), byte(n>>16), byte(n>>8), byte(n))

	return append(data, v...), nil
}

// decode reads the value whose encoding starts at offset i. It returns the
// value and the offset of the byte following the encoding.
func (v variableLengthList) decode(i int) (uint32, int) {
	var value uint32
	shift := uint(0)
	pos := i
	for v[pos]&0x80 != 0 {
		value |= uint32(v[pos]&0x7f) << shift
		shift += 7
		pos++
	}
	// The final byte has its high bit clear and contributes all of its bits.
	value |= uint32(v[pos]) << shift
	return value, pos + 1
}

// Append encodes x, appends it to v and returns the extended list.
func (v variableLengthList) Append(x uint32) variableLengthList {
	for ; x >= 0x80; x >>= 7 {
		v = append(v, uint8(x)|0x80)
	}
	return append(v, uint8(x))
}

439
vendor/github.com/axiomhq/hyperloglog/hyperloglog.go generated vendored Normal file
View File

@@ -0,0 +1,439 @@
package hyperloglog
import (
"encoding/binary"
"errors"
"fmt"
"math"
"slices"
"sync"
)
const (
// pp is the precision passed to encodeHash and decodeHash for values kept
// in the sparse representation.
pp = uint8(25)
// mp is 2^pp; Estimate uses it as the register count for linear counting
// while the sketch is sparse.
mp = uint32(1) << pp
// version is the first byte written by AppendBinary.
version = 2
)
// Sketch is a HyperLogLog sketch for estimating the number of distinct
// inserted elements. It uses the sparse representation while sparseList is
// non-nil and the dense register array regs otherwise.
type Sketch struct {
// p is the precision; m is the register count, 1 << p.
p uint8
m uint32
// alpha is computed from m by NewSketch and used by Estimate.
alpha float64
// tmpSet buffers encoded hashes inserted while sparse until mergeSparse
// folds them into sparseList.
tmpSet set
// sparseList holds the sparse representation; nil once the sketch is dense.
sparseList *compressedList
// regs holds the dense registers; nil while the sketch is sparse.
regs []uint8
}
// New returns a HyperLogLog Sketch with 2^14 registers (precision 14).
// It is shorthand for New14.
func New() *Sketch { return New14() }
// New14 returns a HyperLogLog Sketch with 2^14 registers (precision 14)
// that starts in the sparse representation.
func New14() *Sketch { return newSketchNoError(14, true) }
// New16 returns a HyperLogLog Sketch with 2^16 registers (precision 16)
// that starts in the sparse representation.
func New16() *Sketch { return newSketchNoError(16, true) }
// NewNoSparse returns a HyperLogLog Sketch with 2^14 registers (precision 14) that will not use a sparse representation.
func NewNoSparse() *Sketch { return newSketchNoError(14, false) }
// New16NoSparse returns a HyperLogLog Sketch with 2^16 registers (precision 16) that will not use a sparse representation.
func New16NoSparse() *Sketch { return newSketchNoError(16, false) }
// newSketchNoError wraps NewSketch for callers that pass a fixed precision.
// The error is deliberately discarded: NewSketch only fails for a precision
// outside 4..18, and the constructors in this file pass 14 or 16. With an
// invalid precision the returned sketch would be nil.
func newSketchNoError(precision uint8, sparse bool) *Sketch {
sk, _ := NewSketch(precision, sparse)
return sk
}
// NewSketch returns a Sketch with 2^precision registers. When sparse is true
// the sketch starts in the sparse representation; otherwise the dense
// register array is allocated immediately.
//
// It returns an error when precision is outside the range 4..18.
func NewSketch(precision uint8, sparse bool) (*Sketch, error) {
	if precision > 18 || precision < 4 {
		return nil, fmt.Errorf("p has to be >= 4 and <= 18")
	}

	registers := uint32(1) << precision
	sk := &Sketch{
		p:     precision,
		m:     registers,
		alpha: alpha(float64(registers)),
	}

	if !sparse {
		sk.regs = make([]uint8, registers)
		return sk, nil
	}

	sk.tmpSet = makeSet(0)
	sk.sparseList = getCompressedList(0)
	return sk, nil
}
// sparse reports whether the sketch is in the sparse representation, i.e.
// whether sparseList is non-nil.
func (sk *Sketch) sparse() bool { return sk.sparseList != nil }
// Clone returns a deep copy of sk: the registers, the temporary set and the
// sparse list are all duplicated, so the copy shares no storage with sk.
func (sk *Sketch) Clone() *Sketch {
	return &Sketch{
		p:          sk.p,
		m:          sk.m,
		alpha:      sk.alpha,
		tmpSet:     sk.tmpSet.Clone(),
		sparseList: sk.sparseList.Clone(),
		regs:       append([]uint8(nil), sk.regs...),
	}
}
// Reset removes every inserted element while keeping the current
// representation: a dense sketch has its registers zeroed, a sparse sketch
// has its temporary set and sparse list emptied.
func (sk *Sketch) Reset() {
	if !sk.sparse() {
		clear(sk.regs)
		return
	}
	sk.tmpSet.reset()
	sk.sparseList.reset()
}
// maybeToNormal folds the temporary set into the sparse list once the set
// holds more than m/100 entries, and then converts the sketch to the dense
// representation if the sparse list has grown past its size limit.
func (sk *Sketch) maybeToNormal() {
	if uint32(sk.tmpSet.Len())*100 <= sk.m {
		return
	}
	sk.mergeSparse()

	// The limit is m bytes, lowered by 10% when m exceeds 8096.
	limit := sk.m
	if limit > 8096 {
		limit -= limit / 10
	}
	if uint32(sk.sparseList.Len()) > limit {
		sk.toNormal()
	}
}
// Merge folds the contents of other into sk. A nil other is a no-op.
//
// It returns an error when the two sketches have different precisions. When
// both sketches are sparse the merge stays sparse; in every other case sk
// is merged through the dense path.
func (sk *Sketch) Merge(other *Sketch) error {
	switch {
	case other == nil:
		return nil
	case sk.p != other.p:
		return errors.New("precisions must be equal")
	case sk.sparse() && other.sparse():
		sk.mergeSparseSketch(other)
	default:
		sk.mergeDenseSketch(other)
	}
	return nil
}
// mergeSparseSketch adds every entry of other, from both its temporary set
// and its sparse list, to sk's temporary set, then checks whether sk needs
// to be compacted or converted to the dense representation.
func (sk *Sketch) mergeSparseSketch(other *Sketch) {
	sk.tmpSet.Merge(other.tmpSet)

	it := other.sparseList.Iter()
	for it.HasNext() {
		sk.tmpSet.add(it.Next())
	}

	sk.maybeToNormal()
}
// mergeDenseSketch merges other into sk using dense registers. sk is first
// converted to the dense representation if it is still sparse. A dense other
// is merged register by register, keeping the larger value; a sparse other
// has each of its encoded hashes decoded and inserted.
func (sk *Sketch) mergeDenseSketch(other *Sketch) {
	if sk.sparse() {
		sk.toNormal()
	}

	if !other.sparse() {
		for idx, rank := range other.regs {
			if sk.regs[idx] < rank {
				sk.regs[idx] = rank
			}
		}
		return
	}

	// Entries still pending in other's temporary set.
	other.tmpSet.ForEach(func(encoded uint32) {
		idx, rank := decodeHash(encoded, other.p, pp)
		sk.insert(idx, rank)
	})

	// Entries already compacted into other's sparse list.
	it := other.sparseList.Iter()
	for it.HasNext() {
		idx, rank := decodeHash(it.Next(), other.p, pp)
		sk.insert(idx, rank)
	}
}
// toNormal converts a sparse sketch to the dense representation. Pending
// entries are folded into the sparse list, every entry is decoded into a
// freshly allocated register array, and the sparse list is handed back to
// its pool.
func (sk *Sketch) toNormal() {
	if pending := sk.tmpSet.Len(); pending > 0 {
		sk.mergeSparse()
	}

	sk.regs = make([]uint8, sk.m)
	it := sk.sparseList.Iter()
	for it.HasNext() {
		idx, rank := decodeHash(it.Next(), sk.p, pp)
		sk.insert(idx, rank)
	}

	// Drop the sparse state; a nil sparseList marks the sketch as dense.
	retired := sk.sparseList
	sk.sparseList = nil
	sk.tmpSet = nilSet
	putCompressedList(retired)
}
// insert raises dense register i to r when r is larger than its current value.
func (sk *Sketch) insert(i uint32, r uint8) { sk.regs[i] = max(r, sk.regs[i]) }
// Insert hashes e and adds the resulting hash to the sketch via InsertHash.
func (sk *Sketch) Insert(e []byte) { sk.InsertHash(hash(e)) }
// InsertHash adds the already-hashed value x to the sketch. A dense sketch
// updates the matching register directly. A sparse sketch stores the encoded
// hash in its temporary set and, when tmpSet.add returns true, checks whether
// compaction or conversion to dense is due.
func (sk *Sketch) InsertHash(x uint64) {
	if !sk.sparse() {
		idx, rank := getPosVal(x, sk.p)
		sk.insert(uint32(idx), rank)
		return
	}
	if added := sk.tmpSet.add(encodeHash(x, sk.p, pp)); added {
		sk.maybeToNormal()
	}
}
// Estimate returns the estimated number of distinct inserted elements.
//
// A sparse sketch first folds its temporary set into the sparse list (so the
// call mutates the sketch) and then applies linear counting over mp
// registers. A dense sketch combines the register sum and the zero-register
// count with the bias correction from beta, rounding to the nearest integer.
func (sk *Sketch) Estimate() uint64 {
	if !sk.sparse() {
		sum, ez := sumAndZeros(sk.regs)
		m := float64(sk.m)
		est := sk.alpha * m * (m - ez) / (sum + beta(sk.p, ez))
		return uint64(est + 0.5)
	}

	sk.mergeSparse()
	return uint64(linearCount(mp, mp-sk.sparseList.count))
}
// compressedListPools holds one pool of reusable compressedList values per
// capacity class; see getCompressedList and putCompressedList.
var compressedListPools = newCompressedListPools()

// newCompressedListPools returns eight freshly allocated, empty pools.
func newCompressedListPools() [8]*sync.Pool {
	var pools [8]*sync.Pool
	for i := range pools {
		pools[i] = new(sync.Pool)
	}
	return pools
}
// getCompressedList returns a compressedList whose buffer can hold at least
// one byte less than the capacity class selected for requestedCapacity. The
// list is taken from the pool of that class when one is available and is
// newly allocated otherwise.
func getCompressedList(requestedCapacity int) *compressedList {
	// Upper bounds (exclusive) of the first seven capacity classes.
	thresholds := [...]int{256, 512, 1024, 2048, 4096, 8196, 16384}

	// Requests at or beyond the largest bound share the last pool and keep
	// their exact requested size.
	pool, capacity := compressedListPools[len(thresholds)], requestedCapacity
	for idx, limit := range thresholds {
		if requestedCapacity < limit {
			pool, capacity = compressedListPools[idx], limit
			break
		}
	}

	pooled := pool.Get()
	if pooled == nil {
		return newCompressedList(capacity - 1)
	}
	list := pooled.(*compressedList)
	list.b = slices.Grow(list.b, capacity-1)
	return list
}
// putCompressedList resets c and returns it to the pool that matches the
// capacity of its buffer.
func putCompressedList(c *compressedList) {
	c.reset()

	// Upper bounds (exclusive) of the first seven capacity classes; larger
	// buffers go to the last pool.
	thresholds := [...]int{256, 512, 1024, 2048, 4096, 8196, 16384}
	capacity := cap(c.b)
	slot := len(thresholds)
	for idx, limit := range thresholds {
		if capacity < limit {
			slot = idx
			break
		}
	}
	compressedListPools[slot].Put(c)
}
// mergeSparse folds the entries buffered in tmpSet into sparseList. It builds
// a new list by merging the sorted tmpSet keys with the existing list, writing
// a value present in both only once, then returns the old list to its pool and
// clears tmpSet. It does nothing when tmpSet is empty.
func (sk *Sketch) mergeSparse() {
if sk.tmpSet.Len() == 0 {
return
}
// Collect and sort the pending keys so they can be merged in order.
keys := make([]uint32, 0, sk.tmpSet.Len())
sk.tmpSet.ForEach(func(k uint32) {
keys = append(keys, k)
})
slices.Sort(keys)
newList := getCompressedList(4*sk.tmpSet.Len() + sk.sparseList.Len())
// Two-way merge: iter walks the existing list, i walks the sorted keys.
for iter, i := sk.sparseList.Iter(), 0; iter.HasNext() || i < len(keys); {
if !iter.HasNext() {
// Existing list exhausted: drain the remaining keys.
newList.Append(keys[i])
i++
continue
}
if i >= len(keys) {
// Keys exhausted: drain the remaining list values.
newList.Append(iter.Next())
continue
}
// Peek does not move the iterator; Advance commits the step only when
// the list value is the one consumed.
x1, adv := iter.Peek()
x2 := keys[i]
if x1 == x2 {
// Same value on both sides: write it once and step both.
newList.Append(x1)
iter.Advance(x1, adv)
i++
} else if x1 > x2 {
newList.Append(x2)
i++
} else {
newList.Append(x1)
iter.Advance(x1, adv)
}
}
putCompressedList(sk.sparseList)
sk.sparseList = newList
sk.tmpSet.m.Clear()
}
// MarshalBinary implements the encoding.BinaryMarshaler interface by calling
// AppendBinary with a nil buffer.
//
// When the result will be appended to another buffer, consider using
// AppendBinary to avoid additional allocations and copying.
func (sk *Sketch) MarshalBinary() (data []byte, err error) {
return sk.AppendBinary(nil)
}
// AppendBinary implements the encoding.BinaryAppender interface.
//
// The output starts with a 4-byte header: the format version, the precision
// p, a zero byte, and a flag that is 1 for the sparse representation and 0
// for the dense one. A sparse sketch then writes its temporary set followed
// by its sparse list. A dense sketch writes the register count as a
// big-endian 32-bit integer followed by the registers.
func (sk *Sketch) AppendBinary(data []byte) ([]byte, error) {
	data = slices.Grow(data, 8+len(sk.regs))

	isSparse := sk.sparse()
	flag := byte(0)
	if isSparse {
		flag = 1
	}
	// Header: version marker, p, b (always zero), representation flag.
	data = append(data, version, sk.p, 0, flag)

	if isSparse {
		withSet, err := sk.tmpSet.AppendBinary(data)
		if err != nil {
			return nil, err
		}
		return sk.sparseList.AppendBinary(withSet)
	}

	// Dense: length prefix, then every register.
	n := len(sk.regs)
	data = append(data, byte(n>>24), byte(n>>16), byte(n>>8), byte(n))
	return append(data, sk.regs...), nil
}
// ErrorTooShort is returned by UnmarshalBinary when the input is too short
// to be parsed.
var ErrorTooShort = errors.New("too short binary")
// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface.
//
// data starts with an 8-byte prefix: version, precision p, b, and a mode flag
// (1 = sparse), followed by a big-endian 32-bit size. For a sparse sketch the
// size is the number of tmp_set keys; that many 4-byte big-endian keys follow,
// and the rest of data is the encoded sparse list. For a dense sketch the
// payload is decoded by unmarshalBinaryV1 when version is 1 and by
// unmarshalBinaryV2 otherwise.
//
// It returns ErrorTooShort when data is shorter than the 8-byte prefix or, for
// a sparse sketch, shorter than the tmp_set size it declares. Errors from
// NewSketch and from decoding the sparse list are returned unchanged.
func (sk *Sketch) UnmarshalBinary(data []byte) error {
	if len(data) < 8 {
		return ErrorTooShort
	}

	// Unmarshal version. We may need this in the future if we make
	// non-compatible changes.
	v := data[0]
	// Unmarshal p.
	p := data[1]
	// Unmarshal b.
	b := data[2]
	// Determine if we need a sparse Sketch.
	sparse := data[3] == byte(1)

	// Start from a fresh Sketch if the precision doesn't match, if the Sketch
	// was already used, or if a sparse payload arrives at a receiver that has
	// no sparse list to decode into (e.g. a zero-value Sketch with p == 0).
	if sk.p != p || sk.regs != nil || sk.tmpSet.Len() > 0 ||
		(sk.sparseList != nil && sk.sparseList.Len() > 0) ||
		(sparse && sk.sparseList == nil) {
		newh, err := NewSketch(p, sparse)
		if err != nil {
			return err
		}
		*sk = *newh
	}

	// sk is now initialised with the correct p. We just need to fill the
	// rest of the details out.
	if sparse {
		// Using the sparse Sketch.

		// Unmarshal the tmp_set size and validate it against the input before
		// allocating or reading anything. The end offset is computed in 64
		// bits so that a hostile size cannot wrap around.
		tssz := binary.BigEndian.Uint32(data[4:8])
		end := uint64(tssz)*4 + 8
		if end > uint64(len(data)) {
			return ErrorTooShort
		}
		tsLastByte := int(end)

		// We need to unmarshal tssz values in total, and each value requires
		// us to read 4 bytes.
		sk.tmpSet = makeSet(int(tssz))
		for i := 8; i < tsLastByte; i += 4 {
			k := binary.BigEndian.Uint32(data[i : i+4])
			sk.tmpSet.add(k)
		}

		// Unmarshal the sparse Sketch.
		return sk.sparseList.UnmarshalBinary(data[tsLastByte:])
	}

	// Using the dense Sketch.
	sk.sparseList = nil
	sk.tmpSet = nilSet

	if v == 1 {
		return sk.unmarshalBinaryV1(data[8:], b)
	}
	return sk.unmarshalBinaryV2(data)
}
// sumAndZeros scans the registers once and returns two values: res, the sum
// of 1/2^v over every register value v, and ez, the number of registers that
// are equal to zero.
func sumAndZeros(regs []uint8) (res, ez float64) {
	for i := range regs {
		reg := regs[i]
		if reg == 0 {
			ez++
		}
		weight := 1.0 / math.Pow(2.0, float64(reg))
		res += weight
	}
	return res, ez
}
// unmarshalBinaryV1 decodes the version 1 dense payload, in which every input
// byte packs two 4-bit values. Each value is offset by b and stored in regs,
// high nibble first, so regs ends up twice as long as data. It always returns
// nil.
func (sk *Sketch) unmarshalBinaryV1(data []byte, b uint8) error {
	regs := make([]uint8, len(data)*2)
	for i, packed := range data {
		hi, lo := packed>>4, packed&0x0f
		regs[2*i] = hi + b
		regs[2*i+1] = lo + b
	}
	sk.regs = regs
	return nil
}
// unmarshalBinaryV2 decodes the version 2 dense payload: everything after the
// 8-byte prefix of data is taken as the register values, one byte each.
//
// The registers are copied out of data instead of aliasing it. The
// encoding.BinaryUnmarshaler contract requires a copy whenever the data is
// retained after returning, so that the caller can reuse or modify its buffer
// without altering the sketch.
//
// It returns ErrorTooShort if data does not contain the full 8-byte prefix.
func (sk *Sketch) unmarshalBinaryV2(data []byte) error {
	if len(data) < 8 {
		return ErrorTooShort
	}
	sk.regs = slices.Clone(data[8:])
	return nil
}

118
vendor/github.com/axiomhq/hyperloglog/sparse.go generated vendored Normal file
View File

@@ -0,0 +1,118 @@
package hyperloglog
import (
"math/bits"
"slices"
"github.com/kamstrup/intmap"
)
// getIndex extracts the index part of the sparse-encoded key k via bextr32,
// always requesting p bits. The start argument depends on the low bit of k:
// 32-p when it is set, pp-p+1 otherwise.
func getIndex(k uint32, p, pp uint8) uint32 {
	start := pp - p + 1
	if k&1 == 1 {
		start = 32 - p
	}
	return bextr32(k, start, p)
}
// encodeHash encodes the 64-bit hash x into the 32-bit key used by the sparse
// representation.
//
// When bextr(x, 64-pp, pp-p) is non-zero the key is just idx shifted left by
// one, leaving the low bit clear. Otherwise the key packs idx, a
// leading-zero count derived from the remaining bits of x, and a low flag bit
// set to 1.
func encodeHash(x uint64, p, pp uint8) uint32 {
	idx := uint32(bextr(x, 64-pp, pp))
	if bextr(x, 64-pp, pp-p) != 0 {
		return idx << 1
	}

	// Fill the low pp bits with ones so the zero count is bounded.
	filled := (bextr(x, 0, 64-pp) << pp) | (1<<pp - 1)
	zeros := bits.LeadingZeros64(filled) + 1
	return idx<<7 | uint32(zeros<<1) | 1
}
// Decode a hash from the sparse representation.
func decodeHash(k uint32, p, pp uint8) (uint32, uint8) {
var r uint8
if k&1 == 1 {
r = uint8(bextr32(k, 1, 6)) + pp - p
} else {
// We can use the 64bit clz implementation and reduce the result
// by 32 to get a clz for a 32bit word.
r = uint8(bits.LeadingZeros64(uint64(k<<(32-pp+p-1))) - 31) // -32 + 1
}
return getIndex(k, p, pp), r
}
// set is a thin wrapper around an intmap.Set of uint32 values.
type set struct {
	// m is the underlying set; it is nil in the zero value.
	m *intmap.Set[uint32]
}
// reset clears the underlying set. It does nothing when the set has no
// underlying storage (m is nil).
func (s set) reset() {
	if s.m == nil {
		return
	}
	s.m.Clear()
}
// nilSet is the zero-value set, i.e. one whose underlying m is nil.
var nilSet set
// makeSet returns a set backed by a new intmap.Set created with the given
// size.
func makeSet(size int) set {
	backing := intmap.NewSet[uint32](size)
	return set{m: backing}
}
// ForEach calls fn for every element of the set. Iteration never stops early:
// the callback handed to the underlying set always returns true.
func (s set) ForEach(fn func(v uint32)) {
	visit := func(elem uint32) bool {
		fn(elem)
		return true
	}
	s.m.ForEach(visit)
}
// Merge adds every element of other to s. other is only iterated, not
// modified.
func (s set) Merge(other set) {
	insert := func(elem uint32) bool {
		s.m.Add(elem)
		return true
	}
	other.m.ForEach(insert)
}
// Len returns the length reported by the underlying set.
func (s set) Len() int {
	n := s.m.Len()
	return n
}
// add inserts v into the underlying set and returns whatever its Add method
// reports (presumably whether v was newly added; confirm against intmap).
func (s set) add(v uint32) bool {
	added := s.m.Add(v)
	return added
}
// Clone returns a copy of s backed by its own underlying set, so adding to
// the copy does not affect s. Cloning nilSet returns nilSet.
func (s set) Clone() set {
	if s == nilSet {
		return nilSet
	}

	dup := set{m: intmap.NewSet[uint32](s.m.Len())}
	s.m.ForEach(func(elem uint32) bool {
		dup.m.Add(elem)
		return true
	})
	return dup
}
// AppendBinary appends the binary encoding of the set to data and returns the
// extended slice. The encoding is the element count as a big-endian 32-bit
// integer, followed by every element as a big-endian 32-bit integer in the
// order the underlying set iterates them. The returned error is always nil.
func (s *set) AppendBinary(data []byte) ([]byte, error) {
	count := s.m.Len()

	// Reserve 4 bytes for the count plus 4 bytes per element up front.
	data = slices.Grow(data, 4+(4*count))

	// appendUint32 writes v to data in big-endian byte order.
	appendUint32 := func(v uint32) {
		data = append(data,
			byte(v>>24),
			byte(v>>16),
			byte(v>>8),
			byte(v),
		)
	}

	// Length of the set. Only the low 32 bits are written.
	appendUint32(uint32(count))

	// Marshal each element in the set.
	s.m.ForEach(func(k uint32) bool {
		appendUint32(k)
		return true
	})

	return data, nil
}

Some files were not shown because too many files have changed in this diff Show More