2024-10-30 15:22:06 +01:00
|
|
|
package apptest
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"fmt"
|
2025-08-01 16:21:14 +04:00
|
|
|
"io"
|
2024-10-30 15:22:06 +01:00
|
|
|
"regexp"
|
|
|
|
|
"strings"
|
|
|
|
|
"testing"
|
2024-11-20 16:30:55 +01:00
|
|
|
"time"
|
2024-10-30 15:22:06 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Vminsert holds the state of a vminsert app and provides vminsert-specific
|
|
|
|
|
// functions.
|
|
|
|
|
type Vminsert struct {
|
|
|
|
|
*app
|
2026-05-12 16:24:01 +02:00
|
|
|
*metricsClient
|
|
|
|
|
*vminsertClient
|
2024-10-30 15:22:06 +01:00
|
|
|
|
2024-12-13 11:59:03 +01:00
|
|
|
httpListenAddr string
|
|
|
|
|
clusternativeListenAddr string
|
2024-10-30 15:22:06 +01:00
|
|
|
}
|
|
|
|
|
|
2024-12-03 12:25:53 +01:00
|
|
|
// storageNodes extracts the storage node addresses from the first
// -storageNode=<addr1>,<addr2>,... entry found in flags.
//
// It returns nil if flags contains no such entry.
func storageNodes(flags []string) []string {
	const prefix = "-storageNode="
	for i := range flags {
		if !strings.HasPrefix(flags[i], prefix) {
			continue
		}
		// Only the first matching flag is taken into account.
		return strings.Split(flags[i][len(prefix):], ",")
	}
	return nil
}
|
|
|
|
|
|
2024-10-30 15:22:06 +01:00
|
|
|
// StartVminsert starts an instance of vminsert with the given flags. It also
|
|
|
|
|
// sets the default flags and populates the app instance state with runtime
|
|
|
|
|
// values extracted from the application log (such as httpListenAddr)
|
2025-08-01 16:21:14 +04:00
|
|
|
func StartVminsert(instance string, flags []string, cli *Client, output io.Writer) (*Vminsert, error) {
|
2024-12-03 12:25:53 +01:00
|
|
|
extractREs := []*regexp.Regexp{
|
|
|
|
|
httpListenAddrRE,
|
2024-12-13 11:59:03 +01:00
|
|
|
vminsertClusterNativeAddrRE,
|
2025-05-05 11:58:45 +01:00
|
|
|
graphiteListenAddrRE,
|
|
|
|
|
openTSDBListenAddrRE,
|
2024-12-03 12:25:53 +01:00
|
|
|
}
|
2025-08-18 22:46:34 +02:00
|
|
|
// Add storageNode REs to block until vminsert establishes connections with
|
2024-12-03 12:25:53 +01:00
|
|
|
// all storage nodes. The extracted values are unused.
|
|
|
|
|
for _, sn := range storageNodes(flags) {
|
|
|
|
|
logRecord := fmt.Sprintf("successfully dialed -storageNode=\"%s\"", sn)
|
|
|
|
|
extractREs = append(extractREs, regexp.MustCompile(logRecord))
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-14 07:18:09 +02:00
|
|
|
app, stderrExtracts, err := startApp(instance, "../../bin/vminsert-race", flags, &appOptions{
|
2024-10-30 15:22:06 +01:00
|
|
|
defaultFlags: map[string]string{
|
2025-11-14 09:21:08 +01:00
|
|
|
"-httpListenAddr": "127.0.0.1:0",
|
|
|
|
|
"-clusternativeListenAddr": "127.0.0.1:0",
|
|
|
|
|
"-graphiteListenAddr": ":0",
|
|
|
|
|
"-opentsdbListenAddr": "127.0.0.1:0",
|
|
|
|
|
"-clusternative.vminsertConnsShutdownDuration": "1ms",
|
2024-10-30 15:22:06 +01:00
|
|
|
},
|
2024-12-03 12:25:53 +01:00
|
|
|
extractREs: extractREs,
|
2025-08-01 16:21:14 +04:00
|
|
|
output: output,
|
2024-10-30 15:22:06 +01:00
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-12 16:24:01 +02:00
|
|
|
metricsClient := newMetricsClient(cli, stderrExtracts[0])
|
2024-10-30 15:22:06 +01:00
|
|
|
return &Vminsert{
|
2026-05-12 16:24:01 +02:00
|
|
|
app: app,
|
|
|
|
|
metricsClient: metricsClient,
|
|
|
|
|
vminsertClient: &vminsertClient{
|
|
|
|
|
vminsertCli: cli,
|
|
|
|
|
url: func(op, path string, opts QueryOpts) string {
|
|
|
|
|
return getClusterPath(stderrExtracts[0], op, path, opts)
|
|
|
|
|
},
|
|
|
|
|
openTSDBURL: func(op, path string, opts QueryOpts) string {
|
|
|
|
|
return getClusterPath(stderrExtracts[3], op, path, opts)
|
|
|
|
|
},
|
|
|
|
|
graphiteListenAddr: stderrExtracts[2],
|
|
|
|
|
sendBlocking: func(t *testing.T, numRecordsToSend int, send func()) {
|
|
|
|
|
t.Helper()
|
|
|
|
|
sendBlocking(t, metricsClient, numRecordsToSend, send)
|
|
|
|
|
},
|
2024-10-30 15:22:06 +01:00
|
|
|
},
|
2024-12-13 11:59:03 +01:00
|
|
|
httpListenAddr: stderrExtracts[0],
|
|
|
|
|
clusternativeListenAddr: stderrExtracts[1],
|
2024-10-30 15:22:06 +01:00
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
2024-12-13 11:59:03 +01:00
|
|
|
// ClusternativeListenAddr returns the address at which the vminsert process is
|
|
|
|
|
// listening for connections from other vminsert apps.
|
|
|
|
|
func (app *Vminsert) ClusternativeListenAddr() string {
|
|
|
|
|
return app.clusternativeListenAddr
|
|
|
|
|
}
|
|
|
|
|
|
2025-06-04 16:40:26 +02:00
|
|
|
// HTTPAddr returns the address at which the vminsert process is
|
|
|
|
|
// listening for incoming HTTP requests.
|
|
|
|
|
func (app *Vminsert) HTTPAddr() string {
|
|
|
|
|
return app.httpListenAddr
|
|
|
|
|
}
|
|
|
|
|
|
2024-10-30 15:22:06 +01:00
|
|
|
// String returns the string representation of the vminsert app state.
|
|
|
|
|
func (app *Vminsert) String() string {
|
|
|
|
|
return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr)
|
|
|
|
|
}
|
2024-11-20 16:30:55 +01:00
|
|
|
|
2024-11-21 19:39:17 +01:00
|
|
|
// sendBlocking sends the data to vmstorage by executing `send` function and
|
|
|
|
|
// waits until the data is actually sent.
|
|
|
|
|
//
|
|
|
|
|
// vminsert does not send the data immediately. It first puts the data into a
|
|
|
|
|
// buffer. Then a background goroutine takes the data from the buffer sends it
|
|
|
|
|
// to the vmstorage. This happens every 200ms.
|
2024-11-20 16:30:55 +01:00
|
|
|
//
|
|
|
|
|
// Waiting is implemented a retrieving the value of `vm_rpc_rows_sent_total`
|
|
|
|
|
// metric and checking whether it is equal or greater than the wanted value.
|
|
|
|
|
// If it is, then the data has been sent to vmstorage.
|
2026-05-12 16:24:01 +02:00
|
|
|
func sendBlocking(t *testing.T, c *metricsClient, numRecordsToSend int, send func()) {
|
2024-11-20 16:30:55 +01:00
|
|
|
t.Helper()
|
|
|
|
|
|
2026-05-12 16:24:01 +02:00
|
|
|
wantRowsSentCount := c.rpcRowsSentTotal(t) + numRecordsToSend
|
2026-03-02 10:41:35 +01:00
|
|
|
|
2024-11-21 19:39:17 +01:00
|
|
|
send()
|
|
|
|
|
|
2024-11-20 16:30:55 +01:00
|
|
|
const (
|
|
|
|
|
retries = 20
|
|
|
|
|
period = 100 * time.Millisecond
|
|
|
|
|
)
|
|
|
|
|
for range retries {
|
2026-05-12 16:24:01 +02:00
|
|
|
d := c.rpcRowsSentTotal(t)
|
2025-11-14 09:21:08 +01:00
|
|
|
if d >= wantRowsSentCount {
|
2024-11-20 16:30:55 +01:00
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
time.Sleep(period)
|
|
|
|
|
}
|
|
|
|
|
t.Fatalf("timed out while waiting for inserted rows to be sent to vmstorage")
|
|
|
|
|
}
|