refactoring

This commit is contained in:
Flowseal
2026-04-09 23:43:06 +03:00
parent 4041fd9f05
commit c0d9b5f8e1
2 changed files with 33 additions and 53 deletions

View File

@@ -25,6 +25,16 @@ DC_DEFAULT_IPS: Dict[int, str] = {
}
class CryptoCtx:
__slots__ = ('clt_dec', 'clt_enc', 'tg_enc', 'tg_dec')
def __init__(self, clt_dec, clt_enc, tg_enc, tg_dec):
self.clt_dec = clt_dec # decrypt from client
self.clt_enc = clt_enc # encrypt to client
self.tg_enc = tg_enc # encrypt to telegram
self.tg_dec = tg_dec # decrypt from telegram
class MsgSplitter:
"""
Splits TCP stream data into individual MTProto transport packets
@@ -117,9 +127,7 @@ class MsgSplitter:
async def do_fallback(reader, writer, relay_init, label,
dc, is_media, media_tag,
clt_decryptor, clt_encryptor,
tg_encryptor, tg_decryptor,
splitter=None):
ctx: CryptoCtx, splitter=None):
fallback_dst = DC_DEFAULT_IPS.get(dc)
use_cf = proxy_config.fallback_cfproxy
cf_first = proxy_config.fallback_cfproxy_priority
@@ -134,11 +142,7 @@ async def do_fallback(reader, writer, relay_init, label,
ok = await _cfproxy_fallback(
reader, writer, relay_init, label,
dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor,
tg_encryptor=tg_encryptor,
tg_decryptor=tg_decryptor,
splitter=splitter)
ctx=ctx, splitter=splitter)
if ok:
return True
elif method == 'tcp' and fallback_dst:
@@ -146,11 +150,7 @@ async def do_fallback(reader, writer, relay_init, label,
label, dc, media_tag, fallback_dst)
ok = await _tcp_fallback(
reader, writer, fallback_dst, 443,
relay_init, label, dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor,
tg_encryptor=tg_encryptor,
tg_decryptor=tg_decryptor)
relay_init, label, dc=dc, is_media=is_media, ctx=ctx)
if ok:
return True
return False
@@ -158,9 +158,7 @@ async def do_fallback(reader, writer, relay_init, label,
async def _cfproxy_fallback(reader, writer, relay_init, label,
dc=None, is_media=False,
clt_decryptor=None, clt_encryptor=None,
tg_encryptor=None, tg_decryptor=None,
splitter=None):
ctx: CryptoCtx = None, splitter=None):
media_tag = ' media' if is_media else ''
active = proxy_config.active_cfproxy_domain
@@ -192,18 +190,12 @@ async def _cfproxy_fallback(reader, writer, relay_init, label,
await ws.send(relay_init)
await bridge_ws_reencrypt(reader, writer, ws, label,
dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor,
tg_encryptor=tg_encryptor,
tg_decryptor=tg_decryptor,
splitter=splitter)
ctx=ctx, splitter=splitter)
return True
async def _tcp_fallback(reader, writer, dst, port, relay_init, label,
dc=None, is_media=False,
clt_decryptor=None, clt_encryptor=None,
tg_encryptor=None, tg_decryptor=None):
dc=None, is_media=False, ctx: CryptoCtx = None):
try:
rr, rw = await asyncio.wait_for(
asyncio.open_connection(dst, port), timeout=10)
@@ -216,18 +208,13 @@ async def _tcp_fallback(reader, writer, dst, port, relay_init, label,
rw.write(relay_init)
await rw.drain()
await _bridge_tcp_reencrypt(reader, writer, rr, rw, label,
dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor,
tg_encryptor=tg_encryptor,
tg_decryptor=tg_decryptor)
dc=dc, is_media=is_media, ctx=ctx)
return True
async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label,
dc=None, is_media=False,
clt_decryptor=None, clt_encryptor=None,
tg_encryptor=None, tg_decryptor=None,
ctx: CryptoCtx = None,
splitter: MsgSplitter = None):
"""
Bidirectional TCP(client) <-> WS(telegram) with re-encryption.
@@ -257,8 +244,8 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label,
stats.bytes_up += n
up_bytes += n
up_packets += 1
plain = clt_decryptor.update(chunk)
chunk = tg_encryptor.update(plain)
plain = ctx.clt_dec.update(chunk)
chunk = ctx.tg_enc.update(plain)
if splitter:
parts = splitter.split(chunk)
if not parts:
@@ -285,8 +272,8 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label,
stats.bytes_down += n
down_bytes += n
down_packets += 1
plain = tg_decryptor.update(data)
data = clt_encryptor.update(plain)
plain = ctx.tg_dec.update(data)
data = ctx.clt_enc.update(plain)
writer.write(data)
await writer.drain()
except (asyncio.CancelledError, ConnectionError, OSError):
@@ -326,8 +313,7 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label,
async def _bridge_tcp_reencrypt(reader, writer, remote_reader, remote_writer,
label, dc=None, is_media=False,
clt_decryptor=None, clt_encryptor=None,
tg_encryptor=None, tg_decryptor=None):
ctx: CryptoCtx = None):
"""Bidirectional TCP <-> TCP with re-encryption."""
async def forward(src, dst_w, is_up):
@@ -339,12 +325,12 @@ async def _bridge_tcp_reencrypt(reader, writer, remote_reader, remote_writer,
n = len(data)
if is_up:
stats.bytes_up += n
plain = clt_decryptor.update(data)
data = tg_encryptor.update(plain)
plain = ctx.clt_dec.update(data)
data = ctx.tg_enc.update(plain)
else:
stats.bytes_down += n
plain = tg_decryptor.update(data)
data = clt_encryptor.update(plain)
plain = ctx.tg_dec.update(data)
data = ctx.clt_enc.update(plain)
dst_w.write(data)
await dst_w.drain()
except asyncio.CancelledError:

View File

@@ -26,7 +26,7 @@ if __name__ == '__main__' and (__package__ is None or __package__ == ''):
from .utils import *
from .stats import stats
from .config import proxy_config, parse_dc_ip_list, start_cfproxy_domain_refresh, CFPROXY_DEFAULT_DOMAINS
from .bridge import MsgSplitter, do_fallback, bridge_ws_reencrypt
from .bridge import MsgSplitter, CryptoCtx, do_fallback, bridge_ws_reencrypt
from .raw_websocket import RawWebSocket, WsHandshakeError, set_sock_opts
@@ -291,6 +291,8 @@ async def _handle_client(reader, writer, secret: bytes):
tg_encryptor.update(ZERO_64)
ctx = CryptoCtx(clt_decryptor, clt_encryptor, tg_encryptor, tg_decryptor)
dc_key = f'{dc}{"m" if is_media else ""}'
media_tag = " media" if is_media else ""
@@ -310,9 +312,7 @@ async def _handle_client(reader, writer, secret: bytes):
ok = await do_fallback(
reader, writer, relay_init, label,
dc, is_media, media_tag,
clt_decryptor, clt_encryptor,
tg_encryptor, tg_decryptor,
splitter=splitter)
ctx, splitter=splitter)
if not ok:
log.warning("[%s] DC%d%s no fallback available",
label, dc, media_tag)
@@ -382,9 +382,7 @@ async def _handle_client(reader, writer, secret: bytes):
ok = await do_fallback(
reader, writer, relay_init, label,
dc, is_media, media_tag,
clt_decryptor, clt_encryptor,
tg_encryptor, tg_decryptor,
splitter=splitter_fb)
ctx, splitter=splitter_fb)
if ok:
log.info("[%s] DC%d%s fallback closed",
label, dc, media_tag)
@@ -405,11 +403,7 @@ async def _handle_client(reader, writer, secret: bytes):
await bridge_ws_reencrypt(reader, writer, ws, label,
dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor,
tg_encryptor=tg_encryptor,
tg_decryptor=tg_decryptor,
splitter=splitter)
ctx=ctx, splitter=splitter)
except asyncio.TimeoutError:
log.warning("[%s] timeout during handshake", label)