Files
andriibeee e1a9901654 vmselect: add CSV header support for export/import (#10706)
Export (/api/v1/export/csv) now always writes a header row matching the requested format fields. Example:

```
  # format=__timestamp__:unix_ms,__value__,job,instance
  __timestamp__:unix_ms,__value__,job,instance
  1704067200000,42.5,node,localhost:9090
```

Import (/api/v1/import/csv) gains auto-detection logic: the first row is skipped as a header if any timestamp column fails timestamp parsing or any metric value column fails float parsing. If the first row is not detected as a header, it is parsed as data. This keeps the import backward compatible.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10666
PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10706

### Checklist

The following checks are **mandatory**:

- [x] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [x] My change adheres to [VictoriaMetrics development
goals](https://docs.victoriametrics.com/victoriametrics/goals/).

---------

Co-authored-by: Max Kotliar <mkotlyar@victoriametrics.com>
2026-04-09 14:00:39 +03:00

199 lines
4.5 KiB
Go

package csvimport
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/fastjson/fastfloat"
)
// Rows represents csv rows.
type Rows struct {
	// Rows contains parsed csv rows after the call to Unmarshal.
	Rows []Row

	// sc is the scanner used for splitting the input into lines and columns.
	sc scanner

	// tagsPool is a scratch buffer backing the Tags slices of parsed rows.
	tagsPool []Tag

	// metricsPool is a scratch buffer for per-line metric (name, value) pairs.
	metricsPool []metric
}
// Reset resets rs, so it can be re-used for parsing new data.
//
// Slice elements are explicitly zeroed before truncation in order to release
// references held by string fields, while the backing arrays are kept for reuse.
func (rs *Rows) Reset() {
	for i := range rs.Rows {
		rs.Rows[i] = Row{}
	}
	rs.Rows = rs.Rows[:0]

	rs.sc.Init("")

	for i := range rs.tagsPool {
		rs.tagsPool[i] = Tag{}
	}
	rs.tagsPool = rs.tagsPool[:0]

	for i := range rs.metricsPool {
		rs.metricsPool[i] = metric{}
	}
	rs.metricsPool = rs.metricsPool[:0]
}
// Row represents a single metric row
type Row struct {
	// Metric is the metric name taken from the corresponding csv column.
	Metric string

	// Tags contains metric tags built from tag columns.
	Tags []Tag

	// Value is the metric value parsed from the corresponding csv column.
	Value float64

	// Timestamp is the row timestamp as produced by ColumnDescriptor.ParseTimestamp.
	Timestamp int64
}
// Tag represents metric tag
type Tag struct {
	// Key is the tag name.
	Key string

	// Value is the tag value.
	Value string
}
// metric is a (metric name, value) pair parsed from a single csv line.
type metric struct {
	// Name is the metric name from the column descriptor.
	Name string

	// Value is the parsed metric value.
	Value float64
}
// Unmarshal unmarshals csv lines from s according to the given cds.
//
// The parsed rows are put into rs.Rows; scratch buffers are reused across calls.
func (rs *Rows) Unmarshal(s string, cds []ColumnDescriptor) {
	rs.sc.Init(s)
	rows, tagsPool, metricsPool := parseRows(&rs.sc, rs.Rows[:0], rs.tagsPool[:0], rs.metricsPool[:0], cds, false)
	rs.Rows, rs.tagsPool, rs.metricsPool = rows, tagsPool, metricsPool
}
// UnmarshalDetectHeader is similar to Unmarshal, but skips the first row if it looks like a CSV header
// (see isHeaderRow).
//
// Must only be called for the first data block in a stream.
func (rs *Rows) UnmarshalDetectHeader(s string, cds []ColumnDescriptor) {
	rs.sc.Init(s)
	rows, tagsPool, metricsPool := parseRows(&rs.sc, rs.Rows[:0], rs.tagsPool[:0], rs.metricsPool[:0], cds, true)
	rs.Rows, rs.tagsPool, rs.metricsPool = rows, tagsPool, metricsPool
}
// isHeaderRow returns true if the current scanner line looks like a CSV header.
//
// A line is treated as a header when any non-empty column that must hold a
// timestamp or a metric value cannot be parsed as such. All columns of the
// line are consumed from sc; the caller is responsible for restoring the
// scanner state before re-parsing the line as data.
func isHeaderRow(sc *scanner, cds []ColumnDescriptor) bool {
	looksLikeHeader := false
	var colIdx uint
	for sc.NextColumn() {
		if colIdx >= uint(len(cds)) {
			// Superfluous column - there is no descriptor to check it against.
			continue
		}
		cd := &cds[colIdx]
		colIdx++
		if cd.isEmpty() || sc.Column == "" {
			// Empty columns cannot disambiguate header vs data.
			continue
		}
		if cd.ParseTimestamp != nil {
			if _, err := cd.ParseTimestamp(sc.Column); err != nil {
				looksLikeHeader = true
			}
		}
		if cd.MetricName != "" {
			if _, err := fastfloat.Parse(sc.Column); err != nil {
				looksLikeHeader = true
			}
		}
	}
	return looksLikeHeader
}
// parseRows parses csv lines from sc into dst according to the given cds.
//
// Invalid lines are logged, counted in invalidLines and skipped instead of
// aborting the parse. If skipHeader is true, the first line is dropped when it
// looks like a CSV header (see isHeaderRow); otherwise it is re-parsed as an
// ordinary data line.
//
// tags and metrics are reusable scratch buffers; the updated buffers are
// returned together with dst so the caller can keep them pooled.
func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []ColumnDescriptor, skipHeader bool) ([]Row, []Tag, []metric) {
	for sc.NextLine() {
		if skipHeader {
			skipHeader = false
			// Probe the first line for a header, then restore the scanner state
			// in both cases, so the line can be re-parsed as data if needed.
			savedLine := sc.Line
			isHeader := isHeaderRow(sc, cds)
			sc.Line = savedLine
			sc.isLastColumn = false
			sc.Error = nil
			if isHeader {
				continue
			}
		}
		line := sc.Line
		var r Row
		col := uint(0)
		metrics = metrics[:0]
		tagsLen := len(tags)
		for sc.NextColumn() {
			if col >= uint(len(cds)) {
				// Skip superfluous column.
				continue
			}
			cd := &cds[col]
			col++
			if cd.isEmpty() || sc.Column == "" {
				// Ignore empty column.
				continue
			}
			if parseTimestamp := cd.ParseTimestamp; parseTimestamp != nil {
				timestamp, err := parseTimestamp(sc.Column)
				if err != nil {
					sc.Error = fmt.Errorf("cannot parse timestamp from %q: %w", sc.Column, err)
					break
				}
				r.Timestamp = timestamp
				continue
			}
			if tagName := cd.TagName; tagName != "" {
				tags = append(tags, Tag{
					Key:   tagName,
					Value: sc.Column,
				})
				continue
			}
			metricName := cd.MetricName
			if metricName == "" {
				logger.Panicf("BUG: unexpected empty MetricName")
			}
			value, err := fastfloat.Parse(sc.Column)
			if err != nil {
				sc.Error = fmt.Errorf("cannot parse metric value for %q from %q: %w", metricName, sc.Column, err)
				// Stop parsing the rest of the line - it is skipped below anyway.
				// This mirrors the timestamp error path above and prevents
				// appending a metric with a meaningless value.
				break
			}
			metrics = append(metrics, metric{
				Name:  metricName,
				Value: value,
			})
		}
		if col < uint(len(cds)) && sc.Error == nil {
			sc.Error = fmt.Errorf("missing columns in the csv line %q; got %d columns; want at least %d columns", line, col, len(cds))
		}
		if sc.Error != nil {
			logger.Errorf("error when parsing csv line %q: %s; skipping this line", line, sc.Error)
			invalidLines.Inc()
			continue
		}
		if len(metrics) == 0 {
			// The line contains no metric values - nothing to add.
			continue
		}
		// Emit one Row per metric value; all rows built from the same line
		// share the same tags and timestamp.
		r.Metric = metrics[0].Name
		r.Tags = tags[tagsLen:]
		r.Value = metrics[0].Value
		dst = append(dst, r)
		for _, m := range metrics[1:] {
			dst = append(dst, Row{
				Metric:    m.Name,
				Tags:      r.Tags,
				Value:     m.Value,
				Timestamp: r.Timestamp,
			})
		}
	}
	return dst, tags, metrics
}
// invalidLines counts csv lines that couldn't be parsed and were skipped.
var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="csvimport"}`)