fix: Optimize scheduled service dispatching (优化定时服务调度)

jxxghp 2023-09-24 12:41:59 +08:00
parent 41ff5363ea
commit 5cd48d5447
9 changed files with 268 additions and 207 deletions
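
The gist of this change: API endpoints, remote commands and timed triggers no longer call chain methods such as CookieCloudChain.process or SubscribeChain.search directly; they all go through Scheduler().start(job_id, ...), which keeps a per-job running flag. A condensed, hypothetical sketch of that pattern (the real implementation appears in the Scheduler changes at the end of this diff; names and details here are illustrative only):

```python
# Condensed sketch of the dispatch pattern introduced by this commit (illustrative
# only; the real Scheduler in this diff also wires these jobs into APScheduler).
from typing import Any, Dict


def _demo_transfer(**kwargs: Any) -> None:
    print("transfer job executed with", kwargs)


class MiniScheduler:
    def __init__(self) -> None:
        # job registry: job id -> target callable + running flag
        self._jobs: Dict[str, Dict[str, Any]] = {
            "transfer": {"func": _demo_transfer, "running": False},
        }

    def start(self, job_id: str, *args: Any, **kwargs: Any) -> None:
        job = self._jobs.get(job_id)
        if not job or job["running"]:
            return  # unknown job, or the same job is already running
        job["running"] = True
        try:
            job["func"](*args, **kwargs)
        finally:
            job["running"] = False


MiniScheduler().start("transfer", state="R")
```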

View File

@@ -11,9 +11,7 @@ from app.core.security import verify_token
from app.db import get_db
from app.db.models.transferhistory import TransferHistory
from app.scheduler import Scheduler
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
from app.utils.timer import TimerUtils
router = APIRouter()
@@ -83,37 +81,7 @@ def schedule(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询后台服务信息
"""
# 返回计时任务
schedulers = []
# 去重
added = []
jobs = Scheduler().list()
# 按照下次运行时间排序
jobs.sort(key=lambda x: x.next_run_time)
for job in jobs:
if job.name not in added:
added.append(job.name)
else:
continue
if not StringUtils.is_chinese(job.name):
continue
if not job.next_run_time:
status = "已停止"
next_run = ""
else:
next_run = TimerUtils.time_difference(job.next_run_time)
if not next_run:
status = "正在运行"
else:
status = "阻塞" if job.pending else "等待"
schedulers.append(schemas.ScheduleInfo(
id=job.id,
name=job.name,
status=status,
next_run=next_run
))
return schedulers
return Scheduler().list()
@router.get("/transfer", summary="文件整理统计", response_model=List[int])
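
After the removal above, the endpoint just returns Scheduler().list(), which now yields ready-made ScheduleInfo objects. A purely illustrative example of the resulting payload; the values and the next_run format are assumptions, only the field names come from this diff:

```python
# Illustrative only: invented values; field names follow schemas.ScheduleInfo.
example_response = [
    {"id": "cookiecloud", "name": "同步CookieCloud站点", "status": "等待", "next_run": "23分钟"},
    {"id": "transfer", "name": "下载文件整理", "status": "正在运行", "next_run": ""},
]
```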

View File

@@ -5,7 +5,6 @@ from sqlalchemy.orm import Session
from starlette.background import BackgroundTasks
from app import schemas
from app.chain.cookiecloud import CookieCloudChain
from app.chain.site import SiteChain
from app.chain.torrents import TorrentsChain
from app.core.event import EventManager
@@ -15,19 +14,13 @@ from app.db.models.site import Site
from app.db.models.siteicon import SiteIcon
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.sites import SitesHelper
from app.scheduler import Scheduler
from app.schemas.types import SystemConfigKey, EventType
from app.utils.string import StringUtils
router = APIRouter()
def start_cookiecloud_sync(db: Session):
"""
后台启动CookieCloud站点同步
"""
CookieCloudChain(db).process(manual=True)
@router.get("/", summary="所有站点", response_model=List[schemas.Site])
def read_sites(db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(verify_token)) -> List[dict]:
@@ -101,12 +94,11 @@ def delete_site(
@router.get("/cookiecloud", summary="CookieCloud同步", response_model=schemas.Response)
def cookie_cloud_sync(background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
运行CookieCloud同步站点信息
"""
background_tasks.add_task(start_cookiecloud_sync, db)
background_tasks.add_task(Scheduler().start, job_id="cookiecloud")
return schemas.Response(success=True, message="CookieCloud同步任务已启动")
@@ -119,7 +111,8 @@ def cookie_cloud_sync(db: Session = Depends(get_db),
Site.reset(db)
SystemConfigOper().set(SystemConfigKey.IndexerSites, [])
SystemConfigOper().set(SystemConfigKey.RssSites, [])
CookieCloudChain().process(manual=True)
# 启动定时服务
Scheduler().start("cookiecloud", manual=True)
# 插件站点删除
EventManager().send_event(EventType.SiteDeleted,
{
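
The CookieCloud sync above is now queued as a background task. A minimal, self-contained FastAPI sketch of that invocation style (route path and helper names are hypothetical): BackgroundTasks.add_task forwards keyword arguments, so the job effectively runs as Scheduler().start(job_id="cookiecloud") after the response is returned.

```python
# Hypothetical stand-alone example of the pattern used by cookie_cloud_sync above.
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def start_job(job_id: str) -> None:
    # stand-in for Scheduler().start
    print(f"running scheduler job: {job_id}")


@app.get("/demo/cookiecloud")
def demo_cookiecloud(background_tasks: BackgroundTasks) -> dict:
    # add_task passes *args/**kwargs straight to the callable
    background_tasks.add_task(start_job, job_id="cookiecloud")
    return {"success": True, "message": "CookieCloud同步任务已启动"}
```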

View File

@@ -1,5 +1,5 @@
import json
from typing import List, Any, Optional
from typing import List, Any
from fastapi import APIRouter, Request, BackgroundTasks, Depends, HTTPException, Header
from sqlalchemy.orm import Session
@@ -12,6 +12,7 @@ from app.db import get_db
from app.db.models.subscribe import Subscribe
from app.db.models.user import User
from app.db.userauth import get_current_active_user
from app.scheduler import Scheduler
from app.schemas.types import MediaType
router = APIRouter()
@@ -26,13 +27,6 @@ def start_subscribe_add(db: Session, title: str, year: str,
mtype=mtype, tmdbid=tmdbid, season=season, username=username)
def start_subscribe_search(db: Session, sid: Optional[int], state: Optional[str]):
"""
启动订阅搜索任务
"""
SubscribeChain(db).search(sid=sid, state=state, manual=True)
@router.get("/", summary="所有订阅", response_model=List[schemas.Subscribe])
def read_subscribes(
db: Session = Depends(get_db),
@@ -140,35 +134,36 @@ def subscribe_mediaid(
@router.get("/refresh", summary="刷新订阅", response_model=schemas.Response)
def refresh_subscribes(
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
刷新所有订阅
"""
SubscribeChain(db).refresh()
Scheduler().start("subscribe_refresh")
return schemas.Response(success=True)
@router.get("/check", summary="刷新订阅 TMDB 信息", response_model=schemas.Response)
def check_subscribes(
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
刷新所有订阅
刷新订阅 TMDB 信息
"""
SubscribeChain(db).check()
Scheduler().start("subscribe_tmdb")
return schemas.Response(success=True)
@router.get("/search", summary="搜索所有订阅", response_model=schemas.Response)
def search_subscribes(
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
搜索所有订阅
"""
background_tasks.add_task(start_subscribe_search, db=db, sid=None, state='R')
background_tasks.add_task(
Scheduler().start,
job_id="subscribe_search",
sid=None, state='R'
)
return schemas.Response(success=True)
@@ -176,12 +171,15 @@ def search_subscribes(
def search_subscribe(
subscribe_id: int,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据订阅编号搜索订阅
"""
background_tasks.add_task(start_subscribe_search, db=db, sid=subscribe_id, state=None)
background_tasks.add_task(
Scheduler().start,
job_id="subscribe_search",
sid=subscribe_id, state=None
)
return schemas.Response(success=True)
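
Both search endpoints above depend on Scheduler.start passing everything except job_id through to the registered function (SubscribeChain.search in this commit), which is how sid and state still reach the chain. A small hypothetical illustration:

```python
# Hypothetical illustration of the kwargs pass-through used by the two search endpoints.
def search(sid=None, state='N', manual=False):
    print(f"search subscriptions: sid={sid} state={state} manual={manual}")


_jobs = {"subscribe_search": {"func": search, "running": False}}


def start(job_id, *args, **kwargs):
    _jobs[job_id]["func"](*args, **kwargs)


start("subscribe_search", sid=None, state='R')  # search all subscriptions
start("subscribe_search", sid=42, state=None)   # search a single subscription by id
```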

View File

@@ -1,9 +1,9 @@
import json
import time
import tailer
from datetime import datetime
from typing import Union
import tailer
from fastapi import APIRouter, HTTPException, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
@@ -11,13 +11,13 @@ from sqlalchemy.orm import Session
from app import schemas
from app.chain.search import SearchChain
from app.core.config import settings
from app.core.event import eventmanager
from app.core.security import verify_token
from app.db import get_db
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.message import MessageHelper
from app.helper.progress import ProgressHelper
from app.schemas.types import SystemConfigKey, EventType
from app.scheduler import Scheduler
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.system import SystemUtils
from version import APP_VERSION
@@ -214,15 +214,13 @@ def restart_system(_: schemas.TokenPayload = Depends(verify_token)):
return schemas.Response(success=ret, message=msg)
@router.get("/command", summary="执行命令", response_model=schemas.Response)
def execute_command(cmd: str,
@router.get("/runscheduler", summary="运行服务", response_model=schemas.Response)
def execute_command(jobid: str,
_: schemas.TokenPayload = Depends(verify_token)):
"""
执行命令
"""
if not cmd:
if not jobid:
return schemas.Response(success=False, message="命令不能为空!")
eventmanager.send_event(etype=EventType.CommandExcute, data={
"cmd": cmd
})
Scheduler().start(jobid)
return schemas.Response(success=True)
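
A hypothetical client call against the new /runscheduler route above; host, port, API prefix and token handling are assumptions that depend on the deployment:

```python
# Hypothetical client example; URL prefix, port and auth handling are assumptions.
import requests

resp = requests.get(
    "http://localhost:3001/api/v1/system/runscheduler",
    params={"jobid": "transfer"},
    headers={"Authorization": "Bearer <access_token>"},
    timeout=10,
)
print(resp.status_code, resp.json())
```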

View File

@@ -1,5 +1,5 @@
import base64
from typing import Tuple, Optional, Union
from typing import Tuple, Optional
from urllib.parse import urljoin
from lxml import etree
@@ -16,7 +16,6 @@ from app.helper.message import MessageHelper
from app.helper.rss import RssHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.schemas import Notification, NotificationType, MessageChannel
from app.utils.http import RequestUtils
from app.utils.site import SiteUtils
@@ -40,21 +39,6 @@ class CookieCloudChain(ChainBase):
password=settings.COOKIECLOUD_PASSWORD
)
def remote_sync(self, channel: MessageChannel, userid: Union[int, str]):
"""
远程触发同步站点发送消息
"""
self.post_message(Notification(channel=channel, mtype=NotificationType.SiteMessage,
title="开始同步CookieCloud站点 ...", userid=userid))
# 开始同步
success, msg = self.process()
if success:
self.post_message(Notification(channel=channel, mtype=NotificationType.SiteMessage,
title=f"同步站点成功,{msg}", userid=userid))
else:
self.post_message(Notification(channel=channel, mtype=NotificationType.SiteMessage,
title=f"同步站点失败:{msg}", userid=userid))
def process(self, manual=False) -> Tuple[bool, str]:
"""
通过CookieCloud同步站点Cookie

View File

@@ -10,7 +10,6 @@ from app.core.config import settings
from app.db import SessionFactory
from app.db.mediaserver_oper import MediaServerOper
from app.log import logger
from app.schemas import MessageChannel, Notification
lock = threading.Lock()
@@ -41,16 +40,6 @@ class MediaServerChain(ChainBase):
"""
return self.run_module("mediaserver_tv_episodes", server=server, item_id=item_id)
def remote_sync(self, channel: MessageChannel, userid: Union[int, str]):
"""
同步豆瓣想看数据发送消息
"""
self.post_message(Notification(channel=channel,
title="开始媒体服务器 ...", userid=userid))
self.sync()
self.post_message(Notification(channel=channel,
title="同步媒体服务器完成!", userid=userid))
def sync(self):
"""
同步媒体库所有数据到本地数据库

View File

@@ -132,45 +132,6 @@ class SubscribeChain(ChainBase):
return True
return False
def remote_refresh(self, channel: MessageChannel, userid: Union[str, int] = None):
"""
远程刷新订阅发送消息
"""
self.post_message(Notification(channel=channel,
title=f"开始刷新订阅 ...", userid=userid))
self.refresh()
self.post_message(Notification(channel=channel,
title=f"订阅刷新完成!", userid=userid))
def remote_search(self, arg_str: str, channel: MessageChannel, userid: Union[str, int] = None):
"""
远程搜索订阅发送消息
"""
if arg_str and not str(arg_str).isdigit():
self.post_message(Notification(channel=channel,
title="请输入正确的命令格式:/subscribe_search [id]"
"[id]为订阅编号,不输入订阅编号时搜索所有订阅", userid=userid))
return
if arg_str:
sid = int(arg_str)
subscribe = self.subscribeoper.get(sid)
if not subscribe:
self.post_message(Notification(channel=channel,
title=f"订阅编号 {sid} 不存在!", userid=userid))
return
self.post_message(Notification(channel=channel,
title=f"开始搜索 {subscribe.name} ...", userid=userid))
# 搜索订阅
self.search(sid=int(arg_str))
self.post_message(Notification(channel=channel,
title=f"{subscribe.name} 搜索完成!", userid=userid))
else:
self.post_message(Notification(channel=channel,
title=f"开始搜索所有订阅 ...", userid=userid))
self.search(state='R')
self.post_message(Notification(channel=channel,
title=f"订阅搜索完成!", userid=userid))
def search(self, sid: int = None, state: str = 'N', manual: bool = False):
"""
订阅搜索

View File

@@ -1,11 +1,9 @@
import traceback
from threading import Thread, Event
from typing import Any, Union
from typing import Any, Union, Dict
from app.chain import ChainBase
from app.chain.cookiecloud import CookieCloudChain
from app.chain.download import DownloadChain
from app.chain.mediaserver import MediaServerChain
from app.chain.site import SiteChain
from app.chain.subscribe import SubscribeChain
from app.chain.system import SystemChain
@@ -15,6 +13,8 @@ from app.core.event import eventmanager, EventManager
from app.core.plugin import PluginManager
from app.db import SessionFactory
from app.log import logger
from app.scheduler import Scheduler
from app.schemas import Notification
from app.schemas.types import EventType, MessageChannel
from app.utils.object import ObjectUtils
from app.utils.singleton import Singleton
@@ -49,13 +49,15 @@ class Command(metaclass=Singleton):
self.pluginmanager = PluginManager()
# 处理链
self.chain = CommandChian(self._db)
# 定时服务管理
self.scheduler = Scheduler()
# 内置命令
self._commands = {
"/cookiecloud": {
"func": CookieCloudChain(self._db).remote_sync,
"id": "cookiecloud",
"type": "scheduler",
"description": "同步站点",
"category": "站点",
"data": {}
"category": "站点"
},
"/sites": {
"func": SiteChain(self._db).remote_list,
@@ -79,10 +81,10 @@
"data": {}
},
"/mediaserver_sync": {
"func": MediaServerChain(self._db).remote_sync,
"id": "mediaserver_sync",
"type": "scheduler",
"description": "同步媒体服务器",
"category": "管理",
"data": {}
"category": "管理"
},
"/subscribes": {
"func": SubscribeChain(self._db).remote_list,
@@ -91,16 +93,16 @@
"data": {}
},
"/subscribe_refresh": {
"func": SubscribeChain(self._db).remote_refresh,
"id": "subscribe_refresh",
"type": "scheduler",
"description": "刷新订阅",
"category": "订阅",
"data": {}
"category": "订阅"
},
"/subscribe_search": {
"func": SubscribeChain(self._db).remote_search,
"id": "subscribe_search",
"type": "scheduler",
"description": "搜索订阅",
"category": "订阅",
"data": {}
"category": "订阅"
},
"/subscribe_delete": {
"func": SubscribeChain(self._db).remote_delete,
@@ -108,9 +110,9 @@
"data": {}
},
"/subscribe_tmdb": {
"func": SubscribeChain(self._db).check,
"description": "订阅TMDB数据刷新",
"data": {}
"id": "subscribe_tmdb",
"type": "scheduler",
"description": "订阅元数据更新"
},
"/downloading": {
"func": DownloadChain(self._db).remote_downloading,
@@ -119,10 +121,10 @@
"data": {}
},
"/transfer": {
"func": TransferChain(self._db).process,
"id": "transfer",
"type": "scheduler",
"description": "下载文件整理",
"category": "管理",
"data": {}
"category": "管理"
},
"/redo": {
"func": TransferChain(self._db).remote_transfer,
@@ -180,6 +182,56 @@
except Exception as e:
logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
def __run_command(self, command: Dict[str, any],
data_str: str = "",
channel: MessageChannel = None, userid: Union[str, int] = None):
"""
运行定时服务
"""
if command.get("type") == "scheduler":
# 定时服务
if userid:
self.chain.post_message(
Notification(
channel=channel,
title=f"开始执行 {command.get('description')} ...",
userid=userid
)
)
# 执行定时任务
self.scheduler.start(job_id=command.get("id"))
if userid:
self.chain.post_message(
Notification(
channel=channel,
title=f"{command.get('description')} 执行完成",
userid=userid
)
)
else:
# 命令
cmd_data = command['data'] if command.get('data') else {}
args_num = ObjectUtils.arguments(command['func'])
if args_num > 0:
if cmd_data:
# 有内置参数直接使用内置参数
data = cmd_data.get("data") or {}
data['channel'] = channel
data['user'] = userid
cmd_data['data'] = data
command['func'](**cmd_data)
elif args_num == 2:
# 没有输入参数只输入渠道和用户ID
command['func'](channel, userid)
elif args_num > 2:
# 多个输入参数用户输入、用户ID
command['func'](data_str, channel, userid)
else:
# 没有参数
command['func']()
def stop(self):
"""
停止事件处理线程
@@ -225,25 +277,11 @@
logger.info(f"用户 {userid} 开始执行:{command.get('description')} ...")
else:
logger.info(f"开始执行:{command.get('description')} ...")
cmd_data = command['data'] if command.get('data') else {}
args_num = ObjectUtils.arguments(command['func'])
if args_num > 0:
if cmd_data:
# 有内置参数直接使用内置参数
data = cmd_data.get("data") or {}
data['channel'] = channel
data['user'] = userid
cmd_data['data'] = data
command['func'](**cmd_data)
elif args_num == 2:
# 没有输入参数只输入渠道和用户ID
command['func'](channel, userid)
elif args_num > 2:
# 多个输入参数用户输入、用户ID
command['func'](data_str, channel, userid)
else:
# 没有参数
command['func']()
# 执行命令
self.__run_command(command, data_str=data_str,
channel=channel, userid=userid)
if userid:
logger.info(f"用户 {userid} {command.get('description')} 执行完成")
else:
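
For entries without type "scheduler", __run_command above keeps the old convention of branching on the parameter count returned by ObjectUtils.arguments, which is not shown in this diff. A stand-in sketch of what that count presumably means, using inspect:

```python
# Stand-in sketch; ObjectUtils.arguments is assumed to count a callable's parameters.
import inspect


def arguments(func) -> int:
    return len(inspect.signature(func).parameters)


def remote_list(channel, userid):
    ...


def remote_delete(arg_str, channel, userid):
    ...


print(arguments(remote_list))    # 2 -> called as func(channel, userid)
print(arguments(remote_delete))  # 3 -> called as func(data_str, channel, userid)
```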

View File

@@ -1,10 +1,12 @@
import logging
from datetime import datetime, timedelta
from typing import List
import pytz
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from app import schemas
from app.chain import ChainBase
from app.chain.cookiecloud import CookieCloudChain
from app.chain.mediaserver import MediaServerChain
@@ -40,65 +42,153 @@ class Scheduler(metaclass=Singleton):
def __init__(self):
# 数据库连接
self._db = SessionFactory()
# 各服务的运行状态
self._jobs = {
"cookiecloud": {
"func": CookieCloudChain(self._db).process,
"running": False,
},
"mediaserver_sync": {
"func": MediaServerChain(self._db).sync,
"running": False,
},
"subscribe_tmdb": {
"func": SubscribeChain(self._db).check,
"running": False,
},
"subscribe_search": {
"func": SubscribeChain(self._db).search,
"running": False,
},
"subscribe_refresh": {
"func": SubscribeChain(self._db).refresh,
"running": False,
},
"transfer": {
"func": TransferChain(self._db).process,
"running": False,
}
}
# 调试模式不启动定时服务
if settings.DEV:
return
# CookieCloud定时同步
if settings.COOKIECLOUD_INTERVAL:
self._scheduler.add_job(CookieCloudChain(self._db).process,
"interval",
minutes=settings.COOKIECLOUD_INTERVAL,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=1),
id="cookiecloud",
name="同步CookieCloud站点")
self._scheduler.add_job(
self.start,
"interval",
id="cookiecloud",
name="同步CookieCloud站点",
minutes=settings.COOKIECLOUD_INTERVAL,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=1),
kwargs={
'job_id': 'cookiecloud'
}
)
# 媒体服务器同步
if settings.MEDIASERVER_SYNC_INTERVAL:
self._scheduler.add_job(MediaServerChain(self._db).sync, "interval",
hours=settings.MEDIASERVER_SYNC_INTERVAL,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
id="mediaserver_sync",
name="同步媒体服务器")
self._scheduler.add_job(
self.start,
"interval",
id="mediaserver_sync",
name="同步媒体服务器",
hours=settings.MEDIASERVER_SYNC_INTERVAL,
next_run_time=datetime.now(pytz.timezone(settings.TZ)) + timedelta(minutes=5),
kwargs={
'job_id': 'mediaserver_sync'
}
)
# 新增订阅时搜索5分钟检查一次
self._scheduler.add_job(SubscribeChain(self._db).search, "interval",
minutes=5, kwargs={'state': 'N'})
self._scheduler.add_job(
self.start,
"interval",
minutes=5,
kwargs={
'job_id': 'subscribe_search',
'state': 'N'
}
)
# 检查更新订阅TMDB数据每隔6小时
self._scheduler.add_job(SubscribeChain(self._db).check, "interval", hours=6,
id="subscribe_tmdb", name="订阅元数据更新")
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_tmdb",
name="订阅元数据更新",
hours=6,
kwargs={
'job_id': 'subscribe_tmdb'
}
)
# 订阅状态每隔24小时搜索一次
if settings.SUBSCRIBE_SEARCH:
self._scheduler.add_job(SubscribeChain(self._db).search, "interval",
hours=24, kwargs={'state': 'R'},
id="subscribe_search", name="订阅搜索")
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_search",
name="订阅搜索",
hours=24,
kwargs={
'job_id': 'subscribe_search',
'state': 'R'
}
)
if settings.SUBSCRIBE_MODE == "spider":
# 站点首页种子定时刷新模式
triggers = TimerUtils.random_scheduler(num_executions=30)
for trigger in triggers:
self._scheduler.add_job(SubscribeChain(self._db).refresh, "cron",
hour=trigger.hour, minute=trigger.minute,
id=f"subscribe_refresh|{trigger.hour}:{trigger.minute}",
name="订阅刷新")
self._scheduler.add_job(
self.start,
"cron",
id=f"subscribe_refresh|{trigger.hour}:{trigger.minute}",
name="订阅刷新",
hour=trigger.hour,
minute=trigger.minute,
kwargs={
'job_id': 'subscribe_refresh'
})
else:
# RSS订阅模式
if not settings.SUBSCRIBE_RSS_INTERVAL:
settings.SUBSCRIBE_RSS_INTERVAL = 30
elif settings.SUBSCRIBE_RSS_INTERVAL < 5:
settings.SUBSCRIBE_RSS_INTERVAL = 5
self._scheduler.add_job(SubscribeChain(self._db).refresh, "interval",
minutes=settings.SUBSCRIBE_RSS_INTERVAL,
id="subscribe_refresh", name="订阅刷新")
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_refresh",
name="RSS订阅刷新",
minutes=settings.SUBSCRIBE_RSS_INTERVAL,
kwargs={
'job_id': 'subscribe_refresh'
}
)
# 下载器文件转移每5分钟
if settings.DOWNLOADER_MONITOR:
self._scheduler.add_job(TransferChain(self._db).process, "interval", minutes=5,
id="transfer", name="下载文件整理")
self._scheduler.add_job(
self.start,
"interval",
id="transfer",
name="下载文件整理",
minutes=5,
kwargs={
'job_id': 'transfer'
}
)
# 公共定时服务
self._scheduler.add_job(SchedulerChain(self._db).scheduler_job, "interval", minutes=10)
self._scheduler.add_job(
SchedulerChain(self._db).scheduler_job,
"interval",
minutes=10
)
# 打印服务
logger.debug(self._scheduler.print_jobs())
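
Every registration above has the same shape: the APScheduler job targets self.start and names the real work via kwargs={'job_id': ...}, so timed runs and manual runs share one code path and one running flag. A minimal stand-alone APScheduler example of that shape, with a stub target and invented values:

```python
# Minimal illustration of the add_job shape used above (stub target, invented values).
from apscheduler.schedulers.background import BackgroundScheduler


def start(job_id: str, **kwargs) -> None:
    print(f"dispatching job {job_id} with {kwargs}")


scheduler = BackgroundScheduler()
scheduler.add_job(
    start, "interval",
    id="transfer", name="下载文件整理", minutes=5,
    kwargs={"job_id": "transfer"},
)
scheduler.start()
```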
@@ -106,11 +196,53 @@
# 启动定时服务
self._scheduler.start()
def list(self):
def start(self, job_id: str, *args, **kwargs):
"""
启动定时服务
"""
# 处理job_id格式
job = self._jobs.get(job_id)
if not job:
return
if job.get("running"):
logger.warning(f"定时任务 {job_id} 正在运行 ...")
return
self._jobs[job_id]["running"] = True
try:
job["func"](*args, **kwargs)
except Exception as e:
logger.error(f"定时任务 {job_id} 执行失败:{e}")
self._jobs[job_id]["running"] = False
def list(self) -> List[schemas.ScheduleInfo]:
"""
当前所有任务
"""
return self._scheduler.get_jobs()
# 返回计时任务
schedulers = []
# 去重
added = []
jobs = self._scheduler.get_jobs()
# 按照下次运行时间排序
jobs.sort(key=lambda x: x.next_run_time)
for job in jobs:
if job.name not in added:
added.append(job.name)
else:
continue
if not self._jobs.get(job.id):
continue
# 任务状态
status = "正在运行" if self._jobs[job.id].get("running") else "等待"
# 下次运行时间
next_run = TimerUtils.time_difference(job.next_run_time)
schedulers.append(schemas.ScheduleInfo(
id=job.id,
name=job.name,
status=status,
next_run=next_run
))
return schedulers
def stop(self):
"""