Compare commits

...

11 Commits

Author SHA1 Message Date
Aliaksandr Valialkin
5bbfdff9fe Makefile: add make publish and make package shortcuts for building and publishing docker images 2019-05-24 13:19:24 +03:00
Aliaksandr Valialkin
6b0ae332f8 lib/encoding: add vm_zstd_block_{compress|decompress}_calls_total for determining the number CompressZSTD / DecompressZSTD calls 2019-05-24 13:01:02 +03:00
Aliaksandr Valialkin
2eb3602d61 app/victoria-metrics: remove -p XXXX:XXXX from docker run options, since it is unnesessary if --net=host is set 2019-05-24 12:54:53 +03:00
Aliaksandr Valialkin
6fb9dd09f5 lib/encoding: add vm_zstd_block_{original|compressed}_bytes_total metrics for rough estimation of block compression ratio 2019-05-24 12:34:32 +03:00
Aliaksandr Valialkin
19b6643e5c lib/encoding: substitute CompressZSTD with CompressZSTDLevel 2019-05-24 12:32:55 +03:00
Aliaksandr Valialkin
08b889ef09 lib/httpserver: add -http.disableResponseCompression flag, which may help saving CPU resources at the cost of higher network bandwidth usage 2019-05-24 12:18:40 +03:00
Aliaksandr Valialkin
d15d0127fe app/vmselect/promql: add alias(q, name) function that sets the given name to all the time series in q 2019-05-24 02:41:45 +03:00
Aliaksandr Valialkin
674888fdc9 lib/decimal: add a comment explaining weird code in maxUpExponent. Fixes #29 2019-05-23 17:18:35 +03:00
Aliaksandr Valialkin
fb140eda33 app/vmselect/promql: add label_transform(q, label, regexp, replacement) function for replacing all the occurences of regexp with replacement in the given label for q 2019-05-23 16:26:19 +03:00
Aliaksandr Valialkin
398ec4383e README.md: typo fix 2019-05-23 02:09:51 +03:00
Aliaksandr Valialkin
eff0debe14 README.md: mention that VictoriaMetrics is high-perf cost-effective TSDB 2019-05-23 00:36:45 +03:00
11 changed files with 121 additions and 21 deletions

View File

@@ -19,6 +19,10 @@ include deployment/*/Makefile
clean:
rm -rf bin/*
publish: publish-victoria-metrics
package: package-victoria-metrics
release: victoria-metrics-prod
cd bin && tar czf victoria-metrics-$(PKG_TAG).tar.gz victoria-metrics-prod

View File

@@ -4,7 +4,7 @@
[![Latest Release](https://img.shields.io/github/release/VictoriaMetrics/VictoriaMetrics.svg?style=flat-square)](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/latest)
VictoriaMetrics is a long-term remote storage for Prometheus.
VictoriaMetrics is high-performance cost-effective time series database. It can be used as a long-term remote storage for Prometheus.
It is available in [binary releases](https://github.com/VictoriaMetrics/VictoriaMetrics/releases),
[docker images](https://hub.docker.com/r/valyala/victoria-metrics/) and
in [source code](https://github.com/VictoriaMetrics/VictoriaMetrics).
@@ -94,7 +94,7 @@ to your needs.
Run `make package-victoria-metrics`. It will build `valyala/victoria-metrics:<PKG_TAG>` docker image locally.
`<PKG_TAG>` is auto-generated image tag, which depends on source code in the repository.
The `<PKG_TAG>` may be manually set via `PKG_TAG=foobar make package`.
The `<PKG_TAG>` may be manually set via `PKG_TAG=foobar make package-victoria-metrics`.

View File

@@ -12,7 +12,7 @@ publish-victoria-metrics:
run-victoria-metrics:
mkdir -p victoria-metrics-data
DOCKER_OPTS='-v $(shell pwd)/victoria-metrics-data:/victoria-metrics-data -p 8428:8428 -p 2003:2003 -p 2003:2003/udp' \
DOCKER_OPTS='-v $(shell pwd)/victoria-metrics-data:/victoria-metrics-data' \
APP_NAME=victoria-metrics \
ARGS='-graphiteListenAddr=:2003 -opentsdbListenAddr=:4242 -retentionPeriod=12 -search.maxUniqueTimeseries=1000000 -search.maxQueryDuration=10m' \
$(MAKE) run-via-docker

View File

@@ -789,6 +789,18 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{}
f(q, resultExpected)
})
t.Run(`alias()`, func(t *testing.T) {
t.Parallel()
q := `alias(time(), "foobar")`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
r.MetricName.MetricGroup = []byte("foobar")
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`label_set(tag)`, func(t *testing.T) {
t.Parallel()
q := `label_set(time(), "tagname", "tagvalue")`
@@ -1266,6 +1278,34 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`label_transform(mismatch)`, func(t *testing.T) {
t.Parallel()
q := `label_transform(time(), "__name__", "foobar", "xx")`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`label_transform(match)`, func(t *testing.T) {
t.Parallel()
q := `label_transform(
label_set(time(), "foo", "a.bar.baz"),
"foo", "\\.", "-")`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
r.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("a-bar-baz"),
}}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`label_replace(mismatch)`, func(t *testing.T) {
t.Parallel()
q := `label_replace(time(), "__name__", "x${1}y", "foo", ".+")`
@@ -3461,6 +3501,7 @@ func TestExecError(t *testing.T) {
f(`hour(1,2)`)
f(`label_join()`)
f(`label_replace(1)`)
f(`label_transform(1)`)
f(`label_set()`)
f(`label_set(1, "foo")`)
f(`label_del()`)
@@ -3506,6 +3547,9 @@ func TestExecError(t *testing.T) {
f(`keep_last_value()`)
f(`distinct_over_time()`)
f(`distinct()`)
f(`alias()`)
f(`alias(1)`)
f(`alias(1, "foo", "bar")`)
// Invalid argument type
f(`median_over_time({}, 2)`)
@@ -3535,6 +3579,11 @@ func TestExecError(t *testing.T) {
f(`label_replace(1, "foo", "bar", 4, 5)`)
f(`label_replace(1, "foo", "bar", "baz", 5)`)
f(`label_replace(1, "foo", "bar", "baz", "invalid(regexp")`)
f(`label_transform(1, 2, 3, 4)`)
f(`label_transform(1, "foo", 3, 4)`)
f(`label_transform(1, "foo", "bar", 4)`)
f(`label_transform(1, "foo", "invalid(regexp", "baz`)
f(`alias(1, 2)`)
// Duplicate timeseries
f(`(label_set(1, "foo", "bar") or label_set(2, "foo", "baz"))

View File

@@ -25,6 +25,7 @@ func getDefaultWithArgExprs() []*withArgExpr {
`median_over_time(m) = quantile_over_time(0.5, m)`,
`range_median(q) = range_quantile(0.5, q)`,
`alias(q, name) = label_set(q, "__name__", name)`,
})
})
return defaultWithArgExprs

View File

@@ -1,7 +1,6 @@
package promql
import (
"fmt"
"regexp"
"sync"
"sync/atomic"
@@ -10,12 +9,16 @@ import (
)
func compileRegexpAnchored(re string) (*regexp.Regexp, error) {
reAnchored := "^(?:" + re + ")$"
return compileRegexp(reAnchored)
}
func compileRegexp(re string) (*regexp.Regexp, error) {
rcv := regexpCacheV.Get(re)
if rcv != nil {
return rcv.r, rcv.err
}
regexAnchored := fmt.Sprintf("^(?:%s)$", re)
r, err := regexp.Compile(regexAnchored)
r, err := regexp.Compile(re)
rcv = &regexpCacheValue{
r: r,
err: err,

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"math"
"math/rand"
"regexp"
"sort"
"strconv"
"strings"
@@ -61,6 +62,7 @@ var transformFuncs = map[string]transformFunc{
"label_keep": transformLabelKeep,
"label_copy": transformLabelCopy,
"label_move": transformLabelMove,
"label_transform": transformLabelTransform,
"union": transformUnion,
"": transformUnion, // empty func is a synonim to union
"keep_last_value": transformKeepLastValue,
@@ -816,6 +818,31 @@ func transformLabelJoin(tfa *transformFuncArg) ([]*timeseries, error) {
return rvs, nil
}
func transformLabelTransform(tfa *transformFuncArg) ([]*timeseries, error) {
args := tfa.args
if err := expectTransformArgsNum(args, 4); err != nil {
return nil, err
}
label, err := getString(args[1], 1)
if err != nil {
return nil, err
}
regex, err := getString(args[2], 2)
if err != nil {
return nil, err
}
replacement, err := getString(args[3], 3)
if err != nil {
return nil, err
}
r, err := compileRegexp(regex)
if err != nil {
return nil, fmt.Errorf(`cannot compile regex %q: %s`, regex, err)
}
return labelReplace(args[0], label, r, label, replacement)
}
func transformLabelReplace(tfa *transformFuncArg) ([]*timeseries, error) {
args := tfa.args
if err := expectTransformArgsNum(args, 5); err != nil {
@@ -842,11 +869,12 @@ func transformLabelReplace(tfa *transformFuncArg) ([]*timeseries, error) {
if err != nil {
return nil, fmt.Errorf(`cannot compile regex %q: %s`, regex, err)
}
return labelReplace(args[0], srcLabel, r, dstLabel, replacement)
}
func labelReplace(tss []*timeseries, srcLabel string, r *regexp.Regexp, dstLabel, replacement string) ([]*timeseries, error) {
replacementBytes := []byte(replacement)
rvs := args[0]
for _, ts := range rvs {
for _, ts := range tss {
mn := &ts.MetricName
dstValue := getDstValue(mn, dstLabel)
srcValue := mn.GetTagValue(srcLabel)
@@ -856,7 +884,7 @@ func transformLabelReplace(tfa *transformFuncArg) ([]*timeseries, error) {
mn.RemoveTag(dstLabel)
}
}
return rvs, nil
return tss, nil
}
func transformLn(v float64) float64 {

View File

@@ -186,6 +186,7 @@ func maxUpExponent(v int64) int16 {
v = -v
}
if v < 0 {
// Handle corner case for v=-1<<63
return 0
}

View File

@@ -1,27 +1,34 @@
package encoding
import (
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/gozstd"
)
// CompressZSTD compresses src, appends the result to dst and returns
// the appended dst.
//
// src must be non-empty.
func CompressZSTD(dst, src []byte) []byte {
return gozstd.CompressLevel(dst, src, 5)
}
// CompressZSTDLevel appends compressed src to dst and returns
// the appended dst.
//
// The given compressLevel is used for the compression.
func CompressZSTDLevel(dst, src []byte, compressLevel int) []byte {
return gozstd.CompressLevel(dst, src, compressLevel)
compressCalls.Inc()
originalBytes.Add(len(src))
dstLen := len(dst)
dst = gozstd.CompressLevel(dst, src, compressLevel)
compressedBytes.Add(len(dst) - dstLen)
return dst
}
// DecompressZSTD decompresses src, appends the result to dst and returns
// the appended dst.
func DecompressZSTD(dst, src []byte) ([]byte, error) {
decompressCalls.Inc()
return gozstd.Decompress(dst, src)
}
var (
compressCalls = metrics.NewCounter(`vm_zstd_block_compress_calls_total`)
decompressCalls = metrics.NewCounter(`vm_zstd_block_decompress_calls_total`)
originalBytes = metrics.NewCounter(`vm_zstd_block_original_bytes_total`)
compressedBytes = metrics.NewCounter(`vm_zstd_block_compressed_bytes_total`)
)

View File

@@ -17,7 +17,7 @@ func TestCompressDecompressZSTD(t *testing.T) {
}
func testCompressDecompressZSTD(t *testing.T, b []byte) {
bc := CompressZSTD(nil, b)
bc := CompressZSTDLevel(nil, b, 5)
bNew, err := DecompressZSTD(nil, bc)
if err != nil {
t.Fatalf("unexpected error when decompressing b=%x from bc=%x: %s", b, bc, err)
@@ -27,7 +27,7 @@ func testCompressDecompressZSTD(t *testing.T, b []byte) {
}
prefix := []byte{1, 2, 33}
bcNew := CompressZSTD(prefix, b)
bcNew := CompressZSTDLevel(prefix, b, 5)
if string(bcNew[:len(prefix)]) != string(prefix) {
t.Fatalf("invalid prefix for b=%x; got\n%x; expecting\n%x", b, bcNew[:len(prefix)], prefix)
}

View File

@@ -31,6 +31,8 @@ var (
httpAuthPassword = flag.String("httpAuth.password", "", "Password for HTTP Basic Auth. The authentication is disabled -httpAuth.username is empty")
metricsAuthKey = flag.String("metricsAuthKey", "", "Auth key for /metrics. It overrides httpAuth settings")
pprofAuthKey = flag.String("pprofAuthKey", "", "Auth key for /debug/pprof. It overrides httpAuth settings")
disableResponseCompression = flag.Bool("http.disableResponseCompression", false, "Disable compression of HTTP responses for saving CPU resources. By default compression is enabled to save network bandwidth")
)
var (
@@ -51,6 +53,8 @@ type RequestHandler func(w http.ResponseWriter, r *http.Request) bool
// By default all the responses are transparently compressed, since Google
// charges a lot for the egress traffic. The compression may be disabled
// by calling DisableResponseCompression before writing the first byte to w.
//
// The compression is also disabled if -http.disableResponseCompression flag is set.
func Serve(addr string, rh RequestHandler) {
scheme := "http"
if *tlsEnable {
@@ -224,6 +228,9 @@ func checkBasicAuth(w http.ResponseWriter, r *http.Request) bool {
}
func maybeGzipResponseWriter(w http.ResponseWriter, r *http.Request) http.ResponseWriter {
if *disableResponseCompression {
return w
}
ae := r.Header.Get("Accept-Encoding")
if ae == "" {
return w