Compare commits

...

18 Commits

Author SHA1 Message Date
Zakhar Bessarab
1272a7f743 app/vminsert/netstorage: refactor snb rebuild
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 17:23:22 +04:00
Zakhar Bessarab
2b39ee785c app/vmselect: send static empty node ID for multi-level setup
Multi-level vmselect setup is not intended to use storage node IDs, so it is safe to return 0 here.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:23 +04:00
Zakhar Bessarab
842bf78cb1 app/vminsert/netstorage: sync comment
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:23 +04:00
Zakhar Bessarab
5420989018 app/vminsert/netstorage: reinitialize snb on vmstorage connection restore
The snb must be rebuilt in order to ensure that the list of storage nodes and the consistent hash stay in sync.
Updating just the consistent hash ring is not safe, because it can cause a misalignment between the indexes of alive nodes in snb.sns and the hash slots.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:23 +04:00
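The rebuild concern above follows from how a rendezvous-style consistent hash selects nodes by index. A minimal stdlib-only sketch of the selection step (the mixing function here is illustrative; the actual code hashes node IDs with xxhash), showing why the hash slots and snb.sns must stay aligned:

```go
package main

import "fmt"

// mix combines a node hash with a key hash. This is an illustrative
// stand-in for the xxhash-based mixing used by the real consistentHash.
func mix(nodeHash, keyHash uint64) uint64 {
	x := nodeHash ^ (keyHash * 0x9e3779b97f4a7c15)
	x ^= x >> 33
	x *= 0xff51afd7ed558ccd
	x ^= x >> 33
	return x
}

// getNodeIdx picks the index of the node with the highest combined hash
// (rendezvous hashing). The returned index is used to address the list
// of storage nodes, which is why both slices must stay in sync: if the
// hash is rebuilt without rebuilding the node list, the index may point
// at the wrong node.
func getNodeIdx(nodeHashes []uint64, keyHash uint64) int {
	bestIdx := 0
	best := mix(nodeHashes[0], keyHash)
	for i := 1; i < len(nodeHashes); i++ {
		if h := mix(nodeHashes[i], keyHash); h > best {
			best, bestIdx = h, i
		}
	}
	return bestIdx
}

func main() {
	nodeHashes := []uint64{11, 22, 33, 44}
	idx := getNodeIdx(nodeHashes, 12345)
	fmt.Println(idx >= 0 && idx < len(nodeHashes))
}
```

Rendezvous hashing has the useful property that removing one node only remaps the keys that were assigned to it, which is what keeps series distribution stable across node changes.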
Zakhar Bessarab
9ff8b312bb app/vminsert/netstorage: use correct snb reference
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:22 +04:00
Zakhar Bessarab
130b9cd04e app/vminsert/netstorage: make linter happy
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:22 +04:00
Zakhar Bessarab
88bfad9535 app/vminsert/netstorage: exclude unavailable nodes from consistent hash on start
Exclude unhealthy storage nodes from the consistent hash when persistent storage node IDs are enabled.
This is needed in order to avoid an uneven load distribution due to the default (uint64(0)) IDs assigned to unavailable storage nodes.

Stop generating a fallback ID from the node IP address, since that would cause a re-distribution of series once the storage node becomes available and changes its ID.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:22 +04:00
Zakhar Bessarab
e44c6f38c2 app/vminsert/netstorage: print storage node IDs in logs
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:22 +04:00
Zakhar Bessarab
96a62a275a lib/handshake: use a json payload for metadata exchange
Update the handshake to use an arbitrary JSON payload to transfer metadata.
The handshake sends the metadata length first as a uint64, followed by the metadata itself.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:22 +04:00
Zakhar Bessarab
20b9c8007b lib/storage: print node ID in startup log
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:22 +04:00
Zakhar Bessarab
3df456dd35 lib/storage: don't save persistent node ID on shutdown
It is supposed to be saved right after it is generated, so there is no reason to write it again.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:22 +04:00
Zakhar Bessarab
7402ee0801 lib/storage: fallback to address-based ID
Generate an ID based on the storage node address if the storage node is not available.
This is needed in order to prevent an uneven load distribution when some storage nodes are unavailable while vminsert is starting.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:22 +04:00
Zakhar Bessarab
5ac1e77520 lib/storage: save storage ID after init
This helps avoid re-creating the storage ID in case of an unclean shutdown.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:22 +04:00
Zakhar Bessarab
71e729f3f8 app/vminsert: disable usage of persistent storage node ID by default
This is needed in order to avoid complete data re-sharding after the upgrade to a new version.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:22 +04:00
Zakhar Bessarab
8729ec174b docs/changelog: add info about persisting vmstorage node ID
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:31:18 +04:00
Zakhar Bessarab
84184b707a app/cluster: communicate node IDs when performing a handshake
Send the node ID of vmstorage as a part of the vmselect and vminsert handshakes.
Use the vmstorage node ID as the identifier for consistent hashing at vminsert.

Cluster native endpoints calculate vminsert and vmselect node IDs as a hash of all underlying storage node IDs, so that the derived ID also remains consistent in case of address changes.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:29:20 +04:00
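The "hash of all underlying storage node IDs" scheme above can be sketched like this. Stdlib FNV-1a stands in for the xxhash used by the actual code; the key points are sorting first (so the derived ID is independent of configuration order) and hashing the fixed-width encoding of each ID:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
	"sort"
)

// combinedNodeID derives a single ID from a set of storage node IDs:
// sort them, marshal each as 8 bytes, and hash the concatenation.
// Sorting makes the result order-independent, so the derived
// vminsert/vmselect ID stays stable across address or ordering changes.
func combinedNodeID(ids []uint64) uint64 {
	sorted := append([]uint64(nil), ids...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	h := fnv.New64a()
	var b [8]byte
	for _, id := range sorted {
		binary.BigEndian.PutUint64(b[:], id)
		h.Write(b[:])
	}
	return h.Sum64()
}

func main() {
	a := combinedNodeID([]uint64{3, 1, 2})
	b := combinedNodeID([]uint64{2, 3, 1})
	fmt.Println(a == b) // same set of IDs in any order yields the same combined ID
}
```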
Zakhar Bessarab
41e217423f lib/storage: store node ID in metadata so that it is included in the backups
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:29:20 +04:00
Zakhar Bessarab
8d8073a24d lib/storage: add storage node id
Generate a random node ID on start if it is missing, otherwise load it from disk. Save it to storage on shutdown.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2024-07-29 12:29:19 +04:00
20 changed files with 355 additions and 65 deletions

View File

@@ -5,6 +5,8 @@ import (
"fmt"
"net"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
@@ -12,7 +14,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/clusternative/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
var (
@@ -25,7 +26,7 @@ var (
func InsertHandler(c net.Conn) error {
// There is no need in response compression, since
// lower-level vminsert sends only small packets to upper-level vminsert.
bc, err := handshake.VMInsertServer(c, 0)
bc, err := handshake.VMInsertServer(c, 0, netstorage.GetNodeID())
if err != nil {
if errors.Is(err, handshake.ErrIgnoreHealthcheck) {
return nil

View File

@@ -1,9 +1,5 @@
package netstorage
import (
"github.com/cespare/xxhash/v2"
)
// See the following docs:
// - https://www.eecs.umich.edu/techreports/cse/96/CSE-TR-316-96.pdf
// - https://github.com/dgryski/go-rendezvous
@@ -13,14 +9,10 @@ type consistentHash struct {
nodeHashes []uint64
}
func newConsistentHash(nodes []string, hashSeed uint64) *consistentHash {
nodeHashes := make([]uint64, len(nodes))
for i, node := range nodes {
nodeHashes[i] = xxhash.Sum64([]byte(node))
}
func newConsistentHash(ids []uint64, hashSeed uint64) *consistentHash {
return &consistentHash{
hashSeed: hashSeed,
nodeHashes: nodeHashes,
nodeHashes: ids,
}
}

View File

@@ -4,16 +4,18 @@ import (
"math"
"math/rand"
"testing"
"github.com/cespare/xxhash/v2"
)
func TestConsistentHash(t *testing.T) {
r := rand.New(rand.NewSource(1))
nodes := []string{
"node1",
"node2",
"node3",
"node4",
nodes := []uint64{
xxhash.Sum64String("node1"),
xxhash.Sum64String("node2"),
xxhash.Sum64String("node3"),
xxhash.Sum64String("node4"),
}
rh := newConsistentHash(nodes, 0)

View File

@@ -4,16 +4,19 @@ import (
"math/rand"
"sync/atomic"
"testing"
"github.com/cespare/xxhash/v2"
)
func BenchmarkConsistentHash(b *testing.B) {
nodes := []string{
"node1",
"node2",
"node3",
"node4",
nodes := []uint64{
xxhash.Sum64String("node1"),
xxhash.Sum64String("node2"),
xxhash.Sum64String("node3"),
xxhash.Sum64String("node4"),
}
rh := newConsistentHash(nodes, 0)
b.ReportAllocs()
b.SetBytes(int64(len(benchKeys)))
b.RunParallel(func(pb *testing.PB) {

View File

@@ -5,6 +5,8 @@ import (
"net/http"
"strconv"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -12,7 +14,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/cespare/xxhash/v2"
)
// InsertCtx is a generic context for inserting data.

View File

@@ -6,10 +6,14 @@ import (
"fmt"
"io"
"net"
"slices"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@@ -21,8 +25,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
)
var (
@@ -43,6 +45,9 @@ var (
"On the other side, disabled re-routing minimizes the number of active time series in the cluster "+
"during rolling restarts and during spikes in series churn rate. "+
"See also -disableRerouting")
usePersistentStorageNodeID = flag.Bool("vmstorageUsePersistentID", false, "Whether to use persistent storage node IDs for -storageNode instances. "+
"If set to false, the storage node address is used to generate an ID. "+
"Using persistent node IDs is useful if vmstorage node addresses change over time, e.g. due to dynamic IP addresses or DNS names.")
)
var errStorageReadOnly = errors.New("storage node is read only")
@@ -278,7 +283,7 @@ func (sn *storageNode) checkHealth() {
}
return
}
logger.Infof("successfully dialed -storageNode=%q", sn.dialer.Addr())
logger.Infof("successfully dialed -storageNode=%q (node ID: %d)", sn.dialer.Addr(), sn.id.Load())
sn.lastDialErr = nil
sn.bc = bc
sn.isBroken.Store(false)
@@ -398,15 +403,24 @@ func (sn *storageNode) dial() (*handshake.BufferedConn, error) {
if *disableRPCCompression {
compressionLevel = 0
}
bc, err := handshake.VMInsertClient(c, compressionLevel)
bc, id, err := handshake.VMInsertClient(c, compressionLevel)
if err != nil {
_ = c.Close()
sn.handshakeErrors.Inc()
return nil, fmt.Errorf("handshake error: %w", err)
}
sn.id.CompareAndSwap(0, id)
return bc, nil
}
func (sn *storageNode) getID() uint64 {
// Ensure that the id is populated
if sn.id.Load() == 0 {
sn.checkHealth()
}
return sn.id.Load()
}
// storageNode is a client sending data to vmstorage node.
type storageNode struct {
// isBroken is set to true if the given vmstorage node is temporarily unhealthy.
@@ -473,6 +487,9 @@ type storageNode struct {
// The total duration spent for sending data to vmstorage node.
// This metric is useful for determining the saturation of vminsert->vmstorage link.
sendDurationSeconds *metrics.FloatCounter
// id is a unique identifier for the storage node.
id atomic.Uint64
}
type storageNodesBucket struct {
@@ -515,14 +532,31 @@ func MustStop() {
mustStopStorageNodes(snb)
}
// GetNodeID returns a unique identifier for the underlying storage nodes.
func GetNodeID() uint64 {
snb := getStorageNodesBucket()
snIDs := make([]uint64, 0, len(snb.sns))
for _, sn := range snb.sns {
snIDs = append(snIDs, sn.getID())
}
slices.Sort(snIDs)
idsM := make([]byte, 0)
for _, id := range snIDs {
idsM = encoding.MarshalUint64(idsM, id)
}
return xxhash.Sum64(idsM)
}
func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
if len(addrs) == 0 {
logger.Panicf("BUG: addrs must be non-empty")
}
ms := metrics.NewSet()
nodesHash := newConsistentHash(addrs, hashSeed)
sns := make([]*storageNode, 0, len(addrs))
brokenNodes := make([]*storageNode, 0)
stopCh := make(chan struct{})
nodeIDs := make([]uint64, 0, len(addrs))
for _, addr := range addrs {
if _, _, err := net.SplitHostPort(addr); err != nil {
// Automatically add missing port.
@@ -568,10 +602,22 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
}
return 0
})
var nodeID uint64
if *usePersistentStorageNodeID {
nodeID = sn.getID()
if nodeID == 0 {
brokenNodes = append(brokenNodes, sn)
continue
}
} else {
nodeID = xxhash.Sum64String(addr)
}
nodeIDs = append(nodeIDs, nodeID)
sns = append(sns, sn)
}
nodesHash := newConsistentHash(nodeIDs, hashSeed)
maxBufSizePerStorageNode = memory.Allowed() / 8 / len(sns)
maxBufSizePerStorageNode = memory.Allowed() / 8 / len(addrs)
if maxBufSizePerStorageNode > consts.MaxInsertPacketSizeForVMInsert {
maxBufSizePerStorageNode = consts.MaxInsertPacketSizeForVMInsert
}
@@ -586,7 +632,12 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
wg: &wg,
}
for idx, sn := range sns {
// Add broken nodes to the end of the list.
// This is needed because the consistent hash slots are populated with the IDs of available
// storage nodes (if there are any), and the consistent hash indexes must be linked to healthy storage nodes.
snb.sns = append(snb.sns, brokenNodes...)
for idx, sn := range snb.sns {
wg.Add(1)
go func(sn *storageNode, idx int) {
sn.run(snb, idx)
@@ -594,6 +645,28 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
}(sn, idx)
}
// Watch for broken nodes becoming healthy and rebuild snb.
for _, sn := range brokenNodes {
wg.Add(1)
sn := sn
go watchStorageNodeHealthy(sn, func() {
defer wg.Done()
// rebuild snb in order to update consistent hash with an ID of the healthy storage node
for {
currentSnb := getStorageNodesBucket()
newSnb := initStorageNodes(addrs, hashSeed)
if !storageNodes.CompareAndSwap(currentSnb, newSnb) {
// snb has been changed, so we need to stop the newSnb and try again
mustStopStorageNodes(newSnb)
continue
}
// stop previous snb and exit
mustStopStorageNodes(currentSnb)
break
}
})
}
return snb
}
@@ -606,6 +679,34 @@ func mustStopStorageNodes(snb *storageNodesBucket) {
metrics.UnregisterSet(snb.ms, true)
}
// watchStorageNodeHealthy watches for sn to become healthy and calls cb once it is ready.
func watchStorageNodeHealthy(sn *storageNode, cb func()) {
for {
sn.brLock.Lock()
for !sn.isReady() {
select {
case <-sn.stopCh:
sn.brLock.Unlock()
return
default:
sn.brCond.Wait()
}
}
sn.brLock.Unlock()
select {
case <-sn.stopCh:
return
default:
}
if sn.isReady() {
cb()
return
}
}
}
// rerouteRowsToReadyStorageNodes reroutes src from not ready snSource to ready storage nodes.
//
// The function blocks until src is fully re-routed.

View File

@@ -31,7 +31,9 @@ var (
// NewVMSelectServer starts new server at the given addr, which serves vmselect requests from netstorage.
func NewVMSelectServer(addr string) (*vmselectapi.Server, error) {
api := &vmstorageAPI{}
api := &vmstorageAPI{
nodeID: netstorage.GetNodeID(),
}
limits := vmselectapi.Limits{
MaxLabelNames: *maxTagKeys,
MaxLabelValues: *maxTagValues,
@@ -45,7 +47,9 @@ func NewVMSelectServer(addr string) (*vmselectapi.Server, error) {
}
// vmstorageAPI implements vmselectapi.API
type vmstorageAPI struct{}
type vmstorageAPI struct {
nodeID uint64
}
func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
denyPartialResponse := httputils.GetDenyPartialResponse(nil)
@@ -112,6 +116,10 @@ func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []stora
return netstorage.RegisterMetricNames(qt, mrs, dl)
}
func (api *vmstorageAPI) GetID() uint64 {
return api.nodeID
}
// blockIterator implements vmselectapi.BlockIterator
type blockIterator struct {
workCh chan workItem

View File

@@ -2107,6 +2107,9 @@ type storageNode struct {
// The number of list tenants errors to storageNode.
tenantsErrors *metrics.Counter
// id is the unique identifier for the storageNode.
id uint64
}
func (sn *storageNode) registerMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline searchutils.Deadline) error {
@@ -2954,6 +2957,12 @@ func getStorageNodes() []*storageNode {
return snb.sns
}
// GetNodeID returns the unique identifier of vmselect
func GetNodeID() uint64 {
// Return 0, since persistent IDs are not intended to be used with a multi-level setup
return 0
}
// Init initializes storage nodes' connections to the given addrs.
//
// MustStop must be called when the initialized connections are no longer needed.
@@ -3015,6 +3024,7 @@ func newStorageNode(ms *metrics.Set, group *storageNodesGroup, addr string) *sto
sn := &storageNode{
group: group,
connPool: connPool,
id: connPool.GetTargetNodeID(),
concurrentQueries: ms.NewCounter(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr)),

View File

@@ -111,8 +111,8 @@ func main() {
blocksCount := tm.SmallBlocksCount + tm.BigBlocksCount
rowsCount := tm.SmallRowsCount + tm.BigRowsCount
sizeBytes := tm.SmallSizeBytes + tm.BigSizeBytes
logger.Infof("successfully opened storage %q in %.3f seconds; partsCount: %d; blocksCount: %d; rowsCount: %d; sizeBytes: %d",
*storageDataPath, time.Since(startTime).Seconds(), partsCount, blocksCount, rowsCount, sizeBytes)
logger.Infof("successfully opened storage %q (node ID: %d) in %.3f seconds; partsCount: %d; blocksCount: %d; rowsCount: %d; sizeBytes: %d",
*storageDataPath, strg.GetID(), time.Since(startTime).Seconds(), partsCount, blocksCount, rowsCount, sizeBytes)
// register storage metrics
storageMetrics := metrics.NewSet()

View File

@@ -101,7 +101,7 @@ func (s *VMInsertServer) run() {
// There is no need in response compression, since
// vmstorage sends only small packets to vminsert.
compressionLevel := 0
bc, err := handshake.VMInsertServer(c, compressionLevel)
bc, err := handshake.VMInsertServer(c, compressionLevel, s.storage.GetID())
if err != nil {
if s.isStopping() {
// c is stopped inside VMInsertServer.MustStop

View File

@@ -195,6 +195,10 @@ func (api *vmstorageAPI) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQue
return tfss, nil
}
func (api *vmstorageAPI) GetID() uint64 {
return api.s.GetID()
}
// blockIterator implements vmselectapi.BlockIterator
type blockIterator struct {
sr storage.Search

View File

@@ -31,10 +31,13 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
**Update note 1: [vmauth](https://docs.victoriametrics.com/vmauth/) HTTP response code has changed from 503 to 502 for the case when all upstream backends are unavailable. This was changed to align [vmauth](https://docs.victoriametrics.com/vmauth/) behaviour with the behaviour of other well-known reverse proxies.**
**Update note 2: this release contains a breaking change to inter-cluster communication. The `vmstorage` nodes with a version lower than `tip` won't be able to communicate with `vmselect` and `vminsert` nodes running `tip` or newer. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5438) issue for the details.**
* SECURITY: upgrade base docker image (Alpine) from 3.20.1 to 3.20.2. See [alpine 3.20.2 release notes](https://alpinelinux.org/posts/Alpine-3.20.2-released.html).
* FEATURE: [vmauth](./vmauth.md): add `keep_original_host` option, which can be used for proxying the original `Host` header from client request to the backend. By default the backend host is used as `Host` header when proxying requests to the configured backends. See [these docs](./vmauth.md#host-http-header).
* FEATURE: [vmauth](./vmauth.md) now returns HTTP 502 status code when all upstream backends are not available. Previously, it returned HTTP 503 status code. This change aligns vmauth behavior with other well-known reverse-proxies behavior.
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): use a persistent `vmstorage` node ID for consistent hashing on the data write path. This allows keeping the same data distribution after a `vmstorage` node changes its IP address. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5438).
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly proxy requests to backend urls ending with `/` if the original request path equals to `/`. Previously the trailing `/` at the backend path was incorrectly removed. For example, if the request to `http://vmauth/` is configured to be proxied to `url_prefix=http://backend/foo/`, then it was proxied to `http://backend/foo`, while it should go to `http://backend/foo/`.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): fix `cannot read data after closing the reader` error when proxying HTTP requests without body (aka `GET` requests). The issue has been introduced in [v1.102.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0) in [this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/7ee57974935a662896f2de40fdf613156630617d).

View File

@@ -1301,6 +1301,8 @@ Below is the output for `/path/to/vminsert -help`:
Show VictoriaMetrics version
-vmstorageDialTimeout duration
Timeout for establishing RPC connections from vminsert to vmstorage. See also -vmstorageUserTimeout (default 3s)
-vmstorageUsePersistentID
Whether to use persistent storage node IDs for -storageNode instances. If set to false, the storage node address is used to generate an ID. Using persistent node IDs is useful if vmstorage node addresses change over time, e.g. due to dynamic IP addresses or DNS names.
-vmstorageUserTimeout duration
Network timeout for RPC connections from vminsert to vmstorage (Linux only). Lower values speed up re-rerouting recovery when some of vmstorage nodes become unavailable because of networking issues. Read more about TCP_USER_TIMEOUT at https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/ . See also -vmstorageDialTimeout (default 3s)
```

View File

@@ -1,11 +1,18 @@
package handshake
import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"sync"
"time"
"unsafe"
"github.com/valyala/fastjson"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)
const (
@@ -15,17 +22,22 @@ const (
successResponse = "ok"
)
// Func must perform handshake on the given c using the given compressionLevel.
// ClientFunc must perform handshake on the given c using the given compressionLevel.
//
// It must return BufferedConn wrapper for c on successful handshake.
type Func func(c net.Conn, compressionLevel int) (*BufferedConn, error)
type ClientFunc func(c net.Conn, compressionLevel int) (*BufferedConn, uint64, error)
// ServerFunc must perform handshake on the given c using the given compressionLevel and id.
//
// It must return BufferedConn wrapper for c on successful handshake.
type ServerFunc func(c net.Conn, compressionLevel int, id uint64) (*BufferedConn, error)
// VMInsertClient performs client-side handshake for vminsert protocol.
//
// compressionLevel is the level used for compression of the data sent
// to the server.
// compressionLevel <= 0 means 'no compression'
func VMInsertClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
func VMInsertClient(c net.Conn, compressionLevel int) (*BufferedConn, uint64, error) {
return genericClient(c, vminsertHello, compressionLevel)
}
@@ -34,8 +46,8 @@ func VMInsertClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
// compressionLevel is the level used for compression of the data sent
// to the client.
// compressionLevel <= 0 means 'no compression'
func VMInsertServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
return genericServer(c, vminsertHello, compressionLevel)
func VMInsertServer(c net.Conn, compressionLevel int, id uint64) (*BufferedConn, error) {
return genericServer(c, vminsertHello, compressionLevel, id)
}
// VMSelectClient performs client-side handshake for vmselect protocol.
@@ -43,7 +55,7 @@ func VMInsertServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
// compressionLevel is the level used for compression of the data sent
// to the server.
// compressionLevel <= 0 means 'no compression'
func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, uint64, error) {
return genericClient(c, vmselectHello, compressionLevel)
}
@@ -52,8 +64,8 @@ func VMSelectClient(c net.Conn, compressionLevel int) (*BufferedConn, error) {
// compressionLevel is the level used for compression of the data sent
// to the client.
// compressionLevel <= 0 means 'no compression'
func VMSelectServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
return genericServer(c, vmselectHello, compressionLevel)
func VMSelectServer(c net.Conn, compressionLevel int, id uint64) (*BufferedConn, error) {
return genericServer(c, vmselectHello, compressionLevel, id)
}
// ErrIgnoreHealthcheck means a TCP healthcheck, which must be ignored.
@@ -61,7 +73,11 @@ func VMSelectServer(c net.Conn, compressionLevel int) (*BufferedConn, error) {
// The TCP healthcheck is performed by opening and then immediately closing the connection.
var ErrIgnoreHealthcheck = fmt.Errorf("TCP healthcheck - ignore it")
func genericServer(c net.Conn, msg string, compressionLevel int) (*BufferedConn, error) {
type handshakeMetadata struct {
NodeID uint64 `json:"nodeId"`
}
func genericServer(c net.Conn, msg string, compressionLevel int, id uint64) (*BufferedConn, error) {
if err := readMessage(c, msg); err != nil {
if errors.Is(err, io.EOF) {
// This is TCP healthcheck, which must be ignored in order to prevent from logs pollution.
@@ -86,32 +102,40 @@ func genericServer(c net.Conn, msg string, compressionLevel int) (*BufferedConn,
if err := readMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
}
if err := writeMetadata(c, id); err != nil {
return nil, fmt.Errorf("cannot write metadata: %w", err)
}
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
return bc, nil
}
func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn, error) {
func genericClient(c net.Conn, msg string, compressionLevel int) (*BufferedConn, uint64, error) {
if err := writeMessage(c, msg); err != nil {
return nil, fmt.Errorf("cannot write hello: %w", err)
return nil, 0, fmt.Errorf("cannot write hello: %w", err)
}
if err := readMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot read success response after sending hello: %w", err)
return nil, 0, fmt.Errorf("cannot read success response after sending hello: %w", err)
}
if err := writeIsCompressed(c, compressionLevel > 0); err != nil {
return nil, fmt.Errorf("cannot write isCompressed flag: %w", err)
return nil, 0, fmt.Errorf("cannot write isCompressed flag: %w", err)
}
if err := readMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot read success response on isCompressed: %w", err)
return nil, 0, fmt.Errorf("cannot read success response on isCompressed: %w", err)
}
isRemoteCompressed, err := readIsCompressed(c)
if err != nil {
return nil, fmt.Errorf("cannot read isCompressed flag: %w", err)
return nil, 0, fmt.Errorf("cannot read isCompressed flag: %w", err)
}
if err := writeMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot write success response on isCompressed: %w", err)
return nil, 0, fmt.Errorf("cannot write success response on isCompressed: %w", err)
}
metadata, err := readMetadata(c)
if err != nil {
return nil, 0, fmt.Errorf("cannot read metadata: %w", err)
}
bc := newBufferedConn(c, compressionLevel, isRemoteCompressed)
return bc, nil
return bc, metadata.NodeID, nil
}
func writeIsCompressed(c net.Conn, isCompressed bool) error {
@@ -164,6 +188,80 @@ func readMessage(c net.Conn, msg string) error {
return nil
}
var metadataParserPool fastjson.ParserPool
func readMetadata(c net.Conn) (*handshakeMetadata, error) {
metaLenBuf, err := readData(c, int(unsafe.Sizeof(uint64(0))))
if err != nil {
return nil, fmt.Errorf("cannot read metadata length: %w", err)
}
metaLen := int(encoding.UnmarshalUint64(metaLenBuf))
if err := writeMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot write success response on metadata length: %w", err)
}
metaBuf, err := readData(c, metaLen)
if err != nil {
return nil, fmt.Errorf("cannot read metadata: %w", err)
}
if err := writeMessage(c, successResponse); err != nil {
return nil, fmt.Errorf("cannot write success response on metadata: %w", err)
}
parser := metadataParserPool.Get()
defer metadataParserPool.Put(parser)
v, err := parser.ParseBytes(metaBuf)
if err != nil {
return nil, fmt.Errorf("cannot parse metadata: %w", err)
}
return &handshakeMetadata{
NodeID: v.GetUint64("nodeId"),
}, nil
}
var (
metadataCache sync.Map
)
func getMetadataBytes(id uint64) ([]byte, error) {
m, ok := metadataCache.Load(id)
if !ok {
metadata := handshakeMetadata{
NodeID: id,
}
var err error
m, err = json.Marshal(metadata)
if err != nil {
return nil, fmt.Errorf("cannot marshal metadata: %w", err)
}
metadataCache.Store(id, m)
}
metaV := m.([]byte)
return metaV, nil
}
func writeMetadata(c net.Conn, id uint64) error {
meta, err := getMetadataBytes(id)
if err != nil {
return fmt.Errorf("cannot obtain metadata bytes: %w", err)
}
metaLen := len(meta)
if err := writeMessage(c, string(encoding.MarshalUint64(nil, uint64(metaLen)))); err != nil {
return fmt.Errorf("cannot write metadata length: %w", err)
}
if err := readMessage(c, successResponse); err != nil {
return fmt.Errorf("cannot read success response on metadata length: %w", err)
}
if err := writeMessage(c, string(meta)); err != nil {
return fmt.Errorf("cannot write metadata: %w", err)
}
if err := readMessage(c, successResponse); err != nil {
return fmt.Errorf("cannot read success response on metadata: %w", err)
}
return nil
}
func readData(c net.Conn, dataLen int) ([]byte, error) {
if err := c.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
return nil, fmt.Errorf("cannot set read deadline: %w", err)

View File

@@ -15,22 +15,26 @@ func TestVMSelectHandshake(t *testing.T) {
testHandshake(t, VMSelectClient, VMSelectServer)
}
func testHandshake(t *testing.T, clientFunc, serverFunc Func) {
func testHandshake(t *testing.T, clientFunc ClientFunc, serverFunc ServerFunc) {
t.Helper()
c, s := net.Pipe()
ch := make(chan error, 1)
go func() {
bcs, err := serverFunc(s, 3)
bcs, err := serverFunc(s, 3, 1)
if err != nil {
ch <- fmt.Errorf("error on outer handshake: %w", err)
return
}
bcc, err := clientFunc(bcs, 3)
bcc, id, err := clientFunc(bcs, 3)
if err != nil {
ch <- fmt.Errorf("error on inner handshake: %w", err)
return
}
if id != 1 {
ch <- fmt.Errorf("unexpected id; got %d; want 1", id)
return
}
if bcc == nil {
ch <- fmt.Errorf("expecting non-nil conn")
return
@@ -38,11 +42,15 @@ func testHandshake(t *testing.T, clientFunc, serverFunc Func) {
ch <- nil
}()
bcc, err := clientFunc(c, 0)
bcc, id, err := clientFunc(c, 0)
if err != nil {
t.Fatalf("error on outer handshake: %s", err)
}
bcs, err := serverFunc(bcc, 0)
if id != 1 {
t.Fatalf("unexpected id; got %d; want 1", id)
}
bcs, err := serverFunc(bcc, 0, 1)
if err != nil {
t.Fatalf("error on inner handshake: %s", err)
}

View File

@@ -3,12 +3,14 @@ package netutil
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
// ConnPool is a connection pool with ZSTD-compressed connections.
@@ -23,8 +25,9 @@ type ConnPool struct {
concurrentDialsCh chan struct{}
name string
handshakeFunc handshake.Func
handshakeFunc handshake.ClientFunc
compressionLevel int
nodeID atomic.Uint64
conns []connWithTimestamp
@@ -49,7 +52,7 @@ type connWithTimestamp struct {
// The compression is disabled if compressionLevel <= 0.
//
// Call ConnPool.MustStop when the returned ConnPool is no longer needed.
func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.Func, compressionLevel int, dialTimeout, userTimeout time.Duration) *ConnPool {
func NewConnPool(ms *metrics.Set, name, addr string, handshakeFunc handshake.ClientFunc, compressionLevel int, dialTimeout, userTimeout time.Duration) *ConnPool {
cp := &ConnPool{
d: NewTCPDialer(ms, name, addr, dialTimeout, userTimeout),
concurrentDialsCh: make(chan struct{}, 8),
@@ -163,7 +166,8 @@ func (cp *ConnPool) dialAndHandshake() (*handshake.BufferedConn, error) {
if err != nil {
return nil, err
}
bc, err := cp.handshakeFunc(c, cp.compressionLevel)
bc, id, err := cp.handshakeFunc(c, cp.compressionLevel)
cp.nodeID.CompareAndSwap(0, id)
if err != nil {
// Do not put handshake error to cp.lastDialError, because handshake
// is performed on an already established connection.
@@ -249,6 +253,16 @@ func (cp *ConnPool) checkAvailability(force bool) {
}
}
// GetTargetNodeID returns the nodeID of the target server.
func (cp *ConnPool) GetTargetNodeID() uint64 {
// Ensure that nodeID is initialized.
if cp.nodeID.Load() == 0 {
cp.checkAvailability(true)
}
return cp.nodeID.Load()
}
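The ConnPool changes cache the remote node's ID lazily: the first successful handshake publishes it with `CompareAndSwap(0, id)`, and `GetTargetNodeID` forces an availability check only while the ID is still the zero value. A reduced sketch of that pattern (the `pool` type and `handshake` method are hypothetical stand-ins for ConnPool's dialing machinery):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pool mimics only the ConnPool fields relevant to node-ID caching.
type pool struct {
	nodeID atomic.Uint64
}

// handshake stands in for cp.handshakeFunc; here it just returns a fixed ID.
func (p *pool) handshake() uint64 { return 42 }

// GetTargetNodeID mirrors the diff's pattern: trigger a handshake only
// while the ID is uninitialized (0), then reuse the cached value.
// CompareAndSwap(0, id) ensures concurrent callers agree on one ID.
func (p *pool) GetTargetNodeID() uint64 {
	if p.nodeID.Load() == 0 {
		id := p.handshake()
		p.nodeID.CompareAndSwap(0, id)
	}
	return p.nodeID.Load()
}

func main() {
	p := &pool{}
	fmt.Println(p.GetTargetNodeID()) // 42
	fmt.Println(p.GetTargetNodeID()) // 42 again; the handshake result is cached
}
```

Note that this scheme treats 0 as "unset", which is why the accompanying commits exclude nodes with the default `uint64(0)` ID from the consistent hash.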
func init() {
go func() {
for {


@@ -10,6 +10,8 @@ const (
appliedRetentionFilename = "appliedRetention.txt"
resetCacheOnStartupFilename = "reset_cache_on_startup"
nodeIDFilename = "node_id.bin"
)
const (


@@ -5,6 +5,7 @@ import (
"fmt"
"io"
"math"
"math/rand"
"os"
"path/filepath"
"regexp"
@@ -15,6 +16,9 @@ import (
"time"
"unsafe"
"github.com/VictoriaMetrics/fastcache"
"github.com/VictoriaMetrics/metricsql"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/backupnames"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -29,8 +33,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache"
"github.com/VictoriaMetrics/metricsql"
)
const (
@@ -64,6 +66,9 @@ type Storage struct {
cachePath string
retentionMsecs int64
// Used to uniquely identify storage node
nodeID uint64
// lock file for exclusive access to the storage on the given path.
flockF *os.File
@@ -244,6 +249,23 @@ func MustOpenStorage(path string, retention time.Duration, maxHourlySeries, maxD
isEmptyDB := !fs.IsPathExist(filepath.Join(path, indexdbDirname))
fs.MustMkdirIfNotExist(metadataDir)
s.minTimestampForCompositeIndex = mustGetMinTimestampForCompositeIndex(metadataDir, isEmptyDB)
nodeIDPath := filepath.Join(metadataDir, nodeIDFilename)
if fs.IsPathExist(nodeIDPath) {
r, err := os.Open(nodeIDPath)
if err != nil {
logger.Panicf("FATAL: cannot open nodeID file %q: %s", nodeIDPath, err)
}
nodeID, err := io.ReadAll(r)
if err != nil {
logger.Panicf("FATAL: cannot read nodeID from %q: %s", nodeIDPath, err)
}
s.nodeID = encoding.UnmarshalUint64(nodeID)
} else {
nodeID := rand.Uint64()
s.nodeID = nodeID
s.mustSaveNodeID()
}
// Load indexdb
idbPath := filepath.Join(path, indexdbDirname)
@@ -1032,6 +1054,16 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
return hm
}
func (s *Storage) mustSaveNodeID() {
path := filepath.Join(s.path, metadataDirname, nodeIDFilename)
dst := make([]byte, 0)
dst = encoding.MarshalUint64(dst, s.nodeID)
if err := os.WriteFile(path, dst, 0644); err != nil {
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
}
}
func (s *Storage) mustSaveNextDayMetricIDs(e *byDateMetricIDEntry) {
name := "next_day_metric_ids_v2"
path := filepath.Join(s.cachePath, name)
@@ -1609,6 +1641,11 @@ func (s *Storage) GetTSDBStatus(qt *querytracer.Tracer, accountID, projectID uin
return s.idb().GetTSDBStatus(qt, accountID, projectID, tfss, date, focusLabel, topN, maxMetrics, deadline)
}
// GetID returns a unique identifier for the storage node.
func (s *Storage) GetID() uint64 {
return s.nodeID
}
// MetricRow is a metric to insert into storage.
type MetricRow struct {
// MetricNameRaw contains raw metric name, which must be decoded


@@ -38,6 +38,9 @@ type API interface {
// Tenants returns list of tenants in the storage on the given tr.
Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error)
// GetID returns the ID of the node.
GetID() uint64
}
// BlockIterator must iterate through series blocks found by VMSelect.InitSearch.


@@ -10,6 +10,8 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -20,7 +22,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
)
// Server processes vmselect requests.
@@ -193,7 +194,7 @@ func (s *Server) run() {
if s.disableResponseCompression {
compressionLevel = 0
}
bc, err := handshake.VMSelectServer(c, compressionLevel)
bc, err := handshake.VMSelectServer(c, compressionLevel, s.api.GetID())
if err != nil {
if s.isStopping() {
// c is closed inside Server.MustStop