Compare commits

...

2 Commits

Author SHA1 Message Date
Jayice
4bdda8a8b1 improve documentation 2026-06-16 17:37:20 +08:00
Jayice
2cc547d2ad support sharding targets by label in vmagent cluster mode 2026-06-16 17:33:03 +08:00
5 changed files with 154 additions and 9 deletions

View File

@@ -30,6 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-promscrape.cluster.shardByLabels` command-line flag for selecting target labels used for sharding scrape targets among `vmagent` instances in cluster mode. See [#11044](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11044).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): do not fail backup list if directory is absent while using `fs://` destination to align with other protocols. See [6c3c548](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/6c3c548ddb0385b749e731f52276f130e2a4e4a8)

View File

@@ -797,6 +797,13 @@ For example, the following commands spread scrape targets among a cluster of two
The `-promscrape.cluster.memberNum` can be set to a StatefulSet pod name when `vmagent` runs in Kubernetes.
The pod name must end with a number in the range `0 ... promscrape.cluster.membersCount-1`. For example, `-promscrape.cluster.memberNum=vmagent-0`.
By default, targets are sharded among `vmagent` instances by all target labels after relabeling.
Use `-promscrape.cluster.shardByLabels` to shard targets by specified labels instead.
For example, `-promscrape.cluster.shardByLabels=service,pod` keeps targets with the same `service` and `pod` label value on the same `vmagent` instance.
If some of the specified labels are present on a target, then only the present labels are used for sharding.
If none of the specified labels are present, then all target labels are used for sharding.
By default, each scrape target is scraped only by a single `vmagent` instance in the cluster. If there is a need for replicating scrape targets among multiple `vmagent` instances,
then `-promscrape.cluster.replicationFactor` command-line flag must be set to the desired number of replicas. For example, the following commands
start a cluster of three `vmagent` instances, where two `vmagent` instances scrape each target:

View File

@@ -76,6 +76,9 @@ var (
"Every %d occurrence in the template is substituted with -promscrape.cluster.memberNum at urls to vmagent instances responsible for scraping the given target "+
"at /service-discovery page. For example -promscrape.cluster.memberURLTemplate='http://vmagent-%d:8429/targets'. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more details")
clusterShardByLabels = flagutil.NewArrayString("promscrape.cluster.shardByLabels", "Optional list of target labels, which will be used for sharding targets among cluster members "+
"if -promscrape.cluster.membersCount is greater than 1. If none of the specified labels are found in a target, then all the target labels will be used for sharding. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info")
clusterReplicationFactor = flag.Int("promscrape.cluster.replicationFactor", 1, "The number of members in the cluster, which scrape the same targets. "+
"If the replication factor is greater than 1, then the deduplication must be enabled at remote storage side. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info")
@@ -86,7 +89,10 @@ var (
"Bigger uncompressed responses are rejected. See also max_scrape_size option at https://docs.victoriametrics.com/victoriametrics/sd_configs/#scrape_configs")
)
var clusterMemberID int
var (
clusterMemberID int
clusterShardByLabelsSorted []string
)
func mustInitClusterMemberID() {
s := *clusterMemberNum
@@ -110,6 +116,14 @@ func mustInitClusterMemberID() {
clusterMemberID = n
}
func initClusterShardByLabels() {
if len(*clusterShardByLabels) == 0 {
return
}
clusterShardByLabelsSorted = slices.Clone(*clusterShardByLabels)
slices.Sort(clusterShardByLabelsSorted)
}
// Config represents essential parts from Prometheus config defined at https://prometheus.io/docs/prometheus/latest/configuration/configuration/
type Config struct {
Global GlobalConfig `yaml:"global,omitempty"`
@@ -1137,13 +1151,29 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
return dst
}
func appendScrapeWorkKey(dst []byte, labels *promutil.Labels) []byte {
for _, label := range labels.GetLabels() {
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
dst = append(dst, label.Name...)
dst = append(dst, '=')
dst = append(dst, label.Value...)
dst = append(dst, ',')
func appendScrapeWorkKey(dst []byte, labels *promutil.Labels, shardByLabels []string) []byte {
originalDstLen := len(dst)
for _, targetLabelName := range shardByLabels {
for _, label := range labels.GetLabels() {
if label.Name == targetLabelName {
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
dst = append(dst, label.Name...)
dst = append(dst, '=')
dst = append(dst, label.Value...)
dst = append(dst, ',')
break
}
}
}
// none of the labels specified in -promscrape.cluster.shardByLabels is present, use all labels for backward compatibility.
if len(dst) == originalDstLen {
for _, label := range labels.GetLabels() {
dst = append(dst, label.Name...)
dst = append(dst, '=')
dst = append(dst, label.Value...)
dst = append(dst, ',')
}
return dst
}
return dst
}
@@ -1195,7 +1225,7 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1687#issuecomment-940629495
if *clusterMembersCount > 1 {
bb := scrapeWorkKeyBufPool.Get()
bb.B = appendScrapeWorkKey(bb.B[:0], labels)
bb.B = appendScrapeWorkKey(bb.B[:0], labels, clusterShardByLabelsSorted)
memberNums := getClusterMemberNumsForScrapeWork(bytesutil.ToUnsafeString(bb.B), *clusterMembersCount, *clusterReplicationFactor)
scrapeWorkKeyBufPool.Put(bb)
if !slices.Contains(memberNums, clusterMemberID) {

View File

@@ -148,6 +148,112 @@ func TestGetClusterMemberNumsForScrapeWork(t *testing.T) {
f("foo", 3, 2, []int{2, 0})
}
func TestAppendScrapeWorkKeyShardByLabels(t *testing.T) {
f := func(labelsA, labelsB map[string]string, shardByLabels []string, equal bool) {
t.Helper()
originValue := *clusterShardByLabels
*clusterShardByLabels = shardByLabels
initClusterShardByLabels()
keyA := string(appendScrapeWorkKey(nil, promutil.NewLabelsFromMap(labelsA), clusterShardByLabelsSorted))
keyB := string(appendScrapeWorkKey(nil, promutil.NewLabelsFromMap(labelsB), clusterShardByLabelsSorted))
if equal && keyA != keyB {
t.Fatalf("unexpected different scrape work keys for shardByLabels=%q;\nlabelsA=%v\nlabelsB=%v\nkeyA=%q\nkeyB=%q",
shardByLabels, labelsA, labelsB, keyA, keyB)
} else if !equal && keyA == keyB {
t.Fatalf("unexpected equal scrape work keys for shardByLabels=%q;\nlabelsA=%v\nlabelsB=%v\nkeyA=%q\nkeyB=%q",
shardByLabels, labelsA, labelsB, keyA, keyB)
}
*clusterShardByLabels = originValue
}
// didn't specify -promscrape.cluster.shardByLabels, and all labels are the same
f(
map[string]string{
"a": "aa",
"b": "bb",
"c": "cc",
"d": "dd"},
map[string]string{
"a": "aa",
"b": "bb",
"c": "cc",
"d": "dd"},
[]string{},
true,
)
// match all labels in -promscrape.cluster.shardByLabels, and they're the same
f(
map[string]string{
"a": "aa",
"b": "bb",
"c": "cc",
"d": "dd"},
map[string]string{
"c": "cc",
"a": "aa",
"b": "other",
"d": "other"},
[]string{"c", "a"},
true,
)
// match all labels in -promscrape.cluster.shardByLabels, and they're different
f(
map[string]string{
"a": "aa",
"b": "bb",
"c": "cc",
"d": "dd"},
map[string]string{
"a": "aa",
"b": "other",
"c": "cc-------",
"d": "other"},
[]string{"a", "c"},
false,
)
// match part of labels in -promscrape.cluster.shardByLabels, and they're the same
f(
map[string]string{
"a": "aa",
"c": "cc",
"d": "dd"},
map[string]string{
"a": "aa",
"c": "cc",
"e": "ee"},
[]string{"a", "b", "c"},
true,
)
// match part of labels in -promscrape.cluster.shardByLabels, and they're different
f(
map[string]string{
"a": "aa",
"c": "cc",
"d": "dd"},
map[string]string{
"a": "aa-------",
"c": "cc",
"e": "ee"},
[]string{"a", "b", "c"},
false,
)
// none of labels in -promscrape.cluster.shardByLabels is matched, so all labels will be used to sharding
f(
map[string]string{
"d": "dd",
"e": "ee"},
map[string]string{
"d": "dd",
"e": "ee"},
[]string{"a", "b", "c"},
true,
)
}
func TestLoadStaticConfigs(t *testing.T) {
scs, err := loadStaticConfigs("testdata/file_sd.json")
if err != nil {

View File

@@ -66,6 +66,7 @@ func CheckConfig() error {
// Scraped data is passed to pushData.
func Init(pushData func(at *auth.Token, wr *prompb.WriteRequest)) {
mustInitClusterMemberID()
initClusterShardByLabels()
globalStopChan = make(chan struct{})
scraperWG.Go(func() {
runScraper(*promscrapeConfigFile, pushData, globalStopChan)