Merge pull request #783 from astronaut808/feature/user-rate-limits-api

Expose user rate limits through the API
This commit is contained in:
Alexey
2026-05-14 18:20:04 +03:00
committed by GitHub
6 changed files with 186 additions and 2 deletions

View File

@@ -205,6 +205,8 @@ Notes:
| `max_tcp_conns` | `usize` | no | Per-user concurrent TCP limit. |
| `expiration_rfc3339` | `string` | no | RFC3339 expiration timestamp. |
| `data_quota_bytes` | `u64` | no | Per-user traffic quota. |
| `rate_limit_up_bps` | `u64` | no | Per-user upload rate limit in bytes per second. |
| `rate_limit_down_bps` | `u64` | no | Per-user download rate limit in bytes per second. |
| `max_unique_ips` | `usize` | no | Per-user unique source IP limit. |
### `PatchUserRequest`
@@ -215,6 +217,8 @@ Notes:
| `max_tcp_conns` | `usize|null` | no | Per-user concurrent TCP limit; `null` removes the per-user override. |
| `expiration_rfc3339` | `string|null` | no | RFC3339 expiration timestamp; `null` removes the expiration. |
| `data_quota_bytes` | `u64|null` | no | Per-user traffic quota; `null` removes the per-user quota. |
| `rate_limit_up_bps` | `u64|null` | no | Per-user upload rate limit in bytes per second; `null` removes the upload direction limit. |
| `rate_limit_down_bps` | `u64|null` | no | Per-user download rate limit in bytes per second; `null` removes the download direction limit. |
| `max_unique_ips` | `usize|null` | no | Per-user unique source IP limit; `null` removes the per-user override. |
### `access.user_source_deny` via API
@@ -1166,6 +1170,8 @@ An empty request body is accepted and generates a new secret automatically.
| `max_tcp_conns` | `usize?` | Optional max concurrent TCP limit. |
| `expiration_rfc3339` | `string?` | Optional expiration timestamp. |
| `data_quota_bytes` | `u64?` | Optional data quota. |
| `rate_limit_up_bps` | `u64?` | Optional upload rate limit in bytes per second. |
| `rate_limit_down_bps` | `u64?` | Optional download rate limit in bytes per second. |
| `max_unique_ips` | `usize?` | Optional unique IP limit. |
| `current_connections` | `u64` | Current live connections. |
| `active_unique_ips` | `usize` | Current active unique source IPs. |
@@ -1242,6 +1248,12 @@ All mutating endpoints:
- Return new `revision` after successful write.
- Use process-local mutation lock + atomic write (`tmp + rename`) for config persistence.
Docker deployment note:
- Mutating endpoints require `config.toml` to live inside a writable mounted directory.
- Do not mount `config.toml` as a single bind-mounted file when API mutations are enabled; atomic `tmp + rename` writes can fail with `Device or resource busy`.
- Mount the config directory instead, for example `./config:/etc/telemt:rw`, and start Telemt with `/etc/telemt/config.toml`.
- A read-only single-file mount remains valid only for read-only deployments or when `[server.api].read_only=true`.
Delete path cleanup guarantees:
- Config cleanup removes only the requested username keys.
- Runtime unique-IP cleanup removes only this user's limiter and tracked IP state.

View File

@@ -254,6 +254,19 @@ docker compose down
> - `docker-compose.yml` maps `./config.toml` to `/app/config.toml` (read-only)
> - By default it publishes `443:443` and runs with dropped capabilities (only `NET_BIND_SERVICE` is added)
> - If you really need host networking (usually only for some IPv6 setups) uncomment `network_mode: host`
> - If you enable mutating Control API endpoints, mount a writable config directory instead of a single `config.toml` file. Telemt persists config changes with atomic `tmp + rename` writes, and a single bind-mounted file can fail with `Device or resource busy`.
Example writable config mount for Control API mutations:
```yaml
services:
telemt:
working_dir: /run/telemt
volumes:
- ./config:/etc/telemt:rw
tmpfs:
- /run/telemt:rw,mode=1777,size=4m
command: /usr/local/bin/telemt /etc/telemt/config.toml
```
**Run without Compose**
```bash

View File

@@ -7,7 +7,7 @@ use hyper::header::IF_MATCH;
use serde::Serialize;
use sha2::{Digest, Sha256};
use crate::config::ProxyConfig;
use crate::config::{ProxyConfig, RateLimitBps};
use super::model::ApiFailure;
@@ -18,6 +18,7 @@ pub(super) enum AccessSection {
UserMaxTcpConns,
UserExpirations,
UserDataQuota,
UserRateLimits,
UserMaxUniqueIps,
}
@@ -29,6 +30,7 @@ impl AccessSection {
Self::UserMaxTcpConns => "access.user_max_tcp_conns",
Self::UserExpirations => "access.user_expirations",
Self::UserDataQuota => "access.user_data_quota",
Self::UserRateLimits => "access.user_rate_limits",
Self::UserMaxUniqueIps => "access.user_max_unique_ips",
}
}
@@ -169,6 +171,15 @@ fn render_access_section(cfg: &ProxyConfig, section: AccessSection) -> Result<St
.collect();
serialize_table_body(&rows)?
}
AccessSection::UserRateLimits => {
let rows: BTreeMap<String, RateLimitBps> = cfg
.access
.user_rate_limits
.iter()
.map(|(key, value)| (key.clone(), *value))
.collect();
serialize_rate_limit_body(&rows)?
}
AccessSection::UserMaxUniqueIps => {
let rows: BTreeMap<String, usize> = cfg
.access
@@ -197,6 +208,7 @@ fn access_section_is_empty(cfg: &ProxyConfig, section: AccessSection) -> bool {
AccessSection::UserMaxTcpConns => cfg.access.user_max_tcp_conns.is_empty(),
AccessSection::UserExpirations => cfg.access.user_expirations.is_empty(),
AccessSection::UserDataQuota => cfg.access.user_data_quota.is_empty(),
AccessSection::UserRateLimits => cfg.access.user_rate_limits.is_empty(),
AccessSection::UserMaxUniqueIps => cfg.access.user_max_unique_ips.is_empty(),
}
}
@@ -206,6 +218,28 @@ fn serialize_table_body<T: Serialize>(value: &T) -> Result<String, ApiFailure> {
.map_err(|e| ApiFailure::internal(format!("failed to serialize access section: {}", e)))
}
fn serialize_rate_limit_body(rows: &BTreeMap<String, RateLimitBps>) -> Result<String, ApiFailure> {
let mut out = String::new();
for (key, value) in rows {
let key = serialize_toml_key(key)?;
out.push_str(&format!(
"{key} = {{ up_bps = {}, down_bps = {} }}\n",
value.up_bps, value.down_bps
));
}
Ok(out)
}
fn serialize_toml_key(key: &str) -> Result<String, ApiFailure> {
let mut row = BTreeMap::new();
row.insert(key.to_string(), 0_u8);
let rendered = serialize_table_body(&row)?;
rendered
.split_once(" = ")
.map(|(key, _)| key.to_string())
.ok_or_else(|| ApiFailure::internal("failed to serialize TOML key"))
}
fn upsert_toml_table(source: &str, table_name: &str, replacement: &str) -> String {
if let Some((start, end)) = find_toml_table_bounds(source, table_name) {
let mut out = String::with_capacity(source.len() + replacement.len());
@@ -285,3 +319,26 @@ fn write_atomic_sync(path: &Path, contents: &str) -> std::io::Result<()> {
}
write_result
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn render_user_rate_limits_section() {
let mut cfg = ProxyConfig::default();
cfg.access.user_rate_limits.insert(
"alice".to_string(),
RateLimitBps {
up_bps: 1024,
down_bps: 2048,
},
);
let rendered = render_access_section(&cfg, AccessSection::UserRateLimits)
.expect("section must render");
assert!(rendered.starts_with("[access.user_rate_limits]\n"));
assert!(rendered.contains("alice = { up_bps = 1024, down_bps = 2048 }"));
}
}

View File

@@ -473,6 +473,8 @@ pub(super) struct UserInfo {
pub(super) max_tcp_conns: Option<usize>,
pub(super) expiration_rfc3339: Option<String>,
pub(super) data_quota_bytes: Option<u64>,
pub(super) rate_limit_up_bps: Option<u64>,
pub(super) rate_limit_down_bps: Option<u64>,
pub(super) max_unique_ips: Option<usize>,
pub(super) current_connections: u64,
pub(super) active_unique_ips: usize,
@@ -516,6 +518,8 @@ pub(super) struct CreateUserRequest {
pub(super) max_tcp_conns: Option<usize>,
pub(super) expiration_rfc3339: Option<String>,
pub(super) data_quota_bytes: Option<u64>,
pub(super) rate_limit_up_bps: Option<u64>,
pub(super) rate_limit_down_bps: Option<u64>,
pub(super) max_unique_ips: Option<usize>,
}
@@ -531,6 +535,10 @@ pub(super) struct PatchUserRequest {
#[serde(default, deserialize_with = "patch_field")]
pub(super) data_quota_bytes: Patch<u64>,
#[serde(default, deserialize_with = "patch_field")]
pub(super) rate_limit_up_bps: Patch<u64>,
#[serde(default, deserialize_with = "patch_field")]
pub(super) rate_limit_down_bps: Patch<u64>,
#[serde(default, deserialize_with = "patch_field")]
pub(super) max_unique_ips: Patch<usize>,
}

View File

@@ -114,7 +114,9 @@ mod tests {
"secret": "00112233445566778899aabbccddeeff",
"max_tcp_conns": 0,
"max_unique_ips": null,
"data_quota_bytes": 1024
"data_quota_bytes": 1024,
"rate_limit_up_bps": 4096,
"rate_limit_down_bps": null
}"#;
let req: PatchUserRequest = serde_json::from_str(raw).expect("valid json");
assert_eq!(
@@ -124,6 +126,8 @@ mod tests {
assert!(matches!(req.max_tcp_conns, Patch::Set(0)));
assert!(matches!(req.max_unique_ips, Patch::Remove));
assert!(matches!(req.data_quota_bytes, Patch::Set(1024)));
assert!(matches!(req.rate_limit_up_bps, Patch::Set(4096)));
assert!(matches!(req.rate_limit_down_bps, Patch::Remove));
assert!(matches!(req.expiration_rfc3339, Patch::Unchanged));
assert!(matches!(req.user_ad_tag, Patch::Unchanged));
}

View File

@@ -3,6 +3,7 @@ use std::net::IpAddr;
use hyper::StatusCode;
use crate::config::ProxyConfig;
use crate::config::RateLimitBps;
use crate::ip_tracker::UserIpTracker;
use crate::stats::Stats;
@@ -27,6 +28,8 @@ pub(super) async fn create_user(
let touches_user_max_tcp_conns = body.max_tcp_conns.is_some();
let touches_user_expirations = body.expiration_rfc3339.is_some();
let touches_user_data_quota = body.data_quota_bytes.is_some();
let touches_user_rate_limits =
body.rate_limit_up_bps.is_some() || body.rate_limit_down_bps.is_some();
let touches_user_max_unique_ips = body.max_unique_ips.is_some();
if !is_valid_username(&body.username) {
@@ -91,6 +94,15 @@ pub(super) async fn create_user(
.user_data_quota
.insert(body.username.clone(), quota);
}
if touches_user_rate_limits {
cfg.access.user_rate_limits.insert(
body.username.clone(),
RateLimitBps {
up_bps: body.rate_limit_up_bps.unwrap_or(0),
down_bps: body.rate_limit_down_bps.unwrap_or(0),
},
);
}
let updated_limit = body.max_unique_ips;
if let Some(limit) = updated_limit {
@@ -115,6 +127,9 @@ pub(super) async fn create_user(
if touches_user_data_quota {
touched_sections.push(AccessSection::UserDataQuota);
}
if touches_user_rate_limits {
touched_sections.push(AccessSection::UserRateLimits);
}
if touches_user_max_unique_ips {
touched_sections.push(AccessSection::UserMaxUniqueIps);
}
@@ -157,6 +172,8 @@ pub(super) async fn create_user(
.then_some(cfg.access.user_max_tcp_conns_global_each)),
expiration_rfc3339: None,
data_quota_bytes: None,
rate_limit_up_bps: body.rate_limit_up_bps.filter(|limit| *limit > 0),
rate_limit_down_bps: body.rate_limit_down_bps.filter(|limit| *limit > 0),
max_unique_ips: updated_limit,
current_connections: 0,
active_unique_ips: 0,
@@ -181,6 +198,8 @@ pub(super) async fn patch_user(
let touches_user_max_tcp_conns = !matches!(&body.max_tcp_conns, Patch::Unchanged);
let touches_user_expirations = !matches!(&body.expiration_rfc3339, Patch::Unchanged);
let touches_user_data_quota = !matches!(&body.data_quota_bytes, Patch::Unchanged);
let touches_user_rate_limits = !matches!(&body.rate_limit_up_bps, Patch::Unchanged)
|| !matches!(&body.rate_limit_down_bps, Patch::Unchanged);
let touches_user_max_unique_ips = !matches!(&body.max_unique_ips, Patch::Unchanged);
if let Some(secret) = body.secret.as_ref()
@@ -253,6 +272,31 @@ pub(super) async fn patch_user(
cfg.access.user_data_quota.insert(user.to_string(), quota);
}
}
if touches_user_rate_limits {
let mut rate_limit = cfg
.access
.user_rate_limits
.get(user)
.copied()
.unwrap_or_default();
match body.rate_limit_up_bps {
Patch::Unchanged => {}
Patch::Remove => rate_limit.up_bps = 0,
Patch::Set(limit) => rate_limit.up_bps = limit,
}
match body.rate_limit_down_bps {
Patch::Unchanged => {}
Patch::Remove => rate_limit.down_bps = 0,
Patch::Set(limit) => rate_limit.down_bps = limit,
}
if rate_limit.up_bps == 0 && rate_limit.down_bps == 0 {
cfg.access.user_rate_limits.remove(user);
} else {
cfg.access
.user_rate_limits
.insert(user.to_string(), rate_limit);
}
}
// Capture how the per-user IP limit changed, so the in-memory ip_tracker
// can be synced (set or removed) after the config is persisted.
let max_unique_ips_change = match body.max_unique_ips {
@@ -288,6 +332,9 @@ pub(super) async fn patch_user(
if touches_user_data_quota {
touched_sections.push(AccessSection::UserDataQuota);
}
if touches_user_rate_limits {
touched_sections.push(AccessSection::UserRateLimits);
}
if touches_user_max_unique_ips {
touched_sections.push(AccessSection::UserMaxUniqueIps);
}
@@ -355,6 +402,7 @@ pub(super) async fn rotate_secret(
AccessSection::UserMaxTcpConns,
AccessSection::UserExpirations,
AccessSection::UserDataQuota,
AccessSection::UserRateLimits,
AccessSection::UserMaxUniqueIps,
];
let revision =
@@ -414,6 +462,7 @@ pub(super) async fn delete_user(
cfg.access.user_max_tcp_conns.remove(user);
cfg.access.user_expirations.remove(user);
cfg.access.user_data_quota.remove(user);
cfg.access.user_rate_limits.remove(user);
cfg.access.user_max_unique_ips.remove(user);
cfg.validate()
@@ -424,6 +473,7 @@ pub(super) async fn delete_user(
AccessSection::UserMaxTcpConns,
AccessSection::UserExpirations,
AccessSection::UserDataQuota,
AccessSection::UserRateLimits,
AccessSection::UserMaxUniqueIps,
];
let revision =
@@ -485,6 +535,18 @@ pub(super) async fn users_from_config(
.get(&username)
.map(chrono::DateTime::<chrono::Utc>::to_rfc3339),
data_quota_bytes: cfg.access.user_data_quota.get(&username).copied(),
rate_limit_up_bps: cfg
.access
.user_rate_limits
.get(&username)
.map(|limit| limit.up_bps)
.filter(|limit| *limit > 0),
rate_limit_down_bps: cfg
.access
.user_rate_limits
.get(&username)
.map(|limit| limit.down_bps)
.filter(|limit| *limit > 0),
max_unique_ips: cfg
.access
.user_max_unique_ips
@@ -758,6 +820,34 @@ mod tests {
assert_eq!(alice.max_tcp_conns, None);
}
#[tokio::test]
async fn users_from_config_reports_user_rate_limits() {
let mut cfg = ProxyConfig::default();
cfg.access.users.insert(
"alice".to_string(),
"0123456789abcdef0123456789abcdef".to_string(),
);
cfg.access.user_rate_limits.insert(
"alice".to_string(),
RateLimitBps {
up_bps: 1024,
down_bps: 0,
},
);
let stats = Stats::new();
let tracker = UserIpTracker::new();
let users = users_from_config(&cfg, &stats, &tracker, None, None, None).await;
let alice = users
.iter()
.find(|entry| entry.username == "alice")
.expect("alice must be present");
assert_eq!(alice.rate_limit_up_bps, Some(1024));
assert_eq!(alice.rate_limit_down_bps, None);
}
#[tokio::test]
async fn users_from_config_marks_runtime_membership_when_snapshot_is_provided() {
let mut disk_cfg = ProxyConfig::default();