mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Compare commits
27 Commits
debug-grou
...
issue-7717
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
35d388dff9 | ||
|
|
1038153097 | ||
|
|
18c32d7385 | ||
|
|
754a9bd563 | ||
|
|
94830ea064 | ||
|
|
e84d88d390 | ||
|
|
51cdf23e1d | ||
|
|
55486b66ba | ||
|
|
095910342f | ||
|
|
16a4269de1 | ||
|
|
0cd2f800ba | ||
|
|
655406584a | ||
|
|
7d3175b438 | ||
|
|
e67e01bec1 | ||
|
|
2e0082f5c4 | ||
|
|
a7a06b3be6 | ||
|
|
31c92c4009 | ||
|
|
d332cac491 | ||
|
|
554c72aa60 | ||
|
|
0852b56fe2 | ||
|
|
b5c41a1f48 | ||
|
|
d1ffd83bf0 | ||
|
|
2c9c5d0366 | ||
|
|
8a8ff2fa14 | ||
|
|
ea569e7f51 | ||
|
|
06039e6f93 | ||
|
|
426cbff5f7 |
@@ -424,6 +424,84 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
// Names of the command-line flags for the `mimir` migration mode of vmctl.
const (
	// Source location, tenant selection and read concurrency.
	mimirPath             = "mimir-path"
	mimirTenantID         = "mimir-tenant-id"
	mimirConcurrency      = "mimir-concurrency"
	// Time and label filters applied to the migrated series.
	mimirFilterTimeStart  = "mimir-filter-time-start"
	mimirFilterTimeEnd    = "mimir-filter-time-end"
	mimirFilterLabel      = "mimir-filter-label"
	mimirFilterLabelValue = "mimir-filter-label-value"

	// Object-storage (GCS/S3/Azure) client configuration.
	mimirCredsFilePath           = "mimir-creds-file-path"
	mimirConfigFilePath          = "mimir-config-file-path"
	mimirConfigProfile           = "mimir-config-profile"
	mimirCustomS3Endpoint        = "mimir-custom-s3-endpoint"
	mimirS3ForcePathStyle        = "mimir-s3-force-path-style"
	mimirS3TLSInsecureSkipVerify = "mimir-s3-tls-insecure-skip-verify"
)
|
||||
|
||||
var (
|
||||
mimirFlags = []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: mimirPath,
|
||||
Usage: "Path to Mimir storage bucket or local folder.",
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: mimirTenantID,
|
||||
Usage: "Tenant ID for Mimir storage",
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: mimirConcurrency,
|
||||
Usage: "Number of concurrently running block readers",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: mimirFilterTimeStart,
|
||||
Usage: "The time filter in RFC3339 format to select timeseries with timestamp equal or higher than provided value. E.g. '2020-01-01T20:07:00Z'",
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: mimirFilterTimeEnd,
|
||||
Usage: "The time filter in RFC3339 format to select timeseries with timestamp equal or lower than provided value. E.g. '2020-01-01T20:07:00Z'",
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: mimirFilterLabel,
|
||||
Usage: "Prometheus label name to filter timeseries by. E.g. '__name__' will filter timeseries by name.",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: mimirFilterLabelValue,
|
||||
Usage: fmt.Sprintf("Prometheus regular expression to filter label from %q flag.", promFilterLabel),
|
||||
Value: ".*",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: mimirCredsFilePath,
|
||||
Usage: "Path to file with GCS or S3 credentials. Credentials are loaded from default locations if not set. See https://cloud.google.com/iam/docs/creating-managing-service-account-keys and https://docs.aws.amazon.com/general/latest/gr/aws-security-credentials.html",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: mimirConfigFilePath,
|
||||
Usage: "Path to file with S3 configs. Configs are loaded from default location if not set. See https://docs.aws.amazon.com/general/latest/gr/aws-security-credentials.html",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: mimirConfigProfile,
|
||||
Usage: "Profile name for S3 configs. If no set, the value of the environment variable will be loaded (AWS_PROFILE or AWS_DEFAULT_PROFILE), or if both not set, DefaultSharedConfigProfile is used",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: mimirCustomS3Endpoint,
|
||||
Usage: "Custom S3 endpoint for use with S3-compatible storages (e.g. MinIO). S3 is used if not set",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: mimirS3ForcePathStyle,
|
||||
Usage: "Prefixing endpoint with bucket name when set false, true by default.",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: mimirS3TLSInsecureSkipVerify,
|
||||
Usage: "Whether to skip TLS verification when connecting to the S3 endpoint.",
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
vmNativeFilterMatch = "vm-native-filter-match"
|
||||
vmNativeFilterTimeStart = "vm-native-filter-time-start"
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/mimir"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
@@ -271,7 +272,54 @@ func main() {
|
||||
cc: c.Int(promConcurrency),
|
||||
isVerbose: c.Bool(globalVerbose),
|
||||
}
|
||||
return pp.run()
|
||||
return pp.run(ctx)
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "mimir",
|
||||
Usage: "Migrate time series from Mimir object storage or local filesystem",
|
||||
Flags: mergeFlags(globalFlags, mimirFlags, vmFlags),
|
||||
Before: beforeFn,
|
||||
Action: func(c *cli.Context) error {
|
||||
fmt.Println("Mimir import mode")
|
||||
|
||||
vmCfg, err := initConfigVM(c)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to init VM configuration: %s", err)
|
||||
}
|
||||
|
||||
importer, err = vm.NewImporter(ctx, vmCfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create VM importer: %s", err)
|
||||
}
|
||||
|
||||
mCfg := mimir.Config{
|
||||
Filter: mimir.Filter{
|
||||
TimeMin: c.String(mimirFilterTimeStart),
|
||||
TimeMax: c.String(mimirFilterTimeEnd),
|
||||
Label: c.String(mimirFilterLabel),
|
||||
LabelValue: c.String(mimirFilterLabelValue),
|
||||
},
|
||||
Path: c.String(mimirPath),
|
||||
TenantID: c.String(mimirTenantID),
|
||||
CredsFilePath: c.String(mimirCredsFilePath),
|
||||
ConfigFilePath: c.String(mimirConfigFilePath),
|
||||
ConfigProfile: c.String(mimirConfigProfile),
|
||||
CustomS3Endpoint: c.String(mimirCustomS3Endpoint),
|
||||
S3ForcePathStyle: c.Bool(mimirS3ForcePathStyle),
|
||||
S3TLSInsecureSkipVerify: c.Bool(mimirS3TLSInsecureSkipVerify),
|
||||
}
|
||||
cl, err := mimir.NewClient(ctx, mCfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create mimir client: %s", err)
|
||||
}
|
||||
pp := prometheusProcessor{
|
||||
cl: cl,
|
||||
im: importer,
|
||||
cc: c.Int(mimirConcurrency),
|
||||
isVerbose: c.Bool(globalVerbose),
|
||||
}
|
||||
return pp.run(ctx)
|
||||
},
|
||||
},
|
||||
{
|
||||
|
||||
184
app/vmctl/mimir/lazyreader.go
Normal file
184
app/vmctl/mimir/lazyreader.go
Normal file
@@ -0,0 +1,184 @@
|
||||
package mimir
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
|
||||
)
|
||||
|
||||
// Compile-time assertion that LazyBlockReader satisfies tsdb.BlockReader.
var _ tsdb.BlockReader = (*LazyBlockReader)(nil)

// LazyBlockReader stores block id and segment num information.
// It is used to lazily fetch and parse block data: the remote block is
// downloaded and opened only on the first call to one of the
// tsdb.BlockReader methods (see initialize).
// It implements tsdb.BlockReader interface.
type LazyBlockReader struct {
	// Block ID.
	ID ulid.ULID
	// SegmentsNum stores the number of chunks segments in the block.
	SegmentsNum int

	// mu guards the lazy initialization of reader.
	mu sync.Mutex
	// reader is the underlying opened block; nil until initialize succeeds.
	reader tsdb.BlockReader
	// fs is the remote (or local) filesystem the block files are fetched from.
	fs common.RemoteFS
	// err records initialization failures from Meta/Size, surfaced via Err.
	err error
}
|
||||
|
||||
// NewLazyBlockReader returns a new LazyBlockReader for the given block.
// Only Mimir's "1b6d" (1-based, 6-digit) chunk segment naming scheme is
// supported; any other segments format is rejected.
func NewLazyBlockReader(block *Block, fs common.RemoteFS) (*LazyBlockReader, error) {
	if block.SegmentsFormat != "1b6d" {
		return nil, fmt.Errorf("unsupported segments format: %s", block.SegmentsFormat)
	}

	return &LazyBlockReader{
		ID:          block.ID,
		SegmentsNum: block.SegmentsNum,
		fs:          fs,
	}, nil
}
|
||||
|
||||
// initialize downloads the block's meta, index and chunk segment files
// into a temporary directory and opens them via tsdb.OpenBlock, caching
// the resulting reader in lbr.reader. It is idempotent and safe for
// concurrent use: the mutex ensures only the first caller does the work.
func (lbr *LazyBlockReader) initialize() error {
	lbr.mu.Lock()
	defer lbr.mu.Unlock()
	if lbr.reader != nil {
		// Already initialized by an earlier call.
		return nil
	}
	// fetching block and parse it and store it in lbr.reader
	temp, err := lbr.mkTempDir()
	if err != nil {
		return fmt.Errorf("failed to create temp dir: %s", err)
	}

	// NOTE(review): the temp dir is removed when initialize returns, while
	// the reader opened from it remains in use. This relies on unlinked
	// files staying readable through already-open descriptors (POSIX
	// semantics) — confirm this is intentional and portable. Also note the
	// "removed temp dir" message is logged even when removal failed.
	defer func() {
		if err := os.RemoveAll(temp); err != nil {
			log.Printf("failed to remove temp dir: %s", err)
		}
		log.Printf("removed temp dir: %s", temp)
	}()

	meta, err := lbr.fetchFile(metaFilename)
	if err != nil {
		return err
	}
	if err := lbr.writeFile(temp, metaFilename, meta); err != nil {
		log.Printf("failed to write meta file: %s", err)
		return err
	}
	idx, err := lbr.fetchFile(indexFilename)
	if err != nil {
		log.Printf("failed to fetch index file %q: %s", indexFilename, err)
		return err
	}
	if err := lbr.writeFile(temp, indexFilename, idx); err != nil {
		return err
	}

	for i := 1; i <= lbr.SegmentsNum; i++ {
		// Segments use the "1b6d" naming scheme: 1-based, 6-digit,
		// e.g. chunks/000001.
		// https://github.com/grafana/mimir/blob/main/pkg/storage/tsdb/bucketindex/index.go#L32
		chunkName := fmt.Sprintf("%06d", i)
		blockChunkPath := filepath.Join("chunks", chunkName)
		chunk, err := lbr.fetchFile(blockChunkPath)
		if err != nil {
			log.Printf("failed to fetch chunk file: %q: %s", chunkName, err)
			return err
		}
		if err := lbr.writeFile(temp, blockChunkPath, chunk); err != nil {
			log.Printf("failed to write chunk file: %q: %s", chunkName, err)
			return err
		}
	}

	// Set postingDecoder to nil because
	// if it is nil then a default decoder is used, compatible with Prometheus v2.
	pb, err := tsdb.OpenBlock(nil, temp, nil, nil)
	if err != nil {
		return fmt.Errorf("failed to open block %q: %s", lbr.ID, err)
	}
	lbr.reader = pb
	return nil
}
|
||||
|
||||
// Index returns an IndexReader over the block's data.
|
||||
func (lbr *LazyBlockReader) Index() (tsdb.IndexReader, error) {
|
||||
if err := lbr.initialize(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return lbr.reader.Index()
|
||||
}
|
||||
|
||||
// Chunks returns a ChunkReader over the block's data.
|
||||
func (lbr *LazyBlockReader) Chunks() (tsdb.ChunkReader, error) {
|
||||
if err := lbr.initialize(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return lbr.reader.Chunks()
|
||||
}
|
||||
|
||||
// Tombstones returns a tombstones.Reader over the block's deleted data.
|
||||
func (lbr *LazyBlockReader) Tombstones() (tombstones.Reader, error) {
|
||||
if err := lbr.initialize(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return lbr.reader.Tombstones()
|
||||
}
|
||||
|
||||
// Meta provides meta information about the block reader.
// tsdb.BlockReader's Meta has no error return, so on initialization
// failure an empty BlockMeta is returned and the error is recorded for
// later retrieval via Err.
func (lbr *LazyBlockReader) Meta() tsdb.BlockMeta {
	if err := lbr.initialize(); err != nil {
		// NOTE(review): lbr.err is written without holding lbr.mu while
		// Err may read it from another goroutine — confirm each reader is
		// only used by a single worker at a time.
		lbr.err = fmt.Errorf("error get Block Meta: %s; return empty block", err)
		return tsdb.BlockMeta{}
	}
	return lbr.reader.Meta()
}
|
||||
|
||||
// Size returns the number of bytes that the block takes up on disk.
// On initialization failure it returns 0 and records the error for
// later retrieval via Err.
func (lbr *LazyBlockReader) Size() int64 {
	if err := lbr.initialize(); err != nil {
		// See the NOTE(review) on Meta about unsynchronized writes to lbr.err.
		lbr.err = fmt.Errorf("error get Size of the block: %s, return zero size", err)
		return 0
	}
	return lbr.reader.Size()
}
|
||||
|
||||
// Err returns the last error that occurred on the block reader.
// It is populated by Meta and Size when lazy initialization fails.
func (lbr *LazyBlockReader) Err() error {
	return lbr.err
}
|
||||
|
||||
func (lbr *LazyBlockReader) mkTempDir() (string, error) {
|
||||
temp, err := os.MkdirTemp("", lbr.ID.String())
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create temp dir: %s", err)
|
||||
}
|
||||
err = os.Mkdir(filepath.Join(temp, "chunks"), 0755)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create temp dir: %s", err)
|
||||
}
|
||||
return temp, nil
|
||||
}
|
||||
|
||||
func (lbr *LazyBlockReader) fetchFile(filePath string) ([]byte, error) {
|
||||
blockID := lbr.ID.String()
|
||||
blockPath := filepath.Join(blockID, filePath)
|
||||
has, err := lbr.fs.HasFile(blockPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !has {
|
||||
return nil, fmt.Errorf("block meta %s not found", blockID)
|
||||
}
|
||||
return lbr.fs.ReadFile(blockPath)
|
||||
}
|
||||
|
||||
func (lbr *LazyBlockReader) writeFile(folder string, filename string, file []byte) error {
|
||||
fileName := filepath.Join(folder, filename)
|
||||
return os.WriteFile(fileName, file, 0644)
|
||||
}
|
||||
241
app/vmctl/mimir/mimir.go
Normal file
241
app/vmctl/mimir/mimir.go
Normal file
@@ -0,0 +1,241 @@
|
||||
package mimir
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
|
||||
utils "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vmctlutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
|
||||
)
|
||||
|
||||
// Well-known file names inside a Mimir tenant's bucket layout.
const (
	// bucketIndex is the per-tenant index listing all complete blocks.
	bucketIndex = "bucket-index.json"
	// bucketIndexCompressedFilename is the gzip-compressed form actually
	// stored in the bucket and fetched by fetchIndexFile.
	bucketIndexCompressedFilename = bucketIndex + ".gz"
	// metaFilename and indexFilename are the standard TSDB block files.
	metaFilename  = "meta.json"
	indexFilename = "index"
)
|
||||
|
||||
// BlockDeletionMark holds the information about a block's deletion mark in the index.
// This type was copied from the mimir repository https://github.com/grafana/mimir/blob/main/pkg/storage/tsdb/bucketindex/index.go#L234.
// NOTE(review): not referenced by the code visible here — confirm it is
// used elsewhere or kept intentionally for index-format completeness.
type BlockDeletionMark struct {
	// Block ID.
	ID ulid.ULID `json:"block_id"`

	// DeletionTime is a unix timestamp (seconds precision) of when the block was marked to be deleted.
	DeletionTime int64 `json:"deletion_time"`
}
|
||||
|
||||
// Block holds the information about a block in the index.
// This is a partial implementation of the https://github.com/grafana/mimir/blob/main/pkg/storage/tsdb/bucketindex/index.go#L73
type Block struct {
	// Block ID.
	ID ulid.ULID `json:"block_id"`

	// MinTime and MaxTime specify the time range all samples in the block are in (millis precision).
	MinTime int64 `json:"min_time"`
	MaxTime int64 `json:"max_time"`

	// SegmentsFormat and SegmentsNum stores the format and number of chunks segments
	// in the block. Only the "1b6d" format is supported by this package
	// (see NewLazyBlockReader).
	SegmentsFormat string `json:"segments_format,omitempty"`
	SegmentsNum    int    `json:"segments_num,omitempty"`
}
|
||||
|
||||
// Index contains all known blocks and markers of a tenant.
// This is a partial implementation of the https://github.com/grafana/mimir/blob/main/pkg/storage/tsdb/bucketindex/index.go#L36
type Index struct {
	// Version of the index format.
	Version int `json:"version"`

	// List of complete blocks (partial blocks are excluded from the index).
	Blocks []*Block `json:"blocks"`
}
|
||||
|
||||
// Config contains a list of params needed
// for reading Mimir blocks from object storage
// or from a local filesystem copy.
type Config struct {
	// Path to remote storage bucket
	Path string
	// TenantID is the tenant id for the storage;
	// when set, it is appended to Path (see NewClient).
	TenantID string

	// Filter restricts which blocks and series are migrated.
	Filter Filter

	// Credentials and client configuration for the underlying
	// object storage (GCS/S3/Azure); see NewRemoteFS.
	CredsFilePath           string
	ConfigFilePath          string
	ConfigProfile           string
	CustomS3Endpoint        string
	S3ForcePathStyle        bool
	S3TLSInsecureSkipVerify bool
}
|
||||
|
||||
// Filter contains configuration for filtering
// the timeseries.
type Filter struct {
	// TimeMin and TimeMax bound the time range; RFC3339 strings,
	// parsed into unix millis by NewClient.
	TimeMin string
	TimeMax string
	// Label and LabelValue select series whose label matches
	// the LabelValue regexp.
	Label      string
	LabelValue string
}
|
||||
|
||||
// Client is a wrapper over a remote (or local) filesystem that knows how
// to locate, filter and read Mimir TSDB blocks.
type Client struct {
	common.RemoteFS
	// filter holds the parsed time/label filters applied by Explore and Read.
	filter filter
}
|
||||
|
||||
// filter restricts which blocks and series are read: a [min, max] time
// range in unix milliseconds (0 means unbounded on that side) plus an
// optional label/labelValue regexp pair.
type filter struct {
	min, max   int64
	label      string
	labelValue string
}

// inRange reports whether the block time range [minTime, maxTime]
// overlaps the filter's configured time range. A zero filter bound is
// treated as unbounded and replaced by the corresponding block bound.
func (f filter) inRange(minTime, maxTime int64) bool {
	fmin, fmax := f.min, f.max
	// Bug fix: the unbounded check must inspect the filter's own lower
	// bound (fmin), not the block's minTime; otherwise any block starting
	// at timestamp 0 would bypass the configured minimum (mirrors the fmax
	// check below).
	if fmin == 0 {
		fmin = minTime
	}
	if fmax == 0 {
		fmax = maxTime
	}
	return minTime <= fmax && fmin <= maxTime
}
|
||||
|
||||
// NewClient creates and validates new Client
// with given Config. It connects to the remote storage and parses the
// configured time filter.
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
	if cfg.Path == "" {
		return nil, fmt.Errorf("path cannot be empty")
	}

	// Tenant blocks live under <path>/<tenantID> in the bucket layout.
	if cfg.TenantID != "" {
		cfg.Path = fmt.Sprintf("%s/%s", cfg.Path, cfg.TenantID)
	}

	var c Client
	rfs, err := NewRemoteFS(ctx, cfg)
	if err != nil {
		// NOTE(review): the message mentions a `-src` flag, but this mode's
		// flag is --mimir-path — confirm and align the wording.
		return nil, fmt.Errorf("cannot parse `-src`=%q: %w", cfg.Path, err)
	}

	c.RemoteFS = rfs
	// Filter times arrive as RFC3339 strings from the CLI flags;
	// convert them to unix milliseconds for block-range comparisons.
	timeMin, err := utils.ParseTime(cfg.Filter.TimeMin)
	if err != nil {
		return nil, fmt.Errorf("failed to parse min time in filter: %s", err)
	}
	timeMax, err := utils.ParseTime(cfg.Filter.TimeMax)
	if err != nil {
		return nil, fmt.Errorf("failed to parse max time in filter: %s", err)
	}
	c.filter = filter{
		min:        timeMin.UnixMilli(),
		max:        timeMax.UnixMilli(),
		label:      cfg.Filter.Label,
		labelValue: cfg.Filter.LabelValue,
	}
	return &c, nil
}
|
||||
|
||||
// Explore fetches the bucket-index.json file from a remote storage or local filesystem
// and filters blocks via the defined time range, but does not take into account label filters.
func (c *Client) Explore() ([]tsdb.BlockReader, error) {

	// NOTE(review): s is populated but never logged or returned — confirm
	// whether stats reporting was dropped intentionally.
	s := &utils.Stats{
		Filtered: c.filter.min != 0 || c.filter.max != 0 || c.filter.label != "",
	}

	log.Printf("Fetching blocks from remote storage")

	indexFile, err := c.fetchIndexFile()
	if err != nil {
		return nil, fmt.Errorf("failed to fetch index file: %s", err)
	}

	var blocksToImport []tsdb.BlockReader
	for _, block := range indexFile.Blocks {
		if !c.filter.inRange(block.MinTime, block.MaxTime) {
			// Skipping block outside of time range
			continue
		}

		// Defensive guard against blocks with an empty ID.
		// NOTE(review): ulid.ULID.String() appears to always return a
		// 26-char string, so this branch may be unreachable — verify.
		if block.ID.String() == "" {
			continue
		}

		lazyBlockReader, err := NewLazyBlockReader(block, c.RemoteFS)
		if err != nil {
			return nil, fmt.Errorf("failed to create lazy block reader: %s", err)
		}
		blocksToImport = append(blocksToImport, lazyBlockReader)
	}

	s.Blocks = len(blocksToImport)
	return blocksToImport, nil
}
|
||||
|
||||
// Read reads the given BlockReader according to configured
// time and label filters, returning a SeriesSet over the matching series.
func (c *Client) Read(ctx context.Context, block tsdb.BlockReader) (storage.SeriesSet, error) {
	// Meta triggers lazy initialization on a LazyBlockReader; since Meta
	// itself cannot fail, any initialization error is surfaced via Err.
	meta := block.Meta()
	if b, ok := block.(*LazyBlockReader); ok && b.Err() != nil {
		return nil, fmt.Errorf("failed to read block: %s", b.Err())
	}

	if meta.ULID.String() == "" {
		log.Printf("got block without the id. it is empty")
		return nil, fmt.Errorf("block without id")
	}

	// Clamp the queried range to the configured filter bounds
	// (0 means unbounded, keep the block's own bound).
	minTime, maxTime := meta.MinTime, meta.MaxTime
	if c.filter.min != 0 {
		minTime = c.filter.min
	}
	if c.filter.max != 0 {
		maxTime = c.filter.max
	}
	q, err := tsdb.NewBlockQuerier(block, minTime, maxTime)
	if err != nil {
		return nil, err
	}
	// With the defaults (empty label, ".*" value) the matcher selects
	// every series; a configured label/value pair narrows the selection.
	ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, c.filter.label, c.filter.labelValue))
	return ss, nil
}
|
||||
|
||||
func (c *Client) fetchIndexFile() (*Index, error) {
|
||||
has, err := c.HasFile(bucketIndexCompressedFilename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !has {
|
||||
return nil, fmt.Errorf("bucket-index.json.gz not found")
|
||||
}
|
||||
|
||||
file, err := c.ReadFile(bucketIndexCompressedFilename)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read bucket index: %s", err)
|
||||
}
|
||||
|
||||
r := bytes.NewReader(file)
|
||||
// Read all the content.
|
||||
gzipReader, err := gzip.NewReader(r)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create gzip reader: %s", err)
|
||||
}
|
||||
|
||||
var indexFile Index
|
||||
err = json.NewDecoder(gzipReader).Decode(&indexFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decode bucket index: %s", err)
|
||||
}
|
||||
|
||||
return &indexFile, nil
|
||||
}
|
||||
91
app/vmctl/mimir/remotefs.go
Normal file
91
app/vmctl/mimir/remotefs.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package mimir
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/azremote"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fsremote"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/gcsremote"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/s3remote"
|
||||
)
|
||||
|
||||
// NewRemoteFS returns new remote fs from the given Config.
|
||||
func NewRemoteFS(ctx context.Context, cfg Config) (common.RemoteFS, error) {
|
||||
if len(cfg.Path) == 0 {
|
||||
return nil, fmt.Errorf("path cannot be empty")
|
||||
}
|
||||
n := strings.Index(cfg.Path, "://")
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("missing scheme in path %q. Supported schemes: `gs://`, `s3://`, `azblob://`, `fs://`", cfg.Path)
|
||||
}
|
||||
scheme := cfg.Path[:n]
|
||||
dir := cfg.Path[n+len("://"):]
|
||||
switch scheme {
|
||||
case "fs":
|
||||
if !filepath.IsAbs(dir) {
|
||||
return nil, fmt.Errorf("dir must be absolute; got %q", dir)
|
||||
}
|
||||
fsr := &fsremote.FS{
|
||||
Dir: filepath.Clean(dir),
|
||||
}
|
||||
return fsr, nil
|
||||
case "gcs", "gs":
|
||||
n := strings.Index(dir, "/")
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("missing directory on the gcs bucket %q", dir)
|
||||
}
|
||||
bucket := dir[:n]
|
||||
dir = dir[n:]
|
||||
fsr := &gcsremote.FS{
|
||||
CredsFilePath: cfg.CredsFilePath,
|
||||
Bucket: bucket,
|
||||
Dir: dir,
|
||||
}
|
||||
if err := fsr.Init(ctx); err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize connection to gcs: %w", err)
|
||||
}
|
||||
return fsr, nil
|
||||
case "azblob":
|
||||
n := strings.Index(dir, "/")
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("missing directory on the AZBlob container %q", dir)
|
||||
}
|
||||
bucket := dir[:n]
|
||||
dir = dir[n:]
|
||||
fsr := &azremote.FS{
|
||||
Container: bucket,
|
||||
Dir: dir,
|
||||
}
|
||||
if err := fsr.Init(ctx); err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize connection to AZBlob: %w", err)
|
||||
}
|
||||
return fsr, nil
|
||||
case "s3":
|
||||
n := strings.Index(dir, "/")
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("missing directory on the s3 bucket %q", dir)
|
||||
}
|
||||
bucket := dir[:n]
|
||||
dir = dir[n:]
|
||||
fsr := &s3remote.FS{
|
||||
CredsFilePath: cfg.CredsFilePath,
|
||||
ConfigFilePath: cfg.ConfigFilePath,
|
||||
CustomEndpoint: cfg.CustomS3Endpoint,
|
||||
TLSInsecureSkipVerify: cfg.S3TLSInsecureSkipVerify,
|
||||
S3ForcePathStyle: cfg.S3ForcePathStyle,
|
||||
ProfileName: cfg.ConfigProfile,
|
||||
Bucket: bucket,
|
||||
Dir: dir,
|
||||
}
|
||||
if err := fsr.Init(ctx); err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize connection to s3: %w", err)
|
||||
}
|
||||
return fsr, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported scheme %q", scheme)
|
||||
}
|
||||
}
|
||||
@@ -1,22 +1,30 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
||||
)
|
||||
|
||||
// Runner is an interface for fetching and reading
|
||||
// snapshot blocks
|
||||
type Runner interface {
|
||||
Explore() ([]tsdb.BlockReader, error)
|
||||
Read(context.Context, tsdb.BlockReader) (storage.SeriesSet, error)
|
||||
}
|
||||
|
||||
type prometheusProcessor struct {
|
||||
// prometheus client fetches and reads
|
||||
// Runner fetches and reads
|
||||
// snapshot blocks
|
||||
cl *prometheus.Client
|
||||
cl Runner
|
||||
// importer performs import requests
|
||||
// for timeseries data returned from
|
||||
// snapshot blocks
|
||||
@@ -30,7 +38,7 @@ type prometheusProcessor struct {
|
||||
isVerbose bool
|
||||
}
|
||||
|
||||
func (pp *prometheusProcessor) run() error {
|
||||
func (pp *prometheusProcessor) run(ctx context.Context) error {
|
||||
blocks, err := pp.cl.Explore()
|
||||
if err != nil {
|
||||
return fmt.Errorf("explore failed: %s", err)
|
||||
@@ -43,7 +51,7 @@ func (pp *prometheusProcessor) run() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := pp.processBlocks(blocks); err != nil {
|
||||
if err := pp.processBlocks(ctx, blocks); err != nil {
|
||||
return fmt.Errorf("migration failed: %s", err)
|
||||
}
|
||||
|
||||
@@ -52,8 +60,8 @@ func (pp *prometheusProcessor) run() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pp *prometheusProcessor) do(b tsdb.BlockReader) error {
|
||||
ss, err := pp.cl.Read(b)
|
||||
func (pp *prometheusProcessor) do(ctx context.Context, b tsdb.BlockReader) error {
|
||||
ss, err := pp.cl.Read(ctx, b)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read block: %s", err)
|
||||
}
|
||||
@@ -109,7 +117,7 @@ func (pp *prometheusProcessor) do(b tsdb.BlockReader) error {
|
||||
return ss.Err()
|
||||
}
|
||||
|
||||
func (pp *prometheusProcessor) processBlocks(blocks []tsdb.BlockReader) error {
|
||||
func (pp *prometheusProcessor) processBlocks(ctx context.Context, blocks []tsdb.BlockReader) error {
|
||||
bar := barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing blocks"), len(blocks))
|
||||
if err := barpool.Start(); err != nil {
|
||||
return err
|
||||
@@ -126,7 +134,7 @@ func (pp *prometheusProcessor) processBlocks(blocks []tsdb.BlockReader) error {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for br := range blockReadersCh {
|
||||
if err := pp.do(br); err != nil {
|
||||
if err := pp.do(ctx, br); err != nil {
|
||||
errCh <- fmt.Errorf("read failed for block %q: %s", br.Meta().ULID, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vmctlutil"
|
||||
)
|
||||
|
||||
// Config contains a list of params needed
|
||||
@@ -60,13 +62,13 @@ func NewClient(cfg Config) (*Client, error) {
|
||||
return nil, fmt.Errorf("failed to open snapshot %q: %s", cfg.Snapshot, err)
|
||||
}
|
||||
c := &Client{DBReadOnly: db}
|
||||
minTime, maxTime, err := parseTime(cfg.Filter.TimeMin, cfg.Filter.TimeMax)
|
||||
timeMin, timeMax, err := parseTime(cfg.Filter.TimeMin, cfg.Filter.TimeMax)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse time in filter: %s", err)
|
||||
}
|
||||
c.filter = filter{
|
||||
min: minTime,
|
||||
max: maxTime,
|
||||
min: timeMin,
|
||||
max: timeMax,
|
||||
label: cfg.Filter.Label,
|
||||
labelValue: cfg.Filter.LabelValue,
|
||||
}
|
||||
@@ -83,7 +85,7 @@ func (c *Client) Explore() ([]tsdb.BlockReader, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch blocks: %s", err)
|
||||
}
|
||||
s := &Stats{
|
||||
s := &vmctlutil.Stats{
|
||||
Filtered: c.filter.min != 0 || c.filter.max != 0 || c.filter.label != "",
|
||||
Blocks: len(blocks),
|
||||
}
|
||||
@@ -110,7 +112,7 @@ func (c *Client) Explore() ([]tsdb.BlockReader, error) {
|
||||
|
||||
// Read reads the given BlockReader according to configured
|
||||
// time and label filters.
|
||||
func (c *Client) Read(block tsdb.BlockReader) (storage.SeriesSet, error) {
|
||||
func (c *Client) Read(ctx context.Context, block tsdb.BlockReader) (storage.SeriesSet, error) {
|
||||
minTime, maxTime := block.Meta().MinTime, block.Meta().MaxTime
|
||||
if c.filter.min != 0 {
|
||||
minTime = c.filter.min
|
||||
@@ -122,7 +124,7 @@ func (c *Client) Read(block tsdb.BlockReader) (storage.SeriesSet, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchRegexp, c.filter.label, c.filter.labelValue))
|
||||
ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, c.filter.label, c.filter.labelValue))
|
||||
return ss, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -207,7 +207,6 @@ func (im *Importer) Input(ts *TimeSeries) error {
|
||||
// and waits until they are finished
|
||||
func (im *Importer) Close() {
|
||||
im.once.Do(func() {
|
||||
close(im.close)
|
||||
close(im.input)
|
||||
im.wg.Wait()
|
||||
close(im.errors)
|
||||
@@ -237,7 +236,17 @@ func (im *Importer) startWorker(ctx context.Context, bar barpool.Bar, batchSize,
|
||||
return
|
||||
case ts, ok := <-im.input:
|
||||
if !ok {
|
||||
continue
|
||||
// drain all batches before exit
|
||||
exitErr := &ImportError{
|
||||
Batch: batch,
|
||||
}
|
||||
retryableFunc := func() error { return im.Import(batch) }
|
||||
_, err := im.backoff.Retry(ctx, retryableFunc)
|
||||
if err != nil {
|
||||
exitErr.Err = err
|
||||
}
|
||||
im.errors <- exitErr
|
||||
return
|
||||
}
|
||||
// init waitForBatch when first
|
||||
// value was received
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package prometheus
|
||||
package vmctlutil
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
BIN
apptest/tests/testdata/mimir-tsdb/anonymous/01JFJBS3YP1SHZ3PJQ6HK76EC3/chunks/000001
vendored
Normal file
BIN
apptest/tests/testdata/mimir-tsdb/anonymous/01JFJBS3YP1SHZ3PJQ6HK76EC3/chunks/000001
vendored
Normal file
Binary file not shown.
BIN
apptest/tests/testdata/mimir-tsdb/anonymous/01JFJBS3YP1SHZ3PJQ6HK76EC3/index
vendored
Normal file
BIN
apptest/tests/testdata/mimir-tsdb/anonymous/01JFJBS3YP1SHZ3PJQ6HK76EC3/index
vendored
Normal file
Binary file not shown.
51
apptest/tests/testdata/mimir-tsdb/anonymous/01JFJBS3YP1SHZ3PJQ6HK76EC3/meta.json
vendored
Normal file
51
apptest/tests/testdata/mimir-tsdb/anonymous/01JFJBS3YP1SHZ3PJQ6HK76EC3/meta.json
vendored
Normal file
@@ -0,0 +1,51 @@
|
||||
{
|
||||
"ulid": "01JFJBS3YP1SHZ3PJQ6HK76EC3",
|
||||
"minTime": 1734709200000,
|
||||
"maxTime": 1734709320000,
|
||||
"stats": {
|
||||
"numSamples": 400,
|
||||
"numSeries": 100,
|
||||
"numChunks": 100
|
||||
},
|
||||
"compaction": {
|
||||
"level": 1,
|
||||
"sources": [
|
||||
"01JFJBS3YP1SHZ3PJQ6HK76EC3"
|
||||
],
|
||||
"parents": [
|
||||
{
|
||||
"ulid": "00000000000000000000000000",
|
||||
"minTime": 0,
|
||||
"maxTime": 0
|
||||
}
|
||||
],
|
||||
"hints": [
|
||||
"from-out-of-order"
|
||||
]
|
||||
},
|
||||
"version": 1,
|
||||
"out_of_order": false,
|
||||
"thanos": {
|
||||
"labels": {},
|
||||
"downsample": {
|
||||
"resolution": 0
|
||||
},
|
||||
"source": "receive",
|
||||
"segment_files": [
|
||||
"000001"
|
||||
],
|
||||
"files": [
|
||||
{
|
||||
"rel_path": "chunks/000001",
|
||||
"size_bytes": 4808
|
||||
},
|
||||
{
|
||||
"rel_path": "index",
|
||||
"size_bytes": 55021
|
||||
},
|
||||
{
|
||||
"rel_path": "meta.json"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
BIN
apptest/tests/testdata/mimir-tsdb/anonymous/bucket-index.json.gz
vendored
Normal file
BIN
apptest/tests/testdata/mimir-tsdb/anonymous/bucket-index.json.gz
vendored
Normal file
Binary file not shown.
1
apptest/tests/testdata/mimir-tsdb/expected_response.json
vendored
Normal file
1
apptest/tests/testdata/mimir-tsdb/expected_response.json
vendored
Normal file
File diff suppressed because one or more lines are too long
139
apptest/tests/vmctl_mimir_migration_test.go
Normal file
139
apptest/tests/vmctl_mimir_migration_test.go
Normal file
@@ -0,0 +1,139 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
)
|
||||
|
||||
const (
|
||||
testMimirPath = "testdata/mimir-tsdb"
|
||||
expectedMimirResponseFile = "./testdata/mimir-tsdb/expected_response.json"
|
||||
)
|
||||
|
||||
func TestSingleVmctlMimirProtocol(t *testing.T) {
|
||||
fs.MustRemoveDir(t.Name())
|
||||
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
vmsingleDst := tc.MustStartDefaultVmsingle()
|
||||
vmAddr := fmt.Sprintf("http://%s/", vmsingleDst.HTTPAddr())
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("cannot get current working directory: %s", err)
|
||||
}
|
||||
|
||||
path := fmt.Sprintf("fs://%s/%s", dir, testMimirPath)
|
||||
vmctlFlags := []string{
|
||||
`mimir`,
|
||||
`--mimir-tenant-id=anonymous`,
|
||||
`--mimir-filter-time-start=2024-12-01T00:00:00Z`,
|
||||
`--mimir-filter-time-end=2024-12-31T23:59:59Z`,
|
||||
`--mimir-custom-s3-endpoint=http://localhost:9000`,
|
||||
`--mimir-path=` + path,
|
||||
`--vm-addr=` + vmAddr,
|
||||
`--disable-progress-bar=true`,
|
||||
`--vm-concurrency=6`,
|
||||
`--mimir-concurrency=6`,
|
||||
}
|
||||
|
||||
testMimirProtocol(tc, vmsingleDst, vmctlFlags)
|
||||
}
|
||||
|
||||
func TestClusterVmctlMimirProtocol(t *testing.T) {
|
||||
fs.MustRemoveDir(t.Name())
|
||||
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
cluster := tc.MustStartDefaultCluster()
|
||||
vmAddr := fmt.Sprintf("http://%s/", cluster.Vminsert.HTTPAddr())
|
||||
dir, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("cannot get current working directory: %s", err)
|
||||
}
|
||||
|
||||
path := fmt.Sprintf("fs://%s/%s", dir, testMimirPath)
|
||||
|
||||
vmctlFlags := []string{
|
||||
`mimir`,
|
||||
`--mimir-tenant-id=anonymous`,
|
||||
`--mimir-filter-time-start=2024-12-01T00:00:00Z`,
|
||||
`--mimir-filter-time-end=2024-12-31T23:59:59Z`,
|
||||
`--mimir-custom-s3-endpoint=http://localhost:9000`,
|
||||
`--mimir-path=` + path,
|
||||
`--vm-addr=` + vmAddr,
|
||||
`--disable-progress-bar=true`,
|
||||
`--vm-concurrency=6`,
|
||||
`--mimir-concurrency=6`,
|
||||
}
|
||||
|
||||
testMimirProtocol(tc, cluster, vmctlFlags)
|
||||
}
|
||||
|
||||
func testMimirProtocol(tc *apptest.TestCase, sut apptest.PrometheusWriteQuerier, vmctlFlags []string) {
|
||||
t := tc.T()
|
||||
t.Helper()
|
||||
|
||||
cmpOpt := cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType")
|
||||
|
||||
// test for empty data request
|
||||
got := sut.PrometheusAPIV1Query(t, `{__name__=~".*"}`, apptest.QueryOpts{
|
||||
Step: "5m",
|
||||
Time: "2025-06-02T17:14:00Z",
|
||||
})
|
||||
|
||||
want := apptest.NewPrometheusAPIV1QueryResponse(t, `{"data":{"result":[]}}`)
|
||||
if diff := cmp.Diff(want, got, cmpOpt); diff != "" {
|
||||
t.Errorf("unexpected response (-want, +got):\n%s", diff)
|
||||
}
|
||||
|
||||
tc.MustStartVmctl("vmctl", vmctlFlags)
|
||||
|
||||
sut.ForceFlush(t)
|
||||
|
||||
// open the expected series response file
|
||||
file, err := os.Open(expectedMimirResponseFile)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open expected series response file: %s", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
bytes, err := io.ReadAll(file)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot read expected series response file: %s", err)
|
||||
}
|
||||
|
||||
var wantResponse apptest.PrometheusAPIV1QueryResponse
|
||||
if err := json.Unmarshal(bytes, &wantResponse); err != nil {
|
||||
t.Fatalf("cannot unmarshal expected series response file: %s", err)
|
||||
}
|
||||
wantResponse.Sort()
|
||||
|
||||
tc.Assert(&apptest.AssertOptions{
|
||||
// For cluster version, we need to wait longer for the metrics to be stored
|
||||
Retries: 300,
|
||||
Msg: `unexpected metrics stored on vmsingle via the prometheus protocol`,
|
||||
Got: func() any {
|
||||
expected := sut.PrometheusAPIV1Export(t, `{__name__=~".*"}`, apptest.QueryOpts{
|
||||
Start: "2024-12-01T15:31:10Z",
|
||||
End: "2024-12-31T15:32:20Z",
|
||||
})
|
||||
expected.Sort()
|
||||
return expected.Data.Result
|
||||
},
|
||||
Want: wantResponse.Data.Result,
|
||||
CmpOpts: []cmp.Option{
|
||||
cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType"),
|
||||
},
|
||||
})
|
||||
}
|
||||
0
docs/changelog/CHANGELOG.md
Normal file
0
docs/changelog/CHANGELOG.md
Normal file
0
docs/victoriametrics/vmctl.md
Normal file
0
docs/victoriametrics/vmctl.md
Normal file
@@ -3,6 +3,7 @@ package fsremote
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
@@ -228,6 +229,7 @@ func (fs *FS) HasFile(filePath string) (bool, error) {
|
||||
path := filepath.Join(fs.Dir, filePath)
|
||||
fi, err := os.Stat(path)
|
||||
if err != nil {
|
||||
log.Printf("debug: os.Stat(%q) error: %s", path, err)
|
||||
if os.IsNotExist(err) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user