Compare commits

..

1 Commits

Author SHA1 Message Date
Aliaksandr Valialkin
19986da161 docs/CHANGELOG.md: cut v1.53.1 2021-02-03 22:50:52 +02:00
4856 changed files with 191479 additions and 930674 deletions

View File

@@ -4,4 +4,3 @@ gocache-for-docker
victoria-metrics-data
vmstorage-data
vmselect-cache
.vscode

44
.github/ISSUE_TEMPLATE/bug_report.md vendored Normal file
View File

@@ -0,0 +1,44 @@
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''
---
**Describe the bug**
A clear and concise description of what the bug is.
It would be great [upgrading](https://victoriametrics.github.io/#how-to-upgrade) to [the latest avaialble release](https://github.com/VictoriaMetrics/VictoriaMetrics/releases)
and verifying whether the bug is reproducible there.
It is also recommended reading [troubleshooting docs](https://victoriametrics.github.io/#troubleshooting).
**To Reproduce**
Steps to reproduce the behavior.
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Version**
The line returned when passing `--version` command line flag to binary. For example:
```
$ ./victoria-metrics-prod --version
victoria-metrics-20190730-121249-heads-single-node-0-g671d9e55
```
**Used command-line flags**
Command-line flags are listed as `flag{name="httpListenAddr", value=":443"} 1` lines at `/metrics` page.
See the following docs for details:
* [monitoring for single-node VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#monitoring)
* [montioring for VictoriaMetrics cluster](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/cluster/README.md#monitoring)
**Additional context**
Add any other context about the problem here such as error logs from VictoriaMetrics and Prometheus,
`/metrics` output, screenshots from the official Grafana dashboards for VictoriaMetrics:
* [Grafana dashboard for single-node VictoriaMetrics](https://grafana.com/dashboards/10229)
* [Grafana dashboard for VictoriaMetrics cluster](https://grafana.com/grafana/dashboards/11176)

View File

@@ -1,86 +0,0 @@
name: Bug report
description: Create a report to help us improve
labels: [bug]
body:
- type: markdown
attributes:
value: |
Before filling a bug report it would be great to [upgrade](https://docs.victoriametrics.com/#how-to-upgrade)
to [the latest available release](https://github.com/VictoriaMetrics/VictoriaMetrics/releases)
and verify whether the bug is reproducible there.
It's also recommended to read the [troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html) first.
- type: textarea
id: describe-the-bug
attributes:
label: Describe the bug
description: |
A clear and concise description of what the bug is.
placeholder: |
When I do `A` VictoriaMetrics does `B`. I expect it to do `C`.
validations:
required: true
- type: textarea
id: to-reproduce
attributes:
label: To Reproduce
description: |
Steps to reproduce the behavior.
If reproducing an issue requires some specific configuration file, please paste it here.
placeholder: |
Steps to reproduce the behavior.
validations:
required: true
- type: textarea
id: version
attributes:
label: Version
description: |
The line returned when passing `--version` command line flag to the binary. For example:
```
$ ./victoria-metrics-prod --version
victoria-metrics-20190730-121249-heads-single-node-0-g671d9e55
```
validations:
required: true
- type: textarea
id: logs
attributes:
label: Logs
description: |
Check if any warnings or errors were logged by VictoriaMetrics components
or components in communication with VictoriaMetrics (e.g. Prometheus, Grafana).
validations:
required: false
- type: textarea
id: screenshots
attributes:
label: Screenshots
description: |
If applicable, add screenshots to help explain your problem.
For VictoriaMetrics health-state issues please provide full-length screenshots
of Grafana dashboards if possible:
* [Grafana dashboard for single-node VictoriaMetrics](https://grafana.com/grafana/dashboards/10229-victoriametrics/)
* [Grafana dashboard for VictoriaMetrics cluster](https://grafana.com/grafana/dashboards/11176-victoriametrics-cluster/)
See how to setup monitoring here:
* [monitoring for single-node VictoriaMetrics](https://docs.victoriametrics.com/#monitoring)
* [monitoring for VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#monitoring)
validations:
required: false
- type: textarea
id: flags
attributes:
label: Used command-line flags
description: |
Please provide the command-line flags used for running VictoriaMetrics and its components.
validations:
required: false
- type: textarea
id: additional-info
attributes:
label: Additional information
placeholder: |
Additional information that doesn't fit elsewhere
validations:
required: false

View File

@@ -1,5 +0,0 @@
blank_issues_enabled: true
contact_links:
- name: Ask on Slack
url: https://slack.victoriametrics.com/
about: You can ask for help here!

View File

@@ -0,0 +1,20 @@
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.

View File

@@ -1,43 +0,0 @@
name: Feature request
description: Suggest an idea for this project
labels: [enhancement]
body:
- type: textarea
id: describe-the-problem
attributes:
label: Is your feature request related to a problem? Please describe
description: |
A clear and concise description of what the problem is.
placeholder: |
Ex. I'm always frustrated when [...]
validations:
required: false
- type: textarea
id: describe-the-solution
attributes:
label: Describe the solution you'd like
description: |
A clear and concise description of what you want to happen.
validations:
required: true
- type: textarea
id: alternative-solutions
attributes:
label: Describe alternatives you've considered
description: |
A clear and concise description of any alternative solutions or features you've considered.
placeholder: |
I have tried to do `A`, but that doesn't solve a problem completely.
I have tried to do `A` and `B`, but implementing this would be better.
validations:
required: false
- type: textarea
id: feature-additional-info
attributes:
label: Additional information
description: |
Additional information which you consider helpful for implementing this feature.
placeholder: |
Add any other context or screenshots about the feature request here.
validations:
required: false

View File

@@ -1,32 +0,0 @@
name: Question
description: Ask a question regarding VictoriaMetrics or its components
labels: [question]
body:
- type: textarea
id: describe-the-component
attributes:
label: Is your question request related to a specific component?
placeholder: |
VictoriaMetrics, vmagent, vmalert, vmui, etc...
validations:
required: false
- type: textarea
id: describe-the-question
attributes:
label: Describe the question in detail
description: |
A clear and concise description of the issue and the question.
validations:
required: true
- type: checkboxes
id: troubleshooting
attributes:
label: Troubleshooting docs
description: I am familiar with the following troubleshooting docs
options:
- label: General - https://docs.victoriametrics.com/Troubleshooting.html
required: false
- label: vmagent - https://docs.victoriametrics.com/vmagent.html#troubleshooting
required: false
- label: vmalert - https://docs.victoriametrics.com/vmalert.html#troubleshooting
required: false

View File

@@ -1,30 +0,0 @@
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 0
- package-ecosystem: "bundler"
directory: "/docs"
schedule:
interval: "weekly"
open-pull-requests-limit: 0
- package-ecosystem: "gomod"
directory: "/app/vmui/packages/vmui/web"
schedule:
interval: "weekly"
open-pull-requests-limit: 0
- package-ecosystem: "docker"
directory: "/"
schedule:
interval: "daily"
- package-ecosystem: "npm"
directory: "/app/vmui/packages/vmui"
schedule:
interval: "weekly"
open-pull-requests-limit: 0

View File

@@ -1,26 +0,0 @@
name: license-check
on:
push:
paths:
- 'vendor'
pull_request:
paths:
- 'vendor'
permissions:
contents: read
jobs:
build:
name: Build
runs-on: ubuntu-latest
steps:
- name: Setup Go
uses: actions/setup-go@main
with:
go-version: 1.20.7
id: go
- name: Code checkout
uses: actions/checkout@master
- name: Check License
run: |
make check-licenses

View File

@@ -1,46 +0,0 @@
name: "CodeQL - JS"
on:
push:
branches: [master, cluster]
paths:
- "**.js"
pull_request:
# The branches below must be a subset of the branches above
branches: [master, cluster]
paths:
- "**.js"
schedule:
- cron: "30 18 * * 2"
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: ["javascript"]
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2
with:
category: "javascript"

View File

@@ -1,92 +0,0 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"
on:
push:
branches: [master, cluster]
paths-ignore:
- "docs/**"
- "**.md"
- "**.txt"
- "**.js"
pull_request:
# The branches below must be a subset of the branches above
branches: [master, cluster]
paths-ignore:
- "docs/**"
- "**.md"
- "**.txt"
- "**.js"
schedule:
- cron: "30 18 * * 2"
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: ["go"]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Learn more about CodeQL language support at https://git.io/codeql-language-support
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.20.7
check-latest: true
cache: true
if: ${{ matrix.language == 'go' }}
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2

30
.github/workflows/github-pages.yml vendored Normal file
View File

@@ -0,0 +1,30 @@
name: github-pages
on:
push:
paths:
- 'docs/*'
- 'README.md'
branches:
- master
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- name: publish
shell: bash
env:
TOKEN: ${{secrets.CI_TOKEN}}
run: |
git clone https://vika:${TOKEN}@github.com/VictoriaMetrics/VictoriaMetrics.github.io.git gpages
cp docs/* gpages
cp README.md gpages
cd gpages
git config --local user.email "info@victoriametrics.com"
git config --local user.name "Vika"
git add .
git commit -m "update github pages"
remote_repo="https://vika:${TOKEN}@github.com/VictoriaMetrics/VictoriaMetrics.github.io.git"
git push "${remote_repo}"
cd ..
rm -rf gpages

View File

@@ -1,96 +1,66 @@
name: main
on:
push:
branches:
- master
- cluster
paths-ignore:
- "docs/**"
- "**.md"
- 'docs/**'
- '**.md'
pull_request:
branches:
- master
- cluster
paths-ignore:
- "docs/**"
- "**.md"
permissions:
contents: read
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
- 'docs/**'
- '**.md'
jobs:
lint:
name: lint
build:
name: Build
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@v3
- name: Setup Go
uses: actions/setup-go@v4
uses: actions/setup-go@main
with:
go-version: 1.20.7
check-latest: true
cache: true
go-version: 1.15
id: go
- name: Dependencies
run: |
make install-golangci-lint
go get -u golang.org/x/lint/golint
go get -u github.com/kisielk/errcheck
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.29.0
- name: Code checkout
uses: actions/checkout@master
- name: Build
env:
GO111MODULE: on
run: |
export PATH=$PATH:$(go env GOPATH)/bin # temporary fix. See https://github.com/actions/setup-go/issues/14
make check-all
git diff --exit-code
test:
needs: lint
strategy:
matrix:
scenario: ["test-full", "test-pure", "test-full-386"]
name: test
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@v3
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: 1.20.7
check-latest: true
cache: true
- name: run tests
run: |
make ${{ matrix.scenario}}
make test-full
make test-pure
make test-full-386
make victoria-metrics
make victoria-metrics-pure
make victoria-metrics-arm
make victoria-metrics-arm64
make vmutils
GOOS=freebsd go build -mod=vendor ./app/victoria-metrics
GOOS=freebsd go build -mod=vendor ./app/vmagent
GOOS=freebsd go build -mod=vendor ./app/vmalert
GOOS=freebsd go build -mod=vendor ./app/vmbackup
GOOS=freebsd go build -mod=vendor ./app/vmrestore
GOOS=freebsd go build -mod=vendor ./app/vmctl
GOOS=openbsd go build -mod=vendor ./app/victoria-metrics
GOOS=openbsd go build -mod=vendor ./app/vmagent
GOOS=openbsd go build -mod=vendor ./app/vmalert
GOOS=openbsd go build -mod=vendor ./app/vmbackup
GOOS=openbsd go build -mod=vendor ./app/vmrestore
GOOS=openbsd go build -mod=vendor ./app/vmctl
GOOS=darwin go build -mod=vendor ./app/victoria-metrics
GOOS=darwin go build -mod=vendor ./app/vmagent
GOOS=darwin go build -mod=vendor ./app/vmalert
GOOS=darwin go build -mod=vendor ./app/vmbackup
GOOS=darwin go build -mod=vendor ./app/vmrestore
GOOS=darwin go build -mod=vendor ./app/vmctl
- name: Publish coverage
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v1.0.6
with:
token: ${{secrets.CODECOV_TOKEN}}
file: ./coverage.txt
build:
needs: test
name: build
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@v3
- name: Setup Go
id: go
uses: actions/setup-go@v4
with:
go-version: 1.20.7
check-latest: true
cache: true
- uses: actions/cache@v3
with:
path: gocache-for-docker
key: gocache-docker-${{ runner.os }}-${{ steps.go.outputs.go-version }}-${{ hashFiles('go.mod') }}
- name: Build
run: |
make victoria-metrics-crossbuild
make vmuitils-crossbuild

View File

@@ -1,53 +0,0 @@
name: publish-docs
on:
push:
branches:
- 'master'
paths:
- 'docs/**'
workflow_dispatch: {}
permissions:
contents: read # This is required for actions/checkout and to commit back image update
deployments: write
jobs:
build:
name: Build
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@v3
with:
path: main
- name: Checkout private code
uses: actions/checkout@v3
with:
repository: VictoriaMetrics/vmdocs
token: ${{ secrets.VM_BOT_GH_TOKEN }}
path: docs
- name: Import GPG key
uses: crazy-max/ghaction-import-gpg@v5
with:
gpg_private_key: ${{ secrets.VM_BOT_GPG_PRIVATE_KEY }}
passphrase: ${{ secrets.VM_BOT_PASSPHRASE }}
git_user_signingkey: true
git_commit_gpgsign: true
workdir: docs
- name: Set short git commit SHA
id: vars
run: |
calculatedSha=$(git rev-parse --short ${{ github.sha }})
echo "short_sha=$calculatedSha" >> $GITHUB_OUTPUT
working-directory: main
- name: update code and commit
run: |
rm -rf content
cp -r ../main/docs content
make clean-after-copy
git config --global user.name "${{ steps.import-gpg.outputs.email }}"
git config --global user.email "${{ steps.import-gpg.outputs.email }}"
git add .
git commit -S -m "sync docs with VictoriaMetrics/VictoriaMetrics commit: ${{ steps.vars.outputs.short_sha }}"
git push
working-directory: docs

View File

@@ -1,80 +0,0 @@
name: sandbox-release
on:
release:
types: [published]
permissions:
contents: write
jobs:
deploy-sandbox:
runs-on: ubuntu-latest
steps:
- name: check inputs
if: github.event.release.tag_name == ''
run: exit 1
- name: Check out code
uses: actions/checkout@v3
with:
repository: VictoriaMetrics/ops
token: ${{ secrets.VM_BOT_GH_TOKEN }}
- name: Import GPG key
id: import-gpg
uses: crazy-max/ghaction-import-gpg@v5
with:
gpg_private_key: ${{ secrets.VM_BOT_GPG_PRIVATE_KEY }}
passphrase: ${{ secrets.VM_BOT_PASSPHRASE }}
git_user_signingkey: true
git_commit_gpgsign: true
- name: update image tag
uses: fjogeleit/yaml-update-action@main
with:
valueFile: 'gcp-test/sandbox/manifests/benchmark-vm/vmcluster.yaml'
commitChange: false
createPR: false
changes: |
{
"gcp-test/sandbox/manifests/benchmark-vm/vmcluster.yaml": {
"spec.vminsert.image.tag": "${{ github.event.release.tag_name }}-enterprise-cluster",
"spec.vmselect.image.tag": "${{ github.event.release.tag_name }}-enterprise-cluster",
"spec.vmstorage.image.tag": "${{ github.event.release.tag_name }}-enterprise-cluster"
},
"gcp-test/sandbox/manifests/benchmark-vm/vmsingle.yaml": {
"spec.image.tag": "${{ github.event.release.tag_name }}-enterprise"
},
"gcp-test/sandbox/manifests/monitoring/monitoring-vmagent.yaml": {
"spec.image.tag": "${{ github.event.release.tag_name }}"
},
"gcp-test/sandbox/manifests/monitoring/monitoring-vmcluster.yaml": {
"spec.vminsert.image.tag": "${{ github.event.release.tag_name }}-enterprise-cluster",
"spec.vmselect.image.tag": "${{ github.event.release.tag_name }}-enterprise-cluster",
"spec.vmstorage.image.tag": "${{ github.event.release.tag_name }}-enterprise-cluster"
},
"gcp-test/sandbox/manifests/monitoring/vmalert.yaml": {
"spec.image.tag": "${{ github.event.release.tag_name }}-enterprise"
}
}
- name: commit changes
run: |
git config --global user.name "${{ steps.import-gpg.outputs.email }}"
git config --global user.email "${{ steps.import-gpg.outputs.email }}"
git add .
git commit -S -m "Deploy image tag ${RELEASE_TAG} to sandbox"
env:
RELEASE_TAG: ${{ github.event.release.tag_name }}
- name: Create Pull Request
uses: peter-evans/create-pull-request@v5
with:
author: ${{ github.actor }} <${{ github.actor }}@users.noreply.github.com>
branch: release-automation
token: ${{ secrets.VM_BOT_GH_TOKEN }}
delete-branch: true
title: "release ${{ github.event.release.tag_name }}"
body: |
Release [${{ github.event.release.tag_name }}](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/${{ github.event.release.tag_name }}) to sandbox
> Auto-generated by `Github Actions Bot`

View File

@@ -5,13 +5,8 @@ on:
- 'docs/*'
branches:
- master
permissions:
contents: read
jobs:
build:
permissions:
contents: write # for Git to git push
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
@@ -21,7 +16,7 @@ jobs:
TOKEN: ${{secrets.CI_TOKEN}}
run: |
git clone https://vika:${TOKEN}@github.com/VictoriaMetrics/VictoriaMetrics.wiki.git wiki
cp -r docs/* wiki
cp docs/* wiki
cd wiki
git config --local user.email "info@victoriametrics.com"
git config --local user.name "Vika"

7
.gitignore vendored
View File

@@ -4,11 +4,9 @@
*.pprof
/bin
.idea
.vscode
*.test
*.swp
/gocache-for-docker
/victoria-logs-data
/victoria-metrics-data
/vmagent-remotewrite-data
/vmstorage-data
@@ -17,8 +15,3 @@
/package/temp-rpm-*
/package/*.deb
/package/*.rpm
.DS_store
Gemfile.lock
/_site
_site
*.tmp

View File

@@ -1,19 +0,0 @@
run:
timeout: 2m
linters:
enable:
- revive
issues:
exclude-rules:
- linters:
- staticcheck
text: "SA(4003|1019|5011):"
include:
- EXC0012
- EXC0014
linters-settings:
errcheck:
exclude: ./errcheck_excludes.txt

View File

@@ -1,6 +0,0 @@
allowlist:
- Apache-2.0
- MIT
- BSD-3-Clause
- BSD-2-Clause
- ISC

View File

@@ -7,7 +7,7 @@ contributors and maintainers pledge to making participation in our project and
our community a harassment-free experience for everyone, regardless of age, body
size, disability, ethnicity, sex characteristics, gender identity and expression,
level of experience, education, socio-economic status, nationality, personal
appearance, race, religion or sexual identity and orientation.
appearance, race, religion, or sexual identity and orientation.
## Our Standards
@@ -24,9 +24,9 @@ Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or
advances
* Trolling, insulting/derogatory comments and personal or political attacks
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as physical or electronic
* Publishing others' private information, such as a physical or electronic
address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
@@ -38,26 +38,26 @@ behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues and other contributions
that are not aligned to this Code of Conduct or to ban temporarily or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct, or to ban temporarily or
permanently any contributor for other behaviors that they deem inappropriate,
threatening, offensive or harmful.
threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces
when an individual is representing the project or its community. Examples of
representing a project or community include using an official project e-mail
address, posting via an official social media account or acting as an appointed
address, posting via an official social media account, or acting as an appointed
representative at an online or offline event. Representation of a project may be
further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing or otherwise unacceptable behavior may be
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team at info@victoriametrics.com. All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate for the circumstances. The project team is
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.
Further details of specific enforcement policies may be posted separately.
@@ -68,9 +68,9 @@ members of the project's leadership.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
available at <https://www.contributor-covenant.org/version/1/4/code-of-conduct.html>
available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
[homepage]: https://www.contributor-covenant.org
For answers to common questions about this code of conduct, see
<https://www.contributor-covenant.org/faq>
https://www.contributor-covenant.org/faq

View File

@@ -79,7 +79,7 @@
**Последствия**: Предупреждение о последствиях в случае продолжающегося неуместного поведения.
На определенное время не допускается взаимодействие с людьми, вовлеченными в инцидент,
включая незапрошенное взаимодействие
включая незапрошенное взаимодействие
с теми, кто обеспечивает соблюдение Кодекса. Это включает в себя избегание взаимодействия
в публичных пространствах, а так же во внешних каналах,
таких как социальные сети. Нарушение этих правил влечет за собой временный или вечный бан.
@@ -89,10 +89,10 @@
**Общественное влияние**: Серьёзное нарушение стандартов сообщества,
включая продолжительное неуместное поведение.
**Последствия**: Временный запрет (бан) на любое взаимодействие
**Последствия**: Временный запрет (бан) на любое взаимодействие
или публичное общение с сообществом на определенный период времени.
На этот период не допускается публичное или личное взаимодействие с людьми,
вовлеченными в инцидент, включая незапрошенное взаимодействие
вовлеченными в инцидент, включая незапрошенное взаимодействие
с теми, кто обеспечивает соблюдение Кодекса.
Нарушение этих правил влечет за собой вечный бан.
@@ -108,7 +108,7 @@
Данный Кодекс Поведения основан на [Кодекс Поведения участника][homepage],
версии 2.0, доступной по адресу
<https://www.contributor-covenant.org/version/2/0/code_of_conduct.html>.
https://www.contributor-covenant.org/version/2/0/code_of_conduct.html.
Принципы Воздействия в Сообществе были вдохновлены [Mozilla's code of conduct
enforcement ladder](https://github.com/mozilla/diversity).
@@ -116,5 +116,5 @@ enforcement ladder](https://github.com/mozilla/diversity).
[homepage]: https://www.contributor-covenant.org
Ответы на общие вопросы о данном кодексе поведения ищите на странице FAQ:
<https://www.contributor-covenant.org/faq>. Переводы доступны по адресу
<https://www.contributor-covenant.org/translations>.
https://www.contributor-covenant.org/faq. Переводы доступны по адресу
https://www.contributor-covenant.org/translations.

View File

@@ -175,7 +175,7 @@
END OF TERMS AND CONDITIONS
Copyright 2019-2023 VictoriaMetrics, Inc.
Copyright 2019-2021 VictoriaMetrics, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

519
Makefile
View File

@@ -1,27 +1,19 @@
PKG_PREFIX := github.com/VictoriaMetrics/VictoriaMetrics
DATEINFO_TAG ?= $(shell date -u +'%Y%m%d-%H%M%S')
BUILDINFO_TAG ?= $(shell echo $$(git describe --long --all | tr '/' '-')$$( \
git diff-index --quiet HEAD -- || echo '-dirty-'$$(git diff-index -u HEAD | openssl sha1 | cut -d' ' -f2 | cut -c 1-8)))
LATEST_TAG ?= latest
git diff-index --quiet HEAD -- || echo '-dirty-'$$(git diff-index -u HEAD | openssl sha1 | cut -c 10-17)))
PKG_TAG ?= $(shell git tag -l --points-at HEAD)
ifeq ($(PKG_TAG),)
PKG_TAG := $(BUILDINFO_TAG)
endif
GO_BUILDINFO = -X '$(PKG_PREFIX)/lib/buildinfo.Version=$(APP_NAME)-$(DATEINFO_TAG)-$(BUILDINFO_TAG)'
GO_BUILDINFO = -X '$(PKG_PREFIX)/lib/buildinfo.Version=$(APP_NAME)-$(shell date -u +'%Y%m%d-%H%M%S')-$(BUILDINFO_TAG)'
.PHONY: $(MAKECMDGOALS)
include app/*/Makefile
include deployment/*/Makefile
include snap/local/Makefile
include package/release/Makefile
all: \
victoria-metrics-prod \
victoria-logs-prod \
vmagent-prod \
vmalert-prod \
vmauth-prod \
@@ -29,12 +21,14 @@ all: \
vmrestore-prod \
vmctl-prod
include app/*/Makefile
include deployment/*/Makefile
clean:
rm -rf bin/*
publish: package-base \
publish: \
publish-victoria-metrics \
publish-victoria-logs \
publish-vmagent \
publish-vmalert \
publish-vmauth \
@@ -44,7 +38,6 @@ publish: package-base \
package: \
package-victoria-metrics \
package-victoria-logs \
package-vmagent \
package-vmalert \
package-vmauth \
@@ -60,457 +53,165 @@ vmutils: \
vmrestore \
vmctl
vmutils-pure: \
vmagent-pure \
vmalert-pure \
vmauth-pure \
vmbackup-pure \
vmrestore-pure \
vmctl-pure
vmutils-arm64: \
vmagent-arm64 \
vmalert-arm64 \
vmauth-arm64 \
vmbackup-arm64 \
vmrestore-arm64 \
vmctl-arm64
vmutils-linux-amd64: \
vmagent-linux-amd64 \
vmalert-linux-amd64 \
vmauth-linux-amd64 \
vmbackup-linux-amd64 \
vmrestore-linux-amd64 \
vmctl-linux-amd64
vmutils-linux-arm64: \
vmagent-linux-arm64 \
vmalert-linux-arm64 \
vmauth-linux-arm64 \
vmbackup-linux-arm64 \
vmrestore-linux-arm64 \
vmctl-linux-arm64
vmutils-linux-arm: \
vmagent-linux-arm \
vmalert-linux-arm \
vmauth-linux-arm \
vmbackup-linux-arm \
vmrestore-linux-arm \
vmctl-linux-arm
vmutils-linux-386: \
vmagent-linux-386 \
vmalert-linux-386 \
vmauth-linux-386 \
vmbackup-linux-386 \
vmrestore-linux-386 \
vmctl-linux-386
vmutils-linux-ppc64le: \
vmagent-linux-ppc64le \
vmalert-linux-ppc64le \
vmauth-linux-ppc64le \
vmbackup-linux-ppc64le \
vmrestore-linux-ppc64le \
vmctl-linux-ppc64le
vmutils-darwin-amd64: \
vmagent-darwin-amd64 \
vmalert-darwin-amd64 \
vmauth-darwin-amd64 \
vmbackup-darwin-amd64 \
vmrestore-darwin-amd64 \
vmctl-darwin-amd64
vmutils-darwin-arm64: \
vmagent-darwin-arm64 \
vmalert-darwin-arm64 \
vmauth-darwin-arm64 \
vmbackup-darwin-arm64 \
vmrestore-darwin-arm64 \
vmctl-darwin-arm64
vmutils-freebsd-amd64: \
vmagent-freebsd-amd64 \
vmalert-freebsd-amd64 \
vmauth-freebsd-amd64 \
vmbackup-freebsd-amd64 \
vmrestore-freebsd-amd64 \
vmctl-freebsd-amd64
vmutils-openbsd-amd64: \
vmagent-openbsd-amd64 \
vmalert-openbsd-amd64 \
vmauth-openbsd-amd64 \
vmbackup-openbsd-amd64 \
vmrestore-openbsd-amd64 \
vmctl-openbsd-amd64
vmutils-windows-amd64: \
vmagent-windows-amd64 \
vmalert-windows-amd64 \
vmauth-windows-amd64 \
vmbackup-windows-amd64 \
vmrestore-windows-amd64 \
vmctl-windows-amd64
victoria-metrics-crossbuild: \
victoria-metrics-linux-386 \
victoria-metrics-linux-amd64 \
victoria-metrics-linux-arm64 \
victoria-metrics-linux-arm \
victoria-metrics-linux-386 \
victoria-metrics-linux-ppc64le \
victoria-metrics-darwin-amd64 \
victoria-metrics-darwin-arm64 \
victoria-metrics-freebsd-amd64 \
victoria-metrics-openbsd-amd64
vmutils-crossbuild: \
vmutils-linux-386 \
vmutils-linux-amd64 \
vmutils-linux-arm64 \
vmutils-linux-arm \
vmutils-linux-386 \
vmutils-linux-ppc64le \
vmutils-darwin-amd64 \
vmutils-darwin-arm64 \
vmutils-freebsd-amd64 \
vmutils-openbsd-amd64 \
vmutils-windows-amd64
publish-release:
git checkout $(TAG) && LATEST_TAG=stable $(MAKE) release publish && \
git checkout $(TAG)-cluster && LATEST_TAG=cluster-stable $(MAKE) release publish && \
git checkout $(TAG)-enterprise && LATEST_TAG=enterprise-stable $(MAKE) release publish && \
git checkout $(TAG)-enterprise-cluster && LATEST_TAG=enterprise-cluster-stable $(MAKE) release publish
release-snap:
snapcraft
snapcraft upload "victoriametrics_$(PKG_TAG)_multi.snap" --release beta,edge,candidate
release: \
release-victoria-metrics \
release-victoria-logs \
release-vmutils
release-victoria-metrics: \
release-victoria-metrics-linux-386 \
release-victoria-metrics-linux-amd64 \
release-victoria-metrics-linux-arm \
release-victoria-metrics-linux-arm64 \
release-victoria-metrics-darwin-amd64 \
release-victoria-metrics-darwin-arm64 \
release-victoria-metrics-freebsd-amd64 \
release-victoria-metrics-openbsd-amd64 \
release-victoria-metrics-windows-amd64
release-victoria-metrics-amd64 \
release-victoria-metrics-arm64
release-victoria-metrics-linux-386:
GOOS=linux GOARCH=386 $(MAKE) release-victoria-metrics-goos-goarch
release-victoria-metrics-amd64:
GOARCH=amd64 $(MAKE) release-victoria-metrics-generic
release-victoria-metrics-linux-amd64:
GOOS=linux GOARCH=amd64 $(MAKE) release-victoria-metrics-goos-goarch
release-victoria-metrics-arm64:
GOARCH=arm64 $(MAKE) release-victoria-metrics-generic
release-victoria-metrics-linux-arm:
GOOS=linux GOARCH=arm $(MAKE) release-victoria-metrics-goos-goarch
release-victoria-metrics-linux-arm64:
GOOS=linux GOARCH=arm64 $(MAKE) release-victoria-metrics-goos-goarch
release-victoria-metrics-darwin-amd64:
GOOS=darwin GOARCH=amd64 $(MAKE) release-victoria-metrics-goos-goarch
release-victoria-metrics-darwin-arm64:
GOOS=darwin GOARCH=arm64 $(MAKE) release-victoria-metrics-goos-goarch
release-victoria-metrics-freebsd-amd64:
GOOS=freebsd GOARCH=amd64 $(MAKE) release-victoria-metrics-goos-goarch
release-victoria-metrics-openbsd-amd64:
GOOS=openbsd GOARCH=amd64 $(MAKE) release-victoria-metrics-goos-goarch
release-victoria-metrics-windows-amd64:
GOARCH=amd64 $(MAKE) release-victoria-metrics-windows-goarch
release-victoria-metrics-goos-goarch: victoria-metrics-$(GOOS)-$(GOARCH)-prod
release-victoria-metrics-generic: victoria-metrics-$(GOARCH)-prod
cd bin && \
tar --transform="flags=r;s|-$(GOOS)-$(GOARCH)||" -czf victoria-metrics-$(GOOS)-$(GOARCH)-$(PKG_TAG).tar.gz \
victoria-metrics-$(GOOS)-$(GOARCH)-prod \
&& sha256sum victoria-metrics-$(GOOS)-$(GOARCH)-$(PKG_TAG).tar.gz \
victoria-metrics-$(GOOS)-$(GOARCH)-prod \
| sed s/-$(GOOS)-$(GOARCH)-prod/-prod/ > victoria-metrics-$(GOOS)-$(GOARCH)-$(PKG_TAG)_checksums.txt
cd bin && rm -rf victoria-metrics-$(GOOS)-$(GOARCH)-prod
release-victoria-metrics-windows-goarch: victoria-metrics-windows-$(GOARCH)-prod
cd bin && \
zip victoria-metrics-windows-$(GOARCH)-$(PKG_TAG).zip \
victoria-metrics-windows-$(GOARCH)-prod.exe \
&& sha256sum victoria-metrics-windows-$(GOARCH)-$(PKG_TAG).zip \
victoria-metrics-windows-$(GOARCH)-prod.exe \
> victoria-metrics-windows-$(GOARCH)-$(PKG_TAG)_checksums.txt
cd bin && rm -rf \
victoria-metrics-windows-$(GOARCH)-prod.exe
release-victoria-logs: \
release-victoria-logs-linux-386 \
release-victoria-logs-linux-amd64 \
release-victoria-logs-linux-arm \
release-victoria-logs-linux-arm64 \
release-victoria-logs-darwin-amd64 \
release-victoria-logs-darwin-arm64 \
release-victoria-logs-freebsd-amd64 \
release-victoria-logs-openbsd-amd64 \
release-victoria-logs-windows-amd64
release-victoria-logs-linux-386:
GOOS=linux GOARCH=386 $(MAKE) release-victoria-logs-goos-goarch
release-victoria-logs-linux-amd64:
GOOS=linux GOARCH=amd64 $(MAKE) release-victoria-logs-goos-goarch
release-victoria-logs-linux-arm:
GOOS=linux GOARCH=arm $(MAKE) release-victoria-logs-goos-goarch
release-victoria-logs-linux-arm64:
GOOS=linux GOARCH=arm64 $(MAKE) release-victoria-logs-goos-goarch
release-victoria-logs-darwin-amd64:
GOOS=darwin GOARCH=amd64 $(MAKE) release-victoria-logs-goos-goarch
release-victoria-logs-darwin-arm64:
GOOS=darwin GOARCH=arm64 $(MAKE) release-victoria-logs-goos-goarch
release-victoria-logs-freebsd-amd64:
GOOS=freebsd GOARCH=amd64 $(MAKE) release-victoria-logs-goos-goarch
release-victoria-logs-openbsd-amd64:
GOOS=openbsd GOARCH=amd64 $(MAKE) release-victoria-logs-goos-goarch
release-victoria-logs-windows-amd64:
GOARCH=amd64 $(MAKE) release-victoria-logs-windows-goarch
release-victoria-logs-goos-goarch: victoria-logs-$(GOOS)-$(GOARCH)-prod
cd bin && \
tar --transform="flags=r;s|-$(GOOS)-$(GOARCH)||" -czf victoria-logs-$(GOOS)-$(GOARCH)-$(PKG_TAG).tar.gz \
victoria-logs-$(GOOS)-$(GOARCH)-prod \
&& sha256sum victoria-logs-$(GOOS)-$(GOARCH)-$(PKG_TAG).tar.gz \
victoria-logs-$(GOOS)-$(GOARCH)-prod \
| sed s/-$(GOOS)-$(GOARCH)-prod/-prod/ > victoria-logs-$(GOOS)-$(GOARCH)-$(PKG_TAG)_checksums.txt
cd bin && rm -rf victoria-logs-$(GOOS)-$(GOARCH)-prod
release-victoria-logs-windows-goarch: victoria-logs-windows-$(GOARCH)-prod
cd bin && \
zip victoria-logs-windows-$(GOARCH)-$(PKG_TAG).zip \
victoria-logs-windows-$(GOARCH)-prod.exe \
&& sha256sum victoria-logs-windows-$(GOARCH)-$(PKG_TAG).zip \
victoria-logs-windows-$(GOARCH)-prod.exe \
> victoria-logs-windows-$(GOARCH)-$(PKG_TAG)_checksums.txt
cd bin && rm -rf \
victoria-logs-windows-$(GOARCH)-prod.exe
tar --transform="flags=r;s|-$(GOARCH)||" -czf victoria-metrics-$(GOARCH)-$(PKG_TAG).tar.gz \
victoria-metrics-$(GOARCH)-prod \
&& sha256sum victoria-metrics-$(GOARCH)-$(PKG_TAG).tar.gz \
victoria-metrics-$(GOARCH)-prod \
| sed s/-$(GOARCH)// > victoria-metrics-$(GOARCH)-$(PKG_TAG)_checksums.txt
release-vmutils: \
release-vmutils-linux-386 \
release-vmutils-linux-amd64 \
release-vmutils-linux-arm64 \
release-vmutils-linux-arm \
release-vmutils-darwin-amd64 \
release-vmutils-darwin-arm64 \
release-vmutils-freebsd-amd64 \
release-vmutils-openbsd-amd64 \
release-vmutils-windows-amd64
release-vmutils-amd64 \
release-vmutils-arm64
release-vmutils-linux-386:
GOOS=linux GOARCH=386 $(MAKE) release-vmutils-goos-goarch
release-vmutils-linux-amd64:
GOOS=linux GOARCH=amd64 $(MAKE) release-vmutils-goos-goarch
release-vmutils-amd64:
GOARCH=amd64 $(MAKE) release-vmutils-generic
release-vmutils-linux-arm64:
GOOS=linux GOARCH=arm64 $(MAKE) release-vmutils-goos-goarch
release-vmutils-arm64:
GOARCH=arm64 $(MAKE) release-vmutils-generic
release-vmutils-linux-arm:
GOOS=linux GOARCH=arm $(MAKE) release-vmutils-goos-goarch
release-vmutils-darwin-amd64:
GOOS=darwin GOARCH=amd64 $(MAKE) release-vmutils-goos-goarch
release-vmutils-darwin-arm64:
GOOS=darwin GOARCH=arm64 $(MAKE) release-vmutils-goos-goarch
release-vmutils-freebsd-amd64:
GOOS=freebsd GOARCH=amd64 $(MAKE) release-vmutils-goos-goarch
release-vmutils-openbsd-amd64:
GOOS=openbsd GOARCH=amd64 $(MAKE) release-vmutils-goos-goarch
release-vmutils-windows-amd64:
GOARCH=amd64 $(MAKE) release-vmutils-windows-goarch
release-vmutils-goos-goarch: \
vmagent-$(GOOS)-$(GOARCH)-prod \
vmalert-$(GOOS)-$(GOARCH)-prod \
vmauth-$(GOOS)-$(GOARCH)-prod \
vmbackup-$(GOOS)-$(GOARCH)-prod \
vmrestore-$(GOOS)-$(GOARCH)-prod \
vmctl-$(GOOS)-$(GOARCH)-prod
release-vmutils-generic: \
vmagent-$(GOARCH)-prod \
vmalert-$(GOARCH)-prod \
vmauth-$(GOARCH)-prod \
vmbackup-$(GOARCH)-prod \
vmrestore-$(GOARCH)-prod \
vmctl-$(GOARCH)-prod
cd bin && \
tar --transform="flags=r;s|-$(GOOS)-$(GOARCH)||" -czf vmutils-$(GOOS)-$(GOARCH)-$(PKG_TAG).tar.gz \
vmagent-$(GOOS)-$(GOARCH)-prod \
vmalert-$(GOOS)-$(GOARCH)-prod \
vmauth-$(GOOS)-$(GOARCH)-prod \
vmbackup-$(GOOS)-$(GOARCH)-prod \
vmrestore-$(GOOS)-$(GOARCH)-prod \
vmctl-$(GOOS)-$(GOARCH)-prod \
&& sha256sum vmutils-$(GOOS)-$(GOARCH)-$(PKG_TAG).tar.gz \
vmagent-$(GOOS)-$(GOARCH)-prod \
vmalert-$(GOOS)-$(GOARCH)-prod \
vmauth-$(GOOS)-$(GOARCH)-prod \
vmbackup-$(GOOS)-$(GOARCH)-prod \
vmrestore-$(GOOS)-$(GOARCH)-prod \
vmctl-$(GOOS)-$(GOARCH)-prod \
| sed s/-$(GOOS)-$(GOARCH)-prod/-prod/ > vmutils-$(GOOS)-$(GOARCH)-$(PKG_TAG)_checksums.txt
cd bin && rm -rf \
vmagent-$(GOOS)-$(GOARCH)-prod \
vmalert-$(GOOS)-$(GOARCH)-prod \
vmauth-$(GOOS)-$(GOARCH)-prod \
vmbackup-$(GOOS)-$(GOARCH)-prod \
vmrestore-$(GOOS)-$(GOARCH)-prod \
vmctl-$(GOOS)-$(GOARCH)-prod
release-vmutils-windows-goarch: \
vmagent-windows-$(GOARCH)-prod \
vmalert-windows-$(GOARCH)-prod \
vmauth-windows-$(GOARCH)-prod \
vmbackup-windows-$(GOARCH)-prod \
vmrestore-windows-$(GOARCH)-prod \
vmctl-windows-$(GOARCH)-prod
cd bin && \
zip vmutils-windows-$(GOARCH)-$(PKG_TAG).zip \
vmagent-windows-$(GOARCH)-prod.exe \
vmalert-windows-$(GOARCH)-prod.exe \
vmauth-windows-$(GOARCH)-prod.exe \
vmbackup-windows-$(GOARCH)-prod.exe \
vmrestore-windows-$(GOARCH)-prod.exe \
vmctl-windows-$(GOARCH)-prod.exe \
&& sha256sum vmutils-windows-$(GOARCH)-$(PKG_TAG).zip \
vmagent-windows-$(GOARCH)-prod.exe \
vmalert-windows-$(GOARCH)-prod.exe \
vmauth-windows-$(GOARCH)-prod.exe \
vmbackup-windows-$(GOARCH)-prod.exe \
vmrestore-windows-$(GOARCH)-prod.exe \
vmctl-windows-$(GOARCH)-prod.exe \
> vmutils-windows-$(GOARCH)-$(PKG_TAG)_checksums.txt
cd bin && rm -rf \
vmagent-windows-$(GOARCH)-prod.exe \
vmalert-windows-$(GOARCH)-prod.exe \
vmauth-windows-$(GOARCH)-prod.exe \
vmbackup-windows-$(GOARCH)-prod.exe \
vmrestore-windows-$(GOARCH)-prod.exe \
vmctl-windows-$(GOARCH)-prod.exe
tar --transform="flags=r;s|-$(GOARCH)||" -czf vmutils-$(GOARCH)-$(PKG_TAG).tar.gz \
vmagent-$(GOARCH)-prod \
vmalert-$(GOARCH)-prod \
vmauth-$(GOARCH)-prod \
vmbackup-$(GOARCH)-prod \
vmrestore-$(GOARCH)-prod \
vmctl-$(GOARCH)-prod \
&& sha256sum vmutils-$(GOARCH)-$(PKG_TAG).tar.gz \
vmagent-$(GOARCH)-prod \
vmalert-$(GOARCH)-prod \
vmauth-$(GOARCH)-prod \
vmbackup-$(GOARCH)-prod \
vmrestore-$(GOARCH)-prod \
vmctl-$(GOARCH)-prod \
| sed s/-$(GOARCH)// > vmutils-$(GOARCH)-$(PKG_TAG)_checksums.txt
pprof-cpu:
go tool pprof -trim_path=github.com/VictoriaMetrics/VictoriaMetrics@ $(PPROF_FILE)
fmt:
gofmt -l -w -s ./lib
gofmt -l -w -s ./app
GO111MODULE=on gofmt -l -w -s ./lib
GO111MODULE=on gofmt -l -w -s ./app
vet:
go vet ./lib/...
go vet ./app/...
GO111MODULE=on go vet -mod=vendor ./lib/...
GO111MODULE=on go vet -mod=vendor ./app/...
check-all: fmt vet golangci-lint govulncheck
lint: install-golint
golint lib/...
golint app/...
install-golint:
which golint || go install golang.org/x/lint/golint
errcheck: install-errcheck
errcheck -exclude=errcheck_excludes.txt ./lib/...
errcheck -exclude=errcheck_excludes.txt ./app/vminsert/...
errcheck -exclude=errcheck_excludes.txt ./app/vmselect/...
errcheck -exclude=errcheck_excludes.txt ./app/vmstorage/...
errcheck -exclude=errcheck_excludes.txt ./app/vmagent/...
errcheck -exclude=errcheck_excludes.txt ./app/vmalert/...
errcheck -exclude=errcheck_excludes.txt ./app/vmauth/...
errcheck -exclude=errcheck_excludes.txt ./app/vmbackup/...
errcheck -exclude=errcheck_excludes.txt ./app/vmrestore/...
errcheck -exclude=errcheck_excludes.txt ./app/vmctl/...
install-errcheck:
which errcheck || go install github.com/kisielk/errcheck
check-all: fmt vet lint errcheck golangci-lint
test:
go test ./lib/... ./app/...
GO111MODULE=on go test -mod=vendor ./lib/... ./app/...
test-race:
go test -race ./lib/... ./app/...
GO111MODULE=on go test -mod=vendor -race ./lib/... ./app/...
test-pure:
CGO_ENABLED=0 go test ./lib/... ./app/...
GO111MODULE=on CGO_ENABLED=0 go test -mod=vendor ./lib/... ./app/...
test-full:
go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
GO111MODULE=on go test -mod=vendor -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
test-full-386:
GOARCH=386 go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
GO111MODULE=on GOARCH=386 go test -mod=vendor -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
benchmark:
go test -bench=. ./lib/...
go test -bench=. ./app/...
GO111MODULE=on go test -mod=vendor -bench=. ./lib/...
GO111MODULE=on go test -mod=vendor -bench=. ./app/...
benchmark-pure:
CGO_ENABLED=0 go test -bench=. ./lib/...
CGO_ENABLED=0 go test -bench=. ./app/...
GO111MODULE=on CGO_ENABLED=0 go test -mod=vendor -bench=. ./lib/...
GO111MODULE=on CGO_ENABLED=0 go test -mod=vendor -bench=. ./app/...
vendor-update:
go get -u -d ./lib/...
go get -u -d ./app/...
go mod tidy -compat=1.19
go mod vendor
GO111MODULE=on go get -u -d ./lib/...
GO111MODULE=on go get -u -d ./app/...
GO111MODULE=on go mod tidy
GO111MODULE=on go mod vendor
app-local:
CGO_ENABLED=1 go build $(RACE) -ldflags "$(GO_BUILDINFO)" -o bin/$(APP_NAME)$(RACE) $(PKG_PREFIX)/app/$(APP_NAME)
CGO_ENABLED=1 GO111MODULE=on go build $(RACE) -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/$(APP_NAME)$(RACE) $(PKG_PREFIX)/app/$(APP_NAME)
app-local-pure:
CGO_ENABLED=0 go build $(RACE) -ldflags "$(GO_BUILDINFO)" -o bin/$(APP_NAME)-pure$(RACE) $(PKG_PREFIX)/app/$(APP_NAME)
CGO_ENABLED=0 GO111MODULE=on go build $(RACE) -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/$(APP_NAME)-pure$(RACE) $(PKG_PREFIX)/app/$(APP_NAME)
app-local-goos-goarch:
CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) go build $(RACE) -ldflags "$(GO_BUILDINFO)" -o bin/$(APP_NAME)-$(GOOS)-$(GOARCH)$(RACE) $(PKG_PREFIX)/app/$(APP_NAME)
app-local-windows-goarch:
CGO_ENABLED=0 GOOS=windows GOARCH=$(GOARCH) go build $(RACE) -ldflags "$(GO_BUILDINFO)" -o bin/$(APP_NAME)-windows-$(GOARCH)$(RACE).exe $(PKG_PREFIX)/app/$(APP_NAME)
app-local-with-goarch:
GO111MODULE=on go build $(RACE) -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/$(APP_NAME)-$(GOARCH)$(RACE) $(PKG_PREFIX)/app/$(APP_NAME)
quicktemplate-gen: install-qtc
qtc
install-qtc:
which qtc || go install github.com/valyala/quicktemplate/qtc@latest
which qtc || go install github.com/valyala/quicktemplate/qtc
golangci-lint: install-golangci-lint
golangci-lint run
golangci-lint run --exclude '(SA4003|SA1019|SA5011):' -D errcheck -D structcheck --timeout 2m
install-golangci-lint:
which golangci-lint || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v1.51.2
which golangci-lint || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v1.29.0
govulncheck: install-govulncheck
govulncheck ./...
install-govulncheck:
which govulncheck || go install golang.org/x/vuln/cmd/govulncheck@latest
install-wwhrd:
which wwhrd || go install github.com/frapposelli/wwhrd@latest
check-licenses: install-wwhrd
wwhrd check -f .wwhrd.yml
copy-docs:
# The 'printf' function is used instead of 'echo' or 'echo -e' to handle line breaks (e.g. '\n') in the same way on different operating systems (MacOS/Ubuntu Linux/Arch Linux) and their shells (bash/sh/zsh/fish).
# For details, see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4548#issue-1782796419 and https://stackoverflow.com/questions/8467424/echo-newline-in-bash-prints-literal-n
echo "---" > ${DST}
@if [ ${ORDER} -ne 0 ]; then \
echo "sort: ${ORDER}" >> ${DST}; \
echo "weight: ${ORDER}" >> ${DST}; \
printf "menu:\n docs:\n parent: 'victoriametrics'\n weight: ${ORDER}\n" >> ${DST}; \
fi
echo "title: ${TITLE}" >> ${DST}
@if [ ${OLD_URL} ]; then \
printf "aliases:\n - ${OLD_URL}\n" >> ${DST}; \
fi
echo "---" >> ${DST}
cat ${SRC} >> ${DST}
sed -i='.tmp' 's/<img src=\"docs\//<img src=\"/' ${DST}
rm -rf docs/*.tmp
# Copies docs for all components and adds the order/weight tag, title, menu position and alias with the backward compatible link for the old site.
# For ORDER=0 it adds no order tag/weight tag.
# FOR OLD_URL - relative link, used for backward compatibility with the link from documentation based on GitHub pages (old one)
# FOR OLD_URL='' it adds no alias, it should be empty for every new page, don't change it for already existing links.
# Images starting with <img src="docs/ are replaced with <img src="
# Cluster docs are supposed to be ordered as 2nd.
# The rest of docs is ordered manually.
docs-sync:
SRC=README.md DST=docs/README.md OLD_URL='' ORDER=0 TITLE=VictoriaMetrics $(MAKE) copy-docs
SRC=README.md DST=docs/Single-server-VictoriaMetrics.md OLD_URL='/Single-server-VictoriaMetrics.html' TITLE=VictoriaMetrics ORDER=1 $(MAKE) copy-docs
SRC=app/vmagent/README.md DST=docs/vmagent.md OLD_URL='/vmagent.html' ORDER=3 TITLE=vmagent $(MAKE) copy-docs
SRC=app/vmalert/README.md DST=docs/vmalert.md OLD_URL='/vmalert.html' ORDER=4 TITLE=vmalert $(MAKE) copy-docs
SRC=app/vmauth/README.md DST=docs/vmauth.md OLD_URL='/vmauth.html' ORDER=5 TITLE=vmauth $(MAKE) copy-docs
SRC=app/vmbackup/README.md DST=docs/vmbackup.md OLD_URL='/vmbackup.html' ORDER=6 TITLE=vmbackup $(MAKE) copy-docs
SRC=app/vmrestore/README.md DST=docs/vmrestore.md OLD_URL='/vmrestore.html' ORDER=7 TITLE=vmrestore $(MAKE) copy-docs
SRC=app/vmctl/README.md DST=docs/vmctl.md OLD_URL='/vmctl.html' ORDER=8 TITLE=vmctl $(MAKE) copy-docs
SRC=app/vmgateway/README.md DST=docs/vmgateway.md OLD_URL='/vmgateway.html' ORDER=9 TITLE=vmgateway $(MAKE) copy-docs
SRC=app/vmbackupmanager/README.md DST=docs/vmbackupmanager.md OLD_URL='/vmbackupmanager.html' ORDER=10 TITLE=vmbackupmanager $(MAKE) copy-docs
cp app/vmagent/README.md docs/vmagent.md
cp app/vmalert/README.md docs/vmalert.md
cp app/vmauth/README.md docs/vmauth.md
cp app/vmbackup/README.md docs/vmbackup.md
cp app/vmrestore/README.md docs/vmrestore.md
cp app/vmctl/README.md docs/vmctl.md
cp README.md docs/Single-server-VictoriaMetrics.md

2257
README.md

File diff suppressed because it is too large Load Diff

View File

@@ -1,14 +0,0 @@
# Security Policy
## Supported Versions
| Version | Supported |
|---------|--------------------|
| [latest release](https://docs.victoriametrics.com/CHANGELOG.html) | :white_check_mark: |
| v1.87.x LTS release | :white_check_mark: |
| v1.79.x LTS release | :white_check_mark: |
| other releases | :x: |
## Reporting a Vulnerability
Please report any security issues to security@victoriametrics.com

Binary file not shown.

View File

@@ -1,103 +0,0 @@
# All these commands must run from repository root.
victoria-logs:
APP_NAME=victoria-logs $(MAKE) app-local
victoria-logs-race:
APP_NAME=victoria-logs RACE=-race $(MAKE) app-local
victoria-logs-prod:
APP_NAME=victoria-logs $(MAKE) app-via-docker
victoria-logs-pure-prod:
APP_NAME=victoria-logs $(MAKE) app-via-docker-pure
victoria-logs-linux-amd64-prod:
APP_NAME=victoria-logs $(MAKE) app-via-docker-linux-amd64
victoria-logs-linux-arm-prod:
APP_NAME=victoria-logs $(MAKE) app-via-docker-linux-arm
victoria-logs-linux-arm64-prod:
APP_NAME=victoria-logs $(MAKE) app-via-docker-linux-arm64
victoria-logs-linux-ppc64le-prod:
APP_NAME=victoria-logs $(MAKE) app-via-docker-linux-ppc64le
victoria-logs-linux-386-prod:
APP_NAME=victoria-logs $(MAKE) app-via-docker-linux-386
victoria-logs-darwin-amd64-prod:
APP_NAME=victoria-logs $(MAKE) app-via-docker-darwin-amd64
victoria-logs-darwin-arm64-prod:
APP_NAME=victoria-logs $(MAKE) app-via-docker-darwin-arm64
victoria-logs-freebsd-amd64-prod:
APP_NAME=victoria-logs $(MAKE) app-via-docker-freebsd-amd64
victoria-logs-openbsd-amd64-prod:
APP_NAME=victoria-logs $(MAKE) app-via-docker-openbsd-amd64
victoria-logs-windows-amd64-prod:
APP_NAME=victoria-logs $(MAKE) app-via-docker-windows-amd64
package-victoria-logs:
APP_NAME=victoria-logs $(MAKE) package-via-docker
package-victoria-logs-pure:
APP_NAME=victoria-logs $(MAKE) package-via-docker-pure
package-victoria-logs-amd64:
APP_NAME=victoria-logs $(MAKE) package-via-docker-amd64
package-victoria-logs-arm:
APP_NAME=victoria-logs $(MAKE) package-via-docker-arm
package-victoria-logs-arm64:
APP_NAME=victoria-logs $(MAKE) package-via-docker-arm64
package-victoria-logs-ppc64le:
APP_NAME=victoria-logs $(MAKE) package-via-docker-ppc64le
package-victoria-logs-386:
APP_NAME=victoria-logs $(MAKE) package-via-docker-386
publish-victoria-logs:
APP_NAME=victoria-logs $(MAKE) publish-via-docker
victoria-logs-linux-amd64:
APP_NAME=victoria-logs CGO_ENABLED=1 GOOS=linux GOARCH=amd64 $(MAKE) app-local-goos-goarch
victoria-logs-linux-arm:
APP_NAME=victoria-logs CGO_ENABLED=0 GOOS=linux GOARCH=arm $(MAKE) app-local-goos-goarch
victoria-logs-linux-arm64:
APP_NAME=victoria-logs CGO_ENABLED=0 GOOS=linux GOARCH=arm64 $(MAKE) app-local-goos-goarch
victoria-logs-linux-ppc64le:
APP_NAME=victoria-logs CGO_ENABLED=0 GOOS=linux GOARCH=ppc64le $(MAKE) app-local-goos-goarch
victoria-logs-linux-s390x:
APP_NAME=victoria-logs CGO_ENABLED=0 GOOS=linux GOARCH=s390x $(MAKE) app-local-goos-goarch
victoria-logs-linux-386:
APP_NAME=victoria-logs CGO_ENABLED=0 GOOS=linux GOARCH=386 $(MAKE) app-local-goos-goarch
victoria-logs-darwin-amd64:
APP_NAME=victoria-logs CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 $(MAKE) app-local-goos-goarch
victoria-logs-darwin-arm64:
APP_NAME=victoria-logs CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 $(MAKE) app-local-goos-goarch
victoria-logs-freebsd-amd64:
APP_NAME=victoria-logs CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 $(MAKE) app-local-goos-goarch
victoria-logs-openbsd-amd64:
APP_NAME=victoria-logs CGO_ENABLED=0 GOOS=openbsd GOARCH=amd64 $(MAKE) app-local-goos-goarch
victoria-logs-windows-amd64:
GOARCH=amd64 APP_NAME=victoria-logs $(MAKE) app-local-windows-goarch
victoria-logs-pure:
APP_NAME=victoria-logs $(MAKE) app-local-pure

View File

@@ -1,8 +0,0 @@
ARG base_image
FROM $base_image
EXPOSE 8428
ENTRYPOINT ["/victoria-logs-prod"]
ARG src_binary
COPY $src_binary ./victoria-logs-prod

View File

@@ -1,103 +0,0 @@
package main
import (
"flag"
"fmt"
"net/http"
"os"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
)
var (
httpListenAddr = flag.String("httpListenAddr", ":9428", "TCP address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt . "+
"With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing")
gogc = flag.Int("gogc", 100, "GOGC to use. See https://tip.golang.org/doc/gc-guide")
)
func main() {
// Write flags and help message to stdout, since it is easier to grep or pipe.
flag.CommandLine.SetOutput(os.Stdout)
flag.Usage = usage
envflag.Parse()
cgroup.SetGOGC(*gogc)
buildinfo.Init()
logger.Init()
pushmetrics.Init()
logger.Infof("starting VictoriaLogs at %q...", *httpListenAddr)
startTime := time.Now()
vlstorage.Init()
vlselect.Init()
vlinsert.Init()
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
logger.Infof("started VictoriaLogs in %.3f seconds; see https://docs.victoriametrics.com/VictoriaLogs/", time.Since(startTime).Seconds())
sig := procutil.WaitForSigterm()
logger.Infof("received signal %s", sig)
logger.Infof("gracefully shutting down webservice at %q", *httpListenAddr)
startTime = time.Now()
if err := httpserver.Stop(*httpListenAddr); err != nil {
logger.Fatalf("cannot stop the webservice: %s", err)
}
logger.Infof("successfully shut down the webservice in %.3f seconds", time.Since(startTime).Seconds())
vlinsert.Stop()
vlselect.Stop()
vlstorage.Stop()
fs.MustStopDirRemover()
logger.Infof("the VictoriaLogs has been stopped in %.3f seconds", time.Since(startTime).Seconds())
}
func requestHandler(w http.ResponseWriter, r *http.Request) bool {
if r.URL.Path == "/" {
if r.Method != http.MethodGet {
return false
}
w.Header().Add("Content-Type", "text/html; charset=utf-8")
fmt.Fprintf(w, "<h2>Single-node VictoriaLogs</h2></br>")
fmt.Fprintf(w, "See docs at <a href='https://docs.victoriametrics.com/VictoriaLogs/'>https://docs.victoriametrics.com/VictoriaLogs/</a></br>")
fmt.Fprintf(w, "Useful endpoints:</br>")
httpserver.WriteAPIHelp(w, [][2]string{
{"select/vmui", "Web UI for VictoriaLogs"},
{"metrics", "available service metrics"},
{"flags", "command-line flags"},
})
return true
}
if vlinsert.RequestHandler(w, r) {
return true
}
if vlselect.RequestHandler(w, r) {
return true
}
return false
}
func usage() {
const s = `
victoria-logs is a log management and analytics service.
See the docs at https://docs.victoriametrics.com/VictoriaLogs/
`
flagutil.Usage(s)
}

View File

@@ -1,12 +0,0 @@
# See https://medium.com/on-docker/use-multi-stage-builds-to-inject-ca-certs-ad1e8f01de1b
ARG certs_image
ARG root_image
FROM $certs_image as certs
RUN apk update && apk upgrade && apk --update --no-cache add ca-certificates
FROM $root_image
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
EXPOSE 8428
ENTRYPOINT ["/victoria-logs-prod"]
ARG TARGETARCH
COPY victoria-logs-linux-${TARGETARCH}-prod ./victoria-logs-prod

View File

@@ -12,35 +12,20 @@ victoria-metrics-prod:
victoria-metrics-pure-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-pure
victoria-metrics-linux-amd64-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-linux-amd64
victoria-metrics-amd64-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-amd64
victoria-metrics-linux-arm-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-linux-arm
victoria-metrics-arm-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-arm
victoria-metrics-linux-arm64-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-linux-arm64
victoria-metrics-arm64-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-arm64
victoria-metrics-linux-ppc64le-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-linux-ppc64le
victoria-metrics-ppc64le-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-ppc64le
victoria-metrics-linux-386-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-linux-386
victoria-metrics-darwin-amd64-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-darwin-amd64
victoria-metrics-darwin-arm64-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-darwin-arm64
victoria-metrics-freebsd-amd64-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-freebsd-amd64
victoria-metrics-openbsd-amd64-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-openbsd-amd64
victoria-metrics-windows-amd64-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-windows-amd64
victoria-metrics-386-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-386
package-victoria-metrics:
APP_NAME=victoria-metrics $(MAKE) package-via-docker
@@ -73,74 +58,60 @@ run-victoria-metrics:
ARGS='-graphiteListenAddr=:2003 -opentsdbListenAddr=:4242 -retentionPeriod=12 -search.maxUniqueTimeseries=1000000 -search.maxQueryDuration=10m' \
$(MAKE) run-via-docker
victoria-metrics-linux-amd64:
APP_NAME=victoria-metrics CGO_ENABLED=1 GOOS=linux GOARCH=amd64 $(MAKE) app-local-goos-goarch
victoria-metrics-amd64:
CGO_ENABLED=1 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/victoria-metrics-amd64 ./app/victoria-metrics
victoria-metrics-linux-arm:
APP_NAME=victoria-metrics CGO_ENABLED=0 GOOS=linux GOARCH=arm $(MAKE) app-local-goos-goarch
victoria-metrics-arm:
CGO_ENABLED=0 GOOS=linux GOARCH=arm GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/victoria-metrics-arm ./app/victoria-metrics
victoria-metrics-linux-arm64:
APP_NAME=victoria-metrics CGO_ENABLED=0 GOOS=linux GOARCH=arm64 $(MAKE) app-local-goos-goarch
victoria-metrics-arm64:
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/victoria-metrics-arm64 ./app/victoria-metrics
victoria-metrics-linux-ppc64le:
APP_NAME=victoria-metrics CGO_ENABLED=0 GOOS=linux GOARCH=ppc64le $(MAKE) app-local-goos-goarch
victoria-metrics-ppc64le:
CGO_ENABLED=0 GOOS=linux GOARCH=ppc64le GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/victoria-metrics-ppc64le ./app/victoria-metrics
victoria-metrics-linux-s390x:
APP_NAME=victoria-metrics CGO_ENABLED=0 GOOS=linux GOARCH=s390x $(MAKE) app-local-goos-goarch
victoria-metrics-linux-386:
APP_NAME=victoria-metrics CGO_ENABLED=0 GOOS=linux GOARCH=386 $(MAKE) app-local-goos-goarch
victoria-metrics-darwin-amd64:
APP_NAME=victoria-metrics CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 $(MAKE) app-local-goos-goarch
victoria-metrics-darwin-arm64:
APP_NAME=victoria-metrics CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 $(MAKE) app-local-goos-goarch
victoria-metrics-freebsd-amd64:
APP_NAME=victoria-metrics CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 $(MAKE) app-local-goos-goarch
victoria-metrics-openbsd-amd64:
APP_NAME=victoria-metrics CGO_ENABLED=0 GOOS=openbsd GOARCH=amd64 $(MAKE) app-local-goos-goarch
victoria-metrics-windows-amd64:
GOARCH=amd64 APP_NAME=victoria-metrics $(MAKE) app-local-windows-goarch
victoria-metrics-386:
CGO_ENABLED=0 GOOS=linux GOARCH=386 GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/victoria-metrics-386 ./app/victoria-metrics
victoria-metrics-pure:
APP_NAME=victoria-metrics $(MAKE) app-local-pure
### Packaging as DEB - amd64
victoria-metrics-package-deb-amd64: victoria-metrics-linux-amd64-prod
victoria-metrics-package-deb: victoria-metrics-prod
./package/package_deb.sh amd64
### Packaging as DEB - arm64
victoria-metrics-package-deb-arm: victoria-metrics-linux-arm-prod
./package/package_deb.sh arm
### Packaging as DEB - arm64
victoria-metrics-package-deb-arm64: victoria-metrics-linux-arm64-prod
victoria-metrics-package-deb-arm64: victoria-metrics-arm64-prod
./package/package_deb.sh arm64
### Packaging as DEB - all
victoria-metrics-package-deb: \
victoria-metrics-package-deb-amd64 \
victoria-metrics-package-deb-arm \
victoria-metrics-package-deb-all: \
victoria-metrics-package-deb \
victoria-metrics-package-deb-arm64
### Packaging as RPM - amd64
victoria-metrics-package-rpm-amd64: victoria-metrics-linux-amd64-prod
victoria-metrics-package-rpm: victoria-metrics-prod
./package/package_rpm.sh amd64
### Packaging as RPM - arm64
victoria-metrics-package-rpm-arm64: victoria-metrics-linux-arm64-prod
victoria-metrics-package-rpm-arm64: victoria-metrics-arm64-prod
./package/package_rpm.sh arm64
### Packaging as RPM - all
victoria-metrics-package-rpm: \
victoria-metrics-package-rpm-amd64 \
victoria-metrics-package-rpm-all: \
victoria-metrics-package-rpm \
victoria-metrics-package-rpm-arm64
### Packaging as both DEB and RPM - all
victoria-metrics-package-deb-rpm: \
victoria-metrics-package-deb-rpm-all: \
victoria-metrics-package-deb \
victoria-metrics-package-rpm
victoria-metrics-package-deb-arm64 \
victoria-metrics-package-rpm \
victoria-metrics-package-rpm-arm64
### Packaging as snap
victoria-metrics-package-snap:
which snapcraft || snap install snapcraft
which multipass || snap install multipass
snapcraft

View File

@@ -3,13 +3,13 @@ package main
import (
"flag"
"fmt"
"io"
"net/http"
"os"
"path"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert"
vminsertcommon "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
vminsertrelabel "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
@@ -21,24 +21,16 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
var (
httpListenAddr = flag.String("httpListenAddr", ":8428", "TCP address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt . "+
"With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing")
minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+
"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling")
dryRun = flag.Bool("dryRun", false, "Whether to check config files without running VictoriaMetrics. The following config files are checked: "+
"-promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. "+
"This can be changed with -promscrape.config.strictParse=false command-line flag")
inmemoryDataFlushInterval = flag.Duration("inmemoryDataFlushInterval", 5*time.Second, "The interval for guaranteed saving of in-memory data to disk. "+
"The saved data survives unclean shutdowns such as OOM crash, hardware reset, SIGKILL, etc. "+
"Bigger intervals may help increase the lifetime of flash storage with limited write cycles (e.g. Raspberry PI). "+
"Smaller intervals increase disk IO load. Minimum supported value is 1s")
httpListenAddr = flag.String("httpListenAddr", ":8428", "TCP address to listen for http connections")
minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. "+
"This may be useful for reducing overhead when multiple identically configured Prometheus instances write data to the same VictoriaMetrics. "+
"Deduplication is disabled if the -dedup.minScrapeInterval is 0")
dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+
"Unknown config entries are allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse")
)
func main() {
@@ -48,7 +40,6 @@ func main() {
envflag.Parse()
buildinfo.Init()
logger.Init()
pushmetrics.Init()
if promscrape.IsDryRun() {
*dryRun = true
@@ -57,26 +48,19 @@ func main() {
if err := promscrape.CheckConfig(); err != nil {
logger.Fatalf("error when checking -promscrape.config: %s", err)
}
if err := vminsertrelabel.CheckRelabelConfig(); err != nil {
logger.Fatalf("error when checking -relabelConfig: %s", err)
}
if err := vminsertcommon.CheckStreamAggrConfig(); err != nil {
logger.Fatalf("error when checking -streamAggr.config: %s", err)
}
logger.Infof("-promscrape.config is ok; exiting with 0 status code")
logger.Infof("-promscrape.config is ok; exitting with 0 status code")
return
}
logger.Infof("starting VictoriaMetrics at %q...", *httpListenAddr)
startTime := time.Now()
storage.SetDedupInterval(*minScrapeInterval)
storage.SetDataFlushInterval(*inmemoryDataFlushInterval)
storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval)
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
vmselect.Init()
vminsert.Init()
startSelfScraper()
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
go httpserver.Serve(*httpListenAddr, requestHandler)
logger.Infof("started VictoriaMetrics in %.3f seconds", time.Since(startTime).Seconds())
sig := procutil.WaitForSigterm()
@@ -102,26 +86,14 @@ func main() {
func requestHandler(w http.ResponseWriter, r *http.Request) bool {
if r.URL.Path == "/" {
if r.Method != http.MethodGet {
return false
}
w.Header().Add("Content-Type", "text/html; charset=utf-8")
fmt.Fprintf(w, "<h2>Single-node VictoriaMetrics</h2></br>")
fmt.Fprintf(w, "See docs at <a href='https://docs.victoriametrics.com/'>https://docs.victoriametrics.com/</a></br>")
fmt.Fprintf(w, "Useful endpoints:</br>")
httpserver.WriteAPIHelp(w, [][2]string{
{"vmui", "Web UI"},
{"targets", "status for discovered active targets"},
{"service-discovery", "labels before and after relabeling for discovered targets"},
{"metric-relabel-debug", "debug metric relabeling"},
{"expand-with-exprs", "WITH expressions' tutorial"},
{"api/v1/targets", "advanced information about discovered targets in JSON format"},
{"config", "-promscrape.config contents"},
{"metrics", "available service metrics"},
{"flags", "command-line flags"},
{"api/v1/status/tsdb", "tsdb status page"},
{"api/v1/status/top_queries", "top queries"},
{"api/v1/status/active_queries", "active queries"},
fmt.Fprintf(w, "<h2>Single-node VictoriaMetrics.</h2></br>")
fmt.Fprintf(w, "See docs at <a href='https://victoriametrics.github.io/'>https://victoriametrics.github.io/</a></br>")
fmt.Fprintf(w, "Useful endpoints: </br>")
writeAPIHelp(w, [][]string{
{"/targets", "discovered targets list"},
{"/api/v1/targets", "advanced information about discovered targets in JSON format"},
{"/metrics", "available service metrics"},
{"/api/v1/status/tsdb", "tsdb status page"},
})
return true
}
@@ -137,11 +109,20 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return false
}
func writeAPIHelp(w io.Writer, pathList [][]string) {
pathPrefix := httpserver.GetPathPrefix()
for _, p := range pathList {
p, doc := p[0], p[1]
p = path.Join(pathPrefix, p)
fmt.Fprintf(w, "<a href='%s'>%q</a> - %s<br/>", p, p, doc)
}
}
func usage() {
const s = `
victoria-metrics is a time series database and monitoring solution.
See the docs at https://docs.victoriametrics.com/
See the docs at https://victoriametrics.github.io/
`
flagutil.Usage(s)
}

View File

@@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
@@ -21,6 +22,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -129,16 +131,16 @@ func setUp() {
storagePath = filepath.Join(os.TempDir(), testStorageSuffix)
processFlags()
logger.Init()
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
vmstorage.InitWithoutMetrics(promql.ResetRollupResultCacheIfNeeded)
vmselect.Init()
vminsert.Init()
go httpserver.Serve(*httpListenAddr, false, requestHandler)
go httpserver.Serve(*httpListenAddr, requestHandler)
readyStorageCheckFunc := func() bool {
resp, err := http.Get(testHealthHTTPPath)
if err != nil {
return false
}
_ = resp.Body.Close()
resp.Body.Close()
return resp.StatusCode == 200
}
if err := waitFor(testStorageInitTimeout, readyStorageCheckFunc); err != nil {
@@ -147,7 +149,7 @@ func setUp() {
}
func processFlags() {
flag.Parse()
envflag.Parse()
for _, fv := range []struct {
flag string
value string
@@ -189,8 +191,10 @@ func tearDown() {
func TestWriteRead(t *testing.T) {
t.Run("write", testWrite)
vmstorage.Storage.DebugFlush()
time.Sleep(1 * time.Second)
vmstorage.Stop()
// open storage after stop in write
vmstorage.InitWithoutMetrics(promql.ResetRollupResultCacheIfNeeded)
t.Run("read", testRead)
}
@@ -259,7 +263,7 @@ func testRead(t *testing.T) {
for _, q := range test.Query {
q = testutil.PopulateTimeTplString(q, insertionTime)
if test.Issue != "" {
test.Issue = "\nRegression in " + test.Issue
test.Issue = "Regression in " + test.Issue
}
switch true {
case strings.HasPrefix(q, "/api/v1/export"):
@@ -282,7 +286,7 @@ func testRead(t *testing.T) {
queryResult := Query{}
httpReadStruct(t, testReadHTTPPath, q, &queryResult)
if err := checkQueryResult(queryResult, test.ResultQuery); err != nil {
t.Fatalf("Query. %s fails with error: %s.%s", q, err, test.Issue)
t.Fatalf("Query. %s fails with error %s.%s", q, err, test.Issue)
}
default:
t.Fatalf("unsupported read query %s", q)
@@ -305,7 +309,7 @@ func readIn(readFor string, t *testing.T, insertTime time.Time) []test {
if filepath.Ext(path) != ".json" {
return nil
}
b, err := os.ReadFile(path)
b, err := ioutil.ReadFile(path)
s.noError(err)
item := test{}
s.noError(json.Unmarshal(b, &item))
@@ -335,9 +339,7 @@ func tcpWrite(t *testing.T, address string, data string) {
s := newSuite(t)
conn, err := net.Dial("tcp", address)
s.noError(err)
defer func() {
_ = conn.Close()
}()
defer conn.Close()
n, err := conn.Write([]byte(data))
s.noError(err)
s.equalInt(n, len(data))
@@ -348,9 +350,7 @@ func httpReadMetrics(t *testing.T, address, query string) []Metric {
s := newSuite(t)
resp, err := http.Get(address + query)
s.noError(err)
defer func() {
_ = resp.Body.Close()
}()
defer resp.Body.Close()
s.equalInt(resp.StatusCode, 200)
var rows []Metric
for dec := json.NewDecoder(resp.Body); dec.More(); {
@@ -365,9 +365,7 @@ func httpReadStruct(t *testing.T, address, query string, dst interface{}) {
s := newSuite(t)
resp, err := http.Get(address + query)
s.noError(err)
defer func() {
_ = resp.Body.Close()
}()
defer resp.Body.Close()
s.equalInt(resp.StatusCode, 200)
s.noError(json.NewDecoder(resp.Body).Decode(dst))
}

View File

@@ -1,12 +0,0 @@
# See https://medium.com/on-docker/use-multi-stage-builds-to-inject-ca-certs-ad1e8f01de1b
ARG certs_image
ARG root_image
FROM $certs_image as certs
RUN apk update && apk upgrade && apk --update --no-cache add ca-certificates
FROM $root_image
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
EXPOSE 8428
ENTRYPOINT ["/victoria-metrics-prod"]
ARG TARGETARCH
COPY victoria-metrics-linux-${TARGETARCH}-prod ./victoria-metrics-prod

View File

@@ -6,8 +6,8 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/appmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
@@ -60,7 +60,7 @@ func selfScraper(scrapeInterval time.Duration) {
currentTimestamp = currentTime.UnixNano() / 1e6
}
bb.Reset()
appmetrics.WritePrometheusMetrics(&bb)
httpserver.WritePrometheusMetrics(&bb)
s := bytesutil.ToUnsafeString(bb.B)
rows.Reset()
rows.Unmarshal(s)
@@ -85,9 +85,7 @@ func selfScraper(scrapeInterval time.Duration) {
mr.Timestamp = currentTimestamp
mr.Value = r.Value
}
if err := vmstorage.AddRows(mrs); err != nil {
logger.Errorf("cannot store self-scraped metrics: %s", err)
}
vmstorage.AddRows(mrs)
}
}

View File

@@ -12,7 +12,7 @@ func TestPopulateTimeTplString(t *testing.T) {
}
f := func(s, resultExpected string) {
t.Helper()
result := PopulateTimeTplString(s, now.UTC())
result := PopulateTimeTplString(s, now)
if result != resultExpected {
t.Fatalf("unexpected result; got %q; want %q", result, resultExpected)
}

View File

@@ -11,6 +11,6 @@
"status":"success",
"data":{"resultType":"matrix",
"result":[
{"metric":{"item":"y"},"values":[["{TIME_S-1m}","0.5"], ["{TIME_S}","0.5"]]}
{"metric":{"item":"y"},"values":[["{TIME_S-1m}","0.5"]]}
]}}
}

View File

@@ -1,17 +0,0 @@
{
"name": "graphite-selector",
"issue": "",
"data": [
"graphite-selector.bar.baz 1 {TIME_S-1m}",
"graphite-selector.xxx.yy 2 {TIME_S-1m}",
"graphite-selector.bb.cc 3 {TIME_S-1m}",
"graphite-selector.a.baz 4 {TIME_S-1m}"],
"query": ["/api/v1/query?query=sort({__graphite__='graphite-selector.*.baz'})&time={TIME_S-1m}"],
"result_query": {
"status":"success",
"data":{"resultType":"vector","result":[
{"metric":{"__name__":"graphite-selector.bar.baz"},"value":["{TIME_S-1m}","1"]},
{"metric":{"__name__":"graphite-selector.a.baz"},"value":["{TIME_S-1m}","4"]}
]}
}
}

View File

@@ -19,7 +19,6 @@
["{TIME_S-110s}","3"],
["{TIME_S-100s}","3"],
["{TIME_S-90s}","3"],
["{TIME_S-80s}","3"],
["{TIME_S-60s}","2"],
["{TIME_S-50s}","2"],
["{TIME_S-40s}","2"],

View File

@@ -1,16 +0,0 @@
{
"name": "name-plus-negative-filter",
"issue": "",
"data": [
"name-plus-negative-filter;foo=123 1 {TIME_S-1m}",
"name-plus-negative-filter;bar=123 2 {TIME_S-1m}",
"name-plus-negative-filter;foo=qwe 3 {TIME_S-1m}"
],
"query": ["/api/v1/query?query={__name__='name-plus-negative-filter',foo!='123'}&time={TIME_S-1m}"],
"result_query": {
"status":"success",
"data":{"resultType":"vector","result":[
{"metric":{"__name__":"name-plus-negative-filter","foo":"qwe"},"value":["{TIME_S-1m}","3"]}
]}
}
}

View File

@@ -13,6 +13,6 @@
"data":{"resultType":"matrix",
"result":[
{"metric":{"__name__":"not_nan_as_missing_data","item":"x"},"values":[["{TIME_S-2m}","2"]]},
{"metric":{"__name__":"not_nan_as_missing_data","item":"y"},"values":[["{TIME_S-2m}","4"],["{TIME_S-1m}","3"],["{TIME_S}", "3"]]}
{"metric":{"__name__":"not_nan_as_missing_data","item":"y"},"values":[["{TIME_S-2m}","4"],["{TIME_S-1m}","3"]]}
]}}
}

View File

@@ -6,7 +6,7 @@
"forms_daily_count;item=x 2 {TIME_S-2m}",
"forms_daily_count;item=y 3 {TIME_S-1m}",
"forms_daily_count;item=y 4 {TIME_S-2m}"],
"query": ["/api/v1/query?query=min%20by%20(item)%20(min_over_time(forms_daily_count[10m:1m]))&time={TIME_S-1m}&latency_offset=1ms"],
"query": ["/api/v1/query?query=min%20by%20(item)%20(min_over_time(forms_daily_count[10m:1m]))&time={TIME_S-1m}"],
"result_query": {
"status":"success",
"data":{"resultType":"vector","result":[{"metric":{"item":"x"},"value":["{TIME_S-1m}","2"]},{"metric":{"item":"y"},"value":["{TIME_S-1m}","4"]}]}

View File

@@ -1,8 +0,0 @@
{
"name": "basic_select_with_extra_labels",
"data": ["[{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.tenant.limits\"},{\"name\":\"baz\",\"value\":\"qux\"},{\"name\":\"tenant\",\"value\":\"dev\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]},{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.up\"},{\"name\":\"baz\",\"value\":\"qux\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]}]"],
"query": ["/api/v1/export?match={__name__!=''}&extra_label=tenant=dev"],
"result_metrics": [
{"metric":{"__name__":"prometheus.tenant.limits","baz":"qux","tenant": "dev"},"values":[100000], "timestamps": ["{TIME_MS}"]}
]
}

View File

@@ -1,20 +0,0 @@
{% stripspace %}
{% func BulkResponse(n int, tookMs int64) %}
{
"took":{%dl tookMs %},
"errors":false,
"items":[
{% for i := 0; i < n; i++ %}
{
"create":{
"status":201
}
}
{% if i+1 < n %},{% endif %}
{% endfor %}
]
}
{% endfunc %}
{% endstripspace %}

View File

@@ -1,69 +0,0 @@
// Code generated by qtc from "bulk_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vlinsert/elasticsearch/bulk_response.qtpl:3
package elasticsearch
//line app/vlinsert/elasticsearch/bulk_response.qtpl:3
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:3
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:3
func StreamBulkResponse(qw422016 *qt422016.Writer, n int, tookMs int64) {
//line app/vlinsert/elasticsearch/bulk_response.qtpl:3
qw422016.N().S(`{"took":`)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:5
qw422016.N().DL(tookMs)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:5
qw422016.N().S(`,"errors":false,"items":[`)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:8
for i := 0; i < n; i++ {
//line app/vlinsert/elasticsearch/bulk_response.qtpl:8
qw422016.N().S(`{"create":{"status":201}}`)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:14
if i+1 < n {
//line app/vlinsert/elasticsearch/bulk_response.qtpl:14
qw422016.N().S(`,`)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:14
}
//line app/vlinsert/elasticsearch/bulk_response.qtpl:15
}
//line app/vlinsert/elasticsearch/bulk_response.qtpl:15
qw422016.N().S(`]}`)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:18
}
//line app/vlinsert/elasticsearch/bulk_response.qtpl:18
func WriteBulkResponse(qq422016 qtio422016.Writer, n int, tookMs int64) {
//line app/vlinsert/elasticsearch/bulk_response.qtpl:18
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:18
StreamBulkResponse(qw422016, n, tookMs)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:18
qt422016.ReleaseWriter(qw422016)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:18
}
//line app/vlinsert/elasticsearch/bulk_response.qtpl:18
func BulkResponse(n int, tookMs int64) string {
//line app/vlinsert/elasticsearch/bulk_response.qtpl:18
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlinsert/elasticsearch/bulk_response.qtpl:18
WriteBulkResponse(qb422016, n, tookMs)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:18
qs422016 := string(qb422016.B)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:18
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlinsert/elasticsearch/bulk_response.qtpl:18
return qs422016
//line app/vlinsert/elasticsearch/bulk_response.qtpl:18
}

View File

@@ -1,264 +0,0 @@
package elasticsearch
import (
"bufio"
"errors"
"fmt"
"io"
"math"
"net/http"
"strconv"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
// RequestHandler processes Elasticsearch insert requests
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
w.Header().Add("Content-Type", "application/json")
// This header is needed for Logstash
w.Header().Set("X-Elastic-Product", "Elasticsearch")
if strings.HasPrefix(path, "/_ilm/policy") {
// Return fake response for Elasticsearch ilm request.
fmt.Fprintf(w, `{}`)
return true
}
if strings.HasPrefix(path, "/_index_template") {
// Return fake response for Elasticsearch index template request.
fmt.Fprintf(w, `{}`)
return true
}
if strings.HasPrefix(path, "/_ingest") {
// Return fake response for Elasticsearch ingest pipeline request.
// See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/put-pipeline-api.html
fmt.Fprintf(w, `{}`)
return true
}
if strings.HasPrefix(path, "/_nodes") {
// Return fake response for Elasticsearch nodes discovery request.
// See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/cluster.html
fmt.Fprintf(w, `{}`)
return true
}
switch path {
case "/":
switch r.Method {
case http.MethodGet:
// Return fake response for Elasticsearch ping request.
// See the latest available version for Elasticsearch at https://github.com/elastic/elasticsearch/releases
fmt.Fprintf(w, `{
"version": {
"number": "8.8.0"
}
}`)
case http.MethodHead:
// Return empty response for Logstash ping request.
}
return true
case "/_license":
// Return fake response for Elasticsearch license request.
fmt.Fprintf(w, `{
"license": {
"uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
"type": "oss",
"status": "active",
"expiry_date_in_millis" : 4000000000000
}
}`)
return true
case "/_bulk":
startTime := time.Now()
bulkRequestsTotal.Inc()
cp, err := insertutils.GetCommonParams(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
}
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
processLogMessage := cp.GetProcessLogMessageFunc(lr)
isGzip := r.Header.Get("Content-Encoding") == "gzip"
n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgField, processLogMessage)
if err != nil {
logger.Warnf("cannot decode log message #%d in /_bulk request: %s", n, err)
return true
}
vlstorage.MustAddRows(lr)
logstorage.PutLogRows(lr)
tookMs := time.Since(startTime).Milliseconds()
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteBulkResponse(bw, n, tookMs)
_ = bw.Flush()
return true
default:
return false
}
}
var (
bulkRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/elasticsearch/_bulk"}`)
)
func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
processLogMessage func(timestamp int64, fields []logstorage.Field),
) (int, error) {
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
if isGzip {
zr, err := common.GetGzipReader(r)
if err != nil {
return 0, fmt.Errorf("cannot read gzipped _bulk request: %w", err)
}
defer common.PutGzipReader(zr)
r = zr
}
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
lb := lineBufferPool.Get()
defer lineBufferPool.Put(lb)
lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, insertutils.MaxLineSizeBytes.IntN())
sc := bufio.NewScanner(wcr)
sc.Buffer(lb.B, len(lb.B))
n := 0
nCheckpoint := 0
for {
ok, err := readBulkLine(sc, timeField, msgField, processLogMessage)
wcr.DecConcurrency()
if err != nil || !ok {
rowsIngestedTotal.Add(n - nCheckpoint)
return n, err
}
n++
if batchSize := n - nCheckpoint; n >= 1000 {
rowsIngestedTotal.Add(batchSize)
nCheckpoint = n
}
}
}
var lineBufferPool bytesutil.ByteBufferPool
var rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="elasticsearch_bulk"}`)
func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
processLogMessage func(timestamp int64, fields []logstorage.Field),
) (bool, error) {
var line []byte
// Read the command, must be "create" or "index"
for len(line) == 0 {
if !sc.Scan() {
if err := sc.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
return false, fmt.Errorf(`cannot read "create" or "index" command, since its size exceeds -insert.maxLineSizeBytes=%d`,
insertutils.MaxLineSizeBytes.IntN())
}
return false, err
}
return false, nil
}
line = sc.Bytes()
}
lineStr := bytesutil.ToUnsafeString(line)
if !strings.Contains(lineStr, `"create"`) && !strings.Contains(lineStr, `"index"`) {
return false, fmt.Errorf(`unexpected command %q; expecting "create" or "index"`, line)
}
// Decode log message
if !sc.Scan() {
if err := sc.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
return false, fmt.Errorf("cannot read log message, since its size exceeds -insert.maxLineSizeBytes=%d", insertutils.MaxLineSizeBytes.IntN())
}
return false, err
}
return false, fmt.Errorf(`missing log message after the "create" or "index" command`)
}
line = sc.Bytes()
p := logjson.GetParser()
if err := p.ParseLogMessage(line); err != nil {
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
}
ts, err := extractTimestampFromFields(timeField, p.Fields)
if err != nil {
return false, fmt.Errorf("cannot parse timestamp: %w", err)
}
if ts == 0 {
ts = time.Now().UnixNano()
}
p.RenameField(msgField, "_msg")
processLogMessage(ts, p.Fields)
logjson.PutParser(p)
return true, nil
}
func extractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) {
for i := range fields {
f := &fields[i]
if f.Name != timeField {
continue
}
timestamp, err := parseElasticsearchTimestamp(f.Value)
if err != nil {
return 0, err
}
f.Value = ""
return timestamp, nil
}
return 0, nil
}
func parseElasticsearchTimestamp(s string) (int64, error) {
if s == "0" || s == "" {
// Special case - zero or empty timestamp must be substituted
// with the current time by the caller.
return 0, nil
}
if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
// Try parsing timestamp in milliseconds
n, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0, fmt.Errorf("cannot parse timestamp in milliseconds from %q: %w", s, err)
}
if n > int64(math.MaxInt64)/1e6 {
return 0, fmt.Errorf("too big timestamp in milliseconds: %d; mustn't exceed %d", n, int64(math.MaxInt64)/1e6)
}
if n < int64(math.MinInt64)/1e6 {
return 0, fmt.Errorf("too small timestamp in milliseconds: %d; must be bigger than %d", n, int64(math.MinInt64)/1e6)
}
n *= 1e6
return n, nil
}
if len(s) == len("YYYY-MM-DD") {
t, err := time.Parse("2006-01-02", s)
if err != nil {
return 0, fmt.Errorf("cannot parse date %q: %w", s, err)
}
return t.UnixNano(), nil
}
t, err := time.Parse(time.RFC3339, s)
if err != nil {
return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
}
return t.UnixNano(), nil
}

View File

@@ -1,129 +0,0 @@
package elasticsearch
import (
"bytes"
"compress/gzip"
"fmt"
"reflect"
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
func TestReadBulkRequestFailure(t *testing.T) {
f := func(data string) {
t.Helper()
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
t.Fatalf("unexpected call to processLogMessage with timestamp=%d, fields=%s", timestamp, fields)
}
r := bytes.NewBufferString(data)
rows, err := readBulkRequest(r, false, "_time", "_msg", processLogMessage)
if err == nil {
t.Fatalf("expecting non-empty error")
}
if rows != 0 {
t.Fatalf("unexpected non-zero rows=%d", rows)
}
}
f("foobar")
f(`{}`)
f(`{"create":{}}`)
f(`{"creat":{}}
{}`)
f(`{"create":{}}
foobar`)
}
func TestReadBulkRequestSuccess(t *testing.T) {
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
t.Helper()
var timestamps []int64
var result string
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
timestamps = append(timestamps, timestamp)
a := make([]string, len(fields))
for i, f := range fields {
a[i] = fmt.Sprintf("%q:%q", f.Name, f.Value)
}
s := "{" + strings.Join(a, ",") + "}\n"
result += s
}
// Read the request without compression
r := bytes.NewBufferString(data)
rows, err := readBulkRequest(r, false, timeField, msgField, processLogMessage)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if rows != rowsExpected {
t.Fatalf("unexpected rows read; got %d; want %d", rows, rowsExpected)
}
if !reflect.DeepEqual(timestamps, timestampsExpected) {
t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", timestamps, timestampsExpected)
}
if result != resultExpected {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
}
// Read the request with compression
timestamps = nil
result = ""
compressedData := compressData(data)
r = bytes.NewBufferString(compressedData)
rows, err = readBulkRequest(r, true, timeField, msgField, processLogMessage)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if rows != rowsExpected {
t.Fatalf("unexpected rows read; got %d; want %d", rows, rowsExpected)
}
if !reflect.DeepEqual(timestamps, timestampsExpected) {
t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", timestamps, timestampsExpected)
}
if result != resultExpected {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
}
}
// Verify an empty data
f("", "_time", "_msg", 0, nil, "")
f("\n", "_time", "_msg", 0, nil, "")
f("\n\n", "_time", "_msg", 0, nil, "")
// Verify non-empty data
data := `{"create":{"_index":"filebeat-8.8.0"}}
{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
{"create":{"_index":"filebeat-8.8.0"}}
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
{"index":{"_index":"filebeat-8.8.0"}}
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}
`
timeField := "@timestamp"
msgField := "message"
rowsExpected := 3
timestampsExpected := []int64{1686026891735000000, 1686026892735000000, 1686026893735000000}
resultExpected := `{"@timestamp":"","log.offset":"71770","log.file.path":"/var/log/auth.log","_msg":"foobar"}
{"@timestamp":"","_msg":"baz"}
{"_msg":"xyz","@timestamp":"","x":"y"}
`
f(data, timeField, msgField, rowsExpected, timestampsExpected, resultExpected)
}
func compressData(s string) string {
var bb bytes.Buffer
zw := gzip.NewWriter(&bb)
if _, err := zw.Write([]byte(s)); err != nil {
panic(fmt.Errorf("unexpected error when compressing data: %s", err))
}
if err := zw.Close(); err != nil {
panic(fmt.Errorf("unexpected error when closing gzip writer: %s", err))
}
return bb.String()
}

View File

@@ -1,50 +0,0 @@
package elasticsearch
import (
"bytes"
"fmt"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
func BenchmarkReadBulkRequest(b *testing.B) {
b.Run("gzip:off", func(b *testing.B) {
benchmarkReadBulkRequest(b, false)
})
b.Run("gzip:on", func(b *testing.B) {
benchmarkReadBulkRequest(b, true)
})
}
func benchmarkReadBulkRequest(b *testing.B, isGzip bool) {
data := `{"create":{"_index":"filebeat-8.8.0"}}
{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
{"create":{"_index":"filebeat-8.8.0"}}
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
{"create":{"_index":"filebeat-8.8.0"}}
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}
`
if isGzip {
data = compressData(data)
}
dataBytes := bytesutil.ToUnsafeBytes(data)
timeField := "@timestamp"
msgField := "message"
processLogMessage := func(timestmap int64, fields []logstorage.Field) {}
b.ReportAllocs()
b.SetBytes(int64(len(data)))
b.RunParallel(func(pb *testing.PB) {
r := &bytes.Reader{}
for pb.Next() {
r.Reset(dataBytes)
_, err := readBulkRequest(r, isGzip, timeField, msgField, processLogMessage)
if err != nil {
panic(fmt.Errorf("unexpected error: %s", err))
}
}
})
}

View File

@@ -1,91 +0,0 @@
package insertutils
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/metrics"
)
// CommonParams contains common HTTP parameters used by log ingestion APIs.
//
// See https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#http-parameters
type CommonParams struct {
TenantID logstorage.TenantID
TimeField string
MsgField string
StreamFields []string
IgnoreFields []string
Debug bool
DebugRequestURI string
DebugRemoteAddr string
}
// GetCommonParams returns CommonParams from r.
func GetCommonParams(r *http.Request) (*CommonParams, error) {
// Extract tenantID
tenantID, err := logstorage.GetTenantIDFromRequest(r)
if err != nil {
return nil, err
}
// Extract time field name from _time_field query arg
var timeField = "_time"
if tf := r.FormValue("_time_field"); tf != "" {
timeField = tf
}
// Extract message field name from _msg_field query arg
var msgField = ""
if msgf := r.FormValue("_msg_field"); msgf != "" {
msgField = msgf
}
streamFields := httputils.GetArray(r, "_stream_fields")
ignoreFields := httputils.GetArray(r, "ignore_fields")
debug := httputils.GetBool(r, "debug")
debugRequestURI := ""
debugRemoteAddr := ""
if debug {
debugRequestURI = httpserver.GetRequestURI(r)
debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r)
}
cp := &CommonParams{
TenantID: tenantID,
TimeField: timeField,
MsgField: msgField,
StreamFields: streamFields,
IgnoreFields: ignoreFields,
Debug: debug,
DebugRequestURI: debugRequestURI,
DebugRemoteAddr: debugRemoteAddr,
}
return cp, nil
}
// GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr.
func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) {
return func(timestamp int64, fields []logstorage.Field) {
lr.MustAdd(cp.TenantID, timestamp, fields)
if cp.Debug {
s := lr.GetRowString(0)
lr.ResetKeepSettings()
logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", cp.DebugRemoteAddr, cp.DebugRequestURI, s)
rowsDroppedTotal.Inc()
return
}
if lr.NeedFlush() {
vlstorage.MustAddRows(lr)
lr.ResetKeepSettings()
}
}
}
var rowsDroppedTotal = metrics.NewCounter(`vl_rows_dropped_total{reason="debug"}`)

View File

@@ -1,10 +0,0 @@
package insertutils
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
)
var (
// MaxLineSizeBytes is the maximum length of a single line for /insert/* handlers
MaxLineSizeBytes = flagutil.NewBytes("insert.maxLineSizeBytes", 256*1024, "The maximum size of a single line, which can be read by /insert/* handlers")
)

View File

@@ -1,149 +0,0 @@
package jsonline
import (
"bufio"
"errors"
"fmt"
"net/http"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
// RequestHandler processes jsonline insert requests
func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
w.Header().Add("Content-Type", "application/json")
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return true
}
requestsTotal.Inc()
cp, err := insertutils.GetCommonParams(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
}
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
processLogMessage := cp.GetProcessLogMessageFunc(lr)
reader := r.Body
if r.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(reader)
if err != nil {
logger.Errorf("cannot read gzipped _bulk request: %s", err)
return true
}
defer common.PutGzipReader(zr)
reader = zr
}
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
lb := lineBufferPool.Get()
defer lineBufferPool.Put(lb)
lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, insertutils.MaxLineSizeBytes.IntN())
sc := bufio.NewScanner(wcr)
sc.Buffer(lb.B, len(lb.B))
n := 0
for {
ok, err := readLine(sc, cp.TimeField, cp.MsgField, processLogMessage)
wcr.DecConcurrency()
if err != nil {
logger.Errorf("cannot read line #%d in /jsonline request: %s", n, err)
break
}
if !ok {
break
}
n++
rowsIngestedTotal.Inc()
}
vlstorage.MustAddRows(lr)
logstorage.PutLogRows(lr)
return true
}
func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
var line []byte
for len(line) == 0 {
if !sc.Scan() {
if err := sc.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
return false, fmt.Errorf(`cannot read json line, since its size exceeds -insert.maxLineSizeBytes=%d`, insertutils.MaxLineSizeBytes.IntN())
}
return false, err
}
return false, nil
}
line = sc.Bytes()
}
p := logjson.GetParser()
if err := p.ParseLogMessage(line); err != nil {
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
}
ts, err := extractTimestampFromFields(timeField, p.Fields)
if err != nil {
return false, fmt.Errorf("cannot parse timestamp: %w", err)
}
if ts == 0 {
ts = time.Now().UnixNano()
}
p.RenameField(msgField, "_msg")
processLogMessage(ts, p.Fields)
logjson.PutParser(p)
return true, nil
}
func extractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) {
for i := range fields {
f := &fields[i]
if f.Name != timeField {
continue
}
timestamp, err := parseISO8601Timestamp(f.Value)
if err != nil {
return 0, err
}
f.Value = ""
return timestamp, nil
}
return 0, nil
}
func parseISO8601Timestamp(s string) (int64, error) {
if s == "0" || s == "" {
// Special case for returning the current timestamp.
// It must be automatically converted to the current timestamp by the caller.
return 0, nil
}
t, err := time.Parse(time.RFC3339, s)
if err != nil {
return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
}
return t.UnixNano(), nil
}
var lineBufferPool bytesutil.ByteBufferPool
var (
requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
)

View File

@@ -1,70 +0,0 @@
package jsonline
import (
"bufio"
"bytes"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"reflect"
"strings"
"testing"
)
func TestReadBulkRequestSuccess(t *testing.T) {
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
t.Helper()
var timestamps []int64
var result string
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
timestamps = append(timestamps, timestamp)
a := make([]string, len(fields))
for i, f := range fields {
a[i] = fmt.Sprintf("%q:%q", f.Name, f.Value)
}
s := "{" + strings.Join(a, ",") + "}\n"
result += s
}
// Read the request without compression
r := bytes.NewBufferString(data)
sc := bufio.NewScanner(r)
rows := 0
for {
ok, err := readLine(sc, timeField, msgField, processLogMessage)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !ok {
break
}
rows++
}
if rows != rowsExpected {
t.Fatalf("unexpected rows read; got %d; want %d", rows, rowsExpected)
}
if !reflect.DeepEqual(timestamps, timestampsExpected) {
t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", timestamps, timestampsExpected)
}
if result != resultExpected {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
}
}
// Verify non-empty data
data := `{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}
`
timeField := "@timestamp"
msgField := "message"
rowsExpected := 3
timestampsExpected := []int64{1686026891735000000, 1686026892735000000, 1686026893735000000}
resultExpected := `{"@timestamp":"","log.offset":"71770","log.file.path":"/var/log/auth.log","_msg":"foobar"}
{"@timestamp":"","_msg":"baz"}
{"_msg":"xyz","@timestamp":"","x":"y"}
`
f(data, timeField, msgField, rowsExpected, timestampsExpected, resultExpected)
}

View File

@@ -1,56 +0,0 @@
package loki
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/metrics"
)
var (
lokiRequestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`)
lokiRequestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`)
)
// RequestHandler processes Loki insert requests
//
// See https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
if path != "/api/v1/push" {
return false
}
contentType := r.Header.Get("Content-Type")
switch contentType {
case "application/json":
lokiRequestsJSONTotal.Inc()
return handleJSON(r, w)
default:
// Protobuf request body should be handled by default accoring to https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
lokiRequestsProtobufTotal.Inc()
return handleProtobuf(r, w)
}
}
func getCommonParams(r *http.Request) (*insertutils.CommonParams, error) {
cp, err := insertutils.GetCommonParams(r)
if err != nil {
return nil, err
}
// If parsed tenant is (0,0) it is likely to be default tenant
// Try parsing tenant from Loki headers
if cp.TenantID.AccountID == 0 && cp.TenantID.ProjectID == 0 {
org := r.Header.Get("X-Scope-OrgID")
if org != "" {
tenantID, err := logstorage.GetTenantIDFromString(org)
if err != nil {
return nil, err
}
cp.TenantID = tenantID
}
}
return cp, nil
}

View File

@@ -1,190 +0,0 @@
package loki
import (
"fmt"
"io"
"math"
"net/http"
"strconv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/fastjson"
)
var (
rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`)
parserPool fastjson.ParserPool
)
func handleJSON(r *http.Request, w http.ResponseWriter) bool {
reader := r.Body
if r.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(reader)
if err != nil {
httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err)
return true
}
defer common.PutGzipReader(zr)
reader = zr
}
wcr := writeconcurrencylimiter.GetReader(reader)
data, err := io.ReadAll(wcr)
writeconcurrencylimiter.PutReader(wcr)
if err != nil {
httpserver.Errorf(w, r, "cannot read request body: %s", err)
return true
}
cp, err := getCommonParams(r)
if err != nil {
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
return true
}
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
processLogMessage := cp.GetProcessLogMessageFunc(lr)
n, err := parseJSONRequest(data, processLogMessage)
vlstorage.MustAddRows(lr)
logstorage.PutLogRows(lr)
if err != nil {
httpserver.Errorf(w, r, "cannot parse Loki request: %s", err)
return true
}
rowsIngestedJSONTotal.Add(n)
return true
}
func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
p := parserPool.Get()
defer parserPool.Put(p)
v, err := p.ParseBytes(data)
if err != nil {
return 0, fmt.Errorf("cannot parse JSON request body: %w", err)
}
streamsV := v.Get("streams")
if streamsV == nil {
return 0, fmt.Errorf("missing `streams` item in the parsed JSON: %q", v)
}
streams, err := streamsV.Array()
if err != nil {
return 0, fmt.Errorf("`streams` item in the parsed JSON must contain an array; got %q", streamsV)
}
currentTimestamp := time.Now().UnixNano()
var commonFields []logstorage.Field
rowsIngested := 0
for _, stream := range streams {
// populate common labels from `stream` dict
commonFields = commonFields[:0]
labelsV := stream.Get("stream")
var labels *fastjson.Object
if labelsV != nil {
o, err := labelsV.Object()
if err != nil {
return rowsIngested, fmt.Errorf("`stream` item in the parsed JSON must contain an object; got %q", labelsV)
}
labels = o
}
labels.Visit(func(k []byte, v *fastjson.Value) {
if err != nil {
return
}
vStr, errLocal := v.StringBytes()
if errLocal != nil {
err = fmt.Errorf("unexpected label value type for %q:%q; want string", k, v)
return
}
commonFields = append(commonFields, logstorage.Field{
Name: bytesutil.ToUnsafeString(k),
Value: bytesutil.ToUnsafeString(vStr),
})
})
if err != nil {
return rowsIngested, fmt.Errorf("error when parsing `stream` object: %w", err)
}
// populate messages from `values` array
linesV := stream.Get("values")
if linesV == nil {
return rowsIngested, fmt.Errorf("missing `values` item in the parsed JSON %q", stream)
}
lines, err := linesV.Array()
if err != nil {
return rowsIngested, fmt.Errorf("`values` item in the parsed JSON must contain an array; got %q", linesV)
}
fields := commonFields
for _, line := range lines {
lineA, err := line.Array()
if err != nil {
return rowsIngested, fmt.Errorf("unexpected contents of `values` item; want array; got %q", line)
}
if len(lineA) != 2 {
return rowsIngested, fmt.Errorf("unexpected number of values in `values` item array %q; got %d want 2", line, len(lineA))
}
// parse timestamp
timestamp, err := lineA[0].StringBytes()
if err != nil {
return rowsIngested, fmt.Errorf("unexpected log timestamp type for %q; want string", lineA[0])
}
ts, err := parseLokiTimestamp(bytesutil.ToUnsafeString(timestamp))
if err != nil {
return rowsIngested, fmt.Errorf("cannot parse log timestamp %q: %w", timestamp, err)
}
if ts == 0 {
ts = currentTimestamp
}
// parse log message
msg, err := lineA[1].StringBytes()
if err != nil {
return rowsIngested, fmt.Errorf("unexpected log message type for %q; want string", lineA[1])
}
fields = append(fields[:len(commonFields)], logstorage.Field{
Name: "_msg",
Value: bytesutil.ToUnsafeString(msg),
})
processLogMessage(ts, fields)
}
rowsIngested += len(lines)
}
return rowsIngested, nil
}
func parseLokiTimestamp(s string) (int64, error) {
if s == "" {
// Special case - an empty timestamp must be substituted with the current time by the caller.
return 0, nil
}
n, err := strconv.ParseInt(s, 10, 64)
if err != nil {
// Fall back to parsing floating-point value
f, err := strconv.ParseFloat(s, 64)
if err != nil {
return 0, err
}
if f > math.MaxInt64 {
return 0, fmt.Errorf("too big timestamp in nanoseconds: %v; mustn't exceed %v", f, int64(math.MaxInt64))
}
if f < math.MinInt64 {
return 0, fmt.Errorf("too small timestamp in nanoseconds: %v; must be bigger or equal to %v", f, int64(math.MinInt64))
}
n = int64(f)
}
if n < 0 {
return 0, fmt.Errorf("too small timestamp in nanoseconds: %d; must be bigger than 0", n)
}
return n, nil
}

View File

@@ -1,130 +0,0 @@
package loki
import (
"fmt"
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
func TestParseJSONRequestFailure(t *testing.T) {
f := func(s string) {
t.Helper()
n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
t.Fatalf("unexpected call to parseJSONRequest callback!")
})
if err == nil {
t.Fatalf("expecting non-nil error")
}
if n != 0 {
t.Fatalf("unexpected number of parsed lines: %d; want 0", n)
}
}
f(``)
// Invalid json
f(`{}`)
f(`[]`)
f(`"foo"`)
f(`123`)
// invalid type for `streams` item
f(`{"streams":123}`)
// Missing `values` item
f(`{"streams":[{}]}`)
// Invalid type for `values` item
f(`{"streams":[{"values":"foobar"}]}`)
// Invalid type for `stream` item
f(`{"streams":[{"stream":[],"values":[]}]}`)
// Invalid type for `values` individual item
f(`{"streams":[{"values":[123]}]}`)
// Invalid length of `values` individual item
f(`{"streams":[{"values":[[]]}]}`)
f(`{"streams":[{"values":[["123"]]}]}`)
f(`{"streams":[{"values":[["123","456","789"]]}]}`)
// Invalid type for timestamp inside `values` individual item
f(`{"streams":[{"values":[[123,"456"]}]}`)
// Invalid type for log message
f(`{"streams":[{"values":[["123",1234]]}]}`)
}
func TestParseJSONRequestSuccess(t *testing.T) {
f := func(s string, resultExpected string) {
t.Helper()
var lines []string
n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
var a []string
for _, f := range fields {
a = append(a, f.String())
}
line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " "))
lines = append(lines, line)
})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if n != len(lines) {
t.Fatalf("unexpected number of lines parsed; got %d; want %d", n, len(lines))
}
result := strings.Join(lines, "\n")
if result != resultExpected {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
}
}
// Empty streams
f(`{"streams":[]}`, ``)
f(`{"streams":[{"values":[]}]}`, ``)
f(`{"streams":[{"stream":{},"values":[]}]}`, ``)
f(`{"streams":[{"stream":{"foo":"bar"},"values":[]}]}`, ``)
// Empty stream labels
f(`{"streams":[{"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`)
f(`{"streams":[{"stream":{},"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`)
// Non-empty stream labels
f(`{"streams":[{"stream":{
"label1": "value1",
"label2": "value2"
},"values":[
["1577836800000000001", "foo bar"],
["1477836900005000002", "abc"],
["147.78369e9", "foobar"]
]}]}`, `_time:1577836800000000001 "label1":"value1" "label2":"value2" "_msg":"foo bar"
_time:1477836900005000002 "label1":"value1" "label2":"value2" "_msg":"abc"
_time:147783690000 "label1":"value1" "label2":"value2" "_msg":"foobar"`)
// Multiple streams
f(`{
"streams": [
{
"stream": {
"foo": "bar",
"a": "b"
},
"values": [
["1577836800000000001", "foo bar"],
["1577836900005000002", "abc"]
]
},
{
"stream": {
"x": "y"
},
"values": [
["1877836900005000002", "yx"]
]
}
]
}`, `_time:1577836800000000001 "foo":"bar" "a":"b" "_msg":"foo bar"
_time:1577836900005000002 "foo":"bar" "a":"b" "_msg":"abc"
_time:1877836900005000002 "x":"y" "_msg":"yx"`)
}

View File

@@ -1,78 +0,0 @@
package loki
import (
"fmt"
"strconv"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
func BenchmarkParseJSONRequest(b *testing.B) {
for _, streams := range []int{5, 10} {
for _, rows := range []int{100, 1000} {
for _, labels := range []int{10, 50} {
b.Run(fmt.Sprintf("streams_%d/rows_%d/labels_%d", streams, rows, labels), func(b *testing.B) {
benchmarkParseJSONRequest(b, streams, rows, labels)
})
}
}
}
}
func benchmarkParseJSONRequest(b *testing.B, streams, rows, labels int) {
b.ReportAllocs()
b.SetBytes(int64(streams * rows))
b.RunParallel(func(pb *testing.PB) {
data := getJSONBody(streams, rows, labels)
for pb.Next() {
_, err := parseJSONRequest(data, func(timestamp int64, fields []logstorage.Field) {})
if err != nil {
panic(fmt.Errorf("unexpected error: %s", err))
}
}
})
}
func getJSONBody(streams, rows, labels int) []byte {
body := append([]byte{}, `{"streams":[`...)
now := time.Now().UnixNano()
valuePrefix := fmt.Sprintf(`["%d","value_`, now)
for i := 0; i < streams; i++ {
body = append(body, `{"stream":{`...)
for j := 0; j < labels; j++ {
body = append(body, `"label_`...)
body = strconv.AppendInt(body, int64(j), 10)
body = append(body, `":"value_`...)
body = strconv.AppendInt(body, int64(j), 10)
body = append(body, '"')
if j < labels-1 {
body = append(body, ',')
}
}
body = append(body, `}, "values":[`...)
for j := 0; j < rows; j++ {
body = append(body, valuePrefix...)
body = strconv.AppendInt(body, int64(j), 10)
body = append(body, `"]`...)
if j < rows-1 {
body = append(body, ',')
}
}
body = append(body, `]}`...)
if i < streams-1 {
body = append(body, ',')
}
}
body = append(body, `]}`...)
return body
}

View File

@@ -1,171 +0,0 @@
package loki
import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
"github.com/golang/snappy"
)
var (
rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`)
bytesBufPool bytesutil.ByteBufferPool
pushReqsPool sync.Pool
)
func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
wcr := writeconcurrencylimiter.GetReader(r.Body)
data, err := io.ReadAll(wcr)
writeconcurrencylimiter.PutReader(wcr)
if err != nil {
httpserver.Errorf(w, r, "cannot read request body: %s", err)
return true
}
cp, err := getCommonParams(r)
if err != nil {
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
return true
}
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
processLogMessage := cp.GetProcessLogMessageFunc(lr)
n, err := parseProtobufRequest(data, processLogMessage)
vlstorage.MustAddRows(lr)
logstorage.PutLogRows(lr)
if err != nil {
httpserver.Errorf(w, r, "cannot parse loki request: %s", err)
return true
}
rowsIngestedProtobufTotal.Add(n)
return true
}
func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
bb := bytesBufPool.Get()
defer bytesBufPool.Put(bb)
buf, err := snappy.Decode(bb.B[:cap(bb.B)], data)
if err != nil {
return 0, fmt.Errorf("cannot decode snappy-encoded request body: %w", err)
}
bb.B = buf
req := getPushRequest()
defer putPushRequest(req)
err = req.Unmarshal(bb.B)
if err != nil {
return 0, fmt.Errorf("cannot parse request body: %s", err)
}
var commonFields []logstorage.Field
rowsIngested := 0
streams := req.Streams
currentTimestamp := time.Now().UnixNano()
for i := range streams {
stream := &streams[i]
// st.Labels contains labels for the stream.
// Labels are same for all entries in the stream.
commonFields, err = parsePromLabels(commonFields[:0], stream.Labels)
if err != nil {
return rowsIngested, fmt.Errorf("cannot parse stream labels %q: %s", stream.Labels, err)
}
fields := commonFields
entries := stream.Entries
for j := range entries {
entry := &entries[j]
fields = append(fields[:len(commonFields)], logstorage.Field{
Name: "_msg",
Value: entry.Line,
})
ts := entry.Timestamp.UnixNano()
if ts == 0 {
ts = currentTimestamp
}
processLogMessage(ts, fields)
}
rowsIngested += len(stream.Entries)
}
return rowsIngested, nil
}
// parsePromLabels parses log fields in Prometheus text exposition format from s, appends them to dst and returns the result.
//
// See test data of promtail for examples: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go
func parsePromLabels(dst []logstorage.Field, s string) ([]logstorage.Field, error) {
// Make sure s is wrapped into `{...}`
s = strings.TrimSpace(s)
if len(s) < 2 {
return nil, fmt.Errorf("too short string to parse: %q", s)
}
if s[0] != '{' {
return nil, fmt.Errorf("missing `{` at the beginning of %q", s)
}
if s[len(s)-1] != '}' {
return nil, fmt.Errorf("missing `}` at the end of %q", s)
}
s = s[1 : len(s)-1]
for len(s) > 0 {
// Parse label name
n := strings.IndexByte(s, '=')
if n < 0 {
return nil, fmt.Errorf("cannot find `=` char for label value at %s", s)
}
name := s[:n]
s = s[n+1:]
// Parse label value
qs, err := strconv.QuotedPrefix(s)
if err != nil {
return nil, fmt.Errorf("cannot parse value for label %q at %s: %w", name, s, err)
}
s = s[len(qs):]
value, err := strconv.Unquote(qs)
if err != nil {
return nil, fmt.Errorf("cannot unquote value %q for label %q: %w", qs, name, err)
}
// Append the found field to dst.
dst = append(dst, logstorage.Field{
Name: name,
Value: value,
})
// Check whether there are other labels remaining
if len(s) == 0 {
break
}
if !strings.HasPrefix(s, ",") {
return nil, fmt.Errorf("missing `,` char at %s", s)
}
s = s[1:]
s = strings.TrimPrefix(s, " ")
}
return dst, nil
}
func getPushRequest() *PushRequest {
v := pushReqsPool.Get()
if v == nil {
return &PushRequest{}
}
return v.(*PushRequest)
}
func putPushRequest(req *PushRequest) {
req.Reset()
pushReqsPool.Put(req)
}

View File

@@ -1,171 +0,0 @@
package loki
import (
"fmt"
"strings"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/golang/snappy"
)
func TestParseProtobufRequestSuccess(t *testing.T) {
f := func(s string, resultExpected string) {
t.Helper()
var pr PushRequest
n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
msg := ""
for _, f := range fields {
if f.Name == "_msg" {
msg = f.Value
}
}
var a []string
for _, f := range fields {
if f.Name == "_msg" {
continue
}
item := fmt.Sprintf("%s=%q", f.Name, f.Value)
a = append(a, item)
}
labels := "{" + strings.Join(a, ", ") + "}"
pr.Streams = append(pr.Streams, Stream{
Labels: labels,
Entries: []Entry{
{
Timestamp: time.Unix(0, timestamp),
Line: msg,
},
},
})
})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if n != len(pr.Streams) {
t.Fatalf("unexpected number of streams; got %d; want %d", len(pr.Streams), n)
}
data, err := pr.Marshal()
if err != nil {
t.Fatalf("unexpected error when marshaling PushRequest: %s", err)
}
encodedData := snappy.Encode(nil, data)
var lines []string
n, err = parseProtobufRequest(encodedData, func(timestamp int64, fields []logstorage.Field) {
var a []string
for _, f := range fields {
a = append(a, f.String())
}
line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " "))
lines = append(lines, line)
})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if n != len(lines) {
t.Fatalf("unexpected number of lines parsed; got %d; want %d", n, len(lines))
}
result := strings.Join(lines, "\n")
if result != resultExpected {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
}
}
// Empty streams
f(`{"streams":[]}`, ``)
f(`{"streams":[{"values":[]}]}`, ``)
f(`{"streams":[{"stream":{},"values":[]}]}`, ``)
f(`{"streams":[{"stream":{"foo":"bar"},"values":[]}]}`, ``)
// Empty stream labels
f(`{"streams":[{"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`)
f(`{"streams":[{"stream":{},"values":[["1577836800000000001", "foo bar"]]}]}`, `_time:1577836800000000001 "_msg":"foo bar"`)
// Non-empty stream labels
f(`{"streams":[{"stream":{
"label1": "value1",
"label2": "value2"
},"values":[
["1577836800000000001", "foo bar"],
["1477836900005000002", "abc"],
["147.78369e9", "foobar"]
]}]}`, `_time:1577836800000000001 "label1":"value1" "label2":"value2" "_msg":"foo bar"
_time:1477836900005000002 "label1":"value1" "label2":"value2" "_msg":"abc"
_time:147783690000 "label1":"value1" "label2":"value2" "_msg":"foobar"`)
// Multiple streams
f(`{
"streams": [
{
"stream": {
"foo": "bar",
"a": "b"
},
"values": [
["1577836800000000001", "foo bar"],
["1577836900005000002", "abc"]
]
},
{
"stream": {
"x": "y"
},
"values": [
["1877836900005000002", "yx"]
]
}
]
}`, `_time:1577836800000000001 "foo":"bar" "a":"b" "_msg":"foo bar"
_time:1577836900005000002 "foo":"bar" "a":"b" "_msg":"abc"
_time:1877836900005000002 "x":"y" "_msg":"yx"`)
}
func TestParsePromLabelsSuccess(t *testing.T) {
f := func(s string) {
t.Helper()
fields, err := parsePromLabels(nil, s)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
var a []string
for _, f := range fields {
a = append(a, fmt.Sprintf("%s=%q", f.Name, f.Value))
}
result := "{" + strings.Join(a, ", ") + "}"
if result != s {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, s)
}
}
f("{}")
f(`{foo="bar"}`)
f(`{foo="bar", baz="x", y="z"}`)
f(`{foo="ba\"r\\z\n", a="", b="\"\\"}`)
}
func TestParsePromLabelsFailure(t *testing.T) {
f := func(s string) {
t.Helper()
fields, err := parsePromLabels(nil, s)
if err == nil {
t.Fatalf("expecting non-nil error")
}
if len(fields) > 0 {
t.Fatalf("unexpected non-empty fields: %s", fields)
}
}
f("")
f("{")
f(`{foo}`)
f(`{foo=bar}`)
f(`{foo="bar}`)
f(`{foo="ba\",r}`)
f(`{foo="bar" baz="aa"}`)
f(`foobar`)
f(`foo{bar="baz"}`)
}

View File

@@ -1,65 +0,0 @@
package loki
import (
"fmt"
"strconv"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/golang/snappy"
)
func BenchmarkParseProtobufRequest(b *testing.B) {
for _, streams := range []int{5, 10} {
for _, rows := range []int{100, 1000} {
for _, labels := range []int{10, 50} {
b.Run(fmt.Sprintf("streams_%d/rows_%d/labels_%d", streams, rows, labels), func(b *testing.B) {
benchmarkParseProtobufRequest(b, streams, rows, labels)
})
}
}
}
}
func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) {
b.ReportAllocs()
b.SetBytes(int64(streams * rows))
b.RunParallel(func(pb *testing.PB) {
body := getProtobufBody(streams, rows, labels)
for pb.Next() {
_, err := parseProtobufRequest(body, func(timestamp int64, fields []logstorage.Field) {})
if err != nil {
panic(fmt.Errorf("unexpected error: %s", err))
}
}
})
}
func getProtobufBody(streams, rows, labels int) []byte {
var pr PushRequest
for i := 0; i < streams; i++ {
var st Stream
st.Labels = `{`
for j := 0; j < labels; j++ {
st.Labels += `label_` + strconv.Itoa(j) + `="value_` + strconv.Itoa(j) + `"`
if j < labels-1 {
st.Labels += `,`
}
}
st.Labels += `}`
for j := 0; j < rows; j++ {
st.Entries = append(st.Entries, Entry{Timestamp: time.Now(), Line: "value_" + strconv.Itoa(j)})
}
pr.Streams = append(pr.Streams, st)
}
body, _ := pr.Marshal()
encodedBody := snappy.Encode(nil, body)
return encodedBody
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,38 +0,0 @@
syntax = "proto3";
// source: https://raw.githubusercontent.com/grafana/loki/main/pkg/push/push.proto
// Licensed under the Apache License, Version 2.0 (the "License");
// https://github.com/grafana/loki/blob/main/pkg/push/LICENSE
package logproto;
import "gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";
option go_package = "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki";
message PushRequest {
repeated StreamAdapter streams = 1 [
(gogoproto.jsontag) = "streams",
(gogoproto.customtype) = "Stream"
];
}
message StreamAdapter {
string labels = 1 [(gogoproto.jsontag) = "labels"];
repeated EntryAdapter entries = 2 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "entries"
];
// hash contains the original hash of the stream.
uint64 hash = 3 [(gogoproto.jsontag) = "-"];
}
message EntryAdapter {
google.protobuf.Timestamp timestamp = 1 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "ts"
];
string line = 2 [(gogoproto.jsontag) = "line"];
}

View File

@@ -1,110 +0,0 @@
package loki
// source: https://raw.githubusercontent.com/grafana/loki/main/pkg/push/timestamp.go
// Licensed under the Apache License, Version 2.0 (the "License");
// https://github.com/grafana/loki/blob/main/pkg/push/LICENSE
import (
"errors"
"strconv"
"time"
"github.com/gogo/protobuf/types"
)
const (
// Seconds field of the earliest valid Timestamp.
// This is time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC).Unix().
minValidSeconds = -62135596800
// Seconds field just after the latest valid Timestamp.
// This is time.Date(10000, 1, 1, 0, 0, 0, 0, time.UTC).Unix().
maxValidSeconds = 253402300800
)
// validateTimestamp determines whether a Timestamp is valid.
// A valid timestamp represents a time in the range
// [0001-01-01, 10000-01-01) and has a Nanos field
// in the range [0, 1e9).
//
// If the Timestamp is valid, validateTimestamp returns nil.
// Otherwise, it returns an error that describes
// the problem.
//
// Every valid Timestamp can be represented by a time.Time, but the converse is not true.
func validateTimestamp(ts *types.Timestamp) error {
if ts == nil {
return errors.New("timestamp: nil Timestamp")
}
if ts.Seconds < minValidSeconds {
return errors.New("timestamp: " + formatTimestamp(ts) + " before 0001-01-01")
}
if ts.Seconds >= maxValidSeconds {
return errors.New("timestamp: " + formatTimestamp(ts) + " after 10000-01-01")
}
if ts.Nanos < 0 || ts.Nanos >= 1e9 {
return errors.New("timestamp: " + formatTimestamp(ts) + ": nanos not in range [0, 1e9)")
}
return nil
}
// formatTimestamp is equivalent to fmt.Sprintf("%#v", ts)
// but avoids the escape incurred by using fmt.Sprintf, eliminating
// unnecessary heap allocations.
func formatTimestamp(ts *types.Timestamp) string {
if ts == nil {
return "nil"
}
seconds := strconv.FormatInt(ts.Seconds, 10)
nanos := strconv.FormatInt(int64(ts.Nanos), 10)
return "&types.Timestamp{Seconds: " + seconds + ",\nNanos: " + nanos + ",\n}"
}
func sizeOfStdTime(t time.Time) int {
ts, err := timestampProto(t)
if err != nil {
return 0
}
return ts.Size()
}
func stdTimeMarshalTo(t time.Time, data []byte) (int, error) {
ts, err := timestampProto(t)
if err != nil {
return 0, err
}
return ts.MarshalTo(data)
}
func stdTimeUnmarshal(t *time.Time, data []byte) error {
ts := &types.Timestamp{}
if err := ts.Unmarshal(data); err != nil {
return err
}
tt, err := timestampFromProto(ts)
if err != nil {
return err
}
*t = tt
return nil
}
func timestampFromProto(ts *types.Timestamp) (time.Time, error) {
// Don't return the zero value on error, because corresponds to a valid
// timestamp. Instead return whatever time.Unix gives us.
var t time.Time
if ts == nil {
t = time.Unix(0, 0).UTC() // treat nil like the empty Timestamp
} else {
t = time.Unix(ts.Seconds, int64(ts.Nanos)).UTC()
}
return t, validateTimestamp(ts)
}
func timestampProto(t time.Time) (types.Timestamp, error) {
ts := types.Timestamp{
Seconds: t.Unix(),
Nanos: int32(t.Nanosecond()),
}
return ts, validateTimestamp(&ts)
}

View File

@@ -1,481 +0,0 @@
package loki
// source: https://raw.githubusercontent.com/grafana/loki/main/pkg/push/types.go
// Licensed under the Apache License, Version 2.0 (the "License");
// https://github.com/grafana/loki/blob/main/pkg/push/LICENSE
import (
"fmt"
"io"
"time"
)
// Stream contains a unique labels set as a string and a set of entries for it.
// We are not using the proto generated version but this custom one so that we
// can improve serialization see benchmark.
type Stream struct {
Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"`
Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"`
Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"-"`
}
// Entry is a log entry with a timestamp.
type Entry struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
}
// Marshal implements the proto.Marshaler interface.
func (m *Stream) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
// MarshalTo marshals m to dst.
func (m *Stream) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
// MarshalToSizedBuffer marshals m to the sized buffer.
func (m *Stream) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Hash != 0 {
i = encodeVarintPush(dAtA, i, m.Hash)
i--
dAtA[i] = 0x18
}
if len(m.Entries) > 0 {
for iNdEx := len(m.Entries) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Entries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintPush(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if len(m.Labels) > 0 {
i -= len(m.Labels)
copy(dAtA[i:], m.Labels)
i = encodeVarintPush(dAtA, i, uint64(len(m.Labels)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
// Marshal implements the proto.Marshaler interface.
func (m *Entry) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
// MarshalTo marshals m to dst.
func (m *Entry) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
// MarshalToSizedBuffer marshals m to the sized buffer.
func (m *Entry) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Line) > 0 {
i -= len(m.Line)
copy(dAtA[i:], m.Line)
i = encodeVarintPush(dAtA, i, uint64(len(m.Line)))
i--
dAtA[i] = 0x12
}
n7, err7 := stdTimeMarshalTo(m.Timestamp, dAtA[i-sizeOfStdTime(m.Timestamp):])
if err7 != nil {
return 0, err7
}
i -= n7
i = encodeVarintPush(dAtA, i, uint64(n7))
i--
dAtA[i] = 0xa
return len(dAtA) - i, nil
}
// Unmarshal unmarshals the given data into m.
func (m *Stream) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: StreamAdapter: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: StreamAdapter: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Labels = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Entries = append(m.Entries, Entry{})
if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType)
}
m.Hash = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Hash |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipPush(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPush
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthPush
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
// Unmarshal unmarshals the given data into m.
func (m *Entry) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: EntryAdapter: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: EntryAdapter: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := stdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Line = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPush(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPush
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthPush
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
// Size returns the size of the serialized Stream.
func (m *Stream) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Labels)
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
if len(m.Entries) > 0 {
for _, e := range m.Entries {
l = e.Size()
n += 1 + l + sovPush(uint64(l))
}
}
if m.Hash != 0 {
n += 1 + sovPush(m.Hash)
}
return n
}
// Size returns the size of the serialized Entry
func (m *Entry) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = sizeOfStdTime(m.Timestamp)
n += 1 + l + sovPush(uint64(l))
l = len(m.Line)
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
return n
}
// Equal returns true if the two Streams are equal.
func (m *Stream) Equal(that interface{}) bool {
if that == nil {
return m == nil
}
that1, ok := that.(*Stream)
if !ok {
that2, ok := that.(Stream)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return m == nil
} else if m == nil {
return false
}
if m.Labels != that1.Labels {
return false
}
if len(m.Entries) != len(that1.Entries) {
return false
}
for i := range m.Entries {
if !m.Entries[i].Equal(that1.Entries[i]) {
return false
}
}
return m.Hash == that1.Hash
}
// Equal returns true if the two Entries are equal.
func (m *Entry) Equal(that interface{}) bool {
if that == nil {
return m == nil
}
that1, ok := that.(*Entry)
if !ok {
that2, ok := that.(Entry)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return m == nil
} else if m == nil {
return false
}
if !m.Timestamp.Equal(that1.Timestamp) {
return false
}
if m.Line != that1.Line {
return false
}
return true
}

View File

@@ -1,43 +0,0 @@
package vlinsert
import (
"net/http"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki"
)
// Init initializes vlinsert
func Init() {
}
// Stop stops vlinsert
func Stop() {
}
// RequestHandler handles insert requests for VictoriaLogs
func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
path := r.URL.Path
if !strings.HasPrefix(path, "/insert/") {
// Skip requests, which do not start with /insert/, since these aren't our requests.
return false
}
path = strings.TrimPrefix(path, "/insert")
path = strings.ReplaceAll(path, "//", "/")
if path == "/jsonline" {
return jsonline.RequestHandler(w, r)
}
switch {
case strings.HasPrefix(path, "/elasticsearch/"):
path = strings.TrimPrefix(path, "/elasticsearch")
return elasticsearch.RequestHandler(path, w, r)
case strings.HasPrefix(path, "/loki/"):
path = strings.TrimPrefix(path, "/loki")
return loki.RequestHandler(path, w, r)
default:
return false
}
}

View File

@@ -1,56 +0,0 @@
package logsql
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
var (
maxSortBufferSize = flagutil.NewBytes("select.maxSortBufferSize", 1024*1024, "Query results from /select/logsql/query are automatically sorted by _time "+
"if their summary size doesn't exceed this value; otherwise, query results are streamed in the response without sorting; "+
"too big value for this flag may result in high memory usage since the sorting is performed in memory")
)
// ProcessQueryRequest handles /select/logsql/query request
func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan struct{}) {
// Extract tenantID
tenantID, err := logstorage.GetTenantIDFromRequest(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
qStr := r.FormValue("query")
q, err := logstorage.ParseQuery(qStr)
if err != nil {
httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err)
return
}
w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
sw := getSortWriter()
sw.Init(w, maxSortBufferSize.IntN())
tenantIDs := []logstorage.TenantID{tenantID}
vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) {
if len(columns) == 0 {
return
}
rowsCount := len(columns[0].Values)
bb := blockResultPool.Get()
for rowIdx := 0; rowIdx < rowsCount; rowIdx++ {
WriteJSONRow(bb, columns, rowIdx)
}
sw.MustWrite(bb.B)
blockResultPool.Put(bb)
})
sw.FinalFlush()
putSortWriter(sw)
}
var blockResultPool bytesutil.ByteBufferPool

View File

@@ -1,41 +0,0 @@
{% import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
) %}
{% stripspace %}
// JSONRow creates JSON row from the given fields.
{% func JSONRow(columns []logstorage.BlockColumn, rowIdx int) %}
{
{% code c := &columns[0] %}
{%q= c.Name %}:{%q= c.Values[rowIdx] %}
{% code columns = columns[1:] %}
{% for colIdx := range columns %}
{% code c := &columns[colIdx] %}
,{%q= c.Name %}:{%q= c.Values[rowIdx] %}
{% endfor %}
}{% newline %}
{% endfunc %}
// JSONRows prints formatted rows
{% func JSONRows(rows [][]logstorage.Field) %}
{% if len(rows) == 0 %}
{% return %}
{% endif %}
{% for _, fields := range rows %}
{
{% if len(fields) > 0 %}
{% code
f := fields[0]
fields = fields[1:]
%}
{%q= f.Name %}:{%q= f.Value %}
{% for _, f := range fields %}
,{%q= f.Name %}:{%q= f.Value %}
{% endfor %}
{% endif %}
}{% newline %}
{% endfor %}
{% endfunc %}
{% endstripspace %}

View File

@@ -1,166 +0,0 @@
// Code generated by qtc from "query_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vlselect/logsql/query_response.qtpl:1
package logsql
//line app/vlselect/logsql/query_response.qtpl:1
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
// JSONRow creates JSON row from the given fields.
//line app/vlselect/logsql/query_response.qtpl:8
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vlselect/logsql/query_response.qtpl:8
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vlselect/logsql/query_response.qtpl:8
func StreamJSONRow(qw422016 *qt422016.Writer, columns []logstorage.BlockColumn, rowIdx int) {
//line app/vlselect/logsql/query_response.qtpl:8
qw422016.N().S(`{`)
//line app/vlselect/logsql/query_response.qtpl:10
c := &columns[0]
//line app/vlselect/logsql/query_response.qtpl:11
qw422016.N().Q(c.Name)
//line app/vlselect/logsql/query_response.qtpl:11
qw422016.N().S(`:`)
//line app/vlselect/logsql/query_response.qtpl:11
qw422016.N().Q(c.Values[rowIdx])
//line app/vlselect/logsql/query_response.qtpl:12
columns = columns[1:]
//line app/vlselect/logsql/query_response.qtpl:13
for colIdx := range columns {
//line app/vlselect/logsql/query_response.qtpl:14
c := &columns[colIdx]
//line app/vlselect/logsql/query_response.qtpl:14
qw422016.N().S(`,`)
//line app/vlselect/logsql/query_response.qtpl:15
qw422016.N().Q(c.Name)
//line app/vlselect/logsql/query_response.qtpl:15
qw422016.N().S(`:`)
//line app/vlselect/logsql/query_response.qtpl:15
qw422016.N().Q(c.Values[rowIdx])
//line app/vlselect/logsql/query_response.qtpl:16
}
//line app/vlselect/logsql/query_response.qtpl:16
qw422016.N().S(`}`)
//line app/vlselect/logsql/query_response.qtpl:17
qw422016.N().S(`
`)
//line app/vlselect/logsql/query_response.qtpl:18
}
//line app/vlselect/logsql/query_response.qtpl:18
func WriteJSONRow(qq422016 qtio422016.Writer, columns []logstorage.BlockColumn, rowIdx int) {
//line app/vlselect/logsql/query_response.qtpl:18
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/query_response.qtpl:18
StreamJSONRow(qw422016, columns, rowIdx)
//line app/vlselect/logsql/query_response.qtpl:18
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/query_response.qtpl:18
}
//line app/vlselect/logsql/query_response.qtpl:18
func JSONRow(columns []logstorage.BlockColumn, rowIdx int) string {
//line app/vlselect/logsql/query_response.qtpl:18
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/query_response.qtpl:18
WriteJSONRow(qb422016, columns, rowIdx)
//line app/vlselect/logsql/query_response.qtpl:18
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/query_response.qtpl:18
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/query_response.qtpl:18
return qs422016
//line app/vlselect/logsql/query_response.qtpl:18
}
// JSONRows prints formatted rows
//line app/vlselect/logsql/query_response.qtpl:21
func StreamJSONRows(qw422016 *qt422016.Writer, rows [][]logstorage.Field) {
//line app/vlselect/logsql/query_response.qtpl:22
if len(rows) == 0 {
//line app/vlselect/logsql/query_response.qtpl:23
return
//line app/vlselect/logsql/query_response.qtpl:24
}
//line app/vlselect/logsql/query_response.qtpl:25
for _, fields := range rows {
//line app/vlselect/logsql/query_response.qtpl:25
qw422016.N().S(`{`)
//line app/vlselect/logsql/query_response.qtpl:27
if len(fields) > 0 {
//line app/vlselect/logsql/query_response.qtpl:29
f := fields[0]
fields = fields[1:]
//line app/vlselect/logsql/query_response.qtpl:32
qw422016.N().Q(f.Name)
//line app/vlselect/logsql/query_response.qtpl:32
qw422016.N().S(`:`)
//line app/vlselect/logsql/query_response.qtpl:32
qw422016.N().Q(f.Value)
//line app/vlselect/logsql/query_response.qtpl:33
for _, f := range fields {
//line app/vlselect/logsql/query_response.qtpl:33
qw422016.N().S(`,`)
//line app/vlselect/logsql/query_response.qtpl:34
qw422016.N().Q(f.Name)
//line app/vlselect/logsql/query_response.qtpl:34
qw422016.N().S(`:`)
//line app/vlselect/logsql/query_response.qtpl:34
qw422016.N().Q(f.Value)
//line app/vlselect/logsql/query_response.qtpl:35
}
//line app/vlselect/logsql/query_response.qtpl:36
}
//line app/vlselect/logsql/query_response.qtpl:36
qw422016.N().S(`}`)
//line app/vlselect/logsql/query_response.qtpl:37
qw422016.N().S(`
`)
//line app/vlselect/logsql/query_response.qtpl:38
}
//line app/vlselect/logsql/query_response.qtpl:39
}
//line app/vlselect/logsql/query_response.qtpl:39
func WriteJSONRows(qq422016 qtio422016.Writer, rows [][]logstorage.Field) {
//line app/vlselect/logsql/query_response.qtpl:39
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/query_response.qtpl:39
StreamJSONRows(qw422016, rows)
//line app/vlselect/logsql/query_response.qtpl:39
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/query_response.qtpl:39
}
//line app/vlselect/logsql/query_response.qtpl:39
func JSONRows(rows [][]logstorage.Field) string {
//line app/vlselect/logsql/query_response.qtpl:39
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/query_response.qtpl:39
WriteJSONRows(qb422016, rows)
//line app/vlselect/logsql/query_response.qtpl:39
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/query_response.qtpl:39
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/query_response.qtpl:39
return qs422016
//line app/vlselect/logsql/query_response.qtpl:39
}

View File

@@ -1,225 +0,0 @@
package logsql
import (
"bytes"
"io"
"sort"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
func getSortWriter() *sortWriter {
v := sortWriterPool.Get()
if v == nil {
return &sortWriter{}
}
return v.(*sortWriter)
}
func putSortWriter(sw *sortWriter) {
sw.reset()
sortWriterPool.Put(sw)
}
var sortWriterPool sync.Pool
// sortWriter expects JSON line stream to be written to it.
//
// It buffers the incoming data until its size reaches maxBufLen.
// Then it streams the buffered data and all the incoming data to w.
//
// The FinalFlush() must be called when all the data is written.
// If the buf isn't empty at FinalFlush() call, then the buffered data
// is sorted by _time field.
type sortWriter struct {
mu sync.Mutex
w io.Writer
maxBufLen int
buf []byte
bufFlushed bool
hasErr bool
}
func (sw *sortWriter) reset() {
sw.w = nil
sw.maxBufLen = 0
sw.buf = sw.buf[:0]
sw.bufFlushed = false
sw.hasErr = false
}
func (sw *sortWriter) Init(w io.Writer, maxBufLen int) {
sw.reset()
sw.w = w
sw.maxBufLen = maxBufLen
}
func (sw *sortWriter) MustWrite(p []byte) {
sw.mu.Lock()
defer sw.mu.Unlock()
if sw.hasErr {
return
}
if sw.bufFlushed {
if _, err := sw.w.Write(p); err != nil {
sw.hasErr = true
}
return
}
if len(sw.buf)+len(p) < sw.maxBufLen {
sw.buf = append(sw.buf, p...)
return
}
sw.bufFlushed = true
if len(sw.buf) > 0 {
if _, err := sw.w.Write(sw.buf); err != nil {
sw.hasErr = true
return
}
sw.buf = sw.buf[:0]
}
if _, err := sw.w.Write(p); err != nil {
sw.hasErr = true
}
}
func (sw *sortWriter) FinalFlush() {
if sw.hasErr || sw.bufFlushed {
return
}
rs := getRowsSorter()
rs.parseRows(sw.buf)
rs.sort()
WriteJSONRows(sw.w, rs.rows)
putRowsSorter(rs)
}
func getRowsSorter() *rowsSorter {
v := rowsSorterPool.Get()
if v == nil {
return &rowsSorter{}
}
return v.(*rowsSorter)
}
func putRowsSorter(rs *rowsSorter) {
rs.reset()
rowsSorterPool.Put(rs)
}
var rowsSorterPool sync.Pool
type rowsSorter struct {
buf []byte
fieldsBuf []logstorage.Field
rows [][]logstorage.Field
times []string
}
func (rs *rowsSorter) reset() {
rs.buf = rs.buf[:0]
fieldsBuf := rs.fieldsBuf
for i := range fieldsBuf {
fieldsBuf[i].Reset()
}
rs.fieldsBuf = fieldsBuf[:0]
rows := rs.rows
for i := range rows {
rows[i] = nil
}
rs.rows = rows[:0]
times := rs.times
for i := range times {
times[i] = ""
}
rs.times = times[:0]
}
func (rs *rowsSorter) parseRows(src []byte) {
rs.reset()
buf := rs.buf
fieldsBuf := rs.fieldsBuf
rows := rs.rows
times := rs.times
p := logjson.GetParser()
for len(src) > 0 {
var line []byte
n := bytes.IndexByte(src, '\n')
if n < 0 {
line = src
src = nil
} else {
line = src[:n]
src = src[n+1:]
}
if len(line) == 0 {
continue
}
if err := p.ParseLogMessage(line); err != nil {
logger.Panicf("BUG: unexpected invalid JSON line: %s", err)
}
timeValue := ""
fieldsBufLen := len(fieldsBuf)
for _, f := range p.Fields {
bufLen := len(buf)
buf = append(buf, f.Name...)
name := bytesutil.ToUnsafeString(buf[bufLen:])
bufLen = len(buf)
buf = append(buf, f.Value...)
value := bytesutil.ToUnsafeString(buf[bufLen:])
fieldsBuf = append(fieldsBuf, logstorage.Field{
Name: name,
Value: value,
})
if name == "_time" {
timeValue = value
}
}
rows = append(rows, fieldsBuf[fieldsBufLen:])
times = append(times, timeValue)
}
logjson.PutParser(p)
rs.buf = buf
rs.fieldsBuf = fieldsBuf
rs.rows = rows
rs.times = times
}
func (rs *rowsSorter) Len() int {
return len(rs.rows)
}
func (rs *rowsSorter) Less(i, j int) bool {
times := rs.times
return times[i] < times[j]
}
func (rs *rowsSorter) Swap(i, j int) {
times := rs.times
rows := rs.rows
times[i], times[j] = times[j], times[i]
rows[i], rows[j] = rows[j], rows[i]
}
func (rs *rowsSorter) sort() {
sort.Sort(rs)
}

View File

@@ -1,39 +0,0 @@
package logsql
import (
"bytes"
"strings"
"testing"
)
func TestSortWriter(t *testing.T) {
f := func(maxBufLen int, data string, expectedResult string) {
t.Helper()
var bb bytes.Buffer
sw := getSortWriter()
sw.Init(&bb, maxBufLen)
for _, s := range strings.Split(data, "\n") {
sw.MustWrite([]byte(s + "\n"))
}
sw.FinalFlush()
putSortWriter(sw)
result := bb.String()
if result != expectedResult {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, expectedResult)
}
}
f(100, "", "")
f(100, "{}", "{}\n")
data := `{"_time":"def","_msg":"xxx"}
{"_time":"abc","_msg":"foo"}`
resultExpected := `{"_time":"abc","_msg":"foo"}
{"_time":"def","_msg":"xxx"}
`
f(100, data, resultExpected)
f(10, data, data+"\n")
}

View File

@@ -1,162 +0,0 @@
package vlselect
import (
"embed"
"flag"
"fmt"
"net/http"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlselect/logsql"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
)
var (
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+
"It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. "+
"See also -search.maxQueueDuration")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the search request waits for execution when -search.maxConcurrentRequests "+
"limit is reached; see also -search.maxQueryDuration")
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for query execution")
)
func getDefaultMaxConcurrentRequests() int {
n := cgroup.AvailableCPUs()
if n <= 4 {
n *= 2
}
if n > 16 {
// A single request can saturate all the CPU cores, so there is no sense
// in allowing higher number of concurrent requests - they will just contend
// for unavailable CPU time.
n = 16
}
return n
}
// Init initializes vlselect
func Init() {
concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)
}
// Stop stops vlselect
func Stop() {
}
var concurrencyLimitCh chan struct{}
var (
concurrencyLimitReached = metrics.NewCounter(`vl_concurrent_select_limit_reached_total`)
concurrencyLimitTimeout = metrics.NewCounter(`vl_concurrent_select_limit_timeout_total`)
_ = metrics.NewGauge(`vl_concurrent_select_capacity`, func() float64 {
return float64(cap(concurrencyLimitCh))
})
_ = metrics.NewGauge(`vl_concurrent_select_current`, func() float64 {
return float64(len(concurrencyLimitCh))
})
)
//go:embed vmui
var vmuiFiles embed.FS
var vmuiFileServer = http.FileServer(http.FS(vmuiFiles))
// RequestHandler handles select requests for VictoriaLogs
func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
path := r.URL.Path
if !strings.HasPrefix(path, "/select/") {
// Skip requests, which do not start with /select/, since these aren't our requests.
return false
}
path = strings.TrimPrefix(path, "/select")
path = strings.ReplaceAll(path, "//", "/")
if path == "/vmui" {
// VMUI access via incomplete url without `/` in the end. Redirect to complete url.
// Use relative redirect, since the hostname and path prefix may be incorrect if VictoriaMetrics
// is hidden behind vmauth or similar proxy.
_ = r.ParseForm()
newURL := "vmui/?" + r.Form.Encode()
httpserver.Redirect(w, newURL)
return true
}
if strings.HasPrefix(path, "/vmui/") {
r.URL.Path = path
vmuiFileServer.ServeHTTP(w, r)
return true
}
// Limit the number of concurrent queries, which can consume big amounts of CPU.
startTime := time.Now()
stopCh := r.Context().Done()
select {
case concurrencyLimitCh <- struct{}{}:
defer func() { <-concurrencyLimitCh }()
default:
// Sleep for a while until giving up. This should resolve short bursts in requests.
concurrencyLimitReached.Inc()
d := getMaxQueryDuration(r)
if d > *maxQueueDuration {
d = *maxQueueDuration
}
t := timerpool.Get(d)
select {
case concurrencyLimitCh <- struct{}{}:
timerpool.Put(t)
defer func() { <-concurrencyLimitCh }()
case <-stopCh:
timerpool.Put(t)
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
logger.Infof("client has cancelled the request after %.3f seconds: remoteAddr=%s, requestURI: %q",
time.Since(startTime).Seconds(), remoteAddr, requestURI)
return true
case <-t.C:
timerpool.Put(t)
concurrencyLimitTimeout.Inc()
err := &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -search.maxConcurrentRequests=%d concurrent requests "+
"are executed. Possible solutions: to reduce query load; to add more compute resources to the server; "+
"to increase -search.maxQueueDuration=%s; to increase -search.maxQueryDuration; to increase -search.maxConcurrentRequests",
d.Seconds(), *maxConcurrentRequests, maxQueueDuration),
StatusCode: http.StatusServiceUnavailable,
}
httpserver.Errorf(w, r, "%s", err)
return true
}
}
switch {
case path == "/logsql/query":
logsqlQueryRequests.Inc()
httpserver.EnableCORS(w, r)
logsql.ProcessQueryRequest(w, r, stopCh)
return true
default:
return false
}
}
// getMaxQueryDuration returns the maximum duration for query from r.
func getMaxQueryDuration(r *http.Request) time.Duration {
dms, err := httputils.GetDuration(r, "timeout", 0)
if err != nil {
dms = 0
}
d := time.Duration(dms) * time.Millisecond
if d <= 0 || d > *maxQueryDuration {
d = *maxQueryDuration
}
return d
}
var (
logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`)
)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 14 KiB

View File

@@ -1,14 +0,0 @@
{
"files": {
"main.css": "./static/css/main.5f461a27.css",
"main.js": "./static/js/main.7566144a.js",
"static/js/522.b5ae4365.chunk.js": "./static/js/522.b5ae4365.chunk.js",
"static/media/Lato-Regular.ttf": "./static/media/Lato-Regular.d714fec1633b69a9c2e9.ttf",
"static/media/Lato-Bold.ttf": "./static/media/Lato-Bold.32360ba4b57802daa4d6.ttf",
"index.html": "./index.html"
},
"entrypoints": [
"static/css/main.5f461a27.css",
"static/js/main.7566144a.js"
]
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.6 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 15 KiB

View File

@@ -1 +0,0 @@
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="./favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=1,user-scalable=no"/><meta name="theme-color" content="#000000"/><meta name="description" content="UI for VictoriaMetrics"/><link rel="apple-touch-icon" href="./apple-touch-icon.png"/><link rel="icon" type="image/png" sizes="32x32" href="./favicon-32x32.png"><link rel="manifest" href="./manifest.json"/><title>VM UI</title><script src="./dashboards/index.js" type="module"></script><meta name="twitter:card" content="summary_large_image"><meta name="twitter:image" content="./preview.jpg"><meta name="twitter:title" content="UI for VictoriaMetrics"><meta name="twitter:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta name="twitter:site" content="@VictoriaMetrics"><meta property="og:title" content="Metric explorer for VictoriaMetrics"><meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta property="og:image" content="./preview.jpg"><meta property="og:type" content="website"><script defer="defer" src="./static/js/main.7566144a.js"></script><link href="./static/css/main.5f461a27.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>

View File

@@ -1,20 +0,0 @@
{
"short_name": "Victoria Metrics UI",
"name": "Victoria Metrics UI is a metric explorer for Victoria Metrics",
"icons": [
{
"src": "favicon-32x32.png",
"sizes": "32x32",
"type": "image/png"
},
{
"src": "apple-touch-icon.png",
"type": "image/png",
"sizes": "192x192"
}
],
"start_url": ".",
"display": "standalone",
"theme_color": "#000000",
"background_color": "#ffffff"
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 67 KiB

View File

@@ -1,3 +0,0 @@
# https://www.robotstxt.org/robotstxt.html
User-agent: *
Disallow:

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,40 +0,0 @@
/*!
Copyright (c) 2018 Jed Watson.
Licensed under the MIT License (MIT), see
http://jedwatson.github.io/classnames
*/
/*! regenerator-runtime -- Copyright (c) 2014-present, Facebook, Inc. -- license (MIT): https://github.com/facebook/regenerator/blob/main/LICENSE */
/**
* @remix-run/router v1.7.2
*
* Copyright (c) Remix Software Inc.
*
* This source code is licensed under the MIT license found in the
* LICENSE.md file in the root directory of this source tree.
*
* @license MIT
*/
/**
* React Router DOM v6.14.2
*
* Copyright (c) Remix Software Inc.
*
* This source code is licensed under the MIT license found in the
* LICENSE.md file in the root directory of this source tree.
*
* @license MIT
*/
/**
* React Router v6.14.2
*
* Copyright (c) Remix Software Inc.
*
* This source code is licensed under the MIT license found in the
* LICENSE.md file in the root directory of this source tree.
*
* @license MIT
*/

View File

@@ -1,189 +0,0 @@
package vlstorage
import (
"flag"
"fmt"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/metrics"
)
var (
retentionPeriod = flagutil.NewDuration("retentionPeriod", "7d", "Log entries with timestamps older than now-retentionPeriod are automatically deleted; "+
"log entries with timestamps outside the retention are also rejected during data ingestion; the minimum supported retention is 1d (one day); "+
"see https://docs.victoriametrics.com/VictoriaLogs/#retention")
futureRetention = flagutil.NewDuration("futureRetention", "2d", "Log entries with timestamps bigger than now+futureRetention are rejected during data ingestion; "+
"see https://docs.victoriametrics.com/VictoriaLogs/#retention")
storageDataPath = flag.String("storageDataPath", "victoria-logs-data", "Path to directory with the VictoriaLogs data; "+
"see https://docs.victoriametrics.com/VictoriaLogs/#storage")
inmemoryDataFlushInterval = flag.Duration("inmemoryDataFlushInterval", 5*time.Second, "The interval for guaranteed saving of in-memory data to disk. "+
"The saved data survives unclean shutdowns such as OOM crash, hardware reset, SIGKILL, etc. "+
"Bigger intervals may help increase the lifetime of flash storage with limited write cycles (e.g. Raspberry PI). "+
"Smaller intervals increase disk IO load. Minimum supported value is 1s")
logNewStreams = flag.Bool("logNewStreams", false, "Whether to log creation of new streams; this can be useful for debugging of high cardinality issues with log streams; "+
"see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields ; see also -logIngestedRows")
logIngestedRows = flag.Bool("logIngestedRows", false, "Whether to log all the ingested log entries; this can be useful for debugging of data ingestion; "+
"see https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/ ; see also -logNewStreams")
)
// Init initializes vlstorage.
//
// Stop must be called when vlstorage is no longer needed
func Init() {
if strg != nil {
logger.Panicf("BUG: Init() has been already called")
}
if retentionPeriod.Msecs < 24*3600*1000 {
logger.Fatalf("-retentionPeriod cannot be smaller than a day; got %s", retentionPeriod)
}
cfg := &logstorage.StorageConfig{
Retention: time.Millisecond * time.Duration(retentionPeriod.Msecs),
FlushInterval: *inmemoryDataFlushInterval,
FutureRetention: time.Millisecond * time.Duration(futureRetention.Msecs),
LogNewStreams: *logNewStreams,
LogIngestedRows: *logIngestedRows,
}
logger.Infof("opening storage at -storageDataPath=%s", *storageDataPath)
startTime := time.Now()
strg = logstorage.MustOpenStorage(*storageDataPath, cfg)
var ss logstorage.StorageStats
strg.UpdateStats(&ss)
logger.Infof("successfully opened storage in %.3f seconds; partsCount: %d; blocksCount: %d; rowsCount: %d; sizeBytes: %d",
time.Since(startTime).Seconds(), ss.FileParts, ss.FileBlocks, ss.FileRowsCount, ss.CompressedFileSize)
storageMetrics = initStorageMetrics(strg)
metrics.RegisterSet(storageMetrics)
}
// Stop stops vlstorage.
func Stop() {
metrics.UnregisterSet(storageMetrics)
storageMetrics = nil
strg.MustClose()
strg = nil
}
var strg *logstorage.Storage
var storageMetrics *metrics.Set
// MustAddRows adds lr to vlstorage
func MustAddRows(lr *logstorage.LogRows) {
strg.MustAddRows(lr)
}
// RunQuery runs the given q and calls processBlock for the returned data blocks
func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(columns []logstorage.BlockColumn)) {
strg.RunQuery(tenantIDs, q, stopCh, processBlock)
}
func initStorageMetrics(strg *logstorage.Storage) *metrics.Set {
ssCache := &logstorage.StorageStats{}
var ssCacheLock sync.Mutex
var lastUpdateTime time.Time
m := func() *logstorage.StorageStats {
ssCacheLock.Lock()
defer ssCacheLock.Unlock()
if time.Since(lastUpdateTime) < time.Second {
return ssCache
}
var ss logstorage.StorageStats
strg.UpdateStats(&ss)
ssCache = &ss
lastUpdateTime = time.Now()
return ssCache
}
ms := metrics.NewSet()
ms.NewGauge(fmt.Sprintf(`vl_free_disk_space_bytes{path=%q}`, *storageDataPath), func() float64 {
return float64(fs.MustGetFreeSpace(*storageDataPath))
})
ms.NewGauge(`vl_active_merges{type="inmemory"}`, func() float64 {
return float64(m().InmemoryActiveMerges)
})
ms.NewGauge(`vl_merges_total{type="inmemory"}`, func() float64 {
return float64(m().InmemoryMergesTotal)
})
ms.NewGauge(`vl_active_merges{type="file"}`, func() float64 {
return float64(m().FileActiveMerges)
})
ms.NewGauge(`vl_merges_total{type="file"}`, func() float64 {
return float64(m().FileMergesTotal)
})
ms.NewGauge(`vl_storage_rows{type="inmemory"}`, func() float64 {
return float64(m().InmemoryRowsCount)
})
ms.NewGauge(`vl_storage_rows{type="file"}`, func() float64 {
return float64(m().FileRowsCount)
})
ms.NewGauge(`vl_storage_parts{type="inmemory"}`, func() float64 {
return float64(m().InmemoryParts)
})
ms.NewGauge(`vl_storage_parts{type="file"}`, func() float64 {
return float64(m().FileParts)
})
ms.NewGauge(`vl_storage_blocks{type="inmemory"}`, func() float64 {
return float64(m().InmemoryBlocks)
})
ms.NewGauge(`vl_storage_blocks{type="file"}`, func() float64 {
return float64(m().FileBlocks)
})
ms.NewGauge(`vl_partitions`, func() float64 {
return float64(m().PartitionsCount)
})
ms.NewGauge(`vl_streams_created_total`, func() float64 {
return float64(m().StreamsCreatedTotal)
})
ms.NewGauge(`vl_indexdb_rows`, func() float64 {
return float64(m().IndexdbItemsCount)
})
ms.NewGauge(`vl_indexdb_parts`, func() float64 {
return float64(m().IndexdbPartsCount)
})
ms.NewGauge(`vl_indexdb_blocks`, func() float64 {
return float64(m().IndexdbBlocksCount)
})
ms.NewGauge(`vl_data_size_bytes{type="indexdb"}`, func() float64 {
return float64(m().IndexdbSizeBytes)
})
ms.NewGauge(`vl_data_size_bytes{type="storage"}`, func() float64 {
dm := m()
return float64(dm.CompressedInmemorySize + dm.CompressedFileSize)
})
ms.NewGauge(`vl_compressed_data_size_bytes{type="inmemory"}`, func() float64 {
return float64(m().CompressedInmemorySize)
})
ms.NewGauge(`vl_compressed_data_size_bytes{type="file"}`, func() float64 {
return float64(m().CompressedFileSize)
})
ms.NewGauge(`vl_uncompressed_data_size_bytes{type="inmemory"}`, func() float64 {
return float64(m().UncompressedInmemorySize)
})
ms.NewGauge(`vl_uncompressed_data_size_bytes{type="file"}`, func() float64 {
return float64(m().UncompressedFileSize)
})
ms.NewGauge(`vl_rows_dropped_total{reason="too_big_timestamp"}`, func() float64 {
return float64(m().RowsDroppedTooBigTimestamp)
})
ms.NewGauge(`vl_rows_dropped_total{reason="too_small_timestamp"}`, func() float64 {
return float64(m().RowsDroppedTooSmallTimestamp)
})
return ms
}

View File

@@ -12,35 +12,20 @@ vmagent-prod:
vmagent-pure-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-pure
vmagent-linux-amd64-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-linux-amd64
vmagent-amd64-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-amd64
vmagent-linux-arm-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-linux-arm
vmagent-arm-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-arm
vmagent-linux-arm64-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-linux-arm64
vmagent-arm64-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-arm64
vmagent-linux-ppc64le-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-linux-ppc64le
vmagent-ppc64le-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-ppc64le
vmagent-linux-386-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-linux-386
vmagent-darwin-amd64-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-darwin-amd64
vmagent-darwin-arm64-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-darwin-arm64
vmagent-freebsd-amd64-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-freebsd-amd64
vmagent-openbsd-amd64-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-openbsd-amd64
vmagent-windows-amd64-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-windows-amd64
vmagent-386-prod:
APP_NAME=vmagent $(MAKE) app-via-docker-386
package-vmagent:
APP_NAME=vmagent $(MAKE) package-via-docker
@@ -73,38 +58,23 @@ run-vmagent:
APP_NAME=vmagent \
$(MAKE) run-via-docker
vmagent-linux-amd64:
APP_NAME=vmagent CGO_ENABLED=1 GOOS=linux GOARCH=amd64 $(MAKE) app-local-goos-goarch
vmagent-amd64:
CGO_ENABLED=1 GOARCH=amd64 $(MAKE) vmagent-local-with-goarch
vmagent-linux-arm:
APP_NAME=vmagent CGO_ENABLED=0 GOOS=linux GOARCH=arm $(MAKE) app-local-goos-goarch
vmagent-arm:
CGO_ENABLED=0 GOARCH=arm $(MAKE) vmagent-local-with-goarch
vmagent-linux-arm64:
APP_NAME=vmagent CGO_ENABLED=0 GOOS=linux GOARCH=arm64 $(MAKE) app-local-goos-goarch
vmagent-arm64:
CGO_ENABLED=0 GOARCH=arm64 $(MAKE) vmagent-local-with-goarch
vmagent-linux-ppc64le:
APP_NAME=vmagent CGO_ENABLED=0 GOOS=linux GOARCH=ppc64le $(MAKE) app-local-goos-goarch
vmagent-ppc64le:
CGO_ENABLED=0 GOARCH=ppc64le $(MAKE) vmagent-local-with-goarch
vmagent-linux-s390x:
APP_NAME=vmagent CGO_ENABLED=0 GOOS=linux GOARCH=s390x $(MAKE) app-local-goos-goarch
vmagent-386:
CGO_ENABLED=0 GOARCH=386 $(MAKE) vmagent-local-with-goarch
vmagent-linux-386:
APP_NAME=vmagent CGO_ENABLED=0 GOOS=linux GOARCH=386 $(MAKE) app-local-goos-goarch
vmagent-darwin-amd64:
APP_NAME=vmagent CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 $(MAKE) app-local-goos-goarch
vmagent-darwin-arm64:
APP_NAME=vmagent CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 $(MAKE) app-local-goos-goarch
vmagent-freebsd-amd64:
APP_NAME=vmagent CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 $(MAKE) app-local-goos-goarch
vmagent-openbsd-amd64:
APP_NAME=vmagent CGO_ENABLED=0 GOOS=openbsd GOARCH=amd64 $(MAKE) app-local-goos-goarch
vmagent-windows-amd64:
GOARCH=amd64 APP_NAME=vmagent $(MAKE) app-local-windows-goarch
vmagent-local-with-goarch:
APP_NAME=vmagent $(MAKE) app-local-with-goarch
vmagent-pure:
APP_NAME=vmagent $(MAKE) app-local-pure

File diff suppressed because it is too large Load Diff

View File

@@ -5,33 +5,32 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="csvimport"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="csvimport"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="csvimport"}`)
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="csvimport"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="csvimport"}`)
)
// InsertHandler processes csv data from req.
func InsertHandler(at *auth.Token, req *http.Request) error {
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return stream.Parse(req, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
})
}
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@@ -65,11 +64,8 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(at, &ctx.WriteRequest)
remotewrite.Push(&ctx.WriteRequest)
rowsInserted.Add(len(rows))
if at != nil {
rowsTenantInserted.Get(at).Add(len(rows))
}
rowsPerInsert.Update(float64(len(rows)))
return nil
}

View File

@@ -1,98 +0,0 @@
package datadog
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="datadog"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="datadog"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="datadog"}`)
)
// InsertHandlerForHTTP processes remote write for DataDog POST /api/v1/series request.
//
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
ce := req.Header.Get("Content-Encoding")
return stream.Parse(req.Body, ce, func(series []parser.Series) error {
return insertRows(at, series, extraLabels)
})
}
func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
rowsTotal := 0
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
for i := range series {
ss := &series[i]
rowsTotal += len(ss.Points)
labelsLen := len(labels)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: ss.Metric,
})
if ss.Host != "" {
labels = append(labels, prompbmarshal.Label{
Name: "host",
Value: ss.Host,
})
}
if ss.Device != "" {
labels = append(labels, prompbmarshal.Label{
Name: "device",
Value: ss.Device,
})
}
for _, tag := range ss.Tags {
name, value := parser.SplitTag(tag)
if name == "host" {
name = "exported_host"
}
labels = append(labels, prompbmarshal.Label{
Name: name,
Value: value,
})
}
labels = append(labels, extraLabels...)
samplesLen := len(samples)
for _, pt := range ss.Points {
samples = append(samples, prompbmarshal.Sample{
Timestamp: pt.Timestamp(),
Value: pt.Value(),
})
}
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(at, &ctx.WriteRequest)
rowsInserted.Add(rowsTotal)
if at != nil {
rowsTenantInserted.Get(at).Add(rowsTotal)
}
rowsPerInsert.Update(float64(rowsTotal))
return nil
}

View File

@@ -7,7 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@@ -20,7 +20,9 @@ var (
//
// See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol
func InsertHandler(r io.Reader) error {
return stream.Parse(r, insertRows)
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, insertRows)
})
}
func insertRows(rows []parser.Row) error {
@@ -56,7 +58,7 @@ func insertRows(rows []parser.Row) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(nil, &ctx.WriteRequest)
remotewrite.Push(&ctx.WriteRequest)
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return nil

View File

@@ -8,59 +8,59 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
measurementFieldSeparator = flag.String("influxMeasurementFieldSeparator", "_", "Separator for '{measurement}{separator}{field_name}' metric name when inserted via InfluxDB line protocol")
skipSingleField = flag.Bool("influxSkipSingleField", false, "Uses '{measurement}' instead of '{measurement}{separator}{field_name}' for metric name if InfluxDB line contains only a single field")
measurementFieldSeparator = flag.String("influxMeasurementFieldSeparator", "_", "Separator for '{measurement}{separator}{field_name}' metric name when inserted via Influx line protocol")
skipSingleField = flag.Bool("influxSkipSingleField", false, "Uses '{measurement}' instead of '{measurement}{separator}{field_name}' for metic name if Influx line contains only a single field")
skipMeasurement = flag.Bool("influxSkipMeasurement", false, "Uses '{field_name}' as a metric name while ignoring '{measurement}' and '-influxMeasurementFieldSeparator'")
dbLabel = flag.String("influxDBLabel", "db", "Default label for the DB name sent over '?db={db_name}' query parameter")
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="influx"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="influx"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="influx"}`)
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="influx"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="influx"}`)
)
// InsertHandlerForReader processes remote write for influx line protocol.
//
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
return stream.Parse(r, isGzipped, "", "", func(db string, rows []parser.Row) error {
return insertRows(nil, db, rows, nil)
func InsertHandlerForReader(r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error {
return insertRows(db, rows, nil)
})
})
}
// InsertHandlerForHTTP processes remote write for influx line protocol.
//
// See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
func InsertHandlerForHTTP(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
q := req.URL.Query()
precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db")
return stream.Parse(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(at, db, rows, extraLabels)
return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
q := req.URL.Query()
precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db")
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(db, rows, extraLabels)
})
})
}
func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := getPushCtx()
defer putPushCtx(ctx)
@@ -77,7 +77,7 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom
hasDBKey := false
for j := range r.Tags {
tag := &r.Tags[j]
if tag.Key == *dbLabel {
if tag.Key == "db" {
hasDBKey = true
}
commonLabels = append(commonLabels, prompbmarshal.Label{
@@ -87,7 +87,7 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom
}
if len(db) > 0 && !hasDBKey {
commonLabels = append(commonLabels, prompbmarshal.Label{
Name: *dbLabel,
Name: "db",
Value: db,
})
}
@@ -96,8 +96,7 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom
if !*skipMeasurement {
ctx.metricGroupBuf = append(ctx.metricGroupBuf, r.Measurement...)
}
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1139
skipFieldKey := len(r.Measurement) > 0 && len(r.Fields) == 1 && *skipSingleField
skipFieldKey := len(r.Fields) == 1 && *skipSingleField
if len(ctx.metricGroupBuf) > 0 && !skipFieldKey {
ctx.metricGroupBuf = append(ctx.metricGroupBuf, *measurementFieldSeparator...)
}
@@ -130,11 +129,8 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom
ctx.ctx.Labels = labels
ctx.ctx.Samples = samples
ctx.commonLabels = commonLabels
remotewrite.Push(at, &ctx.ctx.WriteRequest)
remotewrite.Push(&ctx.ctx.WriteRequest)
rowsInserted.Add(rowsTotal)
if at != nil {
rowsTenantInserted.Get(at).Add(rowsTotal)
}
rowsPerInsert.Update(float64(rowsTotal))
return nil

View File

@@ -1,10 +1,8 @@
package main
import (
"embed"
"flag"
"fmt"
"io"
"net/http"
"os"
"strings"
@@ -12,24 +10,19 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentelemetry"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/prometheusimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/influxutils"
graphiteserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/graphite"
influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx"
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
@@ -38,39 +31,24 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
httpListenAddr = flag.String("httpListenAddr", ":8429", "TCP address to listen for http connections. "+
"Set this flag to empty value in order to disable listening on any port. This mode may be useful for running multiple vmagent instances on the same server. "+
"Note that /targets and /metrics pages aren't available if -httpListenAddr=''. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt . "+
"With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing")
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+
"This flag isn't needed when ingesting data over HTTP - just send it to http://<vmagent>:8429/write . "+
"See also -influxListenAddr.useProxyProtocol")
influxUseProxyProtocol = flag.Bool("influxListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -influxListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty. "+
"See also -graphiteListenAddr.useProxyProtocol")
graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpenTSDB metrics. "+
"Note that /targets and /metrics pages aren't available if -httpListenAddr=''")
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for Influx line protocol data. Usually :8189 must be set. Doesn't work if empty. "+
"This flag isn't needed when ingesting data over HTTP - just send it to `http://<vmagent>:8429/write`")
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty")
opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
"Usually :4242 must be set. Doesn't work if empty. See also -opentsdbListenAddr.useProxyProtocol")
opentsdbUseProxyProtocol = flag.Bool("opentsdbListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -opentsdbListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpenTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty. "+
"See also -opentsdbHTTPListenAddr.useProxyProtocol")
opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
dryRun = flag.Bool("dryRun", false, "Whether to check config files without running vmagent. The following files are checked: "+
"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . "+
"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag")
"Usually :4242 must be set. Doesn't work if empty")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty")
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+
"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . "+
"Unknown config entries are allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse")
)
var (
@@ -80,12 +58,6 @@ var (
opentsdbhttpServer *opentsdbhttpserver.Server
)
var (
//go:embed static
staticFiles embed.FS
staticServer = http.FileServer(http.FS(staticFiles))
)
func main() {
// Write flags and help message to stdout, since it is easier to grep or pipe.
flag.CommandLine.SetOutput(os.Stdout)
@@ -94,26 +66,22 @@ func main() {
remotewrite.InitSecretFlags()
buildinfo.Init()
logger.Init()
pushmetrics.Init()
if promscrape.IsDryRun() {
if err := promscrape.CheckConfig(); err != nil {
logger.Fatalf("error when checking -promscrape.config: %s", err)
}
logger.Infof("-promscrape.config is ok; exiting with 0 status code")
logger.Infof("-promscrape.config is ok; exitting with 0 status code")
return
}
if *dryRun {
if err := promscrape.CheckConfig(); err != nil {
logger.Fatalf("error when checking -promscrape.config: %s", err)
}
if err := remotewrite.CheckRelabelConfigs(); err != nil {
logger.Fatalf("error when checking relabel configs: %s", err)
}
if err := remotewrite.CheckStreamAggrConfigs(); err != nil {
logger.Fatalf("error when checking -remoteWrite.streamAggr.config: %s", err)
if err := promscrape.CheckConfig(); err != nil {
logger.Fatalf("error when checking -promscrape.config: %s", err)
}
logger.Infof("all the configs are ok; exiting with 0 status code")
logger.Infof("all the configs are ok; exitting with 0 status code")
return
}
@@ -121,27 +89,24 @@ func main() {
startTime := time.Now()
remotewrite.Init()
common.StartUnmarshalWorkers()
writeconcurrencylimiter.Init()
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, func(r io.Reader) error {
return influx.InsertHandlerForReader(r, false)
})
influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader)
}
if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler)
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler)
}
if len(*opentsdbListenAddr) > 0 {
httpInsertHandler := getOpenTSDBHTTPInsertHandler()
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, *opentsdbUseProxyProtocol, opentsdb.InsertHandler, httpInsertHandler)
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, opentsdb.InsertHandler, opentsdbhttp.InsertHandler)
}
if len(*opentsdbHTTPListenAddr) > 0 {
httpInsertHandler := getOpenTSDBHTTPInsertHandler()
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, httpInsertHandler)
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, opentsdbhttp.InsertHandler)
}
promscrape.Init(remotewrite.Push)
if len(*httpListenAddr) > 0 {
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
go httpserver.Serve(*httpListenAddr, requestHandler)
}
logger.Infof("started vmagent in %.3f seconds", time.Since(startTime).Seconds())
@@ -177,232 +142,84 @@ func main() {
logger.Infof("successfully stopped vmagent in %.3f seconds", time.Since(startTime).Seconds())
}
func getOpenTSDBHTTPInsertHandler() func(req *http.Request) error {
if !remotewrite.MultitenancyEnabled() {
return func(req *http.Request) error {
path := strings.Replace(req.URL.Path, "//", "/", -1)
if path != "/api/put" {
return fmt.Errorf("unsupported path requested: %q; expecting '/api/put'", path)
}
return opentsdbhttp.InsertHandler(nil, req)
}
}
return func(req *http.Request) error {
path := strings.Replace(req.URL.Path, "//", "/", -1)
at, err := getAuthTokenFromPath(path)
if err != nil {
return fmt.Errorf("cannot obtain auth token from path %q: %w", path, err)
}
return opentsdbhttp.InsertHandler(at, req)
}
}
func getAuthTokenFromPath(path string) (*auth.Token, error) {
p, err := httpserver.ParsePath(path)
if err != nil {
return nil, fmt.Errorf("cannot parse multitenant path: %w", err)
}
if p.Prefix != "insert" {
return nil, fmt.Errorf(`unsupported multitenant prefix: %q; expected "insert"`, p.Prefix)
}
if p.Suffix != "opentsdb/api/put" {
return nil, fmt.Errorf("unsupported path requested: %q; expecting 'opentsdb/api/put'", p.Suffix)
}
return auth.NewToken(p.AuthToken)
}
func requestHandler(w http.ResponseWriter, r *http.Request) bool {
if r.URL.Path == "/" {
if r.Method != http.MethodGet {
return false
}
w.Header().Add("Content-Type", "text/html; charset=utf-8")
fmt.Fprintf(w, "<h2>vmagent</h2>")
fmt.Fprintf(w, "See docs at <a href='https://docs.victoriametrics.com/vmagent.html'>https://docs.victoriametrics.com/vmagent.html</a></br>")
fmt.Fprintf(w, "Useful endpoints:</br>")
httpserver.WriteAPIHelp(w, [][2]string{
{"targets", "status for discovered active targets"},
{"service-discovery", "labels before and after relabeling for discovered targets"},
{"metric-relabel-debug", "debug metric relabeling"},
{"api/v1/targets", "advanced information about discovered targets in JSON format"},
{"config", "-promscrape.config contents"},
{"metrics", "available service metrics"},
{"flags", "command-line flags"},
{"-/reload", "reload configuration"},
})
fmt.Fprintf(w, "vmagent - see docs at https://victoriametrics.github.io/vmagent.html")
return true
}
path := strings.Replace(r.URL.Path, "//", "/", -1)
if strings.HasPrefix(path, "/prometheus/api/v1/import/prometheus") || strings.HasPrefix(path, "/api/v1/import/prometheus") {
prometheusimportRequests.Inc()
if err := prometheusimport.InsertHandler(nil, r); err != nil {
prometheusimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
statusCode := http.StatusNoContent
if strings.HasPrefix(path, "/prometheus/api/v1/import/prometheus/metrics/job/") ||
strings.HasPrefix(path, "/api/v1/import/prometheus/metrics/job/") {
// Return 200 status code for pushgateway requests.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3636
statusCode = http.StatusOK
}
w.WriteHeader(statusCode)
return true
}
if strings.HasPrefix(path, "datadog/") {
// Trim suffix from paths starting from /datadog/ in order to support legacy DataDog agent.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2670
path = strings.TrimSuffix(path, "/")
}
switch path {
case "/prometheus/api/v1/write", "/api/v1/write":
if common.HandleVMProtoServerHandshake(w, r) {
return true
}
case "/api/v1/write":
prometheusWriteRequests.Inc()
if err := promremotewrite.InsertHandler(nil, r); err != nil {
if err := promremotewrite.InsertHandler(r); err != nil {
prometheusWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "/prometheus/api/v1/import", "/api/v1/import":
case "/api/v1/import":
vmimportRequests.Inc()
if err := vmimport.InsertHandler(nil, r); err != nil {
if err := vmimport.InsertHandler(r); err != nil {
vmimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "/prometheus/api/v1/import/csv", "/api/v1/import/csv":
case "/api/v1/import/csv":
csvimportRequests.Inc()
if err := csvimport.InsertHandler(nil, r); err != nil {
if err := csvimport.InsertHandler(r); err != nil {
csvimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "/prometheus/api/v1/import/native", "/api/v1/import/native":
case "/api/v1/import/prometheus":
prometheusimportRequests.Inc()
if err := prometheusimport.InsertHandler(r); err != nil {
prometheusimportErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "/api/v1/import/native":
nativeimportRequests.Inc()
if err := native.InsertHandler(nil, r); err != nil {
if err := native.InsertHandler(r); err != nil {
nativeimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "/influx/write", "/influx/api/v2/write", "/write", "/api/v2/write":
case "/write", "/api/v2/write":
influxWriteRequests.Inc()
if err := influx.InsertHandlerForHTTP(nil, r); err != nil {
if err := influx.InsertHandlerForHTTP(r); err != nil {
influxWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "/influx/query", "/query":
case "/query":
// Emulate fake response for influx query.
// This is required for TSBS benchmark.
influxQueryRequests.Inc()
influxutils.WriteDatabaseNames(w)
fmt.Fprintf(w, `{"results":[{"series":[{"values":[]}]}]}`)
return true
case "/opentelemetry/api/v1/push":
opentelemetryPushRequests.Inc()
if err := opentelemetry.InsertHandler(nil, r); err != nil {
opentelemetryPushErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusOK)
return true
case "/datadog/api/v1/series":
datadogWriteRequests.Inc()
if err := datadog.InsertHandlerForHTTP(nil, r); err != nil {
datadogWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/datadog/api/v1/validate":
datadogValidateRequests.Inc()
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{"valid":true}`)
return true
case "/datadog/api/v1/check_run":
datadogCheckRunRequests.Inc()
// See https://docs.datadoghq.com/api/latest/service-checks/#submit-a-service-check
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "/datadog/intake":
datadogIntakeRequests.Inc()
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{}`)
return true
case "/datadog/api/v1/metadata":
datadogMetadataRequests.Inc()
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{}`)
return true
case "/prometheus/targets", "/targets":
case "/targets":
promscrapeTargetsRequests.Inc()
promscrape.WriteHumanReadableTargetsStatus(w, r)
return true
case "/prometheus/service-discovery", "/service-discovery":
promscrapeServiceDiscoveryRequests.Inc()
promscrape.WriteServiceDiscovery(w, r)
return true
case "/prometheus/metric-relabel-debug", "/metric-relabel-debug":
promscrapeMetricRelabelDebugRequests.Inc()
promscrape.WriteMetricRelabelDebug(w, r)
return true
case "/prometheus/target-relabel-debug", "/target-relabel-debug":
promscrapeTargetRelabelDebugRequests.Inc()
promscrape.WriteTargetRelabelDebug(w, r)
return true
case "/prometheus/api/v1/targets", "/api/v1/targets":
case "/api/v1/targets":
promscrapeAPIV1TargetsRequests.Inc()
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Type", "application/json; charset=utf-8")
state := r.FormValue("state")
promscrape.WriteAPIV1Targets(w, state)
return true
case "/prometheus/target_response", "/target_response":
promscrapeTargetResponseRequests.Inc()
if err := promscrape.WriteTargetResponse(w, r); err != nil {
promscrapeTargetResponseErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
return true
case "/prometheus/config", "/config":
if !httpserver.CheckAuthFlag(w, r, *configAuthKey, "configAuthKey") {
return true
}
promscrapeConfigRequests.Inc()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
promscrape.WriteConfigData(w)
return true
case "/prometheus/api/v1/status/config", "/api/v1/status/config":
// See https://prometheus.io/docs/prometheus/latest/querying/api/#config
if !httpserver.CheckAuthFlag(w, r, *configAuthKey, "configAuthKey") {
return true
}
promscrapeStatusConfigRequests.Inc()
w.Header().Set("Content-Type", "application/json")
var bb bytesutil.ByteBuffer
promscrape.WriteConfigData(&bb)
fmt.Fprintf(w, `{"status":"success","data":{"yaml":%q}}`, bb.B)
return true
case "/prometheus/-/reload", "/-/reload":
case "/-/reload":
promscrapeConfigReloadRequests.Inc()
procutil.SelfSIGHUP()
w.WriteHeader(http.StatusOK)
@@ -417,145 +234,8 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
w.Write([]byte("OK"))
}
return true
default:
if strings.HasPrefix(r.URL.Path, "/static") {
staticServer.ServeHTTP(w, r)
return true
}
if remotewrite.MultitenancyEnabled() {
return processMultitenantRequest(w, r, path)
}
return false
}
}
func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path string) bool {
p, err := httpserver.ParsePath(path)
if err != nil {
// Cannot parse multitenant path. Skip it - probably it will be parsed later.
return false
}
if p.Prefix != "insert" {
httpserver.Errorf(w, r, `unsupported multitenant prefix: %q; expected "insert"`, p.Prefix)
return true
}
at, err := auth.NewToken(p.AuthToken)
if err != nil {
httpserver.Errorf(w, r, "cannot obtain auth token: %s", err)
return true
}
if strings.HasPrefix(p.Suffix, "prometheus/api/v1/import/prometheus") {
prometheusimportRequests.Inc()
if err := prometheusimport.InsertHandler(at, r); err != nil {
prometheusimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
}
if strings.HasPrefix(p.Suffix, "datadog/") {
// Trim suffix from paths starting from /datadog/ in order to support legacy DataDog agent.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2670
p.Suffix = strings.TrimSuffix(p.Suffix, "/")
}
switch p.Suffix {
case "prometheus/", "prometheus", "prometheus/api/v1/write":
prometheusWriteRequests.Inc()
if err := promremotewrite.InsertHandler(at, r); err != nil {
prometheusWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "prometheus/api/v1/import":
vmimportRequests.Inc()
if err := vmimport.InsertHandler(at, r); err != nil {
vmimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "prometheus/api/v1/import/csv":
csvimportRequests.Inc()
if err := csvimport.InsertHandler(at, r); err != nil {
csvimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "prometheus/api/v1/import/native":
nativeimportRequests.Inc()
if err := native.InsertHandler(at, r); err != nil {
nativeimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "influx/write", "influx/api/v2/write":
influxWriteRequests.Inc()
if err := influx.InsertHandlerForHTTP(at, r); err != nil {
influxWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "influx/query":
influxQueryRequests.Inc()
influxutils.WriteDatabaseNames(w)
return true
case "opentelemetry/api/v1/push":
opentelemetryPushRequests.Inc()
if err := opentelemetry.InsertHandler(at, r); err != nil {
opentelemetryPushErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusOK)
return true
case "datadog/api/v1/series":
datadogWriteRequests.Inc()
if err := datadog.InsertHandlerForHTTP(at, r); err != nil {
datadogWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "datadog/api/v1/validate":
datadogValidateRequests.Inc()
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{"valid":true}`)
return true
case "datadog/api/v1/check_run":
datadogCheckRunRequests.Inc()
// See https://docs.datadoghq.com/api/latest/service-checks/#submit-a-service-check
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(202)
fmt.Fprintf(w, `{"status":"ok"}`)
return true
case "datadog/intake":
datadogIntakeRequests.Inc()
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{}`)
return true
case "datadog/api/v1/metadata":
datadogMetadataRequests.Inc()
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{}`)
return true
default:
httpserver.Errorf(w, r, "unsupported multitenant path suffix: %q", p.Suffix)
return true
}
return false
}
var (
@@ -574,36 +254,14 @@ var (
nativeimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/native", protocol="nativeimport"}`)
nativeimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/native", protocol="nativeimport"}`)
influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/write", protocol="influx"}`)
influxWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/influx/write", protocol="influx"}`)
influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/write", protocol="influx"}`)
influxWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/write", protocol="influx"}`)
influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/influx/query", protocol="influx"}`)
datadogWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/datadog/api/v1/series", protocol="datadog"}`)
datadogValidateRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/validate", protocol="datadog"}`)
datadogCheckRunRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/check_run", protocol="datadog"}`)
datadogIntakeRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/intake", protocol="datadog"}`)
datadogMetadataRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/datadog/api/v1/metadata", protocol="datadog"}`)
opentelemetryPushRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
opentelemetryPushErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/opentelemetry/api/v1/push", protocol="opentelemetry"}`)
promscrapeTargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`)
promscrapeServiceDiscoveryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/service-discovery"}`)
promscrapeMetricRelabelDebugRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/metric-relabel-debug"}`)
promscrapeTargetRelabelDebugRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/target-relabel-debug"}`)
influxQueryRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/query", protocol="influx"}`)
promscrapeTargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/targets"}`)
promscrapeAPIV1TargetsRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/targets"}`)
promscrapeTargetResponseRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/target_response"}`)
promscrapeTargetResponseErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/target_response"}`)
promscrapeConfigRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/config"}`)
promscrapeStatusConfigRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/status/config"}`)
promscrapeConfigReloadRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/-/reload"}`)
)
@@ -611,7 +269,7 @@ func usage() {
const s = `
vmagent collects metrics data via popular data ingestion protocols and routes it to VictoriaMetrics.
See the docs at https://docs.victoriametrics.com/vmagent.html .
See the docs at https://victoriametrics.github.io/vmagent.html .
`
flagutil.Usage(s)
}

View File

@@ -1,12 +0,0 @@
# See https://medium.com/on-docker/use-multi-stage-builds-to-inject-ca-certs-ad1e8f01de1b
ARG certs_image
ARG root_image
FROM $certs_image as certs
RUN apk update && apk upgrade && apk --update --no-cache add ca-certificates
FROM $root_image
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
EXPOSE 8429
ENTRYPOINT ["/vmagent-prod"]
ARG TARGETARCH
COPY vmagent-linux-${TARGETARCH}-prod ./vmagent-prod

View File

@@ -5,37 +5,36 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="native"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="native"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="native"}`)
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="native"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="native"}`)
)
// InsertHandler processes `/api/v1/import` request.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
func InsertHandler(at *auth.Token, req *http.Request) error {
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
isGzip := req.Header.Get("Content-Encoding") == "gzip"
return stream.Parse(req.Body, isGzip, func(block *stream.Block) error {
return insertRows(at, block, extraLabels)
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(block *parser.Block) error {
return insertRows(block, extraLabels)
})
})
}
func insertRows(at *auth.Token, block *stream.Block, extraLabels []prompbmarshal.Label) error {
func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@@ -43,9 +42,6 @@ func insertRows(at *auth.Token, block *stream.Block, extraLabels []prompbmarshal
// since relabeling can prevent from inserting the rows.
rowsLen := len(block.Values)
rowsInserted.Add(rowsLen)
if at != nil {
rowsTenantInserted.Get(at).Add(rowsLen)
}
rowsPerInsert.Update(float64(rowsLen))
tssDst := ctx.WriteRequest.Timeseries[:0]
@@ -84,6 +80,6 @@ func insertRows(at *auth.Token, block *stream.Block, extraLabels []prompbmarshal
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(at, &ctx.WriteRequest)
remotewrite.Push(&ctx.WriteRequest)
return nil
}

View File

@@ -1,65 +0,0 @@
package opentelemetry
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="opentelemetry"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="opentelemetry"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="opentelemetry"}`)
)
// InsertHandler processes opentelemetry metrics.
func InsertHandler(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return stream.ParseStream(req.Body, isGzipped, func(tss []prompbmarshal.TimeSeries) error {
return insertRows(at, tss, extraLabels)
})
}
func insertRows(at *auth.Token, tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
rowsTotal := 0
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
for i := range tss {
ts := &tss[i]
rowsTotal += len(ts.Samples)
labelsLen := len(labels)
labels = append(labels, ts.Labels...)
labels = append(labels, extraLabels...)
samplesLen := len(samples)
samples = append(samples, ts.Samples...)
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(at, &ctx.WriteRequest)
rowsInserted.Add(rowsTotal)
if at != nil {
rowsTenantInserted.Get(at).Add(rowsTotal)
}
rowsPerInsert.Update(float64(rowsTotal))
return nil
}

View File

@@ -7,7 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@@ -20,7 +20,9 @@ var (
//
// See http://opentsdb.net/docs/build/html/api_telnet/put.html
func InsertHandler(r io.Reader) error {
return stream.Parse(r, insertRows)
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, insertRows)
})
}
func insertRows(rows []parser.Row) error {
@@ -56,7 +58,7 @@ func insertRows(rows []parser.Row) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(nil, &ctx.WriteRequest)
remotewrite.Push(&ctx.WriteRequest)
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return nil

View File

@@ -5,11 +5,10 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@@ -20,17 +19,19 @@ var (
// InsertHandler processes HTTP OpenTSDB put requests.
// See http://opentsdb.net/docs/build/html/api_http/put.html
func InsertHandler(at *auth.Token, req *http.Request) error {
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return stream.Parse(req, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
})
}
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@@ -64,7 +65,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(at, &ctx.WriteRequest)
remotewrite.Push(&ctx.WriteRequest)
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return nil

View File

@@ -5,24 +5,20 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="prometheus"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="prometheus"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="prometheus"}`)
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="prometheus"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="prometheus"}`)
)
// InsertHandler processes `/api/v1/import/prometheus` request.
func InsertHandler(at *auth.Token, req *http.Request) error {
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
@@ -31,15 +27,15 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
if err != nil {
return err
}
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return stream.Parse(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
}, func(s string) {
httpserver.LogError(req, s)
return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
}, nil)
})
}
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@@ -73,11 +69,8 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(at, &ctx.WriteRequest)
remotewrite.Push(&ctx.WriteRequest)
rowsInserted.Add(len(rows))
if at != nil {
rowsTenantInserted.Get(at).Add(len(rows))
}
rowsPerInsert.Update(float64(len(rows)))
return nil
}

View File

@@ -1,60 +0,0 @@
package prometheusimport
import (
"bytes"
"flag"
"log"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
)
var (
srv *httptest.Server
testOutput *bytes.Buffer
)
func TestInsertHandler(t *testing.T) {
setUp()
defer tearDown()
req := httptest.NewRequest(http.MethodPost, "/insert/0/api/v1/import/prometheus", bytes.NewBufferString(`{"foo":"bar"}
go_memstats_alloc_bytes_total 1`))
if err := InsertHandler(nil, req); err != nil {
t.Errorf("unxepected error %s", err)
}
expectedMsg := "cannot unmarshal Prometheus line"
if !strings.Contains(testOutput.String(), expectedMsg) {
t.Errorf("output %q should contain %q", testOutput.String(), expectedMsg)
}
}
func setUp() {
srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(204)
}))
flag.Parse()
remoteWriteFlag := "remoteWrite.url"
if err := flag.Lookup(remoteWriteFlag).Value.Set(srv.URL); err != nil {
log.Fatalf("unable to set %q with value %q, err: %v", remoteWriteFlag, srv.URL, err)
}
logger.Init()
common.StartUnmarshalWorkers()
remotewrite.Init()
testOutput = &bytes.Buffer{}
logger.SetOutputForTests(testOutput)
}
func tearDown() {
common.StopUnmarshalWorkers()
srv.Close()
logger.ResetOutputForTest()
tmpDataDir := flag.Lookup("remoteWrite.tmpDataPath").Value.String()
fs.MustRemoveAll(tmpDataDir)
}

View File

@@ -5,35 +5,34 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="promremotewrite"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="promremotewrite"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="promremotewrite"}`)
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="promremotewrite"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="promremotewrite"}`)
)
// InsertHandler processes remote write for prometheus.
func InsertHandler(at *auth.Token, req *http.Request) error {
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
isVMRemoteWrite := req.Header.Get("Content-Encoding") == "zstd"
return stream.Parse(req.Body, isVMRemoteWrite, func(tss []prompb.TimeSeries) error {
return insertRows(at, tss, extraLabels)
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(tss []prompb.TimeSeries) error {
return insertRows(tss, extraLabels)
})
})
}
func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@@ -69,11 +68,8 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []pr
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(at, &ctx.WriteRequest)
remotewrite.Push(&ctx.WriteRequest)
rowsInserted.Add(rowsTotal)
if at != nil {
rowsTenantInserted.Get(at).Add(rowsTotal)
}
rowsPerInsert.Update(float64(rowsTotal))
return nil
}

Some files were not shown because too many files have changed in this diff Show More