diff --git a/app/vlogsgenerator/Makefile b/app/vlogsgenerator/Makefile deleted file mode 100644 index 731244250b..0000000000 --- a/app/vlogsgenerator/Makefile +++ /dev/null @@ -1,7 +0,0 @@ -# All these commands must run from repository root. - -vlogsgenerator: - APP_NAME=vlogsgenerator $(MAKE) app-local - -vlogsgenerator-race: - APP_NAME=vlogsgenerator RACE=-race $(MAKE) app-local diff --git a/app/vlogsgenerator/README.md b/app/vlogsgenerator/README.md index 028f0ef7f8..5456a353a1 100644 --- a/app/vlogsgenerator/README.md +++ b/app/vlogsgenerator/README.md @@ -1,158 +1 @@ -# vlogsgenerator - -Logs generator for [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/). - -## How to build vlogsgenerator? - -Run `make vlogsgenerator` from the repository root. This builds `bin/vlogsgenerator` binary. - -## How run vlogsgenerator? - -`vlogsgenerator` generates logs in [JSON line format](https://jsonlines.org/) suitable for the ingestion -via [`/insert/jsonline` endpoint at VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#json-stream-api). - -By default it writes the generated logs into `stdout`. For example, the following command writes generated logs to `stdout`: - -``` -bin/vlogsgenerator -``` - -It is possible to redirect the generated logs to file. For example, the following command writes the generated logs to `logs.json` file: - -``` -bin/vlogsgenerator > logs.json -``` - -The generated logs at `logs.json` file can be inspected with the following command: - -``` -head logs.json | jq . -``` - -Below is an example output: - -```json -{ - "_time": "2024-05-08T14:34:00.854Z", - "_msg": "message for the stream 8 and worker 0; ip=185.69.136.129; uuid=b4fe8f1a-c93c-dea3-ba11-5b9f0509291e; u64=8996587920687045253", - "host": "host_8", - "worker_id": "0", - "run_id": "f9b3deee-e6b6-7f56-5deb-1586e4e81725", - "const_0": "some value 0 8", - "const_1": "some value 1 8", - "const_2": "some value 2 8", - "var_0": "some value 0 12752539384823438260", - "dict_0": "warn", - "dict_1": "info", - "u8_0": "6", - "u16_0": "35202", - "u32_0": "1964973739", - "u64_0": "4810489083243239145", - "float_0": "1.868", - "ip_0": "250.34.75.125", - "timestamp_0": "1799-03-16T01:34:18.311Z", - "json_0": "{\"foo\":\"bar_3\",\"baz\":{\"a\":[\"x\",\"y\"]},\"f3\":NaN,\"f4\":32}" -} -{ - "_time": "2024-05-08T14:34:00.854Z", - "_msg": "message for the stream 9 and worker 0; ip=164.244.254.194; uuid=7e8373b1-ce0d-1ce7-8e96-4bcab8955598; u64=13949903463741076522", - "host": "host_9", - "worker_id": "0", - "run_id": "f9b3deee-e6b6-7f56-5deb-1586e4e81725", - "const_0": "some value 0 9", - "const_1": "some value 1 9", - "const_2": "some value 2 9", - "var_0": "some value 0 5371555382075206134", - "dict_0": "INFO", - "dict_1": "FATAL", - "u8_0": "219", - "u16_0": "31459", - "u32_0": "3918836777", - "u64_0": "6593354256620219850", - "float_0": "1.085", - "ip_0": "253.151.88.158", - "timestamp_0": "2042-10-05T16:42:57.082Z", - "json_0": "{\"foo\":\"bar_5\",\"baz\":{\"a\":[\"x\",\"y\"]},\"f3\":NaN,\"f4\":27}" -} -``` - -The `run_id` field uniquely identifies every `vlogsgenerator` invocation. - -### How to write logs to VictoriaLogs? - -The generated logs can be written directly to VictoriaLogs by passing the address of [`/insert/jsonline` endpoint](https://docs.victoriametrics.com/victorialogs/data-ingestion/#json-stream-api) -to `-addr` command-line flag. For example, the following command writes the generated logs to VictoriaLogs running at `localhost`: - -``` -bin/vlogsgenerator -addr=http://localhost:9428/insert/jsonline -``` - -### Configuration - -`vlogsgenerator` accepts various command-line flags, which can be used for configuring the number and the shape of the generated logs. -These flags can be inspected by running `vlogsgenerator -help`. Below are the most interesting flags: - -* `-start` - starting timestamp for generating logs. Logs are evenly generated on the [`-start` ... `-end`] interval. -* `-end` - ending timestamp for generating logs. Logs are evenly generated on the [`-start` ... `-end`] interval. -* `-activeStreams` - the number of active [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to generate. -* `-logsPerStream` - the number of log entries to generate per each log stream. Log entries are evenly distributed on the [`-start` ... `-end`] interval. - -The total number of generated logs can be calculated as `-activeStreams` * `-logsPerStream`. - -For example, the following command generates `1_000_000` log entries on the time range `[2024-01-01 - 2024-02-01]` across `100` -[log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields), where every logs stream contains `10_000` log entries, -and writes them to `http://localhost:9428/insert/jsonline`: - -``` -bin/vlogsgenerator \ - -start=2024-01-01 -end=2024-02-01 \ - -activeStreams=100 \ - -logsPerStream=10_000 \ - -addr=http://localhost:9428/insert/jsonline -``` - -### Churn rate - -It is possible to generate churn rate for active [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) -by specifying `-totalStreams` command-line flag bigger than `-activeStreams`. For example, the following command generates -logs for `1000` total streams, while the number of active streams equals to `100`. This means that at every time there are logs for `100` streams, -but these streams change over the given [`-start` ... `-end`] time range, so the total number of streams on the given time range becomes `1000`: - -``` -bin/vlogsgenerator \ - -start=2024-01-01 -end=2024-02-01 \ - -activeStreams=100 \ - -totalStreams=1_000 \ - -logsPerStream=10_000 \ - -addr=http://localhost:9428/insert/jsonline -``` - -In this case the total number of generated logs equals to `-totalStreams` * `-logsPerStream` = `10_000_000`. - -### Benchmark tuning - -By default `vlogsgenerator` generates and writes logs by a single worker. This may limit the maximum data ingestion rate during benchmarks. -The number of workers can be changed via `-workers` command-line flag. For example, the following command generates and writes logs with `16` workers: - -``` -bin/vlogsgenerator \ - -start=2024-01-01 -end=2024-02-01 \ - -activeStreams=100 \ - -logsPerStream=10_000 \ - -addr=http://localhost:9428/insert/jsonline \ - -workers=16 -``` - -### Output statistics - -Every 10 seconds `vlogsgenerator` writes statistics about the generated logs into `stderr`. The frequency of the generated statistics can be adjusted via `-statInterval` command-line flag. -For example, the following command writes statistics every 2 seconds: - -``` -bin/vlogsgenerator \ - -start=2024-01-01 -end=2024-02-01 \ - -activeStreams=100 \ - -logsPerStream=10_000 \ - -addr=http://localhost:9428/insert/jsonline \ - -statInterval=2s -``` +VictoriaLogs source code has been moved to [github.com/VictoriaMetrics/VictoriaLogs](https://github.com/VictoriaMetrics/VictoriaLogs/). diff --git a/app/vlogsgenerator/main.go b/app/vlogsgenerator/main.go deleted file mode 100644 index f1e1759900..0000000000 --- a/app/vlogsgenerator/main.go +++ /dev/null @@ -1,354 +0,0 @@ -package main - -import ( - "bufio" - "flag" - "fmt" - "io" - "math" - "math/rand" - "net/http" - "net/url" - "os" - "strconv" - "sync" - "sync/atomic" - "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" -) - -var ( - addr = flag.String("addr", "stdout", "HTTP address to push the generated logs to; if it is set to stdout, then logs are generated to stdout") - workers = flag.Int("workers", 1, "The number of workers to use to push logs to -addr") - - start = newTimeFlag("start", "-1d", "Generated logs start from this time; see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#timestamp-formats") - end = newTimeFlag("end", "0s", "Generated logs end at this time; see https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#timestamp-formats") - activeStreams = flag.Int("activeStreams", 100, "The number of active log streams to generate; see https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields") - totalStreams = flag.Int("totalStreams", 0, "The number of total log streams; if -totalStreams > -activeStreams, then some active streams are substituted with new streams "+ - "during data generation") - logsPerStream = flag.Int64("logsPerStream", 1_000, "The number of log entries to generate per each log stream. Log entries are evenly distributed between -start and -end") - constFieldsPerLog = flag.Int("constFieldsPerLog", 3, "The number of fields with constant values to generate per each log entry; "+ - "see https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model") - varFieldsPerLog = flag.Int("varFieldsPerLog", 1, "The number of fields with variable values to generate per each log entry; "+ - "see https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model") - dictFieldsPerLog = flag.Int("dictFieldsPerLog", 2, "The number of fields with up to 8 different values to generate per each log entry; "+ - "see https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model") - u8FieldsPerLog = flag.Int("u8FieldsPerLog", 1, "The number of fields with uint8 values to generate per each log entry; "+ - "see https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model") - u16FieldsPerLog = flag.Int("u16FieldsPerLog", 1, "The number of fields with uint16 values to generate per each log entry; "+ - "see https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model") - u32FieldsPerLog = flag.Int("u32FieldsPerLog", 1, "The number of fields with uint32 values to generate per each log entry; "+ - "see https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model") - u64FieldsPerLog = flag.Int("u64FieldsPerLog", 1, "The number of fields with uint64 values to generate per each log entry; "+ - "see https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model") - i64FieldsPerLog = flag.Int("i64FieldsPerLog", 1, "The number of fields with int64 values to generate per each log entry; "+ - "see https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model") - floatFieldsPerLog = flag.Int("floatFieldsPerLog", 1, "The number of fields with float64 values to generate per each log entry; "+ - "see https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model") - ipFieldsPerLog = flag.Int("ipFieldsPerLog", 1, "The number of fields with IPv4 values to generate per each log entry; "+ - "see https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model") - timestampFieldsPerLog = flag.Int("timestampFieldsPerLog", 1, "The number of fields with ISO8601 timestamps per each log entry; "+ - "see https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model") - jsonFieldsPerLog = flag.Int("jsonFieldsPerLog", 1, "The number of JSON fields to generate per each log entry; "+ - "see https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model") - - statInterval = flag.Duration("statInterval", 10*time.Second, "The interval between publishing the stats") -) - -func main() { - // Write flags and help message to stdout, since it is easier to grep or pipe. - flag.CommandLine.SetOutput(os.Stdout) - envflag.Parse() - buildinfo.Init() - logger.Init() - - var remoteWriteURL *url.URL - if *addr != "stdout" { - urlParsed, err := url.Parse(*addr) - if err != nil { - logger.Fatalf("cannot parse -addr=%q: %s", *addr, err) - } - qs, err := url.ParseQuery(urlParsed.RawQuery) - if err != nil { - logger.Fatalf("cannot parse query string in -addr=%q: %w", *addr, err) - } - qs.Set("_stream_fields", "host,worker_id") - urlParsed.RawQuery = qs.Encode() - remoteWriteURL = urlParsed - } - - if start.nsec >= end.nsec { - logger.Fatalf("-start=%s must be smaller than -end=%s", start, end) - } - if *activeStreams <= 0 { - logger.Fatalf("-activeStreams must be bigger than 0; got %d", *activeStreams) - } - if *logsPerStream <= 0 { - logger.Fatalf("-logsPerStream must be bigger than 0; got %d", *logsPerStream) - } - if *totalStreams < *activeStreams { - *totalStreams = *activeStreams - } - - cfg := &workerConfig{ - url: remoteWriteURL, - activeStreams: *activeStreams, - totalStreams: *totalStreams, - } - - // divide total and active streams among workers - if *workers <= 0 { - logger.Fatalf("-workers must be bigger than 0; got %d", *workers) - } - if *workers > *activeStreams { - logger.Fatalf("-workers=%d cannot exceed -activeStreams=%d", *workers, *activeStreams) - } - cfg.activeStreams /= *workers - cfg.totalStreams /= *workers - - logger.Infof("start -workers=%d workers for ingesting -logsPerStream=%d log entries per each -totalStreams=%d (-activeStreams=%d) on a time range -start=%s, -end=%s to -addr=%s", - *workers, *logsPerStream, *totalStreams, *activeStreams, toRFC3339(start.nsec), toRFC3339(end.nsec), *addr) - - startTime := time.Now() - var wg sync.WaitGroup - for i := 0; i < *workers; i++ { - wg.Add(1) - go func(workerID int) { - defer wg.Done() - generateAndPushLogs(cfg, workerID) - }(i) - } - - go func() { - prevEntries := uint64(0) - prevBytes := uint64(0) - ticker := time.NewTicker(*statInterval) - for range ticker.C { - currEntries := logEntriesCount.Load() - deltaEntries := currEntries - prevEntries - rateEntries := float64(deltaEntries) / statInterval.Seconds() - - currBytes := bytesGenerated.Load() - deltaBytes := currBytes - prevBytes - rateBytes := float64(deltaBytes) / statInterval.Seconds() - logger.Infof("generated %dK log entries (%dK total) at %.0fK entries/sec, %dMB (%dMB total) at %.0fMB/sec", - deltaEntries/1e3, currEntries/1e3, rateEntries/1e3, deltaBytes/1e6, currBytes/1e6, rateBytes/1e6) - - prevEntries = currEntries - prevBytes = currBytes - } - }() - - wg.Wait() - - dSecs := time.Since(startTime).Seconds() - currEntries := logEntriesCount.Load() - currBytes := bytesGenerated.Load() - rateEntries := float64(currEntries) / dSecs - rateBytes := float64(currBytes) / dSecs - logger.Infof("ingested %dK log entries (%dMB) in %.3f seconds; avg ingestion rate: %.0fK entries/sec, %.0fMB/sec", currEntries/1e3, currBytes/1e6, dSecs, rateEntries/1e3, rateBytes/1e6) -} - -var logEntriesCount atomic.Uint64 - -var bytesGenerated atomic.Uint64 - -type workerConfig struct { - url *url.URL - activeStreams int - totalStreams int -} - -type statWriter struct { - w io.Writer -} - -func (sw *statWriter) Write(p []byte) (int, error) { - bytesGenerated.Add(uint64(len(p))) - return sw.w.Write(p) -} - -func generateAndPushLogs(cfg *workerConfig, workerID int) { - pr, pw := io.Pipe() - sw := &statWriter{ - w: pw, - } - - // The 1MB write buffer increases data ingestion performance by reducing the number of send() syscalls - bw := bufio.NewWriterSize(sw, 1024*1024) - - doneCh := make(chan struct{}) - go func() { - generateLogs(bw, workerID, cfg.activeStreams, cfg.totalStreams) - _ = bw.Flush() - _ = pw.Close() - close(doneCh) - }() - - if cfg.url == nil { - _, err := io.Copy(os.Stdout, pr) - if err != nil { - logger.Fatalf("unexpected error when writing logs to stdout: %s", err) - } - return - } - - req, err := http.NewRequest("POST", cfg.url.String(), pr) - if err != nil { - logger.Fatalf("cannot create request to %q: %s", cfg.url, err) - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - logger.Fatalf("cannot perform request to %q: %s", cfg.url, err) - } - defer resp.Body.Close() - - if resp.StatusCode/100 != 2 { - logger.Fatalf("unexpected status code got from %q: %d; want 2xx", cfg.url, err) - } - - // Wait until all the generateLogs goroutine is finished. - <-doneCh -} - -func generateLogs(bw *bufio.Writer, workerID, activeStreams, totalStreams int) { - streamLifetime := int64(float64(end.nsec-start.nsec) * (float64(activeStreams) / float64(totalStreams))) - streamStep := int64(float64(end.nsec-start.nsec) / float64(totalStreams-activeStreams+1)) - step := streamLifetime / (*logsPerStream - 1) - - currNsec := start.nsec - for currNsec < end.nsec { - firstStreamID := int((currNsec - start.nsec) / streamStep) - generateLogsAtTimestamp(bw, workerID, currNsec, firstStreamID, activeStreams) - currNsec += step - } -} - -var runID = toUUID(rand.Uint64(), rand.Uint64()) - -func generateLogsAtTimestamp(bw *bufio.Writer, workerID int, ts int64, firstStreamID, activeStreams int) { - streamID := firstStreamID - timeStr := toRFC3339(ts) - for i := 0; i < activeStreams; i++ { - ip := toIPv4(rand.Uint32()) - uuid := toUUID(rand.Uint64(), rand.Uint64()) - fmt.Fprintf(bw, `{"_time":"%s","_msg":"message for the stream %d and worker %d; ip=%s; uuid=%s; u64=%d","host":"host_%d","worker_id":"%d"`, - timeStr, streamID, workerID, ip, uuid, rand.Uint64(), streamID, workerID) - fmt.Fprintf(bw, `,"run_id":"%s"`, runID) - for j := 0; j < *constFieldsPerLog; j++ { - fmt.Fprintf(bw, `,"const_%d":"some value %d %d"`, j, j, streamID) - } - for j := 0; j < *varFieldsPerLog; j++ { - fmt.Fprintf(bw, `,"var_%d":"some value %d %d"`, j, j, rand.Uint64()) - } - for j := 0; j < *dictFieldsPerLog; j++ { - fmt.Fprintf(bw, `,"dict_%d":"%s"`, j, dictValues[rand.Intn(len(dictValues))]) - } - for j := 0; j < *u8FieldsPerLog; j++ { - fmt.Fprintf(bw, `,"u8_%d":"%d"`, j, uint8(rand.Uint32())) - } - for j := 0; j < *u16FieldsPerLog; j++ { - fmt.Fprintf(bw, `,"u16_%d":"%d"`, j, uint16(rand.Uint32())) - } - for j := 0; j < *u32FieldsPerLog; j++ { - fmt.Fprintf(bw, `,"u32_%d":"%d"`, j, rand.Uint32()) - } - for j := 0; j < *u64FieldsPerLog; j++ { - fmt.Fprintf(bw, `,"u64_%d":"%d"`, j, rand.Uint64()) - } - for j := 0; j < *i64FieldsPerLog; j++ { - fmt.Fprintf(bw, `,"i64_%d":"%d"`, j, int64(rand.Uint64())) - } - for j := 0; j < *floatFieldsPerLog; j++ { - fmt.Fprintf(bw, `,"float_%d":"%v"`, j, math.Round(10_000*rand.Float64())/1000) - } - for j := 0; j < *ipFieldsPerLog; j++ { - ip := toIPv4(rand.Uint32()) - fmt.Fprintf(bw, `,"ip_%d":"%s"`, j, ip) - } - for j := 0; j < *timestampFieldsPerLog; j++ { - timestamp := toISO8601(int64(rand.Uint64())) - fmt.Fprintf(bw, `,"timestamp_%d":"%s"`, j, timestamp) - } - for j := 0; j < *jsonFieldsPerLog; j++ { - fmt.Fprintf(bw, `,"json_%d":"{\"foo\":\"bar_%d\",\"baz\":{\"a\":[\"x\",\"y\"]},\"f3\":NaN,\"f4\":%d}"`, j, rand.Intn(10), rand.Intn(100)) - } - fmt.Fprintf(bw, "}\n") - - logEntriesCount.Add(1) - streamID++ - } -} - -var dictValues = []string{ - "debug", - "info", - "warn", - "error", - "fatal", - "ERROR", - "FATAL", - "INFO", -} - -func newTimeFlag(name, defaultValue, description string) *timeFlag { - var tf timeFlag - if err := tf.Set(defaultValue); err != nil { - logger.Panicf("invalid defaultValue=%q for flag %q: %w", defaultValue, name, err) - } - flag.Var(&tf, name, description) - return &tf -} - -type timeFlag struct { - s string - nsec int64 -} - -func (tf *timeFlag) Set(s string) error { - msec, err := timeutil.ParseTimeMsec(s) - if err != nil { - return fmt.Errorf("cannot parse time from %q: %w", s, err) - } - tf.s = s - tf.nsec = msec * 1e6 - return nil -} - -func (tf *timeFlag) String() string { - return tf.s -} - -func toRFC3339(nsec int64) string { - return time.Unix(0, nsec).UTC().Format(time.RFC3339Nano) -} - -func toISO8601(nsec int64) string { - return time.Unix(0, nsec).UTC().Format("2006-01-02T15:04:05.000Z") -} - -func toIPv4(n uint32) string { - dst := make([]byte, 0, len("255.255.255.255")) - dst = marshalUint64(dst, uint64(n>>24)) - dst = append(dst, '.') - dst = marshalUint64(dst, uint64((n>>16)&0xff)) - dst = append(dst, '.') - dst = marshalUint64(dst, uint64((n>>8)&0xff)) - dst = append(dst, '.') - dst = marshalUint64(dst, uint64(n&0xff)) - return string(dst) -} - -func toUUID(a, b uint64) string { - return fmt.Sprintf("%08x-%04x-%04x-%04x-%012x", a&(1<<32-1), (a>>32)&(1<<16-1), (a >> 48), b&(1<<16-1), b>>16) -} - -// marshalUint64 appends string representation of n to dst and returns the result. -func marshalUint64(dst []byte, n uint64) []byte { - return strconv.AppendUint(dst, n, 10) -}