From c0d9b5f8e19dd511a341485ce0a71777ee211dfc Mon Sep 17 00:00:00 2001 From: Flowseal Date: Thu, 9 Apr 2026 23:43:06 +0300 Subject: [PATCH] refactoring --- proxy/bridge.py | 68 ++++++++++++++++++-------------------------- proxy/tg_ws_proxy.py | 18 ++++-------- 2 files changed, 33 insertions(+), 53 deletions(-) diff --git a/proxy/bridge.py b/proxy/bridge.py index b2d82fe..c2a64ca 100644 --- a/proxy/bridge.py +++ b/proxy/bridge.py @@ -25,6 +25,16 @@ DC_DEFAULT_IPS: Dict[int, str] = { } +class CryptoCtx: + __slots__ = ('clt_dec', 'clt_enc', 'tg_enc', 'tg_dec') + + def __init__(self, clt_dec, clt_enc, tg_enc, tg_dec): + self.clt_dec = clt_dec # decrypt from client + self.clt_enc = clt_enc # encrypt to client + self.tg_enc = tg_enc # encrypt to telegram + self.tg_dec = tg_dec # decrypt from telegram + + class MsgSplitter: """ Splits TCP stream data into individual MTProto transport packets @@ -117,9 +127,7 @@ class MsgSplitter: async def do_fallback(reader, writer, relay_init, label, dc, is_media, media_tag, - clt_decryptor, clt_encryptor, - tg_encryptor, tg_decryptor, - splitter=None): + ctx: CryptoCtx, splitter=None): fallback_dst = DC_DEFAULT_IPS.get(dc) use_cf = proxy_config.fallback_cfproxy cf_first = proxy_config.fallback_cfproxy_priority @@ -134,11 +142,7 @@ async def do_fallback(reader, writer, relay_init, label, ok = await _cfproxy_fallback( reader, writer, relay_init, label, dc=dc, is_media=is_media, - clt_decryptor=clt_decryptor, - clt_encryptor=clt_encryptor, - tg_encryptor=tg_encryptor, - tg_decryptor=tg_decryptor, - splitter=splitter) + ctx=ctx, splitter=splitter) if ok: return True elif method == 'tcp' and fallback_dst: @@ -146,11 +150,7 @@ async def do_fallback(reader, writer, relay_init, label, label, dc, media_tag, fallback_dst) ok = await _tcp_fallback( reader, writer, fallback_dst, 443, - relay_init, label, dc=dc, is_media=is_media, - clt_decryptor=clt_decryptor, - clt_encryptor=clt_encryptor, - tg_encryptor=tg_encryptor, - tg_decryptor=tg_decryptor) + relay_init, label, dc=dc, is_media=is_media, ctx=ctx) if ok: return True return False @@ -158,9 +158,7 @@ async def do_fallback(reader, writer, relay_init, label, async def _cfproxy_fallback(reader, writer, relay_init, label, dc=None, is_media=False, - clt_decryptor=None, clt_encryptor=None, - tg_encryptor=None, tg_decryptor=None, - splitter=None): + ctx: CryptoCtx = None, splitter=None): media_tag = ' media' if is_media else '' active = proxy_config.active_cfproxy_domain @@ -192,18 +190,12 @@ async def _cfproxy_fallback(reader, writer, relay_init, label, await ws.send(relay_init) await bridge_ws_reencrypt(reader, writer, ws, label, dc=dc, is_media=is_media, - clt_decryptor=clt_decryptor, - clt_encryptor=clt_encryptor, - tg_encryptor=tg_encryptor, - tg_decryptor=tg_decryptor, - splitter=splitter) + ctx=ctx, splitter=splitter) return True async def _tcp_fallback(reader, writer, dst, port, relay_init, label, - dc=None, is_media=False, - clt_decryptor=None, clt_encryptor=None, - tg_encryptor=None, tg_decryptor=None): + dc=None, is_media=False, ctx: CryptoCtx = None): try: rr, rw = await asyncio.wait_for( asyncio.open_connection(dst, port), timeout=10) @@ -216,18 +208,13 @@ async def _tcp_fallback(reader, writer, dst, port, relay_init, label, rw.write(relay_init) await rw.drain() await _bridge_tcp_reencrypt(reader, writer, rr, rw, label, - dc=dc, is_media=is_media, - clt_decryptor=clt_decryptor, - clt_encryptor=clt_encryptor, - tg_encryptor=tg_encryptor, - tg_decryptor=tg_decryptor) + dc=dc, is_media=is_media, ctx=ctx) return True async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label, dc=None, is_media=False, - clt_decryptor=None, clt_encryptor=None, - tg_encryptor=None, tg_decryptor=None, + ctx: CryptoCtx = None, splitter: MsgSplitter = None): """ Bidirectional TCP(client) <-> WS(telegram) with re-encryption. @@ -257,8 +244,8 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label, stats.bytes_up += n up_bytes += n up_packets += 1 - plain = clt_decryptor.update(chunk) - chunk = tg_encryptor.update(plain) + plain = ctx.clt_dec.update(chunk) + chunk = ctx.tg_enc.update(plain) if splitter: parts = splitter.split(chunk) if not parts: @@ -285,8 +272,8 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label, stats.bytes_down += n down_bytes += n down_packets += 1 - plain = tg_decryptor.update(data) - data = clt_encryptor.update(plain) + plain = ctx.tg_dec.update(data) + data = ctx.clt_enc.update(plain) writer.write(data) await writer.drain() except (asyncio.CancelledError, ConnectionError, OSError): @@ -326,8 +313,7 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label, async def _bridge_tcp_reencrypt(reader, writer, remote_reader, remote_writer, label, dc=None, is_media=False, - clt_decryptor=None, clt_encryptor=None, - tg_encryptor=None, tg_decryptor=None): + ctx: CryptoCtx = None): """Bidirectional TCP <-> TCP with re-encryption.""" async def forward(src, dst_w, is_up): @@ -339,12 +325,12 @@ async def _bridge_tcp_reencrypt(reader, writer, remote_reader, remote_writer, n = len(data) if is_up: stats.bytes_up += n - plain = clt_decryptor.update(data) - data = tg_encryptor.update(plain) + plain = ctx.clt_dec.update(data) + data = ctx.tg_enc.update(plain) else: stats.bytes_down += n - plain = tg_decryptor.update(data) - data = clt_encryptor.update(plain) + plain = ctx.tg_dec.update(data) + data = ctx.clt_enc.update(plain) dst_w.write(data) await dst_w.drain() except asyncio.CancelledError: diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index 5a92764..548f265 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -26,7 +26,7 @@ if __name__ == '__main__' and (__package__ is None or __package__ == ''): from .utils import * from .stats import stats from .config import proxy_config, parse_dc_ip_list, start_cfproxy_domain_refresh, CFPROXY_DEFAULT_DOMAINS -from .bridge import MsgSplitter, do_fallback, bridge_ws_reencrypt +from .bridge import MsgSplitter, CryptoCtx, do_fallback, bridge_ws_reencrypt from .raw_websocket import RawWebSocket, WsHandshakeError, set_sock_opts @@ -291,6 +291,8 @@ async def _handle_client(reader, writer, secret: bytes): tg_encryptor.update(ZERO_64) + ctx = CryptoCtx(clt_decryptor, clt_encryptor, tg_encryptor, tg_decryptor) + dc_key = f'{dc}{"m" if is_media else ""}' media_tag = " media" if is_media else "" @@ -310,9 +312,7 @@ async def _handle_client(reader, writer, secret: bytes): ok = await do_fallback( reader, writer, relay_init, label, dc, is_media, media_tag, - clt_decryptor, clt_encryptor, - tg_encryptor, tg_decryptor, - splitter=splitter) + ctx, splitter=splitter) if not ok: log.warning("[%s] DC%d%s no fallback available", label, dc, media_tag) @@ -382,9 +382,7 @@ async def _handle_client(reader, writer, secret: bytes): ok = await do_fallback( reader, writer, relay_init, label, dc, is_media, media_tag, - clt_decryptor, clt_encryptor, - tg_encryptor, tg_decryptor, - splitter=splitter_fb) + ctx, splitter=splitter_fb) if ok: log.info("[%s] DC%d%s fallback closed", label, dc, media_tag) @@ -405,11 +403,7 @@ async def _handle_client(reader, writer, secret: bytes): await bridge_ws_reencrypt(reader, writer, ws, label, dc=dc, is_media=is_media, - clt_decryptor=clt_decryptor, - clt_encryptor=clt_encryptor, - tg_encryptor=tg_encryptor, - tg_decryptor=tg_decryptor, - splitter=splitter) + ctx=ctx, splitter=splitter) except asyncio.TimeoutError: log.warning("[%s] timeout during handshake", label)