mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Compare commits
18 Commits
query-debu
...
storage-no
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1272a7f743 | ||
|
|
2b39ee785c | ||
|
|
842bf78cb1 | ||
|
|
5420989018 | ||
|
|
9ff8b312bb | ||
|
|
130b9cd04e | ||
|
|
88bfad9535 | ||
|
|
e44c6f38c2 | ||
|
|
96a62a275a | ||
|
|
20b9c8007b | ||
|
|
3df456dd35 | ||
|
|
7402ee0801 | ||
|
|
5ac1e77520 | ||
|
|
71e729f3f8 | ||
|
|
8729ec174b | ||
|
|
84184b707a | ||
|
|
41e217423f | ||
|
|
8d8073a24d |
@@ -5,6 +5,8 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
@@ -12,7 +14,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/clusternative/stream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -25,7 +26,7 @@ var (
|
||||
func InsertHandler(c net.Conn) error {
|
||||
// There is no need in response compression, since
|
||||
// lower-level vminsert sends only small packets to upper-level vminsert.
|
||||
bc, err := handshake.VMInsertServer(c, 0)
|
||||
bc, err := handshake.VMInsertServer(c, 0, netstorage.GetNodeID())
|
||||
if err != nil {
|
||||
if errors.Is(err, handshake.ErrIgnoreHealthcheck) {
|
||||
return nil
|
||||
|
||||
@@ -1,9 +1,5 @@
|
||||
package netstorage
|
||||
|
||||
import (
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
// See the following docs:
|
||||
// - https://www.eecs.umich.edu/techreports/cse/96/CSE-TR-316-96.pdf
|
||||
// - https://github.com/dgryski/go-rendezvous
|
||||
@@ -13,14 +9,10 @@ type consistentHash struct {
|
||||
nodeHashes []uint64
|
||||
}
|
||||
|
||||
func newConsistentHash(nodes []string, hashSeed uint64) *consistentHash {
|
||||
nodeHashes := make([]uint64, len(nodes))
|
||||
for i, node := range nodes {
|
||||
nodeHashes[i] = xxhash.Sum64([]byte(node))
|
||||
}
|
||||
func newConsistentHash(ids []uint64, hashSeed uint64) *consistentHash {
|
||||
return &consistentHash{
|
||||
hashSeed: hashSeed,
|
||||
nodeHashes: nodeHashes,
|
||||
nodeHashes: ids,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,16 +4,18 @@ import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
func TestConsistentHash(t *testing.T) {
|
||||
r := rand.New(rand.NewSource(1))
|
||||
|
||||
nodes := []string{
|
||||
"node1",
|
||||
"node2",
|
||||
"node3",
|
||||
"node4",
|
||||
nodes := []uint64{
|
||||
xxhash.Sum64String("node1"),
|
||||
xxhash.Sum64String("node2"),
|
||||
xxhash.Sum64String("node3"),
|
||||
xxhash.Sum64String("node4"),
|
||||
}
|
||||
rh := newConsistentHash(nodes, 0)
|
||||
|
||||
|
||||
@@ -4,16 +4,19 @@ import (
|
||||
"math/rand"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
func BenchmarkConsistentHash(b *testing.B) {
|
||||
nodes := []string{
|
||||
"node1",
|
||||
"node2",
|
||||
"node3",
|
||||
"node4",
|
||||
nodes := []uint64{
|
||||
xxhash.Sum64String("node1"),
|
||||
xxhash.Sum64String("node2"),
|
||||
xxhash.Sum64String("node3"),
|
||||
xxhash.Sum64String("node4"),
|
||||
}
|
||||
rh := newConsistentHash(nodes, 0)
|
||||
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(benchKeys)))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
@@ -12,7 +14,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
// InsertCtx is a generic context for inserting data.
|
||||
|
||||
@@ -6,10 +6,14 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"slices"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
@@ -21,8 +25,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -43,6 +45,9 @@ var (
|
||||
"On the other side, disabled re-routing minimizes the number of active time series in the cluster "+
|
||||
"during rolling restarts and during spikes in series churn rate. "+
|
||||
"See also -disableRerouting")
|
||||
usePersistentStorageNodeID = flag.Bool("vmstorageUsePersistentID", false, "Whether to use persistent storage node ID for -storageNode instances. "+
|
||||
"If set to false uses storage node address in order to generate an ID. "+
|
||||
"Using persistent node ID is useful if vmstorage node address changes over time, e.g. due to dynamic IP addresses or DNS names. ")
|
||||
)
|
||||
|
||||
var errStorageReadOnly = errors.New("storage node is read only")
|
||||
@@ -278,7 +283,7 @@ func (sn *storageNode) checkHealth() {
|
||||
}
|
||||
return
|
||||
}
|
||||
logger.Infof("successfully dialed -storageNode=%q", sn.dialer.Addr())
|
||||
logger.Infof("successfully dialed -storageNode=%q (node ID: %d)", sn.dialer.Addr(), sn.id.Load())
|
||||
sn.lastDialErr = nil
|
||||
sn.bc = bc
|
||||
sn.isBroken.Store(false)
|
||||
@@ -398,15 +403,24 @@ func (sn *storageNode) dial() (*handshake.BufferedConn, error) {
|
||||
if *disableRPCCompression {
|
||||
compressionLevel = 0
|
||||
}
|
||||
bc, err := handshake.VMInsertClient(c, compressionLevel)
|
||||
bc, id, err := handshake.VMInsertClient(c, compressionLevel)
|
||||
if err != nil {
|
||||
_ = c.Close()
|
||||
sn.handshakeErrors.Inc()
|
||||
return nil, fmt.Errorf("handshake error: %w", err)
|
||||
}
|
||||
sn.id.CompareAndSwap(0, id)
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
func (sn *storageNode) getID() uint64 {
|
||||
// Ensure that the id is populated
|
||||
if sn.id.Load() == 0 {
|
||||
sn.checkHealth()
|
||||
}
|
||||
return sn.id.Load()
|
||||
}
|
||||
|
||||
// storageNode is a client sending data to vmstorage node.
|
||||
type storageNode struct {
|
||||
// isBroken is set to true if the given vmstorage node is temporarily unhealthy.
|
||||
@@ -473,6 +487,9 @@ type storageNode struct {
|
||||
// The total duration spent for sending data to vmstorage node.
|
||||
// This metric is useful for determining the saturation of vminsert->vmstorage link.
|
||||
sendDurationSeconds *metrics.FloatCounter
|
||||
|
||||
// id is a unique identifier for the storage node.
|
||||
id atomic.Uint64
|
||||
}
|
||||
|
||||
type storageNodesBucket struct {
|
||||
@@ -515,14 +532,31 @@ func MustStop() {
|
||||
mustStopStorageNodes(snb)
|
||||
}
|
||||
|
||||
// GetNodeID returns unique identifier for underlying storage nodes.
|
||||
func GetNodeID() uint64 {
|
||||
snb := getStorageNodesBucket()
|
||||
snIDs := make([]uint64, 0, len(snb.sns))
|
||||
for _, sn := range snb.sns {
|
||||
snIDs = append(snIDs, sn.getID())
|
||||
}
|
||||
slices.Sort(snIDs)
|
||||
idsM := make([]byte, 0)
|
||||
for _, id := range snIDs {
|
||||
idsM = encoding.MarshalUint64(idsM, id)
|
||||
}
|
||||
|
||||
return xxhash.Sum64(idsM)
|
||||
}
|
||||
|
||||
func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
|
||||
if len(addrs) == 0 {
|
||||
logger.Panicf("BUG: addrs must be non-empty")
|
||||
}
|
||||
ms := metrics.NewSet()
|
||||
nodesHash := newConsistentHash(addrs, hashSeed)
|
||||
sns := make([]*storageNode, 0, len(addrs))
|
||||
brokenNodes := make([]*storageNode, 0)
|
||||
stopCh := make(chan struct{})
|
||||
nodeIDs := make([]uint64, 0, len(addrs))
|
||||
for _, addr := range addrs {
|
||||
if _, _, err := net.SplitHostPort(addr); err != nil {
|
||||
// Automatically add missing port.
|
||||
@@ -568,10 +602,22 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
|
||||
}
|
||||
return 0
|
||||
})
|
||||
var nodeID uint64
|
||||
if *usePersistentStorageNodeID {
|
||||
nodeID = sn.getID()
|
||||
if nodeID == 0 {
|
||||
brokenNodes = append(brokenNodes, sn)
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
nodeID = xxhash.Sum64String(addr)
|
||||
}
|
||||
nodeIDs = append(nodeIDs, nodeID)
|
||||
sns = append(sns, sn)
|
||||
}
|
||||
nodesHash := newConsistentHash(nodeIDs, hashSeed)
|
||||
|
||||
maxBufSizePerStorageNode = memory.Allowed() / 8 / len(sns)
|
||||
maxBufSizePerStorageNode = memory.Allowed() / 8 / len(addrs)
|
||||
if maxBufSizePerStorageNode > consts.MaxInsertPacketSizeForVMInsert {
|
||||
maxBufSizePerStorageNode = consts.MaxInsertPacketSizeForVMInsert
|
||||
}
|
||||
@@ -586,7 +632,12 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
|
||||
wg: &wg,
|
||||
}
|
||||
|
||||
for idx, sn := range sns {
|
||||
// add broken nodes to the end of the list
|
||||
// this is needed because consistent hash slots will be populated with IDs of available
|
||||
// storage nodes (if there are any) and indexes of consistent hash must be linked to healthy storage nodes
|
||||
snb.sns = append(snb.sns, brokenNodes...)
|
||||
|
||||
for idx, sn := range snb.sns {
|
||||
wg.Add(1)
|
||||
go func(sn *storageNode, idx int) {
|
||||
sn.run(snb, idx)
|
||||
@@ -594,6 +645,28 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
|
||||
}(sn, idx)
|
||||
}
|
||||
|
||||
// Watch for broken nodes to become healthy and rebuild snb once they do.
|
||||
for _, sn := range brokenNodes {
|
||||
wg.Add(1)
|
||||
sn := sn
|
||||
go watchStorageNodeHealthy(sn, func() {
|
||||
defer wg.Done()
|
||||
// rebuild snb in order to update consistent hash with an ID of the healthy storage node
|
||||
for {
|
||||
currentSnb := getStorageNodesBucket()
|
||||
newSnb := initStorageNodes(addrs, hashSeed)
|
||||
if !storageNodes.CompareAndSwap(currentSnb, newSnb) {
|
||||
// snb has been changed, so we need to stop the newSnb and try again
|
||||
mustStopStorageNodes(newSnb)
|
||||
continue
|
||||
}
|
||||
// stop previous snb and exit
|
||||
mustStopStorageNodes(currentSnb)
|
||||
break
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return snb
|
||||
}
|
||||
|
||||
@@ -606,6 +679,34 @@ func mustStopStorageNodes(snb *storageNodesBucket) {
|
||||
metrics.UnregisterSet(snb.ms, true)
|
||||
}
|
||||
|
||||
// watchStorageNodeHealthy watches for sn become healthy and calls cb once it is ready.
|
||||
func watchStorageNodeHealthy(sn *storageNode, cb func()) {
|
||||
for {
|
||||
sn.brLock.Lock()
|
||||
for !sn.isReady() {
|
||||
select {
|
||||
case <-sn.stopCh:
|
||||
sn.brLock.Unlock()
|
||||
return
|
||||
default:
|
||||
sn.brCond.Wait()
|
||||
}
|
||||
}
|
||||
sn.brLock.Unlock()
|
||||
|
||||
select {
|
||||
case <-sn.stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if sn.isReady() {
|
||||
cb()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// rerouteRowsToReadyStorageNodes reroutes src from not ready snSource to ready storage nodes.
|
||||
//
|
||||
// The function blocks until src is fully re-routed.
|
||||
|
||||
@@ -31,7 +31,9 @@ var (
|
||||
|
||||
// NewVMSelectServer starts new server at the given addr, which serves vmselect requests from netstorage.
|
||||
func NewVMSelectServer(addr string) (*vmselectapi.Server, error) {
|
||||
api := &vmstorageAPI{}
|
||||
api := &vmstorageAPI{
|
||||
nodeID: netstorage.GetNodeID(),
|
||||
}
|
||||
limits := vmselectapi.Limits{
|
||||
MaxLabelNames: *maxTagKeys,
|
||||
MaxLabelValues: *maxTagValues,
|
||||
@@ -45,7 +47,9 @@ func NewVMSelectServer(addr string) (*vmselectapi.Server, error) {
|
||||
}
|
||||
|
||||
// vmstorageAPI implements vmselectapi.API
|
||||
type vmstorageAPI struct{}
|
||||
type vmstorageAPI struct {
|
||||
nodeID uint64
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
|
||||
denyPartialResponse := httputils.GetDenyPartialResponse(nil)
|
||||
@@ -112,6 +116,10 @@ func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []stora
|
||||
return netstorage.RegisterMetricNames(qt, mrs, dl)
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) GetID() uint64 {
|
||||
return api.nodeID
|
||||
}
|
||||
|
||||
// blockIterator implements vmselectapi.BlockIterator
|
||||
type blockIterator struct {
|
||||
workCh chan workItem
|
||||
|
||||
@@ -2107,6 +2107,9 @@ type storageNode struct {
|
||||
|
||||
// The number of list tenants errors to storageNode.
|
||||
tenantsErrors *metrics.Counter
|
||||
|
||||
// id is the unique identifier for the storageNode.
|
||||
id uint64
|
||||
}
|
||||
|
||||
func (sn *storageNode) registerMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline searchutils.Deadline) error {
|
||||
@@ -2954,6 +2957,12 @@ func getStorageNodes() []*storageNode {
|
||||
return snb.sns
|
||||
}
|
||||
|
||||
// GetNodeID returns the unique identifier of vmselect.
//
// It always returns 0, since persistent IDs are not intended
// to be used with multi-level setup.
func GetNodeID() uint64 {
	const nodeID uint64 = 0
	return nodeID
}
|
||||
|
||||
// Init initializes storage nodes' connections to the given addrs.
|
||||
//
|
||||
// MustStop must be called when the initialized connections are no longer needed.
|
||||
@@ -3015,6 +3024,7 @@ func newStorageNode(ms *metrics.Set, group *storageNodesGroup, addr string) *sto
|
||||
sn := &storageNode{
|
||||
group: group,
|
||||
connPool: connPool,
|
||||
id: connPool.GetTargetNodeID(),
|
||||
|
||||
concurrentQueries: ms.NewCounter(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr)),
|
||||
|
||||
|
||||
@@ -111,8 +111,8 @@ func main() {
|
||||
blocksCount := tm.SmallBlocksCount + tm.BigBlocksCount
|
||||
rowsCount := tm.SmallRowsCount + tm.BigRowsCount
|
||||
sizeBytes := tm.SmallSizeBytes + tm.BigSizeBytes
|
||||
logger.Infof("successfully opened storage %q in %.3f seconds; partsCount: %d; blocksCount: %d; rowsCount: %d; sizeBytes: %d",
|
||||
*storageDataPath, time.Since(startTime).Seconds(), partsCount, blocksCount, rowsCount, sizeBytes)
|
||||
logger.Infof("successfully opened storage %q (node ID: %d) in %.3f seconds; partsCount: %d; blocksCount: %d; rowsCount: %d; sizeBytes: %d",
|
||||
*storageDataPath, strg.GetID(), time.Since(startTime).Seconds(), partsCount, blocksCount, rowsCount, sizeBytes)
|
||||
|
||||
// register storage metrics
|
||||
storageMetrics := metrics.NewSet()
|
||||
|
||||
@@ -101,7 +101,7 @@ func (s *VMInsertServer) run() {
|
||||
// There is no need in response compression, since
|
||||
// vmstorage sends only small packets to vminsert.
|
||||
compressionLevel := 0
|
||||
bc, err := handshake.VMInsertServer(c, compressionLevel)
|
||||
bc, err := handshake.VMInsertServer(c, compressionLevel, s.storage.GetID())
|
||||
if err != nil {
|
||||
if s.isStopping() {
|
||||
// c is stopped inside VMInsertServer.MustStop
|
||||
|
||||
@@ -195,6 +195,10 @@ func (api *vmstorageAPI) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQue
|
||||
return tfss, nil
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) GetID() uint64 {
|
||||
return api.s.GetID()
|
||||
}
|
||||
|
||||
// blockIterator implements vmselectapi.BlockIterator
|
||||
type blockIterator struct {
|
||||
sr storage.Search
|
||||
|
||||
@@ -31,10 +31,13 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
||||
|
||||
**Update note 1: [vmauth](https://docs.victoriametrics.com/vmauth/) HTTP response code has changed from 503 to 502 for a case when all upstream backends were not available. This was changed to align [vmauth](https://docs.victoriametrics.com/vmauth/) behaviour with other well-known reverse-proxies behaviour. **
|
||||
|
||||
**Update note 2: this release contains a breaking change to inter-cluster communication. `vmstorage` nodes with a version lower than `tip` won't be able to communicate with `vmselect` and `vminsert` nodes running version `tip` or newer, and vice versa. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5438) issue for the details.**
|
||||
|
||||
* SECURITY: upgrade base docker image (Alpine) from 3.20.1 to 3.20.2. See [alpine 3.20.2 release notes](https://alpinelinux.org/posts/Alpine-3.20.2-released.html).
|
||||
|
||||
* FEATURE: [vmauth](./vmauth.md): add `keep_original_host` option, which can be used for proxying the original `Host` header from client request to the backend. By default the backend host is used as `Host` header when proxying requests to the configured backends. See [these docs](./vmauth.md#host-http-header).
|
||||
* FEATURE: [vmauth](./vmauth.md) now returns HTTP 502 status code when all upstream backends are not available. Previously, it returned HTTP 503 status code. This change aligns vmauth behavior with other well-known reverse-proxies behavior.
|
||||
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): allow using a persistent `vmstorage` node ID for consistent hashing at the data write path when the `-vmstorageUsePersistentID` command-line flag is set at `vminsert`. This keeps the same data distribution after `vmstorage` changes its IP address. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5438).
|
||||
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly proxy requests to backend urls ending with `/` if the original request path equals to `/`. Previously the trailing `/` at the backend path was incorrectly removed. For example, if the request to `http://vmauth/` is configured to be proxied to `url_prefix=http://backend/foo/`, then it was proxied to `http://backend/foo`, while it should go to `http://backend/foo/`.
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): fix `cannot read data after closing the reader` error when proxying HTTP requests without body (aka `GET` requests). The issue has been introduced in [v1.102.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0) in [this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/7ee57974935a662896f2de40fdf613156630617d).
|
||||
|
||||
@@ -1301,6 +1301,8 @@ Below is the output for `/path/to/vminsert -help`:
|
||||
Show VictoriaMetrics version
|
||||
-vmstorageDialTimeout duration
|
||||
Timeout for establishing RPC connections from vminsert to vmstorage. See also -vmstorageUserTimeout (default 3s)
|
||||
-vmstorageUsePersistentID
|
||||
Whether to use persistent storage node ID for -storageNode instances. If set to false uses storage node address in order to generate an ID. Using persistent node ID is useful if vmstorage node address changes over time, e.g. due to dynamic IP addresses or DNS names.
|
||||
-vmstorageUserTimeout duration
|
||||
Network timeout for RPC connections from vminsert to vmstorage (Linux only). Lower values speed up re-rerouting recovery when some of vmstorage nodes become unavailable because of networking issues. Read more about TCP_USER_TIMEOUT at https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/ . See also -vmstorageDialTimeout (default 3s)
|
||||
```
|
||||
|
||||
@@ -1,11 +1,18 @@
|
||||
package handshake
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/valyala/fastjson"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -15,17 +22,22 @@ const (
|
||||
successResponse = "ok"
|
||||
)
|
||||
|
||||
// Func must perform handshake on the given c using the given compressionLevel.
|
||||
// ClientFunc must perform handshake on the given c using the given compressionLevel.
|
||||
//
|
||||
// It must return BufferedConn wrapper for c on successful handshake.
|
||||
type Func func(c net.Conn, compressionLevel int) (*BufferedConn, error)
|
||||
type ClientFunc func(c net.Conn, compressionLevel int) (*BufferedConn, uint64, error)
|
||||
|
||||
// ServerFunc must perform handshake on the given c using the given compressionLevel and id.
|
||||
//
|
||||
// It must return BufferedConn wrapper for c on successful handshake.
|
||||
type ServerFunc func(c net.Conn, compressionLevel int, id uint64) (*BufferedConn, error)
|
||||
|
||||
// VMInsertClient performs client-side handshake for vminsert protocol.
|
||||
//
|
||||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the server.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMInsertClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
func VMInsertClient(c net.Conn, compressionLevel int) (*BufferedConn, uint64, error) {
|
||||
return genericClient(c, vminsertHello, compressionLevel)
|
||||
}
|
||||
|
||||
@@ -34,8 +46,8 @@ func VMInsertClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the client.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMInsertServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
return genericServer(c, vminsertHello, compressionLevel)
|
||||
func VMInsertServer(c net.Conn, compressionLevel int, id uint64) (*BufferedConn, error) {
|
||||
return genericServer(c, vminsertHello, compressionLevel, id)
|
||||
}
|
||||
|
||||
// VMSelectClient performs client-side handshake for vmselect protocol.
|
||||
@@ -43,7 +55,7 @@ func VMInsertServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the server.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, uint64, error) {
|
||||
return genericClient(c, vmselectHello, compressionLevel)
|
||||
}
|
||||
|
||||
@@ -52,8 +64,8 @@ func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
// compressionLevel is the level used for compression of the data sent
|
||||
// to the client.
|
||||
// compressionLevel <= 0 means 'no compression'
|
||||
func VMSelectServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
return genericServer(c, vmselectHello, compressionLevel)
|
||||
func VMSelectServer(c net.Conn, compressionLevel int, id uint64) (*BufferedConn, error) {
|
||||
return genericServer(c, vmselectHello, compressionLevel, id)
|
||||
}
|
||||
|
||||
// ErrIgnoreHealthcheck means the TCP healthcheck, which must be ignored.
|
||||
@@ -61,7 +73,11 @@ func VMSelectServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
|
||||
// The TCP healthcheck is performed by opening and then immediately closing the connection.
|
||||
var ErrIgnoreHealthcheck = fmt.Errorf("TCP healthcheck - ignore it")
|
||||
|
||||
func genericServer(c net.Conn, msg string, compressionLevel int) (*BufferedConn, error) {
|
||||
// handshakeMetadata is the JSON-encoded payload the server side sends
// to the client side at the end of the handshake.
type handshakeMetadata struct {
	// NodeID is the ID of the node on the server side of the handshake.
	NodeID uint64 `json:"nodeId"`
}
|
||||
|
||||
func genericServer(c net.Conn, msg string, compressionLevel int, id uint64) (*BufferedConn, error) {
|
||||
if err := readMessage(c, msg); err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
// This is TCP healthcheck, which must be ignored in order to prevent from logs pollution.
|
||||
@@ -86,32 +102,40 @@ func genericServer(c net.Conn, msg string, compressionLevel int) (*BufferedConn,
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
|
||||
}
|
||||
if err := writeMetadata(c, id); err != nil {
|
||||
return nil, fmt.Errorf("cannot write metadata: %w", err)
|
||||
}
|
||||
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn, error) {
|
||||
func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn, uint64, error) {
|
||||
if err := writeMessage(c, msg); err != nil {
|
||||
return nil, fmt.Errorf("cannot write hello: %w", err)
|
||||
return nil, 0, fmt.Errorf("cannot write hello: %w", err)
|
||||
}
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot read success response after sending hello: %w", err)
|
||||
return nil, 0, fmt.Errorf("cannot read success response after sending hello: %w", err)
|
||||
}
|
||||
if err := writeIsCompressed(c, compressionLevel > 0); err != nil {
|
||||
return nil, fmt.Errorf("cannot write isCompressed flag: %w", err)
|
||||
return nil, 0, fmt.Errorf("cannot write isCompressed flag: %w", err)
|
||||
}
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
|
||||
return nil, 0, fmt.Errorf("cannot read success response on isCompressed: %w", err)
|
||||
}
|
||||
isRemoteCompressed, err := readIsCompressed(c)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read isCompressed flag: %w", err)
|
||||
return nil, 0, fmt.Errorf("cannot read isCompressed flag: %w", err)
|
||||
}
|
||||
if err := writeMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
|
||||
return nil, 0, fmt.Errorf("cannot write success response on isCompressed: %w", err)
|
||||
}
|
||||
metadata, err := readMetadata(c)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("cannot read nodeID: %w", err)
|
||||
}
|
||||
|
||||
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
|
||||
return bc, nil
|
||||
return bc, metadata.NodeID, nil
|
||||
}
|
||||
|
||||
func writeIsCompressed(c net.Conn, isCompressed bool) error {
|
||||
@@ -164,6 +188,80 @@ func readMessage(c net.Conn, msg string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var metadataParserPool fastjson.ParserPool
|
||||
|
||||
func readMetadata(c net.Conn) (*handshakeMetadata, error) {
|
||||
metaLenBuf, err := readData(c, int(unsafe.Sizeof(uint64(0))))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read metadata length: %w", err)
|
||||
}
|
||||
|
||||
metaLen := int(encoding.UnmarshalUint64(metaLenBuf))
|
||||
if err := writeMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot write success response on metadata length: %w", err)
|
||||
}
|
||||
metaBuf, err := readData(c, metaLen)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read metadata: %w", err)
|
||||
}
|
||||
if err := writeMessage(c, successResponse); err != nil {
|
||||
return nil, fmt.Errorf("cannot write success response on metadata: %w", err)
|
||||
}
|
||||
parser := metadataParserPool.Get()
|
||||
defer metadataParserPool.Put(parser)
|
||||
v, err := parser.ParseBytes(metaBuf)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse metadata: %w", err)
|
||||
}
|
||||
|
||||
return &handshakeMetadata{
|
||||
NodeID: v.GetUint64("nodeId"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
var (
|
||||
metadataCache sync.Map
|
||||
)
|
||||
|
||||
func getMetadataBytes(id uint64) ([]byte, error) {
|
||||
m, ok := metadataCache.Load(id)
|
||||
if !ok {
|
||||
metadata := handshakeMetadata{
|
||||
NodeID: id,
|
||||
}
|
||||
var err error
|
||||
m, err = json.Marshal(metadata)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot marshal metadata: %w", err)
|
||||
}
|
||||
metadataCache.Store(id, m)
|
||||
}
|
||||
metaV := m.([]byte)
|
||||
return metaV, nil
|
||||
}
|
||||
|
||||
func writeMetadata(c net.Conn, id uint64) error {
|
||||
meta, err := getMetadataBytes(id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot obtain metadata bytes: %w", err)
|
||||
}
|
||||
metaLen := len(meta)
|
||||
if err := writeMessage(c, string(encoding.MarshalUint64(nil, uint64(metaLen)))); err != nil {
|
||||
return fmt.Errorf("cannot write metadata length: %w", err)
|
||||
}
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return fmt.Errorf("cannot read success response on metadata length: %w", err)
|
||||
}
|
||||
if err := writeMessage(c, string(meta[:])); err != nil {
|
||||
return fmt.Errorf("cannot write metadata: %w", err)
|
||||
}
|
||||
if err := readMessage(c, successResponse); err != nil {
|
||||
return fmt.Errorf("cannot read success response on metadata: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func readData(c net.Conn, dataLen int) ([]byte, error) {
|
||||
if err := c.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
|
||||
return nil, fmt.Errorf("cannot set read deadline: %w", err)
|
||||
|
||||
@@ -15,22 +15,26 @@ func TestVMSelectHandshake(t *testing.T) {
|
||||
testHandshake(t, VMSelectClient, VMSelectServer)
|
||||
}
|
||||
|
||||
func testHandshake(t *testing.T, clientFunc, serverFunc Func) {
|
||||
func testHandshake(t *testing.T, clientFunc ClientFunc, serverFunc ServerFunc) {
|
||||
t.Helper()
|
||||
|
||||
c, s := net.Pipe()
|
||||
ch := make(chan error, 1)
|
||||
go func() {
|
||||
bcs, err := serverFunc(s, 3)
|
||||
bcs, err := serverFunc(s, 3, 1)
|
||||
if err != nil {
|
||||
ch <- fmt.Errorf("error on outer handshake: %w", err)
|
||||
return
|
||||
}
|
||||
bcc, err := clientFunc(bcs, 3)
|
||||
bcc, id, err := clientFunc(bcs, 3)
|
||||
if err != nil {
|
||||
ch <- fmt.Errorf("error on inner handshake: %w", err)
|
||||
return
|
||||
}
|
||||
if id != 1 {
|
||||
ch <- fmt.Errorf("unexpected id; got %d; want 1", id)
|
||||
return
|
||||
}
|
||||
if bcc == nil {
|
||||
ch <- fmt.Errorf("expecting non-nil conn")
|
||||
return
|
||||
@@ -38,11 +42,15 @@ func testHandshake(t *testing.T, clientFunc, serverFunc Func) {
|
||||
ch <- nil
|
||||
}()
|
||||
|
||||
bcc, err := clientFunc(c, 0)
|
||||
bcc, id, err := clientFunc(c, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("error on outer handshake: %s", err)
|
||||
}
|
||||
bcs, err := serverFunc(bcc, 0)
|
||||
// The server side of this handshake is started with id=1,
// so this is the ID the client must receive.
if id != 1 {
	t.Fatalf("unexpected id; got %d; want 1", id)
}
|
||||
|
||||
bcs, err := serverFunc(bcc, 0, 1)
|
||||
if err != nil {
|
||||
t.Fatalf("error on inner handshake: %s", err)
|
||||
}
|
||||
|
||||
@@ -3,12 +3,14 @@ package netutil
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// ConnPool is a connection pool with ZSTD-compressed connections.
|
||||
@@ -23,8 +25,9 @@ type ConnPool struct {
|
||||
concurrentDialsCh chan struct{}
|
||||
|
||||
name string
|
||||
handshakeFunc handshake.Func
|
||||
handshakeFunc handshake.ClientFunc
|
||||
compressionLevel int
|
||||
nodeID atomic.Uint64
|
||||
|
||||
conns []connWithTimestamp
|
||||
|
||||
@@ -49,7 +52,7 @@ type connWithTimestamp struct {
|
||||
// The compression is disabled if compressionLevel <= 0.
|
||||
//
|
||||
// Call ConnPool.MustStop when the returned ConnPool is no longer needed.
|
||||
func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Func, compressionLevel int, dialTimeout, userTimeout time.Duration) *ConnPool {
|
||||
func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.ClientFunc, compressionLevel int, dialTimeout, userTimeout time.Duration) *ConnPool {
|
||||
cp := &ConnPool{
|
||||
d: NewTCPDialer(ms, name, addr, dialTimeout, userTimeout),
|
||||
concurrentDialsCh: make(chan struct{}, 8),
|
||||
@@ -163,7 +166,8 @@ func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bc, err := cp.handshakeFunc(c, cp.compressionLevel)
|
||||
bc, id, err := cp.handshakeFunc(c, cp.compressionLevel)
|
||||
cp.nodeID.CompareAndSwap(0, id)
|
||||
if err != nil {
|
||||
// Do not put handshake error to cp.lastDialError, because handshake
|
||||
// is performed on an already established connection.
|
||||
@@ -249,6 +253,16 @@ func (cp *ConnPool) checkAvailability(force bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// GetTargetNodeID returns the nodeID of the target server.
|
||||
func (cp *ConnPool) GetTargetNodeID() uint64 {
|
||||
// Ensure that nodeID is initialized.
|
||||
if cp.nodeID.Load() == 0 {
|
||||
cp.checkAvailability(true)
|
||||
}
|
||||
|
||||
return cp.nodeID.Load()
|
||||
}
|
||||
|
||||
func init() {
|
||||
go func() {
|
||||
for {
|
||||
|
||||
@@ -10,6 +10,8 @@ const (
|
||||
|
||||
appliedRetentionFilename = "appliedRetention.txt"
|
||||
resetCacheOnStartupFilename = "reset_cache_on_startup"
|
||||
|
||||
nodeIDFilename = "node_id.bin"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
@@ -15,6 +16,9 @@ import (
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/fastcache"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/backupnames"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
@@ -29,8 +33,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
||||
"github.com/VictoriaMetrics/fastcache"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -64,6 +66,9 @@ type Storage struct {
|
||||
cachePath string
|
||||
retentionMsecs int64
|
||||
|
||||
// Used to uniquely identify storage node
|
||||
nodeID uint64
|
||||
|
||||
// lock file for exclusive access to the storage on the given path.
|
||||
flockF *os.File
|
||||
|
||||
@@ -244,6 +249,23 @@ func MustOpenStorage(path string, retention time.Duration, maxHourlySeries, maxD
|
||||
isEmptyDB := !fs.IsPathExist(filepath.Join(path, indexdbDirname))
|
||||
fs.MustMkdirIfNotExist(metadataDir)
|
||||
s.minTimestampForCompositeIndex = mustGetMinTimestampForCompositeIndex(metadataDir, isEmptyDB)
|
||||
nodeIDPath := filepath.Join(metadataDir, nodeIDFilename)
|
||||
if fs.IsPathExist(nodeIDPath) {
|
||||
r, err := os.Open(nodeIDPath)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot open nodeID file %q: %s", nodeIDPath, err)
|
||||
}
|
||||
|
||||
nodeID, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read nodeID from %q: %s", nodeIDPath, err)
|
||||
}
|
||||
s.nodeID = encoding.UnmarshalUint64(nodeID)
|
||||
} else {
|
||||
nodeID := rand.Uint64()
|
||||
s.nodeID = nodeID
|
||||
s.mustSaveNodeID()
|
||||
}
|
||||
|
||||
// Load indexdb
|
||||
idbPath := filepath.Join(path, indexdbDirname)
|
||||
@@ -1032,6 +1054,16 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
|
||||
return hm
|
||||
}
|
||||
|
||||
func (s *Storage) mustSaveNodeID() {
|
||||
path := filepath.Join(s.path, metadataDirname, nodeIDFilename)
|
||||
dst := make([]byte, 0)
|
||||
dst = encoding.MarshalUint64(dst, s.nodeID)
|
||||
|
||||
if err := os.WriteFile(path, dst, 0644); err != nil {
|
||||
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Storage) mustSaveNextDayMetricIDs(e *byDateMetricIDEntry) {
|
||||
name := "next_day_metric_ids_v2"
|
||||
path := filepath.Join(s.cachePath, name)
|
||||
@@ -1609,6 +1641,11 @@ func (s *Storage) GetTSDBStatus(qt *querytracer.Tracer, accountID, projectID uin
|
||||
return s.idb().GetTSDBStatus(qt, accountID, projectID, tfss, date, focusLabel, topN, maxMetrics, deadline)
|
||||
}
|
||||
|
||||
// GetID returns a unique identifier for the storage node.
|
||||
func (s *Storage) GetID() uint64 {
|
||||
return s.nodeID
|
||||
}
|
||||
|
||||
// MetricRow is a metric to insert into storage.
|
||||
type MetricRow struct {
|
||||
// MetricNameRaw contains raw metric name, which must be decoded
|
||||
|
||||
@@ -38,6 +38,9 @@ type API interface {
|
||||
|
||||
// Tenants returns list of tenants in the storage on the given tr.
|
||||
Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error)
|
||||
|
||||
// GetID returns the ID of the node.
|
||||
GetID() uint64
|
||||
}
|
||||
|
||||
// BlockIterator must iterate through series blocks found by VMSelect.InitSearch.
|
||||
|
||||
@@ -10,6 +10,8 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
@@ -20,7 +22,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// Server processes vmselect requests.
|
||||
@@ -193,7 +194,7 @@ func (s *Server) run() {
|
||||
if s.disableResponseCompression {
|
||||
compressionLevel = 0
|
||||
}
|
||||
bc, err := handshake.VMSelectServer(c, compressionLevel)
|
||||
bc, err := handshake.VMSelectServer(c, compressionLevel, s.api.GetID())
|
||||
if err != nil {
|
||||
if s.isStopping() {
|
||||
// c is closed inside Server.MustStop
|
||||
|
||||
Reference in New Issue
Block a user