# # miniirc_matrix # # Copyright © 2021 by luk3yx # from __future__ import annotations from collections.abc import Callable from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from typing import Any, Optional, TypeVar, overload from urllib.parse import quote as _url_quote, urlparse as _urlparse import functools, hmac, html.parser, itertools, json, math, os, re, time, uuid import miniirc, requests, threading, traceback # type: ignore ver = (0, 0, 13) __version__ = '.'.join(map(str, ver)) def _room_processor(*event_names: str): def _make_wrapper(f: Callable[[Matrix, str, dict[str, Any]], None] ) -> Callable[[Matrix, dict[str, Any]], None]: @functools.wraps(f) def wrapper(self: Matrix, sync_msg: dict[str, Any]) -> None: for event_name in event_names: if event_name in sync_msg: for room_id, room in sync_msg[event_name].items(): f(self, room_id, room) return wrapper return _make_wrapper # Register events _events: dict[str, Callable[[Matrix, str, _Event], None]] = {} def _register_event(event_name: str): def _register(f: Callable[[Matrix, str, _Event], None] ) -> Callable[[Matrix, str, _Event], None]: assert event_name not in _events _events[event_name] = f return f return _register _invisible_formatting_re = re.compile( r'\x02|\x1d|\x1f|\x1e|\x11|\x16|\x0f' r'|\x03([0-9]{1,2})?(?:,([0-9]{1,2}))?' # Hex colours r'|\x04([0-9a-fA-F]{6})?(?:,([0-9a-fA-F]{6}))?' ) _full_formatting_re = re.compile( _invisible_formatting_re.pattern + # Matrix mentions # These currently get inserted without any escaping, if HTML characters get # added to the regex then make sure escaping gets added r'|\B@[a-z0-9\._=\-/+]+:[a-zA-Z0-9_\-\.]+\.[a-zA-Z]{2,}(?!\.\w)\b' ) _html_tags = {'\x02': 'strong', '\x1d': 'em', '\x1f': 'u', '\x1e': 'del', '\x11': 'code'} _media_url_re = re.compile( r'^mxc://([A-Za-z0-9_\-\.]+/[A-Za-z0-9_\-\.]+)(?:/(.*))?$' ) class _TagManager: __slots__ = ('text', 'formatting', 'open_tags', 'tags', 'fg', 'bg', 'reverse_colours') def __init__(self) -> None: self.text: list[str] = [] self.tags: dict[str, str] = {} self.open_tags: dict[str, str] = {} self.fg: Optional[str] = None self.bg: Optional[str] = None self.reverse_colours: bool = False def write_tags(self) -> None: if self.tags != self.open_tags: open_tags = tuple(self.open_tags.items()) tags = tuple(self.tags.items()) i = 0 for a, b in zip(open_tags, tags): if a != b: break i += 1 self.text.extend(f'' for t, _ in reversed(open_tags[i:])) self.text.extend(f'<{t}{params}>' for t, params in tags[i:]) self.open_tags = self.tags.copy() def write(self, s: str) -> None: if s: self.write_tags() self.text.append(s) @staticmethod def _encode_attribute(param: str) -> str: if param == 'href': return param return 'data-mx-' + param.replace('_', '-') def open(self, tag: str, **kwargs: Optional[str]) -> None: self.tags[tag] = ''.join( f' {self._encode_attribute(param)}="{value}"' for param, value in kwargs.items() if value is not None ) # Fix ordering if self.tags == self.open_tags: self.tags = self.open_tags.copy() def set_colours(self, fg: Optional[str] = None, bg: Optional[str] = None) -> None: if fg is not None: self.fg = fg or None if bg is not None: self.bg = bg or None if not self.fg and not self.bg: if 'span' in self.tags: self.close('span') elif self.fg == self.bg: self.open('span', spoiler='') elif self.reverse_colours: # This won't work for the "default" colour self.open('span', color=self.bg, bg_color=self.fg) else: self.open('span', color=self.fg, bg_color=self.bg) def close(self, tag: str) -> None: del self.tags[tag] def toggle(self, tag: str) -> None: if tag in self.tags: self.close(tag) else: self.open(tag) _colours = ( 0xffffff, # White 0x000000, # Black 0x00007f, # Blue 0x009300, # Green 0xff0000, # Red 0xa52a2a, # Brown 0xff00ff, # Magenta 0xffa500, # Orange 0xffff00, # Yellow 0x90ee90, # Light green 0x00ffff, # Cyan 0xe0ffff, # Light cyan 0xadd8e6, # Light blue 0xffc0cb, # Pink 0x808080, # Grey 0xd3d3d3, # Light grey # Colours 16-98 0x470000, 0x472100, 0x474700, 0x324700, 0x004700, 0x00472c, 0x004747, 0x002747, 0x000047, 0x2e0047, 0x470047, 0x47002a, 0x740000, 0x743a00, 0x747400, 0x517400, 0x007400, 0x007449, 0x007474, 0x004074, 0x000074, 0x4b0074, 0x740074, 0x740045, 0xb50000, 0xb56300, 0xb5b500, 0x7db500, 0x00b500, 0x00b571, 0x00b5b5, 0x0063b5, 0x0000b5, 0x7500b5, 0xb500b5, 0xb5006b, 0xff0000, 0xff8c00, 0xffff00, 0xb2ff00, 0x00ff00, 0x00ffa0, 0x00ffff, 0x008cff, 0x0000ff, 0xa500ff, 0xff00ff, 0xff0098, 0xff5959, 0xffb459, 0xffff71, 0xcfff60, 0x6fff6f, 0x65ffc9, 0x6dffff, 0x59b4ff, 0x5959ff, 0xc459ff, 0xff66ff, 0xff59bc, 0xff9c9c, 0xffd39c, 0xffff9c, 0xe2ff9c, 0x9cff9c, 0x9cffdb, 0x9cffff, 0x9cd3ff, 0x9c9cff, 0xdc9cff, 0xff9cff, 0xff94d3, 0x000000, 0x131313, 0x282828, 0x363636, 0x4d4d4d, 0x656565, 0x818181, 0x9f9f9f, 0xbcbcbc, 0xe2e2e2, 0xffffff ) def _irc_colour_to_hex(code: Optional[str]) -> Optional[str]: if code is None: return None try: return f'#{_colours[int(code)]:06X}' except (IndexError, ValueError): return '' def _irc_to_html(irc_msg: str) -> tuple[Optional[str], set[str]]: """ Converts IRC formatting to Matrix HTML. Returns None if the message contains no formatting. """ mentions: set[str] = set() # Escaping quotes seems to make matrix-appservice-discord do strange things irc_msg = html.escape(irc_msg, quote=False) # If there is no formatting return immediately it = _full_formatting_re.finditer(irc_msg) first_match = next(it, None) if first_match is None: return None, mentions tags = _TagManager() prev_end = start = 0 for match in itertools.chain([first_match], it): start = match.start() tags.write(irc_msg[prev_end:start]) char = irc_msg[start] if char in _html_tags: tags.toggle(_html_tags[char]) elif char == '\x03': fg = _irc_colour_to_hex(match.group(1)) bg = _irc_colour_to_hex(match.group(2)) # If this is a plain \x03 then reset colours if fg is None and bg is None: fg = bg = '' tags.set_colours(fg=fg, bg=bg) elif char == '\x04': fg = match.group(3) bg = match.group(4) if fg is None and bg is None: fg = bg = '' tags.set_colours(fg=fg and '#' + fg.upper(), bg=bg and '#' + bg.upper()) elif char == '\n': tags.text.append('
') elif char == '\x16': tags.reverse_colours = not tags.reverse_colours tags.set_colours() elif char == '\x0f': tags.fg = tags.bg = None tags.tags.clear() elif char == '@': # Matrix mention mention = match.group(0) tags.open('a', href=f'https://matrix.to/#/{mention}') tags.write(mention) tags.close('a') mentions.add(mention) prev_end = match.end() tags.write(irc_msg[prev_end:]) tags.tags.clear() tags.write_tags() return ''.join(tags.text).replace('\n', '
'), mentions # This simple space collapsing regex "collapses" newlines as well # _space_collapse_re = re.compile(r'[ \t\r\n]+') _colour_hex_re = re.compile(r'^#[0-9a-fA-F]{6}$') class _UnknownTagError(Exception): pass class _MatrixHTMLParser(html.parser.HTMLParser): irc_codes = {tag: irc_code for irc_code, tag in _html_tags.items()} irc_codes['b'] = irc_codes['strong'] irc_codes['i'] = irc_codes['em'] irc_codes['br'] = '\n' irc_codes['s'] = irc_codes['del'] irc_codes['strike'] = irc_codes['del'] def __init__(self) -> None: super().__init__() self.text: list[str] = [] self.in_reply = 0 def handle_starttag(self, tag: str, attrs: list[tuple[str, Optional[str]]]) -> None: if tag in ('mx-reply', 'script'): self.in_reply += 1 elif self.in_reply: return elif tag in self.irc_codes: self.text.append(self.irc_codes[tag]) elif tag != 'font': # Give up trying to parse the HTML and use the fallback raise _UnknownTagError(tag) def handle_endtag(self, tag: str) -> None: if self.in_reply: if tag in ('mx-reply', 'script'): self.in_reply -= 1 return if tag == 'br': return elif tag in self.irc_codes: self.text.append(self.irc_codes[tag]) elif tag != 'font': raise _UnknownTagError(tag) def handle_data(self, data: str) -> None: if not self.in_reply: self.text.append(data) def _matrix_html_to_irc(content: _Event) -> tuple[str, bool]: if content.format == 'org.matrix.custom.html': try: parser = _MatrixHTMLParser() parser.feed(content.formatted_body[str]) # return _space_collapse_re.sub(' ', ''.join(parser.text)), True return ''.join(parser.text), True except _UnknownTagError: # This is okay, just use the fallback text pass return content.body[str], False class _MediaProxyHandler(BaseHTTPRequestHandler): irc: Matrix def do_GET(self) -> None: try: with self.irc._download_media('mxc:/' + self.path) as resp: if resp.status_code != 200: self.send_error(resp.status_code) return self.send_response(200) self.send_header('X-Content-Type-Options', 'nosniff') self.send_header('Content-Security-Policy', "default-src 'none'") if 'Content-Length' in resp.headers: self.send_header('Content-Length', resp.headers['Content-Length']) # Only allow probably safe content types content_type = resp.headers.get('Content-Type', '') if (content_type.startswith(('image/', 'audio/', 'video/')) or content_type == 'text/plain'): self.send_header('Content-Type', content_type) else: self.send_header('Content-Type', 'application/octet-stream') self.end_headers() # Copy content for chunk in resp.iter_content(8192): self.wfile.write(chunk) except ValueError as exc: self.send_error(400, explain=str(exc)) return def log_message(self, format: str, *args) -> None: if self.irc.debug: super().log_message(format, *args) class _InvalidEventError(Exception): pass _T = TypeVar('_T') _T2 = TypeVar('_T2') class _Event: """ Event content is supposed to be considered untrusted and types must be validated. Usage: event = _Event(event_dict) msgtype1: str = event.content.msgtype[str] msgtype2: Optional[str] = event.content.msgtype.get(str) msgtype3: str = event.content.msgtype.get(str, 'default') assert event.content.msgtype == '' assert 'msgtype' in event.content assert event.content.non.existent.object == None assert event.content.m_test == '' """ __slots__ = ('_raw', '__dict__') def __init__(self, raw: Any) -> None: self._raw = raw def __getattr__(self, attr: str) -> _Event: if isinstance(self._raw, dict): obj = self._raw.get( 'm.' + attr[2:] if attr.startswith('m_') else attr ) else: obj = None res = _Event(obj) # Cache the newly created object. self.__dict__[attr] = res return res def __getitem__(self, result_type: type[_T]) -> _T: if result_type is float and isinstance(self._raw, (int, float)): return self._raw # type: ignore elif result_type is Any or isinstance(self._raw, result_type): return self._raw raise _InvalidEventError def __contains__(self, key: str) -> bool: return isinstance(self._raw, dict) and key in self._raw def __eq__(self, other: Any) -> bool: return self._raw == other @overload def get(self, result_type: type[_T]) -> Optional[_T]: ... @overload def get(self, result_type: type[_T], default: _T2) -> _T | _T2: ... def get(self, result_type, default=None): try: return self[result_type] except _InvalidEventError: return default class Matrix(miniirc.IRC): connected: Optional[bool] msglen = 4096 def __init__( self, ip: str, port: int = 0, nick: str = '', *args, auto_connect: bool = True, token: Optional[str] = None, media_proxy_port: Optional[int] = None, media_proxy_url: Optional[str] = None, media_proxy_key: Optional[bytes] = None, **kwargs ) -> None: # Cache _get_room_url # This is done here so that each class instance gets its own cache and # the cache doesn't store class instances. self._get_room_url = functools.lru_cache(self._get_room_url_no_cache) # Allow Matrix('matrix.org:443'), Matrix('matrix.org', 443), and # Matrix('matrix.org'). if not port: try: new_ip, raw_port = ip.rsplit(':', 1) port = int(raw_port) except ValueError: port = 443 else: ip = new_ip if token: self.token = token self._media_proxy: Optional[ThreadingHTTPServer] = None self._media_proxy_port = media_proxy_port if media_proxy_port and not media_proxy_url: media_proxy_url = f'http://127.0.0.1:{media_proxy_port}' self._media_proxy_url = media_proxy_url and media_proxy_url.rstrip('/') if media_proxy_port is not None: self._media_proxy_key = media_proxy_key or os.urandom(32) # Stop miniirc from trying to access the (non-existent) socket kwargs['ping_interval'] = kwargs['ping_timeout'] = None super().__init__(ip, port, nick, *args, auto_connect=False, **kwargs) self.__session = requests.Session() self.__session.headers['Authorization'] = f'Bearer {token}' if auto_connect: self.connect() else: self._update_baseurl() def _update_baseurl(self) -> None: hostname = f'{self.ip}:{self.port}' if (not self.ssl and self.ssl is not None and self.ip in ('localhost', '127.0.0.1', '::1', '0::1')): # Non-SSL localhost connections are probably to # https://github.com/matrix-org/pantalaimon which doesn't support # the "v3" URLs yet. baseurl = f'http://{hostname}' api_version = 'r0' else: api_version = 'v3' # Check for a .well-known/matrix/client res = requests.get(f'https://{hostname}/.well-known/matrix/client', timeout=10) if res.status_code == 200: e = _Event(res.json()) baseurl = e.m_homeserver.base_url[str].rstrip('/') # Ensure the baseurl is valid parsed_url = _urlparse(baseurl) if (parsed_url.scheme != 'https' or parsed_url.query or parsed_url.fragment): raise ValueError(f'Invalid URL {baseurl!r}') elif res.status_code == 404: # Use a shorter hostname if possible if self.port == 443: hostname = self.ip baseurl = f'https://{hostname}' else: res.raise_for_status() raise ValueError(f'Status code {res.status_code} returned') self._baseurl = f'{baseurl}/_matrix/client/{api_version}' self._media_baseurl = f'{baseurl}/_matrix/client/v1/media' def __get(self, endpoint: str, timeout: int = 5, /, **params: Optional[str | int]) -> Any: self.debug('GET', endpoint, params) return self.__session.get(f'{self._baseurl}/{endpoint}', params=params, timeout=timeout).json() def __post(self, endpoint: str, /, **params: Any) -> Any: self.debug('POST', endpoint, params) return self.__session.post(f'{self._baseurl}/{endpoint}', json=params, timeout=5).json() def __put(self, endpoint: str, /, **params: Any) -> Any: self.debug('PUT', endpoint, params) return self.__session.put(f'{self._baseurl}/{endpoint}', json=params, timeout=5).json() def _get_room_url_no_cache(self, room_id: str) -> str: """ Returns rooms/. This is wrapped with functools.lru_cache by __init__. """ if room_id.startswith('#'): res = self.__get(f'directory/room/{_url_quote(room_id)}') room_id = res.get('room_id', room_id) return f'rooms/{_url_quote(room_id)}' def __make_url_digest(self, path: str) -> str: return hmac.digest(self._media_proxy_key, path.encode('ascii'), 'sha256').hex() def _download_media(self, url: str) -> requests.Response: url_base, _, key = url.partition('?key=') match = _media_url_re.match(url_base) if not match: raise ValueError('Invalid media URL') path = match.group(1) if not hmac.compare_digest(self.__make_url_digest(path), key): raise ValueError('Invalid key parameter') return self.__session.get(f'{self._media_baseurl}/download/{path}', timeout=15, stream=True) @functools.cached_property def current_nick(self) -> str: return self.__get('account/whoami')['user_id'] def connect(self) -> None: if self.connected is not None: return with self._send_lock: self.connected = False self._update_baseurl() self.active_caps = self.ircv3_caps & { 'account-tag', 'echo-message', 'message-tags', } self.__start_time = math.floor(time.time()) * 1000 self.debug('Starting main loop (Matrix)') self._start_main_loop() if self._media_proxy_port: self.debug('Starting media proxy') class _handler(_MediaProxyHandler): irc = self self._media_proxy = ThreadingHTTPServer( ('127.0.0.1', self._media_proxy_port), _handler, ) th = threading.Thread(target=self._media_proxy.serve_forever) th.daemon = True th.start() def disconnect(self) -> None: with self._send_lock: self.connected = False if self._media_proxy is not None: self._media_proxy.shutdown() self._media_proxy = None def _main(self) -> None: try: self.__numeric('001', f'Welcome to Matrix {self.current_nick}') next_batch: Optional[str] = None self.connected = True while self.connected: try: res = self.__get('sync', 35, timeout='30000', since=next_batch) except (requests.ConnectionError, requests.ReadTimeout, json.JSONDecodeError): self.debug('Connection error when trying to fetch /sync') if self.persist: self.debug('Trying again in 5 seconds...') time.sleep(5) continue return if self.debug_file: self.debug(json.dumps(res, indent=4)) if 'error' in res: # TODO: Use self.debug or something print(f'[miniirc_matrix] Error returned when trying to ' f'fetch /sync: {res["error"]!r}') if self.persist: self.debug('Trying again in 15 seconds...') time.sleep(15) continue break next_batch = res['next_batch'] if 'rooms' in res: rooms = res['rooms'] self.__process_join(rooms) self.__process_invite(rooms) finally: self.connected = None def __fire_event(self, room_id: str, event: _Event) -> None: try: # Discard events that occurred before the bot started up if self.__start_time > event.origin_server_ts[float]: return if f := _events.get(event.type[str]): f(self, room_id, event) except _InvalidEventError: if self.debug_file: self.debug(f'Invalid event: {event!r}') self.debug(traceback.format_exc()) @_room_processor('join', 'leave') def __process_join(self, room_id: str, room: dict[str, Any]) -> None: # Joined rooms if 'timeline' in room: for raw_event in room['timeline']['events']: self.__fire_event(room_id, _Event(raw_event)) @_room_processor('invite') def __process_invite(self, room_id: str, room: dict[str, Any]) -> None: # Search for the person who invited for raw_event in reversed(room['invite_state']['events']): event = _Event(raw_event) if (event.type == 'm.room.member' and event.state_key == self.current_nick and event.content.membership == 'invite'): self.__fire_event(room_id, event) break def quote(self, *msg: str, force: Optional[bool] = None, tags: Optional[dict[Any, Any]] = None) -> None: cmd, _, tags2, args = miniirc.ircv3_message_parser(' '.join(msg)) if miniirc.ver[0] < 2 and args and args[-1].startswith(':'): args[-1] = args[-1][1:] self.send(cmd, *args, force=force, tags=tags or tags2) def send(self, cmd: str, *args: str, force: Optional[bool] = None, tags: Optional[dict[Any, Any]] = None) -> None: cmd = cmd.upper() if self.debug_file: self.debug('>>>', cmd, *args) if cmd in ('PRIVMSG', 'NOTICE') and len(args) == 2: channel, msg = args if cmd == 'NOTICE': msgtype = 'm.notice' elif msg.startswith('\x01ACTION'): msg = msg[8:].rstrip('\x01') msgtype = 'm.emote' else: msgtype = 'm.text' params: dict[str, Any] html_msg, mentions = _irc_to_html(msg) if html_msg: params = { 'msgtype': msgtype, 'body': _invisible_formatting_re.sub('', msg), 'format': 'org.matrix.custom.html', 'formatted_body': html_msg, } if mentions: params['m.mentions'] = {'user_ids': list(mentions)} else: # No formatting params = {'msgtype': msgtype, 'body': msg} if tags: if tags.get('+draft/reply'): params['m.relates_to'] = { 'm.in_reply_to': {'event_id': tags['+draft/reply']} } self.__send_tagmsg(channel, tags) self.debug( self.__put(f'{self._get_room_url(channel)}/send/m.room.message' f'/{uuid.uuid4()}', **params) ) elif cmd == 'TAGMSG' and len(args) == 1 and tags: self.__send_tagmsg(args[0], tags) elif cmd == 'JOIN' and len(args) == 1: self.debug(self.__post(f'join/{_url_quote(args[0])}')) elif cmd == 'PART' and len(args) == 1: self.debug(self.__post(f'{self._get_room_url(args[0])}/leave')) elif self.connected: self.debug('Unknown command:', cmd) self.__numeric('421', cmd, 'Unknown command') def __send_tagmsg(self, channel: str, tags: dict[Any, Any]) -> None: if tags.get('+draft/react') and tags.get('+draft/reply'): react = { 'event_id': tags['+draft/reply'], 'key': tags['+draft/react'], 'rel_type': 'm.annotation' } self.debug( self.__put(f'{self._get_room_url(channel)}/send/m.reaction' f'/{uuid.uuid4()}', **{'m.relates_to': react}) ) if miniirc.ver >= (2, 0, 0): def __irc_msg(self, event: _Event, command: str, args: list[str], tags: Optional[dict[str, str]] = None, *, sender: Optional[str] = None) -> None: if sender is None: sender = event.sender[str] tags = tags or {} tags['msgid'] = event.event_id[str] tags['account'] = sender self.handle_msg(miniirc.IRCMessage( command, (sender, sender, sender), tags, args )) def __numeric(self, numeric: str, *args: str) -> None: self.handle_msg(miniirc.IRCMessage( numeric, ('', '', ''), {}, [self.current_nick, *args] )) else: def __irc_msg(self, event: _Event, command: str, args: list[str], tags: Optional[dict[str, str]] = None, *, sender: Optional[str] = None) -> None: if sender is None: sender = event.sender[str] tags = tags or {} tags['msgid'] = event.event_id[str] tags['account'] = sender if args: args[-1] = ':' + args[-1] self._handle(command, (sender, sender, sender), tags, args) def __numeric(self, numeric: str, *args: str) -> None: raw_args = [self.current_nick, *args] raw_args[-1] = ':' + raw_args[-1] self._handle(numeric, (numeric, numeric, numeric), {}, raw_args) @_register_event('m.room.message') def _message_event(self, room_id: str, event: _Event) -> None: if ('echo-message' not in self.active_caps and event.sender == self.current_nick): return command = 'PRIVMSG' content = event.content tags: dict[str, str] = {} msg: str if 'url' in content: msg = content.url[str] if self._media_proxy_url and (match := _media_url_re.match(msg)): path = match.group(1) key = self.__make_url_digest(path) msg = f'{self._media_proxy_url}/{path}?key={key}' else: msg, html_parsed_ok = _matrix_html_to_irc(content) if content.msgtype == 'm.emote': msg = f'\x01ACTION {msg}\x01' elif content.msgtype == 'm.notice': command = 'NOTICE' # Convert replies try: tags['+draft/reply'] = \ content.m_relates_to.m_in_reply_to.event_id[str] except _InvalidEventError: pass else: # Remove the reply added by Element if (content.format == 'org.matrix.custom.html' and not html_parsed_ok and msg.startswith('> ')): msg = msg.split('\n\n', 1)[-1] self.__irc_msg(event, command, [room_id, msg], tags) @_register_event('m.room.member') def _member_event(self, room_id: str, event: _Event) -> None: membership = event.content.membership[str] if membership == 'invite': self.__irc_msg(event, 'INVITE', [event.state_key[str], room_id]) elif membership == 'join': self.__irc_msg(event, 'JOIN', [room_id], sender=event.state_key[str]) elif membership == 'leave': # Don't send the PART event if this is an un-invite if event.unsigned.prev_content.membership == 'invite': # This isn't a standard IRC message, Ergo sends something # similar but only to the client that did /uninvite and not the # one that was uninvited. self.__irc_msg(event, 'UNINVITE', [ event.state_key[str], room_id ]) elif event.sender == event.state_key: self.__irc_msg(event, 'PART', [ room_id, event.content.reason.get(str, 'Leaving') ]) else: self.__irc_msg(event, 'KICK', [ room_id, event.state_key[str], event.content.reason.get(str, 'Kicked') ]) @_register_event('m.room.topic') def _topic_event(self, room_id: str, event: _Event) -> None: self.__irc_msg(event, 'TOPIC', [room_id, event.content.topic[str]]) @_register_event('m.reaction') def _reaction_event(self, room_id: str, event: _Event) -> None: relates_to = event.content.m_relates_to self.__irc_msg(event, 'TAGMSG', [room_id], { '+draft/react': relates_to.key[str], '+draft/reply': relates_to.event_id[str] }) @_register_event('im.vector.modular.widgets') def _widget_event(self, room_id: str, event: _Event) -> None: if event.content.type == 'jitsi': data = event.content.data msg = (f'\x01ACTION started a video conference: https://' f'{data.domain[str]}/{data.conferenceId[str]}\x01') elif event.unsigned.prev_content.type == 'jitsi': msg = '\x01ACTION ended a video conference\x01' else: return self.__irc_msg(event, 'PRIVMSG', [room_id, msg], {}) # Helpers @classmethod def _login(cls, homeserver: str, username: str, password: str) -> str: """ Logs in. Returns a new token or raises an exception. """ matrix = cls(homeserver, auto_connect=False) res = matrix.__post( 'login', type='m.login.password', identifier={'type': 'm.id.user', 'user': username}, password=password, ) if 'error' in res: raise ValueError(f'{res.get("errcode")}: {res.get("error")}') return res['access_token'] @classmethod def _logout(cls, homeserver: str, token: str) -> None: """ Logs out / voids the specified token. """ cls(homeserver, token=token, auto_connect=False).__post('logout') @classmethod def _logout_all(cls, homeserver: str, username: str, password: str) -> None: """ Logs out everywhere / voids all tokens for the user. """ token = cls._login(homeserver, username, password) matrix = Matrix(homeserver, token=token, auto_connect=False) res = matrix.__post('logout/all') assert 'error' not in res, res login = Matrix._login logout = Matrix._logout logout_all = Matrix._logout_all