mirror of
https://github.com/telemt/telemt.git
synced 2026-06-30 22:24:09 +03:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
81ae483201 | ||
|
|
88d161a5e9 | ||
|
|
a0ac108807 | ||
|
|
809352fac5 | ||
|
|
22627b498d | ||
|
|
b9c5c71dbc |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2899,7 +2899,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
|
||||
|
||||
[[package]]
|
||||
name = "telemt"
|
||||
version = "3.4.19"
|
||||
version = "3.4.22"
|
||||
dependencies = [
|
||||
"aes",
|
||||
"anyhow",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "telemt"
|
||||
version = "3.4.19"
|
||||
version = "3.4.22"
|
||||
edition = "2024"
|
||||
|
||||
[features]
|
||||
|
||||
@@ -958,6 +958,10 @@ impl ProxyConfig {
|
||||
.server
|
||||
.client_mss_value()
|
||||
.map_err(|error| ProxyError::Config(format!("server.client_mss {error}")))?;
|
||||
config
|
||||
.server
|
||||
.client_mss_bulk_value()
|
||||
.map_err(|error| ProxyError::Config(format!("server.client_mss_bulk {error}")))?;
|
||||
for (idx, listener) in config.server.listeners.iter().enumerate() {
|
||||
if listener.client_mss.is_some() {
|
||||
listener
|
||||
|
||||
@@ -1652,6 +1652,7 @@ fn client_mss_custom_value_is_accepted() {
|
||||
let toml = r#"
|
||||
[server]
|
||||
client_mss = "4096"
|
||||
client_mss_bulk = "1400"
|
||||
|
||||
[censorship]
|
||||
tls_domain = "example.com"
|
||||
@@ -1665,6 +1666,7 @@ fn client_mss_custom_value_is_accepted() {
|
||||
let cfg = ProxyConfig::load(&path).unwrap();
|
||||
|
||||
assert_eq!(cfg.server.client_mss_value(), Ok(Some(4096)));
|
||||
assert_eq!(cfg.server.client_mss_bulk_value(), Ok(Some(1400)));
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
|
||||
@@ -1693,6 +1695,31 @@ fn client_mss_out_of_range_is_rejected() {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn client_mss_bulk_out_of_range_is_rejected() {
|
||||
for value in ["87", "4097"] {
|
||||
let toml = format!(
|
||||
r#"
|
||||
[server]
|
||||
client_mss_bulk = "{value}"
|
||||
|
||||
[censorship]
|
||||
tls_domain = "example.com"
|
||||
|
||||
[access.users]
|
||||
user = "00000000000000000000000000000000"
|
||||
"#
|
||||
);
|
||||
let dir = std::env::temp_dir();
|
||||
let path = dir.join(format!("telemt_client_mss_bulk_out_of_range_{value}_test.toml"));
|
||||
std::fs::write(&path, toml).unwrap();
|
||||
let err = ProxyConfig::load(&path).unwrap_err().to_string();
|
||||
|
||||
assert!(err.contains("server.client_mss_bulk custom value must be within [88, 4096]"));
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn client_mss_unquoted_number_is_rejected() {
|
||||
let toml = r#"
|
||||
|
||||
@@ -246,7 +246,7 @@ pub fn secure_payload_len_from_wire_len(wire_len: usize) -> Option<usize> {
|
||||
}
|
||||
|
||||
/// Generate padding length for Secure Intermediate protocol.
|
||||
/// Telegram Desktop uses a 4-bit random padding length for VersionD packets.
|
||||
/// Outbound padding is 1..=3 so a receiver can strip it by 4-byte alignment.
|
||||
pub fn secure_padding_len(data_len: usize, rng: &SecureRandom) -> usize {
|
||||
debug_assert!(
|
||||
is_valid_secure_payload_len(data_len),
|
||||
@@ -425,15 +425,21 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn secure_padding_matches_tdesktop_range() {
|
||||
fn secure_padding_never_produces_aligned_total() {
|
||||
let rng = SecureRandom::new();
|
||||
for data_len in (0..1000).step_by(4) {
|
||||
for _ in 0..100 {
|
||||
let padding = secure_padding_len(data_len, &rng);
|
||||
assert!(
|
||||
padding <= 15,
|
||||
(1..=3).contains(&padding),
|
||||
"padding out of range: data_len={data_len}, padding={padding}"
|
||||
);
|
||||
assert_ne!(
|
||||
(data_len + padding) % 4,
|
||||
0,
|
||||
"invariant violated: data_len={data_len}, padding={padding}, total={}",
|
||||
data_len + padding
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,8 +8,8 @@ pub(crate) const INTERMEDIATE_QUICKACK_FLAG: u32 = 0x8000_0000;
|
||||
/// Payload length mask used by Intermediate and Secure Intermediate headers.
|
||||
pub(crate) const INTERMEDIATE_WIRE_LEN_MASK: u32 = 0x7fff_ffff;
|
||||
|
||||
/// Maximum random tail length used by Telegram Desktop VersionD packets.
|
||||
pub(crate) const SECURE_VERSION_D_PADDING_MAX: usize = 15;
|
||||
/// Maximum outbound Secure tail length that keeps wire lengths non-aligned.
|
||||
pub(crate) const SECURE_VERSION_D_PADDING_MAX: usize = 3;
|
||||
|
||||
/// Parsed Intermediate/Secure Intermediate length header.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
@@ -51,9 +51,9 @@ pub(crate) fn secure_version_d_body_len_from_wire_len(wire_len: usize) -> Option
|
||||
Some(wire_len - (wire_len % 4))
|
||||
}
|
||||
|
||||
/// Generate Telegram Desktop-compatible VersionD random tail length.
|
||||
/// Generate outbound Secure tail length without ambiguous full-word padding.
|
||||
pub(crate) fn secure_version_d_padding_len(rng: &SecureRandom) -> usize {
|
||||
rng.range(SECURE_VERSION_D_PADDING_MAX + 1)
|
||||
rng.range(SECURE_VERSION_D_PADDING_MAX) + 1
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -69,7 +69,9 @@ use self::quota::{
|
||||
#[cfg(test)]
|
||||
use self::c2me::enqueue_c2me_command;
|
||||
#[cfg(test)]
|
||||
use self::d2c::{compute_intermediate_secure_wire_len, process_me_writer_response};
|
||||
use self::d2c::{
|
||||
compute_intermediate_secure_wire_len, process_me_writer_response, write_client_payload,
|
||||
};
|
||||
#[cfg(test)]
|
||||
pub(crate) use self::desync::{
|
||||
clear_desync_dedup_for_testing_in_shared, desync_dedup_get_for_testing,
|
||||
@@ -166,3 +168,7 @@ mod middle_relay_atomic_quota_invariant_tests;
|
||||
#[cfg(test)]
|
||||
#[path = "tests/middle_relay_baseline_invariant_tests.rs"]
|
||||
mod middle_relay_baseline_invariant_tests;
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "tests/middle_relay_d2c_flush_padding_security_tests.rs"]
|
||||
mod middle_relay_d2c_flush_padding_security_tests;
|
||||
|
||||
148
src/proxy/tests/middle_relay_d2c_flush_padding_security_tests.rs
Normal file
148
src/proxy/tests/middle_relay_d2c_flush_padding_security_tests.rs
Normal file
@@ -0,0 +1,148 @@
|
||||
use std::io;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use tokio::io::AsyncWrite;
|
||||
|
||||
use super::*;
|
||||
use crate::crypto::AesCtr;
|
||||
use crate::protocol::framing::INTERMEDIATE_WIRE_LEN_MASK;
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct RecordingWriter {
|
||||
writes: Arc<Mutex<Vec<u8>>>,
|
||||
flushes: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl RecordingWriter {
|
||||
fn captured(&self) -> Vec<u8> {
|
||||
self.writes
|
||||
.lock()
|
||||
.expect("test writer capture lock must not be poisoned")
|
||||
.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for RecordingWriter {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.writes
|
||||
.lock()
|
||||
.expect("test writer capture lock must not be poisoned")
|
||||
.extend_from_slice(buf);
|
||||
Poll::Ready(Ok(buf.len()))
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
self.flushes.fetch_add(1, Ordering::Relaxed);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
fn crypto_writer(inner: RecordingWriter) -> CryptoWriter<RecordingWriter> {
|
||||
let key = [0u8; 32];
|
||||
CryptoWriter::new(inner, AesCtr::new(&key, 0), 8 * 1024 * 1024)
|
||||
}
|
||||
|
||||
fn decrypt_capture(mut encrypted: Vec<u8>) -> Vec<u8> {
|
||||
let key = [0u8; 32];
|
||||
let mut cipher = AesCtr::new(&key, 0);
|
||||
cipher.apply(&mut encrypted);
|
||||
encrypted
|
||||
}
|
||||
|
||||
fn secure_wire_len(cleartext: &[u8]) -> usize {
|
||||
let header = cleartext
|
||||
.get(..4)
|
||||
.expect("secure frame must include an intermediate header");
|
||||
(u32::from_le_bytes(
|
||||
header
|
||||
.try_into()
|
||||
.expect("secure frame header must be four bytes"),
|
||||
) & INTERMEDIATE_WIRE_LEN_MASK) as usize
|
||||
}
|
||||
|
||||
async fn write_secure_payload(payload_len: usize) -> (MeD2cWriteMode, Vec<u8>) {
|
||||
let inner = RecordingWriter::default();
|
||||
let capture = inner.clone();
|
||||
let mut writer = crypto_writer(inner);
|
||||
let payload = vec![0xa5; payload_len];
|
||||
let mut frame_buf = Vec::new();
|
||||
let cancel = CancellationToken::new();
|
||||
let rng = SecureRandom::new();
|
||||
|
||||
let mode = write_client_payload(
|
||||
&mut writer,
|
||||
ProtoTag::Secure,
|
||||
0,
|
||||
&payload,
|
||||
&rng,
|
||||
&mut frame_buf,
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
.expect("secure payload write must succeed");
|
||||
flush_client_or_cancel(&mut writer, &cancel)
|
||||
.await
|
||||
.expect("secure payload flush must succeed");
|
||||
|
||||
(mode, decrypt_capture(capture.captured()))
|
||||
}
|
||||
|
||||
fn assert_secure_payload_with_tail_padding(cleartext: &[u8], payload_len: usize) {
|
||||
let wire_len = secure_wire_len(cleartext);
|
||||
assert_eq!(cleartext.len(), 4 + wire_len);
|
||||
assert!(cleartext[4..4 + payload_len]
|
||||
.iter()
|
||||
.all(|byte| *byte == 0xa5));
|
||||
|
||||
let padding_len = wire_len
|
||||
.checked_sub(payload_len)
|
||||
.expect("secure wire length must include payload bytes");
|
||||
assert!((1..=3).contains(&padding_len));
|
||||
assert_ne!(wire_len % 4, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn queue_drain_flush_reason_performs_physical_client_flush() {
|
||||
let inner = RecordingWriter::default();
|
||||
let flushes = inner.flushes.clone();
|
||||
let mut writer = crypto_writer(inner);
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
assert!(me_d2c_flush_reason_requires_client_flush(
|
||||
MeD2cFlushReason::QueueDrain
|
||||
));
|
||||
flush_client_or_cancel(&mut writer, &cancel)
|
||||
.await
|
||||
.expect("client flush must succeed");
|
||||
|
||||
assert_eq!(flushes.load(Ordering::Relaxed), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn secure_payload_coalesced_path_keeps_tail_padding() {
|
||||
let payload_len = 8;
|
||||
let (mode, cleartext) = write_secure_payload(payload_len).await;
|
||||
|
||||
assert!(matches!(mode, MeD2cWriteMode::Coalesced));
|
||||
assert_secure_payload_with_tail_padding(&cleartext, payload_len);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn secure_payload_split_path_keeps_tail_padding() {
|
||||
let payload_len = ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES;
|
||||
let (mode, cleartext) = write_secure_payload(payload_len).await;
|
||||
|
||||
assert!(matches!(mode, MeD2cWriteMode::Split));
|
||||
assert_secure_payload_with_tail_padding(&cleartext, payload_len);
|
||||
}
|
||||
@@ -312,7 +312,7 @@ fn encode_secure(frame: &Frame, dst: &mut BytesMut, rng: &SecureRandom) -> io::R
|
||||
));
|
||||
}
|
||||
|
||||
// Telegram Desktop VersionD uses a 4-bit random padding length.
|
||||
// Outbound Secure padding avoids full-word tails that readers cannot strip.
|
||||
let padding_len = secure_padding_len(data.len(), rng);
|
||||
|
||||
let total_len = data.len() + padding_len;
|
||||
@@ -521,13 +521,7 @@ mod tests {
|
||||
use tokio_util::codec::{FramedRead, FramedWrite};
|
||||
|
||||
fn assert_secure_decoded_payload(decoded: &[u8], original: &[u8]) {
|
||||
assert!(decoded.starts_with(original));
|
||||
assert!(
|
||||
(original.len()..=original.len() + 12).contains(&decoded.len()),
|
||||
"Secure decoded payload may retain up to 12 bytes of full-word padding, got {}",
|
||||
decoded.len()
|
||||
);
|
||||
assert_eq!(decoded.len() % 4, 0);
|
||||
assert_eq!(decoded, original);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -653,7 +647,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn secure_codec_uses_tdesktop_padding_range_and_jitters_wire_length() {
|
||||
fn secure_codec_uses_non_aligned_padding_and_jitters_wire_length() {
|
||||
let codec = SecureCodec::new(Arc::new(SecureRandom::new()));
|
||||
let payload = Bytes::from_static(&[1, 2, 3, 4, 5, 6, 7, 8]);
|
||||
let mut wire_lens = HashSet::new();
|
||||
@@ -666,9 +660,10 @@ mod tests {
|
||||
let wire_len = u32::from_le_bytes([out[0], out[1], out[2], out[3]]) as usize;
|
||||
assert_eq!(out.len(), 4 + wire_len);
|
||||
assert!(
|
||||
(payload.len()..=payload.len() + 15).contains(&wire_len),
|
||||
"Secure wire length must be payload+0..15, got {wire_len}"
|
||||
(payload.len() + 1..=payload.len() + 3).contains(&wire_len),
|
||||
"Secure wire length must be payload+1..3, got {wire_len}"
|
||||
);
|
||||
assert_ne!(wire_len % 4, 0);
|
||||
wire_lens.insert(wire_len);
|
||||
}
|
||||
|
||||
|
||||
@@ -367,7 +367,7 @@ impl<W: AsyncWrite + Unpin> SecureIntermediateFrameWriter<W> {
|
||||
));
|
||||
}
|
||||
|
||||
// Telegram Desktop VersionD uses a 4-bit random padding length.
|
||||
// Outbound Secure padding avoids full-word tails that readers cannot strip.
|
||||
let padding_len = secure_padding_len(data.len(), &self.rng);
|
||||
let padding = self.rng.bytes(padding_len);
|
||||
|
||||
@@ -633,13 +633,7 @@ mod tests {
|
||||
use tokio::time::{Duration, timeout};
|
||||
|
||||
fn assert_secure_decoded_payload(decoded: &[u8], original: &[u8]) {
|
||||
assert!(decoded.starts_with(original));
|
||||
assert!(
|
||||
(original.len()..=original.len() + 12).contains(&decoded.len()),
|
||||
"Secure decoded payload may retain up to 12 bytes of full-word padding, got {}",
|
||||
decoded.len()
|
||||
);
|
||||
assert_eq!(decoded.len() % 4, 0);
|
||||
assert_eq!(decoded, original);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -226,8 +226,9 @@ fn iptables_reject_args() -> Vec<String> {
|
||||
]
|
||||
}
|
||||
|
||||
pub(super) async fn clear_rules_for_binary(binary: &str) -> Result<(), String> {
|
||||
pub(super) async fn clear_rules_for_binary(binary: &str) -> Result<bool, String> {
|
||||
let mut errors = Vec::new();
|
||||
let mut removed = false;
|
||||
for _ in 0..8 {
|
||||
match run_command(
|
||||
binary,
|
||||
@@ -236,7 +237,9 @@ pub(super) async fn clear_rules_for_binary(binary: &str) -> Result<(), String> {
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
Ok(()) => {
|
||||
removed = true;
|
||||
}
|
||||
Err(error) if is_missing_command_or_iptables_rule(&error) => break,
|
||||
Err(error) => {
|
||||
errors.push(format!("{binary} delete INPUT jump failed: {error}"));
|
||||
@@ -244,19 +247,27 @@ pub(super) async fn clear_rules_for_binary(binary: &str) -> Result<(), String> {
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Err(error) = run_command(binary, &["-t", "filter", "-F", IPTABLES_CHAIN], None).await
|
||||
&& !is_missing_command_or_iptables_rule(&error)
|
||||
{
|
||||
errors.push(format!("{binary} flush chain failed: {error}"));
|
||||
match run_command(binary, &["-t", "filter", "-F", IPTABLES_CHAIN], None).await {
|
||||
Ok(()) => {
|
||||
removed = true;
|
||||
}
|
||||
Err(error) if is_missing_command_or_iptables_rule(&error) => {}
|
||||
Err(error) => {
|
||||
errors.push(format!("{binary} flush chain failed: {error}"));
|
||||
}
|
||||
}
|
||||
if let Err(error) = run_command(binary, &["-t", "filter", "-X", IPTABLES_CHAIN], None).await
|
||||
&& !is_missing_command_or_iptables_rule(&error)
|
||||
{
|
||||
errors.push(format!("{binary} delete chain failed: {error}"));
|
||||
match run_command(binary, &["-t", "filter", "-X", IPTABLES_CHAIN], None).await {
|
||||
Ok(()) => {
|
||||
removed = true;
|
||||
}
|
||||
Err(error) if is_missing_command_or_iptables_rule(&error) => {}
|
||||
Err(error) => {
|
||||
errors.push(format!("{binary} delete chain failed: {error}"));
|
||||
}
|
||||
}
|
||||
|
||||
if errors.is_empty() {
|
||||
Ok(())
|
||||
Ok(removed)
|
||||
} else {
|
||||
Err(errors.join(", "))
|
||||
}
|
||||
@@ -266,6 +277,7 @@ fn is_missing_command_or_iptables_rule(error: &str) -> bool {
|
||||
error.contains("is not available")
|
||||
|| error.contains("No chain/target/match by that name")
|
||||
|| error.contains("does not exist")
|
||||
|| error.contains("Couldn't load target")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -280,6 +292,10 @@ mod tests {
|
||||
.any(|pair| pair[0].as_str() == key && pair[1].as_str() == value)
|
||||
}
|
||||
|
||||
fn has_key(args: &[String], key: &str) -> bool {
|
||||
args.iter().any(|arg| arg == key)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iptables_rules_use_synfix_order_and_rejects() {
|
||||
let target = test_rule(Some(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7))), 443);
|
||||
@@ -308,4 +324,34 @@ mod tests {
|
||||
assert!(has_pair(&rules[0], "--hl-lt", "65"));
|
||||
assert!(has_pair(&rules[0], "-d", "::1"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iptables_missing_rule_errors_are_cleanup_benign() {
|
||||
assert!(is_missing_command_or_iptables_rule(
|
||||
"iptables is not available"
|
||||
));
|
||||
assert!(is_missing_command_or_iptables_rule(
|
||||
"iptables: No chain/target/match by that name."
|
||||
));
|
||||
assert!(is_missing_command_or_iptables_rule(
|
||||
"iptables: Chain TELEMT_SYNLIMIT does not exist."
|
||||
));
|
||||
assert!(is_missing_command_or_iptables_rule(
|
||||
"Couldn't load target `TELEMT_SYNLIMIT': No such file or directory"
|
||||
));
|
||||
assert!(!is_missing_command_or_iptables_rule(
|
||||
"iptables: Permission denied"
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iptables_wildcard_rule_omits_destination_match() {
|
||||
let target = test_rule(None, 443);
|
||||
let rules = iptables_synfix_rule_args(&target, 0, IpTablesFamily::V4);
|
||||
|
||||
for rule in rules {
|
||||
assert!(!has_key(&rule, "-d"));
|
||||
assert!(has_pair(&rule, "--dport", "443"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,8 +39,14 @@ async fn wait_for_config_channel_close_and_reconcile(
|
||||
}
|
||||
|
||||
pub(crate) async fn reconcile_synlimit_rules(cfg: &ProxyConfig) {
|
||||
if let Err(error) = clear_synlimit_rules_all_backends().await {
|
||||
warn!(error = %error, "Failed to clear existing SYN limiter rules before reconcile");
|
||||
match clear_synlimit_rules_all_backends().await {
|
||||
Ok(true) => {
|
||||
warn!("Removed stale SYN limiter rules left by a previous run before reconcile");
|
||||
}
|
||||
Ok(false) => {}
|
||||
Err(error) => {
|
||||
warn!(error = %error, "Failed to clear stale SYN limiter rules before reconcile");
|
||||
}
|
||||
}
|
||||
|
||||
let targets = synlimit_targets(cfg);
|
||||
@@ -66,24 +72,40 @@ pub(crate) async fn reconcile_synlimit_rules(cfg: &ProxyConfig) {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn clear_synlimit_rules_all_backends() -> Result<(), String> {
|
||||
pub(crate) async fn clear_synlimit_rules_all_backends() -> Result<bool, String> {
|
||||
if !has_cap_net_admin() {
|
||||
return Ok(());
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let mut errors = Vec::new();
|
||||
if let Err(error) = nftables::clear_rules_all_families().await {
|
||||
errors.push(error);
|
||||
let mut removed = false;
|
||||
match nftables::clear_rules_all_families().await {
|
||||
Ok(value) => {
|
||||
removed |= value;
|
||||
}
|
||||
Err(error) => {
|
||||
errors.push(error);
|
||||
}
|
||||
}
|
||||
if let Err(error) = iptables::clear_rules_for_binary("iptables").await {
|
||||
errors.push(error);
|
||||
match iptables::clear_rules_for_binary("iptables").await {
|
||||
Ok(value) => {
|
||||
removed |= value;
|
||||
}
|
||||
Err(error) => {
|
||||
errors.push(error);
|
||||
}
|
||||
}
|
||||
if let Err(error) = iptables::clear_rules_for_binary("ip6tables").await {
|
||||
errors.push(error);
|
||||
match iptables::clear_rules_for_binary("ip6tables").await {
|
||||
Ok(value) => {
|
||||
removed |= value;
|
||||
}
|
||||
Err(error) => {
|
||||
errors.push(error);
|
||||
}
|
||||
}
|
||||
|
||||
if errors.is_empty() {
|
||||
Ok(())
|
||||
Ok(removed)
|
||||
} else {
|
||||
Err(errors.join("; "))
|
||||
}
|
||||
|
||||
@@ -124,3 +124,111 @@ pub(super) fn test_rule(ip: Option<IpAddr>, port: u16) -> SynLimitRule {
|
||||
hashlimit_size: 32_768,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
||||
|
||||
use super::*;
|
||||
use crate::config::ListenerConfig;
|
||||
|
||||
fn listener(ip: IpAddr, port: Option<u16>, synlimit: SynLimitMode) -> ListenerConfig {
|
||||
ListenerConfig {
|
||||
ip,
|
||||
port,
|
||||
client_mss: None,
|
||||
synlimit,
|
||||
synlimit_seconds: 60,
|
||||
synlimit_hitcount: 48,
|
||||
synlimit_burst: 1,
|
||||
synlimit_ios_seconds: 1,
|
||||
synlimit_ios_hitcount: 12,
|
||||
synlimit_ios_burst: 24,
|
||||
synlimit_hashlimit_expire_ms: 60_000,
|
||||
synlimit_hashlimit_size: 32_768,
|
||||
announce: None,
|
||||
announce_ip: None,
|
||||
proxy_protocol: None,
|
||||
reuse_allow: false,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn synlimit_targets_deduplicate_and_use_legacy_port_fallback() {
|
||||
let mut cfg = ProxyConfig::default();
|
||||
cfg.server.port = 9443;
|
||||
cfg.server.listeners = vec![
|
||||
listener(
|
||||
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
|
||||
None,
|
||||
SynLimitMode::Iptables,
|
||||
),
|
||||
listener(
|
||||
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
|
||||
None,
|
||||
SynLimitMode::Iptables,
|
||||
),
|
||||
];
|
||||
|
||||
let targets = synlimit_targets(&cfg);
|
||||
|
||||
assert_eq!(targets.iptables_v4.len(), 1);
|
||||
assert_eq!(targets.iptables_v4[0].ip, None);
|
||||
assert_eq!(targets.iptables_v4[0].port, 9443);
|
||||
assert!(targets.iptables_v6.is_empty());
|
||||
assert!(targets.nft_v4.is_empty());
|
||||
assert!(targets.nft_v6.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn synlimit_targets_separate_backends_and_ip_families() {
|
||||
let mut cfg = ProxyConfig::default();
|
||||
cfg.server.listeners = vec![
|
||||
listener(
|
||||
IpAddr::V4(Ipv4Addr::new(203, 0, 113, 1)),
|
||||
Some(443),
|
||||
SynLimitMode::Iptables,
|
||||
),
|
||||
listener(
|
||||
IpAddr::V6(Ipv6Addr::LOCALHOST),
|
||||
Some(443),
|
||||
SynLimitMode::Iptables,
|
||||
),
|
||||
listener(
|
||||
IpAddr::V4(Ipv4Addr::new(203, 0, 113, 2)),
|
||||
Some(444),
|
||||
SynLimitMode::Nftables,
|
||||
),
|
||||
listener(
|
||||
IpAddr::V6(Ipv6Addr::UNSPECIFIED),
|
||||
Some(444),
|
||||
SynLimitMode::Nftables,
|
||||
),
|
||||
];
|
||||
|
||||
let targets = synlimit_targets(&cfg);
|
||||
|
||||
assert_eq!(targets.iptables_v4.len(), 1);
|
||||
assert_eq!(targets.iptables_v6.len(), 1);
|
||||
assert_eq!(targets.nft_v4.len(), 1);
|
||||
assert_eq!(targets.nft_v6.len(), 1);
|
||||
assert_eq!(
|
||||
targets.iptables_v4[0].ip,
|
||||
Some(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 1)))
|
||||
);
|
||||
assert_eq!(targets.iptables_v6[0].ip, Some(IpAddr::V6(Ipv6Addr::LOCALHOST)));
|
||||
assert_eq!(
|
||||
targets.nft_v4[0].ip,
|
||||
Some(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 2)))
|
||||
);
|
||||
assert_eq!(targets.nft_v6[0].ip, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn synlimit_rate_arg_uses_native_units_without_fractional_rates() {
|
||||
assert_eq!(synlimit_rate_arg(1, 12), "12/second");
|
||||
assert_eq!(synlimit_rate_arg(60, 48), "48/minute");
|
||||
assert_eq!(synlimit_rate_arg(3600, 121), "121/hour");
|
||||
assert_eq!(synlimit_rate_arg(86400, 241), "241/day");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,26 +186,32 @@ fn push_nft_v6_rules(script: &mut String, target: &SynLimitRule, idx: usize) {
|
||||
));
|
||||
}
|
||||
|
||||
pub(super) async fn clear_rules_all_families() -> Result<(), String> {
|
||||
pub(super) async fn clear_rules_all_families() -> Result<bool, String> {
|
||||
let mut errors = Vec::new();
|
||||
let mut removed = false;
|
||||
for family in [NftFamily::Inet, NftFamily::Ip, NftFamily::Ip6] {
|
||||
if let Err(error) = run_command(
|
||||
match run_command(
|
||||
"nft",
|
||||
&["delete", "table", family.as_str(), NFT_TABLE],
|
||||
None,
|
||||
)
|
||||
.await
|
||||
&& !is_missing_command_or_nft_table(&error)
|
||||
{
|
||||
errors.push(format!(
|
||||
"nft delete table {} {NFT_TABLE} failed: {error}",
|
||||
family.as_str()
|
||||
));
|
||||
Ok(()) => {
|
||||
removed = true;
|
||||
}
|
||||
Err(error) if is_missing_command_or_nft_table(&error) => {}
|
||||
Err(error) => {
|
||||
errors.push(format!(
|
||||
"nft delete table {} {NFT_TABLE} failed: {error}",
|
||||
family.as_str()
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if errors.is_empty() {
|
||||
Ok(())
|
||||
Ok(removed)
|
||||
} else {
|
||||
Err(errors.join(", "))
|
||||
}
|
||||
@@ -253,4 +259,37 @@ mod tests {
|
||||
assert!(script.contains("ip6 saddr limit rate over 12/second burst 24 packets"));
|
||||
assert!(script.contains("ip6 saddr limit rate over 48/minute burst 1 packets"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn nft_missing_table_errors_are_cleanup_benign() {
|
||||
assert!(is_missing_command_or_nft_table("nft is not available"));
|
||||
assert!(is_missing_command_or_nft_table(
|
||||
"Error: No such file or directory"
|
||||
));
|
||||
assert!(!is_missing_command_or_nft_table(
|
||||
"Error: Operation not permitted"
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn nft_apply_plan_keeps_dual_stack_rules_in_inet_table() {
|
||||
let v4_rule = test_rule(Some(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7))), 443);
|
||||
let v6_rule = test_rule(Some(IpAddr::V6(Ipv6Addr::LOCALHOST)), 443);
|
||||
let v4_rules = [v4_rule];
|
||||
let v6_rules = [v6_rule];
|
||||
let plans = nft_apply_plan(
|
||||
NftTableFamilies {
|
||||
inet: false,
|
||||
ip: false,
|
||||
ip6: false,
|
||||
},
|
||||
&v4_rules,
|
||||
&v6_rules,
|
||||
);
|
||||
|
||||
assert_eq!(plans.len(), 1);
|
||||
assert_eq!(plans[0].family.as_str(), "inet");
|
||||
assert_eq!(plans[0].v4_targets, v4_rules.as_slice());
|
||||
assert_eq!(plans[0].v6_targets, v6_rules.as_slice());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -675,8 +675,117 @@ fn hex_dump(data: &[u8]) -> String {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::io::ErrorKind;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
|
||||
fn upstream_egress(
|
||||
route_kind: UpstreamRouteKind,
|
||||
socks_bound_addr: Option<SocketAddr>,
|
||||
) -> UpstreamEgressInfo {
|
||||
UpstreamEgressInfo {
|
||||
upstream_id: 7,
|
||||
route_kind,
|
||||
local_addr: None,
|
||||
direct_bind_ip: None,
|
||||
socks_bound_addr,
|
||||
socks_proxy_addr: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn socks_bound_addr_is_used_only_for_public_same_family_tuple() {
|
||||
let v4_bound = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34)), 443);
|
||||
let v6_bound = SocketAddr::new(
|
||||
IpAddr::V6(
|
||||
"2606:4700:4700::1111"
|
||||
.parse::<Ipv6Addr>()
|
||||
.expect("test IPv6 address must parse"),
|
||||
),
|
||||
443,
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
MePool::select_socks_bound_addr(
|
||||
IpFamily::V4,
|
||||
Some(upstream_egress(UpstreamRouteKind::Socks5, Some(v4_bound)))
|
||||
),
|
||||
Some(v4_bound)
|
||||
);
|
||||
assert_eq!(
|
||||
MePool::select_socks_bound_addr(
|
||||
IpFamily::V6,
|
||||
Some(upstream_egress(UpstreamRouteKind::Socks5, Some(v6_bound)))
|
||||
),
|
||||
Some(v6_bound)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn socks_bound_addr_rejects_bogon_unspecified_wrong_family_and_non_socks_routes() {
|
||||
let bogon_bound = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
|
||||
let unspecified_bound = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 443);
|
||||
let public_v4_bound = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34)), 443);
|
||||
|
||||
assert_eq!(
|
||||
MePool::select_socks_bound_addr(
|
||||
IpFamily::V4,
|
||||
Some(upstream_egress(UpstreamRouteKind::Socks5, Some(bogon_bound)))
|
||||
),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
MePool::select_socks_bound_addr(
|
||||
IpFamily::V4,
|
||||
Some(upstream_egress(
|
||||
UpstreamRouteKind::Socks5,
|
||||
Some(unspecified_bound)
|
||||
))
|
||||
),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
MePool::select_socks_bound_addr(
|
||||
IpFamily::V6,
|
||||
Some(upstream_egress(
|
||||
UpstreamRouteKind::Socks5,
|
||||
Some(public_v4_bound)
|
||||
))
|
||||
),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
MePool::select_socks_bound_addr(
|
||||
IpFamily::V4,
|
||||
Some(upstream_egress(
|
||||
UpstreamRouteKind::Direct,
|
||||
Some(public_v4_bound)
|
||||
))
|
||||
),
|
||||
None
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn kdf_client_port_source_tracks_only_valid_socks_bound_port() {
|
||||
let bound = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34)), 443);
|
||||
let zero_port_bound = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34)), 0);
|
||||
|
||||
assert_eq!(
|
||||
KdfClientPortSource::from_socks_bound_port(Some(bound.port())),
|
||||
KdfClientPortSource::SocksBound
|
||||
);
|
||||
assert_eq!(
|
||||
KdfClientPortSource::from_socks_bound_port(
|
||||
Some(zero_port_bound).filter(|addr| addr.port() != 0).map(|addr| addr.port())
|
||||
),
|
||||
KdfClientPortSource::LocalSocket
|
||||
);
|
||||
assert_eq!(
|
||||
KdfClientPortSource::from_socks_bound_port(None),
|
||||
KdfClientPortSource::LocalSocket
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_configure_keepalive_loopback() {
|
||||
let listener = match TcpListener::bind("127.0.0.1:0").await {
|
||||
|
||||
@@ -464,7 +464,9 @@ impl MePool {
|
||||
if !self.writer_accepts_new_binding(w) {
|
||||
continue;
|
||||
}
|
||||
let (payload, meta) = build_routed_payload(our_addr);
|
||||
// Keep the advertised proxy IP aligned with the selected ME writer source.
|
||||
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
|
||||
let (payload, meta) = build_routed_payload(effective_our_addr);
|
||||
match w.tx.clone().try_reserve_owned() {
|
||||
Ok(permit) => {
|
||||
if !self.registry.bind_writer(conn_id, w.id, meta).await {
|
||||
@@ -519,7 +521,9 @@ impl MePool {
|
||||
}
|
||||
self.stats
|
||||
.increment_me_writer_pick_blocking_fallback_total();
|
||||
let (payload, meta) = build_routed_payload(our_addr);
|
||||
// Keep the advertised proxy IP aligned with the selected ME writer source.
|
||||
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
|
||||
let (payload, meta) = build_routed_payload(effective_our_addr);
|
||||
let reserve_result =
|
||||
if let Some(timeout) = self.route_runtime.me_route_blocking_send_timeout {
|
||||
match tokio::time::timeout(timeout, w.tx.clone().reserve_owned()).await {
|
||||
|
||||
@@ -323,7 +323,7 @@ async fn send_proxy_req_prunes_iterative_stale_bind_failures_without_data_replay
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_proxy_req_preserves_client_facing_our_addr_when_writer_source_ip_differs() {
|
||||
async fn send_proxy_req_uses_writer_source_ip_when_advertised_our_addr_differs() {
|
||||
let (pool, _rng) = make_pool().await;
|
||||
pool.rr.store(0, Ordering::Relaxed);
|
||||
|
||||
@@ -363,5 +363,68 @@ async fn send_proxy_req_preserves_client_facing_our_addr_when_writer_source_ip_d
|
||||
let payload = recv_first_data_payload(&mut live_rx, Duration::from_millis(50))
|
||||
.await
|
||||
.expect("writer must receive routed payload");
|
||||
assert_eq!(proxy_req_our_addr_from_payload(&payload), our_addr);
|
||||
assert_eq!(
|
||||
proxy_req_our_addr_from_payload(&payload),
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 31)), our_addr.port())
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_proxy_req_blocking_fallback_uses_writer_source_ip() {
|
||||
let (pool, _rng) = make_pool().await;
|
||||
pool.rr.store(0, Ordering::Relaxed);
|
||||
|
||||
let (conn_id, _rx) = pool.registry.register().await;
|
||||
let mut live_rx = insert_writer(
|
||||
&pool,
|
||||
32,
|
||||
2,
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 2, 32)), 443),
|
||||
true,
|
||||
)
|
||||
.await;
|
||||
let source_ip = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 32));
|
||||
|
||||
let tx = {
|
||||
let mut writers = pool.writers.write().await;
|
||||
let writer = writers
|
||||
.iter_mut()
|
||||
.find(|writer| writer.id == 32)
|
||||
.expect("test writer must exist");
|
||||
writer.source_ip = source_ip;
|
||||
writer.tx.clone()
|
||||
};
|
||||
for _ in 0..8 {
|
||||
tx.try_send(WriterCommand::Close)
|
||||
.expect("test writer channel must accept preload");
|
||||
}
|
||||
|
||||
let our_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 8)), 9443);
|
||||
let pool_for_send = pool.clone();
|
||||
let send_task = tokio::spawn(async move {
|
||||
pool_for_send
|
||||
.send_proxy_req(
|
||||
conn_id,
|
||||
2,
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 8)), 30003),
|
||||
our_addr,
|
||||
b"blocking",
|
||||
0,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
});
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
assert!(matches!(live_rx.recv().await, Some(WriterCommand::Close)));
|
||||
|
||||
let result = send_task.await.expect("send task must not panic");
|
||||
assert!(result.is_ok());
|
||||
let payload = recv_first_data_payload(&mut live_rx, Duration::from_millis(50))
|
||||
.await
|
||||
.expect("writer must receive blocking fallback payload");
|
||||
assert_eq!(
|
||||
proxy_req_our_addr_from_payload(&payload),
|
||||
SocketAddr::new(source_ip, our_addr.port())
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user