Compare commits

...

17 Commits

Author SHA1 Message Date
Dmytro Kozlov
e85e65d476 Merge branch 'master' into get-vllogs-tenants 2025-06-26 11:06:46 +02:00
dmitryk-dk
a2cef2a05f lib/logstorage: fixed comments, removed parsing of the tenants, add additional check start > end, fix json return 2025-05-09 14:42:23 +02:00
dmitryk-dk
1257bb6427 lib/logstorage: implement get tenant ids for the cluster version 2025-05-01 11:22:25 +02:00
dmitryk-dk
c84a4b50ad Merge branch 'master' into get-vllogs-tenants 2025-04-30 17:13:16 +02:00
dmitryk-dk
ce4b03d023 lib/logstorage: partially fix changes 2025-04-17 18:08:44 +02:00
dmitryk-dk
142f45f968 Merge branch 'master' into get-vllogs-tenants
# Conflicts:
#	app/vlselect/logsql/logsql.go
#	app/vlselect/main.go
2025-04-17 17:20:03 +02:00
dmitryk-dk
82d5b4659d lib/logstorage: moved logic to getTenantIDs function 2025-04-01 13:12:21 +02:00
dmitryk-dk
1689860030 lib/logstorage: code cleanup 2025-03-24 19:46:25 +01:00
dmitryk-dk
77ac142183 lib/logstorage: fix tests 2025-03-24 19:40:49 +01:00
dmitryk-dk
b4e114efe7 Merge branch 'master' into get-vllogs-tenants
# Conflicts:
#	lib/logstorage/indexdb.go
2025-03-24 19:28:05 +01:00
dmitryk-dk
629a943683 Merge branch 'master' into get-vllogs-tenants
# Conflicts:
#	app/vlselect/main.go
2025-03-04 09:48:10 +01:00
Dmytro Kozlov
64e2c017ff Merge branch 'master' into get-vllogs-tenants 2025-01-29 16:19:30 +01:00
Dmytro Kozlov
2808f90492 Merge branch 'master' into get-vllogs-tenants 2025-01-27 09:44:23 +01:00
Dmytro Kozlov
2a553096bb Merge branch 'master' into get-vllogs-tenants 2025-01-18 15:09:52 +01:00
Dmytro Kozlov
f9568661ea Merge branch 'master' into get-vllogs-tenants 2024-11-11 10:41:47 +01:00
dmitryk-dk
64b48d0abf vllogs: add tests fix some descriptions 2024-11-08 09:34:24 +01:00
dmitryk-dk
2c96f3512d vllogs: implement tenants endpoint for victorialogs 2024-11-07 21:51:43 +01:00
8 changed files with 390 additions and 2 deletions

View File

@@ -3,6 +3,7 @@ package internalselect
import (
"context"
"fmt"
"math"
"net/http"
"strconv"
"sync"
@@ -19,6 +20,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
)
// RequestHandler processes requests to /internal/select/*
@@ -49,6 +51,7 @@ var requestHandlers = map[string]func(ctx context.Context, w http.ResponseWriter
"/internal/select/stream_field_values": processStreamFieldValuesRequest,
"/internal/select/streams": processStreamsRequest,
"/internal/select/stream_ids": processStreamIDsRequest,
"/internal/select/tenant_ids": processTenantIDsRequest,
}
func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
@@ -242,6 +245,30 @@ func processStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http
return writeValuesWithHits(w, streamIDs, cp.DisableCompression)
}
func processTenantIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
start, okStart, err := getTimeNsec(r, "start")
if err != nil {
return fmt.Errorf("cannot parse start timestamp: %w", err)
}
end, okEnd, err := getTimeNsec(r, "end")
if err != nil {
return fmt.Errorf("cannot parse end timestamp: %w", err)
}
if !okStart {
start = math.MinInt64
}
if !okEnd {
end = math.MaxInt64
}
tenantIDs, err := vlstorage.GetTenantIDs(ctx, start, end)
if err != nil {
return fmt.Errorf("cannot obtain tenant IDs: %w", err)
}
return writeTenantIDs(w, tenantIDs, false)
}
type commonParams struct {
TenantIDs []logstorage.TenantID
Query *logstorage.Query
@@ -306,6 +333,17 @@ func writeValuesWithHits(w http.ResponseWriter, vhs []logstorage.ValueWithHits,
return nil
}
// writeTenantIDs sends tenantIDs to w.
//
// The payload is compressed with zstd (level 1) unless disableCompression is set.
// The Content-Type header reflects what is actually sent: application/json for
// the uncompressed payload and application/octet-stream for the zstd frame.
// This assumes the caller passes JSON-encoded tenant IDs — TODO confirm.
//
// An error is returned if the payload cannot be written to w.
func writeTenantIDs(w http.ResponseWriter, tenantIDs []byte, disableCompression bool) error {
	contentType := "application/json"
	if !disableCompression {
		tenantIDs = zstd.CompressLevel(nil, tenantIDs, 1)
		// A zstd frame isn't JSON text, so do not advertise it as such.
		contentType = "application/octet-stream"
	}

	w.Header().Set("Content-Type", contentType)
	if _, err := w.Write(tenantIDs); err != nil {
		return fmt.Errorf("cannot send response to the client: %w", err)
	}
	return nil
}
func getInt64FromRequest(r *http.Request, argName string) (int64, error) {
s := r.FormValue(argName)
n, err := strconv.ParseInt(s, 10, 64)
@@ -314,3 +352,16 @@ func getInt64FromRequest(r *http.Request, argName string) (int64, error) {
}
return n, nil
}
// getTimeNsec reads the argName request arg and parses it with timeutil.ParseTimeAt,
// passing the current Unix time in nanoseconds as the reference timestamp.
//
// It returns (0, false, nil) when the arg is missing or empty, (value, true, nil)
// when parsing succeeds, and a non-nil error when the arg cannot be parsed.
func getTimeNsec(r *http.Request, argName string) (int64, bool, error) {
	value := r.FormValue(argName)
	if value == "" {
		return 0, false, nil
	}

	ts, err := timeutil.ParseTimeAt(value, time.Now().UnixNano())
	if err == nil {
		return ts, true, nil
	}
	return 0, false, fmt.Errorf("cannot parse %s=%s: %w", argName, value, err)
}

View File

@@ -972,6 +972,60 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
}
}
// ProcessAdminTenantsRequest processes /select/admin/tenants request.
//
// The optional `start` and `end` query args are parsed by getTimeNsec and limit
// the time range to inspect. A missing `start` defaults to math.MinInt64 and
// a missing `end` defaults to math.MaxInt64. The request is rejected if start > end.
//
// On success the response body is `{"status":"success","data":<tenants>}`, where
// <tenants> is the payload returned by vlstorage.GetTenantIDs written verbatim.
func ProcessAdminTenantsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	start, okStart, err := getTimeNsec(r, "start")
	if err != nil {
		httpserver.Errorf(w, r, "%s", err)
		return
	}
	end, okEnd, err := getTimeNsec(r, "end")
	if err != nil {
		httpserver.Errorf(w, r, "%s", err)
		return
	}
	if !okStart {
		start = math.MinInt64
	}
	if !okEnd {
		end = math.MaxInt64
	}
	if start > end {
		httpserver.Errorf(w, r, "'start' time must be less than 'end' time")
		return
	}

	tenants, err := vlstorage.GetTenantIDs(ctx, start, end)
	if err != nil {
		httpserver.Errorf(w, r, "cannot obtain tenantIDs: %s", err)
		return
	}

	// Declare the JSON content type only once there is a JSON body to send,
	// so the error paths above don't inherit it.
	w.Header().Set("Content-Type", "application/json")
	if _, err := fmt.Fprintf(w, "{\"status\":\"success\",\"data\":%s}\n", tenants); err != nil {
		// The tenants were obtained successfully at this point; only the write failed.
		httpserver.Errorf(w, r, "cannot send response to the client: %s", err)
	}
}
type syncWriter struct {
mu sync.Mutex
w io.Writer

View File

@@ -10,12 +10,13 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlselect/internalselect"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlselect/logsql"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
var (
@@ -267,6 +268,10 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
logsql.ProcessStreamsRequest(ctx, w, r)
logsqlStreamsDuration.UpdateDuration(startTime)
return true
case "/select/admin/tenants":
logsqlAdminTenantsRequests.Inc()
logsql.ProcessAdminTenantsRequest(ctx, w, r)
return true
default:
return false
}
@@ -321,4 +326,6 @@ var (
// no need to track duration for tail requests, as they usually take long time
logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`)
logsqlAdminTenantsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/admin/tenants"}`)
)

View File

@@ -350,6 +350,14 @@ func GetStreamIDs(ctx context.Context, tenantIDs []logstorage.TenantID, q *logst
return netstorageSelect.GetStreamIDs(ctx, tenantIDs, q, limit)
}
// GetTenantIDs returns tenantIDs seen on the [start, end] time range.
//
// The request is served by the local storage when it is configured;
// otherwise it is forwarded to the remote storage nodes via netstorageSelect.
func GetTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
	if localStorage == nil {
		return netstorageSelect.GetTenantIDs(ctx, start, end)
	}
	return localStorage.GetTenantIDs(ctx, start, end)
}
func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
var ss logstorage.StorageStats
strg.UpdateStats(&ss)

View File

@@ -58,6 +58,11 @@ const (
//
// It must be updated every time the protocol changes.
QueryProtocolVersion = "v1"
// TenantIDsProtocolVersion is the version of the protocol used for /internal/select/tenant_ids HTTP endpoint.
//
// It must be updated every time the protocol changes.
TenantIDsProtocolVersion = "v1"
)
// Storage is a network storage for querying remote storage nodes in the cluster.
@@ -224,6 +229,13 @@ func (sn *storageNode) getStreamIDs(ctx context.Context, tenantIDs []logstorage.
return sn.getValuesWithHits(ctx, "/internal/select/stream_ids", args)
}
// getTenantIDs requests /internal/select/tenant_ids at the storage node sn
// and returns the payload obtained from executeRequestAt.
//
// start and end are sent as decimal int64 values in the `start` and `end` args.
//
// NOTE(review): no `version` arg is sent here although TenantIDsProtocolVersion
// is declared in this file — confirm whether the endpoint should be versioned.
func (sn *storageNode) getTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
	args := url.Values{
		"start": {fmt.Sprintf("%d", start)},
		"end":   {fmt.Sprintf("%d", end)},
	}
	return sn.executeRequestAt(ctx, "/internal/select/tenant_ids", args)
}
func (sn *storageNode) getCommonArgs(version string, tenantIDs []logstorage.TenantID, q *logstorage.Query) url.Values {
args := url.Values{}
args.Set("version", version)
@@ -406,6 +418,48 @@ func (s *Storage) GetStreamIDs(ctx context.Context, tenantIDs []logstorage.Tenan
})
}
// GetTenantIDs returns tenantIDs for the given start and end.
//
// It is the exported entry point for s.getTenantIDs, which queries every
// storage node in s and returns the combined payload.
func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
return s.getTenantIDs(ctx, start, end)
}
// getTenantIDs queries all the storage nodes in parallel for tenant IDs
// on the [start, end] range and returns their payloads appended in node order.
//
// The first failed request cancels the requests to the remaining nodes.
// The error selected by getFirstNonCancelError is returned in this case.
//
// NOTE(review): the per-node payloads are joined byte-by-byte. If every node
// responds with its own JSON array (as the local storage produces), the result
// is a sequence of arrays rather than a single merged, de-duplicated array —
// confirm what executeRequestAt returns and merge the payloads if needed.
func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
	ctxWithCancel, cancel := context.WithCancel(ctx)
	defer cancel()

	nodesCount := len(s.sns)
	responses := make([][]byte, nodesCount)
	errs := make([]error, nodesCount)

	var wg sync.WaitGroup
	wg.Add(nodesCount)
	for i := 0; i < nodesCount; i++ {
		go func(idx int) {
			defer wg.Done()

			// Every goroutine writes only to its own slot, so no locking is needed.
			responses[idx], errs[idx] = s.sns[idx].getTenantIDs(ctxWithCancel, start, end)
			if errs[idx] != nil {
				// Cancel the remaining parallel requests
				cancel()
			}
		}(i)
	}
	wg.Wait()

	if err := getFirstNonCancelError(errs); err != nil {
		return nil, err
	}

	var merged []byte
	for _, resp := range responses {
		merged = append(merged, resp...)
	}
	return merged, nil
}
func (s *Storage) getValuesWithHits(ctx context.Context, limit uint64, resetHitsOnLimitExceeded bool,
callback func(ctx context.Context, sn *storageNode) ([]logstorage.ValueWithHits, error)) ([]logstorage.ValueWithHits, error) {

View File

@@ -437,6 +437,53 @@ func (is *indexSearch) getStreamIDsForTagRegexp(tenantID TenantID, tagName strin
return ids
}
// getTenantIDs returns the distinct tenants found under the nsPrefixStreamID
// index prefix, formatted as "accountID:projectID".
//
// Instead of visiting every index entry, the search jumps to the next possible
// (accountID, projectID) pair after each discovered tenant.
//
// The order of the returned tenants is unspecified.
func (is *indexSearch) getTenantIDs() []string {
	ts := &is.ts
	kb := &is.kb

	// seekTo points the table search at the nsPrefixStreamID prefix of the given tenant.
	seekTo := func(tenantID TenantID) {
		kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tenantID)
		ts.Seek(kb.B)
	}

	seen := make(map[string]struct{})

	// Start from the smallest possible tenant.
	var tenantID TenantID
	seekTo(tenantID)
	for ts.NextItem() {
		_, prefix, err := unmarshalCommonPrefix(&tenantID, ts.Item)
		if err != nil {
			logger.Panicf("FATAL: cannot unmarshal tenantID: %s", err)
		}
		if prefix != nsPrefixStreamID {
			// Reached the end of entries with the needed prefix.
			break
		}
		seen[fmt.Sprintf("%d:%d", tenantID.AccountID, tenantID.ProjectID)] = struct{}{}

		// Seek for the next (accountID, projectID)
		tenantID.ProjectID++
		if tenantID.ProjectID == 0 {
			// ProjectID wrapped around, so move to the next account.
			tenantID.AccountID++
			if tenantID.AccountID == 0 {
				// Reached the end (accountID, projectID) space
				break
			}
		}
		seekTo(tenantID)
	}
	if err := ts.Error(); err != nil {
		logger.Panicf("FATAL: error when performing search: %s", err)
	}

	tenantIDs := make([]string, 0, len(seen))
	for tenant := range seen {
		tenantIDs = append(tenantIDs, tenant)
	}
	return tenantIDs
}
func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical string) {
st := GetStreamTags()
mustUnmarshalStreamTags(st, streamTagsCanonical)
@@ -542,6 +589,13 @@ func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter
bbPool.Put(bb)
}
// searchTenants returns the tenants found by an index search over idb.
//
// The index search is obtained from idb for the duration of the call
// and is handed back via putIndexSearch on return.
func (idb *indexdb) searchTenants() []string {
is := idb.getIndexSearch()
defer idb.putIndexSearch(is)
return is.getTenantIDs()
}
type batchItems struct {
buf []byte

View File

@@ -3,6 +3,7 @@ package logstorage
import (
"fmt"
"reflect"
"slices"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@@ -112,7 +113,7 @@ func TestStorageSearchStreamIDs(t *testing.T) {
// non-existing-tag-re
f(`{job="job-0",instance="instance-0",non_existing_tag=~"foo.+"}`, nil)
//non-existing-non-empty-tag-re
// non-existing-non-empty-tag-re
f(`{job="job-0",instance="instance-0",non_existing_tag!~""}`, nil)
// match-job-instance
@@ -252,3 +253,78 @@ func TestStorageSearchStreamIDs(t *testing.T) {
closeTestStorage(s)
}
// TestGetTenantsIds verifies that indexdb.searchTenants returns every tenant
// with at least one registered stream, each of them exactly once.
func TestGetTenantsIds(t *testing.T) {
	t.Parallel()

	path := t.Name()
	const partitionName = "foobar"

	s := newTestStorage()
	mustCreateIndexdb(path)
	idb := mustOpenIndexdb(path, partitionName, s)

	tenantIDs := []TenantID{
		{AccountID: 0, ProjectID: 0},
		{AccountID: 0, ProjectID: 1},
		{AccountID: 1, ProjectID: 0},
		{AccountID: 1, ProjectID: 1},
		{AccountID: 123, ProjectID: 567},
	}

	// getStreamIDForTags returns a streamID per tenant for the given tags
	// together with the canonical representation of these tags.
	getStreamIDForTags := func(tags map[string]string) ([]streamID, string) {
		st := GetStreamTags()
		for k, v := range tags {
			st.Add(k, v)
		}
		streamTagsCanonical := st.MarshalCanonical(nil)
		PutStreamTags(st)
		id := hash128(streamTagsCanonical)

		sids := make([]streamID, 0, len(tenantIDs))
		for _, tenantID := range tenantIDs {
			sid := streamID{
				tenantID: tenantID,
				id:       id,
			}
			sids = append(sids, sid)
		}

		return sids, string(streamTagsCanonical)
	}

	// Create indexdb entries
	const jobsCount = 7
	const instancesCount = 5
	for i := 0; i < jobsCount; i++ {
		for j := 0; j < instancesCount; j++ {
			sids, streamTagsCanonical := getStreamIDForTags(map[string]string{
				"job":      fmt.Sprintf("job-%d", i),
				"instance": fmt.Sprintf("instance-%d", j),
			})
			for _, sid := range sids {
				idb.mustRegisterStream(&sid, streamTagsCanonical)
			}
		}
	}
	idb.debugFlush()

	f := func(expectedTenantIDs []string) {
		t.Helper()

		// searchTenants doesn't guarantee any order, so compare sorted lists.
		gotTenantIDs := idb.searchTenants()
		slices.Sort(gotTenantIDs)
		slices.Sort(expectedTenantIDs)
		if !reflect.DeepEqual(gotTenantIDs, expectedTenantIDs) {
			fs.MustRemoveAll(path)
			t.Fatalf("unexpected tenantIDs; got %v; want %v", gotTenantIDs, expectedTenantIDs)
		}
	}

	expectedTenantIDs := []string{"1:1", "123:567", "0:0", "0:1", "1:0"}
	f(expectedTenantIDs)

	mustCloseIndexdb(idb)
	fs.MustRemoveAll(path)

	closeTestStorage(s)
}

View File

@@ -2,6 +2,7 @@ package logstorage
import (
"context"
"encoding/json"
"fmt"
"math"
"slices"
@@ -488,6 +489,89 @@ func (s *Storage) GetStreamIDs(ctx context.Context, tenantIDs []TenantID, q *Que
return s.GetFieldValues(ctx, tenantIDs, q, "_stream_id", limit)
}
// GetTenantIDs returns tenantIDs for the given start and end.
//
// The result is the JSON-encoded list produced by s.getTenantIDs.
func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
return s.getTenantIDs(ctx, start, end)
}
// getTenantIDs returns a JSON array with the distinct tenants found in the partitions
// covering the [start, end] time range, where start and end are timestamps in nanoseconds.
//
// Partitions are selected at day granularity and are searched concurrently.
// The returned array is sorted, so identical requests over identical data
// produce identical responses.
//
// If ctx is canceled, the partitions which weren't processed yet are skipped.
func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
	workersCount := cgroup.AvailableCPUs()
	stopCh := ctx.Done()

	// Every worker appends only to its own slot, so no locking is needed.
	tenantIDs := make([][]string, workersCount)

	// Spin up workers
	var wgWorkers sync.WaitGroup
	workCh := make(chan *partition, workersCount)
	wgWorkers.Add(workersCount)
	for i := 0; i < workersCount; i++ {
		go func(workerID uint) {
			defer wgWorkers.Done()
			for pt := range workCh {
				if needStop(stopCh) {
					// The search has been canceled. Just skip all the scheduled work in order to save CPU time.
					continue
				}
				tenantIDs[workerID] = append(tenantIDs[workerID], pt.idb.searchTenants()...)
			}
		}(uint(i))
	}

	// Select partitions according to the selected time range
	s.partitionsLock.Lock()
	ptws := s.partitions
	minDay := start / nsecsPerDay
	n := sort.Search(len(ptws), func(i int) bool {
		return ptws[i].day >= minDay
	})
	ptws = ptws[n:]
	maxDay := end / nsecsPerDay
	n = sort.Search(len(ptws), func(i int) bool {
		return ptws[i].day > maxDay
	})
	ptws = ptws[:n]

	// Copy the selected partitions, so they don't interfere with s.partitions.
	ptws = append([]*partitionWrapper{}, ptws...)

	// Hold references to the selected partitions while they are searched outside the lock.
	for _, ptw := range ptws {
		ptw.incRef()
	}
	s.partitionsLock.Unlock()

	// Schedule concurrent search across matching partitions.
	for _, ptw := range ptws {
		workCh <- ptw.pt
	}

	// Wait until workers finish their work
	close(workCh)
	wgWorkers.Wait()

	// Decrement references to partitions
	for _, ptw := range ptws {
		ptw.decRef()
	}

	// The same tenant may be present in multiple partitions, so de-duplicate the results.
	m := make(map[string]struct{})
	for _, tids := range tenantIDs {
		for _, tid := range tids {
			m[tid] = struct{}{}
		}
	}
	tenants := make([]string, 0, len(m))
	for k := range m {
		tenants = append(tenants, k)
	}
	// Map iteration order is random; sort in order to return deterministic output.
	slices.Sort(tenants)

	return json.Marshal(tenants)
}
func (s *Storage) runValuesWithHitsQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) {
var results []ValueWithHits
var resultsLock sync.Mutex