mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Export (/api/v1/export/csv) now always writes a header row matching the requested format fields. Examples: ``` # format=__timestamp__:unix_ms,__value__,job,instance __timestamp__:unix_ms,__value__,job,instance 1704067200000,42.5,node,localhost:9090 ``` Import (/api/v1/import/csv) gains auto-detection logic: the first row is skipped if any timestamp column fails timestamp parsing or any metric value column fails float parsing. If the first row is not detected as headers, it is parsed as data. This makes the import backward compatible. Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10666 PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10706 ### Checklist The following checks are **mandatory**: - [x] My change adheres to [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist). - [x] My change adheres to [VictoriaMetrics development goals](https://docs.victoriametrics.com/victoriametrics/goals/). --------- Co-authored-by: Max Kotliar <mkotlyar@victoriametrics.com>
199 lines
4.5 KiB
Go
199 lines
4.5 KiB
Go
package csvimport
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/valyala/fastjson/fastfloat"
|
|
)
|
|
|
|
// Rows represents csv rows.
type Rows struct {
	// Rows contains parsed csv rows after the call to Unmarshal.
	Rows []Row

	// sc is the line/column scanner; it is reused across Unmarshal calls.
	sc scanner

	// tagsPool is a scratch buffer for Tag values, reused in order to reduce allocations.
	tagsPool []Tag

	// metricsPool is a scratch buffer for per-line metrics, reused in order to reduce allocations.
	metricsPool []metric
}
|
|
|
|
// Reset resets rs.
|
|
func (rs *Rows) Reset() {
|
|
rows := rs.Rows
|
|
for i := range rows {
|
|
r := &rows[i]
|
|
r.Metric = ""
|
|
r.Tags = nil
|
|
r.Value = 0
|
|
r.Timestamp = 0
|
|
}
|
|
rs.Rows = rs.Rows[:0]
|
|
|
|
rs.sc.Init("")
|
|
|
|
tags := rs.tagsPool
|
|
for i := range tags {
|
|
t := &tags[i]
|
|
t.Key = ""
|
|
t.Value = ""
|
|
}
|
|
rs.tagsPool = rs.tagsPool[:0]
|
|
|
|
metrics := rs.metricsPool
|
|
for i := range metrics {
|
|
m := &metrics[i]
|
|
m.Name = ""
|
|
m.Value = 0
|
|
}
|
|
rs.metricsPool = rs.metricsPool[:0]
|
|
}
|
|
|
|
// Row represents a single metric row
type Row struct {
	// Metric is the metric name.
	Metric string
	// Tags contains the metric tags; it may alias a shared pool slice (see parseRows).
	Tags []Tag
	// Value is the metric value.
	Value float64
	// Timestamp is the sample timestamp as returned by ColumnDescriptor.ParseTimestamp.
	// NOTE(review): presumably milliseconds — confirm against the ParseTimestamp implementations.
	Timestamp int64
}
|
|
|
|
// Tag represents metric tag
type Tag struct {
	// Key is the tag name.
	Key string
	// Value is the tag value.
	Value string
}
|
|
|
|
// metric is an intermediate (name, value) pair parsed from a single csv line;
// parseRows expands each entry into a separate Row.
type metric struct {
	Name  string
	Value float64
}
|
|
|
|
// Unmarshal unmarshals csv lines from s according to the given cds.
|
|
func (rs *Rows) Unmarshal(s string, cds []ColumnDescriptor) {
|
|
rs.sc.Init(s)
|
|
rs.Rows, rs.tagsPool, rs.metricsPool = parseRows(&rs.sc, rs.Rows[:0], rs.tagsPool[:0], rs.metricsPool[:0], cds, false)
|
|
}
|
|
|
|
// UnmarshalDetectHeader is similar to Unmarshal, but skips the first row if it looks like CSV header
|
|
// Must only be called for the first data block in a stream.
|
|
func (rs *Rows) UnmarshalDetectHeader(s string, cds []ColumnDescriptor) {
|
|
rs.sc.Init(s)
|
|
rs.Rows, rs.tagsPool, rs.metricsPool = parseRows(&rs.sc, rs.Rows[:0], rs.tagsPool[:0], rs.metricsPool[:0], cds, true)
|
|
}
|
|
|
|
// isHeaderRow returns true if the current scanner line looks like a CSV header.
|
|
func isHeaderRow(sc *scanner, cds []ColumnDescriptor) bool {
|
|
isHeader := false
|
|
col := uint(0)
|
|
for sc.NextColumn() {
|
|
if col >= uint(len(cds)) {
|
|
continue
|
|
}
|
|
cd := &cds[col]
|
|
col++
|
|
if cd.isEmpty() || sc.Column == "" {
|
|
continue
|
|
}
|
|
if cd.ParseTimestamp != nil {
|
|
if _, err := cd.ParseTimestamp(sc.Column); err != nil {
|
|
isHeader = true
|
|
}
|
|
}
|
|
if cd.MetricName != "" {
|
|
if _, err := fastfloat.Parse(sc.Column); err != nil {
|
|
isHeader = true
|
|
}
|
|
}
|
|
}
|
|
return isHeader
|
|
}
|
|
|
|
func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []ColumnDescriptor, skipHeader bool) ([]Row, []Tag, []metric) {
|
|
for sc.NextLine() {
|
|
if skipHeader {
|
|
skipHeader = false
|
|
savedLine := sc.Line
|
|
if isHeaderRow(sc, cds) {
|
|
sc.Line = savedLine
|
|
sc.isLastColumn = false
|
|
sc.Error = nil
|
|
continue
|
|
}
|
|
sc.Line = savedLine
|
|
sc.isLastColumn = false
|
|
sc.Error = nil
|
|
}
|
|
line := sc.Line
|
|
var r Row
|
|
col := uint(0)
|
|
metrics = metrics[:0]
|
|
tagsLen := len(tags)
|
|
for sc.NextColumn() {
|
|
if col >= uint(len(cds)) {
|
|
// Skip superfluous column.
|
|
continue
|
|
}
|
|
cd := &cds[col]
|
|
col++
|
|
if cd.isEmpty() || sc.Column == "" {
|
|
// Ignore empty column.
|
|
continue
|
|
}
|
|
if parseTimestamp := cd.ParseTimestamp; parseTimestamp != nil {
|
|
timestamp, err := parseTimestamp(sc.Column)
|
|
if err != nil {
|
|
sc.Error = fmt.Errorf("cannot parse timestamp from %q: %w", sc.Column, err)
|
|
break
|
|
}
|
|
r.Timestamp = timestamp
|
|
continue
|
|
}
|
|
if tagName := cd.TagName; tagName != "" {
|
|
tags = append(tags, Tag{
|
|
Key: tagName,
|
|
Value: sc.Column,
|
|
})
|
|
continue
|
|
}
|
|
metricName := cd.MetricName
|
|
if metricName == "" {
|
|
logger.Panicf("BUG: unexpected empty MetricName")
|
|
}
|
|
value, err := fastfloat.Parse(sc.Column)
|
|
if err != nil {
|
|
sc.Error = fmt.Errorf("cannot parse metric value for %q from %q: %w", metricName, sc.Column, err)
|
|
}
|
|
metrics = append(metrics, metric{
|
|
Name: metricName,
|
|
Value: value,
|
|
})
|
|
}
|
|
if col < uint(len(cds)) && sc.Error == nil {
|
|
sc.Error = fmt.Errorf("missing columns in the csv line %q; got %d columns; want at least %d columns", line, col, len(cds))
|
|
}
|
|
if sc.Error != nil {
|
|
logger.Errorf("error when parsing csv line %q: %s; skipping this line", line, sc.Error)
|
|
invalidLines.Inc()
|
|
continue
|
|
}
|
|
if len(metrics) == 0 {
|
|
continue
|
|
}
|
|
r.Metric = metrics[0].Name
|
|
r.Tags = tags[tagsLen:]
|
|
r.Value = metrics[0].Value
|
|
dst = append(dst, r)
|
|
for _, m := range metrics[1:] {
|
|
dst = append(dst, Row{
|
|
Metric: m.Name,
|
|
Tags: r.Tags,
|
|
Value: m.Value,
|
|
Timestamp: r.Timestamp,
|
|
})
|
|
}
|
|
}
|
|
return dst, tags, metrics
|
|
}
|
|
|
|
// invalidLines counts csv lines skipped because of parse errors.
var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="csvimport"}`)
|