Compare commits

...

1 Commits

Author SHA1 Message Date
Hui Wang
c0889547f2 metricsql: add fill modifiers support 2026-07-03 15:31:22 +08:00
4 changed files with 338 additions and 19 deletions

View File

@@ -172,7 +172,13 @@ func newBinaryOpFunc(bf func(left, right float64, isBool bool) float64) binaryOp
left = removeEmptySeries(left)
right = removeEmptySeries(right)
}
if len(left) == 0 || len(right) == 0 {
if len(left) == 0 && len(right) == 0 {
return nil, nil
}
if len(left) == 0 && bfa.be.FillLeft == nil {
return nil, nil
}
if len(right) == 0 && bfa.be.FillRight == nil {
return nil, nil
}
left, right, dst, err := adjustBinaryOpTags(bfa.be, left, right)
@@ -226,7 +232,7 @@ func adjustBinaryOpTags(be *metricsql.BinaryOpExpr, left, right []*timeseries) (
}
}
// Slow path: `vector op vector` or `a op {on|ignoring} {group_left|group_right} b`
// Slow path: `vector op vector` or `a op {on|ignoring} {group_left|group_right} {fill|fill_left|fill_right} b`
var rvsLeft, rvsRight []*timeseries
mLeft, mRight := createTimeseriesMapByTagSet(be, left, right)
joinOp := strings.ToLower(be.JoinModifier.Op)
@@ -239,10 +245,27 @@ func adjustBinaryOpTags(be *metricsql.BinaryOpExpr, left, right []*timeseries) (
// Add __name__ to groupTags if metric name must be preserved.
groupTags = append(groupTags[:len(groupTags):len(groupTags)], "__name__")
}
// Add missing keys from mRight to mLeft when fill_left()/fill() modifier is used
if be.FillLeft != nil {
for k := range mRight {
if _, ok := mLeft[k]; !ok {
mLeft[k] = nil
}
}
}
for k, tssLeft := range mLeft {
tssRight := mRight[k]
if len(tssLeft) == 0 {
if be.FillLeft == nil {
logger.Panicf("BUG: unexpected empty tssLeft for key %q when FillLeft is nil", k)
}
tssLeft = []*timeseries{newFillTimeseries(be, tssRight[0], be.FillLeft.N)}
}
if len(tssRight) == 0 {
continue
if be.FillRight == nil {
continue
}
tssRight = []*timeseries{newFillTimeseries(be, tssLeft[0], be.FillRight.N)}
}
switch joinOp {
case "group_left":
@@ -287,6 +310,27 @@ func adjustBinaryOpTags(be *metricsql.BinaryOpExpr, left, right []*timeseries) (
return rvsLeft, rvsRight, dst, nil
}
// newFillTimeseries returns a time series filled with fillValue for the fill_left()/fill_right()/fill() modifiers.
func newFillTimeseries(be *metricsql.BinaryOpExpr, src *timeseries, fillValue float64) *timeseries {
var ts timeseries
ts.CopyFromShallowTimestamps(src)
if !be.KeepMetricNames {
ts.MetricName.ResetMetricGroup()
}
groupTags := be.GroupModifier.Args
switch strings.ToLower(be.GroupModifier.Op) {
case "on":
ts.MetricName.RemoveTagsOn(groupTags)
default:
ts.MetricName.RemoveTagsIgnoring(groupTags)
}
values := ts.Values
for i := range values {
values[i] = fillValue
}
return &ts
}
func ensureSingleTimeseries(side string, be *metricsql.BinaryOpExpr, tss []*timeseries) error {
if len(tss) == 0 {
logger.Panicf("BUG: tss must contain at least one value")

View File

@@ -424,18 +424,7 @@ func evalBinaryOp(qt *querytracer.Tracer, ec *EvalConfig, be *metricsql.BinaryOp
if bf == nil {
return nil, fmt.Errorf(`unknown binary op %q`, be.Op)
}
var err error
var tssLeft, tssRight []*timeseries
switch strings.ToLower(be.Op) {
case "and", "if":
// Fetch right-side series at first, since it usually contains
// lower number of time series for `and` and `if` operator.
// This should produce more specific label filters for the left side of the query.
// This, in turn, should reduce the time to select series for the left side of the query.
tssRight, tssLeft, err = execBinaryOpArgs(qt, ec, be.Right, be.Left, be)
default:
tssLeft, tssRight, err = execBinaryOpArgs(qt, ec, be.Left, be.Right, be)
}
tssLeft, tssRight, err := execBinaryOpArgs(qt, ec, be)
if err != nil {
return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err)
}
@@ -451,6 +440,29 @@ func evalBinaryOp(qt *querytracer.Tracer, ec *EvalConfig, be *metricsql.BinaryOp
return rv, nil
}
// binaryOpEvalOrder might change the order of evaluation of the left and right sides of a binary operation,
// when there is chance to push down common label filters from exprFirst to exprSecond in the following executions.
func binaryOpEvalOrder(be *metricsql.BinaryOpExpr) (exprFirst, exprSecond metricsql.Expr) {
exprFirst, exprSecond = be.Left, be.Right
switch strings.ToLower(be.Op) {
case "and", "if":
// For `and` and `if`, fetch the right-side series first, since it usually contains
// fewer time series and yields more specific filters for the left side.
exprFirst, exprSecond = be.Right, be.Left
}
if be.FillLeft != nil && be.FillRight == nil {
// For `fill_left(<value>)`, the unmatched series can only come from the right side, so evaluate it first.
exprFirst, exprSecond = be.Right, be.Left
}
return exprFirst, exprSecond
}
// canPushdownCommonFilters decides if common label filters can be pushed down from one side of a binary operation to the other.
//
// Common filters cannot be pushed down when:
// - the operator is `or` or `default`;
// - either side is an aggregation function without explicit grouping;
// - fill(<value>) modifier is used.
func canPushdownCommonFilters(be *metricsql.BinaryOpExpr) bool {
switch strings.ToLower(be.Op) {
case "or", "default":
@@ -459,6 +471,10 @@ func canPushdownCommonFilters(be *metricsql.BinaryOpExpr) bool {
if isAggrFuncWithoutGrouping(be.Left) || isAggrFuncWithoutGrouping(be.Right) {
return false
}
// Filters cannot be propagated when fill(<value>) modifier is used.
if be.FillLeft != nil && be.FillRight != nil {
return false
}
return true
}
@@ -470,8 +486,16 @@ func isAggrFuncWithoutGrouping(e metricsql.Expr) bool {
return len(afe.Modifier.Args) == 0
}
func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
exprFirst, exprSecond := binaryOpEvalOrder(be)
canPushdown := canPushdownCommonFilters(be)
firstIsLeft := exprFirst == be.Left
sortResult := func(tssFirst, tssSecond []*timeseries) ([]*timeseries, []*timeseries, error) {
if firstIsLeft {
return tssFirst, tssSecond, nil
}
return tssSecond, tssFirst, nil
}
if !canPushdown && !shouldOptimizeRepeatedBinaryOpSubexprs(ec, exprFirst, exprSecond) {
// Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters
// from exprFirst to exprSecond.
@@ -503,7 +527,7 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec
if errSecond != nil {
return nil, nil, errSecond
}
return tssFirst, tssSecond, nil
return sortResult(tssFirst, tssSecond)
}
if !canPushdown {
qt = qt.NewChild("execute left and right sides of %q sequentially because repeated cacheable subexpression was found", be.Op)
@@ -522,7 +546,7 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec
if err != nil {
return nil, nil, err
}
return tssFirst, tssSecond, nil
return sortResult(tssFirst, tssSecond)
}
// Execute binary operation in the following way:
@@ -565,7 +589,7 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec
if err != nil {
return nil, nil, err
}
return tssFirst, tssSecond, nil
return sortResult(tssFirst, tssSecond)
}
func shouldOptimizeRepeatedBinaryOpSubexprs(ec *EvalConfig, exprFirst, exprSecond metricsql.Expr) bool {

View File

@@ -4006,6 +4006,256 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`vector + vector fill()`, func(t *testing.T) {
t.Parallel()
q := `sort_by_label((
label_set(1, "foo", "common")
or label_set(2, "foo", "left_only")
) + fill(0) (
label_set(3, "foo", "common")
or label_set(4, "foo", "right_only")
), "foo")`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{4, 4, 4, 4, 4, 4},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("common"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{2, 2, 2, 2, 2, 2},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("left_only"),
}}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{4, 4, 4, 4, 4, 4},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("right_only"),
}}
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
t.Run(`vector + vector fill_left() fill_right()`, func(t *testing.T) {
t.Parallel()
q := `sort_by_label((
label_set(1, "foo", "common")
or label_set(2, "foo", "left_only")
) + fill_left(10) fill_right(20) (
label_set(3, "foo", "common")
or label_set(4, "foo", "right_only")
), "foo")`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{4, 4, 4, 4, 4, 4},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("common"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{22, 22, 22, 22, 22, 22},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("left_only"),
}}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{14, 14, 14, 14, 14, 14},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("right_only"),
}}
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
t.Run(`vector + vector fill_right() only`, func(t *testing.T) {
t.Parallel()
q := `sort_by_label((
label_set(1, "foo", "common")
or label_set(2, "foo", "left_only")
) + fill_right(20) (
label_set(3, "foo", "common")
or label_set(4, "foo", "right_only")
), "foo")`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{4, 4, 4, 4, 4, 4},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("common"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{22, 22, 22, 22, 22, 22},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("left_only"),
}}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`vector + vector on() fill()`, func(t *testing.T) {
t.Parallel()
q := `sort_by_label((
label_set(1, "foo", "common", "extra", "l")
or label_set(2, "foo", "left_only", "extra", "l")
) + on(foo) fill(0) (
label_set(3, "foo", "common", "extra", "r")
or label_set(4, "foo", "right_only", "extra", "r")
), "foo")`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{4, 4, 4, 4, 4, 4},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("common"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{2, 2, 2, 2, 2, 2},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("left_only"),
}}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{4, 4, 4, 4, 4, 4},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("right_only"),
}}
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
t.Run(`vector + vector on() group_left() fill_right()`, func(t *testing.T) {
t.Parallel()
q := `sort_by_label((
label_set(1, "method", "get", "code", "500")
or label_set(2, "method", "get", "code", "404")
or label_set(3, "method", "put", "code", "501")
) + on(method) group_left() fill_right(0) (
label_set(10, "method", "get")
), "method", "code")`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{12, 12, 12, 12, 12, 12},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("code"),
Value: []byte("404"),
},
{
Key: []byte("method"),
Value: []byte("get"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{11, 11, 11, 11, 11, 11},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("code"),
Value: []byte("500"),
},
{
Key: []byte("method"),
Value: []byte("get"),
},
}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{3, 3, 3, 3, 3, 3},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{
{
Key: []byte("code"),
Value: []byte("501"),
},
{
Key: []byte("method"),
Value: []byte("put"),
},
}
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
t.Run(`vector / vector ignoring() fill()`, func(t *testing.T) {
t.Parallel()
q := `sort_by_label((
label_set(6, "method", "get", "code", "500")
or label_set(1, "method", "put", "code", "500")
) / ignoring(code) fill(0) (
label_set(12, "method", "get")
or label_set(5, "method", "post")
or label_set(10, "method", "put")
), "method")`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.5, 0.5, 0.5, 0.5, 0.5, 0.5},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("method"),
Value: []byte("get"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0, 0, 0, 0, 0, 0},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("method"),
Value: []byte("post"),
},
}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.1, 0.1, 0.1, 0.1, 0.1, 0.1},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{
{
Key: []byte("method"),
Value: []byte("put"),
},
}
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
t.Run(`histogram_quantile(scalar)`, func(t *testing.T) {
t.Parallel()
q := `histogram_quantile(0.6, time())`

View File

@@ -34,6 +34,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): introduce `64KiB` size limit for `metric metadata` fields - `Unit`, `Help` and `MetricFamilyName`. See [#11128](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11128).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `optimize_repeated_binary_op_subexprs=1` query arg to [/api/v1/query_range](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#range-query) for executing binary operator sides sequentially when they share the same optimized aggregate rollup result expression. This allows the second side to reuse rollup result cache populated by the first side. See [#10575](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10575). Thanks to @xhebox for the contribution.
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): prevent possible password brute-force attacks with an artificial 2-3 second delay as recommended by [OWASP](https://owasp.org/Top10/2025/A07_2025-Authentication_Failures). See [#11180](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11180).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/): support `fill` modifiers to allow missing series on either side of a binary operation to be filled with a provided default value. See [#10598](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10598).
* BUGFIX: all VictoriaMetrics components: cancel in-flight HTTP requests shortly before `-http.maxGracefulShutdownDuration` elapses during graceful shutdown, so they can drain and the shutdown completes cleanly within that window instead of timing out and exiting via `logger.Fatalf` -> `os.Exit`. This prevents skipping the storage flush and losing in-memory data when long-lived requests are in flight (such as VictoriaLogs live tailing). See [#1502](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1502).
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fixes unexpected rare rerouting. See [#11162](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11162).