diff --git a/docs/README.md b/docs/README.md index 42d69b2..a707710 100644 --- a/docs/README.md +++ b/docs/README.md @@ -174,7 +174,7 @@ CLI команды объявляются в `pyproject.toml` в секции `[ ```toml [project.scripts] -tg-ws-proxy = "proxy.tg_ws_proxy:main" +tg-ws-proxy = "proxy:main" tg-ws-proxy-tray-win = "windows:main" tg-ws-proxy-tray-macos = "macos:main" tg-ws-proxy-tray-linux = "linux:main" diff --git a/proxy/__init__.py b/proxy/__init__.py index 64d9de7..d46221c 100644 --- a/proxy/__init__.py +++ b/proxy/__init__.py @@ -1 +1,6 @@ -__version__ = "1.5.1" \ No newline at end of file +from .config import parse_dc_ip_list, proxy_config +from .utils import get_link_host + +__version__ = "1.5.1" + +__all__ = ["__version__", "get_link_host", "proxy_config", "parse_dc_ip_list"] \ No newline at end of file diff --git a/proxy/bridge.py b/proxy/bridge.py new file mode 100644 index 0000000..4c93bca --- /dev/null +++ b/proxy/bridge.py @@ -0,0 +1,363 @@ +import asyncio +import logging +import struct + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from typing import Dict, List, Optional + +from .utils import * +from .stats import stats +from .config import proxy_config +from .raw_websocket import RawWebSocket + + +log = logging.getLogger('tg-mtproto-proxy') +_st_I_le = struct.Struct(' List[bytes]: + if not chunk: + return [] + if self._disabled: + return [chunk] + + self._cipher_buf.extend(chunk) + self._plain_buf.extend(self._dec.update(chunk)) + + parts = [] + while self._cipher_buf: + packet_len = self._next_packet_len() + if packet_len is None: + break + if packet_len <= 0: + parts.append(bytes(self._cipher_buf)) + self._cipher_buf.clear() + self._plain_buf.clear() + self._disabled = True + break + parts.append(bytes(self._cipher_buf[:packet_len])) + del self._cipher_buf[:packet_len] + del self._plain_buf[:packet_len] + return parts + + def flush(self) -> List[bytes]: + if not self._cipher_buf: + return [] + tail = bytes(self._cipher_buf) + self._cipher_buf.clear() + self._plain_buf.clear() + return [tail] + + def _next_packet_len(self) -> Optional[int]: + if not self._plain_buf: + return None + if self._proto == PROTO_ABRIDGED_INT: + return self._next_abridged_len() + if self._proto in (PROTO_INTERMEDIATE_INT, + PROTO_PADDED_INTERMEDIATE_INT): + return self._next_intermediate_len() + return 0 + + def _next_abridged_len(self) -> Optional[int]: + first = self._plain_buf[0] + if first in (0x7F, 0xFF): + if len(self._plain_buf) < 4: + return None + payload_len = int.from_bytes(self._plain_buf[1:4], 'little') * 4 + header_len = 4 + else: + payload_len = (first & 0x7F) * 4 + header_len = 1 + if payload_len <= 0: + return 0 + packet_len = header_len + payload_len + if len(self._plain_buf) < packet_len: + return None + return packet_len + + def _next_intermediate_len(self) -> Optional[int]: + if len(self._plain_buf) < 4: + return None + payload_len = _st_I_le.unpack_from(self._plain_buf, 0)[0] & 0x7FFFFFFF + if payload_len <= 0: + return 0 + packet_len = 4 + payload_len + if len(self._plain_buf) < packet_len: + return None + return packet_len + + + +async def do_fallback(reader, writer, relay_init, label, + dc, is_media, media_tag, + clt_decryptor, clt_encryptor, + tg_encryptor, tg_decryptor, + splitter=None): + fallback_dst = DC_DEFAULT_IPS.get(dc) + use_cf = proxy_config.fallback_cfproxy + cf_first = proxy_config.fallback_cfproxy_priority + + methods: List[str] = ['tcp'] + + if use_cf: + methods.insert(0 if cf_first else 1, 'cf') + + for method in methods: + if method == 'cf': + ok = await _cfproxy_fallback( + reader, writer, relay_init, label, + dc=dc, is_media=is_media, + clt_decryptor=clt_decryptor, + clt_encryptor=clt_encryptor, + tg_encryptor=tg_encryptor, + tg_decryptor=tg_decryptor, + splitter=splitter) + if ok: + return True + elif method == 'tcp' and fallback_dst: + log.info("[%s] DC%d%s -> TCP fallback to %s:443", + label, dc, media_tag, fallback_dst) + ok = await _tcp_fallback( + reader, writer, fallback_dst, 443, + relay_init, label, dc=dc, is_media=is_media, + clt_decryptor=clt_decryptor, + clt_encryptor=clt_encryptor, + tg_encryptor=tg_encryptor, + tg_decryptor=tg_decryptor) + if ok: + return True + return False + + +async def _cfproxy_fallback(reader, writer, relay_init, label, + dc=None, is_media=False, + clt_decryptor=None, clt_encryptor=None, + tg_encryptor=None, tg_decryptor=None, + splitter=None): + domain = f'kws{dc}.{proxy_config.fallback_cfproxy_domain}' + media_tag = ' media' if is_media else '' + ws = None + + log.info("[%s] DC%d%s -> CF proxy wss://%s/apiws", + label, dc, media_tag, domain) + try: + ws = await RawWebSocket.connect(domain, domain, + timeout=10.0) + except Exception as exc: + log.warning("[%s] DC%d%s CF proxy %s failed: %s", + label, dc, media_tag, domain, exc) + + if ws is None: + return False + + stats.connections_cfproxy += 1 + await ws.send(relay_init) + await bridge_ws_reencrypt(reader, writer, ws, label, + dc=dc, is_media=is_media, + clt_decryptor=clt_decryptor, + clt_encryptor=clt_encryptor, + tg_encryptor=tg_encryptor, + tg_decryptor=tg_decryptor, + splitter=splitter) + return True + + +async def _tcp_fallback(reader, writer, dst, port, relay_init, label, + dc=None, is_media=False, + clt_decryptor=None, clt_encryptor=None, + tg_encryptor=None, tg_decryptor=None): + try: + rr, rw = await asyncio.wait_for( + asyncio.open_connection(dst, port), timeout=10) + except Exception as exc: + log.warning("[%s] TCP fallback to %s:%d failed: %s", + label, dst, port, exc) + return False + + stats.connections_tcp_fallback += 1 + rw.write(relay_init) + await rw.drain() + await _bridge_tcp_reencrypt(reader, writer, rr, rw, label, + dc=dc, is_media=is_media, + clt_decryptor=clt_decryptor, + clt_encryptor=clt_encryptor, + tg_encryptor=tg_encryptor, + tg_decryptor=tg_decryptor) + return True + + +async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label, + dc=None, is_media=False, + clt_decryptor=None, clt_encryptor=None, + tg_encryptor=None, tg_decryptor=None, + splitter: MsgSplitter = None): + """ + Bidirectional TCP(client) <-> WS(telegram) with re-encryption. + client ciphertext → decrypt(clt_key) → encrypt(tg_key) → WS + WS data → decrypt(tg_key) → encrypt(clt_key) → client TCP + """ + dc_tag = f"DC{dc}{'m' if is_media else ''}" if dc else "DC?" + + up_bytes = 0 + down_bytes = 0 + up_packets = 0 + down_packets = 0 + start_time = asyncio.get_running_loop().time() + + async def tcp_to_ws(): + nonlocal up_bytes, up_packets + try: + while True: + chunk = await reader.read(65536) + if not chunk: + if splitter: + tail = splitter.flush() + if tail: + await ws.send(tail[0]) + break + n = len(chunk) + stats.bytes_up += n + up_bytes += n + up_packets += 1 + plain = clt_decryptor.update(chunk) + chunk = tg_encryptor.update(plain) + if splitter: + parts = splitter.split(chunk) + if not parts: + continue + if len(parts) > 1: + await ws.send_batch(parts) + else: + await ws.send(parts[0]) + else: + await ws.send(chunk) + except (asyncio.CancelledError, ConnectionError, OSError): + return + except Exception as e: + log.debug("[%s] tcp->ws ended: %s", label, e) + + async def ws_to_tcp(): + nonlocal down_bytes, down_packets + try: + while True: + data = await ws.recv() + if data is None: + break + n = len(data) + stats.bytes_down += n + down_bytes += n + down_packets += 1 + plain = tg_decryptor.update(data) + data = clt_encryptor.update(plain) + writer.write(data) + await writer.drain() + except (asyncio.CancelledError, ConnectionError, OSError): + return + except Exception as e: + log.debug("[%s] ws->tcp ended: %s", label, e) + + tasks = [asyncio.create_task(tcp_to_ws()), + asyncio.create_task(ws_to_tcp())] + try: + await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + finally: + for t in tasks: + t.cancel() + for t in tasks: + try: + await t + except BaseException: + pass + elapsed = asyncio.get_running_loop().time() - start_time + log.info("[%s] %s WS session closed: " + "^%s (%d pkts) v%s (%d pkts) in %.1fs", + label, dc_tag, + human_bytes(up_bytes), up_packets, + human_bytes(down_bytes), down_packets, + elapsed) + try: + await ws.close() + except BaseException: + pass + try: + writer.close() + await writer.wait_closed() + except BaseException: + pass + + +async def _bridge_tcp_reencrypt(reader, writer, remote_reader, remote_writer, + label, dc=None, is_media=False, + clt_decryptor=None, clt_encryptor=None, + tg_encryptor=None, tg_decryptor=None): + """Bidirectional TCP <-> TCP with re-encryption.""" + + async def forward(src, dst_w, is_up): + try: + while True: + data = await src.read(65536) + if not data: + break + n = len(data) + if is_up: + stats.bytes_up += n + plain = clt_decryptor.update(data) + data = tg_encryptor.update(plain) + else: + stats.bytes_down += n + plain = tg_decryptor.update(data) + data = clt_encryptor.update(plain) + dst_w.write(data) + await dst_w.drain() + except asyncio.CancelledError: + pass + except Exception as e: + log.debug("[%s] forward ended: %s", label, e) + + tasks = [ + asyncio.create_task(forward(reader, remote_writer, True)), + asyncio.create_task(forward(remote_reader, writer, False)), + ] + try: + await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + finally: + for t in tasks: + t.cancel() + for t in tasks: + try: + await t + except BaseException: + pass + for w in (writer, remote_writer): + try: + w.close() + await w.wait_closed() + except BaseException: + pass \ No newline at end of file diff --git a/proxy/config.py b/proxy/config.py new file mode 100644 index 0000000..5df365c --- /dev/null +++ b/proxy/config.py @@ -0,0 +1,37 @@ +import os +import socket as _socket + +from typing import Dict, List +from dataclasses import dataclass, field + + +@dataclass +class ProxyConfig: + port: int = 1443 + host: str = '127.0.0.1' + secret: str = field(default_factory=lambda: os.urandom(16).hex()) + dc_redirects: Dict[int, str] = field(default_factory=lambda: {2: '149.154.167.220', 4: '149.154.167.220'}) + buffer_size: int = 256 * 1024 + pool_size: int = 4 + fallback_cfproxy: bool = True + fallback_cfproxy_priority: bool = True + fallback_cfproxy_domain: str = 'pclead.co.uk' + + +proxy_config = ProxyConfig() + + +def parse_dc_ip_list(dc_ip_list: List[str]) -> Dict[int, str]: + dc_redirects: Dict[int, str] = {} + for entry in dc_ip_list: + if ':' not in entry: + raise ValueError( + f"Invalid --dc-ip format {entry!r}, expected DC:IP") + dc_s, ip_s = entry.split(':', 1) + try: + dc_n = int(dc_s) + _socket.inet_aton(ip_s) + except (ValueError, OSError): + raise ValueError(f"Invalid --dc-ip {entry!r}") + dc_redirects[dc_n] = ip_s + return dc_redirects \ No newline at end of file diff --git a/proxy/raw_websocket.py b/proxy/raw_websocket.py new file mode 100644 index 0000000..6b12652 --- /dev/null +++ b/proxy/raw_websocket.py @@ -0,0 +1,239 @@ +import os +import ssl +import base64 +import struct +import asyncio +import socket as _socket + +from typing import List, Optional, Tuple +from .config import proxy_config + + +_st_BB = struct.Struct('>BB') +_st_BBH = struct.Struct('>BBH') +_st_BBQ = struct.Struct('>BBQ') +_st_BB4s = struct.Struct('>BB4s') +_st_BBH4s = struct.Struct('>BBH4s') +_st_BBQ4s = struct.Struct('>BBQ4s') +_st_H = struct.Struct('>H') +_st_Q = struct.Struct('>Q') + +_ssl_ctx = ssl.create_default_context() +_ssl_ctx.check_hostname = False +_ssl_ctx.verify_mode = ssl.CERT_NONE + + +class WsHandshakeError(Exception): + def __init__(self, status_code: int, status_line: str, + headers: dict = None, location: str = None): + self.status_code = status_code + self.status_line = status_line + self.headers = headers or {} + self.location = location + super().__init__(f"HTTP {status_code}: {status_line}") + + @property + def is_redirect(self) -> bool: + return self.status_code in (301, 302, 303, 307, 308) + + +def _xor_mask(data: bytes, mask: bytes) -> bytes: + if not data: + return data + n = len(data) + mask_rep = (mask * (n // 4 + 1))[:n] + return (int.from_bytes(data, 'big') ^ + int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big') + + +def set_sock_opts(transport, buffer_size): + sock = transport.get_extra_info('socket') + if sock is None: + return + + try: + sock.setsockopt(_socket.IPPROTO_TCP, _socket.TCP_NODELAY, 1) + except (OSError, AttributeError): + pass + + try: + sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_RCVBUF, buffer_size) + sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_SNDBUF, buffer_size) + except OSError: + pass + + +class RawWebSocket: + __slots__ = ('reader', 'writer', '_closed') + + OP_BINARY = 0x2 + OP_CLOSE = 0x8 + OP_PING = 0x9 + OP_PONG = 0xA + + def __init__(self, reader: asyncio.StreamReader, + writer: asyncio.StreamWriter): + self.reader = reader + self.writer = writer + self._closed = False + + @staticmethod + async def connect(host: str, domain: str, timeout: float = 10.0) -> 'RawWebSocket': + reader, writer = await asyncio.wait_for( + asyncio.open_connection(host, 443, ssl=_ssl_ctx, + server_hostname=domain), + timeout=min(timeout, 10)) + + set_sock_opts(writer.transport, proxy_config.buffer_size) + + ws_key = base64.b64encode(os.urandom(16)).decode() + req = ( + f'GET /apiws HTTP/1.1\r\n' + f'Host: {domain}\r\n' + f'Upgrade: websocket\r\n' + f'Connection: Upgrade\r\n' + f'Sec-WebSocket-Key: {ws_key}\r\n' + f'Sec-WebSocket-Version: 13\r\n' + f'Sec-WebSocket-Protocol: binary\r\n' + f'Origin: https://web.telegram.org\r\n' + f'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' + f'AppleWebKit/537.36 (KHTML, like Gecko) ' + f'Chrome/131.0.0.0 Safari/537.36\r\n' + f'\r\n' + ) + writer.write(req.encode()) + await writer.drain() + + response_lines: list[str] = [] + try: + while True: + line = await asyncio.wait_for(reader.readline(), + timeout=timeout) + if line in (b'\r\n', b'\n', b''): + break + response_lines.append( + line.decode('utf-8', errors='replace').strip()) + except asyncio.TimeoutError: + writer.close() + raise + + if not response_lines: + writer.close() + raise WsHandshakeError(0, 'empty response') + + first_line = response_lines[0] + parts = first_line.split(' ', 2) + try: + status_code = int(parts[1]) if len(parts) >= 2 else 0 + except ValueError: + status_code = 0 + + if status_code == 101: + return RawWebSocket(reader, writer) + + headers: dict[str, str] = {} + for hl in response_lines[1:]: + if ':' in hl: + k, v = hl.split(':', 1) + headers[k.strip().lower()] = v.strip() + + writer.close() + raise WsHandshakeError(status_code, first_line, headers, + location=headers.get('location')) + + async def send(self, data: bytes): + if self._closed: + raise ConnectionError("WebSocket closed") + frame = self._build_frame(self.OP_BINARY, data, mask=True) + self.writer.write(frame) + await self.writer.drain() + + async def send_batch(self, parts: List[bytes]): + if self._closed: + raise ConnectionError("WebSocket closed") + for part in parts: + self.writer.write( + self._build_frame(self.OP_BINARY, part, mask=True)) + await self.writer.drain() + + async def recv(self) -> Optional[bytes]: + while not self._closed: + opcode, payload = await self._read_frame() + + if opcode == self.OP_CLOSE: + self._closed = True + try: + self.writer.write(self._build_frame( + self.OP_CLOSE, + payload[:2] if payload else b'', mask=True)) + await self.writer.drain() + except Exception: + pass + return None + + if opcode == self.OP_PING: + try: + self.writer.write( + self._build_frame(self.OP_PONG, payload, mask=True)) + await self.writer.drain() + except Exception: + pass + continue + + if opcode == self.OP_PONG: + continue + + if opcode in (0x1, 0x2): + return payload + continue + return None + + async def close(self): + if self._closed: + return + self._closed = True + try: + self.writer.write( + self._build_frame(self.OP_CLOSE, b'', mask=True)) + await self.writer.drain() + except Exception: + pass + try: + self.writer.close() + await self.writer.wait_closed() + except Exception: + pass + + @staticmethod + def _build_frame(opcode: int, data: bytes, + mask: bool = False) -> bytes: + length = len(data) + fb = 0x80 | opcode + if not mask: + if length < 126: + return _st_BB.pack(fb, length) + data + if length < 65536: + return _st_BBH.pack(fb, 126, length) + data + return _st_BBQ.pack(fb, 127, length) + data + mask_key = os.urandom(4) + masked = _xor_mask(data, mask_key) + if length < 126: + return _st_BB4s.pack(fb, 0x80 | length, mask_key) + masked + if length < 65536: + return _st_BBH4s.pack(fb, 0x80 | 126, length, mask_key) + masked + return _st_BBQ4s.pack(fb, 0x80 | 127, length, mask_key) + masked + + async def _read_frame(self) -> Tuple[int, bytes]: + hdr = await self.reader.readexactly(2) + opcode = hdr[0] & 0x0F + length = hdr[1] & 0x7F + if length == 126: + length = _st_H.unpack(await self.reader.readexactly(2))[0] + elif length == 127: + length = _st_Q.unpack(await self.reader.readexactly(8))[0] + if hdr[1] & 0x80: + mask_key = await self.reader.readexactly(4) + payload = await self.reader.readexactly(length) + return opcode, _xor_mask(payload, mask_key) + payload = await self.reader.readexactly(length) + return opcode, payload \ No newline at end of file diff --git a/proxy/stats.py b/proxy/stats.py new file mode 100644 index 0000000..3a9e61b --- /dev/null +++ b/proxy/stats.py @@ -0,0 +1,33 @@ +from .utils import human_bytes + +class _Stats: + def __init__(self): + self.connections_total = 0 + self.connections_active = 0 + self.connections_ws = 0 + self.connections_tcp_fallback = 0 + self.connections_cfproxy = 0 + self.connections_bad = 0 + self.ws_errors = 0 + self.bytes_up = 0 + self.bytes_down = 0 + self.pool_hits = 0 + self.pool_misses = 0 + + def summary(self) -> str: + pool_total = self.pool_hits + self.pool_misses + pool_s = (f"{self.pool_hits}/{pool_total}" + if pool_total else "n/a") + return (f"total={self.connections_total} " + f"active={self.connections_active} " + f"ws={self.connections_ws} " + f"tcp_fb={self.connections_tcp_fallback} " + f"cf={self.connections_cfproxy} " + f"bad={self.connections_bad} " + f"err={self.ws_errors} " + f"pool={pool_s} " + f"up={human_bytes(self.bytes_up)} " + f"down={human_bytes(self.bytes_down)}") + + +stats = _Stats() \ No newline at end of file diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index 63bfcb1..e0461cc 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -1,10 +1,8 @@ from __future__ import annotations import os -import ssl import sys import time -import base64 import struct import asyncio import hashlib @@ -14,317 +12,30 @@ import logging.handlers import socket as _socket from collections import deque -from dataclasses import dataclass, field from typing import Dict, List, Optional, Set, Tuple from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +if __name__ == '__main__' and (__package__ is None or __package__ == ''): + _repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + if _repo_root not in sys.path: + sys.path.insert(0, _repo_root) + __package__ = 'proxy' -@dataclass -class ProxyConfig: - port: int = 1443 - host: str = '127.0.0.1' - secret: str = field(default_factory=lambda: os.urandom(16).hex()) - dc_redirects: Dict[int, str] = field(default_factory=lambda: {2: '149.154.167.220', 4: '149.154.167.220'}) - buffer_size: int = 256 * 1024 - pool_size: int = 4 - fallback_cfproxy: bool = True - fallback_cfproxy_priority: bool = True - fallback_cfproxy_domain: str = 'pclead.co.uk' +from .utils import * +from .stats import stats +from .config import proxy_config, parse_dc_ip_list +from .bridge import MsgSplitter, do_fallback, bridge_ws_reencrypt +from .raw_websocket import RawWebSocket, WsHandshakeError, set_sock_opts -proxy_config = ProxyConfig() log = logging.getLogger('tg-mtproto-proxy') -DC_DEFAULT_IPS: Dict[int, str] = { - 1: '149.154.175.50', - 2: '149.154.167.51', - 3: '149.154.175.100', - 4: '149.154.167.91', - 5: '149.154.171.5', - 203: '91.105.192.100' -} - -HANDSHAKE_LEN = 64 -SKIP_LEN = 8 -PREKEY_LEN = 32 -KEY_LEN = 32 -IV_LEN = 16 -PROTO_TAG_POS = 56 -DC_IDX_POS = 60 - -PROTO_TAG_ABRIDGED = b'\xef\xef\xef\xef' -PROTO_TAG_INTERMEDIATE = b'\xee\xee\xee\xee' -PROTO_TAG_SECURE = b'\xdd\xdd\xdd\xdd' - -PROTO_ABRIDGED_INT = 0xEFEFEFEF -PROTO_INTERMEDIATE_INT = 0xEEEEEEEE -PROTO_PADDED_INTERMEDIATE_INT = 0xDDDDDDDD - -RESERVED_FIRST_BYTES = {0xEF} -RESERVED_STARTS = {b'\x48\x45\x41\x44', b'\x50\x4F\x53\x54', - b'\x47\x45\x54\x20', b'\xee\xee\xee\xee', - b'\xdd\xdd\xdd\xdd', b'\x16\x03\x01\x02'} -RESERVED_CONTINUE = b'\x00\x00\x00\x00' - DC_FAIL_COOLDOWN = 30.0 WS_FAIL_TIMEOUT = 2.0 ws_blacklist: Set[Tuple[int, bool]] = set() dc_fail_until: Dict[Tuple[int, bool], float] = {} -_st_BB = struct.Struct('>BB') -_st_BBH = struct.Struct('>BBH') -_st_BBQ = struct.Struct('>BBQ') -_st_BB4s = struct.Struct('>BB4s') -_st_BBH4s = struct.Struct('>BBH4s') -_st_BBQ4s = struct.Struct('>BBQ4s') -_st_H = struct.Struct('>H') -_st_Q = struct.Struct('>Q') -_st_I_le = struct.Struct(' bytes: - if not data: - return data - n = len(data) - mask_rep = (mask * (n // 4 + 1))[:n] - return (int.from_bytes(data, 'big') ^ - int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big') - - -def get_link_host(host: str) -> Optional[str]: - if host == '0.0.0.0': - try: - with _socket.socket(_socket.AF_INET, _socket.SOCK_DGRAM) as _s: - _s.connect(('8.8.8.8', 80)) - link_host = _s.getsockname()[0] - except OSError: - link_host = '127.0.0.1' - return link_host - else: - return host - - -class WsHandshakeError(Exception): - def __init__(self, status_code: int, status_line: str, - headers: dict = None, location: str = None): - self.status_code = status_code - self.status_line = status_line - self.headers = headers or {} - self.location = location - super().__init__(f"HTTP {status_code}: {status_line}") - - @property - def is_redirect(self) -> bool: - return self.status_code in (301, 302, 303, 307, 308) - - -class RawWebSocket: - __slots__ = ('reader', 'writer', '_closed') - - OP_BINARY = 0x2 - OP_CLOSE = 0x8 - OP_PING = 0x9 - OP_PONG = 0xA - - def __init__(self, reader: asyncio.StreamReader, - writer: asyncio.StreamWriter): - self.reader = reader - self.writer = writer - self._closed = False - - @staticmethod - async def connect(ip: str, domain: str, path: str = '/apiws', - timeout: float = 10.0) -> 'RawWebSocket': - reader, writer = await asyncio.wait_for( - asyncio.open_connection(ip, 443, ssl=_ssl_ctx, - server_hostname=domain), - timeout=min(timeout, 10)) - _set_sock_opts(writer.transport) - - ws_key = base64.b64encode(os.urandom(16)).decode() - req = ( - f'GET {path} HTTP/1.1\r\n' - f'Host: {domain}\r\n' - f'Upgrade: websocket\r\n' - f'Connection: Upgrade\r\n' - f'Sec-WebSocket-Key: {ws_key}\r\n' - f'Sec-WebSocket-Version: 13\r\n' - f'Sec-WebSocket-Protocol: binary\r\n' - f'Origin: https://web.telegram.org\r\n' - f'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' - f'AppleWebKit/537.36 (KHTML, like Gecko) ' - f'Chrome/131.0.0.0 Safari/537.36\r\n' - f'\r\n' - ) - writer.write(req.encode()) - await writer.drain() - - response_lines: list[str] = [] - try: - while True: - line = await asyncio.wait_for(reader.readline(), - timeout=timeout) - if line in (b'\r\n', b'\n', b''): - break - response_lines.append( - line.decode('utf-8', errors='replace').strip()) - except asyncio.TimeoutError: - writer.close() - raise - - if not response_lines: - writer.close() - raise WsHandshakeError(0, 'empty response') - - first_line = response_lines[0] - parts = first_line.split(' ', 2) - try: - status_code = int(parts[1]) if len(parts) >= 2 else 0 - except ValueError: - status_code = 0 - - if status_code == 101: - return RawWebSocket(reader, writer) - - headers: dict[str, str] = {} - for hl in response_lines[1:]: - if ':' in hl: - k, v = hl.split(':', 1) - headers[k.strip().lower()] = v.strip() - - writer.close() - raise WsHandshakeError(status_code, first_line, headers, - location=headers.get('location')) - - async def send(self, data: bytes): - if self._closed: - raise ConnectionError("WebSocket closed") - frame = self._build_frame(self.OP_BINARY, data, mask=True) - self.writer.write(frame) - await self.writer.drain() - - async def send_batch(self, parts: List[bytes]): - if self._closed: - raise ConnectionError("WebSocket closed") - for part in parts: - self.writer.write( - self._build_frame(self.OP_BINARY, part, mask=True)) - await self.writer.drain() - - async def recv(self) -> Optional[bytes]: - while not self._closed: - opcode, payload = await self._read_frame() - - if opcode == self.OP_CLOSE: - self._closed = True - try: - self.writer.write(self._build_frame( - self.OP_CLOSE, - payload[:2] if payload else b'', mask=True)) - await self.writer.drain() - except Exception: - pass - return None - - if opcode == self.OP_PING: - try: - self.writer.write( - self._build_frame(self.OP_PONG, payload, mask=True)) - await self.writer.drain() - except Exception: - pass - continue - - if opcode == self.OP_PONG: - continue - - if opcode in (0x1, 0x2): - return payload - continue - return None - - async def close(self): - if self._closed: - return - self._closed = True - try: - self.writer.write( - self._build_frame(self.OP_CLOSE, b'', mask=True)) - await self.writer.drain() - except Exception: - pass - try: - self.writer.close() - await self.writer.wait_closed() - except Exception: - pass - - @staticmethod - def _build_frame(opcode: int, data: bytes, - mask: bool = False) -> bytes: - length = len(data) - fb = 0x80 | opcode - if not mask: - if length < 126: - return _st_BB.pack(fb, length) + data - if length < 65536: - return _st_BBH.pack(fb, 126, length) + data - return _st_BBQ.pack(fb, 127, length) + data - mask_key = os.urandom(4) - masked = _xor_mask(data, mask_key) - if length < 126: - return _st_BB4s.pack(fb, 0x80 | length, mask_key) + masked - if length < 65536: - return _st_BBH4s.pack(fb, 0x80 | 126, length, mask_key) + masked - return _st_BBQ4s.pack(fb, 0x80 | 127, length, mask_key) + masked - - async def _read_frame(self) -> Tuple[int, bytes]: - hdr = await self.reader.readexactly(2) - opcode = hdr[0] & 0x0F - length = hdr[1] & 0x7F - if length == 126: - length = _st_H.unpack(await self.reader.readexactly(2))[0] - elif length == 127: - length = _st_Q.unpack(await self.reader.readexactly(8))[0] - if hdr[1] & 0x80: - mask_key = await self.reader.readexactly(4) - payload = await self.reader.readexactly(length) - return opcode, _xor_mask(payload, mask_key) - payload = await self.reader.readexactly(length) - return opcode, payload - - -def _human_bytes(n: int) -> str: - for unit in ('B', 'KB', 'MB', 'GB'): - if abs(n) < 1024: - return f"{n:.1f}{unit}" - n /= 1024 - return f"{n:.1f}TB" - def _try_handshake(handshake: bytes, secret: bytes) -> Optional[Tuple[int, bool, bytes, bytes]]: dec_prekey_and_iv = handshake[SKIP_LEN:SKIP_LEN + PREKEY_LEN + IV_LEN] @@ -387,93 +98,6 @@ def _generate_relay_init(proto_tag: bytes, dc_idx: int) -> bytes: return bytes(result) -class _MsgSplitter: - """ - Splits TCP stream data into individual MTProto transport packets - so each can be sent as a separate WS frame. - """ - __slots__ = ('_dec', '_proto', '_cipher_buf', '_plain_buf', '_disabled') - - def __init__(self, relay_init: bytes, proto_int: int): - cipher = Cipher(algorithms.AES(relay_init[8:40]), - modes.CTR(relay_init[40:56])) - self._dec = cipher.encryptor() - self._dec.update(ZERO_64) - self._proto = proto_int - self._cipher_buf = bytearray() - self._plain_buf = bytearray() - self._disabled = False - - def split(self, chunk: bytes) -> List[bytes]: - if not chunk: - return [] - if self._disabled: - return [chunk] - - self._cipher_buf.extend(chunk) - self._plain_buf.extend(self._dec.update(chunk)) - - parts = [] - while self._cipher_buf: - packet_len = self._next_packet_len() - if packet_len is None: - break - if packet_len <= 0: - parts.append(bytes(self._cipher_buf)) - self._cipher_buf.clear() - self._plain_buf.clear() - self._disabled = True - break - parts.append(bytes(self._cipher_buf[:packet_len])) - del self._cipher_buf[:packet_len] - del self._plain_buf[:packet_len] - return parts - - def flush(self) -> List[bytes]: - if not self._cipher_buf: - return [] - tail = bytes(self._cipher_buf) - self._cipher_buf.clear() - self._plain_buf.clear() - return [tail] - - def _next_packet_len(self) -> Optional[int]: - if not self._plain_buf: - return None - if self._proto == PROTO_ABRIDGED_INT: - return self._next_abridged_len() - if self._proto in (PROTO_INTERMEDIATE_INT, - PROTO_PADDED_INTERMEDIATE_INT): - return self._next_intermediate_len() - return 0 - - def _next_abridged_len(self) -> Optional[int]: - first = self._plain_buf[0] - if first in (0x7F, 0xFF): - if len(self._plain_buf) < 4: - return None - payload_len = int.from_bytes(self._plain_buf[1:4], 'little') * 4 - header_len = 4 - else: - payload_len = (first & 0x7F) * 4 - header_len = 1 - if payload_len <= 0: - return 0 - packet_len = header_len + payload_len - if len(self._plain_buf) < packet_len: - return None - return packet_len - - def _next_intermediate_len(self) -> Optional[int]: - if len(self._plain_buf) < 4: - return None - payload_len = _st_I_le.unpack_from(self._plain_buf, 0)[0] & 0x7FFFFFFF - if payload_len <= 0: - return 0 - packet_len = 4 + payload_len - if len(self._plain_buf) < packet_len: - return None - return packet_len def _ws_domains(dc: int, is_media) -> List[str]: @@ -484,38 +108,6 @@ def _ws_domains(dc: int, is_media) -> List[str]: return [f'kws{dc}.web.telegram.org', f'kws{dc}-1.web.telegram.org'] -class Stats: - def __init__(self): - self.connections_total = 0 - self.connections_active = 0 - self.connections_ws = 0 - self.connections_tcp_fallback = 0 - self.connections_cfproxy = 0 - self.connections_bad = 0 - self.ws_errors = 0 - self.bytes_up = 0 - self.bytes_down = 0 - self.pool_hits = 0 - self.pool_misses = 0 - - def summary(self) -> str: - pool_total = self.pool_hits + self.pool_misses - pool_s = (f"{self.pool_hits}/{pool_total}" - if pool_total else "n/a") - return (f"total={self.connections_total} " - f"active={self.connections_active} " - f"ws={self.connections_ws} " - f"tcp_fb={self.connections_tcp_fallback} " - f"cf={self.connections_cfproxy} " - f"bad={self.connections_bad} " - f"err={self.ws_errors} " - f"pool={pool_s} " - f"up={_human_bytes(self.bytes_up)} " - f"down={_human_bytes(self.bytes_down)}") - -_stats = Stats() - - class _WsPool: WS_POOL_MAX_AGE = 120.0 @@ -540,13 +132,13 @@ class _WsPool: or ws.writer.transport.is_closing()): asyncio.create_task(self._quiet_close(ws)) continue - _stats.pool_hits += 1 + stats.pool_hits += 1 log.debug("WS pool hit DC%d%s (age=%.1fs, left=%d)", dc, 'm' if is_media else '', age, len(bucket)) self._schedule_refill(key, target_ip, domains) return ws - _stats.pool_misses += 1 + stats.pool_misses += 1 self._schedule_refill(key, target_ip, domains) return None @@ -615,273 +207,13 @@ class _WsPool: _ws_pool = _WsPool() -async def _bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label, - dc=None, is_media=False, - clt_decryptor=None, clt_encryptor=None, - tg_encryptor=None, tg_decryptor=None, - splitter: _MsgSplitter = None): - """ - Bidirectional TCP(client) <-> WS(telegram) with re-encryption. - client ciphertext → decrypt(clt_key) → encrypt(tg_key) → WS - WS data → decrypt(tg_key) → encrypt(clt_key) → client TCP - """ - dc_tag = f"DC{dc}{'m' if is_media else ''}" if dc else "DC?" - - up_bytes = 0 - down_bytes = 0 - up_packets = 0 - down_packets = 0 - start_time = asyncio.get_running_loop().time() - - async def tcp_to_ws(): - nonlocal up_bytes, up_packets - try: - while True: - chunk = await reader.read(65536) - if not chunk: - if splitter: - tail = splitter.flush() - if tail: - await ws.send(tail[0]) - break - n = len(chunk) - _stats.bytes_up += n - up_bytes += n - up_packets += 1 - plain = clt_decryptor.update(chunk) - chunk = tg_encryptor.update(plain) - if splitter: - parts = splitter.split(chunk) - if not parts: - continue - if len(parts) > 1: - await ws.send_batch(parts) - else: - await ws.send(parts[0]) - else: - await ws.send(chunk) - except (asyncio.CancelledError, ConnectionError, OSError): - return - except Exception as e: - log.debug("[%s] tcp->ws ended: %s", label, e) - - async def ws_to_tcp(): - nonlocal down_bytes, down_packets - try: - while True: - data = await ws.recv() - if data is None: - break - n = len(data) - _stats.bytes_down += n - down_bytes += n - down_packets += 1 - plain = tg_decryptor.update(data) - data = clt_encryptor.update(plain) - writer.write(data) - await writer.drain() - except (asyncio.CancelledError, ConnectionError, OSError): - return - except Exception as e: - log.debug("[%s] ws->tcp ended: %s", label, e) - - tasks = [asyncio.create_task(tcp_to_ws()), - asyncio.create_task(ws_to_tcp())] - try: - await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - finally: - for t in tasks: - t.cancel() - for t in tasks: - try: - await t - except BaseException: - pass - elapsed = asyncio.get_running_loop().time() - start_time - log.info("[%s] %s WS session closed: " - "^%s (%d pkts) v%s (%d pkts) in %.1fs", - label, dc_tag, - _human_bytes(up_bytes), up_packets, - _human_bytes(down_bytes), down_packets, - elapsed) - try: - await ws.close() - except BaseException: - pass - try: - writer.close() - await writer.wait_closed() - except BaseException: - pass - - -async def _bridge_tcp_reencrypt(reader, writer, remote_reader, remote_writer, - label, dc=None, is_media=False, - clt_decryptor=None, clt_encryptor=None, - tg_encryptor=None, tg_decryptor=None): - """Bidirectional TCP <-> TCP with re-encryption.""" - - async def forward(src, dst_w, is_up): - try: - while True: - data = await src.read(65536) - if not data: - break - n = len(data) - if is_up: - _stats.bytes_up += n - plain = clt_decryptor.update(data) - data = tg_encryptor.update(plain) - else: - _stats.bytes_down += n - plain = tg_decryptor.update(data) - data = clt_encryptor.update(plain) - dst_w.write(data) - await dst_w.drain() - except asyncio.CancelledError: - pass - except Exception as e: - log.debug("[%s] forward ended: %s", label, e) - - tasks = [ - asyncio.create_task(forward(reader, remote_writer, True)), - asyncio.create_task(forward(remote_reader, writer, False)), - ] - try: - await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - finally: - for t in tasks: - t.cancel() - for t in tasks: - try: - await t - except BaseException: - pass - for w in (writer, remote_writer): - try: - w.close() - await w.wait_closed() - except BaseException: - pass - - -async def _tcp_fallback(reader, writer, dst, port, relay_init, label, - dc=None, is_media=False, - clt_decryptor=None, clt_encryptor=None, - tg_encryptor=None, tg_decryptor=None): - try: - rr, rw = await asyncio.wait_for( - asyncio.open_connection(dst, port), timeout=10) - except Exception as exc: - log.warning("[%s] TCP fallback to %s:%d failed: %s", - label, dst, port, exc) - return False - - _stats.connections_tcp_fallback += 1 - rw.write(relay_init) - await rw.drain() - await _bridge_tcp_reencrypt(reader, writer, rr, rw, label, - dc=dc, is_media=is_media, - clt_decryptor=clt_decryptor, - clt_encryptor=clt_encryptor, - tg_encryptor=tg_encryptor, - tg_decryptor=tg_decryptor) - return True - - -def _fallback_ip(dc: int) -> Optional[str]: - return DC_DEFAULT_IPS.get(dc) - - -def _cfproxy_domains(dc: int) -> List[str]: - base = proxy_config.fallback_cfproxy_domain - return [f'kws{dc}.{base}'] - - -async def _cfproxy_fallback(reader, writer, relay_init, label, - dc=None, is_media=False, - clt_decryptor=None, clt_encryptor=None, - tg_encryptor=None, tg_decryptor=None, - splitter=None): - domains = _cfproxy_domains(dc) - media_tag = ' media' if is_media else '' - ws = None - for domain in domains: - log.info("[%s] DC%d%s -> CF proxy wss://%s/apiws", - label, dc, media_tag, domain) - try: - ws = await RawWebSocket.connect(domain, domain, - timeout=10.0) - break - except Exception as exc: - log.warning("[%s] DC%d%s CF proxy %s failed: %s", - label, dc, media_tag, domain, exc) - if ws is None: - return False - _stats.connections_cfproxy += 1 - await ws.send(relay_init) - await _bridge_ws_reencrypt(reader, writer, ws, label, - dc=dc, is_media=is_media, - clt_decryptor=clt_decryptor, - clt_encryptor=clt_encryptor, - tg_encryptor=tg_encryptor, - tg_decryptor=tg_decryptor, - splitter=splitter) - return True - - -async def _do_fallback(reader, writer, relay_init, label, - dc, is_media, media_tag, - clt_decryptor, clt_encryptor, - tg_encryptor, tg_decryptor, - splitter=None): - """Try CF proxy and/or TCP fallback based on config priority.""" - fallback_dst = _fallback_ip(dc) - use_cf = proxy_config.fallback_cfproxy - cf_first = proxy_config.fallback_cfproxy_priority - - methods: List[str] = [] - if use_cf and cf_first: - methods = ['cf', 'tcp'] - elif use_cf: - methods = ['tcp', 'cf'] - else: - methods = ['tcp'] - - for method in methods: - if method == 'cf': - ok = await _cfproxy_fallback( - reader, writer, relay_init, label, - dc=dc, is_media=is_media, - clt_decryptor=clt_decryptor, - clt_encryptor=clt_encryptor, - tg_encryptor=tg_encryptor, - tg_decryptor=tg_decryptor, - splitter=splitter) - if ok: - return True - elif method == 'tcp' and fallback_dst: - log.info("[%s] DC%d%s -> TCP fallback to %s:443", - label, dc, media_tag, fallback_dst) - ok = await _tcp_fallback( - reader, writer, fallback_dst, 443, - relay_init, label, dc=dc, is_media=is_media, - clt_decryptor=clt_decryptor, - clt_encryptor=clt_encryptor, - tg_encryptor=tg_encryptor, - tg_decryptor=tg_decryptor) - if ok: - return True - return False - - async def _handle_client(reader, writer, secret: bytes): - _stats.connections_total += 1 - _stats.connections_active += 1 + stats.connections_total += 1 + stats.connections_active += 1 peer = writer.get_extra_info('peername') label = f"{peer[0]}:{peer[1]}" if peer else "?" - _set_sock_opts(writer.transport) + set_sock_opts(writer.transport, proxy_config.buffer_size) try: try: @@ -893,7 +225,7 @@ async def _handle_client(reader, writer, secret: bytes): result = _try_handshake(handshake, secret) if result is None: - _stats.connections_bad += 1 + stats.connections_bad += 1 log.debug("[%s] bad handshake (wrong secret or proto)", label) try: while await reader.read(4096): @@ -971,10 +303,10 @@ async def _handle_client(reader, writer, secret: bytes): label, dc, media_tag) splitter = None try: - splitter = _MsgSplitter(relay_init, proto_int) + splitter = MsgSplitter(relay_init, proto_int) except Exception: pass - ok = await _do_fallback( + ok = await do_fallback( reader, writer, relay_init, label, dc, is_media, media_tag, clt_decryptor, clt_encryptor, @@ -1010,7 +342,7 @@ async def _handle_client(reader, writer, secret: bytes): all_redirects = False break except WsHandshakeError as exc: - _stats.ws_errors += 1 + stats.ws_errors += 1 if exc.is_redirect: ws_failed_redirect = True log.warning("[%s] DC%d%s got %d from %s -> %s", @@ -1023,7 +355,7 @@ async def _handle_client(reader, writer, secret: bytes): log.warning("[%s] DC%d%s WS handshake: %s", label, dc, media_tag, exc.status_line) except Exception as exc: - _stats.ws_errors += 1 + stats.ws_errors += 1 all_redirects = False log.warning("[%s] DC%d%s WS connect failed: %s", label, dc, media_tag, exc) @@ -1043,10 +375,10 @@ async def _handle_client(reader, writer, secret: bytes): splitter_fb = None try: - splitter_fb = _MsgSplitter(relay_init, proto_int) + splitter_fb = MsgSplitter(relay_init, proto_int) except Exception: pass - ok = await _do_fallback( + ok = await do_fallback( reader, writer, relay_init, label, dc, is_media, media_tag, clt_decryptor, clt_encryptor, @@ -1058,11 +390,11 @@ async def _handle_client(reader, writer, secret: bytes): return dc_fail_until.pop(dc_key, None) - _stats.connections_ws += 1 + stats.connections_ws += 1 splitter = None try: - splitter = _MsgSplitter(relay_init, proto_int) + splitter = MsgSplitter(relay_init, proto_int) log.debug("[%s] MsgSplitter activated for proto 0x%08X", label, proto_int) except Exception: @@ -1070,7 +402,7 @@ async def _handle_client(reader, writer, secret: bytes): await ws.send(relay_init) - await _bridge_ws_reencrypt(reader, writer, ws, label, + await bridge_ws_reencrypt(reader, writer, ws, label, dc=dc, is_media=is_media, clt_decryptor=clt_decryptor, clt_encryptor=clt_encryptor, @@ -1094,7 +426,7 @@ async def _handle_client(reader, writer, secret: bytes): except Exception as exc: log.error("[%s] unexpected: %s", label, exc, exc_info=True) finally: - _stats.connections_active -= 1 + stats.connections_active -= 1 try: writer.close() except BaseException: @@ -1154,7 +486,7 @@ async def _run(stop_event: Optional[asyncio.Event] = None): bl = ', '.join( f'DC{d}{"m" if m else ""}' for d, m in sorted(ws_blacklist)) or 'none' - log.info("stats: %s | ws_bl: %s", _stats.summary(), bl) + log.info("stats: %s | ws_bl: %s", stats.summary(), bl) except asyncio.CancelledError: raise @@ -1197,22 +529,6 @@ async def _run(stop_event: Optional[asyncio.Event] = None): _server_instance = None -def parse_dc_ip_list(dc_ip_list: List[str]) -> Dict[int, str]: - dc_redirects: Dict[int, str] = {} - for entry in dc_ip_list: - if ':' not in entry: - raise ValueError( - f"Invalid --dc-ip format {entry!r}, expected DC:IP") - dc_s, ip_s = entry.split(':', 1) - try: - dc_n = int(dc_s) - _socket.inet_aton(ip_s) - except (ValueError, OSError): - raise ValueError(f"Invalid --dc-ip {entry!r}") - dc_redirects[dc_n] = ip_s - return dc_redirects - - def run_proxy(stop_event: Optional[asyncio.Event] = None): asyncio.run(_run(stop_event,)) @@ -1274,18 +590,15 @@ def main(): secret_hex = os.urandom(16).hex() log.info("Generated secret: %s", secret_hex) - global proxy_config - proxy_config = ProxyConfig( - port=args.port, - host=args.host, - secret=secret_hex, - dc_redirects=dc_redirects, - buffer_size=max(4, args.buf_kb) * 1024, - pool_size=max(0, args.pool_size), - fallback_cfproxy=not args.no_cfproxy, - fallback_cfproxy_priority=args.cfproxy_priority, - fallback_cfproxy_domain=args.cfproxy_domain, - ) + proxy_config.port = args.port + proxy_config.host = args.host + proxy_config.secret = secret_hex + proxy_config.dc_redirects = dc_redirects + proxy_config.buffer_size = max(4, args.buf_kb) * 1024 + proxy_config.pool_size = max(0, args.pool_size) + proxy_config.fallback_cfproxy = not args.no_cfproxy + proxy_config.fallback_cfproxy_priority = args.cfproxy_priority + proxy_config.fallback_cfproxy_domain = args.cfproxy_domain log_level = logging.DEBUG if args.verbose else logging.INFO log_fmt = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', @@ -1310,7 +623,7 @@ def main(): try: asyncio.run(_run()) except KeyboardInterrupt: - log.info("Shutting down. Final stats: %s", _stats.summary()) + log.info("Shutting down. Final stats: %s", stats.summary()) if __name__ == '__main__': diff --git a/proxy/utils.py b/proxy/utils.py new file mode 100644 index 0000000..10d5f57 --- /dev/null +++ b/proxy/utils.py @@ -0,0 +1,48 @@ +import socket as _socket + +from typing import Optional + + +ZERO_64 = b'\x00' * 64 +HANDSHAKE_LEN = 64 +SKIP_LEN = 8 +PREKEY_LEN = 32 +KEY_LEN = 32 +IV_LEN = 16 +PROTO_TAG_POS = 56 +DC_IDX_POS = 60 + +PROTO_TAG_ABRIDGED = b'\xef\xef\xef\xef' +PROTO_TAG_INTERMEDIATE = b'\xee\xee\xee\xee' +PROTO_TAG_SECURE = b'\xdd\xdd\xdd\xdd' + +PROTO_ABRIDGED_INT = 0xEFEFEFEF +PROTO_INTERMEDIATE_INT = 0xEEEEEEEE +PROTO_PADDED_INTERMEDIATE_INT = 0xDDDDDDDD + +RESERVED_FIRST_BYTES = {0xEF} +RESERVED_STARTS = {b'\x48\x45\x41\x44', b'\x50\x4F\x53\x54', + b'\x47\x45\x54\x20', b'\xee\xee\xee\xee', + b'\xdd\xdd\xdd\xdd', b'\x16\x03\x01\x02'} +RESERVED_CONTINUE = b'\x00\x00\x00\x00' + + +def human_bytes(n: int) -> str: + for unit in ('B', 'KB', 'MB', 'GB'): + if abs(n) < 1024: + return f"{n:.1f}{unit}" + n /= 1024 + return f"{n:.1f}TB" + + +def get_link_host(host: str) -> Optional[str]: + if host == '0.0.0.0': + try: + with _socket.socket(_socket.AF_INET, _socket.SOCK_DGRAM) as _s: + _s.connect(('8.8.8.8', 80)) + link_host = _s.getsockname()[0] + except OSError: + link_host = '127.0.0.1' + return link_host + else: + return host \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 2ae0a0c..006baff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,3 +71,6 @@ packages = ["proxy", "ui", "utils"] [tool.hatch.version] path = "proxy/__init__.py" + +[tool.ruff.lint] +ignore = ["F403", "F405"] \ No newline at end of file