refactoring

This commit is contained in:
Flowseal
2026-04-09 19:54:38 +03:00
parent 44e754ded0
commit 535d4126ed
9 changed files with 767 additions and 726 deletions

View File

@@ -174,7 +174,7 @@ CLI команды объявляются в `pyproject.toml` в секции `[
```toml ```toml
[project.scripts] [project.scripts]
tg-ws-proxy = "proxy.tg_ws_proxy:main" tg-ws-proxy = "proxy:main"
tg-ws-proxy-tray-win = "windows:main" tg-ws-proxy-tray-win = "windows:main"
tg-ws-proxy-tray-macos = "macos:main" tg-ws-proxy-tray-macos = "macos:main"
tg-ws-proxy-tray-linux = "linux:main" tg-ws-proxy-tray-linux = "linux:main"

View File

@@ -1 +1,6 @@
__version__ = "1.5.1" from .config import parse_dc_ip_list, proxy_config
from .utils import get_link_host
__version__ = "1.5.1"
__all__ = ["__version__", "get_link_host", "proxy_config", "parse_dc_ip_list"]

363
proxy/bridge.py Normal file
View File

@@ -0,0 +1,363 @@
import asyncio
import logging
import struct
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from typing import Dict, List, Optional
from .utils import *
from .stats import stats
from .config import proxy_config
from .raw_websocket import RawWebSocket
log = logging.getLogger('tg-mtproto-proxy')
_st_I_le = struct.Struct('<I')
ZERO_64 = b'\x00' * 64
DC_DEFAULT_IPS: Dict[int, str] = {
1: '149.154.175.50',
2: '149.154.167.51',
3: '149.154.175.100',
4: '149.154.167.91',
5: '149.154.171.5',
203: '91.105.192.100'
}
class MsgSplitter:
"""
Splits TCP stream data into individual MTProto transport packets
so each can be sent as a separate WS frame.
"""
__slots__ = ('_dec', '_proto', '_cipher_buf', '_plain_buf', '_disabled')
def __init__(self, relay_init: bytes, proto_int: int):
cipher = Cipher(algorithms.AES(relay_init[8:40]),
modes.CTR(relay_init[40:56]))
self._dec = cipher.encryptor()
self._dec.update(ZERO_64)
self._proto = proto_int
self._cipher_buf = bytearray()
self._plain_buf = bytearray()
self._disabled = False
def split(self, chunk: bytes) -> List[bytes]:
if not chunk:
return []
if self._disabled:
return [chunk]
self._cipher_buf.extend(chunk)
self._plain_buf.extend(self._dec.update(chunk))
parts = []
while self._cipher_buf:
packet_len = self._next_packet_len()
if packet_len is None:
break
if packet_len <= 0:
parts.append(bytes(self._cipher_buf))
self._cipher_buf.clear()
self._plain_buf.clear()
self._disabled = True
break
parts.append(bytes(self._cipher_buf[:packet_len]))
del self._cipher_buf[:packet_len]
del self._plain_buf[:packet_len]
return parts
def flush(self) -> List[bytes]:
if not self._cipher_buf:
return []
tail = bytes(self._cipher_buf)
self._cipher_buf.clear()
self._plain_buf.clear()
return [tail]
def _next_packet_len(self) -> Optional[int]:
if not self._plain_buf:
return None
if self._proto == PROTO_ABRIDGED_INT:
return self._next_abridged_len()
if self._proto in (PROTO_INTERMEDIATE_INT,
PROTO_PADDED_INTERMEDIATE_INT):
return self._next_intermediate_len()
return 0
def _next_abridged_len(self) -> Optional[int]:
first = self._plain_buf[0]
if first in (0x7F, 0xFF):
if len(self._plain_buf) < 4:
return None
payload_len = int.from_bytes(self._plain_buf[1:4], 'little') * 4
header_len = 4
else:
payload_len = (first & 0x7F) * 4
header_len = 1
if payload_len <= 0:
return 0
packet_len = header_len + payload_len
if len(self._plain_buf) < packet_len:
return None
return packet_len
def _next_intermediate_len(self) -> Optional[int]:
if len(self._plain_buf) < 4:
return None
payload_len = _st_I_le.unpack_from(self._plain_buf, 0)[0] & 0x7FFFFFFF
if payload_len <= 0:
return 0
packet_len = 4 + payload_len
if len(self._plain_buf) < packet_len:
return None
return packet_len
async def do_fallback(reader, writer, relay_init, label,
dc, is_media, media_tag,
clt_decryptor, clt_encryptor,
tg_encryptor, tg_decryptor,
splitter=None):
fallback_dst = DC_DEFAULT_IPS.get(dc)
use_cf = proxy_config.fallback_cfproxy
cf_first = proxy_config.fallback_cfproxy_priority
methods: List[str] = ['tcp']
if use_cf:
methods.insert(0 if cf_first else 1, 'cf')
for method in methods:
if method == 'cf':
ok = await _cfproxy_fallback(
reader, writer, relay_init, label,
dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor,
tg_encryptor=tg_encryptor,
tg_decryptor=tg_decryptor,
splitter=splitter)
if ok:
return True
elif method == 'tcp' and fallback_dst:
log.info("[%s] DC%d%s -> TCP fallback to %s:443",
label, dc, media_tag, fallback_dst)
ok = await _tcp_fallback(
reader, writer, fallback_dst, 443,
relay_init, label, dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor,
tg_encryptor=tg_encryptor,
tg_decryptor=tg_decryptor)
if ok:
return True
return False
async def _cfproxy_fallback(reader, writer, relay_init, label,
dc=None, is_media=False,
clt_decryptor=None, clt_encryptor=None,
tg_encryptor=None, tg_decryptor=None,
splitter=None):
domain = f'kws{dc}.{proxy_config.fallback_cfproxy_domain}'
media_tag = ' media' if is_media else ''
ws = None
log.info("[%s] DC%d%s -> CF proxy wss://%s/apiws",
label, dc, media_tag, domain)
try:
ws = await RawWebSocket.connect(domain, domain,
timeout=10.0)
except Exception as exc:
log.warning("[%s] DC%d%s CF proxy %s failed: %s",
label, dc, media_tag, domain, exc)
if ws is None:
return False
stats.connections_cfproxy += 1
await ws.send(relay_init)
await bridge_ws_reencrypt(reader, writer, ws, label,
dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor,
tg_encryptor=tg_encryptor,
tg_decryptor=tg_decryptor,
splitter=splitter)
return True
async def _tcp_fallback(reader, writer, dst, port, relay_init, label,
dc=None, is_media=False,
clt_decryptor=None, clt_encryptor=None,
tg_encryptor=None, tg_decryptor=None):
try:
rr, rw = await asyncio.wait_for(
asyncio.open_connection(dst, port), timeout=10)
except Exception as exc:
log.warning("[%s] TCP fallback to %s:%d failed: %s",
label, dst, port, exc)
return False
stats.connections_tcp_fallback += 1
rw.write(relay_init)
await rw.drain()
await _bridge_tcp_reencrypt(reader, writer, rr, rw, label,
dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor,
tg_encryptor=tg_encryptor,
tg_decryptor=tg_decryptor)
return True
async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label,
dc=None, is_media=False,
clt_decryptor=None, clt_encryptor=None,
tg_encryptor=None, tg_decryptor=None,
splitter: MsgSplitter = None):
"""
Bidirectional TCP(client) <-> WS(telegram) with re-encryption.
client ciphertext → decrypt(clt_key) → encrypt(tg_key) → WS
WS data → decrypt(tg_key) → encrypt(clt_key) → client TCP
"""
dc_tag = f"DC{dc}{'m' if is_media else ''}" if dc else "DC?"
up_bytes = 0
down_bytes = 0
up_packets = 0
down_packets = 0
start_time = asyncio.get_running_loop().time()
async def tcp_to_ws():
nonlocal up_bytes, up_packets
try:
while True:
chunk = await reader.read(65536)
if not chunk:
if splitter:
tail = splitter.flush()
if tail:
await ws.send(tail[0])
break
n = len(chunk)
stats.bytes_up += n
up_bytes += n
up_packets += 1
plain = clt_decryptor.update(chunk)
chunk = tg_encryptor.update(plain)
if splitter:
parts = splitter.split(chunk)
if not parts:
continue
if len(parts) > 1:
await ws.send_batch(parts)
else:
await ws.send(parts[0])
else:
await ws.send(chunk)
except (asyncio.CancelledError, ConnectionError, OSError):
return
except Exception as e:
log.debug("[%s] tcp->ws ended: %s", label, e)
async def ws_to_tcp():
nonlocal down_bytes, down_packets
try:
while True:
data = await ws.recv()
if data is None:
break
n = len(data)
stats.bytes_down += n
down_bytes += n
down_packets += 1
plain = tg_decryptor.update(data)
data = clt_encryptor.update(plain)
writer.write(data)
await writer.drain()
except (asyncio.CancelledError, ConnectionError, OSError):
return
except Exception as e:
log.debug("[%s] ws->tcp ended: %s", label, e)
tasks = [asyncio.create_task(tcp_to_ws()),
asyncio.create_task(ws_to_tcp())]
try:
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
finally:
for t in tasks:
t.cancel()
for t in tasks:
try:
await t
except BaseException:
pass
elapsed = asyncio.get_running_loop().time() - start_time
log.info("[%s] %s WS session closed: "
"^%s (%d pkts) v%s (%d pkts) in %.1fs",
label, dc_tag,
human_bytes(up_bytes), up_packets,
human_bytes(down_bytes), down_packets,
elapsed)
try:
await ws.close()
except BaseException:
pass
try:
writer.close()
await writer.wait_closed()
except BaseException:
pass
async def _bridge_tcp_reencrypt(reader, writer, remote_reader, remote_writer,
label, dc=None, is_media=False,
clt_decryptor=None, clt_encryptor=None,
tg_encryptor=None, tg_decryptor=None):
"""Bidirectional TCP <-> TCP with re-encryption."""
async def forward(src, dst_w, is_up):
try:
while True:
data = await src.read(65536)
if not data:
break
n = len(data)
if is_up:
stats.bytes_up += n
plain = clt_decryptor.update(data)
data = tg_encryptor.update(plain)
else:
stats.bytes_down += n
plain = tg_decryptor.update(data)
data = clt_encryptor.update(plain)
dst_w.write(data)
await dst_w.drain()
except asyncio.CancelledError:
pass
except Exception as e:
log.debug("[%s] forward ended: %s", label, e)
tasks = [
asyncio.create_task(forward(reader, remote_writer, True)),
asyncio.create_task(forward(remote_reader, writer, False)),
]
try:
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
finally:
for t in tasks:
t.cancel()
for t in tasks:
try:
await t
except BaseException:
pass
for w in (writer, remote_writer):
try:
w.close()
await w.wait_closed()
except BaseException:
pass

37
proxy/config.py Normal file
View File

@@ -0,0 +1,37 @@
import os
import socket as _socket
from typing import Dict, List
from dataclasses import dataclass, field
@dataclass
class ProxyConfig:
port: int = 1443
host: str = '127.0.0.1'
secret: str = field(default_factory=lambda: os.urandom(16).hex())
dc_redirects: Dict[int, str] = field(default_factory=lambda: {2: '149.154.167.220', 4: '149.154.167.220'})
buffer_size: int = 256 * 1024
pool_size: int = 4
fallback_cfproxy: bool = True
fallback_cfproxy_priority: bool = True
fallback_cfproxy_domain: str = 'pclead.co.uk'
proxy_config = ProxyConfig()
def parse_dc_ip_list(dc_ip_list: List[str]) -> Dict[int, str]:
dc_redirects: Dict[int, str] = {}
for entry in dc_ip_list:
if ':' not in entry:
raise ValueError(
f"Invalid --dc-ip format {entry!r}, expected DC:IP")
dc_s, ip_s = entry.split(':', 1)
try:
dc_n = int(dc_s)
_socket.inet_aton(ip_s)
except (ValueError, OSError):
raise ValueError(f"Invalid --dc-ip {entry!r}")
dc_redirects[dc_n] = ip_s
return dc_redirects

239
proxy/raw_websocket.py Normal file
View File

@@ -0,0 +1,239 @@
import os
import ssl
import base64
import struct
import asyncio
import socket as _socket
from typing import List, Optional, Tuple
from .config import proxy_config
_st_BB = struct.Struct('>BB')
_st_BBH = struct.Struct('>BBH')
_st_BBQ = struct.Struct('>BBQ')
_st_BB4s = struct.Struct('>BB4s')
_st_BBH4s = struct.Struct('>BBH4s')
_st_BBQ4s = struct.Struct('>BBQ4s')
_st_H = struct.Struct('>H')
_st_Q = struct.Struct('>Q')
_ssl_ctx = ssl.create_default_context()
_ssl_ctx.check_hostname = False
_ssl_ctx.verify_mode = ssl.CERT_NONE
class WsHandshakeError(Exception):
def __init__(self, status_code: int, status_line: str,
headers: dict = None, location: str = None):
self.status_code = status_code
self.status_line = status_line
self.headers = headers or {}
self.location = location
super().__init__(f"HTTP {status_code}: {status_line}")
@property
def is_redirect(self) -> bool:
return self.status_code in (301, 302, 303, 307, 308)
def _xor_mask(data: bytes, mask: bytes) -> bytes:
if not data:
return data
n = len(data)
mask_rep = (mask * (n // 4 + 1))[:n]
return (int.from_bytes(data, 'big') ^
int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big')
def set_sock_opts(transport, buffer_size):
sock = transport.get_extra_info('socket')
if sock is None:
return
try:
sock.setsockopt(_socket.IPPROTO_TCP, _socket.TCP_NODELAY, 1)
except (OSError, AttributeError):
pass
try:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_RCVBUF, buffer_size)
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_SNDBUF, buffer_size)
except OSError:
pass
class RawWebSocket:
__slots__ = ('reader', 'writer', '_closed')
OP_BINARY = 0x2
OP_CLOSE = 0x8
OP_PING = 0x9
OP_PONG = 0xA
def __init__(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
self.reader = reader
self.writer = writer
self._closed = False
@staticmethod
async def connect(host: str, domain: str, timeout: float = 10.0) -> 'RawWebSocket':
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, 443, ssl=_ssl_ctx,
server_hostname=domain),
timeout=min(timeout, 10))
set_sock_opts(writer.transport, proxy_config.buffer_size)
ws_key = base64.b64encode(os.urandom(16)).decode()
req = (
f'GET /apiws HTTP/1.1\r\n'
f'Host: {domain}\r\n'
f'Upgrade: websocket\r\n'
f'Connection: Upgrade\r\n'
f'Sec-WebSocket-Key: {ws_key}\r\n'
f'Sec-WebSocket-Version: 13\r\n'
f'Sec-WebSocket-Protocol: binary\r\n'
f'Origin: https://web.telegram.org\r\n'
f'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
f'AppleWebKit/537.36 (KHTML, like Gecko) '
f'Chrome/131.0.0.0 Safari/537.36\r\n'
f'\r\n'
)
writer.write(req.encode())
await writer.drain()
response_lines: list[str] = []
try:
while True:
line = await asyncio.wait_for(reader.readline(),
timeout=timeout)
if line in (b'\r\n', b'\n', b''):
break
response_lines.append(
line.decode('utf-8', errors='replace').strip())
except asyncio.TimeoutError:
writer.close()
raise
if not response_lines:
writer.close()
raise WsHandshakeError(0, 'empty response')
first_line = response_lines[0]
parts = first_line.split(' ', 2)
try:
status_code = int(parts[1]) if len(parts) >= 2 else 0
except ValueError:
status_code = 0
if status_code == 101:
return RawWebSocket(reader, writer)
headers: dict[str, str] = {}
for hl in response_lines[1:]:
if ':' in hl:
k, v = hl.split(':', 1)
headers[k.strip().lower()] = v.strip()
writer.close()
raise WsHandshakeError(status_code, first_line, headers,
location=headers.get('location'))
async def send(self, data: bytes):
if self._closed:
raise ConnectionError("WebSocket closed")
frame = self._build_frame(self.OP_BINARY, data, mask=True)
self.writer.write(frame)
await self.writer.drain()
async def send_batch(self, parts: List[bytes]):
if self._closed:
raise ConnectionError("WebSocket closed")
for part in parts:
self.writer.write(
self._build_frame(self.OP_BINARY, part, mask=True))
await self.writer.drain()
async def recv(self) -> Optional[bytes]:
while not self._closed:
opcode, payload = await self._read_frame()
if opcode == self.OP_CLOSE:
self._closed = True
try:
self.writer.write(self._build_frame(
self.OP_CLOSE,
payload[:2] if payload else b'', mask=True))
await self.writer.drain()
except Exception:
pass
return None
if opcode == self.OP_PING:
try:
self.writer.write(
self._build_frame(self.OP_PONG, payload, mask=True))
await self.writer.drain()
except Exception:
pass
continue
if opcode == self.OP_PONG:
continue
if opcode in (0x1, 0x2):
return payload
continue
return None
async def close(self):
if self._closed:
return
self._closed = True
try:
self.writer.write(
self._build_frame(self.OP_CLOSE, b'', mask=True))
await self.writer.drain()
except Exception:
pass
try:
self.writer.close()
await self.writer.wait_closed()
except Exception:
pass
@staticmethod
def _build_frame(opcode: int, data: bytes,
mask: bool = False) -> bytes:
length = len(data)
fb = 0x80 | opcode
if not mask:
if length < 126:
return _st_BB.pack(fb, length) + data
if length < 65536:
return _st_BBH.pack(fb, 126, length) + data
return _st_BBQ.pack(fb, 127, length) + data
mask_key = os.urandom(4)
masked = _xor_mask(data, mask_key)
if length < 126:
return _st_BB4s.pack(fb, 0x80 | length, mask_key) + masked
if length < 65536:
return _st_BBH4s.pack(fb, 0x80 | 126, length, mask_key) + masked
return _st_BBQ4s.pack(fb, 0x80 | 127, length, mask_key) + masked
async def _read_frame(self) -> Tuple[int, bytes]:
hdr = await self.reader.readexactly(2)
opcode = hdr[0] & 0x0F
length = hdr[1] & 0x7F
if length == 126:
length = _st_H.unpack(await self.reader.readexactly(2))[0]
elif length == 127:
length = _st_Q.unpack(await self.reader.readexactly(8))[0]
if hdr[1] & 0x80:
mask_key = await self.reader.readexactly(4)
payload = await self.reader.readexactly(length)
return opcode, _xor_mask(payload, mask_key)
payload = await self.reader.readexactly(length)
return opcode, payload

33
proxy/stats.py Normal file
View File

@@ -0,0 +1,33 @@
from .utils import human_bytes
class _Stats:
def __init__(self):
self.connections_total = 0
self.connections_active = 0
self.connections_ws = 0
self.connections_tcp_fallback = 0
self.connections_cfproxy = 0
self.connections_bad = 0
self.ws_errors = 0
self.bytes_up = 0
self.bytes_down = 0
self.pool_hits = 0
self.pool_misses = 0
def summary(self) -> str:
pool_total = self.pool_hits + self.pool_misses
pool_s = (f"{self.pool_hits}/{pool_total}"
if pool_total else "n/a")
return (f"total={self.connections_total} "
f"active={self.connections_active} "
f"ws={self.connections_ws} "
f"tcp_fb={self.connections_tcp_fallback} "
f"cf={self.connections_cfproxy} "
f"bad={self.connections_bad} "
f"err={self.ws_errors} "
f"pool={pool_s} "
f"up={human_bytes(self.bytes_up)} "
f"down={human_bytes(self.bytes_down)}")
stats = _Stats()

View File

@@ -1,10 +1,8 @@
from __future__ import annotations from __future__ import annotations
import os import os
import ssl
import sys import sys
import time import time
import base64
import struct import struct
import asyncio import asyncio
import hashlib import hashlib
@@ -14,317 +12,30 @@ import logging.handlers
import socket as _socket import socket as _socket
from collections import deque from collections import deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple from typing import Dict, List, Optional, Set, Tuple
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
if __name__ == '__main__' and (__package__ is None or __package__ == ''):
_repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _repo_root not in sys.path:
sys.path.insert(0, _repo_root)
__package__ = 'proxy'
@dataclass from .utils import *
class ProxyConfig: from .stats import stats
port: int = 1443 from .config import proxy_config, parse_dc_ip_list
host: str = '127.0.0.1' from .bridge import MsgSplitter, do_fallback, bridge_ws_reencrypt
secret: str = field(default_factory=lambda: os.urandom(16).hex()) from .raw_websocket import RawWebSocket, WsHandshakeError, set_sock_opts
dc_redirects: Dict[int, str] = field(default_factory=lambda: {2: '149.154.167.220', 4: '149.154.167.220'})
buffer_size: int = 256 * 1024
pool_size: int = 4
fallback_cfproxy: bool = True
fallback_cfproxy_priority: bool = True
fallback_cfproxy_domain: str = 'pclead.co.uk'
proxy_config = ProxyConfig()
log = logging.getLogger('tg-mtproto-proxy') log = logging.getLogger('tg-mtproto-proxy')
DC_DEFAULT_IPS: Dict[int, str] = {
1: '149.154.175.50',
2: '149.154.167.51',
3: '149.154.175.100',
4: '149.154.167.91',
5: '149.154.171.5',
203: '91.105.192.100'
}
HANDSHAKE_LEN = 64
SKIP_LEN = 8
PREKEY_LEN = 32
KEY_LEN = 32
IV_LEN = 16
PROTO_TAG_POS = 56
DC_IDX_POS = 60
PROTO_TAG_ABRIDGED = b'\xef\xef\xef\xef'
PROTO_TAG_INTERMEDIATE = b'\xee\xee\xee\xee'
PROTO_TAG_SECURE = b'\xdd\xdd\xdd\xdd'
PROTO_ABRIDGED_INT = 0xEFEFEFEF
PROTO_INTERMEDIATE_INT = 0xEEEEEEEE
PROTO_PADDED_INTERMEDIATE_INT = 0xDDDDDDDD
RESERVED_FIRST_BYTES = {0xEF}
RESERVED_STARTS = {b'\x48\x45\x41\x44', b'\x50\x4F\x53\x54',
b'\x47\x45\x54\x20', b'\xee\xee\xee\xee',
b'\xdd\xdd\xdd\xdd', b'\x16\x03\x01\x02'}
RESERVED_CONTINUE = b'\x00\x00\x00\x00'
DC_FAIL_COOLDOWN = 30.0 DC_FAIL_COOLDOWN = 30.0
WS_FAIL_TIMEOUT = 2.0 WS_FAIL_TIMEOUT = 2.0
ws_blacklist: Set[Tuple[int, bool]] = set() ws_blacklist: Set[Tuple[int, bool]] = set()
dc_fail_until: Dict[Tuple[int, bool], float] = {} dc_fail_until: Dict[Tuple[int, bool], float] = {}
_st_BB = struct.Struct('>BB')
_st_BBH = struct.Struct('>BBH')
_st_BBQ = struct.Struct('>BBQ')
_st_BB4s = struct.Struct('>BB4s')
_st_BBH4s = struct.Struct('>BBH4s')
_st_BBQ4s = struct.Struct('>BBQ4s')
_st_H = struct.Struct('>H')
_st_Q = struct.Struct('>Q')
_st_I_le = struct.Struct('<I')
ZERO_64 = b'\x00' * 64
_ssl_ctx = ssl.create_default_context()
_ssl_ctx.check_hostname = False
_ssl_ctx.verify_mode = ssl.CERT_NONE
def _set_sock_opts(transport):
sock = transport.get_extra_info('socket')
if sock is None:
return
try:
sock.setsockopt(_socket.IPPROTO_TCP, _socket.TCP_NODELAY, 1)
except (OSError, AttributeError):
pass
try:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_RCVBUF, proxy_config.buffer_size)
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_SNDBUF, proxy_config.buffer_size)
except OSError:
pass
def _xor_mask(data: bytes, mask: bytes) -> bytes:
if not data:
return data
n = len(data)
mask_rep = (mask * (n // 4 + 1))[:n]
return (int.from_bytes(data, 'big') ^
int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big')
def get_link_host(host: str) -> Optional[str]:
if host == '0.0.0.0':
try:
with _socket.socket(_socket.AF_INET, _socket.SOCK_DGRAM) as _s:
_s.connect(('8.8.8.8', 80))
link_host = _s.getsockname()[0]
except OSError:
link_host = '127.0.0.1'
return link_host
else:
return host
class WsHandshakeError(Exception):
def __init__(self, status_code: int, status_line: str,
headers: dict = None, location: str = None):
self.status_code = status_code
self.status_line = status_line
self.headers = headers or {}
self.location = location
super().__init__(f"HTTP {status_code}: {status_line}")
@property
def is_redirect(self) -> bool:
return self.status_code in (301, 302, 303, 307, 308)
class RawWebSocket:
__slots__ = ('reader', 'writer', '_closed')
OP_BINARY = 0x2
OP_CLOSE = 0x8
OP_PING = 0x9
OP_PONG = 0xA
def __init__(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
self.reader = reader
self.writer = writer
self._closed = False
@staticmethod
async def connect(ip: str, domain: str, path: str = '/apiws',
timeout: float = 10.0) -> 'RawWebSocket':
reader, writer = await asyncio.wait_for(
asyncio.open_connection(ip, 443, ssl=_ssl_ctx,
server_hostname=domain),
timeout=min(timeout, 10))
_set_sock_opts(writer.transport)
ws_key = base64.b64encode(os.urandom(16)).decode()
req = (
f'GET {path} HTTP/1.1\r\n'
f'Host: {domain}\r\n'
f'Upgrade: websocket\r\n'
f'Connection: Upgrade\r\n'
f'Sec-WebSocket-Key: {ws_key}\r\n'
f'Sec-WebSocket-Version: 13\r\n'
f'Sec-WebSocket-Protocol: binary\r\n'
f'Origin: https://web.telegram.org\r\n'
f'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
f'AppleWebKit/537.36 (KHTML, like Gecko) '
f'Chrome/131.0.0.0 Safari/537.36\r\n'
f'\r\n'
)
writer.write(req.encode())
await writer.drain()
response_lines: list[str] = []
try:
while True:
line = await asyncio.wait_for(reader.readline(),
timeout=timeout)
if line in (b'\r\n', b'\n', b''):
break
response_lines.append(
line.decode('utf-8', errors='replace').strip())
except asyncio.TimeoutError:
writer.close()
raise
if not response_lines:
writer.close()
raise WsHandshakeError(0, 'empty response')
first_line = response_lines[0]
parts = first_line.split(' ', 2)
try:
status_code = int(parts[1]) if len(parts) >= 2 else 0
except ValueError:
status_code = 0
if status_code == 101:
return RawWebSocket(reader, writer)
headers: dict[str, str] = {}
for hl in response_lines[1:]:
if ':' in hl:
k, v = hl.split(':', 1)
headers[k.strip().lower()] = v.strip()
writer.close()
raise WsHandshakeError(status_code, first_line, headers,
location=headers.get('location'))
async def send(self, data: bytes):
if self._closed:
raise ConnectionError("WebSocket closed")
frame = self._build_frame(self.OP_BINARY, data, mask=True)
self.writer.write(frame)
await self.writer.drain()
async def send_batch(self, parts: List[bytes]):
if self._closed:
raise ConnectionError("WebSocket closed")
for part in parts:
self.writer.write(
self._build_frame(self.OP_BINARY, part, mask=True))
await self.writer.drain()
async def recv(self) -> Optional[bytes]:
while not self._closed:
opcode, payload = await self._read_frame()
if opcode == self.OP_CLOSE:
self._closed = True
try:
self.writer.write(self._build_frame(
self.OP_CLOSE,
payload[:2] if payload else b'', mask=True))
await self.writer.drain()
except Exception:
pass
return None
if opcode == self.OP_PING:
try:
self.writer.write(
self._build_frame(self.OP_PONG, payload, mask=True))
await self.writer.drain()
except Exception:
pass
continue
if opcode == self.OP_PONG:
continue
if opcode in (0x1, 0x2):
return payload
continue
return None
async def close(self):
if self._closed:
return
self._closed = True
try:
self.writer.write(
self._build_frame(self.OP_CLOSE, b'', mask=True))
await self.writer.drain()
except Exception:
pass
try:
self.writer.close()
await self.writer.wait_closed()
except Exception:
pass
@staticmethod
def _build_frame(opcode: int, data: bytes,
mask: bool = False) -> bytes:
length = len(data)
fb = 0x80 | opcode
if not mask:
if length < 126:
return _st_BB.pack(fb, length) + data
if length < 65536:
return _st_BBH.pack(fb, 126, length) + data
return _st_BBQ.pack(fb, 127, length) + data
mask_key = os.urandom(4)
masked = _xor_mask(data, mask_key)
if length < 126:
return _st_BB4s.pack(fb, 0x80 | length, mask_key) + masked
if length < 65536:
return _st_BBH4s.pack(fb, 0x80 | 126, length, mask_key) + masked
return _st_BBQ4s.pack(fb, 0x80 | 127, length, mask_key) + masked
async def _read_frame(self) -> Tuple[int, bytes]:
hdr = await self.reader.readexactly(2)
opcode = hdr[0] & 0x0F
length = hdr[1] & 0x7F
if length == 126:
length = _st_H.unpack(await self.reader.readexactly(2))[0]
elif length == 127:
length = _st_Q.unpack(await self.reader.readexactly(8))[0]
if hdr[1] & 0x80:
mask_key = await self.reader.readexactly(4)
payload = await self.reader.readexactly(length)
return opcode, _xor_mask(payload, mask_key)
payload = await self.reader.readexactly(length)
return opcode, payload
def _human_bytes(n: int) -> str:
for unit in ('B', 'KB', 'MB', 'GB'):
if abs(n) < 1024:
return f"{n:.1f}{unit}"
n /= 1024
return f"{n:.1f}TB"
def _try_handshake(handshake: bytes, secret: bytes) -> Optional[Tuple[int, bool, bytes, bytes]]: def _try_handshake(handshake: bytes, secret: bytes) -> Optional[Tuple[int, bool, bytes, bytes]]:
dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN + PREKEY_LEN + IV_LEN] dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN + PREKEY_LEN + IV_LEN]
@@ -387,93 +98,6 @@ def _generate_relay_init(proto_tag: bytes, dc_idx: int) -> bytes:
return bytes(result) return bytes(result)
class _MsgSplitter:
"""
Splits TCP stream data into individual MTProto transport packets
so each can be sent as a separate WS frame.
"""
__slots__ = ('_dec', '_proto', '_cipher_buf', '_plain_buf', '_disabled')
def __init__(self, relay_init: bytes, proto_int: int):
cipher = Cipher(algorithms.AES(relay_init[8:40]),
modes.CTR(relay_init[40:56]))
self._dec = cipher.encryptor()
self._dec.update(ZERO_64)
self._proto = proto_int
self._cipher_buf = bytearray()
self._plain_buf = bytearray()
self._disabled = False
def split(self, chunk: bytes) -> List[bytes]:
if not chunk:
return []
if self._disabled:
return [chunk]
self._cipher_buf.extend(chunk)
self._plain_buf.extend(self._dec.update(chunk))
parts = []
while self._cipher_buf:
packet_len = self._next_packet_len()
if packet_len is None:
break
if packet_len <= 0:
parts.append(bytes(self._cipher_buf))
self._cipher_buf.clear()
self._plain_buf.clear()
self._disabled = True
break
parts.append(bytes(self._cipher_buf[:packet_len]))
del self._cipher_buf[:packet_len]
del self._plain_buf[:packet_len]
return parts
def flush(self) -> List[bytes]:
if not self._cipher_buf:
return []
tail = bytes(self._cipher_buf)
self._cipher_buf.clear()
self._plain_buf.clear()
return [tail]
def _next_packet_len(self) -> Optional[int]:
if not self._plain_buf:
return None
if self._proto == PROTO_ABRIDGED_INT:
return self._next_abridged_len()
if self._proto in (PROTO_INTERMEDIATE_INT,
PROTO_PADDED_INTERMEDIATE_INT):
return self._next_intermediate_len()
return 0
def _next_abridged_len(self) -> Optional[int]:
first = self._plain_buf[0]
if first in (0x7F, 0xFF):
if len(self._plain_buf) < 4:
return None
payload_len = int.from_bytes(self._plain_buf[1:4], 'little') * 4
header_len = 4
else:
payload_len = (first & 0x7F) * 4
header_len = 1
if payload_len <= 0:
return 0
packet_len = header_len + payload_len
if len(self._plain_buf) < packet_len:
return None
return packet_len
def _next_intermediate_len(self) -> Optional[int]:
if len(self._plain_buf) < 4:
return None
payload_len = _st_I_le.unpack_from(self._plain_buf, 0)[0] & 0x7FFFFFFF
if payload_len <= 0:
return 0
packet_len = 4 + payload_len
if len(self._plain_buf) < packet_len:
return None
return packet_len
def _ws_domains(dc: int, is_media) -> List[str]: def _ws_domains(dc: int, is_media) -> List[str]:
@@ -484,38 +108,6 @@ def _ws_domains(dc: int, is_media) -> List[str]:
return [f'kws{dc}.web.telegram.org', f'kws{dc}-1.web.telegram.org'] return [f'kws{dc}.web.telegram.org', f'kws{dc}-1.web.telegram.org']
class Stats:
def __init__(self):
self.connections_total = 0
self.connections_active = 0
self.connections_ws = 0
self.connections_tcp_fallback = 0
self.connections_cfproxy = 0
self.connections_bad = 0
self.ws_errors = 0
self.bytes_up = 0
self.bytes_down = 0
self.pool_hits = 0
self.pool_misses = 0
def summary(self) -> str:
pool_total = self.pool_hits + self.pool_misses
pool_s = (f"{self.pool_hits}/{pool_total}"
if pool_total else "n/a")
return (f"total={self.connections_total} "
f"active={self.connections_active} "
f"ws={self.connections_ws} "
f"tcp_fb={self.connections_tcp_fallback} "
f"cf={self.connections_cfproxy} "
f"bad={self.connections_bad} "
f"err={self.ws_errors} "
f"pool={pool_s} "
f"up={_human_bytes(self.bytes_up)} "
f"down={_human_bytes(self.bytes_down)}")
_stats = Stats()
class _WsPool: class _WsPool:
WS_POOL_MAX_AGE = 120.0 WS_POOL_MAX_AGE = 120.0
@@ -540,13 +132,13 @@ class _WsPool:
or ws.writer.transport.is_closing()): or ws.writer.transport.is_closing()):
asyncio.create_task(self._quiet_close(ws)) asyncio.create_task(self._quiet_close(ws))
continue continue
_stats.pool_hits += 1 stats.pool_hits += 1
log.debug("WS pool hit DC%d%s (age=%.1fs, left=%d)", log.debug("WS pool hit DC%d%s (age=%.1fs, left=%d)",
dc, 'm' if is_media else '', age, len(bucket)) dc, 'm' if is_media else '', age, len(bucket))
self._schedule_refill(key, target_ip, domains) self._schedule_refill(key, target_ip, domains)
return ws return ws
_stats.pool_misses += 1 stats.pool_misses += 1
self._schedule_refill(key, target_ip, domains) self._schedule_refill(key, target_ip, domains)
return None return None
@@ -615,273 +207,13 @@ class _WsPool:
_ws_pool = _WsPool() _ws_pool = _WsPool()
async def _bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label,
dc=None, is_media=False,
clt_decryptor=None, clt_encryptor=None,
tg_encryptor=None, tg_decryptor=None,
splitter: _MsgSplitter = None):
"""
Bidirectional TCP(client) <-> WS(telegram) with re-encryption.
client ciphertext → decrypt(clt_key) → encrypt(tg_key) → WS
WS data → decrypt(tg_key) → encrypt(clt_key) → client TCP
"""
dc_tag = f"DC{dc}{'m' if is_media else ''}" if dc else "DC?"
up_bytes = 0
down_bytes = 0
up_packets = 0
down_packets = 0
start_time = asyncio.get_running_loop().time()
async def tcp_to_ws():
nonlocal up_bytes, up_packets
try:
while True:
chunk = await reader.read(65536)
if not chunk:
if splitter:
tail = splitter.flush()
if tail:
await ws.send(tail[0])
break
n = len(chunk)
_stats.bytes_up += n
up_bytes += n
up_packets += 1
plain = clt_decryptor.update(chunk)
chunk = tg_encryptor.update(plain)
if splitter:
parts = splitter.split(chunk)
if not parts:
continue
if len(parts) > 1:
await ws.send_batch(parts)
else:
await ws.send(parts[0])
else:
await ws.send(chunk)
except (asyncio.CancelledError, ConnectionError, OSError):
return
except Exception as e:
log.debug("[%s] tcp->ws ended: %s", label, e)
async def ws_to_tcp():
nonlocal down_bytes, down_packets
try:
while True:
data = await ws.recv()
if data is None:
break
n = len(data)
_stats.bytes_down += n
down_bytes += n
down_packets += 1
plain = tg_decryptor.update(data)
data = clt_encryptor.update(plain)
writer.write(data)
await writer.drain()
except (asyncio.CancelledError, ConnectionError, OSError):
return
except Exception as e:
log.debug("[%s] ws->tcp ended: %s", label, e)
tasks = [asyncio.create_task(tcp_to_ws()),
asyncio.create_task(ws_to_tcp())]
try:
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
finally:
for t in tasks:
t.cancel()
for t in tasks:
try:
await t
except BaseException:
pass
elapsed = asyncio.get_running_loop().time() - start_time
log.info("[%s] %s WS session closed: "
"^%s (%d pkts) v%s (%d pkts) in %.1fs",
label, dc_tag,
_human_bytes(up_bytes), up_packets,
_human_bytes(down_bytes), down_packets,
elapsed)
try:
await ws.close()
except BaseException:
pass
try:
writer.close()
await writer.wait_closed()
except BaseException:
pass
async def _bridge_tcp_reencrypt(reader, writer, remote_reader, remote_writer,
label, dc=None, is_media=False,
clt_decryptor=None, clt_encryptor=None,
tg_encryptor=None, tg_decryptor=None):
"""Bidirectional TCP <-> TCP with re-encryption."""
async def forward(src, dst_w, is_up):
try:
while True:
data = await src.read(65536)
if not data:
break
n = len(data)
if is_up:
_stats.bytes_up += n
plain = clt_decryptor.update(data)
data = tg_encryptor.update(plain)
else:
_stats.bytes_down += n
plain = tg_decryptor.update(data)
data = clt_encryptor.update(plain)
dst_w.write(data)
await dst_w.drain()
except asyncio.CancelledError:
pass
except Exception as e:
log.debug("[%s] forward ended: %s", label, e)
tasks = [
asyncio.create_task(forward(reader, remote_writer, True)),
asyncio.create_task(forward(remote_reader, writer, False)),
]
try:
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
finally:
for t in tasks:
t.cancel()
for t in tasks:
try:
await t
except BaseException:
pass
for w in (writer, remote_writer):
try:
w.close()
await w.wait_closed()
except BaseException:
pass
async def _tcp_fallback(reader, writer, dst, port, relay_init, label,
dc=None, is_media=False,
clt_decryptor=None, clt_encryptor=None,
tg_encryptor=None, tg_decryptor=None):
try:
rr, rw = await asyncio.wait_for(
asyncio.open_connection(dst, port), timeout=10)
except Exception as exc:
log.warning("[%s] TCP fallback to %s:%d failed: %s",
label, dst, port, exc)
return False
_stats.connections_tcp_fallback += 1
rw.write(relay_init)
await rw.drain()
await _bridge_tcp_reencrypt(reader, writer, rr, rw, label,
dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor,
tg_encryptor=tg_encryptor,
tg_decryptor=tg_decryptor)
return True
def _fallback_ip(dc: int) -> Optional[str]:
return DC_DEFAULT_IPS.get(dc)
def _cfproxy_domains(dc: int) -> List[str]:
base = proxy_config.fallback_cfproxy_domain
return [f'kws{dc}.{base}']
async def _cfproxy_fallback(reader, writer, relay_init, label,
dc=None, is_media=False,
clt_decryptor=None, clt_encryptor=None,
tg_encryptor=None, tg_decryptor=None,
splitter=None):
domains = _cfproxy_domains(dc)
media_tag = ' media' if is_media else ''
ws = None
for domain in domains:
log.info("[%s] DC%d%s -> CF proxy wss://%s/apiws",
label, dc, media_tag, domain)
try:
ws = await RawWebSocket.connect(domain, domain,
timeout=10.0)
break
except Exception as exc:
log.warning("[%s] DC%d%s CF proxy %s failed: %s",
label, dc, media_tag, domain, exc)
if ws is None:
return False
_stats.connections_cfproxy += 1
await ws.send(relay_init)
await _bridge_ws_reencrypt(reader, writer, ws, label,
dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor,
tg_encryptor=tg_encryptor,
tg_decryptor=tg_decryptor,
splitter=splitter)
return True
async def _do_fallback(reader, writer, relay_init, label,
dc, is_media, media_tag,
clt_decryptor, clt_encryptor,
tg_encryptor, tg_decryptor,
splitter=None):
"""Try CF proxy and/or TCP fallback based on config priority."""
fallback_dst = _fallback_ip(dc)
use_cf = proxy_config.fallback_cfproxy
cf_first = proxy_config.fallback_cfproxy_priority
methods: List[str] = []
if use_cf and cf_first:
methods = ['cf', 'tcp']
elif use_cf:
methods = ['tcp', 'cf']
else:
methods = ['tcp']
for method in methods:
if method == 'cf':
ok = await _cfproxy_fallback(
reader, writer, relay_init, label,
dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor,
tg_encryptor=tg_encryptor,
tg_decryptor=tg_decryptor,
splitter=splitter)
if ok:
return True
elif method == 'tcp' and fallback_dst:
log.info("[%s] DC%d%s -> TCP fallback to %s:443",
label, dc, media_tag, fallback_dst)
ok = await _tcp_fallback(
reader, writer, fallback_dst, 443,
relay_init, label, dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor,
tg_encryptor=tg_encryptor,
tg_decryptor=tg_decryptor)
if ok:
return True
return False
async def _handle_client(reader, writer, secret: bytes): async def _handle_client(reader, writer, secret: bytes):
_stats.connections_total += 1 stats.connections_total += 1
_stats.connections_active += 1 stats.connections_active += 1
peer = writer.get_extra_info('peername') peer = writer.get_extra_info('peername')
label = f"{peer[0]}:{peer[1]}" if peer else "?" label = f"{peer[0]}:{peer[1]}" if peer else "?"
_set_sock_opts(writer.transport) set_sock_opts(writer.transport, proxy_config.buffer_size)
try: try:
try: try:
@@ -893,7 +225,7 @@ async def _handle_client(reader, writer, secret: bytes):
result = _try_handshake(handshake, secret) result = _try_handshake(handshake, secret)
if result is None: if result is None:
_stats.connections_bad += 1 stats.connections_bad += 1
log.debug("[%s] bad handshake (wrong secret or proto)", label) log.debug("[%s] bad handshake (wrong secret or proto)", label)
try: try:
while await reader.read(4096): while await reader.read(4096):
@@ -971,10 +303,10 @@ async def _handle_client(reader, writer, secret: bytes):
label, dc, media_tag) label, dc, media_tag)
splitter = None splitter = None
try: try:
splitter = _MsgSplitter(relay_init, proto_int) splitter = MsgSplitter(relay_init, proto_int)
except Exception: except Exception:
pass pass
ok = await _do_fallback( ok = await do_fallback(
reader, writer, relay_init, label, reader, writer, relay_init, label,
dc, is_media, media_tag, dc, is_media, media_tag,
clt_decryptor, clt_encryptor, clt_decryptor, clt_encryptor,
@@ -1010,7 +342,7 @@ async def _handle_client(reader, writer, secret: bytes):
all_redirects = False all_redirects = False
break break
except WsHandshakeError as exc: except WsHandshakeError as exc:
_stats.ws_errors += 1 stats.ws_errors += 1
if exc.is_redirect: if exc.is_redirect:
ws_failed_redirect = True ws_failed_redirect = True
log.warning("[%s] DC%d%s got %d from %s -> %s", log.warning("[%s] DC%d%s got %d from %s -> %s",
@@ -1023,7 +355,7 @@ async def _handle_client(reader, writer, secret: bytes):
log.warning("[%s] DC%d%s WS handshake: %s", log.warning("[%s] DC%d%s WS handshake: %s",
label, dc, media_tag, exc.status_line) label, dc, media_tag, exc.status_line)
except Exception as exc: except Exception as exc:
_stats.ws_errors += 1 stats.ws_errors += 1
all_redirects = False all_redirects = False
log.warning("[%s] DC%d%s WS connect failed: %s", log.warning("[%s] DC%d%s WS connect failed: %s",
label, dc, media_tag, exc) label, dc, media_tag, exc)
@@ -1043,10 +375,10 @@ async def _handle_client(reader, writer, secret: bytes):
splitter_fb = None splitter_fb = None
try: try:
splitter_fb = _MsgSplitter(relay_init, proto_int) splitter_fb = MsgSplitter(relay_init, proto_int)
except Exception: except Exception:
pass pass
ok = await _do_fallback( ok = await do_fallback(
reader, writer, relay_init, label, reader, writer, relay_init, label,
dc, is_media, media_tag, dc, is_media, media_tag,
clt_decryptor, clt_encryptor, clt_decryptor, clt_encryptor,
@@ -1058,11 +390,11 @@ async def _handle_client(reader, writer, secret: bytes):
return return
dc_fail_until.pop(dc_key, None) dc_fail_until.pop(dc_key, None)
_stats.connections_ws += 1 stats.connections_ws += 1
splitter = None splitter = None
try: try:
splitter = _MsgSplitter(relay_init, proto_int) splitter = MsgSplitter(relay_init, proto_int)
log.debug("[%s] MsgSplitter activated for proto 0x%08X", log.debug("[%s] MsgSplitter activated for proto 0x%08X",
label, proto_int) label, proto_int)
except Exception: except Exception:
@@ -1070,7 +402,7 @@ async def _handle_client(reader, writer, secret: bytes):
await ws.send(relay_init) await ws.send(relay_init)
await _bridge_ws_reencrypt(reader, writer, ws, label, await bridge_ws_reencrypt(reader, writer, ws, label,
dc=dc, is_media=is_media, dc=dc, is_media=is_media,
clt_decryptor=clt_decryptor, clt_decryptor=clt_decryptor,
clt_encryptor=clt_encryptor, clt_encryptor=clt_encryptor,
@@ -1094,7 +426,7 @@ async def _handle_client(reader, writer, secret: bytes):
except Exception as exc: except Exception as exc:
log.error("[%s] unexpected: %s", label, exc, exc_info=True) log.error("[%s] unexpected: %s", label, exc, exc_info=True)
finally: finally:
_stats.connections_active -= 1 stats.connections_active -= 1
try: try:
writer.close() writer.close()
except BaseException: except BaseException:
@@ -1154,7 +486,7 @@ async def _run(stop_event: Optional[asyncio.Event] = None):
bl = ', '.join( bl = ', '.join(
f'DC{d}{"m" if m else ""}' f'DC{d}{"m" if m else ""}'
for d, m in sorted(ws_blacklist)) or 'none' for d, m in sorted(ws_blacklist)) or 'none'
log.info("stats: %s | ws_bl: %s", _stats.summary(), bl) log.info("stats: %s | ws_bl: %s", stats.summary(), bl)
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise
@@ -1197,22 +529,6 @@ async def _run(stop_event: Optional[asyncio.Event] = None):
_server_instance = None _server_instance = None
def parse_dc_ip_list(dc_ip_list: List[str]) -> Dict[int, str]:
dc_redirects: Dict[int, str] = {}
for entry in dc_ip_list:
if ':' not in entry:
raise ValueError(
f"Invalid --dc-ip format {entry!r}, expected DC:IP")
dc_s, ip_s = entry.split(':', 1)
try:
dc_n = int(dc_s)
_socket.inet_aton(ip_s)
except (ValueError, OSError):
raise ValueError(f"Invalid --dc-ip {entry!r}")
dc_redirects[dc_n] = ip_s
return dc_redirects
def run_proxy(stop_event: Optional[asyncio.Event] = None): def run_proxy(stop_event: Optional[asyncio.Event] = None):
asyncio.run(_run(stop_event,)) asyncio.run(_run(stop_event,))
@@ -1274,18 +590,15 @@ def main():
secret_hex = os.urandom(16).hex() secret_hex = os.urandom(16).hex()
log.info("Generated secret: %s", secret_hex) log.info("Generated secret: %s", secret_hex)
global proxy_config proxy_config.port = args.port
proxy_config = ProxyConfig( proxy_config.host = args.host
port=args.port, proxy_config.secret = secret_hex
host=args.host, proxy_config.dc_redirects = dc_redirects
secret=secret_hex, proxy_config.buffer_size = max(4, args.buf_kb) * 1024
dc_redirects=dc_redirects, proxy_config.pool_size = max(0, args.pool_size)
buffer_size=max(4, args.buf_kb) * 1024, proxy_config.fallback_cfproxy = not args.no_cfproxy
pool_size=max(0, args.pool_size), proxy_config.fallback_cfproxy_priority = args.cfproxy_priority
fallback_cfproxy=not args.no_cfproxy, proxy_config.fallback_cfproxy_domain = args.cfproxy_domain
fallback_cfproxy_priority=args.cfproxy_priority,
fallback_cfproxy_domain=args.cfproxy_domain,
)
log_level = logging.DEBUG if args.verbose else logging.INFO log_level = logging.DEBUG if args.verbose else logging.INFO
log_fmt = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', log_fmt = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s',
@@ -1310,7 +623,7 @@ def main():
try: try:
asyncio.run(_run()) asyncio.run(_run())
except KeyboardInterrupt: except KeyboardInterrupt:
log.info("Shutting down. Final stats: %s", _stats.summary()) log.info("Shutting down. Final stats: %s", stats.summary())
if __name__ == '__main__': if __name__ == '__main__':

48
proxy/utils.py Normal file
View File

@@ -0,0 +1,48 @@
import socket as _socket
from typing import Optional
ZERO_64 = b'\x00' * 64
HANDSHAKE_LEN = 64
SKIP_LEN = 8
PREKEY_LEN = 32
KEY_LEN = 32
IV_LEN = 16
PROTO_TAG_POS = 56
DC_IDX_POS = 60
PROTO_TAG_ABRIDGED = b'\xef\xef\xef\xef'
PROTO_TAG_INTERMEDIATE = b'\xee\xee\xee\xee'
PROTO_TAG_SECURE = b'\xdd\xdd\xdd\xdd'
PROTO_ABRIDGED_INT = 0xEFEFEFEF
PROTO_INTERMEDIATE_INT = 0xEEEEEEEE
PROTO_PADDED_INTERMEDIATE_INT = 0xDDDDDDDD
RESERVED_FIRST_BYTES = {0xEF}
RESERVED_STARTS = {b'\x48\x45\x41\x44', b'\x50\x4F\x53\x54',
b'\x47\x45\x54\x20', b'\xee\xee\xee\xee',
b'\xdd\xdd\xdd\xdd', b'\x16\x03\x01\x02'}
RESERVED_CONTINUE = b'\x00\x00\x00\x00'
def human_bytes(n: int) -> str:
for unit in ('B', 'KB', 'MB', 'GB'):
if abs(n) < 1024:
return f"{n:.1f}{unit}"
n /= 1024
return f"{n:.1f}TB"
def get_link_host(host: str) -> Optional[str]:
if host == '0.0.0.0':
try:
with _socket.socket(_socket.AF_INET, _socket.SOCK_DGRAM) as _s:
_s.connect(('8.8.8.8', 80))
link_host = _s.getsockname()[0]
except OSError:
link_host = '127.0.0.1'
return link_host
else:
return host

View File

@@ -71,3 +71,6 @@ packages = ["proxy", "ui", "utils"]
[tool.hatch.version] [tool.hatch.version]
path = "proxy/__init__.py" path = "proxy/__init__.py"
[tool.ruff.lint]
ignore = ["F403", "F405"]