Compare commits

..

1 Commits

Author SHA1 Message Date
Andrii Chubatiuk
90029442cb lib/streamaggr: use stale samples for increase, total and rate outputs state update
Before, with ignore_old_samples or enable_windows set, old samples were just dropped.
With this change, these samples are used to update per-series state in stateful aggregation outputs: rate, total, and increase, but do not contribute to the aggregated output.
This ensures the next in-interval sample computes the correct per-interval increase relative to the most recent pre-interval value, rather than a stale one from a previous flush cycle.
2026-06-10 15:00:20 +03:00
62 changed files with 483 additions and 285 deletions

View File

@@ -10,7 +10,7 @@ import (
func Compress(wr WriteRequest) []byte {
data, err := wr.Marshal()
if err != nil {
panic(fmt.Errorf("BUG: cannot compress WriteRequest: %w", err))
panic(fmt.Errorf("BUG: cannot compress WriteRequest: %s", err))
}
return snappy.Encode(nil, data)
}

View File

@@ -61,15 +61,15 @@ func parseInputSeries(input []series, interval *promutil.Duration, startStamp ti
for _, data := range input {
expr, err := metricsql.Parse(data.Series)
if err != nil {
return res, fmt.Errorf("failed to parse series %s: %w", data.Series, err)
return res, fmt.Errorf("failed to parse series %s: %v", data.Series, err)
}
promvals, err := parseInputValue(data.Values, true)
if err != nil {
return res, fmt.Errorf("failed to parse input series value %s: %w", data.Values, err)
return res, fmt.Errorf("failed to parse input series value %s: %v", data.Values, err)
}
metricExpr, ok := expr.(*metricsql.MetricExpr)
if !ok || len(metricExpr.LabelFilterss) != 1 {
return res, fmt.Errorf("got invalid input series %s: %w", data.Series, err)
return res, fmt.Errorf("got invalid input series %s: %v", data.Series, err)
}
samples := make([]testutil.Sample, 0, len(promvals))
ts := startStamp

View File

@@ -53,13 +53,13 @@ Outer:
if s.Labels != "" {
metricsqlExpr, err := metricsql.Parse(s.Labels)
if err != nil {
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %w", mt.Expr,
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %v", mt.Expr,
mt.EvalTime.Duration().String(), fmt.Errorf("failed to parse labels %q: %w", s.Labels, err)))
continue Outer
}
metricsqlMetricExpr, ok := metricsqlExpr.(*metricsql.MetricExpr)
if !ok || len(metricsqlMetricExpr.LabelFilterss) > 1 {
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %w", mt.Expr,
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %v", mt.Expr,
mt.EvalTime.Duration().String(), fmt.Errorf("got invalid exp_samples: %q", s.Labels)))
continue Outer
}

View File

@@ -329,11 +329,11 @@ func (tg *testGroup) test(evalInterval time.Duration, groupOrderMap map[string]i
q, err := datasource.Init(nil)
if err != nil {
return []error{fmt.Errorf("failed to init datasource: %w", err)}
return []error{fmt.Errorf("failed to init datasource: %v", err)}
}
rw, err := remotewrite.NewDebugClient()
if err != nil {
return []error{fmt.Errorf("failed to init wr: %w", err)}
return []error{fmt.Errorf("failed to init wr: %v", err)}
}
alertEvalTimesMap := map[time.Duration]struct{}{}

View File

@@ -89,7 +89,7 @@ func (pi *promInstant) Unmarshal(b []byte) error {
labels.Visit(func(key []byte, v *fastjson.Value) {
lv, errLocal := v.StringBytes()
if errLocal != nil {
err = fmt.Errorf("error when parsing label value %q: %w", v, errLocal)
err = fmt.Errorf("error when parsing label value %q: %s", v, errLocal)
return
}
r.Labels = append(r.Labels, prompb.Label{
@@ -112,7 +112,7 @@ func (pi *promInstant) Unmarshal(b []byte) error {
r.Timestamps = []int64{sample[0].GetInt64()}
val, err := sample[1].StringBytes()
if err != nil {
return fmt.Errorf("error when parsing `value` object %q: %w", sample[1], err)
return fmt.Errorf("error when parsing `value` object %q: %s", sample[1], err)
}
f, err := strconv.ParseFloat(bytesutil.ToUnsafeString(val), 64)
if err != nil {

View File

@@ -601,7 +601,7 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr
func (ar *AlertingRule) expandLabelTemplates(m datasource.Metric, qFn templates.QueryFn) (*labelSet, error) {
ls, err := ar.toLabels(m, qFn)
if err != nil {
return ls, fmt.Errorf("failed to expand label templates: %w", err)
return ls, fmt.Errorf("failed to expand label templates: %s", err)
}
return ls, nil
}
@@ -620,7 +620,7 @@ func (ar *AlertingRule) expandAnnotationTemplates(m datasource.Metric, qFn templ
}
as, err := notifier.ExecTemplate(qFn, ar.Annotations, tplData)
if err != nil {
return as, fmt.Errorf("failed to expand annotation templates: %w", err)
return as, fmt.Errorf("failed to expand annotation templates: %s", err)
}
return as, nil
}

View File

@@ -77,7 +77,7 @@ var (
func marshalJson(v any, kind string) ([]byte, *httpserver.ErrorWithStatusCode) {
data, err := json.Marshal(v)
if err != nil {
return nil, errResponse(fmt.Errorf("failed to marshal %s: %w", kind, err), http.StatusInternalServerError)
return nil, errResponse(fmt.Errorf("failed to marshal %s: %s", kind, err), http.StatusInternalServerError)
}
return data, nil
}

View File

@@ -1639,7 +1639,7 @@ func (w *fakeResponseWriter) WriteHeader(statusCode int) {
"X-Content-Type-Options": true,
})
if err != nil {
panic(fmt.Errorf("cannot marshal headers: %w", err))
panic(fmt.Errorf("cannot marshal headers: %s", err))
}
}

View File

@@ -161,7 +161,7 @@ func fetchAndParseJWKs(ctx context.Context, jwksURI string) (*jwt.VerifierPool,
vp, err := jwt.ParseJWKs(b)
if err != nil {
return nil, fmt.Errorf("failed to parse jwks keys from %q: %w", jwksURI, err)
return nil, fmt.Errorf("failed to parse jwks keys from %q: %v", jwksURI, err)
}
return vp, nil
@@ -188,7 +188,7 @@ func getOpenIDConfiguration(ctx context.Context, issuer string) (openidConfig, e
var cfg openidConfig
if err := json.NewDecoder(resp.Body).Decode(&cfg); err != nil {
return openidConfig{}, fmt.Errorf("failed to decode openid config from %q: %w", configURL, err)
return openidConfig{}, fmt.Errorf("failed to decode openid config from %q: %s", configURL, err)
}
return cfg, nil

View File

@@ -131,13 +131,16 @@ func (ac *authContext) initFromBasicAuthConfig(ba *BasicAuthConfig) error {
if ba.Username == "" {
return fmt.Errorf("missing `username` in `basic_auth` section")
}
ac.getAuthHeader = func() string {
// See https://en.wikipedia.org/wiki/Basic_access_authentication
token := ba.Username + ":" + ba.Password
token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64
if ba.Password != "" {
ac.getAuthHeader = func() string {
// See https://en.wikipedia.org/wiki/Basic_access_authentication
token := ba.Username + ":" + ba.Password
token64 := base64.StdEncoding.EncodeToString([]byte(token))
return "Basic " + token64
}
ac.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
return nil
}
ac.authDigest = fmt.Sprintf("basic(username=%q, password=%q)", ba.Username, ba.Password)
return nil
}

View File

@@ -69,8 +69,6 @@ const (
vmAddr = "vm-addr"
vmUser = "vm-user"
vmPassword = "vm-password"
vmHeaders = "vm-headers"
vmBearerToken = "vm-bearer-token"
vmAccountID = "vm-account-id"
vmConcurrency = "vm-concurrency"
vmCompress = "vm-compress"
@@ -114,16 +112,6 @@ var (
Usage: "VictoriaMetrics password for basic auth",
EnvVars: []string{"VM_PASSWORD"},
},
&cli.StringFlag{
Name: vmHeaders,
Usage: "Optional HTTP headers to send with each request to the corresponding destination address. \n" +
"For example, --vm-headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding destination address. \n" +
"Multiple headers must be delimited by '^^': --vm-headers='header1:value1^^header2:value2'",
},
&cli.StringFlag{
Name: vmBearerToken,
Usage: "Optional bearer auth token to use for the corresponding --vm-addr",
},
&cli.StringFlag{
Name: vmAccountID,
Usage: "AccountID is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). \n" +

View File

@@ -43,7 +43,7 @@ func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator st
func (ip *influxProcessor) run(ctx context.Context) error {
series, err := ip.ic.Explore()
if err != nil {
return fmt.Errorf("explore query failed: %w", err)
return fmt.Errorf("explore query failed: %s", err)
}
if len(series) < 1 {
return fmt.Errorf("found no timeseries to import")
@@ -71,7 +71,7 @@ func (ip *influxProcessor) run(ctx context.Context) error {
for s := range seriesCh {
if err := ip.do(s); err != nil {
influxErrorsTotal.Inc()
errCh <- fmt.Errorf("request failed for %q.%q: %w", s.Measurement, s.Field, err)
errCh <- fmt.Errorf("request failed for %q.%q: %s", s.Measurement, s.Field, err)
return
}
influxSeriesProcessed.Inc()
@@ -84,10 +84,10 @@ func (ip *influxProcessor) run(ctx context.Context) error {
for _, s := range series {
select {
case infErr := <-errCh:
return fmt.Errorf("influx error: %w", infErr)
return fmt.Errorf("influx error: %s", infErr)
case vmErr := <-ip.im.Errors():
influxErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, ip.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose))
case seriesCh <- s:
}
}
@@ -100,11 +100,11 @@ func (ip *influxProcessor) run(ctx context.Context) error {
for vmErr := range ip.im.Errors() {
if vmErr.Err != nil {
influxErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, ip.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, ip.isVerbose))
}
}
for err := range errCh {
return fmt.Errorf("import process failed: %w", err)
return fmt.Errorf("import process failed: %s", err)
}
log.Println("Import finished!")
@@ -119,7 +119,7 @@ const valueField = "value"
func (ip *influxProcessor) do(s *influx.Series) error {
cr, err := ip.ic.FetchDataPoints(s)
if err != nil {
return fmt.Errorf("failed to fetch datapoints: %w", err)
return fmt.Errorf("failed to fetch datapoints: %s", err)
}
defer func() {
_ = cr.Close()

View File

@@ -96,10 +96,10 @@ func NewClient(cfg Config) (*Client, error) {
}
hc, err := influx.NewHTTPClient(c)
if err != nil {
return nil, fmt.Errorf("failed to establish conn: %w", err)
return nil, fmt.Errorf("failed to establish conn: %s", err)
}
if _, _, err := hc.Ping(time.Second); err != nil {
return nil, fmt.Errorf("ping failed: %w", err)
return nil, fmt.Errorf("ping failed: %s", err)
}
chunkSize := cfg.ChunkSize
@@ -155,7 +155,7 @@ func (c *Client) Explore() ([]*Series, error) {
// {"measurement1": ["value1", "value2"]}
mFields, err := c.fieldsByMeasurement()
if err != nil {
return nil, fmt.Errorf("failed to get field keys: %w", err)
return nil, fmt.Errorf("failed to get field keys: %s", err)
}
if len(mFields) < 1 {
@@ -165,12 +165,12 @@ func (c *Client) Explore() ([]*Series, error) {
// {"measurement1": {"tag1", "tag2"}}
measurementTags, err := c.getMeasurementTags()
if err != nil {
return nil, fmt.Errorf("failed to get tags of measurements: %w", err)
return nil, fmt.Errorf("failed to get tags of measurements: %s", err)
}
series, err := c.getSeries()
if err != nil {
return nil, fmt.Errorf("failed to get series: %w", err)
return nil, fmt.Errorf("failed to get series: %s", err)
}
var iSeries []*Series
@@ -237,7 +237,7 @@ func (cr *ChunkedResponse) Next() ([]int64, []float64, error) {
return nil, nil, err
}
if resp.Error() != nil {
return nil, nil, fmt.Errorf("response error for %s: %w", cr.iq.Command, resp.Error())
return nil, nil, fmt.Errorf("response error for %s: %s", cr.iq.Command, resp.Error())
}
if len(resp.Results) != 1 {
return nil, nil, fmt.Errorf("unexpected number of results in response: %d", len(resp.Results))
@@ -265,7 +265,8 @@ func (cr *ChunkedResponse) Next() ([]int64, []float64, error) {
for i, fv := range fieldValues {
v, err := toFloat64(fv)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert value %q.%v to float64: %w", cr.field, v, err)
return nil, nil, fmt.Errorf("failed to convert value %q.%v to float64: %s",
cr.field, v, err)
}
values[i] = v
}
@@ -293,7 +294,7 @@ func (c *Client) FetchDataPoints(s *Series) (*ChunkedResponse, error) {
}
cr, err := c.QueryAsChunk(iq)
if err != nil {
return nil, fmt.Errorf("query %q err: %w", iq.Command, err)
return nil, fmt.Errorf("query %q err: %s", iq.Command, err)
}
return &ChunkedResponse{cr, iq, s.Field}, nil
}
@@ -307,7 +308,7 @@ func (c *Client) fieldsByMeasurement() (map[string][]string, error) {
log.Printf("fetching fields: %s", stringify(q))
qValues, err := c.do(q)
if err != nil {
return nil, fmt.Errorf("error while executing query %q: %w", q.Command, err)
return nil, fmt.Errorf("error while executing query %q: %s", q.Command, err)
}
var total int
@@ -351,7 +352,7 @@ func (c *Client) getSeries() ([]*Series, error) {
log.Printf("fetching series: %s", stringify(q))
cr, err := c.QueryAsChunk(q)
if err != nil {
return nil, fmt.Errorf("error while executing query %q: %w", q.Command, err)
return nil, fmt.Errorf("error while executing query %q: %s", q.Command, err)
}
const key = "key"
@@ -365,7 +366,7 @@ func (c *Client) getSeries() ([]*Series, error) {
return nil, err
}
if resp.Error() != nil {
return nil, fmt.Errorf("response error for query %q: %w", q.Command, resp.Error())
return nil, fmt.Errorf("response error for query %q: %s", q.Command, resp.Error())
}
qValues, err := parseResult(resp.Results[0])
if err != nil {
@@ -416,7 +417,7 @@ func (c *Client) getMeasurementTags() (map[string]map[string]struct{}, error) {
log.Printf("fetching tag keys: %s", stringify(q))
cr, err := c.QueryAsChunk(q)
if err != nil {
return nil, fmt.Errorf("error while executing query %q: %w", q.Command, err)
return nil, fmt.Errorf("error while executing query %q: %s", q.Command, err)
}
const tagKey = "tagKey"
@@ -431,7 +432,7 @@ func (c *Client) getMeasurementTags() (map[string]map[string]struct{}, error) {
return nil, err
}
if resp.Error() != nil {
return nil, fmt.Errorf("response error for query %q: %w", q.Command, resp.Error())
return nil, fmt.Errorf("response error for query %q: %s", q.Command, resp.Error())
}
qValues, err := parseResult(resp.Results[0])
if err != nil {
@@ -454,10 +455,10 @@ func (c *Client) getMeasurementTags() (map[string]map[string]struct{}, error) {
func (c *Client) do(q influx.Query) ([]queryValues, error) {
res, err := c.Query(q)
if err != nil {
return nil, fmt.Errorf("query error: %w", err)
return nil, fmt.Errorf("query error: %s", err)
}
if res.Error() != nil {
return nil, fmt.Errorf("response error: %w", res.Error())
return nil, fmt.Errorf("response error: %s", res.Error())
}
if len(res.Results) < 1 {
return nil, fmt.Errorf("query returned 0 results")

View File

@@ -71,7 +71,7 @@ func toFloat64(v any) (float64, error) {
func parseDate(dateStr string) (int64, error) {
startTime, err := time.Parse(time.RFC3339, dateStr)
if err != nil {
return 0, fmt.Errorf("cannot parse %q: %w", dateStr, err)
return 0, fmt.Errorf("cannot parse %q: %s", dateStr, err)
}
return startTime.UnixNano() / 1e6, nil
}
@@ -92,7 +92,7 @@ func (s *Series) unmarshal(v string) error {
var err error
s.LabelPairs, err = unmarshalTags(v[n+1:], noEscapeChars)
if err != nil {
return fmt.Errorf("failed to unmarhsal tags: %w", err)
return fmt.Errorf("failed to unmarhsal tags: %s", err)
}
return nil
}

View File

@@ -88,7 +88,7 @@ func main() {
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify, "vmctl_opentsdb")
if err != nil {
return fmt.Errorf("failed to create transport for -%s=%q: %w", otsdbAddr, addr, err)
return fmt.Errorf("failed to create transport for -%s=%q: %s", otsdbAddr, addr, err)
}
oCfg := opentsdb.Config{
Addr: addr,
@@ -103,17 +103,17 @@ func main() {
}
otsdbClient, err := opentsdb.NewClient(oCfg)
if err != nil {
return fmt.Errorf("failed to create opentsdb client: %w", err)
return fmt.Errorf("failed to create opentsdb client: %s", err)
}
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %w", err)
return fmt.Errorf("failed to init VM configuration: %s", err)
}
importer, err := vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %w", err)
return fmt.Errorf("failed to create VM importer: %s", err)
}
otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency), c.Bool(globalVerbose))
@@ -137,7 +137,7 @@ func main() {
tc, err := promauth.NewTLSConfig(certFile, keyFile, caFile, serverName, insecureSkipVerify)
if err != nil {
return fmt.Errorf("failed to create TLS Config: %w", err)
return fmt.Errorf("failed to create TLS Config: %s", err)
}
iCfg := influx.Config{
@@ -157,17 +157,17 @@ func main() {
influxClient, err := influx.NewClient(iCfg)
if err != nil {
return fmt.Errorf("failed to create influx client: %w", err)
return fmt.Errorf("failed to create influx client: %s", err)
}
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %w", err)
return fmt.Errorf("failed to init VM configuration: %s", err)
}
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %w", err)
return fmt.Errorf("failed to create VM importer: %s", err)
}
processor := newInfluxProcessor(
@@ -203,7 +203,7 @@ func main() {
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify, "vmctl_remoteread")
if err != nil {
return fmt.Errorf("failed to create transport for -%s=%q: %w", remoteReadSrcAddr, addr, err)
return fmt.Errorf("failed to create transport for -%s=%q: %s", remoteReadSrcAddr, addr, err)
}
// Backwards compatible default values if none provided by user
@@ -227,17 +227,17 @@ func main() {
DisablePathAppend: c.Bool(remoteReadDisablePathAppend),
})
if err != nil {
return fmt.Errorf("error create remote read client: %w", err)
return fmt.Errorf("error create remote read client: %s", err)
}
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %w", err)
return fmt.Errorf("failed to init VM configuration: %s", err)
}
importer, err := vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %w", err)
return fmt.Errorf("failed to create VM importer: %s", err)
}
rmp := remoteReadProcessor{
@@ -265,12 +265,12 @@ func main() {
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %w", err)
return fmt.Errorf("failed to init VM configuration: %s", err)
}
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %w", err)
return fmt.Errorf("failed to create VM importer: %s", err)
}
promCfg := prometheus.Config{
@@ -285,7 +285,7 @@ func main() {
}
cl, err := prometheus.NewClient(promCfg)
if err != nil {
return fmt.Errorf("failed to create prometheus client: %w", err)
return fmt.Errorf("failed to create prometheus client: %s", err)
}
pp := prometheusProcessor{
@@ -307,12 +307,12 @@ func main() {
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %w", err)
return fmt.Errorf("failed to init VM configuration: %s", err)
}
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %w", err)
return fmt.Errorf("failed to create VM importer: %s", err)
}
mCfg := mimir.Config{
@@ -335,7 +335,7 @@ func main() {
}
cl, err := mimir.NewClient(ctx, mCfg)
if err != nil {
return fmt.Errorf("failed to create mimir client: %w", err)
return fmt.Errorf("failed to create mimir client: %s", err)
}
pp := prometheusProcessor{
@@ -356,12 +356,12 @@ func main() {
fmt.Println("Thanos import mode")
vmCfg, err := initConfigVM(c)
if err != nil {
return fmt.Errorf("failed to init VM configuration: %w", err)
return fmt.Errorf("failed to init VM configuration: %s", err)
}
importer, err = vm.NewImporter(ctx, vmCfg)
if err != nil {
return fmt.Errorf("failed to create VM importer: %w", err)
return fmt.Errorf("failed to create VM importer: %s", err)
}
thanosCfg := thanos.Config{
Snapshot: c.String(thanosSnapshot),
@@ -374,7 +374,7 @@ func main() {
}
cl, err := thanos.NewClient(thanosCfg)
if err != nil {
return fmt.Errorf("failed to create thanos client: %w", err)
return fmt.Errorf("failed to create thanos client: %s", err)
}
var aggrTypes []thanos.AggrType
@@ -382,7 +382,7 @@ func main() {
for _, typeStr := range aggrTypesStr {
aggrType, err := thanos.ParseAggrType(typeStr)
if err != nil {
return fmt.Errorf("failed to parse aggregate type %q: %w", typeStr, err)
return fmt.Errorf("failed to parse aggregate type %q: %s", typeStr, err)
}
aggrTypes = append(aggrTypes, aggrType)
}
@@ -415,7 +415,7 @@ func main() {
bfMinDuration := c.Duration(vmNativeBackoffMinDuration)
bf, err := backoff.New(bfRetries, bfFactor, bfMinDuration)
if err != nil {
return fmt.Errorf("failed to create backoff object: %w", err)
return fmt.Errorf("failed to create backoff object: %s", err)
}
disableKeepAlive := c.Bool(vmNativeDisableHTTPKeepAlive)
@@ -439,7 +439,7 @@ func main() {
srcTC, err := promauth.NewTLSConfig(srcCertFile, srcKeyFile, srcCAFile, srcServerName, srcInsecureSkipVerify)
if err != nil {
return fmt.Errorf("failed to create TLS Config: %w", err)
return fmt.Errorf("failed to create TLS Config: %s", err)
}
trSrc := httputil.NewTransport(false, "vmctl_src")
@@ -457,7 +457,7 @@ func main() {
auth.WithBearer(c.String(vmNativeDstBearerToken)),
auth.WithHeaders(c.String(vmNativeDstHeaders)))
if err != nil {
return fmt.Errorf("error initialize auth config for destination: %s: %s", dstAddr, err)
return fmt.Errorf("error initialize auth config for destination: %s", dstAddr)
}
// create TLS config
@@ -469,7 +469,7 @@ func main() {
dstTC, err := promauth.NewTLSConfig(dstCertFile, dstKeyFile, dstCAFile, dstServerName, dstInsecureSkipVerify)
if err != nil {
return fmt.Errorf("failed to create TLS Config: %w", err)
return fmt.Errorf("failed to create TLS Config: %s", err)
}
trDst := httputil.NewTransport(false, "vmctl_dst")
@@ -534,7 +534,7 @@ func main() {
log.Printf("verifying block at path=%q", blockPath)
f, err := os.OpenFile(blockPath, os.O_RDONLY, 0600)
if err != nil {
return cli.Exit(fmt.Errorf("cannot open exported block at path=%q: %w", blockPath, err), 1)
return cli.Exit(fmt.Errorf("cannot open exported block at path=%q err=%w", blockPath, err), 1)
}
defer f.Close()
var blocksCount atomic.Uint64
@@ -542,7 +542,7 @@ func main() {
blocksCount.Add(1)
return nil
}); err != nil {
return cli.Exit(fmt.Errorf("cannot parse block at path=%q, blocksCount=%d: %w", blockPath, blocksCount.Load(), err), 1)
return cli.Exit(fmt.Errorf("cannot parse block at path=%q, blocksCount=%d, err=%w", blockPath, blocksCount.Load(), err), 1)
}
log.Printf("successfully verified block at path=%q, blockCount=%d", blockPath, blocksCount.Load())
return nil
@@ -585,7 +585,7 @@ func initConfigVM(c *cli.Context) (vm.Config, error) {
tr, err := promauth.NewTLSTransport(certFile, keyFile, caFile, serverName, insecureSkipVerify, "vmctl_client")
if err != nil {
return vm.Config{}, fmt.Errorf("failed to create transport for -%s=%q: %w", vmAddr, addr, err)
return vm.Config{}, fmt.Errorf("failed to create transport for -%s=%q: %s", vmAddr, addr, err)
}
bfRetries := c.Int(vmBackoffRetries)
@@ -593,21 +593,14 @@ func initConfigVM(c *cli.Context) (vm.Config, error) {
bfMinDuration := c.Duration(vmBackoffMinDuration)
bf, err := backoff.New(bfRetries, bfFactor, bfMinDuration)
if err != nil {
return vm.Config{}, fmt.Errorf("failed to create backoff object: %w", err)
}
authCfg, err := auth.Generate(
auth.WithBasicAuth(c.String(vmUser), c.String(vmPassword)),
auth.WithBearer(c.String(vmBearerToken)),
auth.WithHeaders(c.String(vmHeaders)))
if err != nil {
return vm.Config{}, fmt.Errorf("error initialize auth config for destination: %s: %s", addr, err)
return vm.Config{}, fmt.Errorf("failed to create backoff object: %s", err)
}
return vm.Config{
Addr: addr,
Transport: tr,
AuthCfg: authCfg,
User: c.String(vmUser),
Password: c.String(vmPassword),
Concurrency: uint8(c.Int(vmConcurrency)),
Compress: c.Bool(vmCompress),
AccountID: c.String(vmAccountID),

View File

@@ -54,7 +54,7 @@ func (lbr *lazyBlockReader) initialize() error {
// fetching block and parse it and store it in lbr.reader
temp, err := lbr.mkTempDir()
if err != nil {
return fmt.Errorf("failed to create temp dir: %w", err)
return fmt.Errorf("failed to create temp dir: %s", err)
}
lbr.tempDirPath = temp
@@ -85,7 +85,7 @@ func (lbr *lazyBlockReader) initialize() error {
return fmt.Errorf("failed to fetch chunk file: %q: %w", chunkName, err)
}
if err := lbr.writeFile(temp, blockChunkPath, chunk); err != nil {
return fmt.Errorf("failed to write chunk file: %q: %w", chunkName, err)
return fmt.Errorf("failed to write chunk file: %q: %s", chunkName, err)
}
}
@@ -135,7 +135,7 @@ func (lbr *lazyBlockReader) Meta() tsdb.BlockMeta {
// Size returns the number of bytes that the block takes up on disk.
func (lbr *lazyBlockReader) Size() int64 {
if err := lbr.initialize(); err != nil {
lbr.err = fmt.Errorf("error get Size of the block: %w, return zero size", err)
lbr.err = fmt.Errorf("error get Size of the block: %s, return zero size", err)
return 0
}
return lbr.reader.Size()
@@ -167,11 +167,11 @@ func (lbr *lazyBlockReader) Close() error {
func (lbr *lazyBlockReader) mkTempDir() (string, error) {
temp, err := os.MkdirTemp("", lbr.ID.String())
if err != nil {
return "", fmt.Errorf("failed to create temp dir: %w", err)
return "", fmt.Errorf("failed to create temp dir: %s", err)
}
err = os.Mkdir(filepath.Join(temp, "chunks"), os.ModePerm)
if err != nil {
return "", fmt.Errorf("failed to create temp dir: %w", err)
return "", fmt.Errorf("failed to create temp dir: %s", err)
}
return temp, nil
}

View File

@@ -133,11 +133,11 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
c.RemoteFS = rfs
timeMin, err := utils.ParseTime(cfg.Filter.TimeMin)
if err != nil {
return nil, fmt.Errorf("failed to parse min time in filter: %w", err)
return nil, fmt.Errorf("failed to parse min time in filter: %s", err)
}
timeMax, err := utils.ParseTime(cfg.Filter.TimeMax)
if err != nil {
return nil, fmt.Errorf("failed to parse max time in filter: %w", err)
return nil, fmt.Errorf("failed to parse max time in filter: %s", err)
}
c.filter = filter{
min: timeMin.UnixMilli(),
@@ -156,7 +156,7 @@ func (c *Client) Explore() ([]tsdb.BlockReader, error) {
indexFile, err := c.fetchIndexFile()
if err != nil {
return nil, fmt.Errorf("failed to fetch index file: %w", err)
return nil, fmt.Errorf("failed to fetch index file: %s", err)
}
var blocksToImport []tsdb.BlockReader
@@ -172,7 +172,7 @@ func (c *Client) Explore() ([]tsdb.BlockReader, error) {
lazyBlockReader, err := newLazyBlockReader(block, c.RemoteFS)
if err != nil {
return nil, fmt.Errorf("failed to create lazy block reader: %w", err)
return nil, fmt.Errorf("failed to create lazy block reader: %s", err)
}
blocksToImport = append(blocksToImport, lazyBlockReader)
}
@@ -185,7 +185,7 @@ func (c *Client) Explore() ([]tsdb.BlockReader, error) {
func (c *Client) Read(ctx context.Context, block tsdb.BlockReader) (*prometheus.CloseableSeriesSet, error) {
meta := block.Meta()
if b, ok := block.(*lazyBlockReader); ok && b.Err() != nil {
return nil, fmt.Errorf("failed to read block: %w", b.Err())
return nil, fmt.Errorf("failed to read block: %s", b.Err())
}
if meta.ULID.String() == "" {
@@ -218,20 +218,20 @@ func (c *Client) fetchIndexFile() (*Index, error) {
file, err := c.ReadFile(bucketIndexCompressedFilename)
if err != nil {
return nil, fmt.Errorf("failed to read bucket index: %w", err)
return nil, fmt.Errorf("failed to read bucket index: %s", err)
}
r := bytes.NewReader(file)
// Read all the content.
gzipReader, err := gzip.NewReader(r)
if err != nil {
return nil, fmt.Errorf("failed to create gzip reader: %w", err)
return nil, fmt.Errorf("failed to create gzip reader: %s", err)
}
var indexFile Index
err = json.NewDecoder(gzipReader).Decode(&indexFile)
if err != nil {
return nil, fmt.Errorf("failed to decode bucket index: %w", err)
return nil, fmt.Errorf("failed to decode bucket index: %s", err)
}
return &indexFile, nil

View File

@@ -47,7 +47,7 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string, start,
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
exploreRequestsErrorsTotal.Inc()
return nil, fmt.Errorf("cannot create request to %q: %w", url, err)
return nil, fmt.Errorf("cannot create request to %q: %s", url, err)
}
params := req.URL.Query()
@@ -60,14 +60,14 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string, start,
if err != nil {
exploreRequestsErrorsTotal.Inc()
exploreDuration.UpdateDuration(startTime)
return nil, fmt.Errorf("series request failed: %w", err)
return nil, fmt.Errorf("series request failed: %s", err)
}
var response Response
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
exploreRequestsErrorsTotal.Inc()
exploreDuration.UpdateDuration(startTime)
return nil, fmt.Errorf("cannot decode series response: %w", err)
return nil, fmt.Errorf("cannot decode series response: %s", err)
}
exploreDuration.UpdateDuration(startTime)
return response.MetricNames, resp.Body.Close()
@@ -80,19 +80,19 @@ func (c *Client) ImportPipe(ctx context.Context, dstURL string, pr *io.PipeReade
req, err := http.NewRequestWithContext(ctx, http.MethodPost, dstURL, pr)
if err != nil {
importRequestsErrorsTotal.Inc()
return fmt.Errorf("cannot create import request to %q: %w", c.Addr, err)
return fmt.Errorf("cannot create import request to %q: %s", c.Addr, err)
}
importResp, err := c.do(req, http.StatusNoContent)
if err != nil {
importRequestsErrorsTotal.Inc()
importDuration.UpdateDuration(startTime)
return fmt.Errorf("import request failed: %w", err)
return fmt.Errorf("import request failed: %s", err)
}
if err := importResp.Body.Close(); err != nil {
importRequestsErrorsTotal.Inc()
importDuration.UpdateDuration(startTime)
return fmt.Errorf("cannot close import response body: %w", err)
return fmt.Errorf("cannot close import response body: %s", err)
}
importDuration.UpdateDuration(startTime)
return nil
@@ -105,7 +105,7 @@ func (c *Client) ExportPipe(ctx context.Context, url string, f Filter) (io.ReadC
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
exportRequestsErrorsTotal.Inc()
return nil, fmt.Errorf("cannot create request to %q: %w", c.Addr, err)
return nil, fmt.Errorf("cannot create request to %q: %s", c.Addr, err)
}
params := req.URL.Query()
@@ -136,7 +136,7 @@ func (c *Client) GetSourceTenants(ctx context.Context, f Filter) ([]string, erro
u := fmt.Sprintf("%s/%s", c.Addr, nativeTenantsAddr)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, fmt.Errorf("cannot create request to %q: %w", u, err)
return nil, fmt.Errorf("cannot create request to %q: %s", u, err)
}
params := req.URL.Query()
@@ -150,18 +150,18 @@ func (c *Client) GetSourceTenants(ctx context.Context, f Filter) ([]string, erro
resp, err := c.do(req, http.StatusOK)
if err != nil {
return nil, fmt.Errorf("tenants request failed: %w", err)
return nil, fmt.Errorf("tenants request failed: %s", err)
}
var r struct {
Tenants []string `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return nil, fmt.Errorf("cannot decode tenants response: %w", err)
return nil, fmt.Errorf("cannot decode tenants response: %s", err)
}
if err := resp.Body.Close(); err != nil {
return nil, fmt.Errorf("cannot close tenants response body: %w", err)
return nil, fmt.Errorf("cannot close tenants response body: %s", err)
}
return r.Tenants, nil
@@ -180,7 +180,7 @@ func (c *Client) do(req *http.Request, expSC int) (*http.Response, error) {
if resp.StatusCode != expSC {
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body for status code %d: %w", resp.StatusCode, err)
return nil, fmt.Errorf("failed to read response body for status code %d: %s", resp.StatusCode, err)
}
return nil, fmt.Errorf("unexpected response code %d: %s", resp.StatusCode, string(body))
}

View File

@@ -47,7 +47,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
q := fmt.Sprintf("%s/api/suggest?type=metrics&q=%s&max=%d", op.oc.Addr, filter, op.oc.Limit)
m, err := op.oc.FindMetrics(q)
if err != nil {
return fmt.Errorf("metric discovery failed for %q: %w", q, err)
return fmt.Errorf("metric discovery failed for %q: %s", q, err)
}
metrics = append(metrics, m...)
}
@@ -76,7 +76,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
log.Printf("Starting work on %s", metric)
serieslist, err := op.oc.FindSeries(metric)
if err != nil {
return fmt.Errorf("couldn't retrieve series list for %s: %w", metric, err)
return fmt.Errorf("couldn't retrieve series list for %s : %s", metric, err)
}
/*
Create channels for collecting/processing series and errors
@@ -95,7 +95,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
for s := range seriesCh {
if err := op.do(s); err != nil {
otsdbErrorsTotal.Inc()
errCh <- fmt.Errorf("couldn't retrieve series for %s: %w", metric, err)
errCh <- fmt.Errorf("couldn't retrieve series for %s : %s", metric, err)
return
}
otsdbSeriesProcessed.Inc()
@@ -112,7 +112,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
// check for any lingering errors on the query side
for otsdbErr := range errCh {
if runErr == nil {
runErr = fmt.Errorf("import process failed:\n%w", otsdbErr)
runErr = fmt.Errorf("import process failed: \n%s", otsdbErr)
}
}
bar.Finish()
@@ -125,7 +125,7 @@ func (op *otsdbProcessor) run(ctx context.Context) error {
for vmErr := range op.im.Errors() {
if vmErr.Err != nil {
otsdbErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, op.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
}
}
log.Println("Import finished!")
@@ -141,12 +141,12 @@ func (op *otsdbProcessor) sendQueries(ctx context.Context, serieslist []opentsdb
for _, tr := range rt.QueryRanges {
select {
case <-ctx.Done():
return fmt.Errorf("context canceled: %w", ctx.Err())
return fmt.Errorf("context canceled: %s", ctx.Err())
case otsdbErr := <-errCh:
otsdbErrorsTotal.Inc()
return fmt.Errorf("opentsdb error: %w", otsdbErr)
return fmt.Errorf("opentsdb error: %s", otsdbErr)
case vmErr := <-op.im.Errors():
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, op.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, op.isVerbose))
case seriesCh <- queryObj{
Tr: tr, StartTime: startTime,
Series: series, Rt: opentsdb.RetentionMeta{
@@ -166,7 +166,7 @@ func (op *otsdbProcessor) do(s queryObj) error {
end := s.StartTime - s.Tr.End
data, err := op.oc.GetData(s.Series, s.Rt, start, end, op.oc.MsecsTime)
if err != nil {
return fmt.Errorf("failed to collect data for %v in %v:%v :: %w", s.Series, s.Rt, s.Tr, err)
return fmt.Errorf("failed to collect data for %v in %v:%v :: %v", s.Series, s.Rt, s.Tr, err)
}
if len(data.Timestamps) < 1 || len(data.Values) < 1 {
log.Printf("no data found for %v in %v:%v...skipping", s.Series, s.Rt, s.Tr)

View File

@@ -106,7 +106,7 @@ func (c Client) FindMetrics(q string) ([]string, error) {
resp, err := c.c.Get(q)
if err != nil {
return nil, fmt.Errorf("failed to send GET request to %q: %w", q, err)
return nil, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != 200 {
@@ -114,12 +114,12 @@ func (c Client) FindMetrics(q string) ([]string, error) {
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve metric data from %q: %w", q, err)
return nil, fmt.Errorf("could not retrieve metric data from %q: %s", q, err)
}
var metriclist []string
err = json.Unmarshal(body, &metriclist)
if err != nil {
return nil, fmt.Errorf("failed to read response from %q: %w", q, err)
return nil, fmt.Errorf("failed to read response from %q: %s", q, err)
}
return metriclist, nil
}
@@ -130,7 +130,7 @@ func (c Client) FindSeries(metric string) ([]Meta, error) {
q := fmt.Sprintf("%s/api/search/lookup?m=%s&limit=%d", c.Addr, metric, c.Limit)
resp, err := c.c.Get(q)
if err != nil {
return nil, fmt.Errorf("failed to send GET request to %q: %w", q, err)
return nil, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != 200 {
@@ -138,12 +138,12 @@ func (c Client) FindSeries(metric string) ([]Meta, error) {
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve series data from %q: %w", q, err)
return nil, fmt.Errorf("could not retrieve series data from %q: %s", q, err)
}
var results MetaResults
err = json.Unmarshal(body, &results)
if err != nil {
return nil, fmt.Errorf("failed to read response from %q: %w", q, err)
return nil, fmt.Errorf("failed to read response from %q: %s", q, err)
}
return results.Results, nil
}
@@ -183,7 +183,7 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64, m
q := fmt.Sprintf("%s/api/query?%s", c.Addr, queryStr)
resp, err := c.c.Get(q)
if err != nil {
return Metric{}, fmt.Errorf("failed to send GET request to %q: %w", q, err)
return Metric{}, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
defer func() { _ = resp.Body.Close() }()
/*
@@ -303,7 +303,7 @@ func NewClient(cfg Config) (*Client, error) {
for _, r := range cfg.Retentions {
ret, err := convertRetention(r, offsetSecs, cfg.MsecsTime)
if err != nil {
return &Client{}, fmt.Errorf("couldn't parse retention %q :: %w", r, err)
return &Client{}, fmt.Errorf("couldn't parse retention %q :: %v", r, err)
}
retentions = append(retentions, ret)
}

View File

@@ -88,7 +88,7 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
}
queryLengthDuration, err := convertDuration(chunks[2])
if err != nil {
return Retention{}, fmt.Errorf("invalid ttl (second order) duration string: %q: %w", chunks[2], err)
return Retention{}, fmt.Errorf("invalid ttl (second order) duration string: %q: %s", chunks[2], err)
}
// set ttl in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds
queryLength := queryLengthDuration.Milliseconds()
@@ -110,7 +110,7 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
aggTimeDuration, err := convertDuration(aggregates[1])
if err != nil {
return Retention{}, fmt.Errorf("invalid aggregation time duration string: %q: %w", aggregates[1], err)
return Retention{}, fmt.Errorf("invalid aggregation time duration string: %q: %s", aggregates[1], err)
}
aggTime := aggTimeDuration.Milliseconds()
if !msecTime {
@@ -119,7 +119,7 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention,
rowLengthDuration, err := convertDuration(chunks[1])
if err != nil {
return Retention{}, fmt.Errorf("invalid row length (first order) duration string: %q: %w", chunks[1], err)
return Retention{}, fmt.Errorf("invalid row length (first order) duration string: %q: %s", chunks[1], err)
}
// set length of each row in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds
rowLength := rowLengthDuration.Milliseconds()

View File

@@ -46,7 +46,7 @@ type prometheusProcessor struct {
func (pp *prometheusProcessor) run(ctx context.Context) error {
blocks, err := pp.cl.Explore()
if err != nil {
return fmt.Errorf("explore failed: %w", err)
return fmt.Errorf("explore failed: %s", err)
}
if len(blocks) < 1 {
return fmt.Errorf("found no blocks to import")
@@ -57,7 +57,7 @@ func (pp *prometheusProcessor) run(ctx context.Context) error {
}
if err := pp.processBlocks(ctx, blocks); err != nil {
return fmt.Errorf("migration failed: %w", err)
return fmt.Errorf("migration failed: %s", err)
}
log.Println("Import finished!")
@@ -68,7 +68,7 @@ func (pp *prometheusProcessor) run(ctx context.Context) error {
func (pp *prometheusProcessor) do(ctx context.Context, b tsdb.BlockReader) error {
css, err := pp.cl.Read(ctx, b)
if err != nil {
return fmt.Errorf("failed to read block: %w", err)
return fmt.Errorf("failed to read block: %s", err)
}
defer func() {
if err := css.Close(); err != nil {
@@ -146,7 +146,7 @@ func (pp *prometheusProcessor) processBlocks(ctx context.Context, blocks []tsdb.
for br := range blockReadersCh {
if err := pp.do(ctx, br); err != nil {
promErrorsTotal.Inc()
errCh <- fmt.Errorf("cannot read block %q: %w", br.Meta().ULID, err)
errCh <- fmt.Errorf("cannot read block %q: %s", br.Meta().ULID, err)
return
}
if cb, ok := br.(io.Closer); ok {
@@ -164,11 +164,11 @@ func (pp *prometheusProcessor) processBlocks(ctx context.Context, blocks []tsdb.
select {
case promErr := <-errCh:
close(blockReadersCh)
return fmt.Errorf("prometheus error: %w", promErr)
return fmt.Errorf("prometheus error: %s", promErr)
case vmErr := <-pp.im.Errors():
close(blockReadersCh)
promErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, pp.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, pp.isVerbose))
case blockReadersCh <- br:
}
}
@@ -182,11 +182,11 @@ func (pp *prometheusProcessor) processBlocks(ctx context.Context, blocks []tsdb.
for vmErr := range pp.im.Errors() {
if vmErr.Err != nil {
promErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, pp.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, pp.isVerbose))
}
}
for err := range errCh {
return fmt.Errorf("import process failed: %w", err)
return fmt.Errorf("import process failed: %s", err)
}
return nil

View File

@@ -59,12 +59,12 @@ func (f filter) inRange(minV, maxV int64) bool {
func NewClient(cfg Config) (*Client, error) {
db, err := tsdb.OpenDBReadOnly(cfg.Snapshot, cfg.TemporaryDir, nil)
if err != nil {
return nil, fmt.Errorf("failed to open snapshot %q: %w", cfg.Snapshot, err)
return nil, fmt.Errorf("failed to open snapshot %q: %s", cfg.Snapshot, err)
}
c := &Client{DBReadOnly: db}
timeMin, timeMax, err := parseTime(cfg.Filter.TimeMin, cfg.Filter.TimeMax)
if err != nil {
return nil, fmt.Errorf("failed to parse time in filter: %w", err)
return nil, fmt.Errorf("failed to parse time in filter: %s", err)
}
c.filter = filter{
min: timeMin,
@@ -83,7 +83,7 @@ func NewClient(cfg Config) (*Client, error) {
func (c *Client) Explore() ([]tsdb.BlockReader, error) {
blocks, err := c.Blocks()
if err != nil {
return nil, fmt.Errorf("failed to fetch blocks: %w", err)
return nil, fmt.Errorf("failed to fetch blocks: %s", err)
}
s := &vmctlutil.Stats{
Filtered: c.filter.min != 0 || c.filter.max != 0 || c.filter.label != "",
@@ -142,14 +142,14 @@ func parseTime(start, end string) (int64, int64, error) {
if start != "" {
v, err := time.Parse(time.RFC3339, start)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse %q: %w", start, err)
return 0, 0, fmt.Errorf("failed to parse %q: %s", start, err)
}
s = v.UnixNano() / int64(time.Millisecond)
}
if end != "" {
v, err := time.Parse(time.RFC3339, end)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse %q: %w", end, err)
return 0, 0, fmt.Errorf("failed to parse %q: %s", end, err)
}
e = v.UnixNano() / int64(time.Millisecond)
}

View File

@@ -44,7 +44,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
ranges, err := stepper.SplitDateRange(*rrp.filter.timeStart, *rrp.filter.timeEnd, rrp.filter.chunk, rrp.filter.timeReverse)
if err != nil {
return fmt.Errorf("failed to create date ranges for the given time filters: %w", err)
return fmt.Errorf("failed to create date ranges for the given time filters: %v", err)
}
question := fmt.Sprintf("Selected time range %q - %q will be split into %d ranges according to %q step. Continue?",
@@ -74,7 +74,7 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
for r := range rangeC {
if err := rrp.do(ctx, r); err != nil {
remoteReadErrorsTotal.Inc()
errCh <- fmt.Errorf("request failed for: %w", err)
errCh <- fmt.Errorf("request failed for: %s", err)
return
}
remoteReadRangesProcessed.Inc()
@@ -86,10 +86,10 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
for _, r := range ranges {
select {
case infErr := <-errCh:
return fmt.Errorf("remote read error: %w", infErr)
return fmt.Errorf("remote read error: %s", infErr)
case vmErr := <-rrp.dst.Errors():
remoteReadErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, rrp.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose))
case rangeC <- &remoteread.Filter{
StartTimestampMs: r[0].UnixMilli(),
EndTimestampMs: r[1].UnixMilli(),
@@ -105,11 +105,11 @@ func (rrp *remoteReadProcessor) run(ctx context.Context) error {
for vmErr := range rrp.dst.Errors() {
if vmErr.Err != nil {
remoteReadErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, rrp.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, rrp.isVerbose))
}
}
for err := range errCh {
return fmt.Errorf("import process failed: %w", err)
return fmt.Errorf("import process failed: %s", err)
}
return nil
@@ -119,7 +119,7 @@ func (rrp *remoteReadProcessor) do(ctx context.Context, filter *remoteread.Filte
return rrp.src.Read(ctx, filter, func(series *vm.TimeSeries) error {
if err := rrp.dst.Input(series); err != nil {
return fmt.Errorf(
"failed to read data for time range start: %d, end: %d: %w",
"failed to read data for time range start: %d, end: %d, %s",
filter.StartTimestampMs, filter.EndTimestampMs, err)
}
return nil

View File

@@ -157,7 +157,7 @@ func (c *Client) Read(ctx context.Context, filter *Filter, streamCb StreamCallba
if errors.Is(err, context.Canceled) {
return fmt.Errorf("fetch request has ben cancelled")
}
return fmt.Errorf("error while fetching data from remote storage: %w", err)
return fmt.Errorf("error while fetching data from remote storage: %s", err)
}
return nil
}

View File

@@ -52,7 +52,7 @@ func (f filter) inRange(minV, maxV int64) bool {
func NewClient(cfg Config) (*Client, error) {
minTime, maxTime, err := parseTime(cfg.Filter.TimeMin, cfg.Filter.TimeMax)
if err != nil {
return nil, fmt.Errorf("failed to parse time in filter: %w", err)
return nil, fmt.Errorf("failed to parse time in filter: %s", err)
}
return &Client{
snapshotPath: cfg.Snapshot,
@@ -183,14 +183,14 @@ func parseTime(start, end string) (int64, int64, error) {
if start != "" {
v, err := time.Parse(time.RFC3339, start)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse %q: %w", start, err)
return 0, 0, fmt.Errorf("failed to parse %q: %s", start, err)
}
s = v.UnixNano() / int64(time.Millisecond)
}
if end != "" {
v, err := time.Parse(time.RFC3339, end)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse %q: %w", end, err)
return 0, 0, fmt.Errorf("failed to parse %q: %s", end, err)
}
e = v.UnixNano() / int64(time.Millisecond)
}

View File

@@ -36,7 +36,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
// Use the first aggregate type to explore blocks (block list is the same for all types)
blocks, err := tp.cl.Explore(tp.aggrTypes[0])
if err != nil {
return fmt.Errorf("explore failed: %w", err)
return fmt.Errorf("explore failed: %s", err)
}
if len(blocks) < 1 {
return fmt.Errorf("found no blocks to import")
@@ -84,7 +84,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
log.Println("Processing raw blocks (resolution=0)...")
stats, err := tp.processBlocks(rawBlocks, thanos.AggrTypeNone, bar)
if err != nil {
return fmt.Errorf("migration failed for raw blocks: %w", err)
return fmt.Errorf("migration failed for raw blocks: %s", err)
}
phases = append(phases, phaseStats{
name: "raw",
@@ -108,7 +108,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
aggrBlocks, err := tp.cl.Explore(aggrType)
if err != nil {
return fmt.Errorf("explore failed for aggr type %s: %w", aggrType, err)
return fmt.Errorf("explore failed for aggr type %s: %s", aggrType, err)
}
var downsampledOnly []thanos.BlockInfo
@@ -128,7 +128,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
stats, err := tp.processBlocks(downsampledOnly, aggrType, bar)
thanos.CloseBlocks(aggrBlocks)
if err != nil {
return fmt.Errorf("migration failed for aggr type %s: %w", aggrType, err)
return fmt.Errorf("migration failed for aggr type %s: %s", aggrType, err)
}
phases = append(phases, phaseStats{
name: aggrType.String(),
@@ -153,7 +153,7 @@ func (tp *thanosProcessor) run(ctx context.Context) error {
for vmErr := range tp.im.Errors() {
if vmErr.Err != nil {
thanosErrorsTotal.Inc()
return fmt.Errorf("import process failed: %w", wrapErr(vmErr, tp.isVerbose))
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, tp.isVerbose))
}
}
@@ -184,7 +184,7 @@ func (tp *thanosProcessor) processBlocks(blocks []thanos.BlockInfo, aggrType tha
seriesCount, samplesCount, err := tp.do(bi, aggrType)
if err != nil {
thanosErrorsTotal.Inc()
errCh <- fmt.Errorf("read failed for block %q with aggr %s: %w", bi.Block.Meta().ULID, aggrType, err)
errCh <- fmt.Errorf("read failed for block %q with aggr %s: %s", bi.Block.Meta().ULID, aggrType, err)
return
}
@@ -209,12 +209,12 @@ func (tp *thanosProcessor) processBlocks(blocks []thanos.BlockInfo, aggrType tha
case thanosErr := <-errCh:
close(blockReadersCh)
wg.Wait()
return processBlocksStats{}, fmt.Errorf("thanos error: %w", thanosErr)
return processBlocksStats{}, fmt.Errorf("thanos error: %s", thanosErr)
case vmErr := <-tp.im.Errors():
close(blockReadersCh)
wg.Wait()
thanosErrorsTotal.Inc()
return processBlocksStats{}, fmt.Errorf("import process failed: %w", wrapErr(vmErr, tp.isVerbose))
return processBlocksStats{}, fmt.Errorf("import process failed: %s", wrapErr(vmErr, tp.isVerbose))
case blockReadersCh <- bi:
}
}
@@ -223,7 +223,7 @@ func (tp *thanosProcessor) processBlocks(blocks []thanos.BlockInfo, aggrType tha
wg.Wait()
close(errCh)
for err := range errCh {
return processBlocksStats{}, fmt.Errorf("import process failed: %w", err)
return processBlocksStats{}, fmt.Errorf("import process failed: %s", err)
}
return processBlocksStats{
@@ -236,7 +236,7 @@ func (tp *thanosProcessor) processBlocks(blocks []thanos.BlockInfo, aggrType tha
func (tp *thanosProcessor) do(bi thanos.BlockInfo, aggrType thanos.AggrType) (uint64, uint64, error) {
ss, err := tp.cl.Read(bi)
if err != nil {
return 0, 0, fmt.Errorf("failed to read block: %w", err)
return 0, 0, fmt.Errorf("failed to read block: %s", err)
}
defer ss.Close() // Ensure querier is closed even on early returns

View File

@@ -12,7 +12,6 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
@@ -28,8 +27,6 @@ type Config struct {
// --httpListenAddr value for single node version
// --httpListenAddr value of vmselect component for cluster version
Addr string
AuthCfg *auth.Config
// Transport allows specifying custom http.Transport
Transport *http.Transport
// Concurrency defines number of worker
@@ -43,6 +40,10 @@ type Config struct {
// BatchSize defines how many samples
// importer collects before sending the import request
BatchSize int
// User name for basic auth
User string
// Password for basic auth
Password string
// SignificantFigures defines the number of significant figures to leave
// in metric values before importing.
// Zero value saves all the significant decimal places
@@ -64,10 +65,11 @@ type Config struct {
// see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-time-series-data
type Importer struct {
addr string
authCfg *auth.Config
client *http.Client
importPath string
compress bool
user string
password string
close chan struct{}
input chan *TimeSeries
@@ -146,7 +148,8 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
client: client,
importPath: importPath,
compress: cfg.Compress,
authCfg: cfg.AuthCfg,
user: cfg.User,
password: cfg.Password,
rl: limiter.NewLimiter(cfg.RateLimit),
close: make(chan struct{}),
input: make(chan *TimeSeries, cfg.Concurrency*4),
@@ -160,7 +163,7 @@ func NewImporter(ctx context.Context, cfg Config) (*Importer, error) {
importDuration: metrics.GetOrCreateHistogram(`vmctl_importer_request_duration_seconds`),
}
if err := im.Ping(); err != nil {
return nil, fmt.Errorf("ping to %q failed: %w", addr, err)
return nil, fmt.Errorf("ping to %q failed: %s", addr, err)
}
if cfg.BatchSize < 1 {
@@ -286,7 +289,7 @@ func (im *Importer) flush(ctx context.Context, b []*TimeSeries) error {
retryableFunc := func() error { return im.Import(b) }
attempts, err := im.backoff.Retry(ctx, retryableFunc)
if err != nil {
return fmt.Errorf("import failed with %d retries: %w", attempts, err)
return fmt.Errorf("import failed with %d retries: %s", attempts, err)
}
im.s.Lock()
im.s.retries = attempts
@@ -299,10 +302,10 @@ func (im *Importer) Ping() error {
url := fmt.Sprintf("%s/health", im.addr)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
return fmt.Errorf("cannot create request to %q: %s", im.addr, err)
}
if im.authCfg != nil {
im.authCfg.SetHeaders(req, true)
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
}
resp, err := im.client.Do(req)
if err != nil {
@@ -329,10 +332,10 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
req, err := http.NewRequest(http.MethodPost, im.importPath, pr)
if err != nil {
im.importRequestsErrorsTotal.Inc()
return fmt.Errorf("cannot create request to %q: %w", im.addr, err)
return fmt.Errorf("cannot create request to %q: %s", im.addr, err)
}
if im.authCfg != nil {
im.authCfg.SetHeaders(req, true)
if im.user != "" {
req.SetBasicAuth(im.user, im.password)
}
if im.compress {
req.Header.Set("Content-Encoding", "gzip")
@@ -349,7 +352,7 @@ func (im *Importer) Import(tsBatch []*TimeSeries) error {
zw, err := gzip.NewWriterLevel(w, 1)
if err != nil {
im.importRequestsErrorsTotal.Inc()
return fmt.Errorf("unexpected error when creating gzip writer: %w", err)
return fmt.Errorf("unexpected error when creating gzip writer: %s", err)
}
w = zw
}
@@ -408,7 +411,7 @@ var ErrBadRequest = errors.New("bad request")
func (im *Importer) do(req *http.Request) error {
resp, err := im.client.Do(req)
if err != nil {
return fmt.Errorf("unexpected error when performing request: %w", err)
return fmt.Errorf("unexpected error when performing request: %s", err)
}
defer func() {
_ = resp.Body.Close()
@@ -416,7 +419,7 @@ func (im *Importer) do(req *http.Request) error {
if resp.StatusCode != http.StatusNoContent {
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body for status code %d: %w", resp.StatusCode, err)
return fmt.Errorf("failed to read response body for status code %d: %s", resp.StatusCode, err)
}
if resp.StatusCode == http.StatusBadRequest {
return fmt.Errorf("%w: unexpected response code %d: %s", ErrBadRequest, resp.StatusCode, string(body))

View File

@@ -55,14 +55,14 @@ func (p *vmNativeProcessor) run(ctx context.Context) error {
start, err := vmctlutil.ParseTime(p.filter.TimeStart)
if err != nil {
return fmt.Errorf("failed to parse %s, provided: %s: %w", vmNativeFilterTimeStart, p.filter.TimeStart, err)
return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeStart, p.filter.TimeStart, err)
}
end := time.Now().In(start.Location())
if p.filter.TimeEnd != "" {
end, err = vmctlutil.ParseTime(p.filter.TimeEnd)
if err != nil {
return fmt.Errorf("failed to parse %s, provided: %s: %w", vmNativeFilterTimeEnd, p.filter.TimeEnd, err)
return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeEnd, p.filter.TimeEnd, err)
}
}
@@ -91,7 +91,7 @@ func (p *vmNativeProcessor) run(ctx context.Context) error {
err := p.runBackfilling(ctx, tenantID, ranges)
if err != nil {
migrationErrorsTotal.Inc()
return fmt.Errorf("migration failed: %w", err)
return fmt.Errorf("migration failed: %s", err)
}
if p.interCluster {
@@ -157,7 +157,7 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU
}
default:
}
return fmt.Errorf("failed to write into %q: %w", p.dst.Addr, err)
return fmt.Errorf("failed to write into %q: %s", p.dst.Addr, err)
}
p.s.Lock()
@@ -184,7 +184,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
importAddr, err := vm.AddExtraLabelsToImportPath(importAddr, p.dst.ExtraLabels)
if err != nil {
return fmt.Errorf("failed to add labels to import path: %w", err)
return fmt.Errorf("failed to add labels to import path: %s", err)
}
dstURL := fmt.Sprintf("%s/%s", p.dst.Addr, importAddr)
@@ -222,7 +222,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
format = fmt.Sprintf(nativeWithBackoffTpl, barPrefix)
metricsMap, err = p.explore(ctx, p.src, tenantID, ranges)
if err != nil {
return fmt.Errorf("failed to explore metric names: %w", err)
return fmt.Errorf("failed to explore metric names: %s", err)
}
if len(metricsMap) == 0 {
errMsg := "no metrics found"
@@ -295,7 +295,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
case <-ctx.Done():
return fmt.Errorf("context canceled")
case infErr := <-errCh:
return fmt.Errorf("export/import error: %w", infErr)
return fmt.Errorf("export/import error: %s", infErr)
case filterCh <- native.Filter{
Match: match,
TimeStart: times[0].Format(time.RFC3339),
@@ -313,7 +313,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
close(errCh)
for err := range errCh {
return fmt.Errorf("import process failed: %w", err)
return fmt.Errorf("import process failed: %s", err)
}
return nil

View File

@@ -257,7 +257,7 @@ func (vms *VMStorage) requestHandler(w http.ResponseWriter, r *http.Request) boo
dealine, err = strconv.Atoi(deadlineStr)
if err != nil {
logger.Errorf("cannot parse `seconds` arg %q: %s", deadlineStr, err)
jsonResponseError(w, fmt.Errorf("cannot parse `seconds` arg %q: %w", deadlineStr, err))
jsonResponseError(w, fmt.Errorf("cannot parse `seconds` arg %q: %s", deadlineStr, err))
return true
}
}

View File

@@ -26,9 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See PR [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808) for details.
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix `increase` and `increase_prometheus` outputs producing inflated values when old samples update the baseline across interval boundaries with `ignore_old_samples: true` or `enable_windows: true`.
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)

View File

@@ -121,7 +121,7 @@ func (p *Password) initRandomValue() {
_, err := io.ReadFull(rand.Reader, buf[:])
if err != nil {
// cannot use lib/logger here, since it can be uninitialized yet
panic(fmt.Errorf("FATAL: cannot read random data: %w", err))
panic(fmt.Errorf("FATAL: cannot read random data: %s", err))
}
s := string(buf[:])
p.value.Store(&s)

View File

@@ -16,7 +16,7 @@ func ParseKey(key []byte) (any, error) {
k, err := x509.ParsePKIXPublicKey(b.Bytes)
if err != nil {
return nil, fmt.Errorf("failed to parse key %q: %w", key, err)
return nil, fmt.Errorf("failed to parse key %q: %v", key, err)
}
return k, nil

View File

@@ -14,7 +14,7 @@ func BenchmarkWriteRequestUnmarshalProtobuf(b *testing.B) {
wru := &WriteRequestUnmarshaler{}
for pb.Next() {
if _, err := wru.UnmarshalProtobuf(data); err != nil {
panic(fmt.Errorf("unexpected error: %w", err))
panic(fmt.Errorf("unexpected error: %s", err))
}
}
})

View File

@@ -97,12 +97,12 @@ func getDedicatedServerDetails(cfg *apiConfig, dedicatedServerName string) (*ded
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
}
var dedicatedServerDetails dedicatedServer
if err = json.Unmarshal(resp, &dedicatedServerDetails); err != nil {
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
}
// get IPs for this dedicated server.
@@ -113,12 +113,12 @@ func getDedicatedServerDetails(cfg *apiConfig, dedicatedServerName string) (*ded
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
}
var ips []string
if err = json.Unmarshal(resp, &ips); err != nil {
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
}
// handle different IP formats
@@ -141,11 +141,11 @@ func getDedicatedServerList(cfg *apiConfig) ([]string, error) {
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
}
if err = json.Unmarshal(resp, &dedicatedServerList); err != nil {
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
}
return dedicatedServerList, nil

View File

@@ -117,12 +117,12 @@ func getVPSDetails(cfg *apiConfig, vpsName string) (*virtualPrivateServer, error
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
}
var vpsDetails virtualPrivateServer
if err = json.Unmarshal(resp, &vpsDetails); err != nil {
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
}
// get IPs for this vps.
@@ -133,12 +133,12 @@ func getVPSDetails(cfg *apiConfig, vpsName string) (*virtualPrivateServer, error
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
}
var ips []string
if err = json.Unmarshal(resp, &ips); err != nil {
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
}
// handle different IP formats
@@ -162,12 +162,12 @@ func getVPSList(cfg *apiConfig) ([]string, error) {
request.Header, _ = getAuthHeaders(cfg, request.Header, cfg.client.APIServer(), reqPath)
})
if err != nil {
return nil, fmt.Errorf("cannot process %s: %w", reqPath, err)
return nil, fmt.Errorf("request %s error: %v", reqPath, err)
}
var vpsList []string
if err = json.Unmarshal(resp, &vpsList); err != nil {
return nil, fmt.Errorf("cannot unmarshal response from %s: %w", reqPath, err)
return nil, fmt.Errorf("cannot unmarshal %s response: %v", reqPath, err)
}
return vpsList, nil

View File

@@ -33,7 +33,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
}
parsedURL, err := url.Parse(sdc.URL)
if err != nil {
return nil, fmt.Errorf("cannot parse %s: %w", sdc.URL, err)
return nil, fmt.Errorf("parse URL %s error: %v", sdc.URL, err)
}
if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" {
return nil, fmt.Errorf("URL %s scheme must be 'http' or 'https'", sdc.URL)

View File

@@ -221,7 +221,7 @@ func getIAMToken(cfg *apiConfig) (*iamToken, error) {
body := bytes.NewBuffer(passport)
resp, err := cfg.client.Post(iamURL, "application/json", body)
if err != nil {
return nil, fmt.Errorf("cannot send request to yandex cloud iam api %q: %w", iamURL, err)
return nil, fmt.Errorf("cannot send request to yandex cloud iam api %q: %s", iamURL, err)
}
data, err := readResponseBody(resp, iamURL)
if err != nil {

View File

@@ -186,7 +186,7 @@ func (s *Series) unmarshalProtobuf(src []byte) (err error) {
}
pt := &points[len(points)-1]
if err := pt.unmarshalProtobuf(data); err != nil {
return fmt.Errorf("cannot unmarshal point: %w", err)
return fmt.Errorf("cannot unmarshal point: %s", err)
}
case 1:
data, ok := fc.MessageData()

View File

@@ -30,7 +30,7 @@ func ProcessRequestBody(b []byte) ([]byte, error) {
}
}
if err := json.Unmarshal(b, &req); err != nil {
return nil, fmt.Errorf("cannot unmarshal Firehose JSON in request body: %w", err)
return nil, fmt.Errorf("cannot unmarshal Firehose JSON in request body: %s", err)
}
var dst []byte

View File

@@ -99,17 +99,17 @@ func (r *Row) unmarshal(o *fastjson.Value) error {
n, err := getFloat64(o, "value")
if err != nil {
return fmt.Errorf("missing `value` element: %w", err)
return fmt.Errorf("missing `value` element, %s", err)
}
r.Value = n
cl, err := getInt64(o, "clock")
if err != nil {
return fmt.Errorf("missing `clock` element: %w", err)
return fmt.Errorf("missing `clock` element, %s", err)
}
ns, err := getInt64(o, "ns")
if err != nil {
return fmt.Errorf("missing `ns` element: %w", err)
return fmt.Errorf("missing `ns` element, %s", err)
}
// clock - Number of seconds since Epoch to the moment when value was collected (integer part).
// ns - Number of nanoseconds to be added to clock to get a precise value collection time.
@@ -121,7 +121,7 @@ func (r *Row) unmarshal(o *fastjson.Value) error {
if len(groupValue) != 0 {
groups, err := getArray(o, "groups")
if err != nil {
return fmt.Errorf("missing `groups` element: %w", err)
return fmt.Errorf("missing `groups` element, %s", err)
}
for _, g := range groups {
k := g.GetStringBytes()
@@ -141,7 +141,7 @@ func (r *Row) unmarshal(o *fastjson.Value) error {
itemTags, err := getArray(o, "item_tags")
if err != nil {
return fmt.Errorf("missing `item_tags` element: %w", err)
return fmt.Errorf("missing `item_tags` element, %s", err)
}
if len(duplicateTagsSeparator) == 0 { // Do not merge tags

View File

@@ -71,9 +71,9 @@ func Create(ctx context.Context, createSnapshotURL string) (string, error) {
return snap.Snapshot, nil
}
if snap.Status == "error" {
return "", fmt.Errorf("snapshot status: %q; msg: %q", snap.Status, snap.Msg)
return "", errors.New(snap.Msg)
}
return "", fmt.Errorf("snapshot status unknown: %q", snap.Status)
return "", fmt.Errorf("unknown status: %v", snap.Status)
}
// Delete deletes a snapshot via the provided api endpoint
@@ -121,14 +121,14 @@ func Delete(ctx context.Context, deleteSnapshotURL string, snapshotName string)
if snap.Status == "error" {
return errors.New(snap.Msg)
}
return fmt.Errorf("snapshot status unknown: %q", snap.Status)
return fmt.Errorf("unknown status: %v", snap.Status)
}
// GetHTTPClient returns a new HTTP client configured for snapshot operations.
func GetHTTPClient() (*http.Client, error) {
tr, err := promauth.NewTLSTransport(*tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify, "vm_snapshot_client")
if err != nil {
return nil, fmt.Errorf("failed to create transport: %w", err)
return nil, fmt.Errorf("failed to create transport: %s", err)
}
hc := &http.Client{
Transport: tr,

View File

@@ -649,7 +649,7 @@ func (is *indexSearch) searchLabelNamesWithFiltersOnDate(qt *querytracer.Tracer,
} else {
_, key, err := unmarshalCompositeTagKey(labelName)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal composite tag key: %w", err)
return nil, fmt.Errorf("cannot unmarshal composite tag key: %s", err)
}
lns[string(key)] = struct{}{}
}

View File

@@ -6,6 +6,9 @@ type avgAggrValue struct {
}
func (av *avgAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.sum += sample.value
av.count++
}

View File

@@ -4,7 +4,10 @@ type countSamplesAggrValue struct {
count uint64
}
func (av *countSamplesAggrValue) pushSample(_ aggrConfig, _ *pushSample, _ string, _ int64) {
func (av *countSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.count++
}

View File

@@ -9,7 +9,10 @@ type countSeriesAggrValue struct {
samples map[uint64]struct{}
}
func (av *countSeriesAggrValue) pushSample(_ aggrConfig, _ *pushSample, key string, _ int64) {
func (av *countSeriesAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string, _ int64) {
if sample.stateOnly {
return
}
// Count unique hashes over the keys instead of unique key values.
// This reduces memory usage at the cost of possible hash collisions for distinct key values.
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))

View File

@@ -45,6 +45,7 @@ type dedupAggrShardNopad struct {
type dedupAggrSample struct {
value float64
timestamp int64
stateOnly bool
}
func newDedupAggr() *dedupAggr {
@@ -189,6 +190,7 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
s = &samplesBuf[len(samplesBuf)-1]
s.value = sample.value
s.timestamp = sample.timestamp
s.stateOnly = sample.stateOnly
key := bytesutil.InternString(sample.key)
state.m[key] = s
@@ -197,28 +199,33 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
state.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
continue
}
s.timestamp, s.value = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
var newWins bool
s.timestamp, s.value, newWins = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
if newWins {
s.stateOnly = sample.stateOnly
}
}
state.samplesBuf = samplesBuf
}
// deduplicateSamples returns deduplicated timestamp and value results.
// deduplicateSamples returns deduplicated timestamp and value results,
// along with a boolean indicating whether the new sample won.
// See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#deduplication
func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64) {
func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64, bool) {
if newT > oldT {
return newT, newV
return newT, newV, true
}
// if both samples have the same timestamp, choose the maximum value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333;
// always prefer a non-decimal.StaleNaN value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10196
if newT == oldT {
if decimal.IsStaleNaN(oldV) {
return newT, newV
return newT, newV, true
}
if newV > oldV {
return newT, newV
return newT, newV, true
}
}
return oldT, oldV
return oldT, oldV, false
}
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
@@ -250,6 +257,7 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
key: key,
value: s.value,
timestamp: s.timestamp,
stateOnly: s.stateOnly,
})
// Limit the number of samples per each flush in order to limit memory usage.

View File

@@ -24,8 +24,8 @@ func TestDedupAggrSerial(t *testing.T) {
}
da.pushSamples(samples, 0, false)
if n := da.sizeBytes(); n > 5_000_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 5_000_000 bytes", n)
if n := da.sizeBytes(); n > 6_000_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 6_000_000 bytes", n)
}
if n := da.itemsCount(); n != seriesCount {
t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)
@@ -81,7 +81,7 @@ func TestDedupAggrConcurrent(_ *testing.T) {
func TestDeduplicateSamples(t *testing.T) {
f := func(oldT, newT int64, oldV, newV float64, expectedT int64, expectedV float64) {
t.Helper()
dedupT, dedupV := deduplicateSamples(oldT, newT, oldV, newV)
dedupT, dedupV, _ := deduplicateSamples(oldT, newT, oldV, newV)
if dedupT != expectedT || dedupV != expectedV {
t.Fatalf("unexpected deduplicated result for oldT=%d, newT=%d, oldV=%f, newV=%f; got dedupT=%d, dedupV=%f; want dedupT=%d, dedupV=%f",
oldT, newT, oldV, newV, dedupT, dedupV, expectedT, expectedV)

View File

@@ -11,6 +11,9 @@ type histogramBucketAggrValue struct {
}
func (av *histogramBucketAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.h.Update(sample.value)
}

109
lib/streamaggr/increase.go Normal file
View File

@@ -0,0 +1,109 @@
package streamaggr
import (
"fmt"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
type increaseLastValue struct {
value float64
timestamp int64
deleteDeadline int64
}
type increaseAggrValueShared struct {
lastValues map[string]increaseLastValue
}
type increaseAggrValue struct {
total float64
shared *increaseAggrValueShared
}
func (av *increaseAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
ac := c.(*increaseAggrConfig)
currentTime := fasttime.UnixTimestamp()
keepFirstSample := ac.keepFirstSample && currentTime >= ac.ignoreFirstSampleDeadline
lv, ok := av.shared.lastValues[key]
if ok || keepFirstSample {
if sample.timestamp < lv.timestamp {
// Skip out of order sample
return
}
if !sample.stateOnly {
if sample.value >= lv.value {
av.total += sample.value - lv.value
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
}
}
}
lv.value = sample.value
lv.timestamp = sample.timestamp
lv.deleteDeadline = deleteDeadline
key = bytesutil.InternString(key)
av.shared.lastValues[key] = lv
}
func (av *increaseAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*increaseAggrConfig)
suffix := ac.getSuffix()
total := av.total
av.total = 0
lvs := av.shared.lastValues
for lk, lv := range lvs {
if ctx.flushTimestamp > lv.deleteDeadline || isLast {
delete(lvs, lk)
}
}
ctx.appendSeries(key, suffix, total)
}
func (av *increaseAggrValue) state() any {
return av.shared
}
func newIncreaseAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &increaseAggrConfig{
keepFirstSample: keepFirstSample,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
return cfg
}
type increaseAggrConfig struct {
keepFirstSample bool
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
ignoreFirstSampleDeadline uint64
counterResetsTotal *metrics.Counter
}
func (*increaseAggrConfig) getValue(s any) aggrValue {
var shared *increaseAggrValueShared
if s == nil {
shared = &increaseAggrValueShared{
lastValues: make(map[string]increaseLastValue),
}
} else {
shared = s.(*increaseAggrValueShared)
}
return &increaseAggrValue{
shared: shared,
}
}
func (ac *increaseAggrConfig) getSuffix() string {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}

View File

@@ -6,6 +6,9 @@ type lastAggrValue struct {
}
func (av *lastAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if sample.timestamp >= av.timestamp {
av.last = sample.value
av.timestamp = sample.timestamp

View File

@@ -6,6 +6,9 @@ type maxAggrValue struct {
}
func (av *maxAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if sample.value > av.max || !av.defined {
av.max = sample.value
}

View File

@@ -6,6 +6,9 @@ type minAggrValue struct {
}
func (av *minAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if sample.value < av.min || !av.defined {
av.min = sample.value
}

View File

@@ -13,6 +13,9 @@ type quantilesAggrValue struct {
}
func (av *quantilesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if av.h == nil {
av.h = histogram.GetFast()
}

View File

@@ -34,6 +34,7 @@ var rateAggrStateValuePool sync.Pool
func putRateAggrStateValue(v *rateAggrStateValue) {
v.timestamp = 0
v.lastTimestamp = 0
v.increase = 0
rateAggrStateValuePool.Put(v)
}
@@ -88,6 +89,10 @@ type rateAggrStateValue struct {
// increase stores cumulative increase for the current time series on the current aggregation interval
increase float64
timestamp int64
// lastTimestamp is the latest timestamp seen for this series including state-only samples.
// It is used for out-of-order detection, while timestamp (above) is only updated by
// non-state-only samples and is used for rate calculation.
lastTimestamp int64
}
type rateAggrValue struct {
@@ -101,16 +106,20 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
sv, ok := av.shared[key]
if ok {
state = sv.getState(av.isGreen)
if sample.timestamp < state.timestamp {
if sample.timestamp < state.lastTimestamp {
// Skip out of order sample
return
}
if sample.value >= sv.value {
state.increase += sample.value - sv.value
if !sample.stateOnly {
if sample.value >= sv.value {
state.increase += sample.value - sv.value
} else {
// counter reset
state.increase += sample.value
ac.counterResetsTotal.Inc()
}
} else {
// counter reset
state.increase += sample.value
ac.counterResetsTotal.Inc()
sv.prevTimestamp = sample.timestamp
}
} else {
sv = getRateAggrSharedValue(av.isGreen)
@@ -121,7 +130,10 @@ func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string
}
sv.value = sample.value
sv.deleteDeadline = deleteDeadline
state.timestamp = sample.timestamp
state.lastTimestamp = sample.timestamp
if !sample.stateOnly {
state.timestamp = sample.timestamp
}
}
func (av *rateAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {

View File

@@ -12,6 +12,9 @@ type stdAggrValue struct {
}
func (av *stdAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.count++
avg := av.avg + (sample.value-av.avg)/av.count
av.q += (sample.value - av.avg) * (sample.value - avg)

View File

@@ -762,9 +762,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "histogram_bucket":
return newHistogramBucketAggrConfig(useSharedState), nil
case "increase":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "increase_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
return newIncreaseAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
case "last":
return newLastAggrConfig(), nil
case "max":
@@ -782,9 +782,9 @@ func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen m
case "sum_samples":
return newSumSamplesAggrConfig(), nil
case "total":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true), nil
case "total_prometheus":
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false), nil
case "unique_samples":
return newUniqueSamplesAggrConfig(), nil
default:
@@ -1006,26 +1006,28 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
a.ignoredNaNSamples.Inc()
continue
}
if (ignoreOldSamples || enableWindows) && s.Timestamp < minDeadline {
// Skip old samples outside the current aggregation interval
stateOnly := (ignoreOldSamples || enableWindows) && s.Timestamp < minDeadline
if stateOnly {
a.ignoredOldSamples.Inc()
continue
}
lagMsec := nowMsec - s.Timestamp
if lagMsec > maxLagMsec {
maxLagMsec = lagMsec
} else {
lagMsec := nowMsec - s.Timestamp
if lagMsec > maxLagMsec {
maxLagMsec = lagMsec
}
}
if enableWindows && s.Timestamp <= cs.maxDeadline == cs.isGreen {
ctx.green = append(ctx.green, pushSample{
key: key,
value: s.Value,
timestamp: s.Timestamp,
stateOnly: stateOnly,
})
} else {
ctx.blue = append(ctx.blue, pushSample{
key: key,
value: s.Value,
timestamp: s.Timestamp,
stateOnly: stateOnly,
})
}
}
@@ -1099,6 +1101,10 @@ type pushSample struct {
key string
value float64
timestamp int64
// stateOnly marks samples older than minDeadline: update tracking state in stateful outputs
// (total, rate, increase) but do not contribute to the aggregation output.
stateOnly bool
}
func getPushCtx() *pushCtx {

View File

@@ -688,7 +688,9 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.125
outputs: [rate_sum, rate_avg]
`, "11111")
// test rate_sum and rate_avg, when two aggregation intervals are empty
// test rate_sum and rate_avg, when two aggregation intervals are empty.
// abc=777 arrives slightly before the start of each interval (-10ms) but still
// updates prevTimestamp, so it contributes to rate_sum alongside abc=123 and abc=456.
f([]string{`
foo{abc="123", cde="1"} 1
foo{abc="123", cde="1"} 2 1
@@ -807,4 +809,55 @@ foo:1m_sum_samples{baz="qwe"} 10
dedup_interval: 30s
outputs: [sum_samples]
`, "11111111")
// total with ignore_old_samples: an old sample (30s before the interval boundary) must
// update the state reference without contributing to the interval total, so the subsequent
// current-interval sample (250) computes increase 250-150=100 instead of 250-100=150.
// Cumulative total: 100 (interval1) + 100 (interval2) = 200.
f([]string{`
foo 100
`, `
foo 150 -30
foo 250
`}, time.Minute, `foo:1m_total 100
foo:1m_total 200
`, `
- interval: 1m
outputs: [total]
ignore_old_samples: true
ignore_first_sample_interval: 0s
`, "111")
// increase with ignore_old_samples: same correctness check for increase output.
// Per-interval: 100 (first sample from 0) and 100 (250-150=100 thanks to stateOnly update).
f([]string{`
foo 100
`, `
foo 150 -30
foo 250
`}, time.Minute, `foo:1m_increase 100
foo:1m_increase 100
`, `
- interval: 1m
outputs: [increase]
ignore_old_samples: true
ignore_first_sample_interval: 0s
`, "111")
// rate with ignore_old_samples: out-of-order stateOnly samples must not overwrite sv.value,
// and the winning stateOnly sample's timestamp is used as the denominator start.
// foo 120 -40 (ts=T0+20s) is rejected as OOO after foo 150 -30 (ts=T0+30s),
// so the baseline is 150 at T0+30s, giving rate=(200-150)/30 ≈ 1.667.
f([]string{`
foo 100
`, `
foo 150 -30
foo 120 -40
foo 200
`}, time.Minute, `foo:1m_rate_sum 1.6666666666666667
`, `
- interval: 1m
outputs: [rate_sum]
ignore_old_samples: true
`, "1111")
}

View File

@@ -74,7 +74,7 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
`, strings.Join(outputsQuoted, ","))
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %w", err))
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
}
return a
}
@@ -133,7 +133,7 @@ func newPerOutputBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregat
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %w", err))
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
}
return a
}

View File

@@ -5,6 +5,9 @@ type sumSamplesAggrValue struct {
}
func (av *sumSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
av.sum += sample.value
}

View File

@@ -36,12 +36,14 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
// Skip out of order sample
return
}
if sample.value >= lv.value {
av.total += sample.value - lv.value
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
if !sample.stateOnly {
if sample.value >= lv.value {
av.total += sample.value - lv.value
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
}
}
}
lv.value = sample.value
@@ -54,7 +56,6 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast bool) {
ac := c.(*totalAggrConfig)
suffix := ac.getSuffix()
// check for stale entries
total := av.shared.total + av.total
av.total = 0
lvs := av.shared.lastValues
@@ -63,9 +64,7 @@ func (av *totalAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, isLast
delete(lvs, lk)
}
}
if ac.resetTotalOnFlush {
av.shared.total = 0
} else if math.Abs(total) >= (1 << 53) {
if math.Abs(total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
av.shared.total = 0
} else {
@@ -78,11 +77,10 @@ func (av *totalAggrValue) state() any {
return av.shared
}
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
cfg := &totalAggrConfig{
keepFirstSample: keepFirstSample,
resetTotalOnFlush: resetTotalOnFlush,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
@@ -90,8 +88,6 @@ func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleI
}
type totalAggrConfig struct {
resetTotalOnFlush bool
// Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool
@@ -117,12 +113,6 @@ func (*totalAggrConfig) getValue(s any) aggrValue {
}
func (ac *totalAggrConfig) getSuffix() string {
if ac.resetTotalOnFlush {
if ac.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}
if ac.keepFirstSample {
return "total"
}

View File

@@ -5,6 +5,9 @@ type uniqueSamplesAggrValue struct {
}
func (av *uniqueSamplesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
if sample.stateOnly {
return
}
if _, ok := av.samples[sample.value]; !ok {
av.samples[sample.value] = struct{}{}
}