mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
lib/storage: Fix data race in containsTimeRange() (#9965)
When one goroutine attemps to update the min timestamp under the lock it could have been updated already by another goroutine with a smaller timestamp. As a result the goroutine will update the timestamp with a bigger value. A simple unit test (included in this commit) demonstrates that. Additionally, use a simple Mutex instead of RWMutex. RWMutexes only introduce an unnecessary overhead for operations as simple as retrieving a value from a map and regular Mutex should be preferred. Thanks to @valyala for spotting a bug and the advice on RWMutexes. Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
@@ -109,7 +109,7 @@ type indexDB struct {
|
||||
// with bigger timestamps at any time.
|
||||
minMissingTimestampByKey map[string]int64
|
||||
// protects minMissingTimestampByKey
|
||||
minMissingTimestampByKeyLock sync.RWMutex
|
||||
minMissingTimestampByKeyLock sync.Mutex
|
||||
|
||||
// generation identifies the index generation ID
|
||||
// and is used for syncing items from different indexDBs
|
||||
@@ -1932,9 +1932,9 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) bool {
|
||||
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
|
||||
key := kb.B
|
||||
|
||||
db.minMissingTimestampByKeyLock.RLock()
|
||||
db.minMissingTimestampByKeyLock.Lock()
|
||||
minMissingTimestamp, ok := db.minMissingTimestampByKey[string(key)]
|
||||
db.minMissingTimestampByKeyLock.RUnlock()
|
||||
db.minMissingTimestampByKeyLock.Unlock()
|
||||
|
||||
if ok && tr.MinTimestamp >= minMissingTimestamp {
|
||||
return false
|
||||
@@ -1944,7 +1944,10 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) bool {
|
||||
}
|
||||
|
||||
db.minMissingTimestampByKeyLock.Lock()
|
||||
db.minMissingTimestampByKey[string(key)] = tr.MinTimestamp
|
||||
minMissingTimestamp, ok = db.minMissingTimestampByKey[string(key)]
|
||||
if !ok || tr.MinTimestamp < minMissingTimestamp {
|
||||
db.minMissingTimestampByKey[string(key)] = tr.MinTimestamp
|
||||
}
|
||||
db.minMissingTimestampByKeyLock.Unlock()
|
||||
|
||||
return false
|
||||
|
||||
@@ -4,10 +4,12 @@ import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"slices"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -2228,3 +2230,38 @@ func sortedSlice(m map[string]struct{}) []string {
|
||||
slices.Sort(s)
|
||||
return s
|
||||
}
|
||||
|
||||
func TestIndexSearchContainsTimeRange_Concurrent(t *testing.T) {
|
||||
defer testRemoveAll(t)
|
||||
|
||||
// Create storage because indexDB depends on it.
|
||||
s := MustOpenStorage(filepath.Join(t.Name(), "storage"), OpenOptions{})
|
||||
defer s.MustClose()
|
||||
|
||||
idbName := nextIndexDBTableName()
|
||||
idbPath := filepath.Join(t.Name(), indexdbDirname, idbName)
|
||||
var readOnly atomic.Bool
|
||||
readOnly.Store(true)
|
||||
noRegisterNewSeries := true
|
||||
idb := mustOpenIndexDB(idbPath, s, &readOnly, noRegisterNewSeries)
|
||||
defer idb.MustClose()
|
||||
|
||||
minTimestamp := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
|
||||
concurrency := int64(100)
|
||||
var wg sync.WaitGroup
|
||||
for i := range concurrency {
|
||||
wg.Add(1)
|
||||
go func(ts int64) {
|
||||
is := idb.getIndexSearch(noDeadline)
|
||||
_ = is.containsTimeRange(TimeRange{ts, ts})
|
||||
idb.putIndexSearch(is)
|
||||
wg.Done()
|
||||
}(minTimestamp + msecPerDay*i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
key := marshalCommonPrefix(nil, nsPrefixDateToMetricID)
|
||||
if got, want := idb.minMissingTimestampByKey[string(key)], minTimestamp; got != want {
|
||||
t.Fatalf("unexpected min timestamp: got %v, want %v", time.UnixMilli(got).UTC(), time.UnixMilli(want).UTC())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user