Merge pull request #1535 from cikezhu/main

This commit is contained in:
jxxghp 2024-02-24 06:46:43 +08:00 committed by GitHub
commit 37ba75b53c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 148 additions and 53 deletions

View File

@ -8,6 +8,7 @@ from app.core.security import verify_token
from app.db.systemconfig_oper import SystemConfigOper from app.db.systemconfig_oper import SystemConfigOper
from app.helper.plugin import PluginHelper from app.helper.plugin import PluginHelper
from app.schemas.types import SystemConfigKey from app.schemas.types import SystemConfigKey
from app.scheduler import Scheduler
router = APIRouter() router = APIRouter()
@ -90,6 +91,8 @@ def install(plugin_id: str,
SystemConfigOper().set(SystemConfigKey.UserInstalledPlugins, install_plugins) SystemConfigOper().set(SystemConfigKey.UserInstalledPlugins, install_plugins)
# 重载插件管理器 # 重载插件管理器
PluginManager().init_config() PluginManager().init_config()
# 注册插件服务
Scheduler().update_plugin_job(plugin_id)
return schemas.Response(success=True) return schemas.Response(success=True)
@ -123,6 +126,8 @@ def reset_plugin(plugin_id: str, _: schemas.TokenPayload = Depends(verify_token)
PluginManager().delete_plugin_config(plugin_id) PluginManager().delete_plugin_config(plugin_id)
# 重新生效插件 # 重新生效插件
PluginManager().reload_plugin(plugin_id, {}) PluginManager().reload_plugin(plugin_id, {})
# 注册插件服务
Scheduler().update_plugin_job(plugin_id)
return schemas.Response(success=True) return schemas.Response(success=True)
@ -144,6 +149,8 @@ def set_plugin_config(plugin_id: str, conf: dict,
PluginManager().save_plugin_config(plugin_id, conf) PluginManager().save_plugin_config(plugin_id, conf)
# 重新生效插件 # 重新生效插件
PluginManager().reload_plugin(plugin_id, conf) PluginManager().reload_plugin(plugin_id, conf)
# 注册插件服务
Scheduler().update_plugin_job(plugin_id)
return schemas.Response(success=True) return schemas.Response(success=True)
@ -163,6 +170,8 @@ def uninstall_plugin(plugin_id: str,
SystemConfigOper().set(SystemConfigKey.UserInstalledPlugins, install_plugins) SystemConfigOper().set(SystemConfigKey.UserInstalledPlugins, install_plugins)
# 重载插件管理器 # 重载插件管理器
PluginManager().init_config() PluginManager().init_config()
# 移除插件服务
Scheduler().remove_plugin_job(plugin_id)
return schemas.Response(success=True) return schemas.Response(success=True)

View File

@ -246,6 +246,16 @@ class PluginManager(metaclass=Singleton):
ret_services.extend(services) ret_services.extend(services)
return ret_services return ret_services
def get_plugin_attr(self, pid: str, attr: str) -> Any:
"""
获取插件属性
"""
if not self._running_plugins.get(pid):
return None
if not hasattr(self._running_plugins[pid], attr):
return None
return getattr(self._running_plugins[pid], attr)
def run_plugin_method(self, pid: str, method: str, *args, **kwargs) -> Any: def run_plugin_method(self, pid: str, method: str, *args, **kwargs) -> Any:
""" """
运行插件方法 运行插件方法
@ -262,6 +272,12 @@ class PluginManager(metaclass=Singleton):
""" """
return list(self._plugins.keys()) return list(self._plugins.keys())
def get_running_plugin_ids(self) -> List[str]:
"""
获取所有运行态插件ID
"""
return list(self._running_plugins.keys())
def get_online_plugins(self) -> List[dict]: def get_online_plugins(self) -> List[dict]:
""" """
获取所有在线插件信息 获取所有在线插件信息

View File

@ -43,6 +43,8 @@ class Scheduler(metaclass=Singleton):
}) })
# 退出事件 # 退出事件
_event = threading.Event() _event = threading.Event()
# 锁
_lock = threading.Lock()
def __init__(self): def __init__(self):
@ -232,25 +234,8 @@ class Scheduler(metaclass=Singleton):
) )
# 注册插件公共服务 # 注册插件公共服务
plugin_services = PluginManager().get_plugin_services() for pid in PluginManager().get_running_plugin_ids():
for service in plugin_services: self.update_plugin_job(pid)
try:
self._jobs[service["id"]] = {
"func": service["func"],
"running": False,
}
self._scheduler.add_job(
self.start,
service["trigger"],
id=service["id"],
name=service["name"],
**service["kwargs"],
kwargs={
'job_id': service["id"]
}
)
except Exception as e:
logger.error(f"注册插件服务失败:{str(e)} - {service}")
# 打印服务 # 打印服务
logger.debug(self._scheduler.print_jobs()) logger.debug(self._scheduler.print_jobs())
@ -263,6 +248,7 @@ class Scheduler(metaclass=Singleton):
启动定时服务 启动定时服务
""" """
# 处理job_id格式 # 处理job_id格式
with self._lock:
job = self._jobs.get(job_id) job = self._jobs.get(job_id)
if not job: if not job:
return return
@ -276,12 +262,79 @@ class Scheduler(metaclass=Singleton):
job["func"](*args, **kwargs) job["func"](*args, **kwargs)
except Exception as e: except Exception as e:
logger.error(f"定时任务 {job_id} 执行失败:{str(e)}") logger.error(f"定时任务 {job_id} 执行失败:{str(e)}")
# 如果在job["func"]()运行时, 编辑配置导致任务被移除时, 该id已经不存在, 忽略错误
with self._lock:
try:
self._jobs[job_id]["running"] = False self._jobs[job_id]["running"] = False
except:
pass
# 如果是单次任务, 应立即移除缓存
if not self._scheduler.get_job(job_id):
self._jobs.pop(job_id, None)
def update_plugin_job(self, pid: str):
"""
更新插件定时服务
"""
# 移除该插件的全部服务
self.remove_plugin_job(pid)
# 获取插件服务列表
with self._lock:
try:
plugin_services = PluginManager().run_plugin_method(pid, "get_service") or []
except:
return
# 获取插件名称
plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name")
# 开始注册插件服务
for service in plugin_services:
try:
sid = f"{pid}_{service['id']}"
self._jobs[sid] = {
"func": service["func"],
"name": service["name"],
"pid": pid,
"plugin_name": plugin_name,
"running": False,
}
self._scheduler.add_job(
self.start,
service["trigger"],
id=sid,
name=service["name"],
**service["kwargs"],
kwargs={
'job_id': sid
}
)
logger.info(f"注册插件服务({plugin_name}){service['name']}")
except Exception as e:
logger.error(f"注册插件服务失败:{str(e)} - {service}")
def remove_plugin_job(self, pid: str):
"""
移除插件定时服务
"""
with self._lock:
# 获取插件名称
plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name")
for job_id, service in self._jobs.copy().items():
try:
if service.get("pid") == pid:
self._jobs.pop(job_id, None)
try:
self._scheduler.remove_job(job_id)
except:
pass
logger.info(f"移除插件服务({plugin_name}){service.get('name')}")
except Exception as e:
logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}")
def list(self) -> List[schemas.ScheduleInfo]: def list(self) -> List[schemas.ScheduleInfo]:
""" """
当前所有任务 当前所有任务
""" """
with self._lock:
# 返回计时任务 # 返回计时任务
schedulers = [] schedulers = []
# 去重 # 去重
@ -289,21 +342,36 @@ class Scheduler(metaclass=Singleton):
jobs = self._scheduler.get_jobs() jobs = self._scheduler.get_jobs()
# 按照下次运行时间排序 # 按照下次运行时间排序
jobs.sort(key=lambda x: x.next_run_time) jobs.sort(key=lambda x: x.next_run_time)
# 将正在运行的任务提取出来 (保障一次性任务正常显示)
for job_id, service in self._jobs.items():
name = service.get("name")
plugin_name = service.get("plugin_name")
if service.get("running") and name and plugin_name:
schedulers.append(schemas.ScheduleInfo(
id=job_id,
name=name,
provider=plugin_name,
status="正在运行",
next_run="~"
))
# 获取其他待执行任务
for job in jobs: for job in jobs:
if job.name not in added: if job.name not in added:
added.append(job.name) added.append(job.name)
else: else:
continue continue
job_id = job.id.split("|")[0] job_id = job.id.split("|")[0]
if not self._jobs.get(job_id): service = self._jobs.get(job_id)
if not service:
continue continue
# 任务状态 # 任务状态
status = "正在运行" if self._jobs[job_id].get("running") else "等待" status = "正在运行" if service.get("running") else "等待"
# 下次运行时间 # 下次运行时间
next_run = TimerUtils.time_difference(job.next_run_time) next_run = TimerUtils.time_difference(job.next_run_time)
schedulers.append(schemas.ScheduleInfo( schedulers.append(schemas.ScheduleInfo(
id=job_id, id=job_id,
name=job.name, name=job.name,
provider=service.get("plugin_name", "MoviePilot"),
status=status, status=status,
next_run=next_run next_run=next_run
)) ))

View File

@ -56,6 +56,8 @@ class ScheduleInfo(BaseModel):
id: Optional[str] = None id: Optional[str] = None
# 名称 # 名称
name: Optional[str] = None name: Optional[str] = None
# 提供者
provider: Optional[str] = None
# 状态 # 状态
status: Optional[str] = None status: Optional[str] = None
# 下次执行时间 # 下次执行时间