diff --git a/app/chain/__init__.py b/app/chain/__init__.py index fc1e5c90..18feefbc 100644 --- a/app/chain/__init__.py +++ b/app/chain/__init__.py @@ -2,7 +2,7 @@ import traceback from abc import abstractmethod from typing import Optional, Any -from app.core import Context, ModuleManager, EventManager +from app.core import Context, ModuleManager from app.log import logger from app.utils.singleton import AbstractSingleton, Singleton @@ -17,7 +17,6 @@ class ChainBase(AbstractSingleton, metaclass=Singleton): 公共初始化 """ self.modulemanager = ModuleManager() - self.eventmanager = EventManager() @abstractmethod def process(self, *args, **kwargs) -> Optional[Context]: diff --git a/app/chain/user_message.py b/app/chain/user_message.py index e8eefa1c..6a11aa53 100644 --- a/app/chain/user_message.py +++ b/app/chain/user_message.py @@ -1,11 +1,8 @@ -from typing import Dict, Any +from typing import Any -from fastapi import Request - -from app.chain import ChainBase from app.chain.common import * from app.chain.search import SearchChain -from app.core import MediaInfo, TorrentInfo, MetaInfo +from app.core import MediaInfo, TorrentInfo, MetaInfo, EventManager from app.db.subscribes import Subscribes from app.log import logger from app.utils.types import EventType @@ -32,6 +29,7 @@ class UserMessageChain(ChainBase): self.subscribes = Subscribes() self.searchchain = SearchChain() self.torrent = TorrentHelper() + self.eventmanager = EventManager() def process(self, body: Any, form: Any, args: Any) -> None: """ @@ -60,6 +58,8 @@ class UserMessageChain(ChainBase): "cmd": text } ) + self.common.post_message(title=f"正在运行,请稍候 ...", userid=userid) + elif text.isdigit(): # 缓存 cache_data: dict = self._user_cache.get(userid) @@ -285,7 +285,7 @@ class UserMessageChain(ChainBase): 发送媒体列表消息 """ self.run_module('post_medias_message', - title=f"共找到{total}条相关信息,请回复数字选择对应媒体(p:上一页 n:下一页)", + title=f"共找到{total}条相关信息,请回复数字选择对应媒体(p: 上一页 n: 下一页)", items=items, userid=userid) @@ -294,6 +294,6 @@ class UserMessageChain(ChainBase): 发送种子列表消息 """ self.run_module('post_torrents_message', - title=f"共找到{total}条相关信息,请回复数字下载对应资源(0:自动选择 p:上一页 n:下一页)", + title=f"共找到{total}条相关信息,请回复数字下载对应资源(0: 自动选择 p: 上一页 n: 下一页)", items=items, userid=userid) diff --git a/app/command.py b/app/command.py index 086ecd5d..0a4a5eff 100644 --- a/app/command.py +++ b/app/command.py @@ -1,24 +1,36 @@ +import traceback +from threading import Thread, Event from typing import Any +from app.chain import ChainBase from app.chain.cookiecloud import CookieCloudChain from app.chain.douban_sync import DoubanSyncChain from app.chain.subscribe import SubscribeChain from app.core import eventmanager, PluginManager, EventManager -from app.core.event_manager import Event +from app.core.event_manager import Event as ManagerEvent from app.log import logger from app.utils.singleton import Singleton from app.utils.types import EventType +class CommandChian(ChainBase): + """ + 插件处理链 + """ + + def process(self, *args, **kwargs): + pass + + class Command(metaclass=Singleton): """ - 全局命令管理 + 全局命令管理,消费事件 """ # 内建命令 _commands = { "/cookiecloud": { "func": CookieCloudChain().process, - "description": "同步CookieCloud的Cookie", + "description": "同步站点Cookie", "data": {} }, "/doubansync": { @@ -35,9 +47,16 @@ class Command(metaclass=Singleton): } } + # 退出事件 + _event = Event() + def __init__(self): - # 注册插件命令 - plugin_commands = PluginManager().get_plugin_commands() + # 事件管理器 + self.eventmanager = EventManager() + # 插件管理器 + self.pluginmanager = PluginManager() + # 汇总插件命令 + plugin_commands = self.pluginmanager.get_plugin_commands() for command in plugin_commands: self.register( cmd=command.get('cmd'), @@ -48,6 +67,45 @@ class Command(metaclass=Singleton): 'data': command.get('data') } ) + # 处理链 + self.chain = CommandChian() + # 广播注册命令 + self.chain.run_module("register_commands", commands=self.get_commands()) + # 消息处理线程 + self._thread = Thread(target=self.__run) + # 启动事件处理线程 + self._thread.start() + + def __run(self): + """ + 事件处理线程 + """ + while not self._event.is_set(): + event, handlers = self.eventmanager.get_event() + if event: + logger.info(f"处理事件:{event.event_type} - {handlers}") + for handler in handlers: + try: + names = handler.__qualname__.split(".") + if names[0] == "Command": + self.command_event(event) + else: + self.pluginmanager.run_plugin_method(names[0], names[1], event) + except Exception as e: + logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}") + + def stop(self): + """ + 停止事件处理线程 + """ + self._event.set() + self._thread.join() + + def get_commands(self): + """ + 获取命令列表 + """ + return self._commands def register(self, cmd: str, func: Any, data: dict = None, desc: str = None) -> None: """ @@ -83,7 +141,7 @@ class Command(metaclass=Singleton): EventManager().send_event(etype, data) @eventmanager.register(EventType.CommandExcute) - def command_event(self, event: Event) -> None: + def command_event(self, event: ManagerEvent) -> None: """ 注册命令执行事件 event_data: { diff --git a/app/core/module_manager.py b/app/core/module_manager.py index be152f7a..b9a5e6eb 100644 --- a/app/core/module_manager.py +++ b/app/core/module_manager.py @@ -39,6 +39,14 @@ class ModuleManager(metaclass=Singleton): self._running_modules[module_id].init_module() logger.info(f"Moudle Loaded:{module_id}") + def stop(self): + """ + 停止所有模块 + """ + for _, module in self._running_modules.items(): + if hasattr(module, "stop"): + module.stop() + def get_modules(self, method: str) -> Generator: """ 获取模块列表 diff --git a/app/core/plugin_manager.py b/app/core/plugin_manager.py index 4362ecd8..4928f016 100644 --- a/app/core/plugin_manager.py +++ b/app/core/plugin_manager.py @@ -1,8 +1,6 @@ import traceback -from threading import Thread -from typing import Tuple, Optional, List, Any +from typing import List, Any -from app.core import EventManager from app.db.systemconfigs import SystemConfigs from app.helper import ModuleHelper from app.log import logger @@ -14,7 +12,6 @@ class PluginManager(metaclass=Singleton): 插件管理器 """ systemconfigs: SystemConfigs = None - eventmanager: EventManager = None # 插件列表 _plugins: dict = {} @@ -22,37 +19,17 @@ class PluginManager(metaclass=Singleton): _running_plugins: dict = {} # 配置Key _config_key: str = "plugin.%s" - # 事件处理线程 - _thread: Thread = None - # 开关 - _active: bool = False def __init__(self): self.init_config() def init_config(self): self.systemconfigs = SystemConfigs() - self.eventmanager = EventManager() # 停止已有插件 self.stop() # 启动插件 self.start() - def __run(self): - """ - 事件处理线程 - """ - while self._active: - event, handlers = self.eventmanager.get_event() - if event: - logger.info(f"处理事件:{event.event_type} - {handlers}") - for handler in handlers: - try: - names = handler.__qualname__.split(".") - self.run_plugin_method(names[0], names[1], event) - except Exception as e: - logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}") - def start(self): """ 启动 @@ -60,21 +37,10 @@ class PluginManager(metaclass=Singleton): # 加载插件 self.__load_plugins() - # 将事件管理器设为启动 - self._active = True - self._thread = Thread(target=self.__run) - # 启动事件处理线程 - self._thread.start() - def stop(self): """ 停止 """ - # 将事件管理器设为停止 - self._active = False - # 等待事件处理线程退出 - if self._thread: - self._thread.join() # 停止所有插件 self.__stop_plugins() @@ -131,37 +97,6 @@ class PluginManager(metaclass=Singleton): return {} return self.systemconfigs.get(self._config_key % pid) or {} - def get_plugin_page(self, pid: str) -> Tuple[Optional[str], Optional[str], Optional[str]]: - """ - 获取插件额外页面数据 - :return: 标题,页面内容,确定按钮响应函数 - """ - if not self._running_plugins.get(pid): - return None, None, None - if not hasattr(self._running_plugins[pid], "get_page"): - return None, None, None - return self._running_plugins[pid].get_page() - - def get_plugin_script(self, pid: str) -> Optional[str]: - """ - 获取插件额外脚本 - """ - if not self._running_plugins.get(pid): - return None - if not hasattr(self._running_plugins[pid], "get_script"): - return None - return self._running_plugins[pid].get_script() - - def get_plugin_state(self, pid: str) -> Optional[bool]: - """ - 获取插件状态 - """ - if not self._running_plugins.get(pid): - return None - if not hasattr(self._running_plugins[pid], "get_state"): - return None - return self._running_plugins[pid].get_state() - def save_plugin_config(self, pid: str, conf: dict) -> bool: """ 保存插件配置 @@ -170,98 +105,6 @@ class PluginManager(metaclass=Singleton): return False return self.systemconfigs.set(self._config_key % pid, conf) - @staticmethod - def __get_plugin_color(plugin: str) -> str: - """ - 获取插件的主题色 - """ - if hasattr(plugin, "plugin_color") and plugin.plugin_color: - return plugin.plugin_color - return "" - - def get_plugins_conf(self, auth_level: int) -> dict: - """ - 获取所有插件配置 - """ - all_confs = {} - for pid, plugin in self._running_plugins.items(): - # 基本属性 - conf = {} - # 权限 - if hasattr(plugin, "auth_level") \ - and plugin.auth_level > auth_level: - continue - # 名称 - if hasattr(plugin, "plugin_name"): - conf.update({"name": plugin.plugin_name}) - # 描述 - if hasattr(plugin, "plugin_desc"): - conf.update({"desc": plugin.plugin_desc}) - # 版本号 - if hasattr(plugin, "plugin_version"): - conf.update({"version": plugin.plugin_version}) - # 图标 - if hasattr(plugin, "plugin_icon"): - conf.update({"icon": plugin.plugin_icon}) - # ID前缀 - if hasattr(plugin, "plugin_config_prefix"): - conf.update({"prefix": plugin.plugin_config_prefix}) - # 插件额外的页面 - if hasattr(plugin, "get_page"): - title, _, _ = plugin.get_page() - conf.update({"page": title}) - # 插件额外的脚本 - if hasattr(plugin, "get_script"): - conf.update({"script": plugin.get_script()}) - # 主题色 - conf.update({"color": self.__get_plugin_color(plugin)}) - # 配置项 - conf.update({"fields": plugin.get_fields() or {}}) - # 配置值 - conf.update({"config": self.get_plugin_config(pid)}) - # 状态 - conf.update({"state": plugin.get_state()}) - # 汇总 - all_confs[pid] = conf - return all_confs - - def get_plugin_apps(self, auth_level: int) -> dict: - """ - 获取所有插件 - """ - all_confs = {} - for pid, plugin in self._plugins.items(): - # 基本属性 - conf = {} - # 权限 - if hasattr(plugin, "auth_level") \ - and plugin.auth_level > auth_level: - continue - # ID - conf.update({"id": pid}) - # 名称 - if hasattr(plugin, "plugin_name"): - conf.update({"name": plugin.plugin_name}) - # 描述 - if hasattr(plugin, "plugin_desc"): - conf.update({"desc": plugin.plugin_desc}) - # 版本 - if hasattr(plugin, "plugin_version"): - conf.update({"version": plugin.plugin_version}) - # 图标 - if hasattr(plugin, "plugin_icon"): - conf.update({"icon": plugin.plugin_icon}) - # 主题色 - conf.update({"color": self.__get_plugin_color(plugin)}) - if hasattr(plugin, "plugin_author"): - conf.update({"author": plugin.plugin_author}) - # 作者链接 - if hasattr(plugin, "author_url"): - conf.update({"author_url": plugin.author_url}) - # 汇总 - all_confs[pid] = conf - return all_confs - def get_plugin_commands(self) -> List[dict]: """ 获取插件命令 diff --git a/app/main.py b/app/main.py index 3fe06c3b..9b9c198a 100644 --- a/app/main.py +++ b/app/main.py @@ -3,6 +3,7 @@ from fastapi import FastAPI from uvicorn import Config from app.api.apiv1 import api_router +from app.command import Command from app.core import settings, ModuleManager, PluginManager from app.db.init import init_db, update_db from app.helper.sites import SitesHelper @@ -28,6 +29,10 @@ def shutdown_server(): Scheduler().stop() # 停止插件 PluginManager().stop() + # 停止模块 + ModuleManager().stop() + # 停止事件消费 + Command().stop() @App.on_event("startup") @@ -43,6 +48,8 @@ def start_module(): SitesHelper() # 启动定时服务 Scheduler() + # 启动事件消费 + Command() if __name__ == '__main__': diff --git a/app/modules/__init__.py b/app/modules/__init__.py index 11fbcb8c..06040431 100644 --- a/app/modules/__init__.py +++ b/app/modules/__init__.py @@ -225,3 +225,17 @@ class _ModuleBase(metaclass=ABCMeta): :return: 成功或失败 """ pass + + def register_commands(self, commands: dict): + """ + 注册命令,实现这个函数接收系统可用的命令菜单 + :param commands: 命令字典 + """ + pass + + @abstractmethod + def stop(self): + """ + 如果关闭时模块有服务需要停止,需要实现此方法 + """ + pass diff --git a/app/modules/douban/__init__.py b/app/modules/douban/__init__.py index a055a60c..87404440 100644 --- a/app/modules/douban/__init__.py +++ b/app/modules/douban/__init__.py @@ -23,6 +23,9 @@ class Douban(_ModuleBase): def init_module(self) -> None: pass + def stop(self): + pass + def init_setting(self) -> Tuple[str, Union[str, bool]]: pass diff --git a/app/modules/emby/__init__.py b/app/modules/emby/__init__.py index 8543df5f..4d15eb1b 100644 --- a/app/modules/emby/__init__.py +++ b/app/modules/emby/__init__.py @@ -14,6 +14,9 @@ class EmbyModule(_ModuleBase): def init_module(self) -> None: self.emby = Emby() + def stop(self): + pass + def init_setting(self) -> Tuple[str, Union[str, bool]]: return "MEDIASERVER", "emby" diff --git a/app/modules/fanart/__init__.py b/app/modules/fanart/__init__.py index 5063e19d..00b05057 100644 --- a/app/modules/fanart/__init__.py +++ b/app/modules/fanart/__init__.py @@ -21,6 +21,9 @@ class FanartModule(_ModuleBase): def init_module(self) -> None: pass + def stop(self): + pass + def init_setting(self) -> Tuple[str, Union[str, bool]]: return "FANART_API_KEY", True diff --git a/app/modules/filetransfer/__init__.py b/app/modules/filetransfer/__init__.py index 75b9aee2..c5eda970 100644 --- a/app/modules/filetransfer/__init__.py +++ b/app/modules/filetransfer/__init__.py @@ -20,6 +20,9 @@ class FileTransferModule(_ModuleBase): def init_module(self) -> None: pass + def stop(self): + pass + def init_setting(self) -> Tuple[str, Union[str, bool]]: pass diff --git a/app/modules/filter/__init__.py b/app/modules/filter/__init__.py index 53766da0..c0804a8a 100644 --- a/app/modules/filter/__init__.py +++ b/app/modules/filter/__init__.py @@ -7,6 +7,7 @@ from app.modules.filter.RuleParser import RuleParser class FilterModule(_ModuleBase): + # 规则解析器 parser: RuleParser = None @@ -62,6 +63,9 @@ class FilterModule(_ModuleBase): def init_module(self) -> None: self.parser = RuleParser() + def stop(self): + pass + def init_setting(self) -> Tuple[str, Union[str, bool]]: return "FILTER_RULE", True diff --git a/app/modules/indexer/__init__.py b/app/modules/indexer/__init__.py index 0ba663c5..b38368f3 100644 --- a/app/modules/indexer/__init__.py +++ b/app/modules/indexer/__init__.py @@ -23,6 +23,9 @@ class IndexerModule(_ModuleBase): def init_module(self) -> None: pass + def stop(self): + pass + def init_setting(self) -> Tuple[str, Union[str, bool]]: return "INDEXER", "builtin" diff --git a/app/modules/jellyfin/__init__.py b/app/modules/jellyfin/__init__.py index bfa301a9..204c6d83 100644 --- a/app/modules/jellyfin/__init__.py +++ b/app/modules/jellyfin/__init__.py @@ -13,6 +13,9 @@ class JellyfinModule(_ModuleBase): def init_module(self) -> None: self.jellyfin = Jellyfin() + def stop(self): + pass + def init_setting(self) -> Tuple[str, Union[str, bool]]: return "MEDIASERVER", "jellyfin" diff --git a/app/modules/plex/__init__.py b/app/modules/plex/__init__.py index 52f9625b..185ed563 100644 --- a/app/modules/plex/__init__.py +++ b/app/modules/plex/__init__.py @@ -14,6 +14,9 @@ class PlexModule(_ModuleBase): def init_module(self) -> None: self.plex = Plex() + def stop(self): + pass + def init_setting(self) -> Tuple[str, Union[str, bool]]: return "MEDIASERVER", "plex" diff --git a/app/modules/qbittorrent/__init__.py b/app/modules/qbittorrent/__init__.py index 21f74245..73716700 100644 --- a/app/modules/qbittorrent/__init__.py +++ b/app/modules/qbittorrent/__init__.py @@ -14,6 +14,9 @@ class QbittorrentModule(_ModuleBase): def init_module(self) -> None: self.qbittorrent = Qbittorrent() + def stop(self): + pass + def init_setting(self) -> Tuple[str, Union[str, bool]]: return "DOWNLOADER", "qbittorrent" diff --git a/app/modules/telegram/__init__.py b/app/modules/telegram/__init__.py index 97311ac5..e71acf26 100644 --- a/app/modules/telegram/__init__.py +++ b/app/modules/telegram/__init__.py @@ -8,12 +8,14 @@ from app.modules.telegram.telegram import Telegram class TelegramModule(_ModuleBase): - telegram: Telegram = None def init_module(self) -> None: self.telegram = Telegram() + def stop(self): + self.telegram.stop() + def init_setting(self) -> Tuple[str, Union[str, bool]]: return "MESSAGER", "telegram" @@ -51,13 +53,16 @@ class TelegramModule(_ModuleBase): } } """ + # 校验token + token = args.get("token") + if not token or token != settings.API_TOKEN: + return None try: - msg_json: dict = json.loads(body) + message: dict = json.loads(body) except Exception as err: logger.error(f"解析Telegram消息失败:{err}") return None - if msg_json: - message = msg_json.get("message", {}) + if message: text = message.get("text") user_id = message.get("from", {}).get("id") # 获取用户名 @@ -117,3 +122,10 @@ class TelegramModule(_ModuleBase): :return: 成功或失败 """ return self.telegram.send_torrents_msg(title=title, torrents=items, userid=userid) + + def register_commands(self, commands: dict): + """ + 注册命令,实现这个函数接收系统可用的命令菜单 + :param commands: 命令字典 + """ + self.telegram.register_commands(commands) diff --git a/app/modules/telegram/telegram.py b/app/modules/telegram/telegram.py index 72e37523..024c8065 100644 --- a/app/modules/telegram/telegram.py +++ b/app/modules/telegram/telegram.py @@ -1,13 +1,13 @@ -from threading import Event, Thread +import threading +from threading import Event from typing import Optional, List -from urllib.parse import urlencode -from app.core import settings, MediaInfo, TorrentInfo, Context +import telebot + +from app.core import settings, MediaInfo, Context from app.log import logger from app.utils.http import RequestUtils from app.utils.singleton import Singleton -import telebot - from app.utils.string import StringUtils @@ -26,16 +26,25 @@ class Telegram(metaclass=Singleton): self._telegram_chat_id = settings.TELEGRAM_CHAT_ID # 初始化机器人 if self._telegram_token and self._telegram_chat_id: + # bot _bot = telebot.TeleBot(self._telegram_token, parse_mode="markdown") + # 记录句柄 self._bot = _bot @_bot.message_handler(func=lambda message: True) def echo_all(message): RequestUtils(timeout=10).post_res(self._ds_url, json=message.json) - # 启动轮询 + def run_polling(): + """ + 定义线程函数来运行 infinity_polling + """ _bot.infinity_polling() + # 启动线程来运行 infinity_polling + self._polling_thread = threading.Thread(target=run_polling) + self._polling_thread.start() + def send_msg(self, title: str, text: str = "", image: str = "", userid: str = "") -> Optional[bool]: """ 发送Telegram消息 @@ -156,8 +165,25 @@ class Telegram(metaclass=Singleton): return True if ret else False + def register_commands(self, commands: dict): + """ + 注册菜单命令 + """ + if not self._bot: + return + # 设置bot命令 + if commands: + self._bot.delete_my_commands() + self._bot.set_my_commands( + commands=[ + telebot.types.BotCommand(cmd[1:], str(desc.get("description"))) for cmd, desc in + commands.items() + ] + ) + def stop(self): """ 停止Telegram消息接收服务 """ self._bot.stop_polling() + self._polling_thread.join() diff --git a/app/modules/themoviedb/__init__.py b/app/modules/themoviedb/__init__.py index 03dfb96e..fb6348ea 100644 --- a/app/modules/themoviedb/__init__.py +++ b/app/modules/themoviedb/__init__.py @@ -33,6 +33,9 @@ class TheMovieDb(_ModuleBase): self.tmdb = TmdbHelper() self.category = CategoryHelper() + def stop(self): + pass + def init_setting(self) -> Tuple[str, Union[str, bool]]: pass diff --git a/app/modules/transmission/__init__.py b/app/modules/transmission/__init__.py index ca3d3c60..7a9e3671 100644 --- a/app/modules/transmission/__init__.py +++ b/app/modules/transmission/__init__.py @@ -14,6 +14,9 @@ class TransmissionModule(_ModuleBase): def init_module(self) -> None: self.transmission = Transmission() + def stop(self): + pass + def init_setting(self) -> Tuple[str, Union[str, bool]]: return "DOWNLOADER", "transmission" diff --git a/app/modules/wechat/__init__.py b/app/modules/wechat/__init__.py index e0f15abf..4fbdfb94 100644 --- a/app/modules/wechat/__init__.py +++ b/app/modules/wechat/__init__.py @@ -16,6 +16,9 @@ class WechatModule(_ModuleBase): def init_module(self) -> None: self.wechat = WeChat() + def stop(self): + pass + def init_setting(self) -> Tuple[str, Union[str, bool]]: return "MESSAGER", "wechat" diff --git a/app/plugins/__init__.py b/app/plugins/__init__.py index fe1f4413..25da6cb5 100644 --- a/app/plugins/__init__.py +++ b/app/plugins/__init__.py @@ -1,10 +1,10 @@ import json from abc import ABCMeta, abstractmethod from pathlib import Path -from typing import Any, Optional +from typing import Any from app.chain import ChainBase -from app.core import settings, Context +from app.core import settings from app.db import SessionLocal from app.db.models import Base from app.db.models.plugin import PluginData @@ -17,7 +17,7 @@ class PluginChian(ChainBase): 插件处理链 """ - def process(self, *args, **kwargs) -> Optional[Context]: + def process(self, *args, **kwargs): pass diff --git a/app/plugins/autosignin/__init__.py b/app/plugins/autosignin/__init__.py index caa43000..dc972852 100644 --- a/app/plugins/autosignin/__init__.py +++ b/app/plugins/autosignin/__init__.py @@ -31,11 +31,8 @@ class AutoSignIn(_PluginBase): event: EventManager = None # 定时器 _scheduler = None - # 加载的模块 _site_schema: list = [] - # 退出事件 - _event = Event() def init_plugin(self, config: dict = None): self.sites = SitesHelper() @@ -70,7 +67,7 @@ class AutoSignIn(_PluginBase): :return: 命令关键字、事件、描述、附带数据 """ return { - "cmd": "/pts", + "cmd": "/sitesignin", "event": EventType.SiteSignin, "desc": "站点自动签到", "data": {} @@ -191,9 +188,7 @@ class AutoSignIn(_PluginBase): if self._scheduler: self._scheduler.remove_all_jobs() if self._scheduler.running: - self._event.set() self._scheduler.shutdown() - self._event.clear() self._scheduler = None except Exception as e: logger.error("退出插件失败:%s" % str(e)) diff --git a/app/plugins/sitestatistic/__init__.py b/app/plugins/sitestatistic/__init__.py index 912e3536..cdcf6259 100644 --- a/app/plugins/sitestatistic/__init__.py +++ b/app/plugins/sitestatistic/__init__.py @@ -7,7 +7,8 @@ import requests from apscheduler.schedulers.background import BackgroundScheduler from ruamel.yaml import CommentedMap -from app.core import settings +from app.core import settings, eventmanager +from app.core.event_manager import Event from app.helper import ModuleHelper from app.helper.sites import SitesHelper from app.log import logger @@ -17,6 +18,9 @@ from app.utils.http import RequestUtils from app.utils.timer import TimerUtils import warnings + +from app.utils.types import EventType + warnings.filterwarnings("ignore", category=FutureWarning) @@ -57,6 +61,19 @@ class SiteStatistic(_PluginBase): self._scheduler.print_jobs() self._scheduler.start() + @staticmethod + def get_command() -> dict: + """ + 定义远程控制命令 + :return: 命令关键字、事件、描述、附带数据 + """ + return { + "cmd": "/sitestatistic", + "event": EventType.SiteStatistic, + "desc": "站点数据统计", + "data": {} + } + def stop_service(self): pass @@ -221,6 +238,14 @@ class SiteStatistic(_PluginBase): title=f"站点 {site_user_info.site_name} 收到 " f"{site_user_info.message_unread} 条新消息,请登陆查看") + @eventmanager.register(EventType.SiteStatistic) + def refresh(self, event: Event): + """ + 刷新站点数据 + """ + logger.info("开始执行站点数据刷新 ...") + self.refresh_all_site_data(force=True) + def refresh_all_site_data(self, force: bool = False, specify_sites: list = None): """ 多线程刷新站点下载上传量,默认间隔6小时 @@ -228,6 +253,8 @@ class SiteStatistic(_PluginBase): if not self.sites.get_indexers(): return + logger.info("开始刷新站点数据 ...") + with lock: if not force \ @@ -251,15 +278,13 @@ class SiteStatistic(_PluginBase): # 并发刷新 with ThreadPool(min(len(refresh_sites), self._MAX_CONCURRENCY)) as p: - site_user_infos: List[ISiteUserInfo] = p.map(self.__refresh_site_data, refresh_sites) - site_user_infos = [info for info in site_user_infos if info] - # 保存数据 - for site_user_info in site_user_infos: - # 获取今天的日期 - key = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - value = site_user_info.to_dict() - # 按日期保存为字典 - self.save_data(key, value) + p.map(self.__refresh_site_data, refresh_sites) + # 获取今天的日期 + key = datetime.now().strftime('%Y-%m-%d') + # 保存数据 + self.save_data(key, self._sites_data) # 更新时间 self._last_update_time = datetime.now() + + logger.info("站点数据刷新完成") diff --git a/app/utils/types.py b/app/utils/types.py index f2296cab..a678098d 100644 --- a/app/utils/types.py +++ b/app/utils/types.py @@ -19,6 +19,8 @@ class EventType(Enum): CommandExcute = "command.excute" # 站点签到 SiteSignin = "site.signin" + # 站点数据统计 + SiteStatistic = "site.statistic" # 系统配置Key字典