Compare commits

...

3 Commits

Author SHA1 Message Date
Jayice
f12da461af address review comments 2026-06-17 23:05:00 +08:00
Jayice
4bdda8a8b1 improve documentation 2026-06-16 17:37:20 +08:00
Jayice
2cc547d2ad support sharding targets by label in vmagent cluster mode 2026-06-16 17:33:03 +08:00
5 changed files with 117 additions and 7 deletions

View File

@@ -30,6 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-promscrape.cluster.shardByLabels` command-line flag for selecting target labels used for sharding scrape targets among `vmagent` instances in cluster mode. See [#11044](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11044).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): do not fail backup list if directory is absent while using `fs://` destination to align with other protocols. See [6c3c548](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/6c3c548ddb0385b749e731f52276f130e2a4e4a8)

View File

@@ -797,6 +797,12 @@ For example, the following commands spread scrape targets among a cluster of two
The `-promscrape.cluster.memberNum` can be set to a StatefulSet pod name when `vmagent` runs in Kubernetes.
The pod name must end with a number in the range `0 ... promscrape.cluster.membersCount-1`. For example, `-promscrape.cluster.memberNum=vmagent-0`.
By default, targets are sharded among `vmagent` instances by all target labels after relabeling.
Use `-promscrape.cluster.shardByLabels` {{% available_from "#" %}} to shard targets by specified labels instead.
For example, with `-promscrape.cluster.shardByLabels=service`, the targets with the same `service` label value will be scraped by the same `vmagent` instance,
which is useful when perform stream aggregation that requires all metrics with the same `service` label value to be processed on the same `vmagent` instance.
If none of the specified labels are present in the target labels, then all target labels will be used for sharding.
By default, each scrape target is scraped only by a single `vmagent` instance in the cluster. If there is a need for replicating scrape targets among multiple `vmagent` instances,
then `-promscrape.cluster.replicationFactor` command-line flag must be set to the desired number of replicas. For example, the following commands
start a cluster of three `vmagent` instances, where two `vmagent` instances scrape each target:

View File

@@ -76,6 +76,9 @@ var (
"Every %d occurrence in the template is substituted with -promscrape.cluster.memberNum at urls to vmagent instances responsible for scraping the given target "+
"at /service-discovery page. For example -promscrape.cluster.memberURLTemplate='http://vmagent-%d:8429/targets'. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more details")
clusterShardByLabels = flagutil.NewArrayString("promscrape.cluster.shardByLabels", "Optional list of target labels, which will be used for sharding targets among cluster members "+
"if -promscrape.cluster.membersCount is greater than 1. If none of the specified labels are found in a target, then all the target labels will be used for sharding. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info")
clusterReplicationFactor = flag.Int("promscrape.cluster.replicationFactor", 1, "The number of members in the cluster, which scrape the same targets. "+
"If the replication factor is greater than 1, then the deduplication must be enabled at remote storage side. "+
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info")
@@ -86,7 +89,10 @@ var (
"Bigger uncompressed responses are rejected. See also max_scrape_size option at https://docs.victoriametrics.com/victoriametrics/sd_configs/#scrape_configs")
)
var clusterMemberID int
var (
clusterMemberID int
clusterShardByLabelsSorted []string
)
func mustInitClusterMemberID() {
s := *clusterMemberNum
@@ -110,6 +116,15 @@ func mustInitClusterMemberID() {
clusterMemberID = n
}
func initClusterShardByLabels() {
if len(*clusterShardByLabels) == 0 {
clusterShardByLabelsSorted = nil
return
}
clusterShardByLabelsSorted = slices.Clone(*clusterShardByLabels)
slices.Sort(clusterShardByLabelsSorted)
}
// Config represents essential parts from Prometheus config defined at https://prometheus.io/docs/prometheus/latest/configuration/configuration/
type Config struct {
Global GlobalConfig `yaml:"global,omitempty"`
@@ -1138,12 +1153,28 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
}
func appendScrapeWorkKey(dst []byte, labels *promutil.Labels) []byte {
for _, label := range labels.GetLabels() {
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
dst = append(dst, label.Name...)
dst = append(dst, '=')
dst = append(dst, label.Value...)
dst = append(dst, ',')
originalDstLen := len(dst)
for _, targetLabelName := range clusterShardByLabelsSorted {
for _, label := range labels.GetLabels() {
if label.Name == targetLabelName {
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
dst = append(dst, label.Name...)
dst = append(dst, '=')
dst = append(dst, label.Value...)
dst = append(dst, ',')
break
}
}
}
// Use all labels to compute the key if `promscrape.cluster.shardByLabels` is not configured
if len(dst) == originalDstLen {
for _, label := range labels.GetLabels() {
dst = append(dst, label.Name...)
dst = append(dst, '=')
dst = append(dst, label.Value...)
dst = append(dst, ',')
}
return dst
}
return dst
}

View File

@@ -148,6 +148,77 @@ func TestGetClusterMemberNumsForScrapeWork(t *testing.T) {
f("foo", 3, 2, []int{2, 0})
}
func TestAppendScrapeWorkKeyShardByLabels(t *testing.T) {
f := func(labels map[string]string, shardByLabels []string, expectedKey string) {
t.Helper()
originValue := *clusterShardByLabels
*clusterShardByLabels = shardByLabels
defer func() {
*clusterShardByLabels = originValue
}()
initClusterShardByLabels()
outputKey := string(appendScrapeWorkKey(nil, promutil.NewLabelsFromMap(labels)))
if expectedKey != outputKey {
t.Fatalf("unexpected sharding key:%q for target labels:%v with shardByLabels=%q, expect: %q",
outputKey, labels, shardByLabels, expectedKey)
}
}
// didn't specify -promscrape.cluster.shardByLabels, so all labels will be used for sharding
f(
map[string]string{
"a": "aa",
"b": "bb",
"c": "cc",
"d": "dd"},
[]string{},
"a=aa,b=bb,c=cc,d=dd,",
)
// match all labels in -promscrape.cluster.shardByLabels, so label "a" and "c" will be used for sharding
f(
map[string]string{
"a": "aa",
"b": "bb",
"c": "cc",
"d": "dd"},
[]string{"a", "c"},
"a=aa,c=cc,",
)
// match all labels in -promscrape.cluster.shardByLabels, so label "a" and "c" will be used for sharding even if they're not in order in -promscrape.cluster.shardByLabels.
f(
map[string]string{
"a": "aa",
"b": "bb",
"c": "cc",
"d": "dd"},
[]string{"c", "a"},
"a=aa,c=cc,",
)
// match part of labels in -promscrape.cluster.shardByLabels, label "a" and "c" will be used for sharding
f(
map[string]string{
"a": "aa",
"c": "cc",
"d": "dd"},
[]string{"a", "b", "c"},
"a=aa,c=cc,",
)
// none of labels in -promscrape.cluster.shardByLabels is matched, so all labels will be used for sharding
f(
map[string]string{
"d": "dd",
"e": "ee"},
[]string{"a", "b", "c"},
"d=dd,e=ee,",
)
}
func TestLoadStaticConfigs(t *testing.T) {
scs, err := loadStaticConfigs("testdata/file_sd.json")
if err != nil {

View File

@@ -66,6 +66,7 @@ func CheckConfig() error {
// Scraped data is passed to pushData.
func Init(pushData func(at *auth.Token, wr *prompb.WriteRequest)) {
mustInitClusterMemberID()
initClusterShardByLabels()
globalStopChan = make(chan struct{})
scraperWG.Go(func() {
runScraper(*promscrapeConfigFile, pushData, globalStopChan)