Compare commits

...

2 Commits
3.4.22 ... flow

Author SHA1 Message Date
Alexey
451227da60 Namespace synlimit netfilter rules per target set
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-07-01 01:47:14 +03:00
Alexey
81ae483201 Add regression coverage for ME routing, D2C padding, synlimit, and MSS bulk validation
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-06-30 13:13:11 +03:00
10 changed files with 856 additions and 82 deletions

View File

@@ -958,6 +958,10 @@ impl ProxyConfig {
.server
.client_mss_value()
.map_err(|error| ProxyError::Config(format!("server.client_mss {error}")))?;
config
.server
.client_mss_bulk_value()
.map_err(|error| ProxyError::Config(format!("server.client_mss_bulk {error}")))?;
for (idx, listener) in config.server.listeners.iter().enumerate() {
if listener.client_mss.is_some() {
listener

View File

@@ -1652,6 +1652,7 @@ fn client_mss_custom_value_is_accepted() {
let toml = r#"
[server]
client_mss = "4096"
client_mss_bulk = "1400"
[censorship]
tls_domain = "example.com"
@@ -1665,6 +1666,7 @@ fn client_mss_custom_value_is_accepted() {
let cfg = ProxyConfig::load(&path).unwrap();
assert_eq!(cfg.server.client_mss_value(), Ok(Some(4096)));
assert_eq!(cfg.server.client_mss_bulk_value(), Ok(Some(1400)));
let _ = std::fs::remove_file(path);
}
@@ -1693,6 +1695,31 @@ fn client_mss_out_of_range_is_rejected() {
}
}
/// Loading a config whose `server.client_mss_bulk` is just below or just above
/// the accepted range must fail with the range error message.
#[test]
fn client_mss_bulk_out_of_range_is_rejected() {
    let tmp_dir = std::env::temp_dir();
    for value in ["87", "4097"] {
        // One file per probed value so the two iterations never share a path.
        let path = tmp_dir.join(format!(
            "telemt_client_mss_bulk_out_of_range_{value}_test.toml"
        ));
        let toml = format!(
            r#"
[server]
client_mss_bulk = "{value}"
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#
        );
        std::fs::write(&path, toml).unwrap();

        let load_error = ProxyConfig::load(&path).unwrap_err();
        let err = load_error.to_string();
        assert!(err.contains("server.client_mss_bulk custom value must be within [88, 4096]"));

        // Best-effort cleanup; a leftover temp file is not a test failure.
        let _ = std::fs::remove_file(path);
    }
}
#[test]
fn client_mss_unquoted_number_is_rejected() {
let toml = r#"

View File

@@ -69,7 +69,9 @@ use self::quota::{
#[cfg(test)]
use self::c2me::enqueue_c2me_command;
#[cfg(test)]
use self::d2c::{compute_intermediate_secure_wire_len, process_me_writer_response};
use self::d2c::{
compute_intermediate_secure_wire_len, process_me_writer_response, write_client_payload,
};
#[cfg(test)]
pub(crate) use self::desync::{
clear_desync_dedup_for_testing_in_shared, desync_dedup_get_for_testing,
@@ -166,3 +168,7 @@ mod middle_relay_atomic_quota_invariant_tests;
#[cfg(test)]
#[path = "tests/middle_relay_baseline_invariant_tests.rs"]
mod middle_relay_baseline_invariant_tests;
#[cfg(test)]
#[path = "tests/middle_relay_d2c_flush_padding_security_tests.rs"]
mod middle_relay_d2c_flush_padding_security_tests;

View File

@@ -0,0 +1,148 @@
use std::io;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;
use super::*;
use crate::crypto::AesCtr;
use crate::protocol::framing::INTERMEDIATE_WIRE_LEN_MASK;
/// Test writer that records every byte written and counts flush calls.
///
/// Both fields live behind `Arc`, so a clone observes the same capture buffer
/// and the same flush counter as the original.
#[derive(Default, Clone)]
struct RecordingWriter {
    /// Concatenation of all buffers handed to the writer so far.
    writes: Arc<Mutex<Vec<u8>>>,
    /// Number of flushes observed.
    flushes: Arc<AtomicUsize>,
}

impl RecordingWriter {
    /// Returns a snapshot copy of everything written so far.
    ///
    /// # Panics
    /// Panics if the capture lock is poisoned.
    fn captured(&self) -> Vec<u8> {
        let recorded = self
            .writes
            .lock()
            .expect("test writer capture lock must not be poisoned");
        recorded.as_slice().to_vec()
    }
}
/// `AsyncWrite` implementation that never returns `Pending`: writes are
/// appended to the shared capture buffer, flushes bump the counter, and
/// shutdown is a no-op.
impl AsyncWrite for RecordingWriter {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let accepted = buf.len();
        let mut recorded = self
            .writes
            .lock()
            .expect("test writer capture lock must not be poisoned");
        recorded.extend_from_slice(buf);
        // The whole buffer is always accepted in one call.
        Poll::Ready(Ok(accepted))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let _previous = self.flushes.fetch_add(1, Ordering::Relaxed);
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}
/// Wraps `inner` in a `CryptoWriter` keyed with an all-zero 32-byte key and a
/// second `AesCtr::new` argument of `0`, matching what `decrypt_capture` uses.
fn crypto_writer(inner: RecordingWriter) -> CryptoWriter<RecordingWriter> {
    let zero_key = [0u8; 32];
    let cipher = AesCtr::new(&zero_key, 0);
    // NOTE(review): the third argument (8 MiB) is presumably a pending-buffer limit — confirm.
    CryptoWriter::new(inner, cipher, 8 * 1024 * 1024)
}
/// Applies an `AesCtr` keystream (all-zero key, second argument `0`) to the
/// captured bytes in place and returns the resulting buffer.
fn decrypt_capture(mut encrypted: Vec<u8>) -> Vec<u8> {
    let zero_key = [0u8; 32];
    let mut keystream = AesCtr::new(&zero_key, 0);
    keystream.apply(&mut encrypted);
    encrypted
}
/// Reads the first four bytes of `cleartext` as a little-endian `u32` and
/// returns it masked with `INTERMEDIATE_WIRE_LEN_MASK`.
///
/// # Panics
/// Panics when `cleartext` is shorter than four bytes.
fn secure_wire_len(cleartext: &[u8]) -> usize {
    let header: [u8; 4] = cleartext
        .get(..4)
        .expect("secure frame must include an intermediate header")
        .try_into()
        .expect("secure frame header must be four bytes");
    let raw_len = u32::from_le_bytes(header);
    (raw_len & INTERMEDIATE_WIRE_LEN_MASK) as usize
}
/// Writes a `payload_len`-byte payload of `0xa5` through `write_client_payload`
/// with `ProtoTag::Secure`, flushes the writer, and returns the reported write
/// mode together with the decrypted bytes that reached the recording writer.
async fn write_secure_payload(payload_len: usize) -> (MeD2cWriteMode, Vec<u8>) {
    let recorder = RecordingWriter::default();
    // Keep a handle on the shared capture buffer before `recorder` is moved.
    let observer = recorder.clone();
    let mut writer = crypto_writer(recorder);

    let payload = vec![0xa5; payload_len];
    let mut frame_buf = Vec::new();
    let cancel = CancellationToken::new();
    let rng = SecureRandom::new();

    let write_outcome = write_client_payload(
        &mut writer,
        ProtoTag::Secure,
        0,
        &payload,
        &rng,
        &mut frame_buf,
        &cancel,
    )
    .await;
    let mode = write_outcome.expect("secure payload write must succeed");

    let flush_outcome = flush_client_or_cancel(&mut writer, &cancel).await;
    flush_outcome.expect("secure payload flush must succeed");

    let cleartext = decrypt_capture(observer.captured());
    (mode, cleartext)
}
/// Asserts that `cleartext` is one frame: a 4-byte header, then `payload_len`
/// bytes of `0xa5`, then 1 to 3 trailing bytes, with a header length that is
/// not a multiple of four.
fn assert_secure_payload_with_tail_padding(cleartext: &[u8], payload_len: usize) {
    let wire_len = secure_wire_len(cleartext);
    // The capture must contain exactly the header plus the advertised length.
    assert_eq!(cleartext.len(), 4 + wire_len);

    let payload = &cleartext[4..4 + payload_len];
    assert!(payload.iter().all(|byte| *byte == 0xa5));

    let padding_len = wire_len
        .checked_sub(payload_len)
        .expect("secure wire length must include payload bytes");
    assert!(matches!(padding_len, 1..=3));
    assert_ne!(wire_len % 4, 0);
}
#[tokio::test]
async fn queue_drain_flush_reason_performs_physical_client_flush() {
let inner = RecordingWriter::default();
let flushes = inner.flushes.clone();
let mut writer = crypto_writer(inner);
let cancel = CancellationToken::new();
assert!(me_d2c_flush_reason_requires_client_flush(
MeD2cFlushReason::QueueDrain
));
flush_client_or_cancel(&mut writer, &cancel)
.await
.expect("client flush must succeed");
assert_eq!(flushes.load(Ordering::Relaxed), 1);
}
/// An 8-byte secure payload must be reported as `Coalesced` and still carry
/// tail padding on the wire.
#[tokio::test]
async fn secure_payload_coalesced_path_keeps_tail_padding() {
    const PAYLOAD_LEN: usize = 8;
    let (mode, cleartext) = write_secure_payload(PAYLOAD_LEN).await;
    assert!(matches!(mode, MeD2cWriteMode::Coalesced));
    assert_secure_payload_with_tail_padding(cleartext.as_slice(), PAYLOAD_LEN);
}
/// A payload of `ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES` bytes must be
/// reported as `Split` and still carry tail padding on the wire.
#[tokio::test]
async fn secure_payload_split_path_keeps_tail_padding() {
    let split_len = ME_D2C_SINGLE_WRITE_COALESCE_MAX_BYTES;
    let (mode, cleartext) = write_secure_payload(split_len).await;
    assert!(matches!(mode, MeD2cWriteMode::Split));
    assert_secure_payload_with_tail_padding(cleartext.as_slice(), split_len);
}

View File

@@ -1,10 +1,8 @@
use std::net::IpAddr;
use super::command::run_command;
use super::model::{SynLimitRule, SynLimitTargets, synlimit_rate_arg};
use super::model::{SynLimitNamespace, SynLimitRule, SynLimitTargets, synlimit_rate_arg};
const IPTABLES_CHAIN: &str = "TELEMT_SYNLIMIT";
const IPTABLES_HASHLIMIT_PREFIX: &str = "TMT-SYN";
const IPV4_IOS_PACKET_LENGTH: u16 = 64;
const IPV6_IOS_PACKET_LENGTH: u16 = 84;
const IOS_TTL_LIMIT: u8 = 65;
@@ -38,24 +36,41 @@ impl IpTablesFamily {
}
}
pub(super) async fn apply_synlimit_rules(targets: &SynLimitTargets) -> Result<(), String> {
apply_rules_for_binary("iptables", &targets.iptables_v4, IpTablesFamily::V4).await?;
apply_rules_for_binary("ip6tables", &targets.iptables_v6, IpTablesFamily::V6).await
pub(super) async fn apply_synlimit_rules(
targets: &SynLimitTargets,
namespace: &SynLimitNamespace,
) -> Result<(), String> {
apply_rules_for_binary(
"iptables",
&targets.iptables_v4,
IpTablesFamily::V4,
namespace,
)
.await?;
apply_rules_for_binary(
"ip6tables",
&targets.iptables_v6,
IpTablesFamily::V6,
namespace,
)
.await
}
async fn apply_rules_for_binary(
binary: &str,
targets: &[SynLimitRule],
family: IpTablesFamily,
namespace: &SynLimitNamespace,
) -> Result<(), String> {
if targets.is_empty() {
return Ok(());
}
let _ = run_command(binary, &["-t", "filter", "-N", IPTABLES_CHAIN], None).await;
run_command(binary, &["-t", "filter", "-F", IPTABLES_CHAIN], None).await?;
let chain = namespace.iptables_chain.as_str();
let _ = run_command(binary, &["-t", "filter", "-N", chain], None).await;
run_command(binary, &["-t", "filter", "-F", chain], None).await?;
if run_command(
binary,
&["-t", "filter", "-C", "INPUT", "-j", IPTABLES_CHAIN],
&["-t", "filter", "-C", "INPUT", "-j", chain],
None,
)
.await
@@ -63,24 +78,19 @@ async fn apply_rules_for_binary(
{
run_command(
binary,
&["-t", "filter", "-I", "INPUT", "1", "-j", IPTABLES_CHAIN],
&["-t", "filter", "-I", "INPUT", "1", "-j", chain],
None,
)
.await?;
}
for (idx, target) in targets.iter().enumerate() {
for rule in iptables_synfix_rule_args(target, idx, family) {
for rule in iptables_synfix_rule_args(target, idx, family, namespace) {
let refs: Vec<&str> = rule.iter().map(String::as_str).collect();
run_command(binary, &refs, None).await?;
}
}
run_command(
binary,
&["-t", "filter", "-A", IPTABLES_CHAIN, "-j", "RETURN"],
None,
)
.await?;
run_command(binary, &["-t", "filter", "-A", chain, "-j", "RETURN"], None).await?;
Ok(())
}
@@ -89,12 +99,13 @@ fn iptables_synfix_rule_args(
target: &SynLimitRule,
idx: usize,
family: IpTablesFamily,
namespace: &SynLimitNamespace,
) -> Vec<Vec<String>> {
vec![
iptables_ios_accept_rule_args(target, idx, family),
iptables_ios_reject_rule_args(target, family),
iptables_generic_accept_rule_args(target, idx, family),
iptables_generic_reject_rule_args(target),
iptables_ios_accept_rule_args(target, idx, family, namespace),
iptables_ios_reject_rule_args(target, family, namespace),
iptables_generic_accept_rule_args(target, idx, family, namespace),
iptables_generic_reject_rule_args(target, namespace),
]
}
@@ -102,12 +113,15 @@ fn iptables_ios_accept_rule_args(
target: &SynLimitRule,
idx: usize,
family: IpTablesFamily,
namespace: &SynLimitNamespace,
) -> Vec<String> {
let hashlimit_name = format!(
"{IPTABLES_HASHLIMIT_PREFIX}-I{}-{idx}",
"{}-I{}-{idx}",
namespace.iptables_hashlimit_prefix,
family.hashlimit_tag()
);
let mut args = iptables_base_rule_args(target.ip, target.port);
let mut args =
iptables_base_rule_args(namespace.iptables_chain.as_str(), target.ip, target.port);
args.extend(iptables_ios_match_args(family));
args.extend(iptables_hashlimit_args(
&hashlimit_name,
@@ -121,8 +135,13 @@ fn iptables_ios_accept_rule_args(
args
}
fn iptables_ios_reject_rule_args(target: &SynLimitRule, family: IpTablesFamily) -> Vec<String> {
let mut args = iptables_base_rule_args(target.ip, target.port);
fn iptables_ios_reject_rule_args(
target: &SynLimitRule,
family: IpTablesFamily,
namespace: &SynLimitNamespace,
) -> Vec<String> {
let mut args =
iptables_base_rule_args(namespace.iptables_chain.as_str(), target.ip, target.port);
args.extend(iptables_ios_match_args(family));
args.extend(iptables_reject_args());
args
@@ -132,12 +151,15 @@ fn iptables_generic_accept_rule_args(
target: &SynLimitRule,
idx: usize,
family: IpTablesFamily,
namespace: &SynLimitNamespace,
) -> Vec<String> {
let hashlimit_name = format!(
"{IPTABLES_HASHLIMIT_PREFIX}-G{}-{idx}",
"{}-G{}-{idx}",
namespace.iptables_hashlimit_prefix,
family.hashlimit_tag()
);
let mut args = iptables_base_rule_args(target.ip, target.port);
let mut args =
iptables_base_rule_args(namespace.iptables_chain.as_str(), target.ip, target.port);
args.extend(iptables_hashlimit_args(
&hashlimit_name,
target.generic_seconds,
@@ -150,18 +172,22 @@ fn iptables_generic_accept_rule_args(
args
}
fn iptables_generic_reject_rule_args(target: &SynLimitRule) -> Vec<String> {
let mut args = iptables_base_rule_args(target.ip, target.port);
fn iptables_generic_reject_rule_args(
target: &SynLimitRule,
namespace: &SynLimitNamespace,
) -> Vec<String> {
let mut args =
iptables_base_rule_args(namespace.iptables_chain.as_str(), target.ip, target.port);
args.extend(iptables_reject_args());
args
}
fn iptables_base_rule_args(ip: Option<IpAddr>, port: u16) -> Vec<String> {
fn iptables_base_rule_args(chain: &str, ip: Option<IpAddr>, port: u16) -> Vec<String> {
let mut args = vec![
"-t".to_string(),
"filter".to_string(),
"-A".to_string(),
IPTABLES_CHAIN.to_string(),
chain.to_string(),
"-p".to_string(),
"tcp".to_string(),
"--syn".to_string(),
@@ -226,13 +252,17 @@ fn iptables_reject_args() -> Vec<String> {
]
}
pub(super) async fn clear_rules_for_binary(binary: &str) -> Result<bool, String> {
pub(super) async fn clear_rules_for_binary(
binary: &str,
namespace: &SynLimitNamespace,
) -> Result<bool, String> {
let mut errors = Vec::new();
let mut removed = false;
let chain = namespace.iptables_chain.as_str();
for _ in 0..8 {
match run_command(
binary,
&["-t", "filter", "-D", "INPUT", "-j", IPTABLES_CHAIN],
&["-t", "filter", "-D", "INPUT", "-j", chain],
None,
)
.await
@@ -247,7 +277,7 @@ pub(super) async fn clear_rules_for_binary(binary: &str) -> Result<bool, String>
}
}
}
match run_command(binary, &["-t", "filter", "-F", IPTABLES_CHAIN], None).await {
match run_command(binary, &["-t", "filter", "-F", chain], None).await {
Ok(()) => {
removed = true;
}
@@ -256,7 +286,7 @@ pub(super) async fn clear_rules_for_binary(binary: &str) -> Result<bool, String>
errors.push(format!("{binary} flush chain failed: {error}"));
}
}
match run_command(binary, &["-t", "filter", "-X", IPTABLES_CHAIN], None).await {
match run_command(binary, &["-t", "filter", "-X", chain], None).await {
Ok(()) => {
removed = true;
}
@@ -292,12 +322,26 @@ mod tests {
.any(|pair| pair[0].as_str() == key && pair[1].as_str() == value)
}
/// Returns `true` when any element of `args` is exactly equal to `key`.
fn has_key(args: &[String], key: &str) -> bool {
    for candidate in args {
        if candidate.as_str() == key {
            return true;
        }
    }
    false
}
/// Fixed namespace used by the iptables rule-argument tests.
fn test_namespace() -> SynLimitNamespace {
    let nft_table = String::from("telemt_synlimit_test");
    let iptables_chain = String::from("TMT_SYN_TEST");
    let iptables_hashlimit_prefix = String::from("TMTTEST");
    SynLimitNamespace {
        nft_table,
        iptables_chain,
        iptables_hashlimit_prefix,
    }
}
#[test]
fn iptables_rules_use_synfix_order_and_rejects() {
let target = test_rule(Some(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7))), 443);
let rules = iptables_synfix_rule_args(&target, 0, IpTablesFamily::V4);
let namespace = test_namespace();
let rules = iptables_synfix_rule_args(&target, 0, IpTablesFamily::V4, &namespace);
assert_eq!(rules.len(), 4);
assert!(has_pair(&rules[0], "-A", "TMT_SYN_TEST"));
assert!(has_pair(&rules[0], "--length", "64"));
assert!(has_pair(&rules[0], "--ttl-lt", "65"));
assert!(has_pair(&rules[0], "--hashlimit-upto", "12/second"));
@@ -314,10 +358,42 @@ mod tests {
#[test]
fn ip6tables_rules_use_ipv6_hoplimit_classifier() {
let target = test_rule(Some(IpAddr::V6(Ipv6Addr::LOCALHOST)), 443);
let rules = iptables_synfix_rule_args(&target, 0, IpTablesFamily::V6);
let namespace = test_namespace();
let rules = iptables_synfix_rule_args(&target, 0, IpTablesFamily::V6, &namespace);
assert!(has_pair(&rules[0], "--length", "84"));
assert!(has_pair(&rules[0], "--hl-lt", "65"));
assert!(has_pair(&rules[0], "-d", "::1"));
}
/// Error texts for a missing binary, chain, or target must be classified as
/// benign, while a permission error must not be.
#[test]
fn iptables_missing_rule_errors_are_cleanup_benign() {
    let benign_messages = [
        "iptables is not available",
        "iptables: No chain/target/match by that name.",
        "iptables: Chain TELEMT_SYNLIMIT does not exist.",
        "Couldn't load target `TELEMT_SYNLIMIT': No such file or directory",
    ];
    for message in benign_messages {
        assert!(is_missing_command_or_iptables_rule(message));
    }

    let is_benign = is_missing_command_or_iptables_rule("iptables: Permission denied");
    assert!(!is_benign);
}
/// A target without an IP must yield rules that have no `-d` argument but
/// still match `--dport 443`.
#[test]
fn iptables_wildcard_rule_omits_destination_match() {
    let namespace = test_namespace();
    let wildcard_target = test_rule(None, 443);
    let generated =
        iptables_synfix_rule_args(&wildcard_target, 0, IpTablesFamily::V4, &namespace);
    for rule_args in &generated {
        assert!(!has_key(rule_args, "-d"));
        assert!(has_pair(rule_args, "--dport", "443"));
    }
}
}

View File

@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
use tracing::warn;
@@ -11,7 +11,9 @@ mod model;
mod nftables;
use self::command::has_cap_net_admin;
use self::model::synlimit_targets;
use self::model::{SynLimitNamespace, synlimit_namespace, synlimit_targets};
static ACTIVE_SYNLIMIT_NAMESPACE: Mutex<Option<SynLimitNamespace>> = Mutex::new(None);
pub(crate) fn spawn_synlimit_controller(config_rx: watch::Receiver<Arc<ProxyConfig>>) {
if !cfg!(target_os = "linux") {
@@ -39,7 +41,34 @@ async fn wait_for_config_channel_close_and_reconcile(
}
pub(crate) async fn reconcile_synlimit_rules(cfg: &ProxyConfig) {
match clear_synlimit_rules_all_backends().await {
let targets = synlimit_targets(cfg);
let namespace = synlimit_namespace(&targets);
if let Some(previous_namespace) = set_active_synlimit_namespace(namespace.clone()) {
match clear_synlimit_rules_for_namespace(&previous_namespace).await {
Ok(true) => {
warn!("Removed previous SYN limiter namespace before reconcile");
}
Ok(false) => {}
Err(error) => {
warn!(error = %error, "Failed to clear previous SYN limiter namespace before reconcile");
}
}
}
if targets.is_empty() {
return;
}
let Some(namespace) = namespace else {
return;
};
if !has_cap_net_admin() {
warn!(
"SYN limiter configured but CAP_NET_ADMIN is not available; netfilter rules not applied"
);
return;
}
match clear_synlimit_rules_for_namespace(&namespace).await {
Ok(true) => {
warn!("Removed stale SYN limiter rules left by a previous run before reconcile");
}
@@ -49,37 +78,35 @@ pub(crate) async fn reconcile_synlimit_rules(cfg: &ProxyConfig) {
}
}
let targets = synlimit_targets(cfg);
if targets.is_empty() {
return;
}
if !has_cap_net_admin() {
warn!(
"SYN limiter configured but CAP_NET_ADMIN is not available; netfilter rules not applied"
);
return;
}
if targets.has_iptables_targets()
&& let Err(error) = iptables::apply_synlimit_rules(&targets).await
&& let Err(error) = iptables::apply_synlimit_rules(&targets, &namespace).await
{
warn!(error = %error, "Failed to apply iptables SYN limiter rules");
}
if targets.has_nft_targets()
&& let Err(error) = nftables::apply_synlimit_rules(&targets).await
&& let Err(error) = nftables::apply_synlimit_rules(&targets, &namespace).await
{
warn!(error = %error, "Failed to apply nftables SYN limiter rules");
}
}
/// Takes the currently recorded active namespace (leaving none recorded) and
/// clears its rules.
///
/// Returns `Ok(false)` without touching netfilter when no namespace is
/// recorded; otherwise forwards the result of
/// `clear_synlimit_rules_for_namespace`.
pub(crate) async fn clear_synlimit_rules_all_backends() -> Result<bool, String> {
    match take_active_synlimit_namespace() {
        Some(namespace) => clear_synlimit_rules_for_namespace(&namespace).await,
        None => Ok(false),
    }
}
async fn clear_synlimit_rules_for_namespace(
namespace: &SynLimitNamespace,
) -> Result<bool, String> {
if !has_cap_net_admin() {
return Ok(false);
}
let mut errors = Vec::new();
let mut removed = false;
match nftables::clear_rules_all_families().await {
match nftables::clear_rules_all_families(namespace).await {
Ok(value) => {
removed |= value;
}
@@ -87,7 +114,7 @@ pub(crate) async fn clear_synlimit_rules_all_backends() -> Result<bool, String>
errors.push(error);
}
}
match iptables::clear_rules_for_binary("iptables").await {
match iptables::clear_rules_for_binary("iptables", namespace).await {
Ok(value) => {
removed |= value;
}
@@ -95,7 +122,7 @@ pub(crate) async fn clear_synlimit_rules_all_backends() -> Result<bool, String>
errors.push(error);
}
}
match iptables::clear_rules_for_binary("ip6tables").await {
match iptables::clear_rules_for_binary("ip6tables", namespace).await {
Ok(value) => {
removed |= value;
}
@@ -111,6 +138,32 @@ pub(crate) async fn clear_synlimit_rules_all_backends() -> Result<bool, String>
}
}
/// Records `next` as the active SYN limiter namespace.
///
/// Returns the previously recorded namespace when it differs from `next`, so
/// the caller can clear it; returns `None` when the recorded value already
/// equals `next` or when nothing was recorded before.
///
/// A poisoned lock is recovered rather than treated as a failure: the guarded
/// value is a plain `Option` that cannot be left half-updated, and refusing to
/// update it would mean the previous namespace is never handed back for
/// cleanup.
fn set_active_synlimit_namespace(next: Option<SynLimitNamespace>) -> Option<SynLimitNamespace> {
    let mut active = ACTIVE_SYNLIMIT_NAMESPACE
        .lock()
        .unwrap_or_else(|poisoned| {
            warn!(
                error = %poisoned,
                "Active SYN limiter namespace lock was poisoned; recovering stored value"
            );
            poisoned.into_inner()
        });
    if *active == next {
        None
    } else {
        std::mem::replace(&mut *active, next)
    }
}
/// Removes and returns the recorded active SYN limiter namespace, leaving
/// `None` in its place.
///
/// A poisoned lock is recovered rather than reported as "nothing recorded":
/// the guarded value is a plain `Option`, and returning `None` here would make
/// the caller skip cleanup of rules that are still installed.
fn take_active_synlimit_namespace() -> Option<SynLimitNamespace> {
    let mut active = ACTIVE_SYNLIMIT_NAMESPACE
        .lock()
        .unwrap_or_else(|poisoned| {
            warn!(
                error = %poisoned,
                "Active SYN limiter namespace lock was poisoned; recovering stored value"
            );
            poisoned.into_inner()
        });
    active.take()
}
fn has_synlimit_config(cfg: &ProxyConfig) -> bool {
cfg.server
.listeners

View File

@@ -17,6 +17,13 @@ pub(super) struct SynLimitRule {
pub(super) hashlimit_size: u32,
}
/// Names under which one set of SYN limiter rules is installed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct SynLimitNamespace {
    /// Name of the nftables table that is created and deleted.
    pub(super) nft_table: String,
    /// Name of the iptables/ip6tables chain that holds the rules.
    pub(super) iptables_chain: String,
    /// Leading part of every `hashlimit` name used in the iptables rules.
    pub(super) iptables_hashlimit_prefix: String,
}
#[derive(Default)]
pub(super) struct SynLimitTargets {
pub(super) iptables_v4: Vec<SynLimitRule>,
@@ -42,6 +49,44 @@ impl SynLimitTargets {
}
}
/// Minimal 64-bit FNV-1a hasher used to derive namespace suffixes.
///
/// Multi-byte integers are fed in little-endian byte order.
struct SynLimitNamespaceHasher {
    value: u64,
}

impl SynLimitNamespaceHasher {
    /// FNV-1a 64-bit offset basis.
    const OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    /// FNV 64-bit prime.
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    /// Creates a hasher in its initial state.
    fn new() -> Self {
        let value = Self::OFFSET;
        Self { value }
    }

    /// Mixes `bytes` into the state: xor each byte, then multiply by the prime.
    fn write(&mut self, bytes: &[u8]) {
        self.value = bytes.iter().fold(self.value, |state, &byte| {
            (state ^ u64::from(byte)).wrapping_mul(Self::PRIME)
        });
    }

    /// Mixes a single byte into the state.
    fn write_u8(&mut self, value: u8) {
        self.write(std::slice::from_ref(&value));
    }

    /// Mixes a `u16` into the state as two little-endian bytes.
    fn write_u16(&mut self, value: u16) {
        let encoded = value.to_le_bytes();
        self.write(&encoded);
    }

    /// Mixes a `u32` into the state as four little-endian bytes.
    fn write_u32(&mut self, value: u32) {
        let encoded = value.to_le_bytes();
        self.write(&encoded);
    }

    /// Consumes the hasher and returns the accumulated 64-bit value.
    fn finish(self) -> u64 {
        let Self { value } = self;
        value
    }
}
pub(super) fn synlimit_targets(cfg: &ProxyConfig) -> SynLimitTargets {
let mut iptables_v4 = BTreeSet::new();
let mut iptables_v6 = BTreeSet::new();
@@ -91,6 +136,64 @@ pub(super) fn synlimit_targets(cfg: &ProxyConfig) -> SynLimitTargets {
}
}
pub(super) fn synlimit_namespace(targets: &SynLimitTargets) -> Option<SynLimitNamespace> {
if targets.is_empty() {
return None;
}
let mut hasher = SynLimitNamespaceHasher::new();
write_namespace_rule_group(&mut hasher, b"iptables-v4", &targets.iptables_v4);
write_namespace_rule_group(&mut hasher, b"iptables-v6", &targets.iptables_v6);
write_namespace_rule_group(&mut hasher, b"nft-v4", &targets.nft_v4);
write_namespace_rule_group(&mut hasher, b"nft-v6", &targets.nft_v6);
let suffix = format!("{:016x}", hasher.finish());
let iptables_suffix = &suffix[..12];
let hashlimit_suffix = &suffix[..10];
Some(SynLimitNamespace {
nft_table: format!("telemt_synlimit_{suffix}"),
iptables_chain: format!("TMT_SYN_{iptables_suffix}"),
iptables_hashlimit_prefix: format!("TMT{hashlimit_suffix}"),
})
}
/// Feeds one rule group into `hasher`: the group label, the rule count as a
/// `u32`, then every rule in slice order.
fn write_namespace_rule_group(
    hasher: &mut SynLimitNamespaceHasher,
    group: &[u8],
    rules: &[SynLimitRule],
) {
    hasher.write(group);
    // Hashing the count keeps adjacent groups from blending into each other.
    let rule_count = rules.len() as u32;
    hasher.write_u32(rule_count);
    rules
        .iter()
        .for_each(|rule| write_namespace_rule(hasher, rule));
}
/// Feeds one rule into `hasher`.
///
/// The address is encoded as a tag byte (`0` for none, `4` for IPv4, `6` for
/// IPv6) followed by the address octets when present. The port and the eight
/// limiter parameters follow in a fixed order.
fn write_namespace_rule(hasher: &mut SynLimitNamespaceHasher, rule: &SynLimitRule) {
    match rule.ip {
        None => hasher.write_u8(0),
        Some(IpAddr::V4(v4)) => {
            hasher.write_u8(4);
            hasher.write(&v4.octets());
        }
        Some(IpAddr::V6(v6)) => {
            hasher.write_u8(6);
            hasher.write(&v6.octets());
        }
    }

    hasher.write_u16(rule.port);

    // Order is part of the hash input and must stay fixed.
    let limiter_params = [
        rule.generic_seconds,
        rule.generic_hitcount,
        rule.generic_burst,
        rule.ios_seconds,
        rule.ios_hitcount,
        rule.ios_burst,
        rule.hashlimit_expire_ms,
        rule.hashlimit_size,
    ];
    for param in limiter_params {
        hasher.write_u32(param);
    }
}
pub(super) fn synlimit_rate_arg(seconds: u32, hitcount: u32) -> String {
let seconds = u64::from(seconds.max(1));
let hitcount = u64::from(hitcount.max(1));
@@ -124,3 +227,136 @@ pub(super) fn test_rule(ip: Option<IpAddr>, port: u16) -> SynLimitRule {
hashlimit_size: 32_768,
}
}
#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use super::*;
use crate::config::ListenerConfig;
/// Builds a `ListenerConfig` for tests with the given address, port, and
/// SYN limiter mode; every other field gets a fixed value.
fn listener(ip: IpAddr, port: Option<u16>, synlimit: SynLimitMode) -> ListenerConfig {
    ListenerConfig {
        // Caller-controlled fields.
        ip,
        port,
        synlimit,
        // Fixed SYN limiter parameters.
        synlimit_seconds: 60,
        synlimit_hitcount: 48,
        synlimit_burst: 1,
        synlimit_ios_seconds: 1,
        synlimit_ios_hitcount: 12,
        synlimit_ios_burst: 24,
        synlimit_hashlimit_expire_ms: 60_000,
        synlimit_hashlimit_size: 32_768,
        // Remaining fields are left unset or disabled.
        client_mss: None,
        announce: None,
        announce_ip: None,
        proxy_protocol: None,
        reuse_allow: false,
    }
}
/// Two identical unspecified-IPv4 iptables listeners without a port must
/// collapse into one wildcard target that uses `server.port`.
#[test]
fn synlimit_targets_deduplicate_and_use_legacy_port_fallback() {
    let duplicate_listener =
        || listener(IpAddr::V4(Ipv4Addr::UNSPECIFIED), None, SynLimitMode::Iptables);

    let mut cfg = ProxyConfig::default();
    cfg.server.port = 9443;
    cfg.server.listeners = vec![duplicate_listener(), duplicate_listener()];

    let targets = synlimit_targets(&cfg);

    assert_eq!(targets.iptables_v4.len(), 1);
    let only_target = &targets.iptables_v4[0];
    assert_eq!(only_target.ip, None);
    assert_eq!(only_target.port, 9443);
    assert!(targets.iptables_v6.is_empty());
    assert!(targets.nft_v4.is_empty());
    assert!(targets.nft_v6.is_empty());
}
/// One listener per backend and address family must land in its own target
/// list; an unspecified IPv6 address becomes a wildcard (`None`) target.
#[test]
fn synlimit_targets_separate_backends_and_ip_families() {
    let iptables_v4_ip = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 1));
    let iptables_v6_ip = IpAddr::V6(Ipv6Addr::LOCALHOST);
    let nft_v4_ip = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 2));
    let nft_v6_ip = IpAddr::V6(Ipv6Addr::UNSPECIFIED);

    let mut cfg = ProxyConfig::default();
    cfg.server.listeners = vec![
        listener(iptables_v4_ip, Some(443), SynLimitMode::Iptables),
        listener(iptables_v6_ip, Some(443), SynLimitMode::Iptables),
        listener(nft_v4_ip, Some(444), SynLimitMode::Nftables),
        listener(nft_v6_ip, Some(444), SynLimitMode::Nftables),
    ];

    let targets = synlimit_targets(&cfg);

    assert_eq!(targets.iptables_v4.len(), 1);
    assert_eq!(targets.iptables_v6.len(), 1);
    assert_eq!(targets.nft_v4.len(), 1);
    assert_eq!(targets.nft_v6.len(), 1);

    assert_eq!(targets.iptables_v4[0].ip, Some(iptables_v4_ip));
    assert_eq!(targets.iptables_v6[0].ip, Some(iptables_v6_ip));
    assert_eq!(targets.nft_v4[0].ip, Some(nft_v4_ip));
    assert_eq!(targets.nft_v6[0].ip, None);
}
/// The namespace must be identical for identical targets, differ once the
/// port changes, and carry the expected name prefixes and chain length bound.
#[test]
fn synlimit_namespace_is_stable_and_changes_by_targets() {
    let namespace_of = |cfg: &ProxyConfig| {
        synlimit_namespace(&synlimit_targets(cfg))
            .expect("configured targets must have a namespace")
    };

    let mut cfg = ProxyConfig::default();
    cfg.server.listeners = vec![listener(
        IpAddr::V4(Ipv4Addr::new(203, 0, 113, 1)),
        Some(443),
        SynLimitMode::Nftables,
    )];
    let first = namespace_of(&cfg);
    let second = namespace_of(&cfg);

    cfg.server.listeners[0].port = Some(444);
    let changed = namespace_of(&cfg);

    assert_eq!(first, second);
    assert_ne!(first, changed);
    assert!(first.nft_table.starts_with("telemt_synlimit_"));
    assert!(first.iptables_chain.starts_with("TMT_SYN_"));
    assert!(first.iptables_chain.len() <= 28);
    assert!(first.iptables_hashlimit_prefix.starts_with("TMT"));
}
/// Windows of one second, minute, hour, and day must be rendered with the
/// matching unit name and the unmodified hit count.
#[test]
fn synlimit_rate_arg_uses_native_units_without_fractional_rates() {
    let cases = [
        (1, 12, "12/second"),
        (60, 48, "48/minute"),
        (3600, 121, "121/hour"),
        (86400, 241, "241/day"),
    ];
    for (seconds, hitcount, expected) in cases {
        assert_eq!(synlimit_rate_arg(seconds, hitcount), expected);
    }
}
}

View File

@@ -1,7 +1,6 @@
use super::command::{run_command, run_command_stdout};
use super::model::{SynLimitRule, SynLimitTargets, synlimit_rate_arg};
use super::model::{SynLimitNamespace, SynLimitRule, SynLimitTargets, synlimit_rate_arg};
const NFT_TABLE: &str = "telemt_synlimit";
const NFT_CHAIN: &str = "input";
const NFT_INPUT_PRIORITY: i16 = -5;
const IPV4_IOS_PACKET_LENGTH: u16 = 64;
@@ -38,10 +37,13 @@ impl NftFamily {
}
}
pub(super) async fn apply_synlimit_rules(targets: &SynLimitTargets) -> Result<(), String> {
pub(super) async fn apply_synlimit_rules(
targets: &SynLimitTargets,
namespace: &SynLimitNamespace,
) -> Result<(), String> {
let families = detect_nft_table_families().await;
for plan in nft_apply_plan(families, &targets.nft_v4, &targets.nft_v6) {
let script = nft_synlimit_script(plan);
let script = nft_synlimit_script(plan, namespace);
run_command("nft", &["-f", "-"], Some(script)).await?;
}
@@ -114,9 +116,13 @@ fn nft_apply_plan<'a>(
Vec::new()
}
fn nft_synlimit_script(plan: NftApplyPlan<'_>) -> String {
fn nft_synlimit_script(plan: NftApplyPlan<'_>, namespace: &SynLimitNamespace) -> String {
let mut script = String::new();
script.push_str(&format!("table {} {NFT_TABLE} {{\n", plan.family.as_str()));
script.push_str(&format!(
"table {} {} {{\n",
plan.family.as_str(),
namespace.nft_table
));
script.push_str(&format!(" chain {NFT_CHAIN} {{\n"));
script.push_str(&format!(
" type filter hook input priority {NFT_INPUT_PRIORITY}; policy accept;\n"
@@ -186,16 +192,14 @@ fn push_nft_v6_rules(script: &mut String, target: &SynLimitRule, idx: usize) {
));
}
pub(super) async fn clear_rules_all_families() -> Result<bool, String> {
pub(super) async fn clear_rules_all_families(
namespace: &SynLimitNamespace,
) -> Result<bool, String> {
let mut errors = Vec::new();
let mut removed = false;
let table = namespace.nft_table.as_str();
for family in [NftFamily::Inet, NftFamily::Ip, NftFamily::Ip6] {
match run_command(
"nft",
&["delete", "table", family.as_str(), NFT_TABLE],
None,
)
.await
match run_command("nft", &["delete", "table", family.as_str(), table], None).await
{
Ok(()) => {
removed = true;
@@ -203,8 +207,8 @@ pub(super) async fn clear_rules_all_families() -> Result<bool, String> {
Err(error) if is_missing_command_or_nft_table(&error) => {}
Err(error) => {
errors.push(format!(
"nft delete table {} {NFT_TABLE} failed: {error}",
family.as_str()
"nft delete table {} {table} failed: {error}",
family.as_str(),
));
}
}
@@ -228,15 +232,28 @@ mod tests {
use super::*;
use crate::synlimit_control::model::test_rule;
/// Builds a namespace for the nft script tests with the given table name and
/// fixed iptables names.
fn test_namespace(table: &str) -> SynLimitNamespace {
    let nft_table = String::from(table);
    let iptables_chain = String::from("TMT_SYN_TEST");
    let iptables_hashlimit_prefix = String::from("TMTTEST");
    SynLimitNamespace {
        nft_table,
        iptables_chain,
        iptables_hashlimit_prefix,
    }
}
#[test]
fn nft_script_uses_synfix_v4_rules_and_early_priority() {
let rule = test_rule(Some(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7))), 443);
let script = nft_synlimit_script(NftApplyPlan {
family: NftFamily::Inet,
v4_targets: &[rule],
v6_targets: &[],
});
let namespace = test_namespace("telemt_synlimit_test_a");
let script = nft_synlimit_script(
NftApplyPlan {
family: NftFamily::Inet,
v4_targets: &[rule],
v6_targets: &[],
},
&namespace,
);
assert!(script.contains("table inet telemt_synlimit_test_a"));
assert!(script.contains("type filter hook input priority -5; policy accept;"));
assert!(script.contains("ip daddr 203.0.113.7"));
assert!(script.contains("meta length 64 ip ttl < 65"));
@@ -248,15 +265,53 @@ mod tests {
#[test]
fn nft_script_uses_ipv6_hoplimit_classifier() {
let rule = test_rule(Some(IpAddr::V6(Ipv6Addr::LOCALHOST)), 443);
let script = nft_synlimit_script(NftApplyPlan {
family: NftFamily::Inet,
v4_targets: &[],
v6_targets: &[rule],
});
let namespace = test_namespace("telemt_synlimit_test_b");
let script = nft_synlimit_script(
NftApplyPlan {
family: NftFamily::Inet,
v4_targets: &[],
v6_targets: &[rule],
},
&namespace,
);
assert!(script.contains("table inet telemt_synlimit_test_b"));
assert!(script.contains("ip6 daddr ::1"));
assert!(script.contains("meta length 84 ip6 hoplimit < 65"));
assert!(script.contains("ip6 saddr limit rate over 12/second burst 24 packets"));
assert!(script.contains("ip6 saddr limit rate over 48/minute burst 1 packets"));
}
/// Missing-binary and missing-table error texts must be classified as benign,
/// while a permission error must not be.
#[test]
fn nft_missing_table_errors_are_cleanup_benign() {
    for benign in ["nft is not available", "Error: No such file or directory"] {
        assert!(is_missing_command_or_nft_table(benign));
    }

    let is_benign = is_missing_command_or_nft_table("Error: Operation not permitted");
    assert!(!is_benign);
}
/// With all three detected-family flags false and both IPv4 and IPv6 targets
/// present, the plan must be a single `inet` entry carrying both lists.
#[test]
fn nft_apply_plan_keeps_dual_stack_rules_in_inet_table() {
    let v4_rules = [test_rule(
        Some(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7))),
        443,
    )];
    let v6_rules = [test_rule(Some(IpAddr::V6(Ipv6Addr::LOCALHOST)), 443)];
    let no_families = NftTableFamilies {
        inet: false,
        ip: false,
        ip6: false,
    };

    let plans = nft_apply_plan(no_families, &v4_rules, &v6_rules);

    assert_eq!(plans.len(), 1);
    let plan = &plans[0];
    assert_eq!(plan.family.as_str(), "inet");
    assert_eq!(plan.v4_targets, v4_rules.as_slice());
    assert_eq!(plan.v6_targets, v6_rules.as_slice());
}
}

View File

@@ -675,8 +675,117 @@ fn hex_dump(data: &[u8]) -> String {
mod tests {
use super::*;
use std::io::ErrorKind;
use std::net::{Ipv4Addr, Ipv6Addr};
use tokio::net::{TcpListener, TcpStream};
/// Builds an `UpstreamEgressInfo` for tests: upstream id 7, the given route
/// kind and SOCKS bound address, and every other address field unset.
fn upstream_egress(
    route_kind: UpstreamRouteKind,
    socks_bound_addr: Option<SocketAddr>,
) -> UpstreamEgressInfo {
    UpstreamEgressInfo {
        upstream_id: 7,
        route_kind,
        socks_bound_addr,
        // Not relevant to the tests that use this helper.
        local_addr: None,
        direct_bind_ip: None,
        socks_proxy_addr: None,
    }
}
/// A SOCKS5 bound address is selected when its family matches the requested
/// one, for both an IPv4 and an IPv6 sample address.
#[test]
fn socks_bound_addr_is_used_only_for_public_same_family_tuple() {
    let v4_bound = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34)), 443);
    let v6_ip = "2606:4700:4700::1111"
        .parse::<Ipv6Addr>()
        .expect("test IPv6 address must parse");
    let v6_bound = SocketAddr::new(IpAddr::V6(v6_ip), 443);

    let selected_v4 = MePool::select_socks_bound_addr(
        IpFamily::V4,
        Some(upstream_egress(UpstreamRouteKind::Socks5, Some(v4_bound))),
    );
    assert_eq!(selected_v4, Some(v4_bound));

    let selected_v6 = MePool::select_socks_bound_addr(
        IpFamily::V6,
        Some(upstream_egress(UpstreamRouteKind::Socks5, Some(v6_bound))),
    );
    assert_eq!(selected_v6, Some(v6_bound));
}
/// No bound address may be selected for a 10.0.0.1 address, an unspecified
/// address, an IPv4 address when IPv6 was requested, or a `Direct` route.
#[test]
fn socks_bound_addr_rejects_bogon_unspecified_wrong_family_and_non_socks_routes() {
    let bogon_bound = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 443);
    let unspecified_bound = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 443);
    let public_v4_bound = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34)), 443);

    // (requested family, route kind, reported bound address)
    let rejected_cases = [
        (IpFamily::V4, UpstreamRouteKind::Socks5, bogon_bound),
        (IpFamily::V4, UpstreamRouteKind::Socks5, unspecified_bound),
        (IpFamily::V6, UpstreamRouteKind::Socks5, public_v4_bound),
        (IpFamily::V4, UpstreamRouteKind::Direct, public_v4_bound),
    ];
    for (family, route_kind, bound) in rejected_cases {
        let selected = MePool::select_socks_bound_addr(
            family,
            Some(upstream_egress(route_kind, Some(bound))),
        );
        assert_eq!(selected, None);
    }
}
/// A present bound port maps to `SocksBound`; a bound port filtered out for
/// being zero, or no bound port at all, maps to `LocalSocket`.
#[test]
fn kdf_client_port_source_tracks_only_valid_socks_bound_port() {
    let public_ip = IpAddr::V4(Ipv4Addr::new(93, 184, 216, 34));
    let bound = SocketAddr::new(public_ip, 443);
    let zero_port_bound = SocketAddr::new(public_ip, 0);

    let from_valid = KdfClientPortSource::from_socks_bound_port(Some(bound.port()));
    assert_eq!(from_valid, KdfClientPortSource::SocksBound);

    // Port 0 is dropped by the filter, so the constructor receives `None`.
    let filtered_port = Some(zero_port_bound)
        .filter(|addr| addr.port() != 0)
        .map(|addr| addr.port());
    let from_zero = KdfClientPortSource::from_socks_bound_port(filtered_port);
    assert_eq!(from_zero, KdfClientPortSource::LocalSocket);

    let from_none = KdfClientPortSource::from_socks_bound_port(None);
    assert_eq!(from_none, KdfClientPortSource::LocalSocket);
}
#[tokio::test]
async fn test_configure_keepalive_loopback() {
let listener = match TcpListener::bind("127.0.0.1:0").await {

View File

@@ -368,3 +368,63 @@ async fn send_proxy_req_uses_writer_source_ip_when_advertised_our_addr_differs()
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 31)), our_addr.port())
);
}
/// Preloads the writer's channel with eight `Close` commands, starts
/// `send_proxy_req` in a task, drains one command, and then checks that the
/// payload eventually delivered advertises the writer's `source_ip` with the
/// caller's port.
#[tokio::test]
async fn send_proxy_req_blocking_fallback_uses_writer_source_ip() {
    let (pool, _rng) = make_pool().await;
    pool.rr.store(0, Ordering::Relaxed);
    let (conn_id, _rx) = pool.registry.register().await;

    let writer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 2, 32)), 443);
    let mut live_rx = insert_writer(&pool, 32, 2, writer_addr, true).await;

    let source_ip = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 32));
    let tx = {
        // Scope the write guard so it is released before sending.
        let mut writers = pool.writers.write().await;
        let target_writer = writers
            .iter_mut()
            .find(|writer| writer.id == 32)
            .expect("test writer must exist");
        target_writer.source_ip = source_ip;
        target_writer.tx.clone()
    };

    // NOTE(review): eight preloaded commands presumably fill the channel so the
    // non-blocking send path fails — confirm against the writer channel capacity.
    for _ in 0..8 {
        tx.try_send(WriterCommand::Close)
            .expect("test writer channel must accept preload");
    }

    let our_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 8)), 9443);
    let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 8)), 30003);
    let pool_for_send = pool.clone();
    let send_task = tokio::spawn(async move {
        pool_for_send
            .send_proxy_req(conn_id, 2, client_addr, our_addr, b"blocking", 0, None)
            .await
    });

    // Give the spawned send a moment to start before freeing a channel slot.
    tokio::time::sleep(Duration::from_millis(10)).await;
    let first_command = live_rx.recv().await;
    assert!(matches!(first_command, Some(WriterCommand::Close)));

    let result = send_task.await.expect("send task must not panic");
    assert!(result.is_ok());

    let payload = recv_first_data_payload(&mut live_rx, Duration::from_millis(50))
        .await
        .expect("writer must receive blocking fallback payload");
    let expected_our_addr = SocketAddr::new(source_ip, our_addr.port());
    assert_eq!(proxy_req_our_addr_from_payload(&payload), expected_our_addr);
}