插件直接采用程序的定时任务模块, 可显示在前端页面

This commit is contained in:
叮叮当
2024-02-23 18:13:27 +08:00
parent 06c3985aa4
commit bbddec763a
3 changed files with 129 additions and 53 deletions

View File

@ -43,6 +43,8 @@ class Scheduler(metaclass=Singleton):
})
# 退出事件
_event = threading.Event()
# 锁
_lock = threading.Lock()
def __init__(self):
@ -232,25 +234,8 @@ class Scheduler(metaclass=Singleton):
)
# 注册插件公共服务
plugin_services = PluginManager().get_plugin_services()
for service in plugin_services:
try:
self._jobs[service["id"]] = {
"func": service["func"],
"running": False,
}
self._scheduler.add_job(
self.start,
service["trigger"],
id=service["id"],
name=service["name"],
**service["kwargs"],
kwargs={
'job_id': service["id"]
}
)
except Exception as e:
logger.error(f"注册插件服务失败:{str(e)} - {service}")
for pid in PluginManager().get_running_plugin_ids():
self.update_plugin_job(pid)
# 打印服务
logger.debug(self._scheduler.print_jobs())
@ -263,51 +248,127 @@ class Scheduler(metaclass=Singleton):
启动定时服务
"""
# 处理job_id格式
job = self._jobs.get(job_id)
if not job:
return
if job.get("running"):
logger.warning(f"定时任务 {job_id} 正在运行 ...")
return
self._jobs[job_id]["running"] = True
with self._lock:
job = self._jobs.get(job_id)
if not job:
return
if job.get("running"):
logger.warning(f"定时任务 {job_id} 正在运行 ...")
return
self._jobs[job_id]["running"] = True
try:
if not kwargs:
kwargs = job.get("kwargs") or {}
job["func"](*args, **kwargs)
except Exception as e:
logger.error(f"定时任务 {job_id} 执行失败:{str(e)}")
self._jobs[job_id]["running"] = False
# 如果在job["func"]()运行时, 编辑配置导致任务被移除时, 该id已经不存在, 忽略错误
with self._lock:
try:
self._jobs[job_id]["running"] = False
except:
pass
# 如果是单次任务, 应立即移除缓存
if not self._scheduler.get_job(job_id):
self._jobs.pop(job_id, None)
def update_plugin_job(self, pid: str):
"""
更新插件定时服务
"""
# 移除该插件的全部服务
self.remove_plugin_job(pid)
# 获取插件服务列表
with self._lock:
try:
plugin_services = PluginManager().run_plugin_method(pid, "get_service") or []
except:
return
# 开始注册插件服务
for service in plugin_services:
try:
sid = f"{pid}_{service['id']}"
self._jobs[sid] = {
"func": service["func"],
"name": service["name"],
"pid": pid,
"running": False,
}
self._scheduler.add_job(
self.start,
service["trigger"],
id=sid,
name=service["name"],
**service["kwargs"],
kwargs={
'job_id': sid
}
)
logger.info(f"注册插件服务({pid}){service['name']}")
except Exception as e:
logger.error(f"注册插件服务失败:{str(e)} - {service}")
def remove_plugin_job(self, pid: str):
"""
移除插件定时服务
"""
with self._lock:
for job_id, service in self._jobs.copy().items():
try:
if service.get("pid") == pid:
self._jobs.pop(job_id, None)
try:
self._scheduler.remove_job(job_id)
except:
pass
logger.info(f"移除插件服务({pid}){service.get('name')}")
except Exception as e:
logger.error(f"移除插件服务失败:{str(e)} - {job_id}: {service}")
def list(self) -> List[schemas.ScheduleInfo]:
"""
当前所有任务
"""
# 返回计时任务
schedulers = []
# 去重
added = []
jobs = self._scheduler.get_jobs()
# 按照下次运行时间排序
jobs.sort(key=lambda x: x.next_run_time)
for job in jobs:
if job.name not in added:
added.append(job.name)
else:
continue
job_id = job.id.split("|")[0]
if not self._jobs.get(job_id):
continue
# 任务状态
status = "正在运行" if self._jobs[job_id].get("running") else "等待"
# 下次运行时间
next_run = TimerUtils.time_difference(job.next_run_time)
schedulers.append(schemas.ScheduleInfo(
id=job_id,
name=job.name,
status=status,
next_run=next_run
))
return schedulers
with self._lock:
# 返回计时任务
schedulers = []
# 去重
added = []
jobs = self._scheduler.get_jobs()
# 按照下次运行时间排序
jobs.sort(key=lambda x: x.next_run_time)
# 将正在运行的任务提取出来 (保障一次性任务正常显示)
for job_id, value in self._jobs.items():
name = value.get("name")
if value.get("running") and name:
if name not in added:
added.append(name)
else:
continue
schedulers.append(schemas.ScheduleInfo(
id=job_id,
name=name,
status="正在运行",
next_run="~"
))
# 获取其他待执行任务
for job in jobs:
if job.name not in added:
added.append(job.name)
else:
continue
job_id = job.id.split("|")[0]
if not self._jobs.get(job_id):
continue
# 下次运行时间
next_run = TimerUtils.time_difference(job.next_run_time)
schedulers.append(schemas.ScheduleInfo(
id=job_id,
name=job.name,
status="等待",
next_run=next_run
))
return schedulers
def stop(self):
"""