mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-19 09:46:57 +03:00
Compare commits
3 Commits
test/memor
...
vmselect/f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fb5b50508d | ||
|
|
a4267ab068 | ||
|
|
ed054ce197 |
@@ -822,7 +822,7 @@ func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadli
|
||||
}
|
||||
|
||||
// Push mrs to storage nodes in parallel.
|
||||
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) any {
|
||||
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, workerID uint, sn *storageNode, cancelled *atomic.Bool) any {
|
||||
sn.registerMetricNamesRequests.Inc()
|
||||
err := sn.registerMetricNames(qt, mrsPerNode[workerID], deadline)
|
||||
if err != nil {
|
||||
@@ -857,8 +857,8 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
|
||||
return 0, err
|
||||
}
|
||||
sns := getStorageNodes()
|
||||
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
|
||||
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
|
||||
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
|
||||
return execSearchQuery(qt, sq, cancelled, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
|
||||
sn.deleteSeriesRequests.Inc()
|
||||
deletedCount, err := sn.deleteSeries(qt, requestData, deadline)
|
||||
if err != nil {
|
||||
@@ -906,8 +906,8 @@ func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
|
||||
return nil, false, err
|
||||
}
|
||||
sns := getStorageNodes()
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
|
||||
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
|
||||
return execSearchQuery(qt, sq, cancelled, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
|
||||
sn.labelNamesRequests.Inc()
|
||||
labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline)
|
||||
if err != nil {
|
||||
@@ -1050,8 +1050,8 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str
|
||||
return nil, false, err
|
||||
}
|
||||
sns := getStorageNodes()
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
|
||||
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
|
||||
return execSearchQuery(qt, sq, cancelled, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
|
||||
sn.labelValuesRequests.Inc()
|
||||
labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline)
|
||||
if err != nil {
|
||||
@@ -1114,7 +1114,7 @@ func Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline searchutils.
|
||||
}
|
||||
sns := getStorageNodes()
|
||||
// Deny partial responses when obtaining the list of tenants, since partial tenants have little sense.
|
||||
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
|
||||
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
|
||||
sn.tenantsRequests.Inc()
|
||||
tenants, err := sn.getTenants(qt, tr, deadline)
|
||||
if err != nil {
|
||||
@@ -1195,7 +1195,7 @@ func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyP
|
||||
err error
|
||||
}
|
||||
sns := getStorageNodes()
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
|
||||
sn.tagValueSuffixesRequests.Inc()
|
||||
suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
|
||||
if err != nil {
|
||||
@@ -1263,8 +1263,8 @@ func TSDBStatus(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
|
||||
return nil, false, err
|
||||
}
|
||||
sns := getStorageNodes()
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
|
||||
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
|
||||
return execSearchQuery(qt, sq, cancelled, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
|
||||
sn.tsdbStatusRequests.Inc()
|
||||
status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline)
|
||||
if err != nil {
|
||||
@@ -1373,7 +1373,7 @@ func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartia
|
||||
err error
|
||||
}
|
||||
sns := getStorageNodes()
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode, _ *atomic.Bool) any {
|
||||
sn.seriesCountRequests.Inc()
|
||||
n, err := sn.getSeriesCount(qt, accountID, projectID, deadline)
|
||||
if err != nil {
|
||||
@@ -1696,8 +1696,8 @@ func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *sto
|
||||
return nil, false, err
|
||||
}
|
||||
sns := getStorageNodes()
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
|
||||
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any {
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
|
||||
return execSearchQuery(qt, sq, cancelled, func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any {
|
||||
sn.searchMetricNamesRequests.Inc()
|
||||
metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline)
|
||||
if sq.IsMultiTenant {
|
||||
@@ -1887,10 +1887,10 @@ func processBlocks(qt *querytracer.Tracer, sns []*storageNode, denyPartialRespon
|
||||
return false, err
|
||||
}
|
||||
// Send the query to all the storage nodes in parallel.
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) any {
|
||||
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode, cancelled *atomic.Bool) any {
|
||||
// Use a separate variable for each goroutine
|
||||
var err error
|
||||
res := execSearchQuery(qt, sq, func(qt *querytracer.Tracer, rd []byte, _ storage.TenantToken) any {
|
||||
res := execSearchQuery(qt, sq, cancelled, func(qt *querytracer.Tracer, rd []byte, _ storage.TenantToken) any {
|
||||
sn.searchRequests.Inc()
|
||||
err = sn.processSearchQuery(qt, rd, f, workerID, deadline)
|
||||
if err != nil {
|
||||
@@ -1953,6 +1953,8 @@ type storageNodesRequest struct {
|
||||
resultsCh chan rpcResult
|
||||
qts map[*querytracer.Tracer]struct{}
|
||||
sns []*storageNode
|
||||
cancelled *atomic.Bool
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
type rpcResult struct {
|
||||
@@ -1962,15 +1964,25 @@ type rpcResult struct {
|
||||
}
|
||||
|
||||
func startStorageNodesRequest(qt *querytracer.Tracer, sns []*storageNode, denyPartialResponse bool,
|
||||
f func(qt *querytracer.Tracer, workerID uint, sn *storageNode) any,
|
||||
f func(qt *querytracer.Tracer, workerID uint, sn *storageNode, cancelled *atomic.Bool) any,
|
||||
) *storageNodesRequest {
|
||||
resultsCh := make(chan rpcResult, len(sns))
|
||||
qts := make(map[*querytracer.Tracer]struct{}, len(sns))
|
||||
snr := &storageNodesRequest{
|
||||
denyPartialResponse: denyPartialResponse,
|
||||
resultsCh: resultsCh,
|
||||
qts: qts,
|
||||
sns: sns,
|
||||
cancelled: &atomic.Bool{},
|
||||
}
|
||||
|
||||
snr.wg.Add(len(sns))
|
||||
for idx, sn := range sns {
|
||||
qtChild := qt.NewChild("rpc at vmstorage %s", sn.connPool.Addr())
|
||||
qts[qtChild] = struct{}{}
|
||||
go func(workerID uint, sn *storageNode) {
|
||||
data := f(qtChild, workerID, sn)
|
||||
defer snr.wg.Done()
|
||||
data := f(qtChild, workerID, sn, snr.cancelled)
|
||||
resultsCh <- rpcResult{
|
||||
data: data,
|
||||
qt: qtChild,
|
||||
@@ -1978,15 +1990,16 @@ func startStorageNodesRequest(qt *querytracer.Tracer, sns []*storageNode, denyPa
|
||||
}
|
||||
}(uint(idx), sn)
|
||||
}
|
||||
return &storageNodesRequest{
|
||||
denyPartialResponse: denyPartialResponse,
|
||||
resultsCh: resultsCh,
|
||||
qts: qts,
|
||||
sns: sns,
|
||||
}
|
||||
return snr
|
||||
}
|
||||
|
||||
// finishQueryTracers cancels all the query tracers and waits for all the workers to finish.
|
||||
func (snr *storageNodesRequest) finishQueryTracers(msg string) {
|
||||
// Set cancelled flag to stop new work
|
||||
snr.cancelled.Store(true)
|
||||
// Wait for all workers to finish
|
||||
snr.wg.Wait()
|
||||
// Now safe to close all tracers
|
||||
for qt := range snr.qts {
|
||||
snr.finishQueryTracer(qt, msg)
|
||||
}
|
||||
@@ -3265,11 +3278,16 @@ func (pnc *perNodeCounter) GetTotal() uint64 {
|
||||
const maxFastAllocBlockSize = 32 * 1024
|
||||
|
||||
// execSearchQuery calls cb for with marshaled requestData for each tenant in sq.
|
||||
func execSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, cb func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any) []any {
|
||||
func execSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, cancelled *atomic.Bool, cb func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any) []any {
|
||||
var requestData []byte
|
||||
var results []any
|
||||
|
||||
for i := range sq.TenantTokens {
|
||||
// Stop processing the remaining tenants if the query is cancelled.
|
||||
// It is safe to return the partial results.
|
||||
if cancelled.Load() {
|
||||
return results
|
||||
}
|
||||
requestData = sq.TenantTokens[i].Marshal(requestData)
|
||||
requestData = sq.MarshaWithoutTenant(requestData)
|
||||
qtL := qt
|
||||
@@ -3277,7 +3295,7 @@ func execSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, cb func(qt
|
||||
qtL = qt.NewChild("query for tenant: %s", sq.TenantTokens[i].String())
|
||||
}
|
||||
r := cb(qtL, requestData, sq.TenantTokens[i])
|
||||
if sq.IsMultiTenant {
|
||||
if sq.IsMultiTenant && qt.Enabled() {
|
||||
qtL.Done()
|
||||
}
|
||||
results = append(results, r)
|
||||
|
||||
@@ -26,6 +26,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/): allow ingesting histograms with missing `_sum` metric via [OpenTelemetry ingestion protocol](https://docs.victoriametrics.com/#sending-data-via-opentelemetry) in the same way as Prometheus does.
|
||||
* BUGFIX: all VictoriaMetrics [enterprise](https://docs.victoriametrics.com/enterprise/) components: properly trim whitespaces at the end of license provided via `-license` and `-licenseFile` command-line flags. Previously, the trailing whitespaces could cause the license verification to fail.
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): fix possible runtime panic during requests processing under heavy load. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8051) for details.
|
||||
* BUGFIX: [vmselect](https://docs.victoriametrics.com/cluster-victoriametrics/): prevent panic when `vmselect` receives an error response from `vmstorage` during the query execution and request processing for other `vmstorage` nodes is still in progress. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8114) for the details.
|
||||
|
||||
## [v1.109.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.109.1)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user