Compare commits

...

3 Commits

Author SHA1 Message Date
Zakhar Bessarab
fb5b50508d app/vmselect/netstorage: do not use "cancelled" in functions which are immediately called
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2025-01-23 16:09:34 +04:00
Zakhar Bessarab
a4267ab068 app/vmselect/netstorage: synchronize closing tracers
Synchronize tracers closing with workers asynchronously processing jobs in order to avoid panic at `qt.NewChild`.

`cancelled` is passed to each `execSearchQuery` in order to allow early exit before processing requests for all tenants.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2025-01-23 11:13:46 +04:00
Zakhar Bessarab
ed054ce197 app/vmselect/netstorage: prevent panic when vmselect receives an error response from vmstorage during the query execution
It is possible that qt would be cancelled by `snr.collectResults`, but concurrently running workers will continue execution even though results will be discarded later on.

Stop request execution once query tracer is done in order to prevent panic and avoid unnecessary load on storage nodes.

See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8114
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2025-01-22 19:56:20 +04:00
2 changed files with 45 additions and 26 deletions

View File

@@ -822,7 +822,7 @@ func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadli
}
// Push mrs to storage nodes in parallel.
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) any {
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, workerID uint, sn *storageNode, cancelled *atomic.Bool) any {
sn.registerMetricNamesRequests.Inc()
err := sn.registerMetricNames(qt, mrsPerNode[workerID], deadline)
if err != nil {
@@ -857,8 +857,8 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
return 0, err
}
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
return execSearchQuery(qt, sq, cancelled, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
sn.deleteSeriesRequests.Inc()
deletedCount, err := sn.deleteSeries(qt, requestData, deadline)
if err != nil {
@@ -906,8 +906,8 @@ func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
return nil, false, err
}
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
return execSearchQuery(qt, sq, cancelled, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
sn.labelNamesRequests.Inc()
labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline)
if err != nil {
@@ -1050,8 +1050,8 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str
return nil, false, err
}
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
return execSearchQuery(qt, sq, cancelled, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
sn.labelValuesRequests.Inc()
labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline)
if err != nil {
@@ -1114,7 +1114,7 @@ func Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline searchutils.
}
sns := getStorageNodes()
// Deny partial responses when obtaining the list of tenants, since partial tenants have little sense.
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
sn.tenantsRequests.Inc()
tenants, err := sn.getTenants(qt, tr, deadline)
if err != nil {
@@ -1195,7 +1195,7 @@ func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyP
err error
}
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
sn.tagValueSuffixesRequests.Inc()
suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
if err != nil {
@@ -1263,8 +1263,8 @@ func TSDBStatus(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
return nil, false, err
}
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
return execSearchQuery(qt, sq, cancelled, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
sn.tsdbStatusRequests.Inc()
status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline)
if err != nil {
@@ -1373,7 +1373,7 @@ func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartia
err error
}
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode, _ *atomic.Bool) any {
sn.seriesCountRequests.Inc()
n, err := sn.getSeriesCount(qt, accountID, projectID, deadline)
if err != nil {
@@ -1696,8 +1696,8 @@ func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *sto
return nil, false, err
}
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any {
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode, cancelled *atomic.Bool) any {
return execSearchQuery(qt, sq, cancelled, func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any {
sn.searchMetricNamesRequests.Inc()
metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline)
if sq.IsMultiTenant {
@@ -1887,10 +1887,10 @@ func processBlocks(qt *querytracer.Tracer, sns []*storageNode, denyPartialRespon
return false, err
}
// Send the query to all the storage nodes in parallel.
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) any {
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode, cancelled *atomic.Bool) any {
// Use a separate variable for each goroutine
var err error
res := execSearchQuery(qt, sq, func(qt *querytracer.Tracer, rd []byte, _ storage.TenantToken) any {
res := execSearchQuery(qt, sq, cancelled, func(qt *querytracer.Tracer, rd []byte, _ storage.TenantToken) any {
sn.searchRequests.Inc()
err = sn.processSearchQuery(qt, rd, f, workerID, deadline)
if err != nil {
@@ -1953,6 +1953,8 @@ type storageNodesRequest struct {
resultsCh chan rpcResult
qts map[*querytracer.Tracer]struct{}
sns []*storageNode
cancelled *atomic.Bool
wg sync.WaitGroup
}
type rpcResult struct {
@@ -1962,15 +1964,25 @@ type rpcResult struct {
}
func startStorageNodesRequest(qt *querytracer.Tracer, sns []*storageNode, denyPartialResponse bool,
f func(qt *querytracer.Tracer, workerID uint, sn *storageNode) any,
f func(qt *querytracer.Tracer, workerID uint, sn *storageNode, cancelled *atomic.Bool) any,
) *storageNodesRequest {
resultsCh := make(chan rpcResult, len(sns))
qts := make(map[*querytracer.Tracer]struct{}, len(sns))
snr := &storageNodesRequest{
denyPartialResponse: denyPartialResponse,
resultsCh: resultsCh,
qts: qts,
sns: sns,
cancelled: &atomic.Bool{},
}
snr.wg.Add(len(sns))
for idx, sn := range sns {
qtChild := qt.NewChild("rpc at vmstorage %s", sn.connPool.Addr())
qts[qtChild] = struct{}{}
go func(workerID uint, sn *storageNode) {
data := f(qtChild, workerID, sn)
defer snr.wg.Done()
data := f(qtChild, workerID, sn, snr.cancelled)
resultsCh <- rpcResult{
data: data,
qt: qtChild,
@@ -1978,15 +1990,16 @@ func startStorageNodesRequest(qt *querytracer.Tracer, sns []*storageNode, denyPa
}
}(uint(idx), sn)
}
return &storageNodesRequest{
denyPartialResponse: denyPartialResponse,
resultsCh: resultsCh,
qts: qts,
sns: sns,
}
return snr
}
// finishQueryTracers cancels all the query tracers and waits for all the workers to finish.
func (snr *storageNodesRequest) finishQueryTracers(msg string) {
// Set cancelled flag to stop new work
snr.cancelled.Store(true)
// Wait for all workers to finish
snr.wg.Wait()
// Now safe to close all tracers
for qt := range snr.qts {
snr.finishQueryTracer(qt, msg)
}
@@ -3265,11 +3278,16 @@ func (pnc *perNodeCounter) GetTotal() uint64 {
const maxFastAllocBlockSize = 32 * 1024
// execSearchQuery calls cb for with marshaled requestData for each tenant in sq.
func execSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, cb func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any) []any {
func execSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, cancelled *atomic.Bool, cb func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any) []any {
var requestData []byte
var results []any
for i := range sq.TenantTokens {
// Stop processing the remaining tenants if the query is cancelled.
// It is safe to return the partial results.
if cancelled.Load() {
return results
}
requestData = sq.TenantTokens[i].Marshal(requestData)
requestData = sq.MarshaWithoutTenant(requestData)
qtL := qt
@@ -3277,7 +3295,7 @@ func execSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, cb func(qt
qtL = qt.NewChild("query for tenant: %s", sq.TenantTokens[i].String())
}
r := cb(qtL, requestData, sq.TenantTokens[i])
if sq.IsMultiTenant {
if sq.IsMultiTenant && qt.Enabled() {
qtL.Done()
}
results = append(results, r)

View File

@@ -26,6 +26,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/), `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/): allow ingesting histograms with missing `_sum` metric via [OpenTelemetry ingestion protocol](https://docs.victoriametrics.com/#sending-data-via-opentelemetry) in the same way as Prometheus does.
* BUGFIX: all VictoriaMetrics [enterprise](https://docs.victoriametrics.com/enterprise/) components: properly trim whitespaces at the end of license provided via `-license` and `-licenseFile` command-line flags. Previously, the trailing whitespaces could cause the license verification to fail.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): fix possible runtime panic during requests processing under heavy load. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8051) for details.
* BUGFIX: [vmselect](https://docs.victoriametrics.com/cluster-victoriametrics/): prevent panic when `vmselect` receives an error response from `vmstorage` during the query execution and request processing for other `vmstorage` nodes is still in progress. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8114) for the details.
## [v1.109.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.109.1)