mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-17 15:53:29 +03:00
Compare commits
2 Commits
vmestimato
...
issue-1104
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4bdda8a8b1 | ||
|
|
2cc547d2ad |
@@ -30,6 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-promscrape.cluster.shardByLabels` command-line flag for selecting target labels used for sharding scrape targets among `vmagent` instances in cluster mode. See [#11044](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11044).
|
||||
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
|
||||
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): do not fail backup list if directory is absent while using `fs://` destination to align with other protocols. See [6c3c548](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/6c3c548ddb0385b749e731f52276f130e2a4e4a8)
|
||||
|
||||
@@ -797,6 +797,13 @@ For example, the following commands spread scrape targets among a cluster of two
|
||||
The `-promscrape.cluster.memberNum` can be set to a StatefulSet pod name when `vmagent` runs in Kubernetes.
|
||||
The pod name must end with a number in the range `0 ... promscrape.cluster.membersCount-1`. For example, `-promscrape.cluster.memberNum=vmagent-0`.
|
||||
|
||||
By default, targets are sharded among `vmagent` instances by all target labels after relabeling.
|
||||
Use `-promscrape.cluster.shardByLabels` to shard targets by specified labels instead.
|
||||
For example, `-promscrape.cluster.shardByLabels=service,pod` keeps targets with the same `service` and `pod` label value on the same `vmagent` instance.
|
||||
|
||||
If some of the specified labels are present on a target, then only the present labels are used for sharding.
|
||||
If none of the specified labels are present, then all target labels are used for sharding.
|
||||
|
||||
By default, each scrape target is scraped only by a single `vmagent` instance in the cluster. If there is a need for replicating scrape targets among multiple `vmagent` instances,
|
||||
then `-promscrape.cluster.replicationFactor` command-line flag must be set to the desired number of replicas. For example, the following commands
|
||||
start a cluster of three `vmagent` instances, where two `vmagent` instances scrape each target:
|
||||
|
||||
@@ -76,6 +76,9 @@ var (
|
||||
"Every %d occurrence in the template is substituted with -promscrape.cluster.memberNum at urls to vmagent instances responsible for scraping the given target "+
|
||||
"at /service-discovery page. For example -promscrape.cluster.memberURLTemplate='http://vmagent-%d:8429/targets'. "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more details")
|
||||
clusterShardByLabels = flagutil.NewArrayString("promscrape.cluster.shardByLabels", "Optional list of target labels, which will be used for sharding targets among cluster members "+
|
||||
"if -promscrape.cluster.membersCount is greater than 1. If none of the specified labels are found in a target, then all the target labels will be used for sharding. "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info")
|
||||
clusterReplicationFactor = flag.Int("promscrape.cluster.replicationFactor", 1, "The number of members in the cluster, which scrape the same targets. "+
|
||||
"If the replication factor is greater than 1, then the deduplication must be enabled at remote storage side. "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/vmagent/#scraping-big-number-of-targets for more info")
|
||||
@@ -86,7 +89,10 @@ var (
|
||||
"Bigger uncompressed responses are rejected. See also max_scrape_size option at https://docs.victoriametrics.com/victoriametrics/sd_configs/#scrape_configs")
|
||||
)
|
||||
|
||||
var clusterMemberID int
|
||||
var (
|
||||
clusterMemberID int
|
||||
clusterShardByLabelsSorted []string
|
||||
)
|
||||
|
||||
func mustInitClusterMemberID() {
|
||||
s := *clusterMemberNum
|
||||
@@ -110,6 +116,14 @@ func mustInitClusterMemberID() {
|
||||
clusterMemberID = n
|
||||
}
|
||||
|
||||
func initClusterShardByLabels() {
|
||||
if len(*clusterShardByLabels) == 0 {
|
||||
return
|
||||
}
|
||||
clusterShardByLabelsSorted = slices.Clone(*clusterShardByLabels)
|
||||
slices.Sort(clusterShardByLabelsSorted)
|
||||
}
|
||||
|
||||
// Config represents essential parts from Prometheus config defined at https://prometheus.io/docs/prometheus/latest/configuration/configuration/
|
||||
type Config struct {
|
||||
Global GlobalConfig `yaml:"global,omitempty"`
|
||||
@@ -1137,13 +1151,29 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
|
||||
return dst
|
||||
}
|
||||
|
||||
func appendScrapeWorkKey(dst []byte, labels *promutil.Labels) []byte {
|
||||
for _, label := range labels.GetLabels() {
|
||||
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
|
||||
dst = append(dst, label.Name...)
|
||||
dst = append(dst, '=')
|
||||
dst = append(dst, label.Value...)
|
||||
dst = append(dst, ',')
|
||||
func appendScrapeWorkKey(dst []byte, labels *promutil.Labels, shardByLabels []string) []byte {
|
||||
originalDstLen := len(dst)
|
||||
for _, targetLabelName := range shardByLabels {
|
||||
for _, label := range labels.GetLabels() {
|
||||
if label.Name == targetLabelName {
|
||||
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
|
||||
dst = append(dst, label.Name...)
|
||||
dst = append(dst, '=')
|
||||
dst = append(dst, label.Value...)
|
||||
dst = append(dst, ',')
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
// none of the labels specified in -promscrape.cluster.shardByLabels is present, use all labels for backward compatibility.
|
||||
if len(dst) == originalDstLen {
|
||||
for _, label := range labels.GetLabels() {
|
||||
dst = append(dst, label.Name...)
|
||||
dst = append(dst, '=')
|
||||
dst = append(dst, label.Value...)
|
||||
dst = append(dst, ',')
|
||||
}
|
||||
return dst
|
||||
}
|
||||
return dst
|
||||
}
|
||||
@@ -1195,7 +1225,7 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1687#issuecomment-940629495
|
||||
if *clusterMembersCount > 1 {
|
||||
bb := scrapeWorkKeyBufPool.Get()
|
||||
bb.B = appendScrapeWorkKey(bb.B[:0], labels)
|
||||
bb.B = appendScrapeWorkKey(bb.B[:0], labels, clusterShardByLabelsSorted)
|
||||
memberNums := getClusterMemberNumsForScrapeWork(bytesutil.ToUnsafeString(bb.B), *clusterMembersCount, *clusterReplicationFactor)
|
||||
scrapeWorkKeyBufPool.Put(bb)
|
||||
if !slices.Contains(memberNums, clusterMemberID) {
|
||||
|
||||
@@ -148,6 +148,112 @@ func TestGetClusterMemberNumsForScrapeWork(t *testing.T) {
|
||||
f("foo", 3, 2, []int{2, 0})
|
||||
}
|
||||
|
||||
func TestAppendScrapeWorkKeyShardByLabels(t *testing.T) {
|
||||
f := func(labelsA, labelsB map[string]string, shardByLabels []string, equal bool) {
|
||||
t.Helper()
|
||||
originValue := *clusterShardByLabels
|
||||
*clusterShardByLabels = shardByLabels
|
||||
initClusterShardByLabels()
|
||||
keyA := string(appendScrapeWorkKey(nil, promutil.NewLabelsFromMap(labelsA), clusterShardByLabelsSorted))
|
||||
keyB := string(appendScrapeWorkKey(nil, promutil.NewLabelsFromMap(labelsB), clusterShardByLabelsSorted))
|
||||
if equal && keyA != keyB {
|
||||
t.Fatalf("unexpected different scrape work keys for shardByLabels=%q;\nlabelsA=%v\nlabelsB=%v\nkeyA=%q\nkeyB=%q",
|
||||
shardByLabels, labelsA, labelsB, keyA, keyB)
|
||||
} else if !equal && keyA == keyB {
|
||||
t.Fatalf("unexpected equal scrape work keys for shardByLabels=%q;\nlabelsA=%v\nlabelsB=%v\nkeyA=%q\nkeyB=%q",
|
||||
shardByLabels, labelsA, labelsB, keyA, keyB)
|
||||
}
|
||||
*clusterShardByLabels = originValue
|
||||
}
|
||||
|
||||
// didn't specify -promscrape.cluster.shardByLabels, and all labels are the same
|
||||
f(
|
||||
map[string]string{
|
||||
"a": "aa",
|
||||
"b": "bb",
|
||||
"c": "cc",
|
||||
"d": "dd"},
|
||||
map[string]string{
|
||||
"a": "aa",
|
||||
"b": "bb",
|
||||
"c": "cc",
|
||||
"d": "dd"},
|
||||
[]string{},
|
||||
true,
|
||||
)
|
||||
// match all labels in -promscrape.cluster.shardByLabels, and they're the same
|
||||
f(
|
||||
map[string]string{
|
||||
"a": "aa",
|
||||
"b": "bb",
|
||||
"c": "cc",
|
||||
"d": "dd"},
|
||||
map[string]string{
|
||||
"c": "cc",
|
||||
"a": "aa",
|
||||
"b": "other",
|
||||
"d": "other"},
|
||||
[]string{"c", "a"},
|
||||
true,
|
||||
)
|
||||
|
||||
// match all labels in -promscrape.cluster.shardByLabels, and they're different
|
||||
f(
|
||||
map[string]string{
|
||||
"a": "aa",
|
||||
"b": "bb",
|
||||
"c": "cc",
|
||||
"d": "dd"},
|
||||
map[string]string{
|
||||
"a": "aa",
|
||||
"b": "other",
|
||||
"c": "cc-------",
|
||||
"d": "other"},
|
||||
[]string{"a", "c"},
|
||||
false,
|
||||
)
|
||||
|
||||
// match part of labels in -promscrape.cluster.shardByLabels, and they're the same
|
||||
f(
|
||||
map[string]string{
|
||||
"a": "aa",
|
||||
"c": "cc",
|
||||
"d": "dd"},
|
||||
map[string]string{
|
||||
"a": "aa",
|
||||
"c": "cc",
|
||||
"e": "ee"},
|
||||
[]string{"a", "b", "c"},
|
||||
true,
|
||||
)
|
||||
|
||||
// match part of labels in -promscrape.cluster.shardByLabels, and they're different
|
||||
f(
|
||||
map[string]string{
|
||||
"a": "aa",
|
||||
"c": "cc",
|
||||
"d": "dd"},
|
||||
map[string]string{
|
||||
"a": "aa-------",
|
||||
"c": "cc",
|
||||
"e": "ee"},
|
||||
[]string{"a", "b", "c"},
|
||||
false,
|
||||
)
|
||||
// none of labels in -promscrape.cluster.shardByLabels is matched, so all labels will be used to sharding
|
||||
f(
|
||||
map[string]string{
|
||||
"d": "dd",
|
||||
"e": "ee"},
|
||||
map[string]string{
|
||||
"d": "dd",
|
||||
"e": "ee"},
|
||||
[]string{"a", "b", "c"},
|
||||
true,
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
func TestLoadStaticConfigs(t *testing.T) {
|
||||
scs, err := loadStaticConfigs("testdata/file_sd.json")
|
||||
if err != nil {
|
||||
|
||||
@@ -66,6 +66,7 @@ func CheckConfig() error {
|
||||
// Scraped data is passed to pushData.
|
||||
func Init(pushData func(at *auth.Token, wr *prompb.WriteRequest)) {
|
||||
mustInitClusterMemberID()
|
||||
initClusterShardByLabels()
|
||||
globalStopChan = make(chan struct{})
|
||||
scraperWG.Go(func() {
|
||||
runScraper(*promscrapeConfigFile, pushData, globalStopChan)
|
||||
|
||||
Reference in New Issue
Block a user