mirror of
https://github.com/vrtmrz/obsidian-livesync.git
synced 2026-05-22 10:56:13 +03:00
Compare commits
8 Commits
main
...
cli_test_d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3ab80190d6 | ||
|
|
8948bf2803 | ||
|
|
486fd15c60 | ||
|
|
5fd85c71ca | ||
|
|
c1f41910c4 | ||
|
|
3693d6a6b6 | ||
|
|
cc3c992b1d | ||
|
|
df390ac456 |
15
.github/workflows/cli-deno-tests.yml
vendored
15
.github/workflows/cli-deno-tests.yml
vendored
@@ -32,13 +32,13 @@ jobs:
|
||||
|
||||
case "$SELECTED_TASK" in
|
||||
test)
|
||||
TASK_MATRIX='["test:setup-put-cat","test:mirror","test:push-pull","test:sync-two-local","test:sync-locked-remote","test:p2p-host","test:p2p-peers","test:p2p-sync","test:p2p-three-nodes","test:p2p-upload-download","test:e2e-couchdb","test:e2e-matrix"]'
|
||||
TASK_MATRIX='["test:setup-put-cat","test:mirror","test:push-pull","test:sync-two-local","test:sync-locked-remote","test:p2p-host","test:p2p-peers","test:p2p-sync","test:p2p-three-nodes","test:p2p-upload-download","test:e2e-matrix:couchdb-enc0","test:e2e-matrix:couchdb-enc1","test:e2e-matrix:minio-enc0","test:e2e-matrix:minio-enc1"]'
|
||||
;;
|
||||
test:local)
|
||||
TASK_MATRIX='["test:setup-put-cat","test:mirror"]'
|
||||
;;
|
||||
test:e2e-matrix)
|
||||
TASK_MATRIX='["test:e2e-matrix"]'
|
||||
TASK_MATRIX='["test:e2e-matrix:couchdb-enc0","test:e2e-matrix:couchdb-enc1","test:e2e-matrix:minio-enc0","test:e2e-matrix:minio-enc1"]'
|
||||
;;
|
||||
test:p2p-sync)
|
||||
TASK_MATRIX='["test:p2p-sync"]'
|
||||
@@ -55,6 +55,8 @@ jobs:
|
||||
needs: prepare
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 60
|
||||
env:
|
||||
DENO_DIR: ~/.cache/deno
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
@@ -70,12 +72,21 @@ jobs:
|
||||
with:
|
||||
node-version: '24.x'
|
||||
cache: 'npm'
|
||||
cache-dependency-path: package-lock.json
|
||||
|
||||
- name: Setup Deno
|
||||
uses: denoland/setup-deno@v2
|
||||
with:
|
||||
deno-version: v2.x
|
||||
|
||||
- name: Cache Deno dependencies
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: ~/.cache/deno
|
||||
key: ${{ runner.os }}-deno-${{ hashFiles('src/apps/cli/testdeno/deno.lock', 'src/apps/cli/testdeno/deno.json') }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-deno-
|
||||
|
||||
- name: Install dependencies
|
||||
run: npm ci
|
||||
|
||||
|
||||
3
src/apps/cli/.gitignore
vendored
3
src/apps/cli/.gitignore
vendored
@@ -5,4 +5,5 @@ test/test-init.local.sh
|
||||
node_modules
|
||||
.*.json
|
||||
*.env
|
||||
!.test.env
|
||||
!.test.env
|
||||
bench-results
|
||||
312
src/apps/cli/testdeno/bench-couchdb.ts
Normal file
312
src/apps/cli/testdeno/bench-couchdb.ts
Normal file
@@ -0,0 +1,312 @@
|
||||
import { TempDir } from "./helpers/temp.ts";
|
||||
import { applyRemoteSyncSettings, initSettingsFile } from "./helpers/settings.ts";
|
||||
import { assertFilesEqual, runCliOrFail } from "./helpers/cli.ts";
|
||||
import { startCouchdb, stopCouchdb } from "./helpers/docker.ts";
|
||||
import { createDeterministicDataset, type DatasetEntry } from "./helpers/dataset.ts";
|
||||
|
||||
type BenchmarkConfig = {
|
||||
couchdbBackendUri: string;
|
||||
couchdbProxyUri: string;
|
||||
couchdbUser: string;
|
||||
couchdbPassword: string;
|
||||
couchdbDbname: string;
|
||||
datasetDirName: string;
|
||||
datasetSeed: string;
|
||||
mdFileCount: number;
|
||||
mdMinSizeBytes: number;
|
||||
mdMaxSizeBytes: number;
|
||||
binFileCount: number;
|
||||
binSizeBytes: number;
|
||||
syncTimeoutSeconds: number;
|
||||
requestedRttMs: number;
|
||||
passphrase: string;
|
||||
encrypt: boolean;
|
||||
};
|
||||
|
||||
function readEnvString(name: string, fallback: string): string {
|
||||
const value = Deno.env.get(name)?.trim();
|
||||
return value && value.length > 0 ? value : fallback;
|
||||
}
|
||||
|
||||
function readEnvNumber(name: string, fallback: number): number {
|
||||
const raw = Deno.env.get(name);
|
||||
if (raw === undefined || raw.trim() === "") {
|
||||
return fallback;
|
||||
}
|
||||
|
||||
const parsed = Number(raw);
|
||||
if (!Number.isFinite(parsed) || parsed <= 0) {
|
||||
throw new Error(`${name} must be a positive number, got '${raw}'`);
|
||||
}
|
||||
return parsed;
|
||||
}
|
||||
|
||||
function readEnvBool(name: string, fallback: boolean): boolean {
|
||||
const raw = Deno.env.get(name);
|
||||
if (raw === undefined || raw.trim() === "") {
|
||||
return fallback;
|
||||
}
|
||||
return /^(1|true|yes|on)$/i.test(raw.trim());
|
||||
}
|
||||
|
||||
function nowMs(): number {
|
||||
return performance.now();
|
||||
}
|
||||
|
||||
function formatMs(value: number): string {
|
||||
return `${value.toFixed(1)} ms`;
|
||||
}
|
||||
|
||||
function formatBytes(value: number): string {
|
||||
if (value < 1024) {
|
||||
return `${value} B`;
|
||||
}
|
||||
const kib = value / 1024;
|
||||
if (kib < 1024) {
|
||||
return `${kib.toFixed(1)} KiB`;
|
||||
}
|
||||
return `${(kib / 1024).toFixed(1)} MiB`;
|
||||
}
|
||||
|
||||
function buildConfig(): BenchmarkConfig {
|
||||
return {
|
||||
couchdbBackendUri: readEnvString("BENCH_COUCHDB_BACKEND_URI", "http://127.0.0.1:5989"),
|
||||
couchdbProxyUri: readEnvString("BENCH_COUCHDB_URI", "http://127.0.0.1:15989"),
|
||||
couchdbUser: readEnvString("BENCH_COUCHDB_USER", readEnvString("username", "admin")),
|
||||
couchdbPassword: readEnvString("BENCH_COUCHDB_PASSWORD", readEnvString("password", "password")),
|
||||
couchdbDbname: readEnvString("BENCH_COUCHDB_DBNAME", `bench-couchdb-${Date.now()}`),
|
||||
datasetDirName: readEnvString("BENCH_DATASET_DIR", "bench-dataset"),
|
||||
datasetSeed: readEnvString("BENCH_SEED", "livesync-benchmark-seed"),
|
||||
mdFileCount: Math.floor(readEnvNumber("BENCH_MD_FILE_COUNT", 1500)),
|
||||
mdMinSizeBytes: Math.floor(readEnvNumber("BENCH_MD_MIN_SIZE_BYTES", 1024)),
|
||||
mdMaxSizeBytes: Math.floor(readEnvNumber("BENCH_MD_MAX_SIZE_BYTES", 20 * 1024)),
|
||||
binFileCount: Math.floor(readEnvNumber("BENCH_BIN_FILE_COUNT", 500)),
|
||||
binSizeBytes: Math.floor(readEnvNumber("BENCH_BIN_SIZE_BYTES", 100 * 1024)),
|
||||
syncTimeoutSeconds: readEnvNumber("BENCH_SYNC_TIMEOUT", 240),
|
||||
requestedRttMs: Math.floor(readEnvNumber("BENCH_COUCHDB_RTT_MS", 50)),
|
||||
passphrase: readEnvString("BENCH_PASSPHRASE", `bench-${Date.now()}`),
|
||||
encrypt: readEnvBool("BENCH_ENCRYPT", true),
|
||||
};
|
||||
}
|
||||
|
||||
function readOptionalResultPath(): string | undefined {
|
||||
const raw = Deno.env.get("BENCH_RESULT_JSON")?.trim();
|
||||
if (!raw) {
|
||||
return undefined;
|
||||
}
|
||||
return raw;
|
||||
}
|
||||
|
||||
function pickSampleFiles(entries: DatasetEntry[]): DatasetEntry[] {
|
||||
if (entries.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const md = entries.find((e) => e.kind === "md");
|
||||
const bin = entries.find((e) => e.kind === "bin");
|
||||
const middle = entries[Math.floor(entries.length / 2)];
|
||||
const last = entries[entries.length - 1];
|
||||
const unique = new Map<string, DatasetEntry>();
|
||||
for (const entry of [md, bin, middle, last]) {
|
||||
if (entry) {
|
||||
unique.set(entry.relativePath, entry);
|
||||
}
|
||||
}
|
||||
return [...unique.values()];
|
||||
}
|
||||
|
||||
type ProxyHandle = {
|
||||
stop: () => Promise<void>;
|
||||
applied: boolean;
|
||||
note: string;
|
||||
};
|
||||
|
||||
function startCouchdbProxy(options: { backendUri: string; proxyUri: string; requestedRttMs: number }): ProxyHandle {
|
||||
const backend = new URL(options.backendUri);
|
||||
const proxy = new URL(options.proxyUri);
|
||||
const halfDelayMs = Math.max(1, Math.floor(options.requestedRttMs / 2));
|
||||
const controller = new AbortController();
|
||||
|
||||
const listener = Deno.serve(
|
||||
{
|
||||
hostname: proxy.hostname,
|
||||
port: Number(proxy.port),
|
||||
signal: controller.signal,
|
||||
onError(error) {
|
||||
console.error(`[Proxy] ${String(error)}`);
|
||||
return new Response("proxy error", { status: 502 });
|
||||
},
|
||||
},
|
||||
async (request) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, halfDelayMs));
|
||||
|
||||
const targetUrl = new URL(request.url);
|
||||
targetUrl.protocol = backend.protocol;
|
||||
targetUrl.host = backend.host;
|
||||
|
||||
const headers = new Headers(request.headers);
|
||||
headers.delete("host");
|
||||
headers.delete("content-length");
|
||||
|
||||
let requestBody: ArrayBuffer | undefined;
|
||||
if (request.method !== "GET" && request.method !== "HEAD") {
|
||||
try {
|
||||
requestBody = await request.arrayBuffer();
|
||||
} catch {
|
||||
requestBody = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
const upstream = await fetch(targetUrl, {
|
||||
method: request.method,
|
||||
headers,
|
||||
body: requestBody,
|
||||
redirect: "manual",
|
||||
});
|
||||
|
||||
const responseHeaders = new Headers(upstream.headers);
|
||||
responseHeaders.delete("content-length");
|
||||
const responseBody = await upstream.arrayBuffer();
|
||||
|
||||
return new Response(responseBody, {
|
||||
status: upstream.status,
|
||||
statusText: upstream.statusText,
|
||||
headers: responseHeaders,
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
return {
|
||||
applied: true,
|
||||
note: `local reverse proxy on ${proxy.origin} with ${halfDelayMs}ms pre-forward delay`,
|
||||
stop: async () => {
|
||||
controller.abort();
|
||||
await listener.finished.catch(() => {});
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async function main(): Promise<void> {
|
||||
const config = buildConfig();
|
||||
const resultPath = readOptionalResultPath();
|
||||
|
||||
await using workDir = await TempDir.create("livesync-cli-couchdb-bench");
|
||||
const vaultA = workDir.join("vault-a");
|
||||
const vaultB = workDir.join("vault-b");
|
||||
const settingsA = workDir.join("settings-a.json");
|
||||
const settingsB = workDir.join("settings-b.json");
|
||||
await Deno.mkdir(vaultA, { recursive: true });
|
||||
await Deno.mkdir(vaultB, { recursive: true });
|
||||
|
||||
await initSettingsFile(settingsA);
|
||||
await initSettingsFile(settingsB);
|
||||
|
||||
await startCouchdb(config.couchdbBackendUri, config.couchdbUser, config.couchdbPassword, config.couchdbDbname);
|
||||
|
||||
const proxy = startCouchdbProxy({
|
||||
backendUri: config.couchdbBackendUri,
|
||||
proxyUri: config.couchdbProxyUri,
|
||||
requestedRttMs: config.requestedRttMs,
|
||||
});
|
||||
|
||||
try {
|
||||
await Promise.all([
|
||||
applyRemoteSyncSettings(settingsA, {
|
||||
remoteType: "COUCHDB",
|
||||
couchdbUri: config.couchdbProxyUri,
|
||||
couchdbUser: config.couchdbUser,
|
||||
couchdbPassword: config.couchdbPassword,
|
||||
couchdbDbname: config.couchdbDbname,
|
||||
encrypt: config.encrypt,
|
||||
passphrase: config.passphrase,
|
||||
}),
|
||||
applyRemoteSyncSettings(settingsB, {
|
||||
remoteType: "COUCHDB",
|
||||
couchdbUri: config.couchdbProxyUri,
|
||||
couchdbUser: config.couchdbUser,
|
||||
couchdbPassword: config.couchdbPassword,
|
||||
couchdbDbname: config.couchdbDbname,
|
||||
encrypt: config.encrypt,
|
||||
passphrase: config.passphrase,
|
||||
}),
|
||||
]);
|
||||
|
||||
const seedFiles = await createDeterministicDataset({
|
||||
rootDir: vaultA,
|
||||
datasetDirName: config.datasetDirName,
|
||||
seed: config.datasetSeed,
|
||||
mdCount: config.mdFileCount,
|
||||
mdMinSizeBytes: config.mdMinSizeBytes,
|
||||
mdMaxSizeBytes: config.mdMaxSizeBytes,
|
||||
binCount: config.binFileCount,
|
||||
binSizeBytes: config.binSizeBytes,
|
||||
});
|
||||
|
||||
const mirrorStart = nowMs();
|
||||
await runCliOrFail(vaultA, "--settings", settingsA, "mirror");
|
||||
const mirrorElapsed = nowMs() - mirrorStart;
|
||||
|
||||
const syncAStart = nowMs();
|
||||
await runCliOrFail(vaultA, "--settings", settingsA, "sync");
|
||||
const syncAElapsed = nowMs() - syncAStart;
|
||||
|
||||
const syncBStart = nowMs();
|
||||
await runCliOrFail(vaultB, "--settings", settingsB, "sync");
|
||||
const syncBElapsed = nowMs() - syncBStart;
|
||||
|
||||
const sampleFiles = pickSampleFiles(seedFiles.entries);
|
||||
for (const sample of sampleFiles) {
|
||||
const pulledPath = workDir.join(`pulled-${sample.relativePath.split("/").join("_")}`);
|
||||
await runCliOrFail(vaultB, "--settings", settingsB, "pull", sample.relativePath, pulledPath);
|
||||
await assertFilesEqual(
|
||||
sample.absolutePath,
|
||||
pulledPath,
|
||||
`sample file mismatch after CouchDB sync: ${sample.relativePath}`
|
||||
);
|
||||
}
|
||||
|
||||
const result = {
|
||||
mode: "couchdb-cli-benchmark",
|
||||
couchdbBackendUri: config.couchdbBackendUri,
|
||||
couchdbProxyUri: config.couchdbProxyUri,
|
||||
couchdbDbname: config.couchdbDbname,
|
||||
rttRequestedMs: config.requestedRttMs,
|
||||
proxyApplied: proxy.applied,
|
||||
proxyNote: proxy.note,
|
||||
datasetSeed: config.datasetSeed,
|
||||
datasetDirName: config.datasetDirName,
|
||||
totalFiles: seedFiles.totalFiles,
|
||||
totalBytes: seedFiles.totalBytes,
|
||||
mdFileCount: seedFiles.mdCount,
|
||||
binFileCount: seedFiles.binCount,
|
||||
mirrorElapsedMs: Number(mirrorElapsed.toFixed(1)),
|
||||
syncAElapsedMs: Number(syncAElapsed.toFixed(1)),
|
||||
syncBElapsedMs: Number(syncBElapsed.toFixed(1)),
|
||||
totalSyncElapsedMs: Number((syncAElapsed + syncBElapsed).toFixed(1)),
|
||||
throughputBytesPerSec: Number((seedFiles.totalBytes / ((syncAElapsed + syncBElapsed) / 1000)).toFixed(2)),
|
||||
throughputMiBPerSec: Number(
|
||||
(seedFiles.totalBytes / ((syncAElapsed + syncBElapsed) / 1000) / 1024 / 1024).toFixed(4)
|
||||
),
|
||||
};
|
||||
|
||||
if (resultPath) {
|
||||
await Deno.writeTextFile(resultPath, JSON.stringify(result, null, 2));
|
||||
}
|
||||
|
||||
console.log(JSON.stringify(result, null, 2));
|
||||
console.error(
|
||||
`[Benchmark] couchdb mirrored ${seedFiles.totalFiles} files (${formatBytes(seedFiles.totalBytes)}) in ${formatMs(
|
||||
mirrorElapsed
|
||||
)}, synced in ${formatMs(syncAElapsed + syncBElapsed)} (${result.throughputBytesPerSec} B/s, ${result.throughputMiBPerSec} MiB/s)`
|
||||
);
|
||||
} finally {
|
||||
await proxy.stop();
|
||||
await stopCouchdb().catch(() => {});
|
||||
}
|
||||
}
|
||||
|
||||
if (import.meta.main) {
|
||||
main().catch((error) => {
|
||||
console.error(`[Fatal Error]`, error);
|
||||
Deno.exit(1);
|
||||
});
|
||||
}
|
||||
223
src/apps/cli/testdeno/bench-p2p.ts
Normal file
223
src/apps/cli/testdeno/bench-p2p.ts
Normal file
@@ -0,0 +1,223 @@
|
||||
import { TempDir } from "./helpers/temp.ts";
|
||||
import { applyP2pSettings, applyP2pTestTweaks, initSettingsFile } from "./helpers/settings.ts";
|
||||
import { startCliInBackground } from "./helpers/backgroundCli.ts";
|
||||
import { discoverPeer, maybeStartLocalRelay, stopLocalRelayIfStarted } from "./helpers/p2p.ts";
|
||||
import { assertFilesEqual, runCliOrFail } from "./helpers/cli.ts";
|
||||
import { createDeterministicDataset, type DatasetEntry } from "./helpers/dataset.ts";
|
||||
|
||||
type BenchmarkConfig = {
|
||||
relay: string;
|
||||
appId: string;
|
||||
roomId: string;
|
||||
passphrase: string;
|
||||
datasetDirName: string;
|
||||
datasetSeed: string;
|
||||
mdFileCount: number;
|
||||
mdMinSizeBytes: number;
|
||||
mdMaxSizeBytes: number;
|
||||
binFileCount: number;
|
||||
binSizeBytes: number;
|
||||
peersTimeoutSeconds: number;
|
||||
syncTimeoutSeconds: number;
|
||||
};
|
||||
|
||||
function readEnvString(name: string, fallback: string): string {
|
||||
const value = Deno.env.get(name)?.trim();
|
||||
return value && value.length > 0 ? value : fallback;
|
||||
}
|
||||
|
||||
function readEnvNumber(name: string, fallback: number): number {
|
||||
const raw = Deno.env.get(name);
|
||||
if (raw === undefined || raw.trim() === "") {
|
||||
return fallback;
|
||||
}
|
||||
|
||||
const parsed = Number(raw);
|
||||
if (!Number.isFinite(parsed) || parsed <= 0) {
|
||||
throw new Error(`${name} must be a positive number, got '${raw}'`);
|
||||
}
|
||||
return parsed;
|
||||
}
|
||||
|
||||
function nowMs(): number {
|
||||
return performance.now();
|
||||
}
|
||||
|
||||
function formatMs(value: number): string {
|
||||
return `${value.toFixed(1)} ms`;
|
||||
}
|
||||
|
||||
function formatBytes(value: number): string {
|
||||
if (value < 1024) {
|
||||
return `${value} B`;
|
||||
}
|
||||
const kib = value / 1024;
|
||||
if (kib < 1024) {
|
||||
return `${kib.toFixed(1)} KiB`;
|
||||
}
|
||||
const mib = kib / 1024;
|
||||
return `${mib.toFixed(1)} MiB`;
|
||||
}
|
||||
|
||||
function buildConfig(): BenchmarkConfig {
|
||||
return {
|
||||
relay: readEnvString("BENCH_RELAY", "ws://localhost:4000/"),
|
||||
appId: readEnvString("BENCH_APP_ID", "self-hosted-livesync-cli-benchmark"),
|
||||
roomId: readEnvString("BENCH_ROOM_ID", `bench-room-${Date.now()}`),
|
||||
passphrase: readEnvString("BENCH_PASSPHRASE", `bench-${Date.now()}`),
|
||||
datasetDirName: readEnvString("BENCH_DATASET_DIR", "bench-dataset"),
|
||||
datasetSeed: readEnvString("BENCH_SEED", "livesync-benchmark-seed"),
|
||||
mdFileCount: Math.floor(readEnvNumber("BENCH_MD_FILE_COUNT", 1500)),
|
||||
mdMinSizeBytes: Math.floor(readEnvNumber("BENCH_MD_MIN_SIZE_BYTES", 1024)),
|
||||
mdMaxSizeBytes: Math.floor(readEnvNumber("BENCH_MD_MAX_SIZE_BYTES", 20 * 1024)),
|
||||
binFileCount: Math.floor(readEnvNumber("BENCH_BIN_FILE_COUNT", 500)),
|
||||
binSizeBytes: Math.floor(readEnvNumber("BENCH_BIN_SIZE_BYTES", 100 * 1024)),
|
||||
peersTimeoutSeconds: readEnvNumber("BENCH_PEERS_TIMEOUT", 20),
|
||||
syncTimeoutSeconds: readEnvNumber("BENCH_SYNC_TIMEOUT", 240),
|
||||
};
|
||||
}
|
||||
|
||||
function readOptionalResultPath(): string | undefined {
|
||||
const raw = Deno.env.get("BENCH_RESULT_JSON")?.trim();
|
||||
if (!raw) {
|
||||
return undefined;
|
||||
}
|
||||
return raw;
|
||||
}
|
||||
|
||||
function pickSampleFiles(entries: DatasetEntry[]): DatasetEntry[] {
|
||||
if (entries.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const md = entries.find((e) => e.kind === "md");
|
||||
const bin = entries.find((e) => e.kind === "bin");
|
||||
const middle = entries[Math.floor(entries.length / 2)];
|
||||
const last = entries[entries.length - 1];
|
||||
const unique = new Map<string, DatasetEntry>();
|
||||
for (const entry of [md, bin, middle, last]) {
|
||||
if (entry) {
|
||||
unique.set(entry.relativePath, entry);
|
||||
}
|
||||
}
|
||||
return [...unique.values()];
|
||||
}
|
||||
|
||||
async function main(): Promise<void> {
|
||||
const config = buildConfig();
|
||||
const resultPath = readOptionalResultPath();
|
||||
|
||||
const relayStarted = await maybeStartLocalRelay(config.relay);
|
||||
await using workDir = await TempDir.create("livesync-cli-p2p-bench");
|
||||
|
||||
const hostVault = workDir.join("vault-host");
|
||||
const clientVault = workDir.join("vault-client");
|
||||
const hostSettings = workDir.join("settings-host.json");
|
||||
const clientSettings = workDir.join("settings-client.json");
|
||||
|
||||
await Promise.all([
|
||||
Deno.mkdir(hostVault, { recursive: true }),
|
||||
Deno.mkdir(clientVault, { recursive: true }),
|
||||
initSettingsFile(hostSettings),
|
||||
initSettingsFile(clientSettings),
|
||||
]);
|
||||
|
||||
await Promise.all([
|
||||
applyP2pSettings(hostSettings, config.roomId, config.passphrase, config.appId, config.relay, "~.*"),
|
||||
applyP2pSettings(clientSettings, config.roomId, config.passphrase, config.appId, config.relay, "~.*"),
|
||||
]);
|
||||
|
||||
await Promise.all([
|
||||
applyP2pTestTweaks(hostSettings, "p2p-bench-host", config.passphrase),
|
||||
applyP2pTestTweaks(clientSettings, "p2p-bench-client", config.passphrase),
|
||||
]);
|
||||
|
||||
const seedFiles = await createDeterministicDataset({
|
||||
rootDir: hostVault,
|
||||
datasetDirName: config.datasetDirName,
|
||||
seed: config.datasetSeed,
|
||||
mdCount: config.mdFileCount,
|
||||
mdMinSizeBytes: config.mdMinSizeBytes,
|
||||
mdMaxSizeBytes: config.mdMaxSizeBytes,
|
||||
binCount: config.binFileCount,
|
||||
binSizeBytes: config.binSizeBytes,
|
||||
});
|
||||
|
||||
const mirrorStart = nowMs();
|
||||
await runCliOrFail(hostVault, "--settings", hostSettings, "mirror");
|
||||
const mirrorElapsed = nowMs() - mirrorStart;
|
||||
|
||||
const host = startCliInBackground(hostVault, "--settings", hostSettings, "p2p-host");
|
||||
try {
|
||||
const hostReadyStart = nowMs();
|
||||
await host.waitUntilContains("P2P host is running", 20000);
|
||||
const hostReadyElapsed = nowMs() - hostReadyStart;
|
||||
|
||||
const peerDiscoveryStart = nowMs();
|
||||
const peer = await discoverPeer(clientVault, clientSettings, config.peersTimeoutSeconds);
|
||||
const peerDiscoveryElapsed = nowMs() - peerDiscoveryStart;
|
||||
|
||||
const syncStart = nowMs();
|
||||
await runCliOrFail(
|
||||
clientVault,
|
||||
"--settings",
|
||||
clientSettings,
|
||||
"p2p-sync",
|
||||
peer.id,
|
||||
String(config.syncTimeoutSeconds)
|
||||
);
|
||||
const syncElapsed = nowMs() - syncStart;
|
||||
|
||||
const sampleFiles = pickSampleFiles(seedFiles.entries);
|
||||
for (const sample of sampleFiles) {
|
||||
const pulledPath = workDir.join(`pulled-${sample.relativePath.replaceAll("/", "_")}`);
|
||||
await runCliOrFail(clientVault, "--settings", clientSettings, "pull", sample.relativePath, pulledPath);
|
||||
await assertFilesEqual(
|
||||
sample.absolutePath,
|
||||
pulledPath,
|
||||
`sample file mismatch after sync: ${sample.relativePath}`
|
||||
);
|
||||
}
|
||||
|
||||
const result = {
|
||||
mode: "p2p-cli-benchmark",
|
||||
relay: config.relay,
|
||||
appId: config.appId,
|
||||
roomId: config.roomId,
|
||||
datasetSeed: config.datasetSeed,
|
||||
datasetDirName: config.datasetDirName,
|
||||
peerId: peer.id,
|
||||
peerName: peer.name,
|
||||
totalFiles: seedFiles.totalFiles,
|
||||
totalBytes: seedFiles.totalBytes,
|
||||
mdFileCount: seedFiles.mdCount,
|
||||
binFileCount: seedFiles.binCount,
|
||||
mirrorElapsedMs: Number(mirrorElapsed.toFixed(1)),
|
||||
hostReadyElapsedMs: Number(hostReadyElapsed.toFixed(1)),
|
||||
peerDiscoveryElapsedMs: Number(peerDiscoveryElapsed.toFixed(1)),
|
||||
syncElapsedMs: Number(syncElapsed.toFixed(1)),
|
||||
throughputBytesPerSec: Number((seedFiles.totalBytes / (syncElapsed / 1000)).toFixed(2)),
|
||||
throughputMiBPerSec: Number((seedFiles.totalBytes / (syncElapsed / 1000) / 1024 / 1024).toFixed(4)),
|
||||
};
|
||||
|
||||
if (resultPath) {
|
||||
await Deno.writeTextFile(resultPath, JSON.stringify(result, null, 2));
|
||||
}
|
||||
|
||||
console.log(JSON.stringify(result, null, 2));
|
||||
console.error(
|
||||
`[Benchmark] mirrored ${seedFiles.totalFiles} files (${formatBytes(seedFiles.totalBytes)}) in ${formatMs(mirrorElapsed)}, ` +
|
||||
`synced in ${formatMs(syncElapsed)} ` +
|
||||
`(${result.throughputBytesPerSec} B/s, ${result.throughputMiBPerSec} MiB/s)`
|
||||
);
|
||||
} finally {
|
||||
await host.stop();
|
||||
await stopLocalRelayIfStarted(relayStarted);
|
||||
}
|
||||
}
|
||||
|
||||
if (import.meta.main) {
|
||||
main().catch((error) => {
|
||||
console.error(`[Fatal Error]`, error);
|
||||
Deno.exit(1);
|
||||
});
|
||||
}
|
||||
45
src/apps/cli/testdeno/bench-run-item1.sh
Normal file
45
src/apps/cli/testdeno/bench-run-item1.sh
Normal file
@@ -0,0 +1,45 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
RESULTS_ROOT="${SCRIPT_DIR}/bench-results"
|
||||
TIMESTAMP="$(date +%Y%m%d-%H%M%S)"
|
||||
OUT_DIR="${RESULTS_ROOT}/${TIMESTAMP}"
|
||||
|
||||
mkdir -p "${OUT_DIR}"
|
||||
|
||||
echo "[bench-wrapper] output directory: ${OUT_DIR}"
|
||||
|
||||
echo "[bench-wrapper] running p2p benchmark"
|
||||
(
|
||||
cd "${SCRIPT_DIR}"
|
||||
BENCH_RESULT_JSON="${OUT_DIR}/p2p.json" deno task bench:p2p
|
||||
)
|
||||
|
||||
echo "[bench-wrapper] running couchdb benchmark with RTT ${BENCH_COUCHDB_RTT_MS:-default} ms (emulating HTTP network latency)"
|
||||
(
|
||||
cd "${SCRIPT_DIR}"
|
||||
BENCH_RESULT_JSON="${OUT_DIR}/couchdb.json" deno task bench:couchdb
|
||||
)
|
||||
|
||||
cat > "${OUT_DIR}/README.txt" <<EOF
|
||||
Bench wrapper result set
|
||||
|
||||
Generated at: ${TIMESTAMP}
|
||||
Directory: ${OUT_DIR}
|
||||
|
||||
Files:
|
||||
- p2p.json
|
||||
- couchdb.json
|
||||
EOF
|
||||
|
||||
echo "[bench-wrapper] verify outputs by cat"
|
||||
echo "========== ${OUT_DIR}/README.txt =========="
|
||||
cat "${OUT_DIR}/README.txt"
|
||||
echo "========== ${OUT_DIR}/p2p.json =========="
|
||||
cat "${OUT_DIR}/p2p.json"
|
||||
echo "========== ${OUT_DIR}/couchdb.json =========="
|
||||
cat "${OUT_DIR}/couchdb.json"
|
||||
|
||||
echo "[bench-wrapper] done"
|
||||
echo "[bench-wrapper] result directory: ${OUT_DIR}"
|
||||
@@ -12,8 +12,16 @@
|
||||
"test:p2p-sync": "deno test --env-file=.test.env -A --no-check test-p2p-sync.ts",
|
||||
"test:p2p-three-nodes": "deno test --env-file=.test.env -A --no-check test-p2p-three-nodes-conflict.ts",
|
||||
"test:p2p-upload-download": "deno test --env-file=.test.env -A --no-check test-p2p-upload-download-repro.ts",
|
||||
"bench:p2p": "deno run --env-file=.test.env -A --no-check bench-p2p.ts",
|
||||
"bench:couchdb": "deno run --env-file=.test.env -A --no-check bench-couchdb.ts",
|
||||
"bench:item1": "bash ./bench-run-item1.sh",
|
||||
"bench:item1:full": "BENCH_MD_FILE_COUNT=1500 BENCH_MD_MIN_SIZE_BYTES=1024 BENCH_MD_MAX_SIZE_BYTES=20480 BENCH_BIN_FILE_COUNT=500 BENCH_BIN_SIZE_BYTES=102400 BENCH_COUCHDB_RTT_MS=50 bash ./bench-run-item1.sh",
|
||||
"test:e2e-couchdb": "deno test --env-file=.test.env -A --no-check test-e2e-two-vaults-couchdb.ts",
|
||||
"test:e2e-matrix": "deno test --env-file=.test.env -A --no-check test-e2e-two-vaults-matrix.ts"
|
||||
"test:e2e-matrix": "deno test --env-file=.test.env -A --no-check test-e2e-two-vaults-matrix.ts",
|
||||
"test:e2e-matrix:couchdb-enc0": "deno test --env-file=.test.env -A --no-check --filter='e2e matrix: COUCHDB-enc0' test-e2e-two-vaults-matrix.ts",
|
||||
"test:e2e-matrix:couchdb-enc1": "deno test --env-file=.test.env -A --no-check --filter='e2e matrix: COUCHDB-enc1' test-e2e-two-vaults-matrix.ts",
|
||||
"test:e2e-matrix:minio-enc0": "deno test --env-file=.test.env -A --no-check --filter='e2e matrix: MINIO-enc0' test-e2e-two-vaults-matrix.ts",
|
||||
"test:e2e-matrix:minio-enc1": "deno test --env-file=.test.env -A --no-check --filter='e2e matrix: MINIO-enc1' test-e2e-two-vaults-matrix.ts"
|
||||
},
|
||||
"imports": {
|
||||
"@std/assert": "jsr:@std/assert@^1.0.13",
|
||||
|
||||
@@ -39,27 +39,73 @@ function concatChunks(chunks: Uint8Array[]): Uint8Array {
|
||||
return out;
|
||||
}
|
||||
|
||||
function formatTeeCommand(args: string[]): string {
|
||||
return ["node", CLI_DIST, ...args].map((part) => JSON.stringify(part)).join(" ");
|
||||
}
|
||||
|
||||
function createLineTeeWriter(
|
||||
pid: number,
|
||||
streamName: "stdout" | "stderr",
|
||||
writer: (chunk: Uint8Array) => void
|
||||
): { write: (chunk: Uint8Array) => void; close: () => void } {
|
||||
const enc = new TextEncoder();
|
||||
const dec = new TextDecoder();
|
||||
let pending = "";
|
||||
let headerWritten = false;
|
||||
const emitLine = (line: string) => {
|
||||
if (!headerWritten) {
|
||||
writer(enc.encode(`[CLI tee pid=${pid}:${streamName}]\n`));
|
||||
headerWritten = true;
|
||||
}
|
||||
writer(enc.encode(`[CLI tee pid=${pid}:${streamName}] ${line}\n`));
|
||||
};
|
||||
|
||||
const flush = (final = false) => {
|
||||
let index = pending.indexOf("\n");
|
||||
while (index >= 0) {
|
||||
const line = pending.slice(0, index).replace(/\r$/, "");
|
||||
pending = pending.slice(index + 1);
|
||||
emitLine(line);
|
||||
index = pending.indexOf("\n");
|
||||
}
|
||||
if (final && pending.length > 0) {
|
||||
emitLine(pending.replace(/\r$/, ""));
|
||||
pending = "";
|
||||
}
|
||||
};
|
||||
|
||||
return {
|
||||
write(chunk: Uint8Array) {
|
||||
pending += dec.decode(chunk, { stream: true });
|
||||
flush(false);
|
||||
},
|
||||
close() {
|
||||
pending += dec.decode();
|
||||
flush(true);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async function collectStream(
|
||||
stream: ReadableStream<Uint8Array>,
|
||||
teeTarget: WritableStream<Uint8Array> | null
|
||||
teeTarget: { write: (chunk: Uint8Array) => void; close: () => void } | null
|
||||
): Promise<Uint8Array> {
|
||||
const reader = stream.getReader();
|
||||
const chunks: Uint8Array[] = [];
|
||||
const writer = teeTarget?.getWriter();
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
if (value) {
|
||||
chunks.push(value);
|
||||
if (writer) {
|
||||
await writer.write(value);
|
||||
if (teeTarget) {
|
||||
teeTarget.write(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (writer) {
|
||||
writer.releaseLock();
|
||||
if (teeTarget) {
|
||||
teeTarget.close();
|
||||
}
|
||||
reader.releaseLock();
|
||||
}
|
||||
@@ -76,8 +122,20 @@ async function runNodeCommand(args: string[], stdinData?: Uint8Array): Promise<C
|
||||
stderr: "piped",
|
||||
}).spawn();
|
||||
|
||||
const stdoutPromise = collectStream(child.stdout, TEE_ENABLED ? Deno.stdout.writable : null);
|
||||
const stderrPromise = collectStream(child.stderr, TEE_ENABLED ? Deno.stderr.writable : null);
|
||||
if (TEE_ENABLED) {
|
||||
Deno.stdout.writeSync(
|
||||
new TextEncoder().encode(`[CLI tee pid=${child.pid}] process: ${formatTeeCommand(cliArgs)}\n`)
|
||||
);
|
||||
}
|
||||
|
||||
const stdoutPromise = collectStream(
|
||||
child.stdout,
|
||||
TEE_ENABLED ? createLineTeeWriter(child.pid, "stdout", (chunk) => Deno.stdout.writeSync(chunk)) : null
|
||||
);
|
||||
const stderrPromise = collectStream(
|
||||
child.stderr,
|
||||
TEE_ENABLED ? createLineTeeWriter(child.pid, "stderr", (chunk) => Deno.stderr.writeSync(chunk)) : null
|
||||
);
|
||||
|
||||
if (stdinData) {
|
||||
const w = child.stdin.getWriter();
|
||||
|
||||
123
src/apps/cli/testdeno/helpers/dataset.ts
Normal file
123
src/apps/cli/testdeno/helpers/dataset.ts
Normal file
@@ -0,0 +1,123 @@
|
||||
export type DeterministicDatasetConfig = {
|
||||
rootDir: string;
|
||||
datasetDirName: string;
|
||||
seed: string;
|
||||
mdCount: number;
|
||||
mdMinSizeBytes: number;
|
||||
mdMaxSizeBytes: number;
|
||||
binCount: number;
|
||||
binSizeBytes: number;
|
||||
};
|
||||
|
||||
export type DatasetEntry = {
|
||||
kind: "md" | "bin";
|
||||
relativePath: string;
|
||||
absolutePath: string;
|
||||
size: number;
|
||||
};
|
||||
|
||||
export type DeterministicDataset = {
|
||||
rootDir: string;
|
||||
datasetDirName: string;
|
||||
seed: string;
|
||||
entries: DatasetEntry[];
|
||||
totalFiles: number;
|
||||
totalBytes: number;
|
||||
mdCount: number;
|
||||
binCount: number;
|
||||
};
|
||||
|
||||
function fnv1a32(input: string): number {
|
||||
let hash = 0x811c9dc5;
|
||||
for (let i = 0; i < input.length; i++) {
|
||||
hash ^= input.charCodeAt(i) & 0xff;
|
||||
hash = Math.imul(hash, 0x01000193);
|
||||
}
|
||||
return hash >>> 0;
|
||||
}
|
||||
|
||||
function createXorshift32(seed: number): () => number {
|
||||
let state = seed >>> 0;
|
||||
if (state === 0) {
|
||||
state = 0x9e3779b9;
|
||||
}
|
||||
return () => {
|
||||
state ^= state << 13;
|
||||
state ^= state >>> 17;
|
||||
state ^= state << 5;
|
||||
return state >>> 0;
|
||||
};
|
||||
}
|
||||
|
||||
function createTextBytes(size: number, fileIndex: number, seed: string): Uint8Array {
|
||||
const template =
|
||||
`# Bench file ${fileIndex}\n` +
|
||||
`seed: ${seed}\n` +
|
||||
"lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.\n";
|
||||
|
||||
const templateBytes = new TextEncoder().encode(template);
|
||||
const out = new Uint8Array(size);
|
||||
for (let i = 0; i < size; i++) {
|
||||
out[i] = templateBytes[i % templateBytes.length];
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function toPath(rootDir: string, relativePath: string): string {
|
||||
return `${rootDir}/${relativePath}`;
|
||||
}
|
||||
|
||||
export async function createDeterministicDataset(config: DeterministicDatasetConfig): Promise<DeterministicDataset> {
|
||||
if (config.mdCount < 0 || config.binCount < 0) {
|
||||
throw new Error("mdCount and binCount must be non-negative");
|
||||
}
|
||||
if (config.mdMinSizeBytes <= 0 || config.mdMaxSizeBytes <= 0 || config.binSizeBytes <= 0) {
|
||||
throw new Error("all size values must be positive");
|
||||
}
|
||||
if (config.mdMinSizeBytes > config.mdMaxSizeBytes) {
|
||||
throw new Error("mdMinSizeBytes must be <= mdMaxSizeBytes");
|
||||
}
|
||||
|
||||
const datasetRoot = toPath(config.rootDir, config.datasetDirName);
|
||||
const mdDir = `${datasetRoot}/md`;
|
||||
const binDir = `${datasetRoot}/bin`;
|
||||
await Deno.mkdir(mdDir, { recursive: true });
|
||||
await Deno.mkdir(binDir, { recursive: true });
|
||||
|
||||
const nextRandom = createXorshift32(fnv1a32(config.seed));
|
||||
const mdRange = config.mdMaxSizeBytes - config.mdMinSizeBytes + 1;
|
||||
const entries: DatasetEntry[] = [];
|
||||
|
||||
for (let index = 0; index < config.mdCount; index++) {
|
||||
const size = config.mdMinSizeBytes + (nextRandom() % mdRange);
|
||||
const relativePath = `${config.datasetDirName}/md/file-${String(index).padStart(4, "0")}.md`;
|
||||
const absolutePath = toPath(config.rootDir, relativePath);
|
||||
const body = createTextBytes(size, index, config.seed);
|
||||
await Deno.writeFile(absolutePath, body);
|
||||
entries.push({ kind: "md", relativePath, absolutePath, size });
|
||||
}
|
||||
|
||||
for (let index = 0; index < config.binCount; index++) {
|
||||
const size = config.binSizeBytes;
|
||||
const relativePath = `${config.datasetDirName}/bin/file-${String(index).padStart(4, "0")}.bin`;
|
||||
const absolutePath = toPath(config.rootDir, relativePath);
|
||||
const body = new Uint8Array(size);
|
||||
for (let i = 0; i < size; i++) {
|
||||
body[i] = nextRandom() & 0xff;
|
||||
}
|
||||
await Deno.writeFile(absolutePath, body);
|
||||
entries.push({ kind: "bin", relativePath, absolutePath, size });
|
||||
}
|
||||
|
||||
const totalBytes = entries.reduce((sum, e) => sum + e.size, 0);
|
||||
return {
|
||||
rootDir: config.rootDir,
|
||||
datasetDirName: config.datasetDirName,
|
||||
seed: config.seed,
|
||||
entries,
|
||||
totalFiles: entries.length,
|
||||
totalBytes,
|
||||
mdCount: config.mdCount,
|
||||
binCount: config.binCount,
|
||||
};
|
||||
}
|
||||
@@ -14,6 +14,11 @@ type DockerInvoker = {
|
||||
|
||||
let dockerInvokerPromise: Promise<DockerInvoker> | null = null;
|
||||
const DOCKER_TEE = Deno.env.get("LIVESYNC_DOCKER_TEE") === "1" || Deno.env.get("LIVESYNC_TEST_TEE") === "1";
|
||||
const trackedContainers = new Set<string>();
|
||||
const CLEANUP_SIGNALS: Deno.Signal[] = ["SIGINT", "SIGTERM"];
|
||||
let signalCleanupHandlersInstalled = false;
|
||||
let signalCleanupInProgress = false;
|
||||
const signalCleanupHandlers = new Map<Deno.Signal, () => void>();
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Low-level docker wrapper
|
||||
@@ -27,29 +32,53 @@ function parseCommand(command: string): { bin: string; prefix: string[] } {
|
||||
return { bin: parts[0], prefix: parts.slice(1) };
|
||||
}
|
||||
|
||||
async function runCommand(bin: string, args: string[]): Promise<{ code: number; stdout: string; stderr: string }> {
|
||||
const cmd = new Deno.Command(bin, {
|
||||
args,
|
||||
stdin: "null",
|
||||
stdout: "piped",
|
||||
stderr: "piped",
|
||||
});
|
||||
async function collectStream(
|
||||
stream: ReadableStream<Uint8Array>,
|
||||
teeTarget: ((chunk: Uint8Array) => void) | null
|
||||
): Promise<Uint8Array> {
|
||||
const reader = stream.getReader();
|
||||
const chunks: Uint8Array[] = [];
|
||||
try {
|
||||
const { code, stdout, stderr } = await cmd.output();
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
if (!value) continue;
|
||||
chunks.push(value);
|
||||
if (teeTarget) {
|
||||
teeTarget(value);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
|
||||
const total = chunks.reduce((sum, chunk) => sum + chunk.length, 0);
|
||||
const out = new Uint8Array(total);
|
||||
let offset = 0;
|
||||
for (const chunk of chunks) {
|
||||
out.set(chunk, offset);
|
||||
offset += chunk.length;
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
async function runCommand(bin: string, args: string[]): Promise<{ code: number; stdout: string; stderr: string }> {
|
||||
try {
|
||||
const child = new Deno.Command(bin, {
|
||||
args,
|
||||
stdin: "null",
|
||||
stdout: "piped",
|
||||
stderr: "piped",
|
||||
}).spawn();
|
||||
const stdoutPromise = collectStream(child.stdout, DOCKER_TEE ? (chunk) => Deno.stdout.writeSync(chunk) : null);
|
||||
const stderrPromise = collectStream(child.stderr, DOCKER_TEE ? (chunk) => Deno.stderr.writeSync(chunk) : null);
|
||||
const [status, stdout, stderr] = await Promise.all([child.status, stdoutPromise, stderrPromise]);
|
||||
const dec = new TextDecoder();
|
||||
const result = {
|
||||
code,
|
||||
code: status.code,
|
||||
stdout: dec.decode(stdout),
|
||||
stderr: dec.decode(stderr),
|
||||
};
|
||||
if (DOCKER_TEE) {
|
||||
if (result.stdout.trim().length > 0) {
|
||||
console.log(`[docker:${bin}] ${result.stdout.trimEnd()}`);
|
||||
}
|
||||
if (result.stderr.trim().length > 0) {
|
||||
console.error(`[docker:${bin}] ${result.stderr.trimEnd()}`);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
} catch (err) {
|
||||
if (err instanceof Deno.errors.NotFound) {
|
||||
@@ -159,6 +188,73 @@ async function dockerOrFail(...args: string[]): Promise<string> {
|
||||
return r.stdout;
|
||||
}
|
||||
|
||||
async function stopAndRemoveContainer(container: string): Promise<void> {
|
||||
await docker("stop", container).catch(() => {});
|
||||
await docker("rm", container).catch(() => {});
|
||||
}
|
||||
|
||||
async function cleanupTrackedContainers(reason: string): Promise<void> {
|
||||
const names = [...trackedContainers];
|
||||
if (names.length === 0) return;
|
||||
|
||||
console.warn(`[WARN] cleaning up tracked containers on ${reason}: ${names.join(", ")}`);
|
||||
for (const container of names.reverse()) {
|
||||
await stopAndRemoveContainer(container);
|
||||
trackedContainers.delete(container);
|
||||
}
|
||||
}
|
||||
|
||||
async function handleSignalCleanup(signal: Deno.Signal): Promise<void> {
|
||||
if (signalCleanupInProgress) return;
|
||||
signalCleanupInProgress = true;
|
||||
try {
|
||||
await cleanupTrackedContainers(`signal ${signal}`);
|
||||
} finally {
|
||||
Deno.exit(signal === "SIGINT" ? 130 : 143);
|
||||
}
|
||||
}
|
||||
|
||||
function ensureSignalCleanupHandlers(): void {
|
||||
if (signalCleanupHandlersInstalled) return;
|
||||
signalCleanupHandlersInstalled = true;
|
||||
for (const signal of CLEANUP_SIGNALS) {
|
||||
const listener = () => {
|
||||
void handleSignalCleanup(signal);
|
||||
};
|
||||
try {
|
||||
Deno.addSignalListener(signal, listener);
|
||||
signalCleanupHandlers.set(signal, listener);
|
||||
} catch {
|
||||
// Unsupported signal on this platform.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function removeSignalCleanupHandlers(): void {
|
||||
if (!signalCleanupHandlersInstalled) return;
|
||||
for (const [signal, listener] of signalCleanupHandlers) {
|
||||
try {
|
||||
Deno.removeSignalListener(signal, listener);
|
||||
} catch {
|
||||
// Ignore if already removed or unsupported.
|
||||
}
|
||||
}
|
||||
signalCleanupHandlers.clear();
|
||||
signalCleanupHandlersInstalled = false;
|
||||
}
|
||||
|
||||
function trackContainer(container: string): void {
|
||||
ensureSignalCleanupHandlers();
|
||||
trackedContainers.add(container);
|
||||
}
|
||||
|
||||
function untrackContainer(container: string): void {
|
||||
trackedContainers.delete(container);
|
||||
if (trackedContainers.size === 0) {
|
||||
removeSignalCleanupHandlers();
|
||||
}
|
||||
}
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
@@ -235,8 +331,8 @@ const MINIO_IMAGE = "minio/minio";
|
||||
const MINIO_MC_IMAGE = "minio/mc";
|
||||
|
||||
export async function stopCouchdb(): Promise<void> {
|
||||
await docker("stop", COUCHDB_CONTAINER);
|
||||
await docker("rm", COUCHDB_CONTAINER);
|
||||
await stopAndRemoveContainer(COUCHDB_CONTAINER);
|
||||
untrackContainer(COUCHDB_CONTAINER);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -265,6 +361,7 @@ export async function startCouchdb(couchdbUri: string, user: string, password: s
|
||||
"COUCHDB_SINGLE_NODE=y",
|
||||
COUCHDB_IMAGE
|
||||
);
|
||||
trackContainer(COUCHDB_CONTAINER);
|
||||
|
||||
console.log("[INFO] initialising CouchDB");
|
||||
await initCouchdb(couchdbUri, user, password);
|
||||
@@ -365,8 +462,8 @@ function shQuote(value: string): string {
|
||||
}
|
||||
|
||||
export async function stopMinio(): Promise<void> {
|
||||
await docker("stop", MINIO_CONTAINER);
|
||||
await docker("rm", MINIO_CONTAINER);
|
||||
await stopAndRemoveContainer(MINIO_CONTAINER);
|
||||
untrackContainer(MINIO_CONTAINER);
|
||||
}
|
||||
|
||||
async function initMinioBucket(
|
||||
@@ -446,6 +543,7 @@ export async function startMinio(
|
||||
"--console-address",
|
||||
":9001"
|
||||
);
|
||||
trackContainer(MINIO_CONTAINER);
|
||||
|
||||
console.log(`[INFO] initialising MinIO test bucket: ${bucket}`);
|
||||
let initialised = false;
|
||||
@@ -493,8 +591,8 @@ EOF
|
||||
exec /app/strfry --config /tmp/strfry.conf relay`;
|
||||
|
||||
export async function stopP2pRelay(): Promise<void> {
|
||||
await docker("stop", P2P_RELAY_CONTAINER);
|
||||
await docker("rm", P2P_RELAY_CONTAINER);
|
||||
await stopAndRemoveContainer(P2P_RELAY_CONTAINER);
|
||||
untrackContainer(P2P_RELAY_CONTAINER);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -523,6 +621,7 @@ export async function startP2pRelay(): Promise<void> {
|
||||
"-lc",
|
||||
STRFRY_BOOTSTRAP_SH
|
||||
);
|
||||
trackContainer(P2P_RELAY_CONTAINER);
|
||||
}
|
||||
|
||||
export function isLocalP2pRelay(relayUrl: string): boolean {
|
||||
|
||||
49
src/apps/cli/testdeno/helpers/net.ts
Normal file
49
src/apps/cli/testdeno/helpers/net.ts
Normal file
@@ -0,0 +1,49 @@
|
||||
type WaitForPortOptions = {
|
||||
timeoutMs?: number;
|
||||
intervalMs?: number;
|
||||
connectTimeoutMs?: number;
|
||||
};
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
async function connectWithTimeout(hostname: string, port: number, timeoutMs: number): Promise<void> {
|
||||
let timer: number | undefined;
|
||||
try {
|
||||
const connPromise = Deno.connect({ hostname, port });
|
||||
const timeoutPromise = new Promise<never>((_, reject) => {
|
||||
timer = setTimeout(() => reject(new Error(`connect timeout after ${timeoutMs}ms`)), timeoutMs);
|
||||
});
|
||||
const conn = await Promise.race([connPromise, timeoutPromise]);
|
||||
conn.close();
|
||||
} finally {
|
||||
if (timer !== undefined) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function waitForPort(hostname: string, port: number, options: WaitForPortOptions = {}): Promise<void> {
|
||||
const timeoutMs = options.timeoutMs ?? 15000;
|
||||
const intervalMs = options.intervalMs ?? 250;
|
||||
const connectTimeoutMs = options.connectTimeoutMs ?? 1000;
|
||||
|
||||
const started = Date.now();
|
||||
let lastError: unknown;
|
||||
|
||||
while (Date.now() - started < timeoutMs) {
|
||||
try {
|
||||
await connectWithTimeout(hostname, port, connectTimeoutMs);
|
||||
return;
|
||||
} catch (error) {
|
||||
lastError = error;
|
||||
await sleep(intervalMs);
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(
|
||||
`Port ${hostname}:${port} did not become ready within ${timeoutMs}ms` +
|
||||
(lastError ? ` (last error: ${String(lastError)})` : "")
|
||||
);
|
||||
}
|
||||
@@ -1,11 +1,26 @@
|
||||
import { runCli } from "./cli.ts";
|
||||
import { isLocalP2pRelay, startP2pRelay, stopP2pRelay } from "./docker.ts";
|
||||
import { waitForPort } from "./net.ts";
|
||||
|
||||
export type PeerEntry = {
|
||||
id: string;
|
||||
name: string;
|
||||
};
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
function parseRelayEndpoint(relay: string): { hostname: string; port: number } {
|
||||
const url = new URL(relay);
|
||||
const port = url.port ? Number(url.port) : url.protocol === "ws:" ? 80 : url.protocol === "wss:" ? 443 : NaN;
|
||||
if (!Number.isFinite(port)) {
|
||||
throw new Error(`Unsupported relay URL: ${relay}`);
|
||||
}
|
||||
const hostname = url.hostname === "localhost" ? "127.0.0.1" : url.hostname;
|
||||
return { hostname, port };
|
||||
}
|
||||
|
||||
export function parsePeerLines(output: string): PeerEntry[] {
|
||||
return output
|
||||
.split(/\r?\n/)
|
||||
@@ -20,28 +35,55 @@ export async function discoverPeer(
|
||||
timeoutSeconds: number,
|
||||
targetPeer?: string
|
||||
): Promise<PeerEntry> {
|
||||
const result = await runCli(vaultDir, "--settings", settingsFile, "p2p-peers", String(timeoutSeconds));
|
||||
if (result.code !== 0) {
|
||||
throw new Error(`p2p-peers failed\n${result.combined}`);
|
||||
}
|
||||
const peers = parsePeerLines(result.stdout);
|
||||
if (targetPeer) {
|
||||
const matched = peers.find((peer) => peer.id === targetPeer || peer.name === targetPeer);
|
||||
if (matched) return matched;
|
||||
}
|
||||
if (peers.length === 0) {
|
||||
const fallback = result.combined.match(/Advertisement from\s+([^\s]+)/);
|
||||
if (fallback?.[1]) {
|
||||
return { id: fallback[1], name: fallback[1] };
|
||||
const retries = Math.max(0, Number(Deno.env.get("LIVESYNC_P2P_PEERS_RETRY") ?? "3"));
|
||||
let lastCombined = "";
|
||||
|
||||
for (let attempt = 0; attempt <= retries; attempt++) {
|
||||
const result = await runCli(vaultDir, "--settings", settingsFile, "p2p-peers", String(timeoutSeconds));
|
||||
lastCombined = result.combined;
|
||||
|
||||
if (result.code === 0) {
|
||||
const peers = parsePeerLines(result.stdout);
|
||||
if (targetPeer) {
|
||||
const matched = peers.find((peer) => peer.id === targetPeer || peer.name === targetPeer);
|
||||
if (matched) return matched;
|
||||
}
|
||||
if (peers.length > 0) {
|
||||
return peers[0];
|
||||
}
|
||||
|
||||
const fallback = result.combined.match(/Advertisement from\s+([^\s]+)/);
|
||||
if (fallback?.[1]) {
|
||||
return { id: fallback[1], name: fallback[1] };
|
||||
}
|
||||
}
|
||||
throw new Error(`No peers discovered\n${result.combined}`);
|
||||
|
||||
if (attempt < retries) {
|
||||
const waitMs = 400 * (attempt + 1);
|
||||
console.warn(
|
||||
`[WARN] p2p-peers returned no usable peers, retrying (${attempt + 1}/${retries}) in ${waitMs}ms`
|
||||
);
|
||||
await sleep(waitMs);
|
||||
continue;
|
||||
}
|
||||
|
||||
throw new Error(
|
||||
result.code !== 0 ? `p2p-peers failed\n${result.combined}` : `No peers discovered\n${result.combined}`
|
||||
);
|
||||
}
|
||||
return peers[0];
|
||||
|
||||
throw new Error(`No peers discovered\n${lastCombined}`);
|
||||
}
|
||||
|
||||
export async function maybeStartLocalRelay(relay: string): Promise<boolean> {
|
||||
if (!isLocalP2pRelay(relay)) return false;
|
||||
await startP2pRelay();
|
||||
const endpoint = parseRelayEndpoint(relay);
|
||||
await waitForPort(endpoint.hostname, endpoint.port, {
|
||||
timeoutMs: Number(Deno.env.get("LIVESYNC_P2P_RELAY_READY_TIMEOUT_MS") ?? "15000"),
|
||||
intervalMs: Number(Deno.env.get("LIVESYNC_P2P_RELAY_READY_INTERVAL_MS") ?? "250"),
|
||||
connectTimeoutMs: Number(Deno.env.get("LIVESYNC_P2P_RELAY_CONNECT_TIMEOUT_MS") ?? "1000"),
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,9 @@ Deno.test("p2p-peers: discovers host through local relay", async () => {
|
||||
const roomId = Deno.env.get("ROOM_ID") ?? `room-${Date.now()}`;
|
||||
const passphrase = Deno.env.get("PASSPHRASE") ?? "test";
|
||||
const timeoutSeconds = Number(Deno.env.get("TIMEOUT_SECONDS") ?? "8");
|
||||
const nonce = `${Date.now()}-${crypto.randomUUID().slice(0, 8)}`;
|
||||
const hostPeerName = Deno.env.get("HOST_PEER_NAME") ?? `p2p-host-${nonce}`;
|
||||
const clientPeerName = Deno.env.get("CLIENT_PEER_NAME") ?? `p2p-client-${nonce}`;
|
||||
|
||||
await using workDir = await TempDir.create("livesync-cli-p2p-peers-local-relay");
|
||||
const hostVault = workDir.join("vault-host");
|
||||
@@ -24,15 +27,16 @@ Deno.test("p2p-peers: discovers host through local relay", async () => {
|
||||
await initSettingsFile(clientSettings);
|
||||
await applyP2pSettings(hostSettings, roomId, passphrase, "self-hosted-livesync-cli-tests", relay);
|
||||
await applyP2pSettings(clientSettings, roomId, passphrase, "self-hosted-livesync-cli-tests", relay);
|
||||
await applyP2pTestTweaks(hostSettings, "p2p-host", passphrase);
|
||||
await applyP2pTestTweaks(clientSettings, "p2p-client", passphrase);
|
||||
await applyP2pTestTweaks(hostSettings, hostPeerName, passphrase);
|
||||
await applyP2pTestTweaks(clientSettings, clientPeerName, passphrase);
|
||||
|
||||
const host = startCliInBackground(hostVault, "--settings", hostSettings, "p2p-host");
|
||||
try {
|
||||
await host.waitUntilContains("P2P host is running", 20000);
|
||||
const peer = await discoverPeer(clientVault, clientSettings, timeoutSeconds);
|
||||
const peer = await discoverPeer(clientVault, clientSettings, timeoutSeconds, hostPeerName);
|
||||
assert(peer.id.length > 0);
|
||||
assert(peer.name.length > 0);
|
||||
assert(peer.name === hostPeerName, `expected peer '${hostPeerName}', got '${peer.name}'`);
|
||||
} finally {
|
||||
await host.stop();
|
||||
}
|
||||
|
||||
@@ -11,6 +11,9 @@ Deno.test("p2p-sync: discovers peer and completes sync", async () => {
|
||||
const passphrase = Deno.env.get("PASSPHRASE") ?? "test";
|
||||
const peersTimeout = Number(Deno.env.get("PEERS_TIMEOUT") ?? "12");
|
||||
const syncTimeout = Number(Deno.env.get("SYNC_TIMEOUT") ?? "15");
|
||||
const nonce = `${Date.now()}-${crypto.randomUUID().slice(0, 8)}`;
|
||||
const hostPeerName = Deno.env.get("HOST_PEER_NAME") ?? `p2p-host-${nonce}`;
|
||||
const clientPeerName = Deno.env.get("CLIENT_PEER_NAME") ?? `p2p-client-${nonce}`;
|
||||
|
||||
await using workDir = await TempDir.create("livesync-cli-p2p-sync");
|
||||
const hostVault = workDir.join("vault-host");
|
||||
@@ -26,8 +29,8 @@ Deno.test("p2p-sync: discovers peer and completes sync", async () => {
|
||||
await initSettingsFile(clientSettings);
|
||||
await applyP2pSettings(hostSettings, roomId, passphrase, "self-hosted-livesync-cli-tests", relay);
|
||||
await applyP2pSettings(clientSettings, roomId, passphrase, "self-hosted-livesync-cli-tests", relay);
|
||||
await applyP2pTestTweaks(hostSettings, "p2p-host", passphrase);
|
||||
await applyP2pTestTweaks(clientSettings, "p2p-client", passphrase);
|
||||
await applyP2pTestTweaks(hostSettings, hostPeerName, passphrase);
|
||||
await applyP2pTestTweaks(clientSettings, clientPeerName, passphrase);
|
||||
|
||||
const host = startCliInBackground(hostVault, "--settings", hostSettings, "p2p-host");
|
||||
try {
|
||||
@@ -36,7 +39,7 @@ Deno.test("p2p-sync: discovers peer and completes sync", async () => {
|
||||
clientVault,
|
||||
clientSettings,
|
||||
peersTimeout,
|
||||
Deno.env.get("TARGET_PEER") ?? undefined
|
||||
Deno.env.get("TARGET_PEER") ?? hostPeerName
|
||||
);
|
||||
const syncResult = await runCli(
|
||||
clientVault,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { assert } from "@std/assert";
|
||||
import { TempDir } from "./helpers/temp.ts";
|
||||
import { applyP2pSettings, initSettingsFile } from "./helpers/settings.ts";
|
||||
import { applyP2pSettings, applyP2pTestTweaks, initSettingsFile } from "./helpers/settings.ts";
|
||||
import { startCliInBackground } from "./helpers/backgroundCli.ts";
|
||||
import { discoverPeer, maybeStartLocalRelay, stopLocalRelayIfStarted } from "./helpers/p2p.ts";
|
||||
import { jsonStringField, runCliOrFail, runCliWithInputOrFail, sanitiseCatStdout } from "./helpers/cli.ts";
|
||||
@@ -12,6 +12,10 @@ Deno.test("p2p: three nodes detect and resolve conflicts", async () => {
|
||||
const appId = Deno.env.get("APP_ID") ?? "self-hosted-livesync-cli-tests";
|
||||
const peersTimeout = Number(Deno.env.get("PEERS_TIMEOUT") ?? "10");
|
||||
const syncTimeout = Number(Deno.env.get("SYNC_TIMEOUT") ?? "15");
|
||||
const nonce = `${Date.now()}-${crypto.randomUUID().slice(0, 8)}`;
|
||||
const hostPeerName = Deno.env.get("HOST_PEER_NAME") ?? `p2p-host-${nonce}`;
|
||||
const peerNameB = Deno.env.get("PEER_NAME_B") ?? `p2p-client-b-${nonce}`;
|
||||
const peerNameC = Deno.env.get("PEER_NAME_C") ?? `p2p-client-c-${nonce}`;
|
||||
|
||||
await using workDir = await TempDir.create("livesync-cli-p2p-3nodes");
|
||||
const vaultA = workDir.join("vault-a");
|
||||
@@ -26,16 +30,21 @@ Deno.test("p2p: three nodes detect and resolve conflicts", async () => {
|
||||
|
||||
const relayStarted = await maybeStartLocalRelay(relay);
|
||||
try {
|
||||
for (const settings of [settingsA, settingsB, settingsC]) {
|
||||
await initSettingsFile(settings);
|
||||
await applyP2pSettings(settings, roomId, passphrase, appId, relay);
|
||||
}
|
||||
await initSettingsFile(settingsA);
|
||||
await initSettingsFile(settingsB);
|
||||
await initSettingsFile(settingsC);
|
||||
await applyP2pSettings(settingsA, roomId, passphrase, appId, relay);
|
||||
await applyP2pSettings(settingsB, roomId, passphrase, appId, relay);
|
||||
await applyP2pSettings(settingsC, roomId, passphrase, appId, relay);
|
||||
await applyP2pTestTweaks(settingsA, hostPeerName, passphrase);
|
||||
await applyP2pTestTweaks(settingsB, peerNameB, passphrase);
|
||||
await applyP2pTestTweaks(settingsC, peerNameC, passphrase);
|
||||
|
||||
const host = startCliInBackground(vaultA, "--settings", settingsA, "p2p-host");
|
||||
try {
|
||||
await host.waitUntilContains("P2P host is running", 20000);
|
||||
const peerFromB = await discoverPeer(vaultB, settingsB, peersTimeout);
|
||||
const peerFromC = await discoverPeer(vaultC, settingsC, peersTimeout);
|
||||
const peerFromB = await discoverPeer(vaultB, settingsB, peersTimeout, hostPeerName);
|
||||
const peerFromC = await discoverPeer(vaultC, settingsC, peersTimeout, hostPeerName);
|
||||
const targetPath = "p2p/conflicted-from-two-clients.txt";
|
||||
|
||||
await runCliWithInputOrFail("from-client-b-v1\n", vaultB, "--settings", settingsB, "put", targetPath);
|
||||
|
||||
Reference in New Issue
Block a user