mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-18 01:06:33 +03:00
Compare commits
17 Commits
weakpointe
...
get-vllogs
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e85e65d476 | ||
|
|
a2cef2a05f | ||
|
|
1257bb6427 | ||
|
|
c84a4b50ad | ||
|
|
ce4b03d023 | ||
|
|
142f45f968 | ||
|
|
82d5b4659d | ||
|
|
1689860030 | ||
|
|
77ac142183 | ||
|
|
b4e114efe7 | ||
|
|
629a943683 | ||
|
|
64e2c017ff | ||
|
|
2808f90492 | ||
|
|
2a553096bb | ||
|
|
f9568661ea | ||
|
|
64b48d0abf | ||
|
|
2c96f3512d |
@@ -3,6 +3,7 @@ package internalselect
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
@@ -19,6 +20,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
||||
)
|
||||
|
||||
// RequestHandler processes requests to /internal/select/*
|
||||
@@ -49,6 +51,7 @@ var requestHandlers = map[string]func(ctx context.Context, w http.ResponseWriter
|
||||
"/internal/select/stream_field_values": processStreamFieldValuesRequest,
|
||||
"/internal/select/streams": processStreamsRequest,
|
||||
"/internal/select/stream_ids": processStreamIDsRequest,
|
||||
"/internal/select/tenant_ids": processTenantIDsRequest,
|
||||
}
|
||||
|
||||
func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
@@ -242,6 +245,30 @@ func processStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http
|
||||
return writeValuesWithHits(w, streamIDs, cp.DisableCompression)
|
||||
}
|
||||
|
||||
func processTenantIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
start, okStart, err := getTimeNsec(r, "start")
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse start timestamp: %w", err)
|
||||
}
|
||||
end, okEnd, err := getTimeNsec(r, "end")
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse end timestamp: %w", err)
|
||||
}
|
||||
if !okStart {
|
||||
start = math.MinInt64
|
||||
}
|
||||
if !okEnd {
|
||||
end = math.MaxInt64
|
||||
}
|
||||
|
||||
tenantIDs, err := vlstorage.GetTenantIDs(ctx, start, end)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot obtain tenant IDs: %w", err)
|
||||
}
|
||||
|
||||
return writeTenantIDs(w, tenantIDs, false)
|
||||
}
|
||||
|
||||
type commonParams struct {
|
||||
TenantIDs []logstorage.TenantID
|
||||
Query *logstorage.Query
|
||||
@@ -306,6 +333,17 @@ func writeValuesWithHits(w http.ResponseWriter, vhs []logstorage.ValueWithHits,
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeTenantIDs(w http.ResponseWriter, tenantIDs []byte, disableCompression bool) error {
|
||||
if !disableCompression {
|
||||
tenantIDs = zstd.CompressLevel(nil, tenantIDs, 1)
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
if _, err := w.Write(tenantIDs); err != nil {
|
||||
return fmt.Errorf("cannot send response to the client: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getInt64FromRequest(r *http.Request, argName string) (int64, error) {
|
||||
s := r.FormValue(argName)
|
||||
n, err := strconv.ParseInt(s, 10, 64)
|
||||
@@ -314,3 +352,16 @@ func getInt64FromRequest(r *http.Request, argName string) (int64, error) {
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func getTimeNsec(r *http.Request, argName string) (int64, bool, error) {
|
||||
s := r.FormValue(argName)
|
||||
if s == "" {
|
||||
return 0, false, nil
|
||||
}
|
||||
currentTimestamp := time.Now().UnixNano()
|
||||
nsecs, err := timeutil.ParseTimeAt(s, currentTimestamp)
|
||||
if err != nil {
|
||||
return 0, false, fmt.Errorf("cannot parse %s=%s: %w", argName, s, err)
|
||||
}
|
||||
return nsecs, true, nil
|
||||
}
|
||||
|
||||
@@ -972,6 +972,60 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
|
||||
}
|
||||
}
|
||||
|
||||
// ProcessAdminTenantsRequest processes /select/admin/tenants request.
|
||||
func ProcessAdminTenantsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
start, okStart, err := getTimeNsec(r, "start")
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return
|
||||
}
|
||||
end, okEnd, err := getTimeNsec(r, "end")
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return
|
||||
}
|
||||
if !okStart {
|
||||
start = math.MinInt64
|
||||
}
|
||||
if !okEnd {
|
||||
end = math.MaxInt64
|
||||
}
|
||||
|
||||
if start > end {
|
||||
httpserver.Errorf(w, r, "'start' time must be less than 'end' time")
|
||||
return
|
||||
}
|
||||
|
||||
sw := &syncWriter{
|
||||
w: w,
|
||||
}
|
||||
|
||||
var bwShards atomicutil.Slice[bufferedWriter]
|
||||
bwShards.Init = func(shard *bufferedWriter) {
|
||||
shard.sw = sw
|
||||
}
|
||||
defer func() {
|
||||
shards := bwShards.All()
|
||||
for _, shard := range shards {
|
||||
shard.FlushIgnoreErrors()
|
||||
}
|
||||
}()
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
tenants, err := vlstorage.GetTenantIDs(ctx, start, end)
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "cannot obtain tenantIDs: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
_, err = fmt.Fprintf(w, "{\"status\":\"success\",\"data\":%s}\n", tenants)
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "cannot obtain tenantIDs: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type syncWriter struct {
|
||||
mu sync.Mutex
|
||||
w io.Writer
|
||||
|
||||
@@ -10,12 +10,13 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlselect/internalselect"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlselect/logsql"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -267,6 +268,10 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
|
||||
logsql.ProcessStreamsRequest(ctx, w, r)
|
||||
logsqlStreamsDuration.UpdateDuration(startTime)
|
||||
return true
|
||||
case "/select/admin/tenants":
|
||||
logsqlAdminTenantsRequests.Inc()
|
||||
logsql.ProcessAdminTenantsRequest(ctx, w, r)
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
@@ -321,4 +326,6 @@ var (
|
||||
|
||||
// no need to track duration for tail requests, as they usually take long time
|
||||
logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`)
|
||||
|
||||
logsqlAdminTenantsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/admin/tenants"}`)
|
||||
)
|
||||
|
||||
@@ -350,6 +350,14 @@ func GetStreamIDs(ctx context.Context, tenantIDs []logstorage.TenantID, q *logst
|
||||
return netstorageSelect.GetStreamIDs(ctx, tenantIDs, q, limit)
|
||||
}
|
||||
|
||||
// GetTenantIDs returns tenantIDs from the storage by the given start and end.
|
||||
func GetTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
|
||||
if localStorage != nil {
|
||||
return localStorage.GetTenantIDs(ctx, start, end)
|
||||
}
|
||||
return netstorageSelect.GetTenantIDs(ctx, start, end)
|
||||
}
|
||||
|
||||
func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
|
||||
var ss logstorage.StorageStats
|
||||
strg.UpdateStats(&ss)
|
||||
|
||||
@@ -58,6 +58,11 @@ const (
|
||||
//
|
||||
// It must be updated every time the protocol changes.
|
||||
QueryProtocolVersion = "v1"
|
||||
|
||||
// TenantIDsProtocolVersion is the version of the protocol used for /internal/select/tenant_ids HTTP endpoint.
|
||||
//
|
||||
// It must be updated every time the protocol changes.
|
||||
TenantIDsProtocolVersion = "v1"
|
||||
)
|
||||
|
||||
// Storage is a network storage for querying remote storage nodes in the cluster.
|
||||
@@ -224,6 +229,13 @@ func (sn *storageNode) getStreamIDs(ctx context.Context, tenantIDs []logstorage.
|
||||
return sn.getValuesWithHits(ctx, "/internal/select/stream_ids", args)
|
||||
}
|
||||
|
||||
func (sn *storageNode) getTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
|
||||
args := url.Values{}
|
||||
args.Set("start", fmt.Sprintf("%d", start))
|
||||
args.Set("end", fmt.Sprintf("%d", end))
|
||||
return sn.executeRequestAt(ctx, "/internal/select/tenant_ids", args)
|
||||
}
|
||||
|
||||
func (sn *storageNode) getCommonArgs(version string, tenantIDs []logstorage.TenantID, q *logstorage.Query) url.Values {
|
||||
args := url.Values{}
|
||||
args.Set("version", version)
|
||||
@@ -406,6 +418,48 @@ func (s *Storage) GetStreamIDs(ctx context.Context, tenantIDs []logstorage.Tenan
|
||||
})
|
||||
}
|
||||
|
||||
// GetTenantIDs returns tenantIDs for the given start and end.
|
||||
func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
|
||||
return s.getTenantIDs(ctx, start, end)
|
||||
}
|
||||
|
||||
func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
|
||||
ctxWithCancel, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
results := make([][]byte, len(s.sns))
|
||||
errs := make([]error, len(s.sns))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := range s.sns {
|
||||
wg.Add(1)
|
||||
go func(nodeIdx int) {
|
||||
defer wg.Done()
|
||||
|
||||
sn := s.sns[nodeIdx]
|
||||
tenantIDs, err := sn.getTenantIDs(ctxWithCancel, start, end)
|
||||
results[nodeIdx] = tenantIDs
|
||||
errs[nodeIdx] = err
|
||||
|
||||
if err != nil {
|
||||
// Cancel the remaining parallel requests
|
||||
cancel()
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
if err := getFirstNonCancelError(errs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var tenantIDs []byte
|
||||
for i := range results {
|
||||
tenantIDs = append(tenantIDs, results[i]...)
|
||||
}
|
||||
|
||||
return tenantIDs, nil
|
||||
}
|
||||
|
||||
func (s *Storage) getValuesWithHits(ctx context.Context, limit uint64, resetHitsOnLimitExceeded bool,
|
||||
callback func(ctx context.Context, sn *storageNode) ([]logstorage.ValueWithHits, error)) ([]logstorage.ValueWithHits, error) {
|
||||
|
||||
|
||||
@@ -437,6 +437,53 @@ func (is *indexSearch) getStreamIDsForTagRegexp(tenantID TenantID, tagName strin
|
||||
return ids
|
||||
}
|
||||
|
||||
func (is *indexSearch) getTenantIDs() []string {
|
||||
tenants := make(map[string]struct{})
|
||||
ts := &is.ts
|
||||
kb := &is.kb
|
||||
|
||||
tID := TenantID{0, 0}
|
||||
|
||||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tID)
|
||||
ts.Seek(kb.B)
|
||||
|
||||
for ts.NextItem() {
|
||||
_, prefix, err := unmarshalCommonPrefix(&tID, ts.Item)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot unmarshal tenantID: %s", err)
|
||||
}
|
||||
if prefix != nsPrefixStreamID {
|
||||
// Reached the end of enteris with the needed prefix.
|
||||
break
|
||||
}
|
||||
tenant := fmt.Sprintf("%d:%d", tID.AccountID, tID.ProjectID)
|
||||
tenants[tenant] = struct{}{}
|
||||
// Seek for the next (accountID, projectID)
|
||||
tID.ProjectID++
|
||||
if tID.ProjectID == 0 {
|
||||
tID.AccountID++
|
||||
if tID.AccountID == 0 {
|
||||
// Reached the end (accountID, projectID) space
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tID)
|
||||
ts.Seek(kb.B)
|
||||
}
|
||||
|
||||
if err := ts.Error(); err != nil {
|
||||
logger.Panicf("FATAL: error when performing search: %s", err)
|
||||
}
|
||||
|
||||
tenantIDs := make([]string, 0)
|
||||
for tenantID := range tenants {
|
||||
tenantIDs = append(tenantIDs, tenantID)
|
||||
}
|
||||
|
||||
return tenantIDs
|
||||
}
|
||||
|
||||
func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical string) {
|
||||
st := GetStreamTags()
|
||||
mustUnmarshalStreamTags(st, streamTagsCanonical)
|
||||
@@ -542,6 +589,13 @@ func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter
|
||||
bbPool.Put(bb)
|
||||
}
|
||||
|
||||
func (idb *indexdb) searchTenants() []string {
|
||||
is := idb.getIndexSearch()
|
||||
defer idb.putIndexSearch(is)
|
||||
|
||||
return is.getTenantIDs()
|
||||
}
|
||||
|
||||
type batchItems struct {
|
||||
buf []byte
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package logstorage
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"slices"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
@@ -112,7 +113,7 @@ func TestStorageSearchStreamIDs(t *testing.T) {
|
||||
// non-existing-tag-re
|
||||
f(`{job="job-0",instance="instance-0",non_existing_tag=~"foo.+"}`, nil)
|
||||
|
||||
//non-existing-non-empty-tag-re
|
||||
// non-existing-non-empty-tag-re
|
||||
f(`{job="job-0",instance="instance-0",non_existing_tag!~""}`, nil)
|
||||
|
||||
// match-job-instance
|
||||
@@ -252,3 +253,78 @@ func TestStorageSearchStreamIDs(t *testing.T) {
|
||||
|
||||
closeTestStorage(s)
|
||||
}
|
||||
|
||||
func TestGetTenantsIds(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
path := t.Name()
|
||||
const partitionName = "foobar"
|
||||
s := newTestStorage()
|
||||
mustCreateIndexdb(path)
|
||||
idb := mustOpenIndexdb(path, partitionName, s)
|
||||
|
||||
tenantIDs := []TenantID{
|
||||
{AccountID: 0, ProjectID: 0},
|
||||
{AccountID: 0, ProjectID: 1},
|
||||
{AccountID: 1, ProjectID: 0},
|
||||
{AccountID: 1, ProjectID: 1},
|
||||
{AccountID: 123, ProjectID: 567},
|
||||
}
|
||||
getStreamIDForTags := func(tags map[string]string) ([]streamID, string) {
|
||||
st := GetStreamTags()
|
||||
for k, v := range tags {
|
||||
st.Add(k, v)
|
||||
}
|
||||
streamTagsCanonical := st.MarshalCanonical(nil)
|
||||
PutStreamTags(st)
|
||||
id := hash128(streamTagsCanonical)
|
||||
sids := make([]streamID, 0, len(tenantIDs))
|
||||
for _, tenantID := range tenantIDs {
|
||||
sid := streamID{
|
||||
tenantID: tenantID,
|
||||
id: id,
|
||||
}
|
||||
|
||||
sids = append(sids, sid)
|
||||
}
|
||||
|
||||
return sids, string(streamTagsCanonical)
|
||||
}
|
||||
|
||||
// Create indexdb entries
|
||||
const jobsCount = 7
|
||||
const instancesCount = 5
|
||||
for i := 0; i < jobsCount; i++ {
|
||||
for j := 0; j < instancesCount; j++ {
|
||||
sids, streamTagsCanonical := getStreamIDForTags(map[string]string{
|
||||
"job": fmt.Sprintf("job-%d", i),
|
||||
"instance": fmt.Sprintf("instance-%d", j),
|
||||
})
|
||||
for _, sid := range sids {
|
||||
idb.mustRegisterStream(&sid, streamTagsCanonical)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
idb.debugFlush()
|
||||
|
||||
f := func(expectedTenantIDs []string) {
|
||||
t.Helper()
|
||||
tenantIDs := idb.searchTenants()
|
||||
slices.Sort(tenantIDs)
|
||||
slices.Sort(expectedTenantIDs)
|
||||
if !reflect.DeepEqual(tenantIDs, expectedTenantIDs) {
|
||||
fs.MustRemoveAll(path)
|
||||
t.Fatalf("unexpected tensntIds; got %v; want %v", tenantIDs, expectedTenantIDs)
|
||||
}
|
||||
}
|
||||
|
||||
expectedTenantIDs := []string{"1:1", "123:567", "0:0", "0:1", "1:0"}
|
||||
|
||||
f(expectedTenantIDs)
|
||||
|
||||
mustCloseIndexdb(idb)
|
||||
fs.MustRemoveAll(path)
|
||||
|
||||
closeTestStorage(s)
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package logstorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"slices"
|
||||
@@ -488,6 +489,89 @@ func (s *Storage) GetStreamIDs(ctx context.Context, tenantIDs []TenantID, q *Que
|
||||
return s.GetFieldValues(ctx, tenantIDs, q, "_stream_id", limit)
|
||||
}
|
||||
|
||||
// GetTenantIDs returns tenantIDs for the given start and end.
|
||||
func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
|
||||
return s.getTenantIDs(ctx, start, end)
|
||||
}
|
||||
|
||||
func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
|
||||
workersCount := cgroup.AvailableCPUs()
|
||||
stopCh := ctx.Done()
|
||||
|
||||
tenantIDs := make([][]string, workersCount)
|
||||
processPartitions := func(pt *partition, workerID uint) {
|
||||
tenants := pt.idb.searchTenants()
|
||||
tenantIDs[workerID] = append(tenantIDs[workerID], tenants...)
|
||||
}
|
||||
|
||||
// Spin up workers
|
||||
var wgWorkers sync.WaitGroup
|
||||
workCh := make(chan *partition, workersCount)
|
||||
wgWorkers.Add(workersCount)
|
||||
for i := 0; i < workersCount; i++ {
|
||||
go func(workerID uint) {
|
||||
for pt := range workCh {
|
||||
if needStop(stopCh) {
|
||||
// The search has been canceled. Just skip all the scheduled work in order to save CPU time.
|
||||
continue
|
||||
}
|
||||
processPartitions(pt, workerID)
|
||||
}
|
||||
wgWorkers.Done()
|
||||
}(uint(i))
|
||||
}
|
||||
|
||||
// Select partitions according to the selected time range
|
||||
s.partitionsLock.Lock()
|
||||
ptws := s.partitions
|
||||
minDay := start / nsecsPerDay
|
||||
n := sort.Search(len(ptws), func(i int) bool {
|
||||
return ptws[i].day >= minDay
|
||||
})
|
||||
ptws = ptws[n:]
|
||||
maxDay := end / nsecsPerDay
|
||||
n = sort.Search(len(ptws), func(i int) bool {
|
||||
return ptws[i].day > maxDay
|
||||
})
|
||||
ptws = ptws[:n]
|
||||
|
||||
// Copy the selected partitions, so they don't interfere with s.partitions.
|
||||
ptws = append([]*partitionWrapper{}, ptws...)
|
||||
|
||||
for _, ptw := range ptws {
|
||||
ptw.incRef()
|
||||
}
|
||||
s.partitionsLock.Unlock()
|
||||
|
||||
// Schedule concurrent search across matching partitions.
|
||||
for _, ptw := range ptws {
|
||||
workCh <- ptw.pt
|
||||
}
|
||||
|
||||
// Wait until workers finish their work
|
||||
close(workCh)
|
||||
wgWorkers.Wait()
|
||||
|
||||
// Decrement references to partitions
|
||||
for _, ptw := range ptws {
|
||||
ptw.decRef()
|
||||
}
|
||||
|
||||
m := make(map[string]struct{})
|
||||
for _, tids := range tenantIDs {
|
||||
for _, tid := range tids {
|
||||
m[tid] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
tenants := make([]string, 0, len(m))
|
||||
for k := range m {
|
||||
tenants = append(tenants, k)
|
||||
}
|
||||
|
||||
return json.Marshal(tenants)
|
||||
}
|
||||
|
||||
func (s *Storage) runValuesWithHitsQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) {
|
||||
var results []ValueWithHits
|
||||
var resultsLock sync.Mutex
|
||||
|
||||
Reference in New Issue
Block a user