mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Compare commits
85 Commits
logsql-ski
...
pmm-6401-v
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6ceea4d312 | ||
|
|
63c88d8ea2 | ||
|
|
dc6636e2b2 | ||
|
|
c13f1d99e0 | ||
|
|
079888f719 | ||
|
|
b68264b4f5 | ||
|
|
aed049f660 | ||
|
|
7fcc0a1ef0 | ||
|
|
48951073c4 | ||
|
|
d0dfcb72b4 | ||
|
|
4cf7a55808 | ||
|
|
d72fc60108 | ||
|
|
0b92e18047 | ||
|
|
aa8ea16160 | ||
|
|
f5e70f0ab9 | ||
|
|
9e10d5083e | ||
|
|
30c2d75815 | ||
|
|
0e80f3f45a | ||
|
|
6e3cbae0b3 | ||
|
|
a5583ddaff | ||
|
|
5db9e82e54 | ||
|
|
80676cf1fd | ||
|
|
ba4c49dde6 | ||
|
|
35e5e8ff1e | ||
|
|
4cdbc4642d | ||
|
|
23c0fb1efc | ||
|
|
441d3e4b3f | ||
|
|
a0ea5777f0 | ||
|
|
fb006fc6c0 | ||
|
|
8593358965 | ||
|
|
d0311b7fe5 | ||
|
|
4edd38a906 | ||
|
|
56054f4eb7 | ||
|
|
0ff0787797 | ||
|
|
f9c706e186 | ||
|
|
d74d22460c | ||
|
|
d1193c87a8 | ||
|
|
4f311e5827 | ||
|
|
142e6b6ecf | ||
|
|
1b4ef473b9 | ||
|
|
8beb1f9519 | ||
|
|
501fd8efd9 | ||
|
|
45f2ba2572 | ||
|
|
cb2342029e | ||
|
|
ff0088ceec | ||
|
|
afe6d2e736 | ||
|
|
e1a6262302 | ||
|
|
f000a10cd0 | ||
|
|
4aee6ef4c0 | ||
|
|
f4dfacd493 | ||
|
|
fb2d4e56ce | ||
|
|
36b748dfc7 | ||
|
|
c625dc5b96 | ||
|
|
e32620afa1 | ||
|
|
3f298272a8 | ||
|
|
7a473798b7 | ||
|
|
00ce906d97 | ||
|
|
41c9565aa1 | ||
|
|
56303aee5b | ||
|
|
8d8e2ccf5f | ||
|
|
8772cb617c | ||
|
|
65fbfc5cbc | ||
|
|
1b389674c0 | ||
|
|
98529e16ee | ||
|
|
1b112405a8 | ||
|
|
8bbc83e85e | ||
|
|
8349140744 | ||
|
|
4dc13754d8 | ||
|
|
83b7eb8ca6 | ||
|
|
e5ef3288dd | ||
|
|
e7f2907138 | ||
|
|
757c5cfbe0 | ||
|
|
317ddb84b9 | ||
|
|
2b1d0510fa | ||
|
|
40d2f6fee4 | ||
|
|
9fbb84d5c2 | ||
|
|
bdaa9a91f3 | ||
|
|
1a91da35be | ||
|
|
f85be226bb | ||
|
|
8df5a3c5f6 | ||
|
|
9d3eb3f4b8 | ||
|
|
2cd48959d4 | ||
|
|
8fc8874db4 | ||
|
|
ff1cbb524e | ||
|
|
a70df4bd83 |
@@ -33,6 +33,12 @@ var (
|
||||
"Unknown config entries are allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse")
|
||||
)
|
||||
|
||||
// custom api help links [["/api","doc"]] without http.pathPrefix.
|
||||
var customAPIPathList = [][]string{
|
||||
{"/graph/explore", "explore metrics grafana page"},
|
||||
{"/graph/d/prometheus-advanced/advanced-data-exploration", "PMM grafana dashboard"},
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Write flags and help message to stdout, since it is easier to grep or pipe.
|
||||
flag.CommandLine.SetOutput(os.Stdout)
|
||||
@@ -116,6 +122,10 @@ func writeAPIHelp(w io.Writer, pathList [][]string) {
|
||||
p = path.Join(pathPrefix, p)
|
||||
fmt.Fprintf(w, "<a href='%s'>%q</a> - %s<br/>", p, p, doc)
|
||||
}
|
||||
for _, p := range customAPIPathList {
|
||||
p, doc := p[0], p[1]
|
||||
fmt.Fprintf(w, "<a href='%s'>%q</a> - %s<br/>", p, p, doc)
|
||||
}
|
||||
}
|
||||
|
||||
func usage() {
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/promdb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
@@ -184,6 +185,12 @@ var gomaxprocs = cgroup.AvailableCPUs()
|
||||
type packedTimeseries struct {
|
||||
metricName string
|
||||
brs []blockRef
|
||||
pd *promData
|
||||
}
|
||||
|
||||
type promData struct {
|
||||
values []float64
|
||||
timestamps []int64
|
||||
}
|
||||
|
||||
var unpackWorkCh = make(chan *unpackWork, gomaxprocs*128)
|
||||
@@ -319,10 +326,45 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
|
||||
if firstErr != nil {
|
||||
return firstErr
|
||||
}
|
||||
if pts.pd != nil {
|
||||
// Add data from Prometheus to dst.
|
||||
// It usually has smaller timestamps than the data from sbs, so put it first.
|
||||
//
|
||||
// There is no need in check for fetchData, since this is already checked when initializing pts.pd.
|
||||
dst.Values = append(dst.Values, pts.pd.values...)
|
||||
dst.Timestamps = append(dst.Timestamps, pts.pd.timestamps...)
|
||||
}
|
||||
mergeSortBlocks(dst, sbs)
|
||||
if pts.pd != nil {
|
||||
if !sort.IsSorted(dst) {
|
||||
sort.Sort(dst)
|
||||
}
|
||||
pts.pd = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// sort.Interface implementation for Result
|
||||
|
||||
// Len implements sort.Interface
|
||||
func (r *Result) Len() int {
|
||||
return len(r.Timestamps)
|
||||
}
|
||||
|
||||
// Less implements sort.Interface
|
||||
func (r *Result) Less(i, j int) bool {
|
||||
timestamps := r.Timestamps
|
||||
return timestamps[i] < timestamps[j]
|
||||
}
|
||||
|
||||
// Swap implements sort.Interface
|
||||
func (r *Result) Swap(i, j int) {
|
||||
timestamps := r.Timestamps
|
||||
values := r.Values
|
||||
timestamps[i], timestamps[j] = timestamps[j], timestamps[i]
|
||||
values[i], values[j] = values[j], values[i]
|
||||
}
|
||||
|
||||
func getSortBlock() *sortBlock {
|
||||
v := sbPool.Get()
|
||||
if v == nil {
|
||||
@@ -473,6 +515,14 @@ func GetLabelsOnTimeRange(tr storage.TimeRange, deadline searchutils.Deadline) (
|
||||
labels[i] = "__name__"
|
||||
}
|
||||
}
|
||||
|
||||
// Merge labels obtained from Prometheus storage.
|
||||
promLabels, err := promdb.GetLabelNamesOnTimeRange(tr, deadline)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot obtain labels from Prometheus storage: %w", err)
|
||||
}
|
||||
labels = mergeStrings(labels, promLabels)
|
||||
|
||||
// Sort labels like Prometheus does
|
||||
sort.Strings(labels)
|
||||
return labels, nil
|
||||
@@ -538,11 +588,40 @@ func GetLabels(deadline searchutils.Deadline) ([]string, error) {
|
||||
labels[i] = "__name__"
|
||||
}
|
||||
}
|
||||
|
||||
// Merge labels obtained from Prometheus storage.
|
||||
promLabels, err := promdb.GetLabelNames(deadline)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot obtain labels from Prometheus storage: %w", err)
|
||||
}
|
||||
labels = mergeStrings(labels, promLabels)
|
||||
|
||||
// Sort labels like Prometheus does
|
||||
sort.Strings(labels)
|
||||
return labels, nil
|
||||
}
|
||||
|
||||
func mergeStrings(a, b []string) []string {
|
||||
if len(a) == 0 {
|
||||
return b
|
||||
}
|
||||
if len(b) == 0 {
|
||||
return a
|
||||
}
|
||||
m := make(map[string]struct{}, len(a)+len(b))
|
||||
for _, s := range a {
|
||||
m[s] = struct{}{}
|
||||
}
|
||||
for _, s := range b {
|
||||
m[s] = struct{}{}
|
||||
}
|
||||
result := make([]string, 0, len(m))
|
||||
for s := range m {
|
||||
result = append(result, s)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// GetLabelValuesOnTimeRange returns label values for the given labelName on the given tr
|
||||
// until the given deadline.
|
||||
func GetLabelValuesOnTimeRange(labelName string, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) {
|
||||
@@ -557,6 +636,14 @@ func GetLabelValuesOnTimeRange(labelName string, tr storage.TimeRange, deadline
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error during label values search on time range for labelName=%q: %w", labelName, err)
|
||||
}
|
||||
|
||||
// Merge label values obtained from Prometheus storage.
|
||||
promLabelValues, err := promdb.GetLabelValuesOnTimeRange(labelName, tr, deadline)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot obtain label values on time range for %q from Prometheus storage: %w", labelName, err)
|
||||
}
|
||||
labelValues = mergeStrings(labelValues, promLabelValues)
|
||||
|
||||
// Sort labelValues like Prometheus does
|
||||
sort.Strings(labelValues)
|
||||
return labelValues, nil
|
||||
@@ -600,6 +687,14 @@ func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string,
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error during label values search for labelName=%q: %w", labelName, err)
|
||||
}
|
||||
|
||||
// Merge label values obtained from Prometheus storage.
|
||||
promLabelValues, err := promdb.GetLabelValues(labelName, deadline)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot obtain label values for %q from Prometheus storage: %w", labelName, err)
|
||||
}
|
||||
labelValues = mergeStrings(labelValues, promLabelValues)
|
||||
|
||||
// Sort labelValues like Prometheus does
|
||||
sort.Strings(labelValues)
|
||||
return labelValues, nil
|
||||
@@ -904,6 +999,26 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search
|
||||
return nil, fmt.Errorf("cannot finalize temporary file: %w", err)
|
||||
}
|
||||
|
||||
// Fetch data from promdb.
|
||||
pm := make(map[string]*promData)
|
||||
err = promdb.VisitSeries(sq, fetchData, deadline, func(metricName []byte, values []float64, timestamps []int64) {
|
||||
pd := pm[string(metricName)]
|
||||
if pd == nil {
|
||||
if _, ok := m[string(metricName)]; !ok {
|
||||
orderedMetricNames = append(orderedMetricNames, string(metricName))
|
||||
}
|
||||
pd = &promData{}
|
||||
pm[string(metricName)] = pd
|
||||
}
|
||||
pd.values = append(pd.values, values...)
|
||||
pd.timestamps = append(pd.timestamps, timestamps...)
|
||||
})
|
||||
if err != nil {
|
||||
putTmpBlocksFile(tbf)
|
||||
putStorageSearch(sr)
|
||||
return nil, fmt.Errorf("error when searching in Prometheus data: %w", err)
|
||||
}
|
||||
|
||||
var rss Results
|
||||
rss.tr = tr
|
||||
rss.fetchData = fetchData
|
||||
@@ -913,6 +1028,7 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search
|
||||
pts[i] = packedTimeseries{
|
||||
metricName: metricName,
|
||||
brs: m[metricName],
|
||||
pd: pm[metricName],
|
||||
}
|
||||
}
|
||||
rss.packedTimeseries = pts
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/promdb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
@@ -93,6 +94,8 @@ func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
|
||||
sizeBytes := tm.SmallSizeBytes + tm.BigSizeBytes
|
||||
logger.Infof("successfully opened storage %q in %.3f seconds; partsCount: %d; blocksCount: %d; rowsCount: %d; sizeBytes: %d",
|
||||
*DataPath, time.Since(startTime).Seconds(), partsCount, blocksCount, rowsCount, sizeBytes)
|
||||
|
||||
promdb.Init(retentionPeriod.Msecs)
|
||||
}
|
||||
|
||||
// Storage is a storage.
|
||||
@@ -223,6 +226,7 @@ func Stop() {
|
||||
logger.Infof("gracefully closing the storage at %s", *DataPath)
|
||||
startTime := time.Now()
|
||||
WG.WaitAndBlock()
|
||||
promdb.MustClose()
|
||||
Storage.MustClose()
|
||||
logger.Infof("successfully closed the storage in %.3f seconds", time.Since(startTime).Seconds())
|
||||
|
||||
|
||||
276
app/vmstorage/promdb/promdb.go
Normal file
276
app/vmstorage/promdb/promdb.go
Normal file
@@ -0,0 +1,276 @@
|
||||
package promdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/oklog/ulid"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
promstorage "github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
)
|
||||
|
||||
var prometheusDataPath = flag.String("prometheusDataPath", "", "Optional path to readonly historical Prometheus data")
|
||||
|
||||
var prometheusRetentionMsecs int64
|
||||
|
||||
// Init must be called after flag.Parse and before using the package.
|
||||
//
|
||||
// See also MustClose.
|
||||
func Init(retentionMsecs int64) {
|
||||
if promDB != nil {
|
||||
logger.Fatalf("BUG: it looks like MustOpenPromDB is called multiple times without MustClosePromDB call")
|
||||
}
|
||||
prometheusRetentionMsecs = retentionMsecs
|
||||
if *prometheusDataPath == "" {
|
||||
return
|
||||
}
|
||||
l := log.LoggerFunc(func(a ...interface{}) error {
|
||||
logger.Infof("%v", a)
|
||||
return nil
|
||||
})
|
||||
opts := tsdb.DefaultOptions()
|
||||
opts.RetentionDuration = retentionMsecs
|
||||
|
||||
// Set max block duration to 10% of retention period or 31 days
|
||||
// according to https://prometheus.io/docs/prometheus/latest/storage/#compaction
|
||||
maxBlockDuration := int64((31 * 24 * time.Hour) / time.Millisecond)
|
||||
if maxBlockDuration > retentionMsecs/10 {
|
||||
maxBlockDuration = retentionMsecs / 10
|
||||
}
|
||||
if maxBlockDuration < opts.MinBlockDuration {
|
||||
maxBlockDuration = opts.MinBlockDuration
|
||||
}
|
||||
opts.MaxBlockDuration = maxBlockDuration
|
||||
|
||||
// Custom delete function is needed, because Prometheus by default doesn't delete
|
||||
// blocks outside the retention if no new blocks are created with samples with the current timestamps.
|
||||
// See https://github.com/prometheus/prometheus/blob/997bb7134fcfd7279f250e183e78681e48a56aff/tsdb/db.go#L1116
|
||||
opts.BlocksToDelete = func(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
|
||||
m := make(map[ulid.ULID]struct{})
|
||||
minRetentionTime := time.Now().Unix()*1000 - retentionMsecs
|
||||
for _, block := range blocks {
|
||||
meta := block.Meta()
|
||||
// delete block marked for deletion by compaction code.
|
||||
if meta.Compaction.Deletable {
|
||||
m[meta.ULID] = struct{}{}
|
||||
continue
|
||||
}
|
||||
if block.MaxTime() < minRetentionTime {
|
||||
m[meta.ULID] = struct{}{}
|
||||
}
|
||||
}
|
||||
return m
|
||||
}
|
||||
pdb, err := tsdb.Open(*prometheusDataPath, l, nil, opts)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot open Prometheus data at -prometheusDataPath=%q: %s", *prometheusDataPath, err)
|
||||
}
|
||||
promDB = pdb
|
||||
logger.Infof("successfully opened historical Prometheus data at -prometheusDataPath=%q with retentionMsecs=%d", *prometheusDataPath, retentionMsecs)
|
||||
}
|
||||
|
||||
// MustClose must be called on graceful shutdown.
|
||||
//
|
||||
// Package functionality cannot be used after this call.
|
||||
func MustClose() {
|
||||
if *prometheusDataPath == "" {
|
||||
return
|
||||
}
|
||||
if promDB == nil {
|
||||
logger.Panicf("BUG: it looks like MustClosePromDB is called without MustOpenPromDB call")
|
||||
}
|
||||
if err := promDB.Close(); err != nil {
|
||||
logger.Panicf("FATAL: cannot close promDB: %s", err)
|
||||
}
|
||||
promDB = nil
|
||||
logger.Infof("successfully closed historical Prometheus data at -prometheusDataPath=%q", *prometheusDataPath)
|
||||
}
|
||||
|
||||
var promDB *tsdb.DB
|
||||
|
||||
// GetLabelNamesOnTimeRange returns label names.
|
||||
func GetLabelNamesOnTimeRange(tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) {
|
||||
d := time.Unix(int64(deadline.Deadline()), 0)
|
||||
ctx, cancel := context.WithDeadline(context.Background(), d)
|
||||
defer cancel()
|
||||
q, err := promDB.Querier(ctx, tr.MinTimestamp, tr.MaxTimestamp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer mustCloseQuerier(q)
|
||||
|
||||
names, _, err := q.LabelNames()
|
||||
// Make full copy of names, since they cannot be used after q is closed.
|
||||
names = copyStringsWithMemory(names)
|
||||
return names, err
|
||||
}
|
||||
|
||||
// GetLabelNames returns label names.
|
||||
func GetLabelNames(deadline searchutils.Deadline) ([]string, error) {
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: 0,
|
||||
MaxTimestamp: time.Now().UnixNano() / 1e6,
|
||||
}
|
||||
return GetLabelNamesOnTimeRange(tr, deadline)
|
||||
}
|
||||
|
||||
// GetLabelValuesOnTimeRange returns values for the given labelName on the given tr.
|
||||
func GetLabelValuesOnTimeRange(labelName string, tr storage.TimeRange, deadline searchutils.Deadline) ([]string, error) {
|
||||
d := time.Unix(int64(deadline.Deadline()), 0)
|
||||
ctx, cancel := context.WithDeadline(context.Background(), d)
|
||||
defer cancel()
|
||||
q, err := promDB.Querier(ctx, tr.MinTimestamp, tr.MaxTimestamp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer mustCloseQuerier(q)
|
||||
|
||||
values, _, err := q.LabelValues(labelName)
|
||||
// Make full copy of values, since they cannot be used after q is closed.
|
||||
values = copyStringsWithMemory(values)
|
||||
return values, err
|
||||
}
|
||||
|
||||
// GetLabelValues returns values for the given labelName.
|
||||
func GetLabelValues(labelName string, deadline searchutils.Deadline) ([]string, error) {
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: 0,
|
||||
MaxTimestamp: time.Now().UnixNano() / 1e6,
|
||||
}
|
||||
return GetLabelValuesOnTimeRange(labelName, tr, deadline)
|
||||
}
|
||||
|
||||
func copyStringsWithMemory(a []string) []string {
|
||||
result := make([]string, len(a))
|
||||
for i, s := range a {
|
||||
result[i] = string(append([]byte{}, s...))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// SeriesVisitor is called by VisitSeries for each matching time series.
|
||||
//
|
||||
// The caller shouldn't hold references to metricName, values and timestamps after returning.
|
||||
type SeriesVisitor func(metricName []byte, values []float64, timestamps []int64)
|
||||
|
||||
// VisitSeries calls f for each series found in the pdb.
|
||||
//
|
||||
// If fetchData is false, then empty values and timestamps are passed to f.
|
||||
func VisitSeries(sq *storage.SearchQuery, fetchData bool, deadline searchutils.Deadline, f SeriesVisitor) error {
|
||||
if *prometheusDataPath == "" {
|
||||
return nil
|
||||
}
|
||||
d := time.Unix(int64(deadline.Deadline()), 0)
|
||||
ctx, cancel := context.WithDeadline(context.Background(), d)
|
||||
defer cancel()
|
||||
minTime, maxTime := getSearchTimeRange(sq)
|
||||
q, err := promDB.Querier(ctx, minTime, maxTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer mustCloseQuerier(q)
|
||||
var seriesSet []promstorage.SeriesSet
|
||||
for _, tf := range sq.TagFilterss {
|
||||
ms, err := convertTagFiltersToMatchers(tf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot convert tag filters to matchers: %w", err)
|
||||
}
|
||||
s := q.Select(false, nil, ms...)
|
||||
seriesSet = append(seriesSet, s)
|
||||
}
|
||||
ss := promstorage.NewMergeSeriesSet(seriesSet, promstorage.ChainedSeriesMerge)
|
||||
var (
|
||||
mn storage.MetricName
|
||||
metricName []byte
|
||||
values []float64
|
||||
timestamps []int64
|
||||
)
|
||||
for ss.Next() {
|
||||
s := ss.At()
|
||||
convertPromLabelsToMetricName(&mn, s.Labels())
|
||||
metricName = mn.SortAndMarshal(metricName[:0])
|
||||
values = values[:0]
|
||||
timestamps = timestamps[:0]
|
||||
if fetchData {
|
||||
it := s.Iterator()
|
||||
for it.Next() {
|
||||
ts, v := it.At()
|
||||
values = append(values, v)
|
||||
timestamps = append(timestamps, ts)
|
||||
}
|
||||
if err := it.Err(); err != nil {
|
||||
return fmt.Errorf("error when iterating Prometheus series: %w", err)
|
||||
}
|
||||
}
|
||||
f(metricName, values, timestamps)
|
||||
}
|
||||
return ss.Err()
|
||||
}
|
||||
|
||||
func getSearchTimeRange(sq *storage.SearchQuery) (int64, int64) {
|
||||
maxTime := sq.MaxTimestamp
|
||||
minTime := sq.MinTimestamp
|
||||
minRetentionTime := time.Now().Unix()*1000 - prometheusRetentionMsecs
|
||||
if maxTime < minRetentionTime {
|
||||
maxTime = minRetentionTime
|
||||
}
|
||||
if minTime < minRetentionTime {
|
||||
minTime = minRetentionTime
|
||||
}
|
||||
return minTime, maxTime
|
||||
}
|
||||
|
||||
func convertPromLabelsToMetricName(dst *storage.MetricName, labels []labels.Label) {
|
||||
dst.Reset()
|
||||
for _, label := range labels {
|
||||
if label.Name == "__name__" {
|
||||
dst.MetricGroup = append(dst.MetricGroup[:0], label.Value...)
|
||||
} else {
|
||||
dst.AddTag(label.Name, label.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func convertTagFiltersToMatchers(tfs []storage.TagFilter) ([]*labels.Matcher, error) {
|
||||
ms := make([]*labels.Matcher, 0, len(tfs))
|
||||
for _, tf := range tfs {
|
||||
var mt labels.MatchType
|
||||
if tf.IsNegative {
|
||||
if tf.IsRegexp {
|
||||
mt = labels.MatchNotRegexp
|
||||
} else {
|
||||
mt = labels.MatchNotEqual
|
||||
}
|
||||
} else {
|
||||
if tf.IsRegexp {
|
||||
mt = labels.MatchRegexp
|
||||
} else {
|
||||
mt = labels.MatchEqual
|
||||
}
|
||||
}
|
||||
key := string(tf.Key)
|
||||
if key == "" {
|
||||
key = "__name__"
|
||||
}
|
||||
value := string(tf.Value)
|
||||
m, err := labels.NewMatcher(mt, key, value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ms = append(ms, m)
|
||||
}
|
||||
return ms, nil
|
||||
}
|
||||
|
||||
func mustCloseQuerier(q promstorage.Querier) {
|
||||
if err := q.Close(); err != nil {
|
||||
logger.Panicf("FATAL: cannot close querier: %s", err)
|
||||
}
|
||||
}
|
||||
4
go.mod
4
go.mod
@@ -7,7 +7,7 @@ require (
|
||||
|
||||
// Do not use the original github.com/valyala/fasthttp because of issues
|
||||
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
|
||||
github.com/VictoriaMetrics/fasthttp v1.0.12
|
||||
github.com/VictoriaMetrics/fasthttp v1.0.13
|
||||
github.com/VictoriaMetrics/metrics v1.12.3
|
||||
github.com/VictoriaMetrics/metricsql v0.10.0
|
||||
github.com/aws/aws-sdk-go v1.37.1
|
||||
@@ -15,10 +15,12 @@ require (
|
||||
github.com/cheggaaa/pb/v3 v3.0.5
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
|
||||
github.com/fatih/color v1.10.0 // indirect
|
||||
github.com/go-kit/kit v0.10.0
|
||||
github.com/golang/snappy v0.0.2
|
||||
github.com/influxdata/influxdb v1.8.4
|
||||
github.com/klauspost/compress v1.11.7
|
||||
github.com/mattn/go-runewidth v0.0.10 // indirect
|
||||
github.com/oklog/ulid v1.3.1
|
||||
github.com/prometheus/client_golang v1.9.0 // indirect
|
||||
github.com/prometheus/procfs v0.3.0 // indirect
|
||||
github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9
|
||||
|
||||
4
go.sum
4
go.sum
@@ -82,8 +82,8 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX
|
||||
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
|
||||
github.com/VictoriaMetrics/fastcache v1.5.7 h1:4y6y0G8PRzszQUYIQHHssv/jgPHAb5qQuuDNdCbyAgw=
|
||||
github.com/VictoriaMetrics/fastcache v1.5.7/go.mod h1:ptDBkNMQI4RtmVo8VS/XwRY6RoTu1dAWCbrk+6WsEM8=
|
||||
github.com/VictoriaMetrics/fasthttp v1.0.12 h1:Ag0E119yrH4BTxVyjKD9TeiSImtG9bUcg/stItLJhSE=
|
||||
github.com/VictoriaMetrics/fasthttp v1.0.12/go.mod h1:3SeUL4zwB/p/a9aEeRc6gdlbrtNHXBJR6N376EgiSHU=
|
||||
github.com/VictoriaMetrics/fasthttp v1.0.13 h1:5JNS4vSPdN4QyfcpAg3Y1Wznf0uXEuSOFpeIlFw3MgM=
|
||||
github.com/VictoriaMetrics/fasthttp v1.0.13/go.mod h1:3SeUL4zwB/p/a9aEeRc6gdlbrtNHXBJR6N376EgiSHU=
|
||||
github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
|
||||
github.com/VictoriaMetrics/metrics v1.12.3 h1:Fe6JHC6MSEKa+BtLhPN8WIvS+HKPzMc2evEpNeCGy7I=
|
||||
github.com/VictoriaMetrics/metrics v1.12.3/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
|
||||
|
||||
@@ -356,6 +356,12 @@ func (mn *MetricName) String() string {
|
||||
return fmt.Sprintf("%s{%s}", mnCopy.MetricGroup, tagsStr)
|
||||
}
|
||||
|
||||
// SortAndMarshal sorts mn tags and then marshals them to dst.
|
||||
func (mn *MetricName) SortAndMarshal(dst []byte) []byte {
|
||||
mn.sortTags()
|
||||
return mn.Marshal(dst)
|
||||
}
|
||||
|
||||
// Marshal appends marshaled mn to dst and returns the result.
|
||||
//
|
||||
// mn.sortTags must be called before calling this function
|
||||
|
||||
14
vendor/github.com/VictoriaMetrics/fasthttp/client.go
generated
vendored
14
vendor/github.com/VictoriaMetrics/fasthttp/client.go
generated
vendored
@@ -1023,7 +1023,7 @@ func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error)
|
||||
panic("BUG: resp cannot be nil")
|
||||
}
|
||||
|
||||
atomic.StoreUint32(&c.lastUseTime, uint32(CoarseTimeNow().Unix()-startTimeUnix))
|
||||
atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix))
|
||||
|
||||
// Free up resources occupied by response before sending the request,
|
||||
// so the GC may reclaim these resources (e.g. response body).
|
||||
@@ -1039,7 +1039,7 @@ func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error)
|
||||
// Optimization: update write deadline only if more than 25%
|
||||
// of the last write deadline exceeded.
|
||||
// See https://github.com/golang/go/issues/15133 for details.
|
||||
currentTime := CoarseTimeNow()
|
||||
currentTime := time.Now()
|
||||
if currentTime.Sub(cc.lastWriteDeadlineTime) > (c.WriteTimeout >> 2) {
|
||||
if err = conn.SetWriteDeadline(currentTime.Add(c.WriteTimeout)); err != nil {
|
||||
c.closeConn(cc)
|
||||
@@ -1083,7 +1083,7 @@ func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error)
|
||||
// Optimization: update read deadline only if more than 25%
|
||||
// of the last read deadline exceeded.
|
||||
// See https://github.com/golang/go/issues/15133 for details.
|
||||
currentTime := CoarseTimeNow()
|
||||
currentTime := time.Now()
|
||||
if currentTime.Sub(cc.lastReadDeadlineTime) > (c.ReadTimeout >> 2) {
|
||||
if err = conn.SetReadDeadline(currentTime.Add(c.ReadTimeout)); err != nil {
|
||||
c.closeConn(cc)
|
||||
@@ -1255,7 +1255,7 @@ func acquireClientConn(conn net.Conn) *clientConn {
|
||||
}
|
||||
cc := v.(*clientConn)
|
||||
cc.c = conn
|
||||
cc.createdTime = CoarseTimeNow()
|
||||
cc.createdTime = time.Now()
|
||||
return cc
|
||||
}
|
||||
|
||||
@@ -1267,7 +1267,7 @@ func releaseClientConn(cc *clientConn) {
|
||||
var clientConnPool sync.Pool
|
||||
|
||||
func (c *HostClient) releaseConn(cc *clientConn) {
|
||||
cc.lastUseTime = CoarseTimeNow()
|
||||
cc.lastUseTime = time.Now()
|
||||
c.connsLock.Lock()
|
||||
c.conns = append(c.conns, cc)
|
||||
c.connsLock.Unlock()
|
||||
@@ -1970,7 +1970,7 @@ func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error
|
||||
// Optimization: update write deadline only if more than 25%
|
||||
// of the last write deadline exceeded.
|
||||
// See https://github.com/golang/go/issues/15133 for details.
|
||||
currentTime := CoarseTimeNow()
|
||||
currentTime := time.Now()
|
||||
if currentTime.Sub(lastWriteDeadlineTime) > (writeTimeout >> 2) {
|
||||
if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
|
||||
w.err = err
|
||||
@@ -2051,7 +2051,7 @@ func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error
|
||||
// Optimization: update read deadline only if more than 25%
|
||||
// of the last read deadline exceeded.
|
||||
// See https://github.com/golang/go/issues/15133 for details.
|
||||
currentTime := CoarseTimeNow()
|
||||
currentTime := time.Now()
|
||||
if currentTime.Sub(lastReadDeadlineTime) > (readTimeout >> 2) {
|
||||
if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil {
|
||||
w.err = err
|
||||
|
||||
28
vendor/github.com/VictoriaMetrics/fasthttp/coarseTime.go
generated
vendored
28
vendor/github.com/VictoriaMetrics/fasthttp/coarseTime.go
generated
vendored
@@ -1,28 +0,0 @@
|
||||
package fasthttp
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// CoarseTimeNow returns the current time truncated to the nearest second.
|
||||
//
|
||||
// This is a faster alternative to time.Now().
|
||||
func CoarseTimeNow() time.Time {
|
||||
tp := coarseTime.Load().(*time.Time)
|
||||
return *tp
|
||||
}
|
||||
|
||||
func init() {
|
||||
t := time.Now().Truncate(time.Second)
|
||||
coarseTime.Store(&t)
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(time.Second)
|
||||
t := time.Now().Truncate(time.Second)
|
||||
coarseTime.Store(&t)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
var coarseTime atomic.Value
|
||||
14
vendor/github.com/VictoriaMetrics/fasthttp/server.go
generated
vendored
14
vendor/github.com/VictoriaMetrics/fasthttp/server.go
generated
vendored
@@ -1281,7 +1281,7 @@ func (s *Server) Serve(ln net.Listener) error {
|
||||
if time.Since(lastOverflowErrorTime) > time.Minute {
|
||||
s.logger().Printf("The incoming connection cannot be served, because %d concurrent connections are served. "+
|
||||
"Try increasing Server.Concurrency", maxWorkersCount)
|
||||
lastOverflowErrorTime = CoarseTimeNow()
|
||||
lastOverflowErrorTime = time.Now()
|
||||
}
|
||||
|
||||
// The current server reached concurrency limit,
|
||||
@@ -1323,7 +1323,7 @@ func acceptConn(s *Server, ln net.Listener, lastPerIPErrorTime *time.Time) (net.
|
||||
if time.Since(*lastPerIPErrorTime) > time.Minute {
|
||||
s.logger().Printf("The number of connections from %s exceeds MaxConnsPerIP=%d",
|
||||
getConnIP4(c), s.MaxConnsPerIP)
|
||||
*lastPerIPErrorTime = CoarseTimeNow()
|
||||
*lastPerIPErrorTime = time.Now()
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -1438,7 +1438,7 @@ func (s *Server) serveConn(c net.Conn) error {
|
||||
serverName := s.getServerName()
|
||||
connRequestNum := uint64(0)
|
||||
connID := nextConnID()
|
||||
currentTime := CoarseTimeNow()
|
||||
currentTime := time.Now()
|
||||
connTime := currentTime
|
||||
maxRequestBodySize := s.MaxRequestBodySize
|
||||
if maxRequestBodySize <= 0 {
|
||||
@@ -1491,7 +1491,7 @@ func (s *Server) serveConn(c net.Conn) error {
|
||||
}
|
||||
}
|
||||
|
||||
currentTime = CoarseTimeNow()
|
||||
currentTime = time.Now()
|
||||
ctx.lastReadDuration = currentTime.Sub(ctx.time)
|
||||
|
||||
if err != nil {
|
||||
@@ -1632,7 +1632,7 @@ func (s *Server) serveConn(c net.Conn) error {
|
||||
break
|
||||
}
|
||||
|
||||
currentTime = CoarseTimeNow()
|
||||
currentTime = time.Now()
|
||||
}
|
||||
|
||||
if br != nil {
|
||||
@@ -1688,7 +1688,7 @@ func (s *Server) updateWriteDeadline(c net.Conn, ctx *RequestCtx, lastDeadlineTi
|
||||
// Optimization: update write deadline only if more than 25%
|
||||
// of the last write deadline exceeded.
|
||||
// See https://github.com/golang/go/issues/15133 for details.
|
||||
currentTime := CoarseTimeNow()
|
||||
currentTime := time.Now()
|
||||
if currentTime.Sub(lastDeadlineTime) > (writeTimeout >> 2) {
|
||||
if err := c.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
|
||||
panic(fmt.Sprintf("BUG: error in SetWriteDeadline(%s): %s", writeTimeout, err))
|
||||
@@ -1871,7 +1871,7 @@ func (ctx *RequestCtx) Init2(conn net.Conn, logger Logger, reduceMemoryUsage boo
|
||||
ctx.connID = nextConnID()
|
||||
ctx.s = fakeServer
|
||||
ctx.connRequestNum = 0
|
||||
ctx.connTime = CoarseTimeNow()
|
||||
ctx.connTime = time.Now()
|
||||
ctx.time = ctx.connTime
|
||||
|
||||
keepBodyBuffer := !reduceMemoryUsage
|
||||
|
||||
2
vendor/github.com/VictoriaMetrics/fasthttp/workerpool.go
generated
vendored
2
vendor/github.com/VictoriaMetrics/fasthttp/workerpool.go
generated
vendored
@@ -187,7 +187,7 @@ func (wp *workerPool) getCh() *workerChan {
|
||||
}
|
||||
|
||||
func (wp *workerPool) release(ch *workerChan) bool {
|
||||
ch.lastUseTime = CoarseTimeNow()
|
||||
ch.lastUseTime = time.Now()
|
||||
wp.lock.Lock()
|
||||
if wp.mustStop {
|
||||
wp.lock.Unlock()
|
||||
|
||||
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@@ -10,7 +10,7 @@ cloud.google.com/go/internal/version
|
||||
cloud.google.com/go/storage
|
||||
# github.com/VictoriaMetrics/fastcache v1.5.7
|
||||
github.com/VictoriaMetrics/fastcache
|
||||
# github.com/VictoriaMetrics/fasthttp v1.0.12
|
||||
# github.com/VictoriaMetrics/fasthttp v1.0.13
|
||||
github.com/VictoriaMetrics/fasthttp
|
||||
github.com/VictoriaMetrics/fasthttp/fasthttputil
|
||||
github.com/VictoriaMetrics/fasthttp/stackless
|
||||
|
||||
Reference in New Issue
Block a user