mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Compare commits
1 Commits
v1.122.22
...
add-tests-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3e5527ae57 |
76
apptest/tests/sparse_cache_test.go
Normal file
76
apptest/tests/sparse_cache_test.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
||||
)
|
||||
|
||||
func TestStorageUsesSparseCacheForFinalMerge(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
sut := tc.MustStartVmsingle("sparse-cache-final-merge", []string{`-retentionPeriod=100y`, `-downsampling.period={__name__=~"metric.*"}:5m:1s`})
|
||||
|
||||
// insert metrics daily over past 2 months
|
||||
const metricsPerStep = 10
|
||||
const steps = 35
|
||||
const stepSize = int64(1 * 24 * 60 * 60 * 1000)
|
||||
records := make([]string, metricsPerStep)
|
||||
ts := time.Now().Add(-2 * 30 * 24 * time.Hour).UnixMilli()
|
||||
for range steps {
|
||||
for i := range metricsPerStep {
|
||||
name := fmt.Sprintf("metric_%d", i)
|
||||
records[i] = fmt.Sprintf("%s %d %d", name, rand.IntN(1000), ts)
|
||||
}
|
||||
sut.PrometheusAPIV1ImportPrometheus(t, records, apptest.QueryOpts{})
|
||||
ts += stepSize
|
||||
}
|
||||
sut.ForceFlush(t)
|
||||
sut.ForceMerge(t)
|
||||
|
||||
// todo: replace with a more reliable way to check if the merge is completed
|
||||
// wait for merge to be completed
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
v := sut.GetIntMetric(t, `vm_cache_requests_total{type="indexdb/dataBlocksSparse"}`)
|
||||
if v <= 0 {
|
||||
t.Fatalf(`unexpected vm_cache_requests_total{type="indexdb/dataBlocksSparse"} value: %d`, v)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageDoesNotUseSparseCacheForRegularMerge(t *testing.T) {
|
||||
tc := apptest.NewTestCase(t)
|
||||
defer tc.Stop()
|
||||
|
||||
sut := tc.MustStartVmsingle("sparse-cache-regular-merge", []string{`-retentionPeriod=100y`, `-downsampling.period={__name__=~"metric.*"}:5m:1s`})
|
||||
|
||||
// insert metrics into current month only
|
||||
const metricsPerStep = 10
|
||||
const steps = 2
|
||||
const stepSize = 60 * 1000
|
||||
records := make([]string, metricsPerStep)
|
||||
ts := time.Now().Add(-1 * time.Hour).UnixMilli()
|
||||
for range steps {
|
||||
for i := range metricsPerStep {
|
||||
name := fmt.Sprintf("metric_%d", i)
|
||||
records[i] = fmt.Sprintf("%s %d %d", name, rand.IntN(1000), ts)
|
||||
}
|
||||
sut.PrometheusAPIV1ImportPrometheus(t, records, apptest.QueryOpts{})
|
||||
ts += stepSize
|
||||
}
|
||||
sut.ForceFlush(t)
|
||||
sut.ForceMerge(t)
|
||||
|
||||
// todo: replace with a more reliable way to check if the merge is completed
|
||||
// wait for merge to be completed
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
v := sut.GetIntMetric(t, `vm_cache_requests_total{type="indexdb/dataBlocksSparse"}`)
|
||||
if v == 0 {
|
||||
t.Fatalf(`unexpected vm_cache_requests_total{type="indexdb/dataBlocksSparse"} value: %d`, v)
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,7 @@ type Vmsingle struct {
|
||||
|
||||
// vmstorage URLs.
|
||||
forceFlushURL string
|
||||
forceMergeURL string
|
||||
|
||||
// vminsert URLs.
|
||||
influxLineWriteURL string
|
||||
@@ -65,6 +66,7 @@ func StartVmsingle(instance string, flags []string, cli *Client) (*Vmsingle, err
|
||||
httpListenAddr: stderrExtracts[1],
|
||||
|
||||
forceFlushURL: fmt.Sprintf("http://%s/internal/force_flush", stderrExtracts[1]),
|
||||
forceMergeURL: fmt.Sprintf("http://%s/internal/force_merge", stderrExtracts[1]),
|
||||
influxLineWriteURL: fmt.Sprintf("http://%s/influx/write", stderrExtracts[1]),
|
||||
prometheusAPIV1ImportPrometheusURL: fmt.Sprintf("http://%s/prometheus/api/v1/import/prometheus", stderrExtracts[1]),
|
||||
prometheusAPIV1WriteURL: fmt.Sprintf("http://%s/prometheus/api/v1/write", stderrExtracts[1]),
|
||||
@@ -83,6 +85,13 @@ func (app *Vmsingle) ForceFlush(t *testing.T) {
|
||||
app.cli.Get(t, app.forceFlushURL, http.StatusOK)
|
||||
}
|
||||
|
||||
// ForceMerge is a test helper function that forces the merging of parts.
|
||||
func (app *Vmsingle) ForceMerge(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
app.cli.Get(t, app.forceMergeURL, http.StatusOK)
|
||||
}
|
||||
|
||||
// InfluxWrite is a test helper function that inserts a
|
||||
// collection of records in Influx line format by sending a HTTP
|
||||
// POST request to /influx/write vmsingle endpoint.
|
||||
|
||||
@@ -21,6 +21,7 @@ type Vmstorage struct {
|
||||
vmselectAddr string
|
||||
|
||||
forceFlushURL string
|
||||
forceMergeURL string
|
||||
}
|
||||
|
||||
// StartVmstorage starts an instance of vmstorage with the given flags. It also
|
||||
@@ -57,6 +58,7 @@ func StartVmstorage(instance string, flags []string, cli *Client) (*Vmstorage, e
|
||||
vmselectAddr: stderrExtracts[3],
|
||||
|
||||
forceFlushURL: fmt.Sprintf("http://%s/internal/force_flush", stderrExtracts[1]),
|
||||
forceMergeURL: fmt.Sprintf("http://%s/internal/force_merge", stderrExtracts[1]),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -80,6 +82,12 @@ func (app *Vmstorage) ForceFlush(t *testing.T) {
|
||||
app.cli.Get(t, app.forceFlushURL, http.StatusOK)
|
||||
}
|
||||
|
||||
// ForceMerge is a test helper function that forces the merging of parts.
|
||||
func (app *Vmstorage) ForceMerge(t *testing.T) {
|
||||
t.Helper()
|
||||
app.cli.Get(t, app.forceMergeURL, http.StatusOK)
|
||||
}
|
||||
|
||||
// String returns the string representation of the vmstorage app state.
|
||||
func (app *Vmstorage) String() string {
|
||||
return fmt.Sprintf("{app: %s storageDataPath: %q httpListenAddr: %q vminsertAddr: %q vmselectAddr: %q}", []any{
|
||||
|
||||
Reference in New Issue
Block a user