add pyTelegramBotAPI

This commit is contained in:
jxxghp 2023-06-09 07:37:08 +08:00
parent fa4159c563
commit f5f9f5349a
3 changed files with 46 additions and 75 deletions

View File

@ -6,12 +6,15 @@ from app.core import settings, MediaInfo, TorrentInfo, Context
from app.log import logger
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
import telebot
from app.utils.string import StringUtils
class Telegram(metaclass=Singleton):
_poll_timeout: int = 5
_ds_url = f"http://127.0.0.1:{settings.PORT}/api/v1/messages?token={settings.API_TOKEN}"
_event = Event()
_bot: telebot.TeleBot = None
def __init__(self):
"""
@ -21,14 +24,17 @@ class Telegram(metaclass=Singleton):
self._telegram_token = settings.TELEGRAM_TOKEN
# Chat Id
self._telegram_chat_id = settings.TELEGRAM_CHAT_ID
# 用户Chat Id列表
self._telegram_user_ids = settings.TELEGRAM_USERS.split(",")
# 管理员Chat Id列表
self._telegram_admin_ids = settings.TELEGRAM_ADMINS.split(",")
# 消息轮循
# 初始化机器人
if self._telegram_token and self._telegram_chat_id:
self._thread = Thread(target=self.__start_telegram_message_proxy)
self._thread.start()
_bot = telebot.TeleBot(self._telegram_token, parse_mode="markdown")
self._bot = _bot
@_bot.message_handler(func=lambda message: True)
def echo_all(message):
RequestUtils(timeout=10).post_res(self._ds_url, json=message.json)
# 启动轮询
_bot.infinity_polling()
def send_msg(self, title: str, text: str = "", image: str = "", userid: str = "") -> Optional[bool]:
"""
@ -49,7 +55,7 @@ class Telegram(metaclass=Singleton):
try:
if text:
# text中的Markdown特殊字符转义
text = text.replace("[", r"\[").replace("_", r"\_").replace("*", r"\*").replace("`", r"\`").replace("\n\n", "\n")
text = StringUtils.escape_markdown(text)
caption = f"*{title}*\n{text}"
else:
caption = title
@ -138,74 +144,20 @@ class Telegram(metaclass=Singleton):
向Telegram发送报文
"""
def __res_parse(result):
if result and result.status_code == 200:
ret_json = result.json()
status = ret_json.get("ok")
if status:
return True
else:
logger.error(
f"发送消息错误,错误码:{ret_json.get('error_code')},错误原因:{ret_json.get('description')}")
return False
elif result is not None:
logger.error(f"发送消息错误,错误码:{result.status_code},错误原因:{result.reason}")
return False
else:
logger.error("发送消息错误,未知错误")
return False
# 请求
request = RequestUtils(proxies=settings.PROXY)
# 发送图文消息
if image:
photo_req = request.get_res(image)
if photo_req and photo_req.content:
res = request.post_res("https://api.telegram.org/bot%s/sendPhoto" % self._telegram_token,
data={"chat_id": chat_id, "caption": caption, "parse_mode": "Markdown"},
files={"photo": photo_req.content})
if __res_parse(res):
return True
# 发送文本消息
res = request.get_res("https://api.telegram.org/bot%s/sendMessage?" % self._telegram_token + urlencode(
{"chat_id": chat_id, "text": caption, "parse_mode": "Markdown"}))
return __res_parse(res)
ret = self._bot.send_photo(chat_id=self._telegram_chat_id,
photo=image,
caption=caption,
parse_mode="markdown")
else:
ret = self._bot.send_message(chat_id=self._telegram_chat_id,
text=caption,
parse_mode="markdown")
def __start_telegram_message_proxy(self):
logger.info("Telegram消息接收服务启动")
def consume_messages(_offset: int, _sc_url: str, _ds_url: str) -> int:
try:
res = RequestUtils(proxies=settings.PROXY).get_res(
_sc_url + urlencode({"timeout": self._poll_timeout, "offset": _offset}))
if res and res.json():
for msg in res.json().get("result", []):
# 无论本地是否成功先更新offset即消息最多成功消费一次
_offset = msg["update_id"] + 1
logger.debug("Telegram接收到消息: %s" % msg)
local_res = RequestUtils(timeout=10).post_res(_ds_url, json=msg)
logger.debug("Telegram message: %s processed, response is: %s" % (msg, local_res.text))
except Exception as e:
logger.error("Telegram 消息接收出现错误: %s" % e)
return _offset
offset = 0
while True:
if self._event.is_set():
logger.info("Telegram消息接收服务已停止")
break
index = 0
while index < 20 and not self._event.is_set():
offset = consume_messages(_offset=offset,
_sc_url="https://api.telegram.org/bot%s/getUpdates?" % self._telegram_token,
_ds_url="http://127.0.0.1:%s/api/v1/messages?token=%s" % (
settings.PORT, settings.API_TOKEN))
index += 1
return True if ret else False
def stop(self):
"""
停止Telegram消息接收服务
"""
self._event.set()
self._bot.stop_polling()

View File

@ -512,3 +512,21 @@ class StringUtils:
大写首字母兼容None
"""
return s.title() if s else s
@staticmethod
def escape_markdown(content: str) -> str:
"""
Escapes Markdown characters in a string of Markdown.
Credits to: simonsmh
:param content: The string of Markdown to escape.
:type content: :obj:`str`
:return: The escaped string.
:rtype: :obj:`str`
"""
parses = re.sub(r"([_*\[\]()~`>#+\-=|.!{}])", r"\\\1", content)
reparse = re.sub(r"\\\\([_*\[\]()~`>#+\-=|.!{}])", r"\1", parses)
return reparse

View File

@ -35,4 +35,5 @@ func_timeout==4.3.5
selenium~=4.9.1
bs4~=0.0.1
beautifulsoup4~=4.12.2
pillow==9.5.0
pillow~=9.5.0
pyTelegramBotAPI~=4.12.0