From b3ed5c09dbefcb80f0d4f6930d4e555d40e98fff Mon Sep 17 00:00:00 2001 From: Flowseal Date: Sun, 26 Apr 2026 16:58:17 +0300 Subject: [PATCH] Windows auto update --- utils/update_check.py | 52 +++++++++ windows.py | 240 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 290 insertions(+), 2 deletions(-) diff --git a/utils/update_check.py b/utils/update_check.py index 026dd41..ac8c739 100644 --- a/utils/update_check.py +++ b/utils/update_check.py @@ -30,6 +30,7 @@ _state: Dict[str, Any] = { "latest": None, "html_url": None, "error": None, + "assets": [], } @@ -162,6 +163,7 @@ def run_check(current_version: str) -> None: tag = (cache.get("tag_name") or "").strip() if tag: _apply_release_tag(tag, cache.get("html_url") or "", current_version) + _state["assets"] = cache.get("assets") or [] return err = cache.get("last_error") _state["error"] = ( @@ -181,6 +183,7 @@ def run_check(current_version: str) -> None: tag = (cache.get("tag_name") or "").strip() url = (cache.get("html_url") or "").strip() or RELEASES_PAGE_URL _apply_release_tag(tag, url, current_version) + _state["assets"] = cache.get("assets") or [] if new_etag: cache["etag"] = new_etag _save_cache(cache_path, cache) @@ -200,6 +203,13 @@ def run_check(current_version: str) -> None: cache["etag"] = new_etag cache["tag_name"] = tag cache["html_url"] = html_url + assets = [ + {"name": a.get("name", ""), "url": a.get("browser_download_url", ""), "digest": a.get("digest", "")} + for a in (data.get("assets") or []) + if a.get("name") and a.get("browser_download_url") + ] + _state["assets"] = assets + cache["assets"] = assets cache.pop("last_error", None) _save_cache(cache_path, cache) except (HTTPError, URLError, OSError, TimeoutError, ValueError, json.JSONDecodeError) as e: @@ -221,3 +231,45 @@ def run_check(current_version: str) -> None: def get_status() -> Dict[str, Any]: """Снимок состояния после run_check (для подписей в настройках).""" return dict(_state) + + +def get_update_asset(exe_path: Path) -> Optional[Tuple[str, str]]: + assets = _state.get("assets") or [] + if not assets: + return None + + # Try SHA256 match against release asset digests + try: + import hashlib + h = hashlib.sha256() + with open(exe_path, "rb") as f: + while True: + chunk = f.read(65536) + if not chunk: + break + h.update(chunk) + exe_sha = h.hexdigest().lower() + for a in assets: + d = (a.get("digest") or "").lower() + if d.startswith("sha256:") and d[7:] == exe_sha: + return a["url"], a["name"] + except Exception: + pass + + # Fallback + import struct + is_64 = struct.calcsize("P") * 8 == 64 + try: + is_modern = sys.getwindowsversion().major >= 10 + except Exception: + is_modern = True + if is_modern: + name = "TgWsProxy_windows.exe" + elif is_64: + name = "TgWsProxy_windows_7_64bit.exe" + else: + name = "TgWsProxy_windows_7_32bit.exe" + for a in assets: + if a.get("name") == name: + return a["url"], a["name"] + return None diff --git a/windows.py b/windows.py index e3b5ecb..0430781 100644 --- a/windows.py +++ b/windows.py @@ -2,6 +2,7 @@ from __future__ import annotations import ctypes import os +import subprocess import sys import threading import time @@ -40,7 +41,7 @@ from utils.tray_common import ( APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, IS_FROZEN, LOG_FILE, acquire_lock, bootstrap, check_ipv6_warning, ctk_run_dialog, ensure_ctk_thread, ensure_dirs, load_config, load_icon, log, - maybe_notify_update, quit_ctk, release_lock, restart_proxy, + quit_ctk, release_lock, restart_proxy, save_config, start_proxy, stop_proxy, tg_proxy_url, ) from ui.ctk_tray_ui import ( @@ -101,7 +102,9 @@ _u32.MessageBoxW.restype = ctypes.c_int _MB_OK_ERR = 0x10 _MB_OK_INFO = 0x40 _MB_YESNO_Q = 0x24 +_MB_YESNOCANCEL_Q = 0x23 _IDYES = 6 +_IDNO = 7 def _show_error(text: str, title: str = "TG WS Proxy — Ошибка") -> None: @@ -116,6 +119,227 @@ def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool: return _u32.MessageBoxW(None, text, title, _MB_YESNO_Q) == _IDYES +def update_ctk_form( + text: str, title: str = "TG WS Proxy", download_url: Optional[str] = None, + release_url: Optional[str] = None, +) -> str: + if ctk is None or not ensure_ctk_thread(ctk, _config.get("appearance", "auto")): + result = _u32.MessageBoxW(None, text, title, _MB_YESNOCANCEL_Q) + if result == _IDYES: + return "update" + if result == _IDNO: + return "open" + return "close" + + result = {"value": "close"} + + def _build(done: threading.Event) -> None: + theme = ctk_theme_for_platform() + root = create_ctk_toplevel( + ctk, + title=title, + width=310 if IS_FROZEN else 210, + height=130 if IS_FROZEN else 100, + theme=theme, + after_create=lambda r: r.iconbitmap(ICON_PATH), + ) + frame = main_content_frame(ctk, root, theme, padx=16, pady=14) + + ctk.CTkLabel( + frame, + text=text, + justify="left", + anchor="w", + wraplength=270, + font=(theme.ui_font_family, 12), + text_color=theme.text_primary, + ).pack(fill="x", pady=(0, 10)) + + row = ctk.CTkFrame(frame, fg_color="transparent") + row.pack(fill="x") + + status_label = ctk.CTkLabel( + frame, text="", justify="left", anchor="w", wraplength=270, + font=(theme.ui_font_family, 11), text_color=theme.text_secondary, + ) + status_label.pack(fill="x", pady=(6, 0)) + + btns: list = [] + + def _set_status(msg: str) -> None: + root.after(0, lambda: status_label.configure(text=msg)) + + def _close_with(value: str) -> None: + result["value"] = value + root.destroy() + done.set() + + def _on_update() -> None: + if not download_url: + if release_url: + webbrowser.open(release_url) + _close_with("open") + return + for b in btns: + b.configure(state="disabled") + root.protocol("WM_DELETE_WINDOW", lambda: None) + def _run(): + _perform_update(download_url, set_status=_set_status) + root.after(0, lambda: [b.configure(state="normal") for b in btns]) + root.after(0, lambda: root.protocol("WM_DELETE_WINDOW", lambda: _close_with("close"))) + threading.Thread(target=_run, daemon=True).start() + + if IS_FROZEN: + btn_upd = ctk.CTkButton( + row, text="Обновить", width=88, height=34, + font=(theme.ui_font_family, 13), command=_on_update, + ) + btn_upd.pack(side="left", padx=(0, 6)) + btns.append(btn_upd) + btn_pg = ctk.CTkButton( + row, text="Страница", width=88, height=34, + font=(theme.ui_font_family, 13), command=lambda: _close_with("open"), + ) + btn_pg.pack(side="left", padx=(0, 6)) + btns.append(btn_pg) + btn_cl = ctk.CTkButton( + row, text="Закрыть", width=88, height=34, + font=(theme.ui_font_family, 13), + fg_color=theme.field_bg, hover_color=theme.field_border, + text_color=theme.text_primary, border_width=1, border_color=theme.field_border, + command=lambda: _close_with("close"), + ) + btn_cl.pack(side="left") + btns.append(btn_cl) + + root.protocol("WM_DELETE_WINDOW", lambda: _close_with("close")) + + ctk_run_dialog(_build) + return result["value"] + + +def _perform_update(download_url: str, set_status=None) -> None: + import tempfile + import urllib.request + + def _step(msg: str) -> None: + log.info("Update: %s", msg) + if set_status: + set_status(msg) + time.sleep(0.8) + + def _err(msg: str) -> None: + log.error("Update error: %s", msg) + if set_status: + set_status(f"Ошибка: {msg}") + else: + _show_error(msg) + + _step("Скачивание...") + cur_exe = Path(sys.executable) + old_exe = cur_exe.with_name(cur_exe.stem + "_oldtgws.exe") + tmp_path = None + try: + fd, tmp_name = tempfile.mkstemp(dir=cur_exe.parent, suffix=".tmp") + os.close(fd) + tmp_path = Path(tmp_name) + log.info("Downloading update from %s", download_url) + urllib.request.urlretrieve(download_url, str(tmp_path)) + except Exception as exc: + _err(f"Не удалось скачать:\n{exc}") + if tmp_path: + try: + tmp_path.unlink(missing_ok=True) + except OSError: + pass + return + + _step("Замена файла...") + try: + if old_exe.exists(): + old_exe.unlink() + cur_exe.rename(old_exe) + except Exception as exc: + _err(f"Не удалось переименовать файл:\n{exc}") + try: + tmp_path.unlink(missing_ok=True) + except OSError: + pass + return + + try: + tmp_path.rename(cur_exe) + except Exception as exc: + _err(f"Не удалось переместить файл:\n{exc}") + try: + old_exe.rename(cur_exe) + except OSError: + pass + try: + tmp_path.unlink(missing_ok=True) + except OSError: + pass + return + + _step("Перезапуск...") + _release_win_mutex() + stop_proxy() + + # Don't reuse existing _MEI* dir + env = os.environ.copy() + for _k in [k for k in env if k.startswith("_PYI_") or k == "_MEIPASS"]: + del env[_k] + if hasattr(sys, "_MEIPASS"): + _mei = os.path.normcase(sys._MEIPASS.rstrip("\\/")) + env["PATH"] = os.pathsep.join( + p for p in env.get("PATH", "").split(os.pathsep) + if os.path.normcase(p.rstrip("\\/")) != _mei + ) + + try: + subprocess.Popen( + [str(cur_exe)], + env=env, + creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP, + ) + except Exception as exc: + log.error("Failed to launch updated exe: %s", exc) + time.sleep(0.5) + os._exit(0) + + +def _maybe_do_update(cfg: dict, is_exiting) -> None: + if not cfg.get("check_updates", True): + return + + def _work(): + time.sleep(1.5) + if is_exiting(): + return + try: + from proxy import __version__ + from utils.update_check import RELEASES_PAGE_URL, get_status, get_update_asset, run_check + + run_check(__version__) + st = get_status() + if not st.get("has_update") or is_exiting(): + return + url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL + ver = st.get("latest") or "?" + asset = get_update_asset(Path(sys.executable)) if IS_FROZEN else None + choice = update_ctk_form( + f"Доступна новая версия: {ver}", + download_url=asset[0] if asset else None, + release_url=url, + ) + if choice == "open": + webbrowser.open(url) + except Exception as exc: + log.warning("Update check failed: %s", repr(exc)) + + threading.Thread(target=_work, daemon=True, name="update-check").start() + + # autostart (registry) _RUN_KEY = r"Software\Microsoft\Windows\CurrentVersion\Run" @@ -370,7 +594,7 @@ def run_tray() -> None: return start_proxy(_config, _show_error) - maybe_notify_update(_config, lambda: _exiting, _ask_yes_no) + _maybe_do_update(_config, lambda: _exiting) _show_first_run() check_ipv6_warning(_show_info) @@ -387,6 +611,18 @@ def main() -> None: _show_info("Приложение уже запущено.", os.path.basename(sys.argv[0])) return + if IS_FROZEN: + def _cleanup_old_exes(): + exe_dir = Path(sys.executable).parent + time.sleep(3) + for _f in exe_dir.glob("*_oldtgws.exe"): + try: + _f.unlink() + log.info("Deleted leftover: %s", _f) + except OSError: + pass + threading.Thread(target=_cleanup_old_exes, daemon=True, name="cleanup-old").start() + try: run_tray() finally: