diff --git a/app/modules/telegram/telegram.py b/app/modules/telegram/telegram.py index c5b711c3..72e37523 100644 --- a/app/modules/telegram/telegram.py +++ b/app/modules/telegram/telegram.py @@ -6,12 +6,15 @@ from app.core import settings, MediaInfo, TorrentInfo, Context from app.log import logger from app.utils.http import RequestUtils from app.utils.singleton import Singleton +import telebot + +from app.utils.string import StringUtils class Telegram(metaclass=Singleton): - - _poll_timeout: int = 5 + _ds_url = f"http://127.0.0.1:{settings.PORT}/api/v1/messages?token={settings.API_TOKEN}" _event = Event() + _bot: telebot.TeleBot = None def __init__(self): """ @@ -21,14 +24,17 @@ class Telegram(metaclass=Singleton): self._telegram_token = settings.TELEGRAM_TOKEN # Chat Id self._telegram_chat_id = settings.TELEGRAM_CHAT_ID - # 用户Chat Id列表 - self._telegram_user_ids = settings.TELEGRAM_USERS.split(",") - # 管理员Chat Id列表 - self._telegram_admin_ids = settings.TELEGRAM_ADMINS.split(",") - # 消息轮循 + # 初始化机器人 if self._telegram_token and self._telegram_chat_id: - self._thread = Thread(target=self.__start_telegram_message_proxy) - self._thread.start() + _bot = telebot.TeleBot(self._telegram_token, parse_mode="markdown") + self._bot = _bot + + @_bot.message_handler(func=lambda message: True) + def echo_all(message): + RequestUtils(timeout=10).post_res(self._ds_url, json=message.json) + + # 启动轮询 + _bot.infinity_polling() def send_msg(self, title: str, text: str = "", image: str = "", userid: str = "") -> Optional[bool]: """ @@ -49,7 +55,7 @@ class Telegram(metaclass=Singleton): try: if text: # text中的Markdown特殊字符转义 - text = text.replace("[", r"\[").replace("_", r"\_").replace("*", r"\*").replace("`", r"\`").replace("\n\n", "\n") + text = StringUtils.escape_markdown(text) caption = f"*{title}*\n{text}" else: caption = title @@ -138,74 +144,20 @@ class Telegram(metaclass=Singleton): 向Telegram发送报文 """ - def __res_parse(result): - if result and result.status_code == 200: - ret_json = result.json() - status = ret_json.get("ok") - if status: - return True - else: - logger.error( - f"发送消息错误,错误码:{ret_json.get('error_code')},错误原因:{ret_json.get('description')}") - return False - elif result is not None: - logger.error(f"发送消息错误,错误码:{result.status_code},错误原因:{result.reason}") - return False - else: - logger.error("发送消息错误,未知错误") - return False - - # 请求 - request = RequestUtils(proxies=settings.PROXY) - - # 发送图文消息 if image: - photo_req = request.get_res(image) - if photo_req and photo_req.content: - res = request.post_res("https://api.telegram.org/bot%s/sendPhoto" % self._telegram_token, - data={"chat_id": chat_id, "caption": caption, "parse_mode": "Markdown"}, - files={"photo": photo_req.content}) - if __res_parse(res): - return True - # 发送文本消息 - res = request.get_res("https://api.telegram.org/bot%s/sendMessage?" % self._telegram_token + urlencode( - {"chat_id": chat_id, "text": caption, "parse_mode": "Markdown"})) - return __res_parse(res) + ret = self._bot.send_photo(chat_id=self._telegram_chat_id, + photo=image, + caption=caption, + parse_mode="markdown") + else: + ret = self._bot.send_message(chat_id=self._telegram_chat_id, + text=caption, + parse_mode="markdown") - def __start_telegram_message_proxy(self): - logger.info("Telegram消息接收服务启动") - - def consume_messages(_offset: int, _sc_url: str, _ds_url: str) -> int: - try: - res = RequestUtils(proxies=settings.PROXY).get_res( - _sc_url + urlencode({"timeout": self._poll_timeout, "offset": _offset})) - if res and res.json(): - for msg in res.json().get("result", []): - # 无论本地是否成功,先更新offset,即消息最多成功消费一次 - _offset = msg["update_id"] + 1 - logger.debug("Telegram接收到消息: %s" % msg) - local_res = RequestUtils(timeout=10).post_res(_ds_url, json=msg) - logger.debug("Telegram message: %s processed, response is: %s" % (msg, local_res.text)) - except Exception as e: - logger.error("Telegram 消息接收出现错误: %s" % e) - return _offset - - offset = 0 - - while True: - if self._event.is_set(): - logger.info("Telegram消息接收服务已停止") - break - index = 0 - while index < 20 and not self._event.is_set(): - offset = consume_messages(_offset=offset, - _sc_url="https://api.telegram.org/bot%s/getUpdates?" % self._telegram_token, - _ds_url="http://127.0.0.1:%s/api/v1/messages?token=%s" % ( - settings.PORT, settings.API_TOKEN)) - index += 1 + return True if ret else False def stop(self): """ 停止Telegram消息接收服务 """ - self._event.set() + self._bot.stop_polling() diff --git a/app/utils/string.py b/app/utils/string.py index d72707d9..614e391d 100644 --- a/app/utils/string.py +++ b/app/utils/string.py @@ -512,3 +512,21 @@ class StringUtils: 大写首字母兼容None """ return s.title() if s else s + + @staticmethod + def escape_markdown(content: str) -> str: + """ + Escapes Markdown characters in a string of Markdown. + + Credits to: simonsmh + + :param content: The string of Markdown to escape. + :type content: :obj:`str` + + :return: The escaped string. + :rtype: :obj:`str` + """ + + parses = re.sub(r"([_*\[\]()~`>#+\-=|.!{}])", r"\\\1", content) + reparse = re.sub(r"\\\\([_*\[\]()~`>#+\-=|.!{}])", r"\1", parses) + return reparse diff --git a/requirements.txt b/requirements.txt index c8285de2..3389d3b9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,4 +35,5 @@ func_timeout==4.3.5 selenium~=4.9.1 bs4~=0.0.1 beautifulsoup4~=4.12.2 -pillow==9.5.0 \ No newline at end of file +pillow~=9.5.0 +pyTelegramBotAPI~=4.12.0 \ No newline at end of file