|
|
|
|
@@ -371,6 +371,7 @@ type aggregator struct {
|
|
|
|
|
inputRelabeling *promrelabel.ParsedConfigs
|
|
|
|
|
outputRelabeling *promrelabel.ParsedConfigs
|
|
|
|
|
|
|
|
|
|
needInputKey bool
|
|
|
|
|
keepMetricNames bool
|
|
|
|
|
ignoreOldSamples bool
|
|
|
|
|
|
|
|
|
|
@@ -554,13 +555,17 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
|
|
|
|
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
|
|
|
|
|
"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
|
|
|
|
|
}
|
|
|
|
|
needInputKey := false
|
|
|
|
|
aggrOutputs := make([]aggrOutput, len(cfg.Outputs))
|
|
|
|
|
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
|
|
|
|
|
for i, output := range cfg.Outputs {
|
|
|
|
|
as, err := newAggrState(output, outputsSeen, stalenessInterval)
|
|
|
|
|
as, requireInputKey, err := newAggrState(output, outputsSeen, stalenessInterval)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if !needInputKey && requireInputKey {
|
|
|
|
|
needInputKey = requireInputKey
|
|
|
|
|
}
|
|
|
|
|
aggrOutputs[i] = aggrOutput{
|
|
|
|
|
as: as,
|
|
|
|
|
|
|
|
|
|
@@ -586,6 +591,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
|
|
|
|
inputRelabeling: inputRelabeling,
|
|
|
|
|
outputRelabeling: outputRelabeling,
|
|
|
|
|
|
|
|
|
|
needInputKey: needInputKey,
|
|
|
|
|
keepMetricNames: keepMetricNames,
|
|
|
|
|
ignoreOldSamples: ignoreOldSamples,
|
|
|
|
|
|
|
|
|
|
@@ -614,6 +620,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if dedupInterval > 0 {
|
|
|
|
|
a.needInputKey = true
|
|
|
|
|
a.da = newDedupAggr()
|
|
|
|
|
|
|
|
|
|
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
|
|
|
|
|
@@ -645,20 +652,20 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
|
|
|
|
return a, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, error) {
|
|
|
|
|
func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, bool, error) {
|
|
|
|
|
// check for duplicated output
|
|
|
|
|
if _, ok := outputsSeen[output]; ok {
|
|
|
|
|
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
|
|
|
|
|
return nil, false, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
|
|
|
|
|
}
|
|
|
|
|
outputsSeen[output] = struct{}{}
|
|
|
|
|
|
|
|
|
|
if strings.HasPrefix(output, "quantiles(") {
|
|
|
|
|
if !strings.HasSuffix(output, ")") {
|
|
|
|
|
return nil, fmt.Errorf("missing closing brace for `quantiles()` output")
|
|
|
|
|
return nil, false, fmt.Errorf("missing closing brace for `quantiles()` output")
|
|
|
|
|
}
|
|
|
|
|
argsStr := output[len("quantiles(") : len(output)-1]
|
|
|
|
|
if len(argsStr) == 0 {
|
|
|
|
|
return nil, fmt.Errorf("`quantiles()` must contain at least one phi")
|
|
|
|
|
return nil, false, fmt.Errorf("`quantiles()` must contain at least one phi")
|
|
|
|
|
}
|
|
|
|
|
args := strings.Split(argsStr, ",")
|
|
|
|
|
phis := make([]float64, len(args))
|
|
|
|
|
@@ -666,57 +673,57 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter
|
|
|
|
|
arg = strings.TrimSpace(arg)
|
|
|
|
|
phi, err := strconv.ParseFloat(arg, 64)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
|
|
|
|
|
return nil, false, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
|
|
|
|
|
}
|
|
|
|
|
if phi < 0 || phi > 1 {
|
|
|
|
|
return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
|
|
|
|
|
return nil, false, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
|
|
|
|
|
}
|
|
|
|
|
phis[i] = phi
|
|
|
|
|
}
|
|
|
|
|
if _, ok := outputsSeen["quantiles"]; ok {
|
|
|
|
|
return nil, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`")
|
|
|
|
|
return nil, false, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`")
|
|
|
|
|
}
|
|
|
|
|
outputsSeen["quantiles"] = struct{}{}
|
|
|
|
|
return newQuantilesAggrState(phis), nil
|
|
|
|
|
return newQuantilesAggrState(phis), false, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
switch output {
|
|
|
|
|
case "avg":
|
|
|
|
|
return newAvgAggrState(), nil
|
|
|
|
|
return newAvgAggrState(), false, nil
|
|
|
|
|
case "count_samples":
|
|
|
|
|
return newCountSamplesAggrState(), nil
|
|
|
|
|
return newCountSamplesAggrState(), false, nil
|
|
|
|
|
case "count_series":
|
|
|
|
|
return newCountSeriesAggrState(), nil
|
|
|
|
|
return newCountSeriesAggrState(), true, nil
|
|
|
|
|
case "histogram_bucket":
|
|
|
|
|
return newHistogramBucketAggrState(stalenessInterval), nil
|
|
|
|
|
return newHistogramBucketAggrState(stalenessInterval), false, nil
|
|
|
|
|
case "increase":
|
|
|
|
|
return newTotalAggrState(stalenessInterval, true, true), nil
|
|
|
|
|
return newTotalAggrState(stalenessInterval, true, true), true, nil
|
|
|
|
|
case "increase_prometheus":
|
|
|
|
|
return newTotalAggrState(stalenessInterval, true, false), nil
|
|
|
|
|
return newTotalAggrState(stalenessInterval, true, false), true, nil
|
|
|
|
|
case "last":
|
|
|
|
|
return newLastAggrState(), nil
|
|
|
|
|
return newLastAggrState(), false, nil
|
|
|
|
|
case "max":
|
|
|
|
|
return newMaxAggrState(), nil
|
|
|
|
|
return newMaxAggrState(), false, nil
|
|
|
|
|
case "min":
|
|
|
|
|
return newMinAggrState(), nil
|
|
|
|
|
return newMinAggrState(), false, nil
|
|
|
|
|
case "rate_avg":
|
|
|
|
|
return newRateAggrState(stalenessInterval, true), nil
|
|
|
|
|
return newRateAggrState(stalenessInterval, true), true, nil
|
|
|
|
|
case "rate_sum":
|
|
|
|
|
return newRateAggrState(stalenessInterval, false), nil
|
|
|
|
|
return newRateAggrState(stalenessInterval, false), true, nil
|
|
|
|
|
case "stddev":
|
|
|
|
|
return newStddevAggrState(), nil
|
|
|
|
|
return newStddevAggrState(), false, nil
|
|
|
|
|
case "stdvar":
|
|
|
|
|
return newStdvarAggrState(), nil
|
|
|
|
|
return newStdvarAggrState(), false, nil
|
|
|
|
|
case "sum_samples":
|
|
|
|
|
return newSumSamplesAggrState(), nil
|
|
|
|
|
return newSumSamplesAggrState(), false, nil
|
|
|
|
|
case "total":
|
|
|
|
|
return newTotalAggrState(stalenessInterval, false, true), nil
|
|
|
|
|
return newTotalAggrState(stalenessInterval, false, true), true, nil
|
|
|
|
|
case "total_prometheus":
|
|
|
|
|
return newTotalAggrState(stalenessInterval, false, false), nil
|
|
|
|
|
return newTotalAggrState(stalenessInterval, false, false), true, nil
|
|
|
|
|
case "unique_samples":
|
|
|
|
|
return newUniqueSamplesAggrState(), nil
|
|
|
|
|
return newUniqueSamplesAggrState(), false, nil
|
|
|
|
|
default:
|
|
|
|
|
return nil, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
|
|
|
|
|
return nil, false, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -930,7 +937,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bufLen := len(buf)
|
|
|
|
|
buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels)
|
|
|
|
|
buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels, a.needInputKey)
|
|
|
|
|
// key remains valid only by the end of this function and can't be reused after
|
|
|
|
|
// do not intern key because number of unique keys could be too high
|
|
|
|
|
key := bytesutil.ToUnsafeString(buf[bufLen:])
|
|
|
|
|
@@ -970,13 +977,15 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte {
|
|
|
|
|
func compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label, needInputKey bool) []byte {
|
|
|
|
|
bb := bbPool.Get()
|
|
|
|
|
bb.B = lc.Compress(bb.B, inputLabels)
|
|
|
|
|
bb.B = lc.Compress(bb.B, outputLabels)
|
|
|
|
|
dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
|
|
|
|
|
dst = append(dst, bb.B...)
|
|
|
|
|
bbPool.Put(bb)
|
|
|
|
|
dst = lc.Compress(dst, outputLabels)
|
|
|
|
|
if needInputKey {
|
|
|
|
|
dst = lc.Compress(dst, inputLabels)
|
|
|
|
|
}
|
|
|
|
|
return dst
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -986,24 +995,24 @@ func decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Lab
|
|
|
|
|
|
|
|
|
|
func getOutputKey(key string) string {
|
|
|
|
|
src := bytesutil.ToUnsafeBytes(key)
|
|
|
|
|
inputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
|
|
|
|
|
outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
|
|
|
|
|
if nSize <= 0 {
|
|
|
|
|
logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint")
|
|
|
|
|
logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
|
|
|
|
|
}
|
|
|
|
|
src = src[nSize:]
|
|
|
|
|
outputKey := src[inputKeyLen:]
|
|
|
|
|
outputKey := src[:outputKeyLen]
|
|
|
|
|
return bytesutil.ToUnsafeString(outputKey)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func getInputOutputKey(key string) (string, string) {
|
|
|
|
|
src := bytesutil.ToUnsafeBytes(key)
|
|
|
|
|
inputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
|
|
|
|
|
outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
|
|
|
|
|
if nSize <= 0 {
|
|
|
|
|
logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint")
|
|
|
|
|
logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
|
|
|
|
|
}
|
|
|
|
|
src = src[nSize:]
|
|
|
|
|
inputKey := src[:inputKeyLen]
|
|
|
|
|
outputKey := src[inputKeyLen:]
|
|
|
|
|
outputKey := src[:outputKeyLen]
|
|
|
|
|
inputKey := src[outputKeyLen:]
|
|
|
|
|
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|